Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:06

0001 """
0002 job specification
0003 
0004 """
0005 
0006 import datetime
0007 import json
0008 import re
0009 
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011 
0012 from pandaserver.taskbuffer.FileSpec import FileSpec
0013 
0014 reserveChangedState = False
0015 
0016 
0017 class JobSpec(object):
0018     # attributes
0019     _attributes = (
0020         "PandaID",
0021         "jobDefinitionID",
0022         "schedulerID",
0023         "pilotID",
0024         "creationTime",
0025         "creationHost",
0026         "modificationTime",
0027         "modificationHost",
0028         "AtlasRelease",
0029         "transformation",
0030         "homepackage",
0031         "prodSeriesLabel",
0032         "prodSourceLabel",
0033         "prodUserID",
0034         "assignedPriority",
0035         "currentPriority",
0036         "attemptNr",
0037         "maxAttempt",
0038         "jobStatus",
0039         "jobName",
0040         "maxCpuCount",
0041         "maxCpuUnit",
0042         "maxDiskCount",
0043         "maxDiskUnit",
0044         "ipConnectivity",
0045         "minRamCount",
0046         "minRamUnit",
0047         "startTime",
0048         "endTime",
0049         "cpuConsumptionTime",
0050         "cpuConsumptionUnit",
0051         "commandToPilot",
0052         "transExitCode",
0053         "pilotErrorCode",
0054         "pilotErrorDiag",
0055         "exeErrorCode",
0056         "exeErrorDiag",
0057         "supErrorCode",
0058         "supErrorDiag",
0059         "ddmErrorCode",
0060         "ddmErrorDiag",
0061         "brokerageErrorCode",
0062         "brokerageErrorDiag",
0063         "jobDispatcherErrorCode",
0064         "jobDispatcherErrorDiag",
0065         "taskBufferErrorCode",
0066         "taskBufferErrorDiag",
0067         "computingSite",
0068         "computingElement",
0069         "jobParameters",
0070         "metadata",
0071         "prodDBlock",
0072         "dispatchDBlock",
0073         "destinationDBlock",
0074         "destinationSE",
0075         "nEvents",
0076         "grid",
0077         "cloud",
0078         "cpuConversion",
0079         "sourceSite",
0080         "destinationSite",
0081         "transferType",
0082         "taskID",
0083         "cmtConfig",
0084         "stateChangeTime",
0085         "prodDBUpdateTime",
0086         "lockedby",
0087         "relocationFlag",
0088         "jobExecutionID",
0089         "VO",
0090         "pilotTiming",
0091         "workingGroup",
0092         "processingType",
0093         "prodUserName",
0094         "nInputFiles",
0095         "countryGroup",
0096         "batchID",
0097         "parentID",
0098         "specialHandling",
0099         "jobsetID",
0100         "coreCount",
0101         "nInputDataFiles",
0102         "inputFileType",
0103         "inputFileProject",
0104         "inputFileBytes",
0105         "nOutputDataFiles",
0106         "outputFileBytes",
0107         "jobMetrics",
0108         "workQueue_ID",
0109         "jediTaskID",
0110         "jobSubStatus",
0111         "actualCoreCount",
0112         "reqID",
0113         "maxRSS",
0114         "maxVMEM",
0115         "maxSWAP",
0116         "maxPSS",
0117         "avgRSS",
0118         "avgVMEM",
0119         "avgSWAP",
0120         "avgPSS",
0121         "maxWalltime",
0122         "nucleus",
0123         "eventService",
0124         "failedAttempt",
0125         "hs06sec",
0126         "gshare",
0127         "hs06",
0128         "totRCHAR",
0129         "totWCHAR",
0130         "totRBYTES",
0131         "totWBYTES",
0132         "rateRCHAR",
0133         "rateWCHAR",
0134         "rateRBYTES",
0135         "rateWBYTES",
0136         "resource_type",
0137         "diskIO",
0138         "memory_leak",
0139         "memory_leak_x2",
0140         "container_name",
0141         "job_label",
0142         "gco2_regional",
0143         "gco2_global",
0144         "cpu_architecture_level",
0145         "outputFileType",
0146     )
0147     # slots
0148     __slots__ = _attributes + ("Files", "_changedAttrs", "_reserveChangedState")
0149     # attributes which have 0 by default
0150     _zeroAttrs = (
0151         "assignedPriority",
0152         "currentPriority",
0153         "attemptNr",
0154         "maxAttempt",
0155         "maxCpuCount",
0156         "maxDiskCount",
0157         "minRamCount",
0158         "cpuConsumptionTime",
0159         "pilotErrorCode",
0160         "exeErrorCode",
0161         "supErrorCode",
0162         "ddmErrorCode",
0163         "brokerageErrorCode",
0164         "jobDispatcherErrorCode",
0165         "taskBufferErrorCode",
0166         "nEvents",
0167         "relocationFlag",
0168         "jobExecutionID",
0169         "nOutputDataFiles",
0170         "outputFileBytes",
0171     )
0172     # attribute to be suppressed. They are in another table
0173     _suppAttrs = ("jobParameters", "metadata")
0174     # mapping between sequence and attr
0175     _seqAttrMap = {"PandaID": "ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ.nextval"}
0176     # limit length
0177     _limitLength = {
0178         "ddmErrorDiag": 500,
0179         "taskBufferErrorDiag": 300,
0180         "jobDispatcherErrorDiag": 250,
0181         "brokerageErrorDiag": 250,
0182         "pilotErrorDiag": 500,
0183         "exeErrorDiag": 500,
0184         "jobSubStatus": 80,
0185         "supErrorDiag": 250,
0186         "commandToPilot": 250,
0187         "inputFileType": 32,
0188         "outputFileType": 32,
0189         "grid": 50,
0190         "sourceSite": 36,
0191         "destinationSite": 36,
0192     }
0193     # tag for special handling
0194     _tagForSH = {
0195         "altStgOut": "ao",
0196         "acceptPartial": "ap",
0197         "allOkEvents": "at",
0198         "notDiscardEvents": "de",
0199         "decAttOnFailedES": "df",
0200         "debugMode": "dm",
0201         "dynamicNumEvents": "dy",
0202         "encJobParams": "ej",
0203         "fakeJobToIgnore": "fake",
0204         "homeCloud": "hc",
0205         "hpoWorkflow": "ho",
0206         "inFilePosEvtNum": "if",
0207         "inputPrestaging": "ip",
0208         "lumiBlock": "lb",
0209         "noLoopingCheck": "lc",
0210         "mergeAtOs": "mo",
0211         "noExecStrCnv": "nc",
0212         "onSiteMerging": "om",
0213         "pushStatusChanges": "pc",
0214         "pushJob": "pj",
0215         "putLogToOS": "po",
0216         "registerEsFiles": "re",
0217         "retryRam": "rr",
0218         "resurrectConsumers": "rs",
0219         "requestType": "rt",
0220         "jobCloning": "sc",
0221         "scoutJob": "sj",
0222         "taskQueuedTime": "tq",
0223         "usePrefetcher": "up",
0224         "useSecrets": "us",
0225         "useZipToPin": "uz",
0226         "writeInputToFile": "wf",
0227     }
0228 
0229     # constructor
0230     def __init__(self):
0231         # install attributes
0232         for attr in self._attributes:
0233             object.__setattr__(self, attr, None)
0234         # files list
0235         object.__setattr__(self, "Files", [])
0236         # map of changed attributes
0237         object.__setattr__(self, "_changedAttrs", {})
0238         # reserve changed state at instance level
0239         object.__setattr__(self, "_reserveChangedState", False)
0240 
0241     # override __getattribute__ for SQL
0242     def __getattribute__(self, name):
0243         ret = object.__getattribute__(self, name)
0244         if ret is None:
0245             return "NULL"
0246         return ret
0247 
0248     # override __setattr__ to collecte the changed attributes
0249     def __setattr__(self, name, value):
0250         oldVal = getattr(self, name)
0251         object.__setattr__(self, name, value)
0252         newVal = getattr(self, name)
0253         if name == "jobStatus":
0254             if oldVal != newVal:
0255                 self.stateChangeTime = naive_utcnow()
0256         # collect changed attributes
0257         if oldVal != newVal and name not in self._suppAttrs:
0258             self._changedAttrs[name] = value
0259 
0260     # reset changed attribute list
0261     def resetChangedList(self):
0262         object.__setattr__(self, "_changedAttrs", {})
0263 
0264     # add File to files list
0265     def addFile(self, file):
0266         # set owner
0267         file.setOwner(self)
0268         # append
0269         self.Files.append(file)
0270 
0271     # pack tuple into JobSpec
0272     def pack(self, values):
0273         for i in range(len(self._attributes)):
0274             attr = self._attributes[i]
0275             val = values[i]
0276             object.__setattr__(self, attr, val)
0277 
0278     # return a tuple of values
0279     def values(self):
0280         ret = []
0281         for attr in self._attributes:
0282             val = getattr(self, attr)
0283             ret.append(val)
0284         return tuple(ret)
0285 
0286     # return map of values
0287     def valuesMap(self, useSeq=False, onlyChanged=False):
0288         ret = {}
0289         for attr in self._attributes:
0290             if useSeq and attr in self._seqAttrMap:
0291                 continue
0292             if onlyChanged:
0293                 if attr not in self._changedAttrs:
0294                     continue
0295             val = getattr(self, attr)
0296             if val == "NULL":
0297                 if attr in self._zeroAttrs:
0298                     val = 0
0299                 else:
0300                     val = None
0301             # jobParameters/metadata go to another table
0302             if attr in self._suppAttrs:
0303                 val = None
0304             # truncate too long values
0305             if attr in self._limitLength:
0306                 if val is not None:
0307                     val = val[: self._limitLength[attr]]
0308             ret[f":{attr}"] = val
0309         return ret
0310 
0311     # return state values to be pickled
0312     def __getstate__(self):
0313         state = []
0314         for attr in self._attributes:
0315             val = getattr(self, attr)
0316             state.append(val)
0317         if reserveChangedState or self._reserveChangedState:
0318             state.append(self._changedAttrs)
0319         # append File info
0320         state.append(self.Files)
0321         return state
0322 
0323     # restore state from the unpickled state values
0324     def __setstate__(self, state):
0325         for i in range(len(self._attributes)):
0326             # schema evolution is supported only when adding attributes
0327             if i + 1 < len(state):
0328                 object.__setattr__(self, self._attributes[i], state[i])
0329             else:
0330                 object.__setattr__(self, self._attributes[i], "NULL")
0331         object.__setattr__(self, "Files", state[-1])
0332         if not hasattr(self, "_reserveChangedState"):
0333             object.__setattr__(self, "_reserveChangedState", False)
0334         if reserveChangedState or self._reserveChangedState:
0335             object.__setattr__(self, "_changedAttrs", state[-2])
0336         else:
0337             object.__setattr__(self, "_changedAttrs", {})
0338 
0339     # return column names for INSERT or full SELECT
0340     def columnNames(cls):
0341         ret = ""
0342         for attr in cls._attributes:
0343             if ret != "":
0344                 ret += ","
0345             ret += attr
0346         return ret
0347 
0348     columnNames = classmethod(columnNames)
0349 
0350     # return expression of values for INSERT
0351     def valuesExpression(cls):
0352         ret = "VALUES("
0353         for attr in cls._attributes:
0354             ret += "%s"
0355             if attr != cls._attributes[len(cls._attributes) - 1]:
0356                 ret += ","
0357         ret += ")"
0358         return ret
0359 
0360     valuesExpression = classmethod(valuesExpression)
0361 
0362     # return expression of bind values for INSERT
0363     def bindValuesExpression(cls, useSeq=False):
0364         from pandaserver.config import panda_config
0365 
0366         ret = "VALUES("
0367         for attr in cls._attributes:
0368             if useSeq and attr in cls._seqAttrMap:
0369                 if panda_config.backend == "mysql":
0370                     # mysql
0371                     ret += f"NULL,"
0372                 else:
0373                     # oracle
0374                     ret += f"{cls._seqAttrMap[attr]},"
0375             else:
0376                 ret += f":{attr},"
0377         ret = ret[:-1]
0378         ret += ")"
0379         return ret
0380 
0381     bindValuesExpression = classmethod(bindValuesExpression)
0382 
0383     # return an expression for UPDATE
0384     def updateExpression(cls):
0385         ret = ""
0386         for attr in cls._attributes:
0387             ret = ret + attr + "=%s"
0388             if attr != cls._attributes[len(cls._attributes) - 1]:
0389                 ret += ","
0390         return ret
0391 
0392     updateExpression = classmethod(updateExpression)
0393 
0394     # return an expression of bind variables for UPDATE
0395     def bindUpdateExpression(cls):
0396         ret = ""
0397         for attr in cls._attributes:
0398             ret += f"{attr}=:{attr},"
0399         ret = ret[:-1]
0400         ret += " "
0401         return ret
0402 
0403     bindUpdateExpression = classmethod(bindUpdateExpression)
0404 
0405     # comparison function for sort
0406     def compFunc(cls, a, b):
0407         iPandaID = list(cls._attributes).index("PandaID")
0408         iPriority = list(cls._attributes).index("currentPriority")
0409         if a[iPriority] > b[iPriority]:
0410             return -1
0411         elif a[iPriority] < b[iPriority]:
0412             return 1
0413         else:
0414             if a[iPandaID] > b[iPandaID]:
0415                 return 1
0416             elif a[iPandaID] < b[iPandaID]:
0417                 return -1
0418             else:
0419                 return 0
0420 
0421     compFunc = classmethod(compFunc)
0422 
0423     # return an expression of bind variables for UPDATE to update only changed attributes
0424     def bindUpdateChangesExpression(self):
0425         ret = ""
0426         for attr in self._attributes:
0427             if attr in self._changedAttrs:
0428                 ret += f"{attr}=:{attr},"
0429         ret = ret[:-1]
0430         ret += " "
0431         return ret
0432 
0433     # check if goint to merging
0434     def produceUnMerge(self):
0435         for tmpFile in self.Files:
0436             if tmpFile.isUnMergedOutput():
0437                 return True
0438         return False
0439 
0440     # truncate string attribute
0441     def truncateStringAttr(cls, attr, val):
0442         if attr not in cls._limitLength:
0443             return val
0444         if val is None:
0445             return val
0446         return val[: cls._limitLength[attr]]
0447 
0448     truncateStringAttr = classmethod(truncateStringAttr)
0449 
0450     # set DDM backend
0451     def setDdmBackEnd(self, backEnd):
0452         if self.specialHandling in [None, ""]:
0453             self.specialHandling = "ddm:" + backEnd
0454         else:
0455             if "ddm:" in self.specialHandling:
0456                 self.specialHandling = re.sub("ddm:[,]+", "ddm:" + backEnd, self.specialHandling)
0457             else:
0458                 self.specialHandling = self.specialHandling + "," + "ddm:" + backEnd
0459 
0460     # set LB number
0461     def setLumiBlockNr(self, lumiBlockNr):
0462         if self.specialHandling in ["", None, "NULL"]:
0463             self.specialHandling = f"lb:{lumiBlockNr}"
0464         else:
0465             self.specialHandling += f",lb:{lumiBlockNr}"
0466 
0467     # get LB number
0468     def getLumiBlockNr(self):
0469         if self.specialHandling is not None:
0470             for tmpItem in self.specialHandling.split(","):
0471                 if tmpItem.startswith("lb:"):
0472                     return int(tmpItem.split(":")[-1])
0473         return None
0474 
0475     # get DDM backend
0476     def getDdmBackEnd(self):
0477         if self.specialHandling is None:
0478             return None
0479         for tmpItem in self.specialHandling.split(","):
0480             if tmpItem.startswith("ddm:"):
0481                 return tmpItem.split(":")[-1]
0482         return None
0483 
0484     # set to accept partial finish
0485     def setToAcceptPartialFinish(self):
0486         self.set_special_handling("acceptPartial")
0487 
0488     # accept partial finish
0489     def acceptPartialFinish(self):
0490         return self.check_special_handling("acceptPartial")
0491 
0492     # set home cloud
0493     def setHomeCloud(self, homeCloud):
0494         if self.specialHandling in ["", None, "NULL"]:
0495             self.specialHandling = f"hc:{homeCloud}"
0496         else:
0497             self.specialHandling += f",hc:{homeCloud}"
0498 
0499     # get cloud
0500     def getCloud(self):
0501         if self.specialHandling is not None:
0502             for tmpItem in self.specialHandling.split(","):
0503                 if tmpItem.startswith("hc:"):  # hc: Home Cloud
0504                     return tmpItem.split(":")[-1]
0505         return self.cloud
0506 
0507     # check if cancelled or it's flavor
0508     def isCancelled(self):
0509         return self.jobStatus in ["cancelled", "closed"]
0510 
0511     # get file names which were uploaded to alternative locations
0512     def altStgOutFileList(self):
0513         try:
0514             if self.jobMetrics is not None:
0515                 for item in self.jobMetrics.split():
0516                     if item.startswith("altTransferred="):
0517                         return item.split("=")[-1].split(",")
0518         except Exception:
0519             pass
0520         return []
0521 
0522     # check special handling
0523     def check_special_handling(self, key):
0524         if self.specialHandling:
0525             return self._tagForSH[key] in self.specialHandling.split(",")
0526         return False
0527 
0528     # set special handling
0529     def set_special_handling(self, key):
0530         if self.specialHandling:
0531             items = self.specialHandling.split(",")
0532         else:
0533             items = []
0534         if self._tagForSH[key] not in items:
0535             items.append(self._tagForSH[key])
0536         self.specialHandling = ",".join(items)
0537 
0538     # get mode for alternative stage-out
0539     def getAltStgOut(self):
0540         if self.specialHandling is not None:
0541             for tmpItem in self.specialHandling.split(","):
0542                 if tmpItem.startswith(f"{self._tagForSH['altStgOut']}:"):
0543                     return tmpItem.split(":")[-1]
0544         return None
0545 
0546     # set alternative stage-out
0547     def setAltStgOut(self, mode):
0548         if self.specialHandling is not None:
0549             items = self.specialHandling.split(",")
0550         else:
0551             items = []
0552         # remove old value
0553         newItems = []
0554         for tmpItem in items:
0555             if tmpItem.startswith(f"{self._tagForSH['altStgOut']}:"):
0556                 continue
0557             newItems.append(tmpItem)
0558         newItems.append(f"{self._tagForSH['altStgOut']}:{mode}")
0559         self.specialHandling = ",".join(newItems)
0560 
0561     # put log files to OS
0562     def putLogToOS(self):
0563         return self.check_special_handling("putLogToOS")
0564 
0565     # set to put log files to OS
0566     def setToPutLogToOS(self):
0567         self.set_special_handling("putLogToOS")
0568 
0569     # write input to file
0570     def writeInputToFile(self):
0571         return self.check_special_handling("writeInputToFile")
0572 
0573     # set to write input to file
0574     def setToWriteInputToFile(self):
0575         self.set_special_handling("writeInputToFile")
0576 
0577     # set request type
0578     def setRequestType(self, reqType):
0579         if self.specialHandling is not None:
0580             items = self.specialHandling.split(",")
0581         else:
0582             items = []
0583         newItems = []
0584         setFlag = False
0585         for item in items:
0586             if not item.startswith(self._tagForSH["requestType"]):
0587                 newItems.append(item)
0588         newItems.append(f"{self._tagForSH['requestType']}={reqType}")
0589         self.specialHandling = ",".join(newItems)
0590 
0591     # sort files
0592     def sortFiles(self):
0593         try:
0594             lfnMap = {}
0595             for tmpFile in self.Files:
0596                 if tmpFile.lfn not in lfnMap:
0597                     lfnMap[tmpFile.lfn] = []
0598                 lfnMap[tmpFile.lfn].append(tmpFile)
0599             lfns = sorted(lfnMap)
0600             newFiles = []
0601             for tmpLFN in lfns:
0602                 for tmpFile in lfnMap[tmpLFN]:
0603                     newFiles.append(tmpFile)
0604             self.Files = newFiles
0605         except Exception:
0606             pass
0607 
0608     # get zip file map
0609     def getZipFileMap(self):
0610         zipMap = dict()
0611         try:
0612             if self.jobParameters is not None:
0613                 zipStr = re.search("<ZIP_MAP>(.+)</ZIP_MAP>", self.jobParameters)
0614                 if zipStr is not None:
0615                     for item in zipStr.group(1).split():
0616                         zipFile, conFiles = item.split(":")
0617                         conFiles = conFiles.split(",")
0618                         zipMap[zipFile] = conFiles
0619         except Exception:
0620             pass
0621         return zipMap
0622 
0623     # add multi step exec
0624     def addMultiStepExec(self, steps):
0625         if not self.jobParameters:
0626             self.jobParameters = ""
0627         self.jobParameters += "<MULTI_STEP_EXEC>" + json.dumps(steps) + "</MULTI_STEP_EXEC>"
0628 
0629     # extract multi step exec
0630     def extractMultiStepExec(self):
0631         try:
0632             if "<MULTI_STEP_EXEC>" in self.jobParameters and "</MULTI_STEP_EXEC>" in self.jobParameters:
0633                 pp_1, pp_2 = self.jobParameters.split("<MULTI_STEP_EXEC>")
0634                 pp_2 = pp_2.split("</MULTI_STEP_EXEC>")[0]
0635                 return pp_1, json.loads(pp_2)
0636         except Exception:
0637             pass
0638         return self.jobParameters, None
0639 
0640     # suppress execute string conversion
0641     def noExecStrCnv(self):
0642         return self.check_special_handling("noExecStrCnv")
0643 
0644     # set to suppress execute string conversion
0645     def setNoExecStrCnv(self):
0646         self.set_special_handling("noExecStrCnv")
0647 
0648     # in-file positional event number
0649     def inFilePosEvtNum(self):
0650         return self.check_special_handling("inFilePosEvtNum")
0651 
0652     # set to use in-file positional event number
0653     def setInFilePosEvtNum(self):
0654         self.set_special_handling("inFilePosEvtNum")
0655 
0656     # register event service files
0657     def registerEsFiles(self):
0658         return self.check_special_handling("registerEsFiles")
0659 
0660     # set to register event service files
0661     def setRegisterEsFiles(self):
0662         self.set_special_handling("registerEsFiles")
0663 
0664     # set background-able flag
0665     def setBackgroundableFlag(self):
0666         self.jobExecutionID = 0
0667         if self.prodSourceLabel not in ["managed", "test"]:
0668             return
0669         try:
0670             if self.inputFileBytes / self.maxWalltime > 5000:
0671                 return
0672         except Exception:
0673             return
0674         try:
0675             if self.coreCount <= 1:
0676                 return
0677         except Exception:
0678             return
0679         if self.currentPriority > 250:
0680             return
0681         self.jobExecutionID = 1
0682 
0683     # use prefetcher
0684     def usePrefetcher(self):
0685         return self.check_special_handling("usePrefetcher")
0686 
0687     # set to use prefetcher
0688     def setUsePrefetcher(self):
0689         self.set_special_handling("usePrefetcher")
0690 
0691     # use zip to pin
0692     def useZipToPin(self):
0693         return self.check_special_handling("useZipToPin")
0694 
0695     # set to use zip to pin
0696     def setUseZipToPin(self):
0697         self.set_special_handling("useZipToPin")
0698 
0699     # use secrets
0700     def use_secrets(self):
0701         return self.check_special_handling("useSecrets")
0702 
0703     # set to use secrets
0704     def set_use_secrets(self):
0705         self.set_special_handling("useSecrets")
0706 
0707     # not discard events
0708     def notDiscardEvents(self):
0709         return self.check_special_handling("notDiscardEvents")
0710 
0711     # set not to discard events
0712     def setNotDiscardEvents(self):
0713         self.set_special_handling("notDiscardEvents")
0714 
0715     # all events are done
0716     def allOkEvents(self):
0717         return self.check_special_handling("allOkEvents")
0718 
0719     # set all events are done
0720     def setAllOkEvents(self):
0721         self.set_special_handling("allOkEvents")
0722 
0723     # set scout job flag
0724     def setScoutJobFlag(self):
0725         self.set_special_handling("scoutJob")
0726 
0727     # check if scout job
0728     def isScoutJob(self):
0729         return self.check_special_handling("scoutJob")
0730 
0731     # decrement attemptNr of events only when failed
0732     def decAttOnFailedES(self):
0733         return self.check_special_handling("decAttOnFailedES")
0734 
0735     # set to decrement attemptNr of events only when failed
0736     def setDecAttOnFailedES(self):
0737         self.set_special_handling("decAttOnFailedES")
0738 
0739     # set fake flag to ignore in monigoring
0740     def setFakeJobToIgnore(self):
0741         self.set_special_handling("fakeJobToIgnore")
0742 
0743     # remove fake flag to ignore in monigoring
0744     def removeFakeJobToIgnore(self):
0745         if self.specialHandling is not None:
0746             items = self.specialHandling.split(",")
0747         else:
0748             items = []
0749         if self._tagForSH["fakeJobToIgnore"] in items:
0750             items.remove(self._tagForSH["fakeJobToIgnore"])
0751         self.specialHandling = ",".join(items)
0752 
0753     # set task attribute
0754     def set_task_attribute(self, key, value):
0755         if not isinstance(self.metadata, list):
0756             self.metadata = [None, None]
0757         if len(self.metadata) != 3:
0758             self.metadata.append({})
0759         self.metadata[2][key] = value
0760 
0761     # get task attribute
0762     def get_task_attribute(self, key):
0763         try:
0764             return self.metadata[2][key]
0765         except Exception:
0766             return None
0767 
0768     # set input prestaging
0769     def setInputPrestaging(self):
0770         self.set_special_handling("inputPrestaging")
0771 
0772     # use input prestaging
0773     def useInputPrestaging(self):
0774         return self.check_special_handling("inputPrestaging")
0775 
0776     # to a dictionary
0777     def to_dict_advanced(self):
0778         ret = {}
0779 
0780         # Get the job attributes
0781         for a in self._attributes:
0782             v = getattr(self, a)
0783             if v == "NULL":
0784                 v = None
0785             ret[a] = v
0786 
0787         # Add the files explicitly, as they are not included under _attributes
0788         a = "Files"
0789         files = getattr(self, a)
0790         file_stats = []
0791         for f in files:
0792             file_stats.append(f.to_dict())
0793         ret[a] = file_stats
0794 
0795         return ret
0796 
0797     def to_dict(self):
0798         ret = {}
0799         for a in self._attributes:
0800             v = getattr(self, a)
0801             if isinstance(v, datetime.datetime):
0802                 v = str(v)
0803             elif v == "NULL":
0804                 v = None
0805             ret[a] = v
0806         return ret
0807 
0808     # check if HPO workflow flag
0809     def is_hpo_workflow(self):
0810         return self.check_special_handling("hpoWorkflow")
0811 
0812     # set HPO workflow flag
0813     def set_hpo_workflow(self):
0814         self.set_special_handling("hpoWorkflow")
0815 
0816     # check if looping check is disabled
0817     def is_no_looping_check(self):
0818         return self.check_special_handling("noLoopingCheck")
0819 
0820     # disable looping check
0821     def disable_looping_check(self):
0822         self.set_special_handling("noLoopingCheck")
0823 
0824     # check if encode job parameters
0825     def to_encode_job_params(self):
0826         return self.check_special_handling("encJobParams")
0827 
0828     # encode job parameters
0829     def set_encode_job_params(self):
0830         self.set_special_handling("encJobParams")
0831 
0832     # check if debug mode
0833     def is_debug_mode(self):
0834         if self.specialHandling is not None:
0835             items = self.specialHandling.split(",")
0836             return self._tagForSH["debugMode"] in items or "debug" in items
0837         return False
0838 
0839     # set debug mode
0840     def set_debug_mode(self):
0841         self.set_special_handling("debugMode")
0842 
0843     # set push status changes
0844     def set_push_status_changes(self):
0845         self.set_special_handling("pushStatusChanges")
0846 
0847     # check if to push status changes
0848     def push_status_changes(self):
0849         return push_status_changes(self.specialHandling)
0850 
0851     # set push job
0852     def set_push_job(self):
0853         self.set_special_handling("pushJob")
0854 
0855     # check if to push job
0856     def is_push_job(self):
0857         return self.check_special_handling("pushJob")
0858 
0859     # set on-site merging
0860     def set_on_site_merging(self):
0861         self.set_special_handling("onSiteMerging")
0862 
0863     # check if on-site merging
0864     def is_on_site_merging(self):
0865         return self.check_special_handling("onSiteMerging")
0866 
0867     # get RAM for retry
0868     def get_ram_for_retry(self):
0869         if self.specialHandling is not None:
0870             for tmpItem in self.specialHandling.split(","):
0871                 if tmpItem.startswith(f"{self._tagForSH['retryRam']}:"):
0872                     return int(tmpItem.split(":")[-1])
0873         return None
0874 
0875     # set RAM for retry
0876     def set_ram_for_retry(self, val):
0877         if self.specialHandling:
0878             items = self.specialHandling.split(",")
0879         else:
0880             items = []
0881         # remove old value
0882         newItems = []
0883         for tmpItem in items:
0884             if tmpItem.startswith(f"{self._tagForSH['retryRam']}:"):
0885                 continue
0886             newItems.append(tmpItem)
0887         newItems.append(f"{self._tagForSH['retryRam']}:{val}")
0888         self.specialHandling = ",".join(newItems)
0889 
0890     # dump to json-serializable
0891     def dump_to_json_serializable(self):
0892         job_state = self.__getstate__()
0893         file_state_list = []
0894         for file_spec in job_state[-1]:
0895             file_stat = file_spec.dump_to_json_serializable()
0896             file_state_list.append(file_stat)
0897         job_state = job_state[:-1]
0898         # append files
0899         job_state.append(file_state_list)
0900         return job_state
0901 
0902     # load from json-serializable
0903     def load_from_json_serializable(self, job_state):
0904         # initialize with empty file list
0905         self.__setstate__(job_state[:-1] + [[]])
0906         # add files
0907         for file_stat in job_state[-1]:
0908             file_spec = FileSpec()
0909             file_spec.__setstate__(file_stat)
0910             self.addFile(file_spec)
0911 
0912     def load_from_dict(self, job_dict):
0913         # Extract job attributes (excluding files)
0914         job_attrs = []
0915         for slot in self.__slots__:
0916             if slot == "Files":  # skip files here
0917                 continue
0918             job_attrs.append(job_dict.get(slot, None))
0919 
0920         # Initialize with empty file list
0921         self.__setstate__(job_attrs + [[]])
0922 
0923         # Add files
0924         for file_data in job_dict.get("Files", []):
0925             file_spec = FileSpec()
0926             file_spec.__setstate__([file_data.get(s, None) for s in file_spec.__slots__])
0927             self.addFile(file_spec)
0928 
0929     # set input and output file types
0930     def set_input_output_file_types(self) -> None:
0931         """
0932         Set input and output file types based on the input and output file names
0933         """
0934         in_types = set()
0935         out_types = set()
0936         for tmp_file in self.Files:
0937             # ignore DBRelease/lib.tgz files
0938             if tmp_file.dataset.startswith("ddo") or tmp_file.lfn.endswith(".lib.tgz"):
0939                 continue
0940             # extract type while ignoring user/group/groupXY
0941             tmp_items = tmp_file.dataset.split(".")
0942             if (
0943                 len(tmp_items) > 4
0944                 and (not tmp_items[0] in ["", "NULL", "user", "group", "hc_test"])
0945                 and (not tmp_items[0].startswith("group"))
0946                 and not tmp_file.dataset.startswith("panda.um.")
0947             ):
0948                 tmp_type = tmp_items[4]
0949             else:
0950                 continue
0951             if tmp_file.type == "input":
0952                 in_types.add(tmp_type)
0953             elif tmp_file.type == "output":
0954                 out_types.add(tmp_type)
0955         # set types
0956         if in_types:
0957             in_types = sorted(list(in_types))
0958             self.inputFileType = ",".join(in_types)[: self._limitLength["inputFileType"]]
0959         if out_types:
0960             out_types = sorted(list(out_types))
0961             self.outputFileType = ",".join(out_types)[: self._limitLength["outputFileType"]]
0962 
0963     # set task queued time
0964     def set_task_queued_time(self, queued_time):
0965         """
0966         Set task queued time in job metrics. Skip if queued_time is None
0967 
0968         :param queued_time: task queued time in seconds since epoch
0969         """
0970         if queued_time is None:
0971             return
0972         if self.specialHandling in [None, "", "NULL"]:
0973             items = []
0974         else:
0975             items = self.specialHandling.split(",")
0976         items.append(f"{self._tagForSH['taskQueuedTime']}={queued_time}")
0977         self.specialHandling = ",".join(items)
0978 
0979 
0980 # utils
0981 
0982 
0983 # check if to push status changes without class instance
0984 def push_status_changes(special_handling):
0985     if special_handling is not None:
0986         items = special_handling.split(",")
0987         return JobSpec._tagForSH["pushStatusChanges"] in items
0988     return False
0989 
0990 
0991 # get task queued time
0992 def get_task_queued_time(special_handling):
0993     """
0994     Get task queued time from job metrics
0995 
0996     :param special_handling: special handling string
0997     :return: task queued time. None if unset
0998     """
0999     try:
1000         if special_handling is not None:
1001             for item in special_handling.split(","):
1002                 if item.startswith(f"{JobSpec._tagForSH['taskQueuedTime']}="):
1003                     return datetime.datetime.fromtimestamp(float(item.split("=")[-1]), datetime.timezone.utc)
1004     except Exception:
1005         pass
1006     return None