Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 import datetime
0002 import functools
0003 import time
0004 import traceback
0005 import uuid
0006 
0007 from pandacommon.pandalogger.LogWrapper import LogWrapper
0008 from pandacommon.pandalogger.PandaLogger import PandaLogger
0009 from pandacommon.pandautils.PandaUtils import naive_utcnow
0010 
0011 from pandaserver.config import panda_config
0012 from pandaserver.dataservice import DataServiceUtils
0013 
# logger shared by the brokerage functions in this module
_log = PandaLogger().getLogger("broker")

# computing sites seen so far, in first-seen order;
# _compFunc appends to this list and uses a site's position as its sort rank
_allSites = []

# processingType values that skip brokerage;
# schedule() starts a new bunch after every job of one of these types
skipBrokerageProTypes = ["prod_test"]
0021 
0022 
0023 # comparison function for sort
def _compFunc(job_a, job_b):
    """Three-way comparator ordering jobs by the rank of their computingSite.

    The rank of a site is its position in the module-level _allSites list.
    A site that is not registered yet is appended first (job_a's site before
    job_b's), so this function mutates _allSites as a side effect.

    :param job_a: object exposing a computingSite attribute
    :param job_b: object exposing a computingSite attribute
    :return: 1 if job_a's site ranks after job_b's, -1 if before, 0 if equal
    """
    # register unseen sites, left operand first
    for candidate in (job_a, job_b):
        site_name = candidate.computingSite
        if site_name not in _allSites:
            _allSites.append(site_name)

    # look up the position of each site
    rank_a = _allSites.index(job_a.computingSite)
    rank_b = _allSites.index(job_b.computingSite)

    # classic cmp(): the difference of two booleans yields 1, -1 or 0
    return (rank_a > rank_b) - (rank_a < rank_b)
0042 
0043 
def schedule(jobs, siteMapper):
    """Assign computingSite, destinationSE and dispatchDBlock to a list of jobs.

    The jobs are sorted by the rank of their computingSite (see _compFunc) and
    walked in that order, followed by a None terminator. Consecutive jobs are
    grouped into a "bunch"; a new bunch starts when the pending input file
    count or the job count reaches its limit, or when prodDBlock,
    computingSite, cloud, transferType, processingType, the special brokerage
    site list or the JEDI flag differs from the previous job.

    When a bunch is closed and its site is still undecided ("TOBEDONE"), or
    the bunch has a special brokerage site list, the candidate sites are
    scanned (all with weight 1) and the jobs of the bunch get the chosen
    site's name as computingSite.

    The job and file objects are modified in place. Failed jobs are ignored.

    :param jobs: list of job specs to be scheduled
    :param siteMapper: object providing checkCloud(), getCloud(), getSite()
                       and siteSpecList
    :return: None. Any exception is caught here and written to the log, which
             aborts the scheduling of the jobs not yet processed.
    """
    timestamp = naive_utcnow().isoformat("/")
    tmp_log = LogWrapper(_log, f"start_ts={timestamp}")

    try:
        # no jobs
        if len(jobs) == 0:
            tmp_log.debug("finished : no jobs")
            return

        max_jobs = 20
        max_files = 20

        iJob = 0
        # jobs belonging to the current bunch. Tracked explicitly because failed jobs
        # are skipped, so list positions cannot be used to recover the bunch members
        bunch_jobs = []
        fileList = []
        chosen_panda_queue = None
        prodDBlock = None
        computingSite = None
        dispatchDBlock = None
        previousCloud = None
        prevProType = None
        prevSourceLabel = None
        prevDirectAcc = None
        prevIsJEDI = None
        prevBrokerageSiteList = None

        indexJob = 0

        # sort jobs by siteID. Some jobs may already define computingSite
        jobs = sorted(jobs, key=functools.cmp_to_key(_compFunc))

        # loop over all jobs + terminator(None)
        for job in jobs + [None]:
            indexJob += 1

            # ignore failed jobs
            if job and job.jobStatus == "failed":
                continue

            # list of sites for special brokerage
            specialBrokerageSiteList = []

            # manually set site
            if job and job.computingSite != "NULL" and job.prodSourceLabel in ("test", "managed"):
                specialBrokerageSiteList = [job.computingSite]

            overwriteSite = False

            # check JEDI
            isJEDI = False
            if job and job.lockedby == "jedi":
                isJEDI = True

            # new bunch or terminator
            if (
                job is None
                or len(fileList) >= max_files
                or (dispatchDBlock is None and job.homepackage.startswith("AnalysisTransforms"))
                or prodDBlock != job.prodDBlock
                or job.computingSite != computingSite
                or iJob > max_jobs
                or previousCloud != job.getCloud()
                or (prevProType in skipBrokerageProTypes and iJob > 0)
                or prevDirectAcc != job.transferType
                or prevProType != job.processingType
                or prevBrokerageSiteList != specialBrokerageSiteList
                or prevIsJEDI != isJEDI
            ):
                if indexJob > 1:
                    tmp_log.debug("new bunch")
                    tmp_log.debug(f"  iJob           {iJob}")
                    tmp_log.debug(f"  cloud          {previousCloud}")
                    tmp_log.debug(f"  sourceLabel    {prevSourceLabel}")
                    tmp_log.debug(f"  prodDBlock     {prodDBlock}")
                    tmp_log.debug(f"  computingSite  {computingSite}")
                    tmp_log.debug(f"  processingType {prevProType}")
                    tmp_log.debug(f"  transferType   {prevDirectAcc}")

                if (iJob != 0 and chosen_panda_queue == "TOBEDONE") or prevBrokerageSiteList not in [None, []]:
                    # load balancing
                    minSites = {}
                    # start from an empty candidate list for every bunch so that the list
                    # of a previous bunch is never reused when the cloud is unknown
                    scanSiteList = []
                    if prevBrokerageSiteList:
                        # special brokerage
                        scanSiteList = prevBrokerageSiteList
                    elif siteMapper.checkCloud(previousCloud):
                        # use cloud sites
                        scanSiteList = siteMapper.getCloud(previousCloud)["sites"]

                    # loop over all sites
                    for site in scanSiteList:
                        tmp_log.debug(f"calculate weight for site:{site}")
                        # _allSites may contain NULL after sort()
                        if site == "NULL":
                            tmp_log.debug("site is NULL")
                            continue

                        winv = 1

                        tmp_log.debug(f"Site:{site} 1/Weight:{winv}")

                        # every candidate currently gets the same weight
                        minSites[site] = winv

                    # choose site
                    tmp_log.debug(f"Min Sites:{minSites}")
                    if (len(fileList) == 0 or prevIsJEDI is True) and minSites:
                        # all weights are equal, so take the first candidate
                        minSite = next(iter(minSites))
                        chosen_panda_queue = siteMapper.getSite(minSite)

                    # without a real site spec the assignment below cannot work;
                    # fail with an explicit message instead of an AttributeError on a string
                    if chosen_panda_queue == "TOBEDONE":
                        raise RuntimeError(
                            f"no site could be chosen for bunch: cloud={previousCloud} prodDBlock={prodDBlock} nCandidates={len(minSites)}"
                        )

                    # set job spec
                    tmp_log.debug(f"indexJob      : {indexJob}")

                    for tmpJob in bunch_jobs:
                        # set computingSite
                        tmpJob.computingSite = chosen_panda_queue.sitename
                        tmp_log.debug(f"PandaID:{tmpJob.PandaID} -> site:{tmpJob.computingSite}")

                # terminate
                if job is None:
                    break
                # reset iJob and the members of the bunch
                iJob = 0
                bunch_jobs = []
                # reset file list
                fileList = []
                # create new dispDBlock
                if job.prodDBlock != "NULL":
                    # get datatype
                    try:
                        tmpDataType = job.prodDBlock.split(":")[-1].split(".")[-2]
                    except Exception:
                        # default
                        tmpDataType = "GEN"
                    if len(tmpDataType) > 20:
                        # avoid too long name
                        tmpDataType = "GEN"
                    transferType = "transfer"
                    if job.useInputPrestaging():
                        transferType = "prestaging"
                    dispatchDBlock = f"panda.{job.taskID}.{time.strftime('%m.%d')}.{tmpDataType}.{transferType}.{str(uuid.uuid4())}_dis{job.PandaID}"
                    tmp_log.debug(f"New dispatchDBlock: {dispatchDBlock}")
                prodDBlock = job.prodDBlock
                # already define computingSite
                if job.computingSite != "NULL":
                    # instantiate KnownSite
                    chosen_panda_queue = siteMapper.getSite(job.computingSite)

                    # if site doesn't exist, use the default site
                    if job.homepackage.startswith("AnalysisTransforms"):
                        if chosen_panda_queue.sitename == panda_config.def_sitename:
                            chosen_panda_queue = siteMapper.getSite(panda_config.def_queue)
                            overwriteSite = True
                else:
                    # default for Analysis jobs
                    if job.homepackage.startswith("AnalysisTransforms"):
                        chosen_panda_queue = siteMapper.getSite(panda_config.def_queue)
                        overwriteSite = True
                    else:
                        # site is decided when the bunch is closed
                        chosen_panda_queue = "TOBEDONE"
            # increment iJob and remember the job as a member of the bunch
            iJob += 1
            bunch_jobs.append(job)
            # reserve computingSite and cloud
            computingSite = job.computingSite
            previousCloud = job.getCloud()
            prevProType = job.processingType
            prevSourceLabel = job.prodSourceLabel
            prevDirectAcc = job.transferType
            prevBrokerageSiteList = specialBrokerageSiteList
            prevIsJEDI = isJEDI

            # assign site
            if chosen_panda_queue != "TOBEDONE":
                job.computingSite = chosen_panda_queue.sitename
                tmp_log.debug(f"PandaID:{job.PandaID} -> preset site:{chosen_panda_queue.sitename}")
                # set cloud
                if job.cloud in ["NULL", None, ""]:
                    job.cloud = chosen_panda_queue.cloud

            # set destinationSE
            destSE = job.destinationSE
            if siteMapper.checkCloud(job.getCloud()):
                # use cloud dest for non-existing sites
                if job.prodSourceLabel != "user" and job.destinationSE not in siteMapper.siteSpecList and job.destinationSE != "local":
                    checkedDestSE = DataServiceUtils.checkJobDestinationSE(job)
                    if checkedDestSE:
                        destSE = checkedDestSE
                    job.destinationSE = destSE

            if overwriteSite:
                # overwrite SE for analysis jobs which set non-existing sites
                destSE = job.computingSite
                job.destinationSE = destSE

            # set dispatchDBlock and destinationSE
            first = True
            for file in job.Files:
                # Set dispatch data block for pre-staging jobs too
                if file.type == "input" and file.dispatchDBlock == "NULL" and file.status not in ["ready", "missing", "cached"]:
                    if first:
                        first = False
                        job.dispatchDBlock = dispatchDBlock
                    file.dispatchDBlock = dispatchDBlock
                    file.status = "pending"
                    # the number of distinct pending input files limits the bunch size
                    if file.lfn not in fileList:
                        fileList.append(file.lfn)

                # destinationSE
                if file.type in ["output", "log"] and destSE != "":
                    if job.prodSourceLabel == "user" and job.computingSite == file.destinationSE:
                        pass
                    elif job.prodSourceLabel == "user" and prevIsJEDI is True and file.destinationSE not in ["", "NULL"]:
                        pass
                    elif destSE == "local":
                        pass
                    elif DataServiceUtils.getDistributedDestination(file.destinationDBlockToken):
                        pass
                    else:
                        file.destinationSE = destSE

                # pre-assign GUID to log
                if file.type == "log":
                    # generate GUID
                    file.GUID = str(uuid.uuid4())

        tmp_log.debug("finished")

    except Exception as e:
        tmp_log.error(f"schedule : {str(e)} {traceback.format_exc()}")