File indexing completed on 2026-04-20 07:58:58
0001 """
0002 Work spec class
0003
0004 """
0005
0006 import datetime
0007 import os
0008 import re
0009
0010 from pandaharvester.harvesterconfig import harvester_config
0011 from pandaharvester.harvestercore import core_utils
0012
0013 from .spec_base import SpecBase
0014
0015
0016
class WorkSpec(SpecBase):
    """Specification of a harvester worker.

    Holds the worker's status, its mapping to jobs, event-service mode,
    and the DB-backed attribute definitions consumed by SpecBase.
    """

    # worker statuses
    ST_submitted = "submitted"
    ST_running = "running"
    ST_finished = "finished"
    ST_failed = "failed"
    ST_ready = "ready"
    ST_cancelled = "cancelled"
    ST_idle = "idle"
    ST_missed = "missed"
    ST_pending = "pending"

    # list of worker statuses
    # NOTE(review): ST_pending is not included here — confirm this is
    # intentional before using ST_LIST for status validation
    ST_LIST = [ST_submitted, ST_running, ST_finished, ST_failed, ST_ready, ST_cancelled, ST_idle, ST_missed]

    # worker-to-job mapping types
    MT_NoJob = "NoJob"
    MT_OneToOne = "OneToOne"
    MT_MultiJobs = "ManyToOne"
    MT_MultiWorkers = "OneToMany"

    # events handling modes
    EV_noEvents = 0
    EV_useEvents = 1
    EV_requestEvents = 2

    # attribute definitions: "name:dbType [/ index]" parsed by SpecBase
    attributesWithTypes = (
        "workerID:integer primary key",
        "batchID:text",
        "mapType:text",
        "queueName:text",
        "status:text / index",
        "hasJob:integer",
        "workParams:blob",
        "workAttributes:blob",
        "eventsRequestParams:blob",
        "eventsRequest:integer / index",
        "computingSite:text / index",
        "creationTime:timestamp",
        "submitTime:timestamp / index",
        "startTime:timestamp",
        "endTime:timestamp",
        "nCore:integer",
        "walltime:timestamp",
        "accessPoint:text",
        "modificationTime:timestamp / index",
        "lastUpdate:timestamp / index",
        "eventFeedTime:timestamp / index",
        "lockedBy:text",
        "postProcessed:integer",
        "nodeID:text",
        "minRamCount:integer",
        "maxDiskCount:integer",
        "maxWalltime:integer",
        "killTime:timestamp / index",
        "computingElement:text",
        "nJobsToReFill:integer / index",
        "logFilesToUpload:blob",
        "jobType:text",
        "resourceType:text",
        "nativeExitCode:integer",
        "nativeStatus:text",
        "diagMessage:varchar(500)",
        "nJobs:integer",
        "submissionHost:text",
        "configID:integer / index",
        "syncLevel:integer",
        "checkTime:timestamp",
        "ioIntensity:integer",
        "harvesterHost:text",
        "pilotType:text",
        "eventFeedLock:text",
        "errorCode:integer",
        "errorDiag:text",
    )

    # blob attributes to skip when slimming the spec
    skipAttrsToSlim = ("workParams", "workAttributes")
0096
0097
0098 def __init__(self):
0099 SpecBase.__init__(self)
0100 object.__setattr__(self, "isNew", False)
0101 object.__setattr__(self, "nextLookup", False)
0102 object.__setattr__(self, "jobspec_list", None)
0103 object.__setattr__(self, "pandaid_list", None)
0104 object.__setattr__(self, "new_status", False)
0105 object.__setattr__(self, "pilot_closed", False)
0106
0107
0108 def __getstate__(self):
0109 odict = SpecBase.__getstate__(self)
0110 del odict["isNew"]
0111 del odict["new_status"]
0112 return odict
0113
0114
0115 def set_status(self, value):
0116
0117 if value == self.ST_submitted and self.status in [self.ST_running, self.ST_idle]:
0118 return
0119 if self.status != value:
0120 self.trigger_propagation()
0121 self.new_status = True
0122 self.status = value
0123 if self.status == self.ST_running:
0124 self.set_start_time()
0125 elif self.is_final_status():
0126 self.set_end_time()
0127
0128
0129 def get_access_point(self):
0130
0131 if "$" in self.accessPoint:
0132 patts = re.findall("\$\{([a-zA-Z\d_.]+)\}", self.accessPoint)
0133 for patt in patts:
0134 tmpKey = "${" + patt + "}"
0135 tmpVar = None
0136 if hasattr(self, patt):
0137 tmpVar = str(getattr(self, patt))
0138 elif patt == "harvesterID":
0139 tmpVar = harvester_config.master.harvester_id
0140 else:
0141 _match = re.search("^_workerID_((?:\d+.)*\d)$", patt)
0142 if _match:
0143 workerID_str = str(self.workerID)
0144 digit_list = _match.group(1).split(".")
0145 string_list = []
0146 for _d in digit_list:
0147 digit = int(_d)
0148 try:
0149 _n = workerID_str[(-1 - digit)]
0150 except IndexError:
0151 string_list.append("0")
0152 else:
0153 string_list.append(_n)
0154 tmpVar = "".join(string_list)
0155 if tmpVar is not None:
0156 self.accessPoint = self.accessPoint.replace(tmpKey, tmpVar)
0157 return self.accessPoint
0158
0159
0160 def set_jobspec_list(self, jobspec_list):
0161 self.jobspec_list = jobspec_list
0162
0163
0164 def get_jobspec_list(self):
0165 return self.jobspec_list
0166
0167
0168 def set_num_jobs_with_list(self):
0169 if self.jobspec_list is None:
0170 return
0171 if len(self.jobspec_list) == 0:
0172 return
0173 if self.nJobs is None:
0174 self.nJobs = 0
0175 self.nJobs += len(self.jobspec_list)
0176
0177
0178 def convert_to_job_status(self, status=None):
0179 if status is None:
0180 status = self.status
0181 if status in [self.ST_submitted, self.ST_ready]:
0182 jobStatus = "starting"
0183 jobSubStatus = status
0184 elif status in [self.ST_finished, self.ST_failed, self.ST_cancelled]:
0185 jobStatus = status
0186 jobSubStatus = "to_transfer"
0187 elif status in [self.ST_missed]:
0188 jobStatus = "missed"
0189 jobSubStatus = status
0190 else:
0191 jobStatus = "running"
0192 jobSubStatus = status
0193 return jobStatus, jobSubStatus
0194
0195
0196 def is_post_processed(self):
0197 return self.postProcessed == 1
0198
0199
0200 def post_processed(self):
0201 self.postProcessed = 1
0202
0203
0204 def trigger_next_lookup(self):
0205 self.nextLookup = True
0206 self.modificationTime = core_utils.naive_utcnow() - datetime.timedelta(hours=1)
0207
0208
0209 def trigger_propagation(self):
0210 self.lastUpdate = core_utils.naive_utcnow() - datetime.timedelta(hours=24)
0211
0212
0213 def disable_propagation(self):
0214 self.lastUpdate = None
0215 self.force_update("lastUpdate")
0216
0217
0218 def is_final_status(self):
0219 return self.status in [self.ST_finished, self.ST_failed, self.ST_cancelled, self.ST_missed]
0220
0221
0222 def convert_to_propagate(self):
0223 data = dict()
0224 for attr in [
0225 "workerID",
0226 "batchID",
0227 "queueName",
0228 "status",
0229 "computingSite",
0230 "nCore",
0231 "nodeID",
0232 "submitTime",
0233 "startTime",
0234 "endTime",
0235 "jobType",
0236 "resourceType",
0237 "nativeExitCode",
0238 "nativeStatus",
0239 "diagMessage",
0240 "nJobs",
0241 "computingElement",
0242 "syncLevel",
0243 "submissionHost",
0244 "harvesterHost",
0245 "errorCode",
0246 "minRamCount",
0247 ]:
0248 val = getattr(self, attr)
0249 if val is not None:
0250 if isinstance(val, datetime.datetime):
0251 val = "datetime/" + val.strftime("%Y-%m-%d %H:%M:%S.%f")
0252 data[attr] = val
0253 if self.errorCode not in [None, 0] and self.errorDiag not in [None, ""]:
0254 data["diagMessage"] = self.errorDiag
0255 if self.pandaid_list is not None:
0256 data["pandaid_list"] = self.pandaid_list
0257 if self.workAttributes is not None:
0258 for attr in ["stdOut", "stdErr", "batchLog", "jdl"]:
0259 if attr in self.workAttributes:
0260 data[attr] = self.workAttributes[attr]
0261 return data
0262
0263
0264 def set_start_time(self, force=False):
0265 if self.startTime is None or force is True:
0266 self.startTime = core_utils.naive_utcnow()
0267
0268
0269 def set_end_time(self, force=False):
0270 if self.endTime is None or force is True:
0271 self.endTime = core_utils.naive_utcnow()
0272
0273
0274 def set_work_params(self, data):
0275 if data is None:
0276 return
0277 if self.workParams is None and data is not None:
0278 self.workParams = dict()
0279 for key, val in data.items():
0280 if key not in self.workParams or self.workParams[key] != val:
0281 self.workParams[key] = val
0282 self.force_update("workParams")
0283
0284
0285 def get_work_params(self, name):
0286 if self.workParams is None or name not in self.workParams:
0287 return False, None
0288 return True, self.workParams[name]
0289
0290
0291 def has_work_params(self, name):
0292 if self.workParams is None or name not in self.workParams:
0293 return False
0294 return True
0295
0296
0297 def set_work_attributes(self, data):
0298 if data is None:
0299 return
0300 if self.workAttributes is None and data is not None:
0301 self.workAttributes = dict()
0302 for key, val in data.items():
0303 if key not in self.workAttributes or self.workAttributes[key] != val:
0304 self.workAttributes[key] = val
0305 self.force_update("workAttributes")
0306
0307
0308 def get_work_attribute(self, name):
0309 if self.workAttributes is None or name not in self.workAttributes:
0310 return False, None
0311 return True, self.workAttributes[name]
0312
0313
0314 def has_work_attribute(self, name):
0315 if self.workAttributes is None or name not in self.workAttributes:
0316 return False
0317 return True
0318
0319
0320 def update_log_files_to_upload(self, file_path, position, remote_name=None, stream_type=None):
0321 if self.logFilesToUpload is None:
0322 self.logFilesToUpload = dict()
0323 if stream_type is not None:
0324
0325 for tmp_file_path, tmpDict in self.logFilesToUpload.copy().items():
0326 if tmpDict["stream_type"] == stream_type:
0327 del self.logFilesToUpload[tmp_file_path]
0328 if file_path not in self.logFilesToUpload:
0329 self.logFilesToUpload[file_path] = {"position": position, "remote_name": remote_name, "stream_type": stream_type}
0330 self.force_update("logFilesToUpload")
0331 elif self.logFilesToUpload[file_path]["position"] != position:
0332 self.logFilesToUpload[file_path]["position"] = position
0333 self.force_update("logFilesToUpload")
0334
0335
0336 def set_log_file(self, log_type, stream):
0337 if log_type == "stdout":
0338 keyName = "stdOut"
0339 elif log_type == "stderr":
0340 keyName = "stdErr"
0341 elif log_type == "jdl":
0342 keyName = "jdl"
0343 else:
0344 keyName = "batchLog"
0345 if stream.startswith("http"):
0346 url = stream
0347 else:
0348 remoteName = f"{harvester_config.master.harvester_id}__{os.path.basename(stream)}"
0349 url = f"{harvester_config.pandacon.pandaCacheURL_R}/{remoteName}"
0350
0351 self.update_log_files_to_upload(stream, 0, remoteName, keyName)
0352 self.set_work_attributes({keyName: url})
0353
0354
0355 def get_log_files_to_upload(self):
0356 retList = []
0357 if self.logFilesToUpload is not None:
0358 for filePath, fileInfo in self.logFilesToUpload.items():
0359 if not os.path.exists(filePath):
0360 continue
0361 fileSize = os.stat(filePath).st_size
0362 if fileSize <= fileInfo["position"]:
0363 continue
0364 retList.append((filePath, fileInfo["position"], fileSize - fileInfo["position"], fileInfo["remote_name"]))
0365 return retList
0366
0367
0368 def set_dialog_message(self, msg):
0369 if msg not in (None, ""):
0370 msg = msg[:500]
0371 self.diagMessage = msg
0372
0373
0374 def set_pilot_error(self, error_code, error_dialog):
0375 self.set_work_attributes({"pilotErrorCode": error_code, "pilotErrorDiag": error_dialog})
0376
0377
0378 def has_pilot_error(self):
0379 return self.has_work_attribute("pilotErrorCode")
0380
0381
0382 def set_pilot_closed(self):
0383 self.pilot_closed = True
0384
0385
0386 def set_supplemental_error(self, error_code, error_diag):
0387 if error_code is not None:
0388 self.errorCode = error_code
0389 if error_diag not in (None, ""):
0390 self.errorDiag = str(error_diag)[:256]