File indexing completed on 2026-04-10 08:39:06
0001 import enum
0002 import json
0003 import math
0004 import re
0005
0006 from pandaserver.taskbuffer import task_split_rules
0007
0008 """
0009 task specification for JEDI
0010
0011 """
0012
0013
class JediTaskSpec(object):
    # Names of the task attributes. The order is significant: pack() assigns
    # positional values in this order, and columnNames() and
    # bindValuesExpression() emit their output in this order.
    attributes = (
        "jediTaskID",
        "taskName",
        "status",
        "userName",
        "creationDate",
        "modificationTime",
        "startTime",
        "endTime",
        "frozenTime",
        "prodSourceLabel",
        "workingGroup",
        "vo",
        "coreCount",
        "taskType",
        "processingType",
        "taskPriority",
        "currentPriority",
        "architecture",
        "transUses",
        "transHome",
        "transPath",
        "lockedBy",
        "lockedTime",
        "termCondition",
        "splitRule",
        "walltime",
        "walltimeUnit",
        "outDiskCount",
        "outDiskUnit",
        "workDiskCount",
        "workDiskUnit",
        "ramCount",
        "ramUnit",
        "ioIntensity",
        "ioIntensityUnit",
        "workQueue_ID",
        "progress",
        "failureRate",
        "errorDialog",
        "reqID",
        "oldStatus",
        "cloud",
        "site",
        "countryGroup",
        "parent_tid",
        "eventService",
        "ticketID",
        "ticketSystemType",
        "stateChangeTime",
        "superStatus",
        "campaign",
        "mergeRamCount",
        "mergeRamUnit",
        "mergeWalltime",
        "mergeWalltimeUnit",
        "throttledTime",
        "numThrottled",
        "mergeCoreCount",
        "goal",
        "assessmentTime",
        "cpuTime",
        "cpuTimeUnit",
        "cpuEfficiency",
        "baseWalltime",
        "nucleus",
        "baseRamCount",
        "ttcRequested",
        "ttcPredicted",
        "ttcPredictionDate",
        "rescueTime",
        "requestType",
        "gshare",
        "resource_type",
        "useJumbo",
        "diskIO",
        "diskIOUnit",
        "memory_leak_core",
        "memory_leak_x2",
        "attemptNr",
        "container_name",
        "framework",
        "activatedTime",
        "queuedTime",
        "actionTime",
    )

    # Attributes that __init__ initialises to 0 instead of None, and that
    # valuesMap() reports as 0 when they are None (currently none).
    _zeroAttrs = ()

    # Attributes that __setattr__ records as changed on every assignment,
    # even when the new value equals the old one.
    _forceUpdateAttrs = ("lockedBy", "lockedTime")

    # Attribute name -> expression that bindValuesExpression() emits in place
    # of a bind variable; valuesMap(useSeq=True) skips these attributes.
    _seqAttrMap = {}

    # Attribute name -> maximum stored length; longer values are truncated by
    # __setattr__ and valuesMap().
    _limitLength = {"errorDialog": 510}

    # Attribute name -> maximum accepted length, enforced by checkAttrLength().
    _attrLength = {"workingGroup": 32}

    # Rule name -> token used for that rule inside the splitRule string.
    splitRuleToken = task_split_rules.split_rule_dict

    # The task_split_rules enum dicts are inverted below (value -> key) so
    # that the key can be looked up from the value given in the subscript.
    _inv = {v: k for k, v in task_split_rules.enum_usePrePro.items()}
    enum_toPreProcess = _inv["toPreProcess"]
    enum_preProcessed = _inv["preProcessed"]
    enum_postPProcess = _inv["postPProcess"]

    # Used as key -> policy by setLimitedSites().
    enum_limitedSites = task_split_rules.enum_limitedSites

    _inv = {v: k for k, v in task_split_rules.enum_useScout.items()}
    enum_noScout = _inv["no_use"]
    enum_useScout = _inv["will_update_requirements"]
    enum_postScout = _inv["updated_requirements"]

    _inv = {v: k for k, v in task_split_rules.enum_registerDatasets.items()}
    enum_toRegisterDS = _inv["registering"]
    enum_registeredDS = _inv["registered"]

    # The following enums are used un-inverted: the setters search them by
    # value and the getters index them by the digits found in splitRule.
    enum_ipConnectivity = task_split_rules.enum_ipConnectivity

    enum_ipStack = task_split_rules.enum_ipStack

    enum_altStageOut = task_split_rules.enum_altStageOut

    enum_inputLAN = task_split_rules.enum_inputLAN

    # Value of the cloud attribute that useWorldCloud() tests for.
    worldCloudName = "WORLD"

    # NOTE(review): no use of this enum is visible in this part of the file.
    class FirstContentsFeed(enum.Enum):
        TRUE = "1"
        FALSE = "0"

    # Values of the useJumbo attribute; usingJumboJobs() treats every value
    # except "disabled" as jumbo jobs being in use.
    enum_useJumbo = {"waiting": "W", "running": "R", "pending": "P", "lack": "L", "disabled": "D"}

    # Inverted so that inputPreStaging() can look up the entry for "use".
    enum_inputPreStaging = {v: k for k, v in task_split_rules.enum_inputPreStaging.items()}

    # NOTE(review): no use of this enum is visible in this part of the file.
    class FullChain(str, enum.Enum):
        Only = "1"
        Require = "2"
        Capable = "3"

    # NOTE(review): no use of this enum is visible in this part of the file.
    class OrderInputBy(str, enum.Enum):
        eventsAlignment = "1"
0164
def __init__(self):
    """Initialise every declared attribute and the bookkeeping fields.

    All writes go through object.__setattr__ so that the class's own
    __setattr__ (which records changes) is bypassed during construction.
    """
    set_raw = object.__setattr__
    # declared attributes start as None, or 0 when listed in _zeroAttrs
    for attr_name in self.attributes:
        set_raw(self, attr_name, 0 if attr_name in self._zeroAttrs else None)
    # bookkeeping fields that are not part of the attribute tuple
    extras = (
        ("_changedAttrs", {}),
        ("jobParamsTemplate", ""),
        ("datasetSpecList", []),
        ("origErrorDialog", None),
        ("origUserName", None),
    )
    for extra_name, initial in extras:
        set_raw(self, extra_name, initial)
0182
0183
def __setattr__(self, name, value):
    """Assign an attribute, truncating length-limited values and tracking changes.

    The attribute must already exist: the previous value is read first, so
    assigning an unknown name raises AttributeError.
    """
    previous = getattr(self, name)
    needs_truncation = name in self._limitLength and value is not None
    if needs_truncation:
        if name == "errorDialog":
            # keep the untruncated message available
            object.__setattr__(self, "origErrorDialog", value)
        value = value[: self._limitLength[name]]
    object.__setattr__(self, name, value)
    current = getattr(self, name)
    # record the change; forced attributes are recorded unconditionally
    if previous != current or name in self._forceUpdateAttrs:
        self._changedAttrs[name] = value
0196
0197
def copyAttributes(self, oldTaskSpec):
    """Copy attribute values (plus jobParamsTemplate) from another task spec.

    Attributes whose name contains "Time" or "Date", and a fixed set of
    state attributes, are left untouched.

    NOTE(review): the "Time" substring test also skips cpuTime and
    cpuTimeUnit - confirm that this is intended.
    """
    never_copied = ["progress", "failureRate", "errorDialog", "status", "oldStatus", "lockedBy"]
    for attr_name in self.attributes + ("jobParamsTemplate",):
        looks_like_timestamp = "Time" in attr_name or "Date" in attr_name
        if looks_like_timestamp or attr_name in never_copied:
            continue
        self.__setattr__(attr_name, getattr(oldTaskSpec, attr_name))
0207
0208
def resetChangedList(self):
    """Forget all recorded attribute changes by installing an empty change map."""
    empty_changes = {}
    # bypass __setattr__ so the reset itself is not recorded as a change
    object.__setattr__(self, "_changedAttrs", empty_changes)
0211
0212
def resetChangedAttr(self, name):
    """Drop one attribute from the change map; any failure is silently ignored."""
    try:
        self._changedAttrs.pop(name, None)
    except Exception:
        # best effort: a missing change map or an unusable key is not an error
        pass
0218
0219
def reserve_old_attributes(self):
    """Remove a fixed set of resource attributes from the change map."""
    preserved = (
        "ramCount",
        "walltime",
        "cpuTime",
        "startTime",
        "cpuTimeUnit",
        "outDiskCount",
        "workDiskCount",
        "ioIntensity",
        "diskIO",
    )
    for attr_name in preserved:
        self.resetChangedAttr(attr_name)
0223
0224
def forceUpdate(self, name):
    """Record a declared attribute as changed with its current value; unknown names are ignored."""
    if name not in self.attributes:
        return
    self._changedAttrs[name] = getattr(self, name)
0228
0229
def valuesMap(self, useSeq=False, onlyChanged=False):
    """Build a map of ":<attribute>" -> value.

    :param useSeq: when true, attributes present in _seqAttrMap are omitted.
    :param onlyChanged: when true, only attributes in the change map are included.
    :return: dict keyed by the attribute name prefixed with a colon.
    """
    bind_values = {}
    for attr_name in self.attributes:
        if useSeq and attr_name in self._seqAttrMap:
            continue
        if onlyChanged and attr_name not in self._changedAttrs:
            continue
        current = getattr(self, attr_name)
        # None becomes 0 for zero-default attributes
        if current is None and attr_name in self._zeroAttrs:
            current = 0
        # enforce the length limit on the emitted value as well
        if attr_name in self._limitLength and current is not None:
            current = current[: self._limitLength[attr_name]]
        bind_values[f":{attr_name}"] = current
    return bind_values
0252
0253
def pack(self, values):
    """Load attribute values positionally from an indexable sequence.

    values[i] is stored into the i-th name of the attribute tuple; writes
    bypass __setattr__, so nothing is recorded as changed.
    """
    for position, attr_name in enumerate(self.attributes):
        object.__setattr__(self, attr_name, values[position])
0259
0260
@classmethod
def columnNames(cls, prefix=None):
    """Return the attribute names as a comma-separated string.

    :param prefix: optional qualifier; when given, each name is emitted as "<prefix>.<name>".
    """
    qualifier = "" if prefix is None else f"{prefix}."
    return ",".join(f"{qualifier}{attr_name}" for attr_name in cls.attributes)
0271
0272
@classmethod
def bindValuesExpression(cls, useSeq=True):
    """Return a "VALUES(...)" expression with one entry per attribute.

    :param useSeq: when true, attributes present in _seqAttrMap emit the mapped
                   expression instead of a ":<name>" bind variable.
    """
    pieces = []
    for attr_name in cls.attributes:
        if useSeq and attr_name in cls._seqAttrMap:
            pieces.append(f"{cls._seqAttrMap[attr_name]},")
        else:
            pieces.append(f":{attr_name},")
    expression = "VALUES(" + "".join(pieces)
    # drop the trailing comma before closing the parenthesis
    return expression[:-1] + ")"
0285
0286
def bindUpdateChangesExpression(self):
    """Return "name=:name" pairs for the changed attributes, comma-separated and followed by one space."""
    assignments = [f"{attr_name}=:{attr_name}" for attr_name in self.attributes if attr_name in self._changedAttrs]
    return ",".join(assignments) + " "
0295
0296
def check_split_rule(self, key):
    """Report whether splitRule contains "<token>=<digits>" for the given rule name.

    :param key: rule name, looked up in splitRuleToken.
    :return: True when the entry is found, False otherwise or when splitRule is None.
    """
    rule_text = self.splitRule
    if rule_text is None:
        return False
    return re.search(self.splitRuleToken[key] + r"=(\d+)", rule_text) is not None
0302
0303
def getMaxSizePerJob(self):
    """Return the "nGBPerJob" split-rule value multiplied by 1024**3, or None when unset."""
    if self.splitRule is None:
        return None
    found = re.search(self.splitRuleToken["nGBPerJob"] + r"=(\d+)", self.splitRule)
    if found is None:
        return None
    return int(found.group(1)) * 1024**3
0311
0312
def removeMaxSizePerJob(self):
    """Remove the "nGBPerJob" entry from the split rule."""
    token = self.splitRuleToken["nGBPerJob"]
    self.removeSplitRule(token)
0315
0316
def getMaxSizePerMergeJob(self):
    """Return the "nGBPerMergeJob" split-rule value multiplied by 1024**3, or None when unset."""
    rule_text = self.splitRule
    if rule_text is None:
        return None
    hit = re.search(self.splitRuleToken["nGBPerMergeJob"] + r"=(\d+)", rule_text)
    return None if hit is None else int(hit.group(1)) * 1024**3
0324
0325
def getMaxNumFilesPerJob(self):
    """Return the integer value of the "nMaxFilesPerJob" split rule, or None when unset."""
    if self.splitRule is None:
        return None
    hit = re.search(self.splitRuleToken["nMaxFilesPerJob"] + r"=(\d+)", self.splitRule)
    return None if hit is None else int(hit.group(1))
0332
0333
def setMaxNumFilesPerJob(self, value):
    """Set the "nMaxFilesPerJob" split rule.

    :param value: the limit, as an int or a numeric string. The value is
                  converted to str because setSplitRule builds the rule by
                  string concatenation, so the int returned by
                  getMaxNumFilesPerJob() can be passed back in directly.
    """
    self.setSplitRule("nMaxFilesPerJob", str(value))
0336
0337
def getMaxNumFilesPerMergeJob(self):
    """Return the integer value of the "nMaxFilesPerMergeJob" split rule, defaulting to 50."""
    fallback = 50
    if self.splitRule is None:
        return fallback
    hit = re.search(self.splitRuleToken["nMaxFilesPerMergeJob"] + r"=(\d+)", self.splitRule)
    return fallback if hit is None else int(hit.group(1))
0344
0345
def getNumEventsPerMergeJob(self):
    """Return the integer value of the "nEventsPerMergeJob" split rule, or None when unset."""
    rule_text = self.splitRule
    if rule_text is None:
        return None
    hit = re.search(self.splitRuleToken["nEventsPerMergeJob"] + r"=(\d+)", rule_text)
    if hit is None:
        return None
    return int(hit.group(1))
0352
0353
def usingJumboJobs(self):
    """Report whether useJumbo holds a known jumbo state other than "disabled"."""
    state = self.useJumbo
    known = state in self.enum_useJumbo.values()
    return known and state != self.enum_useJumbo["disabled"]
0358
0359
def getNumJumboJobs(self):
    """Return the "nJumboJobs" split-rule value, or None when jumbo jobs are not in use or the rule is unset."""
    if not (self.usingJumboJobs() and self.splitRule is not None):
        return None
    hit = re.search(self.splitRuleToken["nJumboJobs"] + r"=(\d+)", self.splitRule)
    return None if hit is None else int(hit.group(1))
0366
0367
def getMaxJumboPerSite(self):
    """Return the "maxJumboPerSite" split-rule value; 1 when jumbo jobs are not in use or the rule is unset."""
    fallback = 1
    if not (self.usingJumboJobs() and self.splitRule is not None):
        return fallback
    hit = re.search(self.splitRuleToken["maxJumboPerSite"] + r"=(\d+)", self.splitRule)
    return fallback if hit is None else int(hit.group(1))
0374
0375
def getNumSitesPerJob(self):
    """Return the "nSitesPerJob" split-rule value; 1 when event service is off or the rule is unset."""
    fallback = 1
    if not self.useEventService():
        return fallback
    if self.splitRule is None:
        return fallback
    hit = re.search(self.splitRuleToken["nSitesPerJob"] + r"=(\d+)", self.splitRule)
    return fallback if hit is None else int(hit.group(1))
0384
0385
def getNumFilesPerJob(self):
    """Return the "nFilesPerJob" split-rule value, scaled when dynamic event counts apply.

    With dynamicNumEvents() true, the value is multiplied by
    events-per-input // min-granularity when both are set and the former is
    larger. Returns None when the rule is unset.
    """
    if self.splitRule is None:
        return None
    hit = re.search(self.splitRuleToken["nFilesPerJob"] + r"=(\d+)", self.splitRule)
    if hit is None:
        return None
    file_count = int(hit.group(1))
    if self.dynamicNumEvents():
        events_per_input = self.get_num_events_per_input()
        granularity = self.get_min_granularity()
        if events_per_input and granularity and events_per_input > granularity:
            file_count *= events_per_input // granularity
    return file_count
0398
0399
def removeNumFilesPerJob(self):
    """Remove the "nFilesPerJob" entry from the split rule."""
    token = self.splitRuleToken["nFilesPerJob"]
    self.removeSplitRule(token)
0402
0403
def getNumFilesPerMergeJob(self):
    """Return the integer value of the "nFilesPerMergeJob" split rule, or None when unset."""
    if self.splitRule is None:
        return None
    hit = re.search(self.splitRuleToken["nFilesPerMergeJob"] + r"=(\d+)", self.splitRule)
    return None if hit is None else int(hit.group(1))
0410
0411
def getNumEventsPerJob(self):
    """Return the integer value of the "nEventsPerJob" split rule, or None when unset."""
    rule_text = self.splitRule
    if rule_text is None:
        return None
    hit = re.search(self.splitRuleToken["nEventsPerJob"] + r"=(\d+)", rule_text)
    if hit is None:
        return None
    return int(hit.group(1))
0418
0419
def getRndmSeedOffset(self):
    """Return the integer value of the "randomSeed" split rule, defaulting to 0."""
    if self.splitRule is None:
        return 0
    hit = re.search(self.splitRuleToken["randomSeed"] + r"=(\d+)", self.splitRule)
    return 0 if hit is None else int(hit.group(1))
0426
0427
def getFirstEventOffset(self):
    """Return the integer value of the "firstEvent" split rule, defaulting to 0."""
    rule_text = self.splitRule
    if rule_text is None:
        return 0
    hit = re.search(self.splitRuleToken["firstEvent"] + r"=(\d+)", rule_text)
    if hit is None:
        return 0
    return int(hit.group(1))
0434
0435
def useGroupWithBoundaryID(self):
    """Decode the "groupBoundaryID" split rule into a settings dict.

    :return: None when the rule is unset; otherwise a dict with
             "inSplit" (1 for IDs 1 and 2, else 2),
             "outMap" (False for IDs 1 and 3, else True) and
             "secSplit" (always None).
    """
    if self.splitRule is None:
        return None
    hit = re.search(self.splitRuleToken["groupBoundaryID"] + r"=(\d+)", self.splitRule)
    if hit is None:
        return None
    boundary_id = int(hit.group(1))
    return {
        "inSplit": 1 if boundary_id in (1, 2) else 2,
        "outMap": boundary_id not in (1, 3),
        "secSplit": None,
    }
0466
0467
def useBuild(self):
    """Report whether the "useBuild" rule is present in the split rule."""
    rule_name = "useBuild"
    return self.check_split_rule(rule_name)
0470
0471
def useJobCloning(self):
    """Report whether the "useJobCloning" rule is present in the split rule."""
    rule_name = "useJobCloning"
    return self.check_split_rule(rule_name)
0474
0475
def getJobCloningType(self):
    """Return the digits of the "useJobCloning" split rule as a string, or "" when unset."""
    if self.splitRule is None:
        return ""
    hit = re.search(self.splitRuleToken["useJobCloning"] + r"=(\d+)", self.splitRule)
    return "" if hit is None else hit.group(1)
0482
0483
def reuseSecOnDemand(self):
    """Report whether the "reuseSecOnDemand" rule is present in the split rule."""
    rule_name = "reuseSecOnDemand"
    return self.check_split_rule(rule_name)
0486
0487
def noWaitParent(self):
    """Report whether the "noWaitParent" rule is present in the split rule."""
    rule_name = "noWaitParent"
    return self.check_split_rule(rule_name)
0490
0491
@classmethod
def noWaitParentSL(cls, splitRule):
    """Report whether the given split-rule string contains the "noWaitParent" rule.

    :param splitRule: split-rule string to inspect, or None.
    """
    if splitRule is None:
        return False
    return re.search(cls.splitRuleToken["noWaitParent"] + r"=(\d+)", splitRule) is not None
0500
0501
def useLimitedSites(self):
    """Report whether the "limitedSites" rule is present in the split rule."""
    rule_name = "limitedSites"
    return self.check_split_rule(rule_name)
0504
0505
def setLimitedSites(self, policy):
    """Store the limited-sites policy in the split rule.

    :param policy: a value of enum_limitedSites; unknown policies are ignored.
    """
    # reverse lookup: find the key whose value is the requested policy
    tag = None
    for candidate, known_policy in self.enum_limitedSites.items():
        if policy == known_policy:
            tag = candidate
            break
    if tag is None:
        return
    token = self.splitRuleToken["limitedSites"]
    entry = token + "=" + tag
    pattern = token + r"=(\d+)"
    if self.splitRule is None:
        # first rule
        self.splitRule = entry
    elif re.search(pattern, self.splitRule) is None:
        # append to the existing rules
        self.splitRule += "," + entry
    else:
        # overwrite the existing entry
        self.splitRule = re.sub(pattern, entry, self.splitRule)
0527
0528
def useLocalIO(self):
    """Report whether the "useLocalIO" split rule is present with a non-zero value."""
    if self.splitRule is None:
        return False
    hit = re.search(self.splitRuleToken["useLocalIO"] + r"=(\d+)", self.splitRule)
    return hit is not None and int(hit.group(1)) != 0
0535
0536
def useEventService(self, siteSpec=None):
    """Report whether the task runs with event service, optionally for a given site.

    :param siteSpec: optional site object providing getJobSeed(). When the
                     switchEStoNormal rule is set and the site's job seed is
                     "all", the answer is False.
    :return: True when eventService is 1 or 2 and the site exception does not apply.
    """
    if self.eventService not in [1, 2]:
        return False
    switched_off = self.switchEStoNormal() and siteSpec is not None and siteSpec.getJobSeed() in ["all"]
    return not switched_off
0544
0545
def getNumEventsPerWorker(self):
    """Return the integer value of the "nEventsPerWorker" split rule, or None when unset."""
    if self.splitRule is None:
        return None
    hit = re.search(self.splitRuleToken["nEventsPerWorker"] + r"=(\d+)", self.splitRule)
    return None if hit is None else int(hit.group(1))
0552
0553
def getNumEventServiceConsumer(self):
    """Return the "nEsConsumers" split-rule value; None when event service is off or the rule is unset."""
    if not self.useEventService() or self.splitRule is None:
        return None
    hit = re.search(self.splitRuleToken["nEsConsumers"] + r"=(\d+)", self.splitRule)
    return None if hit is None else int(hit.group(1))
0562
0563
def disableAutoRetry(self):
    """Report whether the "disableAutoRetry" rule is present in the split rule."""
    rule_name = "disableAutoRetry"
    return self.check_split_rule(rule_name)
0566
0567
def disableReassign(self):
    """Report whether the "disableReassign" rule is present in the split rule."""
    rule_name = "disableReassign"
    return self.check_split_rule(rule_name)
0570
0571
def allowEmptyInput(self):
    """Report whether the "allowEmptyInput" rule is present in the split rule."""
    rule_name = "allowEmptyInput"
    return self.check_split_rule(rule_name)
0574
0575
def useListPFN(self):
    """Report whether the "pfnList" rule is present in the split rule."""
    rule_name = "pfnList"
    return self.check_split_rule(rule_name)
0578
0579
def setPrePro(self):
    """Mark the task as needing pre-processing in the split rule.

    The "usePrePro" entry is set to enum_toPreProcess. An existing entry is
    replaced in place rather than duplicated, matching setPreProcessed() and
    setPostPreProcess(); otherwise usePrePro(), which reads the first entry,
    would keep reporting the stale state.
    """
    token = self.splitRuleToken["usePrePro"]
    entry = token + "=" + self.enum_toPreProcess
    pattern = token + r"=(\d+)"
    if self.splitRule is None:
        # first rule
        self.splitRule = entry
    elif re.search(pattern, self.splitRule) is None:
        # append to the existing rules
        self.splitRule += "," + entry
    else:
        # overwrite the existing entry
        self.splitRule = re.sub(pattern, entry, self.splitRule)
0587
0588
def usePrePro(self):
    """Report whether the "usePrePro" split rule holds the to-pre-process state."""
    if self.splitRule is None:
        return False
    hit = re.search(self.splitRuleToken["usePrePro"] + r"=(\d+)", self.splitRule)
    return hit is not None and hit.group(1) == self.enum_toPreProcess
0595
0596
def setPreProcessed(self):
    """Set the "usePrePro" split-rule entry to the pre-processed state, replacing any existing entry."""
    token = self.splitRuleToken["usePrePro"]
    entry = token + "=" + self.enum_preProcessed
    pattern = token + r"=(\d+)"
    if self.splitRule is None:
        # first rule
        self.splitRule = entry
    elif re.search(pattern, self.splitRule) is None:
        # append to the existing rules
        self.splitRule += "," + entry
    else:
        # overwrite the existing entry
        self.splitRule = re.sub(pattern, entry, self.splitRule)
    return
0612
0613
def checkPreProcessed(self):
    """Report whether the "usePrePro" split rule holds the pre-processed state."""
    rule_text = self.splitRule
    if rule_text is None:
        return False
    hit = re.search(self.splitRuleToken["usePrePro"] + r"=(\d+)", rule_text)
    if hit is None:
        return False
    return hit.group(1) == self.enum_preProcessed
0620
0621
def setPostPreProcess(self):
    """Set the "usePrePro" split-rule entry to the post-pre-process state, replacing any existing entry."""
    token = self.splitRuleToken["usePrePro"]
    entry = token + "=" + self.enum_postPProcess
    pattern = token + r"=(\d+)"
    if self.splitRule is None:
        # first rule
        self.splitRule = entry
    elif re.search(pattern, self.splitRule) is None:
        # append to the existing rules
        self.splitRule += "," + entry
    else:
        # overwrite the existing entry
        self.splitRule = re.sub(pattern, entry, self.splitRule)
    return
0637
0638
def instantiateTmpl(self):
    """Report whether the "instantiateTmpl" rule is present in the split rule."""
    rule_name = "instantiateTmpl"
    return self.check_split_rule(rule_name)
0641
0642
def instantiateTmplSite(self):
    """Report whether the "instantiateTmplSite" rule is present in the split rule."""
    rule_name = "instantiateTmplSite"
    return self.check_split_rule(rule_name)
0645
0646
def mergeOutput(self):
    """Report whether the "mergeOutput" rule is present in the split rule."""
    rule_name = "mergeOutput"
    return self.check_split_rule(rule_name)
0649
0650
def useRandomSeed(self):
    """Report whether the "randomSeed" rule is present in the split rule."""
    rule_name = "randomSeed"
    return self.check_split_rule(rule_name)
0653
0654
def getWorkDiskSize(self):
    """Return workDiskCount scaled by its unit plus a fixed 300 * 1024 * 1024 margin.

    Units "GB", "MB" and "kB" scale by powers of 1024; any other unit leaves
    the count unscaled. Returns 0 (without margin) when workDiskCount is None.
    """
    size = self.workDiskCount
    if size is None:
        return 0
    unit_factors = (("GB", 1024**3), ("MB", 1024**2), ("kB", 1024))
    for unit, factor in unit_factors:
        if self.workDiskUnit == unit:
            size = size * factor
            break
    margin = 300 * 1024 * 1024
    return size + margin
0668
0669
def getOutDiskSize(self):
    """Return outDiskCount scaled by the prefix of outDiskUnit.

    Units starting with "GB", "MB" or "kB" scale by powers of 1024; other or
    missing units leave the count unscaled. Returns 0 when outDiskCount is
    None or negative.
    """
    size = self.outDiskCount
    if size is None or size < 0:
        return 0
    unit = self.outDiskUnit
    if unit is None:
        return size
    for prefix, factor in (("GB", 1024**3), ("MB", 1024**2), ("kB", 1024)):
        if unit.startswith(prefix):
            return size * factor
    return size
0682
0683
def outputScaleWithEvents(self):
    """Report whether outDiskUnit is set and contains "PerEvent"."""
    unit = self.outDiskUnit
    return unit is not None and "PerEvent" in unit
0688
0689
@classmethod
def statusToUpdateContents(cls):
    """Return a new list of the status names this method enumerates (currently only "defined")."""
    statuses = ("defined",)
    return list(statuses)
0694
0695
def setOnHold(self):
    """Move the task to "pending", remembering the current status in oldStatus.

    Nothing happens unless the current status is one of the listed statuses.
    """
    holdable = (
        "ready",
        "running",
        "merging",
        "scouting",
        "defined",
        "topreprocess",
        "preprocessing",
        "registered",
        "prepared",
        "rerefine",
    )
    if self.status not in holdable:
        return
    self.oldStatus = self.status
    self.status = "pending"
0701
0702
@classmethod
def statusToRejectExtChange(cls):
    """Return a new list of the status names this method enumerates (how it is used is up to callers)."""
    statuses = (
        "finished",
        "done",
        "prepared",
        "broken",
        "tobroken",
        "aborted",
        "toabort",
        "aborting",
        "failed",
        "passed",
    )
    return list(statuses)
0707
0708
@classmethod
def statusToRetry(cls):
    """Return a new list of the status names this method enumerates (how it is used is up to callers)."""
    statuses = ("finished", "failed", "aborted", "exhausted")
    return list(statuses)
0713
0714
@classmethod
def statusToIncexec(cls):
    """Return "done" followed by every status returned by statusToRetry()."""
    retryable = cls.statusToRetry()
    return ["done"] + retryable
0719
0720
@classmethod
def statusToReassign(cls):
    """Return a new list of the status names this method enumerates (how it is used is up to callers)."""
    statuses = (
        "registered",
        "defined",
        "ready",
        "running",
        "scouting",
        "scouted",
        "pending",
        "assigning",
        "exhausted",
    )
    return list(statuses)
0725
0726
@classmethod
def statusForJobGenerator(cls):
    """Return a new list of the status names this method enumerates (how it is used is up to callers)."""
    statuses = ("ready", "running", "scouting", "topreprocess", "preprocessing")
    return list(statuses)
0731
0732
@classmethod
def statusNotToPause(cls):
    """Return a new list of the status names this method enumerates (how it is used is up to callers)."""
    statuses = ("finished", "failed", "done", "aborted", "broken", "paused")
    return list(statuses)
0737
0738
@classmethod
def commandStatusMap(cls):
    """Return a new dict mapping each command name to its "doing" and "done" status names."""
    # (command, status while doing, status when done)
    transitions = (
        ("kill", "aborting", "toabort"),
        ("finish", "finishing", "passed"),
        ("retry", "toretry", "ready"),
        ("incexec", "toincexec", "rerefine"),
        ("reassign", "toreassign", "reassigning"),
        ("pause", "paused", "dummy"),
        ("resume", "dummy", "dummy"),
        ("avalanche", "dummy", "dummy"),
        ("release", "dummy", "dummy"),
    )
    return {command: {"doing": doing, "done": done} for command, doing, done in transitions}
0753
0754
@classmethod
def get_retry_command_qualifiers(
    cls,
    no_child_retry: bool = False,
    discard_events: bool = False,
    disable_staging_mode: bool = False,
    keep_gshare_priority: bool = False,
    ignore_hard_exhausted: bool = False,
) -> list:
    """
    Build the qualifier list for the retry command from the given flags.
    :param no_child_retry: If True, retry will not be attempted on child tasks ("sole").
    :param discard_events: If True, events will be discarded ("discard").
    :param disable_staging_mode: If True, staging mode will be disabled ("staged").
    :param keep_gshare_priority: If True, current gshare and priority will be kept ("keep").
    :param ignore_hard_exhausted: If True, the limits for hard exhausted will be ignored ("transcend").
    :return: The qualifiers of the enabled flags, in the order of the parameters.
    """
    flagged_qualifiers = (
        (no_child_retry, "sole"),
        (discard_events, "discard"),
        (disable_staging_mode, "staged"),
        (keep_gshare_priority, "keep"),
        (ignore_hard_exhausted, "transcend"),
    )
    return [qualifier for enabled, qualifier in flagged_qualifiers if enabled]
0786
0787
def setErrDiag(self, diag, append=False, prepend=False):
    """Store a diagnostic message in errorDialog.

    Runs of non-ASCII characters in the message are replaced with the
    placeholder "<non-ASCII char>". The check encodes explicitly as ASCII:
    a bare str.encode() uses UTF-8, which accepts almost any text, so the
    replacement branch would practically never run.

    :param diag: the message; falsy values are stored as-is.
    :param append: True  - combine with the existing message, if there is one;
                   None  - store only when there is no message yet;
                   other - overwrite.
    :param prepend: with append=True, put the new message first instead of last.
    """
    if diag:
        try:
            diag.encode("ascii")
        except UnicodeEncodeError:
            # replace each run of non-ASCII characters
            diag = re.sub(r"[^\x00-\x7F]+", "<non-ASCII char>", diag)
    current = self.errorDialog
    if append is True and current is not None:
        self.errorDialog = f"{diag} {current}" if prepend else f"{current} {diag}"
    elif append is None:
        # keep the first message
        if current is None:
            self.errorDialog = diag
    else:
        self.errorDialog = diag
0808
0809
def useLoadXML(self):
    """Report whether the "loadXML" rule is present in the split rule."""
    rule_name = "loadXML"
    return self.check_split_rule(rule_name)
0812
0813
def makeFQANs(self):
    """Build the FQAN list from vo and workingGroup.

    :return: a one-element list - a production-role FQAN when workingGroup is
             set, otherwise a NULL-role FQAN when vo is set - or [] when
             neither is set.
    """
    if self.workingGroup is not None:
        return [f"/{self.vo}/{self.workingGroup}/Role=production"]
    if self.vo is not None:
        return [f"/{self.vo}/Role=NULL"]
    return []
0825
0826
def setSplitRule(self, ruleName, ruleValue):
    """Set one "<token>=<value>" entry in the split rule.

    :param ruleName: rule name, looked up in splitRuleToken.
    :param ruleValue: value to store. It is formatted with str(), so ints are
                      accepted as well as strings.

    An existing "<token>=<digits>" entry is replaced; otherwise the entry is
    appended with a comma (or becomes the whole rule when none exists yet).
    """
    token = self.splitRuleToken[ruleName]
    entry = f"{token}={ruleValue}"
    pattern = token + r"=(\d+)"
    if self.splitRule is None:
        # first rule
        self.splitRule = entry
    elif re.search(pattern, self.splitRule) is None:
        # append to the existing rules
        self.splitRule += "," + entry
    else:
        # overwrite the existing entry; a callable replacement inserts the
        # value literally instead of treating it as a regex template
        self.splitRule = re.sub(pattern, lambda _match: entry, self.splitRule)
0839
0840
def removeSplitRule(self, ruleName):
    """Remove every entry whose token equals ruleName from the split rule.

    :param ruleName: the token as it appears in the rule string (the part before "=").
    """
    if self.splitRule is None:
        return
    surviving = [entry for entry in self.splitRule.split(",") if entry.split("=")[0] != ruleName]
    self.splitRule = ",".join(surviving)
0852
0853
def setUseScout(self, useFlag):
    """Set the "useScout" split rule to the use-scout or no-scout state.

    :param useFlag: truthy selects enum_useScout, falsy selects enum_noScout.
    """
    scout_state = self.enum_useScout if useFlag else self.enum_noScout
    self.setSplitRule("useScout", scout_state)
0859
0860
def setPostScout(self):
    """Set the "useScout" split rule to the post-scout state."""
    post_state = self.enum_postScout
    self.setSplitRule("useScout", post_state)
0863
0864
def useScout(self, splitRule=None):
    """Report whether the "useScout" rule holds the use-scout state.

    :param splitRule: rule string to inspect; defaults to the task's own splitRule.
    """
    rule_text = self.splitRule if splitRule is None else splitRule
    if rule_text is None:
        return False
    hit = re.search(self.splitRuleToken["useScout"] + r"=(\d+)", rule_text)
    return hit is not None and hit.group(1) == self.enum_useScout
0873
0874
def useExhausted(self):
    """Report whether the "useExhausted" rule is present in the split rule."""
    rule_name = "useExhausted"
    return self.check_split_rule(rule_name)
0877
0878
def useRealNumEvents(self):
    """Report whether the "useRealNumEvents" rule is present in the split rule."""
    rule_name = "useRealNumEvents"
    return self.check_split_rule(rule_name)
0881
0882
def useFileAsSourceLFN(self):
    """Report whether the "useFileAsSourceLFN" rule is present in the split rule."""
    rule_name = "useFileAsSourceLFN"
    return self.check_split_rule(rule_name)
0885
0886
def isPostScout(self):
    """Report whether the "useScout" split rule holds the post-scout state."""
    if self.splitRule is None:
        return False
    hit = re.search(self.splitRuleToken["useScout"] + r"=(\d+)", self.splitRule)
    return hit is not None and hit.group(1) == self.enum_postScout
0893
0894
def waitInput(self):
    """Report whether the "waitInput" rule is present in the split rule."""
    rule_name = "waitInput"
    return self.check_split_rule(rule_name)
0897
0898
def inputPreStaging(self):
    """Report whether the split rule contains the "inputPreStaging" entry set to the "use" code."""
    if self.splitRule is None:
        return False
    wanted = self.splitRuleToken["inputPreStaging"] + "=" + self.enum_inputPreStaging["use"]
    return re.search(wanted, self.splitRule) is not None
0905
0906
def setDdmBackEnd(self, backEnd):
    """Store the DDM back-end name in the split rule, replacing any existing entry.

    :param backEnd: back-end name (a string; it is concatenated into the rule).
    """
    token = self.splitRuleToken["ddmBackEnd"]
    entry = token + "=" + backEnd
    pattern = token + "=([^,$]+)"
    if self.splitRule is None:
        # first rule
        self.splitRule = entry
    elif re.search(pattern, self.splitRule) is None:
        # append to the existing rules
        self.splitRule += "," + entry
    else:
        # overwrite the existing entry
        self.splitRule = re.sub(pattern, entry, self.splitRule)
0919
0920
def getDdmBackEnd(self):
    """Return the DDM back-end name stored in the split rule, or None when unset."""
    if self.splitRule is None:
        return None
    hit = re.search(self.splitRuleToken["ddmBackEnd"] + "=([^,$]+)", self.splitRule)
    return None if hit is None else hit.group(1)
0927
0928
def getFieldNumToLFN(self):
    """Return the field numbers stored in the "addNthFieldToLFN" split rule.

    The rule value is a comma-separated list of integers. Because the
    pattern also consumes commas, the captured text can carry empty fields
    (for example the comma that separates the next rule); all of them are
    skipped, not just the first.

    :return: list of ints, or None when the rule is unset, holds no number,
             or cannot be parsed.
    """
    try:
        if self.splitRule is not None:
            hit = re.search(self.splitRuleToken["addNthFieldToLFN"] + r"=([,\d]+)", self.splitRule)
            if hit is not None:
                fields = [int(field) for field in hit.group(1).split(",") if field]
                # never hand back an empty list; "no numbers" means unset
                return fields or None
    except Exception:
        # best effort: a malformed rule is treated as unset
        pass
    return None
0943
0944
def getScoutSuccessRate(self):
    """Return the integer value of the "scoutSuccessRate" split rule, or None when unset."""
    if self.splitRule is None:
        return None
    hit = re.search(self.splitRuleToken["scoutSuccessRate"] + r"=(\d+)", self.splitRule)
    return None if hit is None else int(hit.group(1))
0951
0952
def getT1Weight(self):
    """Return the signed integer value of the "t1Weight" split rule, defaulting to 0.

    At most one leading minus sign is accepted. A value with several minus
    signs does not match and falls back to 0 instead of making int() raise
    ValueError.
    """
    if self.splitRule is None:
        return 0
    hit = re.search(self.splitRuleToken["t1Weight"] + r"=(-?\d+)", self.splitRule)
    return 0 if hit is None else int(hit.group(1))
0959
0960
def respectLumiblock(self):
    """Report whether the "respectLB" rule is present in the split rule."""
    rule_name = "respectLB"
    return self.check_split_rule(rule_name)
0963
0964
def releasePerLumiblock(self):
    """Report whether the "releasePerLB" rule is present in the split rule."""
    rule_name = "releasePerLB"
    return self.check_split_rule(rule_name)
0967
0968
def orderByLB(self):
    """Report whether the "orderByLB" rule is present in the split rule."""
    rule_name = "orderByLB"
    return self.check_split_rule(rule_name)
0971
0972
def respectSplitRule(self):
    """Report whether the "respectSplitRule" rule is present in the split rule."""
    rule_name = "respectSplitRule"
    return self.check_split_rule(rule_name)
0975
0976
def allowPartialFinish(self):
    """Report whether the "allowPartialFinish" rule is present in the split rule."""
    rule_name = "allowPartialFinish"
    return self.check_split_rule(rule_name)
0979
0980
def toRegisterDatasets(self):
    """Report whether the "registerDatasets" split rule holds the registering state."""
    if self.splitRule is None:
        return False
    hit = re.search(self.splitRuleToken["registerDatasets"] + r"=(\d+)", self.splitRule)
    return hit is not None and hit.group(1) == self.enum_toRegisterDS
0987
0988
def registeredDatasets(self):
    """Set the "registerDatasets" split rule to the registered state."""
    registered_state = self.enum_registeredDS
    self.setSplitRule("registerDatasets", registered_state)
0991
0992
def setToRegisterDatasets(self):
    """Set the "registerDatasets" split rule to the registering state."""
    registering_state = self.enum_toRegisterDS
    self.setSplitRule("registerDatasets", registering_state)
0995
0996
def getMaxAttemptES(self):
    """Return the integer value of the "maxAttemptES" split rule, or None when unset."""
    rule_text = self.splitRule
    if rule_text is None:
        return None
    hit = re.search(self.splitRuleToken["maxAttemptES"] + r"=(\d+)", rule_text)
    if hit is None:
        return None
    return int(hit.group(1))
1003
1004
def getMaxAttemptEsJob(self):
    """Return the "maxAttemptEsJob" split-rule value, falling back to getMaxAttemptES() when unset."""
    hit = None
    if self.splitRule is not None:
        hit = re.search(self.splitRuleToken["maxAttemptEsJob"] + r"=(\d+)", self.splitRule)
    if hit is None:
        return self.getMaxAttemptES()
    return int(hit.group(1))
1011
1012
def checkAttrLength(self):
    """Validate the attributes listed in _attrLength against their maximum length.

    On the first violation the offending attribute is reset to None, an
    explanatory message is stored in errorDialog, and False is returned.

    :return: True when every checked attribute is unset or within its limit.
    """
    for attr_name, max_length in self._attrLength.items():
        current = getattr(self, attr_name)
        if current is not None and len(current) > max_length:
            setattr(self, attr_name, None)
            self.errorDialog = f"{attr_name} is too long (actual: {len(current)}, maximum: {max_length})"
            return False
    return True
1023
1024
def setIpConnectivity(self, value):
    """Store IP connectivity, and optionally the IP stack, in the split rule.

    :param value: "<connectivity>" or "<connectivity>#<stack>". Each part is
                  stored only when it is a known value of enum_ipConnectivity
                  or enum_ipStack respectively; falsy input is ignored.
    """
    if not value:
        return
    fields = value.split("#")
    connectivity = fields[0]
    # reverse lookup: store the key whose value is the requested connectivity
    for code, label in self.enum_ipConnectivity.items():
        if connectivity == label:
            self.setSplitRule("ipConnectivity", code)
            break
    if len(fields) < 2:
        return
    stack = fields[1]
    for code, label in self.enum_ipStack.items():
        if stack == label:
            self.setSplitRule("ipStack", code)
            break
1044
1045
def getIpConnectivity(self):
    """Return the enum_ipConnectivity entry for the code in the split rule, or None when unset.

    A code that is not a key of enum_ipConnectivity raises KeyError.
    """
    if self.splitRule is None:
        return None
    hit = re.search(self.splitRuleToken["ipConnectivity"] + r"=(\d+)", self.splitRule)
    if hit is None:
        return None
    return self.enum_ipConnectivity[hit.group(1)]
1052
1053
def getIpStack(self):
    """Return the enum_ipStack entry for the code in the split rule, or None when unset.

    A code that is not a key of enum_ipStack raises KeyError.
    """
    if self.splitRule is None:
        return None
    hit = re.search(self.splitRuleToken["ipStack"] + r"=(\d+)", self.splitRule)
    if hit is None:
        return None
    return self.enum_ipStack[hit.group(1)]
1060
1061
def useHS06(self):
    """Report whether cpuTimeUnit is one of the HS06-based units."""
    hs06_units = ("HS06sPerEvent", "HS06sPerEventFixed", "mHS06sPerEvent", "mHS06sPerEventFixed")
    return self.cpuTimeUnit in hs06_units
1064
1065
def getCpuTime(self):
    """Return cpuTime for HS06-based units, dividing by 1000 when the unit starts with "m".

    :return: None when useHS06() is false; the scaled float for "m"-prefixed
             units; otherwise (including when the conversion fails) the raw
             cpuTime value.
    """
    if not self.useHS06():
        return None
    raw_value = None
    try:
        if self.cpuTimeUnit.startswith("m"):
            return float(self.cpuTime) / 1000.0
    except Exception:
        # fall through to the unscaled value
        pass
    raw_value = self.cpuTime
    return raw_value
1075
1076
def ramPerCore(self):
    """Report whether ramUnit is one of the per-core units."""
    per_core_units = ("MBPerCore", "MBPerCoreFixed")
    return self.ramUnit in per_core_units
1079
1080
def runUntilClosed(self):
    """Report whether the "runUntilClosed" rule is present in the split rule."""
    rule_name = "runUntilClosed"
    return self.check_split_rule(rule_name)
1083
1084
def stayOutputOnSite(self):
    """Report whether the "stayOutputOnSite" rule is present in the split rule."""
    rule_name = "stayOutputOnSite"
    return self.check_split_rule(rule_name)
1087
1088
def failGoalUnreached(self):
    """Report whether the "failGoalUnreached" rule is present in the split rule."""
    rule_name = "failGoalUnreached"
    return self.check_split_rule(rule_name)
1091
1092
def unsetFailGoalUnreached(self):
    """Remove the "failGoalUnreached" entry from the split rule."""
    token = self.splitRuleToken["failGoalUnreached"]
    self.removeSplitRule(token)
1095
1096
def switchEStoNormal(self):
    """Report whether the "switchEStoNormal" rule is present in the split rule."""
    rule_name = "switchEStoNormal"
    return self.check_split_rule(rule_name)
1099
1100
def useWorldCloud(self):
    """Report whether the task's cloud equals worldCloudName."""
    world_name = self.worldCloudName
    return self.cloud == world_name
1103
1104
def dynamicNumEvents(self):
    """Report whether the "dynamicNumEvents" rule is present in the split rule."""
    rule_text = self.splitRule
    if rule_text is None:
        return False
    return re.search(self.splitRuleToken["dynamicNumEvents"] + r"=(\d+)", rule_text) is not None
1111
1112
def get_min_granularity(self):
    """Return the integer value of the "dynamicNumEvents" split rule, or None when unset."""
    if self.splitRule is None:
        return None
    hit = re.search(self.splitRuleToken["dynamicNumEvents"] + r"=(\d+)", self.splitRule)
    return None if hit is None else int(hit.group(1))
1119
1120
def setAltStageOut(self, value):
    """Store the alternative stage-out setting in the split rule.

    :param value: a value of enum_altStageOut; unknown values are ignored.
    """
    # reverse lookup: store the key whose value is the requested setting
    for code, label in self.enum_altStageOut.items():
        if value == label:
            self.setSplitRule("altStageOut", code)
            return
1127
1128
def getAltStageOut(self):
    """Return the enum_altStageOut entry for the code in the split rule, or None when unset.

    A code that is not a key of enum_altStageOut raises KeyError.
    """
    if self.splitRule is None:
        return None
    hit = re.search(self.splitRuleToken["altStageOut"] + r"=(\d+)", self.splitRule)
    if hit is None:
        return None
    return self.enum_altStageOut[hit.group(1)]
1135
1136
def allowInputWAN(self):
    """Report whether the "allowInputWAN" rule is present in the split rule."""
    rule_name = "allowInputWAN"
    return self.check_split_rule(rule_name)
1139
1140
def setAllowInputLAN(self, value):
    """Record the input-LAN setting in the split rule.

    :param value: one of the values of enum_inputLAN; the key of the first
                  equal value is stored under "allowInputLAN", and any other
                  value is ignored
    """
    for rule_key, rule_value in self.enum_inputLAN.items():
        if value == rule_value:
            self.setSplitRule("allowInputLAN", rule_key)
            break
1147
1148
def allowInputLAN(self):
    """Return the input-LAN setting encoded in splitRule.

    :return: enum_inputLAN[<digits>] for the digits following the
             "allowInputLAN" token, or None when splitRule is None or has no
             such entry
    :raises KeyError: when the digits are not a key of enum_inputLAN
    """
    if self.splitRule is None:
        return None
    # raw string: "\d" inside a plain literal is an invalid escape sequence
    matched = re.search(self.splitRuleToken["allowInputLAN"] + r"=(\d+)", self.splitRule)
    if matched is None:
        return None
    return self.enum_inputLAN[matched.group(1)]
1155
1156
def putLogToOS(self):
    """Return the outcome of check_split_rule for the "putLogToOS" rule."""
    rule_name = "putLogToOS"
    return self.check_split_rule(rule_name)
1159
1160
def mergeEsOnOS(self):
    """Return the outcome of check_split_rule for the "mergeEsOnOS" rule."""
    rule_name = "mergeEsOnOS"
    return self.check_split_rule(rule_name)
1163
1164
def writeInputToFile(self):
    """Return the outcome of check_split_rule for the "writeInputToFile" rule."""
    rule_name = "writeInputToFile"
    return self.check_split_rule(rule_name)
1167
1168
def ignoreMissingInDS(self):
    """Return the outcome of check_split_rule for the "ignoreMissingInDS" rule."""
    rule_name = "ignoreMissingInDS"
    return self.check_split_rule(rule_name)
1171
1172
def noExecStrCnv(self):
    """Return the outcome of check_split_rule for the "noExecStrCnv" rule."""
    rule_name = "noExecStrCnv"
    return self.check_split_rule(rule_name)
1175
1176
def inFilePosEvtNum(self):
    """Return the outcome of check_split_rule for the "inFilePosEvtNum" rule."""
    rule_name = "inFilePosEvtNum"
    return self.check_split_rule(rule_name)
1179
1180
def registerEsFiles(self):
    """Return the outcome of check_split_rule for the "registerEsFiles" rule."""
    rule_name = "registerEsFiles"
    return self.check_split_rule(rule_name)
1183
1184
def disableAutoFinish(self):
    """Return the outcome of check_split_rule for the "disableAutoFinish" rule."""
    rule_name = "disableAutoFinish"
    return self.check_split_rule(rule_name)
1187
1188
def resetRefinedAttrs(self):
    """Call resetChangedAttr for "splitRule" and "eventService", then reserve_old_attributes()."""
    for attr_name in ("splitRule", "eventService"):
        self.resetChangedAttr(attr_name)
    self.reserve_old_attributes()
1193
1194
def resurrectConsumers(self):
    """Return the outcome of check_split_rule for the "resurrectConsumers" rule."""
    rule_name = "resurrectConsumers"
    return self.check_split_rule(rule_name)
1197
1198
def usePrefetcher(self):
    """Return the outcome of check_split_rule for the "usePrefetcher" rule."""
    rule_name = "usePrefetcher"
    return self.check_split_rule(rule_name)
1201
1202
def noInputPooling(self):
    """Return the outcome of check_split_rule for the "noInputPooling" rule."""
    rule_name = "noInputPooling"
    return self.check_split_rule(rule_name)
1205
1206
def nChunksToWait(self):
    """Return the integer stored under the "nChunksToWait" split-rule token.

    :return: the digits following the token as int, or None when splitRule
             is None or has no such entry
    """
    if self.splitRule is None:
        return None
    # raw string: "\d" inside a plain literal is an invalid escape sequence
    matched = re.search(self.splitRuleToken["nChunksToWait"] + r"=(\d+)", self.splitRule)
    if matched is None:
        return None
    return int(matched.group(1))
1213
1214
def getMaxWalltime(self):
    """Return the "maxWalltime" split-rule value multiplied by 60 * 60.

    :return: int(<digits>) * 60 * 60 for the digits following the token, or
             None when splitRule is None or has no such entry
    """
    if self.splitRule is None:
        return None
    # raw string: "\d" inside a plain literal is an invalid escape sequence
    matched = re.search(self.splitRuleToken["maxWalltime"] + r"=(\d+)", self.splitRule)
    if matched is None:
        return None
    return int(matched.group(1)) * 60 * 60
1221
1222
def set_max_walltime(self, value):
    """Store str(value) under the "maxWalltime" split rule.

    :param value: the value to store; it is converted with str()
    """
    as_text = str(value)
    self.setSplitRule("maxWalltime", as_text)
1225
1226
def getTgtMaxOutputForNG(self):
    """Return the integer stored under the "tgtMaxOutputForNG" split-rule token.

    :return: the digits following the token as int, or None when splitRule
             is None or has no such entry
    """
    if self.splitRule is None:
        return None
    # raw string: "\d" inside a plain literal is an invalid escape sequence
    matched = re.search(self.splitRuleToken["tgtMaxOutputForNG"] + r"=(\d+)", self.splitRule)
    if matched is None:
        return None
    return int(matched.group(1))
1233
1234
def notDiscardEvents(self):
    """Return the outcome of check_split_rule for the "notDiscardEvents" rule."""
    rule_name = "notDiscardEvents"
    return self.check_split_rule(rule_name)
1237
1238
def getMinCpuEfficiency(self):
    """Return the integer stored under the "minCpuEfficiency" split-rule token.

    :return: the digits following the token as int, or None when splitRule
             is None or has no such entry
    """
    if self.splitRule is None:
        return None
    # raw string: "\d" inside a plain literal is an invalid escape sequence
    matched = re.search(self.splitRuleToken["minCpuEfficiency"] + r"=(\d+)", self.splitRule)
    if matched is None:
        return None
    return int(matched.group(1))
1245
1246
def decAttOnFailedES(self):
    """Return the outcome of check_split_rule for the "decAttOnFailedES" rule."""
    rule_name = "decAttOnFailedES"
    return self.check_split_rule(rule_name)
1249
1250
def useZipToPin(self):
    """Return the outcome of check_split_rule for the "useZipToPin" rule."""
    rule_name = "useZipToPin"
    return self.check_split_rule(rule_name)
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
def reformat_architecture(self):
    """Rewrite self.architecture as a JSON dictionary of platform and host specs.

    Nothing happens when architecture is None, or when it already parses as
    JSON without an "encoded_platform" key. Otherwise architecture is
    replaced by json.dumps of a dict holding whichever of "sw_platform",
    "base_platform", "cpu_specs" and "gpu_spec" turn out non-empty; entries
    equal to "*" are dropped from the CPU and GPU specs.
    """
    if self.architecture is None:
        return
    encoded = ""
    try:
        parsed = json.loads(self.architecture)
        if "encoded_platform" not in parsed:
            # JSON without an encoded platform: leave as is
            return
        encoded = parsed["encoded_platform"]
    except Exception:
        # not usable as a JSON dict: continue with an empty encoded platform
        pass

    reformatted = {}
    sw_platform = self.get_sw_platform()
    if sw_platform:
        reformatted["sw_platform"] = sw_platform
    base_platform = self.get_base_platform(encoded)
    if base_platform:
        reformatted["base_platform"] = base_platform
    cpu_specs = self.get_host_cpu_spec(encoded)
    if cpu_specs:
        # keep only concrete (non-wildcard) fields, and only non-empty specs
        concrete_specs = []
        for cpu_spec in cpu_specs:
            trimmed = {k: v for k, v in cpu_spec.items() if v != "*"}
            if trimmed:
                concrete_specs.append(trimmed)
        if concrete_specs:
            reformatted["cpu_specs"] = concrete_specs
    # NOTE(review): unlike the base platform and CPU spec, the GPU spec is
    # looked up without the encoded platform -- confirm this is intended
    gpu_spec = self.get_host_gpu_spec()
    if gpu_spec:
        concrete_gpu = {k: v for k, v in gpu_spec.items() if v != "*"}
        if concrete_gpu:
            reformatted["gpu_spec"] = concrete_gpu
    self.architecture = json.dumps(reformatted)
1298
1299
def get_sw_platform(self):
    """Return the software platform part of architecture.

    :return: the "sw_platform" entry (default "") when architecture parses
             as a JSON dict; otherwise the leading part of the string up to
             the first "@", "#" or "&"; None when architecture is None
    """
    try:
        return json.loads(self.architecture).get("sw_platform", "")
    except Exception:
        # not a JSON dict: treat architecture as a plain encoded string
        pass
    if self.architecture is None:
        return self.architecture
    found = re.search("^([^@#&]*)", self.architecture)
    return found.group(1) if found else self.architecture
1311
1312
def get_base_platform(self, encoded_platform=None):
    """Return the base platform (the part after "@") of the architecture.

    :param encoded_platform: optional encoded platform string, parsed
           instead of architecture when the JSON form has no "base_platform"
    :return: the JSON "base_platform" entry when present (or when no
             encoded_platform was given); otherwise the text between "@"
             and the next "#" or "&"; None when absent or empty
    """
    try:
        base = json.loads(self.architecture).get("base_platform", None)
        if base is not None or encoded_platform is None:
            return base
    except Exception:
        # not a JSON dict: parse the encoded string below
        pass
    source = encoded_platform if encoded_platform else self.architecture
    if source is None or "@" not in source:
        return None
    image = re.search("@([^#&]*)", source).group(1)
    return None if image == "" else image
1332
1333
def get_platforms(self):
    """Return the software platform joined with the base platform.

    :return: get_sw_platform(), followed by "@" + get_base_platform() when
             the base platform is non-empty; None when architecture is None
    """
    if self.architecture is None:
        return self.architecture
    combined = self.get_sw_platform()
    base = self.get_base_platform()
    if base:
        combined += "@" + base
    return combined
1342
1343
def get_host_cpu_spec(self, encoded_platform=None):
    """Return the host CPU requirements as a list of spec dictionaries.

    Each spec has the keys "arch", "vendor" and "instr", with "*" filled in
    for missing fields.

    :param encoded_platform: optional encoded platform string, parsed
           instead of architecture when the JSON form has no "cpu_specs"
    :return: the JSON "cpu_specs" list (with "vendor"/"instr" defaulted to
             "*") when available; otherwise specs parsed from the text after
             "#" ("|"-separated "arch-vendor-instr" items, optionally wrapped
             in parentheses), or a single spec taken from the leading
             "-"-separated field when there is no "#"; None when nothing can
             be extracted
    """
    try:
        specs = json.loads(self.architecture).get("cpu_specs", None)
        if not specs and encoded_platform is None:
            return None
        if specs is not None:
            for spec in specs:
                spec.setdefault("vendor", "*")
                spec.setdefault("instr", "*")
            return specs
        # no "cpu_specs" in the JSON form: parse encoded_platform below
    except Exception:
        # not a JSON dict (or malformed specs): parse the encoded string below
        pass
    architecture = encoded_platform if encoded_platform else self.architecture
    try:
        if not architecture:
            return None
        if "#" not in architecture:
            # no explicit CPU section: derive the arch from the leading field
            if re.search(r"^[\^@&]", architecture):
                return None
            arch = architecture.split("-")[0]
            if arch:
                return [{"arch": arch, "vendor": "*", "instr": "*"}]
            return None
        spec_strs = re.search(r"#([^\^@&]*)", architecture).group(1)
        if not spec_strs:
            return None
        # strip the optional surrounding parentheses
        if spec_strs.startswith("("):
            spec_strs = spec_strs[1:]
        if spec_strs.endswith(")"):
            spec_strs = spec_strs[:-1]
        specs = []
        for spec_str in spec_strs.split("|"):
            # pad missing vendor/instr fields with wildcards
            spec_str += "-*" * (2 - spec_str.count("-"))
            arch, vendor, instr = spec_str.split("-")[:3]
            specs.append({"arch": arch, "vendor": vendor, "instr": instr})
        return specs
    except Exception:
        return None
1391
def get_host_cpu_preference(self):
    """Return the "cpu_pref" entry of the JSON architecture.

    :return: the entry, or None when it is absent or architecture cannot be
             read as a JSON dict
    """
    try:
        return json.loads(self.architecture).get("cpu_pref", None)
    except Exception:
        return None
1399
1400
def get_host_gpu_spec(self):
    """Return the host GPU requirement as a dict with "vendor" and "model".

    :return: the JSON "gpu_spec" entry (with "vendor"/"model" defaulted to
             "*") when available; otherwise a spec parsed from the
             "vendor-model" text after "&" in architecture; None when
             nothing can be extracted
    """
    try:
        gpu = json.loads(self.architecture).get("gpu_spec", None)
        if gpu is not None:
            gpu.setdefault("vendor", "*")
            gpu.setdefault("model", "*")
            return gpu
    except Exception:
        # not a JSON dict (or malformed spec): parse the encoded string below
        pass
    try:
        encoded = self.architecture
        if encoded is None or "&" not in encoded:
            return None
        gpu_text = re.search(r"&([^\^@#]*)", encoded).group(1)
        if not gpu_text:
            return None
        # pad a missing model with a wildcard
        gpu_text += "-*" * (1 - gpu_text.count("-"))
        vendor, model = gpu_text.split("-")[:2]
        return {"vendor": vendor, "model": model}
    except Exception:
        return None
1423
1424
def is_hpo_workflow(self):
    """Return the outcome of check_split_rule for the "hpoWorkflow" rule."""
    rule_name = "hpoWorkflow"
    return self.check_split_rule(rule_name)
1427
1428
def is_debug_mode(self):
    """Return the outcome of check_split_rule for the "debugMode" rule."""
    rule_name = "debugMode"
    return self.check_split_rule(rule_name)
1431
1432
def is_multi_step_exec(self):
    """Return the outcome of check_split_rule for the "multiStepExec" rule."""
    rule_name = "multiStepExec"
    return self.check_split_rule(rule_name)
1435
1436
def get_max_num_jobs(self):
    """Return the integer stored under the "maxNumJobs" split-rule token.

    :return: the digits following the token as int, or None when splitRule
             is None or has no such entry
    """
    if self.splitRule is None:
        return None
    # raw string: "\d" inside a plain literal is an invalid escape sequence
    matched = re.search(self.splitRuleToken["maxNumJobs"] + r"=(\d+)", self.splitRule)
    if matched is None:
        return None
    return int(matched.group(1))
1443
1444
def get_total_num_jobs(self):
    """Return the integer stored under the "totNumJobs" split-rule token.

    :return: the digits following the token as int, or None when splitRule
             is None or has no such entry
    """
    if self.splitRule is None:
        return None
    # raw string: "\d" inside a plain literal is an invalid escape sequence
    matched = re.search(self.splitRuleToken["totNumJobs"] + r"=(\d+)", self.splitRule)
    if matched is None:
        return None
    return int(matched.group(1))
1451
1452
def use_only_tags_fc(self):
    """Return the outcome of check_split_rule for the "onlyTagsForFC" rule."""
    rule_name = "onlyTagsForFC"
    return self.check_split_rule(rule_name)
1455
1456
def avoid_vp(self):
    """Return the outcome of check_split_rule for the "avoidVP" rule."""
    rule_name = "avoidVP"
    return self.check_split_rule(rule_name)
1459
1460
def set_first_contents_feed(self, is_first):
    """Store the first-contents-feed flag in the split rule.

    :param is_first: when truthy the value of FirstContentsFeed.TRUE is
           stored under "firstContentsFeed", otherwise that of
           FirstContentsFeed.FALSE
    """
    flag = self.FirstContentsFeed.TRUE if is_first else self.FirstContentsFeed.FALSE
    self.setSplitRule("firstContentsFeed", flag.value)
1466
1467
def is_first_contents_feed(self):
    """Tell whether the "firstContentsFeed" split rule holds the TRUE value.

    :return: True when the digits following the token equal
             FirstContentsFeed.TRUE.value; False otherwise, including when
             splitRule is None
    """
    if self.splitRule is None:
        return False
    # raw string: "\d" inside a plain literal is an invalid escape sequence
    matched = re.search(self.splitRuleToken["firstContentsFeed"] + r"=(\d+)", self.splitRule)
    if matched is not None and matched.group(1) == self.FirstContentsFeed.TRUE.value:
        return True
    return False
1474
1475
def is_work_segmented(self):
    """Return the outcome of check_split_rule for the "segmentedWork" rule."""
    rule_name = "segmentedWork"
    return self.check_split_rule(rule_name)
1478
1479
def no_looping_check(self):
    """Return the outcome of check_split_rule for the "noLoopingCheck" rule."""
    rule_name = "noLoopingCheck"
    return self.check_split_rule(rule_name)
1482
1483
def encode_job_params(self):
    """Return the outcome of check_split_rule for the "encJobParams" rule."""
    rule_name = "encJobParams"
    return self.check_split_rule(rule_name)
1486
1487
def get_original_error_dialog(self):
    """Return the last sentence of origErrorDialog without its HTML link prefix.

    :return: "" when origErrorDialog is empty or None; otherwise the text
             after the last ". " once any '<a href...</a> : ' part has been
             removed
    """
    dialog = self.origErrorDialog
    if not dialog:
        return ""
    without_link = re.sub("<a href.+</a> : ", "", dialog)
    *_, last_sentence = without_link.split(". ")
    return last_sentence
1494
1495
def use_secrets(self):
    """Return the outcome of check_split_rule for the "useSecrets" rule."""
    rule_name = "useSecrets"
    return self.check_split_rule(rule_name)
1498
1499
def get_max_core_count(self):
    """Return the integer stored under the "maxCoreCount" split-rule token.

    :return: the digits following the token as int, or None when splitRule
             is None or has no such entry
    """
    if self.splitRule is None:
        return None
    # raw string: "\d" inside a plain literal is an invalid escape sequence
    matched = re.search(self.splitRuleToken["maxCoreCount"] + r"=(\d+)", self.splitRule)
    if matched is None:
        return None
    return int(matched.group(1))
1506
1507
def push_status_changes(self):
    """Return the module-level push_status_changes check applied to this task's splitRule."""
    split_rule = self.splitRule
    return push_status_changes(split_rule)
1510
1511
def cloud_as_vo(self):
    """Return the outcome of check_split_rule for the "cloudAsVO" rule."""
    rule_name = "cloudAsVO"
    return self.check_split_rule(rule_name)
1514
1515
def push_job(self):
    """Return the outcome of check_split_rule for the "pushJob" rule."""
    rule_name = "pushJob"
    return self.check_split_rule(rule_name)
1518
1519
def is_fine_grained_process(self):
    """Return the outcome of check_split_rule for the "fineGrainedProc" rule."""
    rule_name = "fineGrainedProc"
    return self.check_split_rule(rule_name)
1522
1523
def on_site_merging(self):
    """Return the outcome of check_split_rule for the "onSiteMerging" rule."""
    rule_name = "onSiteMerging"
    return self.check_split_rule(rule_name)
1526
1527
def set_full_chain(self, mode):
    """Store the full-chain mode in the split rule.

    :param mode: "only", "require" or "capable"; the matching FullChain
           attribute is stored under "fullChain" when it is truthy, and any
           other mode is ignored
    """
    chain_value = None
    for label, attr_name in (("only", "Only"), ("require", "Require"), ("capable", "Capable")):
        if mode == label:
            chain_value = getattr(self.FullChain, attr_name)
            break
    if chain_value:
        self.setSplitRule("fullChain", chain_value)
1538
1539
def get_full_chain(self):
    """Return the digits stored under the "fullChain" split-rule token.

    :return: the digits as a string, or None when splitRule is empty or has
             no such entry
    """
    if not self.splitRule:
        return None
    found = re.search(self.splitRuleToken["fullChain"] + r"=(\d+)", self.splitRule)
    return found.group(1) if found else None
1546
1547
def check_full_chain_with_mode(self, mode):
    """Tell whether the task's full-chain flag matches the given mode.

    :param mode: "only", "require" or "capable"
    :return: True when get_full_chain() equals the FullChain attribute for
             that mode; False otherwise, including for unknown modes
    """
    task_flag = self.get_full_chain()
    for label, attr_name in (("only", "Only"), ("require", "Require"), ("capable", "Capable")):
        if mode == label:
            if task_flag == getattr(self.FullChain, attr_name):
                return True
            return False
    return False
1560
1561
def check_full_chain_with_nucleus(self, nucleus):
    """Tell whether the task has a full-chain flag and the nucleus is in bare mode.

    :param nucleus: object providing get_bare_nucleus_mode()
    :return: True when both get_full_chain() and
             nucleus.get_bare_nucleus_mode() are truthy, else False
    """
    return bool(self.get_full_chain() and nucleus.get_bare_nucleus_mode())
1566
1567
def get_ram_for_retry(self, current_ram):
    """Compute the RAM value for a retry from the retryRam* split rules.

    :param current_ram: the current RAM value
    :return: None when splitRule is empty or has no "retryRamOffset" entry;
             current_ram unchanged when it is falsy, or when it is not below
             the offset and no non-zero "retryRamStep" is set; the offset
             when current_ram is below it; otherwise the offset plus the
             distance to current_ram rounded up to a multiple of the step.
             The offset and stepped results are capped by "retryRamMax" when
             that rule is set.
    """
    split_rule = self.splitRule
    if not split_rule:
        return None

    def rule_value(rule_name):
        # integer following "<token>=" in the split rule, or None when absent
        found = re.search(self.splitRuleToken[rule_name] + r"=(\d+)", split_rule)
        return int(found.group(1)) if found else None

    offset = rule_value("retryRamOffset")
    if offset is None:
        return None
    step = rule_value("retryRamStep") or 0
    max_ram = rule_value("retryRamMax")

    if not current_ram:
        return current_ram
    if current_ram < offset:
        target = offset
    elif not step:
        # no step configured: the current value is kept as is, uncapped
        return current_ram
    else:
        target = offset + math.ceil((current_ram - offset) / step) * step
    return target if max_ram is None else min(target, max_ram)
1598
1599
def get_num_events_per_input(self):
    """Return the integer stored under the "nEventsPerInput" split-rule token.

    :return: the digits following the token as int, or None when splitRule
             is None or has no such entry
    """
    if self.splitRule is None:
        return None
    # raw string: "\d" inside a plain literal is an invalid escape sequence
    matched = re.search(self.splitRuleToken["nEventsPerInput"] + r"=(\d+)", self.splitRule)
    if matched is None:
        return None
    return int(matched.group(1))
1606
def get_max_events_per_job(self):
    """Return the integer stored under the "maxEventsPerJob" split-rule token.

    :return: the digits following the token as int, or None when splitRule
             is None or has no such entry
    """
    if self.splitRule is None:
        return None
    # raw string: "\d" inside a plain literal is an invalid escape sequence
    matched = re.search(self.splitRuleToken["maxEventsPerJob"] + r"=(\d+)", self.splitRule)
    if matched is None:
        return None
    return int(matched.group(1))
1613
1614
def set_order_input_by(self, mode):
    """Store the input ordering mode in the split rule.

    :param mode: only "eventsAlignment" is recognised; its
           OrderInputBy.eventsAlignment value is stored under "orderInputBy"
           when truthy, and any other mode is ignored
    """
    rule_value = self.OrderInputBy.eventsAlignment if mode == "eventsAlignment" else None
    if rule_value:
        self.setSplitRule("orderInputBy", rule_value)
1621
1622
def order_input_by(self):
    """Return the input ordering mode encoded in the split rule.

    :return: "eventsAlignment" when the digits following the "orderInputBy"
             token equal OrderInputBy.eventsAlignment; otherwise None
    """
    if not self.splitRule:
        return None
    found = re.search(self.splitRuleToken["orderInputBy"] + r"=(\d+)", self.splitRule)
    if found and found.group(1) == self.OrderInputBy.eventsAlignment:
        return "eventsAlignment"
    return None
1630
1631
def is_intermediate_task(self):
    """Return the outcome of check_split_rule for the "intermediateTask" rule."""
    rule_name = "intermediateTask"
    return self.check_split_rule(rule_name)
1634
1635
def is_msg_driven(self):
    """Return the module-level is_msg_driven check applied to this task's splitRule."""
    split_rule = self.splitRule
    return is_msg_driven(split_rule)
1638
1639
def allow_incomplete_input(self):
    """Return the outcome of check_split_rule for the "allowIncompleteInDS" rule."""
    rule_name = "allowIncompleteInDS"
    return self.check_split_rule(rule_name)
1642
1643
def is_workflow_holdup(self):
    """Return the outcome of check_split_rule for the "workflowHoldup" rule."""
    rule_name = "workflowHoldup"
    return self.check_split_rule(rule_name)
1646
1647
def set_workflow_holdup(self, value: bool):
    """Set or clear the "workflowHoldup" split rule.

    :param value: when truthy the rule is set to "1"; otherwise its token is
           passed to removeSplitRule
    """
    if not value:
        self.removeSplitRule(self.splitRuleToken["workflowHoldup"])
        return
    self.setSplitRule("workflowHoldup", "1")
1653
1654
def get_queued_time(self):
    """
    Convert queuedTime to a timestamp
    :return: queuedTime.timestamp(), or None if queuedTime is not set
    """
    queued = self.queuedTime
    return None if queued is None else queued.timestamp()
1663
1664
1665
1666
1667
1668
def check_split_rule_positive_int(key, split_rule):
    """Tell whether a split-rule string holds a positive integer for a rule.

    :param key: rule name, looked up in JediTaskSpec.splitRuleToken
    :param split_rule: the split-rule string; may be None or empty
    :return: True when "<token>=<digits>" is found and the digits are
             greater than zero, else False
    """
    if split_rule:
        found = re.search(JediTaskSpec.splitRuleToken[key] + r"=(\d+)", split_rule)
        if found and int(found.group(1)) > 0:
            return True
    return False
1676
1677
1678
def push_status_changes(split_rule):
    """Tell whether "pushStatusChanges" is a positive integer in the split rule.

    :param split_rule: the split-rule string; may be None or empty
    :return: the result of check_split_rule_positive_int for that rule
    """
    rule_name = "pushStatusChanges"
    return check_split_rule_positive_int(rule_name, split_rule)
1681
1682
1683
def is_msg_driven(split_rule):
    """Tell whether "messageDriven" is a positive integer in the split rule.

    :param split_rule: the split-rule string; may be None or empty
    :return: the result of check_split_rule_positive_int for that rule
    """
    rule_name = "messageDriven"
    return check_split_rule_positive_int(rule_name, split_rule)
1686
1687
1688
def is_auto_pause_disabled(split_rule):
    """Tell whether auto pause is disabled by the split rule.

    :param split_rule: the split-rule string; may be None or empty
    :return: True when "noAutoPause" is a positive integer in split_rule,
             else False
    """
    # a positive "noAutoPause" means auto pause is disabled, so the check
    # result is returned as is; negating it reported "disabled" for every
    # task that did NOT set the rule
    return check_split_rule_positive_int("noAutoPause", split_rule)