File indexing completed on 2026-04-20 07:58:58
0001 """
0002 Job spec class
0003
0004 """
0005
0006 import copy
0007 import datetime
0008 import json
0009
0010 from pandaharvester.harvestercore import core_utils
0011
0012 from .spec_base import SpecBase
0013
0014
class JobSpec(SpecBase):
    """Specification of a single PanDA job as tracked by Harvester.

    DB-mapped columns are declared in attributesWithTypes; runtime-only
    containers (events, input/output files, associated workers) are created
    in __init__.
    """

    # hasOutFile states: stage of output-file handling for this job
    HO_noOutput = 0
    HO_hasOutput = 1
    HO_hasZipOutput = 2
    HO_hasTransfer = 3
    HO_hasPostZipOutput = 4

    # auxInput states: progress of auxiliary-input staging
    AUX_hasAuxInput = 0
    AUX_inTriggered = 1
    AUX_allTriggered = 2
    AUX_inReady = 3
    AUX_allReady = 4

    # DB column definitions, "name:type [/ index]", consumed by SpecBase
    attributesWithTypes = (
        "PandaID:integer primary key",
        "taskID:integer / index",
        "attemptNr:integer",
        "status:text",
        "subStatus:text / index",
        "currentPriority:integer / index",
        "computingSite:text / index",
        "creationTime:timestamp",
        "modificationTime:timestamp / index",
        "stateChangeTime:timestamp",
        "startTime:timestamp",
        "endTime:timestamp",
        "nCore:integer",
        "jobParams:blob",
        "jobAttributes:blob",
        "hasOutFile:integer",
        "metaData:blob",
        "outputFilesToReport:blob",
        "lockedBy:text",
        "propagatorLock:text",
        "propagatorTime:timestamp / index",
        "preparatorTime:timestamp / index",
        "submitterTime:timestamp",
        "stagerLock:text",
        "stagerTime:timestamp / index",
        "zipPerMB:integer",
        "nWorkers:integer",
        "nWorkersLimit:integer",
        "submissionAttempts:integer",
        "jobsetID:integer",
        "pilotClosed:integer",
        "configID:integer / index",
        "nRemainingEvents:integer",
        "moreWorkers:integer",
        "maxWorkersInTotal:integer",
        "nWorkersInTotal:integer",
        "jobParamsExtForOutput:blob",
        "jobParamsExtForLog:blob",
        "auxInput:integer",
        "resourceType:text",
    )

    # attributes initialized to 0 rather than None
    zeroAttrs = ("nWorkers", "submissionAttempts", "nWorkersInTotal")

    # attributes excluded when slimming the spec.
    # NOTE(review): this is a plain string, not a tuple — `x in skipAttrsToSlim`
    # also matches substrings of "jobParams"; confirm SpecBase expects that.
    skipAttrsToSlim = "jobParams"
0079
0080
0081 def __init__(self):
0082 SpecBase.__init__(self)
0083 object.__setattr__(self, "events", set())
0084 object.__setattr__(self, "zipEventMap", {})
0085 object.__setattr__(self, "inFiles", set())
0086 object.__setattr__(self, "outFiles", set())
0087 object.__setattr__(self, "zipFileMap", {})
0088 object.__setattr__(self, "workspec_list", [])
0089
0090
0091 def add_file(self, filespec):
0092 if filespec.fileType == "input":
0093 self.add_in_file(filespec)
0094 else:
0095 self.add_out_file(filespec)
0096
0097
    def add_in_file(self, filespec):
        """Register *filespec* as an input file of this job."""
        self.inFiles.add(filespec)
0100
0101
    def add_out_file(self, filespec):
        """Register *filespec* as an output file of this job."""
        self.outFiles.add(filespec)
0104
0105
    def reset_out_file(self):
        """Discard all registered output files (in place)."""
        self.outFiles.clear()
0108
0109
0110 def get_files_to_delete(self):
0111 files = []
0112 for fileSpec in self.inFiles.union(self.outFiles):
0113 if fileSpec.todelete == 1:
0114 files.append(fileSpec)
0115 return files
0116
0117
0118 def add_event(self, event_spec, zip_filespec):
0119 if zip_filespec is None:
0120 zipFileID = None
0121 else:
0122 zipFileID = zip_filespec.fileID
0123 if zipFileID not in self.zipEventMap:
0124 self.zipEventMap[zipFileID] = {"events": set(), "zip": zip_filespec}
0125 self.zipEventMap[zipFileID]["events"].add(event_spec)
0126 self.events.add(event_spec)
0127
0128
    def convert_job_json(self, data):
        """Populate this spec from a raw job dictionary received from PanDA.

        The whole dictionary is kept as jobParams; a few well-known fields
        are lifted into dedicated columns.

        :param data: job dictionary from the PanDA server (mutated in place
            for the "secrets" field)
        """
        # secrets arrive JSON-encoded; decode in place, best effort
        try:
            if "secrets" in data:
                data["secrets"] = json.loads(data["secrets"])
        except Exception:
            pass
        self.PandaID = data["PandaID"]
        # PanDA uses the literal string "NULL" for unset IDs
        if data["taskID"] == "NULL":
            self.taskID = None
        else:
            self.taskID = data["taskID"]
        self.attemptNr = data["attemptNr"]
        if data["jobsetID"] == "NULL":
            self.jobsetID = None
        else:
            self.jobsetID = data["jobsetID"]
        self.currentPriority = data["currentPriority"]
        self.nCore = data.get("coreCount")
        self.resourceType = data.get("resource_type")
        # keep the full raw dictionary for later use
        self.jobParams = data
        # pre-compute the cached output/log views of jobParams
        self.jobParamsExtForOutput = self.get_output_file_attributes()
        self.jobParamsExtForLog = self.get_logfile_info()
        if "zipPerMB" in data:
            self.zipPerMB = data["zipPerMB"]
0154
0155
0156 def trigger_propagation(self):
0157 self.propagatorTime = core_utils.naive_utcnow() - datetime.timedelta(hours=1)
0158
0159
0160 def trigger_preparation(self):
0161 self.preparatorTime = core_utils.naive_utcnow() - datetime.timedelta(hours=1)
0162
0163
0164 def trigger_stage_out(self):
0165 self.stagerTime = core_utils.naive_utcnow() - datetime.timedelta(hours=1)
0166
0167
    def set_attributes(self, attrs):
        """Merge pilot-reported attributes for this job into jobAttributes.

        *attrs* is expected to be keyed by PandaID ({PandaID: {...}}) with
        some error fields possibly duplicated at the top level; both layers
        are normalized here. metaData and xml entries are routed to their
        dedicated columns instead of jobAttributes.

        :param attrs: attribute dict from the pilot/messenger, or None (no-op)
        """
        if attrs is None:
            return
        # shallow-copy so the caller's dict is not mutated
        attrs = copy.copy(attrs)
        # lift top-level error fields into the per-PandaID sub-dict
        for attName in ["pilotErrorCode", "pilotErrorDiag", "exeErrorCode", "exeErrorDiag"]:
            if attName in attrs:
                if self.PandaID not in attrs:
                    attrs[self.PandaID] = dict()
                if attName not in attrs[self.PandaID]:
                    attrs[self.PandaID][attName] = attrs[attName]
        # nothing reported for this job
        if self.PandaID not in attrs:
            return
        attrs = copy.copy(attrs[self.PandaID])
        # metaData and xml live in dedicated columns
        if "metaData" in attrs:
            self.metaData = attrs["metaData"]
            del attrs["metaData"]
        if "xml" in attrs:
            self.outputFilesToReport = attrs["xml"]
            del attrs["xml"]
        if self.jobAttributes is None:
            self.jobAttributes = attrs
        else:
            # merge key by key; flag the column dirty only on actual change
            for key, val in attrs.items():
                if key not in self.jobAttributes or self.jobAttributes[key] != val:
                    self.jobAttributes[key] = val
                    self.force_update("jobAttributes")
0196
0197
0198 def set_one_attribute(self, attr, value):
0199 if self.jobAttributes is None:
0200 self.jobAttributes = dict()
0201 if attr not in self.jobAttributes or self.jobAttributes[attr] != value:
0202 self.jobAttributes[attr] = value
0203 self.force_update("jobAttributes")
0204
0205
0206 def has_attribute(self, attr):
0207 if self.jobAttributes is None:
0208 return False
0209 return attr in self.jobAttributes
0210
0211
0212 def get_one_attribute(self, attr):
0213 if self.jobAttributes and attr in self.jobAttributes:
0214 return self.jobAttributes[attr]
0215 return None
0216
0217
0218 def is_final_status(self, job_status=None):
0219 if job_status is None:
0220 job_status = self.status
0221 return job_status in ["finished", "failed", "cancelled", "missed"]
0222
0223
0224 def get_status(self):
0225
0226 if self.is_final_status() and self.subStatus not in ["killed"] and (self.subStatus in ["to_transfer", "transferring"] or not self.all_events_done()):
0227 return "transferring"
0228 return self.status
0229
0230
0231 def all_events_done(self):
0232 retVal = True
0233 for eventSpec in self.events:
0234 if eventSpec.subStatus != "done":
0235 retVal = False
0236 break
0237 return retVal
0238
0239
0240 def all_files_triggered_to_stage_out(self):
0241 for fileSpec in self.outFiles:
0242 if fileSpec.status not in ["finished", "failed"]:
0243 fileSpec.status = "transferring"
0244 fileSpec.attemptNr = 0
0245
0246
0247 def all_files_zipped(self, use_post_zipping=False):
0248 for fileSpec in self.outFiles:
0249 if fileSpec.status not in ["finished", "failed"]:
0250 fileSpec.attemptNr = 0
0251 if use_post_zipping:
0252 fileSpec.status = "post_zipping"
0253 else:
0254 fileSpec.status = "defined"
0255 fileSpec.groupID = None
0256 fileSpec.groupStatus = None
0257 fileSpec.groupUpdateTime = None
0258
0259
    def to_event_data(self, max_events=None):
        """Serialize event ranges for reporting, grouped by their zip file.

        :param max_events: soft cap on the number of serialized events; the
            check uses ``>`` and runs only between zip groups, so the total
            can exceed max_events by up to one whole group — presumably
            intentional (groups are reported atomically); confirm upstream.
        :return: (list of per-group dicts, list of EventSpecs included)
        """
        data = []
        eventSpecs = []
        iEvents = 0
        for zipFileID, eventsData in self.zipEventMap.items():
            # soft limit check, between groups only
            if max_events is not None and iEvents > max_events:
                break
            eventRanges = []
            for eventSpec in eventsData["events"]:
                eventRanges.append(eventSpec.to_data())
                eventSpecs.append(eventSpec)
                iEvents += 1
            tmpData = {}
            tmpData["eventRanges"] = eventRanges
            if "sourceURL" in self.jobParams:
                tmpData["sourceURL"] = self.jobParams["sourceURL"]
            if zipFileID is not None:
                zipFileSpec = eventsData["zip"]
                # only finished zips carry reportable file info
                if zipFileSpec.status == "finished":
                    # objstoreID may embed the path convention as "<id>/<convention>"
                    objstoreID = f"{zipFileSpec.objstoreID}"
                    if zipFileSpec.pathConvention is not None:
                        objstoreID += f"/{zipFileSpec.pathConvention}"
                    tmpData["zipFile"] = {"lfn": zipFileSpec.lfn, "objstoreID": objstoreID}
                    if zipFileSpec.fsize not in [None, 0]:
                        tmpData["zipFile"]["fsize"] = zipFileSpec.fsize
                    if zipFileSpec.chksum is not None:
                        # checksum prefixes: "md:" -> md5, "ad:" -> adler32;
                        # bare values are treated as adler32
                        if zipFileSpec.chksum.startswith("md:"):
                            tmpData["zipFile"]["md5"] = zipFileSpec.chksum.split(":")[-1]
                        elif zipFileSpec.chksum.startswith("ad:"):
                            tmpData["zipFile"]["adler32"] = zipFileSpec.chksum.split(":")[-1]
                        else:
                            tmpData["zipFile"]["adler32"] = zipFileSpec.chksum
            data.append(tmpData)
        return data, eventSpecs
0294
0295
    def get_input_file_attributes(self, skip_ready=False):
        """Build {lfn: attributes} for input files from jobParams CSV fields.

        jobParams carries parallel comma-separated lists (inFiles, GUID,
        fsize, checksum, scopeIn, realDatasetsIn, ddmEndPointIn); they are
        zipped together positionally here.

        :param skip_ready: omit files whose FileSpec status is "ready"
        :return: dict mapping lfn to fsize/guid/checksum/scope/dataset/
            endpoint/attemptNr (plus "path" once inFilePaths is set)
        """
        lfnToSkip = set()
        attemptNrMap = dict()
        pathMap = dict()
        for fileSpec in self.inFiles:
            if skip_ready and fileSpec.status == "ready":
                lfnToSkip.add(fileSpec.lfn)
            attemptNrMap[fileSpec.lfn] = fileSpec.attemptNr
            pathMap[fileSpec.lfn] = fileSpec.path
        inFiles = {}
        lfns = self.jobParams["inFiles"].split(",")
        guids = self.jobParams["GUID"].split(",")
        fsizes = self.jobParams["fsize"].split(",")
        chksums = self.jobParams["checksum"].split(",")
        scopes = self.jobParams["scopeIn"].split(",")
        datasets = self.jobParams["realDatasetsIn"].split(",")
        endpoints = self.jobParams["ddmEndPointIn"].split(",")
        for lfn, guid, fsize, chksum, scope, dataset, endpoint in zip(lfns, guids, fsizes, chksums, scopes, datasets, endpoints):
            try:
                fsize = int(fsize)
            except Exception:
                # non-numeric placeholder (e.g. "NULL") -> unknown size
                fsize = None
            if lfn in lfnToSkip:
                continue
            if lfn in attemptNrMap:
                attemptNr = attemptNrMap[lfn]
            else:
                attemptNr = 0
            inFiles[lfn] = {"fsize": fsize, "guid": guid, "checksum": chksum, "scope": scope, "dataset": dataset, "endpoint": endpoint, "attemptNr": attemptNr}
        # attach local paths once known (filled by set_input_file_paths)
        if "inFilePaths" in self.jobParams:
            for lfn in lfns:
                if lfn not in inFiles or lfn not in pathMap:
                    continue
                inFiles[lfn]["path"] = pathMap[lfn]
        # drop placeholder entries used by jobs without real input
        if "" in inFiles:
            del inFiles[""]
        if "NULL" in inFiles:
            del inFiles["NULL"]
        return inFiles
0337
0338
0339 def set_input_file_paths(self, in_files):
0340 lfns = self.get_input_file_attributes().keys()
0341 paths = []
0342 for lfn in lfns:
0343
0344 if lfn in in_files:
0345 paths.append(in_files[lfn]["path"])
0346 self.jobParams["inFilePaths"] = ",".join(paths)
0347
0348 self.force_update("jobParams")
0349
0350 for fileSpec in self.inFiles:
0351 if fileSpec.lfn in in_files:
0352 fileSpec.path = in_files[fileSpec.lfn]["path"]
0353
0354
0355 def set_all_input_ready(self):
0356
0357 for fileSpec in self.inFiles:
0358 fileSpec.status = "ready"
0359
0360
    def get_output_file_attributes(self):
        """Build {lfn: {scope, dataset, endpoint}} for output files, cached.

        scopeOut does not include the log file's scope, so scopeLog is
        inserted at the log file's position to realign the parallel lists.
        NOTE(review): if logFile is missing from outFiles, lfns.index()
        raises ValueError — presumably PanDA guarantees it is listed; confirm.
        """
        if self.jobParamsExtForOutput is not None:
            return self.jobParamsExtForOutput
        outFiles = {}
        lfns = self.jobParams["outFiles"].split(",")
        scopes = self.jobParams["scopeOut"].split(",")
        scopeLog = self.jobParams["scopeLog"]
        logLFN = self.jobParams["logFile"]
        if scopeLog and logLFN:
            scopes.insert(lfns.index(logLFN), scopeLog)
        datasets = self.jobParams["realDatasets"].split(",")
        endpoints = self.jobParams["ddmEndPointOut"].split(",")
        for lfn, scope, dataset, endpoint in zip(lfns, scopes, datasets, endpoints):
            outFiles[lfn] = {"scope": scope, "dataset": dataset, "endpoint": endpoint}
        # cache for subsequent calls
        self.jobParamsExtForOutput = outFiles
        return outFiles
0377
0378
0379 def get_logfile_info(self):
0380 if self.jobParamsExtForLog is not None:
0381 return self.jobParamsExtForLog
0382 retMap = dict()
0383 retMap["lfn"] = self.jobParams["logFile"]
0384 retMap["guid"] = self.jobParams["logGUID"]
0385 self.jobParamsExtForLog = retMap
0386 return retMap
0387
0388
0389 def set_start_time(self, force=False):
0390 if self.startTime is None or force is True:
0391 self.startTime = core_utils.naive_utcnow()
0392
0393
0394 def set_end_time(self, force=False):
0395 if self.endTime is None or force is True:
0396 self.endTime = core_utils.naive_utcnow()
0397
0398
0399 def reset_start_end_time(self):
0400 self.startTime = core_utils.naive_utcnow()
0401 self.endTime = self.startTime
0402
0403
    def add_workspec_list(self, workspec_list):
        """Attach the list of WorkSpecs associated with this job (replaces any previous list)."""
        self.workspec_list = workspec_list
0406
0407
    def get_workspec_list(self):
        """Return the list of WorkSpecs associated with this job."""
        return self.workspec_list
0410
0411
0412 def get_job_attributes_for_panda(self):
0413 data = dict()
0414 if self.jobAttributes is None:
0415 return data
0416
0417
0418 panda_attributes = {
0419 "token": "token",
0420 "transExitCode": "trans_exit_code",
0421 "pilotErrorCode": "pilot_error_code",
0422 "pilotErrorDiag": "pilot_error_diag",
0423 "node": "node",
0424 "cpuConsumptionTime": "cpu_consumption_time",
0425 "cpuConsumptionUnit": "cpu_consumption_unit",
0426 "schedulerID": "scheduler_id",
0427 "pilotID": "pilot_id",
0428 "siteName": "site_name",
0429 "pilotLog": "pilot_log",
0430 "cpuConversionFactor": "cpu_conversion_factor",
0431 "exeErrorCode": "exe_error_code",
0432 "exeErrorDiag": "exe_error_diag",
0433 "pilotTiming": "pilot_timing",
0434 "startTime": "start_time",
0435 "endTime": "end_time",
0436 "nEvents": "n_events",
0437 "nInputFiles": "n_input_files",
0438 "batchID": "batch_id",
0439 "attemptNr": "attempt_nr",
0440 "jobMetrics": "job_metrics",
0441 "stdout": "stdout",
0442 "coreCount": "core_count",
0443 "maxRSS": "max_rss",
0444 "maxVMEM": "max_vmem",
0445 "maxSWAP": "max_swap",
0446 "maxPSS": "max_pss",
0447 "avgRSS": "avg_rss",
0448 "avgVMEM": "avg_vmem",
0449 "avgSWAP": "avg_swap",
0450 "avgPSS": "avg_pss",
0451 "totRCHAR": "tot_rchar",
0452 "totWCHAR": "tot_wchar",
0453 "totRBYTES": "tot_rbytes",
0454 "totWBYTES": "tot_wbytes",
0455 "rateRCHAR": "rate_rchar",
0456 "rateWCHAR": "rate_wchar",
0457 "rateRBYTES": "rate_rbytes",
0458 "rateWBYTES": "rate_wbytes",
0459 }
0460
0461
0462 for job_attribute_name, job_attribute_value in self.jobAttributes.items():
0463 if job_attribute_name in panda_attributes:
0464 clean_attribute_name = panda_attributes[job_attribute_name]
0465 data[clean_attribute_name] = job_attribute_value
0466
0467 return data
0468
0469
0470 def get_job_status_from_attributes(self):
0471 if self.jobAttributes is None or "jobStatus" not in self.jobAttributes:
0472 return None
0473 if self.jobAttributes["jobStatus"] not in ["finished", "failed"]:
0474 return None
0475 return self.jobAttributes["jobStatus"]
0476
0477
0478 def set_groups_to_files(self, id_map):
0479 timeNow = core_utils.naive_utcnow()
0480
0481 revMap = dict()
0482 for gID, items in id_map.items():
0483 for lfn in items["lfns"]:
0484 revMap[lfn] = gID
0485
0486 for fileSpec in self.inFiles.union(self.outFiles):
0487 if fileSpec.lfn in revMap:
0488 fileSpec.groupID = revMap[fileSpec.lfn]
0489 fileSpec.groupStatus = id_map[fileSpec.groupID]["groupStatus"]
0490 fileSpec.groupUpdateTime = timeNow
0491
0492
0493 def update_group_status_in_files(self, group_id, group_status):
0494 timeNow = core_utils.naive_utcnow()
0495
0496 for fileSpec in self.inFiles.union(self.outFiles):
0497 if fileSpec.groupID == group_id:
0498 fileSpec.groupStatus = group_status
0499 fileSpec.groupUpdateTime = timeNow
0500
0501
0502 def get_groups_of_input_files(self, skip_ready=False):
0503 groups = dict()
0504 for fileSpec in self.inFiles:
0505 if skip_ready and fileSpec.status == "ready":
0506 continue
0507 groups[fileSpec.groupID] = {"groupUpdateTime": fileSpec.groupUpdateTime, "groupStatus": fileSpec.groupStatus}
0508 return groups
0509
0510
0511 def get_groups_of_output_files(self):
0512 groups = dict()
0513 for fileSpec in self.outFiles:
0514 groups[fileSpec.groupID] = {"groupUpdateTime": fileSpec.groupUpdateTime, "groupStatus": fileSpec.groupStatus}
0515 return groups
0516
0517
0518 def get_output_file_specs(self, skip_done=False):
0519 if not skip_done:
0520 return self.outFiles
0521 else:
0522 retList = []
0523 for fileSpec in self.outFiles:
0524 if fileSpec.status not in ["finished", "failed"]:
0525 retList.append(fileSpec)
0526 return retList
0527
0528
0529 def get_input_file_specs(self, group_id, skip_ready=False):
0530 retList = []
0531 for fileSpec in self.inFiles:
0532 if fileSpec.groupID == group_id:
0533 if skip_ready and fileSpec.status in ["ready", "failed"]:
0534 continue
0535 retList.append(fileSpec)
0536 return retList
0537
0538
0539 def set_pilot_error(self, error_code, error_dialog):
0540 if not self.has_attribute("pilotErrorCode"):
0541 self.set_one_attribute("pilotErrorCode", error_code)
0542 if not self.has_attribute("pilotErrorDiag"):
0543 self.set_one_attribute("pilotErrorDiag", error_dialog)
0544
0545
0546 def not_suppress_heartbeat(self):
0547 if self.subStatus in ["missed"]:
0548 return True
0549 return False
0550
0551
    def set_pilot_closed(self):
        """Mark the job as closed by the pilot (pilotClosed = 1)."""
        self.pilotClosed = 1
0554
0555
    def is_pilot_closed(self):
        """Return True when the job was closed by the pilot."""
        return self.pilotClosed == 1
0558
0559
0560 def get_job_params(self, strip):
0561 if not strip:
0562 return self.jobParams
0563 else:
0564 newParams = dict()
0565 for k, v in self.jobParams.items():
0566 if k in ["prodDBlocks", "realDatasetsIn", "dispatchDblock", "ddmEndPointIn", "scopeIn", "dispatchDBlockToken", "prodDBlockToken"]:
0567 continue
0568 newParams[k] = v
0569 return newParams
0570
0571
0572 def get_pilot_type(self):
0573 if "prodSourceLabel" not in self.jobParams:
0574 return None
0575 if self.jobParams["prodSourceLabel"] == "rc_test":
0576 return "RC"
0577 elif self.jobParams["prodSourceLabel"] == "rc_test2":
0578 return "RC"
0579 elif self.jobParams["prodSourceLabel"] == "rc_alrb":
0580 return "ALRB"
0581 elif self.jobParams["prodSourceLabel"] == "ptest":
0582 return "PT"
0583 elif self.jobParams["prodSourceLabel"]:
0584 return "PR"
0585 else:
0586 return None
0587
0588
0589 def manipulate_job_params_for_container(self):
0590 updated = False
0591 for fileSpec in self.inFiles:
0592 for k, v in self.jobParams.items():
0593
0594 if k == "container_name":
0595 if v == fileSpec.url:
0596 self.jobParams[k] = fileSpec.path
0597 updated = True
0598 elif k == "containerOptions":
0599 for kk, vv in v.items():
0600 if kk == "containerImage":
0601 if vv == fileSpec.url:
0602 self.jobParams[k][kk] = fileSpec.path
0603 updated = True
0604 continue
0605
0606 if updated:
0607 self.force_update("jobParams")