File indexing completed on 2026-04-10 08:39:00
0001 import datetime
0002 import functools
0003 import time
0004 import traceback
0005 import uuid
0006
0007 from pandacommon.pandalogger.LogWrapper import LogWrapper
0008 from pandacommon.pandalogger.PandaLogger import PandaLogger
0009 from pandacommon.pandautils.PandaUtils import naive_utcnow
0010
0011 from pandaserver.config import panda_config
0012 from pandaserver.dataservice import DataServiceUtils
0013
# Module logger ("broker"); schedule() wraps it in a LogWrapper per call.
_log = PandaLogger().getLogger("broker")


# computingSite names in the order _compFunc first encountered them; jobs are
# sorted by their site's index in this list. Nothing in this module clears it,
# so the list (and the resulting site order) persists across schedule() calls.
_allSites = []


# processingType values that are never bunched with following jobs:
# schedule() closes the current bunch after any job whose processingType is
# listed here.
skipBrokerageProTypes = ["prod_test"]
0021
0022
0023
def _compFunc(job_a, job_b):
    """Compare two jobs by the first-seen position of their computingSite.

    Any computingSite not yet known is appended to the module-level
    ``_allSites`` list (``job_a``'s site first, then ``job_b``'s). Jobs are
    then ordered by the index of their site in that list, so jobs sharing a
    site compare equal and end up adjacent after a sort.

    :param job_a: first job; only ``computingSite`` is read.
    :param job_b: second job; only ``computingSite`` is read.
    :returns: -1, 0 or 1, suitable for ``functools.cmp_to_key``.
    """
    for site_name in (job_a.computingSite, job_b.computingSite):
        if site_name not in _allSites:
            _allSites.append(site_name)

    pos_a = _allSites.index(job_a.computingSite)
    pos_b = _allSites.index(job_b.computingSite)

    # bool arithmetic yields the classic three-way comparison result
    return (pos_a > pos_b) - (pos_a < pos_b)
0042
0043
def schedule(jobs, siteMapper):
    """Group jobs into bunches and assign sites, dispatch blocks and destinations.

    Jobs are first sorted with ``_compFunc`` so that jobs sharing a
    computingSite are adjacent. They are then walked in order and grouped
    into bunches of consecutive non-failed jobs. A bunch is closed (and a new
    one started) when any of the following happens:

    * prodDBlock, computingSite, cloud, processingType, transferType, the
      special-brokerage site list or the JEDI flag changes;
    * the bunch already holds more than ``max_jobs`` jobs, or at least
      ``max_files`` distinct input LFNs;
    * the previous job's processingType is in ``skipBrokerageProTypes``.

    For each non-failed job this function, in place:

    * presets ``computingSite`` (and ``cloud`` when unset) if a site is
      already known; otherwise the whole bunch gets a site when it is closed;
    * attaches its not-yet-dispatched input files to the bunch's dispatch
      block and marks them ``"pending"``;
    * sets ``destinationSE`` on the job and on its output/log files;
    * gives every log file a fresh GUID.

    Jobs whose ``jobStatus`` is ``"failed"`` are left untouched.

    :param jobs: list of job specs to schedule; the job objects are modified in place.
    :param siteMapper: mapper used to look up site and cloud specs.
    :returns: None. Any exception is logged and swallowed.
    """
    timestamp = naive_utcnow().isoformat("/")
    tmp_log = LogWrapper(_log, f"start_ts={timestamp}")

    try:
        if len(jobs) == 0:
            tmp_log.debug("finished : no jobs")
            return

        # A new bunch is started once the current one holds more than
        # max_jobs jobs or at least max_files distinct input LFNs.
        max_jobs = 20
        max_files = 20

        iJob = 0  # number of non-failed jobs in the current bunch
        bunchJobs = []  # the non-failed jobs of the current bunch, in order
        fileList = []  # distinct input LFNs attached to the current bunch
        chosen_panda_queue = None  # site spec for the bunch, or "TOBEDONE" while undecided
        prodDBlock = None
        computingSite = None
        dispatchDBlock = None
        previousCloud = None
        prevProType = None
        prevSourceLabel = None
        prevDirectAcc = None
        prevIsJEDI = None
        prevBrokerageSiteList = None

        indexJob = 0

        # put jobs with the same computingSite next to each other
        jobs = sorted(jobs, key=functools.cmp_to_key(_compFunc))

        # the trailing None is a sentinel that flushes the last bunch
        for job in jobs + [None]:
            indexJob += 1

            # failed jobs never join a bunch and are not modified
            if job and job.jobStatus == "failed":
                continue

            # test/managed jobs with a preset site are brokered only to that site
            specialBrokerageSiteList = []
            if job and job.computingSite != "NULL" and job.prodSourceLabel in ("test", "managed"):
                specialBrokerageSiteList = [job.computingSite]

            overwriteSite = False

            isJEDI = False
            if job and job.lockedby == "jedi":
                isJEDI = True

            # close the current bunch on the sentinel, when a limit is hit,
            # or when any bunching attribute differs from the previous job
            if (
                job is None
                or len(fileList) >= max_files
                or (dispatchDBlock is None and job.homepackage.startswith("AnalysisTransforms"))
                or prodDBlock != job.prodDBlock
                or job.computingSite != computingSite
                or iJob > max_jobs
                or previousCloud != job.getCloud()
                or (prevProType in skipBrokerageProTypes and iJob > 0)
                or prevDirectAcc != job.transferType
                or prevProType != job.processingType
                or prevBrokerageSiteList != specialBrokerageSiteList
                or prevIsJEDI != isJEDI
            ):
                if indexJob > 1:
                    tmp_log.debug("new bunch")
                    tmp_log.debug(f"  iJob           {iJob}")
                    tmp_log.debug(f"  cloud          {previousCloud}")
                    tmp_log.debug(f"  sourceLabel    {prevSourceLabel}")
                    tmp_log.debug(f"  prodDBlock     {prodDBlock}")
                    tmp_log.debug(f"  computingSite  {computingSite}")
                    tmp_log.debug(f"  processingType {prevProType}")
                    tmp_log.debug(f"  transferType   {prevDirectAcc}")

                # pick a site for the bunch being closed
                if (iJob != 0 and chosen_panda_queue == "TOBEDONE") or prevBrokerageSiteList not in [None, []]:
                    if prevBrokerageSiteList:
                        scanSiteList = prevBrokerageSiteList
                    elif siteMapper.checkCloud(previousCloud):
                        scanSiteList = siteMapper.getCloud(previousCloud)["sites"]
                    else:
                        # Unknown cloud: no candidates. (Previously scanSiteList was
                        # left unbound here and the resulting error aborted every
                        # remaining job.)
                        scanSiteList = []

                    # every candidate currently gets the same weight (1/weight == 1)
                    minSites = {}
                    for site in scanSiteList:
                        tmp_log.debug(f"calculate weight for site:{site}")
                        if site == "NULL":
                            tmp_log.debug("site is NULL")
                            continue
                        winv = 1
                        tmp_log.debug(f"Site:{site} 1/Weight:{winv}")
                        minSites[site] = winv

                    tmp_log.debug(f"Min Sites:{minSites}")
                    # Take the first candidate. This is also done when the bunch has
                    # no site yet, because there is no other selection strategy;
                    # previously "TOBEDONE" was used as a site spec and crashed.
                    if minSites and (len(fileList) == 0 or prevIsJEDI is True or chosen_panda_queue == "TOBEDONE"):
                        minSite = next(iter(minSites))
                        chosen_panda_queue = siteMapper.getSite(minSite)

                    tmp_log.debug(f"indexJob : {indexJob}")
                    if chosen_panda_queue == "TOBEDONE":
                        tmp_log.error(f"no candidate site for cloud {previousCloud}; {len(bunchJobs)} jobs left unassigned")
                    else:
                        # assign exactly the jobs that formed this bunch (failed jobs
                        # were never added, so they cannot shift the selection)
                        for tmpJob in bunchJobs:
                            tmpJob.computingSite = chosen_panda_queue.sitename
                            tmp_log.debug(f"PandaID:{tmpJob.PandaID} -> site:{tmpJob.computingSite}")

                if job is None:
                    break

                # start a new bunch with this job
                iJob = 0
                bunchJobs = []
                fileList = []

                if job.prodDBlock != "NULL":
                    # data type = second-to-last '.'-field of the name after the last ':'
                    try:
                        tmpDataType = job.prodDBlock.split(":")[-1].split(".")[-2]
                    except Exception:
                        tmpDataType = "GEN"
                    if len(tmpDataType) > 20:
                        # keep the dispatch block name short
                        tmpDataType = "GEN"
                    transferType = "prestaging" if job.useInputPrestaging() else "transfer"
                    dispatchDBlock = f"panda.{job.taskID}.{time.strftime('%m.%d')}.{tmpDataType}.{transferType}.{str(uuid.uuid4())}_dis{job.PandaID}"
                    tmp_log.debug(f"New dispatchDBlock: {dispatchDBlock}")
                prodDBlock = job.prodDBlock

                if job.computingSite != "NULL":
                    chosen_panda_queue = siteMapper.getSite(job.computingSite)
                    # analysis jobs on the default site are redirected to the default queue
                    if job.homepackage.startswith("AnalysisTransforms") and chosen_panda_queue.sitename == panda_config.def_sitename:
                        chosen_panda_queue = siteMapper.getSite(panda_config.def_queue)
                        overwriteSite = True
                elif job.homepackage.startswith("AnalysisTransforms"):
                    chosen_panda_queue = siteMapper.getSite(panda_config.def_queue)
                    overwriteSite = True
                else:
                    # the site is decided when this bunch is closed
                    chosen_panda_queue = "TOBEDONE"

            iJob += 1
            bunchJobs.append(job)

            # remember this job's attributes to detect the next bunch boundary
            computingSite = job.computingSite
            previousCloud = job.getCloud()
            prevProType = job.processingType
            prevSourceLabel = job.prodSourceLabel
            prevDirectAcc = job.transferType
            prevBrokerageSiteList = specialBrokerageSiteList
            prevIsJEDI = isJEDI

            if chosen_panda_queue != "TOBEDONE":
                job.computingSite = chosen_panda_queue.sitename
                tmp_log.debug(f"PandaID:{job.PandaID} -> preset site:{chosen_panda_queue.sitename}")
                if job.cloud in ["NULL", None, ""]:
                    job.cloud = chosen_panda_queue.cloud

            destSE = job.destinationSE
            if siteMapper.checkCloud(job.getCloud()):
                if job.prodSourceLabel != "user" and job.destinationSE not in siteMapper.siteSpecList and job.destinationSE != "local":
                    checkedSE = DataServiceUtils.checkJobDestinationSE(job)
                    if checkedSE:
                        destSE = checkedSE
                    job.destinationSE = destSE

            if overwriteSite:
                # analysis job redirected to the default queue: write outputs there
                destSE = job.computingSite
                job.destinationSE = destSE

            first = True
            for tmpFile in job.Files:
                # attach not-yet-dispatched input files to the bunch's dispatch block
                if tmpFile.type == "input" and tmpFile.dispatchDBlock == "NULL" and tmpFile.status not in ["ready", "missing", "cached"]:
                    if first:
                        first = False
                        job.dispatchDBlock = dispatchDBlock
                    tmpFile.dispatchDBlock = dispatchDBlock
                    tmpFile.status = "pending"
                    if tmpFile.lfn not in fileList:
                        fileList.append(tmpFile.lfn)

                # set the destination of outputs/logs unless it is already meaningful
                if tmpFile.type in ["output", "log"] and destSE != "":
                    if job.prodSourceLabel == "user" and job.computingSite == tmpFile.destinationSE:
                        pass
                    elif job.prodSourceLabel == "user" and prevIsJEDI is True and tmpFile.destinationSE not in ["", "NULL"]:
                        pass
                    elif destSE == "local":
                        pass
                    elif DataServiceUtils.getDistributedDestination(tmpFile.destinationDBlockToken):
                        pass
                    else:
                        tmpFile.destinationSE = destSE

                if tmpFile.type == "log":
                    tmpFile.GUID = str(uuid.uuid4())

        tmp_log.debug("finished")

    except Exception as e:
        tmp_log.error(f"schedule : {str(e)} {traceback.format_exc()}")