File indexing completed on 2026-04-10 08:39:06
0001 """
0002 dataset specification for JEDI
0003
0004 """
0005
0006 import math
0007 import re
0008
0009 from pandacommon.pandautils.PandaUtils import get_sql_IN_bind_variables
0010
0011 from pandaserver.config import panda_config
0012
0013
class JediDatasetSpec(object):
    """In-memory representation of one row of the JEDI datasets table."""

    def __str__(self):
        # Show the number of attached files instead of dumping every file spec
        parts = []
        for key, val in self.__dict__.items():
            shown = len(val) if key == "Files" else val
            parts.append(f"{key}='{shown}'")
        return ", ".join(parts)

    def __repr__(self):
        # Same human-readable form as __str__
        return self.__str__()
0026
0027
# Column names of the JEDI datasets table; the order defines the positional
# layout consumed by pack() and emitted by columnNames()/bindValuesExpression().
_attributes = (
    "jediTaskID",
    "datasetID",
    "datasetName",
    "containerName",
    "type",
    "creationTime",
    "modificationTime",
    "vo",
    "cloud",
    "site",
    "masterID",
    "provenanceID",
    "status",
    "state",
    "stateCheckTime",
    "stateCheckExpiration",
    "frozenTime",
    "nFiles",
    "nFilesToBeUsed",
    "nFilesUsed",
    "nFilesFinished",
    "nFilesFailed",
    "nFilesOnHold",
    "nEvents",
    "nEventsToBeUsed",
    "nEventsUsed",
    "lockedBy",
    "lockedTime",
    "attributes",
    "streamName",
    "storageToken",
    "destination",
    "templateID",
    "nFilesWaiting",
    "nFilesMissing",
)

# Attributes whose None value is bound as 0 by valuesMap() (currently none).
_zeroAttrs = ()

# Attributes always recorded as changed by __setattr__, even when the value
# is identical to the old one.
_forceUpdateAttrs = ("lockedBy", "lockedTime")

# Attributes filled from a database sequence on INSERT.
_seqAttrMap = {"datasetID": f"{panda_config.schemaJEDI}.JEDI_DATASETS_ID_SEQ.nextval"}

# Mapping from human-readable labels to the short tokens stored in the
# comma-separated "attributes" column.
attrToken = {
    "allowNoOutput": "an",
    "consistencyCheck": "cc",
    "eventRatio": "er",
    "indexConsistent": "ic",
    "mergeOnly": "mo",
    "nFilesPerJob": "np",
    "num_records": "nr",
    "offset": "of",
    "objectStore": "os",
    "pseudo": "ps",
    "random": "rd",
    "reusable": "ru",
    "transient": "tr",
    "useDuplicated": "ud",
    "no_staging": "ns",
}
0089
0090
def __init__(self):
    """Initialize all table attributes to None and reset bookkeeping state."""
    # object.__setattr__ is used directly so initialization does not go
    # through the change-tracking __setattr__ override
    for attr in self._attributes:
        object.__setattr__(self, attr, None)
    # list of file specs attached to this dataset
    object.__setattr__(self, "Files", [])
    # map of attributes modified since the last resetChangedList()
    object.__setattr__(self, "_changedAttrs", {})
    # flag for distributed datasets
    object.__setattr__(self, "distributed", False)
0101
0102
def __setattr__(self, name, value):
    """Assign an attribute and record it in _changedAttrs when it changes.

    The recorded map is later used to build partial UPDATE statements
    (see valuesMap and bindUpdateChangesExpression).
    """
    oldVal = getattr(self, name)
    object.__setattr__(self, name, value)
    # re-read the stored value so the comparison reflects what was kept
    newVal = getattr(self, name)
    # attributes listed in _forceUpdateAttrs are always marked as changed
    if oldVal != newVal or name in self._forceUpdateAttrs:
        self._changedAttrs[name] = value
0110
0111
def addFile(self, fileSpec):
    """Append a file spec object to this dataset's file list."""
    self.Files.append(fileSpec)
0115
0116
def resetChangedList(self):
    """Forget all recorded attribute modifications."""
    object.__setattr__(self, "_changedAttrs", {})


def forceUpdate(self, name):
    """Mark a table attribute as changed regardless of its current value."""
    if name not in self._attributes:
        return
    self._changedAttrs[name] = getattr(self, name)
0124
0125
def valuesMap(self, useSeq=False, onlyChanged=False):
    """Return a bind-variable map (":attr" -> value) for SQL execution.

    :param useSeq: skip attributes that are filled from a DB sequence
                   (see _seqAttrMap) since the database generates them
    :param onlyChanged: include only attributes recorded in _changedAttrs
    """
    ret = {}
    for attr in self._attributes:
        # sequence-generated columns are produced by the DB, not bound
        if useSeq and attr in self._seqAttrMap:
            continue
        # limit to modified attributes when building an UPDATE
        if onlyChanged and attr not in self._changedAttrs:
            continue
        val = getattr(self, attr)
        # map unset counters to 0 where required (removed the redundant
        # "else: val = None" branch of the original)
        if val is None and attr in self._zeroAttrs:
            val = 0
        ret[f":{attr}"] = val
    return ret
0144
0145
def pack(self, values):
    """Fill table attributes from an ordered sequence (e.g. a DB row).

    Values are consumed positionally in _attributes order; a too-short
    sequence raises IndexError as before. object.__setattr__ is used so
    packing does not mark attributes as changed.
    """
    # enumerate instead of range(len(...)) — same behavior, idiomatic
    for idx, attr in enumerate(self._attributes):
        object.__setattr__(self, attr, values[idx])
0151
0152
def columnNames(cls, prefix=None):
    """Return a comma-separated column list, optionally prefixed ("p.col")."""
    if prefix is None:
        return ",".join(cls._attributes)
    return ",".join(f"{prefix}.{attr}" for attr in cls._attributes)


columnNames = classmethod(columnNames)
0163
0164
def bindValuesExpression(cls, useSeq=True):
    """Return a VALUES(...) clause with bind variables.

    Sequence-backed columns use their DB sequence expression when useSeq
    is True, bind variables otherwise.
    """
    terms = []
    for attr in cls._attributes:
        if useSeq and attr in cls._seqAttrMap:
            terms.append(cls._seqAttrMap[attr])
        else:
            terms.append(f":{attr}")
    return "VALUES(" + ",".join(terms) + ")"


bindValuesExpression = classmethod(bindValuesExpression)
0177
0178
def bindUpdateChangesExpression(self):
    """Return an "attr=:attr,..." SET fragment for the changed attributes.

    The result always carries a trailing space, as callers expect.
    """
    assignments = [f"{attr}=:{attr}" for attr in self._attributes if attr in self._changedAttrs]
    return ",".join(assignments) + " "
0187
0188
def setDatasetAttribute(self, attr):
    """Append a token to the comma-separated attributes string."""
    if self.attributes is None:
        self.attributes = attr
    else:
        self.attributes = f"{self.attributes},{attr}"
0195
0196
def setDatasetAttributeWithLabel(self, label):
    """Append the short token for a human-readable label (see attrToken).

    Unknown labels are ignored silently.
    """
    if label not in self.attrToken:
        return
    # delegate instead of duplicating setDatasetAttribute's append logic
    self.setDatasetAttribute(self.attrToken[label])
0206
0207
def statusToUpdateContents(cls):
    """Dataset statuses in which contents may still be updated."""
    updatable = ["defined", "toupdate"]
    return updatable


statusToUpdateContents = classmethod(statusToUpdateContents)


def getInputTypes(cls):
    """Dataset types treated as task input."""
    input_types = ["input", "pseudo_input"]
    return input_types


getInputTypes = classmethod(getInputTypes)


def getProcessTypes(cls):
    """All dataset types that get processed: input, pre-processing, merge."""
    process_types = list(cls.getInputTypes())
    process_types.append("pp_input")
    process_types.extend(cls.getMergeProcessTypes())
    return process_types


getProcessTypes = classmethod(getProcessTypes)


def getMergeProcessTypes(cls):
    """Dataset types used by the merge step."""
    merge_types = ["trn_log", "trn_output"]
    return merge_types


getMergeProcessTypes = classmethod(getMergeProcessTypes)


def getUnknownInputType(cls):
    """Placeholder type for inputs of unknown kind."""
    return "trn_unknown"


getUnknownInputType = classmethod(getUnknownInputType)
0236
0237
def toKeepTrack(self):
    """Return False for datasets whose file usage needs no bookkeeping.

    That is the case for no-split repeated datasets and reusable datasets.
    """
    skip = (self.isNoSplit() and self.isRepeated()) or self.isReusable()
    return not skip
0245
0246
def isNoSplit(self):
    """True if the dataset must not be split ("nosplit" attribute)."""
    return self.attributes is not None and "nosplit" in self.attributes


def isRepeated(self):
    """True if the dataset is used repeatedly ("repeat" attribute)."""
    return self.attributes is not None and "repeat" in self.attributes


def isRandom(self):
    """True if files are taken in random order (token "rd")."""
    if self.attributes is None:
        return False
    return "rd" in self.attributes.split(",")


def isReusable(self):
    """True if the dataset is reusable (token "ru")."""
    if self.attributes is None:
        return False
    return "ru" in self.attributes.split(",")


def checkConsistency(self):
    """True if the consistency check is enabled (token "cc")."""
    if self.attributes is None:
        return False
    return "cc" in self.attributes.split(",")
0280
0281
def enableCheckConsistency(self):
    """Add the consistency-check token ("cc") unless already present."""
    if not self.attributes:
        self.attributes = "cc"
        return
    if "cc" not in self.attributes.split(","):
        self.attributes += ",cc"
0287
0288
def isPseudo(self):
    """True for placeholder datasets (pseudo/seq-number names, pp_input type, or the "ps" token)."""
    if self.datasetName in ("pseudo_dataset", "seq_number") or self.type == "pp_input":
        return True
    return self.attributes is not None and self.attrToken["pseudo"] in self.attributes.split(",")
0295
0296
def isManyTime(self):
    """True if the "manytime" attribute is set."""
    return self.attributes is not None and "manytime" in self.attributes


def isSeqNumber(self):
    """True for the pseudo dataset carrying sequence numbers."""
    return self.datasetName == "seq_number"


def useDuplicatedFiles(self):
    """True if duplicated files may be used ("usedup" or token "ud")."""
    if self.attributes is None:
        return False
    return "usedup" in self.attributes or "ud" in self.attributes.split(",")
0316
0317
def isMaster(self):
    """True for a master dataset: no master of its own and a processed type."""
    return self.masterID is None and self.type in self.getProcessTypes()


def isMasterInput(self):
    """True for a master dataset whose type is an input type."""
    return self.masterID is None and self.type in self.getInputTypes()
0330
0331
def remAttribute(self, attrName):
    """Remove all occurrences of attrName from the attributes string.

    The attributes string becomes None when nothing is left.
    """
    if self.attributes is not None:
        # escape the name so regex metacharacters in it are taken literally
        # (the original passed attrName straight to re.sub as a pattern)
        self.attributes = re.sub(re.escape(attrName), "", self.attributes)
        # clean up the separators left behind by the removal
        self.attributes = re.sub(",,", ",", self.attributes)
        self.attributes = re.sub("^,", "", self.attributes)
        self.attributes = re.sub(",$", "", self.attributes)
        if self.attributes == "":
            self.attributes = None
0340
0341
def remNoSplit(self):
    # drop the "nosplit" attribute
    self.remAttribute("nosplit")


def remRepeat(self):
    # drop the "repeat" attribute
    self.remAttribute("repeat")
0348
0349
def getRatioToMaster(self):
    """Return the multiplier relative to the master dataset.

    Parsed from a "ratio=<number>" attribute; defaults to 1 when absent
    or unparsable. Returns an int when possible, otherwise a float.
    """
    try:
        # raw string so \d is a regex digit class, not a string escape
        tmpMatch = re.search(r"ratio=(\d+(\.\d+)*)", self.attributes)
        if tmpMatch is not None:
            ratioStr = tmpMatch.group(1)
            try:
                # integral ratio
                return int(ratioStr)
            except Exception:
                pass
            try:
                # fractional ratio
                return float(ratioStr)
            except Exception:
                pass
    except Exception:
        # attributes is None or otherwise unusable
        pass
    return 1
0368
0369
def getNumMultByRatio(self, num):
    """Multiply num by the master ratio; None for no-split datasets."""
    if self.isNoSplit():
        return None
    ratio = self.getRatioToMaster()
    if isinstance(ratio, int):
        return num * ratio
    # fractional ratio: round up to a whole count
    return int(math.ceil(float(num) * ratio))
0383
0384
def outputMapKey(self):
    """Key grouping output datasets by name and provenance."""
    return f"{self.datasetName}#{self.provenanceID}"


def uniqueMapKey(self):
    """Key unique to this dataset row (name + datasetID)."""
    return f"{self.datasetName}#{self.datasetID}"
0393
0394
def setOffset(self, offset):
    """Record the offset as an "of=<n>" attribute token."""
    self.setDatasetAttribute(f"{self.attrToken['offset']}={offset}")


def getOffset(self):
    """Return the offset recorded by setOffset, or 0 when unset."""
    if self.attributes is not None:
        # raw string so \d is a regex digit class, not a string escape
        tmpMatch = re.search(self.attrToken["offset"] + r"=(\d+)", self.attributes)
        if tmpMatch is not None:
            return int(tmpMatch.group(1))
    return 0
0406
0407
def setNumRecords(self, n):
    """Record the number of records as an "nr=<n>" attribute token."""
    self.setDatasetAttribute(f"{self.attrToken['num_records']}={n}")


def getNumRecords(self):
    """Return the number of records recorded by setNumRecords, or None."""
    if self.attributes is not None:
        for item in self.attributes.split(","):
            # raw string so \d is a regex digit class, not a string escape
            tmpMatch = re.search(self.attrToken["num_records"] + r"=(\d+)", item)
            if tmpMatch is not None:
                return int(tmpMatch.group(1))
    return None
0420
0421
def setObjectStore(self, objectStore):
    """Record the target object store as an "os=<name>" token."""
    token = self.attrToken["objectStore"]
    self.setDatasetAttribute(f"{token}={objectStore}")


def getObjectStore(self):
    """Return the object store name, or None when unset."""
    if self.attributes is None:
        return None
    tmpMatch = re.search(self.attrToken["objectStore"] + "=([^,]+)", self.attributes)
    if tmpMatch is None:
        return None
    return tmpMatch.group(1)
0432
0433
def setNumFilesPerJob(self, num):
    """Record the number of files per job as an "np=<n>" token."""
    self.setDatasetAttribute(f"{self.attrToken['nFilesPerJob']}={num}")


def getNumFilesPerJob(self):
    """Return the number of files per job.

    Falls back to 1 for the sequence-number pseudo dataset and None when
    no value is recorded.
    """
    if self.attributes is not None:
        # raw string so \d is a regex digit class, not a string escape
        tmpMatch = re.search(self.attrToken["nFilesPerJob"] + r"=(\d+)", self.attributes)
        if tmpMatch is not None:
            return int(tmpMatch.group(1))
    # seq_number datasets implicitly provide one value per job
    if self.isSeqNumber():
        return 1
    return None
0448
0449
def toMerge(self):
    """True for datasets that feed the merge step (type "trn_*")."""
    return self.type.startswith("trn_")
0454
0455
def setTransient(self, val):
    """Record the transient flag as a "tr=1"/"tr=0" attribute token.

    Only the literal True enables the flag, matching the original
    identity check (`val is True`).
    """
    num = 1 if val is True else 0
    self.setDatasetAttribute(f"{self.attrToken['transient']}={num}")


def getTransient(self):
    """Return the transient flag (True/False), or None when unset."""
    if self.attributes is not None:
        for item in self.attributes.split(","):
            # raw string so \d is a regex digit class, not a string escape
            tmpMatch = re.search(self.attrToken["transient"] + r"=(\d+)", item)
            if tmpMatch is not None:
                return int(tmpMatch.group(1)) == 1
    return None
0475
0476
def isAllowedNoOutput(self):
    """True if the "allow no output" token ("an") is set."""
    if self.attributes is None:
        return False
    return self.attrToken["allowNoOutput"] in self.attributes.split(",")


def allowNoOutput(self):
    """Add the "allow no output" token ("an") unless already present."""
    items = self.attributes.split(",") if self.attributes else []
    token = self.attrToken["allowNoOutput"]
    if token not in items:
        items.append(token)
    self.attributes = ",".join(items)
0492
0493
def indexConsistent(self):
    """True if the index-consistent token ("ic") is set."""
    if self.attributes is None:
        return False
    return self.attrToken["indexConsistent"] in self.attributes.split(",")
0499
0500
def setDistributed(self):
    # mark the dataset as distributed
    self.distributed = True


def reset_distributed(self):
    # clear the distributed flag
    self.distributed = False


def isDistributed(self):
    # return the distributed flag
    return self.distributed
0511
0512
def setEventRatio(self, num):
    """Record the event ratio as an "er=<number>" attribute token."""
    self.setDatasetAttribute(f"{self.attrToken['eventRatio']}={num}")


def getEventRatio(self):
    """Return the event ratio as int (or float when fractional), or None."""
    if self.attributes is not None:
        for item in self.attributes.split(","):
            # raw string so \d is a regex digit class, not a string escape
            tmpMatch = re.search(self.attrToken["eventRatio"] + r"=(\d+(\.\d+)*)", item)
            if tmpMatch is not None:
                ratioStr = tmpMatch.group(1)
                try:
                    # integral ratio
                    return int(ratioStr)
                except Exception:
                    pass
                try:
                    # fractional ratio
                    return float(ratioStr)
                except Exception:
                    pass
    return None
0534
0535
def setPseudo(self):
    """Add the pseudo token ("ps") unless already present."""
    items = self.attributes.split(",") if self.attributes else []
    token = self.attrToken["pseudo"]
    if token not in items:
        items.append(token)
    self.attributes = ",".join(items)
0544
0545
def is_merge_only(self):
    """True if the dataset is used only in the merge step (token "mo")."""
    if not self.attributes:
        # attributes unset — the original swallowed the resulting error
        return False
    return self.attrToken["mergeOnly"] in self.attributes.split(",")
0551
0552
def sort_files_by_panda_ids(self):
    """Order Files by PandaID, placing files without a PandaID at the end."""
    with_id = [f for f in self.Files if f.PandaID is not None]
    without_id = [f for f in self.Files if f.PandaID is None]
    with_id.sort(key=lambda f: f.PandaID)
    self.Files = with_id + without_id
0555
0556
def set_no_staging(self, value: bool):
    """
    Set no_staging (ns) attribute for dataset

    Args:
        value (bool): value of no_staging; will set attribute = 1 for True or 0 for False
    """
    flag = 1 if value else 0
    self.setDatasetAttribute(f"{self.attrToken['no_staging']}={flag}")
0566
0567
def is_no_staging(self) -> bool:
    """
    Check the no_staging (ns) attribute of the dataset
    (the original docstring said "Set", copy-pasted from set_no_staging)

    Returns:
        bool: whether with no_staging
    """
    if self.attributes is not None:
        for item in self.attributes.split(","):
            # raw string so \d is a regex digit class, not a string escape
            tmpMatch = re.search(self.attrToken["no_staging"] + r"=(\d+)", item)
            if tmpMatch is not None:
                return bool(int(tmpMatch.group(1)))
    return False
0583
0584
0585
# Pre-built SQL IN-clause strings and bind-variable maps for the dataset
# type groups, shared by queries elsewhere in the server code.
INPUT_TYPES_var_str, INPUT_TYPES_var_map = get_sql_IN_bind_variables(JediDatasetSpec.getInputTypes(), prefix=":type_", value_as_suffix=True)
PROCESS_TYPES_var_str, PROCESS_TYPES_var_map = get_sql_IN_bind_variables(JediDatasetSpec.getProcessTypes(), prefix=":type_", value_as_suffix=True)
MERGE_TYPES_var_str, MERGE_TYPES_var_map = get_sql_IN_bind_variables(JediDatasetSpec.getMergeProcessTypes(), prefix=":type_", value_as_suffix=True)