Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:06

0001 import enum
0002 import json
0003 import math
0004 import re
0005 
0006 from pandaserver.taskbuffer import task_split_rules
0007 
0008 """
0009 task specification for JEDI
0010 
0011 """
0012 
0013 
0014 class JediTaskSpec(object):
0015     # attributes
0016     attributes = (
0017         "jediTaskID",
0018         "taskName",
0019         "status",
0020         "userName",
0021         "creationDate",
0022         "modificationTime",
0023         "startTime",
0024         "endTime",
0025         "frozenTime",
0026         "prodSourceLabel",
0027         "workingGroup",
0028         "vo",
0029         "coreCount",
0030         "taskType",
0031         "processingType",
0032         "taskPriority",
0033         "currentPriority",
0034         "architecture",
0035         "transUses",
0036         "transHome",
0037         "transPath",
0038         "lockedBy",
0039         "lockedTime",
0040         "termCondition",
0041         "splitRule",
0042         "walltime",
0043         "walltimeUnit",
0044         "outDiskCount",
0045         "outDiskUnit",
0046         "workDiskCount",
0047         "workDiskUnit",
0048         "ramCount",
0049         "ramUnit",
0050         "ioIntensity",
0051         "ioIntensityUnit",
0052         "workQueue_ID",
0053         "progress",
0054         "failureRate",
0055         "errorDialog",
0056         "reqID",
0057         "oldStatus",
0058         "cloud",
0059         "site",
0060         "countryGroup",
0061         "parent_tid",
0062         "eventService",
0063         "ticketID",
0064         "ticketSystemType",
0065         "stateChangeTime",
0066         "superStatus",
0067         "campaign",
0068         "mergeRamCount",
0069         "mergeRamUnit",
0070         "mergeWalltime",
0071         "mergeWalltimeUnit",
0072         "throttledTime",
0073         "numThrottled",
0074         "mergeCoreCount",
0075         "goal",
0076         "assessmentTime",
0077         "cpuTime",
0078         "cpuTimeUnit",
0079         "cpuEfficiency",
0080         "baseWalltime",
0081         "nucleus",
0082         "baseRamCount",
0083         "ttcRequested",
0084         "ttcPredicted",
0085         "ttcPredictionDate",
0086         "rescueTime",
0087         "requestType",
0088         "gshare",
0089         "resource_type",
0090         "useJumbo",
0091         "diskIO",
0092         "diskIOUnit",
0093         "memory_leak_core",
0094         "memory_leak_x2",
0095         "attemptNr",
0096         "container_name",
0097         "framework",
0098         "activatedTime",
0099         "queuedTime",
0100         "actionTime",
0101     )
0102     # attributes which have 0 by default
0103     _zeroAttrs = ()
0104     # attributes to force update
0105     _forceUpdateAttrs = ("lockedBy", "lockedTime")
0106     # mapping between sequence and attr
0107     _seqAttrMap = {}
0108     # limit length
0109     _limitLength = {"errorDialog": 510}
0110     # attribute length
0111     _attrLength = {"workingGroup": 32}
0112 
0113     # tokens for split rule
0114     splitRuleToken = task_split_rules.split_rule_dict
0115 
    # enum for preprocessing (derived from task_split_rules.enum_usePrePro)
    # NOTE: _inv is a scratch value->key inversion of the source mapping. It is rebound
    # before each derived group below, and its last binding stays on the class.
    _inv = {v: k for k, v in task_split_rules.enum_usePrePro.items()}
    enum_toPreProcess = _inv["toPreProcess"]
    enum_preProcessed = _inv["preProcessed"]
    enum_postPProcess = _inv["postPProcess"]
    # enum for limited sites (single source of truth in task_split_rules)
    enum_limitedSites = task_split_rules.enum_limitedSites
    # enum for scout (derived from task_split_rules.enum_useScout)
    _inv = {v: k for k, v in task_split_rules.enum_useScout.items()}
    enum_noScout = _inv["no_use"]
    enum_useScout = _inv["will_update_requirements"]
    enum_postScout = _inv["updated_requirements"]
    # enum for dataset registration (derived from task_split_rules.enum_registerDatasets)
    _inv = {v: k for k, v in task_split_rules.enum_registerDatasets.items()}
    enum_toRegisterDS = _inv["registering"]
    enum_registeredDS = _inv["registered"]
0132     # enum for IP connectivity (single source of truth in task_split_rules)
0133     enum_ipConnectivity = task_split_rules.enum_ipConnectivity
0134     # enum for IP stack (single source of truth in task_split_rules)
0135     enum_ipStack = task_split_rules.enum_ipStack
0136     # enum for alternative stage-out (single source of truth in task_split_rules)
0137     enum_altStageOut = task_split_rules.enum_altStageOut
0138     # enum for local direct access (single source of truth in task_split_rules)
0139     enum_inputLAN = task_split_rules.enum_inputLAN
0140     # world cloud name
0141     worldCloudName = "WORLD"
0142 
0143     # enum for contents feeder
    class FirstContentsFeed(enum.Enum):
        """Two-state flag for the contents feeder, stored as the strings "1" (TRUE) and "0" (FALSE)."""

        TRUE = "1"
        FALSE = "0"
0147 
0148     # enum for useJumbo
0149     enum_useJumbo = {"waiting": "W", "running": "R", "pending": "P", "lack": "L", "disabled": "D"}
0150 
0151     # enum for input prestaging (inverted from task_split_rules.enum_inputPreStaging)
0152     enum_inputPreStaging = {v: k for k, v in task_split_rules.enum_inputPreStaging.items()}
0153 
0154     # enum for full chain
    class FullChain(str, enum.Enum):
        """Full-chain modes; the ``str`` mix-in makes each member compare equal to its raw string value."""

        Only = "1"
        Require = "2"
        Capable = "3"
0159 
0160     # enum for order input by
    class OrderInputBy(str, enum.Enum):
        """Input ordering modes; the ``str`` mix-in makes each member compare equal to its raw string value."""

        eventsAlignment = "1"
0163 
0164     # constructor
0165     def __init__(self):
0166         # install attributes
0167         for attr in self.attributes:
0168             if attr in self._zeroAttrs:
0169                 object.__setattr__(self, attr, 0)
0170             else:
0171                 object.__setattr__(self, attr, None)
0172         # map of changed attributes
0173         object.__setattr__(self, "_changedAttrs", {})
0174         # template to generate job parameters
0175         object.__setattr__(self, "jobParamsTemplate", "")
0176         # associated datasets
0177         object.__setattr__(self, "datasetSpecList", [])
0178         # original error dialog
0179         object.__setattr__(self, "origErrorDialog", None)
0180         # original user name
0181         object.__setattr__(self, "origUserName", None)
0182 
0183     # override __setattr__ to collect the changed attributes
0184     def __setattr__(self, name, value):
0185         oldVal = getattr(self, name)
0186         if name in self._limitLength and value is not None:
0187             # keep original dialog
0188             if name == "errorDialog":
0189                 object.__setattr__(self, "origErrorDialog", value)
0190             value = value[: self._limitLength[name]]
0191         object.__setattr__(self, name, value)
0192         newVal = getattr(self, name)
0193         # collect changed attributes
0194         if oldVal != newVal or name in self._forceUpdateAttrs:
0195             self._changedAttrs[name] = value
0196 
0197     # copy old attributes
0198     def copyAttributes(self, oldTaskSpec):
0199         for attr in self.attributes + ("jobParamsTemplate",):
0200             if "Time" in attr:
0201                 continue
0202             if "Date" in attr:
0203                 continue
0204             if attr in ["progress", "failureRate", "errorDialog", "status", "oldStatus", "lockedBy"]:
0205                 continue
0206             self.__setattr__(attr, getattr(oldTaskSpec, attr))
0207 
0208     # reset changed attribute list
    def resetChangedList(self):
        """Replace the map of changed attributes with a new empty dict."""
        # object.__setattr__ is used so the tracking __setattr__ of this class is not triggered
        object.__setattr__(self, "_changedAttrs", {})
0211 
0212     # reset changed attribute
0213     def resetChangedAttr(self, name):
0214         try:
0215             del self._changedAttrs[name]
0216         except Exception:
0217             pass
0218 
0219     # reserve old attributes
0220     def reserve_old_attributes(self):
0221         for attName in ["ramCount", "walltime", "cpuTime", "startTime", "cpuTimeUnit", "outDiskCount", "workDiskCount", "ioIntensity", "diskIO"]:
0222             self.resetChangedAttr(attName)
0223 
0224     # force update
0225     def forceUpdate(self, name):
0226         if name in self.attributes:
0227             self._changedAttrs[name] = getattr(self, name)
0228 
0229     # return map of values
0230     def valuesMap(self, useSeq=False, onlyChanged=False):
0231         ret = {}
0232         for attr in self.attributes:
0233             # use sequence
0234             if useSeq and attr in self._seqAttrMap:
0235                 continue
0236             # only changed attributes
0237             if onlyChanged:
0238                 if attr not in self._changedAttrs:
0239                     continue
0240             val = getattr(self, attr)
0241             if val is None:
0242                 if attr in self._zeroAttrs:
0243                     val = 0
0244                 else:
0245                     val = None
0246             # truncate too long values
0247             if attr in self._limitLength:
0248                 if val is not None:
0249                     val = val[: self._limitLength[attr]]
0250             ret[f":{attr}"] = val
0251         return ret
0252 
0253     # pack tuple into TaskSpec
0254     def pack(self, values):
0255         for i in range(len(self.attributes)):
0256             attr = self.attributes[i]
0257             val = values[i]
0258             object.__setattr__(self, attr, val)
0259 
0260     # return column names for INSERT
0261     def columnNames(cls, prefix=None):
0262         ret = ""
0263         for attr in cls.attributes:
0264             if prefix is not None:
0265                 ret += f"{prefix}."
0266             ret += f"{attr},"
0267         ret = ret[:-1]
0268         return ret
0269 
0270     columnNames = classmethod(columnNames)
0271 
0272     # return expression of bind variables for INSERT
0273     def bindValuesExpression(cls, useSeq=True):
0274         ret = "VALUES("
0275         for attr in cls.attributes:
0276             if useSeq and attr in cls._seqAttrMap:
0277                 ret += f"{cls._seqAttrMap[attr]},"
0278             else:
0279                 ret += f":{attr},"
0280         ret = ret[:-1]
0281         ret += ")"
0282         return ret
0283 
0284     bindValuesExpression = classmethod(bindValuesExpression)
0285 
0286     # return an expression of bind variables for UPDATE to update only changed attributes
0287     def bindUpdateChangesExpression(self):
0288         ret = ""
0289         for attr in self.attributes:
0290             if attr in self._changedAttrs:
0291                 ret += f"{attr}=:{attr},"
0292         ret = ret[:-1]
0293         ret += " "
0294         return ret
0295 
0296     # check split rule
0297     def check_split_rule(self, key):
0298         if self.splitRule is not None:
0299             if re.search(self.splitRuleToken[key] + r"=(\d+)", self.splitRule):
0300                 return True
0301         return False
0302 
0303     # get the max size per job if defined
0304     def getMaxSizePerJob(self):
0305         if self.splitRule is not None:
0306             tmpMatch = re.search(self.splitRuleToken["nGBPerJob"] + "=(\d+)", self.splitRule)
0307             if tmpMatch is not None:
0308                 nGBPerJob = int(tmpMatch.group(1)) * 1024 * 1024 * 1024
0309                 return nGBPerJob
0310         return None
0311 
0312     # remove nGBPerJob
    def removeMaxSizePerJob(self):
        """Pass the ``nGBPerJob`` token to ``removeSplitRule``.

        NOTE(review): removeSplitRule is defined outside this excerpt; presumably it
        strips that entry from ``splitRule`` - confirm there.
        """
        self.removeSplitRule(self.splitRuleToken["nGBPerJob"])
0315 
0316     # get the max size per merge job if defined
0317     def getMaxSizePerMergeJob(self):
0318         if self.splitRule is not None:
0319             tmpMatch = re.search(self.splitRuleToken["nGBPerMergeJob"] + "=(\d+)", self.splitRule)
0320             if tmpMatch is not None:
0321                 nGBPerJob = int(tmpMatch.group(1)) * 1024 * 1024 * 1024
0322                 return nGBPerJob
0323         return None
0324 
    # get the max number of files per job if defined
0326     def getMaxNumFilesPerJob(self):
0327         if self.splitRule is not None:
0328             tmpMatch = re.search(self.splitRuleToken["nMaxFilesPerJob"] + "=(\d+)", self.splitRule)
0329             if tmpMatch is not None:
0330                 return int(tmpMatch.group(1))
0331         return None
0332 
0333     # set MaxNumFilesPerJob
    def setMaxNumFilesPerJob(self, value):
        """Store ``value`` under the ``nMaxFilesPerJob`` rule by delegating to ``setSplitRule``.

        :param value: Rule value; setSplitRule concatenates it to a string, so it must be a ``str``.
        """
        self.setSplitRule("nMaxFilesPerJob", value)
0336 
    # get the max number of files per merge job if defined
0338     def getMaxNumFilesPerMergeJob(self):
0339         if self.splitRule is not None:
0340             tmpMatch = re.search(self.splitRuleToken["nMaxFilesPerMergeJob"] + "=(\d+)", self.splitRule)
0341             if tmpMatch is not None:
0342                 return int(tmpMatch.group(1))
0343         return 50
0344 
0345     # get the number of events per merge job if defined
0346     def getNumEventsPerMergeJob(self):
0347         if self.splitRule is not None:
0348             tmpMatch = re.search(self.splitRuleToken["nEventsPerMergeJob"] + "=(\d+)", self.splitRule)
0349             if tmpMatch is not None:
0350                 return int(tmpMatch.group(1))
0351         return None
0352 
0353     # check if using jumbo
0354     def usingJumboJobs(self):
0355         if self.useJumbo in self.enum_useJumbo.values() and self.useJumbo != self.enum_useJumbo["disabled"]:
0356             return True
0357         return False
0358 
0359     # get the number of jumbo jobs if defined
0360     def getNumJumboJobs(self):
0361         if self.usingJumboJobs() and self.splitRule is not None:
0362             tmpMatch = re.search(self.splitRuleToken["nJumboJobs"] + "=(\d+)", self.splitRule)
0363             if tmpMatch is not None:
0364                 return int(tmpMatch.group(1))
0365         return None
0366 
0367     # get the max number of jumbo jobs per site if defined
0368     def getMaxJumboPerSite(self):
0369         if self.usingJumboJobs() and self.splitRule is not None:
0370             tmpMatch = re.search(self.splitRuleToken["maxJumboPerSite"] + "=(\d+)", self.splitRule)
0371             if tmpMatch is not None:
0372                 return int(tmpMatch.group(1))
0373         return 1
0374 
0375     # get the number of sites per job
0376     def getNumSitesPerJob(self):
0377         if not self.useEventService():
0378             return 1
0379         if self.splitRule is not None:
0380             tmpMatch = re.search(self.splitRuleToken["nSitesPerJob"] + "=(\d+)", self.splitRule)
0381             if tmpMatch is not None:
0382                 return int(tmpMatch.group(1))
0383         return 1
0384 
0385     # get the number of files per job if defined
0386     def getNumFilesPerJob(self):
0387         if self.splitRule is not None:
0388             tmpMatch = re.search(self.splitRuleToken["nFilesPerJob"] + "=(\d+)", self.splitRule)
0389             if tmpMatch is not None:
0390                 n = int(tmpMatch.group(1))
0391                 if self.dynamicNumEvents():
0392                     inn = self.get_num_events_per_input()
0393                     dyn = self.get_min_granularity()
0394                     if inn and dyn and inn > dyn:
0395                         n *= inn // dyn
0396                 return n
0397         return None
0398 
0399     # remove nFilesPerJob
    def removeNumFilesPerJob(self):
        """Pass the ``nFilesPerJob`` token to ``removeSplitRule``.

        NOTE(review): removeSplitRule is defined outside this excerpt; presumably it
        strips that entry from ``splitRule`` - confirm there.
        """
        self.removeSplitRule(self.splitRuleToken["nFilesPerJob"])
0402 
0403     # get the number of files per merge job if defined
0404     def getNumFilesPerMergeJob(self):
0405         if self.splitRule is not None:
0406             tmpMatch = re.search(self.splitRuleToken["nFilesPerMergeJob"] + "=(\d+)", self.splitRule)
0407             if tmpMatch is not None:
0408                 return int(tmpMatch.group(1))
0409         return None
0410 
0411     # get the number of events per job if defined
0412     def getNumEventsPerJob(self):
0413         if self.splitRule is not None:
0414             tmpMatch = re.search(self.splitRuleToken["nEventsPerJob"] + "=(\d+)", self.splitRule)
0415             if tmpMatch is not None:
0416                 return int(tmpMatch.group(1))
0417         return None
0418 
0419     # get offset for random seed
0420     def getRndmSeedOffset(self):
0421         if self.splitRule is not None:
0422             tmpMatch = re.search(self.splitRuleToken["randomSeed"] + "=(\d+)", self.splitRule)
0423             if tmpMatch is not None:
0424                 return int(tmpMatch.group(1))
0425         return 0
0426 
0427     # get offset for first event
0428     def getFirstEventOffset(self):
0429         if self.splitRule is not None:
0430             tmpMatch = re.search(self.splitRuleToken["firstEvent"] + "=(\d+)", self.splitRule)
0431             if tmpMatch is not None:
0432                 return int(tmpMatch.group(1))
0433         return 0
0434 
0435     # grouping with boundaryID
0436     def useGroupWithBoundaryID(self):
0437         if self.splitRule is not None:
0438             tmpMatch = re.search(self.splitRuleToken["groupBoundaryID"] + "=(\d+)", self.splitRule)
0439             if tmpMatch is not None:
0440                 gbID = int(tmpMatch.group(1))
0441                 # 1 : input - can split,    output - free
0442                 # 2 : input - can split,    output - mapped with provenanceID
0443                 # 3 : input - cannot split, output - free
0444                 # 4 : input - cannot split, output - mapped with provenanceID
0445                 #
0446                 # * rule for master
0447                 # 1 : can split. one boundayID per sub chunk
0448                 # 2 : cannot split. one boundayID per sub chunk
0449                 # 3 : cannot split. multiple boundayIDs per sub chunk
0450                 #
0451                 # * rule for secodary
0452                 # 1 : must have same boundayID. cannot split
0453                 #
0454                 retMap = {}
0455                 if gbID in [1, 2]:
0456                     retMap["inSplit"] = 1
0457                 else:
0458                     retMap["inSplit"] = 2
0459                 if gbID in [1, 3]:
0460                     retMap["outMap"] = False
0461                 else:
0462                     retMap["outMap"] = True
0463                 retMap["secSplit"] = None
0464                 return retMap
0465         return None
0466 
0467     # use build
    def useBuild(self):
        """Return True when the split rule has a numeric entry for the ``useBuild`` token (via check_split_rule)."""
        return self.check_split_rule("useBuild")
0470 
    # use job cloning
    def useJobCloning(self):
        """Return True when the split rule has a numeric entry for the ``useJobCloning`` token (via check_split_rule)."""
        return self.check_split_rule("useJobCloning")
0474 
0475     # get job cloning type
0476     def getJobCloningType(self):
0477         if self.splitRule is not None:
0478             tmpMatch = re.search(self.splitRuleToken["useJobCloning"] + "=(\d+)", self.splitRule)
0479             if tmpMatch is not None:
0480                 return tmpMatch.group(1)
0481         return ""
0482 
0483     # reuse secondary on demand
    def reuseSecOnDemand(self):
        """Return True when the split rule has a numeric entry for the ``reuseSecOnDemand`` token (via check_split_rule)."""
        return self.check_split_rule("reuseSecOnDemand")
0486 
0487     # not wait for completion of parent
    def noWaitParent(self):
        """Return True when the split rule has a numeric entry for the ``noWaitParent`` token (via check_split_rule)."""
        return self.check_split_rule("noWaitParent")
0490 
0491     # check splitRule if not wait for completion of parent
0492     def noWaitParentSL(cls, splitRule):
0493         if splitRule is not None:
0494             tmpMatch = re.search(cls.splitRuleToken["noWaitParent"] + "=(\d+)", splitRule)
0495             if tmpMatch is not None:
0496                 return True
0497         return False
0498 
0499     noWaitParentSL = classmethod(noWaitParentSL)
0500 
0501     # use only limited sites
    def useLimitedSites(self):
        """Return True when the split rule has a numeric entry for the ``limitedSites`` token (via check_split_rule)."""
        return self.check_split_rule("limitedSites")
0504 
0505     # set limited sites
0506     def setLimitedSites(self, policy):
0507         tag = None
0508         for tmpIdx, tmpPolicy in self.enum_limitedSites.items():
0509             if policy == tmpPolicy:
0510                 tag = tmpIdx
0511                 break
0512         # not found
0513         if tag is None:
0514             return
0515         # set
0516         if self.splitRule is None:
0517             # new
0518             self.splitRule = self.splitRuleToken["limitedSites"] + "=" + tag
0519         else:
0520             tmpMatch = re.search(self.splitRuleToken["limitedSites"] + "=(\d+)", self.splitRule)
0521             if tmpMatch is None:
0522                 # append
0523                 self.splitRule += "," + self.splitRuleToken["limitedSites"] + "=" + tag
0524             else:
0525                 # replace
0526                 self.splitRule = re.sub(self.splitRuleToken["limitedSites"] + "=(\d+)", self.splitRuleToken["limitedSites"] + "=" + tag, self.splitRule)
0527 
0528     # use local IO
0529     def useLocalIO(self):
0530         if self.splitRule is not None:
0531             tmpMatch = re.search(self.splitRuleToken["useLocalIO"] + "=(\d+)", self.splitRule)
0532             if tmpMatch is not None and int(tmpMatch.group(1)):
0533                 return True
0534         return False
0535 
0536     # use Event Service
0537     def useEventService(self, siteSpec=None):
0538         if self.eventService in [1, 2]:
0539             # check site if ES is disabled
0540             if self.switchEStoNormal() and siteSpec is not None and siteSpec.getJobSeed() in ["all"]:
0541                 return False
0542             return True
0543         return False
0544 
0545     # get the number of events per worker for Event Service
0546     def getNumEventsPerWorker(self):
0547         if self.splitRule is not None:
0548             tmpMatch = re.search(self.splitRuleToken["nEventsPerWorker"] + "=(\d+)", self.splitRule)
0549             if tmpMatch is not None:
0550                 return int(tmpMatch.group(1))
0551         return None
0552 
0553     # get the number of event service consumers
0554     def getNumEventServiceConsumer(self):
0555         if not self.useEventService():
0556             return None
0557         if self.splitRule is not None:
0558             tmpMatch = re.search(self.splitRuleToken["nEsConsumers"] + "=(\d+)", self.splitRule)
0559             if tmpMatch is not None:
0560                 return int(tmpMatch.group(1))
0561         return None
0562 
0563     # disable automatic retry
    def disableAutoRetry(self):
        """Return True when the split rule has a numeric entry for the ``disableAutoRetry`` token (via check_split_rule)."""
        return self.check_split_rule("disableAutoRetry")
0566 
0567     # disable reassign
    def disableReassign(self):
        """Return True when the split rule has a numeric entry for the ``disableReassign`` token (via check_split_rule)."""
        return self.check_split_rule("disableReassign")
0570 
0571     # allow empty input
    def allowEmptyInput(self):
        """Return True when the split rule has a numeric entry for the ``allowEmptyInput`` token (via check_split_rule)."""
        return self.check_split_rule("allowEmptyInput")
0574 
0575     # use PFN list
    def useListPFN(self):
        """Return True when the split rule has a numeric entry for the ``pfnList`` token (via check_split_rule)."""
        return self.check_split_rule("pfnList")
0578 
0579     # set preprocessing
0580     def setPrePro(self):
0581         if self.splitRule is None:
0582             # new
0583             self.splitRule = self.splitRuleToken["usePrePro"] + "=" + self.enum_toPreProcess
0584         else:
0585             # append
0586             self.splitRule += "," + self.splitRuleToken["usePrePro"] + "=" + self.enum_toPreProcess
0587 
0588     # use preprocessing
0589     def usePrePro(self):
0590         if self.splitRule is not None:
0591             tmpMatch = re.search(self.splitRuleToken["usePrePro"] + "=(\d+)", self.splitRule)
0592             if tmpMatch is not None and tmpMatch.group(1) == self.enum_toPreProcess:
0593                 return True
0594         return False
0595 
0596     # set preprocessed
0597     def setPreProcessed(self):
0598         if self.splitRule is None:
0599             # new
0600             self.splitRule = self.splitRuleToken["usePrePro"] + "=" + self.enum_preProcessed
0601         else:
0602             tmpMatch = re.search(self.splitRuleToken["usePrePro"] + "=(\d+)", self.splitRule)
0603             if tmpMatch is None:
0604                 # append
0605                 self.splitRule += "," + self.splitRuleToken["usePrePro"] + "=" + self.enum_preProcessed
0606             else:
0607                 # replace
0608                 self.splitRule = re.sub(
0609                     self.splitRuleToken["usePrePro"] + "=(\d+)", self.splitRuleToken["usePrePro"] + "=" + self.enum_preProcessed, self.splitRule
0610                 )
0611         return
0612 
0613     # check preprocessed
0614     def checkPreProcessed(self):
0615         if self.splitRule is not None:
0616             tmpMatch = re.search(self.splitRuleToken["usePrePro"] + "=(\d+)", self.splitRule)
0617             if tmpMatch is not None and tmpMatch.group(1) == self.enum_preProcessed:
0618                 return True
0619         return False
0620 
0621     # set post preprocess
0622     def setPostPreProcess(self):
0623         if self.splitRule is None:
0624             # new
0625             self.splitRule = self.splitRuleToken["usePrePro"] + "=" + self.enum_postPProcess
0626         else:
0627             tmpMatch = re.search(self.splitRuleToken["usePrePro"] + "=(\d+)", self.splitRule)
0628             if tmpMatch is None:
0629                 # append
0630                 self.splitRule += "," + self.splitRuleToken["usePrePro"] + "=" + self.enum_postPProcess
0631             else:
0632                 # replace
0633                 self.splitRule = re.sub(
0634                     self.splitRuleToken["usePrePro"] + "=(\d+)", self.splitRuleToken["usePrePro"] + "=" + self.enum_postPProcess, self.splitRule
0635                 )
0636         return
0637 
0638     # instantiate template datasets
    def instantiateTmpl(self):
        """Return True when the split rule has a numeric entry for the ``instantiateTmpl`` token (via check_split_rule)."""
        return self.check_split_rule("instantiateTmpl")
0641 
0642     # instantiate template datasets at site
    def instantiateTmplSite(self):
        """Return True when the split rule has a numeric entry for the ``instantiateTmplSite`` token (via check_split_rule)."""
        return self.check_split_rule("instantiateTmplSite")
0645 
0646     # merge output
    def mergeOutput(self):
        """Return True when the split rule has a numeric entry for the ``mergeOutput`` token (via check_split_rule)."""
        return self.check_split_rule("mergeOutput")
0649 
0650     # use random seed
    def useRandomSeed(self):
        """Return True when the split rule has a numeric entry for the ``randomSeed`` token (via check_split_rule)."""
        return self.check_split_rule("randomSeed")
0653 
0654     # get the size of workDisk in bytes
0655     def getWorkDiskSize(self):
0656         safetyMargin = 300 * 1024 * 1024
0657         tmpSize = self.workDiskCount
0658         if tmpSize is None:
0659             return 0
0660         if self.workDiskUnit == "GB":
0661             tmpSize = tmpSize * 1024 * 1024 * 1024
0662         elif self.workDiskUnit == "MB":
0663             tmpSize = tmpSize * 1024 * 1024
0664         elif self.workDiskUnit == "kB":
0665             tmpSize = tmpSize * 1024
0666         tmpSize += safetyMargin
0667         return tmpSize
0668 
0669     # get the size of outDisk in bytes
0670     def getOutDiskSize(self):
0671         tmpSize = self.outDiskCount
0672         if tmpSize is None or tmpSize < 0:
0673             return 0
0674         if self.outDiskUnit is not None:
0675             if self.outDiskUnit.startswith("GB"):
0676                 tmpSize = tmpSize * 1024 * 1024 * 1024
0677             elif self.outDiskUnit.startswith("MB"):
0678                 tmpSize = tmpSize * 1024 * 1024
0679             elif self.outDiskUnit.startswith("kB"):
0680                 tmpSize = tmpSize * 1024
0681         return tmpSize
0682 
0683     # output scales with the number of events
0684     def outputScaleWithEvents(self):
0685         if self.outDiskUnit is not None and "PerEvent" in self.outDiskUnit:
0686             return True
0687         return False
0688 
0689     # return list of status to update contents
0690     def statusToUpdateContents(cls):
0691         return ["defined"]
0692 
0693     statusToUpdateContents = classmethod(statusToUpdateContents)
0694 
0695     # set task status on hold
0696     def setOnHold(self):
0697         # change status
0698         if self.status in ["ready", "running", "merging", "scouting", "defined", "topreprocess", "preprocessing", "registered", "prepared", "rerefine"]:
0699             self.oldStatus = self.status
0700             self.status = "pending"
0701 
0702     # return list of status to reject external changes
0703     def statusToRejectExtChange(cls):
0704         return ["finished", "done", "prepared", "broken", "tobroken", "aborted", "toabort", "aborting", "failed", "passed"]
0705 
0706     statusToRejectExtChange = classmethod(statusToRejectExtChange)
0707 
0708     # return list of status for retry
0709     def statusToRetry(cls):
0710         return ["finished", "failed", "aborted", "exhausted"]
0711 
0712     statusToRetry = classmethod(statusToRetry)
0713 
0714     # return list of status for incexec
0715     def statusToIncexec(cls):
0716         return ["done"] + cls.statusToRetry()
0717 
0718     statusToIncexec = classmethod(statusToIncexec)
0719 
0720     # return list of status for reassign
0721     def statusToReassign(cls):
0722         return ["registered", "defined", "ready", "running", "scouting", "scouted", "pending", "assigning", "exhausted"]
0723 
0724     statusToReassign = classmethod(statusToReassign)
0725 
0726     # return list of status for Job Generator
0727     def statusForJobGenerator(cls):
0728         return ["ready", "running", "scouting", "topreprocess", "preprocessing"]
0729 
0730     statusForJobGenerator = classmethod(statusForJobGenerator)
0731 
0732     # return list of status to not pause
0733     def statusNotToPause(cls):
0734         return ["finished", "failed", "done", "aborted", "broken", "paused"]
0735 
0736     statusNotToPause = classmethod(statusNotToPause)
0737 
0738     # return mapping of command and status
0739     def commandStatusMap(cls):
0740         return {
0741             "kill": {"doing": "aborting", "done": "toabort"},
0742             "finish": {"doing": "finishing", "done": "passed"},
0743             "retry": {"doing": "toretry", "done": "ready"},
0744             "incexec": {"doing": "toincexec", "done": "rerefine"},
0745             "reassign": {"doing": "toreassign", "done": "reassigning"},
0746             "pause": {"doing": "paused", "done": "dummy"},
0747             "resume": {"doing": "dummy", "done": "dummy"},
0748             "avalanche": {"doing": "dummy", "done": "dummy"},
0749             "release": {"doing": "dummy", "done": "dummy"},
0750         }
0751 
0752     commandStatusMap = classmethod(commandStatusMap)
0753 
0754     # qualifiers for retry command
0755     def get_retry_command_qualifiers(
0756         cls,
0757         no_child_retry: bool = False,
0758         discard_events: bool = False,
0759         disable_staging_mode: bool = False,
0760         keep_gshare_priority: bool = False,
0761         ignore_hard_exhausted: bool = False,
0762     ) -> list:
0763         """
0764         Get the list of qualifiers for the retry command.
0765         :param no_child_retry: If True, retry will not be attempted on child tasks.
0766         :param discard_events: If True, events will be discarded.
0767         :param disable_staging_mode: If True, staging mode will be disabled.
0768         :param keep_gshare_priority: If True, current gshare and priority will be kept.
0769         :param ignore_hard_exhausted: If True, the limits for hard exhausted will be ignored.
0770         :return: A list of qualifiers.
0771         """
0772         qualifiers = []
0773         if no_child_retry:
0774             qualifiers.append("sole")
0775         if discard_events:
0776             qualifiers.append("discard")
0777         if disable_staging_mode:
0778             qualifiers.append("staged")
0779         if keep_gshare_priority:
0780             qualifiers.append("keep")
0781         if ignore_hard_exhausted:
0782             qualifiers.append("transcend")
0783         return qualifiers
0784 
0785     get_retry_command_qualifiers = classmethod(get_retry_command_qualifiers)
0786 
0787     # set error dialog
0788     def setErrDiag(self, diag, append=False, prepend=False):
0789         # check if message can be encoded with UTF-8
0790         if diag:
0791             try:
0792                 diag.encode()
0793             except UnicodeEncodeError:
0794                 # remove non-ascii chars
0795                 diag = re.sub(r"[^\x00-\x7F]+", "<non-ASCII char>", diag)
0796         # set error dialog
0797         if append is True and self.errorDialog is not None:
0798             if not prepend:
0799                 self.errorDialog = f"{self.errorDialog} {diag}"
0800             else:
0801                 self.errorDialog = f"{diag} {self.errorDialog}"
0802         elif append is None:
0803             # keep old log
0804             if self.errorDialog is None:
0805                 self.errorDialog = diag
0806         else:
0807             self.errorDialog = diag
0808 
0809     # use loadXML
0810     def useLoadXML(self):
0811         return self.check_split_rule("loadXML")
0812 
0813     # make VOMS FQANs
0814     def makeFQANs(self):
0815         # no working group
0816         if self.workingGroup is not None:
0817             fqan = f"/{self.vo}/{self.workingGroup}/Role=production"
0818         else:
0819             if self.vo is not None:
0820                 fqan = f"/{self.vo}/Role=NULL"
0821             else:
0822                 return []
0823         # return
0824         return [fqan]
0825 
0826     # set split rule
0827     def setSplitRule(self, ruleName, ruleValue):
0828         if self.splitRule is None:
0829             # new
0830             self.splitRule = self.splitRuleToken[ruleName] + "=" + ruleValue
0831         else:
0832             tmpMatch = re.search(self.splitRuleToken[ruleName] + "=(\d+)", self.splitRule)
0833             if tmpMatch is None:
0834                 # append
0835                 self.splitRule += "," + self.splitRuleToken[ruleName] + "=" + ruleValue
0836             else:
0837                 # replace
0838                 self.splitRule = re.sub(self.splitRuleToken[ruleName] + "=(\d+)", self.splitRuleToken[ruleName] + "=" + ruleValue, self.splitRule)
0839 
0840     # remove split rule
0841     def removeSplitRule(self, ruleName):
0842         if self.splitRule is not None:
0843             items = self.splitRule.split(",")
0844             newItems = []
0845             for item in items:
0846                 # remove rule
0847                 tmpRuleName = item.split("=")[0]
0848                 if ruleName == tmpRuleName:
0849                     continue
0850                 newItems.append(item)
0851             self.splitRule = ",".join(newItems)
0852 
0853     # set to use scout
0854     def setUseScout(self, useFlag):
0855         if useFlag:
0856             self.setSplitRule("useScout", self.enum_useScout)
0857         else:
0858             self.setSplitRule("useScout", self.enum_noScout)
0859 
0860     # set post scout
0861     def setPostScout(self):
0862         self.setSplitRule("useScout", self.enum_postScout)
0863 
0864     # use scout
0865     def useScout(self, splitRule=None):
0866         if splitRule is None:
0867             splitRule = self.splitRule
0868         if splitRule is not None:
0869             tmpMatch = re.search(self.splitRuleToken["useScout"] + "=(\d+)", splitRule)
0870             if tmpMatch is not None and tmpMatch.group(1) == self.enum_useScout:
0871                 return True
0872         return False
0873 
0874     # use exhausted
0875     def useExhausted(self):
0876         return self.check_split_rule("useExhausted")
0877 
0878     # use real number of events
0879     def useRealNumEvents(self):
0880         return self.check_split_rule("useRealNumEvents")
0881 
0882     # use input LFN as source for output LFN
0883     def useFileAsSourceLFN(self):
0884         return self.check_split_rule("useFileAsSourceLFN")
0885 
0886     # post scout
0887     def isPostScout(self):
0888         if self.splitRule is not None:
0889             tmpMatch = re.search(self.splitRuleToken["useScout"] + "=(\d+)", self.splitRule)
0890             if tmpMatch is not None and tmpMatch.group(1) == self.enum_postScout:
0891                 return True
0892         return False
0893 
0894     # wait until input shows up
0895     def waitInput(self):
0896         return self.check_split_rule("waitInput")
0897 
0898     # input prestaging
0899     def inputPreStaging(self):
0900         if self.splitRule is not None:
0901             tmpMatch = re.search(self.splitRuleToken["inputPreStaging"] + "=" + self.enum_inputPreStaging["use"], self.splitRule)
0902             if tmpMatch is not None:
0903                 return True
0904         return False
0905 
0906     # set DDM backend
0907     def setDdmBackEnd(self, backEnd):
0908         if self.splitRule is None:
0909             # new
0910             self.splitRule = self.splitRuleToken["ddmBackEnd"] + "=" + backEnd
0911         else:
0912             tmpMatch = re.search(self.splitRuleToken["ddmBackEnd"] + "=([^,$]+)", self.splitRule)
0913             if tmpMatch is None:
0914                 # append
0915                 self.splitRule += "," + self.splitRuleToken["ddmBackEnd"] + "=" + backEnd
0916             else:
0917                 # replace
0918                 self.splitRule = re.sub(self.splitRuleToken["ddmBackEnd"] + "=([^,$]+)", self.splitRuleToken["ddmBackEnd"] + "=" + backEnd, self.splitRule)
0919 
0920     # get DDM backend
0921     def getDdmBackEnd(self):
0922         if self.splitRule is not None:
0923             tmpMatch = re.search(self.splitRuleToken["ddmBackEnd"] + "=([^,$]+)", self.splitRule)
0924             if tmpMatch is not None:
0925                 return tmpMatch.group(1)
0926         return None
0927 
0928     # get field number to add middle name to LFN
0929     def getFieldNumToLFN(self):
0930         try:
0931             if self.splitRule is not None:
0932                 tmpMatch = re.search(self.splitRuleToken["addNthFieldToLFN"] + "=([,\d]+)", self.splitRule)
0933                 if tmpMatch is not None:
0934                     tmpList = tmpMatch.group(1).split(",")
0935                     try:
0936                         tmpList.remove("")
0937                     except Exception:
0938                         pass
0939                     return list(map(int, tmpList))
0940         except Exception:
0941             pass
0942         return None
0943 
0944     # get required success rate for scout jobs
0945     def getScoutSuccessRate(self):
0946         if self.splitRule is not None:
0947             tmpMatch = re.search(self.splitRuleToken["scoutSuccessRate"] + "=(\d+)", self.splitRule)
0948             if tmpMatch is not None:
0949                 return int(tmpMatch.group(1))
0950         return None
0951 
0952     # get T1 weight
0953     def getT1Weight(self):
0954         if self.splitRule is not None:
0955             tmpMatch = re.search(self.splitRuleToken["t1Weight"] + "=(-*\d+)", self.splitRule)
0956             if tmpMatch is not None:
0957                 return int(tmpMatch.group(1))
0958         return 0
0959 
0960     # respect Lumiblock boundaries
0961     def respectLumiblock(self):
0962         return self.check_split_rule("respectLB")
0963 
0964     # release files per Lumiblock
0965     def releasePerLumiblock(self):
0966         return self.check_split_rule("releasePerLB")
0967 
0968     # order by Lumiblock numbers
0969     def orderByLB(self):
0970         return self.check_split_rule("orderByLB")
0971 
0972     # respect split rule
0973     def respectSplitRule(self):
0974         return self.check_split_rule("respectSplitRule")
0975 
0976     # allow partial finish
0977     def allowPartialFinish(self):
0978         return self.check_split_rule("allowPartialFinish")
0979 
0980     # check if datasets should be registered
0981     def toRegisterDatasets(self):
0982         if self.splitRule is not None:
0983             tmpMatch = re.search(self.splitRuleToken["registerDatasets"] + "=(\d+)", self.splitRule)
0984             if tmpMatch is not None and tmpMatch.group(1) == self.enum_toRegisterDS:
0985                 return True
0986         return False
0987 
0988     # datasets were registered
0989     def registeredDatasets(self):
0990         self.setSplitRule("registerDatasets", self.enum_registeredDS)
0991 
0992     # set datasets to be registered
0993     def setToRegisterDatasets(self):
0994         self.setSplitRule("registerDatasets", self.enum_toRegisterDS)
0995 
0996     # get the max number of attempts for ES events
0997     def getMaxAttemptES(self):
0998         if self.splitRule is not None:
0999             tmpMatch = re.search(self.splitRuleToken["maxAttemptES"] + "=(\d+)", self.splitRule)
1000             if tmpMatch is not None:
1001                 return int(tmpMatch.group(1))
1002         return None
1003 
1004     # get the max number of attempts for ES jobs
1005     def getMaxAttemptEsJob(self):
1006         if self.splitRule is not None:
1007             tmpMatch = re.search(self.splitRuleToken["maxAttemptEsJob"] + "=(\d+)", self.splitRule)
1008             if tmpMatch is not None:
1009                 return int(tmpMatch.group(1))
1010         return self.getMaxAttemptES()
1011 
1012     # check attribute length
1013     def checkAttrLength(self):
1014         for attrName, attrLength in self._attrLength.items():
1015             attrVal = getattr(self, attrName)
1016             if attrVal is None:
1017                 continue
1018             if len(attrVal) > attrLength:
1019                 setattr(self, attrName, None)
1020                 self.errorDialog = f"{attrName} is too long (actual: {len(attrVal)}, maximum: {attrLength})"
1021                 return False
1022         return True
1023 
1024     # set IP connectivity and stack
1025     def setIpConnectivity(self, value):
1026         if not value:
1027             return
1028         values = value.split("#")
1029         if not values:
1030             return
1031         ipConnectivity = values[0]
1032         if ipConnectivity in self.enum_ipConnectivity.values():
1033             for tmpKey, tmpVal in self.enum_ipConnectivity.items():
1034                 if ipConnectivity == tmpVal:
1035                     self.setSplitRule("ipConnectivity", tmpKey)
1036                     break
1037         if len(values) > 1:
1038             ipStack = values[1]
1039             if ipStack in self.enum_ipStack.values():
1040                 for tmpKey, tmpVal in self.enum_ipStack.items():
1041                     if ipStack == tmpVal:
1042                         self.setSplitRule("ipStack", tmpKey)
1043                         break
1044 
1045     # get IP connectivity
1046     def getIpConnectivity(self):
1047         if self.splitRule is not None:
1048             tmpMatch = re.search(self.splitRuleToken["ipConnectivity"] + "=(\d+)", self.splitRule)
1049             if tmpMatch is not None:
1050                 return self.enum_ipConnectivity[tmpMatch.group(1)]
1051         return None
1052 
    # get IP stack
1054     def getIpStack(self):
1055         if self.splitRule is not None:
1056             tmpMatch = re.search(self.splitRuleToken["ipStack"] + "=(\d+)", self.splitRule)
1057             if tmpMatch is not None:
1058                 return self.enum_ipStack[tmpMatch.group(1)]
1059         return None
1060 
1061     # use HS06 for walltime estimation
1062     def useHS06(self):
1063         return self.cpuTimeUnit in ["HS06sPerEvent", "HS06sPerEventFixed", "mHS06sPerEvent", "mHS06sPerEventFixed"]
1064 
1065     # get CPU time in sec
1066     def getCpuTime(self):
1067         if not self.useHS06():
1068             return None
1069         try:
1070             if self.cpuTimeUnit.startswith("m"):
1071                 return float(self.cpuTime) / 1000.0
1072         except Exception:
1073             pass
1074         return self.cpuTime
1075 
1076     # RAM scales with nCores
1077     def ramPerCore(self):
1078         return self.ramUnit in ["MBPerCore", "MBPerCoreFixed"]
1079 
1080     # run until input is closed
1081     def runUntilClosed(self):
1082         return self.check_split_rule("runUntilClosed")
1083 
1084     # stay output on site
1085     def stayOutputOnSite(self):
1086         return self.check_split_rule("stayOutputOnSite")
1087 
1088     # fail when goal unreached
1089     def failGoalUnreached(self):
1090         return self.check_split_rule("failGoalUnreached")
1091 
1092     # unset fail when goal unreached
1093     def unsetFailGoalUnreached(self):
1094         self.removeSplitRule(self.splitRuleToken["failGoalUnreached"])
1095 
1096     # switch ES to normal when jobs land at normal sites
1097     def switchEStoNormal(self):
1098         return self.check_split_rule("switchEStoNormal")
1099 
1100     # use world cloud
1101     def useWorldCloud(self):
1102         return self.cloud == self.worldCloudName
1103 
1104     # dynamic number of events
1105     def dynamicNumEvents(self):
1106         if self.splitRule is not None:
1107             tmpMatch = re.search(self.splitRuleToken["dynamicNumEvents"] + "=(\d+)", self.splitRule)
1108             if tmpMatch is not None:
1109                 return True
1110         return False
1111 
1112     # get min granularity for dynamic number of events
1113     def get_min_granularity(self):
1114         if self.splitRule is not None:
1115             tmpMatch = re.search(self.splitRuleToken["dynamicNumEvents"] + "=(\d+)", self.splitRule)
1116             if tmpMatch is not None:
1117                 return int(tmpMatch.group(1))
1118         return None
1119 
1120     # set alternative stage-out
1121     def setAltStageOut(self, value):
1122         if value in self.enum_altStageOut.values():
1123             for tmpKey, tmpVal in self.enum_altStageOut.items():
1124                 if value == tmpVal:
1125                     self.setSplitRule("altStageOut", tmpKey)
1126                     break
1127 
1128     # get alternative stage-out
1129     def getAltStageOut(self):
1130         if self.splitRule is not None:
1131             tmpMatch = re.search(self.splitRuleToken["altStageOut"] + "=(\d+)", self.splitRule)
1132             if tmpMatch is not None:
1133                 return self.enum_altStageOut[tmpMatch.group(1)]
1134         return None
1135 
1136     # allow WAN for input access
1137     def allowInputWAN(self):
1138         return self.check_split_rule("allowInputWAN")
1139 
1140     # set mode for input LAN access
1141     def setAllowInputLAN(self, value):
1142         if value in self.enum_inputLAN.values():
1143             for tmpKey, tmpVal in self.enum_inputLAN.items():
1144                 if value == tmpVal:
1145                     self.setSplitRule("allowInputLAN", tmpKey)
1146                     break
1147 
1148     # check if LAN is used for input access
1149     def allowInputLAN(self):
1150         if self.splitRule is not None:
1151             tmpMatch = re.search(self.splitRuleToken["allowInputLAN"] + "=(\d+)", self.splitRule)
1152             if tmpMatch is not None:
1153                 return self.enum_inputLAN[tmpMatch.group(1)]
1154         return None
1155 
1156     # put log files to OS
1157     def putLogToOS(self):
1158         return self.check_split_rule("putLogToOS")
1159 
1160     # merge ES on Object Store
1161     def mergeEsOnOS(self):
1162         return self.check_split_rule("mergeEsOnOS")
1163 
1164     # write input to file
1165     def writeInputToFile(self):
1166         return self.check_split_rule("writeInputToFile")
1167 
1168     # ignore missing input datasets
1169     def ignoreMissingInDS(self):
1170         return self.check_split_rule("ignoreMissingInDS")
1171 
1172     # suppress execute string conversion
1173     def noExecStrCnv(self):
1174         return self.check_split_rule("noExecStrCnv")
1175 
1176     # in-file positional event number
1177     def inFilePosEvtNum(self):
1178         return self.check_split_rule("inFilePosEvtNum")
1179 
1180     # register event service files
1181     def registerEsFiles(self):
1182         return self.check_split_rule("registerEsFiles")
1183 
1184     # disable auto finish
1185     def disableAutoFinish(self):
1186         return self.check_split_rule("disableAutoFinish")
1187 
    # reset refined attributes which may confuse the system
1189     def resetRefinedAttrs(self):
1190         self.resetChangedAttr("splitRule")
1191         self.resetChangedAttr("eventService")
1192         self.reserve_old_attributes()
1193 
1194     # resurrect consumers
1195     def resurrectConsumers(self):
1196         return self.check_split_rule("resurrectConsumers")
1197 
1198     # use prefetcher
1199     def usePrefetcher(self):
1200         return self.check_split_rule("usePrefetcher")
1201 
1202     # no input pooling
1203     def noInputPooling(self):
1204         return self.check_split_rule("noInputPooling")
1205 
1206     # get num of input chunks to wait
1207     def nChunksToWait(self):
1208         if self.splitRule is not None:
1209             tmpMatch = re.search(self.splitRuleToken["nChunksToWait"] + "=(\d+)", self.splitRule)
1210             if tmpMatch is not None:
1211                 return int(tmpMatch.group(1))
1212         return None
1213 
1214     # get max walltime
1215     def getMaxWalltime(self):
1216         if self.splitRule is not None:
1217             tmpMatch = re.search(self.splitRuleToken["maxWalltime"] + "=(\d+)", self.splitRule)
1218             if tmpMatch is not None:
1219                 return int(tmpMatch.group(1)) * 60 * 60
1220         return None
1221 
1222     # set max walltime
1223     def set_max_walltime(self, value):
1224         self.setSplitRule("maxWalltime", str(value))
1225 
1226     # get target size of the largest output to reset NG
1227     def getTgtMaxOutputForNG(self):
1228         if self.splitRule is not None:
1229             tmpMatch = re.search(self.splitRuleToken["tgtMaxOutputForNG"] + "=(\d+)", self.splitRule)
1230             if tmpMatch is not None:
1231                 return int(tmpMatch.group(1))
1232         return None
1233 
1234     # not discard events
1235     def notDiscardEvents(self):
1236         return self.check_split_rule("notDiscardEvents")
1237 
1238     # get min CPU efficiency
1239     def getMinCpuEfficiency(self):
1240         if self.splitRule is not None:
1241             tmpMatch = re.search(self.splitRuleToken["minCpuEfficiency"] + "=(\d+)", self.splitRule)
1242             if tmpMatch is not None:
1243                 return int(tmpMatch.group(1))
1244         return None
1245 
1246     # decrement attemptNr of events only when failed
1247     def decAttOnFailedES(self):
1248         return self.check_split_rule("decAttOnFailedES")
1249 
1250     # use zip files to pin input files
1251     def useZipToPin(self):
1252         return self.check_split_rule("useZipToPin")
1253 
1254     # architecture:
1255     # old format: sw_platform<@base_platform><#host_cpu_spec><&host_gpu_spec>
1256     #             host_cpu_spec: architecture<-vendor<-instruction_set>>
1257     #             host_gpu_spec: vendor<-model>
1258     #
1259     # new format: a json dict with keys of sw_platform, base_platform, cpu_specs, and gpu_spec
1260     #             cpu_specs: a list of json dicts with keys of arch, vendor, and instr
1261     #             gpu_spec: a json dict with keys of vendor and model
1262 
1263     # reformat architecture into JSON
1264     def reformat_architecture(self):
1265         if self.architecture is None:
1266             return
1267         encoded_platform = ""
1268         try:
1269             # JSON format without encoded platform
1270             tmp_dict = json.loads(self.architecture)
1271             # skip if encoded platform is not present
1272             if "encoded_platform" not in tmp_dict:
1273                 return
1274             encoded_platform = tmp_dict["encoded_platform"]
1275         except Exception:
1276             pass
1277         # convert to new format
1278         new_dict = {}
1279         val = self.get_sw_platform()
1280         if val:
1281             new_dict["sw_platform"] = val
1282         val = self.get_base_platform(encoded_platform)
1283         if val:
1284             new_dict["base_platform"] = val
1285         val = self.get_host_cpu_spec(encoded_platform)
1286         if val:
1287             # remove wildcard entries and empty specs
1288             l = [x for x in [{k: v for k, v in d.items() if v != "*"} for d in val] if x]
1289             if l:
1290                 new_dict["cpu_specs"] = l
1291         val = self.get_host_gpu_spec()
1292         if val:
1293             # remove wildcard entries and empty specs
1294             l = {k: v for k, v in val.items() if v != "*"}
1295             if l:
1296                 new_dict["gpu_spec"] = l
1297         self.architecture = json.dumps(new_dict)
1298 
1299     # get SW platform
1300     def get_sw_platform(self):
1301         try:
1302             d = json.loads(self.architecture)
1303             return d.get("sw_platform", "")
1304         except Exception:
1305             pass
1306         if self.architecture is not None:
1307             m = re.search("^([^@#&]*)", self.architecture)
1308             if m:
1309                 return m.group(1)
1310         return self.architecture
1311 
1312     # get base platform
1313     def get_base_platform(self, encoded_platform=None):
1314         try:
1315             d = json.loads(self.architecture)
1316             val = d.get("base_platform", None)
1317             if val is not None or encoded_platform is None:
1318                 return val
1319         except Exception:
1320             pass
1321         if encoded_platform:
1322             architecture = encoded_platform
1323         else:
1324             architecture = self.architecture
1325         if architecture is None or "@" not in architecture:
1326             return None
1327         m = re.search("@([^#&]*)", architecture)
1328         img = m.group(1)
1329         if img == "":
1330             img = None
1331         return img
1332 
1333     # get platforms
1334     def get_platforms(self):
1335         if self.architecture is not None:
1336             platform = self.get_sw_platform()
1337             base = self.get_base_platform()
1338             if base:
1339                 platform += "@" + base
1340             return platform
1341         return self.architecture
1342 
1343     # get host CPU spec
1344     def get_host_cpu_spec(self, encoded_platform=None):
1345         try:
1346             d = json.loads(self.architecture)
1347             specs = d.get("cpu_specs", None)
1348             if not specs and encoded_platform is None:
1349                 return None
1350             else:
1351                 for spec in specs:
1352                     spec.setdefault("vendor", "*")
1353                     spec.setdefault("instr", "*")
1354                 return specs
1355         except Exception:
1356             pass
1357         if encoded_platform:
1358             architecture = encoded_platform
1359         else:
1360             architecture = self.architecture
1361         try:
1362             if not architecture:
1363                 return None
1364             if "#" not in architecture:
1365                 if re.search(r"^[\^@&]", architecture):
1366                     return None
1367                 arch = architecture.split("-")[0]
1368                 if arch:
1369                     return [{"arch": arch, "vendor": "*", "instr": "*"}]
1370                 return None
1371             m = re.search(r"#([^\^@&]*)", architecture)
1372             spec_strs = m.group(1)
1373             if not spec_strs:
1374                 return None
1375             # remove ()
1376             if spec_strs.startswith("("):
1377                 spec_strs = spec_strs[1:]
1378             if spec_strs.endswith(")"):
1379                 spec_strs = spec_strs[:-1]
1380             specs = []
1381             for spec_str in spec_strs.split("|"):
1382                 spec_str += "-*" * (2 - spec_str.count("-"))
1383                 if "-" not in spec_str:
1384                     spec_str += "-*"
1385                 items = spec_str.split("-")
1386                 spec = {"arch": items[0], "vendor": items[1], "instr": items[2]}
1387                 specs.append(spec)
1388             return specs
1389         except Exception:
1390             return None
1391 
1392     def get_host_cpu_preference(self):
1393         try:
1394             d = json.loads(self.architecture)
1395             cpu_pref = d.get("cpu_pref", None)
1396             return cpu_pref
1397         except Exception:
1398             return None
1399 
1400     # get host GPU spec
1401     def get_host_gpu_spec(self):
1402         try:
1403             d = json.loads(self.architecture)
1404             spec = d.get("gpu_spec", None)
1405             spec.setdefault("vendor", "*")
1406             spec.setdefault("model", "*")
1407             return spec
1408         except Exception:
1409             pass
1410         try:
1411             if self.architecture is None or "&" not in self.architecture:
1412                 return None
1413             m = re.search(r"&([^\^@#]*)", self.architecture)
1414             spec_str = m.group(1)
1415             if not spec_str:
1416                 return None
1417             spec_str += "-*" * (1 - spec_str.count("-"))
1418             items = spec_str.split("-")
1419             spec = {"vendor": items[0], "model": items[1]}
1420             return spec
1421         except Exception:
1422             return None
1423 
1424     # HPO workflow
1425     def is_hpo_workflow(self):
1426         return self.check_split_rule("hpoWorkflow")
1427 
1428     # debug mode
1429     def is_debug_mode(self):
1430         return self.check_split_rule("debugMode")
1431 
1432     # multi-step execution
1433     def is_multi_step_exec(self):
1434         return self.check_split_rule("multiStepExec")
1435 
1436     # get max number of jobs
1437     def get_max_num_jobs(self):
1438         if self.splitRule is not None:
1439             tmpMatch = re.search(self.splitRuleToken["maxNumJobs"] + "=(\d+)", self.splitRule)
1440             if tmpMatch is not None:
1441                 return int(tmpMatch.group(1))
1442         return None
1443 
1444     # get total number of jobs
1445     def get_total_num_jobs(self):
1446         if self.splitRule is not None:
1447             tmpMatch = re.search(self.splitRuleToken["totNumJobs"] + "=(\d+)", self.splitRule)
1448             if tmpMatch is not None:
1449                 return int(tmpMatch.group(1))
1450         return None
1451 
1452     # use only tags for fat container
1453     def use_only_tags_fc(self):
1454         return self.check_split_rule("onlyTagsForFC")
1455 
1456     # avoid VP
1457     def avoid_vp(self):
1458         return self.check_split_rule("avoidVP")
1459 
1460     # set first contents feed
1461     def set_first_contents_feed(self, is_first):
1462         if is_first:
1463             self.setSplitRule("firstContentsFeed", self.FirstContentsFeed.TRUE.value)
1464         else:
1465             self.setSplitRule("firstContentsFeed", self.FirstContentsFeed.FALSE.value)
1466 
1467     # check if first contents feed
1468     def is_first_contents_feed(self):
1469         if self.splitRule is not None:
1470             tmpMatch = re.search(self.splitRuleToken["firstContentsFeed"] + "=(\d+)", self.splitRule)
1471             if tmpMatch is not None and tmpMatch.group(1) == self.FirstContentsFeed.TRUE.value:
1472                 return True
1473         return False
1474 
1475     # check if work is segmented
1476     def is_work_segmented(self):
1477         return self.check_split_rule("segmentedWork")
1478 
1479     # check if looping check is disabled
1480     def no_looping_check(self):
1481         return self.check_split_rule("noLoopingCheck")
1482 
1483     # encode job parameters
1484     def encode_job_params(self):
1485         return self.check_split_rule("encJobParams")
1486 
1487     # get original error dialog
1488     def get_original_error_dialog(self):
1489         if not self.origErrorDialog:
1490             return ""
1491         # remove log URL
1492         tmpStr = re.sub("<a href.+</a> : ", "", self.origErrorDialog)
1493         return tmpStr.split(". ")[-1]
1494 
1495     # check if secrets are used
1496     def use_secrets(self):
1497         return self.check_split_rule("useSecrets")
1498 
1499     # get max core count
1500     def get_max_core_count(self):
1501         if self.splitRule is not None:
1502             tmpMatch = re.search(self.splitRuleToken["maxCoreCount"] + "=(\d+)", self.splitRule)
1503             if tmpMatch is not None:
1504                 return int(tmpMatch.group(1))
1505         return None
1506 
1507     # push status changes
1508     def push_status_changes(self):
1509         return push_status_changes(self.splitRule)
1510 
1511     # use cloud as VO
1512     def cloud_as_vo(self):
1513         return self.check_split_rule("cloudAsVO")
1514 
1515     # push job
1516     def push_job(self):
1517         return self.check_split_rule("pushJob")
1518 
1519     # fine-grained process
1520     def is_fine_grained_process(self):
1521         return self.check_split_rule("fineGrainedProc")
1522 
1523     # on site merging
1524     def on_site_merging(self):
1525         return self.check_split_rule("onSiteMerging")
1526 
1527     # set full chain flag
1528     def set_full_chain(self, mode):
1529         var = None
1530         if mode == "only":
1531             var = self.FullChain.Only
1532         elif mode == "require":
1533             var = self.FullChain.Require
1534         elif mode == "capable":
1535             var = self.FullChain.Capable
1536         if var:
1537             self.setSplitRule("fullChain", var)
1538 
1539     # get full chain flag
1540     def get_full_chain(self):
1541         if self.splitRule:
1542             tmpMatch = re.search(self.splitRuleToken["fullChain"] + r"=(\d+)", self.splitRule)
1543             if tmpMatch:
1544                 return tmpMatch.group(1)
1545         return None
1546 
1547     # check full chain with mode
1548     def check_full_chain_with_mode(self, mode):
1549         task_flag = self.get_full_chain()
1550         if mode == "only":
1551             if task_flag == self.FullChain.Only:
1552                 return True
1553         elif mode == "require":
1554             if task_flag == self.FullChain.Require:
1555                 return True
1556         elif mode == "capable":
1557             if task_flag == self.FullChain.Capable:
1558                 return True
1559         return False
1560 
1561     # check full chain with nucleus
1562     def check_full_chain_with_nucleus(self, nucleus):
1563         if self.get_full_chain() and nucleus.get_bare_nucleus_mode():
1564             return True
1565         return False
1566 
1567     # get RAM for retry
1568     def get_ram_for_retry(self, current_ram):
1569         if not self.splitRule:
1570             return None
1571         tmpMatch = re.search(self.splitRuleToken["retryRamOffset"] + r"=(\d+)", self.splitRule)
1572         if not tmpMatch:
1573             return None
1574         offset = int(tmpMatch.group(1))
1575         tmpMatch = re.search(self.splitRuleToken["retryRamStep"] + r"=(\d+)", self.splitRule)
1576         if tmpMatch:
1577             step = int(tmpMatch.group(1))
1578         else:
1579             step = 0
1580         tmpMatch = re.search(self.splitRuleToken["retryRamMax"] + r"=(\d+)", self.splitRule)
1581         if tmpMatch:
1582             max_ram = int(tmpMatch.group(1))
1583         else:
1584             max_ram = None
1585         if not current_ram:
1586             return current_ram
1587         if current_ram < offset:
1588             if max_ram is None:
1589                 return offset
1590             else:
1591                 return min(offset, max_ram)
1592         if not step:
1593             return current_ram
1594         if max_ram is None:
1595             return offset + math.ceil((current_ram - offset) / step) * step
1596         else:
1597             return min(offset + math.ceil((current_ram - offset) / step) * step, max_ram)
1598 
1599     # get number of events per input
1600     def get_num_events_per_input(self):
1601         if self.splitRule is not None:
1602             tmpMatch = re.search(self.splitRuleToken["nEventsPerInput"] + "=(\d+)", self.splitRule)
1603             if tmpMatch is not None:
1604                 return int(tmpMatch.group(1))
1605         return None
1606 
1607     def get_max_events_per_job(self):
1608         if self.splitRule is not None:
1609             tmpMatch = re.search(self.splitRuleToken["maxEventsPerJob"] + "=(\d+)", self.splitRule)
1610             if tmpMatch is not None:
1611                 return int(tmpMatch.group(1))
1612         return None
1613 
1614     # set order input by
1615     def set_order_input_by(self, mode):
1616         var = None
1617         if mode == "eventsAlignment":
1618             var = self.OrderInputBy.eventsAlignment
1619         if var:
1620             self.setSplitRule("orderInputBy", var)
1621 
    # get the mode by which inputs are ordered
1623     def order_input_by(self):
1624         if self.splitRule:
1625             tmpMatch = re.search(self.splitRuleToken["orderInputBy"] + r"=(\d+)", self.splitRule)
1626             if tmpMatch:
1627                 if tmpMatch.group(1) == self.OrderInputBy.eventsAlignment:
1628                     return "eventsAlignment"
1629         return None
1630 
1631     # check if intermediate task
1632     def is_intermediate_task(self):
1633         return self.check_split_rule("intermediateTask")
1634 
1635     # check if message driven
1636     def is_msg_driven(self):
1637         return is_msg_driven(self.splitRule)
1638 
1639     # check if incomplete input datasets are allowed
1640     def allow_incomplete_input(self):
1641         return self.check_split_rule("allowIncompleteInDS")
1642 
1643     # check if workflow holdup
1644     def is_workflow_holdup(self):
1645         return self.check_split_rule("workflowHoldup")
1646 
1647     # set workflow holdup
1648     def set_workflow_holdup(self, value: bool):
1649         if value:
1650             self.setSplitRule("workflowHoldup", "1")
1651         else:
1652             self.removeSplitRule(self.splitRuleToken["workflowHoldup"])
1653 
1654     # get queued time
1655     def get_queued_time(self):
1656         """
1657         Get queued time in timestamp
1658         :return: queued time in timestamp. None if not set
1659         """
1660         if self.queuedTime is None:
1661             return None
1662         return self.queuedTime.timestamp()
1663 
1664 
1665 # utils
1666 
1667 
1668 # check split rule with positive integer
def check_split_rule_positive_int(key, split_rule):
    """
    Check whether a split rule string holds a positive integer for the given entry
    :param key: name of the entry, looked up in JediTaskSpec.splitRuleToken
    :param split_rule: split rule string. May be None or empty
    :return: True if the entry is present with a value greater than zero. False otherwise
    """
    if not split_rule:
        return False
    found = re.search(JediTaskSpec.splitRuleToken[key] + r"=(\d+)", split_rule)
    return found is not None and int(found.group(1)) > 0
1676 
1677 
1678 # check if push status changes without class instance
def push_status_changes(split_rule):
    """
    Check the pushStatusChanges entry of a split rule string without a class instance
    :param split_rule: split rule string. May be None or empty
    :return: True if the pushStatusChanges entry holds a positive integer. False otherwise
    """
    enabled = check_split_rule_positive_int("pushStatusChanges", split_rule)
    return enabled
1681 
1682 
1683 # check if message driven without class instance
def is_msg_driven(split_rule):
    """
    Check the messageDriven entry of a split rule string without a class instance
    :param split_rule: split rule string. May be None or empty
    :return: True if the messageDriven entry holds a positive integer. False otherwise
    """
    enabled = check_split_rule_positive_int("messageDriven", split_rule)
    return enabled
1686 
1687 
1688 # check if auto pause is disabled
def is_auto_pause_disabled(split_rule):
    """
    Check the noAutoPause entry of a split rule string without a class instance
    :param split_rule: split rule string. May be None or empty
    :return: False if the noAutoPause entry holds a positive integer. True otherwise
    """
    # NOTE(review): the result is True when noAutoPause is NOT set to a positive value, which
    # reads as inverted relative to the function name - confirm against callers before changing
    no_auto_pause = check_split_rule_positive_int("noAutoPause", split_rule)
    return not no_auto_pause