Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:57

0001 import copy
0002 import datetime
0003 import math
0004 import random
0005 import sys
0006 import traceback
0007 
0008 from pandacommon.pandalogger.PandaLogger import PandaLogger
0009 from pandacommon.pandautils.PandaUtils import naive_utcnow
0010 
0011 from pandajedi.jedicore import Interaction
0012 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0013 from pandajedi.jedicore.ThreadUtils import (
0014     ListWithLock,
0015     MapWithLock,
0016     ThreadPool,
0017     WorkerThread,
0018 )
0019 from pandajedi.jedirefine import RefinerUtils
0020 from pandaserver.dataservice import DataServiceUtils
0021 
0022 from . import AtlasBrokerUtils
0023 from .AtlasProdJobBroker import AtlasProdJobBroker
0024 from .TaskBrokerBase import TaskBrokerBase
0025 
0026 logger = PandaLogger().getLogger(__name__.split(".")[-1])
0027 
0028 
0029 # brokerage for ATLAS production
class AtlasProdTaskBroker(TaskBrokerBase):
    """Task-level brokerage for ATLAS production.

    Collects WORLD-cloud tasks from the input list and assigns them using a
    small pool of AtlasProdTaskBrokerThread workers that share the remaining
    work (RW) counters.
    """

    # number of concurrent brokerage worker threads
    NUM_WORKERS = 4
    # max seconds to wait for the worker pool to finish
    JOIN_TIMEOUT = 60 * 10

    # constructor
    def __init__(self, taskBufferIF, ddmIF):
        """Keep references to the task buffer and DDM interfaces."""
        TaskBrokerBase.__init__(self, taskBufferIF, ddmIF)

    # main to check
    def doCheck(self, taskSpecList):
        """Nothing to pre-check for this broker; always succeed with an empty map."""
        return self.SC_SUCCEEDED, {}

    # main to assign
    def doBrokerage(self, inputList, vo, prodSourceLabel, workQueue, resource_name):
        """Broker the given tasks to nuclei.

        :param inputList: list of (jediTaskID, [(taskSpec, cloudName, inputChunk), ...])
        :param vo: virtual organization
        :param prodSourceLabel: production source label
        :param workQueue: work queue spec being brokered
        :param resource_name: resource type name (used for logging only here)
        :return: SC_SUCCEEDED, or SC_FAILED when RW calculation fails
        """
        # list with a lock, shared by the worker threads
        inputListWorld = ListWithLock([])

        # make logger
        tmpLog = MsgWrapper(logger)
        tmpLog.debug("start doBrokerage")

        # return for failure
        retTmpError = self.SC_FAILED
        tmpLog.debug(f"vo={vo} label={prodSourceLabel} queue={workQueue.queue_name} resource_name={resource_name} nTasks={len(inputList)}")

        # build the map with Remaining Work by priority
        allRwMap = {}

        # loop over all tasks and build the list of WORLD tasks. Nowadays all tasks are WORLD
        for tmpJediTaskID, tmpInputList in inputList:
            for taskSpec, cloudName, inputChunk in tmpInputList:
                if taskSpec.useWorldCloud():
                    inputListWorld.append((taskSpec, inputChunk))

        # broker WORLD tasks
        if inputListWorld:
            # thread pool
            threadPool = ThreadPool()
            # get full RW for WORLD
            fullRWs = self.taskBufferIF.calculateWorldRWwithPrio_JEDI(vo, prodSourceLabel, None, None)
            if fullRWs is None:
                tmpLog.error("failed to calculate full WORLD RW")
                return retTmpError
            # get RW per priority (computed once per distinct priority)
            for taskSpec, inputChunk in inputListWorld:
                if taskSpec.currentPriority not in allRwMap:
                    tmpRW = self.taskBufferIF.calculateWorldRWwithPrio_JEDI(vo, prodSourceLabel, workQueue, taskSpec.currentPriority)
                    if tmpRW is None:
                        tmpLog.error(f"failed to calculate RW with prio={taskSpec.currentPriority}")
                        return retTmpError
                    allRwMap[taskSpec.currentPriority] = tmpRW

            # live counter for RWs, shared across workers
            liveCounter = MapWithLock(allRwMap)
            # make workers
            ddmIF = self.ddmIF.getInterface(vo)
            for _ in range(self.NUM_WORKERS):
                thr = AtlasProdTaskBrokerThread(inputListWorld, threadPool, self.taskBufferIF, ddmIF, fullRWs, liveCounter, workQueue)
                thr.start()
            threadPool.join(self.JOIN_TIMEOUT)
        # return
        tmpLog.debug("doBrokerage done")
        return self.SC_SUCCEEDED
0090 
0091 
0092 # thread for real worker
0093 class AtlasProdTaskBrokerThread(WorkerThread):
    # constructor
    def __init__(self, inputList, threadPool, taskbufferIF, ddmIF, fullRW, prioRW, workQueue):
        """Initialize a brokerage worker thread.

        :param inputList: shared ListWithLock of (taskSpec, inputChunk) pairs to broker
        :param threadPool: pool this worker registers with
        :param taskbufferIF: task buffer interface
        :param ddmIF: DDM interface for the VO
        :param fullRW: full Remaining-Work map per nucleus (see doBrokerage)
        :param prioRW: shared MapWithLock of Remaining Work per priority
        :param workQueue: work queue being brokered
        """
        # initialize worker with no semaphore
        WorkerThread.__init__(self, None, threadPool, logger)
        # attributes
        self.inputList = inputList
        self.taskBufferIF = taskbufferIF
        self.ddmIF = ddmIF
        # section name used when reading config values
        self.msgType = "taskbrokerage"
        self.fullRW = fullRW
        self.prioRW = prioRW
        # number of tasks processed by this worker so far
        self.numTasks = 0
        self.workQueue = workQueue
        # summary lines collected during brokerage; None until initialized
        self.summaryList = None
0108 
0109     # init summary list
0110     def init_summary_list(self, header, comment, initial_list):
0111         self.summaryList = []
0112         self.summaryList.append(f"===== {header} =====")
0113         if comment:
0114             self.summaryList.append(comment)
0115         self.summaryList.append(f"the number of initial candidates: {len(initial_list)}")
0116 
0117     # dump summary
0118     def dump_summary(self, tmp_log, final_candidates=None):
0119         if not self.summaryList:
0120             return
0121         tmp_log.info("")
0122         for m in self.summaryList:
0123             tmp_log.info(m)
0124         if not final_candidates:
0125             final_candidates = []
0126         tmp_log.info(f"the number of final candidates: {len(final_candidates)}")
0127         tmp_log.info("")
0128 
0129     # make summary
0130     def add_summary_message(self, old_list, new_list, message):
0131         if old_list and len(old_list) != len(new_list):
0132             red = int(math.ceil(((len(old_list) - len(new_list)) * 100) / len(old_list)))
0133             self.summaryList.append(f"{len(old_list):>5} -> {len(new_list):>3} candidates, {red:>3}% cut : {message}")
0134 
    # post-process for errors
    def post_process_for_error(self, task_spec, tmp_log, msg, dump_summary=True):
        """Log a brokerage failure and store the error on the task.

        Optionally dumps the accumulated summary first, logs *msg* as an
        error, sets the task's error diagnostics to the uploaded log link,
        and updates the task in the task buffer while it is still in the
        "assigning" state (no DEFT update, frozen time untouched).
        """
        if dump_summary:
            self.dump_summary(tmp_log)
        tmp_log.error(msg)
        # presumably discards pending attribute changes so only the error diag below is written — TODO confirm against JediTaskSpec
        task_spec.resetChangedList()
        task_spec.setErrDiag(tmp_log.uploadLog(task_spec.jediTaskID))
        self.taskBufferIF.updateTask_JEDI(task_spec, {"jediTaskID": task_spec.jediTaskID}, oldStatus=["assigning"], updateDEFT=False, setFrozenTime=False)
0143 
0144     # main function
0145     def runImpl(self):
0146         # cutoff for disk in TB
0147         diskThreshold = self.taskBufferIF.getConfigValue(self.msgType, f"DISK_THRESHOLD_{self.workQueue.queue_name}", "jedi", "atlas")
0148         if diskThreshold is None:
0149             diskThreshold = self.taskBufferIF.getConfigValue(self.msgType, "DISK_THRESHOLD", "jedi", "atlas")
0150             if diskThreshold is None:
0151                 diskThreshold = 100
0152         diskThreshold *= 1024
0153         # cutoff for free disk in TB
0154         free_disk_cutoff = self.taskBufferIF.getConfigValue(self.msgType, f"FREE_DISK_CUTOFF", "jedi", "atlas")
0155         if free_disk_cutoff is None:
0156             free_disk_cutoff = 1000
0157         # dataset type to ignore file availability check
0158         datasetTypeToSkipCheck = ["log"]
0159         # thresholds for data availability check
0160         thrInputSize = self.taskBufferIF.getConfigValue(self.msgType, "INPUT_SIZE_THRESHOLD", "jedi", "atlas")
0161         if thrInputSize is None:
0162             thrInputSize = 1
0163         thrInputSize *= 1024 * 1024 * 1024
0164         thrInputNum = self.taskBufferIF.getConfigValue(self.msgType, "INPUT_NUM_THRESHOLD", "jedi", "atlas")
0165         if thrInputNum is None:
0166             thrInputNum = 100
0167         thrInputSizeFrac = self.taskBufferIF.getConfigValue(self.msgType, "INPUT_SIZE_FRACTION", "jedi", "atlas")
0168         if thrInputSizeFrac is None:
0169             thrInputSizeFrac = 10
0170         thrInputSizeFrac = float(thrInputSizeFrac) / 100
0171         thrInputNumFrac = self.taskBufferIF.getConfigValue(self.msgType, "INPUT_NUM_FRACTION", "jedi", "atlas")
0172         if thrInputNumFrac is None:
0173             thrInputNumFrac = 10
0174         thrInputNumFrac = float(thrInputNumFrac) / 100
0175         cutOffRW = 50
0176         negWeightTape = 0.001
0177         minIoIntensityWithLD = self.taskBufferIF.getConfigValue(self.msgType, "MIN_IO_INTENSITY_WITH_LOCAL_DATA", "jedi", "atlas")
0178         if minIoIntensityWithLD is None:
0179             minIoIntensityWithLD = 200
0180         minInputSizeWithLD = self.taskBufferIF.getConfigValue(self.msgType, "MIN_INPUT_SIZE_WITH_LOCAL_DATA", "jedi", "atlas")
0181         if minInputSizeWithLD is None:
0182             minInputSizeWithLD = 10000
0183         maxTaskPrioWithLD = self.taskBufferIF.getConfigValue(self.msgType, "MAX_TASK_PRIO_WITH_LOCAL_DATA", "jedi", "atlas")
0184         if maxTaskPrioWithLD is None:
0185             maxTaskPrioWithLD = 800
0186         # ignore data locality once the period passes (in days)
0187         data_location_check_period = self.taskBufferIF.getConfigValue(self.msgType, "DATA_LOCATION_CHECK_PERIOD", "jedi", "atlas")
0188         if not data_location_check_period:
0189             data_location_check_period = 7
0190         # main
0191         lastJediTaskID = None
0192         siteMapper = self.taskBufferIF.get_site_mapper()
0193         while True:
0194             try:
0195                 taskInputList = self.inputList.get(1)
0196                 # no more datasets
0197                 if len(taskInputList) == 0:
0198                     self.logger.debug(f"{self.__class__.__name__} terminating after processing {self.numTasks} tasks since no more inputs ")
0199                     return
0200                 # loop over all tasks
0201                 for taskSpec, inputChunk in taskInputList:
0202                     lastJediTaskID = taskSpec.jediTaskID
0203                     # make logger
0204                     tmpLog = MsgWrapper(self.logger, f"<jediTaskID={taskSpec.jediTaskID}>", monToken=f"jediTaskID={taskSpec.jediTaskID}")
0205                     tmpLog.debug("start")
0206                     tmpLog.info(f"thrInputSize:{thrInputSize} thrInputNum:{thrInputNum} thrInputSizeFrac:{thrInputSizeFrac} thrInputNumFrac:{thrInputNumFrac}")
0207                     tmpLog.info(f"full-chain:{taskSpec.get_full_chain()} ioIntensity={taskSpec.ioIntensity}")
0208                     # read task parameters
0209                     try:
0210                         taskParam = self.taskBufferIF.getTaskParamsWithID_JEDI(taskSpec.jediTaskID)
0211                         taskParamMap = RefinerUtils.decodeJSON(taskParam)
0212                     except Exception:
0213                         tmpLog.error("failed to read task params")
0214                         taskSpec.resetChangedList()
0215                         taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0216                         self.taskBufferIF.updateTask_JEDI(
0217                             taskSpec, {"jediTaskID": taskSpec.jediTaskID}, oldStatus=["assigning"], updateDEFT=False, setFrozenTime=False
0218                         )
0219                         continue
0220                     # RW
0221                     taskRW = self.taskBufferIF.calculateTaskWorldRW_JEDI(taskSpec.jediTaskID)
0222                     # get nuclei
0223                     nucleusList = copy.copy(siteMapper.nuclei)
0224                     if taskSpec.get_full_chain():
0225                         # use satellites with bareNucleus as nuclei for full chain
0226                         for tmpNucleus, tmpNucleusSpec in siteMapper.satellites.items():
0227                             if tmpNucleusSpec.get_bare_nucleus_mode():
0228                                 nucleusList[tmpNucleus] = tmpNucleusSpec
0229                     # init summary list
0230                     self.init_summary_list("Task brokerage summary", None, nucleusList)
0231                     if taskSpec.nucleus in siteMapper.nuclei:
0232                         candidateNucleus = taskSpec.nucleus
0233                     elif taskSpec.nucleus in siteMapper.satellites:
0234                         nucleusList = siteMapper.satellites
0235                         candidateNucleus = taskSpec.nucleus
0236                     else:
0237                         tmpLog.info(f"got {len(nucleusList)} candidates")
0238                         ######################################
0239                         # check data
0240                         dataset_availability_info = {}
0241                         to_skip = False
0242                         for datasetSpec in inputChunk.getDatasets():
0243                             # only for real datasets
0244                             if datasetSpec.isPseudo():
0245                                 continue
0246                             # ignore DBR
0247                             if DataServiceUtils.isDBR(datasetSpec.datasetName):
0248                                 continue
0249                             # skip locality check
0250                             if DataServiceUtils.getDatasetType(datasetSpec.datasetName) in datasetTypeToSkipCheck:
0251                                 continue
0252                             # primary only
0253                             if taskParamMap.get("taskBrokerOnMaster") is True and not datasetSpec.isMaster():
0254                                 continue
0255                             # use deep scan for primary dataset unless data carousel
0256                             if datasetSpec.isMaster() and not taskSpec.inputPreStaging():
0257                                 deepScan = True
0258                             else:
0259                                 deepScan = False
0260                             # get nuclei where data is available
0261                             tmp_status, tmp_data_map, remote_source_available = AtlasBrokerUtils.getNucleiWithData(
0262                                 siteMapper, self.ddmIF, datasetSpec.datasetName, list(nucleusList.keys()), deepScan
0263                             )
0264                             if tmp_status != Interaction.SC_SUCCEEDED:
0265                                 self.post_process_for_error(taskSpec, tmpLog, f"failed to get nuclei where data is available, since {tmp_data_map}", False)
0266                                 to_skip = True
0267                                 break
0268                             # sum
0269                             for tmpNucleus, tmpVals in tmp_data_map.items():
0270                                 if tmpNucleus not in dataset_availability_info:
0271                                     dataset_availability_info[tmpNucleus] = tmpVals
0272                                 else:
0273                                     dataset_availability_info[tmpNucleus] = dict(
0274                                         (k, v + tmpVals[k]) for (k, v) in dataset_availability_info[tmpNucleus].items() if not isinstance(v, bool)
0275                                     )
0276                                 # set remote_source_available to True if any is readable over WAN
0277                                 if tmpVals.get("can_be_remote_source"):
0278                                     dataset_availability_info[tmpNucleus]["can_be_remote_source"] = True
0279                             if not remote_source_available:
0280                                 self.post_process_for_error(
0281                                     taskSpec, tmpLog, f"dataset={datasetSpec.datasetName} is incomplete/missing at online endpoints with read_wan=ON", False
0282                                 )
0283                                 to_skip = True
0284                                 break
0285                         if to_skip:
0286                             continue
0287                         ######################################
0288                         # check status
0289                         newNucleusList = {}
0290                         oldNucleusList = copy.copy(nucleusList)
0291                         for tmpNucleus, tmpNucleusSpec in nucleusList.items():
0292                             if tmpNucleusSpec.state not in ["ACTIVE"]:
0293                                 tmpLog.info(f"  skip nucleus={tmpNucleus} due to status={tmpNucleusSpec.state} criteria=-status")
0294                             else:
0295                                 newNucleusList[tmpNucleus] = tmpNucleusSpec
0296                         nucleusList = newNucleusList
0297                         tmpLog.info(f"{len(nucleusList)} candidates passed status check")
0298                         self.add_summary_message(oldNucleusList, nucleusList, "status check")
0299                         if not nucleusList:
0300                             self.post_process_for_error(taskSpec, tmpLog, "no candidates")
0301                             continue
0302                         ######################################
0303                         # check status of transfer backlog
0304                         t1Weight = taskSpec.getT1Weight()
0305                         if t1Weight < 0:
0306                             tmpLog.info("skip transfer backlog check due to negative T1Weight")
0307                         else:
0308                             newNucleusList = {}
0309                             oldNucleusList = copy.copy(nucleusList)
0310                             backlogged_nuclei = self.taskBufferIF.getBackloggedNuclei()
0311                             for tmpNucleus, tmpNucleusSpec in nucleusList.items():
0312                                 if tmpNucleus in backlogged_nuclei:
0313                                     tmpLog.info(f"  skip nucleus={tmpNucleus} due to long transfer backlog criteria=-transfer_backlog")
0314                                 else:
0315                                     newNucleusList[tmpNucleus] = tmpNucleusSpec
0316                             nucleusList = newNucleusList
0317                             tmpLog.info(f"{len(nucleusList)} candidates passed transfer backlog check")
0318                             self.add_summary_message(oldNucleusList, nucleusList, "transfer backlog check")
0319                             if not nucleusList:
0320                                 self.post_process_for_error(taskSpec, tmpLog, "no candidates")
0321                                 continue
0322                         ######################################
0323                         # check endpoint
0324                         fractionFreeSpace = {}
0325                         newNucleusList = {}
0326                         oldNucleusList = copy.copy(nucleusList)
0327                         tmpStat, tmpDatasetSpecList = self.taskBufferIF.getDatasetsWithJediTaskID_JEDI(taskSpec.jediTaskID, ["output", "log"])
0328                         for tmpNucleus, tmpNucleusSpec in nucleusList.items():
0329                             to_skip = False
0330                             origNucleusSpec = tmpNucleusSpec
0331                             for tmpDatasetSpec in tmpDatasetSpecList:
0332                                 tmpNucleusSpec = origNucleusSpec
0333                                 # use secondary nucleus for full-chain if defined
0334                                 if taskSpec.get_full_chain() and tmpNucleusSpec.get_secondary_nucleus():
0335                                     tmpNucleusSpec = siteMapper.getNucleus(tmpNucleusSpec.get_secondary_nucleus())
0336                                 # ignore distributed datasets
0337                                 if DataServiceUtils.getDistributedDestination(tmpDatasetSpec.storageToken) is not None:
0338                                     continue
0339                                 # get endpoint with the pattern
0340                                 tmpEP = tmpNucleusSpec.getAssociatedEndpoint(tmpDatasetSpec.storageToken)
0341                                 tmp_ddm_endpoint_name = tmpEP["ddm_endpoint_name"]
0342                                 if tmpEP is None:
0343                                     tmpLog.info(f"  skip nucleus={tmpNucleus} since no endpoint with {tmpDatasetSpec.storageToken} criteria=-match")
0344                                     to_skip = True
0345                                     break
0346                                 # check blacklist
0347                                 read_wan_status = tmpEP["detailed_status"].get("read_wan")
0348                                 if read_wan_status in ["OFF", "TEST"]:
0349                                     tmpLog.info(
0350                                         f"  skip nucleus={tmpNucleus} since {tmp_ddm_endpoint_name} has read_wan={read_wan_status} criteria=-source_blacklist"
0351                                     )
0352                                     to_skip = True
0353                                     break
0354                                 write_wan_status = tmpEP["detailed_status"].get("write_wan")
0355                                 if write_wan_status in ["OFF", "TEST"]:
0356                                     tmpLog.info(
0357                                         f"  skip nucleus={tmpNucleus} since {tmp_ddm_endpoint_name} has write_wan={read_wan_status} criteria=-destination_blacklist"
0358                                     )
0359                                     to_skip = True
0360                                     break
0361                                 # check space
0362                                 tmpSpaceSize = 0
0363                                 if tmpEP["space_free"]:
0364                                     tmpSpaceSize += tmpEP["space_free"]
0365                                 if tmpEP["space_expired"]:
0366                                     tmpSpaceSize += tmpEP["space_expired"]
0367                                 tmpSpaceToUse = 0
0368                                 if tmpNucleus in self.fullRW:
0369                                     # 0.25GB per cpuTime/corePower/day
0370                                     tmpSpaceToUse = int(self.fullRW[tmpNucleus] / 10 / 24 / 3600 * 0.25)
0371                                 if tmpSpaceSize - tmpSpaceToUse < diskThreshold:
0372                                     tmpLog.info(
0373                                         "  skip nucleus={0} since disk shortage (free {1} TB - reserved {2} TB < thr {3} TB) at endpoint {4} criteria=-space".format(
0374                                             tmpNucleus, tmpSpaceSize // 1024, tmpSpaceToUse // 1024, diskThreshold // 1024, tmpEP["ddm_endpoint_name"]
0375                                         )
0376                                     )
0377                                     to_skip = True
0378                                     break
0379                                 # keep fraction of free space
0380                                 if tmpNucleus not in fractionFreeSpace:
0381                                     fractionFreeSpace[tmpNucleus] = {"total": 0, "free": 0}
0382                                 try:
0383                                     tmpOld = float(fractionFreeSpace[tmpNucleus]["free"]) / float(fractionFreeSpace[tmpNucleus]["total"])
0384                                 except Exception:
0385                                     tmpOld = None
0386                                 try:
0387                                     tmpNew = float(tmpSpaceSize - tmpSpaceToUse) / float(tmpEP["space_total"])
0388                                 except Exception:
0389                                     tmpNew = None
0390                                 if tmpNew is not None and (tmpOld is None or tmpNew < tmpOld):
0391                                     fractionFreeSpace[tmpNucleus] = {"total": tmpEP["space_total"], "free": tmpSpaceSize - tmpSpaceToUse}
0392                             if not to_skip:
0393                                 newNucleusList[tmpNucleus] = origNucleusSpec
0394                         nucleusList = newNucleusList
0395                         tmpLog.info(f"{len(nucleusList)} candidates passed endpoint check with DISK_THRESHOLD={diskThreshold // 1024} TB")
0396                         self.add_summary_message(oldNucleusList, nucleusList, "storage endpoint check")
0397                         if not nucleusList:
0398                             self.post_process_for_error(taskSpec, tmpLog, "no candidates")
0399                             continue
0400                         ######################################
0401                         # ability to execute jobs
0402                         newNucleusList = {}
0403                         oldNucleusList = copy.copy(nucleusList)
0404                         # get all panda sites
0405                         tmpSiteList = []
0406                         for tmpNucleus, tmpNucleusSpec in nucleusList.items():
0407                             tmpSiteList += tmpNucleusSpec.allPandaSites
0408                         tmpSiteList = list(set(tmpSiteList))
0409                         tmpLog.debug("===== start for job check")
0410                         jobBroker = AtlasProdJobBroker(self.ddmIF, self.taskBufferIF)
0411                         tmp_status, tmp_data_map = jobBroker.doBrokerage(taskSpec, taskSpec.cloud, inputChunk, None, True, tmpSiteList, tmpLog)
0412                         tmpLog.debug("===== done for job check")
0413                         if tmp_status != Interaction.SC_SUCCEEDED:
0414                             tmpLog.error("no sites can run jobs")
0415                             taskSpec.resetChangedList()
0416                             taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
0417                             self.taskBufferIF.updateTask_JEDI(
0418                                 taskSpec, {"jediTaskID": taskSpec.jediTaskID}, oldStatus=["assigning"], updateDEFT=False, setFrozenTime=False
0419                             )
0420                             continue
0421                         okNuclei = set()
0422                         for tmpSite in tmp_data_map:
0423                             siteSpec = siteMapper.getSite(tmpSite)
0424                             okNuclei.add(siteSpec.pandasite)
0425                         for tmpNucleus, tmpNucleusSpec in nucleusList.items():
0426                             if tmpNucleus in okNuclei:
0427                                 newNucleusList[tmpNucleus] = tmpNucleusSpec
0428                             else:
0429                                 tmpLog.info(f"  skip nucleus={tmpNucleus} due to missing ability to run jobs criteria=-job")
0430                         nucleusList = newNucleusList
0431                         tmpLog.info(f"{len(nucleusList)} candidates passed job check")
0432                         self.add_summary_message(oldNucleusList, nucleusList, "job check")
0433                         if not nucleusList:
0434                             self.post_process_for_error(taskSpec, tmpLog, "no candidates")
0435                             continue
0436                         ######################################
0437                         # data locality
0438                         time_now = naive_utcnow()
0439                         if taskSpec.frozenTime and time_now - taskSpec.frozenTime > datetime.timedelta(days=data_location_check_period):
0440                             tmpLog.info(f"disabled data check since the task was in assigning for " f"{data_location_check_period} days")
0441                         else:
0442                             dataset_availability_info = {k: v for k, v in dataset_availability_info.items() if k in nucleusList}
0443                             if dataset_availability_info != {}:
0444                                 newNucleusList = {}
0445                                 oldNucleusList = copy.copy(nucleusList)
0446                                 # skip if no data
0447                                 skipMsgList = []
0448                                 for tmpNucleus, tmpNucleusSpec in nucleusList.items():
0449                                     if taskSpec.inputPreStaging() and dataset_availability_info[tmpNucleus]["ava_num_any"] > 0:
0450                                         # use incomplete replicas for data carousel since the completeness is guaranteed
0451                                         newNucleusList[tmpNucleus] = tmpNucleusSpec
0452                                     elif (
0453                                         dataset_availability_info[tmpNucleus]["tot_size"] > thrInputSize
0454                                         and dataset_availability_info[tmpNucleus]["ava_size_any"]
0455                                         < dataset_availability_info[tmpNucleus]["tot_size"] * thrInputSizeFrac
0456                                     ):
0457                                         tmpMsg = "  skip nucleus={0} due to insufficient input size {1}B < {2}*{3} criteria=-insize".format(
0458                                             tmpNucleus,
0459                                             dataset_availability_info[tmpNucleus]["ava_size_any"],
0460                                             dataset_availability_info[tmpNucleus]["tot_size"],
0461                                             thrInputSizeFrac,
0462                                         )
0463                                         skipMsgList.append(tmpMsg)
0464                                     elif (
0465                                         dataset_availability_info[tmpNucleus]["tot_num"] > thrInputNum
0466                                         and dataset_availability_info[tmpNucleus]["ava_num_any"]
0467                                         < dataset_availability_info[tmpNucleus]["tot_num"] * thrInputNumFrac
0468                                     ):
                                        # record (rather than immediately log) the rejection so we can
                                        # decide below whether the data-locality filter should apply at all
                                        tmpMsg = "  skip nucleus={0} due to short number of input files {1} < {2}*{3} criteria=-innum".format(
                                            tmpNucleus,
                                            dataset_availability_info[tmpNucleus]["ava_num_any"],
                                            dataset_availability_info[tmpNucleus]["tot_num"],
                                            thrInputNumFrac,
                                        )
                                        skipMsgList.append(tmpMsg)
                                    else:
                                        # nucleus hosts enough of the input data: keep it as a candidate
                                        newNucleusList[tmpNucleus] = tmpNucleusSpec
                                # total input size converted to GB (tot_size presumably in bytes -- TODO confirm);
                                # tot_size is the same for every nucleus entry, so any single value() works
                                totInputSize = list(dataset_availability_info.values())[0]["tot_size"] / 1024 / 1024 / 1024
                                # human-readable description of the conditions under which the
                                # data-locality check may be disabled (reused in the log messages below)
                                data_locality_check_str = (
                                    f"(ioIntensity ({taskSpec.ioIntensity}) is None or less than {minIoIntensityWithLD} kBPerS "
                                    f"and input size ({int(totInputSize)} GB) is less than "
                                    f"{minInputSizeWithLD}) "
                                    f"or (task.currentPriority ("
                                    f"{taskSpec.currentPriority}) is higher than or equal to {maxTaskPrioWithLD}) "
                                    f"or the task is in assigning for {data_location_check_period} days"
                                )
                                if len(newNucleusList) > 0:
                                    # at least one nucleus passed the data check: adopt the filtered list
                                    nucleusList = newNucleusList
                                    for tmpMsg in skipMsgList:
                                        tmpLog.info(tmpMsg)
                                elif (
                                    (taskSpec.ioIntensity is None or taskSpec.ioIntensity <= minIoIntensityWithLD) and totInputSize <= minInputSizeWithLD
                                ) or taskSpec.currentPriority >= maxTaskPrioWithLD:
                                    # no nucleus has the input data, but the task is low-IO with small
                                    # input, or high priority: drop the locality requirement entirely
                                    # (clearing the map disables data-based weighting further down)
                                    dataset_availability_info = {}
                                    tmpLog.info(f"  disable data locality check since no nucleus has input data, {data_locality_check_str}")
                                else:
                                    # no candidate + unavoidable data locality check
                                    nucleusList = newNucleusList
                                    for tmpMsg in skipMsgList:
                                        tmpLog.info(tmpMsg)
                                    tmpLog.info(f"  the following conditions required to disable data locality check: {data_locality_check_str}")
                                tmpLog.info(f"{len(nucleusList)} candidates passed data check")
                                self.add_summary_message(oldNucleusList, nucleusList, "data check")
                                if not nucleusList:
                                    # nothing left to broker to: flag the task and move to the next one
                                    self.post_process_for_error(taskSpec, tmpLog, "no candidates")
                                    continue
                        ######################################
                        # check for full chain
                        newNucleusList = {}
                        oldNucleusList = copy.copy(nucleusList)
                        parent_full_chain = False
                        # for a full-chain child task (parent_tid differs from its own taskID),
                        # look up the parent task to learn which nucleus the chain is pinned to
                        if taskSpec.get_full_chain() and taskSpec.jediTaskID != taskSpec.parent_tid:
                            tmpStat, parentTaskSpec = self.taskBufferIF.getTaskWithID_JEDI(taskSpec.parent_tid, False)

                            if not tmpStat or parentTaskSpec is None:
                                self.post_process_for_error(taskSpec, tmpLog, f"failed to get parent taskSpec with jediTaskID={taskSpec.parent_tid}", False)
                                continue
                            parent_full_chain = parentTaskSpec.check_full_chain_with_nucleus(siteMapper.getNucleus(parentTaskSpec.nucleus))
                        for tmpNucleus, tmpNucleusSpec in nucleusList.items():
                            # nucleus to run only full-chain tasks
                            if tmpNucleusSpec.get_bare_nucleus_mode() == "only" and taskSpec.get_full_chain() is None:
                                tmpLog.info(f"  skip nucleus={tmpNucleus} since only full-chain tasks are allowed criteria=-full_chain")
                                continue
                            # task requirement to run on nucleus with full-chain support
                            if tmpNucleusSpec.get_bare_nucleus_mode() is None and (
                                taskSpec.check_full_chain_with_mode("only") or taskSpec.check_full_chain_with_mode("require")
                            ):
                                tmpLog.info(f"  skip nucleus={tmpNucleus} due to lack of full-chain capability criteria=-full_chain")
                                continue
                            # check parent task
                            # NOTE(review): parentTaskSpec is only bound when parent_full_chain
                            # was set True in the same branch above, so this access is guarded
                            if taskSpec.get_full_chain() and parent_full_chain:
                                if tmpNucleus != parentTaskSpec.nucleus:
                                    tmpLog.info(f"  skip nucleus={tmpNucleus} since the parent of the full-chain ran elsewhere criteria=-full_chain")
                                    continue
                            newNucleusList[tmpNucleus] = tmpNucleusSpec
                        nucleusList = newNucleusList
                        tmpLog.info(f"{len(nucleusList)} candidates passed full-chain check")
                        self.add_summary_message(oldNucleusList, nucleusList, "full-chain check")
                        if not nucleusList:
                            self.post_process_for_error(taskSpec, tmpLog, "no candidates")
                            continue
                        self.dump_summary(tmpLog, nucleusList)
                        ######################################
                        # weight
                        # fetch the Remaining-Work map for this priority under the lock.
                        # NOTE(review): nucleusRW still references the shared dict and is
                        # mutated below after the lock is released -- confirm this is intended
                        self.prioRW.acquire()
                        nucleusRW = self.prioRW[taskSpec.currentPriority]
                        self.prioRW.release()
                        totalWeight = 0
                        nucleusweights = []
                        for tmpNucleus, tmpNucleusSpec in nucleusList.items():
                            if tmpNucleus not in nucleusRW:
                                nucleusRW[tmpNucleus] = 0
                            # wStr accumulates a human-readable breakdown of the weight formula for the log
                            wStr = "1"
                            # with RW
                            # penalize nuclei already holding a lot of remaining work (above cutOffRW)
                            if tmpNucleus in nucleusRW and nucleusRW[tmpNucleus] and nucleusRW[tmpNucleus] >= cutOffRW:
                                weight = 1 / float(nucleusRW[tmpNucleus])
                                wStr += f"/( RW={nucleusRW[tmpNucleus]} )"
                            else:
                                weight = 1
                                wStr += f"/(1 : RW={nucleusRW[tmpNucleus]}<{cutOffRW})"
                            # with data
                            # only applied when the data-locality check was not disabled earlier
                            if dataset_availability_info != {}:
                                if dataset_availability_info[tmpNucleus]["tot_size"] > 0:
                                    # use input size only when high IO intensity
                                    if taskSpec.ioIntensity is None or taskSpec.ioIntensity > minIoIntensityWithLD:
                                        # scale by the fraction of input already resident at the nucleus
                                        weight *= float(dataset_availability_info[tmpNucleus]["ava_size_any"])
                                        weight /= float(dataset_availability_info[tmpNucleus]["tot_size"])
                                        wStr += f"* ( available_input_size_DISKTAPE={dataset_availability_info[tmpNucleus]['ava_size_any']} )"
                                        wStr += f"/ ( total_input_size={dataset_availability_info[tmpNucleus]['tot_size']} )"
                                    # negative weight for tape
                                    # any>disk means part of the input is only on tape
                                    if dataset_availability_info[tmpNucleus]["ava_size_any"] > dataset_availability_info[tmpNucleus]["ava_size_disk"]:
                                        weight *= negWeightTape
                                        wStr += f"*( weight_TAPE={negWeightTape} )"
                            # fraction of free space
                            if tmpNucleus in fractionFreeSpace:
                                try:
                                    tmpFrac = float(fractionFreeSpace[tmpNucleus]["free"]) / float(fractionFreeSpace[tmpNucleus]["total"])
                                    weight *= tmpFrac
                                    wStr += f"*( free_space={fractionFreeSpace[tmpNucleus]['free'] // 1024} TB )/( total_space={fractionFreeSpace[tmpNucleus]['total'] // 1024} TB )"
                                    # free space presumably tracked in GB, floor-divided to TB -- TODO confirm
                                    free_disk_in_tb = fractionFreeSpace[tmpNucleus]["free"] // 1024
                                    free_disk_term = min(free_disk_cutoff, free_disk_in_tb)
                                    weight *= free_disk_term
                                    wStr += f"*min( free_space={free_disk_in_tb} TB, FREE_DISK_CUTOFF={free_disk_cutoff} TB )"
                                except Exception:
                                    # best-effort: missing/zero space info leaves the weight unchanged
                                    pass
                            tmpLog.info(f"  use nucleus={tmpNucleus} weight={weight} {wStr} criteria=+use")
                            totalWeight += weight
                            nucleusweights.append((tmpNucleus, weight))
                        ######################################
                        # final selection
                        # weighted random choice: draw a uniform target in [0, totalWeight]
                        # and walk the cumulative weights until the target is exhausted
                        tgtWeight = random.uniform(0, totalWeight)
                        candidateNucleus = None
                        for tmpNucleus, weight in nucleusweights:
                            tgtWeight -= weight
                            if tgtWeight <= 0:
                                candidateNucleus = tmpNucleus
                                break
                        if candidateNucleus is None:
                            # guard against floating-point leftovers (target never reached zero):
                            # fall back to the last candidate
                            candidateNucleus = nucleusweights[-1][0]
                    ######################################
                    # update
                    nucleusSpec = nucleusList[candidateNucleus]
                    # get output/log datasets
                    tmpStat, tmpDatasetSpecs = self.taskBufferIF.getDatasetsWithJediTaskID_JEDI(taskSpec.jediTaskID, ["output", "log"])
                    # get destinations
                    retMap = {taskSpec.jediTaskID: AtlasBrokerUtils.getDictToSetNucleus(nucleusSpec, tmpDatasetSpecs)}
                    tmp_ret = self.taskBufferIF.setCloudToTasks_JEDI(retMap)
                    tmpLog.info(f"  set nucleus={candidateNucleus} with {tmp_ret} criteria=+set")
                    if tmp_ret:
                        tmpMsg = "set task_status=ready"
                        tmpLog.sendMsg(tmpMsg, self.msgType)
                    # update RW table
                    # charge this task's remaining work to the chosen nucleus for this
                    # priority and every lower-priority entry, under the lock
                    self.prioRW.acquire()
                    for prio, rwMap in self.prioRW.items():
                        if prio > taskSpec.currentPriority:
                            continue
                        if candidateNucleus in rwMap:
                            if not rwMap[candidateNucleus]:
                                # normalize None/falsy entries before accumulating
                                rwMap[candidateNucleus] = 0
                            rwMap[candidateNucleus] += taskRW
                        else:
                            rwMap[candidateNucleus] = taskRW
                    self.prioRW.release()
            except Exception:
                # broad catch at the worker-thread boundary: a brokerage failure for one
                # batch must not kill the thread; log the last task ID for forensics
                errtype, errvalue = sys.exc_info()[:2]
                errMsg = f"{self.__class__.__name__}.runImpl() failed with {errtype.__name__} {errvalue} "
                errMsg += f"lastJediTaskID={lastJediTaskID} "
                errMsg += traceback.format_exc()
                logger.error(errMsg)