# File indexing completed on 2026-04-10 08:38:57
0001 import copy
0002 import datetime
0003 import re
0004
0005 from pandacommon.pandalogger.PandaLogger import PandaLogger
0006 from pandacommon.pandautils.PandaUtils import naive_utcnow
0007
0008 from pandajedi.jedicore import Interaction
0009 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0010 from pandajedi.jedicore.SiteCandidate import SiteCandidate
0011 from pandaserver.dataservice import DataServiceUtils
0012 from pandaserver.dataservice.DataServiceUtils import select_scope
0013 from pandaserver.srvcore import CoreUtils
0014 from pandaserver.taskbuffer import EventServiceUtils, JobUtils
0015
0016 from . import AtlasBrokerUtils
0017 from .JobBrokerBase import JobBrokerBase
0018
# module-level logger named after the last component of this module's dotted path
logger = PandaLogger().getLogger(__name__.split(".")[-1])


# key of the AGIS closeness metric in the network metrics map
AGIS_CLOSENESS = "AGIS_closeness"
# closeness value marking a network link that must not be used
BLOCKED_LINK = -1
# bounds of the closeness scale
MIN_CLOSENESS = 0
MAX_CLOSENESS = 11

# suffixes for per-activity network metric tags (files done in the last 1h/6h, files queued)
TRANSFERRED_1H = "_done_1h"
TRANSFERRED_6H = "_done_6h"
QUEUED = "_queued"
# tiny non-zero stand-in for "no transfers" — presumably avoids division by zero; usage not in this chunk, verify
ZERO_TRANSFERS = 0.00001
# DDM activity names used to build the metric tags above
URG_ACTIVITY = "Express"
PRD_ACTIVITY = "Production Output"

# keys for FTS throughput metrics (MBps averaged over 1 hour / 1 day / 1 week)
FTS_1H = "dashb_mbps_1h"
FTS_1D = "dashb_mbps_1d"
FTS_1W = "dashb_mbps_1w"
# identifiers used when reading component configuration from the task buffer
APP = "jedi"
COMPONENT = "jobbroker"
VO = "atlas"

# weight factor for the nucleus site — usage not in this chunk, verify against full file
WORLD_NUCLEUS_WEIGHT = 4
0042
0043
0044
0045 class AtlasProdJobBroker(JobBrokerBase):
0046
0047 def __init__(self, ddmIF, taskBufferIF):
0048 JobBrokerBase.__init__(self, ddmIF, taskBufferIF)
0049 self.dataSiteMap = {}
0050 self.suppressLogSending = False
0051
0052 self.nwActive = taskBufferIF.getConfigValue(COMPONENT, "NW_ACTIVE", APP, VO)
0053 if self.nwActive is None:
0054 self.nwActive = False
0055
0056 self.nwQueueImportance = taskBufferIF.getConfigValue(COMPONENT, "NW_QUEUE_IMPORTANCE", APP, VO)
0057 if self.nwQueueImportance is None:
0058 self.nwQueueImportance = 0.5
0059 self.nwThroughputImportance = 1 - self.nwQueueImportance
0060
0061 self.nw_threshold = taskBufferIF.getConfigValue(COMPONENT, "NW_THRESHOLD", APP, VO)
0062 if self.nw_threshold is None:
0063 self.nw_threshold = 1.7
0064
0065 self.queue_threshold = taskBufferIF.getConfigValue(COMPONENT, "NQUEUED_SAT_CAP", APP, VO)
0066 if self.queue_threshold is None:
0067 self.queue_threshold = 150
0068
0069 self.total_queue_threshold = taskBufferIF.getConfigValue(COMPONENT, "NQUEUED_NUC_CAP_FOR_JOBS", APP, VO)
0070 if self.total_queue_threshold is None:
0071 self.total_queue_threshold = 10000
0072
0073 self.nw_weight_multiplier = taskBufferIF.getConfigValue(COMPONENT, "NW_WEIGHT_MULTIPLIER", APP, VO)
0074 if self.nw_weight_multiplier is None:
0075 self.nw_weight_multiplier = 1
0076
0077 self.io_intensity_cutoff = taskBufferIF.getConfigValue(COMPONENT, "IO_INTENSITY_CUTOFF", APP, VO)
0078 if self.io_intensity_cutoff is None:
0079 self.io_intensity_cutoff = 200
0080
0081 self.max_prio_for_bootstrap = taskBufferIF.getConfigValue(COMPONENT, "MAX_PRIO_TO_BOOTSTRAP", APP, VO)
0082 if self.max_prio_for_bootstrap is None:
0083 self.max_prio_for_bootstrap = 150
0084
0085
0086 try:
0087 self.sw_map = taskBufferIF.load_sw_map()
0088 except BaseException:
0089 logger.error("Failed to load the SW tags map!!!")
0090 self.sw_map = {}
0091
0092
0093 try:
0094 self.architecture_level_map = taskBufferIF.get_architecture_level_map()
0095 except BaseException:
0096 logger.error("Failed to load the WN architecture level map!!!")
0097 self.architecture_level_map = {}
0098
0099 def convertMBpsToWeight(self, mbps):
0100 """
0101 Takes MBps value and converts to a weight between 1 and 2
0102 """
0103 mbps_thresholds = [200, 100, 75, 50, 20, 10, 2, 1]
0104 weights = [2, 1.9, 1.8, 1.6, 1.5, 1.3, 1.2, 1.1]
0105
0106 for threshold, weight in zip(mbps_thresholds, weights):
0107 if mbps > threshold:
0108 return weight
0109 return 1
0110
0111
0112 def doBrokerage(self, taskSpec, cloudName, inputChunk, taskParamMap, hintForTB=False, siteListForTB=None, glLog=None):
0113
0114 if hintForTB:
0115 self.suppressLogSending = True
0116
0117 if glLog is None:
0118 tmpLog = MsgWrapper(
0119 logger,
0120 f"<jediTaskID={taskSpec.jediTaskID}>",
0121 monToken=f"<jediTaskID={taskSpec.jediTaskID} {naive_utcnow().isoformat('/')}>",
0122 )
0123 else:
0124 tmpLog = glLog
0125
0126 if not hintForTB:
0127 tmpLog.debug(f"start currentPriority={taskSpec.currentPriority}")
0128
0129 if self.nwActive:
0130 tmpLog.debug("Network weights are ACTIVE!")
0131 else:
0132 tmpLog.debug("Network weights are PASSIVE!")
0133
0134 timeNow = naive_utcnow()
0135
0136
0137 retTmpError = self.SC_FAILED, inputChunk
0138
0139
0140 newMaxwdir = {}
0141
0142
0143 work_shortage = self.taskBufferIF.getConfigValue("core", "WORK_SHORTAGE", APP, VO)
0144 if work_shortage is True:
0145 tmp_status, core_statistics = self.taskBufferIF.get_core_statistics(taskSpec.vo, taskSpec.prodSourceLabel)
0146 if not tmp_status:
0147 tmpLog.error("failed to get core statistics")
0148 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0149 return retTmpError
0150 tmpLog.debug(f"Work shortage is {work_shortage}")
0151 else:
0152 core_statistics = {}
0153
0154
0155 max_missing_input_files = self.taskBufferIF.getConfigValue("jobbroker", "MAX_MISSING_INPUT_FILES", "jedi", taskSpec.vo)
0156 if max_missing_input_files is None:
0157 max_missing_input_files = 10
0158 min_input_completeness = self.taskBufferIF.getConfigValue("jobbroker", "MIN_INPUT_COMPLETENESS", "jedi", taskSpec.vo)
0159 if min_input_completeness is None:
0160 min_input_completeness = 90
0161
0162
0163 min_weight_param = f"MIN_WEIGHT_{taskSpec.prodSourceLabel}_{taskSpec.gshare}"
0164 min_weight = self.taskBufferIF.getConfigValue("jobbroker", min_weight_param, "jedi", taskSpec.vo)
0165 if min_weight is None:
0166 min_weight_param = f"MIN_WEIGHT_{taskSpec.prodSourceLabel}"
0167 min_weight = self.taskBufferIF.getConfigValue("jobbroker", min_weight_param, "jedi", taskSpec.vo)
0168 if min_weight is None:
0169 min_weight = 0
0170
0171
0172 siteSkippedTmp = dict()
0173 sitePreAssigned = False
0174 siteListPreAssigned = False
0175 if siteListForTB is not None:
0176 scanSiteList = siteListForTB
0177
0178 elif taskSpec.site not in ["", None] and inputChunk.getPreassignedSite() is None:
0179
0180 scanSiteList = []
0181 for tmpSiteName in taskSpec.site.split(","):
0182 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0183 if tmpSiteSpec is None or not tmpSiteSpec.is_unified:
0184 scanSiteList.append(tmpSiteName)
0185 else:
0186 scanSiteList += self.get_pseudo_sites([tmpSiteName], self.siteMapper.getCloud(cloudName)["sites"])
0187 for tmpSiteName in scanSiteList:
0188 tmpLog.info(f"site={tmpSiteName} is pre-assigned criteria=+preassign")
0189 if len(scanSiteList) > 1:
0190 siteListPreAssigned = True
0191 else:
0192 sitePreAssigned = True
0193
0194 elif inputChunk.getPreassignedSite() is not None:
0195 if (
0196 inputChunk.masterDataset.creationTime is not None
0197 and inputChunk.masterDataset.modificationTime is not None
0198 and inputChunk.masterDataset.modificationTime != inputChunk.masterDataset.creationTime
0199 and timeNow - inputChunk.masterDataset.modificationTime > datetime.timedelta(hours=24)
0200 and taskSpec.frozenTime is not None
0201 and timeNow - taskSpec.frozenTime > datetime.timedelta(hours=6)
0202 ):
0203
0204 tmpLog.info("ignore pre-assigned for pmerge due to timeout")
0205 scanSiteList = self.siteMapper.getCloud(cloudName)["sites"]
0206 tmpLog.info(f"cloud={cloudName} has {len(scanSiteList)} candidates")
0207 else:
0208
0209 siteListPreAssigned = True
0210 scanSiteList = DataServiceUtils.getSitesShareDDM(self.siteMapper, inputChunk.getPreassignedSite(), JobUtils.PROD_PS, JobUtils.PROD_PS)
0211 scanSiteList.append(inputChunk.getPreassignedSite())
0212 tmp_msg = (
0213 f"use site={scanSiteList} since they share DDM endpoints with original_site={inputChunk.getPreassignedSite()} "
0214 f"which is pre-assigned in masterDS criteria=+premerge"
0215 )
0216 tmpLog.info(tmp_msg)
0217
0218 else:
0219 scanSiteList = self.siteMapper.getCloud(cloudName)["sites"]
0220 tmpLog.info(f"cloud={cloudName} has {len(scanSiteList)} candidates")
0221
0222
0223 if taskSpec.useEventService() and not taskSpec.useJobCloning() and (taskSpec.currentPriority >= 900 or inputChunk.useScout()):
0224 esHigh = True
0225 else:
0226 esHigh = False
0227
0228
0229 tmpSt, jobStatMap = self.taskBufferIF.getJobStatisticsByGlobalShare(taskSpec.vo)
0230 if not tmpSt:
0231 tmpLog.error("failed to get job statistics")
0232 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0233 return retTmpError
0234
0235 tmp_st, transferring_job_map = self.taskBufferIF.get_num_jobs_with_status_by_nucleus(taskSpec.vo, "transferring")
0236 if not tmp_st:
0237 tmpLog.error("failed to get transferring job statistics")
0238 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0239 return retTmpError
0240
0241
0242 nucleusSpec = None
0243 nucleus_with_storages_unwritable_over_wan = {}
0244 if not hintForTB:
0245
0246 nucleusSpec = self.siteMapper.getNucleus(taskSpec.nucleus)
0247 if nucleusSpec is None:
0248 tmpLog.error(f"unknown nucleus {taskSpec.nucleus}")
0249 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0250 return retTmpError
0251 sites_in_nucleus = nucleusSpec.allPandaSites
0252
0253 for tmp_name, tmp_nucleus_dict in self.siteMapper.nuclei.items():
0254 default_endpoint_out = tmp_nucleus_dict.get_default_endpoint_out()
0255 if not default_endpoint_out:
0256 continue
0257 receive_output_over_wan = default_endpoint_out["detailed_status"].get("write_wan")
0258 if receive_output_over_wan in ["OFF", "TEST"]:
0259 nucleus_with_storages_unwritable_over_wan[tmp_name] = receive_output_over_wan
0260
0261 else:
0262
0263 sites_in_nucleus = []
0264 for tmpNucleus in self.siteMapper.nuclei.values():
0265 sites_in_nucleus += tmpNucleus.allPandaSites
0266
0267
0268 if len(sites_in_nucleus) > 0:
0269 sites_sharing_output_storages_in_nucleus = DataServiceUtils.getSitesShareDDM(
0270 self.siteMapper, sites_in_nucleus[0], JobUtils.PROD_PS, JobUtils.PROD_PS, True
0271 )
0272 else:
0273 sites_sharing_output_storages_in_nucleus = []
0274
0275
0276 if inputChunk.isMerging and taskSpec.mergeCoreCount is not None:
0277 taskCoreCount = taskSpec.mergeCoreCount
0278 else:
0279 taskCoreCount = taskSpec.coreCount
0280
0281
0282 if taskCoreCount is not None and taskCoreCount > 1:
0283
0284 useMP = "only"
0285 elif taskCoreCount == 0 and (taskSpec.currentPriority >= 900 or inputChunk.useScout()):
0286
0287 useMP = "only"
0288 elif taskCoreCount == 0:
0289
0290 useMP = "any"
0291 else:
0292
0293 useMP = "unuse"
0294
0295
0296 workQueue = self.taskBufferIF.getWorkQueueMap().getQueueWithIDGshare(taskSpec.workQueue_ID, taskSpec.gshare)
0297 if workQueue.is_global_share:
0298 wq_tag = workQueue.queue_name
0299 wq_tag_global_share = wq_tag
0300 else:
0301 wq_tag = workQueue.queue_id
0302 workQueueGS = self.taskBufferIF.getWorkQueueMap().getQueueWithIDGshare(None, taskSpec.gshare)
0303 wq_tag_global_share = workQueueGS.queue_name
0304
0305
0306 self.init_summary_list("Job brokerage summary", None, self.get_unified_sites(scanSiteList))
0307
0308
0309
0310 remote_source_available = True
0311 if inputChunk.getDatasets() and not taskSpec.inputPreStaging():
0312 for datasetSpec in inputChunk.getDatasets():
0313 datasetName = datasetSpec.datasetName
0314
0315 is_distributed = self.ddmIF.isDistributedDataset(datasetName)
0316 if is_distributed:
0317 tmpLog.debug(f"completeness check disabled for {datasetName} since it is distributed")
0318 continue
0319
0320 (
0321 tmpSt,
0322 tmpRet,
0323 tmp_complete_disk_ok,
0324 tmp_complete_tape_ok,
0325 tmp_truly_complete_disk,
0326 tmp_can_be_local_source,
0327 tmp_can_be_remote_source,
0328 tmp_list_of_complete_replica_locations,
0329 ) = AtlasBrokerUtils.get_sites_with_data(
0330 [],
0331 self.siteMapper,
0332 self.ddmIF,
0333 datasetName,
0334 [],
0335 max_missing_input_files,
0336 min_input_completeness,
0337 )
0338 if tmpSt != Interaction.SC_SUCCEEDED:
0339 tmpLog.error(f"failed to get available storage endpoints with {datasetName}")
0340 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0341 return retTmpError
0342
0343 if not tmp_complete_disk_ok and not tmp_complete_tape_ok:
0344 err_msg = f"{datasetName} is "
0345 if tmp_list_of_complete_replica_locations:
0346 tmp_rse_list = ",".join(tmp_list_of_complete_replica_locations)
0347 tmp_is_single = len(tmp_list_of_complete_replica_locations) == 1
0348 err_msg += f"only complete at {tmp_rse_list} which "
0349 err_msg += "is " if tmp_is_single else "are "
0350 err_msg += "currently in downtime or offline"
0351 else:
0352 err_msg += "incomplete at any online storage"
0353 tmpLog.error(err_msg)
0354 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0355 return retTmpError
0356 tmp_msg = f"complete replicas of {datasetName} are available at online storages"
0357 if not tmp_can_be_remote_source:
0358 if tmp_can_be_local_source:
0359 tmp_msg += ", but files cannot be sent out to satellites since read_wan is not ON"
0360 remote_source_available = False
0361 else:
0362 tmp_msg = f"complete replicas of {datasetName} are available at endpoints where read_wan/lan are not ON"
0363 tmpLog.error(tmp_msg)
0364 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0365 return retTmpError
0366 tmpLog.info(tmp_msg)
0367
0368
0369
0370 if not sitePreAssigned and not siteListPreAssigned:
0371 newScanSiteList = []
0372 oldScanSiteList = copy.copy(scanSiteList)
0373 msg_map = {}
0374 tmpLog.set_message_slot()
0375 for tmpSiteName in scanSiteList:
0376 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0377
0378
0379 if tmpSiteSpec.is_unified:
0380 continue
0381
0382 msgFlag = False
0383 skipFlag = False
0384
0385 if tmpSiteSpec.status in ["online", "standby"]:
0386 newScanSiteList.append(tmpSiteName)
0387 else:
0388 msgFlag = True
0389 skipFlag = True
0390
0391 if msgFlag:
0392 tmpStr = f" skip site={tmpSiteName} due to status={tmpSiteSpec.status} criteria=-status"
0393 if skipFlag:
0394 msg_map[tmpSiteSpec.get_unified_name()] = (
0395 f" skip site={tmpSiteSpec.get_unified_name()} due to status={tmpSiteSpec.status} criteria=-status"
0396 )
0397 else:
0398 siteSkippedTmp[tmpSiteName] = tmpStr
0399
0400 tmpLog.set_message_slot()
0401 scanSiteList = newScanSiteList
0402 self.add_summary_message(oldScanSiteList, scanSiteList, "status check", tmpLog, msg_map)
0403 if not scanSiteList:
0404 self.dump_summary(tmpLog)
0405 tmpLog.error("no candidates")
0406 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0407 return retTmpError
0408
0409
0410
0411 nucleus = taskSpec.nucleus
0412 storageMapping = self.taskBufferIF.getPandaSiteToOutputStorageSiteMapping()
0413
0414 if nucleus:
0415
0416 if inputChunk.isExpress():
0417 transferred_tag = f"{URG_ACTIVITY}{TRANSFERRED_6H}"
0418 queued_tag = f"{URG_ACTIVITY}{QUEUED}"
0419 else:
0420 transferred_tag = f"{PRD_ACTIVITY}{TRANSFERRED_6H}"
0421 queued_tag = f"{PRD_ACTIVITY}{QUEUED}"
0422
0423 networkMap = self.taskBufferIF.getNetworkMetrics(nucleus, [AGIS_CLOSENESS, transferred_tag, queued_tag, FTS_1H, FTS_1D, FTS_1W])
0424
0425
0426
0427 if nucleus:
0428
0429 if queued_tag in networkMap["total"]:
0430 totalQueued = networkMap["total"][queued_tag]
0431 else:
0432 totalQueued = 0
0433
0434 tmpLog.debug(f"Total number of files being transferred to the nucleus : {totalQueued}")
0435 newScanSiteList = []
0436 oldScanSiteList = copy.copy(scanSiteList)
0437 newSkippedTmp = dict()
0438 msg_map = {}
0439 tmpLog.set_message_slot()
0440 for tmpPandaSiteName in self.get_unified_sites(scanSiteList):
0441 try:
0442 tmpAtlasSiteName = storageMapping[tmpPandaSiteName]["default"]
0443 skipFlag = False
0444 tempFlag = False
0445 criteria = "-link_unusable"
0446 from_str = f"from satellite={tmpAtlasSiteName}"
0447 reason = ""
0448 if nucleus == tmpAtlasSiteName:
0449
0450 pass
0451 elif nucleus in nucleus_with_storages_unwritable_over_wan:
0452
0453 reason = (
0454 nucleusSpec.get_default_endpoint_out()["ddm_endpoint_name"]
0455 + f" at nucleus={nucleus} unwritable over WAN write_wan={nucleus_with_storages_unwritable_over_wan[nucleus]}"
0456 )
0457 criteria = "-dest_blacklisted"
0458 from_str = ""
0459 tempFlag = True
0460 elif totalQueued >= self.total_queue_threshold:
0461
0462 reason = f"too many output files being transferred to the nucleus {totalQueued}(>{self.total_queue_threshold} total limit)"
0463 criteria = "-links_full"
0464 from_str = ""
0465 tempFlag = True
0466 elif tmpAtlasSiteName not in networkMap:
0467
0468
0469 pass
0470 elif AGIS_CLOSENESS in networkMap[tmpAtlasSiteName] and networkMap[tmpAtlasSiteName][AGIS_CLOSENESS] == BLOCKED_LINK:
0471
0472 reason = f"agis_closeness={networkMap[tmpAtlasSiteName][AGIS_CLOSENESS]}"
0473 skipFlag = True
0474 elif queued_tag in networkMap[tmpAtlasSiteName] and networkMap[tmpAtlasSiteName][queued_tag] >= self.queue_threshold:
0475
0476 reason = f"too many output files queued in the channel {networkMap[tmpAtlasSiteName][queued_tag]}(>{self.queue_threshold} link limit)"
0477 tempFlag = True
0478
0479
0480 tmpStr = f" skip site={tmpPandaSiteName} due to {reason}"
0481 if from_str:
0482 tmpStr += f", {from_str} to nucleus={nucleus}"
0483 tmpStr += f": criteria={criteria}"
0484 if skipFlag:
0485 msg_map[tmpPandaSiteName] = tmpStr
0486 else:
0487 newScanSiteList.append(tmpPandaSiteName)
0488 if tempFlag:
0489 newSkippedTmp[tmpPandaSiteName] = tmpStr
0490 except KeyError:
0491
0492
0493 newScanSiteList.append(tmpPandaSiteName)
0494 tmpLog.unset_message_slot()
0495 siteSkippedTmp = self.add_pseudo_sites_to_skip(newSkippedTmp, scanSiteList, siteSkippedTmp)
0496 scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
0497 self.add_summary_message(oldScanSiteList, scanSiteList, "link check", tmpLog, msg_map)
0498 if not scanSiteList:
0499 self.dump_summary(tmpLog)
0500 tmpLog.error("no candidates")
0501 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0502 return retTmpError
0503
0504
0505 t1WeightForHighPrio = 1
0506 if (taskSpec.currentPriority >= 800 or inputChunk.useScout()) and not sitePreAssigned and not siteListPreAssigned:
0507 if not taskSpec.useEventService():
0508 if taskSpec.currentPriority >= 900 or inputChunk.useScout():
0509 t1WeightForHighPrio = 100
0510 newScanSiteList = []
0511 oldScanSiteList = copy.copy(scanSiteList)
0512 msg_map = {}
0513 tmpLog.set_message_slot()
0514
0515 for tmpSiteName in scanSiteList:
0516 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0517 if not tmpSiteSpec.is_opportunistic():
0518 newScanSiteList.append(tmpSiteName)
0519 else:
0520 tmp_msg = f" skip site={tmpSiteName} to avoid opportunistic for high priority jobs "
0521 tmp_msg += "criteria=-opportunistic"
0522 msg_map[tmpSiteSpec.get_unified_name()] = (
0523 f" skip site={tmpSiteSpec.get_unified_name()} to avoid opportunistic for high priority jobs criteria=-opportunistic"
0524 )
0525
0526 tmpLog.unset_message_slot()
0527 scanSiteList = newScanSiteList
0528 self.add_summary_message(oldScanSiteList, scanSiteList, "opportunistic check", tmpLog, msg_map)
0529
0530 if not scanSiteList:
0531 self.dump_summary(tmpLog)
0532 tmpLog.error("no candidates")
0533 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0534 return retTmpError
0535
0536
0537
0538 if (
0539 (taskSpec.currentPriority >= 800 or inputChunk.useScout() or inputChunk.isMerging or taskSpec.mergeOutput())
0540 and not sitePreAssigned
0541 and not siteListPreAssigned
0542 ):
0543
0544 inactiveTimeLimit = 2
0545 inactiveSites = self.taskBufferIF.getInactiveSites_JEDI("production", inactiveTimeLimit)
0546 newScanSiteList = []
0547 oldScanSiteList = copy.copy(scanSiteList)
0548 newSkippedTmp = dict()
0549 tmpLog.set_message_slot()
0550
0551 for tmpSiteName in self.get_unified_sites(scanSiteList):
0552 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0553 nToGetAll = AtlasBrokerUtils.getNumJobs(jobStatMap, tmpSiteSpec.get_unified_name(), "activated") + AtlasBrokerUtils.getNumJobs(
0554 jobStatMap, tmpSiteSpec.get_unified_name(), "starting"
0555 )
0556
0557 if tmpSiteName in inactiveSites and nToGetAll > 0:
0558 tmp_msg = (
0559 f" skip site={tmpSiteName} since high prio/scouts/merge needs to avoid inactive sites "
0560 f"(laststart is older than {inactiveTimeLimit}h) criteria=-inactive"
0561 )
0562
0563 newSkippedTmp[tmpSiteName] = tmp_msg
0564 newScanSiteList.append(tmpSiteName)
0565
0566 tmpLog.unset_message_slot()
0567 siteSkippedTmp = self.add_pseudo_sites_to_skip(newSkippedTmp, scanSiteList, siteSkippedTmp)
0568 scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
0569 self.add_summary_message(oldScanSiteList, scanSiteList, "slowness/inactive check", tmpLog, {})
0570 if not scanSiteList:
0571 self.dump_summary(tmpLog)
0572 tmpLog.error("no candidates")
0573 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0574 return retTmpError
0575
0576
0577
0578 if sitePreAssigned and taskSpec.prodSourceLabel not in [JobUtils.PROD_PS] or workQueue.queue_name not in ["test", "validation"]:
0579 newScanSiteList = []
0580 oldScanSiteList = copy.copy(scanSiteList)
0581 msg_map = {}
0582 tmpLog.set_message_slot()
0583 for tmpSiteName in scanSiteList:
0584 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0585
0586 if AtlasBrokerUtils.hasZeroShare(tmpSiteSpec, taskSpec, inputChunk.isMerging, tmpLog):
0587 msg_map[tmpSiteSpec.get_unified_name()] = f" skip site={tmpSiteSpec.get_unified_name()} due to zero share criteria=-zeroshare"
0588 continue
0589 newScanSiteList.append(tmpSiteName)
0590 tmpLog.unset_message_slot()
0591 scanSiteList = newScanSiteList
0592 self.add_summary_message(oldScanSiteList, scanSiteList, "zero share check", tmpLog, msg_map)
0593 if not scanSiteList:
0594 self.dump_summary(tmpLog)
0595 tmpLog.error("no candidates")
0596 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0597 return retTmpError
0598
0599
0600
0601 if not sitePreAssigned and taskSpec.getNumJumboJobs() is None:
0602 newScanSiteList = []
0603 oldScanSiteList = copy.copy(scanSiteList)
0604 msg_map = {}
0605 tmpLog.set_message_slot()
0606 for tmpSiteName in scanSiteList:
0607 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0608 if tmpSiteSpec.useJumboJobs():
0609 msg_map[tmpSiteSpec.get_unified_name()] = f" skip site={tmpSiteSpec.get_unified_name()} since it is only for jumbo jobs criteria=-jumbo"
0610 continue
0611 newScanSiteList.append(tmpSiteName)
0612 tmpLog.unset_message_slot()
0613 scanSiteList = newScanSiteList
0614 tmpLog.info(f"{len(scanSiteList)} candidates passed jumbo job check")
0615 self.add_summary_message(oldScanSiteList, scanSiteList, "jumbo job check", tmpLog, msg_map)
0616 if scanSiteList == []:
0617 self.dump_summary(tmpLog)
0618 tmpLog.error("no candidates")
0619 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0620 return retTmpError
0621 elif not sitePreAssigned and taskSpec.getNumJumboJobs() is not None:
0622 nReadyEvents = self.taskBufferIF.getNumReadyEvents(taskSpec.jediTaskID)
0623 if nReadyEvents is not None:
0624 newScanSiteList = []
0625 oldScanSiteList = copy.copy(scanSiteList)
0626 msg_map = {}
0627 tmpLog.set_message_slot()
0628 for tmpSiteName in scanSiteList:
0629 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0630 if tmpSiteSpec.useJumboJobs():
0631 minEvents = tmpSiteSpec.getMinEventsForJumbo()
0632 if minEvents is not None and nReadyEvents < minEvents:
0633 msg_map[tmpSiteSpec.get_unified_name()] = (
0634 f" skip site={tmpSiteSpec.get_unified_name()} since not enough events ({minEvents}<{nReadyEvents}) "
0635 "for jumbo criteria=-fewevents"
0636 )
0637 continue
0638 newScanSiteList.append(tmpSiteName)
0639 tmpLog.unset_message_slot()
0640 scanSiteList = newScanSiteList
0641 self.add_summary_message(oldScanSiteList, scanSiteList, "jumbo events check", tmpLog, msg_map)
0642 if not scanSiteList:
0643 self.dump_summary(tmpLog)
0644 tmpLog.error("no candidates")
0645 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0646 return retTmpError
0647
0648
0649
0650
0651
0652 max_diskio_per_core_default = self.taskBufferIF.getConfigValue(COMPONENT, "MAX_DISKIO_DEFAULT", APP, VO)
0653 if not max_diskio_per_core_default:
0654 max_diskio_per_core_default = 10**10
0655
0656
0657 diskio_percore_usage = self.taskBufferIF.getAvgDiskIO_JEDI()
0658 unified_site_list = self.get_unified_sites(scanSiteList)
0659 newScanSiteList = []
0660 oldScanSiteList = copy.copy(scanSiteList)
0661 msg_map = {}
0662 tmpLog.set_message_slot()
0663 newSkippedTmp = dict()
0664 for tmpSiteName in unified_site_list:
0665 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0666
0667
0668 diskio_usage_tmp = diskio_percore_usage.get(tmpSiteName, 0)
0669
0670
0671 if tmpSiteSpec.maxDiskio and tmpSiteSpec.maxDiskio > 0:
0672
0673 diskio_limit_tmp = tmpSiteSpec.maxDiskio
0674 else:
0675
0676 diskio_limit_tmp = max_diskio_per_core_default
0677
0678
0679 diskio_task_tmp = taskSpec.diskIO
0680 if taskSpec.diskIO is not None and taskSpec.coreCount not in [None, 0, 1] and tmpSiteSpec.coreCount not in [None, 0]:
0681 diskio_task_tmp = taskSpec.diskIO // tmpSiteSpec.coreCount
0682
0683 try:
0684 log_msg = f"diskIO measurements: site={tmpSiteName} jediTaskID={taskSpec.jediTaskID} "
0685 if diskio_task_tmp is not None:
0686 log_msg += f"diskIO_task={diskio_task_tmp:.2f} "
0687 if diskio_usage_tmp is not None:
0688 log_msg += f"diskIO_site_usage={diskio_usage_tmp:.2f} "
0689 if diskio_limit_tmp is not None:
0690 log_msg += f"diskIO_site_limit={diskio_limit_tmp:.2f} "
0691 tmpLog.info(log_msg)
0692 except Exception:
0693 tmpLog.debug("diskIO measurements: Error generating diskIO message")
0694
0695
0696 if diskio_task_tmp and diskio_usage_tmp and diskio_limit_tmp and diskio_usage_tmp > diskio_limit_tmp and diskio_task_tmp > diskio_limit_tmp:
0697 tmp_msg = f" skip site={tmpSiteName} due to diskIO overload criteria=-diskIO"
0698 newSkippedTmp[tmpSiteName] = tmp_msg
0699 msg_map[tmpSiteName] = tmp_msg
0700
0701 newScanSiteList.append(tmpSiteName)
0702
0703 tmpLog.unset_message_slot()
0704 siteSkippedTmp = self.add_pseudo_sites_to_skip(newSkippedTmp, scanSiteList, siteSkippedTmp)
0705 scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
0706 self.add_summary_message(oldScanSiteList, scanSiteList, "diskIO check", tmpLog, msg_map)
0707 if not scanSiteList:
0708 self.dump_summary(tmpLog)
0709 tmpLog.error("no candidates")
0710 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0711 return retTmpError
0712
0713
0714
0715 if not sitePreAssigned:
0716 newScanSiteList = []
0717 oldScanSiteList = copy.copy(scanSiteList)
0718 msg_map = {}
0719 tmpLog.set_message_slot()
0720 max_core_count = taskSpec.get_max_core_count()
0721 for tmpSiteName in scanSiteList:
0722 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0723
0724 if useMP == "any" or (useMP == "only" and tmpSiteSpec.coreCount > 1) or (useMP == "unuse" and tmpSiteSpec.coreCount in [0, 1, None]):
0725 if max_core_count and tmpSiteSpec.coreCount and tmpSiteSpec.coreCount > max_core_count:
0726 msg_map[tmpSiteSpec.get_unified_name()] = (
0727 f" skip site={tmpSiteSpec.get_unified_name()} due to larger core count site:{tmpSiteSpec.coreCount} "
0728 f"than task_max={max_core_count} criteria=-max_cpucore"
0729 )
0730 else:
0731 newScanSiteList.append(tmpSiteName)
0732 else:
0733 msg_map[tmpSiteSpec.get_unified_name()] = (
0734 f" skip site={tmpSiteSpec.get_unified_name()} due to core mismatch "
0735 f"site:{tmpSiteSpec.coreCount} <> task:{taskCoreCount} criteria=-cpucore"
0736 )
0737 tmpLog.unset_message_slot()
0738 scanSiteList = newScanSiteList
0739 self.add_summary_message(oldScanSiteList, scanSiteList, "core count check", tmpLog, msg_map)
0740 if not scanSiteList:
0741 self.dump_summary(tmpLog)
0742 tmpLog.error("no candidates")
0743 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0744 return retTmpError
0745
0746
0747
0748 cmt_config = taskSpec.get_sw_platform()
0749 is_regexp_cmt_config = False
0750 if cmt_config:
0751 if re.match(cmt_config, cmt_config) is None:
0752 is_regexp_cmt_config = True
0753 base_platform = taskSpec.get_base_platform()
0754 resolved_platforms = {}
0755 preference_weight_map = {}
0756 if taskSpec.transHome is not None:
0757 jsonCheck = AtlasBrokerUtils.JsonSoftwareCheck(self.siteMapper, self.sw_map, self.architecture_level_map)
0758 unified_site_list = self.get_unified_sites(scanSiteList)
0759
0760 host_cpu_spec = taskSpec.get_host_cpu_spec()
0761 host_cpu_pref = taskSpec.get_host_cpu_preference()
0762 host_gpu_spec = taskSpec.get_host_gpu_spec()
0763
0764 if taskSpec.get_base_platform() is None:
0765 use_container = False
0766 else:
0767 use_container = True
0768
0769 need_cvmfs = False
0770
0771
0772 if (re.search("-\d+\.\d+\.\d+$", taskSpec.transHome) is not None) or (
0773 re.search("rel_\d+(\n|$)", taskSpec.transHome) is None and re.search("\d{4}-\d{2}-\d{2}T\d{4}$", taskSpec.transHome) is None
0774 ):
0775 cvmfs_repo = "atlas"
0776 sw_project = taskSpec.transHome.split("-")[0]
0777 sw_version = taskSpec.transHome.split("-")[1]
0778 cmt_config_only = False
0779
0780
0781 else:
0782 cvmfs_repo = "nightlies"
0783 sw_project = None
0784 sw_version = None
0785 cmt_config_only = False
0786
0787 siteListWithSW, sitesNoJsonCheck, preference_weight_map = jsonCheck.check(
0788 unified_site_list,
0789 cvmfs_repo,
0790 sw_project,
0791 sw_version,
0792 cmt_config,
0793 need_cvmfs,
0794 cmt_config_only,
0795 need_container=use_container,
0796 container_name=taskSpec.container_name,
0797 only_tags_fc=taskSpec.use_only_tags_fc(),
0798 host_cpu_specs=host_cpu_spec,
0799 host_cpu_pref=host_cpu_pref,
0800 host_gpu_spec=host_gpu_spec,
0801 log_stream=tmpLog,
0802 )
0803 sitesAuto = copy.copy(siteListWithSW)
0804 sitesAny = []
0805
0806 newScanSiteList = []
0807 oldScanSiteList = copy.copy(scanSiteList)
0808 msg_map = {}
0809 tmpLog.set_message_slot()
0810
0811 for tmpSiteName in unified_site_list:
0812 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0813 if cmt_config:
0814 platforms = AtlasBrokerUtils.resolve_cmt_config(tmpSiteName, cmt_config, base_platform, self.sw_map)
0815 if platforms:
0816 resolved_platforms[tmpSiteName] = platforms
0817 if tmpSiteName in siteListWithSW:
0818
0819 if not is_regexp_cmt_config or tmpSiteName in resolved_platforms:
0820 newScanSiteList.append(tmpSiteName)
0821 else:
0822
0823 msg_map[tmpSiteName] = f" skip site={tmpSiteName} due to unresolved regexp in cmtconfig={cmt_config} criteria=-regexpcmtconfig"
0824 elif (
0825 not (taskSpec.container_name and taskSpec.use_only_tags_fc())
0826 and host_cpu_spec is None
0827 and host_gpu_spec is None
0828 and tmpSiteSpec.releases == ["ANY"]
0829 ):
0830
0831 newScanSiteList.append(tmpSiteName)
0832 sitesAny.append(tmpSiteName)
0833 else:
0834 if tmpSiteSpec.releases == ["AUTO"]:
0835 autoStr = "with AUTO"
0836 else:
0837 autoStr = "without AUTO"
0838
0839 msg_map[tmpSiteName] = (
0840 f" skip site={tmpSiteName} {autoStr} due to missing SW cache={taskSpec.transHome}:{taskSpec.get_sw_platform()} sw_platform='{taskSpec.container_name}' "
0841 f"or irrelevant HW cpu={str(host_cpu_spec)} gpu={str(host_gpu_spec)} criteria=-cache"
0842 )
0843
0844 tmpLog.unset_message_slot()
0845 scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
0846 self.add_summary_message(oldScanSiteList, scanSiteList, "SW/HW check", tmpLog, msg_map)
0847 tmpLog.info(f" {len(sitesAuto)} with AUTO, {len(sitesAny)} with ANY")
0848 if not scanSiteList:
0849 self.dump_summary(tmpLog)
0850 tmpLog.error("no candidates")
0851 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0852 return retTmpError
0853
0854
0855
0856 origMinRamCount = inputChunk.getMaxRamCount()
0857 if origMinRamCount not in [0, None]:
0858 if inputChunk.isMerging:
0859 strMinRamCount = f"{origMinRamCount}(MB)"
0860 else:
0861 str_ram_unit = taskSpec.ramUnit
0862 if str_ram_unit:
0863 str_ram_unit = str_ram_unit.replace("PerCore", " ").strip()
0864 strMinRamCount = f"{origMinRamCount}({str_ram_unit})"
0865 if not inputChunk.isMerging and taskSpec.baseRamCount not in [0, None]:
0866 strMinRamCount += f"+{taskSpec.baseRamCount}"
0867 newScanSiteList = []
0868 oldScanSiteList = copy.copy(scanSiteList)
0869 msg_map = {}
0870 tmpLog.set_message_slot()
0871 for tmpSiteName in scanSiteList:
0872 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0873
0874 minRamCount = origMinRamCount
0875 if taskSpec.ramPerCore() and not inputChunk.isMerging:
0876 if tmpSiteSpec.coreCount not in [None, 0]:
0877 minRamCount = origMinRamCount * tmpSiteSpec.coreCount
0878 minRamCount += taskSpec.baseRamCount
0879
0880 minRamCount = JobUtils.compensate_ram_count(minRamCount)
0881
0882 site_maxmemory = 0
0883 if tmpSiteSpec.maxrss not in [0, None]:
0884 site_maxmemory = tmpSiteSpec.maxrss
0885
0886 if site_maxmemory not in [0, None] and minRamCount != 0 and minRamCount > site_maxmemory:
0887 tmp_msg = f" skip site={tmpSiteName} due to sue to insufficient RAM less than less than job's core-scaled requirement {minRamCount} MB "
0888 tmp_msg += "criteria=-lowmemory"
0889 msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
0890 continue
0891
0892 site_minmemory = 0
0893 if tmpSiteSpec.minrss not in [0, None]:
0894 site_minmemory = tmpSiteSpec.minrss
0895 if site_minmemory not in [0, None] and minRamCount != 0 and minRamCount < site_minmemory:
0896 tmp_msg = f" skip site={tmpSiteName} due to RAM lower limit greater than than job's core-scaled requirement {minRamCount} MB "
0897 tmp_msg += "criteria=-highmemory"
0898 msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
0899 continue
0900 newScanSiteList.append(tmpSiteName)
0901 tmpLog.unset_message_slot()
0902 scanSiteList = newScanSiteList
0903 self.add_summary_message(oldScanSiteList, scanSiteList, "memory check", tmpLog, msg_map)
0904 if not scanSiteList:
0905 self.dump_summary(tmpLog)
0906 tmpLog.error("no candidates")
0907 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0908 return retTmpError
0909
0910
0911
        # scratch disk check: estimate the job's disk footprint (output + work
        # dir, plus the input when copied to scratch) and drop sites whose
        # core-scaled maxwdir is too small
        if taskSpec.outputScaleWithEvents():
            minDiskCount = max(taskSpec.getOutDiskSize() * inputChunk.getMaxAtomSize(getNumEvents=True), inputChunk.defaultOutputSize)
        else:
            minDiskCount = max(taskSpec.getOutDiskSize() * inputChunk.getMaxAtomSize(effectiveSize=True), inputChunk.defaultOutputSize)
        minDiskCount += taskSpec.getWorkDiskSize()
        # L = copy-to-scratch variant (includes the input), D = direct-IO variant
        minDiskCountL = minDiskCount
        minDiskCountD = minDiskCount
        minDiskCountL += inputChunk.getMaxAtomSize()
        # convert bytes to MB
        minDiskCountL = minDiskCountL // 1024 // 1024
        minDiskCountD = minDiskCountD // 1024 // 1024
        newScanSiteList = []
        oldScanSiteList = copy.copy(scanSiteList)
        msg_map = {}
        tmpLog.set_message_slot()
        for tmpSiteName in self.get_unified_sites(scanSiteList):
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)

            # tasks restricted to LAN input require direct IO capable sites (merging exempt)
            if taskSpec.allowInputLAN() == "only" and not tmpSiteSpec.isDirectIO() and not tmpSiteSpec.always_use_direct_io() and not inputChunk.isMerging:
                tmp_msg = f" skip site={tmpSiteName} since direct IO is disabled "
                tmp_msg += "criteria=-remoteio"
                msg_map[tmpSiteName] = tmp_msg
                continue

            # maxwdir == 0 means no scratch limit configured; skip the size check
            if tmpSiteSpec.maxwdir != 0:
                if CoreUtils.use_direct_io_for_job(taskSpec, tmpSiteSpec, inputChunk):
                    # direct IO: input does not land on scratch
                    minDiskCount = minDiskCountD
                else:
                    # copy-to-scratch: input counted against scratch
                    minDiskCount = minDiskCountL

                # scale maxwdir by the ratio of task cores to site cores
                if tmpSiteSpec.coreCount in [None, 0, 1]:
                    site_cc = 1
                else:
                    site_cc = tmpSiteSpec.coreCount

                if taskSpec.coreCount in [None, 0, 1] and not taskSpec.useEventService():
                    task_cc = 1
                else:
                    task_cc = site_cc

                maxwdir_scaled = tmpSiteSpec.maxwdir * task_cc / site_cc

                if minDiskCount > maxwdir_scaled:
                    tmp_msg = f" skip site={tmpSiteName} due to small scratch disk {maxwdir_scaled} MB less than {minDiskCount} MB"
                    tmp_msg += " criteria=-disk"
                    msg_map[tmpSiteName] = tmp_msg
                    continue
                # newMaxwdir is defined earlier in this method and reused downstream
                newMaxwdir[tmpSiteName] = maxwdir_scaled
            newScanSiteList.append(tmpSiteName)
        tmpLog.unset_message_slot()
        scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
        self.add_summary_message(oldScanSiteList, scanSiteList, "disk check", tmpLog, msg_map)
        if not scanSiteList:
            self.dump_summary(tmpLog)
            tmpLog.error("no candidates")
            taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
            return retTmpError
0971
0972
0973
        # storage check: flag sites whose default output endpoint is short on
        # space or whose endpoints are blacklisted; flagged sites are NOT
        # removed here (no continue) — they are recorded in newSkippedTmp and
        # filtered later by the "temporary problem check"
        newScanSiteList = []
        oldScanSiteList = copy.copy(scanSiteList)
        msg_map = {}
        tmpLog.set_message_slot()
        newSkippedTmp = dict()
        for tmpSiteName in self.get_unified_sites(scanSiteList):
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            scope_input, scope_output = select_scope(tmpSiteSpec, JobUtils.PROD_PS, JobUtils.PROD_PS)

            tmp_default_output_endpoint = tmpSiteSpec.ddm_endpoints_output[scope_output].getEndPoint(tmpSiteSpec.ddm_output[scope_output])
            tmp_msg = None

            if tmp_default_output_endpoint is not None:
                # free + expired space in GB at the default output endpoint
                tmpSpaceSize = 0
                if tmp_default_output_endpoint["space_free"] is not None:
                    tmpSpaceSize += tmp_default_output_endpoint["space_free"]
                if tmp_default_output_endpoint["space_expired"] is not None:
                    tmpSpaceSize += tmp_default_output_endpoint["space_expired"]
                diskThreshold = 200

                # "skip_RSE_check" in catchall exempts the site from this check
                if tmpSpaceSize < diskThreshold and "skip_RSE_check" not in tmpSiteSpec.catchall:
                    tmp_msg = (
                        f" skip site={tmpSiteName} due to disk shortage at "
                        f"{tmpSiteSpec.ddm_output[scope_output]} {tmpSpaceSize}GB < {diskThreshold}GB criteria=-disk"
                    )

            if not tmp_msg:
                tmp_msg = AtlasBrokerUtils.check_endpoints_with_blacklist(tmpSiteSpec, scope_input, scope_output, sites_in_nucleus, remote_source_available)
            if tmp_msg is not None:
                newSkippedTmp[tmpSiteName] = tmp_msg
                msg_map[tmpSiteName] = tmp_msg
            newScanSiteList.append(tmpSiteName)
        tmpLog.unset_message_slot()
        siteSkippedTmp = self.add_pseudo_sites_to_skip(newSkippedTmp, scanSiteList, siteSkippedTmp)
        scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
        self.add_summary_message(oldScanSiteList, scanSiteList, "Storage check", tmpLog, msg_map)
        if not scanSiteList:
            self.dump_summary(tmpLog)
            tmpLog.error("no candidates")
            taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
            return retTmpError
1016
1017
1018
        # compute the job's expected walltime requirement; EventService jobs
        # spread the work over nEsConsumers * maxAttemptEsJob
        if taskSpec.useEventService() and not taskSpec.useJobCloning():
            nEsConsumers = taskSpec.getNumEventServiceConsumer()
            if nEsConsumers is None:
                nEsConsumers = 1
            maxAttemptEsJob = taskSpec.getMaxAttemptEsJob()
            if maxAttemptEsJob is None:
                maxAttemptEsJob = EventServiceUtils.defMaxAttemptEsJob
            else:
                # configured value is the retry count; +1 for the first attempt
                maxAttemptEsJob += 1
        else:
            nEsConsumers = 1
            maxAttemptEsJob = 1
        maxWalltime = None
        maxWalltime_dyn = None
        minWalltime_dyn = None
        minWalltime = None
        strMaxWalltime = None
        strMaxWalltime_dyn = None
        strMinWalltime_dyn = None
        if not taskSpec.useHS06():
            # legacy estimate: walltime per size unit * input size
            tmpMaxAtomSize = inputChunk.getMaxAtomSize(effectiveSize=True)
            if taskSpec.walltime is not None:
                minWalltime = taskSpec.walltime * tmpMaxAtomSize
            else:
                minWalltime = None

            if not taskSpec.useEventService() or taskSpec.useJobCloning():
                strMinWalltime = f"walltime*inputSize={taskSpec.walltime}*{tmpMaxAtomSize}"
            else:
                strMinWalltime = f"walltime*inputSize/nEsConsumers/maxAttemptEsJob={taskSpec.walltime}*{tmpMaxAtomSize}/{nEsConsumers}/{maxAttemptEsJob}"
        else:
            # HS06-based estimate: cpuTime per event * number of events
            tmpMaxAtomSize = inputChunk.getMaxAtomSize(getNumEvents=True)
            if taskSpec.getCpuTime() is not None:
                minWalltime = taskSpec.getCpuTime() * tmpMaxAtomSize
            if taskSpec.dynamicNumEvents():
                # dynamic number of events: bounds from min granularity and max events per job
                minGranularity = taskSpec.get_min_granularity()
                minWalltime_dyn = taskSpec.getCpuTime() * minGranularity
                strMinWalltime_dyn = f"cpuTime*minGranularity={taskSpec.getCpuTime()}*{minGranularity}"

                eventJump, totalEvents = inputChunk.check_event_jump_and_sum()

                maxEventsPerJob = taskSpec.get_max_events_per_job()
                if maxEventsPerJob:
                    totalEvents = min(totalEvents, maxEventsPerJob)
                maxWalltime_dyn = taskSpec.getCpuTime() * totalEvents
                strMaxWalltime_dyn = f"cpuTime*maxEventsPerJob={taskSpec.getCpuTime()}*{totalEvents}"

            if not taskSpec.useEventService() or taskSpec.useJobCloning():
                strMinWalltime = f"cpuTime*nEventsPerJob={taskSpec.getCpuTime()}*{tmpMaxAtomSize}"
            else:
                strMinWalltime = f"cpuTime*nEventsPerJob/nEsConsumers/maxAttemptEsJob={taskSpec.getCpuTime()}*{tmpMaxAtomSize}/{nEsConsumers}/{maxAttemptEsJob}"
        if minWalltime:
            minWalltime /= nEsConsumers * maxAttemptEsJob
        # walltime check: compare the job's walltime requirement against each
        # site's maxtime/mintime window, scaled by cores, corepower (HS06) and
        # CPU efficiency
        newScanSiteList = []
        oldScanSiteList = copy.copy(scanSiteList)
        msg_map = {}
        tmpLog.set_message_slot()
        for tmpSiteName in scanSiteList:
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            siteMaxTime = tmpSiteSpec.maxtime
            origSiteMaxTime = siteMaxTime

            # scale the site's upper time limit; tmpSiteStr records the formula for messages
            tmpSiteStr = f"{siteMaxTime}"
            if taskSpec.useHS06():
                oldSiteMaxTime = siteMaxTime
                siteMaxTime -= taskSpec.baseWalltime
                tmpSiteStr = f"({oldSiteMaxTime}-{taskSpec.baseWalltime})"
            if siteMaxTime not in [None, 0] and tmpSiteSpec.coreCount not in [None, 0]:
                siteMaxTime *= tmpSiteSpec.coreCount
                tmpSiteStr += f"*{tmpSiteSpec.coreCount}"
            if taskSpec.useHS06():
                if siteMaxTime not in [None, 0]:
                    siteMaxTime *= tmpSiteSpec.corepower
                    tmpSiteStr += f"*{tmpSiteSpec.corepower}"
                siteMaxTime *= float(taskSpec.cpuEfficiency) / 100.0
                siteMaxTime = int(siteMaxTime)
                tmpSiteStr += f"*{taskSpec.cpuEfficiency}%"
            # origSiteMaxTime == 0 means the site has no upper limit; skip the check
            if origSiteMaxTime != 0:
                toSkip = False
                if minWalltime_dyn and tmpSiteSpec.mintime and tmpSiteSpec.mintime > 0:
                    if minWalltime_dyn > siteMaxTime:
                        tmp_msg = f" skip site={tmpSiteName} due to short site walltime {tmpSiteStr} " f"(site upper limit) less than {strMinWalltime_dyn} "
                        toSkip = True
                else:
                    if minWalltime and minWalltime > siteMaxTime:
                        tmp_msg = f" skip site={tmpSiteName} due to short site walltime {tmpSiteStr} " f"(site upper limit) less than {strMinWalltime} "
                        toSkip = True
                if toSkip:
                    tmp_msg += "criteria=-shortwalltime"
                    msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
                    continue

            # scouts / merging / tasks without a walltime estimate need a
            # minimum site capacity even though no requirement was computed
            if (
                (not sitePreAssigned and inputChunk.useScout())
                or inputChunk.isMerging
                or (not taskSpec.walltime and not taskSpec.walltimeUnit and not taskSpec.cpuTimeUnit)
                or (not taskSpec.getCpuTime() and taskSpec.cpuTimeUnit)
            ):
                minTimeForZeroWalltime = 24
                str_minTimeForZeroWalltime = f"{minTimeForZeroWalltime}hr*10HS06s"
                # 24 hours in seconds at 10 HS06s
                minTimeForZeroWalltime *= 60 * 60 * 10

                if tmpSiteSpec.coreCount not in [None, 0]:
                    minTimeForZeroWalltime *= tmpSiteSpec.coreCount
                    str_minTimeForZeroWalltime += f"*{tmpSiteSpec.coreCount}cores"

                if siteMaxTime != 0 and siteMaxTime < minTimeForZeroWalltime:
                    tmp_msg = f" skip site={tmpSiteName} due to site walltime {tmpSiteStr} (site upper limit) insufficient "
                    if inputChunk.useScout():
                        tmp_msg += f"for scouts ({str_minTimeForZeroWalltime} at least) "
                        tmp_msg += "criteria=-scoutwalltime"
                    else:
                        tmp_msg += f"for zero walltime ({str_minTimeForZeroWalltime} at least) "
                        tmp_msg += "criteria=-zerowalltime"
                    msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
                    continue

            # scale the site's lower time limit with the same formula
            siteMinTime = tmpSiteSpec.mintime
            origSiteMinTime = siteMinTime
            tmpSiteStr = f"{siteMinTime}"
            if taskSpec.useHS06():
                oldSiteMinTime = siteMinTime
                siteMinTime -= taskSpec.baseWalltime
                tmpSiteStr = f"({oldSiteMinTime}-{taskSpec.baseWalltime})"
            if siteMinTime not in [None, 0] and tmpSiteSpec.coreCount not in [None, 0]:
                siteMinTime *= tmpSiteSpec.coreCount
                tmpSiteStr += f"*{tmpSiteSpec.coreCount}"
            if taskSpec.useHS06():
                if siteMinTime not in [None, 0]:
                    siteMinTime *= tmpSiteSpec.corepower
                    tmpSiteStr += f"*{tmpSiteSpec.corepower}"
                siteMinTime *= float(taskSpec.cpuEfficiency) / 100.0
                siteMinTime = int(siteMinTime)
                tmpSiteStr += f"*{taskSpec.cpuEfficiency}%"
            if origSiteMinTime != 0:
                toSkip = False
                if minWalltime_dyn and tmpSiteSpec.mintime and tmpSiteSpec.mintime > 0:
                    if minWalltime_dyn < siteMinTime and (maxWalltime_dyn is None or maxWalltime_dyn < siteMinTime):
                        tmp_msg = f" skip site {tmpSiteName} due to short job walltime {tmpSiteStr} " f"(site lower limit) greater than {strMinWalltime_dyn} "
                        if maxWalltime_dyn:
                            # NOTE(review): appends strMinWalltime_dyn a second time; the parallel
                            # branch below appends the max-side string, so strMaxWalltime_dyn looks
                            # intended here — confirm before changing
                            tmp_msg += f"and {strMinWalltime_dyn} "
                        toSkip = True
                else:
                    if (minWalltime is None or minWalltime < siteMinTime) and (maxWalltime is None or maxWalltime < siteMinTime):
                        tmp_msg = f" skip site {tmpSiteName} due to short job walltime {tmpSiteStr} " f"(site lower limit) greater than {strMinWalltime} "
                        if maxWalltime:
                            tmp_msg += f"and {strMaxWalltime} "
                        toSkip = True
                if toSkip:
                    tmp_msg += "criteria=-longwalltime"
                    msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
                    continue
            newScanSiteList.append(tmpSiteName)
        tmpLog.unset_message_slot()
        scanSiteList = newScanSiteList
        if not taskSpec.useHS06():
            tmpLog.info(f"walltime check with {minWalltime}({taskSpec.walltimeUnit})")
        else:
            tmpStr = f"walltime check with {strMinWalltime}({taskSpec.cpuTimeUnit})"
            if maxWalltime:
                tmpStr += f" and {strMaxWalltime}"
            if minWalltime_dyn:
                tmpStr += f" for normal PQs, {strMinWalltime_dyn}"
            if maxWalltime_dyn:
                tmpStr += f" and {strMaxWalltime_dyn}"
            tmpStr += " for PQs with non-zero mintime"
            tmpLog.info(tmpStr)
        self.add_summary_message(oldScanSiteList, scanSiteList, "walltime check", tmpLog, msg_map)
        if not scanSiteList:
            self.dump_summary(tmpLog)
            tmpLog.error("no candidates")
            taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
            return retTmpError
1193
1194
1195
        # network check: enforce the task's WN connectivity and IP-stack
        # requirements (skipped for pre-assigned sites)
        if not sitePreAssigned:
            ipConnectivity = taskSpec.getIpConnectivity()
            ipStack = taskSpec.getIpStack()
            if ipConnectivity or ipStack:
                newScanSiteList = []
                oldScanSiteList = copy.copy(scanSiteList)
                msg_map = {}
                tmpLog.set_message_slot()
                for tmpSiteName in scanSiteList:
                    tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)

                    # "full" connectivity satisfies any requirement; "http" only matches "http"
                    if ipConnectivity:
                        wn_connectivity = tmpSiteSpec.get_wn_connectivity()
                        if wn_connectivity == "full":
                            pass
                        elif wn_connectivity == "http" and ipConnectivity == "http":
                            pass
                        else:
                            tmp_msg = f" skip site={tmpSiteName} due to insufficient connectivity site={wn_connectivity} vs task={ipConnectivity} "
                            tmp_msg += "criteria=-network"
                            msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
                            continue

                    # IP stack (e.g. IPv4/IPv6) must match exactly
                    if ipStack:
                        wn_ipstack = tmpSiteSpec.get_ipstack()
                        if ipStack != wn_ipstack:
                            tmp_msg = f" skip site={tmpSiteName} due to IP stack mismatch site={wn_ipstack} vs task={ipStack} "
                            tmp_msg += "criteria=-network"
                            msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
                            continue

                    newScanSiteList.append(tmpSiteName)
                tmpLog.unset_message_slot()
                scanSiteList = newScanSiteList
                self.add_summary_message(oldScanSiteList, scanSiteList, "network check", tmpLog, msg_map)
                if not scanSiteList:
                    self.dump_summary(tmpLog)
                    tmpLog.error("no candidates")
                    taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
                    return retTmpError
1236
1237
1238
        # EventService check: match the task's ES mode against the site's job
        # seed ("std" / "es" / "eshigh") and exclude ES on single-core pseudo sites
        if not sitePreAssigned:
            newScanSiteList = []
            oldScanSiteList = copy.copy(scanSiteList)
            msg_map = {}
            tmpLog.set_message_slot()
            for tmpSiteName in scanSiteList:
                tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)

                if taskSpec.useEventService() and not taskSpec.useJobCloning():
                    # ES task: "std"-only sites are out; "eshigh" sites only for high-prio ES
                    if tmpSiteSpec.getJobSeed() == "std":
                        tmp_msg = f" skip site={tmpSiteName} since EventService is not allowed "
                        tmp_msg += "criteria=-es"
                        msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
                        continue
                    if tmpSiteSpec.getJobSeed() == "eshigh" and not esHigh:
                        tmp_msg = f" skip site={tmpSiteName} since low prio EventService is not allowed "
                        tmp_msg += "criteria=-eshigh"
                        msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
                        continue
                else:
                    # non-ES task: ES-only sites are out
                    if tmpSiteSpec.getJobSeed() == "es":
                        tmp_msg = f" skip site={tmpSiteName} since only EventService is allowed "
                        tmp_msg += "criteria=-nones"
                        msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
                        continue

                # ES on a single-core pseudo site of a unified queue is excluded
                if (
                    taskSpec.useEventService()
                    and not taskSpec.useJobCloning()
                    and tmpSiteSpec.sitename != tmpSiteSpec.get_unified_name()
                    and tmpSiteSpec.coreCount == 1
                ):
                    tmp_msg = f" skip site={tmpSiteName} since EventService on UCORE/SCORE "
                    tmp_msg += "criteria=-es_ucore"
                    msg_map[tmpSiteSpec.get_unified_name()] = tmp_msg
                    continue
                newScanSiteList.append(tmpSiteName)
            tmpLog.unset_message_slot()
            scanSiteList = newScanSiteList
            self.add_summary_message(oldScanSiteList, scanSiteList, "EventService check", tmpLog, msg_map)
            if not scanSiteList:
                self.dump_summary(tmpLog)
                tmpLog.error("no candidates")
                taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
                return retTmpError
1284
1285
1286
        # transferring check: flag satellite sites with too many jobs in
        # transferring; flagged sites stay in the list (no continue) and are
        # recorded in newSkippedTmp for the later temporary-problem filter
        newScanSiteList = []
        oldScanSiteList = copy.copy(scanSiteList)
        msg_map = {}
        tmpLog.set_message_slot()
        newSkippedTmp = dict()
        for tmpSiteName in self.get_unified_sites(scanSiteList):
            try:
                tmpAtlasSiteName = storageMapping[tmpSiteName]["default"]
                # the nucleus itself (and sites sharing its output storage) is exempt
                if tmpSiteName not in sites_in_nucleus + sites_sharing_output_storages_in_nucleus and nucleus != tmpAtlasSiteName:
                    tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)

                    # transferringlimit == 0 means use the default cap
                    def_maxTransferring = 2000
                    if tmpSiteSpec.transferringlimit == 0:
                        maxTransferring = def_maxTransferring
                    else:
                        maxTransferring = tmpSiteSpec.transferringlimit

                    # discount jobs stuck transferring to nuclei with unwritable storage
                    n_jobs_bad_transfer = 0
                    if tmpSiteName in transferring_job_map:
                        for tmp_nucleus in transferring_job_map[tmpSiteName]:
                            if tmp_nucleus in nucleus_with_storages_unwritable_over_wan:
                                n_jobs_bad_transfer += transferring_job_map[tmpSiteName][tmp_nucleus]

                    nTraJobs = AtlasBrokerUtils.getNumJobs(jobStatMap, tmpSiteName, "transferring") - n_jobs_bad_transfer
                    nRunJobs = AtlasBrokerUtils.getNumJobs(jobStatMap, tmpSiteName, "running")
                    if max(maxTransferring, 2 * nRunJobs) < nTraJobs:
                        tmpStr = " skip site=%s due to too many transferring=%s greater than max(%s,2x%s) criteria=-transferring" % (
                            tmpSiteName,
                            nTraJobs,
                            maxTransferring,
                            nRunJobs,
                        )
                        newSkippedTmp[tmpSiteName] = tmpStr
                        msg_map[tmpSiteName] = tmpStr
            except KeyError:
                # site missing from storageMapping: leave it in the candidate list
                pass
            newScanSiteList.append(tmpSiteName)
        tmpLog.unset_message_slot()
        siteSkippedTmp = self.add_pseudo_sites_to_skip(newSkippedTmp, scanSiteList, siteSkippedTmp)
        scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
        self.add_summary_message(oldScanSiteList, scanSiteList, "transferring check", tmpLog, msg_map)
        if not scanSiteList:
            self.dump_summary(tmpLog)
            tmpLog.error("no candidates")
            taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
            return retTmpError
1334
1335
1336
        # pledge check: during work shortage avoid opportunistic sites and
        # pledged sites already running/starting beyond their pledged CPU
        if not sitePreAssigned and work_shortage:
            newScanSiteList = []
            oldScanSiteList = copy.copy(scanSiteList)
            msg_map = {}
            tmpLog.set_message_slot()
            for tmpSiteName in self.get_unified_sites(scanSiteList):
                tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                if tmpSiteSpec.is_opportunistic():
                    tmp_msg = f" skip site={tmpSiteName} to avoid opportunistic in case of work shortage "
                    tmp_msg += "criteria=-non_pledged"
                    msg_map[tmpSiteName] = tmp_msg
                    continue
                elif tmpSiteSpec.pledgedCPU is not None and tmpSiteSpec.pledgedCPU > 0:
                    # compare occupied cores against the pledge
                    tmp_stat_dict = core_statistics.get(tmpSiteName, {})
                    n_running_cores = tmp_stat_dict.get("running", 0)
                    n_starting_cores = tmp_stat_dict.get("starting", 0)
                    tmpLog.debug(f" {tmpSiteName} running={n_running_cores} starting={n_starting_cores}")
                    if n_running_cores + n_starting_cores > tmpSiteSpec.pledgedCPU:
                        tmp_msg = f" skip site={tmpSiteName} since nCores(running+starting)={n_running_cores+n_starting_cores} more than pledgedCPU={tmpSiteSpec.pledgedCPU} "
                        tmp_msg += "in case of work shortage "
                        tmp_msg += "criteria=-over_pledged"
                        msg_map[tmpSiteName] = tmp_msg
                        continue
                newScanSiteList.append(tmpSiteName)
            tmpLog.unset_message_slot()
            scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
            self.add_summary_message(oldScanSiteList, scanSiteList, "pledge check", tmpLog, msg_map)
            if not scanSiteList:
                self.dump_summary(tmpLog)
                tmpLog.error("no candidates")
                taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
                return retTmpError
1371
1372
1373
        # T1 weight: negative weight restricts brokerage to nucleus sites;
        # weight 0 means derive it from IO intensity
        if taskSpec.getT1Weight() < 0 and not inputChunk.isMerging:
            useT1Weight = True
        else:
            useT1Weight = False
        t1Weight = taskSpec.getT1Weight()
        if t1Weight == 0:
            tmpLog.info(f"IO intensity {taskSpec.ioIntensity}")
            # default weight; boost the nucleus for IO-intensive tasks
            t1Weight = 1
            if taskSpec.ioIntensity is not None and taskSpec.ioIntensity > 500:
                t1Weight = WORLD_NUCLEUS_WEIGHT

        oldScanSiteList = copy.copy(scanSiteList)
        msg_map = {}
        if t1Weight < 0 and not inputChunk.isMerging:
            # negative weight: keep only sites in (or sharing storage with) the nucleus
            newScanSiteList = []
            tmpLog.set_message_slot()
            for tmpSiteName in scanSiteList:
                if tmpSiteName not in sites_in_nucleus + sites_sharing_output_storages_in_nucleus:
                    tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                    msg_map[tmpSiteSpec.get_unified_name()] = f" skip site={tmpSiteSpec.get_unified_name()} due to negative T1 weight criteria=-t1weight"
                    continue
                newScanSiteList.append(tmpSiteName)
            tmpLog.unset_message_slot()
            scanSiteList = newScanSiteList
            t1Weight = 1
        t1Weight = max(t1Weight, t1WeightForHighPrio)
        tmpLog.info(f"T1 weight {t1Weight}")
        self.add_summary_message(oldScanSiteList, scanSiteList, "T1 weight check", tmpLog, msg_map)
        if not scanSiteList:
            self.dump_summary(tmpLog)
            tmpLog.error("no candidates")
            taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
            return retTmpError
1408
1409
1410
        # full chain check: for full-chain tasks keep only nucleus sites; for
        # other tasks exclude sites reserved exclusively for full chain
        if nucleusSpec:
            full_chain = taskSpec.check_full_chain_with_nucleus(nucleusSpec)
            oldScanSiteList = copy.copy(scanSiteList)
            msg_map = {}
            tmpLog.set_message_slot()
            newScanSiteList = []
            for tmpSiteName in scanSiteList:
                if full_chain:
                    if tmpSiteName not in sites_in_nucleus:
                        tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                        msg_map[tmpSiteSpec.get_unified_name()] = (
                            f" skip site={tmpSiteSpec.get_unified_name()} not in nucleus for full chain criteria=-full_chain"
                        )
                        continue
                else:
                    tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                    if tmpSiteSpec.bare_nucleus_mode() == "only":
                        msg_map[tmpSiteSpec.get_unified_name()] = f" skip site={tmpSiteSpec.get_unified_name()} only for full chain criteria=-not_full_chain"
                        continue
                newScanSiteList.append(tmpSiteName)
            tmpLog.unset_message_slot()
            scanSiteList = newScanSiteList
            self.add_summary_message(oldScanSiteList, scanSiteList, "full chain check", tmpLog, msg_map)
            if not scanSiteList:
                self.dump_summary(tmpLog)
                tmpLog.error("no candidates")
                taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
                return retTmpError
1441
1442
1443
        # pilot check: flag sites with no active pilots; flagged sites are kept
        # (no continue) and only recorded in newSkippedTmp for the later
        # temporary-problem filter; nPilotMap is reused downstream
        nPilotMap = {}
        if not sitePreAssigned and not siteListPreAssigned:
            nWNmap = self.taskBufferIF.getCurrentSiteData()
            newScanSiteList = []
            oldScanSiteList = copy.copy(scanSiteList)
            tmpLog.set_message_slot()
            newSkippedTmp = dict()
            for tmpSiteName in self.get_unified_sites(scanSiteList):
                tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)

                # pilots observed = getJob + updateJob requests for the site
                nPilot = 0
                if tmpSiteName in nWNmap:
                    nPilot = nWNmap[tmpSiteName]["getJob"] + nWNmap[tmpSiteName]["updateJob"]

                # test tasks (unless express), jumbo sites and standby queues tolerate zero pilots
                if (
                    nPilot == 0
                    and ("test" not in taskSpec.prodSourceLabel or inputChunk.isExpress())
                    and (taskSpec.getNumJumboJobs() is None or not tmpSiteSpec.useJumboJobs())
                    and tmpSiteSpec.getNumStandby(wq_tag, taskSpec.resource_type) is None
                ):
                    tmpStr = f" skip site={tmpSiteName} due to no pilot criteria=-nopilot"
                    newSkippedTmp[tmpSiteName] = tmpStr
                newScanSiteList.append(tmpSiteName)
                nPilotMap[tmpSiteName] = nPilot
            tmpLog.unset_message_slot()
            siteSkippedTmp = self.add_pseudo_sites_to_skip(newSkippedTmp, scanSiteList, siteSkippedTmp)
            scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
            self.add_summary_message(oldScanSiteList, scanSiteList, "pilot check", tmpLog, {})
            if not scanSiteList:
                self.dump_summary(tmpLog)
                tmpLog.error("no candidates")
                taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
                return retTmpError
1477
1478
        # task-brokerage hint mode: apply the accumulated temporary skips and
        # return the surviving site list immediately (no weight calculation)
        if hintForTB:
            newScanSiteList = []
            oldScanSiteList = copy.copy(scanSiteList)
            msg_map = {}
            tmpLog.set_message_slot()
            for tmpSiteName in scanSiteList:
                if tmpSiteName in siteSkippedTmp:
                    tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                    msg_map[tmpSiteSpec.get_unified_name()] = siteSkippedTmp[tmpSiteName]
                else:
                    newScanSiteList.append(tmpSiteName)
            tmpLog.unset_message_slot()
            scanSiteList = newScanSiteList
            self.add_summary_message(oldScanSiteList, scanSiteList, "temporary problem check", tmpLog, msg_map)
            if not scanSiteList:
                self.dump_summary(tmpLog)
                tmpLog.error("no candidates")
                taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
                return retTmpError
            return self.SC_SUCCEEDED, scanSiteList
1501
1502
1503
        # data availability: query DDM for file replicas of every input dataset
        # and accumulate, per unified site, the locally-available size/LFNs on
        # disk (siteSizeMap/siteFilesMap) and disk+tape (…WT variants)
        siteSizeMap = {}
        siteSizeMapWT = {}
        availableFileMap = {}
        siteFilesMap = {}
        siteFilesMapWT = {}
        for datasetSpec in inputChunk.getDatasets():
            try:
                siteStorageEP = AtlasBrokerUtils.getSiteInputStorageEndpointMap(
                    self.get_unified_sites(scanSiteList), self.siteMapper, JobUtils.PROD_PS, JobUtils.PROD_PS
                )

                # merging tolerates incomplete replicas; IO-intensive non-prestaged
                # tasks and secondary datasets use complete replicas only
                checkCompleteness = True
                useCompleteOnly = False
                if inputChunk.isMerging:
                    checkCompleteness = False
                if not datasetSpec.isMaster() or (taskSpec.ioIntensity and taskSpec.ioIntensity > self.io_intensity_cutoff and not taskSpec.inputPreStaging()):
                    useCompleteOnly = True

                tmpLog.debug(f"getting available files for {datasetSpec.datasetName}")
                tmpAvFileMap = self.ddmIF.getAvailableFiles(
                    datasetSpec,
                    siteStorageEP,
                    self.siteMapper,
                    check_completeness=checkCompleteness,
                    storage_token=datasetSpec.storageToken,
                    complete_only=useCompleteOnly,
                )
                tmpLog.debug("got")
                if tmpAvFileMap is None:
                    raise Interaction.JEDITemporaryError("ddmIF.getAvailableFiles failed")
                availableFileMap[datasetSpec.datasetName] = tmpAvFileMap
            except Exception as e:
                tmp_str = str(e).replace("\\n", " ")
                tmpLog.error(f"failed to get available files with {tmp_str}")
                taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
                return retTmpError

            for tmpSiteName in self.get_unified_sites(scanSiteList):
                siteSizeMap.setdefault(tmpSiteName, 0)
                siteSizeMapWT.setdefault(tmpSiteName, 0)
                siteFilesMap.setdefault(tmpSiteName, set())
                siteFilesMapWT.setdefault(tmpSiteName, set())

                if tmpSiteName in availableFileMap[datasetSpec.datasetName]:
                    availableFiles = availableFileMap[datasetSpec.datasetName][tmpSiteName]
                    # disk/cache replicas count for both maps; tape only for the WT maps
                    for tmpFileSpec in availableFiles["localdisk"] + availableFiles["cache"]:
                        if tmpFileSpec.lfn not in siteFilesMap[tmpSiteName]:
                            siteSizeMap[tmpSiteName] += tmpFileSpec.fsize
                            siteSizeMapWT[tmpSiteName] += tmpFileSpec.fsize
                            siteFilesMap[tmpSiteName].add(tmpFileSpec.lfn)
                            siteFilesMapWT[tmpSiteName].add(tmpFileSpec.lfn)
                    for tmpFileSpec in availableFiles["localtape"]:
                        if tmpFileSpec.lfn not in siteFilesMapWT[tmpSiteName]:
                            siteSizeMapWT[tmpSiteName] += tmpFileSpec.fsize
                            siteFilesMapWT[tmpSiteName].add(tmpFileSpec.lfn)

        # total size and LFN count over all input datasets (deduplicated)
        allLFNs = set()
        totalSize = 0
        for datasetSpec in inputChunk.getDatasets():
            for fileSpec in datasetSpec.Files:
                if fileSpec.lfn not in allLFNs:
                    allLFNs.add(fileSpec.lfn)
                    try:
                        totalSize += fileSpec.fsize
                    except Exception:
                        pass

        maxNumFiles = len(allLFNs)
1573
1574
1575
        # IO check: for IO-intensive tasks, skip sites that would need to move
        # too much input; disk-only misses go to the WT (disk+tape) fallback list
        moveSizeCutoffGB = self.taskBufferIF.getConfigValue(COMPONENT, "SIZE_CUTOFF_TO_MOVE_INPUT", APP, VO)
        if moveSizeCutoffGB is None:
            moveSizeCutoffGB = 10
        moveNumFilesCutoff = self.taskBufferIF.getConfigValue(COMPONENT, "NUM_CUTOFF_TO_MOVE_INPUT", APP, VO)
        if moveNumFilesCutoff is None:
            moveNumFilesCutoff = 100
        if (
            not sitePreAssigned
            and totalSize > 0
            and not inputChunk.isMerging
            and taskSpec.ioIntensity is not None
            and taskSpec.ioIntensity > self.io_intensity_cutoff
            and not (taskSpec.useEventService() and not taskSpec.useJobCloning())
        ):
            newScanSiteList = []
            newScanSiteListWT = []
            msgList = []
            msgListDT = []
            for tmpSiteName in self.get_unified_sites(scanSiteList):
                # amount of input missing on disk, and on disk+tape (WT)
                mbToMove = int((totalSize - siteSizeMap[tmpSiteName]) / (1024 * 1024))
                nFilesToMove = maxNumFiles - len(siteFilesMap[tmpSiteName])
                mbToMoveWT = int((totalSize - siteSizeMapWT[tmpSiteName]) / (1024 * 1024))
                nFilesToMoveWT = maxNumFiles - len(siteFilesMapWT[tmpSiteName])
                # too much missing even counting tape: hard skip
                if min(mbToMove, mbToMoveWT) > moveSizeCutoffGB * 1024 or min(nFilesToMove, nFilesToMoveWT) > moveNumFilesCutoff:
                    tmp_msg = f" skip site={tmpSiteName} "
                    if mbToMove > moveSizeCutoffGB * 1024:
                        tmp_msg += f"since size of missing input is too large ({int(mbToMove / 1024)} GB > {moveSizeCutoffGB} GB) "
                    else:
                        tmp_msg += f"since the number of missing input files is too large ({nFilesToMove} > {moveNumFilesCutoff}) "
                    tmp_msg += f"for IO intensive task ({taskSpec.ioIntensity} > {self.io_intensity_cutoff} kBPerS) criteria=-io"
                    msgListDT.append(tmp_msg)
                    continue
                # missing on disk only: keep as disk+tape fallback candidate
                if mbToMove > moveSizeCutoffGB * 1024 or nFilesToMove > moveNumFilesCutoff:
                    tmp_msg = f" skip site={tmpSiteName} "
                    if mbToMove > moveSizeCutoffGB * 1024:
                        tmp_msg += f"since size of missing disk input is too large ({int(mbToMove / 1024)} GB > {moveSizeCutoffGB} GB) "
                    else:
                        tmp_msg += f"since the number of missing disk input files is too large ({nFilesToMove} > {moveNumFilesCutoff}) "
                    tmp_msg += f"for IO intensive task ({taskSpec.ioIntensity} > {self.io_intensity_cutoff} kBPerS) criteria=-io"
                    msgList.append(tmp_msg)
                    newScanSiteListWT.append(tmpSiteName)
                else:
                    newScanSiteList.append(tmpSiteName)
            if len(newScanSiteList + newScanSiteListWT) == 0:
                # nobody passed: disable the check rather than fail the task
                tmpLog.info("disabled IO check since no candidate passed")
            else:
                tmpLog.set_message_slot()
                for tmp_msg in msgListDT:
                    tmpLog.info(tmp_msg)
                if len(newScanSiteList) == 0:
                    # no disk-complete site: fall back to disk+tape candidates
                    newScanSiteList = newScanSiteListWT
                else:
                    for tmp_msg in msgList:
                        tmpLog.info(tmp_msg)
                tmpLog.unset_message_slot()
                oldScanSiteList = copy.copy(scanSiteList)
                scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
                self.add_summary_message(oldScanSiteList, scanSiteList, "IO check", tmpLog, {})
                if scanSiteList == []:
                    self.dump_summary(tmpLog)
                    tmpLog.error("no candidates")
                    taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
                    return retTmpError
1642
1643
1644
        # temporary problem check: now actually drop every site that was only
        # flagged (storage / transferring / no-pilot) in siteSkippedTmp
        newScanSiteList = []
        oldScanSiteList = copy.copy(scanSiteList)
        msg_map = {}
        tmpLog.set_message_slot()
        for tmpSiteName in scanSiteList:
            if tmpSiteName in siteSkippedTmp:
                tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                msg_map[tmpSiteSpec.get_unified_name()] = siteSkippedTmp[tmpSiteName]
            else:
                newScanSiteList.append(tmpSiteName)
        tmpLog.unset_message_slot()
        scanSiteList = newScanSiteList
        self.add_summary_message(oldScanSiteList, scanSiteList, "temporary problem check", tmpLog, msg_map)
        if scanSiteList == []:
            self.dump_summary(tmpLog)
            tmpLog.error("no candidates")
            taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
            return retTmpError
1663
1664
1665
1666 jobStatPrioMapGS = dict()
1667 jobStatPrioMapGSOnly = dict()
1668 if workQueue.is_global_share:
1669 tmpSt, jobStatPrioMap = self.taskBufferIF.getJobStatisticsByGlobalShare(taskSpec.vo)
1670 else:
1671 tmpSt, jobStatPrioMap = self.taskBufferIF.getJobStatisticsWithWorkQueue_JEDI(taskSpec.vo, taskSpec.prodSourceLabel)
1672 if tmpSt:
1673 tmpSt, jobStatPrioMapGS = self.taskBufferIF.getJobStatisticsByGlobalShare(taskSpec.vo)
1674 tmpSt, jobStatPrioMapGSOnly = self.taskBufferIF.getJobStatisticsByGlobalShare(taskSpec.vo, True)
1675 if tmpSt:
1676
1677 tmpSt, tmpStatMapRT = self.taskBufferIF.getJobStatisticsByResourceTypeSite(workQueue)
1678 if not tmpSt:
1679 tmpLog.error("failed to get job statistics with priority")
1680 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1681 return retTmpError
1682
1683 site_resource_type_map = self.taskBufferIF.get_distinct_resource_types_per_site(taskSpec.jediTaskID)
1684 workerStat = self.taskBufferIF.ups_load_worker_stats()
1685 upsQueues = set(self.taskBufferIF.ups_get_queues())
1686 tmpLog.info(f"calculate weight and check cap for {len(scanSiteList)} candidates")
1687 cutoffName = f"NQUEUELIMITSITE_{taskSpec.gshare}"
1688 cutOffValue = self.taskBufferIF.getConfigValue(COMPONENT, cutoffName, APP, VO)
1689 if not cutOffValue:
1690 cutOffValue = 20
1691 else:
1692 tmpLog.info(f"using {cutoffName}={cutOffValue} as lower limit for nQueued")
1693 weightMapPrimary = {}
1694 weightMapSecondary = {}
1695 weightMapJumbo = {}
1696 largestNumRun = None
1697 for tmpPseudoSiteName in scanSiteList:
1698 tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1699 tmpSiteName = tmpSiteSpec.get_unified_name()
1700 if not workQueue.is_global_share and esHigh and tmpSiteSpec.getJobSeed() == "eshigh":
1701 tmp_wq_tag = wq_tag_global_share
1702 tmp_jobStatPrioMap = jobStatPrioMapGS
1703 else:
1704 tmp_wq_tag = wq_tag
1705 tmp_jobStatPrioMap = jobStatPrioMap
1706 nRunning = AtlasBrokerUtils.getNumJobs(tmp_jobStatPrioMap, tmpSiteName, "running", None, tmp_wq_tag)
1707 nRunningAll = AtlasBrokerUtils.getNumJobs(tmp_jobStatPrioMap, tmpSiteName, "running", None, None)
1708 corrNumPilotStr = ""
1709 if not workQueue.is_global_share:
1710
1711 nRunningGS = AtlasBrokerUtils.getNumJobs(jobStatPrioMapGS, tmpSiteName, "running", None, wq_tag_global_share)
1712 nRunningGSOnly = AtlasBrokerUtils.getNumJobs(jobStatPrioMapGSOnly, tmpSiteName, "running", None, wq_tag_global_share)
1713 corrNumPilot = float(nRunningGS - nRunningGSOnly + 1) / float(nRunningGS + 1)
1714 corrNumPilotStr = f"*(nRunResourceQueue({nRunningGS - nRunningGSOnly})+1)/(nRunGlobalShare({nRunningGS})+1)"
1715 else:
1716 corrNumPilot = 1
1717 nDefined = AtlasBrokerUtils.getNumJobs(tmp_jobStatPrioMap, tmpSiteName, "defined", None, tmp_wq_tag) + self.getLiveCount(tmpSiteName)
1718 nAssigned = AtlasBrokerUtils.getNumJobs(tmp_jobStatPrioMap, tmpSiteName, "assigned", None, tmp_wq_tag)
1719 nActivated = AtlasBrokerUtils.getNumJobs(tmp_jobStatPrioMap, tmpSiteName, "activated", None, tmp_wq_tag)
1720 nStarting = AtlasBrokerUtils.getNumJobs(tmp_jobStatPrioMap, tmpSiteName, "starting", None, tmp_wq_tag)
1721 if tmpSiteName in nPilotMap:
1722 nPilot = nPilotMap[tmpSiteName]
1723 else:
1724 nPilot = 0
1725
1726 nWorkers = 0
1727 nWorkersCutoff = 20
1728 if tmpSiteName in workerStat:
1729 for tmpHarvesterID, tmpLabelStat in workerStat[tmpSiteName].items():
1730 for tmpHarvesterID, tmpResStat in tmpLabelStat.items():
1731 for tmpResType, tmpCounts in tmpResStat.items():
1732 for tmpStatus, tmpNum in tmpCounts.items():
1733 if tmpStatus in ["running", "submitted"]:
1734 nWorkers += tmpNum
1735
1736 nWorkers = min(nWorkersCutoff, nWorkers)
1737
1738 if (
1739 nPilot > 0
1740 and nRunning < nWorkersCutoff
1741 and nWorkers > nRunning
1742 and tmpSiteName in upsQueues
1743 and taskSpec.currentPriority <= self.max_prio_for_bootstrap
1744 ):
1745 tmpLog.debug(f"using nWorkers={nWorkers} as nRunning at {tmpPseudoSiteName} since original nRunning={nRunning} is low")
1746 nRunning = nWorkers
1747
1748 numStandby = tmpSiteSpec.getNumStandby(wq_tag, taskSpec.resource_type)
1749 if numStandby is None:
1750 pass
1751 elif taskSpec.currentPriority > self.max_prio_for_bootstrap:
1752
1753 tmpLog.debug(f"ignored numSlots at {tmpPseudoSiteName} due to prio={taskSpec.currentPriority} > {self.max_prio_for_bootstrap}")
1754 elif numStandby == 0:
1755
1756 nRunning = nStarting + nRunning
1757 tmpLog.debug(f"using nStarting+nRunning at {tmpPseudoSiteName} to set nRunning={nRunning} due to numSlot={numStandby}")
1758 else:
1759
1760 nRunning = max(int(numStandby / tmpSiteSpec.coreCount), nRunning)
1761 tmpLog.debug(f"using numSlots={numStandby}/coreCount at {tmpPseudoSiteName} to set nRunning={nRunning}")
1762 manyAssigned = float(nAssigned + 1) / float(nActivated + 1)
1763 manyAssigned = min(2.0, manyAssigned)
1764 manyAssigned = max(1.0, manyAssigned)
1765
1766 if tmpSiteName not in siteSizeMap or siteSizeMap[tmpSiteName] >= totalSize:
1767 useAssigned = False
1768 else:
1769 useAssigned = True
1770
1771 RT_Cap = 2
1772 useCapRT = False
1773 tmpRTqueue = 0
1774 tmpRTrunning = 0
1775 resource_type_str = taskSpec.resource_type
1776 if tmpSiteName in tmpStatMapRT:
1777
1778 tmp_resource_types = site_resource_type_map.get(tmpSiteName, [taskSpec.resource_type])
1779 resource_type_str = ",".join(tmp_resource_types)
1780 for tmp_resource_type in tmp_resource_types:
1781 if tmp_resource_type in tmpStatMapRT[tmpSiteName]:
1782 useCapRT = True
1783 tmpRTrunning += tmpStatMapRT[tmpSiteName][tmp_resource_type].get("running", 0)
1784 tmpRTqueue += tmpStatMapRT[tmpSiteName][tmp_resource_type].get("defined", 0)
1785 if useAssigned:
1786 tmpRTqueue += tmpStatMapRT[tmpSiteName][tmp_resource_type].get("assigned", 0)
1787 tmpRTqueue += tmpStatMapRT[tmpSiteName][tmp_resource_type].get("activated", 0)
1788 tmpRTqueue += tmpStatMapRT[tmpSiteName][tmp_resource_type].get("starting", 0)
1789 if useCapRT:
1790 tmpRTrunning = max(tmpRTrunning, nRunning)
1791 if totalSize == 0 or totalSize - siteSizeMap[tmpSiteName] <= 0:
1792 weight = float(nRunning + 1) / float(nActivated + nStarting + nDefined + 10)
1793 weightStr = (
1794 f"nRun={nRunning} nAct={nActivated} nStart={nStarting} nDef={nDefined} nPilot={nPilot}{corrNumPilotStr} "
1795 f"totalSizeMB={int(totalSize / 1024 / 1024)} totalNumFiles={maxNumFiles} nRun_rt={tmpRTrunning} nQueued_rt={tmpRTqueue} resource_type={resource_type_str} "
1796 )
1797 else:
1798 weight = float(nRunning + 1) / float(nActivated + nAssigned + nStarting + nDefined + 10) / manyAssigned
1799 weightStr = (
1800 f"nRun={nRunning} nAct={nActivated} nAss={nAssigned} nStart={nStarting} nDef={nDefined} manyAss={manyAssigned} "
1801 f"nPilot={nPilot}{corrNumPilotStr} totalSizeMB={int(totalSize / 1024 / 1024)} "
1802 f"totalNumFiles={maxNumFiles} nRun_rt={tmpRTrunning} nQueued_rt={tmpRTqueue} resource_type={resource_type_str} "
1803 )
1804
1805
1806 skipRemoteData = False
1807 if totalSize > 0:
1808
1809 mbToMove = int((totalSize - siteSizeMap[tmpSiteName]) / (1024 * 1024))
1810
1811 nFilesToMove = maxNumFiles - len(siteFilesMap[tmpSiteName])
1812
1813 if tmpSiteSpec.use_only_local_data() and (mbToMove > 0 or nFilesToMove > 0):
1814 skipRemoteData = True
1815 else:
1816 weight = weight * (totalSize + siteSizeMap[tmpSiteName]) / totalSize / (nFilesToMove / 100 + 1)
1817 weightStr += f"fileSizeToMoveMB={mbToMove} nFilesToMove={nFilesToMove} "
1818
1819
1820 if tmpSiteName in sites_in_nucleus + sites_sharing_output_storages_in_nucleus:
1821 weight *= t1Weight
1822 weightStr += f"t1W={t1Weight} "
1823 if useT1Weight:
1824 weightStr += f"nRunningAll={nRunningAll} "
1825
1826
1827 if nucleus:
1828 tmpAtlasSiteName = None
1829 try:
1830 tmpAtlasSiteName = storageMapping[tmpSiteName]["default"]
1831 except KeyError:
1832 tmpLog.debug(f"Panda site {tmpSiteName} was not in site mapping. Default network values will be given")
1833
1834 try:
1835 closeness = networkMap[tmpAtlasSiteName][AGIS_CLOSENESS]
1836 except KeyError:
1837 tmpLog.debug(f"No {AGIS_CLOSENESS} information found in network matrix from {tmpAtlasSiteName}({tmpSiteName}) to {nucleus}")
1838 closeness = MAX_CLOSENESS * 0.7
1839
1840 try:
1841 nFilesInQueue = networkMap[tmpAtlasSiteName][queued_tag]
1842 except KeyError:
1843 tmpLog.debug(f"No {queued_tag} information found in network matrix from {tmpAtlasSiteName} ({tmpSiteName}) to {nucleus}")
1844 nFilesInQueue = 0
1845
1846 mbps = None
1847 try:
1848 mbps = networkMap[tmpAtlasSiteName][FTS_1W]
1849 mbps = networkMap[tmpAtlasSiteName][FTS_1D]
1850 mbps = networkMap[tmpAtlasSiteName][FTS_1H]
1851 except KeyError:
1852 if mbps is None:
1853 tmpLog.debug(f"No dynamic FTS mbps information found in network matrix from {tmpAtlasSiteName}({tmpSiteName}) to {nucleus}")
1854
1855
1856 if nucleus == tmpAtlasSiteName:
1857 weightNwQueue = 2.5
1858 weightNwThroughput = 2.5
1859 else:
1860
1861 weightNwQueue = 2 - (nFilesInQueue * 1.0 / self.queue_threshold)
1862
1863
1864 if mbps is not None:
1865 weightNwThroughput = self.convertMBpsToWeight(mbps)
1866 else:
1867 weightNwThroughput = 1 + ((MAX_CLOSENESS - closeness) * 1.0 / (MAX_CLOSENESS - MIN_CLOSENESS))
1868
1869
1870 weightNw = self.nwQueueImportance * weightNwQueue + self.nwThroughputImportance * weightNwThroughput
1871
1872 weightStr += f"weightNw={weightNw} ( closeness={closeness} nFilesQueued={nFilesInQueue} throughputMBps={mbps} Network weight:{self.nwActive} )"
1873
1874
1875 if self.nwActive:
1876 weight *= weightNw
1877
1878 tmpLog.debug(
1879 f"subject=network_data src={tmpAtlasSiteName} dst={nucleus} weight={weight} weightNw={weightNw} "
1880 f"weightNwThroughput={weightNwThroughput} weightNwQueued={weightNwQueue} mbps={mbps} closeness={closeness} nqueued={nFilesInQueue}"
1881 )
1882
1883
1884 if preference_weight_map and tmpSiteName in preference_weight_map:
1885 pref_weight = preference_weight_map[tmpSiteName]
1886 weight *= pref_weight
1887 weightStr += f"prefW={pref_weight} "
1888 tmpLog.info(f"prefW={pref_weight} ")
1889
1890
1891 siteCandidateSpec = SiteCandidate(tmpPseudoSiteName, tmpSiteName)
1892
1893 siteCandidateSpec.override_attribute("maxwdir", newMaxwdir.get(tmpSiteName))
1894 platforms = resolved_platforms.get(tmpSiteName)
1895 if platforms:
1896 siteCandidateSpec.override_attribute("platforms", platforms)
1897
1898 siteCandidateSpec.weight = weight
1899 siteCandidateSpec.nRunningJobs = nRunning
1900 siteCandidateSpec.nAssignedJobs = nAssigned
1901
1902 for tmpDatasetName, availableFiles in availableFileMap.items():
1903 if tmpSiteName in availableFiles:
1904 siteCandidateSpec.add_local_disk_files(availableFiles[tmpSiteName]["localdisk"])
1905 siteCandidateSpec.add_local_tape_files(availableFiles[tmpSiteName]["localtape"])
1906 siteCandidateSpec.add_cache_files(availableFiles[tmpSiteName]["cache"])
1907 siteCandidateSpec.add_remote_files(availableFiles[tmpSiteName]["remote"])
1908
1909 if taskSpec.allowInputWAN() and tmpSiteSpec.allowWanInputAccess():
1910 siteCandidateSpec.remoteProtocol = "direct"
1911 for datasetSpec in inputChunk.getDatasets():
1912 siteCandidateSpec.add_remote_files(datasetSpec.Files)
1913
1914
1915 lockedByBrokerage = self.checkSiteLock(taskSpec.vo, taskSpec.prodSourceLabel, tmpPseudoSiteName, taskSpec.workQueue_ID, taskSpec.resource_type)
1916
1917
1918 nPilot *= corrNumPilot
1919 cutOffFactor = 2
1920 if tmpSiteSpec.capability == "ucore":
1921 if not inputChunk.isExpress():
1922 siteCandidateSpec.nRunningJobsCap = max(cutOffValue, cutOffFactor * tmpRTrunning)
1923 siteCandidateSpec.nQueuedJobs = tmpRTqueue
1924 else:
1925 if not inputChunk.isExpress():
1926 siteCandidateSpec.nRunningJobsCap = max(cutOffValue, cutOffFactor * nRunning)
1927 if useAssigned:
1928 siteCandidateSpec.nQueuedJobs = nActivated + nAssigned + nStarting
1929 else:
1930 siteCandidateSpec.nQueuedJobs = nActivated + nStarting
1931 if taskSpec.getNumJumboJobs() is None or not tmpSiteSpec.useJumboJobs():
1932 forJumbo = False
1933 else:
1934 forJumbo = True
1935
1936 if not forJumbo:
1937 okMsg = f" use site={tmpPseudoSiteName} with weight={weight} {weightStr} criteria=+use"
1938 okAsPrimary = False
1939 else:
1940 okMsg = f" use site={tmpPseudoSiteName} for jumbo jobs with weight={weight} {weightStr} criteria=+usejumbo"
1941 okAsPrimary = True
1942
1943 if lockedByBrokerage:
1944 ngMsg = f" skip site={tmpPseudoSiteName} due to locked by another brokerage criteria=-lock"
1945 elif skipRemoteData:
1946 ngMsg = f" skip site={tmpPseudoSiteName} due to non-local data criteria=-non_local"
1947 elif not inputChunk.isExpress() and tmpSiteSpec.capability != "ucore" and siteCandidateSpec.nQueuedJobs > siteCandidateSpec.nRunningJobsCap:
1948 if not useAssigned:
1949 ngMsg = f" skip site={tmpPseudoSiteName} weight={weight} due to nDefined+nActivated+nStarting={siteCandidateSpec.nQueuedJobs} "
1950 ngMsg += "(nAssigned ignored due to data locally available) "
1951 else:
1952 ngMsg = f" skip site={tmpPseudoSiteName} weight={weight} due to nDefined+nActivated+nAssigned+nStarting={siteCandidateSpec.nQueuedJobs} "
1953 ngMsg += f"greater than max({cutOffValue},{cutOffFactor}*nRun) {weightStr} criteria=-cap"
1954 elif self.nwActive and inputChunk.isExpress() and weightNw < self.nw_threshold * self.nw_weight_multiplier:
1955 ngMsg = (
1956 f" skip site={tmpPseudoSiteName} due to low network weight for express task weightNw={weightNw} threshold={self.nw_threshold} "
1957 f"{weightStr} criteria=-lowNetworkWeight"
1958 )
1959 elif useCapRT and tmpRTqueue > max(cutOffValue, tmpRTrunning * RT_Cap) and not inputChunk.isExpress():
1960 ngMsg = f" skip site={tmpSiteName} since "
1961 if useAssigned:
1962 ngMsg += f"nDefined_rt+nActivated_rt+nAssigned_rt+nStarting_rt={tmpRTqueue} "
1963 else:
1964 ngMsg += f"nDefined_rt+nActivated_rt+nStarting_rt={tmpRTqueue} (nAssigned_rt ignored due to data locally available) "
1965 ngMsg += f"with gshare+resource_type({resource_type_str}) is greater than max({cutOffValue},{RT_Cap}*nRun_rt={RT_Cap}*{tmpRTrunning}) criteria=-cap_rt"
1966 elif (
1967 nRunning + nActivated + nAssigned + nStarting + nDefined == 0
1968 and taskSpec.currentPriority <= self.max_prio_for_bootstrap
1969 and not inputChunk.isMerging
1970 and not inputChunk.isExpress()
1971 ):
1972 okMsg = f" use site={tmpPseudoSiteName} to bootstrap (no running or queued jobs) criteria=+use"
1973 ngMsg = (
1974 f" skip site={tmpPseudoSiteName} as others being bootstrapped (no running or queued jobs), "
1975 f"weight={weight} {weightStr} criteria=-others_bootstrap"
1976 )
1977 okAsPrimary = True
1978
1979 weight = 0
1980 siteCandidateSpec.weight = weight
1981 else:
1982 if min_weight > 0 and weight < min_weight:
1983 ngMsg = (
1984 f" skip site={tmpPseudoSiteName} due to weight below the minimum {min_weight_param}={min_weight}, "
1985 f"weight={weight} {weightStr} criteria=-below_min_weight"
1986 )
1987 elif useT1Weight:
1988 ngMsg = f" skip site={tmpPseudoSiteName} due to low total nRunningAll={nRunningAll} for negative T1 weight criteria=-t1_weight"
1989 if not largestNumRun or largestNumRun[-1] < nRunningAll:
1990 largestNumRun = (tmpPseudoSiteName, nRunningAll)
1991 okAsPrimary = True
1992
1993 for tmpWeight, tmpCandidates in weightMapPrimary.items():
1994 weightMapSecondary.setdefault(tmpWeight, [])
1995 weightMapSecondary[tmpWeight] += tmpCandidates
1996 weightMapPrimary = {}
1997 else:
1998 ngMsg = f" skip site={tmpPseudoSiteName} due to low weight, weight={weight} {weightStr} criteria=-low_weight"
1999 okAsPrimary = True
2000
2001 if forJumbo:
2002
2003 if not okAsPrimary:
2004 continue
2005 weightMap = weightMapJumbo
2006 elif okAsPrimary:
2007 weightMap = weightMapPrimary
2008 else:
2009 weightMap = weightMapSecondary
2010
2011 if weight not in weightMap:
2012 weightMap[weight] = []
2013 weightMap[weight].append((siteCandidateSpec, okMsg, ngMsg))
2014
2015 weightMap = weightMapPrimary
2016
2017 weightRank = None
2018
2019 for tmpWeight in weightMapSecondary.keys():
2020 for siteCandidateSpec, tmpOkMsg, tmpNgMsg in weightMapSecondary[tmpWeight]:
2021 tmpLog.info(tmpNgMsg)
2022 if weightMapPrimary == {}:
2023 tmpLog.info("available sites all capped")
2024
2025 for weight, tmpList in weightMapJumbo.items():
2026 if weight not in weightMap:
2027 weightMap[weight] = []
2028 for tmpItem in tmpList:
2029 weightMap[weight].append(tmpItem)
2030
2031 maxSiteCandidates = 10
2032 newScanSiteList = []
2033 weightList = sorted(weightMap.keys())
2034 weightList.reverse()
2035
2036 if 0 in weightList:
2037 weightList.remove(0)
2038 weightList.insert(0, 0)
2039 for weightIdx, tmpWeight in enumerate(weightList):
2040 for siteCandidateSpec, tmpOkMsg, tmpNgMsg in weightMap[tmpWeight]:
2041
2042 if taskSpec.getNumJumboJobs() is not None:
2043 tmpSiteSpec = self.siteMapper.getSite(siteCandidateSpec.siteName)
2044 if tmpSiteSpec.useJumboJobs():
2045
2046 tmpLog.info(tmpOkMsg)
2047 inputChunk.addSiteCandidateForJumbo(siteCandidateSpec)
2048 if inputChunk.useJumbo not in ["fake", "only"]:
2049 continue
2050
2051 if (weightRank is None or weightIdx < weightRank) and (maxSiteCandidates is None or len(newScanSiteList) < maxSiteCandidates):
2052
2053 tmpLog.info(tmpOkMsg)
2054 newScanSiteList.append(siteCandidateSpec.siteName)
2055 inputChunk.addSiteCandidate(siteCandidateSpec)
2056 else:
2057
2058 tmpLog.info(tmpNgMsg)
2059 oldScanSiteList = copy.copy(scanSiteList)
2060 scanSiteList = newScanSiteList
2061 self.add_summary_message(oldScanSiteList, scanSiteList, "final check", tmpLog, {})
2062
2063 if scanSiteList == []:
2064 self.dump_summary(tmpLog)
2065 tmpLog.error("no candidates")
2066 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
2067 return retTmpError
2068
2069 self.dump_summary(tmpLog, scanSiteList)
2070
2071 tmpLog.info("done")
2072 return self.SC_SUCCEEDED, inputChunk