Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:58

0001 """
0002 Work spec class
0003 
0004 """
0005 
0006 import datetime
0007 import os
0008 import re
0009 
0010 from pandaharvester.harvesterconfig import harvester_config
0011 from pandaharvester.harvestercore import core_utils
0012 
0013 from .spec_base import SpecBase
0014 
0015 
# work spec
class WorkSpec(SpecBase):
    """Specification of a single worker (batch/pilot job) tracked by Harvester.

    Class-level constants define the worker status values, job-to-worker
    mapping types, event-handling modes, and the persisted column schema.
    Transient (non-persisted) helper attributes are installed in __init__.
    """

    # worker statuses (string values stored in the "status" column)
    ST_submitted = "submitted"
    ST_running = "running"
    ST_finished = "finished"
    ST_failed = "failed"
    ST_ready = "ready"
    ST_cancelled = "cancelled"
    ST_idle = "idle"
    ST_missed = "missed"
    ST_pending = "pending"

    # list of worker statuses
    # NOTE(review): ST_pending is defined above but not included here --
    # confirm this omission is intentional
    ST_LIST = [ST_submitted, ST_running, ST_finished, ST_failed, ST_ready, ST_cancelled, ST_idle, ST_missed]

    # type of mapping between job and worker
    MT_NoJob = "NoJob"
    MT_OneToOne = "OneToOne"
    MT_MultiJobs = "ManyToOne"
    MT_MultiWorkers = "OneToMany"

    # events handling modes
    EV_noEvents = 0
    EV_useEvents = 1
    EV_requestEvents = 2

    # attributes: "name:column type [/ index]" strings -- presumably parsed
    # by SpecBase to build the DB table schema (confirm in spec_base)
    attributesWithTypes = (
        "workerID:integer primary key",
        "batchID:text",
        "mapType:text",
        "queueName:text",
        "status:text / index",
        "hasJob:integer",
        "workParams:blob",
        "workAttributes:blob",
        "eventsRequestParams:blob",
        "eventsRequest:integer / index",
        "computingSite:text / index",
        "creationTime:timestamp",
        "submitTime:timestamp / index",
        "startTime:timestamp",
        "endTime:timestamp",
        "nCore:integer",
        "walltime:timestamp",
        "accessPoint:text",
        "modificationTime:timestamp / index",
        "lastUpdate:timestamp / index",
        "eventFeedTime:timestamp / index",
        "lockedBy:text",
        "postProcessed:integer",
        "nodeID:text",
        "minRamCount:integer",
        "maxDiskCount:integer",
        "maxWalltime:integer",
        "killTime:timestamp / index",
        "computingElement:text",
        "nJobsToReFill:integer / index",
        "logFilesToUpload:blob",
        "jobType:text",
        "resourceType:text",
        "nativeExitCode:integer",
        "nativeStatus:text",
        "diagMessage:varchar(500)",
        "nJobs:integer",
        "submissionHost:text",
        "configID:integer / index",
        "syncLevel:integer",
        "checkTime:timestamp",
        "ioIntensity:integer",
        "harvesterHost:text",
        "pilotType:text",
        "eventFeedLock:text",
        "errorCode:integer",
        "errorDiag:text",
    )

    # heavyweight blob attributes to skip when slim reading
    skipAttrsToSlim = ("workParams", "workAttributes")
0096 
0097     # constructor
0098     def __init__(self):
0099         SpecBase.__init__(self)
0100         object.__setattr__(self, "isNew", False)
0101         object.__setattr__(self, "nextLookup", False)
0102         object.__setattr__(self, "jobspec_list", None)
0103         object.__setattr__(self, "pandaid_list", None)
0104         object.__setattr__(self, "new_status", False)
0105         object.__setattr__(self, "pilot_closed", False)
0106 
0107     # keep state for pickle
0108     def __getstate__(self):
0109         odict = SpecBase.__getstate__(self)
0110         del odict["isNew"]
0111         del odict["new_status"]
0112         return odict
0113 
0114     # set status
0115     def set_status(self, value):
0116         # prevent reverse transition
0117         if value == self.ST_submitted and self.status in [self.ST_running, self.ST_idle]:
0118             return
0119         if self.status != value:
0120             self.trigger_propagation()
0121             self.new_status = True
0122         self.status = value
0123         if self.status == self.ST_running:
0124             self.set_start_time()
0125         elif self.is_final_status():
0126             self.set_end_time()
0127 
0128     # get access point
0129     def get_access_point(self):
0130         # replace placeholders
0131         if "$" in self.accessPoint:
0132             patts = re.findall("\$\{([a-zA-Z\d_.]+)\}", self.accessPoint)
0133             for patt in patts:
0134                 tmpKey = "${" + patt + "}"
0135                 tmpVar = None
0136                 if hasattr(self, patt):
0137                     tmpVar = str(getattr(self, patt))
0138                 elif patt == "harvesterID":
0139                     tmpVar = harvester_config.master.harvester_id
0140                 else:
0141                     _match = re.search("^_workerID_((?:\d+.)*\d)$", patt)
0142                     if _match:
0143                         workerID_str = str(self.workerID)
0144                         digit_list = _match.group(1).split(".")
0145                         string_list = []
0146                         for _d in digit_list:
0147                             digit = int(_d)
0148                             try:
0149                                 _n = workerID_str[(-1 - digit)]
0150                             except IndexError:
0151                                 string_list.append("0")
0152                             else:
0153                                 string_list.append(_n)
0154                         tmpVar = "".join(string_list)
0155                 if tmpVar is not None:
0156                     self.accessPoint = self.accessPoint.replace(tmpKey, tmpVar)
0157         return self.accessPoint
0158 
0159     # set job spec list
0160     def set_jobspec_list(self, jobspec_list):
0161         self.jobspec_list = jobspec_list
0162 
0163     # get job spec list
0164     def get_jobspec_list(self):
0165         return self.jobspec_list
0166 
0167     # set number of jobs with jobspec list
0168     def set_num_jobs_with_list(self):
0169         if self.jobspec_list is None:
0170             return
0171         if len(self.jobspec_list) == 0:
0172             return
0173         if self.nJobs is None:
0174             self.nJobs = 0
0175         self.nJobs += len(self.jobspec_list)
0176 
0177     # convert worker status to job status
0178     def convert_to_job_status(self, status=None):
0179         if status is None:
0180             status = self.status
0181         if status in [self.ST_submitted, self.ST_ready]:
0182             jobStatus = "starting"
0183             jobSubStatus = status
0184         elif status in [self.ST_finished, self.ST_failed, self.ST_cancelled]:
0185             jobStatus = status
0186             jobSubStatus = "to_transfer"
0187         elif status in [self.ST_missed]:
0188             jobStatus = "missed"
0189             jobSubStatus = status
0190         else:
0191             jobStatus = "running"
0192             jobSubStatus = status
0193         return jobStatus, jobSubStatus
0194 
0195     # check if post processed
0196     def is_post_processed(self):
0197         return self.postProcessed == 1
0198 
0199     # set post processed flag
0200     def post_processed(self):
0201         self.postProcessed = 1
0202 
0203     # trigger next lookup
0204     def trigger_next_lookup(self):
0205         self.nextLookup = True
0206         self.modificationTime = core_utils.naive_utcnow() - datetime.timedelta(hours=1)
0207 
0208     # trigger propagation
0209     def trigger_propagation(self):
0210         self.lastUpdate = core_utils.naive_utcnow() - datetime.timedelta(hours=24)
0211 
0212     # disable propagation
0213     def disable_propagation(self):
0214         self.lastUpdate = None
0215         self.force_update("lastUpdate")
0216 
0217     # final status
0218     def is_final_status(self):
0219         return self.status in [self.ST_finished, self.ST_failed, self.ST_cancelled, self.ST_missed]
0220 
0221     # convert to propagate
0222     def convert_to_propagate(self):
0223         data = dict()
0224         for attr in [
0225             "workerID",
0226             "batchID",
0227             "queueName",
0228             "status",
0229             "computingSite",
0230             "nCore",
0231             "nodeID",
0232             "submitTime",
0233             "startTime",
0234             "endTime",
0235             "jobType",
0236             "resourceType",
0237             "nativeExitCode",
0238             "nativeStatus",
0239             "diagMessage",
0240             "nJobs",
0241             "computingElement",
0242             "syncLevel",
0243             "submissionHost",
0244             "harvesterHost",
0245             "errorCode",
0246             "minRamCount",
0247         ]:
0248             val = getattr(self, attr)
0249             if val is not None:
0250                 if isinstance(val, datetime.datetime):
0251                     val = "datetime/" + val.strftime("%Y-%m-%d %H:%M:%S.%f")
0252                 data[attr] = val
0253         if self.errorCode not in [None, 0] and self.errorDiag not in [None, ""]:
0254             data["diagMessage"] = self.errorDiag
0255         if self.pandaid_list is not None:
0256             data["pandaid_list"] = self.pandaid_list
0257         if self.workAttributes is not None:
0258             for attr in ["stdOut", "stdErr", "batchLog", "jdl"]:
0259                 if attr in self.workAttributes:
0260                     data[attr] = self.workAttributes[attr]
0261         return data
0262 
0263     # set start time
0264     def set_start_time(self, force=False):
0265         if self.startTime is None or force is True:
0266             self.startTime = core_utils.naive_utcnow()
0267 
0268     # set end time
0269     def set_end_time(self, force=False):
0270         if self.endTime is None or force is True:
0271             self.endTime = core_utils.naive_utcnow()
0272 
0273     # set work params
0274     def set_work_params(self, data):
0275         if data is None:
0276             return
0277         if self.workParams is None and data is not None:
0278             self.workParams = dict()
0279         for key, val in data.items():
0280             if key not in self.workParams or self.workParams[key] != val:
0281                 self.workParams[key] = val
0282                 self.force_update("workParams")
0283 
0284     # get work params
0285     def get_work_params(self, name):
0286         if self.workParams is None or name not in self.workParams:
0287             return False, None
0288         return True, self.workParams[name]
0289 
0290     # check if has work params
0291     def has_work_params(self, name):
0292         if self.workParams is None or name not in self.workParams:
0293             return False
0294         return True
0295 
0296     # set work attributes
0297     def set_work_attributes(self, data):
0298         if data is None:
0299             return
0300         if self.workAttributes is None and data is not None:
0301             self.workAttributes = dict()
0302         for key, val in data.items():
0303             if key not in self.workAttributes or self.workAttributes[key] != val:
0304                 self.workAttributes[key] = val
0305                 self.force_update("workAttributes")
0306 
0307     # get work attribute
0308     def get_work_attribute(self, name):
0309         if self.workAttributes is None or name not in self.workAttributes:
0310             return False, None
0311         return True, self.workAttributes[name]
0312 
0313     # check if has work attribute
0314     def has_work_attribute(self, name):
0315         if self.workAttributes is None or name not in self.workAttributes:
0316             return False
0317         return True
0318 
0319     # update log files to upload
0320     def update_log_files_to_upload(self, file_path, position, remote_name=None, stream_type=None):
0321         if self.logFilesToUpload is None:
0322             self.logFilesToUpload = dict()
0323         if stream_type is not None:
0324             # delete existing stream
0325             for tmp_file_path, tmpDict in self.logFilesToUpload.copy().items():
0326                 if tmpDict["stream_type"] == stream_type:
0327                     del self.logFilesToUpload[tmp_file_path]
0328         if file_path not in self.logFilesToUpload:
0329             self.logFilesToUpload[file_path] = {"position": position, "remote_name": remote_name, "stream_type": stream_type}
0330             self.force_update("logFilesToUpload")
0331         elif self.logFilesToUpload[file_path]["position"] != position:
0332             self.logFilesToUpload[file_path]["position"] = position
0333             self.force_update("logFilesToUpload")
0334 
0335     # set log file
0336     def set_log_file(self, log_type, stream):
0337         if log_type == "stdout":
0338             keyName = "stdOut"
0339         elif log_type == "stderr":
0340             keyName = "stdErr"
0341         elif log_type == "jdl":
0342             keyName = "jdl"
0343         else:
0344             keyName = "batchLog"
0345         if stream.startswith("http"):
0346             url = stream
0347         else:
0348             remoteName = f"{harvester_config.master.harvester_id}__{os.path.basename(stream)}"
0349             url = f"{harvester_config.pandacon.pandaCacheURL_R}/{remoteName}"
0350             # set file to periodically upload
0351             self.update_log_files_to_upload(stream, 0, remoteName, keyName)
0352         self.set_work_attributes({keyName: url})
0353 
0354     # get the list of log files to upload
0355     def get_log_files_to_upload(self):
0356         retList = []
0357         if self.logFilesToUpload is not None:
0358             for filePath, fileInfo in self.logFilesToUpload.items():
0359                 if not os.path.exists(filePath):
0360                     continue
0361                 fileSize = os.stat(filePath).st_size
0362                 if fileSize <= fileInfo["position"]:
0363                     continue
0364                 retList.append((filePath, fileInfo["position"], fileSize - fileInfo["position"], fileInfo["remote_name"]))
0365         return retList
0366 
0367     # set dialog message
0368     def set_dialog_message(self, msg):
0369         if msg not in (None, ""):
0370             msg = msg[:500]
0371             self.diagMessage = msg
0372 
0373     # set pilot error
0374     def set_pilot_error(self, error_code, error_dialog):
0375         self.set_work_attributes({"pilotErrorCode": error_code, "pilotErrorDiag": error_dialog})
0376 
0377     # check if has pilot error
0378     def has_pilot_error(self):
0379         return self.has_work_attribute("pilotErrorCode")
0380 
0381     # set pilot_closed
0382     def set_pilot_closed(self):
0383         self.pilot_closed = True
0384 
0385     # set supplemental error
0386     def set_supplemental_error(self, error_code, error_diag):
0387         if error_code is not None:
0388             self.errorCode = error_code
0389         if error_diag not in (None, ""):
0390             self.errorDiag = str(error_diag)[:256]