Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:06

0001 """
0002 dataset specification for JEDI
0003 
0004 """
0005 
0006 import math
0007 import re
0008 
0009 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables
0010 
0011 from pandaserver.config import panda_config
0012 
0013 
0014 class JediDatasetSpec(object):
0015     def __str__(self):
0016         sb = []
0017         for key in self.__dict__:
0018             if key == "Files":
0019                 sb.append(f"{key}='{len(self.__dict__[key])}'")
0020             else:
0021                 sb.append(f"{key}='{self.__dict__[key]}'")
0022         return ", ".join(sb)
0023 
0024     def __repr__(self):
0025         return self.__str__()
0026 
0027     # attributes
    # Ordered attribute names of the spec. The order matters: pack() assigns
    # values by position, and columnNames()/bindValuesExpression() emit the
    # names in this same order.
    _attributes = (
        "jediTaskID",
        "datasetID",
        "datasetName",
        "containerName",
        "type",
        "creationTime",
        "modificationTime",
        "vo",
        "cloud",
        "site",
        "masterID",
        "provenanceID",
        "status",
        "state",
        "stateCheckTime",
        "stateCheckExpiration",
        "frozenTime",
        "nFiles",
        "nFilesToBeUsed",
        "nFilesUsed",
        "nFilesFinished",
        "nFilesFailed",
        "nFilesOnHold",
        "nEvents",
        "nEventsToBeUsed",
        "nEventsUsed",
        "lockedBy",
        "lockedTime",
        "attributes",
        "streamName",
        "storageToken",
        "destination",
        "templateID",
        "nFilesWaiting",
        "nFilesMissing",
    )
    # attributes for which valuesMap() reports 0 instead of None (currently none)
    _zeroAttrs = ()
    # attributes that __setattr__ records as changed on every assignment,
    # even when the new value equals the old one
    _forceUpdateAttrs = ("lockedBy", "lockedTime")
    # attribute -> SQL expression that bindValuesExpression(useSeq=True) inlines
    # in place of a bind variable; valuesMap(useSeq=True) omits these attributes
    _seqAttrMap = {"datasetID": f"{panda_config.schemaJEDI}.JEDI_DATASETS_ID_SEQ.nextval"}
    # descriptive label -> short token stored in the comma-separated
    # ``attributes`` string (either bare, e.g. "cc", or as "token=value")
    attrToken = {
        "allowNoOutput": "an",
        "consistencyCheck": "cc",
        "eventRatio": "er",
        "indexConsistent": "ic",
        "mergeOnly": "mo",
        "nFilesPerJob": "np",
        "num_records": "nr",
        "offset": "of",
        "objectStore": "os",
        "pseudo": "ps",
        "random": "rd",
        "reusable": "ru",
        "transient": "tr",
        "useDuplicated": "ud",
        "no_staging": "ns",
    }
0089 
0090     # constructor
0091     def __init__(self):
0092         # install attributes
0093         for attr in self._attributes:
0094             object.__setattr__(self, attr, None)
0095         # file list
0096         object.__setattr__(self, "Files", [])
0097         # map of changed attributes
0098         object.__setattr__(self, "_changedAttrs", {})
0099         # distributed
0100         object.__setattr__(self, "distributed", False)
0101 
    # override __setattr__ to collect the changed attributes
0103     def __setattr__(self, name, value):
0104         oldVal = getattr(self, name)
0105         object.__setattr__(self, name, value)
0106         newVal = getattr(self, name)
0107         # collect changed attributes
0108         if oldVal != newVal or name in self._forceUpdateAttrs:
0109             self._changedAttrs[name] = value
0110 
0111     # add File to files list
0112     def addFile(self, fileSpec):
0113         # append
0114         self.Files.append(fileSpec)
0115 
0116     # reset changed attribute list
0117     def resetChangedList(self):
0118         object.__setattr__(self, "_changedAttrs", {})
0119 
0120     # force update
0121     def forceUpdate(self, name):
0122         if name in self._attributes:
0123             self._changedAttrs[name] = getattr(self, name)
0124 
0125     # return map of values
0126     def valuesMap(self, useSeq=False, onlyChanged=False):
0127         ret = {}
0128         for attr in self._attributes:
0129             # use sequence
0130             if useSeq and attr in self._seqAttrMap:
0131                 continue
0132             # only changed attributes
0133             if onlyChanged:
0134                 if attr not in self._changedAttrs:
0135                     continue
0136             val = getattr(self, attr)
0137             if val is None:
0138                 if attr in self._zeroAttrs:
0139                     val = 0
0140                 else:
0141                     val = None
0142             ret[f":{attr}"] = val
0143         return ret
0144 
    # pack tuple into the dataset spec
0146     def pack(self, values):
0147         for i in range(len(self._attributes)):
0148             attr = self._attributes[i]
0149             val = values[i]
0150             object.__setattr__(self, attr, val)
0151 
0152     # return column names for INSERT
0153     def columnNames(cls, prefix=None):
0154         ret = ""
0155         for attr in cls._attributes:
0156             if prefix is not None:
0157                 ret += f"{prefix}."
0158             ret += f"{attr},"
0159         ret = ret[:-1]
0160         return ret
0161 
0162     columnNames = classmethod(columnNames)
0163 
0164     # return expression of bind variables for INSERT
0165     def bindValuesExpression(cls, useSeq=True):
0166         ret = "VALUES("
0167         for attr in cls._attributes:
0168             if useSeq and attr in cls._seqAttrMap:
0169                 ret += f"{cls._seqAttrMap[attr]},"
0170             else:
0171                 ret += f":{attr},"
0172         ret = ret[:-1]
0173         ret += ")"
0174         return ret
0175 
0176     bindValuesExpression = classmethod(bindValuesExpression)
0177 
0178     # return an expression of bind variables for UPDATE to update only changed attributes
0179     def bindUpdateChangesExpression(self):
0180         ret = ""
0181         for attr in self._attributes:
0182             if attr in self._changedAttrs:
0183                 ret += f"{attr}=:{attr},"
0184         ret = ret[:-1]
0185         ret += " "
0186         return ret
0187 
0188     # set dataset attribute
0189     def setDatasetAttribute(self, attr):
0190         if self.attributes is None:
0191             self.attributes = ""
0192         else:
0193             self.attributes += ","
0194         self.attributes += attr
0195 
0196     # set dataset attribute with label
0197     def setDatasetAttributeWithLabel(self, label):
0198         if label not in self.attrToken:
0199             return
0200         attr = self.attrToken[label]
0201         if self.attributes is None:
0202             self.attributes = ""
0203         else:
0204             self.attributes += ","
0205         self.attributes += attr
0206 
0207     # return list of status to update contents
0208     def statusToUpdateContents(cls):
0209         return ["defined", "toupdate"]
0210 
0211     statusToUpdateContents = classmethod(statusToUpdateContents)
0212 
0213     # return list of types for input
0214     def getInputTypes(cls):
0215         return ["input", "pseudo_input"]
0216 
0217     getInputTypes = classmethod(getInputTypes)
0218 
0219     # return list of types to generate jobs
0220     def getProcessTypes(cls):
0221         return cls.getInputTypes() + ["pp_input"] + cls.getMergeProcessTypes()
0222 
0223     getProcessTypes = classmethod(getProcessTypes)
0224 
0225     # return list of types for merging
0226     def getMergeProcessTypes(cls):
0227         return ["trn_log", "trn_output"]
0228 
0229     getMergeProcessTypes = classmethod(getMergeProcessTypes)
0230 
0231     # get type of unknown input
0232     def getUnknownInputType(cls):
0233         return "trn_unknown"
0234 
0235     getUnknownInputType = classmethod(getUnknownInputType)
0236 
0237     # check if JEDI needs to keep track of file usage
0238     def toKeepTrack(self):
0239         if self.isNoSplit() and self.isRepeated():
0240             return False
0241         elif self.isReusable():
0242             return False
0243         else:
0244             return True
0245 
0246     # check if it is not split
0247     def isNoSplit(self):
0248         if self.attributes is not None and "nosplit" in self.attributes:
0249             return True
0250         else:
0251             return False
0252 
0253     # check if it is repeatedly used
0254     def isRepeated(self):
0255         if self.attributes is not None and "repeat" in self.attributes:
0256             return True
0257         else:
0258             return False
0259 
0260     # check if it is randomly used
0261     def isRandom(self):
0262         if self.attributes is not None and "rd" in self.attributes.split(","):
0263             return True
0264         else:
0265             return False
0266 
0267     # check if it is reusable
0268     def isReusable(self):
0269         if self.attributes is not None and "ru" in self.attributes.split(","):
0270             return True
0271         else:
0272             return False
0273 
0274     # check if consistency is checked
0275     def checkConsistency(self):
0276         if self.attributes is not None and "cc" in self.attributes.split(","):
0277             return True
0278         else:
0279             return False
0280 
0281     # set consistency is checked
0282     def enableCheckConsistency(self):
0283         if self.attributes in [None, ""]:
0284             self.attributes = "cc"
0285         elif "cc" not in self.attributes.split(","):
0286             self.attributes += ",cc"
0287 
0288     # check if it is pseudo
0289     def isPseudo(self):
0290         if self.datasetName in ["pseudo_dataset", "seq_number"] or self.type in ["pp_input"]:
0291             return True
0292         if self.attributes is not None and self.attrToken["pseudo"] in self.attributes.split(","):
0293             return True
0294         return False
0295 
0296     # check if it is a many-time dataset which is treated as long-standing at T2s
0297     def isManyTime(self):
0298         if self.attributes is not None and "manytime" in self.attributes:
0299             return True
0300         else:
0301             return False
0302 
0303     # check if it is seq number
0304     def isSeqNumber(self):
0305         if self.datasetName in ["seq_number"]:
0306             return True
0307         else:
0308             return False
0309 
0310     # check if duplicated files are used
0311     def useDuplicatedFiles(self):
0312         if self.attributes is not None and ("usedup" in self.attributes or "ud" in self.attributes.split(",")):
0313             return True
0314         else:
0315             return False
0316 
0317     # check if it is a master dataset
0318     def isMaster(self):
0319         if self.masterID is None and self.type in self.getProcessTypes():
0320             return True
0321         else:
0322             return False
0323 
0324     # check if it is a master input dataset
0325     def isMasterInput(self):
0326         if self.masterID is None and self.type in self.getInputTypes():
0327             return True
0328         else:
0329             return False
0330 
    # remove the given attribute
0332     def remAttribute(self, attrName):
0333         if self.attributes is not None:
0334             self.attributes = re.sub(attrName, "", self.attributes)
0335             self.attributes = re.sub(",,", ",", self.attributes)
0336             self.attributes = re.sub("^,", "", self.attributes)
0337             self.attributes = re.sub(",$", "", self.attributes)
0338             if self.attributes == "":
0339                 self.attributes = None
0340 
0341     # remove nosplit attribute
0342     def remNoSplit(self):
0343         self.remAttribute("nosplit")
0344 
0345     # remove repeat attribute
0346     def remRepeat(self):
0347         self.remAttribute("repeat")
0348 
0349     # get the ratio to master
0350     def getRatioToMaster(self):
0351         try:
0352             tmpMatch = re.search("ratio=(\d+(\.\d+)*)", self.attributes)
0353             if tmpMatch is not None:
0354                 ratioStr = tmpMatch.group(1)
0355                 try:
0356                     # integer
0357                     return int(ratioStr)
0358                 except Exception:
0359                     pass
0360                 try:
0361                     # float
0362                     return float(ratioStr)
0363                 except Exception:
0364                     pass
0365         except Exception:
0366             pass
0367         return 1
0368 
0369     # get N multiplied by ratio
0370     def getNumMultByRatio(self, num):
0371         # no split
0372         if self.isNoSplit():
0373             return None
0374         # get ratio
0375         ratioVal = self.getRatioToMaster()
0376         # integer or float
0377         if isinstance(ratioVal, int):
0378             retVal = num * ratioVal
0379         else:
0380             retVal = float(num) * ratioVal
0381             retVal = int(math.ceil(retVal))
0382         return retVal
0383 
0384     # unique map key for output
0385     def outputMapKey(self):
0386         mapKey = f"{self.datasetName}#{self.provenanceID}"
0387         return mapKey
0388 
0389     # unique map key
0390     def uniqueMapKey(self):
0391         mapKey = f"{self.datasetName}#{self.datasetID}"
0392         return mapKey
0393 
0394     # set offset
0395     def setOffset(self, offset):
0396         self.setDatasetAttribute(f"{self.attrToken['offset']}={offset}")
0397 
0398     # get offset
0399     def getOffset(self):
0400         if self.attributes is not None:
0401             tmpMatch = re.search(self.attrToken["offset"] + "=(\d+)", self.attributes)
0402             if tmpMatch is not None:
0403                 offset = int(tmpMatch.group(1))
0404                 return offset
0405         return 0
0406 
0407     # set number of records
0408     def setNumRecords(self, n):
0409         self.setDatasetAttribute(f"{self.attrToken['num_records']}={n}")
0410 
0411     # get number of records
0412     def getNumRecords(self):
0413         if self.attributes is not None:
0414             for item in self.attributes.split(","):
0415                 tmpMatch = re.search(self.attrToken["num_records"] + "=(\d+)", item)
0416                 if tmpMatch is not None:
0417                     num_records = int(tmpMatch.group(1))
0418                     return num_records
0419         return None
0420 
0421     # set object store
0422     def setObjectStore(self, objectStore):
0423         self.setDatasetAttribute(f"{self.attrToken['objectStore']}={objectStore}")
0424 
0425     # get object store
0426     def getObjectStore(self):
0427         if self.attributes is not None:
0428             tmpMatch = re.search(self.attrToken["objectStore"] + "=([^,]+)", self.attributes)
0429             if tmpMatch is not None:
0430                 return tmpMatch.group(1)
0431         return None
0432 
0433     # set the number of files per job
0434     def setNumFilesPerJob(self, num):
0435         self.setDatasetAttribute(f"{self.attrToken['nFilesPerJob']}={num}")
0436 
0437     # get the number of files per job
0438     def getNumFilesPerJob(self):
0439         if self.attributes is not None:
0440             tmpMatch = re.search(self.attrToken["nFilesPerJob"] + "=(\d+)", self.attributes)
0441             if tmpMatch is not None:
0442                 num = int(tmpMatch.group(1))
0443                 return num
0444         # use continuous numbers for seq_number
0445         if self.isSeqNumber():
0446             return 1
0447         return None
0448 
0449     # check if unmerged dataset
0450     def toMerge(self):
0451         if self.type.startswith("trn_"):
0452             return True
0453         return False
0454 
0455     # set transient
0456     def setTransient(self, val):
0457         if val is True:
0458             val = 1
0459         else:
0460             val = 0
0461         self.setDatasetAttribute(f"{self.attrToken['transient']}={val}")
0462 
0463     # get transient
0464     def getTransient(self):
0465         if self.attributes is not None:
0466             for item in self.attributes.split(","):
0467                 tmpMatch = re.search(self.attrToken["transient"] + "=(\d+)", item)
0468                 if tmpMatch is not None:
0469                     val = int(tmpMatch.group(1))
0470                     if val == 1:
0471                         return True
0472                     else:
0473                         return False
0474         return None
0475 
0476     # check if no output is allowed
0477     def isAllowedNoOutput(self):
0478         if self.attributes is not None and self.attrToken["allowNoOutput"] in self.attributes.split(","):
0479             return True
0480         else:
0481             return False
0482 
0483     # allow no output
0484     def allowNoOutput(self):
0485         if self.attributes in [None, ""]:
0486             items = []
0487         else:
0488             items = self.attributes.split(",")
0489         if self.attrToken["allowNoOutput"] not in items:
0490             items.append(self.attrToken["allowNoOutput"])
0491             self.attributes = ",".join(items)
0492 
0493     # check if index consistency is required
0494     def indexConsistent(self):
0495         if self.attributes is not None and self.attrToken["indexConsistent"] in self.attributes.split(","):
0496             return True
0497         else:
0498             return False
0499 
0500     # set distributed
0501     def setDistributed(self):
0502         self.distributed = True
0503 
0504     # reset distributed
0505     def reset_distributed(self):
0506         self.distributed = False
0507 
0508     # check if distributed
0509     def isDistributed(self):
0510         return self.distributed
0511 
0512     # set event ratio
0513     def setEventRatio(self, num):
0514         self.setDatasetAttribute(f"{self.attrToken['eventRatio']}={num}")
0515 
0516     # get event ratio
0517     def getEventRatio(self):
0518         if self.attributes is not None:
0519             for item in self.attributes.split(","):
0520                 tmpMatch = re.search(self.attrToken["eventRatio"] + "=(\d+(\.\d+)*)", item)
0521                 if tmpMatch is not None:
0522                     ratioStr = tmpMatch.group(1)
0523                     try:
0524                         # integer
0525                         return int(ratioStr)
0526                     except Exception:
0527                         pass
0528                     try:
0529                         # float
0530                         return float(ratioStr)
0531                     except Exception:
0532                         pass
0533         return None
0534 
0535     # set pseudo
0536     def setPseudo(self):
0537         if self.attributes in [None, ""]:
0538             items = []
0539         else:
0540             items = self.attributes.split(",")
0541         if self.attrToken["pseudo"] not in items:
0542             items.append(self.attrToken["pseudo"])
0543             self.attributes = ",".join(items)
0544 
0545     # merge only
0546     def is_merge_only(self):
0547         try:
0548             return self.attrToken["mergeOnly"] in self.attributes.split(",")
0549         except Exception:
0550             return False
0551 
0552     # sort files by old PandaIDs and move files with no PandaIDs to the end
0553     def sort_files_by_panda_ids(self):
0554         self.Files = sorted([f for f in self.Files if f.PandaID is not None], key=lambda x: x.PandaID) + [f for f in self.Files if f.PandaID is None]
0555 
0556     # set no staging
0557     def set_no_staging(self, value: bool):
0558         """
0559         Set no_staging (ns) attribute for dataset
0560 
0561         Args:
0562             value (bool): value of no_staging; will set attribute = 1 for True or 0 for False
0563         """
0564         num_repr = 1 if value else 0
0565         self.setDatasetAttribute(f"{self.attrToken['no_staging']}={num_repr}")
0566 
0567     # get whether is no_staging
0568     def is_no_staging(self) -> bool:
0569         """
0570         Set no_staging (ns) attribute for dataset
0571 
0572         Returns:
0573             bool: whether with no_staging
0574         """
0575         if self.attributes is not None:
0576             for item in self.attributes.split(","):
0577                 tmpMatch = re.search(self.attrToken["no_staging"] + "=(\d+)", item)
0578                 if tmpMatch is not None:
0579                     num_repr = int(tmpMatch.group(1))
0580                     value = bool(num_repr)
0581                     return value
0582         return False
0583 
0584 
0585 # often-used bind variables
# Computed once at import time from the JediDatasetSpec type lists. Each call
# returns a pair that is unpacked into a *_var_str and a *_var_map name.
# NOTE(review): presumably the SQL "IN (...)" placeholder text and the matching
# bind-variable map, as the helper name suggests — confirm against
# pandacommon's get_sql_IN_bind_variables.
INPUT_TYPES_var_str, INPUT_TYPES_var_map = get_sql_IN_bind_variables(JediDatasetSpec.getInputTypes(), prefix=":type_", value_as_suffix=True)
PROCESS_TYPES_var_str, PROCESS_TYPES_var_map = get_sql_IN_bind_variables(JediDatasetSpec.getProcessTypes(), prefix=":type_", value_as_suffix=True)
MERGE_TYPES_var_str, MERGE_TYPES_var_map = get_sql_IN_bind_variables(JediDatasetSpec.getMergeProcessTypes(), prefix=":type_", value_as_suffix=True)