File indexing completed on 2026-04-10 08:38:58
0001 import random
0002 import re
0003
0004 from pandacommon.pandalogger.PandaLogger import PandaLogger
0005
0006 from pandajedi.jedicore import Interaction
0007 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0008 from pandajedi.jedicore.SiteCandidate import SiteCandidate
0009 from pandajedi.jedirefine import RefinerUtils
0010 from pandaserver.srvcore import CoreUtils
0011
0012 from . import AtlasBrokerUtils
0013 from .JobBrokerBase import JobBrokerBase
0014
# Module-level logger, named after the last component of this module's dotted name.
logger = PandaLogger().getLogger(__name__.split(".")[-1])
0016
0017
0018
class GenJobBroker(JobBrokerBase):
    """Job broker that narrows a site list through a fixed sequence of checks.

    ``doBrokerage`` filters the candidate sites by status, scratch disk, storage
    space, walltime, core count, memory, processing type and pilot activity, then
    attaches weighted ``SiteCandidate`` objects to the input chunk.
    """

    def __init__(self, ddmIF, taskBufferIF):
        """Delegate construction to ``JobBrokerBase`` with the same arguments."""
        JobBrokerBase.__init__(self, ddmIF, taskBufferIF)

    def _temporary_error(self, tmpLog, taskSpec, inputChunk, message):
        """Log *message* as an error, store the uploaded log on the task and return the SC_FAILED result."""
        tmpLog.error(message)
        taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
        return self.SC_FAILED, inputChunk

    @staticmethod
    def _core_count_is_ok(task_core_count, site_core_count):
        """Tell whether a site's core count is acceptable for the task's core count."""
        if task_core_count == 0:
            # the task accepts any site
            return True
        if task_core_count == 1:
            return site_core_count in [0, 1, -1, None]
        if site_core_count in [0, -1]:
            return True
        return bool(site_core_count and site_core_count >= task_core_count)

    def _filter_by_status(self, tmpLog, scanSiteList, site_preassigned, taskParamMap):
        """Keep sites that are online (unless pre-assigned) and match the task's PandaSite, when one is set."""
        newScanSiteList = []
        for tmpSiteName in scanSiteList:
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            # the status requirement is waived for pre-assigned sites
            if tmpSiteSpec.status != "online" and not site_preassigned:
                tmpLog.debug(f" skip {tmpSiteName} due to status={tmpSiteSpec.status}")
                continue
            if "PandaSite" in taskParamMap and taskParamMap["PandaSite"]:
                if tmpSiteSpec.pandasite != taskParamMap["PandaSite"]:
                    tmpLog.debug(f" skip {tmpSiteName} due to wrong PandaSite={tmpSiteSpec.pandasite} <> {taskParamMap['PandaSite']}")
                    continue
            newScanSiteList.append(tmpSiteName)
        return newScanSiteList

    def _filter_by_scratch_disk(self, tmpLog, scanSiteList, taskSpec, inputChunk):
        """Drop sites whose ``maxwdir`` is smaller than the disk size the task needs."""
        # requirement when input is counted: output + work dir + largest input atom
        minDiskCountS = taskSpec.getOutDiskSize() + taskSpec.getWorkDiskSize() + inputChunk.getMaxAtomSize()
        minDiskCountS = minDiskCountS // 1024 // 1024
        # requirement for direct-IO jobs: input is left out unless the task uses local IO
        if taskSpec.useLocalIO():
            minDiskCountR = minDiskCountS
        else:
            minDiskCountR = taskSpec.getOutDiskSize() + taskSpec.getWorkDiskSize()
            minDiskCountR = minDiskCountR // 1024 // 1024
        newScanSiteList = []
        for tmpSiteName in scanSiteList:
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            # a falsy maxwdir means the site is not checked
            if tmpSiteSpec.maxwdir:
                if CoreUtils.use_direct_io_for_job(taskSpec, tmpSiteSpec, inputChunk):
                    minDiskCount = minDiskCountR
                else:
                    minDiskCount = minDiskCountS
                if minDiskCount > tmpSiteSpec.maxwdir:
                    tmpLog.debug(f" skip {tmpSiteName} due to small scratch disk={tmpSiteSpec.maxwdir} < {minDiskCount}")
                    continue
            newScanSiteList.append(tmpSiteName)
        return newScanSiteList

    def _filter_by_se_space(self, tmpLog, scanSiteList):
        """Drop sites whose ``space`` value is set and below the threshold."""
        diskThreshold = 200
        newScanSiteList = []
        for tmpSiteName in scanSiteList:
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            tmpSpaceSize = tmpSiteSpec.space
            # a falsy space value means the site is not checked
            if tmpSpaceSize and tmpSpaceSize < diskThreshold:
                tmpLog.debug(f" skip {tmpSiteName} due to disk shortage in SE = {tmpSiteSpec.space} < {diskThreshold}GB")
                continue
            newScanSiteList.append(tmpSiteName)
        return newScanSiteList

    def _filter_by_walltime(self, tmpLog, scanSiteList, minWalltime):
        """Drop sites whose non-zero ``maxtime``/``mintime`` limits exclude *minWalltime*."""
        newScanSiteList = []
        for tmpSiteName in scanSiteList:
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            if tmpSiteSpec.maxtime != 0 and minWalltime > tmpSiteSpec.maxtime:
                tmpLog.debug(f" skip {tmpSiteName} due to short site walltime={tmpSiteSpec.maxtime}(site upper limit) < {minWalltime}")
                continue
            if tmpSiteSpec.mintime != 0 and minWalltime < tmpSiteSpec.mintime:
                tmpLog.debug(f" skip {tmpSiteName} due to short job walltime={tmpSiteSpec.mintime}(site lower limit) > {minWalltime}")
                continue
            newScanSiteList.append(tmpSiteName)
        return newScanSiteList

    def _filter_by_core_count(self, tmpLog, scanSiteList, taskSpec):
        """Drop sites whose core count does not suit the task's core count."""
        newScanSiteList = []
        for tmpSiteName in scanSiteList:
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            if self._core_count_is_ok(taskSpec.coreCount, tmpSiteSpec.coreCount):
                newScanSiteList.append(tmpSiteName)
            else:
                tmpLog.info(
                    f" skip site={tmpSiteName} due to core mismatch site:{tmpSiteSpec.coreCount} <> task:{taskSpec.coreCount} criteria=-cpucore"
                )
        return newScanSiteList

    def _filter_by_memory(self, tmpLog, scanSiteList, taskSpec, origMinRamCount):
        """Drop sites whose ``maxrss``/``minrss`` limits exclude the RAM the task needs."""
        newScanSiteList = []
        for tmpSiteName in scanSiteList:
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            # scale the per-core figure by the site's cores (or the task's), then add the base RAM
            if taskSpec.ramPerCore():
                if tmpSiteSpec.coreCount and tmpSiteSpec.coreCount > 0:
                    minRamCount = origMinRamCount * tmpSiteSpec.coreCount
                else:
                    minRamCount = origMinRamCount * (taskSpec.coreCount if taskSpec.coreCount else 1)
                minRamCount += taskSpec.baseRamCount if taskSpec.baseRamCount else 0
            else:
                minRamCount = origMinRamCount
            # a falsy limit disables the corresponding comparison
            site_maxmemory = tmpSiteSpec.maxrss if tmpSiteSpec.maxrss else 0
            if site_maxmemory and minRamCount and minRamCount > site_maxmemory:
                tmpMsg = f" skip site={tmpSiteName} due to site RAM shortage. {site_maxmemory} (site upper limit) less than {minRamCount} "
                tmpLog.debug(tmpMsg)
                continue
            site_minmemory = tmpSiteSpec.minrss if tmpSiteSpec.minrss else 0
            if site_minmemory and minRamCount and minRamCount < site_minmemory:
                tmpMsg = f" skip site={tmpSiteName} due to job RAM shortage. {site_minmemory} (site lower limit) greater than {minRamCount} "
                tmpLog.info(tmpMsg)
                continue
            newScanSiteList.append(tmpSiteName)
        return newScanSiteList

    def _filter_by_processing_type(self, tmpLog, scanSiteList, taskSpec):
        """Drop sites whose allowed/excluded processing-type lists reject the task's processing type."""
        newScanSiteList = []
        for tmpSiteName in scanSiteList:
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            allowed_list = tmpSiteSpec.get_allowed_processing_types()
            exclude_list = tmpSiteSpec.get_excluded_processing_types()
            # a list that is None is not applied
            if allowed_list is not None and taskSpec.processingType not in allowed_list:
                tmpLog.debug(f" skip {tmpSiteName} due to processing type not in allowed list")
                continue
            if exclude_list is not None and taskSpec.processingType in exclude_list:
                tmpLog.debug(f" skip {tmpSiteName} due to processing type in excluded list")
                continue
            newScanSiteList.append(tmpSiteName)
        return newScanSiteList

    def _filter_by_pilot_activity(self, tmpLog, scanSiteList, taskSpec):
        """Drop sites with no getJob/updateJob counts, unless the task is a test or the site allows it."""
        nWNmap = self.taskBufferIF.getCurrentSiteData()
        newScanSiteList = []
        for tmpSiteName in scanSiteList:
            tmp_site_spec = self.siteMapper.getSite(tmpSiteName)
            nPilot = 0
            if tmpSiteName in nWNmap:
                nPilot = nWNmap[tmpSiteName]["getJob"] + nWNmap[tmpSiteName]["updateJob"]
            if nPilot == 0 and taskSpec.prodSourceLabel not in ["test"] and not tmp_site_spec.hasValueInCatchall("allow_no_pilot"):
                tmpLog.debug(f" skip {tmpSiteName} due to no pilot")
                continue
            newScanSiteList.append(tmpSiteName)
        return newScanSiteList

    def _get_available_files(self, tmpLog, scanSiteList, taskSpec, inputChunk, datasetSpec):
        """Ask the DDM interface which files of *datasetSpec* are available at the candidate sites.

        Raises ``Interaction.JEDITemporaryError`` when the DDM interface returns None.
        """
        tmpLog.debug(f"getting the list of available files for {datasetSpec.datasetName}")
        # unified site names, de-duplicated in first-seen order
        fileScanSiteList = list(dict.fromkeys(self.siteMapper.getSite(tmpPseudoSiteName).get_unified_name() for tmpPseudoSiteName in scanSiteList))
        siteStorageEP = AtlasBrokerUtils.getSiteInputStorageEndpointMap(fileScanSiteList, self.siteMapper, taskSpec.prodSourceLabel, None)
        # completeness is not checked while merging; only the master dataset may use incomplete replicas
        checkCompleteness = not inputChunk.isMerging
        useCompleteOnly = not datasetSpec.isMaster()
        tmpAvFileMap = self.ddmIF.getAvailableFiles(
            datasetSpec,
            siteStorageEP,
            self.siteMapper,
            check_completeness=checkCompleteness,
            file_scan_in_container=False,
            complete_only=useCompleteOnly,
            use_deep=True,
        )
        if tmpAvFileMap is None:
            raise Interaction.JEDITemporaryError("ddmIF.getAvailableFiles failed")
        return tmpAvFileMap

    def _select_candidates(self, scanSiteList, jobStatPrioMap, availableFileMap, sitesUsedByTask):
        """Build weighted site candidates and pick the ones to use.

        Sites already used by the task are always kept. The remaining slots, up to
        five candidates in total, are filled in descending weight order, with sites
        of equal weight shuffled.
        """
        weightMap = {}
        candidateSpecList = []
        for tmpSiteName in scanSiteList:
            nRunning = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running", None, None)
            nAssigned = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "defined", None, None)
            nActivated = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "activated", None, None)
            weight = float(nRunning + 1) / float(nActivated + nAssigned + 1) / float(nAssigned + 1)
            siteCandidateSpec = SiteCandidate(tmpSiteName)
            siteCandidateSpec.weight = weight
            # NOTE(review): the availability maps were requested with unified site names while this
            # lookup uses the names in scanSiteList; confirm that the keys match.
            for availableFiles in availableFileMap.values():
                if tmpSiteName in availableFiles:
                    siteCandidateSpec.add_local_disk_files(availableFiles[tmpSiteName]["localdisk"])
            if tmpSiteName in sitesUsedByTask:
                candidateSpecList.append(siteCandidateSpec)
            else:
                weightMap.setdefault(weight, []).append(siteCandidateSpec)
        maxNumSites = 5
        for weightVal in sorted(weightMap, reverse=True):
            if len(candidateSpecList) >= maxNumSites:
                break
            sitesWithWeight = weightMap[weightVal]
            random.shuffle(sitesWithWeight)
            candidateSpecList += sitesWithWeight[: (maxNumSites - len(candidateSpecList))]
        return candidateSpecList

    def doBrokerage(self, taskSpec, cloudName, inputChunk, taskParamMap):
        """Select site candidates for *inputChunk* and attach them to it.

        Args:
            taskSpec: task specification; its ``cloud`` attribute may be filled in
                from the task parameters.
            cloudName: not used by this method.
            inputChunk: input chunk that receives the selected site candidates.
            taskParamMap: task parameters; loaded through ``taskBufferIF`` when falsy.

        Returns:
            ``(self.SC_SUCCEEDED, inputChunk)`` when at least one candidate was added,
            otherwise ``(self.SC_FAILED, inputChunk)`` with the uploaded log set as
            the task's error diagnostics.
        """
        tmpLog = MsgWrapper(logger, f"<jediTaskID={taskSpec.jediTaskID}>")
        tmpLog.debug("start")

        # load task parameters on a best-effort basis
        try:
            if not taskParamMap:
                taskParam = self.taskBufferIF.getTaskParamsWithID_JEDI(taskSpec.jediTaskID)
                taskParamMap = RefinerUtils.decodeJSON(taskParam)
            if not taskSpec.cloud and "cloud" in taskParamMap:
                taskSpec.cloud = taskParamMap["cloud"]
        except Exception as e:
            tmpLog.warning(f"failed to load task parameters with {e}")
        if taskParamMap is None:
            # keep the later membership tests from failing on None
            taskParamMap = {}

        # initial site list
        site_preassigned = True
        if taskSpec.site not in ["", None]:
            tmpLog.debug(f"site={taskSpec.site} is pre-assigned")
            if self.siteMapper.checkSite(taskSpec.site):
                scanSiteList = [taskSpec.site]
            else:
                # not a known site name: use it as a pattern against the cloud's sites
                scanSiteList = [tmpSite for tmpSite in self.siteMapper.getCloud(taskSpec.cloud)["sites"] if re.search(taskSpec.site, tmpSite)]
                if not scanSiteList:
                    return self._temporary_error(tmpLog, taskSpec, inputChunk, f"unknown site={taskSpec.site}")
        elif inputChunk.getPreassignedSite() is not None:
            scanSiteList = [inputChunk.getPreassignedSite()]
            tmpLog.debug(f"site={inputChunk.getPreassignedSite()} is pre-assigned in masterDS")
        else:
            site_preassigned = False
            # build a copy without "NA" so the list held by the site mapper is left untouched
            scanSiteList = [tmpSite for tmpSite in self.siteMapper.getCloud(taskSpec.cloud)["sites"] if tmpSite != "NA"]
            tmpLog.debug(f"cloud={taskSpec.cloud} has {len(scanSiteList)} candidates")
        tmpLog.debug(f"initial {len(scanSiteList)} candidates")

        # site status and PandaSite
        scanSiteList = self._filter_by_status(tmpLog, scanSiteList, site_preassigned, taskParamMap)
        tmpLog.debug(f"{len(scanSiteList)} candidates passed site status check")
        if not scanSiteList:
            return self._temporary_error(tmpLog, taskSpec, inputChunk, "no candidates")

        # scratch disk
        scanSiteList = self._filter_by_scratch_disk(tmpLog, scanSiteList, taskSpec, inputChunk)
        tmpLog.debug(f"{len(scanSiteList)} candidates passed scratch disk check")
        if not scanSiteList:
            return self._temporary_error(tmpLog, taskSpec, inputChunk, "no candidates")

        # storage space
        scanSiteList = self._filter_by_se_space(tmpLog, scanSiteList)
        tmpLog.debug(f"{len(scanSiteList)} candidates passed SE space check")
        if not scanSiteList:
            return self._temporary_error(tmpLog, taskSpec, inputChunk, "no candidates")

        # walltime, only when the task defines one
        minWalltime = taskSpec.walltime
        if minWalltime not in [0, None]:
            scanSiteList = self._filter_by_walltime(tmpLog, scanSiteList, minWalltime)
            tmpLog.debug(f"{len(scanSiteList)} candidates passed walltime check ={minWalltime}{taskSpec.walltimeUnit}")
            if not scanSiteList:
                return self._temporary_error(tmpLog, taskSpec, inputChunk, "no candidates")

        # core count, skipped for pre-assigned sites
        if taskSpec.coreCount is not None and taskSpec.coreCount >= 0:
            if not site_preassigned:
                scanSiteList = self._filter_by_core_count(tmpLog, scanSiteList, taskSpec)
                tmpLog.info(f"{len(scanSiteList)} candidates passed for core count check")
                if not scanSiteList:
                    self.dump_summary(tmpLog)
                    return self._temporary_error(tmpLog, taskSpec, inputChunk, "no candidates")

        # memory, skipped for pre-assigned sites or when the chunk reports no RAM requirement
        origMinRamCount = inputChunk.getMaxRamCount()
        if not site_preassigned and origMinRamCount:
            scanSiteList = self._filter_by_memory(tmpLog, scanSiteList, taskSpec, origMinRamCount)
            tmpLog.debug(f"{len(scanSiteList)} candidates passed memory check")
            if not scanSiteList:
                return self._temporary_error(tmpLog, taskSpec, inputChunk, "no candidates")

        # allowed/excluded processing types
        scanSiteList = self._filter_by_processing_type(tmpLog, scanSiteList, taskSpec)
        tmpLog.debug(f"{len(scanSiteList)} candidates passed allowed/excluded processing type check")
        if not scanSiteList:
            return self._temporary_error(tmpLog, taskSpec, inputChunk, "no candidates")

        # pilot activity
        scanSiteList = self._filter_by_pilot_activity(tmpLog, scanSiteList, taskSpec)
        tmpLog.debug(f"{len(scanSiteList)} candidates passed pilot activity check")
        if not scanSiteList:
            return self._temporary_error(tmpLog, taskSpec, inputChunk, "no candidates")

        # sites already used by the task
        tmpSt, sitesUsedByTask = self.taskBufferIF.getSitesUsedByTask_JEDI(taskSpec.jediTaskID)
        if not tmpSt:
            return self._temporary_error(tmpLog, taskSpec, inputChunk, "failed to get sites which already used by task")

        # available files per dataset; any failure ends the brokerage with a temporary error
        availableFileMap = {}
        for datasetSpec in inputChunk.getDatasets():
            try:
                availableFileMap[datasetSpec.datasetName] = self._get_available_files(tmpLog, scanSiteList, taskSpec, inputChunk, datasetSpec)
            except Exception as e:
                return self._temporary_error(tmpLog, taskSpec, inputChunk, f"failed to get available files with {e}")

        # job statistics used for weighting
        tmpSt, jobStatPrioMap = self.taskBufferIF.getJobStatisticsByGlobalShare(taskSpec.vo)
        if not tmpSt:
            return self._temporary_error(tmpLog, taskSpec, inputChunk, "failed to get job statistics with priority")

        # final selection
        tmpLog.debug(f"final {len(scanSiteList)} candidates")
        candidateSpecList = self._select_candidates(scanSiteList, jobStatPrioMap, availableFileMap, sitesUsedByTask)
        for siteCandidateSpec in candidateSpecList:
            inputChunk.addSiteCandidate(siteCandidateSpec)
            tmpLog.debug(f" use {siteCandidateSpec.siteName} with weight={siteCandidateSpec.weight} nFiles={len(siteCandidateSpec.localDiskFiles)}")
        if not candidateSpecList:
            return self._temporary_error(tmpLog, taskSpec, inputChunk, "no candidates")

        tmpLog.debug("done")
        return self.SC_SUCCEEDED, inputChunk