Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 """
0002 Job spec class
0003 
0004 """
0005 
0006 import copy
0007 import datetime
0008 import json
0009 
0010 from pandaharvester.harvestercore import core_utils
0011 
0012 from .spec_base import SpecBase
0013 
0014 
0015 class JobSpec(SpecBase):
    # has output file
    # NOTE(review): presumably the values stored in the hasOutFile attribute — confirm against callers
    HO_noOutput = 0
    HO_hasOutput = 1
    HO_hasZipOutput = 2
    HO_hasTransfer = 3
    HO_hasPostZipOutput = 4

    # auxiliary input
    # NOTE(review): presumably the values stored in the auxInput attribute — confirm against callers
    AUX_hasAuxInput = 0
    AUX_inTriggered = 1
    AUX_allTriggered = 2
    AUX_inReady = 3
    AUX_allReady = 4

    # attributes
    # each entry is written as "name:type", optionally followed by "/ index"
    # NOTE(review): how these strings are parsed is decided by SpecBase, which is not visible here
    attributesWithTypes = (
        "PandaID:integer primary key",
        "taskID:integer / index",
        "attemptNr:integer",
        "status:text",
        "subStatus:text / index",
        "currentPriority:integer / index",
        "computingSite:text / index",
        "creationTime:timestamp",
        "modificationTime:timestamp / index",
        "stateChangeTime:timestamp",
        "startTime:timestamp",
        "endTime:timestamp",
        "nCore:integer",
        "jobParams:blob",
        "jobAttributes:blob",
        "hasOutFile:integer",
        "metaData:blob",
        "outputFilesToReport:blob",
        "lockedBy:text",
        "propagatorLock:text",
        "propagatorTime:timestamp / index",
        "preparatorTime:timestamp / index",
        "submitterTime:timestamp",
        "stagerLock:text",
        "stagerTime:timestamp / index",
        "zipPerMB:integer",
        "nWorkers:integer",
        "nWorkersLimit:integer",
        "submissionAttempts:integer",
        "jobsetID:integer",
        "pilotClosed:integer",
        "configID:integer / index",
        "nRemainingEvents:integer",
        "moreWorkers:integer",
        "maxWorkersInTotal:integer",
        "nWorkersInTotal:integer",
        "jobParamsExtForOutput:blob",
        "jobParamsExtForLog:blob",
        "auxInput:integer",
        "resourceType:text",
    )

    # attributes initialized with 0
    zeroAttrs = ("nWorkers", "submissionAttempts", "nWorkersInTotal")

    # attributes to skip when slim reading
    # NOTE(review): this is a plain string, not a one-element tuple, so an
    # `in` test against it is a substring check — confirm how SpecBase uses it
    skipAttrsToSlim = "jobParams"
0079 
0080     # constructor
0081     def __init__(self):
0082         SpecBase.__init__(self)
0083         object.__setattr__(self, "events", set())
0084         object.__setattr__(self, "zipEventMap", {})
0085         object.__setattr__(self, "inFiles", set())
0086         object.__setattr__(self, "outFiles", set())
0087         object.__setattr__(self, "zipFileMap", {})
0088         object.__setattr__(self, "workspec_list", [])
0089 
0090     # add file
0091     def add_file(self, filespec):
0092         if filespec.fileType == "input":
0093             self.add_in_file(filespec)
0094         else:
0095             self.add_out_file(filespec)
0096 
0097     # add input file
0098     def add_in_file(self, filespec):
0099         self.inFiles.add(filespec)
0100 
0101     # add output file
0102     def add_out_file(self, filespec):
0103         self.outFiles.add(filespec)
0104 
0105     # reset output file list
0106     def reset_out_file(self):
0107         self.outFiles.clear()
0108 
0109     # get files to delete
0110     def get_files_to_delete(self):
0111         files = []
0112         for fileSpec in self.inFiles.union(self.outFiles):
0113             if fileSpec.todelete == 1:
0114                 files.append(fileSpec)
0115         return files
0116 
0117     # add event
0118     def add_event(self, event_spec, zip_filespec):
0119         if zip_filespec is None:
0120             zipFileID = None
0121         else:
0122             zipFileID = zip_filespec.fileID
0123         if zipFileID not in self.zipEventMap:
0124             self.zipEventMap[zipFileID] = {"events": set(), "zip": zip_filespec}
0125         self.zipEventMap[zipFileID]["events"].add(event_spec)
0126         self.events.add(event_spec)
0127 
0128     # convert from Job JSON
0129     def convert_job_json(self, data):
0130         # decode secrets
0131         try:
0132             if "secrets" in data:
0133                 data["secrets"] = json.loads(data["secrets"])
0134         except Exception:
0135             pass
0136         self.PandaID = data["PandaID"]
0137         if data["taskID"] == "NULL":
0138             self.taskID = None
0139         else:
0140             self.taskID = data["taskID"]
0141         self.attemptNr = data["attemptNr"]
0142         if data["jobsetID"] == "NULL":
0143             self.jobsetID = None
0144         else:
0145             self.jobsetID = data["jobsetID"]
0146         self.currentPriority = data["currentPriority"]
0147         self.nCore = data.get("coreCount")
0148         self.resourceType = data.get("resource_type")
0149         self.jobParams = data
0150         self.jobParamsExtForOutput = self.get_output_file_attributes()
0151         self.jobParamsExtForLog = self.get_logfile_info()
0152         if "zipPerMB" in data:
0153             self.zipPerMB = data["zipPerMB"]
0154 
0155     # trigger propagation
0156     def trigger_propagation(self):
0157         self.propagatorTime = core_utils.naive_utcnow() - datetime.timedelta(hours=1)
0158 
0159     # trigger preparation
0160     def trigger_preparation(self):
0161         self.preparatorTime = core_utils.naive_utcnow() - datetime.timedelta(hours=1)
0162 
0163     # trigger stage out
0164     def trigger_stage_out(self):
0165         self.stagerTime = core_utils.naive_utcnow() - datetime.timedelta(hours=1)
0166 
0167     # set attributes
0168     def set_attributes(self, attrs):
0169         if attrs is None:
0170             return
0171         attrs = copy.copy(attrs)
0172         # set work attribute
0173         for attName in ["pilotErrorCode", "pilotErrorDiag", "exeErrorCode", "exeErrorDiag"]:
0174             if attName in attrs:
0175                 if self.PandaID not in attrs:
0176                     attrs[self.PandaID] = dict()
0177                 if attName not in attrs[self.PandaID]:
0178                     attrs[self.PandaID][attName] = attrs[attName]
0179         if self.PandaID not in attrs:
0180             return
0181         attrs = copy.copy(attrs[self.PandaID])
0182         # set metadata and outputs to dedicated attributes
0183         if "metaData" in attrs:
0184             self.metaData = attrs["metaData"]
0185             del attrs["metaData"]
0186         if "xml" in attrs:
0187             self.outputFilesToReport = attrs["xml"]
0188             del attrs["xml"]
0189         if self.jobAttributes is None:
0190             self.jobAttributes = attrs
0191         else:
0192             for key, val in attrs.items():
0193                 if key not in self.jobAttributes or self.jobAttributes[key] != val:
0194                     self.jobAttributes[key] = val
0195                     self.force_update("jobAttributes")
0196 
0197     # set one attribute
0198     def set_one_attribute(self, attr, value):
0199         if self.jobAttributes is None:
0200             self.jobAttributes = dict()
0201         if attr not in self.jobAttributes or self.jobAttributes[attr] != value:
0202             self.jobAttributes[attr] = value
0203             self.force_update("jobAttributes")
0204 
0205     # check if an attribute is there
0206     def has_attribute(self, attr):
0207         if self.jobAttributes is None:
0208             return False
0209         return attr in self.jobAttributes
0210 
0211     # get an attribute
0212     def get_one_attribute(self, attr):
0213         if self.jobAttributes and attr in self.jobAttributes:
0214             return self.jobAttributes[attr]
0215         return None
0216 
0217     # check if final status
0218     def is_final_status(self, job_status=None):
0219         if job_status is None:
0220             job_status = self.status
0221         return job_status in ["finished", "failed", "cancelled", "missed"]
0222 
0223     # get status
0224     def get_status(self):
0225         # don't report the final status while staging-out
0226         if self.is_final_status() and self.subStatus not in ["killed"] and (self.subStatus in ["to_transfer", "transferring"] or not self.all_events_done()):
0227             return "transferring"
0228         return self.status
0229 
0230     # check if all events are done
0231     def all_events_done(self):
0232         retVal = True
0233         for eventSpec in self.events:
0234             if eventSpec.subStatus != "done":
0235                 retVal = False
0236                 break
0237         return retVal
0238 
0239     # all files are triggered to stage-out
0240     def all_files_triggered_to_stage_out(self):
0241         for fileSpec in self.outFiles:
0242             if fileSpec.status not in ["finished", "failed"]:
0243                 fileSpec.status = "transferring"
0244                 fileSpec.attemptNr = 0
0245 
0246     # all files are zipped
0247     def all_files_zipped(self, use_post_zipping=False):
0248         for fileSpec in self.outFiles:
0249             if fileSpec.status not in ["finished", "failed"]:
0250                 fileSpec.attemptNr = 0
0251                 if use_post_zipping:
0252                     fileSpec.status = "post_zipping"
0253                 else:
0254                     fileSpec.status = "defined"
0255                     fileSpec.groupID = None
0256                     fileSpec.groupStatus = None
0257                     fileSpec.groupUpdateTime = None
0258 
0259     # convert to event data
0260     def to_event_data(self, max_events=None):
0261         data = []
0262         eventSpecs = []
0263         iEvents = 0
0264         for zipFileID, eventsData in self.zipEventMap.items():
0265             if max_events is not None and iEvents > max_events:
0266                 break
0267             eventRanges = []
0268             for eventSpec in eventsData["events"]:
0269                 eventRanges.append(eventSpec.to_data())
0270                 eventSpecs.append(eventSpec)
0271                 iEvents += 1
0272             tmpData = {}
0273             tmpData["eventRanges"] = eventRanges
0274             if "sourceURL" in self.jobParams:
0275                 tmpData["sourceURL"] = self.jobParams["sourceURL"]
0276             if zipFileID is not None:
0277                 zipFileSpec = eventsData["zip"]
0278                 if zipFileSpec.status == "finished":
0279                     objstoreID = f"{zipFileSpec.objstoreID}"
0280                     if zipFileSpec.pathConvention is not None:
0281                         objstoreID += f"/{zipFileSpec.pathConvention}"
0282                     tmpData["zipFile"] = {"lfn": zipFileSpec.lfn, "objstoreID": objstoreID}
0283                     if zipFileSpec.fsize not in [None, 0]:
0284                         tmpData["zipFile"]["fsize"] = zipFileSpec.fsize
0285                     if zipFileSpec.chksum is not None:
0286                         if zipFileSpec.chksum.startswith("md:"):
0287                             tmpData["zipFile"]["md5"] = zipFileSpec.chksum.split(":")[-1]
0288                         elif zipFileSpec.chksum.startswith("ad:"):
0289                             tmpData["zipFile"]["adler32"] = zipFileSpec.chksum.split(":")[-1]
0290                         else:
0291                             tmpData["zipFile"]["adler32"] = zipFileSpec.chksum
0292             data.append(tmpData)
0293         return data, eventSpecs
0294 
0295     # get input file attributes
0296     def get_input_file_attributes(self, skip_ready=False):
0297         lfnToSkip = set()
0298         attemptNrMap = dict()
0299         pathMap = dict()
0300         for fileSpec in self.inFiles:
0301             if skip_ready and fileSpec.status == "ready":
0302                 lfnToSkip.add(fileSpec.lfn)
0303             attemptNrMap[fileSpec.lfn] = fileSpec.attemptNr
0304             pathMap[fileSpec.lfn] = fileSpec.path
0305         inFiles = {}
0306         lfns = self.jobParams["inFiles"].split(",")
0307         guids = self.jobParams["GUID"].split(",")
0308         fsizes = self.jobParams["fsize"].split(",")
0309         chksums = self.jobParams["checksum"].split(",")
0310         scopes = self.jobParams["scopeIn"].split(",")
0311         datasets = self.jobParams["realDatasetsIn"].split(",")
0312         endpoints = self.jobParams["ddmEndPointIn"].split(",")
0313         for lfn, guid, fsize, chksum, scope, dataset, endpoint in zip(lfns, guids, fsizes, chksums, scopes, datasets, endpoints):
0314             try:
0315                 fsize = int(fsize)
0316             except Exception:
0317                 fsize = None
0318             if lfn in lfnToSkip:
0319                 continue
0320             if lfn in attemptNrMap:
0321                 attemptNr = attemptNrMap[lfn]
0322             else:
0323                 attemptNr = 0
0324             inFiles[lfn] = {"fsize": fsize, "guid": guid, "checksum": chksum, "scope": scope, "dataset": dataset, "endpoint": endpoint, "attemptNr": attemptNr}
0325         # add path
0326         if "inFilePaths" in self.jobParams:
0327             for lfn in lfns:
0328                 if lfn not in inFiles or lfn not in pathMap:
0329                     continue
0330                 inFiles[lfn]["path"] = pathMap[lfn]
0331         # delete empty file
0332         if "" in inFiles:
0333             del inFiles[""]
0334         if "NULL" in inFiles:
0335             del inFiles["NULL"]
0336         return inFiles
0337 
0338     # set input file paths
0339     def set_input_file_paths(self, in_files):
0340         lfns = self.get_input_file_attributes().keys()
0341         paths = []
0342         for lfn in lfns:
0343             # check for consistency
0344             if lfn in in_files:
0345                 paths.append(in_files[lfn]["path"])
0346         self.jobParams["inFilePaths"] = ",".join(paths)
0347         # trigger updating
0348         self.force_update("jobParams")
0349         # update file specs
0350         for fileSpec in self.inFiles:
0351             if fileSpec.lfn in in_files:
0352                 fileSpec.path = in_files[fileSpec.lfn]["path"]
0353 
0354     # set ready to all input files
0355     def set_all_input_ready(self):
0356         # update file specs
0357         for fileSpec in self.inFiles:
0358             fileSpec.status = "ready"
0359 
0360     # get output file attributes
0361     def get_output_file_attributes(self):
0362         if self.jobParamsExtForOutput is not None:
0363             return self.jobParamsExtForOutput
0364         outFiles = {}
0365         lfns = self.jobParams["outFiles"].split(",")
0366         scopes = self.jobParams["scopeOut"].split(",")
0367         scopeLog = self.jobParams["scopeLog"]
0368         logLFN = self.jobParams["logFile"]
0369         if scopeLog and logLFN:
0370             scopes.insert(lfns.index(logLFN), scopeLog)
0371         datasets = self.jobParams["realDatasets"].split(",")
0372         endpoints = self.jobParams["ddmEndPointOut"].split(",")
0373         for lfn, scope, dataset, endpoint in zip(lfns, scopes, datasets, endpoints):
0374             outFiles[lfn] = {"scope": scope, "dataset": dataset, "endpoint": endpoint}
0375         self.jobParamsExtForOutput = outFiles
0376         return outFiles
0377 
0378     # get log file information
0379     def get_logfile_info(self):
0380         if self.jobParamsExtForLog is not None:
0381             return self.jobParamsExtForLog
0382         retMap = dict()
0383         retMap["lfn"] = self.jobParams["logFile"]
0384         retMap["guid"] = self.jobParams["logGUID"]
0385         self.jobParamsExtForLog = retMap
0386         return retMap
0387 
0388     # set start time
0389     def set_start_time(self, force=False):
0390         if self.startTime is None or force is True:
0391             self.startTime = core_utils.naive_utcnow()
0392 
0393     # set end time
0394     def set_end_time(self, force=False):
0395         if self.endTime is None or force is True:
0396             self.endTime = core_utils.naive_utcnow()
0397 
0398     # reset start and end time
0399     def reset_start_end_time(self):
0400         self.startTime = core_utils.naive_utcnow()
0401         self.endTime = self.startTime
0402 
0403     # add work spec list
0404     def add_workspec_list(self, workspec_list):
0405         self.workspec_list = workspec_list
0406 
0407     # get work spec list
0408     def get_workspec_list(self):
0409         return self.workspec_list
0410 
0411     # get job attributes to be reported to Panda
0412     def get_job_attributes_for_panda(self):
0413         data = dict()
0414         if self.jobAttributes is None:
0415             return data
0416         # extract only panda attributes
0417         # FIXME use set literal for python >=2.7
0418         panda_attributes = {
0419             "token": "token",
0420             "transExitCode": "trans_exit_code",
0421             "pilotErrorCode": "pilot_error_code",
0422             "pilotErrorDiag": "pilot_error_diag",
0423             "node": "node",
0424             "cpuConsumptionTime": "cpu_consumption_time",
0425             "cpuConsumptionUnit": "cpu_consumption_unit",
0426             "schedulerID": "scheduler_id",
0427             "pilotID": "pilot_id",
0428             "siteName": "site_name",
0429             "pilotLog": "pilot_log",
0430             "cpuConversionFactor": "cpu_conversion_factor",
0431             "exeErrorCode": "exe_error_code",
0432             "exeErrorDiag": "exe_error_diag",
0433             "pilotTiming": "pilot_timing",
0434             "startTime": "start_time",
0435             "endTime": "end_time",
0436             "nEvents": "n_events",
0437             "nInputFiles": "n_input_files",
0438             "batchID": "batch_id",
0439             "attemptNr": "attempt_nr",
0440             "jobMetrics": "job_metrics",
0441             "stdout": "stdout",
0442             "coreCount": "core_count",
0443             "maxRSS": "max_rss",
0444             "maxVMEM": "max_vmem",
0445             "maxSWAP": "max_swap",
0446             "maxPSS": "max_pss",
0447             "avgRSS": "avg_rss",
0448             "avgVMEM": "avg_vmem",
0449             "avgSWAP": "avg_swap",
0450             "avgPSS": "avg_pss",
0451             "totRCHAR": "tot_rchar",
0452             "totWCHAR": "tot_wchar",
0453             "totRBYTES": "tot_rbytes",
0454             "totWBYTES": "tot_wbytes",
0455             "rateRCHAR": "rate_rchar",
0456             "rateWCHAR": "rate_wchar",
0457             "rateRBYTES": "rate_rbytes",
0458             "rateWBYTES": "rate_wbytes",
0459         }
0460 
0461         # filter job attributes expected in a job update
0462         for job_attribute_name, job_attribute_value in self.jobAttributes.items():
0463             if job_attribute_name in panda_attributes:
0464                 clean_attribute_name = panda_attributes[job_attribute_name]
0465                 data[clean_attribute_name] = job_attribute_value
0466 
0467         return data
0468 
0469     # get job status from attributes
0470     def get_job_status_from_attributes(self):
0471         if self.jobAttributes is None or "jobStatus" not in self.jobAttributes:
0472             return None
0473         if self.jobAttributes["jobStatus"] not in ["finished", "failed"]:
0474             return None
0475         return self.jobAttributes["jobStatus"]
0476 
0477     # set group to files
0478     def set_groups_to_files(self, id_map):
0479         timeNow = core_utils.naive_utcnow()
0480         # reverse mapping
0481         revMap = dict()
0482         for gID, items in id_map.items():
0483             for lfn in items["lfns"]:
0484                 revMap[lfn] = gID
0485         # update file specs
0486         for fileSpec in self.inFiles.union(self.outFiles):
0487             if fileSpec.lfn in revMap:
0488                 fileSpec.groupID = revMap[fileSpec.lfn]
0489                 fileSpec.groupStatus = id_map[fileSpec.groupID]["groupStatus"]
0490                 fileSpec.groupUpdateTime = timeNow
0491 
0492     # update group status in files
0493     def update_group_status_in_files(self, group_id, group_status):
0494         timeNow = core_utils.naive_utcnow()
0495         # update file specs
0496         for fileSpec in self.inFiles.union(self.outFiles):
0497             if fileSpec.groupID == group_id:
0498                 fileSpec.groupStatus = group_status
0499                 fileSpec.groupUpdateTime = timeNow
0500 
0501     # get groups of input files
0502     def get_groups_of_input_files(self, skip_ready=False):
0503         groups = dict()
0504         for fileSpec in self.inFiles:
0505             if skip_ready and fileSpec.status == "ready":
0506                 continue
0507             groups[fileSpec.groupID] = {"groupUpdateTime": fileSpec.groupUpdateTime, "groupStatus": fileSpec.groupStatus}
0508         return groups
0509 
0510     # get groups of output files
0511     def get_groups_of_output_files(self):
0512         groups = dict()
0513         for fileSpec in self.outFiles:
0514             groups[fileSpec.groupID] = {"groupUpdateTime": fileSpec.groupUpdateTime, "groupStatus": fileSpec.groupStatus}
0515         return groups
0516 
0517     # get output file specs
0518     def get_output_file_specs(self, skip_done=False):
0519         if not skip_done:
0520             return self.outFiles
0521         else:
0522             retList = []
0523             for fileSpec in self.outFiles:
0524                 if fileSpec.status not in ["finished", "failed"]:
0525                     retList.append(fileSpec)
0526             return retList
0527 
0528     # get input file specs for a given group id
0529     def get_input_file_specs(self, group_id, skip_ready=False):
0530         retList = []
0531         for fileSpec in self.inFiles:
0532             if fileSpec.groupID == group_id:
0533                 if skip_ready and fileSpec.status in ["ready", "failed"]:
0534                     continue
0535                 retList.append(fileSpec)
0536         return retList
0537 
0538     # set pilot error
0539     def set_pilot_error(self, error_code, error_dialog):
0540         if not self.has_attribute("pilotErrorCode"):
0541             self.set_one_attribute("pilotErrorCode", error_code)
0542         if not self.has_attribute("pilotErrorDiag"):
0543             self.set_one_attribute("pilotErrorDiag", error_dialog)
0544 
0545     # not to suppress heartbeat
0546     def not_suppress_heartbeat(self):
0547         if self.subStatus in ["missed"]:
0548             return True
0549         return False
0550 
0551     # set pilot_closed
0552     def set_pilot_closed(self):
0553         self.pilotClosed = 1
0554 
0555     # check if pilot_closed
0556     def is_pilot_closed(self):
0557         return self.pilotClosed == 1
0558 
0559     # get job parameters
0560     def get_job_params(self, strip):
0561         if not strip:
0562             return self.jobParams
0563         else:
0564             newParams = dict()
0565             for k, v in self.jobParams.items():
0566                 if k in ["prodDBlocks", "realDatasetsIn", "dispatchDblock", "ddmEndPointIn", "scopeIn", "dispatchDBlockToken", "prodDBlockToken"]:
0567                     continue
0568                 newParams[k] = v
0569             return newParams
0570 
0571     # get pilot type
0572     def get_pilot_type(self):
0573         if "prodSourceLabel" not in self.jobParams:
0574             return None
0575         if self.jobParams["prodSourceLabel"] == "rc_test":
0576             return "RC"
0577         elif self.jobParams["prodSourceLabel"] == "rc_test2":
0578             return "RC"
0579         elif self.jobParams["prodSourceLabel"] == "rc_alrb":
0580             return "ALRB"
0581         elif self.jobParams["prodSourceLabel"] == "ptest":
0582             return "PT"
0583         elif self.jobParams["prodSourceLabel"]:
0584             return "PR"
0585         else:
0586             return None
0587 
0588     # manipulate job parameters related to container
0589     def manipulate_job_params_for_container(self):
0590         updated = False
0591         for fileSpec in self.inFiles:
0592             for k, v in self.jobParams.items():
0593                 # only container image
0594                 if k == "container_name":
0595                     if v == fileSpec.url:
0596                         self.jobParams[k] = fileSpec.path
0597                         updated = True
0598                 elif k == "containerOptions":
0599                     for kk, vv in v.items():
0600                         if kk == "containerImage":
0601                             if vv == fileSpec.url:
0602                                 self.jobParams[k][kk] = fileSpec.path
0603                                 updated = True
0604                     continue
0605         # trigger updating
0606         if updated:
0607             self.force_update("jobParams")