# File indexing completed on 2026-04-10 08:38:57
0001 import copy
0002 import datetime
0003 import math
0004 import random
0005 import re
0006 import sys
0007 import traceback
0008
0009 from pandacommon.pandalogger.PandaLogger import PandaLogger
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011
0012 from pandajedi.jedicore import Interaction
0013 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0014 from pandajedi.jedicore.SiteCandidate import SiteCandidate
0015 from pandaserver.dataservice.DataServiceUtils import select_scope
0016 from pandaserver.srvcore import CoreUtils
0017 from pandaserver.taskbuffer import JobUtils
0018
0019 from . import AtlasBrokerUtils
0020 from .JobBrokerBase import JobBrokerBase
0021
# module-scope logger named after the last component of this module's dotted path
logger = PandaLogger().getLogger(__name__.split(".")[-1])

# identifiers used when reading broker configuration values from the task buffer
APP = "jedi"
COMPONENT = "jobbroker"
VO = "atlas"
0027
0028
0029
0030 class AtlasAnalJobBroker(JobBrokerBase):
0031
0032 def __init__(self, ddmIF, taskBufferIF):
0033 JobBrokerBase.__init__(self, ddmIF, taskBufferIF)
0034 self.dataSiteMap = {}
0035
0036
0037 try:
0038 self.sw_map = taskBufferIF.load_sw_map()
0039 except Exception:
0040 logger.error("Failed to load the SW tags map!!!")
0041 self.sw_map = {}
0042
0043
0044 try:
0045 self.architecture_level_map = taskBufferIF.get_architecture_level_map()
0046 except BaseException:
0047 logger.error("Failed to load the WN architecture level map!!!")
0048 self.architecture_level_map = {}
0049
0050
0051 def doBrokerage(self, taskSpec, cloudName, inputChunk, taskParamMap):
0052
0053 if inputChunk.masterDataset:
0054 msg_tag = f"<jediTaskID={taskSpec.jediTaskID} datasetID={inputChunk.masterDataset.datasetID}>"
0055 else:
0056 msg_tag = f"<jediTaskID={taskSpec.jediTaskID}>"
0057 tmpLog = MsgWrapper(logger, msg_tag, monToken=f"<jediTaskID={taskSpec.jediTaskID} {naive_utcnow().isoformat('/')}>")
0058 tmpLog.debug("start")
0059
0060 retFatal = self.SC_FATAL, inputChunk
0061 retTmpError = self.SC_FAILED, inputChunk
0062
0063 newMaxwdir = {}
0064
0065 sitePreAssigned = False
0066 siteListPreAssigned = False
0067 excludeList = []
0068 includeList = None
0069 scanSiteList = []
0070
0071 problematic_sites_dict = {}
0072
0073 if inputChunk.isMerging or taskSpec.avoid_vp() or taskSpec.useScout() or taskSpec.useLocalIO():
0074 useVP = False
0075 else:
0076 useVP = True
0077
0078 avoidVP = False
0079 if inputChunk.isMerging:
0080 avoidVP = True
0081
0082 workQueue = self.taskBufferIF.getWorkQueueMap().getQueueWithIDGshare(taskSpec.workQueue_ID, taskSpec.gshare)
0083
0084
0085 if taskSpec.useLimitedSites():
0086 if "excludedSite" in taskParamMap:
0087 excludeList = taskParamMap["excludedSite"]
0088
0089 try:
0090 if not isinstance(excludeList, list):
0091 excludeList = excludeList.split(",")
0092 except Exception:
0093 pass
0094 if "includedSite" in taskParamMap:
0095 includeList = taskParamMap["includedSite"]
0096
0097 if includeList == "":
0098 includeList = None
0099 try:
0100 if not isinstance(includeList, list):
0101 includeList = includeList.split(",")
0102 siteListPreAssigned = True
0103 except Exception:
0104 pass
0105
0106 for siteName, tmpSiteSpec in self.siteMapper.siteSpecList.items():
0107 if tmpSiteSpec.type == "analysis" or tmpSiteSpec.is_grandly_unified():
0108 scanSiteList.append(siteName)
0109
0110 preassignedSite = taskSpec.site
0111 if preassignedSite not in ["", None]:
0112
0113 if not self.siteMapper.checkSite(preassignedSite):
0114
0115 includeList = []
0116 for tmpSiteName in self.get_unified_sites(scanSiteList):
0117 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0118 scope_input, scope_output = select_scope(tmpSiteSpec, JobUtils.ANALY_PS, JobUtils.ANALY_PS)
0119 if scope_input in tmpSiteSpec.ddm_endpoints_input and preassignedSite in tmpSiteSpec.ddm_endpoints_input[scope_input].all:
0120 includeList.append(tmpSiteName)
0121 if not includeList:
0122 includeList = None
0123 tmpLog.info(f"site={preassignedSite} is ignored since unknown")
0124 else:
0125 tmpLog.info(f"site={preassignedSite} is converted to {','.join(includeList)}")
0126 preassignedSite = None
0127 else:
0128 tmpLog.info(f"site={preassignedSite} is pre-assigned")
0129 sitePreAssigned = True
0130 if preassignedSite not in scanSiteList:
0131 scanSiteList.append(preassignedSite)
0132 tmpLog.info(f"initial {len(self.get_unified_sites(scanSiteList))} candidates")
0133
0134 allowedRemoteProtocol = "fax"
0135
0136 if taskSpec.coreCount is not None and taskSpec.coreCount > 1:
0137
0138 useMP = "only"
0139 elif taskSpec.coreCount == 0:
0140
0141 useMP = "any"
0142 else:
0143
0144 useMP = "unuse"
0145
0146 timeWindowForFC = self.taskBufferIF.getConfigValue("anal_jobbroker", "TW_DONE_JOB_STAT", "jedi", taskSpec.vo)
0147 if timeWindowForFC is None:
0148 timeWindowForFC = 6
0149
0150
0151 minBadJobsToSkipPQ = self.taskBufferIF.getConfigValue("anal_jobbroker", "MIN_BAD_JOBS_TO_SKIP_PQ", "jedi", taskSpec.vo)
0152 if minBadJobsToSkipPQ is None:
0153 minBadJobsToSkipPQ = 5
0154
0155
0156 totalJobStat = self.get_task_common("totalJobStat")
0157 if totalJobStat is None:
0158 if taskSpec.workingGroup:
0159 totalJobStat = self.taskBufferIF.countJobsPerTarget_JEDI(taskSpec.workingGroup, False)
0160 else:
0161 totalJobStat = self.taskBufferIF.countJobsPerTarget_JEDI(taskSpec.origUserName, True)
0162 self.set_task_common("totalJobStat", totalJobStat)
0163
0164 if totalJobStat:
0165 if taskSpec.workingGroup:
0166 gdp_token_jobs = "CAP_RUNNING_GROUP_JOBS"
0167 gdp_token_cores = "CAP_RUNNING_GROUP_CORES"
0168 else:
0169 gdp_token_jobs = "CAP_RUNNING_USER_JOBS"
0170 gdp_token_cores = "CAP_RUNNING_USER_CORES"
0171 maxNumRunJobs = self.taskBufferIF.getConfigValue("prio_mgr", gdp_token_jobs)
0172 maxNumRunCores = self.taskBufferIF.getConfigValue("prio_mgr", gdp_token_cores)
0173 maxFactor = 2
0174
0175 if maxNumRunJobs:
0176 if totalJobStat["nRunJobs"] > maxNumRunJobs:
0177 tmpLog.error(f"throttle to generate jobs due to too many running jobs {totalJobStat['nRunJobs']} > {gdp_token_jobs}")
0178 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0179 return retTmpError
0180 elif totalJobStat["nQueuedJobs"] > maxFactor * maxNumRunJobs:
0181 tmpLog.error(f"throttle to generate jobs due to too many queued jobs {totalJobStat['nQueuedJobs']} > {maxFactor}x{gdp_token_jobs}")
0182 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0183 return retTmpError
0184 if maxNumRunCores:
0185 if totalJobStat["nRunCores"] > maxNumRunCores:
0186 tmpLog.error(f"throttle to generate jobs due to too many running cores {totalJobStat['nRunCores']} > {gdp_token_cores}")
0187 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0188 return retTmpError
0189 elif totalJobStat["nQueuedCores"] > maxFactor * maxNumRunCores:
0190 tmpLog.error(f"throttle to generate jobs due to too many queued cores {totalJobStat['nQueuedCores']} > {maxFactor}x{gdp_token_cores}")
0191 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0192 return retTmpError
0193
0194
0195 ret_val, gshare_usage_dict = AtlasBrokerUtils.getGShareUsage(tbIF=self.taskBufferIF, gshare=taskSpec.gshare)
0196 if not ret_val:
0197 tmpLog.warning(f"failed to get gshare usage of {taskSpec.gshare}")
0198 elif not gshare_usage_dict:
0199 tmpLog.warning(f"got empty gshare usage of {taskSpec.gshare}")
0200
0201
0202 l1_share_usage_dict = None
0203 if taskSpec.gshare == "User Analysis":
0204 l1_share_name = "L1 User Analysis"
0205 ret_val, l1_share_usage_dict = AtlasBrokerUtils.getGShareUsage(tbIF=self.taskBufferIF, gshare=l1_share_name)
0206 if not ret_val:
0207 tmpLog.warning(f"failed to get gshare usage of {l1_share_name}")
0208 elif not l1_share_usage_dict:
0209 tmpLog.warning(f"got empty gshare usage of {l1_share_name}")
0210
0211
0212 ret_val, analy_sites_class_dict = AtlasBrokerUtils.getAnalySitesClass(tbIF=self.taskBufferIF)
0213 if not ret_val:
0214 tmpLog.warning("failed to get analy sites classification")
0215 elif not analy_sites_class_dict:
0216 analy_sites_class_dict = {}
0217 tmpLog.warning("got empty analy sites classification")
0218
0219
0220 ret_val, user_eval_dict = AtlasBrokerUtils.getUserEval(tbIF=self.taskBufferIF, user=taskSpec.origUserName)
0221 if not ret_val:
0222 tmpLog.warning(f"failed to get user evaluation of {taskSpec.origUserName}")
0223 elif not user_eval_dict:
0224 tmpLog.warning(f"got empty user evaluation of {taskSpec.origUserName}")
0225
0226
0227 ret_val, task_eval_dict = AtlasBrokerUtils.getUserTaskEval(tbIF=self.taskBufferIF, taskID=taskSpec.jediTaskID)
0228 if not ret_val:
0229 tmpLog.warning("failed to get user task evaluation")
0230 elif not task_eval_dict:
0231 tmpLog.warning("got empty user task evaluation")
0232
0233
0234 threshold_A = self.taskBufferIF.getConfigValue("analy_eval", "USER_USAGE_THRESHOLD_A", "pandaserver", taskSpec.vo)
0235 if threshold_A is None:
0236 threshold_A = 1000
0237 threshold_B = self.taskBufferIF.getConfigValue("analy_eval", "USER_USAGE_THRESHOLD_B", "pandaserver", taskSpec.vo)
0238 if threshold_B is None:
0239 threshold_B = 10000
0240 user_analyis_to_throttle_threshold_perc_A = 100
0241 user_analyis_to_throttle_threshold_perc_B = min(95, user_analyis_to_throttle_threshold_perc_A)
0242 user_analyis_to_throttle_threshold_perc_C = min(90, user_analyis_to_throttle_threshold_perc_B)
0243 user_analyis_throttle_intensity_A = 1.0
0244
0245
0246 base_queue_length_per_pq = self.taskBufferIF.getConfigValue("anal_jobbroker", "BASE_QUEUE_LENGTH_PER_PQ", "jedi", taskSpec.vo)
0247 if base_queue_length_per_pq is None:
0248 base_queue_length_per_pq = 100
0249 base_expected_wait_hour_on_pq = self.taskBufferIF.getConfigValue("anal_jobbroker", "BASE_EXPECTED_WAIT_HOUR_ON_PQ", "jedi", taskSpec.vo)
0250 if base_expected_wait_hour_on_pq is None:
0251 base_expected_wait_hour_on_pq = 8
0252 base_default_queue_length_per_pq_user = self.taskBufferIF.getConfigValue("anal_jobbroker", "BASE_DEFAULT_QUEUE_LENGTH_PER_PQ_USER", "jedi", taskSpec.vo)
0253 if base_default_queue_length_per_pq_user is None:
0254 base_default_queue_length_per_pq_user = 5
0255 base_queue_ratio_on_pq = self.taskBufferIF.getConfigValue("anal_jobbroker", "BASE_QUEUE_RATIO_ON_PQ", "jedi", taskSpec.vo)
0256 if base_queue_ratio_on_pq is None:
0257 base_queue_ratio_on_pq = 0.05
0258 static_max_queue_running_ratio = self.taskBufferIF.getConfigValue("anal_jobbroker", "STATIC_MAX_QUEUE_RUNNING_RATIO", "jedi", taskSpec.vo)
0259 if static_max_queue_running_ratio is None:
0260 static_max_queue_running_ratio = 2.0
0261 max_expected_wait_hour = self.taskBufferIF.getConfigValue("anal_jobbroker", "MAX_EXPECTED_WAIT_HOUR", "jedi", taskSpec.vo)
0262 if max_expected_wait_hour is None:
0263 max_expected_wait_hour = 12.0
0264
0265 max_missing_input_files = self.taskBufferIF.getConfigValue("jobbroker", "MAX_MISSING_INPUT_FILES", "jedi", taskSpec.vo)
0266 if max_missing_input_files is None:
0267 max_missing_input_files = 10
0268 min_input_completeness = self.taskBufferIF.getConfigValue("jobbroker", "MIN_INPUT_COMPLETENESS", "jedi", taskSpec.vo)
0269 if min_input_completeness is None:
0270 min_input_completeness = 90
0271
0272
0273 min_weight_param = f"MIN_WEIGHT_{taskSpec.prodSourceLabel}_{taskSpec.gshare}"
0274 min_weight = self.taskBufferIF.getConfigValue("jobbroker", min_weight_param, "jedi", taskSpec.vo)
0275 if min_weight is None:
0276 min_weight_param = f"MIN_WEIGHT_{taskSpec.prodSourceLabel}"
0277 min_weight = self.taskBufferIF.getConfigValue("jobbroker", min_weight_param, "jedi", taskSpec.vo)
0278 if min_weight is None:
0279 min_weight = 0
0280
0281
0282 if taskSpec.gshare in ["User Analysis"] and gshare_usage_dict and task_eval_dict:
0283 try:
0284 usage_percent = gshare_usage_dict["usage_perc"] * 100
0285 if l1_share_usage_dict and l1_share_usage_dict.get("eqiv_usage_perc") is not None:
0286 usage_percent = min(usage_percent, l1_share_usage_dict["eqiv_usage_perc"] * 100)
0287 task_class_value = task_eval_dict["class"]
0288 usage_slot_ratio_A = 0.5
0289 if user_eval_dict:
0290 usage_slot_ratio_A = 1.0 - user_eval_dict["rem_slots_A"] / threshold_A
0291
0292 if task_class_value == -1 and usage_percent > user_analyis_to_throttle_threshold_perc_C:
0293
0294 tmpLog.error(
0295 "throttle to generate jobs due to gshare {gshare} > {threshold}% of target and task in class C".format(
0296 gshare=taskSpec.gshare, threshold=user_analyis_to_throttle_threshold_perc_C
0297 )
0298 )
0299 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0300 return retTmpError
0301 elif task_class_value == 0 and usage_percent > user_analyis_to_throttle_threshold_perc_B:
0302
0303 tmpLog.error(
0304 "throttle to generate jobs due to gshare {gshare} > {threshold}% of target and task in class B".format(
0305 gshare=taskSpec.gshare, threshold=user_analyis_to_throttle_threshold_perc_B
0306 )
0307 )
0308 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0309 return retTmpError
0310 elif (
0311 task_class_value == 1 and usage_percent * usage_slot_ratio_A * user_analyis_throttle_intensity_A > user_analyis_to_throttle_threshold_perc_A
0312 ):
0313
0314 tmpLog.error(
0315 "throttle to generate jobs due to gshare {gshare} > {threshold:.3%} of target and task in class A".format(
0316 gshare=taskSpec.gshare, threshold=user_analyis_throttle_intensity_A / (usage_slot_ratio_A + 2**-20)
0317 )
0318 )
0319 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0320 return retTmpError
0321 except Exception as e:
0322 tmpLog.error(f"got error when checking low-ranked tasks to throttle; skipped : {e}")
0323
0324
0325 if taskSpec.workingGroup:
0326 global_quota_ok, near_global_limit, quota_msg = self.ddmIF.check_global_quota(taskSpec.workingGroup)
0327 local_quota_ok, endpoints_over_local_quota = self.ddmIF.get_endpoints_over_local_quota(taskSpec.workingGroup)
0328 else:
0329 global_quota_ok, near_global_limit, quota_msg = self.ddmIF.check_global_quota(taskSpec.userName)
0330 local_quota_ok, endpoints_over_local_quota = self.ddmIF.get_endpoints_over_local_quota(taskSpec.userName)
0331
0332
0333 if not global_quota_ok:
0334 tmpLog.error(f"throttle to generate jobs due to {quota_msg}")
0335 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0336 return retTmpError
0337
0338
0339 if near_global_limit and not inputChunk.isMerging:
0340 tmpLog.error(f"throttle to generate only merge jobs due to {quota_msg}")
0341 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0342 return retTmpError
0343
0344
0345 if not local_quota_ok:
0346 tmpLog.error(f"failed to check local quota")
0347 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0348 return retTmpError
0349
0350
0351 failureCounts = self.get_task_common("failureCounts")
0352 if failureCounts is None:
0353 failureCounts = self.taskBufferIF.getFailureCountsForTask_JEDI(taskSpec.jediTaskID, timeWindowForFC)
0354 self.set_task_common("failureCounts", failureCounts)
0355
0356
0357 io_intensity_key = "IO_INTENSITY_CUTOFF_USER"
0358 io_intensity_cutoff = self.taskBufferIF.getConfigValue("anal_jobbroker", io_intensity_key, "jedi", taskSpec.vo)
0359
0360
0361 loc_check_timeout_key = "DATA_CHECK_TIMEOUT_USER"
0362 loc_check_timeout_val = self.taskBufferIF.getConfigValue("anal_jobbroker", loc_check_timeout_key, "jedi", taskSpec.vo)
0363
0364
0365 element_map = dict()
0366 ddsList = set()
0367 complete_disk_ok = {}
0368 complete_tape_ok = {}
0369 true_complete_disk_ok = {}
0370 can_be_local_source = {}
0371 can_be_remote_source = {}
0372 list_of_complete_replica_locations = {}
0373 is_distributed_map = {}
0374 if inputChunk.getDatasets():
0375 for datasetSpec in inputChunk.getDatasets():
0376 datasetName = datasetSpec.datasetName
0377 isDistributed = None
0378 if datasetName not in self.dataSiteMap:
0379
0380 tmpLog.info(f"getting the list of sites where {datasetName} is available")
0381 (
0382 tmpSt,
0383 tmpRet,
0384 tmp_complete_disk_ok,
0385 tmp_complete_tape_ok,
0386 tmp_truly_complete_disk,
0387 tmp_can_be_local_source,
0388 tmp_can_be_remote_source,
0389 tmp_list_of_complete_replica_locations,
0390 ) = AtlasBrokerUtils.get_sites_with_data(
0391 self.get_unified_sites(scanSiteList),
0392 self.siteMapper,
0393 self.ddmIF,
0394 datasetName,
0395 element_map.get(datasetSpec.datasetName),
0396 max_missing_input_files,
0397 min_input_completeness,
0398 )
0399 if tmpSt in [Interaction.JEDITemporaryError, Interaction.JEDITimeoutError]:
0400 tmpLog.error(f"temporary failed to get the list of sites where data is available, since {tmpRet}")
0401 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0402 return retTmpError
0403 if tmpSt == Interaction.JEDIFatalError:
0404 tmpLog.error(f"fatal error when getting the list of sites where data is available, since {tmpRet}")
0405 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0406 return retFatal
0407
0408 self.dataSiteMap[datasetName] = tmpRet
0409 complete_disk_ok[datasetName] = tmp_complete_disk_ok
0410 complete_tape_ok[datasetName] = tmp_complete_tape_ok
0411 true_complete_disk_ok[datasetName] = tmp_truly_complete_disk
0412 can_be_local_source[datasetName] = tmp_can_be_local_source
0413 can_be_remote_source[datasetName] = tmp_can_be_remote_source
0414 list_of_complete_replica_locations[datasetName] = tmp_list_of_complete_replica_locations
0415 if datasetName.startswith("ddo"):
0416 tmpLog.debug(f" {len(tmpRet)} sites")
0417 else:
0418 tmpLog.debug(f" {len(tmpRet)} sites : {str(tmpRet)}")
0419
0420 if tmpRet != {}:
0421 isDistributed = True
0422 for tmpMap in tmpRet.values():
0423 for tmpVal in tmpMap.values():
0424 if tmpVal["state"] == "complete":
0425 isDistributed = False
0426 break
0427 if not isDistributed:
0428 break
0429 if isDistributed or datasetName.endswith("/"):
0430
0431 isDistributed = self.ddmIF.isDistributedDataset(datasetName)
0432 if isDistributed or datasetName.endswith("/"):
0433 isDistributed = True
0434 tmpLog.debug(f" {datasetName} is distributed")
0435 ddsList.add(datasetName)
0436
0437 useVP = False
0438 avoidVP = True
0439 tmp_rse_list = ",".join(list_of_complete_replica_locations[datasetName])
0440 is_distributed_map[datasetName] = isDistributed
0441 tmpLog.info(
0442 f"replica_availability disk:{complete_disk_ok[datasetName]} tape:{complete_tape_ok[datasetName]}, is_distributed:{isDistributed}, remote_readable:{can_be_remote_source[datasetName]}, rses={tmp_rse_list}"
0443 )
0444
0445 if not complete_disk_ok[datasetName] and not complete_tape_ok[datasetName] and isDistributed is not True:
0446 err_msg = f"{datasetName} is "
0447 if list_of_complete_replica_locations[datasetName]:
0448 tmp_is_single = len(list_of_complete_replica_locations[datasetName]) == 1
0449 err_msg += f"only complete at {tmp_rse_list} which "
0450 err_msg += "is " if tmp_is_single else "are "
0451 err_msg += "currently in downtime or offline. "
0452 else:
0453 err_msg += "incomplete at online storage. "
0454 if not taskSpec.allow_incomplete_input():
0455 tmpLog.error(err_msg)
0456 taskSpec.setErrDiag(err_msg)
0457 retVal = retTmpError
0458 return retVal
0459 else:
0460 err_msg += "However, the task allows incomplete input."
0461 tmpLog.info(err_msg)
0462
0463
0464 remote_source_available = True
0465 remote_source_msg = ""
0466 for tmp_dataset_name, tmp_ok in can_be_remote_source.items():
0467 is_distributed = is_distributed_map.get(tmp_dataset_name, None)
0468 if not tmp_ok and is_distributed is not True:
0469 remote_source_msg = f"data locality cannot be ignored since {tmp_dataset_name} is unreadable over WAN"
0470 remote_source_available = False
0471 break
0472
0473
0474 scan_site_list_loops = [(copy.copy(scanSiteList), True)]
0475 to_ignore_data_loc = False
0476 if len(inputChunk.getDatasets()) > 0:
0477 nRealDS = 0
0478 for datasetSpec in inputChunk.getDatasets():
0479 if not datasetSpec.isPseudo():
0480 nRealDS += 1
0481 task_prio_cutoff_for_input_data_motion = 2000
0482 tmp_msg = "ignoring input data locality in the second loop due to "
0483 if taskSpec.taskPriority >= task_prio_cutoff_for_input_data_motion:
0484 to_ignore_data_loc = True
0485 tmp_msg += f"high taskPriority {taskSpec.taskPriority} is larger than or equal to {task_prio_cutoff_for_input_data_motion}"
0486 elif io_intensity_cutoff and taskSpec.ioIntensity and io_intensity_cutoff >= taskSpec.ioIntensity:
0487 to_ignore_data_loc = True
0488 tmp_msg += f"low ioIntensity {taskSpec.ioIntensity} is less than or equal to {io_intensity_key} ({io_intensity_cutoff})"
0489 elif loc_check_timeout_val and taskSpec.frozenTime and naive_utcnow() - taskSpec.frozenTime > datetime.timedelta(hours=loc_check_timeout_val):
0490 to_ignore_data_loc = True
0491 tmp_msg += "check timeout (last successful cycle at {} was more than {} ({}hrs) ago)".format(
0492 taskSpec.frozenTime, loc_check_timeout_key, loc_check_timeout_val
0493 )
0494 if not remote_source_available:
0495 tmpLog.info(remote_source_msg)
0496 elif to_ignore_data_loc:
0497 tmpLog.info(tmp_msg)
0498 scan_site_list_loops.append((copy.copy(scanSiteList), False))
0499 elif taskSpec.taskPriority > 1000 or nRealDS > 1 or taskSpec.getNumSitesPerJob() > 0:
0500
0501 scan_site_list_loops.append((copy.copy(scanSiteList), False))
0502
0503 for datasetSpec in inputChunk.getDatasets():
0504 if datasetSpec.datasetName.endswith("/"):
0505 file_list = [f.lfn for f in datasetSpec.Files]
0506 element_map[datasetSpec.datasetName] = self.taskBufferIF.get_origin_datasets(taskSpec.jediTaskID, datasetSpec.datasetName, file_list)
0507
0508 retVal = None
0509 checkDataLocality = False
0510 scanSiteWoVP = []
0511 summaryList = []
0512 site_list_with_data = None
0513 overall_site_list = set()
0514 for i_loop, (scanSiteList, checkDataLocality) in enumerate(scan_site_list_loops):
0515 useUnionLocality = False
0516 self.init_summary_list("Job brokerage summary", f"data locality check: {checkDataLocality}", self.get_unified_sites(scanSiteList))
0517 if checkDataLocality:
0518 tmpLog.info("!!! look for candidates WITH data locality check")
0519 else:
0520 tmpLog.info("!!! look for candidates WITHOUT data locality check")
0521
0522
0523 hasDDS = False
0524 dataWeight = {}
0525 remoteSourceList = {}
0526 sites_in_nucleus = []
0527 for datasetSpec in inputChunk.getDatasets():
0528 datasetSpec.reset_distributed()
0529
0530 if inputChunk.getDatasets() != [] and checkDataLocality:
0531 oldScanSiteList = copy.copy(scanSiteList)
0532 oldScanUnifiedSiteList = self.get_unified_sites(oldScanSiteList)
0533
0534 if ddsList:
0535 hasDDS = True
0536
0537 for datasetSpec in inputChunk.getDatasets():
0538 datasetName = datasetSpec.datasetName
0539 if datasetName in ddsList:
0540 datasetSpec.setDistributed()
0541
0542
0543 scanSiteList = None
0544 scanSiteListOnDisk = None
0545 scanSiteListUnion = None
0546 scanSiteListOnDiskUnion = None
0547 scanSiteWoVpUnion = None
0548
0549 for datasetName, tmpDataSite in self.dataSiteMap.items():
0550
0551 if datasetName in ddsList:
0552 useIncomplete = True
0553 elif true_complete_disk_ok.get(datasetName) is False:
0554 useIncomplete = True
0555 else:
0556 useIncomplete = False
0557
0558 tmpSiteList = AtlasBrokerUtils.getAnalSitesWithDataDisk(tmpDataSite, includeTape=True, use_incomplete=useIncomplete)
0559 tmpDiskSiteList = AtlasBrokerUtils.getAnalSitesWithDataDisk(tmpDataSite, includeTape=False, use_vp=useVP, use_incomplete=useIncomplete)
0560 tmpNonVpSiteList = AtlasBrokerUtils.getAnalSitesWithDataDisk(tmpDataSite, includeTape=True, use_vp=False, use_incomplete=useIncomplete)
0561
0562 for tmpSiteName in tmpSiteList:
0563 if tmpSiteName not in dataWeight:
0564 dataWeight[tmpSiteName] = 0
0565
0566 if tmpSiteName in tmpDiskSiteList:
0567 dataWeight[tmpSiteName] += 1
0568 else:
0569 dataWeight[tmpSiteName] += 0.001
0570
0571
0572 if scanSiteList is None:
0573 scanSiteList = []
0574 for tmpSiteName in tmpSiteList:
0575 if tmpSiteName not in oldScanUnifiedSiteList:
0576 continue
0577 if tmpSiteName not in scanSiteList:
0578 scanSiteList.append(tmpSiteName)
0579 scanSiteListOnDisk = set()
0580 for tmpSiteName in tmpDiskSiteList:
0581 if tmpSiteName not in oldScanUnifiedSiteList:
0582 continue
0583 scanSiteListOnDisk.add(tmpSiteName)
0584 scanSiteWoVP = tmpNonVpSiteList
0585 scanSiteListUnion = set(scanSiteList)
0586 scanSiteListOnDiskUnion = set(scanSiteListOnDisk)
0587 scanSiteWoVpUnion = set(scanSiteWoVP)
0588 else:
0589
0590 newScanList = []
0591 for tmpSiteName in tmpSiteList:
0592 if tmpSiteName in scanSiteList and tmpSiteName not in newScanList:
0593 newScanList.append(tmpSiteName)
0594 scanSiteListUnion.add(tmpSiteName)
0595 scanSiteList = newScanList
0596 tmpLog.debug(f"{datasetName} is available at {len(scanSiteList)} sites")
0597
0598 newScanListOnDisk = set()
0599 for tmpSiteName in tmpDiskSiteList:
0600 if tmpSiteName in scanSiteListOnDisk:
0601 newScanListOnDisk.add(tmpSiteName)
0602 scanSiteListOnDiskUnion.add(tmpSiteName)
0603 scanSiteListOnDisk = newScanListOnDisk
0604
0605 scanSiteWoVP = list(set(scanSiteWoVP).intersection(tmpNonVpSiteList))
0606 scanSiteWoVpUnion = scanSiteWoVpUnion.union(tmpNonVpSiteList)
0607 tmpLog.info(
0608 f"{datasetName} is available at {len(scanSiteList)} sites ({len(scanSiteListOnDisk)} sites on DISK). complete disk replica: {true_complete_disk_ok[datasetName]}, complete tape replica: {complete_tape_ok[datasetName]}"
0609 )
0610
0611 if sitePreAssigned:
0612 if preassignedSite not in scanSiteList and preassignedSite not in scanSiteListUnion:
0613 scanSiteList = []
0614 tmpLog.info(f"data is unavailable locally or remotely at preassigned site {preassignedSite}")
0615 elif preassignedSite not in scanSiteList:
0616 scanSiteList = list(scanSiteListUnion)
0617 elif len(scanSiteListOnDisk) > 0:
0618
0619 scanSiteList = list(scanSiteListOnDisk)
0620 elif not scanSiteList and scanSiteListUnion:
0621 tmpLog.info("use union list for data locality check since no site has all data")
0622 if scanSiteListOnDiskUnion:
0623 scanSiteList = list(scanSiteListOnDiskUnion)
0624 elif scanSiteListUnion:
0625 scanSiteList = list(scanSiteListUnion)
0626 scanSiteWoVP = list(scanSiteWoVpUnion)
0627 useUnionLocality = True
0628
0629 avoidVP = True
0630 scanSiteList = self.get_pseudo_sites(scanSiteList, oldScanSiteList)
0631
0632 for tmpSiteName in oldScanSiteList:
0633 if tmpSiteName not in scanSiteList:
0634 pass
0635 sites_in_nucleus = copy.copy(scanSiteList)
0636 self.add_summary_message(oldScanSiteList, scanSiteList, "input data check", tmpLog, {})
0637 if not scanSiteList:
0638 self.dump_summary(tmpLog)
0639 tmpLog.error("no candidates")
0640 retVal = retTmpError
0641 continue
0642
0643
0644
0645 newScanSiteList = []
0646 oldScanSiteList = copy.copy(scanSiteList)
0647 msg_map = {}
0648 for tmpSiteName in scanSiteList:
0649 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0650
0651 if tmpSiteSpec.is_unified:
0652 continue
0653
0654 skipFlag = False
0655 if tmpSiteSpec.status in ["offline"]:
0656 skipFlag = True
0657 elif tmpSiteSpec.status in ["brokeroff", "test"]:
0658 if siteListPreAssigned:
0659 pass
0660 elif not sitePreAssigned:
0661 skipFlag = True
0662 elif preassignedSite not in [tmpSiteName, tmpSiteSpec.get_unified_name()]:
0663 skipFlag = True
0664 if not skipFlag:
0665 newScanSiteList.append(tmpSiteName)
0666 else:
0667 tmp_unified_name = tmpSiteSpec.get_unified_name()
0668 msg_map[tmp_unified_name] = f" skip site={tmp_unified_name} due to status={tmpSiteSpec.status} criteria=-status"
0669 scanSiteList = newScanSiteList
0670 self.add_summary_message(oldScanSiteList, scanSiteList, "status check", tmpLog, msg_map)
0671 if not scanSiteList:
0672 self.dump_summary(tmpLog)
0673 tmpLog.error("no candidates")
0674 retVal = retTmpError
0675 continue
0676
0677
0678
0679 if not sitePreAssigned:
0680 newScanSiteList = []
0681 oldScanSiteList = copy.copy(scanSiteList)
0682 msg_map = {}
0683 for tmpSiteName in scanSiteList:
0684 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0685
0686 if AtlasBrokerUtils.hasZeroShare(tmpSiteSpec, taskSpec, inputChunk.isMerging, tmpLog):
0687 msg_map[tmpSiteSpec.get_unified_name()] = f" skip site={tmpSiteSpec.get_unified_name()} due to zero share criteria=-zeroshare"
0688 continue
0689 newScanSiteList.append(tmpSiteName)
0690 scanSiteList = newScanSiteList
0691 self.add_summary_message(oldScanSiteList, scanSiteList, "zero share check", tmpLog, msg_map)
0692 if not scanSiteList:
0693 self.dump_summary(tmpLog)
0694 tmpLog.error("no candidates")
0695 retVal = retTmpError
0696 continue
0697
0698
0699
0700
0701 max_diskio_per_core_default = self.taskBufferIF.getConfigValue(COMPONENT, "MAX_DISKIO_DEFAULT", APP, VO)
0702 if not max_diskio_per_core_default:
0703 max_diskio_per_core_default = 10**10
0704
0705
0706 diskio_percore_usage = self.taskBufferIF.getAvgDiskIO_JEDI()
0707 unified_site_list = self.get_unified_sites(scanSiteList)
0708 newScanSiteList = []
0709 oldScanSiteList = copy.copy(scanSiteList)
0710 msg_map = {}
0711 for tmpSiteName in unified_site_list:
0712 tmp_site_spec = self.siteMapper.getSite(tmpSiteName)
0713
0714
0715 diskio_usage_tmp = diskio_percore_usage.get(tmpSiteName, 0)
0716
0717
0718 if tmp_site_spec.maxDiskio and tmp_site_spec.maxDiskio > 0:
0719
0720 diskio_limit_tmp = tmp_site_spec.maxDiskio
0721 else:
0722
0723 diskio_limit_tmp = max_diskio_per_core_default
0724
0725
0726 diskio_task_tmp = taskSpec.diskIO
0727 if taskSpec.diskIO is not None and taskSpec.coreCount not in [None, 0, 1] and tmp_site_spec.coreCount not in [None, 0]:
0728 diskio_task_tmp = taskSpec.diskIO / tmp_site_spec.coreCount
0729
0730 try:
0731 log_msg = f"diskIO measurements: site={tmpSiteName} jediTaskID={taskSpec.jediTaskID} "
0732 if diskio_task_tmp is not None:
0733 log_msg += f"diskIO_task={diskio_task_tmp:.2f} "
0734 if diskio_usage_tmp is not None:
0735 log_msg += f"diskIO_site_usage={diskio_usage_tmp:.2f} "
0736 if diskio_limit_tmp is not None:
0737 log_msg += f"diskIO_site_limit={diskio_limit_tmp:.2f} "
0738
0739 except Exception:
0740 tmpLog.debug("diskIO measurements: Error generating diskIO message")
0741
0742
0743 if diskio_task_tmp and diskio_usage_tmp and diskio_limit_tmp and diskio_usage_tmp > diskio_limit_tmp and diskio_task_tmp > diskio_limit_tmp:
0744 msg_map[tmpSiteName] = f" skip site={tmpSiteName} due to diskIO overload criteria=-diskIO"
0745 continue
0746
0747 newScanSiteList.append(tmpSiteName)
0748
0749 scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
0750
0751 self.add_summary_message(oldScanSiteList, scanSiteList, "diskIO check", tmpLog, msg_map)
0752 if not scanSiteList:
0753 self.dump_summary(tmpLog)
0754 tmpLog.error("no candidates")
0755 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0756 retVal = retTmpError
0757 continue
0758
0759
0760
0761 if taskSpec.avoid_vp() or avoidVP or not checkDataLocality:
0762 newScanSiteList = []
0763 oldScanSiteList = copy.copy(scanSiteList)
0764 msg_map = {}
0765 for tmpSiteName in scanSiteList:
0766 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
0767 if not tmpSiteSpec.use_vp(JobUtils.ANALY_PS):
0768 newScanSiteList.append(tmpSiteName)
0769 else:
0770 msg_map[tmpSiteSpec.get_unified_name()] = f" skip site={tmpSiteSpec.get_unified_name()} to avoid VP"
0771 scanSiteList = newScanSiteList
0772 self.add_summary_message(oldScanSiteList, scanSiteList, "avoid VP queue check", tmpLog, msg_map)
0773 if not scanSiteList:
0774 self.dump_summary(tmpLog)
0775 tmpLog.error("no candidates")
0776 retVal = retTmpError
0777 continue
0778
0779
            # --- CPU core count check ---
            # Match the task's multicore mode (useMP) against the queue's coreCount and
            # enforce the task's optional upper limit on cores per slot.
            newScanSiteList = []
            oldScanSiteList = copy.copy(scanSiteList)
            msg_map = {}
            max_core_count = taskSpec.get_max_core_count()
            for tmpSiteName in scanSiteList:
                tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                # "any": no constraint; "only": multicore queues; "unuse": single-core queues
                # (coreCount of 0/None is treated as single-core)
                if useMP == "any" or (useMP == "only" and tmpSiteSpec.coreCount > 1) or (useMP == "unuse" and tmpSiteSpec.coreCount in [0, 1, None]):
                    if max_core_count and tmpSiteSpec.coreCount and tmpSiteSpec.coreCount > max_core_count:
                        msg_map[tmpSiteSpec.get_unified_name()] = (
                            f" skip site={tmpSiteSpec.get_unified_name()} due to larger core count "
                            f"site:{tmpSiteSpec.coreCount} than task_max={max_core_count} criteria=-max_cpucore"
                        )
                    else:
                        newScanSiteList.append(tmpSiteName)
                else:
                    msg_map[tmpSiteSpec.get_unified_name()] = " skip site=%s due to core mismatch cores_site=%s <> cores_task=%s criteria=-cpucore" % (
                        tmpSiteSpec.get_unified_name(),
                        tmpSiteSpec.coreCount,
                        taskSpec.coreCount,
                    )
            scanSiteList = newScanSiteList
            self.add_summary_message(oldScanSiteList, scanSiteList, "CPU core check", tmpLog, msg_map)
            if not scanSiteList:
                self.dump_summary(tmpLog)
                tmpLog.error("no candidates")
                retVal = retTmpError
                continue
0808
0809
            # --- architecture check for GPU queues ---
            # GPU queues require an explicit sw_platform (architecture string) from the
            # task, which is then validated per-site via the software-tags JSON check.
            # jsonCheck is also reused by the SW/HW matching further below.
            newScanSiteList = []
            oldScanSiteList = copy.copy(scanSiteList)
            msg_map = {}
            jsonCheck = AtlasBrokerUtils.JsonSoftwareCheck(self.siteMapper, self.sw_map, self.architecture_level_map)
            for tmpSiteName in scanSiteList:
                tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                # HPO workflows are exempt from the GPU architecture requirement
                if tmpSiteSpec.isGPU() and not taskSpec.is_hpo_workflow():
                    if taskSpec.get_sw_platform() in ["", None]:
                        msg_map[tmpSiteSpec.get_unified_name()] = f" skip site={tmpSiteSpec.get_unified_name()} since architecture is required for GPU queues"
                        continue
                    siteListWithCMTCONFIG = [tmpSiteSpec.get_unified_name()]
                    siteListWithCMTCONFIG, sitesNoJsonCheck, _ = jsonCheck.check(
                        siteListWithCMTCONFIG, None, None, None, taskSpec.get_sw_platform(), False, True
                    )

                    # the site survives only if its tags advertise the requested architecture
                    if len(siteListWithCMTCONFIG) == 0:
                        msg_map[tmpSiteSpec.get_unified_name()] = (
                            f" skip site={tmpSiteSpec.get_unified_name()} since architecture={taskSpec.get_sw_platform()} is unavailable"
                        )
                        continue

                newScanSiteList.append(tmpSiteName)
            scanSiteList = newScanSiteList
            self.add_summary_message(oldScanSiteList, scanSiteList, "architecture check", tmpLog, msg_map)
            if not scanSiteList:
                self.dump_summary(tmpLog)
                tmpLog.error("no candidates")
                retVal = retTmpError
                continue
0839
0840
            # --- too-many-closed check ---
            # Drop sites where this task already had jobs closed (per failureCounts,
            # gathered earlier). Skipped for preassigned sites and merge jobs.
            if not sitePreAssigned and not inputChunk.isMerging:
                oldScanSiteList = copy.copy(scanSiteList)
                newScanSiteList = []
                msg_map = {}
                for tmpSiteName in self.get_unified_sites(scanSiteList):
                    if tmpSiteName in failureCounts and "closed" in failureCounts[tmpSiteName]:
                        nClosed = failureCounts[tmpSiteName]["closed"]
                        if nClosed > 0:
                            msg_map[tmpSiteName] = f" skip site={tmpSiteName} due to n_closed={nClosed} criteria=-closed"
                            continue
                    newScanSiteList.append(tmpSiteName)
                scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
                self.add_summary_message(oldScanSiteList, scanSiteList, "too many closed check", tmpLog, msg_map)
                if not scanSiteList:
                    self.dump_summary(tmpLog)
                    tmpLog.error("no candidates")
                    retVal = retTmpError
                    continue
0859
0860
            # --- cmtconfig / host-requirement setup for the SW/HW matching below ---
            # Self-match trick: a plain cmtconfig string always matches itself as a regex;
            # if it does NOT, the string contains regexp metacharacters in a pattern
            # position, so it is treated as a regexp to be resolved per site.
            cmt_config = taskSpec.get_sw_platform()
            is_regexp_cmt_config = False
            if cmt_config:
                if re.match(cmt_config, cmt_config) is None:
                    is_regexp_cmt_config = True
            base_platform = taskSpec.get_base_platform()
            # per-site resolution of a regexp cmtconfig, filled in the SW/HW loop below
            resolved_platforms = {}

            # host hardware requirements taken from the task parameters
            host_cpu_spec = taskSpec.get_host_cpu_spec()
            host_cpu_pref = taskSpec.get_host_cpu_preference()
            host_gpu_spec = taskSpec.get_host_gpu_spec()
0872
            # --- software / hardware availability check ---
            # Decide which flavor of release lookup to run based on the shape of
            # taskSpec.transHome, then keep only sites whose software tags (or
            # releases == ["ANY"]) satisfy the task's SW and host-HW requirements.
            if not sitePreAssigned:
                unified_site_list = self.get_unified_sites(scanSiteList)
                # normalize transHome: drop the leading project prefix and use dashes,
                # e.g. "AnalysisTransforms-AtlasDerivation_21.2.3" -> "AtlasDerivation-21.2.3"
                if taskSpec.transHome is not None:
                    transHome = taskSpec.transHome
                else:
                    transHome = ""

                transHome = re.sub("^[^-]+-*", "", transHome)
                transHome = re.sub("_", "-", transHome)
                # case 1: a concrete cache release (not rel_N nightly, not a timestamped
                # nightly, not a plain X.Y.Z base release)
                if (
                    re.search("rel_\d+(\n|$)", transHome) is None
                    and taskSpec.transHome not in ["AnalysisTransforms", None]
                    and re.search("\d{4}-\d{2}-\d{2}T\d{4}$", transHome) is None
                    and re.search("-\d+\.\d+\.\d+$", transHome) is None
                ):
                    # look up project/version from the cache name in the SW tags
                    siteListWithSW, sitesNoJsonCheck, _ = jsonCheck.check(
                        unified_site_list,
                        "atlas",
                        transHome.split("-")[0],
                        transHome.split("-")[1],
                        taskSpec.get_sw_platform(),
                        False,
                        False,
                        container_name=taskSpec.container_name,
                        only_tags_fc=taskSpec.use_only_tags_fc(),
                        host_cpu_specs=host_cpu_spec,
                        host_cpu_pref=host_cpu_pref,
                        host_gpu_spec=host_gpu_spec,
                        log_stream=tmpLog,
                    )
                    sitesAuto = copy.copy(siteListWithSW)

                # case 2: base release (X.Y.Z), possibly combined with transUses
                elif (transHome == "" and taskSpec.transUses is not None) or (
                    re.search("-\d+\.\d+\.\d+$", transHome) is not None and (taskSpec.transUses is None or re.search("-\d+\.\d+$", taskSpec.transUses) is None)
                ):
                    siteListWithSW = []
                    sitesNoJsonCheck = unified_site_list

                    # extract the version part of transUses, e.g. "Atlas-21.0" -> "21.0"
                    if taskSpec.transUses is not None:
                        transUses = taskSpec.transUses.split("-")[-1]
                    else:
                        transUses = None
                    if transUses is not None:
                        # first try matching on the AtlasOffline version from transUses
                        tmpSiteListWithSW, sitesNoJsonCheck, _ = jsonCheck.check(
                            unified_site_list,
                            "atlas",
                            "AtlasOffline",
                            transUses,
                            taskSpec.get_sw_platform(),
                            False,
                            False,
                            container_name=taskSpec.container_name,
                            only_tags_fc=taskSpec.use_only_tags_fc(),
                            host_cpu_specs=host_cpu_spec,
                            host_gpu_spec=host_gpu_spec,
                            log_stream=tmpLog,
                        )
                        siteListWithSW += tmpSiteListWithSW
                    if len(transHome.split("-")) == 2:
                        # then also try the project/version taken from transHome, but only
                        # on the sites not already accepted above
                        tmpSiteListWithSW, sitesNoJsonCheck, _ = jsonCheck.check(
                            sitesNoJsonCheck,
                            "atlas",
                            transHome.split("-")[0],
                            transHome.split("-")[1],
                            taskSpec.get_sw_platform(),
                            False,
                            False,
                            container_name=taskSpec.container_name,
                            only_tags_fc=taskSpec.use_only_tags_fc(),
                            host_cpu_specs=host_cpu_spec,
                            host_gpu_spec=host_gpu_spec,
                            log_stream=tmpLog,
                        )
                        siteListWithSW += tmpSiteListWithSW
                    sitesAuto = copy.copy(siteListWithSW)

                else:
                    # case 3: nightlies (any transHome left over) or no release at all
                    if taskSpec.transHome is not None:
                        # nightly release: only check the "nightlies" tag
                        siteListWithSW, sitesNoJsonCheck, _ = jsonCheck.check(
                            unified_site_list,
                            "nightlies",
                            None,
                            None,
                            taskSpec.get_sw_platform(),
                            True,
                            False,
                            container_name=taskSpec.container_name,
                            only_tags_fc=taskSpec.use_only_tags_fc(),
                            host_cpu_specs=host_cpu_spec,
                            host_gpu_spec=host_gpu_spec,
                            log_stream=tmpLog,
                        )
                        sitesAuto = copy.copy(siteListWithSW)

                    else:
                        # no release requirement: only match on cmtconfig
                        siteListWithSW, sitesNoJsonCheck, _ = jsonCheck.check(
                            unified_site_list,
                            None,
                            None,
                            None,
                            taskSpec.get_sw_platform(),
                            False,
                            True,
                            container_name=taskSpec.container_name,
                            only_tags_fc=taskSpec.use_only_tags_fc(),
                            host_cpu_specs=host_cpu_spec,
                            host_gpu_spec=host_gpu_spec,
                            log_stream=tmpLog,
                        )
                        sitesAuto = copy.copy(siteListWithSW)

                # apply the SW result, keeping "ANY"-release sites as a fallback when
                # no host HW requirement was given
                newScanSiteList = []
                oldScanSiteList = copy.copy(scanSiteList)
                sitesAny = []
                msg_map = {}
                for tmpSiteName in unified_site_list:
                    tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                    if cmt_config:
                        platforms = AtlasBrokerUtils.resolve_cmt_config(tmpSiteName, cmt_config, base_platform, self.sw_map)
                        if platforms:
                            resolved_platforms[tmpSiteName] = platforms
                    if tmpSiteName in siteListWithSW:
                        # a regexp cmtconfig must additionally be resolvable at the site
                        if not is_regexp_cmt_config or tmpSiteName in resolved_platforms:
                            newScanSiteList.append(tmpSiteName)
                        else:
                            # unresolved regexp cmtconfig
                            msg_map[tmpSiteName] = f" skip site={tmpSiteName} due to unresolved regexp in cmtconfig={cmt_config} criteria=-regexpcmtconfig"
                    elif host_cpu_spec is None and host_gpu_spec is None and tmpSiteSpec.releases == ["ANY"]:
                        # site advertises every release
                        newScanSiteList.append(tmpSiteName)
                        sitesAny.append(tmpSiteName)
                    else:
                        # no matching cache/container tag and not an ANY site
                        msg_map[tmpSiteName] = (
                            f" skip site={tmpSiteName} due to missing SW cache={taskSpec.transHome}:{taskSpec.get_sw_platform()} container_name='{taskSpec.container_name}' "
                            f"or irrelevant HW cpu={str(host_cpu_spec)} gpu={str(host_gpu_spec)} criteria=-cache"
                        )
                scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
                self.add_summary_message(oldScanSiteList, scanSiteList, "SW/HW check", tmpLog, msg_map)
                tmpLog.info(f" {len(sitesAuto)} with AUTO, {len(sitesAny)} with ANY")
                if not scanSiteList:
                    self.dump_summary(tmpLog)
                    tmpLog.error("no candidates")
                    retVal = retTmpError
                    continue
1024
1025
            # --- memory check ---
            # Compare the chunk's RAM requirement against the site's maxrss/minrss window.
            origMinRamCount = inputChunk.getMaxRamCount()
            if origMinRamCount not in [0, None]:
                newScanSiteList = []
                oldScanSiteList = copy.copy(scanSiteList)
                msg_map = {}
                for tmpSiteName in scanSiteList:
                    tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                    # scale the requirement by the site core count when the task RAM
                    # is specified per-core (not for merge jobs)
                    minRamCount = origMinRamCount
                    if taskSpec.ramPerCore() and not inputChunk.isMerging:
                        if tmpSiteSpec.coreCount not in [None, 0]:
                            minRamCount = origMinRamCount * tmpSiteSpec.coreCount
                    minRamCount = JobUtils.compensate_ram_count(minRamCount)
                    # site upper bound on RSS
                    site_maxmemory = 0
                    if tmpSiteSpec.maxrss not in [0, None]:
                        site_maxmemory = tmpSiteSpec.maxrss
                    if site_maxmemory not in [0, None] and minRamCount != 0 and minRamCount > site_maxmemory:
                        msg_map[tmpSiteSpec.get_unified_name()] = (
                            f" skip site={tmpSiteSpec.get_unified_name()} due to insufficient RAM less than job's core-scaled requirement {minRamCount} MB criteria=-lowmemory"
                        )
                        continue
                    # site lower bound on RSS (e.g. queues reserved for high-memory jobs)
                    site_minmemory = 0
                    if tmpSiteSpec.minrss not in [0, None]:
                        site_minmemory = tmpSiteSpec.minrss
                    if site_minmemory not in [0, None] and minRamCount != 0 and minRamCount < site_minmemory:
                        msg_map[tmpSiteSpec.get_unified_name()] = (
                            f" skip site={tmpSiteSpec.get_unified_name()} due to RAM lower limit greater than job's core-scaled requirement {minRamCount} MB criteria=-highmemory"
                        )
                        continue
                    newScanSiteList.append(tmpSiteName)
                scanSiteList = newScanSiteList
                ramUnit = taskSpec.ramUnit
                if ramUnit is None:
                    ramUnit = "MB"
                self.add_summary_message(oldScanSiteList, scanSiteList, "memory check", tmpLog, msg_map)
                if not scanSiteList:
                    self.dump_summary(tmpLog)
                    tmpLog.error("no candidates")
                    retVal = retTmpError
                    continue
1068
1069
            # --- scratch disk check ---
            # Estimate the per-job scratch footprint (input + output + work dir, in MB)
            # and compare with the site's maxwdir.
            tmpMaxAtomSize = inputChunk.getMaxAtomSize()
            if not inputChunk.isMerging:
                tmpEffAtomSize = inputChunk.getMaxAtomSize(effectiveSize=True)
                tmpOutDiskSize = taskSpec.getOutDiskSize()
                tmpWorkDiskSize = taskSpec.getWorkDiskSize()
                # scratch needed when inputs are copied locally (S) vs read remotely (R)
                minDiskCountS = tmpOutDiskSize * tmpEffAtomSize + tmpWorkDiskSize + tmpMaxAtomSize
                minDiskCountS = minDiskCountS // 1024 // 1024
                maxSizePerJob = taskSpec.getMaxSizePerJob()
                # NOTE(review): the first branch is a no-op (None -> None); kept as-is
                if maxSizePerJob is None:
                    maxSizePerJob = None
                else:
                    maxSizePerJob //= 1024 * 1024

                minDiskCountR = tmpOutDiskSize * tmpEffAtomSize + tmpWorkDiskSize
                minDiskCountR = minDiskCountR // 1024 // 1024
                tmpLog.info(f"maxAtomSize={tmpMaxAtomSize} effectiveAtomSize={tmpEffAtomSize} outDiskCount={tmpOutDiskSize} workDiskSize={tmpWorkDiskSize}")
            else:
                # merge jobs: assume twice the largest atom, everything local
                maxSizePerJob = None
                minDiskCountS = 2 * tmpMaxAtomSize // 1024 // 1024
                minDiskCountR = "NA"
            tmpLog.info(f"minDiskCountScratch={minDiskCountS} minDiskCountRemote={minDiskCountR} nGBPerJobInMB={maxSizePerJob}")
            newScanSiteList = []
            oldScanSiteList = copy.copy(scanSiteList)
            msg_map = {}
            for tmpSiteName in self.get_unified_sites(scanSiteList):
                tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                # sites with maxwdir == 0/None are not checked and pass through
                if tmpSiteSpec.maxwdir:
                    if CoreUtils.use_direct_io_for_job(taskSpec, tmpSiteSpec, inputChunk):
                        # direct I/O: no local input copy
                        minDiskCount = minDiskCountR
                        if maxSizePerJob is not None and not taskSpec.useLocalIO():
                            tmpMinDiskCountR = tmpOutDiskSize * maxSizePerJob + tmpWorkDiskSize
                            tmpMinDiskCountR /= 1024 * 1024
                            if tmpMinDiskCountR > minDiskCount:
                                minDiskCount = tmpMinDiskCountR
                    else:
                        minDiskCount = minDiskCountS
                        if maxSizePerJob is not None and maxSizePerJob > minDiskCount:
                            minDiskCount = maxSizePerJob

                    # maxwdir is defined per core slot; scale by task/site core usage.
                    # task_cc == site_cc for any multicore task, so the scaling only
                    # reduces maxwdir for single-core tasks on multicore queues.
                    if tmpSiteSpec.coreCount in [None, 0, 1]:
                        site_cc = 1
                    else:
                        site_cc = tmpSiteSpec.coreCount

                    if taskSpec.coreCount in [None, 0, 1]:
                        task_cc = 1
                    else:
                        task_cc = site_cc

                    maxwdir_scaled = tmpSiteSpec.maxwdir * task_cc / site_cc

                    if minDiskCount > maxwdir_scaled:
                        msg_map[tmpSiteName] = f" skip site={tmpSiteName} due to small scratch disk={maxwdir_scaled} < {minDiskCount} criteria=-disk"
                        continue
                    newMaxwdir[tmpSiteName] = maxwdir_scaled
                newScanSiteList.append(tmpSiteName)
            scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
            self.add_summary_message(oldScanSiteList, scanSiteList, "scratch disk check", tmpLog, msg_map)
            if not scanSiteList:
                self.dump_summary(tmpLog)
                tmpLog.error("no candidates")
                retVal = retTmpError
                continue
1135
1136
            # --- output storage endpoint check ---
            # Require a usable output endpoint with enough free space, not blacklisted
            # in DDM, and not over the user's local quota.
            newScanSiteList = []
            oldScanSiteList = copy.copy(scanSiteList)
            msg_map = {}
            for tmpSiteName in self.get_unified_sites(scanSiteList):
                tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                scope_input, scope_output = select_scope(tmpSiteSpec, JobUtils.ANALY_PS, JobUtils.ANALY_PS)
                if scope_output not in tmpSiteSpec.ddm_endpoints_output:
                    msg_map[tmpSiteName] = f" skip site={tmpSiteName} since {scope_output} output endpoint undefined criteria=-disk"
                    continue
                tmp_output_endpoint = tmpSiteSpec.ddm_endpoints_output[scope_output].getEndPoint(tmpSiteSpec.ddm_output[scope_output])
                if tmp_output_endpoint is not None:
                    # free space = expired + free, compared against a fixed 200 GB floor;
                    # sites can opt out via the skip_RSE_check catchall flag
                    diskThreshold = 200
                    tmpSpaceSize = 0
                    if tmp_output_endpoint["space_expired"] is not None:
                        tmpSpaceSize += tmp_output_endpoint["space_expired"]
                    if tmp_output_endpoint["space_free"] is not None:
                        tmpSpaceSize += tmp_output_endpoint["space_free"]
                    if (
                        tmpSpaceSize < diskThreshold and "skip_RSE_check" not in tmpSiteSpec.catchall
                    ):
                        msg_map[tmpSiteName] = f" skip site={tmpSiteName} due to disk shortage in SE {tmpSpaceSize} < {diskThreshold}GB criteria=-disk"
                        continue

                # DDM blacklist check on the input/output endpoints
                tmp_msg = AtlasBrokerUtils.check_endpoints_with_blacklist(tmpSiteSpec, scope_input, scope_output, sites_in_nucleus, remote_source_available)
                if tmp_msg is not None:
                    msg_map[tmpSiteName] = f" skip site={tmpSiteName} since {tmpSiteSpec.ddm_output[scope_output]} is blacklisted in DDM criteria=-blacklist"
                    continue

                # per-user local quota on the output endpoint
                if tmpSiteSpec.ddm_output[scope_output] in endpoints_over_local_quota:
                    msg_map[tmpSiteName] = f" skip site={tmpSiteName} since {tmpSiteSpec.ddm_output[scope_output]} is over local quota criteria=-local_quota"
                    continue
                newScanSiteList.append(tmpSiteName)
            scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
            self.add_summary_message(oldScanSiteList, scanSiteList, "storage space check", tmpLog, msg_map)
            if not scanSiteList:
                self.dump_summary(tmpLog)
                tmpLog.error("no candidates")
                retVal = retTmpError
                continue
1178
1179
            # --- walltime check ---
            # Build the job's expected walltime requirement: either walltime * input size,
            # or (HS06 mode) cpuTime * nEvents, then compare against the site's
            # maxtime/mintime window (scaled by cores, core power and CPU efficiency).
            if not taskSpec.useHS06():
                tmpMaxAtomSize = inputChunk.getMaxAtomSize(effectiveSize=True)
                if taskSpec.walltime is not None:
                    minWalltime = taskSpec.walltime * tmpMaxAtomSize
                else:
                    minWalltime = None
                strMinWalltime = f"walltime*inputSize={taskSpec.walltime}*{tmpMaxAtomSize}"
            else:
                tmpMaxAtomSize = inputChunk.getMaxAtomSize(getNumEvents=True)
                if taskSpec.getCpuTime() is not None:
                    minWalltime = taskSpec.getCpuTime() * tmpMaxAtomSize
                else:
                    minWalltime = None
                strMinWalltime = f"cpuTime*nEventsPerJob={taskSpec.getCpuTime()}*{tmpMaxAtomSize}"
            if minWalltime and minWalltime > 0 and not inputChunk.isMerging:
                newScanSiteList = []
                oldScanSiteList = copy.copy(scanSiteList)
                msg_map = {}
                for tmpSiteName in scanSiteList:
                    tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                    siteMaxTime = tmpSiteSpec.maxtime
                    origSiteMaxTime = siteMaxTime
                    # upper limit; tmpSiteStr accumulates a human-readable formula for logs
                    tmpSiteStr = f"{siteMaxTime}"
                    if taskSpec.useHS06():
                        oldSiteMaxTime = siteMaxTime
                        siteMaxTime -= taskSpec.baseWalltime
                        tmpSiteStr = f"({oldSiteMaxTime}-{taskSpec.baseWalltime})"
                    if siteMaxTime not in [None, 0] and tmpSiteSpec.coreCount not in [None, 0]:
                        siteMaxTime *= tmpSiteSpec.coreCount
                        tmpSiteStr += f"*{tmpSiteSpec.coreCount}"
                    if taskSpec.useHS06():
                        if siteMaxTime not in [None, 0]:
                            siteMaxTime *= tmpSiteSpec.corepower
                            tmpSiteStr += f"*{tmpSiteSpec.corepower}"
                        siteMaxTime *= float(taskSpec.cpuEfficiency) / 100.0
                        siteMaxTime = int(siteMaxTime)
                        tmpSiteStr += f"*{taskSpec.cpuEfficiency}%"
                    # maxtime == 0 means "no site upper limit" and is not enforced
                    if origSiteMaxTime != 0 and minWalltime and minWalltime > siteMaxTime:
                        tmpMsg = f" skip site={tmpSiteName} due to short site walltime {tmpSiteStr} (site upper limit) less than {strMinWalltime} "
                        tmpMsg += "criteria=-shortwalltime"
                        msg_map[tmpSiteSpec.get_unified_name()] = tmpMsg
                        continue
                    # lower limit: same scaling applied to the site's mintime
                    siteMinTime = tmpSiteSpec.mintime
                    origSiteMinTime = siteMinTime
                    tmpSiteStr = f"{siteMinTime}"
                    if taskSpec.useHS06():
                        oldSiteMinTime = siteMinTime
                        siteMinTime -= taskSpec.baseWalltime
                        tmpSiteStr = f"({oldSiteMinTime}-{taskSpec.baseWalltime})"
                    if siteMinTime not in [None, 0] and tmpSiteSpec.coreCount not in [None, 0]:
                        siteMinTime *= tmpSiteSpec.coreCount
                        tmpSiteStr += f"*{tmpSiteSpec.coreCount}"
                    if taskSpec.useHS06():
                        if siteMinTime not in [None, 0]:
                            siteMinTime *= tmpSiteSpec.corepower
                            tmpSiteStr += f"*{tmpSiteSpec.corepower}"
                        siteMinTime *= float(taskSpec.cpuEfficiency) / 100.0
                        siteMinTime = int(siteMinTime)
                        tmpSiteStr += f"*{taskSpec.cpuEfficiency}%"
                    # mintime == 0 means "no lower limit"; otherwise the job must be
                    # long enough for the queue
                    if origSiteMinTime != 0 and (minWalltime is None or minWalltime < siteMinTime):
                        tmpMsg = f" skip site {tmpSiteName} due to short job walltime {tmpSiteStr} (site lower limit) greater than {strMinWalltime} "
                        tmpMsg += "criteria=-longwalltime"
                        msg_map[tmpSiteSpec.get_unified_name()] = tmpMsg
                        continue
                    newScanSiteList.append(tmpSiteName)
                scanSiteList = newScanSiteList
                if not taskSpec.useHS06():
                    tmp_unit = taskSpec.walltimeUnit + "PerMB" if taskSpec.walltimeUnit else "kSI2ksecondsPerMB"
                    tmpLog.info(f"walltime check with {strMinWalltime}({tmp_unit})")
                else:
                    tmpLog.info(f"walltime check with {strMinWalltime}({taskSpec.cpuTimeUnit}*nEventsPerJob)")
                self.add_summary_message(oldScanSiteList, scanSiteList, "walltime check", tmpLog, msg_map)
                if not scanSiteList:
                    self.dump_summary(tmpLog)
                    tmpLog.error("no candidates")
                    retVal = retTmpError
                    continue
1259
1260
1261
1262
            # --- zero-walltime / scout walltime check ---
            # For scouts (walltime still unknown) or tasks with no walltime requirement,
            # require a generous minimum site limit so jobs cannot hit the wall.
            if (
                (not sitePreAssigned and inputChunk.useScout())
                or (not taskSpec.walltime and not taskSpec.walltimeUnit and not taskSpec.cpuTimeUnit)
                or (not taskSpec.getCpuTime() and taskSpec.cpuTimeUnit)
            ):
                newScanSiteList = []
                oldScanSiteList = copy.copy(scanSiteList)
                msg_map = {}
                for tmpSiteName in scanSiteList:
                    tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                    # scale the site maxtime the same way as in the regular walltime check
                    siteMaxTime = tmpSiteSpec.maxtime
                    tmpSiteStr = f"{siteMaxTime}"
                    if taskSpec.useHS06():
                        oldSiteMaxTime = siteMaxTime
                        siteMaxTime -= taskSpec.baseWalltime
                        tmpSiteStr = f"({oldSiteMaxTime}-{taskSpec.baseWalltime})"
                    if siteMaxTime not in [None, 0] and tmpSiteSpec.coreCount not in [None, 0]:
                        siteMaxTime *= tmpSiteSpec.coreCount
                        tmpSiteStr += f"*{tmpSiteSpec.coreCount}"
                    if taskSpec.useHS06():
                        if siteMaxTime not in [None, 0]:
                            siteMaxTime *= tmpSiteSpec.corepower
                            tmpSiteStr += f"*{tmpSiteSpec.corepower}"
                        siteMaxTime *= float(taskSpec.cpuEfficiency) / 100.0
                        siteMaxTime = int(siteMaxTime)
                        tmpSiteStr += f"*{taskSpec.cpuEfficiency}%"
                    # baseline requirement: 24h at 10 HS06s, scaled by the core count
                    minTimeForZeroWalltime = 24 * 60 * 60 * 10
                    str_minTimeForZeroWalltime = "24hr*10HS06s"
                    if tmpSiteSpec.coreCount not in [None, 0]:
                        minTimeForZeroWalltime *= tmpSiteSpec.coreCount
                        str_minTimeForZeroWalltime += f"*{tmpSiteSpec.coreCount}cores"
                    if siteMaxTime != 0 and siteMaxTime < minTimeForZeroWalltime:
                        tmpMsg = f" skip site={tmpSiteName} due to site walltime {tmpSiteStr} (site upper limit) insufficient "
                        if inputChunk.useScout():
                            tmpMsg += f"for scouts ({str_minTimeForZeroWalltime} at least) "
                            tmpMsg += "criteria=-scoutwalltime"
                        else:
                            tmpMsg += f"for zero walltime ({str_minTimeForZeroWalltime} at least) "
                            tmpMsg += "criteria=-zerowalltime"
                        msg_map[tmpSiteSpec.get_unified_name()] = tmpMsg
                        continue
                    newScanSiteList.append(tmpSiteName)
                scanSiteList = newScanSiteList
                self.add_summary_message(oldScanSiteList, scanSiteList, "zero walltime check", tmpLog, msg_map)
                if not scanSiteList:
                    self.dump_summary(tmpLog)
                    tmpLog.error("no candidates")
                    retVal = retTmpError
                    continue
1312
1313
1314
            # --- pilot activity check ---
            # Sites with no recent pilot requests get no jobs (unless in test mode or
            # the task itself is a test task). nPilotMap is reused later for weighting.
            nWNmap = self.taskBufferIF.getCurrentSiteData()
            nPilotMap = {}
            newScanSiteList = []
            oldScanSiteList = copy.copy(scanSiteList)
            msg_map = {}
            for tmpSiteName in self.get_unified_sites(scanSiteList):
                nPilot = 0
                if tmpSiteName in nWNmap:
                    nPilot = nWNmap[tmpSiteName]["getJob"] + nWNmap[tmpSiteName]["updateJob"]
                if nPilot == 0 and taskSpec.prodSourceLabel not in ["test"]:
                    msg_map[tmpSiteName] = f" skip site={tmpSiteName} due to no pilot criteria=-nopilot"
                    # in test mode the site is kept despite the message
                    if not self.testMode:
                        continue
                newScanSiteList.append(tmpSiteName)
                nPilotMap[tmpSiteName] = nPilot
            scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
            self.add_summary_message(oldScanSiteList, scanSiteList, "pilot activity check", tmpLog, msg_map)
            if not scanSiteList:
                self.dump_summary(tmpLog)
                tmpLog.error("no candidates")
                retVal = retTmpError
                continue
1338
1339
1340
            # --- include/exclude list and cloud check ---
            # Apply the user's excludedSite/includedSite lists; "AUTO" in the include
            # list keeps otherwise-not-included sites as a fallback pool used only when
            # nothing else survives.
            newScanSiteList = []
            oldScanSiteList = copy.copy(scanSiteList)
            sitesForANY = []
            msg_map = {}
            for tmpSiteName in self.get_unified_sites(scanSiteList):
                autoSite = False
                # excluded explicitly
                if AtlasBrokerUtils.isMatched(tmpSiteName, excludeList):
                    msg_map[tmpSiteName] = f" skip site={tmpSiteName} excluded criteria=-excluded"
                    continue

                # not in the include list (unless AUTO fallback applies)
                if includeList is not None and not AtlasBrokerUtils.isMatched(tmpSiteName, includeList):
                    if "AUTO" in includeList:
                        autoSite = True
                    else:
                        msg_map[tmpSiteName] = f" skip site={tmpSiteName} not included criteria=-notincluded"
                        continue
                tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)

                # cloud constraint from the task
                if taskSpec.cloud not in [None, "", "any", tmpSiteSpec.cloud]:
                    msg_map[tmpSiteName] = f" skip site={tmpSiteName} cloud mismatch criteria=-cloudmismatch"
                    continue
                if autoSite:
                    sitesForANY.append(tmpSiteName)
                else:
                    newScanSiteList.append(tmpSiteName)

            # use the AUTO pool only when no explicitly included site survived
            if newScanSiteList == []:
                newScanSiteList = sitesForANY
            else:
                for tmpSiteName in sitesForANY:
                    msg_map[tmpSiteName] = f" skip site={tmpSiteName} not included criteria=-notincluded"
            scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
            self.add_summary_message(oldScanSiteList, scanSiteList, "include/exclude check", tmpLog, msg_map)
            if not scanSiteList:
                self.dump_summary(tmpLog)
                tmpLog.error("no candidates")
                retVal = retTmpError
                continue
1382
1383
            # --- fetch bookkeeping data needed by the remaining checks ---
            # sites already used by this task (unified names)
            tmpSt, sitesUsedByTask = self.taskBufferIF.getSitesUsedByTask_JEDI(taskSpec.jediTaskID)
            if not tmpSt:
                tmpLog.error("failed to get sites which already used by task")
                retVal = retTmpError
                continue
            sitesUsedByTask = self.get_unified_sites(sitesUsedByTask)

            # job statistics per global share; a failure here aborts brokerage entirely
            # (return, not retry) since later checks cannot run without it
            tmpSt, jobStatPrioMap = self.taskBufferIF.getJobStatisticsByGlobalShare(taskSpec.vo)
            if not tmpSt:
                tmpLog.error("failed to get job statistics with priority")
                taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
                return retTmpError
            tmpSt, siteToRunRateMap = AtlasBrokerUtils.getSiteToRunRateStats(tbIF=self.taskBufferIF, vo=taskSpec.vo)
            if not tmpSt:
                tmpLog.error("failed to get site to-running rate")
                taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
                return retTmpError
1402
            # --- preassigned-site check ---
            # If the user pinned a site, it must have survived all previous checks;
            # otherwise brokerage fails fatally (retFatal, no retry).
            if sitePreAssigned:
                oldScanSiteList = copy.copy(scanSiteList)
                if preassignedSite not in scanSiteList and preassignedSite not in self.get_unified_sites(scanSiteList):
                    tmpLog.info(f"preassigned site {preassignedSite} did not pass all tests")
                    self.add_summary_message(oldScanSiteList, [], "preassign check", tmpLog, {})
                    self.dump_summary(tmpLog)
                    tmpLog.error("no candidates")
                    retVal = retFatal
                    continue
                else:
                    # drop every candidate except the preassigned one
                    newScanSiteList = []
                    msg_map = {}
                    for tmpPseudoSiteName in scanSiteList:
                        tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
                        tmpSiteName = tmpSiteSpec.get_unified_name()
                        if tmpSiteName != preassignedSite:
                            msg_map[tmpSiteName] = f" skip site={tmpSiteName} non pre-assigned site criteria=-nonpreassigned"
                            continue
                        newScanSiteList.append(tmpSiteName)
                    scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
                    self.add_summary_message(oldScanSiteList, scanSiteList, "preassign check", tmpLog, msg_map)
1424
1425
            # --- hospital queue check ---
            # Use *_HOSPITAL queues only when no normal queue is available.
            newScanSiteList = []
            oldScanSiteList = copy.copy(scanSiteList)
            hasNormalSite = False
            for tmpSiteName in self.get_unified_sites(scanSiteList):
                if not tmpSiteName.endswith("_HOSPITAL"):
                    hasNormalSite = True
                    break
            if hasNormalSite:
                msg_map = {}
                for tmpSiteName in self.get_unified_sites(scanSiteList):
                    # skip hospital queues since at least one normal queue survived
                    if tmpSiteName.endswith("_HOSPITAL"):
                        msg_map[tmpSiteName] = f" skip site={tmpSiteName} due to hospital queue criteria=-hospital"
                        continue
                    newScanSiteList.append(tmpSiteName)
                scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
                self.add_summary_message(oldScanSiteList, scanSiteList, "hospital check", tmpLog, msg_map)
                if not scanSiteList:
                    self.dump_summary(tmpLog)
                    tmpLog.error("no candidates")
                    retVal = retTmpError
                    continue
1448
1449
1450 if not sitePreAssigned:
1451
1452 tmpRet, tmpStatMap = self.taskBufferIF.getJobStatisticsByResourceTypeSite(workQueue)
1453 newScanSiteList = []
1454 oldScanSiteList = copy.copy(scanSiteList)
1455 msg_map = {}
1456 RT_Cap = 2
1457 for tmpSiteName in self.get_unified_sites(scanSiteList):
1458 tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
1459 tmpUnifiedName = tmpSiteSpec.get_unified_name()
1460 if tmpUnifiedName in tmpStatMap and taskSpec.resource_type in tmpStatMap[tmpUnifiedName]:
1461 tmpSiteStatMap = tmpStatMap[tmpUnifiedName][taskSpec.resource_type]
1462 tmpRTrunning = tmpSiteStatMap.get("running", 0)
1463 tmpRTqueue = tmpSiteStatMap.get("defined", 0)
1464 tmpRTqueue += tmpSiteStatMap.get("assigned", 0)
1465 tmpRTqueue += tmpSiteStatMap.get("activated", 0)
1466 tmpRTqueue += tmpSiteStatMap.get("starting", 0)
1467 if tmpRTqueue > max(20, tmpRTrunning * RT_Cap):
1468 tmpMsg = f" skip site={tmpSiteName} "
1469 tmpMsg += "since nQueue/max(20,nRun) with gshare+resource_type is "
1470 tmpMsg += f"{tmpRTqueue}/max(20,{tmpRTrunning}) > {RT_Cap} "
1471 tmpMsg += "criteria=-cap_rt"
1472 msg_map[tmpSiteName] = tmpMsg
1473 newScanSiteList.append(tmpSiteName)
1474 scanSiteList = self.get_pseudo_sites(newScanSiteList, scanSiteList)
1475 self.add_summary_message(oldScanSiteList, scanSiteList, "cap with gshare+resource_type check", tmpLog, msg_map)
1476 if not scanSiteList:
1477 self.dump_summary(tmpLog)
1478 tmpLog.error("no candidates")
1479 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1480 return retTmpError
1481
1482
            # --- overload check ---
            # Skip sites whose queued/running ratio in this gshare is far above the
            # grand (all-site) ratio. Overloaded non-VP sites are kept aside and used
            # as a last resort when every candidate is overloaded.
            if not inputChunk.isMerging:
                newScanSiteList = []
                oldScanSiteList = copy.copy(scanSiteList)
                overloadedNonVP = []
                msg_map = {}
                msgList = []
                msgListVP = []
                # tunables from the config store, with defaults
                minQueue = self.taskBufferIF.getConfigValue("anal_jobbroker", "OVERLOAD_MIN_QUEUE", "jedi", taskSpec.vo)
                if minQueue is None:
                    minQueue = 20
                ratioOffset = self.taskBufferIF.getConfigValue("anal_jobbroker", "OVERLOAD_RATIO_OFFSET", "jedi", taskSpec.vo)
                if ratioOffset is None:
                    ratioOffset = 1.2
                grandRatio = AtlasBrokerUtils.get_total_nq_nr_ratio(jobStatPrioMap, taskSpec.gshare)
                tmpLog.info(f"grand nQueue/nRunning ratio : {grandRatio}")
                tmpLog.info(f"sites with non-VP data : {','.join(scanSiteWoVP)}")
                for tmpPseudoSiteName in scanSiteList:
                    tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
                    tmpSiteName = tmpSiteSpec.get_unified_name()
                    # per-site queued/running counts within this gshare
                    nRunning = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running", workQueue_tag=taskSpec.gshare)
                    nQueue = 0
                    for jobStatus in ["defined", "assigned", "activated", "starting"]:
                        nQueue += AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, jobStatus, workQueue_tag=taskSpec.gshare)

                    # overloaded when above the floor AND (nothing running, or the
                    # local ratio exceeds the grand ratio by the offset factor)
                    if nQueue > minQueue and (nRunning == 0 or float(nQueue) / float(nRunning) > grandRatio * ratioOffset):
                        tmpMsg = f" skip site={tmpSiteName} "
                        tmpMsg += f"nQueue>minQueue({minQueue}) and "
                        if nRunning == 0:
                            tmpMsg += "nRunning=0 "
                            problematic_sites_dict.setdefault(tmpSiteName, set())
                            problematic_sites_dict[tmpSiteName].add(f"nQueue({nQueue})>minQueue({minQueue}) and nRunning=0")
                        else:
                            tmpMsg += f"nQueue({nQueue})/nRunning({nRunning}) > grandRatio({grandRatio:.2f})*offset({ratioOffset}) "
                        if tmpSiteName in scanSiteWoVP or checkDataLocality is False or inputChunk.getDatasets() == []:
                            # overloaded non-VP site: kept as last-resort fallback
                            tmpMsg += "criteria=-overloaded"
                            overloadedNonVP.append(tmpPseudoSiteName)
                            msgListVP.append(tmpMsg)
                        else:
                            tmpMsg += "and VP criteria=-overloaded_vp"
                            msgList.append(tmpMsg)
                    else:
                        newScanSiteList.append(tmpPseudoSiteName)
                if len(newScanSiteList) > 0:
                    # normal case: some sites are not overloaded
                    scanSiteList = newScanSiteList
                    for tmpMsg in msgList + msgListVP:
                        tmp_site_name = tmpMsg.split("skip site=")[1].split(" ")[0]
                        tmpSiteSpec = self.siteMapper.getSite(tmp_site_name)
                        msg_map[tmpSiteSpec.get_unified_name()] = tmpMsg
                else:
                    # everything is overloaded: fall back to the overloaded non-VP sites
                    scanSiteList = overloadedNonVP
                    for tmpMsg in msgList:
                        tmp_site_name = tmpMsg.split("skip site=")[1].split(" ")[0]
                        tmpSiteSpec = self.siteMapper.getSite(tmp_site_name)
                        msg_map[tmpSiteSpec.get_unified_name()] = tmpMsg
                self.add_summary_message(oldScanSiteList, scanSiteList, "overload check", tmpLog, msg_map)
                if not scanSiteList:
                    self.dump_summary(tmpLog)
                    tmpLog.error("no candidates")
                    retVal = retTmpError
                    continue
1544
1545
            # --- fetch per-user job statistics for the throttle logic below ---
            # cached for up to 60 s; a failure aborts brokerage (return, no retry)
            user_name = CoreUtils.clean_user_id(taskSpec.userName)
            tmpSt, jobsStatsPerUser = AtlasBrokerUtils.getUsersJobsStats(
                tbIF=self.taskBufferIF, vo=taskSpec.vo, prod_source_label=taskSpec.prodSourceLabel, cache_lifetime=60
            )
            if not tmpSt:
                tmpLog.error("failed to get users jobs statistics")
                taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))

                return retTmpError
1555 elif not inputChunk.isMerging:
1556
1557 for tmpPseudoSiteName in scanSiteList:
1558 tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1559 tmpSiteName = tmpSiteSpec.get_unified_name()
1560
1561 nRunning_pq_total = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running")
1562 nRunning_pq_in_gshare = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running", workQueue_tag=taskSpec.gshare)
1563 nQueue_pq_in_gshare = 0
1564 for jobStatus in ["defined", "assigned", "activated", "starting"]:
1565 nQueue_pq_in_gshare += AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, jobStatus, workQueue_tag=taskSpec.gshare)
1566
1567 try:
1568 site_to_running_rate = siteToRunRateMap[tmpSiteName]
1569 if isinstance(site_to_running_rate, dict):
1570 site_to_running_rate = sum(site_to_running_rate.values())
1571 except KeyError:
1572 site_to_running_rate = 0
1573 finally:
1574 to_running_rate = nRunning_pq_in_gshare * site_to_running_rate / nRunning_pq_total if nRunning_pq_total > 0 else 0
1575
1576 if nQueue_pq_in_gshare < base_queue_length_per_pq:
1577
1578 tmpLog.debug(f"not throttle on {tmpSiteName} since nQ({nQueue_pq_in_gshare}) < base queue length ({base_queue_length_per_pq})")
1579 continue
1580 allowed_queue_length_from_wait_time = base_expected_wait_hour_on_pq * to_running_rate
1581 if nQueue_pq_in_gshare < allowed_queue_length_from_wait_time:
1582
1583 tmpLog.debug(
1584 "not throttle on {0} since nQ({1}) < {2:.3f} = toRunningRate({3:.3f} /hr) * base wait time ({4} hr)".format(
1585 tmpSiteName, nQueue_pq_in_gshare, allowed_queue_length_from_wait_time, to_running_rate, base_expected_wait_hour_on_pq
1586 )
1587 )
1588 continue
1589
1590 try:
1591 user_jobs_stats_map = jobsStatsPerUser[tmpSiteName][taskSpec.gshare][user_name]
1592 except KeyError:
1593 continue
1594 else:
1595 nQ_pq_user = user_jobs_stats_map["nQueue"]
1596 nR_pq_user = user_jobs_stats_map["nRunning"]
1597 nUsers_pq = len(jobsStatsPerUser[tmpSiteName][taskSpec.gshare])
1598 try:
1599 nR_pq = jobsStatsPerUser[tmpSiteName][taskSpec.gshare]["_total"]["nRunning"]
1600 except KeyError:
1601 nR_pq = nRunning_pq_in_gshare
1602
1603 nQ_pq_limit_map = {
1604 "base_limit": base_queue_length_per_pq,
1605 "static_limit": static_max_queue_running_ratio * nR_pq,
1606 "dynamic_limit": max_expected_wait_hour * to_running_rate,
1607 }
1608 max_nQ_pq = max(nQ_pq_limit_map.values())
1609
1610 description_of_max_nQ_pq = f"max_nQ_pq({max_nQ_pq:.3f}) "
1611 for k, v in nQ_pq_limit_map.items():
1612 if v == max_nQ_pq:
1613 if k in ["base_limit"]:
1614 description_of_max_nQ_pq += f"= {k} = BASE_QUEUE_LENGTH_PER_PQ({base_queue_length_per_pq})"
1615 elif k in ["static_limit"]:
1616 description_of_max_nQ_pq += f"= {k} = STATIC_MAX_QUEUE_RUNNING_RATIO({static_max_queue_running_ratio:.3f}) * nR_pq({nR_pq})"
1617 elif k in ["dynamic_limit"]:
1618 description_of_max_nQ_pq += "= {key} = MAX_EXPECTED_WAIT_HOUR({value:.3f} hr) * toRunningRate_pq({trr:.3f} /hr)".format(
1619 key=k, value=max_expected_wait_hour, trr=to_running_rate
1620 )
1621 break
1622
1623 user_fraction_map = {
1624 "equal_distr": 1 / nUsers_pq,
1625 "prop_to_nR": nR_pq_user / nR_pq if nR_pq > 0 else 0,
1626 }
1627 max_user_fraction = max(user_fraction_map.values())
1628
1629 description_of_max_user_fraction = f"max_user_fraction({max_user_fraction:.3f}) "
1630 for k, v in user_fraction_map.items():
1631 if v == max_user_fraction:
1632 if k in ["equal_distr"]:
1633 description_of_max_user_fraction += f"= {k} = 1 / nUsers_pq({nUsers_pq})"
1634 elif k in ["prop_to_nR"]:
1635 description_of_max_user_fraction += f"= {k} = nR_pq_user({nR_pq_user}) / nR_pq({nR_pq})"
1636 break
1637
1638 nQ_pq_user_limit_map = {
1639 "constant_base_user_limit": base_default_queue_length_per_pq_user,
1640 "ratio_base_user_limit": base_queue_ratio_on_pq * nR_pq,
1641 "dynamic_user_limit": max_nQ_pq * max_user_fraction,
1642 }
1643 max_nQ_pq_user = max(nQ_pq_user_limit_map.values())
1644
1645 description_of_max_nQ_pq_user = f"max_nQ_pq_user({max_nQ_pq_user:.3f}) "
1646 for k, v in nQ_pq_user_limit_map.items():
1647 if v == max_nQ_pq_user:
1648 if k in ["constant_base_user_limit"]:
1649 description_of_max_nQ_pq_user += f"= {k} = BASE_DEFAULT_QUEUE_LENGTH_PER_PQ_USER({base_default_queue_length_per_pq_user})"
1650 elif k in ["ratio_base_user_limit"]:
1651 description_of_max_nQ_pq_user += f"= {k} = BASE_QUEUE_RATIO_ON_PQ({base_queue_ratio_on_pq:.3f}) * nR_pq({nR_pq})"
1652 elif k in ["dynamic_user_limit"]:
1653 description_of_max_nQ_pq_user += f"= {k} = max_nQ_pq({max_nQ_pq:.3f}) * max_user_fraction({max_user_fraction:.3f})"
1654 description_of_max_nQ_pq_user += f" , where {description_of_max_nQ_pq} , and {description_of_max_user_fraction}"
1655 break
1656
1657 if nQ_pq_user > max_nQ_pq_user:
1658 tmpMsg = f" consider {tmpSiteName} unsuitable for the user due to long queue of the user: "
1659 tmpMsg += f"nQ_pq_user({nQ_pq_user}) > {description_of_max_nQ_pq_user} "
1660
1661 problematic_sites_dict.setdefault(tmpSiteName, set())
1662 problematic_sites_dict[tmpSiteName].add(tmpMsg)
1663
1664 if tmpSiteName in failureCounts:
1665 nFailed = failureCounts[tmpSiteName].get("failed", 0)
1666 nClosed = failureCounts[tmpSiteName].get("closed", 0)
1667 nFinished = failureCounts[tmpSiteName].get("finished", 0)
1668 if not inputChunk.isMerging and (nFailed + nClosed) > max(2 * nFinished, minBadJobsToSkipPQ):
1669 problematic_sites_dict.setdefault(tmpSiteName, set())
1670 problematic_sites_dict[tmpSiteName].add("too many failed or closed jobs for last 6h")
1671
1672
1673
1674
# On the first (data-locality-respecting) pass of a multi-pass scan, if every
# surviving candidate is problematic for this user, give up on this pass and
# fall through to the next pass, logging each site's first recorded reason.
1675 if len(scan_site_list_loops) > 1 and i_loop == 0 and to_ignore_data_loc:
1676 candidates_with_problems = []
1677 for tmpPseudoSiteName in scanSiteList:
1678 tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1679 tmpSiteName = tmpSiteSpec.get_unified_name()
1680 if tmpSiteName in problematic_sites_dict:
# NOTE(review): picks an arbitrary element of the set (set order is not
# deterministic) as the representative diagnostic for the message.
1681 candidates_with_problems.append([tmpPseudoSiteName, list(problematic_sites_dict[tmpSiteName])[0]])
1682 if len(scanSiteList) == len(candidates_with_problems):
1683 msg_map = {}
1684 for tmpPseudoSiteName, error_diag in candidates_with_problems:
1685 tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1686 msg_map[tmpSiteSpec.get_unified_name()] = (
1687 f" skip site={tmpSiteSpec.get_unified_name()} due to a temporary user-specific problem: {error_diag} " "criteria=-tmp_user_problem"
1688 )
1689 self.add_summary_message(scanSiteList, [], "temp user problem check", tmpLog, msg_map)
1690 self.dump_summary(tmpLog)
1691 tmpLog.error("no candidates")
1692 retVal = retTmpError
1693 continue
1694
1695
1696
# Accumulate this pass's candidates; the first pass's result set is kept as
# the sites that hold the input data. Stop looping once enough distinct
# sites are collected for multi-site jobs (getNumSitesPerJob).
1697 overall_site_list.update(scanSiteList)
1698 if site_list_with_data is None:
1699
1700 site_list_with_data = set(scanSiteList)
1701 if len(overall_site_list) >= taskSpec.getNumSitesPerJob():
1702 retVal = None
1703 break
1704
# Non-None retVal means every pass ended in an error path above.
1705 if retVal is not None:
1706 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1707 return retVal
1708
# --- data availability ---------------------------------------------------
# For each input dataset, ask DDM which files are available at each candidate
# site (plus any remote-source sites usable from a candidate).
1709 scanSiteList = list(overall_site_list)
1710 availableFileMap = {}
1711 for datasetSpec in inputChunk.getDatasets():
1712 try:
1713
1714 tmpLog.debug(f"getting the list of available files for {datasetSpec.datasetName}")
# Deduplicated list of unified site names to scan for file replicas.
1715 fileScanSiteList = []
1716 for tmpPseudoSiteName in scanSiteList:
1717 tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1718 tmpSiteName = tmpSiteSpec.get_unified_name()
1719 if tmpSiteName in fileScanSiteList:
1720 continue
1721 fileScanSiteList.append(tmpSiteName)
# Include remote sources registered for this site/dataset pair so their
# replicas are also scanned.
1722 if tmpSiteName in remoteSourceList and datasetSpec.datasetName in remoteSourceList[tmpSiteName]:
1723 for tmpRemoteSite in remoteSourceList[tmpSiteName][datasetSpec.datasetName]:
1724 if tmpRemoteSite not in fileScanSiteList:
1725 fileScanSiteList.append(tmpRemoteSite)
1726
1727 siteStorageEP = AtlasBrokerUtils.getSiteInputStorageEndpointMap(fileScanSiteList, self.siteMapper, JobUtils.ANALY_PS, JobUtils.ANALY_PS)
1728
# Merging can use incomplete replicas; non-master (secondary) datasets are
# restricted to complete replicas only.
1729 if inputChunk.isMerging:
1730 checkCompleteness = False
1731 else:
1732 checkCompleteness = True
1733 if not datasetSpec.isMaster():
1734 useCompleteOnly = True
1735 else:
1736 useCompleteOnly = False
1737
1738 tmpAvFileMap = self.ddmIF.getAvailableFiles(
1739 datasetSpec,
1740 siteStorageEP,
1741 self.siteMapper,
1742 check_completeness=checkCompleteness,
1743 use_vp=useVP,
1744 file_scan_in_container=False,
1745 complete_only=useCompleteOnly,
1746 element_list=element_map.get(datasetSpec.datasetName),
1747 )
1748 if tmpAvFileMap is None:
1749 raise Interaction.JEDITemporaryError("ddmIF.getAvailableFiles failed")
1750 availableFileMap[datasetSpec.datasetName] = tmpAvFileMap
1751 except Exception:
# Any DDM failure is treated as temporary: log, record, retry later.
1752 errtype, errvalue = sys.exc_info()[:2]
1753 tmpLog.error(f"failed to get available files with {errtype.__name__} {errvalue}")
1754 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
1755 return retTmpError
1756
# Tally the total input size and, per site, how much of it sits on local
# disk vs local tape. All sizes are converted to integer GB at the end
# (floor division), so sub-GB totals collapse to 0.
1757 totalSize = 0
1758 totalNumFiles = 0
1759 totalDiskSizeMap = dict()
1760 totalTapeSizeMap = dict()
1761 for datasetSpec in inputChunk.getDatasets():
1762 totalNumFiles += len(datasetSpec.Files)
1763 for fileSpec in datasetSpec.Files:
1764 totalSize += fileSpec.fsize
1765 if datasetSpec.datasetName in availableFileMap:
1766 for tmpSiteName, tmpAvFileMap in availableFileMap[datasetSpec.datasetName].items():
1767 totalDiskSizeMap.setdefault(tmpSiteName, 0)
1768 totalTapeSizeMap.setdefault(tmpSiteName, 0)
1769 for fileSpec in tmpAvFileMap["localdisk"]:
1770 totalDiskSizeMap[tmpSiteName] += fileSpec.fsize
1771 for fileSpec in tmpAvFileMap["localtape"]:
1772 totalTapeSizeMap[tmpSiteName] += fileSpec.fsize
1773 totalSize //= 1024 * 1024 * 1024
1774 tmpLog.info(f"totalInputSize={totalSize} GB")
1775 for tmpSiteName in totalDiskSizeMap.keys():
1776 totalDiskSizeMap[tmpSiteName] //= 1024 * 1024 * 1024
1777 for tmpSiteName in totalTapeSizeMap.keys():
1778 totalTapeSizeMap[tmpSiteName] //= 1024 * 1024 * 1024
1779
1780
# --- final check: per-site statistics for weighting ----------------------
# Collect running/queued counts, harvester worker counts, failure counts and
# the to-running rate for every candidate, and store them (plus the legacy
# "orig" weight) in basic_weight_compar_map keyed by unified site name.
1781 tmpLog.info(f"{len(scanSiteList)} candidates for final check")
1782 weightMap = {}
1783 weightStr = {}
1784 candidateSpecList = []
1785 preSiteCandidateSpec = None
1786 basic_weight_compar_map = {}
1787 workerStat = self.taskBufferIF.ups_load_worker_stats()
1788 for tmpPseudoSiteName in scanSiteList:
1789 tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
1790 tmpSiteName = tmpSiteSpec.get_unified_name()
1791 nRunning = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running", workQueue_tag=taskSpec.gshare)
1792 nDefined = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "defined", workQueue_tag=taskSpec.gshare)
1793 nAssigned = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "assigned", workQueue_tag=taskSpec.gshare)
1794 nActivated = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "activated", workQueue_tag=taskSpec.gshare)
1795 nStarting = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "starting", workQueue_tag=taskSpec.gshare)
1796
# Count running/submitted harvester workers, capped at nWorkersCutoff; used
# as a proxy for nRunning at PQs where the job count is misleadingly low.
1797 nWorkers = 0
1798 nWorkersCutoff = 20
1799 if tmpSiteName in workerStat:
1800 for _, tmpLabelStat in workerStat[tmpSiteName].items():
1801 for _, tmpResStat in tmpLabelStat.items():
1802 for tmpResType, tmpCounts in tmpResStat.items():
1803 for tmpStatus, tmpNum in tmpCounts.items():
1804 if tmpStatus in ["running", "submitted"]:
1805 nWorkers += tmpNum
1806
1807 nWorkers = min(nWorkersCutoff, nWorkers)
1808
1809 if tmpSiteName in nPilotMap and nPilotMap[tmpSiteName] > 0 and nRunning < nWorkersCutoff and nWorkers > nRunning:
1810 tmpLog.debug(f"using nWorkers={nWorkers} as nRunning at {tmpPseudoSiteName} since original nRunning={nRunning} is low")
1811 nRunning = nWorkers
1812
# Adjust nRunning for standby/provisioning modes: numStandby == 0 means
# dynamic provisioning (count starting jobs too); > 0 means a static pool
# of standby cores converted to job slots via coreCount.
1813 numStandby = tmpSiteSpec.getNumStandby(taskSpec.gshare, taskSpec.resource_type)
1814 if numStandby is None:
1815 pass
1816 elif numStandby == 0:
1817
1818 nRunning = nStarting + nRunning
1819 tmpLog.debug(f"using dynamic workload provisioning at {tmpPseudoSiteName} to set nRunning={nRunning}")
1820 else:
1821
1822 nRunning = max(int(numStandby / tmpSiteSpec.coreCount), nRunning)
1823 tmpLog.debug(f"using static workload provisioning at {tmpPseudoSiteName} with nStandby={numStandby} to set nRunning={nRunning}")
1824 nFailed = 0
1825 nClosed = 0
1826 nFinished = 0
1827 if tmpSiteName in failureCounts:
1828 if "failed" in failureCounts[tmpSiteName]:
1829 nFailed = failureCounts[tmpSiteName]["failed"]
1830 if "closed" in failureCounts[tmpSiteName]:
1831 nClosed = failureCounts[tmpSiteName]["closed"]
1832 if "finished" in failureCounts[tmpSiteName]:
1833 nFinished = failureCounts[tmpSiteName]["finished"]
1834
# to-running rate scaled by this gshare's fraction of the PQ's running jobs.
1835 try:
1836 site_to_running_rate = siteToRunRateMap[tmpSiteName]
1837 if isinstance(site_to_running_rate, dict):
1838 site_to_running_rate = sum(site_to_running_rate.values())
1839 except KeyError:
1840 to_running_rate_str = "0(unknown)"
1841 to_running_rate = 0
1842 else:
1843 site_n_running = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running")
1844 to_running_rate = nRunning * site_to_running_rate / site_n_running if site_n_running > 0 else 0
1845 to_running_rate_str = f"{to_running_rate:.3f}"
1846
# Site class (presumably -1/0/1 tiers — values used as dict keys below);
# missing/None defaults to 0.
1847 site_class_value = analy_sites_class_dict.get(tmpSiteName, 0)
1848 site_class_value = 0 if site_class_value is None else site_class_value
1849
# Legacy weight: running-to-queued ratio with +1 smoothing on both sides.
1850 orig_basic_weight = float(nRunning + 1) / float(nActivated + nAssigned + nDefined + nStarting + 1)
1851
1852 basic_weight_compar_map[tmpSiteName] = {}
1853 basic_weight_compar_map[tmpSiteName]["orig"] = orig_basic_weight
1854 basic_weight_compar_map[tmpSiteName]["trr"] = to_running_rate
1855 basic_weight_compar_map[tmpSiteName]["nq"] = nActivated + nAssigned + nDefined + nStarting
1856 basic_weight_compar_map[tmpSiteName]["nr"] = nRunning
1857 basic_weight_compar_map[tmpSiteName]["class"] = site_class_value
1858 basic_weight_compar_map[tmpSiteName]["nDefined"] = nDefined
1859 basic_weight_compar_map[tmpSiteName]["nActivated"] = nActivated
1860 basic_weight_compar_map[tmpSiteName]["nStarting"] = nStarting
1861 basic_weight_compar_map[tmpSiteName]["nAssigned"] = nAssigned
1862 basic_weight_compar_map[tmpSiteName]["nFailed"] = nFailed
1863 basic_weight_compar_map[tmpSiteName]["nClosed"] = nClosed
1864 basic_weight_compar_map[tmpSiteName]["nFinished"] = nFinished
1865
# --- "new" weight computation and WEIGHT-COMPAR table --------------------
# Best-effort block (any exception is logged and swallowed) that computes the
# new basic weight per site and logs a side-by-side comparison with the
# legacy weight. NOTE(review): bw_map["new"] is consumed unconditionally
# below, so an exception here would surface later as a KeyError.
1866 try:
1867 n_avail_sites = len(basic_weight_compar_map)
1868 if n_avail_sites == 0:
1869 tmpLog.debug("WEIGHT-COMPAR: zero available sites, skip")
1870 else:
1871
# Task class (presumably -1/0/1, mirroring site classes); default 1.
1872 task_class_value = task_eval_dict.get("class", 1) if task_eval_dict is not None else 1
1873
1874 if not inputChunk.isMerging:
1875 nFilesPerJob = taskSpec.getNumFilesPerJob()
1876 else:
1877 nFilesPerJob = taskSpec.getNumFilesPerMergeJob()
1878 if nFilesPerJob is None or nFilesPerJob < 1:
1879 nFilesPerJob = 1
1880
# Effective max size per job: task limit plus output/work disk, or the
# scout/avalanche input-size cap (MB -> bytes).
1881 _maxSizePerJob = taskSpec.getMaxSizePerJob()
1882 if _maxSizePerJob is not None:
1883 _maxSizePerJob += inputChunk.defaultOutputSize
1884 _maxSizePerJob += taskSpec.getWorkDiskSize()
1885 else:
1886 if taskSpec.useScout():
1887 _maxSizePerJob = inputChunk.maxInputSizeScouts * 1024 * 1024
1888 else:
1889 _maxSizePerJob = inputChunk.maxInputSizeAvalanche * 1024 * 1024
1890
# Dry-run the sub-chunking to estimate how many jobs this chunk would
# produce; counters are reset afterwards so real splitting is unaffected.
1891 n_subchunks = 0
1892 while True:
1893 subchunk, _ = inputChunk.getSubChunk(
1894 None,
1895 maxNumFiles=taskSpec.getMaxNumFilesPerJob(),
1896 nFilesPerJob=taskSpec.getNumFilesPerJob(),
1897 walltimeGradient=(taskSpec.getCpuTime() if taskSpec.useHS06() else None),
1898 maxWalltime=(taskSpec.getMaxWalltime() if taskSpec.getMaxWalltime() is not None else 345600),
1899 sizeGradients=taskSpec.getOutDiskSize(),
1900 sizeIntercepts=taskSpec.getWorkDiskSize(),
1901 maxSize=_maxSizePerJob,
1902 nEventsPerJob=taskSpec.getNumEventsPerJob(),
1903 coreCount=taskSpec.coreCount,
1904 corePower=10,
1905 respectLB=taskSpec.respectLumiblock(),
1906 )
1907 if subchunk is None:
1908 break
1909 else:
1910 n_subchunks += 1
1911 inputChunk.resetUsedCounters()
1912 n_jobs_to_submit = n_subchunks
1913
# Small epsilons keep every site class reachable / strictly ordered.
1914 weight_epsilon_init = 0.001
1915 weight_epsilon_hi_mid = 0.05
1916 weight_epsilon_hi_lo = 0.002
1917 weight_epsilon_mid_lo = 0.1
1918
# Refresh the run-rate stats, then compute per-site remaining user queue
# length (rem_q_len) and aggregate it per site class.
1919 tmpSt, siteToRunRateMap = AtlasBrokerUtils.getSiteToRunRateStats(tbIF=self.taskBufferIF, vo=taskSpec.vo)
1920 weight_comparison_avail_sites = set(basic_weight_compar_map.keys())
1921 site_class_n_site_dict = {1: 0, 0: 0, -1: 0}
1922 site_class_rem_q_len_dict = {1: 0, 0: 0, -1: 0}
1923 total_rem_q_len = 0
1924
1925 for site, bw_map in basic_weight_compar_map.items():
1926
1927 site_class_n_site_dict[bw_map["class"]] += 1
1928
1929 nRunning_pq_total = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, site, "running")
1930 nRunning_pq_in_gshare = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, site, "running", workQueue_tag=taskSpec.gshare)
1931 nQueue_pq_in_gshare = 0
1932 for jobStatus in ["defined", "assigned", "activated", "starting"]:
1933 nQueue_pq_in_gshare += AtlasBrokerUtils.getNumJobs(jobStatPrioMap, site, jobStatus, workQueue_tag=taskSpec.gshare)
1934
1935 try:
1936 site_to_running_rate = siteToRunRateMap[site]
1937 if isinstance(site_to_running_rate, dict):
1938 site_to_running_rate = sum(site_to_running_rate.values())
1939 except Exception:
1940 site_to_running_rate = 0
1941 finally:
1942 to_running_rate = nRunning_pq_in_gshare * site_to_running_rate / nRunning_pq_total if nRunning_pq_total > 0 else 0
1943
1944 try:
1945 user_jobs_stats_map = jobsStatsPerUser[site][taskSpec.gshare][user_name]
1946 except KeyError:
1947 nQ_pq_user = 0
1948 nR_pq_user = 0
1949 else:
1950 nQ_pq_user = user_jobs_stats_map["nQueue"]
1951 nR_pq_user = user_jobs_stats_map["nRunning"]
1952 try:
1953 nUsers_pq = len(jobsStatsPerUser[site][taskSpec.gshare])
1954 except KeyError:
1955 nUsers_pq = 1
1956 try:
# "_total" is the PQ/gshare-wide aggregate pseudo-user entry.
1957 nR_pq = jobsStatsPerUser[site][taskSpec.gshare]["_total"]["nRunning"]
1958 except KeyError:
1959 nR_pq = nRunning_pq_in_gshare
1960
# Same limit construction as in the throttling section above.
1961 nQ_pq_limit_map = {
1962 "base_limit": base_queue_length_per_pq,
1963 "static_limit": static_max_queue_running_ratio * nR_pq,
1964 "dynamic_limit": max_expected_wait_hour * to_running_rate,
1965 }
1966 max_nQ_pq = max(nQ_pq_limit_map.values())
1967
1968 user_fraction_map = {
1969 "equal_distr": 1 / (nUsers_pq),
1970 "prop_to_nR": nR_pq_user / nR_pq if nR_pq > 0 else 0,
1971 }
1972 max_user_fraction = max(user_fraction_map.values())
1973
1974 nQ_pq_user_limit_map = {
1975 "constant_base_user_limit": base_default_queue_length_per_pq_user,
1976 "ratio_base_user_limit": base_queue_ratio_on_pq * nR_pq,
1977 "dynamic_user_limit": max_nQ_pq * max_user_fraction,
1978 }
1979 max_nQ_pq_user = max(nQ_pq_user_limit_map.values())
1980
# rem_q_len = headroom left in the user's queue allowance at this site.
1981 bw_map["user_r"] = nR_pq_user
1982 bw_map["user_q_len"] = nQ_pq_user
1983 bw_map["max_q_len"] = max_nQ_pq_user
1984 bw_map["rem_q_len"] = max(bw_map["max_q_len"] - bw_map["user_q_len"], 0)
1985 site_class_rem_q_len_dict[bw_map["class"]] += bw_map["rem_q_len"]
1986 total_rem_q_len += bw_map["rem_q_len"]
1987
# For analysis gshares, split the estimated jobs across site classes in
# class order matching the task class (waterfall: fill higher classes
# first, remainder flows down), with epsilons to keep the ordering strict.
1988 main_weight_site_class_dict = {1: weight_epsilon_init, 0: weight_epsilon_init, -1: weight_epsilon_init}
1989 if taskSpec.gshare in ["User Analysis", "Express Analysis"]:
1990 n_jobs_to_submit_rem = min(n_jobs_to_submit, total_rem_q_len)
1991 if task_class_value == -1:
1992
1993 main_weight_site_class_dict[-1] = n_jobs_to_submit_rem
1994 elif task_class_value == 0:
1995
1996 main_weight_site_class_dict[0] = min(n_jobs_to_submit_rem, site_class_rem_q_len_dict[0])
1997 n_jobs_to_submit_rem -= main_weight_site_class_dict[0]
1998 main_weight_site_class_dict[-1] = n_jobs_to_submit_rem
1999 if main_weight_site_class_dict[0] > 0:
2000 main_weight_site_class_dict[0] -= weight_epsilon_mid_lo
2001 main_weight_site_class_dict[-1] += weight_epsilon_mid_lo
2002 else:
2003
2004 main_weight_site_class_dict[1] = min(n_jobs_to_submit_rem, site_class_rem_q_len_dict[1])
2005 n_jobs_to_submit_rem -= main_weight_site_class_dict[1]
2006 main_weight_site_class_dict[0] = min(n_jobs_to_submit_rem, site_class_rem_q_len_dict[0])
2007 n_jobs_to_submit_rem -= main_weight_site_class_dict[0]
2008 main_weight_site_class_dict[-1] = n_jobs_to_submit_rem
2009 if main_weight_site_class_dict[1] > 0:
2010 main_weight_site_class_dict[1] -= weight_epsilon_hi_mid + weight_epsilon_hi_lo
2011 main_weight_site_class_dict[0] += weight_epsilon_hi_mid
2012 main_weight_site_class_dict[-1] += weight_epsilon_hi_lo
2013
# New weight = class share (nbw_main) * within-class share (nbw_sec),
# where nbw_sec is proportional to the site's remaining queue headroom.
2014 for site in weight_comparison_avail_sites:
2015 bw_map = basic_weight_compar_map[site]
2016
2017 nbw_main = n_jobs_to_submit
2018 if taskSpec.gshare in ["User Analysis", "Express Analysis"]:
2019 nbw_main = main_weight_site_class_dict[bw_map["class"]]
2020
2021 nbw_sec = 1
2022 if taskSpec.gshare in ["User Analysis", "Express Analysis"]:
2023 _nbw_numer = max(bw_map["rem_q_len"] - nbw_main / site_class_n_site_dict[bw_map["class"]], nbw_main * 0.001)
2024 reduced_site_class_rem_q_len = site_class_rem_q_len_dict[bw_map["class"]] - nbw_main
2025 if reduced_site_class_rem_q_len > 0 and not inputChunk.isMerging:
2026 nbw_sec = _nbw_numer / reduced_site_class_rem_q_len
2027 elif site_class_rem_q_len_dict[bw_map["class"]] > 0:
2028 nbw_sec = bw_map["rem_q_len"] / site_class_rem_q_len_dict[bw_map["class"]]
2029 elif site_class_n_site_dict[bw_map["class"]] > 0:
2030 nbw_sec = 1 / site_class_n_site_dict[bw_map["class"]]
2031 else:
2032 reduced_total_rem_q_len = total_rem_q_len - nbw_main
2033 _nbw_numer = max(bw_map["rem_q_len"] - nbw_main / n_avail_sites, nbw_main * 0.001)
2034 if reduced_total_rem_q_len > 0 and not inputChunk.isMerging:
2035 nbw_sec = _nbw_numer / reduced_total_rem_q_len
2036 elif total_rem_q_len > 0:
2037 nbw_sec = bw_map["rem_q_len"] / total_rem_q_len
2038 elif site_class_n_site_dict[bw_map["class"]] > 0:
2039 nbw_sec = 1 / n_avail_sites
2040
# 2**-20 floor keeps the weight strictly positive.
2041 new_basic_weight = nbw_main * nbw_sec + 2**-20
2042 bw_map["new"] = new_basic_weight
2043
# Normalize both weight versions and render the comparison table.
2044 orig_sum = 0
2045 new_sum = 0
2046 for bw_map in basic_weight_compar_map.values():
2047 orig_sum += bw_map["orig"]
2048 new_sum += bw_map["new"]
2049 for site in basic_weight_compar_map:
2050 bw_map = basic_weight_compar_map[site]
2051 if bw_map["nr"] == 0:
2052 trr_over_r = None
2053 else:
2054 trr_over_r = bw_map["trr"] / bw_map["nr"]
2055 bw_map["trr_over_r"] = f"{trr_over_r:6.3f}" if trr_over_r is not None else "None"
2056 if orig_sum == 0:
2057 normalized_orig = 0
2058 else:
2059 normalized_orig = bw_map["orig"] / orig_sum
2060 bw_map["normalized_orig"] = normalized_orig
2061 if new_sum == 0:
2062 normalized_new = 0
2063 else:
2064 normalized_new = bw_map["new"] / new_sum
2065 bw_map["normalized_new"] = normalized_new
2066 prt_str_list = []
2067 prt_str_temp = (
2068 ""
2069 " {site:>30} |"
2070 " {class:>2} |"
2071 " {nq:>6} |"
2072 " {nr:>6} |"
2073 " {trr:7.2f} |"
2074 " {user_q_len:>5} |"
2075 " {max_q_len:9.2f} |"
2076 " {rem_q_len:9.2f} |"
2077 " {orig:7.2f} |"
2078 " {new:7.3f} |"
2079 " {normalized_orig:6.1%} |"
2080 " {normalized_new:6.1%} |"
2081 )
2082 prt_str_title = (
2083 ""
2084 " {site:>30} |"
2085 " {cl:>2} |"
2086 " {nq:>6} |"
2087 " {nr:>6} |"
2088 " {trr:>7} |"
2089 " {user_q_len:>5} |"
2090 " {max_q_len:>9} |"
2091 " {rem_q_len:>9} |"
2092 " {orig:>7} |"
2093 " {new:>7} |"
2094 " {normalized_orig:>6} |"
2095 " {normalized_new:>6} |"
2096 ).format(
2097 site="Site",
2098 cl="Cl",
2099 nq="Q",
2100 nr="R",
2101 trr="TRR",
2102 user_q_len="UserQ",
2103 max_q_len="UserQ_max",
2104 rem_q_len="UserQ_rem",
2105 orig="Wb_orig",
2106 new="Wb_new",
2107 normalized_orig="orig_%",
2108 normalized_new="new_%",
2109 )
2110 prt_str_list.append(prt_str_title)
2111 for site in sorted(basic_weight_compar_map):
2112 bw_map = basic_weight_compar_map[site]
# None values are rendered as NaN so the float format specs don't raise.
2113 prt_str = prt_str_temp.format(site=site, **{k: (x if x is not None else math.nan) for k, x in bw_map.items()})
2114 prt_str_list.append(prt_str)
2115 tmpLog.info(f"gshare: {taskSpec.gshare} ,{' merging,' if inputChunk.isMerging else ''} task_class: {task_class_value}")
2116 tmpLog.debug(
2117 "WEIGHT-COMPAR: for gshare={},{} cl={}, nJobsEst={} got \n{}".format(
2118 taskSpec.gshare, (" merging," if inputChunk.isMerging else ""), task_class_value, n_jobs_to_submit, "\n".join(prt_str_list)
2119 )
2120 )
2121 except Exception as e:
2122 tmpLog.error(f"{traceback.format_exc()}")
2123
# Select which basic-weight flavor drives brokerage ("new" vs legacy "orig").
2124 _basic_weight_version = "new"
2125
2126 for tmpPseudoSiteName in scanSiteList:
2127 tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
2128 tmpSiteName = tmpSiteSpec.get_unified_name()
2129 bw_map = basic_weight_compar_map[tmpSiteName]
2130
2131 weight = bw_map[_basic_weight_version]
2132
# Penalize sites with throttled transfers when remote access is in play.
2133 nThrottled = 0
2134 if tmpSiteName in remoteSourceList:
2135 nThrottled = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "throttled", workQueue_tag=taskSpec.gshare)
2136 weight /= float(nThrottled + 1)
2137
# Data-locality bonus: explicit dataWeight if assigned, otherwise boost
# proportional to the local disk (strong) or tape (weak) fraction.
2138 diskNorm = 10
2139 tapeNorm = 1000
2140 localSize = totalSize
2141 if checkDataLocality and not useUnionLocality:
2142 tmpDataWeight = 1
2143 if tmpSiteName in dataWeight:
2144 weight *= dataWeight[tmpSiteName]
2145 tmpDataWeight = dataWeight[tmpSiteName]
2146 else:
2147 tmpDataWeight = 1
2148 if totalSize > 0:
2149 if tmpSiteName in totalDiskSizeMap:
2150 tmpDataWeight += totalDiskSizeMap[tmpSiteName] / diskNorm
2151 localSize = totalDiskSizeMap[tmpSiteName]
2152 elif tmpSiteName in totalTapeSizeMap:
2153 tmpDataWeight += totalTapeSizeMap[tmpSiteName] / tapeNorm
2154 localSize = totalTapeSizeMap[tmpSiteName]
2155 weight *= tmpDataWeight
2156
2157 siteCandidateSpec = SiteCandidate(tmpPseudoSiteName, tmpSiteName)
2158
# Remember the pre-assigned site's candidate so it can be force-kept later.
2159 if sitePreAssigned and tmpSiteName == preassignedSite:
2160 preSiteCandidateSpec = siteCandidateSpec
2161
2162 siteCandidateSpec.override_attribute("maxwdir", newMaxwdir.get(tmpSiteName))
2163 if cmt_config:
2164 platforms = resolved_platforms.get(tmpSiteName)
2165 if platforms:
2166 siteCandidateSpec.override_attribute("platforms", platforms)
2167
2168 siteCandidateSpec.weight = weight
# Per-site diagnostic string kept for later "use/skip site" log lines.
2169 tmpStr = (
2170 f"""weight={weight:.7f} """
2171 f"""gshare={taskSpec.gshare} class={bw_map["class"]} trr={bw_map["trr"]:.3f} """
2172 f"""userR={bw_map["user_r"]} userQ={bw_map["user_q_len"]} userQRem={bw_map["rem_q_len"]:.3f} """
2173 f"""nRunning={bw_map["nr"]} nDefined={bw_map["nDefined"]} nActivated={bw_map["nActivated"]} """
2174 f"""nStarting={bw_map["nStarting"]} nAssigned={bw_map["nAssigned"]} """
2175 f"""nFailed={bw_map["nFailed"]} nClosed={bw_map["nClosed"]} nFinished={bw_map["nFinished"]} """
2176 f"""dataW={tmpDataWeight} totalInGB={totalSize} localInGB={localSize} nFiles={totalNumFiles} """
2177 )
2178 weightStr[tmpPseudoSiteName] = tmpStr
2179
# Sites already used by the task go straight onto the candidate list;
# everyone else is bucketed by weight for ranked insertion below.
2180 if tmpSiteName in sitesUsedByTask:
2181 candidateSpecList.append(siteCandidateSpec)
2182 else:
2183 if weight not in weightMap:
2184 weightMap[weight] = []
2185 weightMap[weight].append(siteCandidateSpec)
2186
# Flatten the weight buckets in descending-weight order; within a bucket,
# sites holding the input data come first, the rest are shuffled to break
# ties randomly.
2187 candidateSpecListWithData = []
2188 weightList = sorted(weightMap.keys())
2189 weightList.reverse()
2190 for weightVal in weightList:
2191 sitesWithWeight = weightMap[weightVal]
2192 for tmp_site in list(sitesWithWeight):
2193 if tmp_site in site_list_with_data:
2194 candidateSpecListWithData.append(tmp_site)
2195 sitesWithWeight.remove(tmp_site)
2196 random.shuffle(sitesWithWeight)
2197 candidateSpecList += sitesWithWeight
2198 candidateSpecList = candidateSpecListWithData + candidateSpecList
2199
# Cap the number of kept candidates: enough for multi-site jobs, 10 by
# default, or unlimited when the task uses DDS (hasDDS).
2200 if not hasDDS:
2201 if taskSpec.getNumSitesPerJob() > 1:
2202
2203 maxNumSites = max(len(site_list_with_data), taskSpec.getNumSitesPerJob())
2204 else:
2205 maxNumSites = 10
2206 else:
2207
2208 maxNumSites = None
2209
# Drop sites flagged as problematic earlier (keeping sites the task already
# used and the pre-assigned candidate) and truncate to maxNumSites.
2210 oldScanSiteList = copy.copy(scanSiteList)
2211 candidateSpecList = AtlasBrokerUtils.skipProblematicSites(
2212 candidateSpecList, set(problematic_sites_dict), sitesUsedByTask, preSiteCandidateSpec, maxNumSites, tmpLog
2213 )
2214
# A pre-assigned site must always survive the filtering.
2215 if sitePreAssigned and preSiteCandidateSpec is not None and preSiteCandidateSpec not in candidateSpecList:
2216 candidateSpecList.append(preSiteCandidateSpec)
2217
2218 scanSiteList = []
2219 for siteCandidateSpec in candidateSpecList:
2220 scanSiteList.append(siteCandidateSpec.siteName)
2221
# --- last gate: per-candidate file availability and registration ---------
# Accept a candidate only if it passes pre-assignment, minimum-weight, and
# file-availability checks; accepted candidates are attached to inputChunk.
2222 newScanSiteList = []
2223 msgList = []
2224 below_min_weight = []
2225 for siteCandidateSpec in candidateSpecList:
2226 tmpPseudoSiteName = siteCandidateSpec.siteName
2227 tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
2228 tmpSiteName = tmpSiteSpec.get_unified_name()
2229
# With a pre-assigned site, every other site is rejected outright.
2230 if sitePreAssigned and tmpSiteName != preassignedSite:
2231 tmpLog.info(f" skip site={tmpPseudoSiteName} non pre-assigned site criteria=-nonpreassigned")
2232 try:
2233 del weightStr[tmpPseudoSiteName]
2234 except Exception:
2235 pass
2236 continue
2237
2238 if min_weight > 0 and siteCandidateSpec.weight < min_weight:
2239 below_min_weight.append(tmpSiteName)
2240 continue
2241
# No datasets, or locality checks disabled at a site that can read remote
# data, means availability is trivially satisfied.
2242 if inputChunk.getDatasets() == [] or (not checkDataLocality and not tmpSiteSpec.use_only_local_data()):
2243 isAvailable = True
2244 else:
2245 isAvailable = False
2246 isAvailableBase = isAvailable
2247 for tmpDatasetName, availableFiles in availableFileMap.items():
2248 tmpDatasetSpec = inputChunk.getDatasetWithName(tmpDatasetName)
2249 isAvailable = isAvailableBase
2250
# Remote access path: a registered remote source with a complete local
# disk replica satisfies this dataset.
2251 if tmpSiteName in remoteSourceList and tmpDatasetName in remoteSourceList[tmpSiteName] and not tmpSiteSpec.use_only_local_data():
2252 for tmpRemoteSite in remoteSourceList[tmpSiteName][tmpDatasetName]:
2253 if tmpRemoteSite in availableFiles and len(tmpDatasetSpec.Files) <= len(availableFiles[tmpRemoteSite]["localdisk"]):
2254
2255 siteCandidateSpec.add_remote_files(availableFiles[tmpRemoteSite]["localdisk"])
2256
2257 siteCandidateSpec.remoteProtocol = allowedRemoteProtocol
2258 siteCandidateSpec.remoteSource = tmpRemoteSite
2259 isAvailable = True
2260 break
2261
# Local path: accept if any local source (disk/cache/tape) covers all
# files, or the dataset is distributed, or completeness/locality rules
# are relaxed for this dataset/site.
2262 if tmpSiteName in availableFiles:
2263 if (
2264 len(tmpDatasetSpec.Files) <= len(availableFiles[tmpSiteName]["localdisk"])
2265 or len(tmpDatasetSpec.Files) <= len(availableFiles[tmpSiteName]["cache"])
2266 or len(tmpDatasetSpec.Files) <= len(availableFiles[tmpSiteName]["localtape"])
2267 or tmpDatasetSpec.isDistributed()
2268 or true_complete_disk_ok[tmpDatasetName] is False
2269 or ((checkDataLocality is False or useUnionLocality) and not tmpSiteSpec.use_only_local_data())
2270 ):
2271 siteCandidateSpec.add_local_disk_files(availableFiles[tmpSiteName]["localdisk"])
2272
2273 siteCandidateSpec.add_local_disk_files(availableFiles[tmpSiteName]["cache"])
2274 siteCandidateSpec.add_local_tape_files(availableFiles[tmpSiteName]["localtape"])
2275 siteCandidateSpec.add_cache_files(availableFiles[tmpSiteName]["cache"])
2276 siteCandidateSpec.add_remote_files(availableFiles[tmpSiteName]["remote"])
2277 siteCandidateSpec.addAvailableFiles(availableFiles[tmpSiteName]["all"])
2278 isAvailable = True
2279 else:
2280 tmpMsg = "{0} is incomplete at {1} : nFiles={2} nLocal={3} nCached={4} nTape={5}"
2281 tmpLog.debug(
2282 tmpMsg.format(
2283 tmpDatasetName,
2284 tmpPseudoSiteName,
2285 len(tmpDatasetSpec.Files),
2286 len(availableFiles[tmpSiteName]["localdisk"]),
2287 len(availableFiles[tmpSiteName]["cache"]),
2288 len(availableFiles[tmpSiteName]["localtape"]),
2289 )
2290 )
# One unavailable dataset disqualifies the site for the AND over datasets.
2291 if not isAvailable:
2292 break
2293
# Only reject outright when the site cannot read remote data at all.
2294 if not isAvailable and tmpSiteSpec.use_only_local_data():
2295 tmpLog.info(f" skip site={siteCandidateSpec.siteName} file unavailable criteria=-fileunavailable")
2296 try:
2297 del weightStr[siteCandidateSpec.siteName]
2298 except Exception:
2299 pass
2300 continue
2301 inputChunk.addSiteCandidate(siteCandidateSpec)
2302 newScanSiteList.append(siteCandidateSpec.siteName)
2303 tmpMsg = " use site={0} with {1} nLocalDisk={2} nLocalTape={3} nCache={4} nRemote={5} criteria=+use".format(
2304 siteCandidateSpec.siteName,
2305 weightStr[siteCandidateSpec.siteName],
2306 len(siteCandidateSpec.localDiskFiles),
2307 len(siteCandidateSpec.localTapeFiles),
2308 len(siteCandidateSpec.cacheFiles),
2309 len(siteCandidateSpec.remoteFiles),
2310 )
2311 msgList.append(tmpMsg)
# Entries left in weightStr after this loop are the skipped sites.
2312 del weightStr[siteCandidateSpec.siteName]
2313
# Log a skip reason for every site that did not make the final list
# (weightStr now only holds non-accepted sites), then the accepted ones.
2314 for tmpPseudoSiteName in oldScanSiteList:
2315 tmpSiteSpec = self.siteMapper.getSite(tmpPseudoSiteName)
2316 tmpSiteName = tmpSiteSpec.get_unified_name()
2317 tmpWeightStr = None
2318 if tmpSiteName in weightStr:
2319 tmpWeightStr = weightStr[tmpSiteName]
2320 elif tmpPseudoSiteName in weightStr:
2321 tmpWeightStr = weightStr[tmpPseudoSiteName]
2322 if tmpWeightStr is not None:
2323 if tmpSiteName in problematic_sites_dict:
2324 bad_reasons = " ; ".join(list(problematic_sites_dict[tmpSiteName]))
2325 tmpMsg = f" skip site={tmpPseudoSiteName} {bad_reasons} ; with {tmpWeightStr} criteria=-badsite"
2326 elif tmpSiteName in below_min_weight:
2327 tmpMsg = f" skip site={tmpPseudoSiteName} due to weight below the minimum {min_weight_param}={min_weight} with {tmpWeightStr} criteria=-below_min_weight"
2328 else:
2329 tmpMsg = f" skip site={tmpPseudoSiteName} due to low weight and not-used by old jobs with {tmpWeightStr} criteria=-low_weight"
2330 tmpLog.info(tmpMsg)
2331 for tmpMsg in msgList:
2332 tmpLog.info(tmpMsg)
# Finalize: an empty final list is a temporary error; otherwise return the
# input chunk with its site candidates attached.
2333 scanSiteList = newScanSiteList
2334 self.add_summary_message(oldScanSiteList, scanSiteList, "final check", tmpLog, {})
2335 if not scanSiteList:
2336 self.dump_summary(tmpLog)
2337 tmpLog.error("no candidates")
2338 taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
2339 return retTmpError
2340 self.dump_summary(tmpLog, scanSiteList)
2341
2342 tmpLog.debug("done")
2343 return self.SC_SUCCEEDED, inputChunk