File indexing completed on 2026-04-10 08:39:06
0001 """
0002 job specification
0003
0004 """
0005
0006 import datetime
0007 import json
0008 import re
0009
0010 from pandacommon.pandautils.PandaUtils import naive_utcnow
0011
0012 from pandaserver.taskbuffer.FileSpec import FileSpec
0013
0014 reserveChangedState = False
0015
0016
0017 class JobSpec(object):
0018
0019 _attributes = (
0020 "PandaID",
0021 "jobDefinitionID",
0022 "schedulerID",
0023 "pilotID",
0024 "creationTime",
0025 "creationHost",
0026 "modificationTime",
0027 "modificationHost",
0028 "AtlasRelease",
0029 "transformation",
0030 "homepackage",
0031 "prodSeriesLabel",
0032 "prodSourceLabel",
0033 "prodUserID",
0034 "assignedPriority",
0035 "currentPriority",
0036 "attemptNr",
0037 "maxAttempt",
0038 "jobStatus",
0039 "jobName",
0040 "maxCpuCount",
0041 "maxCpuUnit",
0042 "maxDiskCount",
0043 "maxDiskUnit",
0044 "ipConnectivity",
0045 "minRamCount",
0046 "minRamUnit",
0047 "startTime",
0048 "endTime",
0049 "cpuConsumptionTime",
0050 "cpuConsumptionUnit",
0051 "commandToPilot",
0052 "transExitCode",
0053 "pilotErrorCode",
0054 "pilotErrorDiag",
0055 "exeErrorCode",
0056 "exeErrorDiag",
0057 "supErrorCode",
0058 "supErrorDiag",
0059 "ddmErrorCode",
0060 "ddmErrorDiag",
0061 "brokerageErrorCode",
0062 "brokerageErrorDiag",
0063 "jobDispatcherErrorCode",
0064 "jobDispatcherErrorDiag",
0065 "taskBufferErrorCode",
0066 "taskBufferErrorDiag",
0067 "computingSite",
0068 "computingElement",
0069 "jobParameters",
0070 "metadata",
0071 "prodDBlock",
0072 "dispatchDBlock",
0073 "destinationDBlock",
0074 "destinationSE",
0075 "nEvents",
0076 "grid",
0077 "cloud",
0078 "cpuConversion",
0079 "sourceSite",
0080 "destinationSite",
0081 "transferType",
0082 "taskID",
0083 "cmtConfig",
0084 "stateChangeTime",
0085 "prodDBUpdateTime",
0086 "lockedby",
0087 "relocationFlag",
0088 "jobExecutionID",
0089 "VO",
0090 "pilotTiming",
0091 "workingGroup",
0092 "processingType",
0093 "prodUserName",
0094 "nInputFiles",
0095 "countryGroup",
0096 "batchID",
0097 "parentID",
0098 "specialHandling",
0099 "jobsetID",
0100 "coreCount",
0101 "nInputDataFiles",
0102 "inputFileType",
0103 "inputFileProject",
0104 "inputFileBytes",
0105 "nOutputDataFiles",
0106 "outputFileBytes",
0107 "jobMetrics",
0108 "workQueue_ID",
0109 "jediTaskID",
0110 "jobSubStatus",
0111 "actualCoreCount",
0112 "reqID",
0113 "maxRSS",
0114 "maxVMEM",
0115 "maxSWAP",
0116 "maxPSS",
0117 "avgRSS",
0118 "avgVMEM",
0119 "avgSWAP",
0120 "avgPSS",
0121 "maxWalltime",
0122 "nucleus",
0123 "eventService",
0124 "failedAttempt",
0125 "hs06sec",
0126 "gshare",
0127 "hs06",
0128 "totRCHAR",
0129 "totWCHAR",
0130 "totRBYTES",
0131 "totWBYTES",
0132 "rateRCHAR",
0133 "rateWCHAR",
0134 "rateRBYTES",
0135 "rateWBYTES",
0136 "resource_type",
0137 "diskIO",
0138 "memory_leak",
0139 "memory_leak_x2",
0140 "container_name",
0141 "job_label",
0142 "gco2_regional",
0143 "gco2_global",
0144 "cpu_architecture_level",
0145 "outputFileType",
0146 )
0147
0148 __slots__ = _attributes + ("Files", "_changedAttrs", "_reserveChangedState")
0149
0150 _zeroAttrs = (
0151 "assignedPriority",
0152 "currentPriority",
0153 "attemptNr",
0154 "maxAttempt",
0155 "maxCpuCount",
0156 "maxDiskCount",
0157 "minRamCount",
0158 "cpuConsumptionTime",
0159 "pilotErrorCode",
0160 "exeErrorCode",
0161 "supErrorCode",
0162 "ddmErrorCode",
0163 "brokerageErrorCode",
0164 "jobDispatcherErrorCode",
0165 "taskBufferErrorCode",
0166 "nEvents",
0167 "relocationFlag",
0168 "jobExecutionID",
0169 "nOutputDataFiles",
0170 "outputFileBytes",
0171 )
0172
0173 _suppAttrs = ("jobParameters", "metadata")
0174
0175 _seqAttrMap = {"PandaID": "ATLAS_PANDA.JOBSDEFINED4_PANDAID_SEQ.nextval"}
0176
0177 _limitLength = {
0178 "ddmErrorDiag": 500,
0179 "taskBufferErrorDiag": 300,
0180 "jobDispatcherErrorDiag": 250,
0181 "brokerageErrorDiag": 250,
0182 "pilotErrorDiag": 500,
0183 "exeErrorDiag": 500,
0184 "jobSubStatus": 80,
0185 "supErrorDiag": 250,
0186 "commandToPilot": 250,
0187 "inputFileType": 32,
0188 "outputFileType": 32,
0189 "grid": 50,
0190 "sourceSite": 36,
0191 "destinationSite": 36,
0192 }
0193
0194 _tagForSH = {
0195 "altStgOut": "ao",
0196 "acceptPartial": "ap",
0197 "allOkEvents": "at",
0198 "notDiscardEvents": "de",
0199 "decAttOnFailedES": "df",
0200 "debugMode": "dm",
0201 "dynamicNumEvents": "dy",
0202 "encJobParams": "ej",
0203 "fakeJobToIgnore": "fake",
0204 "homeCloud": "hc",
0205 "hpoWorkflow": "ho",
0206 "inFilePosEvtNum": "if",
0207 "inputPrestaging": "ip",
0208 "lumiBlock": "lb",
0209 "noLoopingCheck": "lc",
0210 "mergeAtOs": "mo",
0211 "noExecStrCnv": "nc",
0212 "onSiteMerging": "om",
0213 "pushStatusChanges": "pc",
0214 "pushJob": "pj",
0215 "putLogToOS": "po",
0216 "registerEsFiles": "re",
0217 "retryRam": "rr",
0218 "resurrectConsumers": "rs",
0219 "requestType": "rt",
0220 "jobCloning": "sc",
0221 "scoutJob": "sj",
0222 "taskQueuedTime": "tq",
0223 "usePrefetcher": "up",
0224 "useSecrets": "us",
0225 "useZipToPin": "uz",
0226 "writeInputToFile": "wf",
0227 }
0228
0229
0230 def __init__(self):
0231
0232 for attr in self._attributes:
0233 object.__setattr__(self, attr, None)
0234
0235 object.__setattr__(self, "Files", [])
0236
0237 object.__setattr__(self, "_changedAttrs", {})
0238
0239 object.__setattr__(self, "_reserveChangedState", False)
0240
0241
0242 def __getattribute__(self, name):
0243 ret = object.__getattribute__(self, name)
0244 if ret is None:
0245 return "NULL"
0246 return ret
0247
0248
0249 def __setattr__(self, name, value):
0250 oldVal = getattr(self, name)
0251 object.__setattr__(self, name, value)
0252 newVal = getattr(self, name)
0253 if name == "jobStatus":
0254 if oldVal != newVal:
0255 self.stateChangeTime = naive_utcnow()
0256
0257 if oldVal != newVal and name not in self._suppAttrs:
0258 self._changedAttrs[name] = value
0259
0260
0261 def resetChangedList(self):
0262 object.__setattr__(self, "_changedAttrs", {})
0263
0264
0265 def addFile(self, file):
0266
0267 file.setOwner(self)
0268
0269 self.Files.append(file)
0270
0271
0272 def pack(self, values):
0273 for i in range(len(self._attributes)):
0274 attr = self._attributes[i]
0275 val = values[i]
0276 object.__setattr__(self, attr, val)
0277
0278
0279 def values(self):
0280 ret = []
0281 for attr in self._attributes:
0282 val = getattr(self, attr)
0283 ret.append(val)
0284 return tuple(ret)
0285
0286
0287 def valuesMap(self, useSeq=False, onlyChanged=False):
0288 ret = {}
0289 for attr in self._attributes:
0290 if useSeq and attr in self._seqAttrMap:
0291 continue
0292 if onlyChanged:
0293 if attr not in self._changedAttrs:
0294 continue
0295 val = getattr(self, attr)
0296 if val == "NULL":
0297 if attr in self._zeroAttrs:
0298 val = 0
0299 else:
0300 val = None
0301
0302 if attr in self._suppAttrs:
0303 val = None
0304
0305 if attr in self._limitLength:
0306 if val is not None:
0307 val = val[: self._limitLength[attr]]
0308 ret[f":{attr}"] = val
0309 return ret
0310
0311
0312 def __getstate__(self):
0313 state = []
0314 for attr in self._attributes:
0315 val = getattr(self, attr)
0316 state.append(val)
0317 if reserveChangedState or self._reserveChangedState:
0318 state.append(self._changedAttrs)
0319
0320 state.append(self.Files)
0321 return state
0322
0323
0324 def __setstate__(self, state):
0325 for i in range(len(self._attributes)):
0326
0327 if i + 1 < len(state):
0328 object.__setattr__(self, self._attributes[i], state[i])
0329 else:
0330 object.__setattr__(self, self._attributes[i], "NULL")
0331 object.__setattr__(self, "Files", state[-1])
0332 if not hasattr(self, "_reserveChangedState"):
0333 object.__setattr__(self, "_reserveChangedState", False)
0334 if reserveChangedState or self._reserveChangedState:
0335 object.__setattr__(self, "_changedAttrs", state[-2])
0336 else:
0337 object.__setattr__(self, "_changedAttrs", {})
0338
0339
0340 def columnNames(cls):
0341 ret = ""
0342 for attr in cls._attributes:
0343 if ret != "":
0344 ret += ","
0345 ret += attr
0346 return ret
0347
0348 columnNames = classmethod(columnNames)
0349
0350
0351 def valuesExpression(cls):
0352 ret = "VALUES("
0353 for attr in cls._attributes:
0354 ret += "%s"
0355 if attr != cls._attributes[len(cls._attributes) - 1]:
0356 ret += ","
0357 ret += ")"
0358 return ret
0359
0360 valuesExpression = classmethod(valuesExpression)
0361
0362
0363 def bindValuesExpression(cls, useSeq=False):
0364 from pandaserver.config import panda_config
0365
0366 ret = "VALUES("
0367 for attr in cls._attributes:
0368 if useSeq and attr in cls._seqAttrMap:
0369 if panda_config.backend == "mysql":
0370
0371 ret += f"NULL,"
0372 else:
0373
0374 ret += f"{cls._seqAttrMap[attr]},"
0375 else:
0376 ret += f":{attr},"
0377 ret = ret[:-1]
0378 ret += ")"
0379 return ret
0380
0381 bindValuesExpression = classmethod(bindValuesExpression)
0382
0383
0384 def updateExpression(cls):
0385 ret = ""
0386 for attr in cls._attributes:
0387 ret = ret + attr + "=%s"
0388 if attr != cls._attributes[len(cls._attributes) - 1]:
0389 ret += ","
0390 return ret
0391
0392 updateExpression = classmethod(updateExpression)
0393
0394
0395 def bindUpdateExpression(cls):
0396 ret = ""
0397 for attr in cls._attributes:
0398 ret += f"{attr}=:{attr},"
0399 ret = ret[:-1]
0400 ret += " "
0401 return ret
0402
0403 bindUpdateExpression = classmethod(bindUpdateExpression)
0404
0405
0406 def compFunc(cls, a, b):
0407 iPandaID = list(cls._attributes).index("PandaID")
0408 iPriority = list(cls._attributes).index("currentPriority")
0409 if a[iPriority] > b[iPriority]:
0410 return -1
0411 elif a[iPriority] < b[iPriority]:
0412 return 1
0413 else:
0414 if a[iPandaID] > b[iPandaID]:
0415 return 1
0416 elif a[iPandaID] < b[iPandaID]:
0417 return -1
0418 else:
0419 return 0
0420
0421 compFunc = classmethod(compFunc)
0422
0423
0424 def bindUpdateChangesExpression(self):
0425 ret = ""
0426 for attr in self._attributes:
0427 if attr in self._changedAttrs:
0428 ret += f"{attr}=:{attr},"
0429 ret = ret[:-1]
0430 ret += " "
0431 return ret
0432
0433
0434 def produceUnMerge(self):
0435 for tmpFile in self.Files:
0436 if tmpFile.isUnMergedOutput():
0437 return True
0438 return False
0439
0440
0441 def truncateStringAttr(cls, attr, val):
0442 if attr not in cls._limitLength:
0443 return val
0444 if val is None:
0445 return val
0446 return val[: cls._limitLength[attr]]
0447
0448 truncateStringAttr = classmethod(truncateStringAttr)
0449
0450
0451 def setDdmBackEnd(self, backEnd):
0452 if self.specialHandling in [None, ""]:
0453 self.specialHandling = "ddm:" + backEnd
0454 else:
0455 if "ddm:" in self.specialHandling:
0456 self.specialHandling = re.sub("ddm:[,]+", "ddm:" + backEnd, self.specialHandling)
0457 else:
0458 self.specialHandling = self.specialHandling + "," + "ddm:" + backEnd
0459
0460
0461 def setLumiBlockNr(self, lumiBlockNr):
0462 if self.specialHandling in ["", None, "NULL"]:
0463 self.specialHandling = f"lb:{lumiBlockNr}"
0464 else:
0465 self.specialHandling += f",lb:{lumiBlockNr}"
0466
0467
0468 def getLumiBlockNr(self):
0469 if self.specialHandling is not None:
0470 for tmpItem in self.specialHandling.split(","):
0471 if tmpItem.startswith("lb:"):
0472 return int(tmpItem.split(":")[-1])
0473 return None
0474
0475
0476 def getDdmBackEnd(self):
0477 if self.specialHandling is None:
0478 return None
0479 for tmpItem in self.specialHandling.split(","):
0480 if tmpItem.startswith("ddm:"):
0481 return tmpItem.split(":")[-1]
0482 return None
0483
0484
0485 def setToAcceptPartialFinish(self):
0486 self.set_special_handling("acceptPartial")
0487
0488
0489 def acceptPartialFinish(self):
0490 return self.check_special_handling("acceptPartial")
0491
0492
0493 def setHomeCloud(self, homeCloud):
0494 if self.specialHandling in ["", None, "NULL"]:
0495 self.specialHandling = f"hc:{homeCloud}"
0496 else:
0497 self.specialHandling += f",hc:{homeCloud}"
0498
0499
0500 def getCloud(self):
0501 if self.specialHandling is not None:
0502 for tmpItem in self.specialHandling.split(","):
0503 if tmpItem.startswith("hc:"):
0504 return tmpItem.split(":")[-1]
0505 return self.cloud
0506
0507
0508 def isCancelled(self):
0509 return self.jobStatus in ["cancelled", "closed"]
0510
0511
0512 def altStgOutFileList(self):
0513 try:
0514 if self.jobMetrics is not None:
0515 for item in self.jobMetrics.split():
0516 if item.startswith("altTransferred="):
0517 return item.split("=")[-1].split(",")
0518 except Exception:
0519 pass
0520 return []
0521
0522
0523 def check_special_handling(self, key):
0524 if self.specialHandling:
0525 return self._tagForSH[key] in self.specialHandling.split(",")
0526 return False
0527
0528
0529 def set_special_handling(self, key):
0530 if self.specialHandling:
0531 items = self.specialHandling.split(",")
0532 else:
0533 items = []
0534 if self._tagForSH[key] not in items:
0535 items.append(self._tagForSH[key])
0536 self.specialHandling = ",".join(items)
0537
0538
0539 def getAltStgOut(self):
0540 if self.specialHandling is not None:
0541 for tmpItem in self.specialHandling.split(","):
0542 if tmpItem.startswith(f"{self._tagForSH['altStgOut']}:"):
0543 return tmpItem.split(":")[-1]
0544 return None
0545
0546
0547 def setAltStgOut(self, mode):
0548 if self.specialHandling is not None:
0549 items = self.specialHandling.split(",")
0550 else:
0551 items = []
0552
0553 newItems = []
0554 for tmpItem in items:
0555 if tmpItem.startswith(f"{self._tagForSH['altStgOut']}:"):
0556 continue
0557 newItems.append(tmpItem)
0558 newItems.append(f"{self._tagForSH['altStgOut']}:{mode}")
0559 self.specialHandling = ",".join(newItems)
0560
0561
0562 def putLogToOS(self):
0563 return self.check_special_handling("putLogToOS")
0564
0565
0566 def setToPutLogToOS(self):
0567 self.set_special_handling("putLogToOS")
0568
0569
0570 def writeInputToFile(self):
0571 return self.check_special_handling("writeInputToFile")
0572
0573
0574 def setToWriteInputToFile(self):
0575 self.set_special_handling("writeInputToFile")
0576
0577
0578 def setRequestType(self, reqType):
0579 if self.specialHandling is not None:
0580 items = self.specialHandling.split(",")
0581 else:
0582 items = []
0583 newItems = []
0584 setFlag = False
0585 for item in items:
0586 if not item.startswith(self._tagForSH["requestType"]):
0587 newItems.append(item)
0588 newItems.append(f"{self._tagForSH['requestType']}={reqType}")
0589 self.specialHandling = ",".join(newItems)
0590
0591
0592 def sortFiles(self):
0593 try:
0594 lfnMap = {}
0595 for tmpFile in self.Files:
0596 if tmpFile.lfn not in lfnMap:
0597 lfnMap[tmpFile.lfn] = []
0598 lfnMap[tmpFile.lfn].append(tmpFile)
0599 lfns = sorted(lfnMap)
0600 newFiles = []
0601 for tmpLFN in lfns:
0602 for tmpFile in lfnMap[tmpLFN]:
0603 newFiles.append(tmpFile)
0604 self.Files = newFiles
0605 except Exception:
0606 pass
0607
0608
0609 def getZipFileMap(self):
0610 zipMap = dict()
0611 try:
0612 if self.jobParameters is not None:
0613 zipStr = re.search("<ZIP_MAP>(.+)</ZIP_MAP>", self.jobParameters)
0614 if zipStr is not None:
0615 for item in zipStr.group(1).split():
0616 zipFile, conFiles = item.split(":")
0617 conFiles = conFiles.split(",")
0618 zipMap[zipFile] = conFiles
0619 except Exception:
0620 pass
0621 return zipMap
0622
0623
0624 def addMultiStepExec(self, steps):
0625 if not self.jobParameters:
0626 self.jobParameters = ""
0627 self.jobParameters += "<MULTI_STEP_EXEC>" + json.dumps(steps) + "</MULTI_STEP_EXEC>"
0628
0629
0630 def extractMultiStepExec(self):
0631 try:
0632 if "<MULTI_STEP_EXEC>" in self.jobParameters and "</MULTI_STEP_EXEC>" in self.jobParameters:
0633 pp_1, pp_2 = self.jobParameters.split("<MULTI_STEP_EXEC>")
0634 pp_2 = pp_2.split("</MULTI_STEP_EXEC>")[0]
0635 return pp_1, json.loads(pp_2)
0636 except Exception:
0637 pass
0638 return self.jobParameters, None
0639
0640
0641 def noExecStrCnv(self):
0642 return self.check_special_handling("noExecStrCnv")
0643
0644
0645 def setNoExecStrCnv(self):
0646 self.set_special_handling("noExecStrCnv")
0647
0648
0649 def inFilePosEvtNum(self):
0650 return self.check_special_handling("inFilePosEvtNum")
0651
0652
0653 def setInFilePosEvtNum(self):
0654 self.set_special_handling("inFilePosEvtNum")
0655
0656
0657 def registerEsFiles(self):
0658 return self.check_special_handling("registerEsFiles")
0659
0660
0661 def setRegisterEsFiles(self):
0662 self.set_special_handling("registerEsFiles")
0663
0664
0665 def setBackgroundableFlag(self):
0666 self.jobExecutionID = 0
0667 if self.prodSourceLabel not in ["managed", "test"]:
0668 return
0669 try:
0670 if self.inputFileBytes / self.maxWalltime > 5000:
0671 return
0672 except Exception:
0673 return
0674 try:
0675 if self.coreCount <= 1:
0676 return
0677 except Exception:
0678 return
0679 if self.currentPriority > 250:
0680 return
0681 self.jobExecutionID = 1
0682
0683
0684 def usePrefetcher(self):
0685 return self.check_special_handling("usePrefetcher")
0686
0687
0688 def setUsePrefetcher(self):
0689 self.set_special_handling("usePrefetcher")
0690
0691
0692 def useZipToPin(self):
0693 return self.check_special_handling("useZipToPin")
0694
0695
0696 def setUseZipToPin(self):
0697 self.set_special_handling("useZipToPin")
0698
0699
0700 def use_secrets(self):
0701 return self.check_special_handling("useSecrets")
0702
0703
0704 def set_use_secrets(self):
0705 self.set_special_handling("useSecrets")
0706
0707
0708 def notDiscardEvents(self):
0709 return self.check_special_handling("notDiscardEvents")
0710
0711
0712 def setNotDiscardEvents(self):
0713 self.set_special_handling("notDiscardEvents")
0714
0715
0716 def allOkEvents(self):
0717 return self.check_special_handling("allOkEvents")
0718
0719
0720 def setAllOkEvents(self):
0721 self.set_special_handling("allOkEvents")
0722
0723
0724 def setScoutJobFlag(self):
0725 self.set_special_handling("scoutJob")
0726
0727
0728 def isScoutJob(self):
0729 return self.check_special_handling("scoutJob")
0730
0731
0732 def decAttOnFailedES(self):
0733 return self.check_special_handling("decAttOnFailedES")
0734
0735
0736 def setDecAttOnFailedES(self):
0737 self.set_special_handling("decAttOnFailedES")
0738
0739
0740 def setFakeJobToIgnore(self):
0741 self.set_special_handling("fakeJobToIgnore")
0742
0743
0744 def removeFakeJobToIgnore(self):
0745 if self.specialHandling is not None:
0746 items = self.specialHandling.split(",")
0747 else:
0748 items = []
0749 if self._tagForSH["fakeJobToIgnore"] in items:
0750 items.remove(self._tagForSH["fakeJobToIgnore"])
0751 self.specialHandling = ",".join(items)
0752
0753
0754 def set_task_attribute(self, key, value):
0755 if not isinstance(self.metadata, list):
0756 self.metadata = [None, None]
0757 if len(self.metadata) != 3:
0758 self.metadata.append({})
0759 self.metadata[2][key] = value
0760
0761
0762 def get_task_attribute(self, key):
0763 try:
0764 return self.metadata[2][key]
0765 except Exception:
0766 return None
0767
0768
0769 def setInputPrestaging(self):
0770 self.set_special_handling("inputPrestaging")
0771
0772
0773 def useInputPrestaging(self):
0774 return self.check_special_handling("inputPrestaging")
0775
0776
0777 def to_dict_advanced(self):
0778 ret = {}
0779
0780
0781 for a in self._attributes:
0782 v = getattr(self, a)
0783 if v == "NULL":
0784 v = None
0785 ret[a] = v
0786
0787
0788 a = "Files"
0789 files = getattr(self, a)
0790 file_stats = []
0791 for f in files:
0792 file_stats.append(f.to_dict())
0793 ret[a] = file_stats
0794
0795 return ret
0796
0797 def to_dict(self):
0798 ret = {}
0799 for a in self._attributes:
0800 v = getattr(self, a)
0801 if isinstance(v, datetime.datetime):
0802 v = str(v)
0803 elif v == "NULL":
0804 v = None
0805 ret[a] = v
0806 return ret
0807
0808
0809 def is_hpo_workflow(self):
0810 return self.check_special_handling("hpoWorkflow")
0811
0812
0813 def set_hpo_workflow(self):
0814 self.set_special_handling("hpoWorkflow")
0815
0816
0817 def is_no_looping_check(self):
0818 return self.check_special_handling("noLoopingCheck")
0819
0820
0821 def disable_looping_check(self):
0822 self.set_special_handling("noLoopingCheck")
0823
0824
0825 def to_encode_job_params(self):
0826 return self.check_special_handling("encJobParams")
0827
0828
0829 def set_encode_job_params(self):
0830 self.set_special_handling("encJobParams")
0831
0832
0833 def is_debug_mode(self):
0834 if self.specialHandling is not None:
0835 items = self.specialHandling.split(",")
0836 return self._tagForSH["debugMode"] in items or "debug" in items
0837 return False
0838
0839
0840 def set_debug_mode(self):
0841 self.set_special_handling("debugMode")
0842
0843
0844 def set_push_status_changes(self):
0845 self.set_special_handling("pushStatusChanges")
0846
0847
0848 def push_status_changes(self):
0849 return push_status_changes(self.specialHandling)
0850
0851
0852 def set_push_job(self):
0853 self.set_special_handling("pushJob")
0854
0855
0856 def is_push_job(self):
0857 return self.check_special_handling("pushJob")
0858
0859
0860 def set_on_site_merging(self):
0861 self.set_special_handling("onSiteMerging")
0862
0863
0864 def is_on_site_merging(self):
0865 return self.check_special_handling("onSiteMerging")
0866
0867
0868 def get_ram_for_retry(self):
0869 if self.specialHandling is not None:
0870 for tmpItem in self.specialHandling.split(","):
0871 if tmpItem.startswith(f"{self._tagForSH['retryRam']}:"):
0872 return int(tmpItem.split(":")[-1])
0873 return None
0874
0875
0876 def set_ram_for_retry(self, val):
0877 if self.specialHandling:
0878 items = self.specialHandling.split(",")
0879 else:
0880 items = []
0881
0882 newItems = []
0883 for tmpItem in items:
0884 if tmpItem.startswith(f"{self._tagForSH['retryRam']}:"):
0885 continue
0886 newItems.append(tmpItem)
0887 newItems.append(f"{self._tagForSH['retryRam']}:{val}")
0888 self.specialHandling = ",".join(newItems)
0889
0890
0891 def dump_to_json_serializable(self):
0892 job_state = self.__getstate__()
0893 file_state_list = []
0894 for file_spec in job_state[-1]:
0895 file_stat = file_spec.dump_to_json_serializable()
0896 file_state_list.append(file_stat)
0897 job_state = job_state[:-1]
0898
0899 job_state.append(file_state_list)
0900 return job_state
0901
0902
0903 def load_from_json_serializable(self, job_state):
0904
0905 self.__setstate__(job_state[:-1] + [[]])
0906
0907 for file_stat in job_state[-1]:
0908 file_spec = FileSpec()
0909 file_spec.__setstate__(file_stat)
0910 self.addFile(file_spec)
0911
0912 def load_from_dict(self, job_dict):
0913
0914 job_attrs = []
0915 for slot in self.__slots__:
0916 if slot == "Files":
0917 continue
0918 job_attrs.append(job_dict.get(slot, None))
0919
0920
0921 self.__setstate__(job_attrs + [[]])
0922
0923
0924 for file_data in job_dict.get("Files", []):
0925 file_spec = FileSpec()
0926 file_spec.__setstate__([file_data.get(s, None) for s in file_spec.__slots__])
0927 self.addFile(file_spec)
0928
0929
0930 def set_input_output_file_types(self) -> None:
0931 """
0932 Set input and output file types based on the input and output file names
0933 """
0934 in_types = set()
0935 out_types = set()
0936 for tmp_file in self.Files:
0937
0938 if tmp_file.dataset.startswith("ddo") or tmp_file.lfn.endswith(".lib.tgz"):
0939 continue
0940
0941 tmp_items = tmp_file.dataset.split(".")
0942 if (
0943 len(tmp_items) > 4
0944 and (not tmp_items[0] in ["", "NULL", "user", "group", "hc_test"])
0945 and (not tmp_items[0].startswith("group"))
0946 and not tmp_file.dataset.startswith("panda.um.")
0947 ):
0948 tmp_type = tmp_items[4]
0949 else:
0950 continue
0951 if tmp_file.type == "input":
0952 in_types.add(tmp_type)
0953 elif tmp_file.type == "output":
0954 out_types.add(tmp_type)
0955
0956 if in_types:
0957 in_types = sorted(list(in_types))
0958 self.inputFileType = ",".join(in_types)[: self._limitLength["inputFileType"]]
0959 if out_types:
0960 out_types = sorted(list(out_types))
0961 self.outputFileType = ",".join(out_types)[: self._limitLength["outputFileType"]]
0962
0963
0964 def set_task_queued_time(self, queued_time):
0965 """
0966 Set task queued time in job metrics. Skip if queued_time is None
0967
0968 :param queued_time: task queued time in seconds since epoch
0969 """
0970 if queued_time is None:
0971 return
0972 if self.specialHandling in [None, "", "NULL"]:
0973 items = []
0974 else:
0975 items = self.specialHandling.split(",")
0976 items.append(f"{self._tagForSH['taskQueuedTime']}={queued_time}")
0977 self.specialHandling = ",".join(items)
0978
0979
0980
0981
0982
0983
0984 def push_status_changes(special_handling):
0985 if special_handling is not None:
0986 items = special_handling.split(",")
0987 return JobSpec._tagForSH["pushStatusChanges"] in items
0988 return False
0989
0990
0991
0992 def get_task_queued_time(special_handling):
0993 """
0994 Get task queued time from job metrics
0995
0996 :param special_handling: special handling string
0997 :return: task queued time. None if unset
0998 """
0999 try:
1000 if special_handling is not None:
1001 for item in special_handling.split(","):
1002 if item.startswith(f"{JobSpec._tagForSH['taskQueuedTime']}="):
1003 return datetime.datetime.fromtimestamp(float(item.split("=")[-1]), datetime.timezone.utc)
1004 except Exception:
1005 pass
1006 return None