Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:57

0001 import copy
0002 import datetime
0003 import math
0004 import random
0005 import re
0006 import sys
0007 import traceback
0008 
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 
0012 from pandajedi.jedicore import Interaction
0013 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0014 from pandajedi.jedicore.SiteCandidate import SiteCandidate
0015 from pandaserver.dataservice.DataServiceUtils import select_scope
0016 from pandaserver.srvcore import CoreUtils
0017 from pandaserver.taskbuffer import JobUtils
0018 
0019 from . import AtlasBrokerUtils
0020 from .JobBrokerBase import JobBrokerBase
0021 
# module-level logger, named after the last component of this module's dotted name
logger = PandaLogger().getLogger(__name__.split(".")[-1])

# string constants describing this module
# NOTE(review): presumably the application / component / virtual-organisation labels
# of this broker - confirm where they are consumed
APP = "jedi"
COMPONENT = "jobbroker"
VO = "atlas"
0027 
0028 
0029 # brokerage for ATLAS analysis
0030 class AtlasAnalJobBroker(JobBrokerBase):
0031     # constructor
0032     def __init__(self, ddmIF, taskBufferIF):
0033         JobBrokerBase.__init__(self, ddmIF, taskBufferIF)
0034         self.dataSiteMap = {}
0035 
0036         # load the SW availability map
0037         try:
0038             self.sw_map = taskBufferIF.load_sw_map()
0039         except Exception:
0040             logger.error("Failed to load the SW tags map!!!")
0041             self.sw_map = {}
0042 
0043         # load the worker node CPU architecture-level map
0044         try:
0045             self.architecture_level_map = taskBufferIF.get_architecture_level_map()
0046         except BaseException:
0047             logger.error("Failed to load the WN architecture level map!!!")
0048             self.architecture_level_map = {}
0049 
0050     # main
0051     def doBrokerage(self, taskSpec, cloudName, inputChunk, taskParamMap):
0052         # make logger
0053         if inputChunk.masterDataset:
0054             msg_tag = f"<jediTaskID={taskSpec.jediTaskID} datasetID={inputChunk.masterDataset.datasetID}>"
0055         else:
0056             msg_tag = f"<jediTaskID={taskSpec.jediTaskID}>"
0057         tmpLog = MsgWrapper(logger, msg_tag, monToken=f"<jediTaskID={taskSpec.jediTaskID} {naive_utcnow().isoformat('/')}>")
0058         tmpLog.debug("start")
0059         # return for failure
0060         retFatal = self.SC_FATAL, inputChunk
0061         retTmpError = self.SC_FAILED, inputChunk
0062         # new maxwdir
0063         newMaxwdir = {}
0064         # get primary site candidates
0065         sitePreAssigned = False
0066         siteListPreAssigned = False
0067         excludeList = []
0068         includeList = None
0069         scanSiteList = []
0070         # problematic sites
0071         problematic_sites_dict = {}
0072         # not to use VP replicas for merging, scouts, and forceStaged
0073         if inputChunk.isMerging or taskSpec.avoid_vp() or taskSpec.useScout() or taskSpec.useLocalIO():
0074             useVP = False
0075         else:
0076             useVP = True
0077         # avoid VP queues for merging
0078         avoidVP = False
0079         if inputChunk.isMerging:
0080             avoidVP = True
0081         # get workQueue
0082         workQueue = self.taskBufferIF.getWorkQueueMap().getQueueWithIDGshare(taskSpec.workQueue_ID, taskSpec.gshare)
0083 
0084         # site limitation
0085         if taskSpec.useLimitedSites():
0086             if "excludedSite" in taskParamMap:
0087                 excludeList = taskParamMap["excludedSite"]
0088                 # str to list for task retry
0089                 try:
0090                     if not isinstance(excludeList, list):
0091                         excludeList = excludeList.split(",")
0092                 except Exception:
0093                     pass
0094             if "includedSite" in taskParamMap:
0095                 includeList = taskParamMap["includedSite"]
0096                 # str to list for task retry
0097                 if includeList == "":
0098                     includeList = None
0099                 try:
0100                     if not isinstance(includeList, list):
0101                         includeList = includeList.split(",")
0102                     siteListPreAssigned = True
0103                 except Exception:
0104                     pass
0105         # loop over all sites
0106         for siteName, tmpSiteSpec in self.siteMapper.siteSpecList.items():
0107             if tmpSiteSpec.type == "analysis" or tmpSiteSpec.is_grandly_unified():
0108                 scanSiteList.append(siteName)
0109         # preassigned
0110         preassignedSite = taskSpec.site
0111         if preassignedSite not in ["", None]:
0112             # site is pre-assigned
0113             if not self.siteMapper.checkSite(preassignedSite):
0114                 # check ddm for unknown site
0115                 includeList = []
0116                 for tmpSiteName in self.get_unified_sites(scanSiteList):
0117                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0118                     scope_input, scope_output = select_scope(tmpSiteSpec, JobUtils.ANALY_PS, JobUtils.ANALY_PS)
0119                     if scope_input in tmpSiteSpec.ddm_endpoints_input and preassignedSite in tmpSiteSpec.ddm_endpoints_input[scope_input].all:
0120                         includeList.append(tmpSiteName)
0121                 if not includeList:
0122                     includeList = None
0123                     tmpLog.info(f"site={preassignedSite} is ignored since unknown")
0124                 else:
0125                     tmpLog.info(f"site={preassignedSite} is converted to {','.join(includeList)}")
0126                 preassignedSite = None
0127             else:
0128                 tmpLog.info(f"site={preassignedSite} is pre-assigned")
0129                 sitePreAssigned = True
0130                 if preassignedSite not in scanSiteList:
0131                     scanSiteList.append(preassignedSite)
0132         tmpLog.info(f"initial {len(self.get_unified_sites(scanSiteList))} candidates")
0133         # allowed remote access protocol
0134         allowedRemoteProtocol = "fax"
0135         # MP
0136         if taskSpec.coreCount is not None and taskSpec.coreCount > 1:
0137             # use MCORE only
0138             useMP = "only"
0139         elif taskSpec.coreCount == 0:
0140             # use MCORE and normal
0141             useMP = "any"
0142         else:
0143             # not use MCORE
0144             useMP = "unuse"
0145         # get statistics of failures
0146         timeWindowForFC = self.taskBufferIF.getConfigValue("anal_jobbroker", "TW_DONE_JOB_STAT", "jedi", taskSpec.vo)
0147         if timeWindowForFC is None:
0148             timeWindowForFC = 6
0149 
0150         # get minimum bad jobs to skip PQ
0151         minBadJobsToSkipPQ = self.taskBufferIF.getConfigValue("anal_jobbroker", "MIN_BAD_JOBS_TO_SKIP_PQ", "jedi", taskSpec.vo)
0152         if minBadJobsToSkipPQ is None:
0153             minBadJobsToSkipPQ = 5
0154 
0155         # get total job stat
0156         totalJobStat = self.get_task_common("totalJobStat")
0157         if totalJobStat is None:
0158             if taskSpec.workingGroup:
0159                 totalJobStat = self.taskBufferIF.countJobsPerTarget_JEDI(taskSpec.workingGroup, False)
0160             else:
0161                 totalJobStat = self.taskBufferIF.countJobsPerTarget_JEDI(taskSpec.origUserName, True)
0162             self.set_task_common("totalJobStat", totalJobStat)
0163         # check total to cap
0164         if totalJobStat:
0165             if taskSpec.workingGroup:
0166                 gdp_token_jobs = "CAP_RUNNING_GROUP_JOBS"
0167                 gdp_token_cores = "CAP_RUNNING_GROUP_CORES"
0168             else:
0169                 gdp_token_jobs = "CAP_RUNNING_USER_JOBS"
0170                 gdp_token_cores = "CAP_RUNNING_USER_CORES"
0171             maxNumRunJobs = self.taskBufferIF.getConfigValue("prio_mgr", gdp_token_jobs)
0172             maxNumRunCores = self.taskBufferIF.getConfigValue("prio_mgr", gdp_token_cores)
0173             maxFactor = 2
0174 
0175             if maxNumRunJobs:
0176                 if totalJobStat["nRunJobs"] > maxNumRunJobs:
0177                     tmpLog.error(f"throttle to generate jobs due to too many running jobs {totalJobStat['nRunJobs']} > {gdp_token_jobs}")
0178                     taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0179                     return retTmpError
0180                 elif totalJobStat["nQueuedJobs"] > maxFactor * maxNumRunJobs:
0181                     tmpLog.error(f"throttle to generate jobs due to too many queued jobs {totalJobStat['nQueuedJobs']} > {maxFactor}x{gdp_token_jobs}")
0182                     taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0183                     return retTmpError
0184             if maxNumRunCores:
0185                 if totalJobStat["nRunCores"] > maxNumRunCores:
0186                     tmpLog.error(f"throttle to generate jobs due to too many running cores {totalJobStat['nRunCores']} > {gdp_token_cores}")
0187                     taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0188                     return retTmpError
0189                 elif totalJobStat["nQueuedCores"] > maxFactor * maxNumRunCores:
0190                     tmpLog.error(f"throttle to generate jobs due to too many queued cores {totalJobStat['nQueuedCores']} > {maxFactor}x{gdp_token_cores}")
0191                     taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0192                     return retTmpError
0193 
0194         # get gshare usage
0195         ret_val, gshare_usage_dict = AtlasBrokerUtils.getGShareUsage(tbIF=self.taskBufferIF, gshare=taskSpec.gshare)
0196         if not ret_val:
0197             tmpLog.warning(f"failed to get gshare usage of {taskSpec.gshare}")
0198         elif not gshare_usage_dict:
0199             tmpLog.warning(f"got empty gshare usage of {taskSpec.gshare}")
0200 
0201         # get L1 share usage
0202         l1_share_usage_dict = None
0203         if taskSpec.gshare == "User Analysis":
0204             l1_share_name = "L1 User Analysis"
0205             ret_val, l1_share_usage_dict = AtlasBrokerUtils.getGShareUsage(tbIF=self.taskBufferIF, gshare=l1_share_name)
0206             if not ret_val:
0207                 tmpLog.warning(f"failed to get gshare usage of {l1_share_name}")
0208             elif not l1_share_usage_dict:
0209                 tmpLog.warning(f"got empty gshare usage of {l1_share_name}")
0210 
0211         # get analy sites classification
0212         ret_val, analy_sites_class_dict = AtlasBrokerUtils.getAnalySitesClass(tbIF=self.taskBufferIF)
0213         if not ret_val:
0214             tmpLog.warning("failed to get analy sites classification")
0215         elif not analy_sites_class_dict:
0216             analy_sites_class_dict = {}
0217             tmpLog.warning("got empty analy sites classification")
0218 
0219         # get user usage
0220         ret_val, user_eval_dict = AtlasBrokerUtils.getUserEval(tbIF=self.taskBufferIF, user=taskSpec.origUserName)
0221         if not ret_val:
0222             tmpLog.warning(f"failed to get user evaluation of {taskSpec.origUserName}")
0223         elif not user_eval_dict:
0224             tmpLog.warning(f"got empty user evaluation of {taskSpec.origUserName}")
0225 
0226         # task evaluation
0227         ret_val, task_eval_dict = AtlasBrokerUtils.getUserTaskEval(tbIF=self.taskBufferIF, taskID=taskSpec.jediTaskID)
0228         if not ret_val:
0229             tmpLog.warning("failed to get user task evaluation")
0230         elif not task_eval_dict:
0231             tmpLog.warning("got empty user task evaluation")
0232 
0233         # parameters about User Analysis threshold
0234         threshold_A = self.taskBufferIF.getConfigValue("analy_eval", "USER_USAGE_THRESHOLD_A", "pandaserver", taskSpec.vo)
0235         if threshold_A is None:
0236             threshold_A = 1000
0237         threshold_B = self.taskBufferIF.getConfigValue("analy_eval", "USER_USAGE_THRESHOLD_B", "pandaserver", taskSpec.vo)
0238         if threshold_B is None:
0239             threshold_B = 10000
0240         user_analyis_to_throttle_threshold_perc_A = 100
0241         user_analyis_to_throttle_threshold_perc_B = min(95, user_analyis_to_throttle_threshold_perc_A)
0242         user_analyis_to_throttle_threshold_perc_C = min(90, user_analyis_to_throttle_threshold_perc_B)
0243         user_analyis_throttle_intensity_A = 1.0
0244 
0245         # parameters about Analysis Stabilizer
0246         base_queue_length_per_pq = self.taskBufferIF.getConfigValue("anal_jobbroker", "BASE_QUEUE_LENGTH_PER_PQ", "jedi", taskSpec.vo)
0247         if base_queue_length_per_pq is None:
0248             base_queue_length_per_pq = 100
0249         base_expected_wait_hour_on_pq = self.taskBufferIF.getConfigValue("anal_jobbroker", "BASE_EXPECTED_WAIT_HOUR_ON_PQ", "jedi", taskSpec.vo)
0250         if base_expected_wait_hour_on_pq is None:
0251             base_expected_wait_hour_on_pq = 8
0252         base_default_queue_length_per_pq_user = self.taskBufferIF.getConfigValue("anal_jobbroker", "BASE_DEFAULT_QUEUE_LENGTH_PER_PQ_USER", "jedi", taskSpec.vo)
0253         if base_default_queue_length_per_pq_user is None:
0254             base_default_queue_length_per_pq_user = 5
0255         base_queue_ratio_on_pq = self.taskBufferIF.getConfigValue("anal_jobbroker", "BASE_QUEUE_RATIO_ON_PQ", "jedi", taskSpec.vo)
0256         if base_queue_ratio_on_pq is None:
0257             base_queue_ratio_on_pq = 0.05
0258         static_max_queue_running_ratio = self.taskBufferIF.getConfigValue("anal_jobbroker", "STATIC_MAX_QUEUE_RUNNING_RATIO", "jedi", taskSpec.vo)
0259         if static_max_queue_running_ratio is None:
0260             static_max_queue_running_ratio = 2.0
0261         max_expected_wait_hour = self.taskBufferIF.getConfigValue("anal_jobbroker", "MAX_EXPECTED_WAIT_HOUR", "jedi", taskSpec.vo)
0262         if max_expected_wait_hour is None:
0263             max_expected_wait_hour = 12.0
0264 
0265         max_missing_input_files = self.taskBufferIF.getConfigValue("jobbroker", "MAX_MISSING_INPUT_FILES", "jedi", taskSpec.vo)
0266         if max_missing_input_files is None:
0267             max_missing_input_files = 10
0268         min_input_completeness = self.taskBufferIF.getConfigValue("jobbroker", "MIN_INPUT_COMPLETENESS", "jedi", taskSpec.vo)
0269         if min_input_completeness is None:
0270             min_input_completeness = 90
0271 
0272         # minimum brokerage weight
0273         min_weight_param = f"MIN_WEIGHT_{taskSpec.prodSourceLabel}_{taskSpec.gshare}"
0274         min_weight = self.taskBufferIF.getConfigValue("jobbroker", min_weight_param, "jedi", taskSpec.vo)
0275         if min_weight is None:
0276             min_weight_param = f"MIN_WEIGHT_{taskSpec.prodSourceLabel}"
0277             min_weight = self.taskBufferIF.getConfigValue("jobbroker", min_weight_param, "jedi", taskSpec.vo)
0278         if min_weight is None:
0279             min_weight = 0
0280 
0281         # throttle User Analysis tasks when close to gshare target
0282         if taskSpec.gshare in ["User Analysis"] and gshare_usage_dict and task_eval_dict:
0283             try:
0284                 usage_percent = gshare_usage_dict["usage_perc"] * 100
0285                 if l1_share_usage_dict and l1_share_usage_dict.get("eqiv_usage_perc") is not None:
0286                     usage_percent = min(usage_percent, l1_share_usage_dict["eqiv_usage_perc"] * 100)
0287                 task_class_value = task_eval_dict["class"]
0288                 usage_slot_ratio_A = 0.5
0289                 if user_eval_dict:
0290                     usage_slot_ratio_A = 1.0 - user_eval_dict["rem_slots_A"] / threshold_A
0291 
0292                 if task_class_value == -1 and usage_percent > user_analyis_to_throttle_threshold_perc_C:
0293                     # C-tasks to throttle
0294                     tmpLog.error(
0295                         "throttle to generate jobs due to gshare {gshare} > {threshold}% of target and task in class C".format(
0296                             gshare=taskSpec.gshare, threshold=user_analyis_to_throttle_threshold_perc_C
0297                         )
0298                     )
0299                     taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0300                     return retTmpError
0301                 elif task_class_value == 0 and usage_percent > user_analyis_to_throttle_threshold_perc_B:
0302                     # B-tasks to throttle
0303                     tmpLog.error(
0304                         "throttle to generate jobs due to gshare {gshare} > {threshold}% of target and task in class B".format(
0305                             gshare=taskSpec.gshare, threshold=user_analyis_to_throttle_threshold_perc_B
0306                         )
0307                     )
0308                     taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0309                     return retTmpError
0310                 elif (
0311                     task_class_value == 1 and usage_percent * usage_slot_ratio_A * user_analyis_throttle_intensity_A > user_analyis_to_throttle_threshold_perc_A
0312                 ):
0313                     # A-tasks to throttle
0314                     tmpLog.error(
0315                         "throttle to generate jobs due to gshare {gshare} > {threshold:.3%} of target and task in class A".format(
0316                             gshare=taskSpec.gshare, threshold=user_analyis_throttle_intensity_A / (usage_slot_ratio_A + 2**-20)
0317                         )
0318                     )
0319                     taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0320                     return retTmpError
0321             except Exception as e:
0322                 tmpLog.error(f"got error when checking low-ranked tasks to throttle; skipped : {e}")
0323 
0324         # check global disk quota
0325         if taskSpec.workingGroup:
0326             global_quota_ok, near_global_limit, quota_msg = self.ddmIF.check_global_quota(taskSpec.workingGroup)
0327             local_quota_ok, endpoints_over_local_quota = self.ddmIF.get_endpoints_over_local_quota(taskSpec.workingGroup)
0328         else:
0329             global_quota_ok, near_global_limit, quota_msg = self.ddmIF.check_global_quota(taskSpec.userName)
0330             local_quota_ok, endpoints_over_local_quota = self.ddmIF.get_endpoints_over_local_quota(taskSpec.userName)
0331 
0332         # over global quota
0333         if not global_quota_ok:
0334             tmpLog.error(f"throttle to generate jobs due to {quota_msg}")
0335             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0336             return retTmpError
0337 
0338         # close to global quota
0339         if near_global_limit and not inputChunk.isMerging:
0340             tmpLog.error(f"throttle to generate only merge jobs due to {quota_msg}")
0341             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0342             return retTmpError
0343 
0344         # cannot get local quota
0345         if not local_quota_ok:
0346             tmpLog.error(f"failed to check local quota")
0347             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0348             return retTmpError
0349 
0350         # get failure count
0351         failureCounts = self.get_task_common("failureCounts")
0352         if failureCounts is None:
0353             failureCounts = self.taskBufferIF.getFailureCountsForTask_JEDI(taskSpec.jediTaskID, timeWindowForFC)
0354             self.set_task_common("failureCounts", failureCounts)
0355 
0356         # IO intensity cutoff in kB/sec to allow input transfers
0357         io_intensity_key = "IO_INTENSITY_CUTOFF_USER"
0358         io_intensity_cutoff = self.taskBufferIF.getConfigValue("anal_jobbroker", io_intensity_key, "jedi", taskSpec.vo)
0359 
0360         # timelimit for data locality check
0361         loc_check_timeout_key = "DATA_CHECK_TIMEOUT_USER"
0362         loc_check_timeout_val = self.taskBufferIF.getConfigValue("anal_jobbroker", loc_check_timeout_key, "jedi", taskSpec.vo)
0363 
0364         # check input datasets
0365         element_map = dict()
0366         ddsList = set()
0367         complete_disk_ok = {}
0368         complete_tape_ok = {}
0369         true_complete_disk_ok = {}
0370         can_be_local_source = {}
0371         can_be_remote_source = {}
0372         list_of_complete_replica_locations = {}
0373         is_distributed_map = {}
0374         if inputChunk.getDatasets():
0375             for datasetSpec in inputChunk.getDatasets():
0376                 datasetName = datasetSpec.datasetName
0377                 isDistributed = None
0378                 if datasetName not in self.dataSiteMap:
0379                     # get the list of sites where data is available
0380                     tmpLog.info(f"getting the list of sites where {datasetName} is available")
0381                     (
0382                         tmpSt,
0383                         tmpRet,
0384                         tmp_complete_disk_ok,
0385                         tmp_complete_tape_ok,
0386                         tmp_truly_complete_disk,
0387                         tmp_can_be_local_source,
0388                         tmp_can_be_remote_source,
0389                         tmp_list_of_complete_replica_locations,
0390                     ) = AtlasBrokerUtils.get_sites_with_data(
0391                         self.get_unified_sites(scanSiteList),
0392                         self.siteMapper,
0393                         self.ddmIF,
0394                         datasetName,
0395                         element_map.get(datasetSpec.datasetName),
0396                         max_missing_input_files,
0397                         min_input_completeness,
0398                     )
0399                     if tmpSt in [Interaction.JEDITemporaryError, Interaction.JEDITimeoutError]:
0400                         tmpLog.error(f"temporary failed to get the list of sites where data is available, since {tmpRet}")
0401                         taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0402                         return retTmpError
0403                     if tmpSt == Interaction.JEDIFatalError:
0404                         tmpLog.error(f"fatal error when getting the list of sites where data is available, since {tmpRet}")
0405                         taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0406                         return retFatal
0407                     # append
0408                     self.dataSiteMap[datasetName] = tmpRet
0409                     complete_disk_ok[datasetName] = tmp_complete_disk_ok
0410                     complete_tape_ok[datasetName] = tmp_complete_tape_ok
0411                     true_complete_disk_ok[datasetName] = tmp_truly_complete_disk
0412                     can_be_local_source[datasetName] = tmp_can_be_local_source
0413                     can_be_remote_source[datasetName] = tmp_can_be_remote_source
0414                     list_of_complete_replica_locations[datasetName] = tmp_list_of_complete_replica_locations
0415                     if datasetName.startswith("ddo"):
0416                         tmpLog.debug(f" {len(tmpRet)} sites")
0417                     else:
0418                         tmpLog.debug(f" {len(tmpRet)} sites : {str(tmpRet)}")
0419                         # check if distributed
0420                         if tmpRet != {}:
0421                             isDistributed = True
0422                             for tmpMap in tmpRet.values():
0423                                 for tmpVal in tmpMap.values():
0424                                     if tmpVal["state"] == "complete":
0425                                         isDistributed = False
0426                                         break
0427                                 if not isDistributed:
0428                                     break
0429                             if isDistributed or datasetName.endswith("/"):
0430                                 # check if really distributed
0431                                 isDistributed = self.ddmIF.isDistributedDataset(datasetName)
0432                                 if isDistributed or datasetName.endswith("/"):
0433                                     isDistributed = True
0434                                     tmpLog.debug(f" {datasetName} is distributed")
0435                                     ddsList.add(datasetName)
0436                                     # disable VP since distributed datasets triggers transfers
0437                                     useVP = False
0438                                     avoidVP = True
0439                 tmp_rse_list = ",".join(list_of_complete_replica_locations[datasetName])
0440                 is_distributed_map[datasetName] = isDistributed
0441                 tmpLog.info(
0442                     f"replica_availability disk:{complete_disk_ok[datasetName]} tape:{complete_tape_ok[datasetName]}, is_distributed:{isDistributed}, remote_readable:{can_be_remote_source[datasetName]}, rses={tmp_rse_list}"
0443                 )
0444                 # check if the data is available at somewhere
0445                 if not complete_disk_ok[datasetName] and not complete_tape_ok[datasetName] and isDistributed is not True:
0446                     err_msg = f"{datasetName} is "
0447                     if list_of_complete_replica_locations[datasetName]:
0448                         tmp_is_single = len(list_of_complete_replica_locations[datasetName]) == 1
0449                         err_msg += f"only complete at {tmp_rse_list} which "
0450                         err_msg += "is " if tmp_is_single else "are "
0451                         err_msg += "currently in downtime or offline. "
0452                     else:
0453                         err_msg += "incomplete at online storage. "
0454                     if not taskSpec.allow_incomplete_input():
0455                         tmpLog.error(err_msg)
0456                         taskSpec.setErrDiag(err_msg)
0457                         retVal = retTmpError
0458                         return retVal
0459                     else:
0460                         err_msg += "However, the task allows incomplete input."
0461                         tmpLog.info(err_msg)
0462 
0463         # check if any input dataset is remotely unavailable
0464         remote_source_available = True
0465         remote_source_msg = ""
0466         for tmp_dataset_name, tmp_ok in can_be_remote_source.items():
0467             is_distributed = is_distributed_map.get(tmp_dataset_name, None)
0468             if not tmp_ok and is_distributed is not True:
0469                 remote_source_msg = f"data locality cannot be ignored since {tmp_dataset_name} is unreadable over WAN"
0470                 remote_source_available = False
0471                 break
0472 
0473         # two loops with/without data locality check
0474         scan_site_list_loops = [(copy.copy(scanSiteList), True)]
0475         to_ignore_data_loc = False
0476         if len(inputChunk.getDatasets()) > 0:
0477             nRealDS = 0
0478             for datasetSpec in inputChunk.getDatasets():
0479                 if not datasetSpec.isPseudo():
0480                     nRealDS += 1
0481             task_prio_cutoff_for_input_data_motion = 2000
0482             tmp_msg = "ignoring input data locality in the second loop due to "
0483             if taskSpec.taskPriority >= task_prio_cutoff_for_input_data_motion:
0484                 to_ignore_data_loc = True
0485                 tmp_msg += f"high taskPriority {taskSpec.taskPriority} is larger than or equal to {task_prio_cutoff_for_input_data_motion}"
0486             elif io_intensity_cutoff and taskSpec.ioIntensity and io_intensity_cutoff >= taskSpec.ioIntensity:
0487                 to_ignore_data_loc = True
0488                 tmp_msg += f"low ioIntensity {taskSpec.ioIntensity} is less than or equal to {io_intensity_key} ({io_intensity_cutoff})"
0489             elif loc_check_timeout_val and taskSpec.frozenTime and naive_utcnow() - taskSpec.frozenTime > datetime.timedelta(hours=loc_check_timeout_val):
0490                 to_ignore_data_loc = True
0491                 tmp_msg += "check timeout (last successful cycle at {} was more than {} ({}hrs) ago)".format(
0492                     taskSpec.frozenTime, loc_check_timeout_key, loc_check_timeout_val
0493                 )
0494             if not remote_source_available:
0495                 tmpLog.info(remote_source_msg)
0496             elif to_ignore_data_loc:
0497                 tmpLog.info(tmp_msg)
0498                 scan_site_list_loops.append((copy.copy(scanSiteList), False))
0499             elif taskSpec.taskPriority > 1000 or nRealDS > 1 or taskSpec.getNumSitesPerJob() > 0:
0500                 # add a loop without data locality check for high priority tasks, tasks with multiple input datasets, or tasks with job cloning
0501                 scan_site_list_loops.append((copy.copy(scanSiteList), False))
0502             # element map
0503             for datasetSpec in inputChunk.getDatasets():
0504                 if datasetSpec.datasetName.endswith("/"):
0505                     file_list = [f.lfn for f in datasetSpec.Files]
0506                     element_map[datasetSpec.datasetName] = self.taskBufferIF.get_origin_datasets(taskSpec.jediTaskID, datasetSpec.datasetName, file_list)
0507 
0508         retVal = None
0509         checkDataLocality = False
0510         scanSiteWoVP = []
0511         summaryList = []
0512         site_list_with_data = None
0513         overall_site_list = set()
0514         for i_loop, (scanSiteList, checkDataLocality) in enumerate(scan_site_list_loops):
0515             useUnionLocality = False
0516             self.init_summary_list("Job brokerage summary", f"data locality check: {checkDataLocality}", self.get_unified_sites(scanSiteList))
0517             if checkDataLocality:
0518                 tmpLog.info("!!! look for candidates WITH data locality check")
0519             else:
0520                 tmpLog.info("!!! look for candidates WITHOUT data locality check")
0521             ######################################
0522             # selection for data availability
0523             hasDDS = False
0524             dataWeight = {}
0525             remoteSourceList = {}
0526             sites_in_nucleus = []
0527             for datasetSpec in inputChunk.getDatasets():
0528                 datasetSpec.reset_distributed()
0529 
0530             if inputChunk.getDatasets() != [] and checkDataLocality:
0531                 oldScanSiteList = copy.copy(scanSiteList)
0532                 oldScanUnifiedSiteList = self.get_unified_sites(oldScanSiteList)
0533 
0534                 if ddsList:
0535                     hasDDS = True
0536 
0537                 for datasetSpec in inputChunk.getDatasets():
0538                     datasetName = datasetSpec.datasetName
0539                     if datasetName in ddsList:
0540                         datasetSpec.setDistributed()
0541 
0542                 # get the list of sites where data is available
0543                 scanSiteList = None
0544                 scanSiteListOnDisk = None
0545                 scanSiteListUnion = None
0546                 scanSiteListOnDiskUnion = None
0547                 scanSiteWoVpUnion = None
0548 
0549                 for datasetName, tmpDataSite in self.dataSiteMap.items():
0550                     # check if incomplete replica is allowed
0551                     if datasetName in ddsList:
0552                         useIncomplete = True
0553                     elif true_complete_disk_ok.get(datasetName) is False:
0554                         useIncomplete = True
0555                     else:
0556                         useIncomplete = False
0557                     # get sites where replica is available
0558                     tmpSiteList = AtlasBrokerUtils.getAnalSitesWithDataDisk(tmpDataSite, includeTape=True, use_incomplete=useIncomplete)
0559                     tmpDiskSiteList = AtlasBrokerUtils.getAnalSitesWithDataDisk(tmpDataSite, includeTape=False, use_vp=useVP, use_incomplete=useIncomplete)
0560                     tmpNonVpSiteList = AtlasBrokerUtils.getAnalSitesWithDataDisk(tmpDataSite, includeTape=True, use_vp=False, use_incomplete=useIncomplete)
0561                     # make weight map for local
0562                     for tmpSiteName in tmpSiteList:
0563                         if tmpSiteName not in dataWeight:
0564                             dataWeight[tmpSiteName] = 0
0565                         # give more weight to disk
0566                         if tmpSiteName in tmpDiskSiteList:
0567                             dataWeight[tmpSiteName] += 1
0568                         else:
0569                             dataWeight[tmpSiteName] += 0.001
0570 
0571                     # first list
0572                     if scanSiteList is None:
0573                         scanSiteList = []
0574                         for tmpSiteName in tmpSiteList:
0575                             if tmpSiteName not in oldScanUnifiedSiteList:
0576                                 continue
0577                             if tmpSiteName not in scanSiteList:
0578                                 scanSiteList.append(tmpSiteName)
0579                         scanSiteListOnDisk = set()
0580                         for tmpSiteName in tmpDiskSiteList:
0581                             if tmpSiteName not in oldScanUnifiedSiteList:
0582                                 continue
0583                             scanSiteListOnDisk.add(tmpSiteName)
0584                         scanSiteWoVP = tmpNonVpSiteList
0585                         scanSiteListUnion = set(scanSiteList)
0586                         scanSiteListOnDiskUnion = set(scanSiteListOnDisk)
0587                         scanSiteWoVpUnion = set(scanSiteWoVP)
0588                     else:
0589                         # pickup sites which have all data
0590                         newScanList = []
0591                         for tmpSiteName in tmpSiteList:
0592                             if tmpSiteName in scanSiteList and tmpSiteName not in newScanList:
0593                                 newScanList.append(tmpSiteName)
0594                             scanSiteListUnion.add(tmpSiteName)
0595                         scanSiteList = newScanList
0596                         tmpLog.debug(f"{datasetName} is available at {len(scanSiteList)} sites")
0597                         # pickup sites which have all data on DISK
0598                         newScanListOnDisk = set()
0599                         for tmpSiteName in tmpDiskSiteList:
0600                             if tmpSiteName in scanSiteListOnDisk:
0601                                 newScanListOnDisk.add(tmpSiteName)
0602                             scanSiteListOnDiskUnion.add(tmpSiteName)
0603                         scanSiteListOnDisk = newScanListOnDisk
0604                         # get common elements
0605                         scanSiteWoVP = list(set(scanSiteWoVP).intersection(tmpNonVpSiteList))
0606                         scanSiteWoVpUnion = scanSiteWoVpUnion.union(tmpNonVpSiteList)
0607                     tmpLog.info(
0608                         f"{datasetName} is available at {len(scanSiteList)} sites ({len(scanSiteListOnDisk)} sites on DISK). complete disk replica: {true_complete_disk_ok[datasetName]}, complete tape replica: {complete_tape_ok[datasetName]}"
0609                     )
0610                 # check for preassigned
0611                 if sitePreAssigned:
0612                     if preassignedSite not in scanSiteList and preassignedSite not in scanSiteListUnion:
0613                         scanSiteList = []
0614                         tmpLog.info(f"data is unavailable locally or remotely at preassigned site {preassignedSite}")
0615                     elif preassignedSite not in scanSiteList:
0616                         scanSiteList = list(scanSiteListUnion)
0617                 elif len(scanSiteListOnDisk) > 0:
0618                     # use only disk sites
0619                     scanSiteList = list(scanSiteListOnDisk)
0620                 elif not scanSiteList and scanSiteListUnion:
0621                     tmpLog.info("use union list for data locality check since no site has all data")
0622                     if scanSiteListOnDiskUnion:
0623                         scanSiteList = list(scanSiteListOnDiskUnion)
0624                     elif scanSiteListUnion:
0625                         scanSiteList = list(scanSiteListUnion)
0626                     scanSiteWoVP = list(scanSiteWoVpUnion)
0627                     useUnionLocality = True
0628                     # disable VP since union locality triggers transfers to VP
0629                     avoidVP = True
0630                 scanSiteList = self.get_pseudo_sites(scanSiteList, oldScanSiteList)
0631                 # dump
0632                 for tmpSiteName in oldScanSiteList:
0633                     if tmpSiteName not in scanSiteList:
0634                         pass
0635                 sites_in_nucleus = copy.copy(scanSiteList)
0636                 self.add_summary_message(oldScanSiteList, scanSiteList, "input data check", tmpLog, {})
0637                 if not scanSiteList:
0638                     self.dump_summary(tmpLog)
0639                     tmpLog.error("no candidates")
0640                     retVal = retTmpError
0641                     continue
0642 
0643             ######################################
0644             # selection for status
0645             newScanSiteList = []
0646             oldScanSiteList = copy.copy(scanSiteList)
0647             msg_map = {}
0648             for tmpSiteName in scanSiteList:
0649                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0650                 # skip unified queues
0651                 if tmpSiteSpec.is_unified:
0652                     continue
0653                 # check site status
0654                 skipFlag = False
0655                 if tmpSiteSpec.status in ["offline"]:
0656                     skipFlag = True
0657                 elif tmpSiteSpec.status in ["brokeroff", "test"]:
0658                     if siteListPreAssigned:
0659                         pass
0660                     elif not sitePreAssigned:
0661                         skipFlag = True
0662                     elif preassignedSite not in [tmpSiteName, tmpSiteSpec.get_unified_name()]:
0663                         skipFlag = True
0664                 if not skipFlag:
0665                     newScanSiteList.append(tmpSiteName)
0666                 else:
0667                     tmp_unified_name = tmpSiteSpec.get_unified_name()
0668                     msg_map[tmp_unified_name] = f"  skip site={tmp_unified_name} due to status={tmpSiteSpec.status} criteria=-status"
0669             scanSiteList = newScanSiteList
0670             self.add_summary_message(oldScanSiteList, scanSiteList, "status check", tmpLog, msg_map)
0671             if not scanSiteList:
0672                 self.dump_summary(tmpLog)
0673                 tmpLog.error("no candidates")
0674                 retVal = retTmpError
0675                 continue
0676 
0677             ######################################
0678             # selection for fairshare
0679             if not sitePreAssigned:
0680                 newScanSiteList = []
0681                 oldScanSiteList = copy.copy(scanSiteList)
0682                 msg_map = {}
0683                 for tmpSiteName in scanSiteList:
0684                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0685                     # check at the site
0686                     if AtlasBrokerUtils.hasZeroShare(tmpSiteSpec, taskSpec, inputChunk.isMerging, tmpLog):
0687                         msg_map[tmpSiteSpec.get_unified_name()] = f"  skip site={tmpSiteSpec.get_unified_name()} due to zero share criteria=-zeroshare"
0688                         continue
0689                     newScanSiteList.append(tmpSiteName)
0690                 scanSiteList = newScanSiteList
0691                 self.add_summary_message(oldScanSiteList, scanSiteList, "zero share check", tmpLog, msg_map)
0692                 if not scanSiteList:
0693                     self.dump_summary(tmpLog)
0694                     tmpLog.error("no candidates")
0695                     retVal = retTmpError
0696                     continue
0697 
0698             ######################################
0699             # selection for iointensity limits
0700             # get default disk IO limit from GDP config
0701             max_diskio_per_core_default = self.taskBufferIF.getConfigValue(COMPONENT, "MAX_DISKIO_DEFAULT", APP, VO)
0702             if not max_diskio_per_core_default:
0703                 max_diskio_per_core_default = 10**10
0704 
0705             # get the current disk IO usage per site
0706             diskio_percore_usage = self.taskBufferIF.getAvgDiskIO_JEDI()
0707             unified_site_list = self.get_unified_sites(scanSiteList)
0708             newScanSiteList = []
0709             oldScanSiteList = copy.copy(scanSiteList)
0710             msg_map = {}
0711             for tmpSiteName in unified_site_list:
0712                 tmp_site_spec = self.siteMapper.getSite(tmpSiteName)
0713 
0714                 # measured diskIO at queue
0715                 diskio_usage_tmp = diskio_percore_usage.get(tmpSiteName, 0)
0716 
0717                 # figure out queue or default limit
0718                 if tmp_site_spec.maxDiskio and tmp_site_spec.maxDiskio > 0:
0719                     # there is a limit specified in AGIS
0720                     diskio_limit_tmp = tmp_site_spec.maxDiskio
0721                 else:
0722                     # we need to use the default value from GDP Config
0723                     diskio_limit_tmp = max_diskio_per_core_default
0724 
0725                 # normalize task diskIO by site corecount
0726                 diskio_task_tmp = taskSpec.diskIO
0727                 if taskSpec.diskIO is not None and taskSpec.coreCount not in [None, 0, 1] and tmp_site_spec.coreCount not in [None, 0]:
0728                     diskio_task_tmp = taskSpec.diskIO / tmp_site_spec.coreCount
0729 
0730                 try:  # generate a log message parseable by logstash for monitoring
0731                     log_msg = f"diskIO measurements: site={tmpSiteName} jediTaskID={taskSpec.jediTaskID} "
0732                     if diskio_task_tmp is not None:
0733                         log_msg += f"diskIO_task={diskio_task_tmp:.2f} "
0734                     if diskio_usage_tmp is not None:
0735                         log_msg += f"diskIO_site_usage={diskio_usage_tmp:.2f} "
0736                     if diskio_limit_tmp is not None:
0737                         log_msg += f"diskIO_site_limit={diskio_limit_tmp:.2f} "
0738                     # tmpLog.info(log_msg)
0739                 except Exception:
0740                     tmpLog.debug("diskIO measurements: Error generating diskIO message")
0741 
0742                 # if the task has a diskIO defined, the queue is over the IO limit and the task IO is over the limit
0743                 if diskio_task_tmp and diskio_usage_tmp and diskio_limit_tmp and diskio_usage_tmp > diskio_limit_tmp and diskio_task_tmp > diskio_limit_tmp:
0744                     msg_map[tmpSiteName] = f"  skip site={tmpSiteName} due to diskIO overload criteria=-diskIO"
0745                     continue
0746 
0747                 newScanSiteList.append(tmpSiteName)
0748 
0749             scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
0750 
0751             self.add_summary_message(oldScanSiteList, scanSiteList, "diskIO check", tmpLog, msg_map)
0752             if not scanSiteList:
0753                 self.dump_summary(tmpLog)
0754                 tmpLog.error("no candidates")
0755                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0756                 retVal = retTmpError
0757                 continue
0758 
0759             ######################################
0760             # selection for VP
0761             if taskSpec.avoid_vp() or avoidVP or not checkDataLocality:
0762                 newScanSiteList = []
0763                 oldScanSiteList = copy.copy(scanSiteList)
0764                 msg_map = {}
0765                 for tmpSiteName in scanSiteList:
0766                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0767                     if not tmpSiteSpec.use_vp(JobUtils.ANALY_PS):
0768                         newScanSiteList.append(tmpSiteName)
0769                     else:
0770                         msg_map[tmpSiteSpec.get_unified_name()] = f"  skip site={tmpSiteSpec.get_unified_name()} to avoid VP"
0771                 scanSiteList = newScanSiteList
0772                 self.add_summary_message(oldScanSiteList, scanSiteList, "avoid VP queue check", tmpLog, msg_map)
0773                 if not scanSiteList:
0774                     self.dump_summary(tmpLog)
0775                     tmpLog.error("no candidates")
0776                     retVal = retTmpError
0777                     continue
0778             ######################################
0779             # selection for MP
0780             newScanSiteList = []
0781             oldScanSiteList = copy.copy(scanSiteList)
0782             msg_map = {}
0783             max_core_count = taskSpec.get_max_core_count()
0784             for tmpSiteName in scanSiteList:
0785                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0786                 # check at the site
0787                 if useMP == "any" or (useMP == "only" and tmpSiteSpec.coreCount > 1) or (useMP == "unuse" and tmpSiteSpec.coreCount in [0, 1, None]):
0788                     if max_core_count and tmpSiteSpec.coreCount and tmpSiteSpec.coreCount > max_core_count:
0789                         msg_map[tmpSiteSpec.get_unified_name()] = (
0790                             f"  skip site={tmpSiteSpec.get_unified_name()} due to larger core count "
0791                             f"site:{tmpSiteSpec.coreCount} than task_max={max_core_count} criteria=-max_cpucore"
0792                         )
0793                     else:
0794                         newScanSiteList.append(tmpSiteName)
0795                 else:
0796                     msg_map[tmpSiteSpec.get_unified_name()] = "  skip site=%s due to core mismatch cores_site=%s <> cores_task=%s criteria=-cpucore" % (
0797                         tmpSiteSpec.get_unified_name(),
0798                         tmpSiteSpec.coreCount,
0799                         taskSpec.coreCount,
0800                     )
0801             scanSiteList = newScanSiteList
0802             self.add_summary_message(oldScanSiteList, scanSiteList, "CPU core check", tmpLog, msg_map)
0803             if not scanSiteList:
0804                 self.dump_summary(tmpLog)
0805                 tmpLog.error("no candidates")
0806                 retVal = retTmpError
0807                 continue
0808             ######################################
0809             # selection for GPU + architecture
0810             newScanSiteList = []
0811             oldScanSiteList = copy.copy(scanSiteList)
0812             msg_map = {}
0813             jsonCheck = AtlasBrokerUtils.JsonSoftwareCheck(self.siteMapper, self.sw_map, self.architecture_level_map)
0814             for tmpSiteName in scanSiteList:
0815                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0816                 if tmpSiteSpec.isGPU() and not taskSpec.is_hpo_workflow():
0817                     if taskSpec.get_sw_platform() in ["", None]:
0818                         msg_map[tmpSiteSpec.get_unified_name()] = f"  skip site={tmpSiteSpec.get_unified_name()} since architecture is required for GPU queues"
0819                         continue
0820                     siteListWithCMTCONFIG = [tmpSiteSpec.get_unified_name()]
0821                     siteListWithCMTCONFIG, sitesNoJsonCheck, _ = jsonCheck.check(
0822                         siteListWithCMTCONFIG, None, None, None, taskSpec.get_sw_platform(), False, True
0823                     )
0824 
0825                     if len(siteListWithCMTCONFIG) == 0:
0826                         msg_map[tmpSiteSpec.get_unified_name()] = (
0827                             f"  skip site={tmpSiteSpec.get_unified_name()} since architecture={taskSpec.get_sw_platform()} is unavailable"
0828                         )
0829                         continue
0830 
0831                 newScanSiteList.append(tmpSiteName)
0832             scanSiteList = newScanSiteList
0833             self.add_summary_message(oldScanSiteList, scanSiteList, "architecture check", tmpLog, msg_map)
0834             if not scanSiteList:
0835                 self.dump_summary(tmpLog)
0836                 tmpLog.error("no candidates")
0837                 retVal = retTmpError
0838                 continue
0839             ######################################
0840             # selection for closed
0841             if not sitePreAssigned and not inputChunk.isMerging:
0842                 oldScanSiteList = copy.copy(scanSiteList)
0843                 newScanSiteList = []
0844                 msg_map = {}
0845                 for tmpSiteName in self.get_unified_sites(scanSiteList):
0846                     if tmpSiteName in failureCounts and "closed" in failureCounts[tmpSiteName]:
0847                         nClosed = failureCounts[tmpSiteName]["closed"]
0848                         if nClosed > 0:
0849                             msg_map[tmpSiteName] = f"  skip site={tmpSiteName} due to n_closed={nClosed} criteria=-closed"
0850                             continue
0851                     newScanSiteList.append(tmpSiteName)
0852                 scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
0853                 self.add_summary_message(oldScanSiteList, scanSiteList, "too many closed check", tmpLog, msg_map)
0854                 if not scanSiteList:
0855                     self.dump_summary(tmpLog)
0856                     tmpLog.error("no candidates")
0857                     retVal = retTmpError
0858                     continue
0859             ######################################
0860             # selection for release
0861             cmt_config = taskSpec.get_sw_platform()
0862             is_regexp_cmt_config = False
0863             if cmt_config:
0864                 if re.match(cmt_config, cmt_config) is None:
0865                     is_regexp_cmt_config = True
0866             base_platform = taskSpec.get_base_platform()
0867             resolved_platforms = {}
0868 
0869             host_cpu_spec = taskSpec.get_host_cpu_spec()
0870             host_cpu_pref = taskSpec.get_host_cpu_preference()
0871             host_gpu_spec = taskSpec.get_host_gpu_spec()
0872 
0873             if not sitePreAssigned:
0874                 unified_site_list = self.get_unified_sites(scanSiteList)
0875                 if taskSpec.transHome is not None:
0876                     transHome = taskSpec.transHome
0877                 else:
0878                     transHome = ""
0879                 # remove AnalysisTransforms-
0880                 transHome = re.sub("^[^-]+-*", "", transHome)
0881                 transHome = re.sub("_", "-", transHome)
0882                 if (
0883                     re.search("rel_\d+(\n|$)", transHome) is None
0884                     and taskSpec.transHome not in ["AnalysisTransforms", None]
0885                     and re.search("\d{4}-\d{2}-\d{2}T\d{4}$", transHome) is None
0886                     and re.search("-\d+\.\d+\.\d+$", transHome) is None
0887                 ):
0888                     # cache is checked
0889                     siteListWithSW, sitesNoJsonCheck, _ = jsonCheck.check(
0890                         unified_site_list,
0891                         "atlas",
0892                         transHome.split("-")[0],
0893                         transHome.split("-")[1],
0894                         taskSpec.get_sw_platform(),
0895                         False,
0896                         False,
0897                         container_name=taskSpec.container_name,
0898                         only_tags_fc=taskSpec.use_only_tags_fc(),
0899                         host_cpu_specs=host_cpu_spec,
0900                         host_cpu_pref=host_cpu_pref,
0901                         host_gpu_spec=host_gpu_spec,
0902                         log_stream=tmpLog,
0903                     )
0904                     sitesAuto = copy.copy(siteListWithSW)
0905 
0906                 elif (transHome == "" and taskSpec.transUses is not None) or (
0907                     re.search("-\d+\.\d+\.\d+$", transHome) is not None and (taskSpec.transUses is None or re.search("-\d+\.\d+$", taskSpec.transUses) is None)
0908                 ):
0909                     siteListWithSW = []
0910                     sitesNoJsonCheck = unified_site_list
0911                     # remove Atlas-
0912                     if taskSpec.transUses is not None:
0913                         transUses = taskSpec.transUses.split("-")[-1]
0914                     else:
0915                         transUses = None
0916                     if transUses is not None:
0917                         # release is checked
0918                         tmpSiteListWithSW, sitesNoJsonCheck, _ = jsonCheck.check(
0919                             unified_site_list,
0920                             "atlas",
0921                             "AtlasOffline",
0922                             transUses,
0923                             taskSpec.get_sw_platform(),
0924                             False,
0925                             False,
0926                             container_name=taskSpec.container_name,
0927                             only_tags_fc=taskSpec.use_only_tags_fc(),
0928                             host_cpu_specs=host_cpu_spec,
0929                             host_gpu_spec=host_gpu_spec,
0930                             log_stream=tmpLog,
0931                         )
0932                         siteListWithSW += tmpSiteListWithSW
0933                     if len(transHome.split("-")) == 2:
0934                         tmpSiteListWithSW, sitesNoJsonCheck, _ = jsonCheck.check(
0935                             sitesNoJsonCheck,
0936                             "atlas",
0937                             transHome.split("-")[0],
0938                             transHome.split("-")[1],
0939                             taskSpec.get_sw_platform(),
0940                             False,
0941                             False,
0942                             container_name=taskSpec.container_name,
0943                             only_tags_fc=taskSpec.use_only_tags_fc(),
0944                             host_cpu_specs=host_cpu_spec,
0945                             host_gpu_spec=host_gpu_spec,
0946                             log_stream=tmpLog,
0947                         )
0948                         siteListWithSW += tmpSiteListWithSW
0949                     sitesAuto = copy.copy(siteListWithSW)
0950 
0951                 else:
0952                     # nightlies or standalone uses only AUTO
0953                     if taskSpec.transHome is not None:
0954                         # CVMFS check for nightlies
0955                         siteListWithSW, sitesNoJsonCheck, _ = jsonCheck.check(
0956                             unified_site_list,
0957                             "nightlies",
0958                             None,
0959                             None,
0960                             taskSpec.get_sw_platform(),
0961                             True,
0962                             False,
0963                             container_name=taskSpec.container_name,
0964                             only_tags_fc=taskSpec.use_only_tags_fc(),
0965                             host_cpu_specs=host_cpu_spec,
0966                             host_gpu_spec=host_gpu_spec,
0967                             log_stream=tmpLog,
0968                         )
0969                         sitesAuto = copy.copy(siteListWithSW)
0970 
0971                     else:
0972                         # no CVMFS check for standalone SW
0973                         siteListWithSW, sitesNoJsonCheck, _ = jsonCheck.check(
0974                             unified_site_list,
0975                             None,
0976                             None,
0977                             None,
0978                             taskSpec.get_sw_platform(),
0979                             False,
0980                             True,
0981                             container_name=taskSpec.container_name,
0982                             only_tags_fc=taskSpec.use_only_tags_fc(),
0983                             host_cpu_specs=host_cpu_spec,
0984                             host_gpu_spec=host_gpu_spec,
0985                             log_stream=tmpLog,
0986                         )
0987                         sitesAuto = copy.copy(siteListWithSW)
0988 
0989                 newScanSiteList = []
0990                 oldScanSiteList = copy.copy(scanSiteList)
0991                 sitesAny = []
0992                 msg_map = {}
0993                 for tmpSiteName in unified_site_list:
0994                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0995                     if cmt_config:
0996                         platforms = AtlasBrokerUtils.resolve_cmt_config(tmpSiteName, cmt_config, base_platform, self.sw_map)
0997                         if platforms:
0998                             resolved_platforms[tmpSiteName] = platforms
0999                     if tmpSiteName in siteListWithSW:
1000                         # passed
1001                         if not is_regexp_cmt_config or tmpSiteName in resolved_platforms:
1002                             newScanSiteList.append(tmpSiteName)
1003                         else:
1004                             # cmtconfig is not resolved
1005                             msg_map[tmpSiteName] = f"  skip site={tmpSiteName} due to unresolved regexp in cmtconfig={cmt_config} criteria=-regexpcmtconfig"
1006                     elif host_cpu_spec is None and host_gpu_spec is None and tmpSiteSpec.releases == ["ANY"]:
1007                         # release check is disabled or release is available
1008                         newScanSiteList.append(tmpSiteName)
1009                         sitesAny.append(tmpSiteName)
1010                     else:
1011                         # release is unavailable
1012                         msg_map[tmpSiteName] = (
1013                             f"  skip site={tmpSiteName} due to missing SW cache={taskSpec.transHome}:{taskSpec.get_sw_platform()} container_name='{taskSpec.container_name}' "
1014                             f"or irrelevant HW cpu={str(host_cpu_spec)} gpu={str(host_gpu_spec)} criteria=-cache"
1015                         )
1016                 scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1017                 self.add_summary_message(oldScanSiteList, scanSiteList, "SW/HW check", tmpLog, msg_map)
1018                 tmpLog.info(f"   {len(sitesAuto)} with AUTO, {len(sitesAny)} with ANY")
1019                 if not scanSiteList:
1020                     self.dump_summary(tmpLog)
1021                     tmpLog.error("no candidates")
1022                     retVal = retTmpError
1023                     continue
1024             ######################################
1025             # selection for memory
1026             origMinRamCount = inputChunk.getMaxRamCount()
1027             if origMinRamCount not in [0, None]:
1028                 newScanSiteList = []
1029                 oldScanSiteList = copy.copy(scanSiteList)
1030                 msg_map = {}
1031                 for tmpSiteName in scanSiteList:
1032                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1033                     # scale RAM by nCores
1034                     minRamCount = origMinRamCount
1035                     if taskSpec.ramPerCore() and not inputChunk.isMerging:
1036                         if tmpSiteSpec.coreCount not in [None, 0]:
1037                             minRamCount = origMinRamCount * tmpSiteSpec.coreCount
1038                     minRamCount = JobUtils.compensate_ram_count(minRamCount)
1039                     # site max memory requirement
1040                     site_maxmemory = 0
1041                     if tmpSiteSpec.maxrss not in [0, None]:
1042                         site_maxmemory = tmpSiteSpec.maxrss
1043                     if site_maxmemory not in [0, None] and minRamCount != 0 and minRamCount > site_maxmemory:
1044                         msg_map[tmpSiteSpec.get_unified_name()] = (
1045                             f"  skip site={tmpSiteSpec.get_unified_name()} due to insufficient RAM less than job's core-scaled requirement {minRamCount} MB criteria=-lowmemory"
1046                         )
1047                         continue
1048                     # site min memory requirement
1049                     site_minmemory = 0
1050                     if tmpSiteSpec.minrss not in [0, None]:
1051                         site_minmemory = tmpSiteSpec.minrss
1052                     if site_minmemory not in [0, None] and minRamCount != 0 and minRamCount < site_minmemory:
1053                         msg_map[tmpSiteSpec.get_unified_name()] = (
1054                             f"  skip site={tmpSiteSpec.get_unified_name()} due to RAM lower limit greater than job's core-scaled requirement {minRamCount} MB criteria=-highmemory"
1055                         )
1056                         continue
1057                     newScanSiteList.append(tmpSiteName)
1058                 scanSiteList = newScanSiteList
1059                 ramUnit = taskSpec.ramUnit
1060                 if ramUnit is None:
1061                     ramUnit = "MB"
1062                 self.add_summary_message(oldScanSiteList, scanSiteList, "memory check", tmpLog, msg_map)
1063                 if not scanSiteList:
1064                     self.dump_summary(tmpLog)
1065                     tmpLog.error("no candidates")
1066                     retVal = retTmpError
1067                     continue
1068             ######################################
1069             # selection for scratch disk
1070             tmpMaxAtomSize = inputChunk.getMaxAtomSize()
1071             if not inputChunk.isMerging:
1072                 tmpEffAtomSize = inputChunk.getMaxAtomSize(effectiveSize=True)
1073                 tmpOutDiskSize = taskSpec.getOutDiskSize()
1074                 tmpWorkDiskSize = taskSpec.getWorkDiskSize()
1075                 minDiskCountS = tmpOutDiskSize * tmpEffAtomSize + tmpWorkDiskSize + tmpMaxAtomSize
1076                 minDiskCountS = minDiskCountS // 1024 // 1024
1077                 maxSizePerJob = taskSpec.getMaxSizePerJob()
1078                 if maxSizePerJob is None:
1079                     maxSizePerJob = None
1080                 else:
1081                     maxSizePerJob //= 1024 * 1024
1082                 # size for direct IO sites
1083                 minDiskCountR = tmpOutDiskSize * tmpEffAtomSize + tmpWorkDiskSize
1084                 minDiskCountR = minDiskCountR // 1024 // 1024
1085                 tmpLog.info(f"maxAtomSize={tmpMaxAtomSize} effectiveAtomSize={tmpEffAtomSize} outDiskCount={tmpOutDiskSize} workDiskSize={tmpWorkDiskSize}")
1086             else:
1087                 maxSizePerJob = None
1088                 minDiskCountS = 2 * tmpMaxAtomSize // 1024 // 1024
1089                 minDiskCountR = "NA"
1090             tmpLog.info(f"minDiskCountScratch={minDiskCountS} minDiskCountRemote={minDiskCountR} nGBPerJobInMB={maxSizePerJob}")
1091             newScanSiteList = []
1092             oldScanSiteList = copy.copy(scanSiteList)
1093             msg_map = {}
1094             for tmpSiteName in self.get_unified_sites(scanSiteList):
1095                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1096                 # check at the site
1097                 if tmpSiteSpec.maxwdir:
1098                     if CoreUtils.use_direct_io_for_job(taskSpec, tmpSiteSpec, inputChunk):
1099                         minDiskCount = minDiskCountR
1100                         if maxSizePerJob is not None and not taskSpec.useLocalIO():
1101                             tmpMinDiskCountR = tmpOutDiskSize * maxSizePerJob + tmpWorkDiskSize
1102                             tmpMinDiskCountR /= 1024 * 1024
1103                             if tmpMinDiskCountR > minDiskCount:
1104                                 minDiskCount = tmpMinDiskCountR
1105                     else:
1106                         minDiskCount = minDiskCountS
1107                         if maxSizePerJob is not None and maxSizePerJob > minDiskCount:
1108                             minDiskCount = maxSizePerJob
1109 
1110                     # get site and task corecount to scale maxwdir
1111                     if tmpSiteSpec.coreCount in [None, 0, 1]:
1112                         site_cc = 1
1113                     else:
1114                         site_cc = tmpSiteSpec.coreCount
1115 
1116                     if taskSpec.coreCount in [None, 0, 1]:
1117                         task_cc = 1
1118                     else:
1119                         task_cc = site_cc
1120 
1121                     maxwdir_scaled = tmpSiteSpec.maxwdir * task_cc / site_cc
1122 
1123                     if minDiskCount > maxwdir_scaled:
1124                         msg_map[tmpSiteName] = f"  skip site={tmpSiteName} due to small scratch disk={maxwdir_scaled} < {minDiskCount} criteria=-disk"
1125                         continue
1126                     newMaxwdir[tmpSiteName] = maxwdir_scaled
1127                 newScanSiteList.append(tmpSiteName)
1128             scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1129             self.add_summary_message(oldScanSiteList, scanSiteList, "scratch disk check", tmpLog, msg_map)
1130             if not scanSiteList:
1131                 self.dump_summary(tmpLog)
1132                 tmpLog.error("no candidates")
1133                 retVal = retTmpError
1134                 continue
1135             ######################################
1136             # selection for available space in SE
1137             newScanSiteList = []
1138             oldScanSiteList = copy.copy(scanSiteList)
1139             msg_map = {}
1140             for tmpSiteName in self.get_unified_sites(scanSiteList):
1141                 # check endpoint
1142                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1143                 scope_input, scope_output = select_scope(tmpSiteSpec, JobUtils.ANALY_PS, JobUtils.ANALY_PS)
1144                 if scope_output not in tmpSiteSpec.ddm_endpoints_output:
1145                     msg_map[tmpSiteName] = f"  skip site={tmpSiteName} since {scope_output} output endpoint undefined criteria=-disk"
1146                     continue
1147                 tmp_output_endpoint = tmpSiteSpec.ddm_endpoints_output[scope_output].getEndPoint(tmpSiteSpec.ddm_output[scope_output])
1148                 if tmp_output_endpoint is not None:
1149                     # free space must be >= 200GB
1150                     diskThreshold = 200
1151                     tmpSpaceSize = 0
1152                     if tmp_output_endpoint["space_expired"] is not None:
1153                         tmpSpaceSize += tmp_output_endpoint["space_expired"]
1154                     if tmp_output_endpoint["space_free"] is not None:
1155                         tmpSpaceSize += tmp_output_endpoint["space_free"]
1156                     if (
1157                         tmpSpaceSize < diskThreshold and "skip_RSE_check" not in tmpSiteSpec.catchall
1158                     ):  # skip_RSE_check: exceptional bypass of RSEs without storage reporting
1159                         msg_map[tmpSiteName] = f"  skip site={tmpSiteName} due to disk shortage in SE {tmpSpaceSize} < {diskThreshold}GB criteria=-disk"
1160                         continue
1161                 # check if blacklisted
1162                 tmp_msg = AtlasBrokerUtils.check_endpoints_with_blacklist(tmpSiteSpec, scope_input, scope_output, sites_in_nucleus, remote_source_available)
1163                 if tmp_msg is not None:
1164                     msg_map[tmpSiteName] = f"  skip site={tmpSiteName} since {tmpSiteSpec.ddm_output[scope_output]} is blacklisted in DDM criteria=-blacklist"
1165                     continue
1166                 # local quota
1167                 if tmpSiteSpec.ddm_output[scope_output] in endpoints_over_local_quota:
1168                     msg_map[tmpSiteName] = f"  skip site={tmpSiteName} since {tmpSiteSpec.ddm_output[scope_output]} is over local quota criteria=-local_quota"
1169                     continue
1170                 newScanSiteList.append(tmpSiteName)
1171             scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1172             self.add_summary_message(oldScanSiteList, scanSiteList, "storage space check", tmpLog, msg_map)
1173             if not scanSiteList:
1174                 self.dump_summary(tmpLog)
1175                 tmpLog.error("no candidates")
1176                 retVal = retTmpError
1177                 continue
1178             ######################################
1179             # selection for walltime
1180             if not taskSpec.useHS06():
1181                 tmpMaxAtomSize = inputChunk.getMaxAtomSize(effectiveSize=True)
1182                 if taskSpec.walltime is not None:
1183                     minWalltime = taskSpec.walltime * tmpMaxAtomSize
1184                 else:
1185                     minWalltime = None
1186                 strMinWalltime = f"walltime*inputSize={taskSpec.walltime}*{tmpMaxAtomSize}"
1187             else:
1188                 tmpMaxAtomSize = inputChunk.getMaxAtomSize(getNumEvents=True)
1189                 if taskSpec.getCpuTime() is not None:
1190                     minWalltime = taskSpec.getCpuTime() * tmpMaxAtomSize
1191                 else:
1192                     minWalltime = None
1193                 strMinWalltime = f"cpuTime*nEventsPerJob={taskSpec.getCpuTime()}*{tmpMaxAtomSize}"
1194             if minWalltime and minWalltime > 0 and not inputChunk.isMerging:
1195                 newScanSiteList = []
1196                 oldScanSiteList = copy.copy(scanSiteList)
1197                 msg_map = {}
1198                 for tmpSiteName in scanSiteList:
1199                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1200                     siteMaxTime = tmpSiteSpec.maxtime
1201                     origSiteMaxTime = siteMaxTime
1202                     # check max walltime at the site
1203                     tmpSiteStr = f"{siteMaxTime}"
1204                     if taskSpec.useHS06():
1205                         oldSiteMaxTime = siteMaxTime
1206                         siteMaxTime -= taskSpec.baseWalltime
1207                         tmpSiteStr = f"({oldSiteMaxTime}-{taskSpec.baseWalltime})"
1208                     if siteMaxTime not in [None, 0] and tmpSiteSpec.coreCount not in [None, 0]:
1209                         siteMaxTime *= tmpSiteSpec.coreCount
1210                         tmpSiteStr += f"*{tmpSiteSpec.coreCount}"
1211                     if taskSpec.useHS06():
1212                         if siteMaxTime not in [None, 0]:
1213                             siteMaxTime *= tmpSiteSpec.corepower
1214                             tmpSiteStr += f"*{tmpSiteSpec.corepower}"
1215                         siteMaxTime *= float(taskSpec.cpuEfficiency) / 100.0
1216                         siteMaxTime = int(siteMaxTime)
1217                         tmpSiteStr += f"*{taskSpec.cpuEfficiency}%"
1218                     if origSiteMaxTime != 0 and minWalltime and minWalltime > siteMaxTime:
1219                         tmpMsg = f"  skip site={tmpSiteName} due to short site walltime {tmpSiteStr} (site upper limit) less than {strMinWalltime} "
1220                         tmpMsg += "criteria=-shortwalltime"
1221                         msg_map[tmpSiteSpec.get_unified_name()] = tmpMsg
1222                         continue
1223                     # check min walltime at the site
1224                     siteMinTime = tmpSiteSpec.mintime
1225                     origSiteMinTime = siteMinTime
1226                     tmpSiteStr = f"{siteMinTime}"
1227                     if taskSpec.useHS06():
1228                         oldSiteMinTime = siteMinTime
1229                         siteMinTime -= taskSpec.baseWalltime
1230                         tmpSiteStr = f"({oldSiteMinTime}-{taskSpec.baseWalltime})"
1231                     if siteMinTime not in [None, 0] and tmpSiteSpec.coreCount not in [None, 0]:
1232                         siteMinTime *= tmpSiteSpec.coreCount
1233                         tmpSiteStr += f"*{tmpSiteSpec.coreCount}"
1234                     if taskSpec.useHS06():
1235                         if siteMinTime not in [None, 0]:
1236                             siteMinTime *= tmpSiteSpec.corepower
1237                             tmpSiteStr += f"*{tmpSiteSpec.corepower}"
1238                         siteMinTime *= float(taskSpec.cpuEfficiency) / 100.0
1239                         siteMinTime = int(siteMinTime)
1240                         tmpSiteStr += f"*{taskSpec.cpuEfficiency}%"
1241                     if origSiteMinTime != 0 and (minWalltime is None or minWalltime < siteMinTime):
1242                         tmpMsg = f"  skip site {tmpSiteName} due to short job walltime {tmpSiteStr} (site lower limit) greater than {strMinWalltime} "
1243                         tmpMsg += "criteria=-longwalltime"
1244                         msg_map[tmpSiteSpec.get_unified_name()] = tmpMsg
1245                         continue
1246                     newScanSiteList.append(tmpSiteName)
1247                 scanSiteList = newScanSiteList
1248                 if not taskSpec.useHS06():
1249                     tmp_unit = taskSpec.walltimeUnit + "PerMB" if taskSpec.walltimeUnit else "kSI2ksecondsPerMB"
1250                     tmpLog.info(f"walltime check with {strMinWalltime}({tmp_unit})")
1251                 else:
1252                     tmpLog.info(f"walltime check with {strMinWalltime}({taskSpec.cpuTimeUnit}*nEventsPerJob)")
1253                 self.add_summary_message(oldScanSiteList, scanSiteList, "walltime check", tmpLog, msg_map)
1254                 if not scanSiteList:
1255                     self.dump_summary(tmpLog)
1256                     tmpLog.error("no candidates")
1257                     retVal = retTmpError
1258                     continue
1259 
1260             ######################################
1261             # selection for zero walltime: scouts, merges, and walltime-undefined jobs
1262             # must only go to sites with at least 24hr*10HS06s of available walltime
1263             if (
1264                 (not sitePreAssigned and inputChunk.useScout())
1265                 or (not taskSpec.walltime and not taskSpec.walltimeUnit and not taskSpec.cpuTimeUnit)
1266                 or (not taskSpec.getCpuTime() and taskSpec.cpuTimeUnit)
1267             ):
1268                 newScanSiteList = []
1269                 oldScanSiteList = copy.copy(scanSiteList)
1270                 msg_map = {}
1271                 for tmpSiteName in scanSiteList:
1272                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1273                     siteMaxTime = tmpSiteSpec.maxtime
1274                     tmpSiteStr = f"{siteMaxTime}"
1275                     if taskSpec.useHS06():
1276                         oldSiteMaxTime = siteMaxTime
1277                         siteMaxTime -= taskSpec.baseWalltime
1278                         tmpSiteStr = f"({oldSiteMaxTime}-{taskSpec.baseWalltime})"
1279                     if siteMaxTime not in [None, 0] and tmpSiteSpec.coreCount not in [None, 0]:
1280                         siteMaxTime *= tmpSiteSpec.coreCount
1281                         tmpSiteStr += f"*{tmpSiteSpec.coreCount}"
1282                     if taskSpec.useHS06():
1283                         if siteMaxTime not in [None, 0]:
1284                             siteMaxTime *= tmpSiteSpec.corepower
1285                             tmpSiteStr += f"*{tmpSiteSpec.corepower}"
1286                         siteMaxTime *= float(taskSpec.cpuEfficiency) / 100.0
1287                         siteMaxTime = int(siteMaxTime)
1288                         tmpSiteStr += f"*{taskSpec.cpuEfficiency}%"
1289                     minTimeForZeroWalltime = 24 * 60 * 60 * 10
1290                     str_minTimeForZeroWalltime = "24hr*10HS06s"
1291                     if tmpSiteSpec.coreCount not in [None, 0]:
1292                         minTimeForZeroWalltime *= tmpSiteSpec.coreCount
1293                         str_minTimeForZeroWalltime += f"*{tmpSiteSpec.coreCount}cores"
1294                     if siteMaxTime != 0 and siteMaxTime < minTimeForZeroWalltime:
1295                         tmpMsg = f"  skip site={tmpSiteName} due to site walltime {tmpSiteStr} (site upper limit) insufficient "
1296                         if inputChunk.useScout():
1297                             tmpMsg += f"for scouts ({str_minTimeForZeroWalltime} at least) "
1298                             tmpMsg += "criteria=-scoutwalltime"
1299                         else:
1300                             tmpMsg += f"for zero walltime ({str_minTimeForZeroWalltime} at least) "
1301                             tmpMsg += "criteria=-zerowalltime"
1302                         msg_map[tmpSiteSpec.get_unified_name()] = tmpMsg
1303                         continue
1304                     newScanSiteList.append(tmpSiteName)
1305                 scanSiteList = newScanSiteList
1306                 self.add_summary_message(oldScanSiteList, scanSiteList, "zero walltime check", tmpLog, msg_map)
1307                 if not scanSiteList:
1308                     self.dump_summary(tmpLog)
1309                     tmpLog.error("no candidates")
1310                     retVal = retTmpError
1311                     continue
1312 
1313             ######################################
1314             # selection for nPilot
1315             nWNmap = self.taskBufferIF.getCurrentSiteData()
1316             nPilotMap = {}
1317             newScanSiteList = []
1318             oldScanSiteList = copy.copy(scanSiteList)
1319             msg_map = {}
1320             for tmpSiteName in self.get_unified_sites(scanSiteList):
1321                 # check at the site
1322                 nPilot = 0
1323                 if tmpSiteName in nWNmap:
1324                     nPilot = nWNmap[tmpSiteName]["getJob"] + nWNmap[tmpSiteName]["updateJob"]
1325                 if nPilot == 0 and taskSpec.prodSourceLabel not in ["test"]:
1326                     msg_map[tmpSiteName] = f"  skip site={tmpSiteName} due to no pilot criteria=-nopilot"
1327                     if not self.testMode:
1328                         continue
1329                 newScanSiteList.append(tmpSiteName)
1330                 nPilotMap[tmpSiteName] = nPilot
1331             scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1332             self.add_summary_message(oldScanSiteList, scanSiteList, "pilot activity check", tmpLog, msg_map)
1333             if not scanSiteList:
1334                 self.dump_summary(tmpLog)
1335                 tmpLog.error("no candidates")
1336                 retVal = retTmpError
1337                 continue
1338 
1339             ######################################
1340             # check inclusion and exclusion
1341             newScanSiteList = []
1342             oldScanSiteList = copy.copy(scanSiteList)
1343             sitesForANY = []
1344             msg_map = {}
1345             for tmpSiteName in self.get_unified_sites(scanSiteList):
1346                 autoSite = False
1347                 # check exclusion
1348                 if AtlasBrokerUtils.isMatched(tmpSiteName, excludeList):
1349                     msg_map[tmpSiteName] = f"  skip site={tmpSiteName} excluded criteria=-excluded"
1350                     continue
1351 
1352                 # check inclusion
1353                 if includeList is not None and not AtlasBrokerUtils.isMatched(tmpSiteName, includeList):
1354                     if "AUTO" in includeList:
1355                         autoSite = True
1356                     else:
1357                         msg_map[tmpSiteName] = f"  skip site={tmpSiteName} not included criteria=-notincluded"
1358                         continue
1359                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1360 
1361                 # check cloud
1362                 if taskSpec.cloud not in [None, "", "any", tmpSiteSpec.cloud]:
1363                     msg_map[tmpSiteName] = f"  skip site={tmpSiteName} cloud mismatch criteria=-cloudmismatch"
1364                     continue
1365                 if autoSite:
1366                     sitesForANY.append(tmpSiteName)
1367                 else:
1368                     newScanSiteList.append(tmpSiteName)
1369             # use AUTO sites if no sites are included
1370             if newScanSiteList == []:
1371                 newScanSiteList = sitesForANY
1372             else:
1373                 for tmpSiteName in sitesForANY:
1374                     msg_map[tmpSiteName] = f"  skip site={tmpSiteName} not included criteria=-notincluded"
1375             scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1376             self.add_summary_message(oldScanSiteList, scanSiteList, "include/exclude check", tmpLog, msg_map)
1377             if not scanSiteList:
1378                 self.dump_summary(tmpLog)
1379                 tmpLog.error("no candidates")
1380                 retVal = retTmpError
1381                 continue
1382             ######################################
1383             # sites already used by task
1384             tmpSt, sitesUsedByTask = self.taskBufferIF.getSitesUsedByTask_JEDI(taskSpec.jediTaskID)
1385             if not tmpSt:
1386                 tmpLog.error("failed to get sites which already used by task")
1387                 retVal = retTmpError
1388                 continue
1389             sitesUsedByTask = self.get_unified_sites(sitesUsedByTask)
1390             ######################################
1391             # calculate weight
1392             tmpSt, jobStatPrioMap = self.taskBufferIF.getJobStatisticsByGlobalShare(taskSpec.vo)
1393             if not tmpSt:
1394                 tmpLog.error("failed to get job statistics with priority")
1395                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1396                 return retTmpError
1397             tmpSt, siteToRunRateMap = AtlasBrokerUtils.getSiteToRunRateStats(tbIF=self.taskBufferIF, vo=taskSpec.vo)
1398             if not tmpSt:
1399                 tmpLog.error("failed to get site to-running rate")
1400                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1401                 return retTmpError
1402             # check for preassigned
1403             if sitePreAssigned:
1404                 oldScanSiteList = copy.copy(scanSiteList)
1405                 if preassignedSite not in scanSiteList and preassignedSite not in self.get_unified_sites(scanSiteList):
1406                     tmpLog.info(f"preassigned site {preassignedSite} did not pass all tests")
1407                     self.add_summary_message(oldScanSiteList, [], "preassign check", tmpLog, {})
1408                     self.dump_summary(tmpLog)
1409                     tmpLog.error("no candidates")
1410                     retVal = retFatal
1411                     continue
1412                 else:
1413                     newScanSiteList = []
1414                     msg_map = {}
1415                     for tmpPseudoSiteName in scanSiteList:
1416                         tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1417                         tmpSiteName = tmpSiteSpec.get_unified_name()
1418                         if tmpSiteName != preassignedSite:
1419                             msg_map[tmpSiteName] = f"  skip site={tmpSiteName} non pre-assigned site criteria=-nonpreassigned"
1420                             continue
1421                         newScanSiteList.append(tmpSiteName)
1422                     scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1423                 self.add_summary_message(oldScanSiteList, scanSiteList, "preassign check", tmpLog, msg_map)
1424             ######################################
1425             # selection for hospital
1426             newScanSiteList = []
1427             oldScanSiteList = copy.copy(scanSiteList)
1428             hasNormalSite = False
1429             for tmpSiteName in self.get_unified_sites(scanSiteList):
1430                 if not tmpSiteName.endswith("_HOSPITAL"):
1431                     hasNormalSite = True
1432                     break
1433             if hasNormalSite:
1434                 msg_map = {}
1435                 for tmpSiteName in self.get_unified_sites(scanSiteList):
1436                     # remove hospital
1437                     if tmpSiteName.endswith("_HOSPITAL"):
1438                         msg_map[tmpSiteName] = f"  skip site={tmpSiteName} due to hospital queue criteria=-hospital"
1439                         continue
1440                     newScanSiteList.append(tmpSiteName)
1441                 scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1442                 self.add_summary_message(oldScanSiteList, scanSiteList, "hospital check", tmpLog, msg_map)
1443                 if not scanSiteList:
1444                     self.dump_summary(tmpLog)
1445                     tmpLog.error("no candidates")
1446                     retVal = retTmpError
1447                     continue
1448             ######################################
1449             # cap with resource type
1450             if not sitePreAssigned:
1451                 # count jobs per resource type
1452                 tmpRet, tmpStatMap = self.taskBufferIF.getJobStatisticsByResourceTypeSite(workQueue)
1453                 newScanSiteList = []
1454                 oldScanSiteList = copy.copy(scanSiteList)
1455                 msg_map = {}
1456                 RT_Cap = 2
1457                 for tmpSiteName in self.get_unified_sites(scanSiteList):
1458                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1459                     tmpUnifiedName = tmpSiteSpec.get_unified_name()
1460                     if tmpUnifiedName in tmpStatMap and taskSpec.resource_type in tmpStatMap[tmpUnifiedName]:
1461                         tmpSiteStatMap = tmpStatMap[tmpUnifiedName][taskSpec.resource_type]
1462                         tmpRTrunning = tmpSiteStatMap.get("running", 0)
1463                         tmpRTqueue = tmpSiteStatMap.get("defined", 0)
1464                         tmpRTqueue += tmpSiteStatMap.get("assigned", 0)
1465                         tmpRTqueue += tmpSiteStatMap.get("activated", 0)
1466                         tmpRTqueue += tmpSiteStatMap.get("starting", 0)
1467                         if tmpRTqueue > max(20, tmpRTrunning * RT_Cap):
1468                             tmpMsg = f"  skip site={tmpSiteName} "
1469                             tmpMsg += "since nQueue/max(20,nRun) with gshare+resource_type is "
1470                             tmpMsg += f"{tmpRTqueue}/max(20,{tmpRTrunning}) > {RT_Cap} "
1471                             tmpMsg += "criteria=-cap_rt"
1472                             msg_map[tmpSiteName] = tmpMsg
1473                     newScanSiteList.append(tmpSiteName)
1474                 scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1475                 self.add_summary_message(oldScanSiteList, scanSiteList, "cap with gshare+resource_type check", tmpLog, msg_map)
1476                 if not scanSiteList:
1477                     self.dump_summary(tmpLog)
1478                     tmpLog.error("no candidates")
1479                     taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1480                     return retTmpError
1481             ######################################
1482             # selection for un-overloaded sites
1483             if not inputChunk.isMerging:
1484                 newScanSiteList = []
1485                 oldScanSiteList = copy.copy(scanSiteList)
1486                 overloadedNonVP = []
1487                 msg_map = {}
1488                 msgList = []
1489                 msgListVP = []
1490                 minQueue = self.taskBufferIF.getConfigValue("anal_jobbroker", "OVERLOAD_MIN_QUEUE", "jedi", taskSpec.vo)
1491                 if minQueue is None:
1492                     minQueue = 20
1493                 ratioOffset = self.taskBufferIF.getConfigValue("anal_jobbroker", "OVERLOAD_RATIO_OFFSET", "jedi", taskSpec.vo)
1494                 if ratioOffset is None:
1495                     ratioOffset = 1.2
1496                 grandRatio = AtlasBrokerUtils.get_total_nq_nr_ratio(jobStatPrioMap, taskSpec.gshare)
1497                 tmpLog.info(f"grand nQueue/nRunning ratio : {grandRatio}")
1498                 tmpLog.info(f"sites with non-VP data : {','.join(scanSiteWoVP)}")
1499                 for tmpPseudoSiteName in scanSiteList:
1500                     tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1501                     tmpSiteName = tmpSiteSpec.get_unified_name()
1502                     # get nQueue and nRunning
1503                     nRunning = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running", workQueue_tag=taskSpec.gshare)
1504                     nQueue = 0
1505                     for jobStatus in ["defined", "assigned", "activated", "starting"]:
1506                         nQueue += AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, jobStatus, workQueue_tag=taskSpec.gshare)
1507                     # skip if overloaded
1508                     if nQueue > minQueue and (nRunning == 0 or float(nQueue) / float(nRunning) > grandRatio * ratioOffset):
1509                         tmpMsg = f"  skip site={tmpSiteName} "
1510                         tmpMsg += f"nQueue>minQueue({minQueue}) and "
1511                         if nRunning == 0:
1512                             tmpMsg += "nRunning=0 "
1513                             problematic_sites_dict.setdefault(tmpSiteName, set())
1514                             problematic_sites_dict[tmpSiteName].add(f"nQueue({nQueue})>minQueue({minQueue}) and nRunning=0")
1515                         else:
1516                             tmpMsg += f"nQueue({nQueue})/nRunning({nRunning}) > grandRatio({grandRatio:.2f})*offset({ratioOffset}) "
1517                         if tmpSiteName in scanSiteWoVP or checkDataLocality is False or inputChunk.getDatasets() == []:
1518                             tmpMsg += "criteria=-overloaded"
1519                             overloadedNonVP.append(tmpPseudoSiteName)
1520                             msgListVP.append(tmpMsg)
1521                         else:
1522                             tmpMsg += "and VP criteria=-overloaded_vp"
1523                             msgList.append(tmpMsg)
1524                     else:
1525                         newScanSiteList.append(tmpPseudoSiteName)
1526                 if len(newScanSiteList) > 0:
1527                     scanSiteList = newScanSiteList
1528                     for tmpMsg in msgList + msgListVP:
1529                         tmp_site_name = tmpMsg.split("skip site=")[1].split(" ")[0]
1530                         tmpSiteSpec = self.siteMapper.getSite(tmp_site_name)
1531                         msg_map[tmpSiteSpec.get_unified_name()] = tmpMsg
1532                 else:
1533                     scanSiteList = overloadedNonVP
1534                     for tmpMsg in msgList:
1535                         tmp_site_name = tmpMsg.split("skip site=")[1].split(" ")[0]
1536                         tmpSiteSpec = self.siteMapper.getSite(tmp_site_name)
1537                         msg_map[tmpSiteSpec.get_unified_name()] = tmpMsg
1538                 self.add_summary_message(oldScanSiteList, scanSiteList, "overload check", tmpLog, msg_map)
1539                 if not scanSiteList:
1540                     self.dump_summary(tmpLog)
1541                     tmpLog.error("no candidates")
1542                     retVal = retTmpError
1543                     continue
1544             ######################################
1545             # consider grid usage of the user
1546             user_name = CoreUtils.clean_user_id(taskSpec.userName)
1547             tmpSt, jobsStatsPerUser = AtlasBrokerUtils.getUsersJobsStats(
1548                 tbIF=self.taskBufferIF, vo=taskSpec.vo, prod_source_label=taskSpec.prodSourceLabel, cache_lifetime=60
1549             )
1550             if not tmpSt:
1551                 tmpLog.error("failed to get users jobs statistics")
1552                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1553                 # send info to logger
1554                 return retTmpError
1555             elif not inputChunk.isMerging:
1556                 # loop over sites
1557                 for tmpPseudoSiteName in scanSiteList:
1558                     tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1559                     tmpSiteName = tmpSiteSpec.get_unified_name()
1560                     # get info about site
1561                     nRunning_pq_total = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running")
1562                     nRunning_pq_in_gshare = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running", workQueue_tag=taskSpec.gshare)
1563                     nQueue_pq_in_gshare = 0
1564                     for jobStatus in ["defined", "assigned", "activated", "starting"]:
1565                         nQueue_pq_in_gshare += AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, jobStatus, workQueue_tag=taskSpec.gshare)
1566                     # get to-running-rate
1567                     try:
1568                         site_to_running_rate = siteToRunRateMap[tmpSiteName]
1569                         if isinstance(site_to_running_rate, dict):
1570                             site_to_running_rate = sum(site_to_running_rate.values())
1571                     except KeyError:
1572                         site_to_running_rate = 0
1573                     finally:
1574                         to_running_rate = nRunning_pq_in_gshare * site_to_running_rate / nRunning_pq_total if nRunning_pq_total > 0 else 0
1575                     # get conditions of the site whether to throttle
1576                     if nQueue_pq_in_gshare < base_queue_length_per_pq:
1577                         # not throttle since overall queue length of the site is not large enough
1578                         tmpLog.debug(f"not throttle on {tmpSiteName} since nQ({nQueue_pq_in_gshare}) < base queue length ({base_queue_length_per_pq})")
1579                         continue
1580                     allowed_queue_length_from_wait_time = base_expected_wait_hour_on_pq * to_running_rate
1581                     if nQueue_pq_in_gshare < allowed_queue_length_from_wait_time:
1582                         # not throttle since overall waiting time of the site is not long enough
1583                         tmpLog.debug(
1584                             "not throttle on {0} since nQ({1}) < {2:.3f} = toRunningRate({3:.3f} /hr) * base wait time ({4} hr)".format(
1585                                 tmpSiteName, nQueue_pq_in_gshare, allowed_queue_length_from_wait_time, to_running_rate, base_expected_wait_hour_on_pq
1586                             )
1587                         )
1588                         continue
1589                     # get user jobs stats under the gshare
1590                     try:
1591                         user_jobs_stats_map = jobsStatsPerUser[tmpSiteName][taskSpec.gshare][user_name]
1592                     except KeyError:
1593                         continue
1594                     else:
1595                         nQ_pq_user = user_jobs_stats_map["nQueue"]
1596                         nR_pq_user = user_jobs_stats_map["nRunning"]
1597                         nUsers_pq = len(jobsStatsPerUser[tmpSiteName][taskSpec.gshare])
1598                         try:
1599                             nR_pq = jobsStatsPerUser[tmpSiteName][taskSpec.gshare]["_total"]["nRunning"]
1600                         except KeyError:
1601                             nR_pq = nRunning_pq_in_gshare
1602                     # evaluate max nQueue per PQ
1603                     nQ_pq_limit_map = {
1604                         "base_limit": base_queue_length_per_pq,
1605                         "static_limit": static_max_queue_running_ratio * nR_pq,
1606                         "dynamic_limit": max_expected_wait_hour * to_running_rate,
1607                     }
1608                     max_nQ_pq = max(nQ_pq_limit_map.values())
1609                     # description for max nQueue per PQ
1610                     description_of_max_nQ_pq = f"max_nQ_pq({max_nQ_pq:.3f}) "
1611                     for k, v in nQ_pq_limit_map.items():
1612                         if v == max_nQ_pq:
1613                             if k in ["base_limit"]:
1614                                 description_of_max_nQ_pq += f"= {k} = BASE_QUEUE_LENGTH_PER_PQ({base_queue_length_per_pq})"
1615                             elif k in ["static_limit"]:
1616                                 description_of_max_nQ_pq += f"= {k} = STATIC_MAX_QUEUE_RUNNING_RATIO({static_max_queue_running_ratio:.3f}) * nR_pq({nR_pq})"
1617                             elif k in ["dynamic_limit"]:
1618                                 description_of_max_nQ_pq += "= {key} = MAX_EXPECTED_WAIT_HOUR({value:.3f} hr) * toRunningRate_pq({trr:.3f} /hr)".format(
1619                                     key=k, value=max_expected_wait_hour, trr=to_running_rate
1620                                 )
1621                             break
1622                     # evaluate fraction per user
1623                     user_fraction_map = {
1624                         "equal_distr": 1 / nUsers_pq,
1625                         "prop_to_nR": nR_pq_user / nR_pq if nR_pq > 0 else 0,
1626                     }
1627                     max_user_fraction = max(user_fraction_map.values())
1628                     # description for max fraction per user
1629                     description_of_max_user_fraction = f"max_user_fraction({max_user_fraction:.3f}) "
1630                     for k, v in user_fraction_map.items():
1631                         if v == max_user_fraction:
1632                             if k in ["equal_distr"]:
1633                                 description_of_max_user_fraction += f"= {k} = 1 / nUsers_pq({nUsers_pq})"
1634                             elif k in ["prop_to_nR"]:
1635                                 description_of_max_user_fraction += f"= {k} = nR_pq_user({nR_pq_user}) / nR_pq({nR_pq})"
1636                             break
1637                     # evaluate max nQueue per PQ per user
1638                     nQ_pq_user_limit_map = {
1639                         "constant_base_user_limit": base_default_queue_length_per_pq_user,
1640                         "ratio_base_user_limit": base_queue_ratio_on_pq * nR_pq,
1641                         "dynamic_user_limit": max_nQ_pq * max_user_fraction,
1642                     }
1643                     max_nQ_pq_user = max(nQ_pq_user_limit_map.values())
1644                     # description for max nQueue per PQ per user
1645                     description_of_max_nQ_pq_user = f"max_nQ_pq_user({max_nQ_pq_user:.3f}) "
1646                     for k, v in nQ_pq_user_limit_map.items():
1647                         if v == max_nQ_pq_user:
1648                             if k in ["constant_base_user_limit"]:
1649                                 description_of_max_nQ_pq_user += f"= {k} = BASE_DEFAULT_QUEUE_LENGTH_PER_PQ_USER({base_default_queue_length_per_pq_user})"
1650                             elif k in ["ratio_base_user_limit"]:
1651                                 description_of_max_nQ_pq_user += f"= {k} = BASE_QUEUE_RATIO_ON_PQ({base_queue_ratio_on_pq:.3f}) * nR_pq({nR_pq})"
1652                             elif k in ["dynamic_user_limit"]:
1653                                 description_of_max_nQ_pq_user += f"= {k} = max_nQ_pq({max_nQ_pq:.3f}) * max_user_fraction({max_user_fraction:.3f})"
1654                                 description_of_max_nQ_pq_user += f" , where {description_of_max_nQ_pq} , and {description_of_max_user_fraction}"
1655                             break
1656                     # Analysis Stabilizer: skip sites where the user queues too much
1657                     if nQ_pq_user > max_nQ_pq_user:
1658                         tmpMsg = f" consider {tmpSiteName} unsuitable for the user due to long queue of the user: "
1659                         tmpMsg += f"nQ_pq_user({nQ_pq_user}) > {description_of_max_nQ_pq_user} "
1660                         # view as problematic site in order to throttle
1661                         problematic_sites_dict.setdefault(tmpSiteName, set())
1662                         problematic_sites_dict[tmpSiteName].add(tmpMsg)
1663                     # problematic sites with too many failed and closed jobs
1664                     if tmpSiteName in failureCounts:
1665                         nFailed = failureCounts[tmpSiteName].get("failed", 0)
1666                         nClosed = failureCounts[tmpSiteName].get("closed", 0)
1667                         nFinished = failureCounts[tmpSiteName].get("finished", 0)
1668                         if not inputChunk.isMerging and (nFailed + nClosed) > max(2 * nFinished, minBadJobsToSkipPQ):
1669                             problematic_sites_dict.setdefault(tmpSiteName, set())
1670                             problematic_sites_dict[tmpSiteName].add("too many failed or closed jobs for last 6h")
1671             # check if good sites are still available after removing problematic sites when
1672             # * the brokerage has two loops
1673             # * it is doing the first loop with data locality check
1674             # * data locality check is disabled in the second loop due to low IO, special task priority, or timeout
1675             if len(scan_site_list_loops) > 1 and i_loop == 0 and to_ignore_data_loc:
1676                 candidates_with_problems = []
1677                 for tmpPseudoSiteName in scanSiteList:
1678                     tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1679                     tmpSiteName = tmpSiteSpec.get_unified_name()
1680                     if tmpSiteName in problematic_sites_dict:
1681                         candidates_with_problems.append([tmpPseudoSiteName, list(problematic_sites_dict[tmpSiteName])[0]])
1682                 if len(scanSiteList) == len(candidates_with_problems):
1683                     msg_map = {}
1684                     for tmpPseudoSiteName, error_diag in candidates_with_problems:
1685                         tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1686                         msg_map[tmpSiteSpec.get_unified_name()] = (
1687                             f"  skip site={tmpSiteSpec.get_unified_name()} due to a temporary user-specific problem: {error_diag} " "criteria=-tmp_user_problem"
1688                         )
1689                     self.add_summary_message(scanSiteList, [], "temp user problem check", tmpLog, msg_map)
1690                     self.dump_summary(tmpLog)
1691                     tmpLog.error("no candidates")
1692                     retVal = retTmpError
1693                     continue
1694 
1695             ############
1696             # loop end
1697             overall_site_list.update(scanSiteList)
1698             if site_list_with_data is None:
1699                 # preserve site list with data
1700                 site_list_with_data = set(scanSiteList)
1701             if len(overall_site_list) >= taskSpec.getNumSitesPerJob():
1702                 retVal = None
1703                 break
1704         # failed
1705         if retVal is not None:
1706             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1707             return retVal
1708         # get list of available files
1709         scanSiteList = list(overall_site_list)
1710         availableFileMap = {}
1711         for datasetSpec in inputChunk.getDatasets():
1712             try:
1713                 # get list of site to be scanned
1714                 tmpLog.debug(f"getting the list of available files for {datasetSpec.datasetName}")
1715                 fileScanSiteList = []
1716                 for tmpPseudoSiteName in scanSiteList:
1717                     tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1718                     tmpSiteName = tmpSiteSpec.get_unified_name()
1719                     if tmpSiteName in fileScanSiteList:
1720                         continue
1721                     fileScanSiteList.append(tmpSiteName)
1722                     if tmpSiteName in remoteSourceList and datasetSpec.datasetName in remoteSourceList[tmpSiteName]:
1723                         for tmpRemoteSite in remoteSourceList[tmpSiteName][datasetSpec.datasetName]:
1724                             if tmpRemoteSite not in fileScanSiteList:
1725                                 fileScanSiteList.append(tmpRemoteSite)
1726                 # mapping between sites and input storage endpoints
1727                 siteStorageEP = AtlasBrokerUtils.getSiteInputStorageEndpointMap(fileScanSiteList, self.siteMapper, JobUtils.ANALY_PS, JobUtils.ANALY_PS)
1728                 # disable file lookup for merge jobs
1729                 if inputChunk.isMerging:
1730                     checkCompleteness = False
1731                 else:
1732                     checkCompleteness = True
1733                 if not datasetSpec.isMaster():
1734                     useCompleteOnly = True
1735                 else:
1736                     useCompleteOnly = False
1737                 # get available files per site/endpoint
1738                 tmpAvFileMap = self.ddmIF.getAvailableFiles(
1739                     datasetSpec,
1740                     siteStorageEP,
1741                     self.siteMapper,
1742                     check_completeness=checkCompleteness,
1743                     use_vp=useVP,
1744                     file_scan_in_container=False,
1745                     complete_only=useCompleteOnly,
1746                     element_list=element_map.get(datasetSpec.datasetName),
1747                 )
1748                 if tmpAvFileMap is None:
1749                     raise Interaction.JEDITemporaryError("ddmIF.getAvailableFiles failed")
1750                 availableFileMap[datasetSpec.datasetName] = tmpAvFileMap
1751             except Exception:
1752                 errtype, errvalue = sys.exc_info()[:2]
1753                 tmpLog.error(f"failed to get available files with {errtype.__name__} {errvalue}")
1754                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1755                 return retTmpError
1756         # make data weight
1757         totalSize = 0
1758         totalNumFiles = 0
1759         totalDiskSizeMap = dict()
1760         totalTapeSizeMap = dict()
1761         for datasetSpec in inputChunk.getDatasets():
1762             totalNumFiles += len(datasetSpec.Files)
1763             for fileSpec in datasetSpec.Files:
1764                 totalSize += fileSpec.fsize
1765             if datasetSpec.datasetName in availableFileMap:
1766                 for tmpSiteName, tmpAvFileMap in availableFileMap[datasetSpec.datasetName].items():
1767                     totalDiskSizeMap.setdefault(tmpSiteName, 0)
1768                     totalTapeSizeMap.setdefault(tmpSiteName, 0)
1769                     for fileSpec in tmpAvFileMap["localdisk"]:
1770                         totalDiskSizeMap[tmpSiteName] += fileSpec.fsize
1771                     for fileSpec in tmpAvFileMap["localtape"]:
1772                         totalTapeSizeMap[tmpSiteName] += fileSpec.fsize
1773         totalSize //= 1024 * 1024 * 1024
1774         tmpLog.info(f"totalInputSize={totalSize} GB")
1775         for tmpSiteName in totalDiskSizeMap.keys():
1776             totalDiskSizeMap[tmpSiteName] //= 1024 * 1024 * 1024
1777         for tmpSiteName in totalTapeSizeMap.keys():
1778             totalTapeSizeMap[tmpSiteName] //= 1024 * 1024 * 1024
1779         ######################################
1780         # final procedure
1781         tmpLog.info(f"{len(scanSiteList)} candidates for final check")
1782         weightMap = {}
1783         weightStr = {}
1784         candidateSpecList = []
1785         preSiteCandidateSpec = None
1786         basic_weight_compar_map = {}
1787         workerStat = self.taskBufferIF.ups_load_worker_stats()
1788         for tmpPseudoSiteName in scanSiteList:
1789             tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1790             tmpSiteName = tmpSiteSpec.get_unified_name()
1791             nRunning = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running", workQueue_tag=taskSpec.gshare)
1792             nDefined = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "defined", workQueue_tag=taskSpec.gshare)
1793             nAssigned = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "assigned", workQueue_tag=taskSpec.gshare)
1794             nActivated = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "activated", workQueue_tag=taskSpec.gshare)
1795             nStarting = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "starting", workQueue_tag=taskSpec.gshare)
1796             # get num workers
1797             nWorkers = 0
1798             nWorkersCutoff = 20
1799             if tmpSiteName in workerStat:
1800                 for _, tmpLabelStat in workerStat[tmpSiteName].items():
1801                     for _, tmpResStat in tmpLabelStat.items():
1802                         for tmpResType, tmpCounts in tmpResStat.items():
1803                             for tmpStatus, tmpNum in tmpCounts.items():
1804                                 if tmpStatus in ["running", "submitted"]:
1805                                     nWorkers += tmpNum
1806                 # cap
1807                 nWorkers = min(nWorkersCutoff, nWorkers)
1808             # use nWorkers to bootstrap
1809             if tmpSiteName in nPilotMap and nPilotMap[tmpSiteName] > 0 and nRunning < nWorkersCutoff and nWorkers > nRunning:
1810                 tmpLog.debug(f"using nWorkers={nWorkers} as nRunning at {tmpPseudoSiteName} since original nRunning={nRunning} is low")
1811                 nRunning = nWorkers
1812             # take into account the number of standby jobs
1813             numStandby = tmpSiteSpec.getNumStandby(taskSpec.gshare, taskSpec.resource_type)
1814             if numStandby is None:
1815                 pass
1816             elif numStandby == 0:
1817                 # use the number of starting jobs as the number of standby jobs
1818                 nRunning = nStarting + nRunning
1819                 tmpLog.debug(f"using dynamic workload provisioning at {tmpPseudoSiteName} to set nRunning={nRunning}")
1820             else:
1821                 # the number of standby jobs is defined
1822                 nRunning = max(int(numStandby / tmpSiteSpec.coreCount), nRunning)
1823                 tmpLog.debug(f"using static workload provisioning at {tmpPseudoSiteName} with nStandby={numStandby} to set nRunning={nRunning}")
1824             nFailed = 0
1825             nClosed = 0
1826             nFinished = 0
1827             if tmpSiteName in failureCounts:
1828                 if "failed" in failureCounts[tmpSiteName]:
1829                     nFailed = failureCounts[tmpSiteName]["failed"]
1830                 if "closed" in failureCounts[tmpSiteName]:
1831                     nClosed = failureCounts[tmpSiteName]["closed"]
1832                 if "finished" in failureCounts[tmpSiteName]:
1833                     nFinished = failureCounts[tmpSiteName]["finished"]
1834             # to-running rate
1835             try:
1836                 site_to_running_rate = siteToRunRateMap[tmpSiteName]
1837                 if isinstance(site_to_running_rate, dict):
1838                     site_to_running_rate = sum(site_to_running_rate.values())
1839             except KeyError:
1840                 to_running_rate_str = "0(unknown)"
1841                 to_running_rate = 0
1842             else:
1843                 site_n_running = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running")
1844                 to_running_rate = nRunning * site_to_running_rate / site_n_running if site_n_running > 0 else 0
1845                 to_running_rate_str = f"{to_running_rate:.3f}"
1846             # site class value; by default mid-class (= 0) if unclassified
1847             site_class_value = analy_sites_class_dict.get(tmpSiteName, 0)
1848             site_class_value = 0 if site_class_value is None else site_class_value
1849             # calculate original basic weight
1850             orig_basic_weight = float(nRunning + 1) / float(nActivated + nAssigned + nDefined + nStarting + 1)
1851             # available site; record the metrics taken into account for the new basic weight
1852             basic_weight_compar_map[tmpSiteName] = {}
1853             basic_weight_compar_map[tmpSiteName]["orig"] = orig_basic_weight
1854             basic_weight_compar_map[tmpSiteName]["trr"] = to_running_rate
1855             basic_weight_compar_map[tmpSiteName]["nq"] = nActivated + nAssigned + nDefined + nStarting
1856             basic_weight_compar_map[tmpSiteName]["nr"] = nRunning
1857             basic_weight_compar_map[tmpSiteName]["class"] = site_class_value
1858             basic_weight_compar_map[tmpSiteName]["nDefined"] = nDefined
1859             basic_weight_compar_map[tmpSiteName]["nActivated"] = nActivated
1860             basic_weight_compar_map[tmpSiteName]["nStarting"] = nStarting
1861             basic_weight_compar_map[tmpSiteName]["nAssigned"] = nAssigned
1862             basic_weight_compar_map[tmpSiteName]["nFailed"] = nFailed
1863             basic_weight_compar_map[tmpSiteName]["nClosed"] = nClosed
1864             basic_weight_compar_map[tmpSiteName]["nFinished"] = nFinished
1865         # compute new basic weight
1866         try:
1867             n_avail_sites = len(basic_weight_compar_map)
1868             if n_avail_sites == 0:
1869                 tmpLog.debug("WEIGHT-COMPAR: zero available sites, skip")
1870             else:
1871                 # task class value
1872                 task_class_value = task_eval_dict.get("class", 1) if task_eval_dict is not None else 1
1873                 # get nFilesPerJob
1874                 if not inputChunk.isMerging:
1875                     nFilesPerJob = taskSpec.getNumFilesPerJob()
1876                 else:
1877                     nFilesPerJob = taskSpec.getNumFilesPerMergeJob()
1878                 if nFilesPerJob is None or nFilesPerJob < 1:
1879                     nFilesPerJob = 1
1880                 #
1881                 _maxSizePerJob = taskSpec.getMaxSizePerJob()
1882                 if _maxSizePerJob is not None:
1883                     _maxSizePerJob += inputChunk.defaultOutputSize
1884                     _maxSizePerJob += taskSpec.getWorkDiskSize()
1885                 else:
1886                     if taskSpec.useScout():
1887                         _maxSizePerJob = inputChunk.maxInputSizeScouts * 1024 * 1024
1888                     else:
1889                         _maxSizePerJob = inputChunk.maxInputSizeAvalanche * 1024 * 1024
1890                 # count subchunks
1891                 n_subchunks = 0
1892                 while True:
1893                     subchunk, _ = inputChunk.getSubChunk(
1894                         None,
1895                         maxNumFiles=taskSpec.getMaxNumFilesPerJob(),
1896                         nFilesPerJob=taskSpec.getNumFilesPerJob(),
1897                         walltimeGradient=(taskSpec.getCpuTime() if taskSpec.useHS06() else None),
1898                         maxWalltime=(taskSpec.getMaxWalltime() if taskSpec.getMaxWalltime() is not None else 345600),
1899                         sizeGradients=taskSpec.getOutDiskSize(),
1900                         sizeIntercepts=taskSpec.getWorkDiskSize(),
1901                         maxSize=_maxSizePerJob,
1902                         nEventsPerJob=taskSpec.getNumEventsPerJob(),
1903                         coreCount=taskSpec.coreCount,
1904                         corePower=10,
1905                         respectLB=taskSpec.respectLumiblock(),
1906                     )
1907                     if subchunk is None:
1908                         break
1909                     else:
1910                         n_subchunks += 1
1911                 inputChunk.resetUsedCounters()
1912                 n_jobs_to_submit = n_subchunks
1913                 # parameters for small additional weight
1914                 weight_epsilon_init = 0.001
1915                 weight_epsilon_hi_mid = 0.05
1916                 weight_epsilon_hi_lo = 0.002
1917                 weight_epsilon_mid_lo = 0.1
1918                 # initialize
1919                 tmpSt, siteToRunRateMap = AtlasBrokerUtils.getSiteToRunRateStats(tbIF=self.taskBufferIF, vo=taskSpec.vo)
1920                 weight_comparison_avail_sites = set(basic_weight_compar_map.keys())
1921                 site_class_n_site_dict = {1: 0, 0: 0, -1: 0}
1922                 site_class_rem_q_len_dict = {1: 0, 0: 0, -1: 0}
1923                 total_rem_q_len = 0
1924                 # loop over sites for metrics
1925                 for site, bw_map in basic_weight_compar_map.items():
1926                     # site class count
1927                     site_class_n_site_dict[bw_map["class"]] += 1
1928                     # get info about site
1929                     nRunning_pq_total = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, site, "running")
1930                     nRunning_pq_in_gshare = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, site, "running", workQueue_tag=taskSpec.gshare)
1931                     nQueue_pq_in_gshare = 0
1932                     for jobStatus in ["defined", "assigned", "activated", "starting"]:
1933                         nQueue_pq_in_gshare += AtlasBrokerUtils.getNumJobs(jobStatPrioMap, site, jobStatus, workQueue_tag=taskSpec.gshare)
1934                     # get to-running-rate
1935                     try:
1936                         site_to_running_rate = siteToRunRateMap[site]
1937                         if isinstance(site_to_running_rate, dict):
1938                             site_to_running_rate = sum(site_to_running_rate.values())
1939                     except Exception:
1940                         site_to_running_rate = 0
1941                     finally:
1942                         to_running_rate = nRunning_pq_in_gshare * site_to_running_rate / nRunning_pq_total if nRunning_pq_total > 0 else 0
1943                     # get user jobs stats under the gshare
1944                     try:
1945                         user_jobs_stats_map = jobsStatsPerUser[site][taskSpec.gshare][user_name]
1946                     except KeyError:
1947                         nQ_pq_user = 0
1948                         nR_pq_user = 0
1949                     else:
1950                         nQ_pq_user = user_jobs_stats_map["nQueue"]
1951                         nR_pq_user = user_jobs_stats_map["nRunning"]
1952                     try:
1953                         nUsers_pq = len(jobsStatsPerUser[site][taskSpec.gshare])
1954                     except KeyError:
1955                         nUsers_pq = 1
1956                     try:
1957                         nR_pq = jobsStatsPerUser[site][taskSpec.gshare]["_total"]["nRunning"]
1958                     except KeyError:
1959                         nR_pq = nRunning_pq_in_gshare
1960                     # evaluate max nQueue per PQ
1961                     nQ_pq_limit_map = {
1962                         "base_limit": base_queue_length_per_pq,
1963                         "static_limit": static_max_queue_running_ratio * nR_pq,
1964                         "dynamic_limit": max_expected_wait_hour * to_running_rate,
1965                     }
1966                     max_nQ_pq = max(nQ_pq_limit_map.values())
1967                     # evaluate fraction per user
1968                     user_fraction_map = {
1969                         "equal_distr": 1 / (nUsers_pq),
1970                         "prop_to_nR": nR_pq_user / nR_pq if nR_pq > 0 else 0,
1971                     }
1972                     max_user_fraction = max(user_fraction_map.values())
1973                     # evaluate max nQueue per PQ per user
1974                     nQ_pq_user_limit_map = {
1975                         "constant_base_user_limit": base_default_queue_length_per_pq_user,
1976                         "ratio_base_user_limit": base_queue_ratio_on_pq * nR_pq,
1977                         "dynamic_user_limit": max_nQ_pq * max_user_fraction,
1978                     }
1979                     max_nQ_pq_user = max(nQ_pq_user_limit_map.values())
1980                     # fill in metrics for the site
1981                     bw_map["user_r"] = nR_pq_user
1982                     bw_map["user_q_len"] = nQ_pq_user
1983                     bw_map["max_q_len"] = max_nQ_pq_user
1984                     bw_map["rem_q_len"] = max(bw_map["max_q_len"] - bw_map["user_q_len"], 0)
1985                     site_class_rem_q_len_dict[bw_map["class"]] += bw_map["rem_q_len"]
1986                     total_rem_q_len += bw_map["rem_q_len"]
1987                 # main weight, for User Analysis, determined by number of jobs to submit to each site class
1988                 main_weight_site_class_dict = {1: weight_epsilon_init, 0: weight_epsilon_init, -1: weight_epsilon_init}
1989                 if taskSpec.gshare in ["User Analysis", "Express Analysis"]:
1990                     n_jobs_to_submit_rem = min(n_jobs_to_submit, total_rem_q_len)
1991                     if task_class_value == -1:
1992                         # C-task
1993                         main_weight_site_class_dict[-1] = n_jobs_to_submit_rem
1994                     elif task_class_value == 0:
1995                         # B-task
1996                         main_weight_site_class_dict[0] = min(n_jobs_to_submit_rem, site_class_rem_q_len_dict[0])
1997                         n_jobs_to_submit_rem -= main_weight_site_class_dict[0]
1998                         main_weight_site_class_dict[-1] = n_jobs_to_submit_rem
1999                         if main_weight_site_class_dict[0] > 0:
2000                             main_weight_site_class_dict[0] -= weight_epsilon_mid_lo
2001                             main_weight_site_class_dict[-1] += weight_epsilon_mid_lo
2002                     else:
2003                         # S-task or A-task or unclassified task
2004                         main_weight_site_class_dict[1] = min(n_jobs_to_submit_rem, site_class_rem_q_len_dict[1])
2005                         n_jobs_to_submit_rem -= main_weight_site_class_dict[1]
2006                         main_weight_site_class_dict[0] = min(n_jobs_to_submit_rem, site_class_rem_q_len_dict[0])
2007                         n_jobs_to_submit_rem -= main_weight_site_class_dict[0]
2008                         main_weight_site_class_dict[-1] = n_jobs_to_submit_rem
2009                         if main_weight_site_class_dict[1] > 0:
2010                             main_weight_site_class_dict[1] -= weight_epsilon_hi_mid + weight_epsilon_hi_lo
2011                             main_weight_site_class_dict[0] += weight_epsilon_hi_mid
2012                             main_weight_site_class_dict[-1] += weight_epsilon_hi_lo
2013                 # find the weights
2014                 for site in weight_comparison_avail_sites:
2015                     bw_map = basic_weight_compar_map[site]
2016                     # main weight by site & task class for User Analysis, and constant for group shares
2017                     nbw_main = n_jobs_to_submit
2018                     if taskSpec.gshare in ["User Analysis", "Express Analysis"]:
2019                         nbw_main = main_weight_site_class_dict[bw_map["class"]]
2020                     # secondary weight proportional to remaining queue length
2021                     nbw_sec = 1
2022                     if taskSpec.gshare in ["User Analysis", "Express Analysis"]:
2023                         _nbw_numer = max(bw_map["rem_q_len"] - nbw_main / site_class_n_site_dict[bw_map["class"]], nbw_main * 0.001)
2024                         reduced_site_class_rem_q_len = site_class_rem_q_len_dict[bw_map["class"]] - nbw_main
2025                         if reduced_site_class_rem_q_len > 0 and not inputChunk.isMerging:
2026                             nbw_sec = _nbw_numer / reduced_site_class_rem_q_len
2027                         elif site_class_rem_q_len_dict[bw_map["class"]] > 0:
2028                             nbw_sec = bw_map["rem_q_len"] / site_class_rem_q_len_dict[bw_map["class"]]
2029                         elif site_class_n_site_dict[bw_map["class"]] > 0:
2030                             nbw_sec = 1 / site_class_n_site_dict[bw_map["class"]]
2031                     else:
2032                         reduced_total_rem_q_len = total_rem_q_len - nbw_main
2033                         _nbw_numer = max(bw_map["rem_q_len"] - nbw_main / n_avail_sites, nbw_main * 0.001)
2034                         if reduced_total_rem_q_len > 0 and not inputChunk.isMerging:
2035                             nbw_sec = _nbw_numer / reduced_total_rem_q_len
2036                         elif total_rem_q_len > 0:
2037                             nbw_sec = bw_map["rem_q_len"] / total_rem_q_len
2038                         elif site_class_n_site_dict[bw_map["class"]] > 0:
2039                             nbw_sec = 1 / n_avail_sites
2040                     # new basic weight
2041                     new_basic_weight = nbw_main * nbw_sec + 2**-20
2042                     bw_map["new"] = new_basic_weight
2043                 # log message to compare weights
2044                 orig_sum = 0
2045                 new_sum = 0
2046                 for bw_map in basic_weight_compar_map.values():
2047                     orig_sum += bw_map["orig"]
2048                     new_sum += bw_map["new"]
2049                 for site in basic_weight_compar_map:
2050                     bw_map = basic_weight_compar_map[site]
2051                     if bw_map["nr"] == 0:
2052                         trr_over_r = None
2053                     else:
2054                         trr_over_r = bw_map["trr"] / bw_map["nr"]
2055                     bw_map["trr_over_r"] = f"{trr_over_r:6.3f}" if trr_over_r is not None else "None"
2056                     if orig_sum == 0:
2057                         normalized_orig = 0
2058                     else:
2059                         normalized_orig = bw_map["orig"] / orig_sum
2060                     bw_map["normalized_orig"] = normalized_orig
2061                     if new_sum == 0:
2062                         normalized_new = 0
2063                     else:
2064                         normalized_new = bw_map["new"] / new_sum
2065                     bw_map["normalized_new"] = normalized_new
2066                 prt_str_list = []
2067                 prt_str_temp = (
2068                     ""
2069                     " {site:>30} |"
2070                     " {class:>2} |"
2071                     " {nq:>6} |"
2072                     " {nr:>6} |"
2073                     " {trr:7.2f} |"
2074                     " {user_q_len:>5} |"
2075                     " {max_q_len:9.2f} |"
2076                     " {rem_q_len:9.2f} |"
2077                     " {orig:7.2f} |"
2078                     " {new:7.3f} |"
2079                     " {normalized_orig:6.1%} |"
2080                     " {normalized_new:6.1%} |"
2081                 )
2082                 prt_str_title = (
2083                     ""
2084                     " {site:>30} |"
2085                     " {cl:>2} |"
2086                     " {nq:>6} |"
2087                     " {nr:>6} |"
2088                     " {trr:>7} |"
2089                     " {user_q_len:>5} |"
2090                     " {max_q_len:>9} |"
2091                     " {rem_q_len:>9} |"
2092                     " {orig:>7} |"
2093                     " {new:>7} |"
2094                     " {normalized_orig:>6} |"
2095                     " {normalized_new:>6} |"
2096                 ).format(
2097                     site="Site",
2098                     cl="Cl",
2099                     nq="Q",
2100                     nr="R",
2101                     trr="TRR",
2102                     user_q_len="UserQ",
2103                     max_q_len="UserQ_max",
2104                     rem_q_len="UserQ_rem",
2105                     orig="Wb_orig",
2106                     new="Wb_new",
2107                     normalized_orig="orig_%",
2108                     normalized_new="new_%",
2109                 )
2110                 prt_str_list.append(prt_str_title)
2111                 for site in sorted(basic_weight_compar_map):
2112                     bw_map = basic_weight_compar_map[site]
2113                     prt_str = prt_str_temp.format(site=site, **{k: (x if x is not None else math.nan) for k, x in bw_map.items()})
2114                     prt_str_list.append(prt_str)
2115                 tmpLog.info(f"gshare: {taskSpec.gshare} ,{' merging,' if inputChunk.isMerging else ''} task_class: {task_class_value}")
2116                 tmpLog.debug(
2117                     "WEIGHT-COMPAR: for gshare={},{} cl={}, nJobsEst={} got \n{}".format(
2118                         taskSpec.gshare, (" merging," if inputChunk.isMerging else ""), task_class_value, n_jobs_to_submit, "\n".join(prt_str_list)
2119                     )
2120                 )
2121         except Exception as e:
2122             tmpLog.error(f"{traceback.format_exc()}")
2123         # choose basic weight
2124         _basic_weight_version = "new"
2125         # finish computing weight
2126         for tmpPseudoSiteName in scanSiteList:
2127             tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
2128             tmpSiteName = tmpSiteSpec.get_unified_name()
2129             bw_map = basic_weight_compar_map[tmpSiteName]
2130             # fill basic weight
2131             weight = bw_map[_basic_weight_version]
2132             # penalty according to throttled jobs
2133             nThrottled = 0
2134             if tmpSiteName in remoteSourceList:
2135                 nThrottled = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "throttled", workQueue_tag=taskSpec.gshare)
2136                 weight /= float(nThrottled + 1)
2137             # normalize weights by taking data availability into account
2138             diskNorm = 10
2139             tapeNorm = 1000
2140             localSize = totalSize
2141             if checkDataLocality and not useUnionLocality:
2142                 tmpDataWeight = 1
2143                 if tmpSiteName in dataWeight:
2144                     weight *= dataWeight[tmpSiteName]
2145                     tmpDataWeight = dataWeight[tmpSiteName]
2146             else:
2147                 tmpDataWeight = 1
2148                 if totalSize > 0:
2149                     if tmpSiteName in totalDiskSizeMap:
2150                         tmpDataWeight += totalDiskSizeMap[tmpSiteName] / diskNorm
2151                         localSize = totalDiskSizeMap[tmpSiteName]
2152                     elif tmpSiteName in totalTapeSizeMap:
2153                         tmpDataWeight += totalTapeSizeMap[tmpSiteName] / tapeNorm
2154                         localSize = totalTapeSizeMap[tmpSiteName]
2155                 weight *= tmpDataWeight
2156             # make candidate
2157             siteCandidateSpec = SiteCandidate(tmpPseudoSiteName, tmpSiteName)
2158             # preassigned
2159             if sitePreAssigned and tmpSiteName == preassignedSite:
2160                 preSiteCandidateSpec = siteCandidateSpec
2161             # override attributes
2162             siteCandidateSpec.override_attribute("maxwdir", newMaxwdir.get(tmpSiteName))
2163             if cmt_config:
2164                 platforms = resolved_platforms.get(tmpSiteName)
2165                 if platforms:
2166                     siteCandidateSpec.override_attribute("platforms", platforms)
2167             # set weight
2168             siteCandidateSpec.weight = weight
2169             tmpStr = (
2170                 f"""weight={weight:.7f} """
2171                 f"""gshare={taskSpec.gshare} class={bw_map["class"]} trr={bw_map["trr"]:.3f} """
2172                 f"""userR={bw_map["user_r"]} userQ={bw_map["user_q_len"]} userQRem={bw_map["rem_q_len"]:.3f} """
2173                 f"""nRunning={bw_map["nr"]} nDefined={bw_map["nDefined"]} nActivated={bw_map["nActivated"]} """
2174                 f"""nStarting={bw_map["nStarting"]} nAssigned={bw_map["nAssigned"]} """
2175                 f"""nFailed={bw_map["nFailed"]} nClosed={bw_map["nClosed"]} nFinished={bw_map["nFinished"]} """
2176                 f"""dataW={tmpDataWeight} totalInGB={totalSize} localInGB={localSize} nFiles={totalNumFiles} """
2177             )
2178             weightStr[tmpPseudoSiteName] = tmpStr
2179             # append
2180             if tmpSiteName in sitesUsedByTask:
2181                 candidateSpecList.append(siteCandidateSpec)
2182             else:
2183                 if weight not in weightMap:
2184                     weightMap[weight] = []
2185                 weightMap[weight].append(siteCandidateSpec)
2186         # sort candidates by weights and data availability
2187         candidateSpecListWithData = []
2188         weightList = sorted(weightMap.keys())
2189         weightList.reverse()
2190         for weightVal in weightList:
2191             sitesWithWeight = weightMap[weightVal]
2192             for tmp_site in list(sitesWithWeight):
2193                 if tmp_site in site_list_with_data:
2194                     candidateSpecListWithData.append(tmp_site)
2195                     sitesWithWeight.remove(tmp_site)
2196             random.shuffle(sitesWithWeight)
2197             candidateSpecList += sitesWithWeight
2198         candidateSpecList = candidateSpecListWithData + candidateSpecList
2199         # limit the number of sites
2200         if not hasDDS:
2201             if taskSpec.getNumSitesPerJob() > 1:
2202                 # use the number of sites in the task spec for job cloning
2203                 maxNumSites = max(len(site_list_with_data), taskSpec.getNumSitesPerJob())
2204             else:
2205                 maxNumSites = 10
2206         else:
2207             # use all sites for distributed datasets
2208             maxNumSites = None
2209         # remove problematic sites
2210         oldScanSiteList = copy.copy(scanSiteList)
2211         candidateSpecList = AtlasBrokerUtils.skipProblematicSites(
2212             candidateSpecList, set(problematic_sites_dict), sitesUsedByTask, preSiteCandidateSpec, maxNumSites, tmpLog
2213         )
2214         # append preassigned
2215         if sitePreAssigned and preSiteCandidateSpec is not None and preSiteCandidateSpec not in candidateSpecList:
2216             candidateSpecList.append(preSiteCandidateSpec)
2217         # collect site names
2218         scanSiteList = []
2219         for siteCandidateSpec in candidateSpecList:
2220             scanSiteList.append(siteCandidateSpec.siteName)
2221         # append candidates
2222         newScanSiteList = []
2223         msgList = []
2224         below_min_weight = []
2225         for siteCandidateSpec in candidateSpecList:
2226             tmpPseudoSiteName = siteCandidateSpec.siteName
2227             tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
2228             tmpSiteName = tmpSiteSpec.get_unified_name()
2229             # preassigned
2230             if sitePreAssigned and tmpSiteName != preassignedSite:
2231                 tmpLog.info(f"  skip site={tmpPseudoSiteName} non pre-assigned site criteria=-nonpreassigned")
2232                 try:
2233                     del weightStr[tmpPseudoSiteName]
2234                 except Exception:
2235                     pass
2236                 continue
2237             # below minimum brokerage weight
2238             if min_weight > 0 and siteCandidateSpec.weight < min_weight:
2239                 below_min_weight.append(tmpSiteName)
2240                 continue
2241             # set available files
2242             if inputChunk.getDatasets() == [] or (not checkDataLocality and not tmpSiteSpec.use_only_local_data()):
2243                 isAvailable = True
2244             else:
2245                 isAvailable = False
2246             isAvailableBase = isAvailable
2247             for tmpDatasetName, availableFiles in availableFileMap.items():
2248                 tmpDatasetSpec = inputChunk.getDatasetWithName(tmpDatasetName)
2249                 isAvailable = isAvailableBase
2250                 # check remote files
2251                 if tmpSiteName in remoteSourceList and tmpDatasetName in remoteSourceList[tmpSiteName] and not tmpSiteSpec.use_only_local_data():
2252                     for tmpRemoteSite in remoteSourceList[tmpSiteName][tmpDatasetName]:
2253                         if tmpRemoteSite in availableFiles and len(tmpDatasetSpec.Files) <= len(availableFiles[tmpRemoteSite]["localdisk"]):
2254                             # use only remote disk files
2255                             siteCandidateSpec.add_remote_files(availableFiles[tmpRemoteSite]["localdisk"])
2256                             # set remote site and access protocol
2257                             siteCandidateSpec.remoteProtocol = allowedRemoteProtocol
2258                             siteCandidateSpec.remoteSource = tmpRemoteSite
2259                             isAvailable = True
2260                             break
2261                 # local files
2262                 if tmpSiteName in availableFiles:
2263                     if (
2264                         len(tmpDatasetSpec.Files) <= len(availableFiles[tmpSiteName]["localdisk"])
2265                         or len(tmpDatasetSpec.Files) <= len(availableFiles[tmpSiteName]["cache"])
2266                         or len(tmpDatasetSpec.Files) <= len(availableFiles[tmpSiteName]["localtape"])
2267                         or tmpDatasetSpec.isDistributed()
2268                         or true_complete_disk_ok[tmpDatasetName] is False
2269                         or ((checkDataLocality is False or useUnionLocality) and not tmpSiteSpec.use_only_local_data())
2270                     ):
2271                         siteCandidateSpec.add_local_disk_files(availableFiles[tmpSiteName]["localdisk"])
2272                         # add cached files to local list since cached files go to pending when reassigned
2273                         siteCandidateSpec.add_local_disk_files(availableFiles[tmpSiteName]["cache"])
2274                         siteCandidateSpec.add_local_tape_files(availableFiles[tmpSiteName]["localtape"])
2275                         siteCandidateSpec.add_cache_files(availableFiles[tmpSiteName]["cache"])
2276                         siteCandidateSpec.add_remote_files(availableFiles[tmpSiteName]["remote"])
2277                         siteCandidateSpec.addAvailableFiles(availableFiles[tmpSiteName]["all"])
2278                         isAvailable = True
2279                     else:
2280                         tmpMsg = "{0} is incomplete at {1} : nFiles={2} nLocal={3} nCached={4} nTape={5}"
2281                         tmpLog.debug(
2282                             tmpMsg.format(
2283                                 tmpDatasetName,
2284                                 tmpPseudoSiteName,
2285                                 len(tmpDatasetSpec.Files),
2286                                 len(availableFiles[tmpSiteName]["localdisk"]),
2287                                 len(availableFiles[tmpSiteName]["cache"]),
2288                                 len(availableFiles[tmpSiteName]["localtape"]),
2289                             )
2290                         )
2291                 if not isAvailable:
2292                     break
2293             # append
2294             if not isAvailable and tmpSiteSpec.use_only_local_data():
2295                 tmpLog.info(f"  skip site={siteCandidateSpec.siteName} file unavailable criteria=-fileunavailable")
2296                 try:
2297                     del weightStr[siteCandidateSpec.siteName]
2298                 except Exception:
2299                     pass
2300                 continue
2301             inputChunk.addSiteCandidate(siteCandidateSpec)
2302             newScanSiteList.append(siteCandidateSpec.siteName)
2303             tmpMsg = "  use site={0} with {1} nLocalDisk={2} nLocalTape={3} nCache={4} nRemote={5} criteria=+use".format(
2304                 siteCandidateSpec.siteName,
2305                 weightStr[siteCandidateSpec.siteName],
2306                 len(siteCandidateSpec.localDiskFiles),
2307                 len(siteCandidateSpec.localTapeFiles),
2308                 len(siteCandidateSpec.cacheFiles),
2309                 len(siteCandidateSpec.remoteFiles),
2310             )
2311             msgList.append(tmpMsg)
2312             del weightStr[siteCandidateSpec.siteName]
2313         # dump
2314         for tmpPseudoSiteName in oldScanSiteList:
2315             tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
2316             tmpSiteName = tmpSiteSpec.get_unified_name()
2317             tmpWeightStr = None
2318             if tmpSiteName in weightStr:
2319                 tmpWeightStr = weightStr[tmpSiteName]
2320             elif tmpPseudoSiteName in weightStr:
2321                 tmpWeightStr = weightStr[tmpPseudoSiteName]
2322             if tmpWeightStr is not None:
2323                 if tmpSiteName in problematic_sites_dict:
2324                     bad_reasons = " ; ".join(list(problematic_sites_dict[tmpSiteName]))
2325                     tmpMsg = f"  skip site={tmpPseudoSiteName} {bad_reasons} ; with {tmpWeightStr} criteria=-badsite"
2326                 elif tmpSiteName in below_min_weight:
2327                     tmpMsg = f"  skip site={tmpPseudoSiteName} due to weight below the minimum {min_weight_param}={min_weight} with {tmpWeightStr} criteria=-below_min_weight"
2328                 else:
2329                     tmpMsg = f"  skip site={tmpPseudoSiteName} due to low weight and not-used by old jobs with {tmpWeightStr} criteria=-low_weight"
2330                 tmpLog.info(tmpMsg)
2331         for tmpMsg in msgList:
2332             tmpLog.info(tmpMsg)
2333         scanSiteList = newScanSiteList
2334         self.add_summary_message(oldScanSiteList, scanSiteList, "final check", tmpLog, {})
2335         if not scanSiteList:
2336             self.dump_summary(tmpLog)
2337             tmpLog.error("no candidates")
2338             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
2339             return retTmpError
2340         self.dump_summary(tmpLog, scanSiteList)
2341         # return
2342         tmpLog.debug("done")
2343         return self.SC_SUCCEEDED, inputChunk