Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:38:58

0001 import random
0002 import re
0003 
0004 from pandacommon.pandalogger.PandaLogger import PandaLogger
0005 
0006 from pandajedi.jedicore import Interaction
0007 from pandajedi.jedicore.MsgWrapper import MsgWrapper
0008 from pandajedi.jedicore.SiteCandidate import SiteCandidate
0009 from pandajedi.jedirefine import RefinerUtils
0010 from pandaserver.srvcore import CoreUtils
0011 
0012 from . import AtlasBrokerUtils
0013 from .JobBrokerBase import JobBrokerBase
0014 
# module logger, named after the last dotted component of this module's name
logger = PandaLogger().getLogger(__name__.rpartition(".")[2])
0016 
0017 
0018 # brokerage for general purpose
class GenJobBroker(JobBrokerBase):
    """General-purpose job brokerage.

    ``doBrokerage`` builds an initial site list from a pre-assigned site or from
    the task's cloud. It then narrows the list with successive filters: site
    status/PandaSite, scratch disk, SE free space, walltime, core count, memory,
    processing type and pilot activity. Finally it weights the survivors by
    their job counts and attaches the chosen sites to the input chunk as
    ``SiteCandidate`` objects.
    """

    # Upper bound on the number of candidates attached by doBrokerage().
    # Sites already used by the task are always attached and count toward
    # this bound; weighted sites only fill the remaining slots.
    MAX_NUM_SITES = 5
    # Sites whose SE reports less free space than this (GB, per the skip
    # message) are dropped. A falsy space value skips the check for that site.
    MIN_SE_SPACE_GB = 200

    # constructor
    def __init__(self, ddmIF, taskBufferIF):
        """Initialise the broker through JobBrokerBase with the DDM and task-buffer interfaces."""
        JobBrokerBase.__init__(self, ddmIF, taskBufferIF)

    # log an error, store the uploaded log as errDiag and build the temporary-failure return value
    def _fail_brokerage(self, tmpLog, taskSpec, inputChunk, message="no candidates"):
        """Record a brokerage failure and return ``(self.SC_FAILED, inputChunk)``."""
        tmpLog.error(message)
        taskSpec.setErrDiag(tmpLog.uploadLog(taskSpec.jediTaskID))
        return self.SC_FAILED, inputChunk

    # query DDM for the files of every input dataset available at the candidate sites
    def _get_available_file_map(self, tmpLog, taskSpec, inputChunk, scanSiteList):
        """Return ``{datasetName: ddmIF.getAvailableFiles(...) result}`` for the chunk's datasets.

        Only unified site names are passed to the DDM lookup (pseudo sites are
        collapsed), so the per-dataset results must be indexed by unified name.

        :raises Interaction.JEDITemporaryError: if ``ddmIF.getAvailableFiles`` returns None.
            Any other exception raised by the lookups propagates unchanged.
        """
        availableFileMap = {}
        datasetSpecs = inputChunk.getDatasets()
        if not datasetSpecs:
            return availableFileMap
        # collapse pseudo sites to unified names, keeping first-seen order without duplicates
        fileScanSiteList = []
        for tmpPseudoSiteName in scanSiteList:
            tmpSiteName = self.siteMapper.getSite(tmpPseudoSiteName).get_unified_name()
            if tmpSiteName not in fileScanSiteList:
                fileScanSiteList.append(tmpSiteName)
        # the site -> input storage endpoint map does not depend on the dataset, so build it once
        siteStorageEP = AtlasBrokerUtils.getSiteInputStorageEndpointMap(fileScanSiteList, self.siteMapper, taskSpec.prodSourceLabel, None)
        # skip the completeness check for merge jobs
        checkCompleteness = not inputChunk.isMerging
        for datasetSpec in datasetSpecs:
            tmpLog.debug(f"getting the list of available files for {datasetSpec.datasetName}")
            tmpAvFileMap = self.ddmIF.getAvailableFiles(
                datasetSpec,
                siteStorageEP,
                self.siteMapper,
                check_completeness=checkCompleteness,
                file_scan_in_container=False,
                # non-master datasets are looked up with complete_only=True
                complete_only=not datasetSpec.isMaster(),
                use_deep=True,
            )
            if tmpAvFileMap is None:
                raise Interaction.JEDITemporaryError("ddmIF.getAvailableFiles failed")
            availableFileMap[datasetSpec.datasetName] = tmpAvFileMap
        return availableFileMap

    # main
    def doBrokerage(self, taskSpec, cloudName, inputChunk, taskParamMap):
        """Select candidate sites for ``inputChunk`` and attach them to it.

        :param taskSpec: task being brokered. ``taskSpec.cloud`` is filled from the
            task parameters when unset. ``setErrDiag`` is called on failure.
        :param cloudName: unused in this method.
        :param inputChunk: chunk to broker. Each selected site is added with
            ``inputChunk.addSiteCandidate``.
        :param taskParamMap: task parameters. When empty, they are fetched with
            ``getTaskParamsWithID_JEDI`` and decoded.
        :return: ``(self.SC_SUCCEEDED, inputChunk)`` on success. If no site survives
            a filter or a lookup fails, returns ``(self.SC_FAILED, inputChunk)``
            after storing the uploaded log via ``taskSpec.setErrDiag``.
        """
        tmpLog = MsgWrapper(logger, f"<jediTaskID={taskSpec.jediTaskID}>")
        tmpLog.debug("start")
        # set cloud from the task parameters; best effort, brokerage continues without it
        try:
            if not taskParamMap:
                taskParam = self.taskBufferIF.getTaskParamsWithID_JEDI(taskSpec.jediTaskID)
                taskParamMap = RefinerUtils.decodeJSON(taskParam)
            if not taskSpec.cloud and "cloud" in taskParamMap:
                taskSpec.cloud = taskParamMap["cloud"]
        except Exception as e:
            tmpLog.warning(f"failed to get task parameters with {e}")
        # the PandaSite check below does a key lookup, so never leave this as None
        if not taskParamMap:
            taskParamMap = {}
        ######################################
        # get sites in the cloud
        site_preassigned = True
        if taskSpec.site not in ["", None]:
            tmpLog.debug(f"site={taskSpec.site} is pre-assigned")
            if self.siteMapper.checkSite(taskSpec.site):
                scanSiteList = [taskSpec.site]
            else:
                # not a known site name: use it as a regex over the cloud's sites
                scanSiteList = [tmpSite for tmpSite in self.siteMapper.getCloud(taskSpec.cloud)["sites"] if re.search(taskSpec.site, tmpSite)]
                if not scanSiteList:
                    return self._fail_brokerage(tmpLog, taskSpec, inputChunk, f"unknown site={taskSpec.site}")
        elif inputChunk.getPreassignedSite() is not None:
            scanSiteList = [inputChunk.getPreassignedSite()]
            tmpLog.debug(f"site={inputChunk.getPreassignedSite()} is pre-assigned in masterDS")
        else:
            site_preassigned = False
            # the list returned by getCloud() may be shared siteMapper state, so filter
            # "NA" into a new list instead of calling remove() on it
            scanSiteList = [tmpSite for tmpSite in self.siteMapper.getCloud(taskSpec.cloud)["sites"] if tmpSite != "NA"]
            tmpLog.debug(f"cloud={taskSpec.cloud} has {len(scanSiteList)} candidates")
        tmpLog.debug(f"initial {len(scanSiteList)} candidates")
        ######################################
        # selection for status and PandaSite
        requiredPandaSite = taskParamMap.get("PandaSite")
        newScanSiteList = []
        for tmpSiteName in scanSiteList:
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            # pre-assigned sites are kept regardless of their status
            if tmpSiteSpec.status != "online" and not site_preassigned:
                tmpLog.debug(f"  skip {tmpSiteName} due to status={tmpSiteSpec.status}")
                continue
            if requiredPandaSite and tmpSiteSpec.pandasite != requiredPandaSite:
                tmpLog.debug(f"  skip {tmpSiteName} due to wrong PandaSite={tmpSiteSpec.pandasite} <> {requiredPandaSite}")
                continue
            newScanSiteList.append(tmpSiteName)
        scanSiteList = newScanSiteList
        tmpLog.debug(f"{len(scanSiteList)} candidates passed site status check")
        if not scanSiteList:
            return self._fail_brokerage(tmpLog, taskSpec, inputChunk)
        ######################################
        # selection for scratch disk
        # sizes are divided by 1024 twice (presumably bytes -> MB; TODO confirm against SiteSpec.maxwdir units)
        minDiskCountS = (taskSpec.getOutDiskSize() + taskSpec.getWorkDiskSize() + inputChunk.getMaxAtomSize()) // 1024 // 1024
        # for direct-IO jobs the input size is not counted, unless the task forces local IO
        if taskSpec.useLocalIO():
            minDiskCountR = minDiskCountS
        else:
            minDiskCountR = (taskSpec.getOutDiskSize() + taskSpec.getWorkDiskSize()) // 1024 // 1024
        newScanSiteList = []
        for tmpSiteName in scanSiteList:
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            if tmpSiteSpec.maxwdir:
                minDiskCount = minDiskCountR if CoreUtils.use_direct_io_for_job(taskSpec, tmpSiteSpec, inputChunk) else minDiskCountS
                if minDiskCount > tmpSiteSpec.maxwdir:
                    tmpLog.debug(f"  skip {tmpSiteName} due to small scratch disk={tmpSiteSpec.maxwdir} < {minDiskCount}")
                    continue
            newScanSiteList.append(tmpSiteName)
        scanSiteList = newScanSiteList
        tmpLog.debug(f"{len(scanSiteList)} candidates passed scratch disk check")
        if not scanSiteList:
            return self._fail_brokerage(tmpLog, taskSpec, inputChunk)
        ######################################
        # selection for available space in SE
        newScanSiteList = []
        for tmpSiteName in scanSiteList:
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            if tmpSiteSpec.space and tmpSiteSpec.space < self.MIN_SE_SPACE_GB:
                tmpLog.debug(f"  skip {tmpSiteName} due to disk shortage in SE = {tmpSiteSpec.space} < {self.MIN_SE_SPACE_GB}GB")
                continue
            newScanSiteList.append(tmpSiteName)
        scanSiteList = newScanSiteList
        tmpLog.debug(f"{len(scanSiteList)} candidates passed SE space check")
        if not scanSiteList:
            return self._fail_brokerage(tmpLog, taskSpec, inputChunk)
        ######################################
        # selection for walltime
        minWalltime = taskSpec.walltime
        if minWalltime not in [0, None]:
            newScanSiteList = []
            for tmpSiteName in scanSiteList:
                tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                # a 0 or unset site limit means "no limit"; the truthiness guard also avoids comparing with None
                if tmpSiteSpec.maxtime and minWalltime > tmpSiteSpec.maxtime:
                    tmpLog.debug(f"  skip {tmpSiteName} due to short site walltime={tmpSiteSpec.maxtime}(site upper limit) < {minWalltime}")
                    continue
                if tmpSiteSpec.mintime and minWalltime < tmpSiteSpec.mintime:
                    tmpLog.debug(f"  skip {tmpSiteName} due to short job walltime={tmpSiteSpec.mintime}(site lower limit) > {minWalltime}")
                    continue
                newScanSiteList.append(tmpSiteName)
            scanSiteList = newScanSiteList
            tmpLog.debug(f"{len(scanSiteList)} candidates passed walltime check ={minWalltime}{taskSpec.walltimeUnit}")
            if not scanSiteList:
                return self._fail_brokerage(tmpLog, taskSpec, inputChunk)
        ######################################
        # selection for MP (skipped for pre-assigned sites)
        if taskSpec.coreCount is not None and taskSpec.coreCount >= 0 and not site_preassigned:
            newScanSiteList = []
            for tmpSiteName in scanSiteList:
                tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                if taskSpec.coreCount == 0:
                    # any
                    is_ok = True
                elif taskSpec.coreCount == 1:
                    # single core
                    is_ok = tmpSiteSpec.coreCount in [0, 1, -1, None]
                else:
                    # multi core
                    is_ok = tmpSiteSpec.coreCount in [0, -1] or bool(tmpSiteSpec.coreCount and tmpSiteSpec.coreCount >= taskSpec.coreCount)
                if is_ok:
                    newScanSiteList.append(tmpSiteName)
                else:
                    tmpLog.info(
                        f"  skip site={tmpSiteName} due to core mismatch site:{tmpSiteSpec.coreCount} <> task:{taskSpec.coreCount} criteria=-cpucore"
                    )
            scanSiteList = newScanSiteList
            tmpLog.info(f"{len(scanSiteList)} candidates passed for core count check")
            if not scanSiteList:
                self.dump_summary(tmpLog)
                return self._fail_brokerage(tmpLog, taskSpec, inputChunk)
        ######################################
        # selection for memory (skipped for pre-assigned sites)
        origMinRamCount = inputChunk.getMaxRamCount()
        if not site_preassigned and origMinRamCount:
            newScanSiteList = []
            for tmpSiteName in scanSiteList:
                tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
                # job memory requirement; per-core RAM is scaled by the site (or task) core count
                if taskSpec.ramPerCore():
                    if tmpSiteSpec.coreCount and tmpSiteSpec.coreCount > 0:
                        minRamCount = origMinRamCount * tmpSiteSpec.coreCount
                    else:
                        minRamCount = origMinRamCount * (taskSpec.coreCount if taskSpec.coreCount else 1)
                    minRamCount += taskSpec.baseRamCount if taskSpec.baseRamCount else 0
                else:
                    minRamCount = origMinRamCount
                # site max memory requirement
                site_maxmemory = tmpSiteSpec.maxrss if tmpSiteSpec.maxrss else 0
                if site_maxmemory and minRamCount and minRamCount > site_maxmemory:
                    tmpLog.debug(f"  skip site={tmpSiteName} due to site RAM shortage. {site_maxmemory} (site upper limit) less than {minRamCount} ")
                    continue
                # site min memory requirement
                site_minmemory = tmpSiteSpec.minrss if tmpSiteSpec.minrss else 0
                if site_minmemory and minRamCount and minRamCount < site_minmemory:
                    tmpLog.info(f"  skip site={tmpSiteName} due to job RAM shortage. {site_minmemory} (site lower limit) greater than {minRamCount} ")
                    continue
                newScanSiteList.append(tmpSiteName)
            scanSiteList = newScanSiteList
            tmpLog.debug(f"{len(scanSiteList)} candidates passed memory check")
            if not scanSiteList:
                return self._fail_brokerage(tmpLog, taskSpec, inputChunk)
        ######################################
        # selection with processing types
        newScanSiteList = []
        for tmpSiteName in scanSiteList:
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            allowed_list = tmpSiteSpec.get_allowed_processing_types()
            exclude_list = tmpSiteSpec.get_excluded_processing_types()
            if allowed_list is not None and taskSpec.processingType not in allowed_list:
                tmpLog.debug(f"  skip {tmpSiteName} due to processing type not in allowed list")
                continue
            if exclude_list is not None and taskSpec.processingType in exclude_list:
                tmpLog.debug(f"  skip {tmpSiteName} due to processing type in excluded list")
                continue
            newScanSiteList.append(tmpSiteName)
        scanSiteList = newScanSiteList
        tmpLog.debug(f"{len(scanSiteList)} candidates passed allowed/excluded processing type check")
        if not scanSiteList:
            return self._fail_brokerage(tmpLog, taskSpec, inputChunk)
        ######################################
        # selection for nPilot
        nWNmap = self.taskBufferIF.getCurrentSiteData()
        newScanSiteList = []
        for tmpSiteName in scanSiteList:
            tmpSiteSpec = self.siteMapper.getSite(tmpSiteName)
            nPilot = 0
            if tmpSiteName in nWNmap:
                nPilot = nWNmap[tmpSiteName]["getJob"] + nWNmap[tmpSiteName]["updateJob"]
            # sites without pilot activity are kept only for "test" jobs or when allow_no_pilot is set
            if nPilot == 0 and taskSpec.prodSourceLabel not in ["test"] and not tmpSiteSpec.hasValueInCatchall("allow_no_pilot"):
                tmpLog.debug(f"  skip {tmpSiteName} due to no pilot")
                continue
            newScanSiteList.append(tmpSiteName)
        scanSiteList = newScanSiteList
        tmpLog.debug(f"{len(scanSiteList)} candidates passed pilot activity check")
        if not scanSiteList:
            return self._fail_brokerage(tmpLog, taskSpec, inputChunk)
        ######################################
        # sites already used by task
        tmpSt, sitesUsedByTask = self.taskBufferIF.getSitesUsedByTask_JEDI(taskSpec.jediTaskID)
        if not tmpSt:
            return self._fail_brokerage(tmpLog, taskSpec, inputChunk, "failed to get sites which already used by task")
        ######################################
        # get list of available files
        try:
            availableFileMap = self._get_available_file_map(tmpLog, taskSpec, inputChunk, scanSiteList)
        except Exception as e:
            return self._fail_brokerage(tmpLog, taskSpec, inputChunk, f"failed to get available files with {e}")
        ######################################
        # job statistics used for the weights
        tmpSt, jobStatPrioMap = self.taskBufferIF.getJobStatisticsByGlobalShare(taskSpec.vo)
        if not tmpSt:
            return self._fail_brokerage(tmpLog, taskSpec, inputChunk, "failed to get job statistics with priority")
        ######################################
        # final procedure
        tmpLog.debug(f"final {len(scanSiteList)} candidates")
        weightMap = {}
        candidateSpecList = []
        for tmpSiteName in scanSiteList:
            # workQueueID=None so that non-JEDI jobs are counted too
            nRunning = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "running", None, None)
            nAssigned = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "defined", None, None)
            nActivated = AtlasBrokerUtils.getNumJobs(jobStatPrioMap, tmpSiteName, "activated", None, None)
            # grows with running jobs, shrinks with queued (defined/activated) jobs
            weight = float(nRunning + 1) / float(nActivated + nAssigned + 1) / float(nAssigned + 1)
            siteCandidateSpec = SiteCandidate(tmpSiteName)
            siteCandidateSpec.weight = weight
            # availableFileMap results are indexed by unified site name (pseudo sites
            # were collapsed before the DDM lookup), so translate before looking up
            unifiedSiteName = self.siteMapper.getSite(tmpSiteName).get_unified_name()
            for availableFiles in availableFileMap.values():
                if unifiedSiteName in availableFiles:
                    siteCandidateSpec.add_local_disk_files(availableFiles[unifiedSiteName]["localdisk"])
            # sites already used by the task are always kept; others compete by weight
            if tmpSiteName in sitesUsedByTask:
                candidateSpecList.append(siteCandidateSpec)
            else:
                weightMap.setdefault(weight, []).append(siteCandidateSpec)
        # fill the remaining slots with the highest weights first, shuffling sites with equal weight
        for weightVal in sorted(weightMap, reverse=True):
            if len(candidateSpecList) >= self.MAX_NUM_SITES:
                break
            sitesWithWeight = weightMap[weightVal]
            random.shuffle(sitesWithWeight)
            candidateSpecList += sitesWithWeight[: self.MAX_NUM_SITES - len(candidateSpecList)]
        # attach candidates to the chunk
        scanSiteList = []
        for siteCandidateSpec in candidateSpecList:
            inputChunk.addSiteCandidate(siteCandidateSpec)
            scanSiteList.append(siteCandidateSpec.siteName)
            tmpLog.debug(f"  use {siteCandidateSpec.siteName} with weight={siteCandidateSpec.weight} nFiles={len(siteCandidateSpec.localDiskFiles)}")
        if not scanSiteList:
            return self._fail_brokerage(tmpLog, taskSpec, inputChunk)
        tmpLog.debug("done")
        return self.SC_SUCCEEDED, inputChunk