# File indexing completed on 2026-04-10 08:38:57
0001 import copy
0002 import datetime
0003 import math
0004 import random
0005 import sys
0006 import traceback
0007
0008 from pandacommon.pandalogger.PandaLogger import PandaLogger
0009 from pandacommon.pandautils.PandaUtils import naive_utcnow
0010
0011 from pandajedi.jedicore import Interaction
0012 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0013 from pandajedi.jedicore.ThreadUtils import (
0014 ListWithLock,
0015 MapWithLock,
0016 ThreadPool,
0017 WorkerThread,
0018 )
0019 from pandajedi.jedirefine import RefinerUtils
0020 from pandaserver.dataservice import DataServiceUtils
0021
0022 from . import AtlasBrokerUtils
0023 from .AtlasProdJobBroker import AtlasProdJobBroker
0024 from .TaskBrokerBase import TaskBrokerBase
0025
0026 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0027
0028
0029
class AtlasProdTaskBroker(TaskBrokerBase):
    """Task broker for ATLAS production tasks in the WORLD cloud.

    Filters the incoming task list down to WORLD-cloud tasks and hands them
    to a small pool of AtlasProdTaskBrokerThread workers, which pick a
    nucleus (output destination) for each task.
    """

    def __init__(self, taskBufferIF, ddmIF):
        # all shared state lives in the common base class
        TaskBrokerBase.__init__(self, taskBufferIF, ddmIF)

    def doCheck(self, taskSpecList):
        """No-op pre-check: production tasks are always accepted."""
        return self.SC_SUCCEEDED, {}

    def doBrokerage(self, inputList, vo, prodSourceLabel, workQueue, resource_name):
        """Assign nuclei to WORLD-cloud tasks using parallel worker threads.

        Returns SC_SUCCEEDED on completion, or SC_FAILED when the RW
        (remaining work) bookkeeping cannot be computed.
        """
        tmpLog = MsgWrapper(logger)
        tmpLog.debug("start doBrokerage")
        retTmpError = self.SC_FAILED
        tmpLog.debug(f"vo={vo} label={prodSourceLabel} queue={workQueue.queue_name} resource_name={resource_name} nTasks={len(inputList)}")

        # keep only tasks that use the WORLD cloud
        inputListWorld = ListWithLock([])
        for tmpJediTaskID, tmpInputList in inputList:
            for taskSpec, cloudName, inputChunk in tmpInputList:
                if taskSpec.useWorldCloud():
                    inputListWorld.append((taskSpec, inputChunk))

        allRwMap = {}
        if inputListWorld:
            # total remaining work over all queues and priorities
            fullRWs = self.taskBufferIF.calculateWorldRWwithPrio_JEDI(vo, prodSourceLabel, None, None)
            if fullRWs is None:
                tmpLog.error("failed to calculate full WORLD RW")
                return retTmpError

            # remaining work per priority for this work queue (computed lazily, once per priority)
            for taskSpec, inputChunk in inputListWorld:
                prio = taskSpec.currentPriority
                if prio not in allRwMap:
                    tmpRW = self.taskBufferIF.calculateWorldRWwithPrio_JEDI(vo, prodSourceLabel, workQueue, prio)
                    if tmpRW is None:
                        tmpLog.error(f"failed to calculate RW with prio={taskSpec.currentPriority}")
                        return retTmpError
                    allRwMap[prio] = tmpRW

            # shared, lock-protected RW counter updated live by the workers
            liveCounter = MapWithLock(allRwMap)

            # fan the tasks out to worker threads and wait (10 min cap)
            threadPool = ThreadPool()
            ddmIF = self.ddmIF.getInterface(vo)
            for _ in range(4):
                worker = AtlasProdTaskBrokerThread(inputListWorld, threadPool, self.taskBufferIF, ddmIF, fullRWs, liveCounter, workQueue)
                worker.start()
            threadPool.join(60 * 10)

        tmpLog.debug("doBrokerage done")
        return self.SC_SUCCEEDED
0090
0091
0092
class AtlasProdTaskBrokerThread(WorkerThread):
    """Worker thread that picks a nucleus (output destination) for each task.

    Pulls (taskSpec, inputChunk) pairs from a shared ListWithLock and, for
    each task, filters the nucleus candidates through a chain of checks
    (status, transfer backlog, storage endpoints, job brokerage, input data
    locality, full-chain constraints), then selects one candidate by
    RW/space-weighted random draw and persists the choice.
    """

    def __init__(self, inputList, threadPool, taskbufferIF, ddmIF, fullRW, prioRW, workQueue):
        # initialize the base worker thread
        WorkerThread.__init__(self, None, threadPool, logger)
        # shared, lock-protected list of (taskSpec, inputChunk) to process
        self.inputList = inputList
        self.taskBufferIF = taskbufferIF
        self.ddmIF = ddmIF
        # config/message key used for getConfigValue and sendMsg
        self.msgType = "taskbrokerage"
        # total remaining work per nucleus (all priorities)
        self.fullRW = fullRW
        # lock-protected remaining work per priority, updated live by workers
        self.prioRW = prioRW
        # number of tasks processed by this thread (reported on termination)
        self.numTasks = 0
        self.workQueue = workQueue
        # per-task list of summary lines for the final brokerage report
        self.summaryList = None

    def init_summary_list(self, header, comment, initial_list):
        """Reset the summary buffer for a new task."""
        self.summaryList = []
        self.summaryList.append(f"===== {header} =====")
        if comment:
            self.summaryList.append(comment)
        self.summaryList.append(f"the number of initial candidates: {len(initial_list)}")

    def dump_summary(self, tmp_log, final_candidates=None):
        """Write the accumulated summary lines to the log."""
        if not self.summaryList:
            return
        tmp_log.info("")
        for m in self.summaryList:
            tmp_log.info(m)
        if not final_candidates:
            final_candidates = []
        tmp_log.info(f"the number of final candidates: {len(final_candidates)}")
        tmp_log.info("")

    def add_summary_message(self, old_list, new_list, message):
        """Record how many candidates a check removed (no-op if nothing changed)."""
        if old_list and len(old_list) != len(new_list):
            red = int(math.ceil(((len(old_list) - len(new_list)) * 100) / len(old_list)))
            self.summaryList.append(f"{len(old_list):>5} -> {len(new_list):>3} candidates, {red:>3}% cut : {message}")

    def post_process_for_error(self, task_spec, tmp_log, msg, dump_summary=True):
        """Log an error, store the diagnostic on the task, and push the task update."""
        if dump_summary:
            self.dump_summary(tmp_log)
        tmp_log.error(msg)
        task_spec.resetChangedList()
        task_spec.setErrDiag(tmp_log.uploadLog(task_spec.jediTaskID))
        self.taskBufferIF.updateTask_JEDI(task_spec, {"jediTaskID": task_spec.jediTaskID}, oldStatus=["assigning"], updateDEFT=False, setFrozenTime=False)

    def runImpl(self):
        """Main loop: consume tasks from the shared list until it is empty."""
        # minimum free disk per endpoint; config value is in TB, converted to GB here
        diskThreshold = self.taskBufferIF.getConfigValue(self.msgType, f"DISK_THRESHOLD_{self.workQueue.queue_name}", "jedi", "atlas")
        if diskThreshold is None:
            diskThreshold = self.taskBufferIF.getConfigValue(self.msgType, "DISK_THRESHOLD", "jedi", "atlas")
        if diskThreshold is None:
            diskThreshold = 100
        diskThreshold *= 1024

        # cap (in TB) on the free-disk factor used in the nucleus weight
        free_disk_cutoff = self.taskBufferIF.getConfigValue(self.msgType, "FREE_DISK_CUTOFF", "jedi", "atlas")
        if free_disk_cutoff is None:
            free_disk_cutoff = 1000

        # dataset types excluded from the data-locality check
        datasetTypeToSkipCheck = ["log"]

        # thresholds for the data-locality check (size in GB -> bytes, fractions in %)
        thrInputSize = self.taskBufferIF.getConfigValue(self.msgType, "INPUT_SIZE_THRESHOLD", "jedi", "atlas")
        if thrInputSize is None:
            thrInputSize = 1
        thrInputSize *= 1024 * 1024 * 1024
        thrInputNum = self.taskBufferIF.getConfigValue(self.msgType, "INPUT_NUM_THRESHOLD", "jedi", "atlas")
        if thrInputNum is None:
            thrInputNum = 100
        thrInputSizeFrac = self.taskBufferIF.getConfigValue(self.msgType, "INPUT_SIZE_FRACTION", "jedi", "atlas")
        if thrInputSizeFrac is None:
            thrInputSizeFrac = 10
        thrInputSizeFrac = float(thrInputSizeFrac) / 100
        thrInputNumFrac = self.taskBufferIF.getConfigValue(self.msgType, "INPUT_NUM_FRACTION", "jedi", "atlas")
        if thrInputNumFrac is None:
            thrInputNumFrac = 10
        thrInputNumFrac = float(thrInputNumFrac) / 100
        # RW below this value does not reduce the weight
        cutOffRW = 50
        # penalty factor applied when input is only available on TAPE
        negWeightTape = 0.001
        # conditions under which the data-locality check may be disabled
        minIoIntensityWithLD = self.taskBufferIF.getConfigValue(self.msgType, "MIN_IO_INTENSITY_WITH_LOCAL_DATA", "jedi", "atlas")
        if minIoIntensityWithLD is None:
            minIoIntensityWithLD = 200
        minInputSizeWithLD = self.taskBufferIF.getConfigValue(self.msgType, "MIN_INPUT_SIZE_WITH_LOCAL_DATA", "jedi", "atlas")
        if minInputSizeWithLD is None:
            minInputSizeWithLD = 10000
        maxTaskPrioWithLD = self.taskBufferIF.getConfigValue(self.msgType, "MAX_TASK_PRIO_WITH_LOCAL_DATA", "jedi", "atlas")
        if maxTaskPrioWithLD is None:
            maxTaskPrioWithLD = 800

        # tasks stuck in assigning longer than this (days) skip the data check
        data_location_check_period = self.taskBufferIF.getConfigValue(self.msgType, "DATA_LOCATION_CHECK_PERIOD", "jedi", "atlas")
        if not data_location_check_period:
            data_location_check_period = 7

        lastJediTaskID = None
        siteMapper = self.taskBufferIF.get_site_mapper()
        while True:
            try:
                taskInputList = self.inputList.get(1)
                # no more tasks: terminate the thread
                if len(taskInputList) == 0:
                    self.logger.debug(f"{self.__class__.__name__} terminating after processing {self.numTasks} tasks since no more inputs ")
                    return

                for taskSpec, inputChunk in taskInputList:
                    lastJediTaskID = taskSpec.jediTaskID
                    # count processed tasks so the termination message is meaningful
                    self.numTasks += 1

                    tmpLog = MsgWrapper(self.logger, f"<jediTaskID={taskSpec.jediTaskID}>", monToken=f"jediTaskID={taskSpec.jediTaskID}")
                    tmpLog.debug("start")
                    tmpLog.info(f"thrInputSize:{thrInputSize} thrInputNum:{thrInputNum} thrInputSizeFrac:{thrInputSizeFrac} thrInputNumFrac:{thrInputNumFrac}")
                    tmpLog.info(f"full-chain:{taskSpec.get_full_chain()} ioIntensity={taskSpec.ioIntensity}")

                    # read task parameters
                    try:
                        taskParam = self.taskBufferIF.getTaskParamsWithID_JEDI(taskSpec.jediTaskID)
                        taskParamMap = RefinerUtils.decodeJSON(taskParam)
                    except Exception:
                        # summary buffer is not initialized yet, so don't dump it
                        self.post_process_for_error(taskSpec, tmpLog, "failed to read task params", False)
                        continue

                    # remaining work of this task, used to update the live RW table
                    taskRW = self.taskBufferIF.calculateTaskWorldRW_JEDI(taskSpec.jediTaskID)

                    # initial candidates: all nuclei, plus bare-mode satellites for full-chain tasks
                    nucleusList = copy.copy(siteMapper.nuclei)
                    if taskSpec.get_full_chain():
                        for tmpNucleus, tmpNucleusSpec in siteMapper.satellites.items():
                            if tmpNucleusSpec.get_bare_nucleus_mode():
                                nucleusList[tmpNucleus] = tmpNucleusSpec

                    self.init_summary_list("Task brokerage summary", None, nucleusList)
                    if taskSpec.nucleus in siteMapper.nuclei:
                        # nucleus preset on the task: honor it
                        candidateNucleus = taskSpec.nucleus
                    elif taskSpec.nucleus in siteMapper.satellites:
                        nucleusList = siteMapper.satellites
                        candidateNucleus = taskSpec.nucleus
                    else:
                        tmpLog.info(f"got {len(nucleusList)} candidates")

                        # collect per-nucleus input data availability over all relevant datasets
                        dataset_availability_info = {}
                        to_skip = False
                        for datasetSpec in inputChunk.getDatasets():
                            # pseudo datasets have no physical replicas
                            if datasetSpec.isPseudo():
                                continue
                            # DB releases are everywhere
                            if DataServiceUtils.isDBR(datasetSpec.datasetName):
                                continue
                            if DataServiceUtils.getDatasetType(datasetSpec.datasetName) in datasetTypeToSkipCheck:
                                continue
                            # optionally restrict the check to the master dataset
                            if taskParamMap.get("taskBrokerOnMaster") is True and not datasetSpec.isMaster():
                                continue
                            # deep (file-level) scan only for the master dataset when not pre-staging
                            if datasetSpec.isMaster() and not taskSpec.inputPreStaging():
                                deepScan = True
                            else:
                                deepScan = False

                            tmp_status, tmp_data_map, remote_source_available = AtlasBrokerUtils.getNucleiWithData(
                                siteMapper, self.ddmIF, datasetSpec.datasetName, list(nucleusList.keys()), deepScan
                            )
                            if tmp_status != Interaction.SC_SUCCEEDED:
                                self.post_process_for_error(taskSpec, tmpLog, f"failed to get nuclei where data is available, since {tmp_data_map}", False)
                                to_skip = True
                                break

                            # accumulate numeric counters per nucleus across datasets
                            for tmpNucleus, tmpVals in tmp_data_map.items():
                                if tmpNucleus not in dataset_availability_info:
                                    dataset_availability_info[tmpNucleus] = tmpVals
                                else:
                                    dataset_availability_info[tmpNucleus] = dict(
                                        (k, v + tmpVals[k]) for (k, v) in dataset_availability_info[tmpNucleus].items() if not isinstance(v, bool)
                                    )
                                # boolean flags are OR-ed, not summed
                                if tmpVals.get("can_be_remote_source"):
                                    dataset_availability_info[tmpNucleus]["can_be_remote_source"] = True
                            if not remote_source_available:
                                self.post_process_for_error(
                                    taskSpec, tmpLog, f"dataset={datasetSpec.datasetName} is incomplete/missing at online endpoints with read_wan=ON", False
                                )
                                to_skip = True
                                break
                        if to_skip:
                            continue

                        # check nucleus status
                        newNucleusList = {}
                        oldNucleusList = copy.copy(nucleusList)
                        for tmpNucleus, tmpNucleusSpec in nucleusList.items():
                            if tmpNucleusSpec.state not in ["ACTIVE"]:
                                tmpLog.info(f" skip nucleus={tmpNucleus} due to status={tmpNucleusSpec.state} criteria=-status")
                            else:
                                newNucleusList[tmpNucleus] = tmpNucleusSpec
                        nucleusList = newNucleusList
                        tmpLog.info(f"{len(nucleusList)} candidates passed status check")
                        self.add_summary_message(oldNucleusList, nucleusList, "status check")
                        if not nucleusList:
                            self.post_process_for_error(taskSpec, tmpLog, "no candidates")
                            continue

                        # check transfer backlog (skipped for negative T1 weight)
                        t1Weight = taskSpec.getT1Weight()
                        if t1Weight < 0:
                            tmpLog.info("skip transfer backlog check due to negative T1Weight")
                        else:
                            newNucleusList = {}
                            oldNucleusList = copy.copy(nucleusList)
                            backlogged_nuclei = self.taskBufferIF.getBackloggedNuclei()
                            for tmpNucleus, tmpNucleusSpec in nucleusList.items():
                                if tmpNucleus in backlogged_nuclei:
                                    tmpLog.info(f" skip nucleus={tmpNucleus} due to long transfer backlog criteria=-transfer_backlog")
                                else:
                                    newNucleusList[tmpNucleus] = tmpNucleusSpec
                            nucleusList = newNucleusList
                            tmpLog.info(f"{len(nucleusList)} candidates passed transfer backlog check")
                            self.add_summary_message(oldNucleusList, nucleusList, "transfer backlog check")
                            if not nucleusList:
                                self.post_process_for_error(taskSpec, tmpLog, "no candidates")
                                continue

                        # check storage endpoints for the output/log datasets
                        fractionFreeSpace = {}
                        newNucleusList = {}
                        oldNucleusList = copy.copy(nucleusList)
                        tmpStat, tmpDatasetSpecList = self.taskBufferIF.getDatasetsWithJediTaskID_JEDI(taskSpec.jediTaskID, ["output", "log"])
                        for tmpNucleus, tmpNucleusSpec in nucleusList.items():
                            to_skip = False
                            origNucleusSpec = tmpNucleusSpec
                            for tmpDatasetSpec in tmpDatasetSpecList:
                                tmpNucleusSpec = origNucleusSpec
                                # full-chain output may go to a secondary nucleus
                                if taskSpec.get_full_chain() and tmpNucleusSpec.get_secondary_nucleus():
                                    tmpNucleusSpec = siteMapper.getNucleus(tmpNucleusSpec.get_secondary_nucleus())
                                # distributed destinations are resolved elsewhere
                                if DataServiceUtils.getDistributedDestination(tmpDatasetSpec.storageToken) is not None:
                                    continue

                                tmpEP = tmpNucleusSpec.getAssociatedEndpoint(tmpDatasetSpec.storageToken)
                                # BUGFIX: the None check must come before any subscript of tmpEP,
                                # otherwise a missing endpoint raised TypeError instead of skipping
                                if tmpEP is None:
                                    tmpLog.info(f" skip nucleus={tmpNucleus} since no endpoint with {tmpDatasetSpec.storageToken} criteria=-match")
                                    to_skip = True
                                    break
                                tmp_ddm_endpoint_name = tmpEP["ddm_endpoint_name"]

                                # endpoint must be readable and writable over WAN
                                read_wan_status = tmpEP["detailed_status"].get("read_wan")
                                if read_wan_status in ["OFF", "TEST"]:
                                    tmpLog.info(
                                        f" skip nucleus={tmpNucleus} since {tmp_ddm_endpoint_name} has read_wan={read_wan_status} criteria=-source_blacklist"
                                    )
                                    to_skip = True
                                    break
                                write_wan_status = tmpEP["detailed_status"].get("write_wan")
                                if write_wan_status in ["OFF", "TEST"]:
                                    # BUGFIX: log the write_wan status here (was read_wan_status)
                                    tmpLog.info(
                                        f" skip nucleus={tmpNucleus} since {tmp_ddm_endpoint_name} has write_wan={write_wan_status} criteria=-destination_blacklist"
                                    )
                                    to_skip = True
                                    break

                                # free space (including expired data) minus the share reserved for
                                # already-assigned work must stay above the threshold
                                tmpSpaceSize = 0
                                if tmpEP["space_free"]:
                                    tmpSpaceSize += tmpEP["space_free"]
                                if tmpEP["space_expired"]:
                                    tmpSpaceSize += tmpEP["space_expired"]
                                tmpSpaceToUse = 0
                                if tmpNucleus in self.fullRW:
                                    # 0.25 GB per 10 days of remaining work (RW is in HS06 seconds)
                                    tmpSpaceToUse = int(self.fullRW[tmpNucleus] / 10 / 24 / 3600 * 0.25)
                                if tmpSpaceSize - tmpSpaceToUse < diskThreshold:
                                    tmpLog.info(
                                        " skip nucleus={0} since disk shortage (free {1} TB - reserved {2} TB < thr {3} TB) at endpoint {4} criteria=-space".format(
                                            tmpNucleus, tmpSpaceSize // 1024, tmpSpaceToUse // 1024, diskThreshold // 1024, tmpEP["ddm_endpoint_name"]
                                        )
                                    )
                                    to_skip = True
                                    break

                                # remember the endpoint with the smallest free-space fraction per nucleus
                                if tmpNucleus not in fractionFreeSpace:
                                    fractionFreeSpace[tmpNucleus] = {"total": 0, "free": 0}
                                try:
                                    tmpOld = float(fractionFreeSpace[tmpNucleus]["free"]) / float(fractionFreeSpace[tmpNucleus]["total"])
                                except Exception:
                                    tmpOld = None
                                try:
                                    tmpNew = float(tmpSpaceSize - tmpSpaceToUse) / float(tmpEP["space_total"])
                                except Exception:
                                    tmpNew = None
                                if tmpNew is not None and (tmpOld is None or tmpNew < tmpOld):
                                    fractionFreeSpace[tmpNucleus] = {"total": tmpEP["space_total"], "free": tmpSpaceSize - tmpSpaceToUse}
                            if not to_skip:
                                newNucleusList[tmpNucleus] = origNucleusSpec
                        nucleusList = newNucleusList
                        tmpLog.info(f"{len(nucleusList)} candidates passed endpoint check with DISK_THRESHOLD={diskThreshold // 1024} TB")
                        self.add_summary_message(oldNucleusList, nucleusList, "storage endpoint check")
                        if not nucleusList:
                            self.post_process_for_error(taskSpec, tmpLog, "no candidates")
                            continue

                        # ability to run jobs: dry-run the job brokerage over all sites of the candidates
                        newNucleusList = {}
                        oldNucleusList = copy.copy(nucleusList)
                        tmpSiteList = []
                        for tmpNucleus, tmpNucleusSpec in nucleusList.items():
                            tmpSiteList += tmpNucleusSpec.allPandaSites
                        tmpSiteList = list(set(tmpSiteList))
                        tmpLog.debug("===== start for job check")
                        jobBroker = AtlasProdJobBroker(self.ddmIF, self.taskBufferIF)
                        tmp_status, tmp_data_map = jobBroker.doBrokerage(taskSpec, taskSpec.cloud, inputChunk, None, True, tmpSiteList, tmpLog)
                        tmpLog.debug("===== done for job check")
                        if tmp_status != Interaction.SC_SUCCEEDED:
                            self.post_process_for_error(taskSpec, tmpLog, "no sites can run jobs", False)
                            continue
                        okNuclei = set()
                        for tmpSite in tmp_data_map:
                            siteSpec = siteMapper.getSite(tmpSite)
                            okNuclei.add(siteSpec.pandasite)
                        for tmpNucleus, tmpNucleusSpec in nucleusList.items():
                            if tmpNucleus in okNuclei:
                                newNucleusList[tmpNucleus] = tmpNucleusSpec
                            else:
                                tmpLog.info(f" skip nucleus={tmpNucleus} due to missing ability to run jobs criteria=-job")
                        nucleusList = newNucleusList
                        tmpLog.info(f"{len(nucleusList)} candidates passed job check")
                        self.add_summary_message(oldNucleusList, nucleusList, "job check")
                        if not nucleusList:
                            self.post_process_for_error(taskSpec, tmpLog, "no candidates")
                            continue

                        # data-locality check, disabled for tasks stuck in assigning too long
                        time_now = naive_utcnow()
                        if taskSpec.frozenTime and time_now - taskSpec.frozenTime > datetime.timedelta(days=data_location_check_period):
                            tmpLog.info(f"disabled data check since the task was in assigning for {data_location_check_period} days")
                        else:
                            dataset_availability_info = {k: v for k, v in dataset_availability_info.items() if k in nucleusList}
                            if dataset_availability_info != {}:
                                newNucleusList = {}
                                oldNucleusList = copy.copy(nucleusList)
                                skipMsgList = []
                                for tmpNucleus, tmpNucleusSpec in nucleusList.items():
                                    if taskSpec.inputPreStaging() and dataset_availability_info[tmpNucleus]["ava_num_any"] > 0:
                                        # pre-staging: any locally available file is enough
                                        newNucleusList[tmpNucleus] = tmpNucleusSpec
                                    elif (
                                        dataset_availability_info[tmpNucleus]["tot_size"] > thrInputSize
                                        and dataset_availability_info[tmpNucleus]["ava_size_any"]
                                        < dataset_availability_info[tmpNucleus]["tot_size"] * thrInputSizeFrac
                                    ):
                                        tmpMsg = " skip nucleus={0} due to insufficient input size {1}B < {2}*{3} criteria=-insize".format(
                                            tmpNucleus,
                                            dataset_availability_info[tmpNucleus]["ava_size_any"],
                                            dataset_availability_info[tmpNucleus]["tot_size"],
                                            thrInputSizeFrac,
                                        )
                                        skipMsgList.append(tmpMsg)
                                    elif (
                                        dataset_availability_info[tmpNucleus]["tot_num"] > thrInputNum
                                        and dataset_availability_info[tmpNucleus]["ava_num_any"]
                                        < dataset_availability_info[tmpNucleus]["tot_num"] * thrInputNumFrac
                                    ):
                                        tmpMsg = " skip nucleus={0} due to short number of input files {1} < {2}*{3} criteria=-innum".format(
                                            tmpNucleus,
                                            dataset_availability_info[tmpNucleus]["ava_num_any"],
                                            dataset_availability_info[tmpNucleus]["tot_num"],
                                            thrInputNumFrac,
                                        )
                                        skipMsgList.append(tmpMsg)
                                    else:
                                        newNucleusList[tmpNucleus] = tmpNucleusSpec
                                totInputSize = list(dataset_availability_info.values())[0]["tot_size"] / 1024 / 1024 / 1024
                                data_locality_check_str = (
                                    f"(ioIntensity ({taskSpec.ioIntensity}) is None or less than {minIoIntensityWithLD} kBPerS "
                                    f"and input size ({int(totInputSize)} GB) is less than "
                                    f"{minInputSizeWithLD}) "
                                    f"or (task.currentPriority ("
                                    f"{taskSpec.currentPriority}) is higher than or equal to {maxTaskPrioWithLD}) "
                                    f"or the task is in assigning for {data_location_check_period} days"
                                )
                                if len(newNucleusList) > 0:
                                    # some nucleus has enough local data: keep only those
                                    nucleusList = newNucleusList
                                    for tmpMsg in skipMsgList:
                                        tmpLog.info(tmpMsg)
                                elif (
                                    (taskSpec.ioIntensity is None or taskSpec.ioIntensity <= minIoIntensityWithLD) and totInputSize <= minInputSizeWithLD
                                ) or taskSpec.currentPriority >= maxTaskPrioWithLD:
                                    # no nucleus has local data but the task is cheap or urgent: drop the check
                                    dataset_availability_info = {}
                                    tmpLog.info(f" disable data locality check since no nucleus has input data, {data_locality_check_str}")
                                else:
                                    # no candidates survive and the check cannot be disabled
                                    nucleusList = newNucleusList
                                    for tmpMsg in skipMsgList:
                                        tmpLog.info(tmpMsg)
                                    tmpLog.info(f" the following conditions required to disable data locality check: {data_locality_check_str}")
                                tmpLog.info(f"{len(nucleusList)} candidates passed data check")
                                self.add_summary_message(oldNucleusList, nucleusList, "data check")
                                if not nucleusList:
                                    self.post_process_for_error(taskSpec, tmpLog, "no candidates")
                                    continue

                        # full-chain constraints
                        newNucleusList = {}
                        oldNucleusList = copy.copy(nucleusList)
                        parent_full_chain = False
                        if taskSpec.get_full_chain() and taskSpec.jediTaskID != taskSpec.parent_tid:
                            tmpStat, parentTaskSpec = self.taskBufferIF.getTaskWithID_JEDI(taskSpec.parent_tid, False)
                            if not tmpStat or parentTaskSpec is None:
                                self.post_process_for_error(taskSpec, tmpLog, f"failed to get parent taskSpec with jediTaskID={taskSpec.parent_tid}", False)
                                continue
                            parent_full_chain = parentTaskSpec.check_full_chain_with_nucleus(siteMapper.getNucleus(parentTaskSpec.nucleus))
                        for tmpNucleus, tmpNucleusSpec in nucleusList.items():
                            # bare-mode "only" nuclei accept full-chain tasks exclusively
                            if tmpNucleusSpec.get_bare_nucleus_mode() == "only" and taskSpec.get_full_chain() is None:
                                tmpLog.info(f" skip nucleus={tmpNucleus} since only full-chain tasks are allowed criteria=-full_chain")
                                continue
                            # full-chain tasks need a nucleus with full-chain capability
                            if tmpNucleusSpec.get_bare_nucleus_mode() is None and (
                                taskSpec.check_full_chain_with_mode("only") or taskSpec.check_full_chain_with_mode("require")
                            ):
                                tmpLog.info(f" skip nucleus={tmpNucleus} due to lack of full-chain capability criteria=-full_chain")
                                continue
                            # a full-chain child must run where its parent ran
                            if taskSpec.get_full_chain() and parent_full_chain:
                                if tmpNucleus != parentTaskSpec.nucleus:
                                    tmpLog.info(f" skip nucleus={tmpNucleus} since the parent of the full-chain ran elsewhere criteria=-full_chain")
                                    continue
                            newNucleusList[tmpNucleus] = tmpNucleusSpec
                        nucleusList = newNucleusList
                        tmpLog.info(f"{len(nucleusList)} candidates passed full-chain check")
                        self.add_summary_message(oldNucleusList, nucleusList, "full-chain check")
                        if not nucleusList:
                            self.post_process_for_error(taskSpec, tmpLog, "no candidates")
                            continue
                        self.dump_summary(tmpLog, nucleusList)

                        # compute a weight per surviving nucleus
                        self.prioRW.acquire()
                        nucleusRW = self.prioRW[taskSpec.currentPriority]
                        self.prioRW.release()
                        totalWeight = 0
                        nucleusweights = []
                        for tmpNucleus, tmpNucleusSpec in nucleusList.items():
                            if tmpNucleus not in nucleusRW:
                                nucleusRW[tmpNucleus] = 0
                            wStr = "1"
                            # less remaining work -> higher weight, below cutOffRW RW is ignored
                            if tmpNucleus in nucleusRW and nucleusRW[tmpNucleus] and nucleusRW[tmpNucleus] >= cutOffRW:
                                weight = 1 / float(nucleusRW[tmpNucleus])
                                wStr += f"/( RW={nucleusRW[tmpNucleus]} )"
                            else:
                                weight = 1
                                wStr += f"/(1 : RW={nucleusRW[tmpNucleus]}<{cutOffRW})"
                            if dataset_availability_info != {}:
                                if dataset_availability_info[tmpNucleus]["tot_size"] > 0:
                                    # scale by locally available input fraction for I/O-heavy tasks
                                    if taskSpec.ioIntensity is None or taskSpec.ioIntensity > minIoIntensityWithLD:
                                        weight *= float(dataset_availability_info[tmpNucleus]["ava_size_any"])
                                        weight /= float(dataset_availability_info[tmpNucleus]["tot_size"])
                                        wStr += f"* ( available_input_size_DISKTAPE={dataset_availability_info[tmpNucleus]['ava_size_any']} )"
                                        wStr += f"/ ( total_input_size={dataset_availability_info[tmpNucleus]['tot_size']} )"
                                    # penalize nuclei where part of the input is only on TAPE
                                    if dataset_availability_info[tmpNucleus]["ava_size_any"] > dataset_availability_info[tmpNucleus]["ava_size_disk"]:
                                        weight *= negWeightTape
                                        wStr += f"*( weight_TAPE={negWeightTape} )"
                            # scale by free-space fraction and capped absolute free disk
                            if tmpNucleus in fractionFreeSpace:
                                try:
                                    tmpFrac = float(fractionFreeSpace[tmpNucleus]["free"]) / float(fractionFreeSpace[tmpNucleus]["total"])
                                    weight *= tmpFrac
                                    wStr += f"*( free_space={fractionFreeSpace[tmpNucleus]['free'] // 1024} TB )/( total_space={fractionFreeSpace[tmpNucleus]['total'] // 1024} TB )"
                                    free_disk_in_tb = fractionFreeSpace[tmpNucleus]["free"] // 1024
                                    free_disk_term = min(free_disk_cutoff, free_disk_in_tb)
                                    weight *= free_disk_term
                                    wStr += f"*min( free_space={free_disk_in_tb} TB, FREE_DISK_CUTOFF={free_disk_cutoff} TB )"
                                except Exception:
                                    pass
                            tmpLog.info(f" use nucleus={tmpNucleus} weight={weight} {wStr} criteria=+use")
                            totalWeight += weight
                            nucleusweights.append((tmpNucleus, weight))

                        # weighted random draw over the candidates
                        tgtWeight = random.uniform(0, totalWeight)
                        candidateNucleus = None
                        for tmpNucleus, weight in nucleusweights:
                            tgtWeight -= weight
                            if tgtWeight <= 0:
                                candidateNucleus = tmpNucleus
                                break
                        if candidateNucleus is None:
                            candidateNucleus = nucleusweights[-1][0]

                    # persist the chosen nucleus on the task and its output/log datasets
                    nucleusSpec = nucleusList[candidateNucleus]
                    tmpStat, tmpDatasetSpecs = self.taskBufferIF.getDatasetsWithJediTaskID_JEDI(taskSpec.jediTaskID, ["output", "log"])
                    retMap = {taskSpec.jediTaskID: AtlasBrokerUtils.getDictToSetNucleus(nucleusSpec, tmpDatasetSpecs)}
                    tmp_ret = self.taskBufferIF.setCloudToTasks_JEDI(retMap)
                    tmpLog.info(f" set nucleus={candidateNucleus} with {tmp_ret} criteria=+set")
                    if tmp_ret:
                        tmpMsg = "set task_status=ready"
                        tmpLog.sendMsg(tmpMsg, self.msgType)

                    # update the live RW table for this and all lower priorities
                    self.prioRW.acquire()
                    for prio, rwMap in self.prioRW.items():
                        if prio > taskSpec.currentPriority:
                            continue
                        if candidateNucleus in rwMap:
                            if not rwMap[candidateNucleus]:
                                rwMap[candidateNucleus] = 0
                            rwMap[candidateNucleus] += taskRW
                        else:
                            rwMap[candidateNucleus] = taskRW
                    self.prioRW.release()
            except Exception:
                # never let an unexpected error kill the worker loop; log and keep going
                errtype, errvalue = sys.exc_info()[:2]
                errMsg = f"{self.__class__.__name__}.runImpl() failed with {errtype.__name__} {errvalue} "
                errMsg += f"lastJediTaskID={lastJediTaskID} "
                errMsg += traceback.format_exc()
                logger.error(errMsg)