File indexing completed on 2026-04-10 08:38:57
0001 import datetime
0002 import json
0003 import os
0004 import re
0005 import socket
0006 import sys
0007 import time
0008 import traceback
0009 from typing import Any
0010
0011 from packaging import version
0012 from pandacommon.pandautils.PandaUtils import naive_utcnow
0013
0014 from pandajedi.jedicore import Interaction
0015 from pandajedi.jediddm.DDMInterface import DDMInterface
0016 from pandaserver.brokerage.SiteMapper import SiteMapper
0017 from pandaserver.dataservice import DataServiceUtils
0018 from pandaserver.dataservice.DataServiceUtils import select_scope
0019 from pandaserver.taskbuffer import JobUtils, ProcessGroups, SiteSpec
0020
0021
0022
def getNucleiWithData(siteMapper, ddmIF, datasetName, candidateNuclei, deepScan=False):
    """
    Collect availability statistics of a dataset's constituent replicas at candidate nuclei.

    :param siteMapper: SiteMapper-like object used to resolve nuclei and remote readability
    :param ddmIF: DDM interface providing listReplicasPerDataset
    :param datasetName: name of the dataset (container) to look up
    :param candidateNuclei: list of nucleus names to evaluate
    :param deepScan: passed through to ddmIF.listReplicasPerDataset
    :return: (Interaction.SC_SUCCEEDED, dict keyed by nucleus name, remote_source_available)
             on success, or (exception type, error message string, None) when the DDM
             lookup fails
    """

    try:
        replica_map = ddmIF.listReplicasPerDataset(datasetName, deepScan)
    except Exception:
        errtype, errvalue = sys.exc_info()[:2]
        return errtype, f"ddmIF.listReplicasPerDataset failed with {errvalue}", None

    # check whether at least one complete replica sits at a location readable over WAN
    remote_source_available = False
    for tmp_replica_data in replica_map.values():
        for tmp_location, tmp_stat_data in tmp_replica_data.items():
            # skip empty or unknown replicas
            if tmp_stat_data[0]["total"] in [None, 0]:
                continue
            # skip incomplete replicas
            if tmp_stat_data[0]["total"] != tmp_stat_data[0]["found"]:
                continue
            if siteMapper.is_readable_remotely(tmp_location):
                remote_source_available = True
                break  # NOTE(review): only breaks the inner loop; the outer scan continues

    return_map = {}
    for tmpNucleus in candidateNuclei:
        tmpNucleusSpec = siteMapper.getNucleus(tmpNucleus)

        # statistics for this nucleus over all constituent datasets
        totalNum = 0
        totalSize = 0
        avaNumDisk = 0   # files available on disk endpoints
        avaNumAny = 0    # files available on any (disk or tape) endpoint
        avaSizeDisk = 0
        avaSizeAny = 0
        can_be_remote_source = False
        for tmpDataset, tmpRepMap in replica_map.items():
            tmpTotalNum = 0
            tmpTotalSize = 0
            tmpAvaNumDisk = 0
            tmpAvaNumAny = 0
            tmpAvaSizeDisk = 0
            tmpAvaSizeAny = 0

            for tmpLoc, locData in tmpRepMap.items():

                # take the dataset totals from the first location that reports them
                if tmpTotalNum == 0:
                    tmpTotalNum = locData[0]["total"]
                    tmpTotalSize = locData[0]["tsize"]

                # only endpoints associated to this nucleus count towards availability
                if tmpNucleusSpec.is_associated_for_input(tmpLoc):

                    if siteMapper.is_readable_remotely(tmpLoc):
                        can_be_remote_source = True

                    tmpEndpoint = tmpNucleusSpec.getEndpoint(tmpLoc)
                    tmpAvaNum = locData[0]["found"]
                    tmpAvaSize = locData[0]["asize"]

                    if tmpEndpoint["is_tape"] != "Y":

                        # a complete replica on disk: no need to look at other locations
                        if tmpTotalNum == tmpAvaNum and tmpTotalNum > 0:
                            tmpAvaNumDisk = tmpAvaNum
                            tmpAvaNumAny = tmpAvaNum
                            tmpAvaSizeDisk = tmpAvaSize
                            tmpAvaSizeAny = tmpAvaSize
                            break
                        # keep the best (largest) disk replica seen so far
                        if tmpAvaNum > tmpAvaNumDisk:
                            tmpAvaNumDisk = tmpAvaNum
                            tmpAvaSizeDisk = tmpAvaSize

                    # keep the best replica regardless of disk/tape
                    if tmpAvaNumAny < tmpAvaNum:
                        tmpAvaNumAny = tmpAvaNum
                        tmpAvaSizeAny = tmpAvaSize

            # NOTE(review): totals are overwritten per constituent dataset while the
            # "ava*" counters accumulate, so tot_num/tot_size reflect only the last
            # dataset scanned — confirm whether this is intended
            totalNum = tmpTotalNum
            totalSize = tmpTotalSize
            avaNumDisk += tmpAvaNumDisk
            avaNumAny += tmpAvaNumAny
            avaSizeDisk += tmpAvaSizeDisk
            avaSizeAny += tmpAvaSizeAny

        # NOTE(review): tmpNucleus always comes from candidateNuclei, so this
        # condition is currently always true
        if tmpNucleus in candidateNuclei or avaNumAny > 0:
            return_map[tmpNucleus] = {
                "tot_num": totalNum,
                "tot_size": totalSize,
                "ava_num_disk": avaNumDisk,
                "ava_num_any": avaNumAny,
                "ava_size_disk": avaSizeDisk,
                "ava_size_any": avaSizeAny,
                "can_be_remote_source": can_be_remote_source,
            }

    return Interaction.SC_SUCCEEDED, return_map, remote_source_available
0111
0112
0113
def get_sites_with_data(
    site_list: list,
    site_mapper: SiteMapper,
    ddm_if: DDMInterface,
    dataset_name: str,
    element_list: list,
    max_missing_input_files: int,
    min_input_completeness: int,
) -> tuple[Any, dict | str, bool | None, bool | None, bool | None, bool | None, bool | None, list]:
    """
    Get sites where data is available and check if complete replica is available at online RSE
    1) regarded_as_complete_disk: True if a replica is regarded as complete at disk (missing files within threshold)
    2) complete_tape: True if a complete replica is available at tape
    3) truly_complete_disk: True if a truly complete replica is available at disk (no missing files)
    4) can_be_local_source: True if the site can read the replica locally over LAN
    5) can_be_remote_source: True if the site can read the replica remotely over WAN
    6) list_of_complete_replica_locations : list of RSEs where truly complete replica is available at disk
    Note that VP replicas are not taken into account for the above flags

    :param site_list: list of site names to be checked
    :param site_mapper: SiteMapper object
    :param ddm_if: DDMInterface object
    :param dataset_name: dataset name
    :param element_list: list of constituent datasets
    :param max_missing_input_files: maximum number of missing files to be regarded as complete
    :param min_input_completeness: minimum completeness (%) to be regarded as complete

    :return: tuple of (status code or exception type, dict of sites with data availability info, regarded_as_complete_disk, complete_tape, truly_complete_disk, can_be_local_source, can_be_remote_source, list_of_complete_replica_locations)
    """

    # resolve the dataset replicas per RSE
    try:
        replica_map = {
            dataset_name: ddm_if.listDatasetReplicas(dataset_name, use_vp=True, skip_incomplete_element=True, element_list=element_list, use_deep=True)
        }
    except Exception:
        errtype, errvalue = sys.exc_info()[:2]
        return errtype, f"ddmIF.listDatasetReplicas failed with {errvalue}", None, None, None, None, None, []

    # first pass over RSEs: classify each replica (disk/tape completeness, LAN/WAN readability)
    is_tape = {}
    replica_availability_info = {}
    list_of_complete_replica_locations = []
    for tmp_rse, tmp_data_list in replica_map[dataset_name].items():

        # remember whether the RSE is tape; None when the property cannot be resolved
        try:
            is_tape[tmp_rse] = ddm_if.getSiteProperty(tmp_rse, "is_tape")
        except Exception:
            is_tape[tmp_rse] = None

        tmp_regarded_as_complete_disk = False
        tmp_truly_complete_disk = False
        tmp_complete_tape = False
        tmp_can_be_local_source = False
        tmp_can_be_remote_source = False
        for tmp_data in tmp_data_list:
            # VP (virtual placement) replicas are excluded from the completeness flags
            if not tmp_data.get("vp"):
                if tmp_data["found"] == tmp_data["total"]:
                    # no missing files at all
                    truly_complete = True
                elif (
                    tmp_data["total"]
                    and tmp_data["found"]
                    and (
                        tmp_data["total"] - tmp_data["found"] <= max_missing_input_files
                        or tmp_data["found"] / tmp_data["total"] * 100 >= min_input_completeness
                    )
                ):
                    # missing files, but within the configured thresholds
                    truly_complete = False
                else:
                    # too incomplete: ignore this replica entirely
                    continue
                # NOTE(review): this list also receives tape and threshold-complete
                # replicas, although the docstring describes truly complete disk
                # replicas — confirm which is intended
                list_of_complete_replica_locations.append(tmp_rse)
                if is_tape[tmp_rse]:
                    tmp_complete_tape = True
                else:
                    tmp_regarded_as_complete_disk = True
                    if truly_complete:
                        tmp_truly_complete_disk = True

            if site_mapper.is_readable_locally(tmp_rse):
                tmp_can_be_local_source = True

            if site_mapper.is_readable_remotely(tmp_rse):
                tmp_can_be_remote_source = True
            replica_availability_info[tmp_rse] = {
                "regarded_as_complete_disk": tmp_regarded_as_complete_disk,
                "truly_complete_disk": tmp_truly_complete_disk,
                "complete_tape": tmp_complete_tape,
                "can_be_local_source": tmp_can_be_local_source,
                "can_be_remote_source": tmp_can_be_remote_source,
            }

    # second pass: build the per-site map of endpoints holding the dataset
    regarded_as_complete_disk = False
    truly_complete_disk = False
    complete_tape = False
    can_be_local_source = False
    can_be_remote_source = False
    return_map = {}
    if not site_list:
        # no site constraint: use a single dummy entry so the loop still runs
        site_list = [None]
    for tmp_site_name in site_list:
        if not site_mapper.checkSite(tmp_site_name):
            continue

        tmp_site_spec = site_mapper.getSite(tmp_site_name)
        scope_input, scope_output = select_scope(tmp_site_spec, JobUtils.ANALY_PS, JobUtils.ANALY_PS)
        try:
            input_endpoints = tmp_site_spec.ddm_endpoints_input[scope_input].all.keys()
        except Exception:
            input_endpoints = {}

        for tmp_rse in input_endpoints:
            if tmp_rse in replica_map[dataset_name]:

                # use the last (summary) entry for this RSE
                tmp_statistics = replica_map[dataset_name][tmp_rse][-1]
                if tmp_statistics["found"] is None:
                    tmp_dataset_completeness = "unknown"

                    pass
                elif tmp_statistics["total"] == tmp_statistics["found"]:
                    tmp_dataset_completeness = "complete"
                else:
                    tmp_dataset_completeness = "incomplete"

                if tmp_site_name not in return_map:
                    return_map[tmp_site_name] = {}
                return_map[tmp_site_name][tmp_rse] = {"tape": is_tape[tmp_rse], "state": tmp_dataset_completeness}
                if "vp" in tmp_statistics:
                    return_map[tmp_site_name][tmp_rse]["vp"] = tmp_statistics["vp"]

    # aggregate the per-RSE flags; only replicas readable over LAN or WAN count
    for tmp_rse in replica_availability_info:
        if replica_availability_info[tmp_rse]["can_be_local_source"] or replica_availability_info[tmp_rse]["can_be_remote_source"]:
            if replica_availability_info[tmp_rse]["regarded_as_complete_disk"]:
                regarded_as_complete_disk = True
            if replica_availability_info[tmp_rse]["truly_complete_disk"]:
                truly_complete_disk = True
            if replica_availability_info[tmp_rse]["complete_tape"]:
                complete_tape = True
            if replica_availability_info[tmp_rse]["can_be_local_source"]:
                can_be_local_source = True
            if replica_availability_info[tmp_rse]["can_be_remote_source"]:
                can_be_remote_source = True

    return (
        Interaction.SC_SUCCEEDED,
        return_map,
        regarded_as_complete_disk,
        complete_tape,
        truly_complete_disk,
        can_be_local_source,
        can_be_remote_source,
        list_of_complete_replica_locations,
    )
0269
0270
0271
def getAnalSitesWithDataDisk(dataSiteMap, includeTape=False, use_vp=True, use_incomplete=False):
    """
    Select analysis sites that host the data, preferring complete disk replicas.

    :param dataSiteMap: {site: {endpoint: {"tape": bool, "state": str, ["vp": bool]}}}
    :param includeTape: also accept replicas on tape endpoints
    :param use_vp: accept VP (virtual placement) replicas
    :param use_incomplete: also return sites with incomplete replicas when no
                           non-VP disk replica exists
    :return: list of site names (order of first appearance preserved)
    """
    complete_sites = []
    incomplete_sites = []
    non_vp_disk_sites = set()
    vp_sites = set()

    for site_name, endpoint_map in dataSiteMap.items():
        for endpoint_name, endpoint_data in endpoint_map.items():
            # track VP replicas; skip them entirely when VP usage is disabled
            if endpoint_data.get("vp"):
                vp_sites.add(site_name)
                if not use_vp:
                    continue

            # tape endpoints only count when explicitly requested
            if endpoint_data["tape"] and not includeTape:
                continue

            if endpoint_data["state"] == "complete":
                if site_name not in complete_sites:
                    complete_sites.append(site_name)
                # remember real (non-VP) disk replicas separately
                if not endpoint_data["tape"] and not endpoint_data.get("vp"):
                    non_vp_disk_sites.add(site_name)
            elif site_name not in incomplete_sites:
                incomplete_sites.append(site_name)

    # when real disk replicas exist, VP-only sites are dropped from both lists
    if non_vp_disk_sites:
        for vp_site in vp_sites:
            if vp_site in complete_sites:
                complete_sites.remove(vp_site)
            if vp_site in incomplete_sites:
                incomplete_sites.remove(vp_site)

    if complete_sites:
        if not use_incomplete or non_vp_disk_sites:
            return complete_sites
        return complete_sites + incomplete_sites

    return incomplete_sites
0313
0314
0315
def getNumJobs(jobStatMap, computingSite, jobStatus, cloud=None, workQueue_tag=None):
    """
    Count jobs in a given status at a site, optionally restricted to one work queue.

    :param jobStatMap: {site: {workQueue: {status: count}}}
    :param computingSite: site name to look up
    :param jobStatus: job status to count
    :param cloud: unused; kept for interface compatibility
    :param workQueue_tag: when given, only this work queue contributes
    :return: total number of matching jobs (0 when the site is unknown)
    """
    if computingSite not in jobStatMap:
        return 0

    total = 0
    for queue_name, status_counts in jobStatMap[computingSite].items():
        # apply the optional work-queue filter
        if workQueue_tag is not None and queue_name != workQueue_tag:
            continue
        total += status_counts.get(jobStatus, 0)
    return total
0331
0332
def get_total_nq_nr_ratio(job_stat_map, work_queue_tag=None):
    """
    Get the ratio of number of queued jobs to number of running jobs
    over all sites, optionally restricted to a single work queue.

    :param job_stat_map: {site: {workQueue: {status: count}}}
    :param work_queue_tag: when given, only this work queue contributes
    :return: queued/running ratio as float, or None when nothing is running
    """
    n_queued = 0
    n_running = 0
    queued_statuses = ("defined", "assigned", "activated", "starting")

    for site_stats in job_stat_map.values():
        for queue_name, status_counts in site_stats.items():
            # apply the optional work-queue filter
            if work_queue_tag is not None and queue_name != work_queue_tag:
                continue
            n_queued += sum(status_counts.get(status, 0) for status in queued_statuses)
            n_running += status_counts.get("running", 0)

    try:
        return float(n_queued) / float(n_running)
    except Exception:
        # division by zero when there are no running jobs
        return None
0359
0360
def _evaluate_priority_condition(current_priority, condition):
    """
    Safely evaluate a fair-share priority condition such as ">=100" against a
    task priority. Replaces the previous exec()-based evaluation, which executed
    policy-derived text as Python code.

    :param current_priority: current priority of the task (integer)
    :param condition: comparison operator followed by an integer, e.g. ">=100"
    :return: boolean result of the comparison
    :raises ValueError: when the condition cannot be parsed
    """
    match = re.match(r"\s*(>=|<=|==|!=|>|<)\s*(-?\d+)\s*$", condition)
    if match is None:
        raise ValueError(f"unparsable priority condition: {condition}")
    operator_str, threshold = match.group(1), int(match.group(2))
    comparators = {
        ">=": lambda a, b: a >= b,
        "<=": lambda a, b: a <= b,
        "==": lambda a, b: a == b,
        "!=": lambda a, b: a != b,
        ">": lambda a, b: a > b,
        "<": lambda a, b: a < b,
    }
    return comparators[operator_str](current_priority, threshold)


def hasZeroShare(site_spec, task_spec, ignore_priority, tmp_log):
    """
    Check if the site has a zero share for the given task. Zero share means there is a policy preventing the site to be used for the task.

    :param site_spec: SiteSpec object describing the site being checked in brokerage
    :param task_spec: TaskSpec object describing the task being brokered
    :param ignore_priority: Merging job chunks will skip the priority check
    :param tmp_log: Logger object

    :return: False means there is no policy defined and the site can be used.
             True means the site has a fair share policy that prevents the site to be used for the task
    """

    # no fair-share policy defined: the site can be used
    if site_spec.fairsharePolicy in ["", None]:
        return False

    try:
        # process group of the task
        processing_group = ProcessGroups.getProcessGroup(task_spec.processingType)

        # test tasks are exempt from fair-share policies
        if processing_group in ["test"]:
            return False

        # policies are comma-separated; each policy is colon-separated fields
        # (type=..., group=..., gshare=..., priority<op><num>) ending with the share value
        for policy in site_spec.fairsharePolicy.split(","):
            tmp_processing_type = None
            tmp_working_group = None
            tmp_priority = None
            tmp_gshare = None
            # the last field is the share value itself (e.g. "0%")
            tmp_fair_share = policy.split(":")[-1]

            # parse the policy fields
            for tmp_field in policy.split(":"):
                if tmp_field.startswith("type="):
                    tmp_processing_type = tmp_field.split("=")[-1]
                elif tmp_field.startswith("group="):
                    tmp_working_group = tmp_field.split("=")[-1]
                elif tmp_field.startswith("gshare="):
                    tmp_gshare = tmp_field.split("=")[-1]
                elif tmp_field.startswith("priority"):
                    # keep only the operator and value, e.g. ">=100"
                    tmp_priority = re.sub("priority", "", tmp_field)

            # check processing type ('*' acts as a wildcard)
            if tmp_processing_type not in ["any", None]:
                if "*" in tmp_processing_type:
                    tmp_processing_type = tmp_processing_type.replace("*", ".*")
                if re.search(f"^{tmp_processing_type}$", processing_group) is None:
                    continue

            # check working group ('*' acts as a wildcard)
            if tmp_working_group not in ["any", None]:
                task_working_group = task_spec.workingGroup or ""
                if "*" in tmp_working_group:
                    tmp_working_group = tmp_working_group.replace("*", ".*")
                if re.search(f"^{tmp_working_group}$", task_working_group) is None:
                    continue

            # check global share ('*' acts as a wildcard)
            if tmp_gshare not in ["any", None] and task_spec.gshare is not None:
                task_gshare = task_spec.gshare or ""
                if "*" in tmp_gshare:
                    tmp_gshare = tmp_gshare.replace("*", ".*")
                if re.search(f"^{tmp_gshare}$", task_gshare) is None:
                    continue

            # check the priority condition; on parse failure the error is logged
            # and the policy still applies, matching the previous behavior
            if tmp_priority is not None and not ignore_priority:
                try:
                    tmpStat = _evaluate_priority_condition(task_spec.currentPriority, tmp_priority)
                    tmp_log.debug(
                        f"Priority check for {site_spec.sitename}, {task_spec.currentPriority}): " f"{task_spec.currentPriority}{tmp_priority} = {tmpStat}"
                    )
                    if not tmpStat:
                        continue
                except Exception:
                    error_type, error_value = sys.exc_info()[:2]
                    tmp_log.error(f"Priority check for {site_spec.sitename} failed with {error_type}:{error_value}")

            # the first matching policy decides: a zero share blocks the site
            if tmp_fair_share in ["0", "0%"]:
                return True
            else:
                return False

    except Exception:
        error_type, error_value = sys.exc_info()[:2]
        tmp_log.error(f"hasZeroShare failed with {error_type}:{error_value}")

    # no matching policy: the site can be used
    return False
0469
0470
0471
def isMatched(siteName, nameList):
    """
    Return True if siteName matches any entry of nameList.

    Entries containing '*' are treated as wildcards (unanchored regex search,
    matching the historical behavior); empty entries never match.
    """
    for pattern in nameList:
        # empty patterns are ignored
        if pattern == "":
            continue

        if "*" in pattern:
            # translate the shell-style wildcard into a regex
            if re.search(pattern.replace("*", ".*"), siteName) is not None:
                return True
        elif pattern == siteName:
            return True

    return False
0488
0489
0490
def getDictToSetNucleus(nucleusSpec, tmpDatasetSpecs):
    """
    Build the dictionary that pins dataset destinations to a nucleus.

    :param nucleusSpec: nucleus specification providing endpoint lookups
    :param tmpDatasetSpecs: dataset specifications to be placed at the nucleus
    :return: {"nucleus": <name>, "datasets": [{"datasetID": ..., "token": "dst:<endpoint>",
              "destination": "nucleus:<name>"}, ...]}
    """
    result = {"datasets": [], "nucleus": nucleusSpec.name}
    for dataset_spec in tmpDatasetSpecs:
        # datasets with a distributed destination keep their own placement
        if DataServiceUtils.getDistributedDestination(dataset_spec.storageToken) is not None:
            continue

        endpoint = nucleusSpec.getAssociatedEndpoint(dataset_spec.storageToken)
        # non-nucleus sites without an associated endpoint fall back to the default output endpoint
        if endpoint is None and not nucleusSpec.is_nucleus() and nucleusSpec.get_default_endpoint_out():
            endpoint = nucleusSpec.get_default_endpoint_out()
        if endpoint is None:
            continue

        token = endpoint["ddm_endpoint_name"]
        # carry over any storage-token suffix (text after the last '/')
        if dataset_spec.storageToken not in ["", None]:
            token += f"/{dataset_spec.storageToken.split('/')[-1]}"
        result["datasets"].append({"datasetID": dataset_spec.datasetID, "token": f"dst:{token}", "destination": f"nucleus:{nucleusSpec.name}"})
    return result
0511
0512
0513
def skipProblematicSites(candidateSpecList, ngSites, sitesUsedByTask, preSetSiteSpec, maxNumSites, tmpLog):
    """
    Drop problematic (NG) sites and cap the number of candidates.

    Sites already used by the task are kept first; an explicitly pre-set site
    survives even when it appears in the NG list.

    :param candidateSpecList: ordered list of candidate site specs
    :param ngSites: problematic site names
    :param sitesUsedByTask: site names the task has already used
    :param preSetSiteSpec: explicitly requested site spec, or None
    :param maxNumSites: maximum number of candidates to return (0/None = no cap)
    :param tmpLog: logger used to report skipped sites
    :return: filtered and reordered candidate list
    """
    dropped = set()
    kept_used = []
    kept_new = []

    for candidate in candidateSpecList:
        flagged = candidate.siteName in ngSites or candidate.unifiedName in ngSites
        pinned = preSetSiteSpec is not None and candidate.siteName == preSetSiteSpec.siteName
        if flagged and not pinned:
            dropped.add(candidate.siteName)
        elif candidate.siteName in sitesUsedByTask or candidate.unifiedName in sitesUsedByTask:
            kept_used.append(candidate)
        else:
            kept_new.append(candidate)

    # 0/None means "no limit"
    if maxNumSites in (0, None):
        maxNumSites = len(candidateSpecList)
    selected = (kept_used + kept_new)[:maxNumSites]

    for dropped_site in dropped:
        tmpLog.debug(f"getting rid of problematic site {dropped_site}")
    return selected
0539
0540
0541
def getSiteInputStorageEndpointMap(site_list, site_mapper, prod_source_label, job_label):
    """
    Map each site to the list of its input DDM endpoint names.

    :param site_list: site names to resolve
    :param site_mapper: SiteMapper providing the site specs
    :param prod_source_label: production source label used for scope selection
    :param job_label: job label used for scope selection
    :return: {site_name: [endpoint_name, ...]} — sites without input endpoints
             for the selected scope are omitted
    """
    endpoint_map = {}
    for site_name in site_list:
        site_spec = site_mapper.getSite(site_name)
        scope_input, _scope_output = select_scope(site_spec, prod_source_label, job_label)

        # skip sites that define no input endpoints for this scope
        if scope_input not in site_spec.ddm_endpoints_input:
            continue

        endpoint_map[site_name] = list(site_spec.ddm_endpoints_input[scope_input].all.keys())

    return endpoint_map
0557
0558
0559
# in-process cache for getSiteToRunRateStats:
# {(starttime_min_rounded, starttime_max_rounded): {"exp": expiry datetime, "data": stats dict}}
CACHE_SiteToRunRateStats = {}
0561
0562
def getSiteToRunRateStats(tbIF, vo, time_window=21600, cutoff=300, cache_lifetime=600):
    """
    Get site-to-run-rate statistics with two cache layers: an in-process cache
    (CACHE_SiteToRunRateStats) and the shared JEDI DB cache, guarded by a DB lock.

    :param tbIF: task buffer interface (getCache_JEDI, lockProcess_JEDI, ...)
    :param vo: virtual organization name
    :param time_window: statistics window length in seconds
    :param cutoff: seconds before "now" excluded from the window
    :param cache_lifetime: lifetime of cached values in seconds
    :return: (success flag, statistics map)
    """
    ret_val = False
    ret_map = {}

    # keys of the shared DB cache entry
    dc_main_key = "AtlasSites"
    dc_sub_key = "SiteToRunRate"

    # identity used for the DB process lock
    this_prodsourcelabel = "user"
    this_pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}_{os.getpgrp()}-broker"
    this_component = "Cache.SiteToRunRate"

    # statistics window [starttime_min, starttime_max]
    current_time = naive_utcnow()
    starttime_max = current_time - datetime.timedelta(seconds=cutoff)
    starttime_min = current_time - datetime.timedelta(seconds=time_window)

    # round the window bounds down to 10-minute boundaries for stable cache keys
    starttime_max_rounded = starttime_max.replace(minute=starttime_max.minute // 10 * 10, second=0, microsecond=0)
    starttime_min_rounded = starttime_min.replace(minute=starttime_min.minute // 10 * 10, second=0, microsecond=0)
    real_interval_hours = (starttime_max_rounded - starttime_min_rounded).total_seconds() / 3600  # NOTE(review): computed but not used in this function

    local_cache_key = (starttime_min_rounded, starttime_max_rounded)

    if local_cache_key in CACHE_SiteToRunRateStats and current_time <= CACHE_SiteToRunRateStats[local_cache_key]["exp"]:
        # fresh in-process cache entry: use it directly
        ret_val = True
        ret_map = CACHE_SiteToRunRateStats[local_cache_key]["data"]
    else:
        for _ in range(99):
            # stop retrying once the time budget for lock acquisition is spent
            if naive_utcnow() - current_time > datetime.timedelta(seconds=min(10, cache_lifetime / 4)):

                break
            try:
                # try the shared DB cache first
                cache_spec = tbIF.getCache_JEDI(main_key=dc_main_key, sub_key=dc_sub_key)
                if cache_spec is not None:
                    expired_time = cache_spec.last_update + datetime.timedelta(seconds=cache_lifetime)
                    if current_time <= expired_time:
                        # DB cache is fresh: adopt it and refresh the in-process cache
                        ret_val = True
                        ret_map = json.loads(cache_spec.data)

                        CACHE_SiteToRunRateStats[local_cache_key] = {"exp": expired_time, "data": ret_map}

                        break

                # DB cache is missing or stale: take the lock and recompute
                got_lock = tbIF.lockProcess_JEDI(
                    vo=vo,
                    prodSourceLabel=this_prodsourcelabel,
                    cloud=None,
                    workqueue_id=None,
                    resource_name=None,
                    component=this_component,
                    pid=this_pid,
                    timeLimit=5,
                )
                if not got_lock:
                    # another process is refreshing; wait and retry
                    time.sleep(1)
                    continue

                ret_val, ret_map = tbIF.getSiteToRunRateStats(vo=vo, exclude_rwq=False, starttime_min=starttime_min, starttime_max=starttime_max)
                if ret_val:
                    # store in both cache layers
                    expired_time = current_time + datetime.timedelta(seconds=cache_lifetime)

                    CACHE_SiteToRunRateStats[local_cache_key] = {"exp": expired_time, "data": ret_map}

                    data_json = json.dumps(ret_map)

                    tbIF.updateCache_JEDI(main_key=dc_main_key, sub_key=dc_sub_key, data=data_json)

                tbIF.unlockProcess_JEDI(
                    vo=vo, prodSourceLabel=this_prodsourcelabel, cloud=None, workqueue_id=None, resource_name=None, component=this_component, pid=this_pid
                )

                break
            except Exception as e:
                # best-effort: report and give up for this call
                err_str = f"AtlasBrokerUtils.getSiteToRunRateStats got {e.__class__.__name__}: {e} \n"
                sys.stderr.write(err_str)

                break

    # evict in-process cache entries whose window is too old to ever be reused
    for lc_key in list(CACHE_SiteToRunRateStats.keys()):
        lc_time_min, lc_time_max = lc_key
        if lc_time_max < starttime_max_rounded - datetime.timedelta(seconds=cache_lifetime):
            try:
                del CACHE_SiteToRunRateStats[lc_key]
            except Exception as e:
                err_str = f"AtlasBrokerUtils.getSiteToRunRateStats when deleting outdated entries got {e.__class__.__name__}: {e} \n"
                sys.stderr.write(err_str)

    return ret_val, ret_map
0658
0659
0660
# in-process cache for getUsersJobsStats: {"_main": {"exp": expiry datetime, "data": stats dict}}
CACHE_UsersJobsStats = {}
0662
0663
def getUsersJobsStats(tbIF, vo, prod_source_label, cache_lifetime=60):
    """
    Get per-user job statistics with two cache layers: an in-process cache
    (CACHE_UsersJobsStats) and the shared JEDI DB cache, guarded by a DB lock.

    :param tbIF: task buffer interface (getCache_JEDI, lockProcess_JEDI, ...)
    :param vo: virtual organization name
    :param prod_source_label: production source label (also used for the lock)
    :param cache_lifetime: lifetime of cached values in seconds
    :return: (success flag, statistics map)
    """
    ret_val = False
    ret_map = {}

    # keys of the shared DB cache entry
    dc_main_key = "AtlasSites"
    dc_sub_key = "UsersJobsStats"

    # identity used for the DB process lock
    this_prodsourcelabel = prod_source_label
    # fixed: was a duplicated "this_pid = this_pid = ..." assignment
    this_pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}_{os.getpgrp()}-broker"
    this_component = "Cache.UsersJobsStats"

    local_cache_key = "_main"

    current_time = naive_utcnow()

    if local_cache_key in CACHE_UsersJobsStats and current_time <= CACHE_UsersJobsStats[local_cache_key]["exp"]:
        # fresh in-process cache entry: use it directly
        ret_val = True
        ret_map = CACHE_UsersJobsStats[local_cache_key]["data"]
    else:
        for _ in range(99):
            # stop retrying once the time budget for lock acquisition is spent
            if naive_utcnow() - current_time > datetime.timedelta(seconds=min(15, cache_lifetime / 4)):
                break
            try:
                # try the shared DB cache first
                cache_spec = tbIF.getCache_JEDI(main_key=dc_main_key, sub_key=dc_sub_key)
                if cache_spec is not None:
                    expired_time = cache_spec.last_update + datetime.timedelta(seconds=cache_lifetime)
                    if current_time <= expired_time:
                        # DB cache is fresh: adopt it and refresh the in-process cache
                        ret_val = True
                        ret_map = json.loads(cache_spec.data)

                        CACHE_UsersJobsStats[local_cache_key] = {"exp": expired_time, "data": ret_map}

                        break

                # DB cache is missing or stale: take the lock and recompute
                got_lock = tbIF.lockProcess_JEDI(
                    vo=vo,
                    prodSourceLabel=this_prodsourcelabel,
                    cloud=None,
                    workqueue_id=None,
                    resource_name=None,
                    component=this_component,
                    pid=this_pid,
                    timeLimit=(cache_lifetime * 0.75 / 60),
                )
                if not got_lock:
                    # another process is refreshing; wait and retry
                    time.sleep(1)
                    continue

                ret_map = tbIF.getUsersJobsStats_JEDI(prod_source_label=this_prodsourcelabel)
                if ret_map is not None:
                    # store in both cache layers
                    expired_time = current_time + datetime.timedelta(seconds=cache_lifetime)

                    CACHE_UsersJobsStats[local_cache_key] = {"exp": expired_time, "data": ret_map}

                    data_json = json.dumps(ret_map)

                    tbIF.updateCache_JEDI(main_key=dc_main_key, sub_key=dc_sub_key, data=data_json)

                    ret_val = True

                tbIF.unlockProcess_JEDI(
                    vo=vo, prodSourceLabel=this_prodsourcelabel, cloud=None, workqueue_id=None, resource_name=None, component=this_component, pid=this_pid
                )

                break
            except Exception as e:
                # best-effort: report and give up for this call
                err_str = f"AtlasBrokerUtils.getUsersJobsStats got {e.__class__.__name__}: {e} \n"
                sys.stderr.write(err_str)

                break

    return ret_val, ret_map
0746
0747
0748
def getGShareUsage(tbIF, gshare, fresher_than_minutes_ago=15):
    """
    Get the cached gshare_preference metric for one global share.

    :param tbIF: task buffer interface providing querySQL
    :param gshare: global share name
    :param fresher_than_minutes_ago: only accept metrics newer than this many minutes
    :return: (success flag, metric dict)
    """
    got_it = False
    usage_map = {}

    begin_time = naive_utcnow()

    for _ in range(99):
        now_time = naive_utcnow()
        # give up once the retry time budget is exhausted
        if now_time - begin_time > datetime.timedelta(seconds=max(3, fresher_than_minutes_ago / 3)):
            break
        try:
            sql_get_gshare = "SELECT m.value_json FROM ATLAS_PANDA.Metrics m WHERE m.metric=:metric AND m.gshare=:gshare AND m.timestamp>=:min_timestamp "
            bind_vars = {
                ":metric": "gshare_preference",
                ":gshare": gshare,
                ":min_timestamp": now_time - datetime.timedelta(minutes=fresher_than_minutes_ago),
            }
            rows = tbIF.querySQL(sql_get_gshare, bind_vars)
            if rows:
                usage_map = json.loads(rows[0][0])
                got_it = True
                break
            # NOTE(review): an empty result retries (without sleeping) until the
            # time budget above expires — confirm this is intended
        except Exception as e:
            sys.stderr.write(f"AtlasBrokerUtils.getGShareUsage got {e.__class__.__name__}: {e} \n")
            break

    return got_it, usage_map
0788
0789
0790
def getUserEval(tbIF, user, fresher_than_minutes_ago=20):
    """
    Get the cached analysis evaluation for one user from the Metrics table.

    :param tbIF: task buffer interface providing querySQL
    :param user: user name used as the JSON attribute path
    :param fresher_than_minutes_ago: only accept metrics newer than this many minutes
    :return: (success flag, evaluation dict or None when the stored value is empty)
    """
    got_it = False
    evaluation = {}

    begin_time = naive_utcnow()

    for _ in range(99):
        now_time = naive_utcnow()
        # give up once the retry time budget is exhausted
        if now_time - begin_time > datetime.timedelta(seconds=max(3, fresher_than_minutes_ago / 3)):
            break
        try:
            # NOTE(review): the user name is interpolated into the JSON path of the
            # SQL text (bind variables cannot be used there) — assumes trusted,
            # well-formed user names; confirm upstream sanitization
            sql_get_user_eval = f'SELECT m.value_json."{user}" FROM ATLAS_PANDA.Metrics m WHERE m.metric=:metric AND m.timestamp>=:min_timestamp '
            bind_vars = {
                ":metric": "analy_user_eval",
                ":min_timestamp": now_time - datetime.timedelta(minutes=fresher_than_minutes_ago),
            }
            rows = tbIF.querySQL(sql_get_user_eval, bind_vars)
            if rows:
                raw_json = rows[0][0]
                evaluation = json.loads(raw_json) if raw_json else None
                got_it = True
                break
            # NOTE(review): an empty result retries (without sleeping) until the
            # time budget above expires — confirm this is intended
        except Exception as e:
            sys.stderr.write(f"AtlasBrokerUtils.getUserEval got {e.__class__.__name__}: {e} \n")
            break

    return got_it, evaluation
0829
0830
0831
def getUserTaskEval(tbIF, taskID, fresher_than_minutes_ago=15):
    """
    Get the cached analysis evaluation for one task from the Task_Evaluation table.

    :param tbIF: task buffer interface providing querySQL
    :param taskID: JEDI task ID
    :param fresher_than_minutes_ago: only accept metrics newer than this many minutes
    :return: (success flag, evaluation dict or None when the stored value is empty)
    """
    got_it = False
    evaluation = {}

    begin_time = naive_utcnow()

    for _ in range(99):
        now_time = naive_utcnow()
        # give up once the retry time budget is exhausted
        if now_time - begin_time > datetime.timedelta(seconds=max(3, fresher_than_minutes_ago / 3)):
            break
        try:
            sql_get_task_eval = (
                "SELECT tev.value_json FROM ATLAS_PANDA.Task_Evaluation tev WHERE tev.metric=:metric AND tev.jediTaskID=:taskID AND tev.timestamp>=:min_timestamp "
            )
            bind_vars = {
                ":metric": "analy_task_eval",
                ":taskID": taskID,
                ":min_timestamp": now_time - datetime.timedelta(minutes=fresher_than_minutes_ago),
            }
            rows = tbIF.querySQL(sql_get_task_eval, bind_vars)
            if rows:
                raw_json = rows[0][0]
                evaluation = json.loads(raw_json) if raw_json else None
                got_it = True
                break
            # NOTE(review): an empty result retries (without sleeping) until the
            # time budget above expires — confirm this is intended
        except Exception as e:
            sys.stderr.write(f"AtlasBrokerUtils.getUserTaskEval got {e.__class__.__name__}: {e} \n")
            break

    return got_it, evaluation
0877
0878
0879
def getAnalySitesClass(tbIF, fresher_than_minutes_ago=60):
    """
    Get the class of each analysis site from the analy_site_eval metric.

    :param tbIF: task buffer interface providing querySQL
    :param fresher_than_minutes_ago: only accept metrics newer than this many minutes
    :return: (success flag, {site name: class as int})
    """
    ret_val = False
    ret_map = {}

    current_time = naive_utcnow()

    for _ in range(99):
        now_time = naive_utcnow()
        # give up once the retry time budget is exhausted
        if now_time - current_time > datetime.timedelta(seconds=max(3, fresher_than_minutes_ago / 3)):
            break
        try:
            sql_get_task_eval = "SELECT m.computingSite, m.value_json.class FROM ATLAS_PANDA.Metrics m WHERE m.metric=:metric AND m.timestamp>=:min_timestamp "
            var_map = {
                ":metric": "analy_site_eval",
                ":min_timestamp": now_time - datetime.timedelta(minutes=fresher_than_minutes_ago),
            }
            res = tbIF.querySQL(sql_get_task_eval, var_map)
            if res:
                for site, class_value in res:
                    ret_map[site] = int(class_value)

                ret_val = True
                # NOTE(review): an empty result retries (without sleeping) until
                # the time budget above expires — confirm this is intended
                break
        except Exception as e:
            # fixed: the error message previously referred to "getAnalySitesEval"
            err_str = f"AtlasBrokerUtils.getAnalySitesClass got {e.__class__.__name__}: {e} \n"
            sys.stderr.write(err_str)

            break

    return ret_val, ret_map
0916
0917
def compare_version_string(version_string, comparison_string):
    """
    Compares a version string with another string composed of a comparison operator
    and a version string.

    Args:
        version_string (str): The version string to compare.
        comparison_string (str): The string containing the comparison operator and
            version string (e.g., ">=2.0").

    Returns:
        bool or None: True if the version string satisfies the comparison, False if
        it doesn't, or None if the comparison string is invalid.
    """
    parsed = re.match(r"([=><]+)(.+)", comparison_string)
    if not parsed:
        return None

    op_token = parsed.group(1).strip()
    rhs_text = parsed.group(2).strip()

    try:
        lhs = version.parse(version_string)
        rhs = version.parse(rhs_text)
    except version.InvalidVersion:
        return None

    # dispatch table instead of an if/elif chain; unknown operators yield None
    comparators = {
        "==": lambda a, b: a == b,
        "!=": lambda a, b: a != b,
        ">=": lambda a, b: a >= b,
        "<=": lambda a, b: a <= b,
        ">": lambda a, b: a > b,
        "<": lambda a, b: a < b,
    }
    compare = comparators.get(op_token)
    if compare is None:
        return None
    return compare(lhs, rhs)
0957
0958
0959
0960 class JsonSoftwareCheck:
0961
    def __init__(self, site_mapper, sw_map, wn_architecture_level_map):
        """
        :param site_mapper: SiteMapper object used to resolve site specs
        :param sw_map: per-site software description map keyed by site name
                       (entries may carry an "architectures" list)
        :param wn_architecture_level_map: per-site map of worker-node architecture
                       levels used to weight site preference
        """
        self.siteMapper = site_mapper
        self.sw_map = sw_map
        self.wn_architecture_level_map = wn_architecture_level_map
0966
0967
    def check(
        self,
        site_list,
        cvmfs_tag,
        sw_project,
        sw_version,
        cmt_config,
        need_cvmfs,
        cmt_config_only,
        need_container=False,
        container_name=None,
        only_tags_fc=False,
        host_cpu_specs=None,
        host_cpu_pref=None,
        host_gpu_spec=None,
        log_stream=None,
    ):
        """
        Check which sites can run the requested software and host hardware.

        Sites whose releases attribute is ["AUTO"] and that appear in self.sw_map are
        validated against the published architecture specs, container, CVMFS and tag
        information. Other sites are collected in no_auto_sites, unless the task
        requires a container or explicit CPU/GPU specs, in which case they are dropped.

        :param site_list: list of candidate site names
        :param cvmfs_tag: CVMFS tag required by the task
        :param sw_project: software project name
        :param sw_version: software release version
        :param cmt_config: cmt config string
        :param need_cvmfs: True if the task requires CVMFS
        :param cmt_config_only: True to accept AUTO sites based on cmt config handling alone
        :param need_container: True if the task requires a container
        :param container_name: container image name, or None
        :param only_tags_fc: True to accept containers only through registered tags
        :param host_cpu_specs: list of required host CPU specs with "arch"/"vendor"/"instr" keys, or None
        :param host_cpu_pref: host CPU preferences, e.g. {"preferred_architecture_level": ...}, or None
        :param host_gpu_spec: required host GPU spec with "vendor"/"model" and optional "version", or None
        :param log_stream: logger used to report failures while parsing architecture info, or None
        :return: tuple of (sites passing the checks, sites without AUTO software info,
                 map of site name to preference weight)
        """
        ok_sites = []
        no_auto_sites = []
        preference_weight_map = {}

        # CPU architecture level preferred by the task, if any
        preferred_architecture_level = host_cpu_pref.get("preferred_architecture_level") if host_cpu_pref else None

        for tmp_site_name in site_list:
            tmp_site_spec = self.siteMapper.getSite(tmp_site_name)
            if tmp_site_spec.releases == ["AUTO"] and tmp_site_name in self.sw_map:
                go_ahead = False
                # the try block guards against malformed sw_map entries: any error is
                # logged and the site is skipped since go_ahead stays False
                try:
                    # index the architecture specs published by the queue by type ("cpu"/"gpu")
                    architecture_map = {}
                    if "architectures" in self.sw_map[tmp_site_name]:
                        for arch_spec in self.sw_map[tmp_site_name]["architectures"]:
                            if "type" in arch_spec:
                                architecture_map[arch_spec["type"]] = arch_spec

                    # skip the queue when it declares an exclusive ("excl") CPU requirement
                    # but the task does not provide CPU specs
                    if "cpu" in architecture_map:
                        need_cpu = False
                        for k in architecture_map["cpu"]:
                            if isinstance(architecture_map["cpu"][k], list):
                                if "excl" in architecture_map["cpu"][k]:
                                    need_cpu = True
                                    break
                        if need_cpu and host_cpu_specs is None:
                            continue

                    # skip the queue when it declares an exclusive ("excl") GPU requirement
                    # but the task does not provide a GPU spec
                    if "gpu" in architecture_map:
                        need_gpu = False
                        for k in architecture_map["gpu"]:
                            if isinstance(architecture_map["gpu"][k], list):
                                if "excl" in architecture_map["gpu"][k]:
                                    need_gpu = True
                                    break
                        if need_gpu and host_gpu_spec is None:
                            continue

                    # weight the queue by the fraction of WNs at the preferred architecture level
                    if preferred_architecture_level:
                        preference_weight_map[tmp_site_name] = (
                            1 + self.wn_architecture_level_map.get(tmp_site_name, {}).get(preferred_architecture_level, {}).get("pct_within_queue", 0) / 100
                        )

                    if host_cpu_specs or host_gpu_spec:
                        # the task requests CPU/GPU specs but the queue publishes no architecture info
                        if not architecture_map:
                            continue

                        # CPU checks: at least one of the requested CPU specs must be satisfied
                        if host_cpu_specs:
                            host_ok = False
                            for host_cpu_spec in host_cpu_specs:
                                # queue publishes no CPU info at all
                                if "cpu" not in architecture_map:
                                    continue

                                # architecture: "*" is rejected only when the queue marks arch exclusive;
                                # otherwise match literally or as a regex against the queue's arch list
                                if host_cpu_spec["arch"] == "*":
                                    if "excl" in architecture_map["cpu"]["arch"]:
                                        continue
                                else:
                                    if "any" not in architecture_map["cpu"]["arch"]:
                                        if host_cpu_spec["arch"] not in architecture_map["cpu"]["arch"]:
                                            # allow the requested arch to be a regex pattern
                                            if not [True for iii in architecture_map["cpu"]["arch"] if re.search("^" + host_cpu_spec["arch"] + "$", iii)]:
                                                continue

                                # vendor: same scheme, but vendor info may be absent on the queue side
                                if host_cpu_spec["vendor"] == "*":
                                    if "vendor" in architecture_map["cpu"] and "excl" in architecture_map["cpu"]["vendor"]:
                                        continue
                                else:
                                    if "vendor" not in architecture_map["cpu"]:
                                        continue

                                    if "any" not in architecture_map["cpu"]["vendor"] and host_cpu_spec["vendor"] not in architecture_map["cpu"]["vendor"]:
                                        continue

                                # instruction set: same scheme as vendor
                                if host_cpu_spec["instr"] == "*":
                                    if "instr" in architecture_map["cpu"] and "excl" in architecture_map["cpu"]["instr"]:
                                        continue
                                else:
                                    if "instr" not in architecture_map["cpu"]:
                                        continue
                                    if "any" not in architecture_map["cpu"]["instr"] and host_cpu_spec["instr"] not in architecture_map["cpu"]["instr"]:
                                        continue
                                host_ok = True
                                break
                            if not host_ok:
                                continue

                        # GPU checks: the single requested GPU spec must be satisfied
                        if host_gpu_spec:
                            # queue publishes no GPU info at all
                            if "gpu" not in architecture_map:
                                continue

                            # vendor: "*" is rejected only when the queue marks vendor exclusive
                            if host_gpu_spec["vendor"] == "*":
                                if "vendor" in architecture_map["gpu"] and "excl" in architecture_map["gpu"]["vendor"]:
                                    continue
                            else:
                                if "vendor" not in architecture_map["gpu"]:
                                    continue

                                if "any" not in architecture_map["gpu"]["vendor"] and host_gpu_spec["vendor"] not in architecture_map["gpu"]["vendor"]:
                                    continue

                            # model: may be given as a plain pattern or as {"pattern": ..., "excl": ...};
                            # the site is skipped when the case-insensitive match result equals the excl flag
                            if host_gpu_spec["model"] == "*":
                                if "model" in architecture_map["gpu"] and "excl" in architecture_map["gpu"]["model"]:
                                    continue
                            else:
                                if isinstance(host_gpu_spec["model"], dict):
                                    model_pattern = host_gpu_spec["model"]["pattern"]
                                    model_excl = host_gpu_spec["model"].get("excl", False)
                                else:
                                    model_pattern = host_gpu_spec["model"]
                                    model_excl = False
                                if "model" not in architecture_map["gpu"] or (
                                    "any" not in architecture_map["gpu"]["model"]
                                    and any(re.match(model_pattern, m, re.IGNORECASE) for m in architecture_map["gpu"]["model"])
                                    == model_excl
                                ):
                                    continue

                            # driver/runtime version, when requested by the task
                            if "version" in host_gpu_spec:
                                if "version" not in architecture_map["gpu"]:
                                    # queue does not publish a version
                                    continue
                                elif "any" == architecture_map["gpu"]["version"]:
                                    # queue accepts any version
                                    pass
                                else:
                                    # compare the version strings
                                    if not compare_version_string(architecture_map["gpu"]["version"], host_gpu_spec["version"]):
                                        continue
                    go_ahead = True
                except Exception as e:
                    if log_stream:
                        log_stream.error(f"json check {str(architecture_map)} failed for {tmp_site_name} {str(e)} {traceback.format_exc()} ")
                if not go_ahead:
                    continue

                # hardware-only request: nothing else to verify for this site
                if not (cvmfs_tag or cmt_config or sw_project or sw_version or container_name) and (host_cpu_specs or host_gpu_spec):
                    ok_sites.append(tmp_site_name)
                    continue

                # container availability checks
                if container_name:
                    # the queue runs any container, or containers from /cvmfs
                    if not only_tags_fc and ("any" in self.sw_map[tmp_site_name]["containers"] or "/cvmfs" in self.sw_map[tmp_site_name]["containers"]):
                        ok_sites.append(tmp_site_name)
                    # the container is registered as a tag at the queue
                    elif container_name in set([t["container_name"] for t in self.sw_map[tmp_site_name]["tags"] if t["container_name"]]):
                        ok_sites.append(tmp_site_name)
                    # the container matches one of the tag sources at the queue
                    elif container_name in set([s for t in self.sw_map[tmp_site_name]["tags"] for s in t["sources"] if t["sources"]]):
                        ok_sites.append(tmp_site_name)
                    elif not only_tags_fc:
                        # sources of the matching tag in the special "ALL" entry, if present
                        if "ALL" in self.sw_map:
                            source_list_in_all_tag = [s for t in self.sw_map["ALL"]["tags"] for s in t["sources"] if t["container_name"] == container_name]
                        else:
                            source_list_in_all_tag = []

                        # accept when the container name, or one of the ALL-tag sources,
                        # starts with a prefix allowed by the queue
                        for tmp_prefix in self.sw_map[tmp_site_name]["containers"]:
                            if container_name.startswith(tmp_prefix):
                                ok_sites.append(tmp_site_name)
                                break
                            toBreak = False
                            for source_in_all_tag in source_list_in_all_tag:
                                if source_in_all_tag.startswith(tmp_prefix):
                                    ok_sites.append(tmp_site_name)
                                    toBreak = True
                                    break
                            if toBreak:
                                break
                    continue

                # only the cmt config handling matters for this task
                if cmt_config_only:
                    ok_sites.append(tmp_site_name)
                    continue

                # CVMFS-based software availability
                if "any" in self.sw_map[tmp_site_name]["cvmfs"] or cvmfs_tag in self.sw_map[tmp_site_name]["cvmfs"]:
                    # the queue runs any container, or containers from /cvmfs
                    if "any" in self.sw_map[tmp_site_name]["containers"] or "/cvmfs" in self.sw_map[tmp_site_name]["containers"]:
                        ok_sites.append(tmp_site_name)
                    # containerless match on the cmt config
                    elif not need_container and cmt_config in self.sw_map[tmp_site_name]["cmtconfigs"]:
                        ok_sites.append(tmp_site_name)
                    elif not need_cvmfs:
                        if not need_container or "any" in self.sw_map[tmp_site_name]["containers"]:
                            # look for a tag matching cmt config, project and release
                            for tag in self.sw_map[tmp_site_name]["tags"]:
                                if tag["cmtconfig"] == cmt_config and tag["project"] == sw_project and tag["release"] == sw_version:
                                    ok_sites.append(tmp_site_name)
                                    break

                continue

            # non-AUTO site: unusable when a container or CPU/GPU specs are required
            if container_name is not None or host_cpu_specs is not None or host_gpu_spec is not None:
                continue
            no_auto_sites.append(tmp_site_name)

        return ok_sites, no_auto_sites, preference_weight_map
1200
1201
1202
1203 def resolve_cmt_config(queue_name: str, cmt_config: str, base_platform, sw_map: dict) -> str | None:
1204 """
1205 resolve cmt config at a given queue_name
1206 :param queue_name: queue name
1207 :param cmt_config: cmt confing to resolve
1208 :param base_platform: base platform
1209 :param sw_map: software map
1210 :return: resolved cmt config or None if unavailable or valid
1211 """
1212
1213 if queue_name not in sw_map:
1214 return None
1215
1216 if cmt_config in sw_map[queue_name]["cmtconfigs"]:
1217 return None
1218
1219 for tmp_cmt_config in sw_map[queue_name]["cmtconfigs"]:
1220 if re.search("^" + cmt_config + "$", tmp_cmt_config):
1221 if base_platform:
1222
1223 tmp_cmt_config = tmp_cmt_config + "@" + base_platform
1224 return tmp_cmt_config
1225
1226 return None
1227
1228
def check_endpoints_with_blacklist(
    site_spec: SiteSpec.SiteSpec, scope_input: str, scope_output: str, sites_in_nucleus: list, remote_source_available: bool
) -> str | None:
    """
    Check whether the site's storage endpoints are blacklisted for any transfer mode it needs

    :param site_spec: site spec
    :param scope_input: input scope
    :param scope_output: output scope
    :param sites_in_nucleus: list of sites in nucleus
    :param remote_source_available: if remote source is available

    :return: description of blacklisted reason or None
    """
    tmp_site_name = site_spec.sitename
    in_nucleus = tmp_site_name in sites_in_nucleus

    read_input_over_lan = False
    receive_input_over_wan = False
    write_output_over_lan = False
    send_output_over_wan = False

    # input endpoints: must read over LAN; satellites must also receive over WAN
    if scope_input in site_spec.ddm_endpoints_input:
        for endpoint_data in site_spec.ddm_endpoints_input[scope_input].all.values():
            detailed_status = endpoint_data["detailed_status"]
            if detailed_status.get("read_lan") not in ("OFF", "TEST"):
                read_input_over_lan = True
            if in_nucleus:
                # nucleus sites don't need WAN transfer into the site and count as a source
                receive_input_over_wan = True
                remote_source_available = True
            elif detailed_status.get("write_wan") not in ("OFF", "TEST"):
                receive_input_over_wan = True

    # output endpoints: must write over LAN; satellites must also send out over WAN
    if scope_output in site_spec.ddm_endpoints_output:
        for endpoint_data in site_spec.ddm_endpoints_output[scope_output].all.values():
            detailed_status = endpoint_data["detailed_status"]
            if detailed_status.get("write_lan") not in ("OFF", "TEST"):
                write_output_over_lan = True
            if in_nucleus:
                send_output_over_wan = True
                remote_source_available = True
            elif detailed_status.get("read_wan") not in ("OFF", "TEST"):
                send_output_over_wan = True

    # report the first failed capability, keeping the original precedence
    capability_checks = (
        (read_input_over_lan, f" skip site={tmp_site_name} since input endpoints cannot read over LAN, read_lan is not ON criteria=-read_lan_blacklist"),
        (write_output_over_lan, f" skip site={tmp_site_name} since output endpoints cannot write over LAN, write_lan is not ON criteria=-write_lan_blacklist"),
        (receive_input_over_wan, f" skip site={tmp_site_name} since input endpoints cannot receive files over WAN, write_wan is not ON criteria=-write_wan_blacklist"),
        (send_output_over_wan, f" skip site={tmp_site_name} since output endpoints cannot send out files over WAN, read_wan is not ON criteria=-read_wan_blacklist"),
        (remote_source_available, f" skip site={tmp_site_name} since source endpoints cannot transfer files over WAN and it is satellite criteria=-source_blacklist"),
    )
    for capability_ok, reason in capability_checks:
        if not capability_ok:
            return reason
    return None