Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:57

0001 import copy
0002 import datetime
0003 import re
0004 
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006 from pandacommon.pandautils.PandaUtils import naive_utcnow
0007 
0008 from pandajedi.jedicore import Interaction
0009 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0010 from pandajedi.jedicore.SiteCandidate import SiteCandidate
0011 from pandaserver.dataservice import DataServiceUtils
0012 from pandaserver.dataservice.DataServiceUtils import select_scope
0013 from pandaserver.srvcore import CoreUtils
0014 from pandaserver.taskbuffer import EventServiceUtils, JobUtils
0015 
0016 from . import AtlasBrokerUtils
0017 from .JobBrokerBase import JobBrokerBase
0018 
0019 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0020 
0021 # definitions for network
0022 AGIS_CLOSENESS = "AGIS_closeness"
0023 BLOCKED_LINK = -1
0024 MIN_CLOSENESS = 0  # closeness goes from 0(best) to 11(worst)
0025 MAX_CLOSENESS = 11
0026 # NWS tags need to be prepended with activity
0027 TRANSFERRED_1H = "_done_1h"
0028 TRANSFERRED_6H = "_done_6h"
0029 QUEUED = "_queued"
0030 ZERO_TRANSFERS = 0.00001
0031 URG_ACTIVITY = "Express"
0032 PRD_ACTIVITY = "Production Output"
0033 # NWS tags for FTS throughput
0034 FTS_1H = "dashb_mbps_1h"
0035 FTS_1D = "dashb_mbps_1d"
0036 FTS_1W = "dashb_mbps_1w"
0037 APP = "jedi"
0038 COMPONENT = "jobbroker"
0039 VO = "atlas"
0040 
0041 WORLD_NUCLEUS_WEIGHT = 4
0042 
0043 
0044 # brokerage for ATLAS production
0045 class AtlasProdJobBroker(JobBrokerBase):
0046     # constructor
0047     def __init__(self, ddmIF, taskBufferIF):
0048         JobBrokerBase.__init__(self, ddmIF, taskBufferIF)
0049         self.dataSiteMap = {}
0050         self.suppressLogSending = False
0051 
0052         self.nwActive = taskBufferIF.getConfigValue(COMPONENT, "NW_ACTIVE", APP, VO)
0053         if self.nwActive is None:
0054             self.nwActive = False
0055 
0056         self.nwQueueImportance = taskBufferIF.getConfigValue(COMPONENT, "NW_QUEUE_IMPORTANCE", APP, VO)
0057         if self.nwQueueImportance is None:
0058             self.nwQueueImportance = 0.5
0059         self.nwThroughputImportance = 1 - self.nwQueueImportance
0060 
0061         self.nw_threshold = taskBufferIF.getConfigValue(COMPONENT, "NW_THRESHOLD", APP, VO)
0062         if self.nw_threshold is None:
0063             self.nw_threshold = 1.7
0064 
0065         self.queue_threshold = taskBufferIF.getConfigValue(COMPONENT, "NQUEUED_SAT_CAP", APP, VO)
0066         if self.queue_threshold is None:
0067             self.queue_threshold = 150
0068 
0069         self.total_queue_threshold = taskBufferIF.getConfigValue(COMPONENT, "NQUEUED_NUC_CAP_FOR_JOBS", APP, VO)
0070         if self.total_queue_threshold is None:
0071             self.total_queue_threshold = 10000
0072 
0073         self.nw_weight_multiplier = taskBufferIF.getConfigValue(COMPONENT, "NW_WEIGHT_MULTIPLIER", APP, VO)
0074         if self.nw_weight_multiplier is None:
0075             self.nw_weight_multiplier = 1
0076 
0077         self.io_intensity_cutoff = taskBufferIF.getConfigValue(COMPONENT, "IO_INTENSITY_CUTOFF", APP, VO)
0078         if self.io_intensity_cutoff is None:
0079             self.io_intensity_cutoff = 200
0080 
0081         self.max_prio_for_bootstrap = taskBufferIF.getConfigValue(COMPONENT, "MAX_PRIO_TO_BOOTSTRAP", APP, VO)
0082         if self.max_prio_for_bootstrap is None:
0083             self.max_prio_for_bootstrap = 150
0084 
0085         # load the SW availability map
0086         try:
0087             self.sw_map = taskBufferIF.load_sw_map()
0088         except BaseException:
0089             logger.error("Failed to load the SW tags map!!!")
0090             self.sw_map = {}
0091 
0092         # load the worker node CPU architecture-level map
0093         try:
0094             self.architecture_level_map = taskBufferIF.get_architecture_level_map()
0095         except BaseException:
0096             logger.error("Failed to load the WN architecture level map!!!")
0097             self.architecture_level_map = {}
0098 
0099     def convertMBpsToWeight(self, mbps):
0100         """
0101         Takes MBps value and converts to a weight between 1 and 2
0102         """
0103         mbps_thresholds = [200, 100, 75, 50, 20, 10, 2, 1]
0104         weights = [2, 1.9, 1.8, 1.6, 1.5, 1.3, 1.2, 1.1]
0105 
0106         for threshold, weight in zip(mbps_thresholds, weights):
0107             if mbps > threshold:
0108                 return weight
0109         return 1
0110 
0111     # main
0112     def doBrokerage(self, taskSpec, cloudName, inputChunk, taskParamMap, hintForTB=False, siteListForTB=None, glLog=None):
0113         # suppress sending log
0114         if hintForTB:
0115             self.suppressLogSending = True
0116         # make logger
0117         if glLog is None:
0118             tmpLog = MsgWrapper(
0119                 logger,
0120                 f"<jediTaskID={taskSpec.jediTaskID}>",
0121                 monToken=f"<jediTaskID={taskSpec.jediTaskID} {naive_utcnow().isoformat('/')}>",
0122             )
0123         else:
0124             tmpLog = glLog
0125 
0126         if not hintForTB:
0127             tmpLog.debug(f"start currentPriority={taskSpec.currentPriority}")
0128 
0129         if self.nwActive:
0130             tmpLog.debug("Network weights are ACTIVE!")
0131         else:
0132             tmpLog.debug("Network weights are PASSIVE!")
0133 
0134         timeNow = naive_utcnow()
0135 
0136         # return for failure
0137         retTmpError = self.SC_FAILED, inputChunk
0138 
0139         # new maxwdir
0140         newMaxwdir = {}
0141 
0142         # short of work
0143         work_shortage = self.taskBufferIF.getConfigValue("core", "WORK_SHORTAGE", APP, VO)
0144         if work_shortage is True:
0145             tmp_status, core_statistics = self.taskBufferIF.get_core_statistics(taskSpec.vo, taskSpec.prodSourceLabel)
0146             if not tmp_status:
0147                 tmpLog.error("failed to get core statistics")
0148                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0149                 return retTmpError
0150             tmpLog.debug(f"Work shortage is {work_shortage}")
0151         else:
0152             core_statistics = {}
0153 
0154         # thresholds for incomplete datasets
0155         max_missing_input_files = self.taskBufferIF.getConfigValue("jobbroker", "MAX_MISSING_INPUT_FILES", "jedi", taskSpec.vo)
0156         if max_missing_input_files is None:
0157             max_missing_input_files = 10
0158         min_input_completeness = self.taskBufferIF.getConfigValue("jobbroker", "MIN_INPUT_COMPLETENESS", "jedi", taskSpec.vo)
0159         if min_input_completeness is None:
0160             min_input_completeness = 90
0161 
0162         # minimum brokerage weight
0163         min_weight_param = f"MIN_WEIGHT_{taskSpec.prodSourceLabel}_{taskSpec.gshare}"
0164         min_weight = self.taskBufferIF.getConfigValue("jobbroker", min_weight_param, "jedi", taskSpec.vo)
0165         if min_weight is None:
0166             min_weight_param = f"MIN_WEIGHT_{taskSpec.prodSourceLabel}"
0167             min_weight = self.taskBufferIF.getConfigValue("jobbroker", min_weight_param, "jedi", taskSpec.vo)
0168         if min_weight is None:
0169             min_weight = 0
0170 
0171         # get sites in the cloud
0172         siteSkippedTmp = dict()
0173         sitePreAssigned = False
0174         siteListPreAssigned = False
0175         if siteListForTB is not None:
0176             scanSiteList = siteListForTB
0177 
0178         elif taskSpec.site not in ["", None] and inputChunk.getPreassignedSite() is None:
0179             # convert to pseudo queues if needed
0180             scanSiteList = []
0181             for tmpSiteName in taskSpec.site.split(","):
0182                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0183                 if tmpSiteSpec is None or not tmpSiteSpec.is_unified:
0184                     scanSiteList.append(tmpSiteName)
0185                 else:
0186                     scanSiteList += self.get_pseudo_sites([tmpSiteName], self.siteMapper.getCloud(cloudName)["sites"])
0187             for tmpSiteName in scanSiteList:
0188                 tmpLog.info(f"site={tmpSiteName} is pre-assigned criteria=+preassign")
0189             if len(scanSiteList) > 1:
0190                 siteListPreAssigned = True
0191             else:
0192                 sitePreAssigned = True
0193 
0194         elif inputChunk.getPreassignedSite() is not None:
0195             if (
0196                 inputChunk.masterDataset.creationTime is not None
0197                 and inputChunk.masterDataset.modificationTime is not None
0198                 and inputChunk.masterDataset.modificationTime != inputChunk.masterDataset.creationTime
0199                 and timeNow - inputChunk.masterDataset.modificationTime > datetime.timedelta(hours=24)
0200                 and taskSpec.frozenTime is not None
0201                 and timeNow - taskSpec.frozenTime > datetime.timedelta(hours=6)
0202             ):
0203                 # ignore pre-assigned site since pmerge is timed out
0204                 tmpLog.info("ignore pre-assigned for pmerge due to timeout")
0205                 scanSiteList = self.siteMapper.getCloud(cloudName)["sites"]
0206                 tmpLog.info(f"cloud={cloudName} has {len(scanSiteList)} candidates")
0207             else:
0208                 # pmerge
0209                 siteListPreAssigned = True
0210                 scanSiteList = DataServiceUtils.getSitesShareDDM(self.siteMapper, inputChunk.getPreassignedSite(), JobUtils.PROD_PS, JobUtils.PROD_PS)
0211                 scanSiteList.append(inputChunk.getPreassignedSite())
0212                 tmp_msg = (
0213                     f"use site={scanSiteList} since they share DDM endpoints with original_site={inputChunk.getPreassignedSite()} "
0214                     f"which is pre-assigned in masterDS criteria=+premerge"
0215                 )
0216                 tmpLog.info(tmp_msg)
0217 
0218         else:
0219             scanSiteList = self.siteMapper.getCloud(cloudName)["sites"]
0220             tmpLog.info(f"cloud={cloudName} has {len(scanSiteList)} candidates")
0221 
0222         # high prio ES
0223         if taskSpec.useEventService() and not taskSpec.useJobCloning() and (taskSpec.currentPriority >= 900 or inputChunk.useScout()):
0224             esHigh = True
0225         else:
0226             esHigh = False
0227 
0228         # get job statistics
0229         tmpSt, jobStatMap = self.taskBufferIF.getJobStatisticsByGlobalShare(taskSpec.vo)
0230         if not tmpSt:
0231             tmpLog.error("failed to get job statistics")
0232             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0233             return retTmpError
0234 
0235         tmp_st, transferring_job_map = self.taskBufferIF.get_num_jobs_with_status_by_nucleus(taskSpec.vo, "transferring")
0236         if not tmp_st:
0237             tmpLog.error("failed to get transferring job statistics")
0238             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0239             return retTmpError
0240 
0241         # get destination for WORLD cloud
0242         nucleusSpec = None
0243         nucleus_with_storages_unwritable_over_wan = {}
0244         if not hintForTB:
0245             # get nucleus
0246             nucleusSpec = self.siteMapper.getNucleus(taskSpec.nucleus)
0247             if nucleusSpec is None:
0248                 tmpLog.error(f"unknown nucleus {taskSpec.nucleus}")
0249                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0250                 return retTmpError
0251             sites_in_nucleus = nucleusSpec.allPandaSites
0252             # get black list
0253             for tmp_name, tmp_nucleus_dict in self.siteMapper.nuclei.items():
0254                 default_endpoint_out = tmp_nucleus_dict.get_default_endpoint_out()
0255                 if not default_endpoint_out:
0256                     continue
0257                 receive_output_over_wan = default_endpoint_out["detailed_status"].get("write_wan")
0258                 if receive_output_over_wan in ["OFF", "TEST"]:
0259                     nucleus_with_storages_unwritable_over_wan[tmp_name] = receive_output_over_wan
0260 
0261         else:
0262             # use all sites in nuclei for WORLD task brokerage
0263             sites_in_nucleus = []
0264             for tmpNucleus in self.siteMapper.nuclei.values():
0265                 sites_in_nucleus += tmpNucleus.allPandaSites
0266 
0267         # sites sharing SE with T1
0268         if len(sites_in_nucleus) > 0:
0269             sites_sharing_output_storages_in_nucleus = DataServiceUtils.getSitesShareDDM(
0270                 self.siteMapper, sites_in_nucleus[0], JobUtils.PROD_PS, JobUtils.PROD_PS, True
0271             )
0272         else:
0273             sites_sharing_output_storages_in_nucleus = []
0274 
0275         # core count
0276         if inputChunk.isMerging and taskSpec.mergeCoreCount is not None:
0277             taskCoreCount = taskSpec.mergeCoreCount
0278         else:
0279             taskCoreCount = taskSpec.coreCount
0280 
0281         # MP
0282         if taskCoreCount is not None and taskCoreCount > 1:
0283             # use MCORE only
0284             useMP = "only"
0285         elif taskCoreCount == 0 and (taskSpec.currentPriority >= 900 or inputChunk.useScout()):
0286             # use MCORE only for ES scouts and close to completion
0287             useMP = "only"
0288         elif taskCoreCount == 0:
0289             # use MCORE and normal
0290             useMP = "any"
0291         else:
0292             # not use MCORE
0293             useMP = "unuse"
0294 
0295         # get workQueue
0296         workQueue = self.taskBufferIF.getWorkQueueMap().getQueueWithIDGshare(taskSpec.workQueue_ID, taskSpec.gshare)
0297         if workQueue.is_global_share:
0298             wq_tag = workQueue.queue_name
0299             wq_tag_global_share = wq_tag
0300         else:
0301             wq_tag = workQueue.queue_id
0302             workQueueGS = self.taskBufferIF.getWorkQueueMap().getQueueWithIDGshare(None, taskSpec.gshare)
0303             wq_tag_global_share = workQueueGS.queue_name
0304 
0305         # init summary list
0306         self.init_summary_list("Job brokerage summary", None, self.get_unified_sites(scanSiteList))
0307 
0308         ######################################
0309         # check dataset completeness
0310         remote_source_available = True
0311         if inputChunk.getDatasets() and not taskSpec.inputPreStaging():
0312             for datasetSpec in inputChunk.getDatasets():
0313                 datasetName = datasetSpec.datasetName
0314                 # skip distributed datasets
0315                 is_distributed = self.ddmIF.isDistributedDataset(datasetName)
0316                 if is_distributed:
0317                     tmpLog.debug(f"completeness check disabled for {datasetName} since it is distributed")
0318                     continue
0319                 # check if complete replicas are available at online endpoints
0320                 (
0321                     tmpSt,
0322                     tmpRet,
0323                     tmp_complete_disk_ok,
0324                     tmp_complete_tape_ok,
0325                     tmp_truly_complete_disk,
0326                     tmp_can_be_local_source,
0327                     tmp_can_be_remote_source,
0328                     tmp_list_of_complete_replica_locations,
0329                 ) = AtlasBrokerUtils.get_sites_with_data(
0330                     [],
0331                     self.siteMapper,
0332                     self.ddmIF,
0333                     datasetName,
0334                     [],
0335                     max_missing_input_files,
0336                     min_input_completeness,
0337                 )
0338                 if tmpSt != Interaction.SC_SUCCEEDED:
0339                     tmpLog.error(f"failed to get available storage endpoints with {datasetName}")
0340                     taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0341                     return retTmpError
0342                 # pending if the dataset is incomplete or missing at online endpoints
0343                 if not tmp_complete_disk_ok and not tmp_complete_tape_ok:
0344                     err_msg = f"{datasetName} is "
0345                     if tmp_list_of_complete_replica_locations:
0346                         tmp_rse_list = ",".join(tmp_list_of_complete_replica_locations)
0347                         tmp_is_single = len(tmp_list_of_complete_replica_locations) == 1
0348                         err_msg += f"only complete at {tmp_rse_list} which "
0349                         err_msg += "is " if tmp_is_single else "are "
0350                         err_msg += "currently in downtime or offline"
0351                     else:
0352                         err_msg += "incomplete at any online storage"
0353                     tmpLog.error(err_msg)
0354                     taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0355                     return retTmpError
0356                 tmp_msg = f"complete replicas of {datasetName} are available at online storages"
0357                 if not tmp_can_be_remote_source:
0358                     if tmp_can_be_local_source:
0359                         tmp_msg += ", but files cannot be sent out to satellites since read_wan is not ON"
0360                         remote_source_available = False
0361                     else:
0362                         tmp_msg = f"complete replicas of {datasetName} are available at endpoints where read_wan/lan are not ON"
0363                         tmpLog.error(tmp_msg)
0364                         taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0365                         return retTmpError
0366                 tmpLog.info(tmp_msg)
0367 
0368         ######################################
0369         # selection for status
0370         if not sitePreAssigned and not siteListPreAssigned:
0371             newScanSiteList = []
0372             oldScanSiteList = copy.copy(scanSiteList)
0373             msg_map = {}
0374             tmpLog.set_message_slot()
0375             for tmpSiteName in scanSiteList:
0376                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0377 
0378                 # skip unified queues
0379                 if tmpSiteSpec.is_unified:
0380                     continue
0381                 # check site status
0382                 msgFlag = False
0383                 skipFlag = False
0384 
0385                 if tmpSiteSpec.status in ["online", "standby"]:
0386                     newScanSiteList.append(tmpSiteName)
0387                 else:
0388                     msgFlag = True
0389                     skipFlag = True
0390 
0391                 if msgFlag:
0392                     tmpStr = f"  skip site={tmpSiteName} due to status={tmpSiteSpec.status} criteria=-status"
0393                     if skipFlag:
0394                         msg_map[tmpSiteSpec.get_unified_name()] = (
0395                             f"  skip site={tmpSiteSpec.get_unified_name()} due to status={tmpSiteSpec.status} criteria=-status"
0396                         )
0397                     else:
0398                         siteSkippedTmp[tmpSiteName] = tmpStr
0399 
0400             tmpLog.set_message_slot()
0401             scanSiteList = newScanSiteList
0402             self.add_summary_message(oldScanSiteList, scanSiteList, "status check", tmpLog, msg_map)
0403             if not scanSiteList:
0404                 self.dump_summary(tmpLog)
0405                 tmpLog.error("no candidates")
0406                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0407                 return retTmpError
0408 
0409         #################################################
0410         # get the nucleus and the network map
0411         nucleus = taskSpec.nucleus
0412         storageMapping = self.taskBufferIF.getPandaSiteToOutputStorageSiteMapping()
0413 
0414         if nucleus:
0415             # get connectivity stats to the nucleus
0416             if inputChunk.isExpress():
0417                 transferred_tag = f"{URG_ACTIVITY}{TRANSFERRED_6H}"
0418                 queued_tag = f"{URG_ACTIVITY}{QUEUED}"
0419             else:
0420                 transferred_tag = f"{PRD_ACTIVITY}{TRANSFERRED_6H}"
0421                 queued_tag = f"{PRD_ACTIVITY}{QUEUED}"
0422 
0423             networkMap = self.taskBufferIF.getNetworkMetrics(nucleus, [AGIS_CLOSENESS, transferred_tag, queued_tag, FTS_1H, FTS_1D, FTS_1W])
0424 
0425         #####################################################
0426         # filtering out blacklisted or links with long queues
0427         if nucleus:
0428             # get the number of files being transferred to the nucleus
0429             if queued_tag in networkMap["total"]:
0430                 totalQueued = networkMap["total"][queued_tag]
0431             else:
0432                 totalQueued = 0
0433 
0434             tmpLog.debug(f"Total number of files being transferred to the nucleus : {totalQueued}")
0435             newScanSiteList = []
0436             oldScanSiteList = copy.copy(scanSiteList)
0437             newSkippedTmp = dict()
0438             msg_map = {}
0439             tmpLog.set_message_slot()
0440             for tmpPandaSiteName in self.get_unified_sites(scanSiteList):
0441                 try:
0442                     tmpAtlasSiteName = storageMapping[tmpPandaSiteName]["default"]
0443                     skipFlag = False
0444                     tempFlag = False
0445                     criteria = "-link_unusable"
0446                     from_str = f"from satellite={tmpAtlasSiteName}"
0447                     reason = ""
0448                     if nucleus == tmpAtlasSiteName:
0449                         # nucleus
0450                         pass
0451                     elif nucleus in nucleus_with_storages_unwritable_over_wan:
0452                         # destination blacklisted
0453                         reason = (
0454                             nucleusSpec.get_default_endpoint_out()["ddm_endpoint_name"]
0455                             + f" at nucleus={nucleus} unwritable over WAN write_wan={nucleus_with_storages_unwritable_over_wan[nucleus]}"
0456                         )
0457                         criteria = "-dest_blacklisted"
0458                         from_str = ""
0459                         tempFlag = True
0460                     elif totalQueued >= self.total_queue_threshold:
0461                         # total exceed
0462                         reason = f"too many output files being transferred to the nucleus {totalQueued}(>{self.total_queue_threshold} total limit)"
0463                         criteria = "-links_full"
0464                         from_str = ""
0465                         tempFlag = True
0466                     elif tmpAtlasSiteName not in networkMap:
0467                         # Don't skip missing links for the moment. In later stages missing links
0468                         # default to the worst connectivity and will be penalized.
0469                         pass
0470                     elif AGIS_CLOSENESS in networkMap[tmpAtlasSiteName] and networkMap[tmpAtlasSiteName][AGIS_CLOSENESS] == BLOCKED_LINK:
0471                         # blocked link
0472                         reason = f"agis_closeness={networkMap[tmpAtlasSiteName][AGIS_CLOSENESS]}"
0473                         skipFlag = True
0474                     elif queued_tag in networkMap[tmpAtlasSiteName] and networkMap[tmpAtlasSiteName][queued_tag] >= self.queue_threshold:
0475                         # too many queued
0476                         reason = f"too many output files queued in the channel {networkMap[tmpAtlasSiteName][queued_tag]}(>{self.queue_threshold} link limit)"
0477                         tempFlag = True
0478 
0479                     # add
0480                     tmpStr = f"  skip site={tmpPandaSiteName} due to {reason}"
0481                     if from_str:
0482                         tmpStr += f", {from_str} to nucleus={nucleus}"
0483                     tmpStr += f": criteria={criteria}"
0484                     if skipFlag:
0485                         msg_map[tmpPandaSiteName] = tmpStr
0486                     else:
0487                         newScanSiteList.append(tmpPandaSiteName)
0488                         if tempFlag:
0489                             newSkippedTmp[tmpPandaSiteName] = tmpStr
0490                 except KeyError:
0491                     # Don't skip missing links for the moment. In later stages missing links
0492                     # default to the worst connectivity and will be penalized.
0493                     newScanSiteList.append(tmpPandaSiteName)
0494             tmpLog.unset_message_slot()
0495             siteSkippedTmp = self.add_pseudo_sites_to_skip(newSkippedTmp, scanSiteList, siteSkippedTmp)
0496             scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
0497             self.add_summary_message(oldScanSiteList, scanSiteList, "link check", tmpLog, msg_map)
0498             if not scanSiteList:
0499                 self.dump_summary(tmpLog)
0500                 tmpLog.error("no candidates")
0501                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0502                 return retTmpError
0503         ######################################
0504         # selection for high priorities
0505         t1WeightForHighPrio = 1
0506         if (taskSpec.currentPriority >= 800 or inputChunk.useScout()) and not sitePreAssigned and not siteListPreAssigned:
0507             if not taskSpec.useEventService():
0508                 if taskSpec.currentPriority >= 900 or inputChunk.useScout():
0509                     t1WeightForHighPrio = 100
0510             newScanSiteList = []
0511             oldScanSiteList = copy.copy(scanSiteList)
0512             msg_map = {}
0513             tmpLog.set_message_slot()
0514 
0515             for tmpSiteName in scanSiteList:
0516                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0517                 if not tmpSiteSpec.is_opportunistic():
0518                     newScanSiteList.append(tmpSiteName)
0519                 else:
0520                     tmp_msg = f"  skip site={tmpSiteName} to avoid opportunistic for high priority jobs "
0521                     tmp_msg += "criteria=-opportunistic"
0522                     msg_map[tmpSiteSpec.get_unified_name()] = (
0523                         f"  skip site={tmpSiteSpec.get_unified_name()} to avoid opportunistic for high priority jobs criteria=-opportunistic"
0524                     )
0525 
0526             tmpLog.unset_message_slot()
0527             scanSiteList = newScanSiteList
0528             self.add_summary_message(oldScanSiteList, scanSiteList, "opportunistic check", tmpLog, msg_map)
0529 
0530             if not scanSiteList:
0531                 self.dump_summary(tmpLog)
0532                 tmpLog.error("no candidates")
0533                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0534                 return retTmpError
0535 
0536         ######################################
0537         # selection to avoid slow or inactive sites
0538         if (
0539             (taskSpec.currentPriority >= 800 or inputChunk.useScout() or inputChunk.isMerging or taskSpec.mergeOutput())
0540             and not sitePreAssigned
0541             and not siteListPreAssigned
0542         ):
0543             # get inactive sites
0544             inactiveTimeLimit = 2
0545             inactiveSites = self.taskBufferIF.getInactiveSites_JEDI("production", inactiveTimeLimit)
0546             newScanSiteList = []
0547             oldScanSiteList = copy.copy(scanSiteList)
0548             newSkippedTmp = dict()
0549             tmpLog.set_message_slot()
0550 
0551             for tmpSiteName in self.get_unified_sites(scanSiteList):
0552                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0553                 nToGetAll = AtlasBrokerUtils.getNumJobs(jobStatMap, tmpSiteSpec.get_unified_name(), "activated") + AtlasBrokerUtils.getNumJobs(
0554                     jobStatMap, tmpSiteSpec.get_unified_name(), "starting"
0555                 )
0556 
0557                 if tmpSiteName in inactiveSites and nToGetAll > 0:
0558                     tmp_msg = (
0559                         f"  skip site={tmpSiteName} since high prio/scouts/merge needs to avoid inactive sites "
0560                         f"(laststart is older than {inactiveTimeLimit}h) criteria=-inactive"
0561                     )
0562                     # temporary problem
0563                     newSkippedTmp[tmpSiteName] = tmp_msg
0564                 newScanSiteList.append(tmpSiteName)
0565 
0566             tmpLog.unset_message_slot()
0567             siteSkippedTmp = self.add_pseudo_sites_to_skip(newSkippedTmp, scanSiteList, siteSkippedTmp)
0568             scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
0569             self.add_summary_message(oldScanSiteList, scanSiteList, "slowness/inactive check", tmpLog, {})
0570             if not scanSiteList:
0571                 self.dump_summary(tmpLog)
0572                 tmpLog.error("no candidates")
0573                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0574                 return retTmpError
0575 
0576         ######################################
0577         # selection for fairshare
0578         if sitePreAssigned and taskSpec.prodSourceLabel not in [JobUtils.PROD_PS] or workQueue.queue_name not in ["test", "validation"]:
0579             newScanSiteList = []
0580             oldScanSiteList = copy.copy(scanSiteList)
0581             msg_map = {}
0582             tmpLog.set_message_slot()
0583             for tmpSiteName in scanSiteList:
0584                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0585                 # check at the site
0586                 if AtlasBrokerUtils.hasZeroShare(tmpSiteSpec, taskSpec, inputChunk.isMerging, tmpLog):
0587                     msg_map[tmpSiteSpec.get_unified_name()] = f"  skip site={tmpSiteSpec.get_unified_name()} due to zero share criteria=-zeroshare"
0588                     continue
0589                 newScanSiteList.append(tmpSiteName)
0590             tmpLog.unset_message_slot()
0591             scanSiteList = newScanSiteList
0592             self.add_summary_message(oldScanSiteList, scanSiteList, "zero share check", tmpLog, msg_map)
0593             if not scanSiteList:
0594                 self.dump_summary(tmpLog)
0595                 tmpLog.error("no candidates")
0596                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0597                 return retTmpError
0598 
0599         ######################################
0600         # selection for jumbo jobs
0601         if not sitePreAssigned and taskSpec.getNumJumboJobs() is None:
0602             newScanSiteList = []
0603             oldScanSiteList = copy.copy(scanSiteList)
0604             msg_map = {}
0605             tmpLog.set_message_slot()
0606             for tmpSiteName in scanSiteList:
0607                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0608                 if tmpSiteSpec.useJumboJobs():
0609                     msg_map[tmpSiteSpec.get_unified_name()] = f"  skip site={tmpSiteSpec.get_unified_name()} since it is only for jumbo jobs criteria=-jumbo"
0610                     continue
0611                 newScanSiteList.append(tmpSiteName)
0612             tmpLog.unset_message_slot()
0613             scanSiteList = newScanSiteList
0614             tmpLog.info(f"{len(scanSiteList)} candidates passed jumbo job check")
0615             self.add_summary_message(oldScanSiteList, scanSiteList, "jumbo job check", tmpLog, msg_map)
0616             if scanSiteList == []:
0617                 self.dump_summary(tmpLog)
0618                 tmpLog.error("no candidates")
0619                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0620                 return retTmpError
0621         elif not sitePreAssigned and taskSpec.getNumJumboJobs() is not None:
0622             nReadyEvents = self.taskBufferIF.getNumReadyEvents(taskSpec.jediTaskID)
0623             if nReadyEvents is not None:
0624                 newScanSiteList = []
0625                 oldScanSiteList = copy.copy(scanSiteList)
0626                 msg_map = {}
0627                 tmpLog.set_message_slot()
0628                 for tmpSiteName in scanSiteList:
0629                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0630                     if tmpSiteSpec.useJumboJobs():
0631                         minEvents = tmpSiteSpec.getMinEventsForJumbo()
0632                         if minEvents is not None and nReadyEvents < minEvents:
0633                             msg_map[tmpSiteSpec.get_unified_name()] = (
0634                                 f"  skip site={tmpSiteSpec.get_unified_name()} since not enough events ({minEvents}<{nReadyEvents}) "
0635                                 "for jumbo criteria=-fewevents"
0636                             )
0637                             continue
0638                     newScanSiteList.append(tmpSiteName)
0639                 tmpLog.unset_message_slot()
0640                 scanSiteList = newScanSiteList
0641                 self.add_summary_message(oldScanSiteList, scanSiteList, "jumbo events check", tmpLog, msg_map)
0642                 if not scanSiteList:
0643                     self.dump_summary(tmpLog)
0644                     tmpLog.error("no candidates")
0645                     taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0646                     return retTmpError
0647 
0648         ######################################
0649         # selection for iointensity limits
0650 
0651         # get default disk IO limit from GDP config
0652         max_diskio_per_core_default = self.taskBufferIF.getConfigValue(COMPONENT, "MAX_DISKIO_DEFAULT", APP, VO)
0653         if not max_diskio_per_core_default:
0654             max_diskio_per_core_default = 10**10
0655 
0656         # get the current disk IO usage per site
0657         diskio_percore_usage = self.taskBufferIF.getAvgDiskIO_JEDI()
0658         unified_site_list = self.get_unified_sites(scanSiteList)
0659         newScanSiteList = []
0660         oldScanSiteList = copy.copy(scanSiteList)
0661         msg_map = {}
0662         tmpLog.set_message_slot()
0663         newSkippedTmp = dict()
0664         for tmpSiteName in unified_site_list:
0665             tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0666 
0667             # measured diskIO at queue
0668             diskio_usage_tmp = diskio_percore_usage.get(tmpSiteName, 0)
0669 
0670             # figure out queue or default limit
0671             if tmpSiteSpec.maxDiskio and tmpSiteSpec.maxDiskio > 0:
0672                 # there is a limit specified in AGIS
0673                 diskio_limit_tmp = tmpSiteSpec.maxDiskio
0674             else:
0675                 # we need to use the default value from GDP Config
0676                 diskio_limit_tmp = max_diskio_per_core_default
0677 
0678             # normalize task diskIO by site corecount
0679             diskio_task_tmp = taskSpec.diskIO
0680             if taskSpec.diskIO is not None and taskSpec.coreCount not in [None, 0, 1] and tmpSiteSpec.coreCount not in [None, 0]:
0681                 diskio_task_tmp = taskSpec.diskIO // tmpSiteSpec.coreCount
0682 
0683             try:  # generate a log message parseable by logstash for monitoring
0684                 log_msg = f"diskIO measurements: site={tmpSiteName} jediTaskID={taskSpec.jediTaskID} "
0685                 if diskio_task_tmp is not None:
0686                     log_msg += f"diskIO_task={diskio_task_tmp:.2f} "
0687                 if diskio_usage_tmp is not None:
0688                     log_msg += f"diskIO_site_usage={diskio_usage_tmp:.2f} "
0689                 if diskio_limit_tmp is not None:
0690                     log_msg += f"diskIO_site_limit={diskio_limit_tmp:.2f} "
0691                 tmpLog.info(log_msg)
0692             except Exception:
0693                 tmpLog.debug("diskIO measurements: Error generating diskIO message")
0694 
0695             # if the task has a diskIO defined, the queue is over the IO limit and the task IO is over the limit
0696             if diskio_task_tmp and diskio_usage_tmp and diskio_limit_tmp and diskio_usage_tmp > diskio_limit_tmp and diskio_task_tmp > diskio_limit_tmp:
0697                 tmp_msg = f"  skip site={tmpSiteName} due to diskIO overload criteria=-diskIO"
0698                 newSkippedTmp[tmpSiteName] = tmp_msg
0699                 msg_map[tmpSiteName] = tmp_msg
0700 
0701             newScanSiteList.append(tmpSiteName)
0702 
0703         tmpLog.unset_message_slot()
0704         siteSkippedTmp = self.add_pseudo_sites_to_skip(newSkippedTmp, scanSiteList, siteSkippedTmp)
0705         scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
0706         self.add_summary_message(oldScanSiteList, scanSiteList, "diskIO check", tmpLog, msg_map)
0707         if not scanSiteList:
0708             self.dump_summary(tmpLog)
0709             tmpLog.error("no candidates")
0710             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0711             return retTmpError
0712 
0713         ######################################
0714         # selection for MP
0715         if not sitePreAssigned:
0716             newScanSiteList = []
0717             oldScanSiteList = copy.copy(scanSiteList)
0718             msg_map = {}
0719             tmpLog.set_message_slot()
0720             max_core_count = taskSpec.get_max_core_count()
0721             for tmpSiteName in scanSiteList:
0722                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0723                 # check at the site
0724                 if useMP == "any" or (useMP == "only" and tmpSiteSpec.coreCount > 1) or (useMP == "unuse" and tmpSiteSpec.coreCount in [0, 1, None]):
0725                     if max_core_count and tmpSiteSpec.coreCount and tmpSiteSpec.coreCount > max_core_count:
0726                         msg_map[tmpSiteSpec.get_unified_name()] = (
0727                             f"  skip site={tmpSiteSpec.get_unified_name()} due to larger core count site:{tmpSiteSpec.coreCount} "
0728                             f"than task_max={max_core_count} criteria=-max_cpucore"
0729                         )
0730                     else:
0731                         newScanSiteList.append(tmpSiteName)
0732                 else:
0733                     msg_map[tmpSiteSpec.get_unified_name()] = (
0734                         f"  skip site={tmpSiteSpec.get_unified_name()} due to core mismatch "
0735                         f"site:{tmpSiteSpec.coreCount} <> task:{taskCoreCount} criteria=-cpucore"
0736                     )
0737             tmpLog.unset_message_slot()
0738             scanSiteList = newScanSiteList
0739             self.add_summary_message(oldScanSiteList, scanSiteList, "core count check", tmpLog, msg_map)
0740             if not scanSiteList:
0741                 self.dump_summary(tmpLog)
0742                 tmpLog.error("no candidates")
0743                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0744                 return retTmpError
0745 
0746         ######################################
0747         # selection for release and CPU/GPU architectures
0748         cmt_config = taskSpec.get_sw_platform()
0749         is_regexp_cmt_config = False
0750         if cmt_config:
0751             if re.match(cmt_config, cmt_config) is None:
0752                 is_regexp_cmt_config = True
0753         base_platform = taskSpec.get_base_platform()
0754         resolved_platforms = {}
0755         preference_weight_map = {}
0756         if taskSpec.transHome is not None:
0757             jsonCheck = AtlasBrokerUtils.JsonSoftwareCheck(self.siteMapper, self.sw_map, self.architecture_level_map)
0758             unified_site_list = self.get_unified_sites(scanSiteList)
0759 
0760             host_cpu_spec = taskSpec.get_host_cpu_spec()
0761             host_cpu_pref = taskSpec.get_host_cpu_preference()
0762             host_gpu_spec = taskSpec.get_host_gpu_spec()
0763 
0764             if taskSpec.get_base_platform() is None:
0765                 use_container = False
0766             else:
0767                 use_container = True
0768 
0769             need_cvmfs = False
0770 
0771             # 3 digits base release or normal tasks
0772             if (re.search("-\d+\.\d+\.\d+$", taskSpec.transHome) is not None) or (
0773                 re.search("rel_\d+(\n|$)", taskSpec.transHome) is None and re.search("\d{4}-\d{2}-\d{2}T\d{4}$", taskSpec.transHome) is None
0774             ):
0775                 cvmfs_repo = "atlas"
0776                 sw_project = taskSpec.transHome.split("-")[0]
0777                 sw_version = taskSpec.transHome.split("-")[1]
0778                 cmt_config_only = False
0779 
0780             # nightlies
0781             else:
0782                 cvmfs_repo = "nightlies"
0783                 sw_project = None
0784                 sw_version = None
0785                 cmt_config_only = False
0786 
0787             siteListWithSW, sitesNoJsonCheck, preference_weight_map = jsonCheck.check(
0788                 unified_site_list,
0789                 cvmfs_repo,
0790                 sw_project,
0791                 sw_version,
0792                 cmt_config,
0793                 need_cvmfs,
0794                 cmt_config_only,
0795                 need_container=use_container,
0796                 container_name=taskSpec.container_name,
0797                 only_tags_fc=taskSpec.use_only_tags_fc(),
0798                 host_cpu_specs=host_cpu_spec,
0799                 host_cpu_pref=host_cpu_pref,
0800                 host_gpu_spec=host_gpu_spec,
0801                 log_stream=tmpLog,
0802             )
0803             sitesAuto = copy.copy(siteListWithSW)
0804             sitesAny = []
0805 
0806             newScanSiteList = []
0807             oldScanSiteList = copy.copy(scanSiteList)
0808             msg_map = {}
0809             tmpLog.set_message_slot()
0810 
0811             for tmpSiteName in unified_site_list:
0812                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0813                 if cmt_config:
0814                     platforms = AtlasBrokerUtils.resolve_cmt_config(tmpSiteName, cmt_config, base_platform, self.sw_map)
0815                     if platforms:
0816                         resolved_platforms[tmpSiteName] = platforms
0817                 if tmpSiteName in siteListWithSW:
0818                     # passed
0819                     if not is_regexp_cmt_config or tmpSiteName in resolved_platforms:
0820                         newScanSiteList.append(tmpSiteName)
0821                     else:
0822                         # cmtconfig is not resolved
0823                         msg_map[tmpSiteName] = f"  skip site={tmpSiteName} due to unresolved regexp in cmtconfig={cmt_config} criteria=-regexpcmtconfig"
0824                 elif (
0825                     not (taskSpec.container_name and taskSpec.use_only_tags_fc())
0826                     and host_cpu_spec is None
0827                     and host_gpu_spec is None
0828                     and tmpSiteSpec.releases == ["ANY"]
0829                 ):
0830                     # release check is disabled or release is available
0831                     newScanSiteList.append(tmpSiteName)
0832                     sitesAny.append(tmpSiteName)
0833                 else:
0834                     if tmpSiteSpec.releases == ["AUTO"]:
0835                         autoStr = "with AUTO"
0836                     else:
0837                         autoStr = "without AUTO"
0838                     # release is unavailable
0839                     msg_map[tmpSiteName] = (
0840                         f"  skip site={tmpSiteName} {autoStr} due to missing SW cache={taskSpec.transHome}:{taskSpec.get_sw_platform()} sw_platform='{taskSpec.container_name}' "
0841                         f"or irrelevant HW cpu={str(host_cpu_spec)} gpu={str(host_gpu_spec)} criteria=-cache"
0842                     )
0843 
0844             tmpLog.unset_message_slot()
0845             scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
0846             self.add_summary_message(oldScanSiteList, scanSiteList, "SW/HW check", tmpLog, msg_map)
0847             tmpLog.info(f"   {len(sitesAuto)} with AUTO, {len(sitesAny)} with ANY")
0848             if not scanSiteList:
0849                 self.dump_summary(tmpLog)
0850                 tmpLog.error("no candidates")
0851                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0852                 return retTmpError
0853 
0854         ######################################
0855         # selection for memory
0856         origMinRamCount = inputChunk.getMaxRamCount()
0857         if origMinRamCount not in [0, None]:
0858             if inputChunk.isMerging:
0859                 strMinRamCount = f"{origMinRamCount}(MB)"
0860             else:
0861                 str_ram_unit = taskSpec.ramUnit
0862                 if str_ram_unit:
0863                     str_ram_unit = str_ram_unit.replace("PerCore", " ").strip()
0864                 strMinRamCount = f"{origMinRamCount}({str_ram_unit})"
0865             if not inputChunk.isMerging and taskSpec.baseRamCount not in [0, None]:
0866                 strMinRamCount += f"+{taskSpec.baseRamCount}"
0867             newScanSiteList = []
0868             oldScanSiteList = copy.copy(scanSiteList)
0869             msg_map = {}
0870             tmpLog.set_message_slot()
0871             for tmpSiteName in scanSiteList:
0872                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0873                 # job memory requirement
0874                 minRamCount = origMinRamCount
0875                 if taskSpec.ramPerCore() and not inputChunk.isMerging:
0876                     if tmpSiteSpec.coreCount not in [None, 0]:
0877                         minRamCount = origMinRamCount * tmpSiteSpec.coreCount
0878                     minRamCount += taskSpec.baseRamCount
0879                 # compensate
0880                 minRamCount = JobUtils.compensate_ram_count(minRamCount)
0881                 # site max memory requirement
0882                 site_maxmemory = 0
0883                 if tmpSiteSpec.maxrss not in [0, None]:
0884                     site_maxmemory = tmpSiteSpec.maxrss
0885                 # check at the site
0886                 if site_maxmemory not in [0, None] and minRamCount != 0 and minRamCount > site_maxmemory:
0887                     tmp_msg = f"  skip site={tmpSiteName} due to sue to insufficient RAM less than less than job's core-scaled requirement {minRamCount} MB "
0888                     tmp_msg += "criteria=-lowmemory"
0889                     msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
0890                     continue
0891                 # site min memory requirement
0892                 site_minmemory = 0
0893                 if tmpSiteSpec.minrss not in [0, None]:
0894                     site_minmemory = tmpSiteSpec.minrss
0895                 if site_minmemory not in [0, None] and minRamCount != 0 and minRamCount < site_minmemory:
0896                     tmp_msg = f"  skip site={tmpSiteName} due to RAM lower limit greater than than job's core-scaled requirement {minRamCount} MB "
0897                     tmp_msg += "criteria=-highmemory"
0898                     msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
0899                     continue
0900                 newScanSiteList.append(tmpSiteName)
0901             tmpLog.unset_message_slot()
0902             scanSiteList = newScanSiteList
0903             self.add_summary_message(oldScanSiteList, scanSiteList, "memory check", tmpLog, msg_map)
0904             if not scanSiteList:
0905                 self.dump_summary(tmpLog)
0906                 tmpLog.error("no candidates")
0907                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0908                 return retTmpError
0909 
0910         ######################################
0911         # selection for scratch disk
0912         if taskSpec.outputScaleWithEvents():
0913             minDiskCount = max(taskSpec.getOutDiskSize() * inputChunk.getMaxAtomSize(getNumEvents=True), inputChunk.defaultOutputSize)
0914         else:
0915             minDiskCount = max(taskSpec.getOutDiskSize() * inputChunk.getMaxAtomSize(effectiveSize=True), inputChunk.defaultOutputSize)
0916         minDiskCount += taskSpec.getWorkDiskSize()
0917         minDiskCountL = minDiskCount
0918         minDiskCountD = minDiskCount
0919         minDiskCountL += inputChunk.getMaxAtomSize()
0920         minDiskCountL = minDiskCountL // 1024 // 1024
0921         minDiskCountD = minDiskCountD // 1024 // 1024
0922         newScanSiteList = []
0923         oldScanSiteList = copy.copy(scanSiteList)
0924         msg_map = {}
0925         tmpLog.set_message_slot()
0926         for tmpSiteName in self.get_unified_sites(scanSiteList):
0927             tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0928             # check direct access
0929             if taskSpec.allowInputLAN() == "only" and not tmpSiteSpec.isDirectIO() and not tmpSiteSpec.always_use_direct_io() and not inputChunk.isMerging:
0930                 tmp_msg = f"  skip site={tmpSiteName} since direct IO is disabled "
0931                 tmp_msg += "criteria=-remoteio"
0932                 msg_map[tmpSiteName] = tmp_msg
0933                 continue
0934             # check scratch size
0935             if tmpSiteSpec.maxwdir != 0:
0936                 if CoreUtils.use_direct_io_for_job(taskSpec, tmpSiteSpec, inputChunk):
0937                     # size for remote access
0938                     minDiskCount = minDiskCountD
0939                 else:
0940                     # size for copy-to-scratch
0941                     minDiskCount = minDiskCountL
0942 
0943                 # get site and task corecount to scale maxwdir
0944                 if tmpSiteSpec.coreCount in [None, 0, 1]:
0945                     site_cc = 1
0946                 else:
0947                     site_cc = tmpSiteSpec.coreCount
0948 
0949                 if taskSpec.coreCount in [None, 0, 1] and not taskSpec.useEventService():
0950                     task_cc = 1
0951                 else:
0952                     task_cc = site_cc
0953 
0954                 maxwdir_scaled = tmpSiteSpec.maxwdir * task_cc / site_cc
0955 
0956                 if minDiskCount > maxwdir_scaled:
0957                     tmp_msg = f"  skip site={tmpSiteName} due to small scratch disk {maxwdir_scaled} MB less than {minDiskCount} MB"
0958                     tmp_msg += " criteria=-disk"
0959                     msg_map[tmpSiteName] = tmp_msg
0960                     continue
0961                 newMaxwdir[tmpSiteName] = maxwdir_scaled
0962             newScanSiteList.append(tmpSiteName)
0963         tmpLog.unset_message_slot()
0964         scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
0965         self.add_summary_message(oldScanSiteList, scanSiteList, "disk check", tmpLog, msg_map)
0966         if not scanSiteList:
0967             self.dump_summary(tmpLog)
0968             tmpLog.error("no candidates")
0969             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0970             return retTmpError
0971 
0972         ######################################
0973         # selection for available space in SE
0974         newScanSiteList = []
0975         oldScanSiteList = copy.copy(scanSiteList)
0976         msg_map = {}
0977         tmpLog.set_message_slot()
0978         newSkippedTmp = dict()
0979         for tmpSiteName in self.get_unified_sites(scanSiteList):
0980             tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0981             scope_input, scope_output = select_scope(tmpSiteSpec, JobUtils.PROD_PS, JobUtils.PROD_PS)
0982             # check endpoint
0983             tmp_default_output_endpoint = tmpSiteSpec.ddm_endpoints_output[scope_output].getEndPoint(tmpSiteSpec.ddm_output[scope_output])
0984             tmp_msg = None
0985             # check free size on output endpoint
0986             if tmp_default_output_endpoint is not None:
0987                 tmpSpaceSize = 0
0988                 if tmp_default_output_endpoint["space_free"] is not None:
0989                     tmpSpaceSize += tmp_default_output_endpoint["space_free"]
0990                 if tmp_default_output_endpoint["space_expired"] is not None:
0991                     tmpSpaceSize += tmp_default_output_endpoint["space_expired"]
0992                 diskThreshold = 200
0993 
0994                 # skip_RSE_check: exceptional bypass of RSEs without storage reporting
0995                 if tmpSpaceSize < diskThreshold and "skip_RSE_check" not in tmpSiteSpec.catchall:
0996                     tmp_msg = (
0997                         f"  skip site={tmpSiteName} due to disk shortage at "
0998                         f"{tmpSiteSpec.ddm_output[scope_output]} {tmpSpaceSize}GB < {diskThreshold}GB criteria=-disk"
0999                     )
1000             # check if blacklisted
1001             if not tmp_msg:
1002                 tmp_msg = AtlasBrokerUtils.check_endpoints_with_blacklist(tmpSiteSpec, scope_input, scope_output, sites_in_nucleus, remote_source_available)
1003             if tmp_msg is not None:
1004                 newSkippedTmp[tmpSiteName] = tmp_msg
1005                 msg_map[tmpSiteName] = tmp_msg
1006             newScanSiteList.append(tmpSiteName)
1007         tmpLog.unset_message_slot()
1008         siteSkippedTmp = self.add_pseudo_sites_to_skip(newSkippedTmp, scanSiteList, siteSkippedTmp)
1009         scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1010         self.add_summary_message(oldScanSiteList, scanSiteList, "Storage check", tmpLog, msg_map)
1011         if not scanSiteList:
1012             self.dump_summary(tmpLog)
1013             tmpLog.error("no candidates")
1014             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1015             return retTmpError
1016 
1017         ######################################
1018         # selection for walltime
1019         if taskSpec.useEventService() and not taskSpec.useJobCloning():
1020             nEsConsumers = taskSpec.getNumEventServiceConsumer()
1021             if nEsConsumers is None:
1022                 nEsConsumers = 1
1023             maxAttemptEsJob = taskSpec.getMaxAttemptEsJob()
1024             if maxAttemptEsJob is None:
1025                 maxAttemptEsJob = EventServiceUtils.defMaxAttemptEsJob
1026             else:
1027                 maxAttemptEsJob += 1
1028         else:
1029             nEsConsumers = 1
1030             maxAttemptEsJob = 1
1031         maxWalltime = None
1032         maxWalltime_dyn = None
1033         minWalltime_dyn = None
1034         minWalltime = None
1035         strMaxWalltime = None
1036         strMaxWalltime_dyn = None
1037         strMinWalltime_dyn = None
1038         if not taskSpec.useHS06():
1039             tmpMaxAtomSize = inputChunk.getMaxAtomSize(effectiveSize=True)
1040             if taskSpec.walltime is not None:
1041                 minWalltime = taskSpec.walltime * tmpMaxAtomSize
1042             else:
1043                 minWalltime = None
1044             # take # of consumers into account
1045             if not taskSpec.useEventService() or taskSpec.useJobCloning():
1046                 strMinWalltime = f"walltime*inputSize={taskSpec.walltime}*{tmpMaxAtomSize}"
1047             else:
1048                 strMinWalltime = f"walltime*inputSize/nEsConsumers/maxAttemptEsJob={taskSpec.walltime}*{tmpMaxAtomSize}/{nEsConsumers}/{maxAttemptEsJob}"
1049         else:
1050             tmpMaxAtomSize = inputChunk.getMaxAtomSize(getNumEvents=True)
1051             if taskSpec.getCpuTime() is not None:
1052                 minWalltime = taskSpec.getCpuTime() * tmpMaxAtomSize
1053                 if taskSpec.dynamicNumEvents():
1054                     # use minGranularity as the smallest chunk
1055                     minGranularity = taskSpec.get_min_granularity()
1056                     minWalltime_dyn = taskSpec.getCpuTime() * minGranularity
1057                     strMinWalltime_dyn = f"cpuTime*minGranularity={taskSpec.getCpuTime()}*{minGranularity}"
1058                     # use most consecutive events as the largest chunk
1059                     eventJump, totalEvents = inputChunk.check_event_jump_and_sum()
1060                     # use maxEventsPerJob if smaller
1061                     maxEventsPerJob = taskSpec.get_max_events_per_job()
1062                     if maxEventsPerJob:
1063                         totalEvents = min(totalEvents, maxEventsPerJob)
1064                     maxWalltime_dyn = taskSpec.getCpuTime() * totalEvents
1065                     strMaxWalltime_dyn = f"cpuTime*maxEventsPerJob={taskSpec.getCpuTime()}*{totalEvents}"
1066             # take # of consumers into account
1067             if not taskSpec.useEventService() or taskSpec.useJobCloning():
1068                 strMinWalltime = f"cpuTime*nEventsPerJob={taskSpec.getCpuTime()}*{tmpMaxAtomSize}"
1069             else:
1070                 strMinWalltime = f"cpuTime*nEventsPerJob/nEsConsumers/maxAttemptEsJob={taskSpec.getCpuTime()}*{tmpMaxAtomSize}/{nEsConsumers}/{maxAttemptEsJob}"
1071         if minWalltime:
1072             minWalltime /= nEsConsumers * maxAttemptEsJob
1073         newScanSiteList = []
1074         oldScanSiteList = copy.copy(scanSiteList)
1075         msg_map = {}
1076         tmpLog.set_message_slot()
1077         for tmpSiteName in scanSiteList:
1078             tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1079             siteMaxTime = tmpSiteSpec.maxtime
1080             origSiteMaxTime = siteMaxTime
1081             # check max walltime at the site
1082             tmpSiteStr = f"{siteMaxTime}"
1083             if taskSpec.useHS06():
1084                 oldSiteMaxTime = siteMaxTime
1085                 siteMaxTime -= taskSpec.baseWalltime
1086                 tmpSiteStr = f"({oldSiteMaxTime}-{taskSpec.baseWalltime})"
1087             if siteMaxTime not in [None, 0] and tmpSiteSpec.coreCount not in [None, 0]:
1088                 siteMaxTime *= tmpSiteSpec.coreCount
1089                 tmpSiteStr += f"*{tmpSiteSpec.coreCount}"
1090             if taskSpec.useHS06():
1091                 if siteMaxTime not in [None, 0]:
1092                     siteMaxTime *= tmpSiteSpec.corepower
1093                     tmpSiteStr += f"*{tmpSiteSpec.corepower}"
1094                 siteMaxTime *= float(taskSpec.cpuEfficiency) / 100.0
1095                 siteMaxTime = int(siteMaxTime)
1096                 tmpSiteStr += f"*{taskSpec.cpuEfficiency}%"
1097             if origSiteMaxTime != 0:
1098                 toSkip = False
1099                 if minWalltime_dyn and tmpSiteSpec.mintime and tmpSiteSpec.mintime > 0:
1100                     if minWalltime_dyn > siteMaxTime:
1101                         tmp_msg = f"  skip site={tmpSiteName} due to short site walltime {tmpSiteStr} " f"(site upper limit) less than {strMinWalltime_dyn} "
1102                         toSkip = True
1103                 else:
1104                     if minWalltime and minWalltime > siteMaxTime:
1105                         tmp_msg = f"  skip site={tmpSiteName} due to short site walltime {tmpSiteStr} " f"(site upper limit) less than {strMinWalltime} "
1106                         toSkip = True
1107                 if toSkip:
1108                     tmp_msg += "criteria=-shortwalltime"
1109                     msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
1110                     continue
1111             # sending scouts or merge or walltime-undefined jobs to only sites where walltime is more than 1 day
1112             if (
1113                 (not sitePreAssigned and inputChunk.useScout())
1114                 or inputChunk.isMerging
1115                 or (not taskSpec.walltime and not taskSpec.walltimeUnit and not taskSpec.cpuTimeUnit)
1116                 or (not taskSpec.getCpuTime() and taskSpec.cpuTimeUnit)
1117             ):
1118                 minTimeForZeroWalltime = 24
1119                 str_minTimeForZeroWalltime = f"{minTimeForZeroWalltime}hr*10HS06s"
1120                 minTimeForZeroWalltime *= 60 * 60 * 10
1121 
1122                 if tmpSiteSpec.coreCount not in [None, 0]:
1123                     minTimeForZeroWalltime *= tmpSiteSpec.coreCount
1124                     str_minTimeForZeroWalltime += f"*{tmpSiteSpec.coreCount}cores"
1125 
1126                 if siteMaxTime != 0 and siteMaxTime < minTimeForZeroWalltime:
1127                     tmp_msg = f"  skip site={tmpSiteName} due to site walltime {tmpSiteStr} (site upper limit) insufficient "
1128                     if inputChunk.useScout():
1129                         tmp_msg += f"for scouts ({str_minTimeForZeroWalltime} at least) "
1130                         tmp_msg += "criteria=-scoutwalltime"
1131                     else:
1132                         tmp_msg += f"for zero walltime ({str_minTimeForZeroWalltime} at least) "
1133                         tmp_msg += "criteria=-zerowalltime"
1134                     msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
1135                     continue
1136             # check min walltime at the site
1137             siteMinTime = tmpSiteSpec.mintime
1138             origSiteMinTime = siteMinTime
1139             tmpSiteStr = f"{siteMinTime}"
1140             if taskSpec.useHS06():
1141                 oldSiteMinTime = siteMinTime
1142                 siteMinTime -= taskSpec.baseWalltime
1143                 tmpSiteStr = f"({oldSiteMinTime}-{taskSpec.baseWalltime})"
1144             if siteMinTime not in [None, 0] and tmpSiteSpec.coreCount not in [None, 0]:
1145                 siteMinTime *= tmpSiteSpec.coreCount
1146                 tmpSiteStr += f"*{tmpSiteSpec.coreCount}"
1147             if taskSpec.useHS06():
1148                 if siteMinTime not in [None, 0]:
1149                     siteMinTime *= tmpSiteSpec.corepower
1150                     tmpSiteStr += f"*{tmpSiteSpec.corepower}"
1151                 siteMinTime *= float(taskSpec.cpuEfficiency) / 100.0
1152                 siteMinTime = int(siteMinTime)
1153                 tmpSiteStr += f"*{taskSpec.cpuEfficiency}%"
1154             if origSiteMinTime != 0:
1155                 toSkip = False
1156                 if minWalltime_dyn and tmpSiteSpec.mintime and tmpSiteSpec.mintime > 0:
1157                     if minWalltime_dyn < siteMinTime and (maxWalltime_dyn is None or maxWalltime_dyn < siteMinTime):
1158                         tmp_msg = f"  skip site {tmpSiteName} due to short job walltime {tmpSiteStr} " f"(site lower limit) greater than {strMinWalltime_dyn} "
1159                         if maxWalltime_dyn:
1160                             tmp_msg += f"and {strMinWalltime_dyn} "
1161                         toSkip = True
1162                 else:
1163                     if (minWalltime is None or minWalltime < siteMinTime) and (maxWalltime is None or maxWalltime < siteMinTime):
1164                         tmp_msg = f"  skip site {tmpSiteName} due to short job walltime {tmpSiteStr} " f"(site lower limit) greater than {strMinWalltime} "
1165                         if maxWalltime:
1166                             tmp_msg += f"and {strMaxWalltime} "
1167                         toSkip = True
1168                 if toSkip:
1169                     tmp_msg += "criteria=-longwalltime"
1170                     msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
1171                     continue
1172             newScanSiteList.append(tmpSiteName)
1173         tmpLog.unset_message_slot()
1174         scanSiteList = newScanSiteList
1175         if not taskSpec.useHS06():
1176             tmpLog.info(f"walltime check with {minWalltime}({taskSpec.walltimeUnit})")
1177         else:
1178             tmpStr = f"walltime check with {strMinWalltime}({taskSpec.cpuTimeUnit})"
1179             if maxWalltime:
1180                 tmpStr += f" and {strMaxWalltime}"
1181             if minWalltime_dyn:
1182                 tmpStr += f" for normal PQs, {strMinWalltime_dyn}"
1183                 if maxWalltime_dyn:
1184                     tmpStr += f" and {strMaxWalltime_dyn}"
1185                 tmpStr += " for PQs with non-zero mintime"
1186             tmpLog.info(tmpStr)
1187         self.add_summary_message(oldScanSiteList, scanSiteList, "walltime check", tmpLog, msg_map)
1188         if not scanSiteList:
1189             self.dump_summary(tmpLog)
1190             tmpLog.error("no candidates")
1191             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1192             return retTmpError
1193 
1194         ######################################
1195         # selection for network connectivity
1196         if not sitePreAssigned:
1197             ipConnectivity = taskSpec.getIpConnectivity()
1198             ipStack = taskSpec.getIpStack()
1199             if ipConnectivity or ipStack:
1200                 newScanSiteList = []
1201                 oldScanSiteList = copy.copy(scanSiteList)
1202                 msg_map = {}
1203                 tmpLog.set_message_slot()
1204                 for tmpSiteName in scanSiteList:
1205                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1206                     # check connectivity
1207                     if ipConnectivity:
1208                         wn_connectivity = tmpSiteSpec.get_wn_connectivity()
1209                         if wn_connectivity == "full":
1210                             pass
1211                         elif wn_connectivity == "http" and ipConnectivity == "http":
1212                             pass
1213                         else:
1214                             tmp_msg = f"  skip site={tmpSiteName} due to insufficient connectivity site={wn_connectivity} vs task={ipConnectivity} "
1215                             tmp_msg += "criteria=-network"
1216                             msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
1217                             continue
1218                     # check IP stack
1219                     if ipStack:
1220                         wn_ipstack = tmpSiteSpec.get_ipstack()
1221                         if ipStack != wn_ipstack:
1222                             tmp_msg = f"  skip site={tmpSiteName} due to IP stack mismatch site={wn_ipstack} vs task={ipStack} "
1223                             tmp_msg += "criteria=-network"
1224                             msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
1225                             continue
1226 
1227                     newScanSiteList.append(tmpSiteName)
1228                 tmpLog.unset_message_slot()
1229                 scanSiteList = newScanSiteList
1230                 self.add_summary_message(oldScanSiteList, scanSiteList, "network check", tmpLog, msg_map)
1231                 if not scanSiteList:
1232                     self.dump_summary(tmpLog)
1233                     tmpLog.error("no candidates")
1234                     taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1235                     return retTmpError
1236 
1237         ######################################
1238         # selection for event service
1239         if not sitePreAssigned:
1240             newScanSiteList = []
1241             oldScanSiteList = copy.copy(scanSiteList)
1242             msg_map = {}
1243             tmpLog.set_message_slot()
1244             for tmpSiteName in scanSiteList:
1245                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1246                 # event service
1247                 if taskSpec.useEventService() and not taskSpec.useJobCloning():
1248                     if tmpSiteSpec.getJobSeed() == "std":
1249                         tmp_msg = f"  skip site={tmpSiteName} since EventService is not allowed "
1250                         tmp_msg += "criteria=-es"
1251                         msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
1252                         continue
1253                     if tmpSiteSpec.getJobSeed() == "eshigh" and not esHigh:
1254                         tmp_msg = f"  skip site={tmpSiteName} since low prio EventService is not allowed "
1255                         tmp_msg += "criteria=-eshigh"
1256                         msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
1257                         continue
1258                 else:
1259                     if tmpSiteSpec.getJobSeed() == "es":
1260                         tmp_msg = f"  skip site={tmpSiteName} since only EventService is allowed "
1261                         tmp_msg += "criteria=-nones"
1262                         msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
1263                         continue
1264                 # skip UCORE/SCORE
1265                 if (
1266                     taskSpec.useEventService()
1267                     and not taskSpec.useJobCloning()
1268                     and tmpSiteSpec.sitename != tmpSiteSpec.get_unified_name()
1269                     and tmpSiteSpec.coreCount == 1
1270                 ):
1271                     tmp_msg = f"  skip site={tmpSiteName} since EventService on UCORE/SCORE "
1272                     tmp_msg += "criteria=-es_ucore"
1273                     msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
1274                     continue
1275                 newScanSiteList.append(tmpSiteName)
1276             tmpLog.unset_message_slot()
1277             scanSiteList = newScanSiteList
1278             self.add_summary_message(oldScanSiteList, scanSiteList, "EventService check", tmpLog, msg_map)
1279             if not scanSiteList:
1280                 self.dump_summary(tmpLog)
1281                 tmpLog.error("no candidates")
1282                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1283                 return retTmpError
1284 
1285         ######################################
1286         # selection for transferring
1287         newScanSiteList = []
1288         oldScanSiteList = copy.copy(scanSiteList)
1289         msg_map = {}
1290         tmpLog.set_message_slot()
1291         newSkippedTmp = dict()
1292         for tmpSiteName in self.get_unified_sites(scanSiteList):
1293             try:
1294                 tmpAtlasSiteName = storageMapping[tmpSiteName]["default"]
1295                 if tmpSiteName not in sites_in_nucleus + sites_sharing_output_storages_in_nucleus and nucleus != tmpAtlasSiteName:
1296                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1297                     # limit
1298                     def_maxTransferring = 2000
1299                     if tmpSiteSpec.transferringlimit == 0:
1300                         # use default value
1301                         maxTransferring = def_maxTransferring
1302                     else:
1303                         maxTransferring = tmpSiteSpec.transferringlimit
1304                     # transferring jobs with nuclei in downtime
1305                     n_jobs_bad_transfer = 0
1306                     if tmpSiteName in transferring_job_map:
1307                         for tmp_nucleus in transferring_job_map[tmpSiteName]:
1308                             if tmp_nucleus in nucleus_with_storages_unwritable_over_wan:
1309                                 n_jobs_bad_transfer += transferring_job_map[tmpSiteName][tmp_nucleus]
1310                     # check at the site
1311                     nTraJobs = AtlasBrokerUtils.getNumJobs(jobStatMap, tmpSiteName, "transferring") - n_jobs_bad_transfer
1312                     nRunJobs = AtlasBrokerUtils.getNumJobs(jobStatMap, tmpSiteName, "running")
1313                     if max(maxTransferring, 2 * nRunJobs) < nTraJobs:
1314                         tmpStr = "  skip site=%s due to too many transferring=%s greater than max(%s,2x%s) criteria=-transferring" % (
1315                             tmpSiteName,
1316                             nTraJobs,
1317                             maxTransferring,
1318                             nRunJobs,
1319                         )
1320                         newSkippedTmp[tmpSiteName] = tmpStr
1321                         msg_map[tmpSiteName] = tmpStr
1322             except KeyError:
1323                 pass
1324             newScanSiteList.append(tmpSiteName)
1325         tmpLog.unset_message_slot()
1326         siteSkippedTmp = self.add_pseudo_sites_to_skip(newSkippedTmp, scanSiteList, siteSkippedTmp)
1327         scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1328         self.add_summary_message(oldScanSiteList, scanSiteList, "transferring check", tmpLog, msg_map)
1329         if not scanSiteList:
1330             self.dump_summary(tmpLog)
1331             tmpLog.error("no candidates")
1332             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1333             return retTmpError
1334 
1335         ######################################
1336         # selection for pledge when work is short
1337         if not sitePreAssigned and work_shortage:
1338             newScanSiteList = []
1339             oldScanSiteList = copy.copy(scanSiteList)
1340             msg_map = {}
1341             tmpLog.set_message_slot()
1342             for tmpSiteName in self.get_unified_sites(scanSiteList):
1343                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1344                 if tmpSiteSpec.is_opportunistic():
1345                     # skip opportunistic sites when plenty of work is unavailable
1346                     tmp_msg = f"  skip site={tmpSiteName} to avoid opportunistic in case of work shortage "
1347                     tmp_msg += "criteria=-non_pledged"
1348                     msg_map[tmpSiteName] = tmp_msg
1349                     continue
1350                 elif tmpSiteSpec.pledgedCPU is not None and tmpSiteSpec.pledgedCPU > 0:
1351                     # check number of cores
1352                     tmp_stat_dict = core_statistics.get(tmpSiteName, {})
1353                     n_running_cores = tmp_stat_dict.get("running", 0)
1354                     n_starting_cores = tmp_stat_dict.get("starting", 0)
1355                     tmpLog.debug(f"  {tmpSiteName} running={n_running_cores} starting={n_starting_cores}")
1356                     if n_running_cores + n_starting_cores > tmpSiteSpec.pledgedCPU:
1357                         tmp_msg = f"  skip site={tmpSiteName} since nCores(running+starting)={n_running_cores+n_starting_cores} more than pledgedCPU={tmpSiteSpec.pledgedCPU} "
1358                         tmp_msg += "in case of work shortage "
1359                         tmp_msg += "criteria=-over_pledged"
1360                         msg_map[tmpSiteName] = tmp_msg
1361                         continue
1362                 newScanSiteList.append(tmpSiteName)
1363             tmpLog.unset_message_slot()
1364             scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1365             self.add_summary_message(oldScanSiteList, scanSiteList, "pledge check", tmpLog, msg_map)
1366             if not scanSiteList:
1367                 self.dump_summary(tmpLog)
1368                 tmpLog.error("no candidates")
1369                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1370                 return retTmpError
1371 
1372         ######################################
1373         # selection for T1 weight
1374         if taskSpec.getT1Weight() < 0 and not inputChunk.isMerging:
1375             useT1Weight = True
1376         else:
1377             useT1Weight = False
1378         t1Weight = taskSpec.getT1Weight()
1379         if t1Weight == 0:
1380             tmpLog.info(f"IO intensity {taskSpec.ioIntensity}")
1381             # use T1 weight if IO intensive
1382             t1Weight = 1
1383             if taskSpec.ioIntensity is not None and taskSpec.ioIntensity > 500:
1384                 t1Weight = WORLD_NUCLEUS_WEIGHT
1385 
1386         oldScanSiteList = copy.copy(scanSiteList)
1387         msg_map = {}
1388         if t1Weight < 0 and not inputChunk.isMerging:
1389             newScanSiteList = []
1390             tmpLog.set_message_slot()
1391             for tmpSiteName in scanSiteList:
1392                 if tmpSiteName not in sites_in_nucleus + sites_sharing_output_storages_in_nucleus:
1393                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1394                     msg_map[tmpSiteSpec.get_unified_name()] = f"  skip site={tmpSiteSpec.get_unified_name()} due to negative T1 weight criteria=-t1weight"
1395                     continue
1396                 newScanSiteList.append(tmpSiteName)
1397             tmpLog.unset_message_slot()
1398             scanSiteList = newScanSiteList
1399             t1Weight = 1
1400         t1Weight = max(t1Weight, t1WeightForHighPrio)
1401         tmpLog.info(f"T1 weight {t1Weight}")
1402         self.add_summary_message(oldScanSiteList, scanSiteList, "T1 weight check", tmpLog, msg_map)
1403         if not scanSiteList:
1404             self.dump_summary(tmpLog)
1405             tmpLog.error("no candidates")
1406             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1407             return retTmpError
1408 
1409         ######################################
1410         # selection for full chain
1411         if nucleusSpec:
1412             full_chain = taskSpec.check_full_chain_with_nucleus(nucleusSpec)
1413             oldScanSiteList = copy.copy(scanSiteList)
1414             msg_map = {}
1415             tmpLog.set_message_slot()
1416             newScanSiteList = []
1417             for tmpSiteName in scanSiteList:
1418                 if full_chain:
1419                     # skip PQs not in the nucleus
1420                     if tmpSiteName not in sites_in_nucleus:
1421                         tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1422                         msg_map[tmpSiteSpec.get_unified_name()] = (
1423                             f"  skip site={tmpSiteSpec.get_unified_name()} not in nucleus for full chain criteria=-full_chain"
1424                         )
1425                         continue
1426                 else:
1427                     # skip PQs only for full-chain
1428                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1429                     if tmpSiteSpec.bare_nucleus_mode() == "only":
1430                         msg_map[tmpSiteSpec.get_unified_name()] = f"  skip site={tmpSiteSpec.get_unified_name()} only for full chain criteria=-not_full_chain"
1431                         continue
1432                 newScanSiteList.append(tmpSiteName)
1433             tmpLog.unset_message_slot()
1434             scanSiteList = newScanSiteList
1435             self.add_summary_message(oldScanSiteList, scanSiteList, "full chain check", tmpLog, msg_map)
1436             if not scanSiteList:
1437                 self.dump_summary(tmpLog)
1438                 tmpLog.error("no candidates")
1439                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1440                 return retTmpError
1441 
1442         ######################################
1443         # selection for nPilot
1444         nPilotMap = {}
1445         if not sitePreAssigned and not siteListPreAssigned:
1446             nWNmap = self.taskBufferIF.getCurrentSiteData()
1447             newScanSiteList = []
1448             oldScanSiteList = copy.copy(scanSiteList)
1449             tmpLog.set_message_slot()
1450             newSkippedTmp = dict()
1451             for tmpSiteName in self.get_unified_sites(scanSiteList):
1452                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1453                 # check at the site
1454                 nPilot = 0
1455                 if tmpSiteName in nWNmap:
1456                     nPilot = nWNmap[tmpSiteName]["getJob"] + nWNmap[tmpSiteName]["updateJob"]
1457                 # skip no pilot sites unless the task and the site use jumbo jobs or the site is standby
1458                 if (
1459                     nPilot == 0
1460                     and ("test" not in taskSpec.prodSourceLabel or inputChunk.isExpress())
1461                     and (taskSpec.getNumJumboJobs() is None or not tmpSiteSpec.useJumboJobs())
1462                     and tmpSiteSpec.getNumStandby(wq_tag, taskSpec.resource_type) is None
1463                 ):
1464                     tmpStr = f"  skip site={tmpSiteName} due to no pilot criteria=-nopilot"
1465                     newSkippedTmp[tmpSiteName] = tmpStr
1466                 newScanSiteList.append(tmpSiteName)
1467                 nPilotMap[tmpSiteName] = nPilot
1468             tmpLog.unset_message_slot()
1469             siteSkippedTmp = self.add_pseudo_sites_to_skip(newSkippedTmp, scanSiteList, siteSkippedTmp)
1470             scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1471             self.add_summary_message(oldScanSiteList, scanSiteList, "pilot check", tmpLog, {})
1472             if not scanSiteList:
1473                 self.dump_summary(tmpLog)
1474                 tmpLog.error("no candidates")
1475                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1476                 return retTmpError
1477 
1478         # return if to give a hint for task brokerage
1479         if hintForTB:
1480             ######################################
1481             # temporary problems
1482             newScanSiteList = []
1483             oldScanSiteList = copy.copy(scanSiteList)
1484             msg_map = {}
1485             tmpLog.set_message_slot()
1486             for tmpSiteName in scanSiteList:
1487                 if tmpSiteName in siteSkippedTmp:
1488                     tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1489                     msg_map[tmpSiteSpec.get_unified_name()] = siteSkippedTmp[tmpSiteName]
1490                 else:
1491                     newScanSiteList.append(tmpSiteName)
1492             tmpLog.unset_message_slot()
1493             scanSiteList = newScanSiteList
1494             self.add_summary_message(oldScanSiteList, scanSiteList, "temporary problem check", tmpLog, msg_map)
1495             if not scanSiteList:
1496                 self.dump_summary(tmpLog)
1497                 tmpLog.error("no candidates")
1498                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1499                 return retTmpError
1500             return self.SC_SUCCEEDED, scanSiteList
1501 
1502         ######################################
1503         # get available files
1504         siteSizeMap = {}
1505         siteSizeMapWT = {}
1506         availableFileMap = {}
1507         siteFilesMap = {}
1508         siteFilesMapWT = {}
1509         for datasetSpec in inputChunk.getDatasets():
1510             try:
1511                 # mapping between sites and input storage endpoints
1512                 siteStorageEP = AtlasBrokerUtils.getSiteInputStorageEndpointMap(
1513                     self.get_unified_sites(scanSiteList), self.siteMapper, JobUtils.PROD_PS, JobUtils.PROD_PS
1514                 )
1515                 # disable file lookup for merge jobs or secondary datasets
1516                 checkCompleteness = True
1517                 useCompleteOnly = False
1518                 if inputChunk.isMerging:
1519                     checkCompleteness = False
1520                 if not datasetSpec.isMaster() or (taskSpec.ioIntensity and taskSpec.ioIntensity > self.io_intensity_cutoff and not taskSpec.inputPreStaging()):
1521                     useCompleteOnly = True
1522                 # get available files per site/endpoint
1523                 tmpLog.debug(f"getting available files for {datasetSpec.datasetName}")
1524                 tmpAvFileMap = self.ddmIF.getAvailableFiles(
1525                     datasetSpec,
1526                     siteStorageEP,
1527                     self.siteMapper,
1528                     check_completeness=checkCompleteness,
1529                     storage_token=datasetSpec.storageToken,
1530                     complete_only=useCompleteOnly,
1531                 )
1532                 tmpLog.debug("got")
1533                 if tmpAvFileMap is None:
1534                     raise Interaction.JEDITemporaryError("ddmIF.getAvailableFiles failed")
1535                 availableFileMap[datasetSpec.datasetName] = tmpAvFileMap
1536             except Exception as e:
1537                 tmp_str = str(e).replace("\\n", " ")
1538                 tmpLog.error(f"failed to get available files with {tmp_str}")
1539                 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1540                 return retTmpError
1541             # loop over all sites to get the size of available files
1542             for tmpSiteName in self.get_unified_sites(scanSiteList):
1543                 siteSizeMap.setdefault(tmpSiteName, 0)
1544                 siteSizeMapWT.setdefault(tmpSiteName, 0)
1545                 siteFilesMap.setdefault(tmpSiteName, set())
1546                 siteFilesMapWT.setdefault(tmpSiteName, set())
1547                 # get the total size of available files
1548                 if tmpSiteName in availableFileMap[datasetSpec.datasetName]:
1549                     availableFiles = availableFileMap[datasetSpec.datasetName][tmpSiteName]
1550                     for tmpFileSpec in availableFiles["localdisk"] + availableFiles["cache"]:
1551                         if tmpFileSpec.lfn not in siteFilesMap[tmpSiteName]:
1552                             siteSizeMap[tmpSiteName] += tmpFileSpec.fsize
1553                             siteSizeMapWT[tmpSiteName] += tmpFileSpec.fsize
1554                         siteFilesMap[tmpSiteName].add(tmpFileSpec.lfn)
1555                         siteFilesMapWT[tmpSiteName].add(tmpFileSpec.lfn)
1556                     for tmpFileSpec in availableFiles["localtape"]:
1557                         if tmpFileSpec.lfn not in siteFilesMapWT[tmpSiteName]:
1558                             siteSizeMapWT[tmpSiteName] += tmpFileSpec.fsize
1559                             siteFilesMapWT[tmpSiteName].add(tmpFileSpec.lfn)
1560         # get max total size
1561         allLFNs = set()
1562         totalSize = 0
1563         for datasetSpec in inputChunk.getDatasets():
1564             for fileSpec in datasetSpec.Files:
1565                 if fileSpec.lfn not in allLFNs:
1566                     allLFNs.add(fileSpec.lfn)
1567                     try:
1568                         totalSize += fileSpec.fsize
1569                     except Exception:
1570                         pass
1571         # get max num of available files
1572         maxNumFiles = len(allLFNs)
1573 
1574         ######################################
1575         # selection for fileSizeToMove
1576         moveSizeCutoffGB = self.taskBufferIF.getConfigValue(COMPONENT, "SIZE_CUTOFF_TO_MOVE_INPUT", APP, VO)
1577         if moveSizeCutoffGB is None:
1578             moveSizeCutoffGB = 10
1579         moveNumFilesCutoff = self.taskBufferIF.getConfigValue(COMPONENT, "NUM_CUTOFF_TO_MOVE_INPUT", APP, VO)
1580         if moveNumFilesCutoff is None:
1581             moveNumFilesCutoff = 100
1582         if (
1583             not sitePreAssigned
1584             and totalSize > 0
1585             and not inputChunk.isMerging
1586             and taskSpec.ioIntensity is not None
1587             and taskSpec.ioIntensity > self.io_intensity_cutoff
1588             and not (taskSpec.useEventService() and not taskSpec.useJobCloning())
1589         ):
1590             newScanSiteList = []
1591             newScanSiteListWT = []
1592             msgList = []
1593             msgListDT = []
1594             for tmpSiteName in self.get_unified_sites(scanSiteList):
1595                 # file size to move in MB
1596                 mbToMove = int((totalSize - siteSizeMap[tmpSiteName]) / (1024 * 1024))
1597                 nFilesToMove = maxNumFiles - len(siteFilesMap[tmpSiteName])
1598                 mbToMoveWT = int((totalSize - siteSizeMapWT[tmpSiteName]) / (1024 * 1024))
1599                 nFilesToMoveWT = maxNumFiles - len(siteFilesMapWT[tmpSiteName])
1600                 if min(mbToMove, mbToMoveWT) > moveSizeCutoffGB * 1024 or min(nFilesToMove, nFilesToMoveWT) > moveNumFilesCutoff:
1601                     tmp_msg = f"  skip site={tmpSiteName} "
1602                     if mbToMove > moveSizeCutoffGB * 1024:
1603                         tmp_msg += f"since size of missing input is too large ({int(mbToMove / 1024)} GB > {moveSizeCutoffGB} GB) "
1604                     else:
1605                         tmp_msg += f"since the number of missing input files is too large ({nFilesToMove} > {moveNumFilesCutoff}) "
1606                     tmp_msg += f"for IO intensive task ({taskSpec.ioIntensity} > {self.io_intensity_cutoff} kBPerS) criteria=-io"
1607                     msgListDT.append(tmp_msg)
1608                     continue
1609                 if mbToMove > moveSizeCutoffGB * 1024 or nFilesToMove > moveNumFilesCutoff:
1610                     tmp_msg = f"  skip site={tmpSiteName} "
1611                     if mbToMove > moveSizeCutoffGB * 1024:
1612                         tmp_msg += f"since size of missing disk input is too large ({int(mbToMove / 1024)} GB > {moveSizeCutoffGB} GB) "
1613                     else:
1614                         tmp_msg += f"since the number of missing disk input files is too large ({nFilesToMove} > {moveNumFilesCutoff}) "
1615                     tmp_msg += f"for IO intensive task ({taskSpec.ioIntensity} > {self.io_intensity_cutoff} kBPerS) criteria=-io"
1616                     msgList.append(tmp_msg)
1617                     newScanSiteListWT.append(tmpSiteName)
1618                 else:
1619                     newScanSiteList.append(tmpSiteName)
1620             if len(newScanSiteList + newScanSiteListWT) == 0:
1621                 # disable if no candidate
1622                 tmpLog.info("disabled IO check since no candidate passed")
1623             else:
1624                 tmpLog.set_message_slot()
1625                 for tmp_msg in msgListDT:
1626                     tmpLog.info(tmp_msg)
1627                 if len(newScanSiteList) == 0:
1628                     # use candidates with TAPE replicas
1629                     newScanSiteList = newScanSiteListWT
1630                 else:
1631                     for tmp_msg in msgList:
1632                         tmpLog.info(tmp_msg)
1633                 tmpLog.unset_message_slot()
1634                 oldScanSiteList = copy.copy(scanSiteList)
1635                 scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1636                 self.add_summary_message(oldScanSiteList, scanSiteList, "IO check", tmpLog, {})
1637                 if scanSiteList == []:
1638                     self.dump_summary(tmpLog)
1639                     tmpLog.error("no candidates")
1640                     taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1641                     return retTmpError
1642 
1643         ######################################
1644         # temporary problems
1645         newScanSiteList = []
1646         oldScanSiteList = copy.copy(scanSiteList)
1647         msg_map = {}
1648         tmpLog.set_message_slot()
1649         for tmpSiteName in scanSiteList:
1650             if tmpSiteName in siteSkippedTmp:
1651                 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1652                 msg_map[tmpSiteSpec.get_unified_name()] = siteSkippedTmp[tmpSiteName]
1653             else:
1654                 newScanSiteList.append(tmpSiteName)
1655         tmpLog.unset_message_slot()
1656         scanSiteList = newScanSiteList
1657         self.add_summary_message(oldScanSiteList, scanSiteList, "temporary problem check", tmpLog, msg_map)
1658         if scanSiteList == []:
1659             self.dump_summary(tmpLog)
1660             tmpLog.error("no candidates")
1661             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1662             return retTmpError
1663 
1664         ######################################
1665         # calculate weight
1666         jobStatPrioMapGS = dict()
1667         jobStatPrioMapGSOnly = dict()
1668         if workQueue.is_global_share:
1669             tmpSt, jobStatPrioMap = self.taskBufferIF.getJobStatisticsByGlobalShare(taskSpec.vo)
1670         else:
1671             tmpSt, jobStatPrioMap = self.taskBufferIF.getJobStatisticsWithWorkQueue_JEDI(taskSpec.vo, taskSpec.prodSourceLabel)
1672             if tmpSt:
1673                 tmpSt, jobStatPrioMapGS = self.taskBufferIF.getJobStatisticsByGlobalShare(taskSpec.vo)
1674                 tmpSt, jobStatPrioMapGSOnly = self.taskBufferIF.getJobStatisticsByGlobalShare(taskSpec.vo, True)
1675         if tmpSt:
1676             # count jobs per resource type
1677             tmpSt, tmpStatMapRT = self.taskBufferIF.getJobStatisticsByResourceTypeSite(workQueue)
1678         if not tmpSt:
1679             tmpLog.error("failed to get job statistics with priority")
1680             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1681             return retTmpError
1682         # get resource types per site
1683         site_resource_type_map = self.taskBufferIF.get_distinct_resource_types_per_site(taskSpec.jediTaskID)
1684         workerStat = self.taskBufferIF.ups_load_worker_stats()
1685         upsQueues = set(self.taskBufferIF.ups_get_queues())
1686         tmpLog.info(f"calculate weight and check cap for {len(scanSiteList)} candidates")
1687         cutoffName = f"NQUEUELIMITSITE_{taskSpec.gshare}"
1688         cutOffValue = self.taskBufferIF.getConfigValue(COMPONENT, cutoffName, APP, VO)
1689         if not cutOffValue:
1690             cutOffValue = 20
1691         else:
1692             tmpLog.info(f"using {cutoffName}={cutOffValue} as lower limit for nQueued")
1693         weightMapPrimary = {}
1694         weightMapSecondary = {}
1695         weightMapJumbo = {}
1696         largestNumRun = None
1697         for tmpPseudoSiteName in scanSiteList:
1698             tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1699             tmpSiteName = tmpSiteSpec.get_unified_name()
1700             if not workQueue.is_global_share and esHigh and tmpSiteSpec.getJobSeed() == "eshigh":
1701                 tmp_wq_tag = wq_tag_global_share
1702                 tmp_jobStatPrioMap = jobStatPrioMapGS
1703             else:
1704                 tmp_wq_tag = wq_tag
1705                 tmp_jobStatPrioMap = jobStatPrioMap
1706             nRunning = AtlasBrokerUtils.getNumJobs(tmp_jobStatPrioMap, tmpSiteName, "running", None, tmp_wq_tag)
1707             nRunningAll = AtlasBrokerUtils.getNumJobs(tmp_jobStatPrioMap, tmpSiteName, "running", None, None)
1708             corrNumPilotStr = ""
1709             if not workQueue.is_global_share:
1710                 # correction factor for nPilot
1711                 nRunningGS = AtlasBrokerUtils.getNumJobs(jobStatPrioMapGS, tmpSiteName, "running", None, wq_tag_global_share)
1712                 nRunningGSOnly = AtlasBrokerUtils.getNumJobs(jobStatPrioMapGSOnly, tmpSiteName, "running", None, wq_tag_global_share)
1713                 corrNumPilot = float(nRunningGS - nRunningGSOnly + 1) / float(nRunningGS + 1)
1714                 corrNumPilotStr = f"*(nRunResourceQueue({nRunningGS - nRunningGSOnly})+1)/(nRunGlobalShare({nRunningGS})+1)"
1715             else:
1716                 corrNumPilot = 1
1717             nDefined = AtlasBrokerUtils.getNumJobs(tmp_jobStatPrioMap, tmpSiteName, "defined", None, tmp_wq_tag) + self.getLiveCount(tmpSiteName)
1718             nAssigned = AtlasBrokerUtils.getNumJobs(tmp_jobStatPrioMap, tmpSiteName, "assigned", None, tmp_wq_tag)
1719             nActivated = AtlasBrokerUtils.getNumJobs(tmp_jobStatPrioMap, tmpSiteName, "activated", None, tmp_wq_tag)
1720             nStarting = AtlasBrokerUtils.getNumJobs(tmp_jobStatPrioMap, tmpSiteName, "starting", None, tmp_wq_tag)
1721             if tmpSiteName in nPilotMap:
1722                 nPilot = nPilotMap[tmpSiteName]
1723             else:
1724                 nPilot = 0
1725             # get num workers
1726             nWorkers = 0
1727             nWorkersCutoff = 20
1728             if tmpSiteName in workerStat:
1729                 for tmpHarvesterID, tmpLabelStat in workerStat[tmpSiteName].items():
1730                     for tmpHarvesterID, tmpResStat in tmpLabelStat.items():
1731                         for tmpResType, tmpCounts in tmpResStat.items():
1732                             for tmpStatus, tmpNum in tmpCounts.items():
1733                                 if tmpStatus in ["running", "submitted"]:
1734                                     nWorkers += tmpNum
1735                 # cap
1736                 nWorkers = min(nWorkersCutoff, nWorkers)
1737             # use nWorkers to bootstrap
1738             if (
1739                 nPilot > 0
1740                 and nRunning < nWorkersCutoff
1741                 and nWorkers > nRunning
1742                 and tmpSiteName in upsQueues
1743                 and taskSpec.currentPriority <= self.max_prio_for_bootstrap
1744             ):
1745                 tmpLog.debug(f"using nWorkers={nWorkers} as nRunning at {tmpPseudoSiteName} since original nRunning={nRunning} is low")
1746                 nRunning = nWorkers
1747             # take into account the number of standby jobs
1748             numStandby = tmpSiteSpec.getNumStandby(wq_tag, taskSpec.resource_type)
1749             if numStandby is None:
1750                 pass
1751             elif taskSpec.currentPriority > self.max_prio_for_bootstrap:
1752                 # don't use numSlots for high prio tasks
1753                 tmpLog.debug(f"ignored numSlots at {tmpPseudoSiteName} due to prio={taskSpec.currentPriority} > {self.max_prio_for_bootstrap}")
1754             elif numStandby == 0:
1755                 # use the number of starting jobs as the number of standby jobs
1756                 nRunning = nStarting + nRunning
1757                 tmpLog.debug(f"using nStarting+nRunning at {tmpPseudoSiteName} to set nRunning={nRunning} due to numSlot={numStandby}")
1758             else:
1759                 # the number of standby jobs is defined
1760                 nRunning = max(int(numStandby / tmpSiteSpec.coreCount), nRunning)
1761                 tmpLog.debug(f"using numSlots={numStandby}/coreCount at {tmpPseudoSiteName} to set nRunning={nRunning}")
1762             manyAssigned = float(nAssigned + 1) / float(nActivated + 1)
1763             manyAssigned = min(2.0, manyAssigned)
1764             manyAssigned = max(1.0, manyAssigned)
1765             # take into account nAssigned when jobs need input data transfer
1766             if tmpSiteName not in siteSizeMap or siteSizeMap[tmpSiteName] >= totalSize:
1767                 useAssigned = False
1768             else:
1769                 useAssigned = True
1770             # stat with resource type
1771             RT_Cap = 2
1772             useCapRT = False
1773             tmpRTqueue = 0
1774             tmpRTrunning = 0
1775             resource_type_str = taskSpec.resource_type
1776             if tmpSiteName in tmpStatMapRT:
1777                 # loop over all resource types for the site since task and job may have different resource types
1778                 tmp_resource_types = site_resource_type_map.get(tmpSiteName, [taskSpec.resource_type])
1779                 resource_type_str = ",".join(tmp_resource_types)
1780                 for tmp_resource_type in tmp_resource_types:
1781                     if tmp_resource_type in tmpStatMapRT[tmpSiteName]:
1782                         useCapRT = True
1783                         tmpRTrunning += tmpStatMapRT[tmpSiteName][tmp_resource_type].get("running", 0)                        
1784                         tmpRTqueue += tmpStatMapRT[tmpSiteName][tmp_resource_type].get("defined", 0)
1785                         if useAssigned:
1786                             tmpRTqueue += tmpStatMapRT[tmpSiteName][tmp_resource_type].get("assigned", 0)
1787                         tmpRTqueue += tmpStatMapRT[tmpSiteName][tmp_resource_type].get("activated", 0)
1788                         tmpRTqueue += tmpStatMapRT[tmpSiteName][tmp_resource_type].get("starting", 0)
1789                 if useCapRT:
1790                     tmpRTrunning = max(tmpRTrunning, nRunning)
1791             if totalSize == 0 or totalSize - siteSizeMap[tmpSiteName] <= 0:
1792                 weight = float(nRunning + 1) / float(nActivated + nStarting + nDefined + 10)
1793                 weightStr = (
1794                     f"nRun={nRunning} nAct={nActivated} nStart={nStarting} nDef={nDefined} nPilot={nPilot}{corrNumPilotStr} "
1795                     f"totalSizeMB={int(totalSize / 1024 / 1024)} totalNumFiles={maxNumFiles} nRun_rt={tmpRTrunning} nQueued_rt={tmpRTqueue} resource_type={resource_type_str} "
1796                 )
1797             else:
1798                 weight = float(nRunning + 1) / float(nActivated + nAssigned + nStarting + nDefined + 10) / manyAssigned
1799                 weightStr = (
1800                     f"nRun={nRunning} nAct={nActivated} nAss={nAssigned} nStart={nStarting} nDef={nDefined} manyAss={manyAssigned} "
1801                     f"nPilot={nPilot}{corrNumPilotStr} totalSizeMB={int(totalSize / 1024 / 1024)} "
1802                     f"totalNumFiles={maxNumFiles} nRun_rt={tmpRTrunning} nQueued_rt={tmpRTqueue} resource_type={resource_type_str} "
1803                 )
1804 
1805             # reduce weights by taking data availability into account
1806             skipRemoteData = False
1807             if totalSize > 0:
1808                 # file size to move in MB
1809                 mbToMove = int((totalSize - siteSizeMap[tmpSiteName]) / (1024 * 1024))
1810                 # number of files to move
1811                 nFilesToMove = maxNumFiles - len(siteFilesMap[tmpSiteName])
1812                 # consider size and # of files
1813                 if tmpSiteSpec.use_only_local_data() and (mbToMove > 0 or nFilesToMove > 0):
1814                     skipRemoteData = True
1815                 else:
1816                     weight = weight * (totalSize + siteSizeMap[tmpSiteName]) / totalSize / (nFilesToMove / 100 + 1)
1817                     weightStr += f"fileSizeToMoveMB={mbToMove} nFilesToMove={nFilesToMove} "
1818 
1819             # T1 weight
1820             if tmpSiteName in sites_in_nucleus + sites_sharing_output_storages_in_nucleus:
1821                 weight *= t1Weight
1822                 weightStr += f"t1W={t1Weight} "
1823             if useT1Weight:
1824                 weightStr += f"nRunningAll={nRunningAll} "
1825 
1826             # apply network metrics to weight
1827             if nucleus:
1828                 tmpAtlasSiteName = None
1829                 try:
1830                     tmpAtlasSiteName = storageMapping[tmpSiteName]["default"]
1831                 except KeyError:
1832                     tmpLog.debug(f"Panda site {tmpSiteName} was not in site mapping. Default network values will be given")
1833 
1834                 try:
1835                     closeness = networkMap[tmpAtlasSiteName][AGIS_CLOSENESS]
1836                 except KeyError:
1837                     tmpLog.debug(f"No {AGIS_CLOSENESS} information found in network matrix from {tmpAtlasSiteName}({tmpSiteName}) to {nucleus}")
1838                     closeness = MAX_CLOSENESS * 0.7
1839 
1840                 try:
1841                     nFilesInQueue = networkMap[tmpAtlasSiteName][queued_tag]
1842                 except KeyError:
1843                     tmpLog.debug(f"No {queued_tag} information found in network matrix from {tmpAtlasSiteName} ({tmpSiteName}) to {nucleus}")
1844                     nFilesInQueue = 0
1845 
1846                 mbps = None
1847                 try:
1848                     mbps = networkMap[tmpAtlasSiteName][FTS_1W]
1849                     mbps = networkMap[tmpAtlasSiteName][FTS_1D]
1850                     mbps = networkMap[tmpAtlasSiteName][FTS_1H]
1851                 except KeyError:
1852                     if mbps is None:
1853                         tmpLog.debug(f"No dynamic FTS mbps information found in network matrix from {tmpAtlasSiteName}({tmpSiteName}) to {nucleus}")
1854 
1855                 # network weight: value between 1 and 2, except when nucleus == satellite
1856                 if nucleus == tmpAtlasSiteName:  # 25 percent weight boost for processing in nucleus itself
1857                     weightNwQueue = 2.5
1858                     weightNwThroughput = 2.5
1859                 else:
1860                     # queue weight: the more in queue, the lower the weight
1861                     weightNwQueue = 2 - (nFilesInQueue * 1.0 / self.queue_threshold)
1862 
1863                     # throughput weight: the higher the throughput, the higher the weight
1864                     if mbps is not None:
1865                         weightNwThroughput = self.convertMBpsToWeight(mbps)
1866                     else:
1867                         weightNwThroughput = 1 + ((MAX_CLOSENESS - closeness) * 1.0 / (MAX_CLOSENESS - MIN_CLOSENESS))
1868 
1869                 # combine queue and throughput weights
1870                 weightNw = self.nwQueueImportance * weightNwQueue + self.nwThroughputImportance * weightNwThroughput
1871 
1872                 weightStr += f"weightNw={weightNw} ( closeness={closeness} nFilesQueued={nFilesInQueue} throughputMBps={mbps} Network weight:{self.nwActive} )"
1873 
1874                 # If network measurements in active mode, apply the weight
1875                 if self.nwActive:
1876                     weight *= weightNw
1877 
1878                 tmpLog.debug(
1879                     f"subject=network_data src={tmpAtlasSiteName} dst={nucleus} weight={weight} weightNw={weightNw} "
1880                     f"weightNwThroughput={weightNwThroughput} weightNwQueued={weightNwQueue} mbps={mbps} closeness={closeness} nqueued={nFilesInQueue}"
1881                 )
1882 
1883             # apply architecture (x86-v2, 3, 4...) preference to weight
1884             if preference_weight_map and tmpSiteName in preference_weight_map:
1885                 pref_weight = preference_weight_map[tmpSiteName]
1886                 weight *= pref_weight
1887                 weightStr += f"prefW={pref_weight} "
1888                 tmpLog.info(f"prefW={pref_weight} ")
1889 
1890             # make candidate
1891             siteCandidateSpec = SiteCandidate(tmpPseudoSiteName, tmpSiteName)
1892             # override attributes
1893             siteCandidateSpec.override_attribute("maxwdir", newMaxwdir.get(tmpSiteName))
1894             platforms = resolved_platforms.get(tmpSiteName)
1895             if platforms:
1896                 siteCandidateSpec.override_attribute("platforms", platforms)
1897             # set weight and params
1898             siteCandidateSpec.weight = weight
1899             siteCandidateSpec.nRunningJobs = nRunning
1900             siteCandidateSpec.nAssignedJobs = nAssigned
1901             # set available files
1902             for tmpDatasetName, availableFiles in availableFileMap.items():
1903                 if tmpSiteName in availableFiles:
1904                     siteCandidateSpec.add_local_disk_files(availableFiles[tmpSiteName]["localdisk"])
1905                     siteCandidateSpec.add_local_tape_files(availableFiles[tmpSiteName]["localtape"])
1906                     siteCandidateSpec.add_cache_files(availableFiles[tmpSiteName]["cache"])
1907                     siteCandidateSpec.add_remote_files(availableFiles[tmpSiteName]["remote"])
1908             # add files as remote since WAN access is allowed
1909             if taskSpec.allowInputWAN() and tmpSiteSpec.allowWanInputAccess():
1910                 siteCandidateSpec.remoteProtocol = "direct"
1911                 for datasetSpec in inputChunk.getDatasets():
1912                     siteCandidateSpec.add_remote_files(datasetSpec.Files)
1913 
1914             # check if site is locked
1915             lockedByBrokerage = self.checkSiteLock(taskSpec.vo, taskSpec.prodSourceLabel, tmpPseudoSiteName, taskSpec.workQueue_ID, taskSpec.resource_type)
1916 
1917             # check cap with nRunning
1918             nPilot *= corrNumPilot
1919             cutOffFactor = 2
1920             if tmpSiteSpec.capability == "ucore":
1921                 if not inputChunk.isExpress():
1922                     siteCandidateSpec.nRunningJobsCap = max(cutOffValue, cutOffFactor * tmpRTrunning)
1923                 siteCandidateSpec.nQueuedJobs = tmpRTqueue
1924             else:
1925                 if not inputChunk.isExpress():
1926                     siteCandidateSpec.nRunningJobsCap = max(cutOffValue, cutOffFactor * nRunning)
1927                 if useAssigned:
1928                     siteCandidateSpec.nQueuedJobs = nActivated + nAssigned + nStarting
1929                 else:
1930                     siteCandidateSpec.nQueuedJobs = nActivated + nStarting
1931             if taskSpec.getNumJumboJobs() is None or not tmpSiteSpec.useJumboJobs():
1932                 forJumbo = False
1933             else:
1934                 forJumbo = True
1935             # OK message. Use jumbo as primary by default
1936             if not forJumbo:
1937                 okMsg = f"  use site={tmpPseudoSiteName} with weight={weight} {weightStr} criteria=+use"
1938                 okAsPrimary = False
1939             else:
1940                 okMsg = f"  use site={tmpPseudoSiteName} for jumbo jobs with weight={weight} {weightStr} criteria=+usejumbo"
1941                 okAsPrimary = True
1942             # checks
1943             if lockedByBrokerage:
1944                 ngMsg = f"  skip site={tmpPseudoSiteName} due to locked by another brokerage criteria=-lock"
1945             elif skipRemoteData:
1946                 ngMsg = f"  skip site={tmpPseudoSiteName} due to non-local data criteria=-non_local"
1947             elif not inputChunk.isExpress() and tmpSiteSpec.capability != "ucore" and siteCandidateSpec.nQueuedJobs > siteCandidateSpec.nRunningJobsCap:
1948                 if not useAssigned:
1949                     ngMsg = f"  skip site={tmpPseudoSiteName} weight={weight} due to nDefined+nActivated+nStarting={siteCandidateSpec.nQueuedJobs} "
1950                     ngMsg += "(nAssigned ignored due to data locally available) "
1951                 else:
1952                     ngMsg = f"  skip site={tmpPseudoSiteName} weight={weight} due to nDefined+nActivated+nAssigned+nStarting={siteCandidateSpec.nQueuedJobs} "
1953                 ngMsg += f"greater than max({cutOffValue},{cutOffFactor}*nRun) {weightStr} criteria=-cap"
1954             elif self.nwActive and inputChunk.isExpress() and weightNw < self.nw_threshold * self.nw_weight_multiplier:
1955                 ngMsg = (
1956                     f"  skip site={tmpPseudoSiteName} due to low network weight for express task weightNw={weightNw} threshold={self.nw_threshold} "
1957                     f"{weightStr} criteria=-lowNetworkWeight"
1958                 )
1959             elif useCapRT and tmpRTqueue > max(cutOffValue, tmpRTrunning * RT_Cap) and not inputChunk.isExpress():
1960                 ngMsg = f"  skip site={tmpSiteName} since "
1961                 if useAssigned:
1962                     ngMsg += f"nDefined_rt+nActivated_rt+nAssigned_rt+nStarting_rt={tmpRTqueue} "
1963                 else:
1964                     ngMsg += f"nDefined_rt+nActivated_rt+nStarting_rt={tmpRTqueue} (nAssigned_rt ignored due to data locally available) "
1965                 ngMsg += f"with gshare+resource_type({resource_type_str}) is greater than max({cutOffValue},{RT_Cap}*nRun_rt={RT_Cap}*{tmpRTrunning}) criteria=-cap_rt"
1966             elif (
1967                 nRunning + nActivated + nAssigned + nStarting + nDefined == 0
1968                 and taskSpec.currentPriority <= self.max_prio_for_bootstrap
1969                 and not inputChunk.isMerging
1970                 and not inputChunk.isExpress()
1971             ):
1972                 okMsg = f"  use site={tmpPseudoSiteName} to bootstrap (no running or queued jobs) criteria=+use"
1973                 ngMsg = (
1974                     f"  skip site={tmpPseudoSiteName} as others being bootstrapped (no running or queued jobs), "
1975                     f"weight={weight} {weightStr} criteria=-others_bootstrap"
1976                 )
1977                 okAsPrimary = True
1978                 # set weight to 0 for subsequent processing
1979                 weight = 0
1980                 siteCandidateSpec.weight = weight
1981             else:
1982                 if min_weight > 0 and weight < min_weight:
1983                     ngMsg = (
1984                         f"  skip site={tmpPseudoSiteName} due to weight below the minimum {min_weight_param}={min_weight}, "
1985                         f"weight={weight} {weightStr} criteria=-below_min_weight"
1986                     )
1987                 elif useT1Weight:
1988                     ngMsg = f"  skip site={tmpPseudoSiteName} due to low total nRunningAll={nRunningAll} for negative T1 weight criteria=-t1_weight"
1989                     if not largestNumRun or largestNumRun[-1] < nRunningAll:
1990                         largestNumRun = (tmpPseudoSiteName, nRunningAll)
1991                         okAsPrimary = True
1992                         # copy primaries to secondary map
1993                         for tmpWeight, tmpCandidates in weightMapPrimary.items():
1994                             weightMapSecondary.setdefault(tmpWeight, [])
1995                             weightMapSecondary[tmpWeight] += tmpCandidates
1996                         weightMapPrimary = {}
1997                 else:
1998                     ngMsg = f"  skip site={tmpPseudoSiteName} due to low weight, weight={weight} {weightStr} criteria=-low_weight"
1999                     okAsPrimary = True
2000             # add to jumbo or primary or secondary
2001             if forJumbo:
2002                 # only OK sites for jumbo
2003                 if not okAsPrimary:
2004                     continue
2005                 weightMap = weightMapJumbo
2006             elif okAsPrimary:
2007                 weightMap = weightMapPrimary
2008             else:
2009                 weightMap = weightMapSecondary
2010             # add weight
2011             if weight not in weightMap:
2012                 weightMap[weight] = []
2013             weightMap[weight].append((siteCandidateSpec, okMsg, ngMsg))
2014         # use only primary candidates
2015         weightMap = weightMapPrimary
2016         # use all weights
2017         weightRank = None
2018         # dump NG message
2019         for tmpWeight in weightMapSecondary.keys():
2020             for siteCandidateSpec, tmpOkMsg, tmpNgMsg in weightMapSecondary[tmpWeight]:
2021                 tmpLog.info(tmpNgMsg)
2022         if weightMapPrimary == {}:
2023             tmpLog.info("available sites all capped")
2024         # add jumbo sites
2025         for weight, tmpList in weightMapJumbo.items():
2026             if weight not in weightMap:
2027                 weightMap[weight] = []
2028             for tmpItem in tmpList:
2029                 weightMap[weight].append(tmpItem)
2030         # max candidates for WORLD
2031         maxSiteCandidates = 10
2032         newScanSiteList = []
2033         weightList = sorted(weightMap.keys())
2034         weightList.reverse()
2035         # put 0 at the head of the list to give priorities bootstrap PQs
2036         if 0 in weightList:
2037             weightList.remove(0)
2038             weightList.insert(0, 0)
2039         for weightIdx, tmpWeight in enumerate(weightList):
2040             for siteCandidateSpec, tmpOkMsg, tmpNgMsg in weightMap[tmpWeight]:
2041                 # candidates for jumbo jobs
2042                 if taskSpec.getNumJumboJobs() is not None:
2043                     tmpSiteSpec = self.siteMapper.getSite(siteCandidateSpec.siteName)
2044                     if tmpSiteSpec.useJumboJobs():
2045                         # use site for jumbo jobs
2046                         tmpLog.info(tmpOkMsg)
2047                         inputChunk.addSiteCandidateForJumbo(siteCandidateSpec)
2048                         if inputChunk.useJumbo not in ["fake", "only"]:
2049                             continue
2050                 # candidates for normal jobs
2051                 if (weightRank is None or weightIdx < weightRank) and (maxSiteCandidates is None or len(newScanSiteList) < maxSiteCandidates):
2052                     # use site
2053                     tmpLog.info(tmpOkMsg)
2054                     newScanSiteList.append(siteCandidateSpec.siteName)
2055                     inputChunk.addSiteCandidate(siteCandidateSpec)
2056                 else:
2057                     # dump NG message
2058                     tmpLog.info(tmpNgMsg)
2059         oldScanSiteList = copy.copy(scanSiteList)
2060         scanSiteList = newScanSiteList
2061         self.add_summary_message(oldScanSiteList, scanSiteList, "final check", tmpLog, {})
2062         # final check
2063         if scanSiteList == []:
2064             self.dump_summary(tmpLog)
2065             tmpLog.error("no candidates")
2066             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
2067             return retTmpError
2068 
2069         self.dump_summary(tmpLog, scanSiteList)
2070         # return
2071         tmpLog.info("done")
2072         return self.SC_SUCCEEDED, inputChunk