# File indexing completed on 2026-04-10 08:39:06
0001 import re
0002
0003 from pandaserver.taskbuffer.JobSpec import JobSpec
0004
0005
# Numeric status codes for event ranges in the event table.
ST_ready = 0
ST_sent = 1
ST_running = 2
ST_finished = 3
ST_cancelled = 4
ST_discarded = 5
ST_done = 6
ST_failed = 7
ST_fatal = 8
ST_merged = 9
ST_corrupted = 10
# reserved codes — presumably internal markers for failure/get bookkeeping; TODO confirm semantics
ST_reserved_fail = 98
ST_reserved_get = 99

# Map from numeric event status code to its human-readable name.
ES_status_map = {
    ST_ready: "ready",
    ST_sent: "sent",
    ST_running: "running",
    ST_finished: "finished",
    ST_cancelled: "cancelled",
    ST_discarded: "discarded",
    ST_done: "done",
    ST_failed: "failed",
    ST_fatal: "fatal",
    ST_merged: "merged",
    ST_corrupted: "corrupted",
    ST_reserved_fail: "reserved_fail",
    ST_reserved_get: "reserved_get",
}
0035
0036
# Prefix of the specialHandling item that carries encoded ES file info ("es:<index>:<payload>").
esHeader = "es:"
# Encoded values for the job-cloning (single-consumer) modes stored in specialHandling.
singleConsumerType = {"runonce": "1", "storeonce": "2"}


# specialHandling tokens
esToken = "eventservice"  # plain Event Service job
esMergeToken = "esmerge"  # Event Service merge job
# tokens whose literal values are defined centrally on JobSpec._tagForSH
dynamicNumEventsToken = JobSpec._tagForSH["dynamicNumEvents"]
mergeAtOsToken = JobSpec._tagForSH["mergeAtOs"]
resurrectConsumersToken = JobSpec._tagForSH["resurrectConsumers"]
singleToken = JobSpec._tagForSH["jobCloning"]



# Values of JobSpec.eventService identifying the job flavor (see is*Job helpers below).
esJobFlagNumber = 1
esMergeJobFlagNumber = 2
jobCloningFlagNumber = 3
jumboJobFlagNumber = 4
coJumboJobFlagNumber = 5
fineGrainedFlagNumber = 6
0056
0057
# Task-level event-handling modes.
TASK_NORMAL = 0
TASK_EVENT_SERVICE = 1
TASK_JOB_CLONING = 2
TASK_FINE_GRAINED = 3


# Value of the isJumbo column in the event table marking jumbo-job event ranges.
eventTableIsJumbo = 1



# Placeholder computingSite for co-jumbo jobs waiting for their jumbo partner.
siteIdForWaitingCoJumboJobs = "WAITING_CO_JUMBO"



# Relation types used to group jobs into jobsets.
relationTypeJS_ID = "jobset_id"
relationTypeJS_Retry = "jobset_retry"
relationTypeJS_Map = "jobset_map"
relationTypesForJS = [relationTypeJS_ID, relationTypeJS_Retry, relationTypeJS_Map]



# DDM naming for transient ES output datasets (see getEsDatasetName) and their registration status.
esSuffixDDM = ".events"
esScopeDDM = "transient"
esRegStatus = "esregister"


# Default maximum number of attempts for an ES job.
defMaxAttemptEsJob = 3



# Error codes treated as "corrupted input file" — presumably pilot/PanDA error codes; TODO confirm
PEC_corruptedInputFiles = [1171, 1145, 1175, 1103]
PEC_corruptedInputFilesTmp = [1099]
0091
0092
0093
def encodeFileInfo(
    lfn,
    startEvent,
    endEvent,
    nEventsPerWorker,
    maxAttempt=None,
    firstOffset=None,
    firstEvent=None,
):
    """Encode per-file event-range info into a '^'-terminated '/'-separated token.

    The token has the form ``lfn/start/end/perWorker/maxAttempt[/offset]^``;
    the offset field is present only when *firstOffset* is given.
    """
    # default retry budget when the caller did not specify one
    if maxAttempt is None:
        maxAttempt = 10
    prefix = f"{lfn}/{startEvent}/{endEvent}/{nEventsPerWorker}/{maxAttempt}"
    if firstOffset is None:
        return prefix + "^"
    # offset of the first event relative to firstOffset; best-effort,
    # falls back to 0 when operands don't support subtraction (e.g. None)
    try:
        totalOffset = firstEvent - firstOffset
    except Exception:
        totalOffset = 0
    return f"{prefix}/{totalOffset}^"
0113
0114
0115
def getHeaderForES(esIndex):
    """Return the specialHandling header for ES chunk *esIndex*, e.g. 'es:3:'."""
    header = f"{esHeader}{esIndex}:"
    return header
0118
0119
0120
def decodeFileInfo(specialHandling):
    """Decode ES file info (produced by encodeFileInfo) out of *specialHandling*.

    Returns a tuple ``(eventServiceInfo, newSpecialHandling, esIndex)``:
      * eventServiceInfo: ``{LFN: {"nEvents", "startEvent", "nEventsPerRange",
        "maxAttempt", "offset"}}`` for each encoded file
      * newSpecialHandling: the input with each "es:..." item replaced by the
        plain ES token, other items kept verbatim
      * esIndex: the optional numeric chunk index string after the header, or None

    Best-effort: on any parsing error the original specialHandling string is
    returned unchanged together with whatever info was collected so far.
    """
    eventServiceInfo = {}
    newSpecialHandling = ""
    esIndex = None
    try:
        for tmpItem in specialHandling.split(","):
            if tmpItem.startswith(esHeader):
                # strip the "es:" header
                tmpItem = re.sub("^" + esHeader, "", tmpItem)
                # optional numeric chunk index right after the header, e.g. "es:3:..."
                # NOTE: raw strings — "\d" in a plain string is an invalid escape
                # sequence (SyntaxWarning since Python 3.12)
                tmpMatch = re.search(r"^(\d+):", tmpItem)
                if tmpMatch is not None:
                    esIndex = tmpMatch.group(1)
                    # drop the matched "<index>:" prefix
                    tmpItem = tmpItem[tmpMatch.end():]
                # items are '^'-separated; fields within an item are '/'-separated
                for esItem in tmpItem.split("^"):
                    if esItem == "":
                        continue
                    esItems = esItem.split("/")
                    maxAttempt = 10  # default retry budget (see encodeFileInfo)
                    esOffset = 0
                    if len(esItems) == 3:
                        # legacy form: LFN/nEvents/nEventsPerRange
                        esLFN, esEvents, esRange = esItems
                        esStartEvent = 0
                    elif len(esItems) == 5:
                        # LFN/start/end/nEventsPerRange/maxAttempt
                        esLFN, esStartEvent, esEndEvent, esRange, maxAttempt = esItems
                        esEvents = int(esEndEvent) - int(esStartEvent) + 1
                    elif len(esItems) == 6:
                        # LFN/start/end/nEventsPerRange/maxAttempt/offset
                        (
                            esLFN,
                            esStartEvent,
                            esEndEvent,
                            esRange,
                            maxAttempt,
                            esOffset,
                        ) = esItems
                        esEvents = int(esEndEvent) - int(esStartEvent) + 1
                    else:
                        # LFN/start/end/nEventsPerRange
                        esLFN, esStartEvent, esEndEvent, esRange = esItems
                        esEvents = int(esEndEvent) - int(esStartEvent) + 1
                    eventServiceInfo[esLFN] = {
                        "nEvents": int(esEvents),
                        "startEvent": int(esStartEvent),
                        "nEventsPerRange": int(esRange),
                        "maxAttempt": int(maxAttempt),
                        "offset": int(esOffset),
                    }
                # represent the decoded item with the plain ES token
                newSpecialHandling += f"{esToken},"
            else:
                newSpecialHandling += f"{tmpItem},"
        # drop the trailing comma
        newSpecialHandling = newSpecialHandling[:-1]
    except Exception:
        # malformed input: keep the original string untouched
        newSpecialHandling = specialHandling
    return eventServiceInfo, newSpecialHandling, esIndex
0173
0174
0175
def isEventServiceJob(job):
    """True when *job* is a genuine Event Service job (fine-grained jobs excluded)."""
    # fine-grained jobs carry their own eventService flag and are not ES jobs
    return (not is_fine_grained_job(job)) and isEventServiceSH(job.specialHandling)
0181
0182
0183
def isEventServiceMerge(job):
    """True when *job* is an Event Service merge job, per its specialHandling."""
    sh = job.specialHandling
    return isEventServiceMergeSH(sh)
0186
0187
0188
def isEventServiceSH(specialHandling):
    """True when the specialHandling string carries the plain ES token."""
    try:
        return specialHandling is not None and esToken in specialHandling.split(",")
    except Exception:
        # non-string input: treat as not-ES (best-effort)
        return False
0196
0197
0198
def isEventServiceMergeSH(specialHandling):
    """True when the specialHandling string carries the ES-merge token."""
    try:
        return specialHandling is not None and esMergeToken in specialHandling.split(",")
    except Exception:
        # non-string input: treat as not-merge (best-effort)
        return False
0206
0207
0208
def setEventServiceMerge(job):
    """Convert *job* into an Event Service merge job in place.

    Sets the eventService flag, moves the job to the Express share, replaces
    the plain ES token in specialHandling with the merge token, and clears
    any fake-job-to-ignore marker. Best-effort: errors are swallowed.
    """
    try:
        job.eventService = esMergeJobFlagNumber
        job.gshare = "Express"
        if job.specialHandling is None:
            job.specialHandling = esMergeToken
        else:
            # keep all tokens except empties and the plain ES token,
            # then append the merge token at the end
            kept = [tok for tok in job.specialHandling.split(",") if tok not in ("", esToken)]
            kept.append(esMergeToken)
            job.specialHandling = ",".join(kept)
        job.removeFakeJobToIgnore()
    except Exception:
        pass
0232
0233
0234
def isJobCloningSH(specialHandling):
    """True when specialHandling carries a job-cloning token ('<tag>:<mode>')."""
    try:
        if specialHandling is not None:
            return any(tok.split(":")[0] == singleToken for tok in specialHandling.split(","))
    except Exception:
        pass
    return False
0244
0245
0246
def isJobCloningJob(job):
    """True when *job* is a job-cloning job, per its specialHandling."""
    sh = job.specialHandling
    return isJobCloningSH(sh)
0249
0250
0251
def setHeaderForJobCloning(specialHandling, scType):
    """Append a '<tag>:<scType>' job-cloning token to specialHandling.

    The token is added only when *scType* is a known encoded consumer type;
    empty tokens are dropped either way.
    """
    if specialHandling is None:
        specialHandling = ""
    tokens = [tok for tok in specialHandling.split(",") if tok != ""]
    if scType in singleConsumerType.values():
        tokens.append(f"{singleToken}:{scType}")
    return ",".join(tokens)
0264
0265
0266
def getJobCloningType(job):
    """Return the job-cloning mode name ('runonce'/'storeonce'), or '' if none."""
    if job.specialHandling is not None:
        for token in job.specialHandling.split(","):
            parts = token.split(":")
            if parts[0] == singleToken:
                # map the encoded value back to its mode name
                for modeName, modeValue in singleConsumerType.items():
                    if modeValue == parts[-1]:
                        return modeName
    return ""
0276
0277
0278
def getJobCloningValue(scType):
    """Return the encoded value for cloning mode *scType*, or '' if unknown."""
    return singleConsumerType.get(scType, "")
0283
0284
0285
def setHeaderForDynNumEvents(specialHandling):
    """Append the dynamic-number-of-events token to specialHandling.

    Empty tokens are dropped; the token is appended unconditionally,
    mirroring the original behavior (possible duplicate if already present).
    """
    if specialHandling is None:
        specialHandling = ""
    tokens = [tok for tok in specialHandling.split(",") if tok != ""]
    tokens.append(dynamicNumEventsToken)
    return ",".join(tokens)
0297
0298
0299
def isDynNumEventsSH(specialHandling):
    """True when specialHandling carries the dynamic-number-of-events token."""
    try:
        return specialHandling is not None and dynamicNumEventsToken in specialHandling.split(",")
    except Exception:
        # non-string input: best-effort False
        return False
0308
0309
0310
def removeHeaderForES(job):
    """Strip any 'es:<payload>' items from job.specialHandling in place."""
    if job.specialHandling is None:
        return
    kept = [item for item in job.specialHandling.split(",") if re.search("^" + esHeader + ".+", item) is None]
    job.specialHandling = ",".join(kept)
0319
0320
0321
def isJumboJob(job):
    """True when the job's eventService flag marks it as a jumbo job."""
    return jumboJobFlagNumber == job.eventService
0324
0325
0326
def isCoJumboJob(job):
    """True when the job's eventService flag marks it as a co-jumbo job."""
    return coJumboJobFlagNumber == job.eventService
0329
0330
0331
def setHeaderForMergeAtOS(specialHandling):
    """Add the merge-at-object-store token to specialHandling (idempotent).

    Empty tokens are dropped; the token is appended only if not already there.
    """
    if specialHandling is None:
        specialHandling = ""
    tokens = [tok for tok in specialHandling.split(",") if tok != ""]
    if mergeAtOsToken not in tokens:
        tokens.append(mergeAtOsToken)
    return ",".join(tokens)
0344
0345
0346
def isMergeAtOS(specialHandling):
    """True when specialHandling carries the merge-at-object-store token."""
    try:
        return specialHandling is not None and mergeAtOsToken in specialHandling.split(",")
    except Exception:
        # non-string input: best-effort False
        return False
0355
0356
0357
def getEsDatasetName(taskID):
    """Return the transient DDM dataset name holding ES outputs of *taskID*."""
    return f"{esScopeDDM}:{taskID}{esSuffixDDM}"
0361
0362
0363
def setHeaderToResurrectConsumers(specialHandling):
    """Add the resurrect-consumers token to specialHandling (idempotent).

    Empty tokens are dropped; the token is appended only if not already there.
    """
    if specialHandling is None:
        specialHandling = ""
    tokens = [tok for tok in specialHandling.split(",") if tok != ""]
    if resurrectConsumersToken not in tokens:
        tokens.append(resurrectConsumersToken)
    return ",".join(tokens)
0376
0377
0378
def isResurrectConsumers(specialHandling):
    """True when specialHandling carries the resurrect-consumers token."""
    try:
        return specialHandling is not None and resurrectConsumersToken in specialHandling.split(",")
    except Exception:
        # non-string input: best-effort False
        return False
0387
0388
0389
def is_fine_grained_job(job):
    """True when the job's eventService flag marks fine-grained processing."""
    return fineGrainedFlagNumber == job.eventService
0392
0393
0394
def set_fine_grained(job):
    """Flag *job* for fine-grained processing via its eventService field."""
    job.eventService = fineGrainedFlagNumber