Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:57

0001 import datetime
0002 import json
0003 import os
0004 import re
0005 import socket
0006 import sys
0007 import time
0008 import traceback
0009 from typing import Any
0010 
0011 from packaging import version
0012 from pandacommon.pandautils.PandaUtils import naive_utcnow
0013 
0014 from pandajedi.jedicore import Interaction
0015 from pandajedi.jediddm.DDMInterface import DDMInterface
0016 from pandaserver.brokerage.SiteMapper import SiteMapper
0017 from pandaserver.dataservice import DataServiceUtils
0018 from pandaserver.dataservice.DataServiceUtils import select_scope
0019 from pandaserver.taskbuffer import JobUtils, ProcessGroups, SiteSpec
0020 
0021 
# get nuclei where data is available
def getNucleiWithData(siteMapper, ddmIF, datasetName, candidateNuclei, deepScan=False):
    """
    Check, for each candidate nucleus, how many files/bytes of a dataset are
    available on its associated input endpoints (split by disk vs any medium),
    and whether any site at all can serve the data remotely over WAN.

    :param siteMapper: SiteMapper object used to resolve nuclei and endpoint readability
    :param ddmIF: DDM interface providing listReplicasPerDataset
    :param datasetName: dataset (or container) name to check
    :param candidateNuclei: list of nucleus names to consider
    :param deepScan: passed through to listReplicasPerDataset (presumably a file-level scan -- confirm with DDM interface)
    :return: (error type, error message string, None) on failure, or
             (Interaction.SC_SUCCEEDED, {nucleus: availability dict}, remote_source_available)
    """
    # get replicas
    try:
        replica_map = ddmIF.listReplicasPerDataset(datasetName, deepScan)
    except Exception:
        errtype, errvalue = sys.exc_info()[:2]
        return errtype, f"ddmIF.listReplicasPerDataset failed with {errvalue}", None
    # check if remote source is available at any sites (not only nuclei)
    remote_source_available = False
    for tmp_replica_data in replica_map.values():
        for tmp_location, tmp_stat_data in tmp_replica_data.items():
            # skip replicas with unknown or zero file counts
            if tmp_stat_data[0]["total"] in [None, 0]:
                continue
            # skip incomplete replicas (found != total)
            if tmp_stat_data[0]["total"] != tmp_stat_data[0]["found"]:
                continue
            if siteMapper.is_readable_remotely(tmp_location):
                remote_source_available = True
                # NOTE(review): this break only leaves the inner loop; the outer
                # loop keeps scanning remaining datasets, which is redundant but harmless
                break
    # loop over all clouds
    return_map = {}
    for tmpNucleus in candidateNuclei:
        tmpNucleusSpec = siteMapper.getNucleus(tmpNucleus)
        # loop over all datasets
        totalNum = 0
        totalSize = 0
        avaNumDisk = 0
        avaNumAny = 0
        avaSizeDisk = 0
        avaSizeAny = 0
        can_be_remote_source = False
        for tmpDataset, tmpRepMap in replica_map.items():
            # per-dataset best availability across this nucleus's endpoints
            tmpTotalNum = 0
            tmpTotalSize = 0
            tmpAvaNumDisk = 0
            tmpAvaNumAny = 0
            tmpAvaSizeDisk = 0
            tmpAvaSizeAny = 0
            # loop over all endpoints
            for tmpLoc, locData in tmpRepMap.items():
                # get total (taken from the first endpoint that reports it)
                if tmpTotalNum == 0:
                    tmpTotalNum = locData[0]["total"]
                    tmpTotalSize = locData[0]["tsize"]
                # check if the endpoint is associated
                if tmpNucleusSpec.is_associated_for_input(tmpLoc):
                    # check blacklist
                    if siteMapper.is_readable_remotely(tmpLoc):
                        can_be_remote_source = True
                    # sum
                    tmpEndpoint = tmpNucleusSpec.getEndpoint(tmpLoc)
                    tmpAvaNum = locData[0]["found"]
                    tmpAvaSize = locData[0]["asize"]
                    # disk
                    if tmpEndpoint["is_tape"] != "Y":
                        # complete replica is available at DISK: no better endpoint
                        # can exist for this dataset, so stop scanning endpoints
                        if tmpTotalNum == tmpAvaNum and tmpTotalNum > 0:
                            tmpAvaNumDisk = tmpAvaNum
                            tmpAvaNumAny = tmpAvaNum
                            tmpAvaSizeDisk = tmpAvaSize
                            tmpAvaSizeAny = tmpAvaSize
                            break
                        # keep the best (largest) partial disk replica
                        if tmpAvaNum > tmpAvaNumDisk:
                            tmpAvaNumDisk = tmpAvaNum
                            tmpAvaSizeDisk = tmpAvaSize
                    # tape (or any medium): keep the best availability
                    if tmpAvaNumAny < tmpAvaNum:
                        tmpAvaNumAny = tmpAvaNum
                        tmpAvaSizeAny = tmpAvaSize
            # total
            # NOTE(review): totalNum/totalSize are overwritten on every dataset
            # iteration, so only the last dataset's totals survive while the
            # ava* counters accumulate with += -- confirm whether += was intended here
            totalNum = tmpTotalNum
            totalSize = tmpTotalSize
            avaNumDisk += tmpAvaNumDisk
            avaNumAny += tmpAvaNumAny
            avaSizeDisk += tmpAvaSizeDisk
            avaSizeAny += tmpAvaSizeAny
        # append
        # NOTE(review): tmpNucleus always comes from candidateNuclei, so this
        # condition is always true -- verify the original intent of the check
        if tmpNucleus in candidateNuclei or avaNumAny > 0:
            return_map[tmpNucleus] = {
                "tot_num": totalNum,
                "tot_size": totalSize,
                "ava_num_disk": avaNumDisk,
                "ava_num_any": avaNumAny,
                "ava_size_disk": avaSizeDisk,
                "ava_size_any": avaSizeAny,
                "can_be_remote_source": can_be_remote_source,
            }

    return Interaction.SC_SUCCEEDED, return_map, remote_source_available
0111 
0112 
# get sites where data is available and check if complete replica is available at online RSE
def get_sites_with_data(
    site_list: list,
    site_mapper: SiteMapper,
    ddm_if: DDMInterface,
    dataset_name: str,
    element_list: list,
    max_missing_input_files: int,
    min_input_completeness: int,
) -> tuple[Any, dict | str, bool | None, bool | None, bool | None, bool | None, bool | None, list]:
    """
    Get sites where data is available and check if complete replica is available at online RSE
    1) regarded_as_complete_disk: True if a replica is regarded as complete at disk (missing files within threshold)
    2) complete_tape: True if a complete replica is available at tape
    3) truly_complete_disk: True if a truly complete replica is available at disk (no missing files)
    4) can_be_local_source: True if the site can read the replica locally over LAN
    5) can_be_remote_source: True if the site can read the replica remotely over WAN
    6) list_of_complete_replica_locations : list of RSEs where truly complete replica is available at disk
    Note that VP replicas are not taken into account for the above flags

    :param site_list: list of site names to be checked
    :param site_mapper: SiteMapper object
    :param ddm_if: DDMInterface object
    :param dataset_name: dataset name
    :param element_list: list of constituent datasets
    :param max_missing_input_files: maximum number of missing files to be regarded as complete
    :param min_input_completeness: minimum completeness (%) to be regarded as complete

    :return: tuple of (status code or exception type, dict of sites with data availability info, regarded_as_complete_disk, complete_tape, truly_complete_disk, can_be_local_source, can_be_remote_source, list_of_complete_replica_locations)
    """
    # get replicas
    try:
        replica_map = {
            dataset_name: ddm_if.listDatasetReplicas(dataset_name, use_vp=True, skip_incomplete_element=True, element_list=element_list, use_deep=True)
        }
    except Exception:
        errtype, errvalue = sys.exc_info()[:2]
        return errtype, f"ddmIF.listDatasetReplicas failed with {errvalue}", None, None, None, None, None, []

    # check completeness and storage availability
    is_tape = {}
    replica_availability_info = {}
    list_of_complete_replica_locations = []
    for tmp_rse, tmp_data_list in replica_map[dataset_name].items():
        # check tape attribute
        try:
            is_tape[tmp_rse] = ddm_if.getSiteProperty(tmp_rse, "is_tape")
        except Exception:
            # best-effort: an unknown tape attribute falls through to the disk branch below
            is_tape[tmp_rse] = None
        # look for complete replicas
        tmp_regarded_as_complete_disk = False
        tmp_truly_complete_disk = False
        tmp_complete_tape = False
        tmp_can_be_local_source = False
        tmp_can_be_remote_source = False
        for tmp_data in tmp_data_list:
            # VP replicas are excluded from all completeness flags (see docstring)
            if not tmp_data.get("vp"):
                if tmp_data["found"] == tmp_data["total"]:
                    # truly complete
                    truly_complete = True
                elif (
                    tmp_data["total"]
                    and tmp_data["found"]
                    and (
                        tmp_data["total"] - tmp_data["found"] <= max_missing_input_files
                        or tmp_data["found"] / tmp_data["total"] * 100 >= min_input_completeness
                    )
                ):
                    # regarded as complete
                    truly_complete = False
                else:
                    continue
                # NOTE(review): RSEs regarded-as-complete (not only truly complete)
                # are also appended here, which differs slightly from the docstring
                # wording for list_of_complete_replica_locations -- confirm intent
                list_of_complete_replica_locations.append(tmp_rse)
                if is_tape[tmp_rse]:
                    tmp_complete_tape = True
                else:
                    tmp_regarded_as_complete_disk = True
                    if truly_complete:
                        tmp_truly_complete_disk = True
                # check if it is locally accessible over LAN
                if site_mapper.is_readable_locally(tmp_rse):
                    tmp_can_be_local_source = True
                # check if it is remotely accessible over WAN
                if site_mapper.is_readable_remotely(tmp_rse):
                    tmp_can_be_remote_source = True
        replica_availability_info[tmp_rse] = {
            "regarded_as_complete_disk": tmp_regarded_as_complete_disk,
            "truly_complete_disk": tmp_truly_complete_disk,
            "complete_tape": tmp_complete_tape,
            "can_be_local_source": tmp_can_be_local_source,
            "can_be_remote_source": tmp_can_be_remote_source,
        }

    # loop over all candidate sites
    regarded_as_complete_disk = False
    truly_complete_disk = False
    complete_tape = False
    can_be_local_source = False
    can_be_remote_source = False
    return_map = {}
    if not site_list:
        # make sure at least one loop to set the flags
        # NOTE(review): checkSite(None) presumably fails, so the loop body is
        # skipped for [None]; the flags below are set from replica_availability_info
        # regardless -- confirm this placeholder is still needed
        site_list = [None]
    for tmp_site_name in site_list:
        if not site_mapper.checkSite(tmp_site_name):
            continue
        # get associated DDM endpoints
        tmp_site_spec = site_mapper.getSite(tmp_site_name)
        scope_input, scope_output = select_scope(tmp_site_spec, JobUtils.ANALY_PS, JobUtils.ANALY_PS)
        try:
            input_endpoints = tmp_site_spec.ddm_endpoints_input[scope_input].all.keys()
        except Exception:
            input_endpoints = {}
        # loop over all associated endpoints
        for tmp_rse in input_endpoints:
            if tmp_rse in replica_map[dataset_name]:
                # check completeness
                # NOTE(review): takes the last entry for this RSE -- presumably the
                # aggregated dataset-level statistics; confirm with listDatasetReplicas
                tmp_statistics = replica_map[dataset_name][tmp_rse][-1]
                if tmp_statistics["found"] is None:
                    tmp_dataset_completeness = "unknown"
                    # refresh request
                    # placeholder: no refresh is actually issued here
                    pass
                elif tmp_statistics["total"] == tmp_statistics["found"]:
                    tmp_dataset_completeness = "complete"
                else:
                    tmp_dataset_completeness = "incomplete"
                # append
                if tmp_site_name not in return_map:
                    return_map[tmp_site_name] = {}
                return_map[tmp_site_name][tmp_rse] = {"tape": is_tape[tmp_rse], "state": tmp_dataset_completeness}
                if "vp" in tmp_statistics:
                    return_map[tmp_site_name][tmp_rse]["vp"] = tmp_statistics["vp"]
    # set flags
    # flags only consider RSEs that are readable locally or remotely
    for tmp_rse in replica_availability_info:
        if replica_availability_info[tmp_rse]["can_be_local_source"] or replica_availability_info[tmp_rse]["can_be_remote_source"]:
            if replica_availability_info[tmp_rse]["regarded_as_complete_disk"]:
                regarded_as_complete_disk = True
            if replica_availability_info[tmp_rse]["truly_complete_disk"]:
                truly_complete_disk = True
            if replica_availability_info[tmp_rse]["complete_tape"]:
                complete_tape = True
            if replica_availability_info[tmp_rse]["can_be_local_source"]:
                can_be_local_source = True
            if replica_availability_info[tmp_rse]["can_be_remote_source"]:
                can_be_remote_source = True

    return (
        Interaction.SC_SUCCEEDED,
        return_map,
        regarded_as_complete_disk,
        complete_tape,
        truly_complete_disk,
        can_be_local_source,
        can_be_remote_source,
        list_of_complete_replica_locations,
    )
0269 
0270 
# get analysis sites where data is available at disk
def getAnalSitesWithDataDisk(dataSiteMap, includeTape=False, use_vp=True, use_incomplete=False):
    """
    Select analysis sites with data on disk (optionally tape / VP).

    :param dataSiteMap: {site: {RSE: {"tape": bool, "state": str, ["vp": bool]}}}
    :param includeTape: also accept replicas on tape endpoints
    :param use_vp: consider VP (virtual placement) replicas
    :param use_incomplete: also return sites with incomplete replicas when
                           no complete non-VP disk replica exists
    :return: list of site names; complete-replica sites take precedence
    """
    complete_sites = []
    incomplete_sites = []
    non_vp_disk_sites = set()
    vp_sites = set()
    for site_name, endpoint_map in dataSiteMap.items():
        for endpoint_data in endpoint_map.values():
            # track VP usage and optionally skip VP replicas
            if endpoint_data.get("vp"):
                vp_sites.add(site_name)
                if not use_vp:
                    continue
            # skip tape endpoints unless explicitly included
            if endpoint_data["tape"] and not includeTape:
                continue
            if endpoint_data["state"] == "complete":
                # site holds a complete replica (disk, tape, or VP)
                if site_name not in complete_sites:
                    complete_sites.append(site_name)
                # remember sites holding a complete non-VP disk replica
                if not endpoint_data["tape"] and not endpoint_data.get("vp"):
                    non_vp_disk_sites.add(site_name)
            elif site_name not in incomplete_sites:
                # incomplete replica at disk
                incomplete_sites.append(site_name)
    # drop VP-only candidates when a complete non-VP disk replica exists somewhere
    if non_vp_disk_sites:
        for vp_site in vp_sites:
            if vp_site in complete_sites:
                complete_sites.remove(vp_site)
            if vp_site in incomplete_sites:
                incomplete_sites.remove(vp_site)
    # no complete replica anywhere: fall back to incomplete sites
    if not complete_sites:
        return incomplete_sites
    # complete replicas exist: optionally extend with incomplete sites
    if not use_incomplete or non_vp_disk_sites:
        return complete_sites
    return complete_sites + incomplete_sites
0313 
0314 
# get the number of jobs in a status
def getNumJobs(jobStatMap, computingSite, jobStatus, cloud=None, workQueue_tag=None):
    """
    Count jobs in a given status at a computing site.

    :param jobStatMap: {site: {workQueue: {status: count}}}
    :param computingSite: site name to look up
    :param jobStatus: job status to count
    :param cloud: unused, kept for backward compatibility
    :param workQueue_tag: if given, restrict counting to this work queue
    :return: total number of matching jobs (0 if the site is unknown)
    """
    if computingSite not in jobStatMap:
        return 0
    # sum the counts of the requested status over all (selected) work queues
    return sum(
        status_map.get(jobStatus, 0)
        for queue_name, status_map in jobStatMap[computingSite].items()
        if workQueue_tag is None or workQueue_tag == queue_name
    )
0331 
0332 
def get_total_nq_nr_ratio(job_stat_map, work_queue_tag=None):
    """
    Get the ratio of number of queued jobs to number of running jobs.

    :param job_stat_map: {site: {workQueue: {status: count}}}
    :param work_queue_tag: if given, restrict counting to this work queue
    :return: queued/running ratio, or None when there are no running jobs
    """
    queued_states = ("defined", "assigned", "activated", "starting")
    n_queued = 0
    n_running = 0
    # aggregate over all sites and (selected) work queues
    for site_stats in job_stat_map.values():
        for queue_name, status_map in site_stats.items():
            if work_queue_tag is not None and work_queue_tag != queue_name:
                continue
            n_queued += sum(status_map.get(state, 0) for state in queued_states)
            n_running += status_map.get("running", 0)
    try:
        return n_queued / n_running
    except Exception:
        # no running jobs (division by zero): ratio is undefined
        return None
0359 
0360 
# evaluate a priority policy condition, e.g. ">400" or ">=400", against a priority value
def _evaluate_priority_condition(priority, condition):
    """
    Evaluate a comparison condition string against a priority value.

    :param priority: numeric priority of the task
    :param condition: operator+threshold string taken from the fair share policy, e.g. ">400"
    :return: boolean result of the comparison
    :raises ValueError: if the condition string cannot be parsed
    """
    match = re.match(r"^\s*(>=|<=|==|!=|>|<)\s*(-?\d+(?:\.\d+)?)\s*$", condition)
    if match is None:
        raise ValueError(f"unparsable priority condition: {condition}")
    operator_str = match.group(1)
    threshold = float(match.group(2))
    if operator_str == ">":
        return priority > threshold
    if operator_str == "<":
        return priority < threshold
    if operator_str == ">=":
        return priority >= threshold
    if operator_str == "<=":
        return priority <= threshold
    if operator_str == "==":
        return priority == threshold
    return priority != threshold


def hasZeroShare(site_spec, task_spec, ignore_priority, tmp_log):
    """
    Check if the site has a zero share for the given task. Zero share means there is a policy preventing the site to be used for the task.

    :param site_spec: SiteSpec object describing the site being checked in brokerage
    :param task_spec: TaskSpec object describing the task being brokered
    :param ignore_priority: Merging job chunks will skip the priority check
    :param tmp_log: Logger object

    :return: False means there is no policy defined and the site can be used.
             True means the site has a fair share policy that prevents the site to be used for the task
    """

    # there is no per-site share defined (CRIC "fairsharePolicy" field), the site can be used
    if site_spec.fairsharePolicy in ["", None]:
        return False

    try:
        # get the group of processing types from a pre-defined mapping
        processing_group = ProcessGroups.getProcessGroup(task_spec.processingType)

        # don't suppress test tasks - the site can be used
        if processing_group in ["test"]:
            return False

        # loop over all policies
        for policy in site_spec.fairsharePolicy.split(","):
            # Examples of policies are:
            # type=evgen:100%,type=simul:100%,type=any:0%
            # type=evgen:100%,type=simul:100%,type=any:0%,group=(AP_Higgs|AP_Susy|AP_Exotics|Higgs):0%
            # gshare=Express:100%,gshare=any:0%
            # priority>400:0
            tmp_processing_type = None
            tmp_working_group = None
            tmp_priority = None
            tmp_gshare = None
            tmp_fair_share = policy.split(":")[-1]

            # break down each fair share policy into its fields
            for tmp_field in policy.split(":"):
                if tmp_field.startswith("type="):
                    tmp_processing_type = tmp_field.split("=")[-1]
                elif tmp_field.startswith("group="):
                    tmp_working_group = tmp_field.split("=")[-1]
                elif tmp_field.startswith("gshare="):
                    tmp_gshare = tmp_field.split("=")[-1]
                elif tmp_field.startswith("priority"):
                    tmp_priority = re.sub("priority", "", tmp_field)

            # check for a matching processing type
            if tmp_processing_type not in ["any", None]:
                if "*" in tmp_processing_type:
                    tmp_processing_type = tmp_processing_type.replace("*", ".*")
                # if there is no match between the site's fair share policy and the task's processing type,
                # so continue looking for other policies that trigger the zero share condition
                if re.search(f"^{tmp_processing_type}$", processing_group) is None:
                    continue

            # check for matching working group
            if tmp_working_group not in ["any", None]:
                # None causes an exception in re.search, so convert to empty string
                task_working_group = task_spec.workingGroup or ""
                if "*" in tmp_working_group:
                    tmp_working_group = tmp_working_group.replace("*", ".*")
                # if there is no match between the site's fair share policy and the task's working group
                # continue looking for other policies that trigger the zero share condition
                if re.search(f"^{tmp_working_group}$", task_working_group) is None:
                    continue

            # check for matching gshare. Note that this only works for "leave gshares" in the fairsharePolicy,
            # i.e. the ones that have no sub-gshares, since the task only gets "leave gshares" assigned
            if tmp_gshare not in ["any", None] and task_spec.gshare is not None:
                # None causes an exception in re.search, so convert to empty string
                task_gshare = task_spec.gshare or ""
                if "*" in tmp_gshare:
                    tmp_gshare = tmp_gshare.replace("*", ".*")
                # if there is no match between the site's fair share policy and the task's gshare
                # continue looking for other policies that trigger the zero share condition
                if re.search(f"^{tmp_gshare}$", task_gshare) is None:
                    continue

            # check priority
            if tmp_priority is not None and not ignore_priority:
                try:
                    # parse and evaluate the condition directly instead of exec()-ing a
                    # dynamically built statement, which was fragile and unsafe for
                    # malformed policy strings
                    tmpStat = _evaluate_priority_condition(task_spec.currentPriority, tmp_priority)
                    tmp_log.debug(f"Priority check for {site_spec.sitename}: {task_spec.currentPriority}{tmp_priority} = {tmpStat}")
                    if not tmpStat:
                        continue
                except Exception:
                    # a broken condition is logged but does not veto the site,
                    # matching the previous best-effort behavior
                    error_type, error_value = sys.exc_info()[:2]
                    tmp_log.error(f"Priority check for {site_spec.sitename} failed with {error_type}:{error_value}")

            # check fair share value
            # if 0, we need to skip the site
            # if different than 0, the site can be used
            if tmp_fair_share in ["0", "0%"]:
                return True
            else:
                return False

    except Exception:
        error_type, error_value = sys.exc_info()[:2]
        tmp_log.error(f"hasZeroShare failed with {error_type}:{error_value}")

    # if we reach this point, it means there is no policy preventing the site to be used
    return False
0469 
0470 
# check if site name is matched with one of list items
def isMatched(siteName, nameList):
    """
    Check whether siteName matches any entry of nameList.

    :param siteName: site name to test
    :param nameList: list of patterns; entries may contain '*' wildcards,
                     empty strings are ignored
    :return: True if any entry matches, False otherwise
    """
    for pattern in nameList:
        # ignore empty entries
        if pattern == "":
            continue
        if "*" in pattern:
            # wildcard entry: translate '*' to a regexp and search
            if re.search(pattern.replace("*", ".*"), siteName) is not None:
                return True
        elif pattern == siteName:
            # exact match
            return True
    return False
0488 
0489 
# get dict to set nucleus
def getDictToSetNucleus(nucleusSpec, tmpDatasetSpecs):
    """
    Build the dictionary used to assign a nucleus and per-dataset destination tokens.

    :param nucleusSpec: nucleus spec providing endpoint lookups
    :param tmpDatasetSpecs: dataset specs of the task's output datasets
    :return: {"nucleus": name, "datasets": [{"datasetID", "token", "destination"}, ...]}
    """
    dataset_entries = []
    for dataset_spec in tmpDatasetSpecs:
        # skip distributed datasets
        if DataServiceUtils.getDistributedDestination(dataset_spec.storageToken) is not None:
            continue
        # get endpoint relevant to token
        endpoint = nucleusSpec.getAssociatedEndpoint(dataset_spec.storageToken)
        if endpoint is None and not nucleusSpec.is_nucleus() and nucleusSpec.get_default_endpoint_out():
            # use default endpoint for satellite that doesn't have relevant endpoint
            endpoint = nucleusSpec.get_default_endpoint_out()
        if endpoint is None:
            continue
        token = endpoint["ddm_endpoint_name"]
        # add original token suffix
        if dataset_spec.storageToken not in ["", None]:
            token += f"/{dataset_spec.storageToken.split('/')[-1]}"
        dataset_entries.append(
            {"datasetID": dataset_spec.datasetID, "token": f"dst:{token}", "destination": f"nucleus:{nucleusSpec.name}"}
        )
    return {"datasets": dataset_entries, "nucleus": nucleusSpec.name}
0511 
0512 
# remove problematic sites
def skipProblematicSites(candidateSpecList, ngSites, sitesUsedByTask, preSetSiteSpec, maxNumSites, tmpLog):
    """
    Drop problematic (blacklisted) candidate sites and cap the candidate list.

    :param candidateSpecList: list of candidate site specs
    :param ngSites: collection of problematic site names
    :param sitesUsedByTask: collection of site names already used by the task
    :param preSetSiteSpec: pre-set site spec that must never be skipped, or None
    :param maxNumSites: maximum number of candidates to return (0/None = no limit)
    :param tmpLog: logger
    :return: trimmed candidate list, already-used sites first
    """
    removed = set()
    preferred = []  # candidates already used by the task
    fresh = []  # remaining good candidates
    for candidate in candidateSpecList:
        problematic = candidate.siteName in ngSites or candidate.unifiedName in ngSites
        # the pre-set site is kept even when problematic
        is_preset = preSetSiteSpec is not None and candidate.siteName == preSetSiteSpec.siteName
        if problematic and not is_preset:
            removed.add(candidate.siteName)
        elif candidate.siteName in sitesUsedByTask or candidate.unifiedName in sitesUsedByTask:
            preferred.append(candidate)
        else:
            fresh.append(candidate)
    # no cap defined: keep everything
    if maxNumSites in [0, None]:
        maxNumSites = len(candidateSpecList)
    # dump
    for removed_site in removed:
        tmpLog.debug(f"getting rid of problematic site {removed_site}")
    return (preferred + fresh)[:maxNumSites]
0539 
0540 
# get mapping between sites and input storage endpoints
def getSiteInputStorageEndpointMap(site_list, site_mapper, prod_source_label, job_label):
    """
    Map each PanDA site to the list of its input DDM endpoints.

    :param site_list: list of site names
    :param site_mapper: SiteMapper object
    :param prod_source_label: production source label used to select the scope
    :param job_label: job label used to select the scope
    :return: {site_name: [endpoint names]}; sites without the input scope are omitted
    """
    endpoint_map = {}
    for site_name in site_list:
        site_spec = site_mapper.getSite(site_name)
        scope_input, _scope_output = select_scope(site_spec, prod_source_label, job_label)
        # skip sites where the input scope is not available
        if scope_input not in site_spec.ddm_endpoints_input:
            continue
        # collect the schedconfig.ddm endpoints for this site
        endpoint_map[site_name] = list(site_spec.ddm_endpoints_input[scope_input].all.keys())
    return endpoint_map
0557 
0558 
# get to-running rate of sites from various resources
# module-level cache: {(rounded_min, rounded_max): {"exp": expiry, "data": stats}}
CACHE_SiteToRunRateStats = {}


def getSiteToRunRateStats(tbIF, vo, time_window=21600, cutoff=300, cache_lifetime=600):
    """
    Get the to-running rate statistics of sites, using a two-level cache:
    a module-level local cache keyed by the rounded time window, backed by a
    shared DB cache guarded with a process lock so only one process refreshes it.

    :param tbIF: task buffer interface providing cache/lock/stats queries
    :param vo: virtual organization name
    :param time_window: look-back window in seconds for the statistics
    :param cutoff: seconds to exclude at the recent end of the window
    :param cache_lifetime: validity of cached data in seconds
    :return: (success flag, statistics map)
    """
    # initialize
    ret_val = False
    ret_map = {}
    # DB cache keys
    dc_main_key = "AtlasSites"
    dc_sub_key = "SiteToRunRate"
    # arguments for process lock
    this_prodsourcelabel = "user"
    this_pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}_{os.getpgrp()}-broker"
    this_component = "Cache.SiteToRunRate"
    # timestamps
    current_time = naive_utcnow()
    starttime_max = current_time - datetime.timedelta(seconds=cutoff)
    starttime_min = current_time - datetime.timedelta(seconds=time_window)
    # rounded with 10 minutes so concurrent callers share the same cache key
    starttime_max_rounded = starttime_max.replace(minute=starttime_max.minute // 10 * 10, second=0, microsecond=0)
    starttime_min_rounded = starttime_min.replace(minute=starttime_min.minute // 10 * 10, second=0, microsecond=0)
    # NOTE(review): real_interval_hours is computed but not used in this function -- confirm it is still needed
    real_interval_hours = (starttime_max_rounded - starttime_min_rounded).total_seconds() / 3600
    # local cache key
    local_cache_key = (starttime_min_rounded, starttime_max_rounded)
    # condition of query
    if local_cache_key in CACHE_SiteToRunRateStats and current_time <= CACHE_SiteToRunRateStats[local_cache_key]["exp"]:
        # query from local cache
        ret_val = True
        ret_map = CACHE_SiteToRunRateStats[local_cache_key]["data"]
    else:
        # try some times (bounded retry while waiting for the process lock)
        for _ in range(99):
            # skip if too long after original current time
            if naive_utcnow() - current_time > datetime.timedelta(seconds=min(10, cache_lifetime / 4)):
                # break trying
                break
            try:
                # query from DB cache
                cache_spec = tbIF.getCache_JEDI(main_key=dc_main_key, sub_key=dc_sub_key)
                if cache_spec is not None:
                    expired_time = cache_spec.last_update + datetime.timedelta(seconds=cache_lifetime)
                    if current_time <= expired_time:
                        # valid DB cache
                        ret_val = True
                        ret_map = json.loads(cache_spec.data)
                        # fill local cache
                        CACHE_SiteToRunRateStats[local_cache_key] = {"exp": expired_time, "data": ret_map}
                        # break trying
                        break
                # got process lock; only the lock holder refreshes the DB cache
                got_lock = tbIF.lockProcess_JEDI(
                    vo=vo,
                    prodSourceLabel=this_prodsourcelabel,
                    cloud=None,
                    workqueue_id=None,
                    resource_name=None,
                    component=this_component,
                    pid=this_pid,
                    timeLimit=5,
                )
                if not got_lock:
                    # not getting lock, sleep and query cache again
                    time.sleep(1)
                    continue
                # query from PanDA DB directly
                ret_val, ret_map = tbIF.getSiteToRunRateStats(vo=vo, exclude_rwq=False, starttime_min=starttime_min, starttime_max=starttime_max)
                if ret_val:
                    # expired time
                    expired_time = current_time + datetime.timedelta(seconds=cache_lifetime)
                    # fill local cache
                    CACHE_SiteToRunRateStats[local_cache_key] = {"exp": expired_time, "data": ret_map}
                    # json of data
                    data_json = json.dumps(ret_map)
                    # fill DB cache
                    tbIF.updateCache_JEDI(main_key=dc_main_key, sub_key=dc_sub_key, data=data_json)
                # unlock process
                tbIF.unlockProcess_JEDI(
                    vo=vo, prodSourceLabel=this_prodsourcelabel, cloud=None, workqueue_id=None, resource_name=None, component=this_component, pid=this_pid
                )
                # break trying
                break
            except Exception as e:
                # dump error message
                err_str = f"AtlasBrokerUtils.getSiteToRunRateStats got {e.__class__.__name__}: {e} \n"
                sys.stderr.write(err_str)
                # break trying
                break
        # delete outdated entries in local cache to keep it from growing unboundedly
        for lc_key in list(CACHE_SiteToRunRateStats.keys()):
            lc_time_min, lc_time_max = lc_key
            if lc_time_max < starttime_max_rounded - datetime.timedelta(seconds=cache_lifetime):
                try:
                    del CACHE_SiteToRunRateStats[lc_key]
                except Exception as e:
                    err_str = f"AtlasBrokerUtils.getSiteToRunRateStats when deleting outdated entries got {e.__class__.__name__}: {e} \n"
                    sys.stderr.write(err_str)

    return ret_val, ret_map
0658 
0659 
# get users jobs stats from various resources
# process-wide cache for getUsersJobsStats: maps a local cache key to
# {"exp": expiry datetime, "data": stats map}
CACHE_UsersJobsStats = {}
0662 
0663 
def getUsersJobsStats(tbIF, vo, prod_source_label, cache_lifetime=60):
    """
    Get per-user job statistics with a two-level cache (process-local dict
    CACHE_UsersJobsStats plus a DB cache), refreshing from the PanDA DB under a
    process lock so only one broker process performs the expensive query.

    :param tbIF: task buffer interface used for DB queries, caches and process locks
    :param vo: virtual organization
    :param prod_source_label: prodSourceLabel to query the stats for
    :param cache_lifetime: cache validity in seconds
    :return: (ret_val, ret_map) where ret_val is True on success and ret_map
             holds the stats ({} on failure)
    """
    # initialize
    ret_val = False
    ret_map = {}
    # DB cache keys
    dc_main_key = "AtlasSites"
    dc_sub_key = "UsersJobsStats"
    # arguments for process lock
    this_prodsourcelabel = prod_source_label
    # identifier of this process used as the lock owner
    # (fixed duplicated assignment: was "this_pid = this_pid = ...")
    this_pid = f"{socket.getfqdn().split('.')[0]}-{os.getpid()}_{os.getpgrp()}-broker"
    this_component = "Cache.UsersJobsStats"
    # local cache key; a must if not using global variable
    local_cache_key = "_main"
    # timestamps
    current_time = naive_utcnow()
    # condition of query
    if local_cache_key in CACHE_UsersJobsStats and current_time <= CACHE_UsersJobsStats[local_cache_key]["exp"]:
        # query from local cache
        ret_val = True
        ret_map = CACHE_UsersJobsStats[local_cache_key]["data"]
    else:
        # try some times
        for _ in range(99):
            # give up when too much wall-clock time has passed since the first attempt
            if naive_utcnow() - current_time > datetime.timedelta(seconds=min(15, cache_lifetime / 4)):
                # break trying
                break
            try:
                # query from DB cache
                cache_spec = tbIF.getCache_JEDI(main_key=dc_main_key, sub_key=dc_sub_key)
                if cache_spec is not None:
                    expired_time = cache_spec.last_update + datetime.timedelta(seconds=cache_lifetime)
                    if current_time <= expired_time:
                        # valid DB cache
                        ret_val = True
                        ret_map = json.loads(cache_spec.data)
                        # fill local cache
                        CACHE_UsersJobsStats[local_cache_key] = {"exp": expired_time, "data": ret_map}
                        # break trying
                        break
                # get process lock so only one process refreshes the DB cache
                got_lock = tbIF.lockProcess_JEDI(
                    vo=vo,
                    prodSourceLabel=this_prodsourcelabel,
                    cloud=None,
                    workqueue_id=None,
                    resource_name=None,
                    component=this_component,
                    pid=this_pid,
                    timeLimit=(cache_lifetime * 0.75 / 60),
                )
                if not got_lock:
                    # not getting lock, sleep and query cache again
                    time.sleep(1)
                    continue
                # query from PanDA DB directly
                ret_map = tbIF.getUsersJobsStats_JEDI(prod_source_label=this_prodsourcelabel)
                if ret_map is not None:
                    # expired time
                    expired_time = current_time + datetime.timedelta(seconds=cache_lifetime)
                    # fill local cache
                    CACHE_UsersJobsStats[local_cache_key] = {"exp": expired_time, "data": ret_map}
                    # json of data
                    data_json = json.dumps(ret_map)
                    # fill DB cache
                    tbIF.updateCache_JEDI(main_key=dc_main_key, sub_key=dc_sub_key, data=data_json)
                    # make True return
                    ret_val = True
                # unlock process
                tbIF.unlockProcess_JEDI(
                    vo=vo, prodSourceLabel=this_prodsourcelabel, cloud=None, workqueue_id=None, resource_name=None, component=this_component, pid=this_pid
                )
                # break trying
                break
            except Exception as e:
                # dump error message
                err_str = f"AtlasBrokerUtils.getUsersJobsStats got {e.__class__.__name__}: {e} \n"
                sys.stderr.write(err_str)
                # break trying
                break

    return ret_val, ret_map
0746 
0747 
# get gshare usage
def getGShareUsage(tbIF, gshare, fresher_than_minutes_ago=15):
    """
    Fetch the gshare_preference metric for one gshare from the Metrics table.

    :param tbIF: task buffer interface
    :param gshare: global share name
    :param fresher_than_minutes_ago: ignore metric rows older than this many minutes
    :return: (success_flag, parsed_json_map); the map is empty on failure
    """
    ret_val, ret_map = False, {}
    start_time = naive_utcnow()
    # wall-clock budget for the retry loop
    time_budget = datetime.timedelta(seconds=max(3, fresher_than_minutes_ago / 3))
    for _ in range(99):
        now_time = naive_utcnow()
        # give up when the budget is exhausted
        if now_time - start_time > time_budget:
            break
        try:
            # query PanDA DB directly for a sufficiently fresh metric row
            sql_get_gshare = "SELECT m.value_json FROM ATLAS_PANDA.Metrics m WHERE m.metric=:metric AND m.gshare=:gshare AND m.timestamp>=:min_timestamp "
            var_map = {
                ":metric": "gshare_preference",
                ":gshare": gshare,
                ":min_timestamp": now_time - datetime.timedelta(minutes=fresher_than_minutes_ago),
            }
            res = tbIF.querySQL(sql_get_gshare, var_map)
            if res:
                # parse the stored JSON payload
                ret_map = json.loads(res[0][0])
                ret_val = True
        except Exception as e:
            # dump error message
            sys.stderr.write(f"AtlasBrokerUtils.getGShareUsage got {e.__class__.__name__}: {e} \n")
        # a single pass is enough; only the budget check above can retry
        break

    return ret_val, ret_map
0788 
0789 
# get user evaluation
def getUserEval(tbIF, user, fresher_than_minutes_ago=20):
    """
    Get the analy_user_eval metric entry for one user.

    :param tbIF: task buffer interface
    :param user: user name; used as a JSON attribute path inside the SQL statement
    :param fresher_than_minutes_ago: ignore metric rows older than this many minutes
    :return: (success_flag, evaluation_map); the map may be None when the stored
             value is empty, and is {} with flag False on failure
    """
    # initialize
    ret_val = False
    ret_map = {}
    # NOTE(security): the user name is interpolated into the JSON path of the SQL
    # statement below because a JSON dot-notation path cannot be a bind variable;
    # reject names containing quote characters which could break out of the
    # quoted path (SQL injection)
    if '"' in user or "'" in user:
        sys.stderr.write("AtlasBrokerUtils.getUserEval got invalid user name \n")
        return ret_val, ret_map
    # timestamps
    current_time = naive_utcnow()
    # try some times
    for _ in range(99):
        now_time = naive_utcnow()
        # skip if too long after original current time
        if now_time - current_time > datetime.timedelta(seconds=max(3, fresher_than_minutes_ago / 3)):
            # break trying
            break
        try:
            # query from PanDA DB directly
            sql_get_user_eval = f'SELECT m.value_json."{user}" FROM ATLAS_PANDA.Metrics m WHERE m.metric=:metric AND m.timestamp>=:min_timestamp '
            var_map = {
                ":metric": "analy_user_eval",
                ":min_timestamp": now_time - datetime.timedelta(minutes=fresher_than_minutes_ago),
            }
            # result
            res = tbIF.querySQL(sql_get_user_eval, var_map)
            if res:
                value_json = res[0][0]
                # json of data
                ret_map = json.loads(value_json) if value_json else None
                # make True return
                ret_val = True
            # break trying
            break
        except Exception as e:
            # dump error message
            err_str = f"AtlasBrokerUtils.getUserEval got {e.__class__.__name__}: {e} \n"
            sys.stderr.write(err_str)
            # break trying
            break

    return ret_val, ret_map
0829 
0830 
# get user task evaluation
def getUserTaskEval(tbIF, taskID, fresher_than_minutes_ago=15):
    """
    Fetch the analy_task_eval metric for one task from the Task_Evaluation table.

    :param tbIF: task buffer interface
    :param taskID: JEDI task ID
    :param fresher_than_minutes_ago: ignore metric rows older than this many minutes
    :return: (success_flag, evaluation_map); the map may be None when the stored
             value is empty, and is {} with flag False on failure
    """
    ret_val, ret_map = False, {}
    start_time = naive_utcnow()
    # wall-clock budget for the retry loop
    time_budget = datetime.timedelta(seconds=max(3, fresher_than_minutes_ago / 3))
    for _ in range(99):
        now_time = naive_utcnow()
        # give up when the budget is exhausted
        if now_time - start_time > time_budget:
            break
        try:
            # query PanDA DB directly for a sufficiently fresh evaluation row
            sql_get_task_eval = (
                "SELECT tev.value_json "
                "FROM ATLAS_PANDA.Task_Evaluation tev "
                "WHERE tev.metric=:metric "
                "AND tev.jediTaskID=:taskID "
                "AND tev.timestamp>=:min_timestamp "
            )
            var_map = {
                ":metric": "analy_task_eval",
                ":taskID": taskID,
                ":min_timestamp": now_time - datetime.timedelta(minutes=fresher_than_minutes_ago),
            }
            res = tbIF.querySQL(sql_get_task_eval, var_map)
            if res:
                value_json = res[0][0]
                # parse the stored JSON payload (None when the value is empty)
                ret_map = json.loads(value_json) if value_json else None
                ret_val = True
        except Exception as e:
            # dump error message
            sys.stderr.write(f"AtlasBrokerUtils.getUserTaskEval got {e.__class__.__name__}: {e} \n")
        # a single pass is enough; only the budget check above can retry
        break

    return ret_val, ret_map
0877 
0878 
# get analysis sites class
def getAnalySitesClass(tbIF, fresher_than_minutes_ago=60):
    """
    Get the class of analysis sites from the analy_site_eval metric.

    :param tbIF: task buffer interface
    :param fresher_than_minutes_ago: ignore metric rows older than this many minutes
    :return: (success_flag, {computingSite: class_as_int})
    """
    # initialize
    ret_val = False
    ret_map = {}
    # timestamps
    current_time = naive_utcnow()
    # try some times
    for _ in range(99):
        now_time = naive_utcnow()
        # skip if too long after original current time
        if now_time - current_time > datetime.timedelta(seconds=max(3, fresher_than_minutes_ago / 3)):
            # break trying
            break
        try:
            # query from PanDA DB directly
            # (local renamed from sql_get_task_eval: this queries site classes, not task evaluations)
            sql_get_site_class = "SELECT m.computingSite, m.value_json.class FROM ATLAS_PANDA.Metrics m WHERE m.metric=:metric AND m.timestamp>=:min_timestamp "
            var_map = {
                ":metric": "analy_site_eval",
                ":min_timestamp": now_time - datetime.timedelta(minutes=fresher_than_minutes_ago),
            }
            res = tbIF.querySQL(sql_get_site_class, var_map)
            if res:
                for site, class_value in res:
                    ret_map[site] = int(class_value)
                # make True return
                ret_val = True
            # break trying
            break
        except Exception as e:
            # dump error message (fixed: previously reported the wrong function name "getAnalySitesEval")
            err_str = f"AtlasBrokerUtils.getAnalySitesClass got {e.__class__.__name__}: {e} \n"
            sys.stderr.write(err_str)
            # break trying
            break

    return ret_val, ret_map
0916 
0917 
def compare_version_string(version_string, comparison_string):
    """
    Compares a version string with another string composed of a comparison operator and a version string.

    Args:
        version_string (str): The version string to compare.
        comparison_string (str): The string containing the comparison operator and version string (e.g., ">=2.0", "!=1.5").

    Returns:
        bool or None: True if the version string satisfies the comparison, False if it doesn't,
                       or None if the comparison string is invalid.
    """
    # "!" must be in the operator character class; with the previous
    # pattern r"([=><]+)(.+)" a "!=x.y" comparison never matched and the
    # "!=" branch below was unreachable
    match = re.match(r"([=><!]+)(.+)", comparison_string)
    if not match:
        return None

    operator = match.group(1).strip()
    version_to_compare = match.group(2).strip()

    try:
        version1 = version.parse(version_string)
        version2 = version.parse(version_to_compare)
    except version.InvalidVersion:
        return None

    if operator == "==":
        return version1 == version2
    elif operator == "!=":
        return version1 != version2
    elif operator == ">=":
        return version1 >= version2
    elif operator == "<=":
        return version1 <= version2
    elif operator == ">":
        return version1 > version2
    elif operator == "<":
        return version1 < version2
    else:
        # unknown operator combination (e.g. "=<")
        return None
0957 
0958 
# check SW with json
class JsonSoftwareCheck:
    """
    Checks software availability of panda queues against a JSON software map,
    covering CVMFS availability, fat containers, release tags (cmtconfig,
    project, release) and host CPU/GPU hardware requirements.
    """

    # constructor
    def __init__(self, site_mapper, sw_map, wn_architecture_level_map):
        """
        :param site_mapper: site mapper used to resolve site names to site specs
        :param sw_map: per-site software description dict; keys used here are
                       "architectures", "cvmfs", "containers", "cmtconfigs",
                       "tags", and optionally a site-independent "ALL" entry
        :param wn_architecture_level_map: per-site map of architecture level to
                       {"pct_within_queue": percentage}, used for preference weights
        """
        self.siteMapper = site_mapper
        self.sw_map = sw_map
        self.wn_architecture_level_map = wn_architecture_level_map

    # get lists
    def check(
        self,
        site_list,
        cvmfs_tag,
        sw_project,
        sw_version,
        cmt_config,
        need_cvmfs,
        cmt_config_only,
        need_container=False,
        container_name=None,
        only_tags_fc=False,
        host_cpu_specs=None,
        host_cpu_pref=None,
        host_gpu_spec=None,
        log_stream=None,
    ):
        """
        Filter site_list by software and hardware requirements.

        :param site_list: candidate site names
        :param cvmfs_tag: CVMFS tag required by the task
        :param sw_project: software project name
        :param sw_version: software release version
        :param cmt_config: cmt config required by the task
        :param need_cvmfs: True if CVMFS is mandatory
        :param cmt_config_only: True to check only the cmt config
        :param need_container: True if a container is mandatory
        :param container_name: fat container logical name or full path, if any
        :param only_tags_fc: True to match fat containers only via tags
        :param host_cpu_specs: list of acceptable CPU specs ({"arch", "vendor", "instr"}),
                               or None when the task has no CPU requirement
        :param host_cpu_pref: CPU preference dict, may carry "preferred_architecture_level"
        :param host_gpu_spec: GPU spec ({"vendor", "model", optional "version"}), or None
        :param log_stream: logger for error reporting, or None
        :return: (ok_sites, no_auto_sites, preference_weight_map) where ok_sites
                 passed the checks, no_auto_sites are sites without releases=AUTO
                 left for legacy checks, and preference_weight_map gives a weight
                 per site derived from the preferred architecture level
        """
        ok_sites = []
        no_auto_sites = []
        preference_weight_map = {}

        # Does the task define a preferred architecture level?
        preferred_architecture_level = host_cpu_pref.get("preferred_architecture_level") if host_cpu_pref else None

        for tmp_site_name in site_list:
            tmp_site_spec = self.siteMapper.getSite(tmp_site_name)
            if tmp_site_spec.releases == ["AUTO"] and tmp_site_name in self.sw_map:
                go_ahead = False
                try:
                    # convert the per-site architecture list to a dict keyed by type ("cpu"/"gpu")
                    architecture_map = {}
                    if "architectures" in self.sw_map[tmp_site_name]:
                        for arch_spec in self.sw_map[tmp_site_name]["architectures"]:
                            if "type" in arch_spec:
                                architecture_map[arch_spec["type"]] = arch_spec

                    # check if need CPU
                    # an "excl" marker in any CPU attribute list means the queue only
                    # accepts tasks that explicitly declare CPU specs
                    if "cpu" in architecture_map:
                        need_cpu = False
                        for k in architecture_map["cpu"]:
                            if isinstance(architecture_map["cpu"][k], list):
                                if "excl" in architecture_map["cpu"][k]:
                                    need_cpu = True
                                    break
                        if need_cpu and host_cpu_specs is None:
                            continue

                    # check if need GPU
                    if "gpu" in architecture_map:
                        need_gpu = False
                        for k in architecture_map["gpu"]:
                            if isinstance(architecture_map["gpu"][k], list):
                                if "excl" in architecture_map["gpu"][k]:
                                    need_gpu = True
                                    break
                        if need_gpu and host_gpu_spec is None:
                            continue

                    # calculate the preference_weight_map for the site
                    # weight is 1 + fraction of WNs in the queue at the preferred level
                    if preferred_architecture_level:
                        preference_weight_map[tmp_site_name] = (
                            1 + self.wn_architecture_level_map.get(tmp_site_name, {}).get(preferred_architecture_level, {}).get("pct_within_queue", 0) / 100
                        )

                    if host_cpu_specs or host_gpu_spec:
                        # skip since the PQ doesn't describe HW spec
                        if not architecture_map:
                            continue
                        # check CPU: the site is OK if any one of the task's CPU specs matches
                        if host_cpu_specs:
                            host_ok = False
                            for host_cpu_spec in host_cpu_specs:
                                # CPU not specified
                                if "cpu" not in architecture_map:
                                    continue

                                # check architecture
                                if host_cpu_spec["arch"] == "*":
                                    if "excl" in architecture_map["cpu"]["arch"]:
                                        continue
                                else:
                                    if "any" not in architecture_map["cpu"]["arch"]:
                                        if host_cpu_spec["arch"] not in architecture_map["cpu"]["arch"]:
                                            # check with regex
                                            if not [True for iii in architecture_map["cpu"]["arch"] if re.search("^" + host_cpu_spec["arch"] + "$", iii)]:
                                                continue

                                # check vendor
                                if host_cpu_spec["vendor"] == "*":
                                    # task doesn't specify a vendor and PQ explicitly requests a specific vendor
                                    if "vendor" in architecture_map["cpu"] and "excl" in architecture_map["cpu"]["vendor"]:
                                        continue
                                else:
                                    # task specifies a vendor and PQ doesn't request any specific vendor
                                    if "vendor" not in architecture_map["cpu"]:
                                        continue
                                    # task specifies a vendor and PQ doesn't accept any vendor or the specific vendor
                                    if "any" not in architecture_map["cpu"]["vendor"] and host_cpu_spec["vendor"] not in architecture_map["cpu"]["vendor"]:
                                        continue

                                # check instruction set
                                if host_cpu_spec["instr"] == "*":
                                    if "instr" in architecture_map["cpu"] and "excl" in architecture_map["cpu"]["instr"]:
                                        continue
                                else:
                                    if "instr" not in architecture_map["cpu"]:
                                        continue
                                    if "any" not in architecture_map["cpu"]["instr"] and host_cpu_spec["instr"] not in architecture_map["cpu"]["instr"]:
                                        continue
                                host_ok = True
                                break
                            if not host_ok:
                                continue

                        # check GPU
                        if host_gpu_spec:
                            # GPU not specified
                            if "gpu" not in architecture_map:
                                continue

                            # check vendor
                            if host_gpu_spec["vendor"] == "*":
                                # task doesn't specify CPU vendor and PQ explicitly requests a specific vendor
                                if "vendor" in architecture_map["gpu"] and "excl" in architecture_map["gpu"]["vendor"]:
                                    continue
                            else:
                                # task specifies a vendor and PQ doesn't request any specific vendor
                                if "vendor" not in architecture_map["gpu"]:
                                    continue
                                # task specifies a vendor and PQ doesn't accept any vendor or the specific vendor
                                if "any" not in architecture_map["gpu"]["vendor"] and host_gpu_spec["vendor"] not in architecture_map["gpu"]["vendor"]:
                                    continue

                            # check model; the task's model may be a plain string or a
                            # dict with a regex "pattern" and an "excl" (negate) flag
                            if host_gpu_spec["model"] == "*":
                                if "model" in architecture_map["gpu"] and "excl" in architecture_map["gpu"]["model"]:
                                    continue
                            else:
                                if isinstance(host_gpu_spec["model"], dict):
                                    model_pattern = host_gpu_spec["model"]["pattern"]
                                    model_excl = host_gpu_spec["model"].get("excl", False)
                                else:
                                    model_pattern = host_gpu_spec["model"]
                                    model_excl = False
                                if "model" not in architecture_map["gpu"] or (
                                    "any" not in architecture_map["gpu"]["model"]
                                    and any(re.match(model_pattern, m, re.IGNORECASE) for m in architecture_map["gpu"]["model"])
                                    == model_excl
                                ):
                                    continue

                            # check version
                            if "version" in host_gpu_spec:
                                if "version" not in architecture_map["gpu"]:
                                    # PQ doesn't specify version
                                    continue
                                elif "any" == architecture_map["gpu"]["version"]:
                                    # PQ accepts any version
                                    pass
                                else:
                                    # check version at PQ
                                    if not compare_version_string(architecture_map["gpu"]["version"], host_gpu_spec["version"]):
                                        continue
                    go_ahead = True
                except Exception as e:
                    if log_stream:
                        log_stream.error(f"json check {str(architecture_map)} failed for {tmp_site_name} {str(e)} {traceback.format_exc()} ")
                if not go_ahead:
                    continue
                # only HW check
                if not (cvmfs_tag or cmt_config or sw_project or sw_version or container_name) and (host_cpu_specs or host_gpu_spec):
                    ok_sites.append(tmp_site_name)
                    continue
                # check for fat container
                if container_name:
                    # check for container
                    if not only_tags_fc and ("any" in self.sw_map[tmp_site_name]["containers"] or "/cvmfs" in self.sw_map[tmp_site_name]["containers"]):
                        # any in containers
                        ok_sites.append(tmp_site_name)
                    elif container_name in set([t["container_name"] for t in self.sw_map[tmp_site_name]["tags"] if t["container_name"]]):
                        # logical name in tags or any in containers
                        ok_sites.append(tmp_site_name)
                    elif container_name in set([s for t in self.sw_map[tmp_site_name]["tags"] for s in t["sources"] if t["sources"]]):
                        # full path in sources
                        ok_sites.append(tmp_site_name)
                    elif not only_tags_fc:
                        # get sources in all tag list
                        if "ALL" in self.sw_map:
                            source_list_in_all_tag = [s for t in self.sw_map["ALL"]["tags"] for s in t["sources"] if t["container_name"] == container_name]
                        else:
                            source_list_in_all_tag = []
                        # prefix with full path
                        for tmp_prefix in self.sw_map[tmp_site_name]["containers"]:
                            if container_name.startswith(tmp_prefix):
                                ok_sites.append(tmp_site_name)
                                break
                            toBreak = False
                            for source_in_all_tag in source_list_in_all_tag:
                                if source_in_all_tag.startswith(tmp_prefix):
                                    ok_sites.append(tmp_site_name)
                                    toBreak = True
                                    break
                            if toBreak:
                                break
                    continue
                # only cmt config check
                if cmt_config_only:
                    ok_sites.append(tmp_site_name)
                    continue
                # check if CVMFS is available
                if "any" in self.sw_map[tmp_site_name]["cvmfs"] or cvmfs_tag in self.sw_map[tmp_site_name]["cvmfs"]:
                    # check if container is available
                    if "any" in self.sw_map[tmp_site_name]["containers"] or "/cvmfs" in self.sw_map[tmp_site_name]["containers"]:
                        ok_sites.append(tmp_site_name)
                    # check cmt config
                    elif not need_container and cmt_config in self.sw_map[tmp_site_name]["cmtconfigs"]:
                        ok_sites.append(tmp_site_name)
                elif not need_cvmfs:
                    if not need_container or "any" in self.sw_map[tmp_site_name]["containers"]:
                        # check tags
                        for tag in self.sw_map[tmp_site_name]["tags"]:
                            if tag["cmtconfig"] == cmt_config and tag["project"] == sw_project and tag["release"] == sw_version:
                                ok_sites.append(tmp_site_name)
                                break
                # don't pass to subsequent check if AUTO is enabled
                continue
            # use only AUTO for container or HW
            if container_name is not None or host_cpu_specs is not None or host_gpu_spec is not None:
                continue
            no_auto_sites.append(tmp_site_name)

        return ok_sites, no_auto_sites, preference_weight_map
1200 
1201 
1202 # resolve cmt_config
1203 def resolve_cmt_config(queue_name: str, cmt_config: str, base_platform, sw_map: dict) -> str | None:
1204     """
1205     resolve cmt config at a given queue_name
1206     :param queue_name: queue name
1207     :param cmt_config: cmt confing to resolve
1208     :param base_platform: base platform
1209     :param sw_map: software map
1210     :return: resolved cmt config or None if unavailable or valid
1211     """
1212     # return None if queue_name is unavailable
1213     if queue_name not in sw_map:
1214         return None
1215     # return None if cmt_config is valid
1216     if cmt_config in sw_map[queue_name]["cmtconfigs"]:
1217         return None
1218     # check if cmt_config matches with any of the queue's cmt_configs
1219     for tmp_cmt_config in sw_map[queue_name]["cmtconfigs"]:
1220         if re.search("^" + cmt_config + "$", tmp_cmt_config):
1221             if base_platform:
1222                 # add base_platform if necessary
1223                 tmp_cmt_config = tmp_cmt_config + "@" + base_platform
1224             return tmp_cmt_config
1225     # return None if cmt_config is unavailable
1226     return None
1227 
1228 
def check_endpoints_with_blacklist(
    site_spec: SiteSpec.SiteSpec, scope_input: str, scope_output: str, sites_in_nucleus: list, remote_source_available: bool
) -> str | None:
    """
    Check if site's endpoints are in the blacklist

    :param site_spec: site spec
    :param scope_input: input scope
    :param scope_output: output scope
    :param sites_in_nucleus: list of sites in nucleus
    :param remote_source_available: if remote source is available

    :return: description of blacklisted reason or None
    """
    site_name = site_spec.sitename
    is_satellite = site_name not in sites_in_nucleus
    read_input_over_lan = False
    receive_input_over_wan = False
    write_output_over_lan = False
    send_output_over_wan = False
    # input endpoints: LAN read and WAN write (receive) capabilities
    if scope_input in site_spec.ddm_endpoints_input:
        for input_endpoint in site_spec.ddm_endpoints_input[scope_input].all.values():
            detailed_status = input_endpoint["detailed_status"]
            # can read input from local
            if detailed_status.get("read_lan") not in ["OFF", "TEST"]:
                read_input_over_lan = True
            if is_satellite:
                # satellite sites: can receive input from remote to local
                if detailed_status.get("write_wan") not in ["OFF", "TEST"]:
                    receive_input_over_wan = True
            else:
                # NA for nucleus sites
                receive_input_over_wan = True
                remote_source_available = True
    # output endpoints: LAN write and WAN read (send) capabilities
    if scope_output in site_spec.ddm_endpoints_output:
        for output_endpoint in site_spec.ddm_endpoints_output[scope_output].all.values():
            detailed_status = output_endpoint["detailed_status"]
            # can write output to local
            if detailed_status.get("write_lan") not in ["OFF", "TEST"]:
                write_output_over_lan = True
            if is_satellite:
                # satellite sites: can send output from local to remote
                if detailed_status.get("read_wan") not in ["OFF", "TEST"]:
                    send_output_over_wan = True
            else:
                # NA for nucleus sites
                send_output_over_wan = True
                remote_source_available = True
    # report the first failing condition, in the same priority order as the original checks
    if not read_input_over_lan:
        return f"  skip site={site_name} since input endpoints cannot read over LAN, read_lan is not ON criteria=-read_lan_blacklist"
    if not write_output_over_lan:
        return f"  skip site={site_name} since output endpoints cannot write over LAN, write_lan is not ON criteria=-write_lan_blacklist"
    if not receive_input_over_wan:
        return f"  skip site={site_name} since input endpoints cannot receive files over WAN, write_wan is not ON criteria=-write_wan_blacklist"
    if not send_output_over_wan:
        return f"  skip site={site_name} since output endpoints cannot send out files over WAN, read_wan is not ON criteria=-read_wan_blacklist"
    if not remote_source_available:
        return f"  skip site={site_name} since source endpoints cannot transfer files over WAN and it is satellite criteria=-source_blacklist"
    # not blacklisted
    return None