Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:06

0001 import re
0002 
0003 from pandaserver.taskbuffer.JobSpec import JobSpec
0004 
0005 # status codes for each event range
0006 ST_ready = 0
0007 ST_sent = 1
0008 ST_running = 2
0009 ST_finished = 3
0010 ST_cancelled = 4
0011 ST_discarded = 5
0012 ST_done = 6
0013 ST_failed = 7
0014 ST_fatal = 8
0015 ST_merged = 9
0016 ST_corrupted = 10
0017 ST_reserved_fail = 98
0018 ST_reserved_get = 99
0019 
0020 ES_status_map = {
0021     ST_ready: "ready",
0022     ST_sent: "sent",
0023     ST_running: "running",
0024     ST_finished: "finished",
0025     ST_cancelled: "cancelled",
0026     ST_discarded: "discarded",
0027     ST_done: "done",
0028     ST_failed: "failed",
0029     ST_fatal: "fatal",
0030     ST_merged: "merged",
0031     ST_corrupted: "corrupted",
0032     ST_reserved_fail: "reserved_fail",
0033     ST_reserved_get: "reserved_get",
0034 }
0035 
0036 # identifiers for specialHandling
0037 esHeader = "es:"
0038 singleConsumerType = {"runonce": "1", "storeonce": "2"}
0039 
0040 # tags for special handling. check JobSpec._tagForSH for duplication
0041 esToken = "eventservice"
0042 esMergeToken = "esmerge"
0043 dynamicNumEventsToken = JobSpec._tagForSH["dynamicNumEvents"]
0044 mergeAtOsToken = JobSpec._tagForSH["mergeAtOs"]
0045 resurrectConsumersToken = JobSpec._tagForSH["resurrectConsumers"]
0046 singleToken = JobSpec._tagForSH["jobCloning"]
0047 
0048 
0049 # values for job.eventService
0050 esJobFlagNumber = 1
0051 esMergeJobFlagNumber = 2
0052 jobCloningFlagNumber = 3
0053 jumboJobFlagNumber = 4
0054 coJumboJobFlagNumber = 5
0055 fineGrainedFlagNumber = 6
0056 
0057 
0058 # values for task.eventService
0059 TASK_NORMAL = 0
0060 TASK_EVENT_SERVICE = 1
0061 TASK_JOB_CLONING = 2
0062 TASK_FINE_GRAINED = 3
0063 
0064 # values for event.is_jumbo
0065 eventTableIsJumbo = 1
0066 
0067 
0068 # siteid for waiting co-jumbo jobs
0069 siteIdForWaitingCoJumboJobs = "WAITING_CO_JUMBO"
0070 
0071 
0072 # relation type for jobsets
0073 relationTypeJS_ID = "jobset_id"
0074 relationTypeJS_Retry = "jobset_retry"
0075 relationTypeJS_Map = "jobset_map"
0076 relationTypesForJS = [relationTypeJS_ID, relationTypeJS_Retry, relationTypeJS_Map]
0077 
0078 
0079 # suffix for ES dataset and files to register to DDM
0080 esSuffixDDM = ".events"
0081 esScopeDDM = "transient"
0082 esRegStatus = "esregister"
0083 
0084 # default max number of ES job attempt
0085 defMaxAttemptEsJob = 3
0086 
0087 
0088 # pilot error code
0089 PEC_corruptedInputFiles = [1171, 1145, 1175, 1103]
0090 PEC_corruptedInputFilesTmp = [1099]
0091 
0092 
0093 # encode file info
0094 def encodeFileInfo(
0095     lfn,
0096     startEvent,
0097     endEvent,
0098     nEventsPerWorker,
0099     maxAttempt=None,
0100     firstOffset=None,
0101     firstEvent=None,
0102 ):
0103     if maxAttempt is None:
0104         maxAttempt = 10
0105     if firstOffset is None:
0106         return f"{lfn}/{startEvent}/{endEvent}/{nEventsPerWorker}/{maxAttempt}^"
0107     else:
0108         try:
0109             totalOffset = firstEvent - firstOffset
0110         except Exception:
0111             totalOffset = 0
0112         return f"{lfn}/{startEvent}/{endEvent}/{nEventsPerWorker}/{maxAttempt}/{totalOffset}^"
0113 
0114 
0115 # get header for specialHandling
0116 def getHeaderForES(esIndex):
0117     return f"{esHeader}{esIndex}:"
0118 
0119 
0120 # decode file info
0121 def decodeFileInfo(specialHandling):
0122     eventServiceInfo = {}
0123     newSpecialHandling = ""
0124     esIndex = None
0125     try:
0126         for tmpItem in specialHandling.split(","):
0127             if tmpItem.startswith(esHeader):
0128                 tmpItem = re.sub("^" + esHeader, "", tmpItem)
0129                 # look for event service index
0130                 tmpMatch = re.search("^(\d+):", tmpItem)
0131                 if tmpMatch is not None:
0132                     esIndex = tmpMatch.group(1)
0133                     tmpItem = re.sub("^(\d+):", "", tmpItem)
0134                 for esItem in tmpItem.split("^"):
0135                     if esItem == "":
0136                         continue
0137                     esItems = esItem.split("/")
0138                     maxAttempt = 10
0139                     esOffset = 0
0140                     if len(esItems) == 3:
0141                         esLFN, esEvents, esRange = esItems
0142                         esStartEvent = 0
0143                     elif len(esItems) == 5:
0144                         esLFN, esStartEvent, esEndEvent, esRange, maxAttempt = esItems
0145                         esEvents = int(esEndEvent) - int(esStartEvent) + 1
0146                     elif len(esItems) == 6:
0147                         (
0148                             esLFN,
0149                             esStartEvent,
0150                             esEndEvent,
0151                             esRange,
0152                             maxAttempt,
0153                             esOffset,
0154                         ) = esItems
0155                         esEvents = int(esEndEvent) - int(esStartEvent) + 1
0156                     else:
0157                         esLFN, esStartEvent, esEndEvent, esRange = esItems
0158                         esEvents = int(esEndEvent) - int(esStartEvent) + 1
0159                     eventServiceInfo[esLFN] = {
0160                         "nEvents": int(esEvents),
0161                         "startEvent": int(esStartEvent),
0162                         "nEventsPerRange": int(esRange),
0163                         "maxAttempt": int(maxAttempt),
0164                         "offset": int(esOffset),
0165                     }
0166                 newSpecialHandling += f"{esToken},"
0167             else:
0168                 newSpecialHandling += f"{tmpItem},"
0169         newSpecialHandling = newSpecialHandling[:-1]
0170     except Exception:
0171         newSpecialHandling = specialHandling
0172     return eventServiceInfo, newSpecialHandling, esIndex
0173 
0174 
0175 # check if event service job
0176 def isEventServiceJob(job):
0177     # fine-grained job
0178     if is_fine_grained_job(job):
0179         return False
0180     return isEventServiceSH(job.specialHandling)
0181 
0182 
0183 # check if event service merge job
0184 def isEventServiceMerge(job):
0185     return isEventServiceMergeSH(job.specialHandling)
0186 
0187 
0188 # check if specialHandling for event service
0189 def isEventServiceSH(specialHandling):
0190     try:
0191         if specialHandling is not None and esToken in specialHandling.split(","):
0192             return True
0193     except Exception:
0194         pass
0195     return False
0196 
0197 
0198 # check if specialHandling for event service merge
0199 def isEventServiceMergeSH(specialHandling):
0200     try:
0201         if specialHandling is not None and esMergeToken in specialHandling.split(","):
0202             return True
0203     except Exception:
0204         pass
0205     return False
0206 
0207 
0208 # set event service merge
0209 def setEventServiceMerge(job):
0210     try:
0211         # set ES flag
0212         job.eventService = esMergeJobFlagNumber
0213         # set gshare to express
0214         job.gshare = "Express"
0215         # set flag for merging
0216         if job.specialHandling is None:
0217             job.specialHandling = esMergeToken
0218         else:
0219             newSpecialHandling = ""
0220             # remove flag for event service
0221             for tmpFlag in job.specialHandling.split(","):
0222                 if tmpFlag in ["", esToken]:
0223                     continue
0224                 newSpecialHandling += tmpFlag
0225                 newSpecialHandling += ","
0226             newSpecialHandling += esMergeToken
0227             job.specialHandling = newSpecialHandling
0228         # remove fake flag
0229         job.removeFakeJobToIgnore()
0230     except Exception:
0231         pass
0232 
0233 
0234 # check if specialHandling for job cloning
0235 def isJobCloningSH(specialHandling):
0236     try:
0237         if specialHandling is not None:
0238             for token in specialHandling.split(","):
0239                 if singleToken == token.split(":")[0]:
0240                     return True
0241     except Exception:
0242         pass
0243     return False
0244 
0245 
0246 # check if event service job
0247 def isJobCloningJob(job):
0248     return isJobCloningSH(job.specialHandling)
0249 
0250 
0251 # set header for job cloning
0252 def setHeaderForJobCloning(specialHandling, scType):
0253     if specialHandling is None:
0254         specialHandling = ""
0255     tokens = specialHandling.split(",")
0256     while True:
0257         try:
0258             tokens.remove("")
0259         except Exception:
0260             break
0261     if scType in singleConsumerType.values():
0262         tokens.append(f"{singleToken}:{scType}")
0263     return ",".join(tokens)
0264 
0265 
0266 # get consumer type
0267 def getJobCloningType(job):
0268     if job.specialHandling is not None:
0269         for token in job.specialHandling.split(","):
0270             if singleToken == token.split(":")[0]:
0271                 for tmpKey in singleConsumerType:
0272                     tmpVal = singleConsumerType[tmpKey]
0273                     if tmpVal == token.split(":")[-1]:
0274                         return tmpKey
0275     return ""
0276 
0277 
0278 # get consumer value
0279 def getJobCloningValue(scType):
0280     if scType in singleConsumerType:
0281         return singleConsumerType[scType]
0282     return ""
0283 
0284 
0285 # set header for dynamic number of events
0286 def setHeaderForDynNumEvents(specialHandling):
0287     if specialHandling is None:
0288         specialHandling = ""
0289     tokens = specialHandling.split(",")
0290     while True:
0291         try:
0292             tokens.remove("")
0293         except Exception:
0294             break
0295     tokens.append(dynamicNumEventsToken)
0296     return ",".join(tokens)
0297 
0298 
0299 # check if specialHandling for dynamic number of events
0300 def isDynNumEventsSH(specialHandling):
0301     try:
0302         if specialHandling is not None:
0303             if dynamicNumEventsToken in specialHandling.split(","):
0304                 return True
0305     except Exception:
0306         pass
0307     return False
0308 
0309 
0310 # remove event service header
0311 def removeHeaderForES(job):
0312     if job.specialHandling is not None:
0313         items = job.specialHandling.split(",")
0314         newItems = []
0315         for item in items:
0316             if re.search("^" + esHeader + ".+", item) is None:
0317                 newItems.append(item)
0318     job.specialHandling = ",".join(newItems)
0319 
0320 
0321 # check if jumbo job
0322 def isJumboJob(job):
0323     return job.eventService == jumboJobFlagNumber
0324 
0325 
0326 # check if cooperative with jumbo job
0327 def isCoJumboJob(job):
0328     return job.eventService == coJumboJobFlagNumber
0329 
0330 
0331 # set header for merge at OS
0332 def setHeaderForMergeAtOS(specialHandling):
0333     if specialHandling is None:
0334         specialHandling = ""
0335     tokens = specialHandling.split(",")
0336     while True:
0337         try:
0338             tokens.remove("")
0339         except Exception:
0340             break
0341     if mergeAtOsToken not in tokens:
0342         tokens.append(mergeAtOsToken)
0343     return ",".join(tokens)
0344 
0345 
0346 # check if specialHandling for merge at OS
0347 def isMergeAtOS(specialHandling):
0348     try:
0349         if specialHandling is not None:
0350             if mergeAtOsToken in specialHandling.split(","):
0351                 return True
0352     except Exception:
0353         pass
0354     return False
0355 
0356 
0357 # get dataset name for event service
0358 def getEsDatasetName(taskID):
0359     esDataset = f"{esScopeDDM}:{taskID}{esSuffixDDM}"
0360     return esDataset
0361 
0362 
0363 # set header to resurrect consumers
0364 def setHeaderToResurrectConsumers(specialHandling):
0365     if specialHandling is None:
0366         specialHandling = ""
0367     tokens = specialHandling.split(",")
0368     while True:
0369         try:
0370             tokens.remove("")
0371         except Exception:
0372             break
0373     if resurrectConsumersToken not in tokens:
0374         tokens.append(resurrectConsumersToken)
0375     return ",".join(tokens)
0376 
0377 
0378 # check if specialHandling to resurrect consumers
0379 def isResurrectConsumers(specialHandling):
0380     try:
0381         if specialHandling is not None:
0382             if resurrectConsumersToken in specialHandling.split(","):
0383                 return True
0384     except Exception:
0385         pass
0386     return False
0387 
0388 
0389 # check if fine-grained job
0390 def is_fine_grained_job(job):
0391     return job.eventService == fineGrainedFlagNumber
0392 
0393 
0394 # set fine-grained
0395 def set_fine_grained(job):
0396     job.eventService = fineGrainedFlagNumber