# File indexing completed on 2026-04-20 07:58:59
0001 import json
0002 import time
0003 from concurrent.futures import ThreadPoolExecutor as Pool
0004
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.pilot_errors import PilotErrors
0008 from pandaharvester.harvestercore.plugin_base import PluginBase
0009 from pandaharvester.harvestercore.work_spec import WorkSpec
0010 from pandaharvester.harvestercore.worker_errors import WorkerErrors
0011 from pandaharvester.harvestermisc.htcondor_utils import (
0012 CondorJobManage,
0013 CondorJobQuery,
0014 condor_job_id_from_workspec,
0015 get_host_batchid_map,
0016 )
0017 from pandaharvester.harvestermonitor.monitor_common import get_payload_errstr_from_ec
0018
0019
# logger for this module
baseLogger = core_utils.setup_logger("htcondor_monitor")


# Mapping from condor integer JobStatus (kept as string) to a human-readable
# native status stored on the workspec
CONDOR_JOB_STATUS_MAP = {
    "1": "idle",
    "2": "running",
    "3": "removed",
    "4": "completed",
    "5": "held",
    "6": "transferring_output",
    "7": "suspended",
}


# HoldReason strings for which a held job is killed immediately
# (without waiting for held_timeout)
TO_KILL_HOLD_REASONS = [
    "Job not found",
    "Failed to start GAHP",
]


# shared PilotErrors instance used to translate payload exit codes
# into pilot error code/diag pairs
PILOT_ERRORS = PilotErrors()
0044
0045
0046
def _check_one_worker(workspec, job_ads_all_dict, cancel_unknown=False, held_timeout=3600, payload_type=None):
    """Check the condor job of one worker and decide the worker's new status.

    :param workspec: WorkSpec of the worker to check
    :param job_ads_all_dict: dict of condor job ClassAds keyed by condor job id
                             (the key format comes from condor_job_id_from_workspec)
    :param cancel_unknown: if True, regard a worker whose condor job (or whose
                           JobStatus attribute) cannot be found as cancelled;
                           if False, skip it (newStatus None)
    :param held_timeout: seconds a condor job may stay held (JobStatus 5)
                         before harvester removes it and cancels the worker
    :param payload_type: payload type used to translate payload exit codes into
                         error strings, or None
    :return: tuple (newStatus, errStr); newStatus is None when the check is skipped
    """
    # Make logger
    tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="_check_one_worker")
    # Initialize
    newStatus = workspec.status
    errStr = ""
    # Look up the ClassAds of this worker's condor job
    try:
        job_ads_dict = job_ads_all_dict[condor_job_id_from_workspec(workspec)]
    except KeyError:
        got_job_ads = False
    except Exception as e:
        got_job_ads = False
        tmpLog.error(f"With error {e}")
    else:
        got_job_ads = True

    if got_job_ads:
        # Check JobStatus
        try:
            batchStatus = str(job_ads_dict["JobStatus"])
        except KeyError:
            # Job ads exist but lack JobStatus
            workspec.nativeStatus = "unknown"
            if cancel_unknown:
                newStatus = WorkSpec.ST_cancelled
                errStr = f"cannot get JobStatus of job submissionHost={workspec.submissionHost} batchID={workspec.batchID}. Regard the worker as canceled"
                tmpLog.error(errStr)
            else:
                newStatus = None
                errStr = f"cannot get JobStatus of job submissionHost={workspec.submissionHost} batchID={workspec.batchID}. Skipped"
                tmpLog.warning(errStr)
        else:
            # Try to get LastJobStatus
            lastBatchStatus = str(job_ads_dict.get("LastJobStatus", ""))
            # Avoid status reversal: once the job was removed (3) or completed (4),
            # do not let a later query move it back to an earlier/other terminal state
            if (lastBatchStatus in ["3", "4"] and batchStatus not in ["3", "4"]) or (lastBatchStatus in ["4"] and batchStatus in ["3"]):
                batchStatus = lastBatchStatus
                tmpLog.warning(
                    "refer to LastJobStatus={0} as new status of job submissionHost={1} batchID={2} to avoid reversal in status (Jobstatus={3})".format(
                        lastBatchStatus, workspec.submissionHost, workspec.batchID, str(job_ads_dict["JobStatus"])
                    )
                )
            # Propagate native condor job status
            workspec.nativeStatus = CONDOR_JOB_STATUS_MAP.get(batchStatus, "unexpected")
            if batchStatus in ["2", "6"]:
                # 2 running, 6 transferring output
                newStatus = WorkSpec.ST_running
            elif batchStatus in ["1", "7"]:
                # 1 idle, 7 suspended; JobStartDate present means the job ran before
                if job_ads_dict.get("JobStartDate"):
                    newStatus = WorkSpec.ST_idle
                else:
                    newStatus = WorkSpec.ST_submitted
            elif batchStatus in ["3"]:
                # 3 removed
                if not errStr:
                    errStr = f"Condor HoldReason: {job_ads_dict.get('LastHoldReason')} ; Condor RemoveReason: {job_ads_dict.get('RemoveReason')} "
                newStatus = WorkSpec.ST_cancelled
            elif batchStatus in ["5"]:
                # 5 held
                hold_reason = job_ads_dict.get("HoldReason")
                errStr = f"Condor HoldReason: {hold_reason} "
                if hold_reason in TO_KILL_HOLD_REASONS or int(time.time()) - int(job_ads_dict.get("EnteredCurrentStatus", 0)) > held_timeout:
                    # Kill the job if held with a fatal reason or held longer than held_timeout
                    if hold_reason in TO_KILL_HOLD_REASONS:
                        tmpLog.debug(f"trying to kill job submissionHost={workspec.submissionHost} batchID={workspec.batchID} due to HoldReason: {hold_reason}")
                    else:
                        tmpLog.debug(f"trying to kill job submissionHost={workspec.submissionHost} batchID={workspec.batchID} due to held too long")
                    for submissionHost, batchIDs_dict in get_host_batchid_map([workspec]).items():
                        batchIDs_list = list(batchIDs_dict.keys())
                        condor_job_manage = CondorJobManage(id=workspec.submissionHost)
                        try:
                            ret_map = condor_job_manage.remove(batchIDs_list)
                        except Exception as e:
                            ret_map = {}
                            ret_err_str = f"failed to kill job. Exception {e.__class__.__name__}: {e}"
                            tmpLog.error(ret_err_str)
                        else:
                            ret = ret_map.get(condor_job_id_from_workspec(workspec))
                            if ret and ret[0]:
                                tmpLog.info(f"killed held job submissionHost={workspec.submissionHost} batchID={workspec.batchID}")
                            else:
                                tmpLog.error(f"cannot kill held job submissionHost={workspec.submissionHost} batchID={workspec.batchID}")
                    newStatus = WorkSpec.ST_cancelled
                    errStr += " ; Worker canceled by harvester due to held too long or not found"
                    # Mark the PanDA job as closed instead of failed
                    workspec.set_pilot_closed()
                    tmpLog.debug("Called workspec set_pilot_closed")
                else:
                    # Held but within tolerance; JobStartDate present means the job ran before
                    if job_ads_dict.get("JobStartDate"):
                        newStatus = WorkSpec.ST_idle
                    else:
                        newStatus = WorkSpec.ST_submitted
            elif batchStatus in ["4"]:
                # 4 completed; judge by the payload exit code
                try:
                    payloadExitCode_str = str(job_ads_dict["ExitCode"])
                    payloadExitCode = int(payloadExitCode_str)
                except KeyError:
                    errStr = f"cannot get ExitCode of job submissionHost={workspec.submissionHost} batchID={workspec.batchID}. Regard the worker as failed"
                    tmpLog.warning(errStr)
                    newStatus = WorkSpec.ST_failed
                except ValueError:
                    errStr = "got invalid ExitCode {0} of job submissionHost={1} batchID={2}. Regard the worker as failed".format(
                        payloadExitCode_str, workspec.submissionHost, workspec.batchID
                    )
                    tmpLog.warning(errStr)
                    newStatus = WorkSpec.ST_failed
                else:
                    # Payload execution terminated
                    workspec.nativeExitCode = payloadExitCode
                    if payloadExitCode == 0:
                        # Payload ended successfully
                        newStatus = WorkSpec.ST_finished
                    else:
                        # Payload failed; translate exit code to an error string
                        newStatus = WorkSpec.ST_failed
                        errStr = ""
                        if payload_type:
                            errStr = get_payload_errstr_from_ec(payload_type, payloadExitCode)
                        if not errStr:
                            errStr = f"Payload execution error: returned non-zero {payloadExitCode}"
                        tmpLog.debug(errStr)
                        # Map the exit code to a pilot error; codes that are a multiple
                        # of 256 are shifted down (wait()-style encoding) before mapping
                        reduced_exit_code = payloadExitCode // 256 if (payloadExitCode % 256 == 0) else payloadExitCode
                        pilot_error_code, pilot_error_diag = PILOT_ERRORS.convertToPilotErrors(reduced_exit_code)
                        if pilot_error_code is not None:
                            workspec.set_pilot_error(pilot_error_code, pilot_error_diag)
                    tmpLog.info(f"Payload return code = {payloadExitCode}")
            else:
                # Unexpected JobStatus value
                errStr = "cannot get reasonable JobStatus of job submissionHost={0} batchID={1}. Regard the worker as failed by default".format(
                    workspec.submissionHost, workspec.batchID
                )
                tmpLog.error(errStr)
                newStatus = WorkSpec.ST_failed
            tmpLog.info(f"submissionHost={workspec.submissionHost} batchID={workspec.batchID} : batchStatus {batchStatus} -> workerStatus {newStatus}")
    else:
        # Condor job not found at all
        workspec.nativeStatus = "unknown"
        if cancel_unknown:
            errStr = f"condor job submissionHost={workspec.submissionHost} batchID={workspec.batchID} not found. Regard the worker as canceled by default"
            tmpLog.error(errStr)
            newStatus = WorkSpec.ST_cancelled
            tmpLog.info(f"submissionHost={workspec.submissionHost} batchID={workspec.batchID} : batchStatus 3 -> workerStatus {newStatus}")
        else:
            errStr = f"condor job submissionHost={workspec.submissionHost} batchID={workspec.batchID} not found. Skipped"
            tmpLog.warning(errStr)
            newStatus = None

    # Attach a supplemental error code/diag to the workspec
    error_code = None
    if errStr:
        # Try to identify a known error from the message pattern
        error_code = WorkerErrors.htcondor_message_pattern_handler.get_error_code(errStr)
        if error_code is None:
            # Fall back to general error
            error_code = WorkerErrors.error_codes.get("GENERAL_ERROR")
        tmpLog.debug(f"errorCode={error_code} errorDiag={errStr}")
    else:
        # No error
        error_code = WorkerErrors.error_codes.get("SUCCEEDED")
    workspec.set_supplemental_error(error_code=error_code, error_diag=errStr)

    return (newStatus, errStr)
0210
0211
0212
class HTCondorMonitor(PluginBase):
    """Monitor plugin for workers submitted to HTCondor."""

    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)
        # optional extra configuration for this plugin from the master config
        extra_plugin_configs = {}
        try:
            extra_plugin_configs = harvester_config.master.extraPluginConfigs["HTCondorMonitor"]
        except (AttributeError, KeyError):
            # extraPluginConfigs absent or has no HTCondorMonitor section
            pass
        # number of threads used to check workers concurrently
        self.nProcesses = getattr(self, "nProcesses", 4)
        # whether to regard a worker as cancelled when its condor job is not found
        self.cancelUnknown = bool(getattr(self, "cancelUnknown", False))
        # seconds a condor job may stay held before harvester kills it
        self.heldTimeout = getattr(self, "heldTimeout", 3600)
        # condor query cache switch (from monitor config)
        self.cacheEnable = False
        try:
            self.cacheEnable = harvester_config.monitor.pluginCacheEnable
        except AttributeError:
            pass
        # cache refresh interval; fall back to the monitor check interval
        self.cacheRefreshInterval = 240
        try:
            self.cacheRefreshInterval = harvester_config.monitor.pluginCacheRefreshInterval
        except AttributeError:
            self.cacheRefreshInterval = harvester_config.monitor.checkInterval
        # whether to query condor_history in addition to the queue
        self.useCondorHistory = getattr(self, "useCondorHistory", True)
        if extra_plugin_configs.get("use_condor_history") is False:
            self.useCondorHistory = False
        # max age (seconds) of history records to query
        self.useCondorHistoryMaxAge = getattr(self, "useCondorHistoryMaxAge", 7200)
        # explicit submission hosts to report on
        self.submissionHost_list = getattr(self, "submissionHost_list", [])
        # JSON config files mapping schedds to pools; parsed in report_updated_workers
        self.condorHostConfig_list = getattr(self, "condorHostConfig_list", [])
        # payload type used to translate payload exit codes
        self.payloadType = getattr(self, "payloadType", None)

    def check_workers(self, workspec_list):
        """Check the status of the given workers.

        Queries the condor schedds of all submission hosts involved for the
        job ClassAds, then checks each worker concurrently in a thread pool.

        :param workspec_list: list of WorkSpec objects to check
        :return: (True, retList) where retList holds one (newStatus, errStr)
                 tuple per worker, in the same order as workspec_list
        """
        tmpLog = self.make_logger(baseLogger, "batch job query", method_name="check_workers")
        tmpLog.debug("start")
        # gather condor job ads from every submission host of the workers
        job_ads_all_dict = {}
        for submissionHost, batchIDs_dict in get_host_batchid_map(workspec_list).items():
            try:
                job_query = CondorJobQuery(
                    cacheEnable=self.cacheEnable,
                    cacheRefreshInterval=self.cacheRefreshInterval,
                    useCondorHistory=self.useCondorHistory,
                    id=submissionHost,
                    useCondorHistoryMaxAge=self.useCondorHistoryMaxAge,
                )
                host_job_ads_dict = job_query.get_all(batchIDs_dict=batchIDs_dict)
            except Exception as e:
                # best-effort: a failing schedd must not break the whole check cycle
                host_job_ads_dict = {}
                ret_err_str = f"Exception {e.__class__.__name__}: {e}"
                tmpLog.error(ret_err_str)
            job_ads_all_dict.update(host_job_ads_dict)
        # check workers in parallel
        with Pool(self.nProcesses) as _pool:
            retIterator = _pool.map(
                lambda _x: _check_one_worker(
                    _x, job_ads_all_dict, cancel_unknown=self.cancelUnknown, held_timeout=self.heldTimeout, payload_type=self.payloadType
                ),
                workspec_list,
            )
        retList = list(retIterator)
        tmpLog.debug("done")
        return True, retList

    def report_updated_workers(self, time_window):
        """Report workers whose condor jobs changed status recently.

        Queries all condor jobs on every known submission host (explicit list
        plus hosts parsed from condorHostConfig JSON files) and returns the
        workers whose EnteredCurrentStatus lies within the last time_window
        seconds.

        :param time_window: look-back window in seconds
        :return: list of (workerID, EnteredCurrentStatus) tuples
        """
        tmpLog = self.make_logger(baseLogger, method_name="report_updated_workers")
        tmpLog.debug("start")
        timeNow = time.time()
        # collect submission hosts: explicit list plus condor host config files
        submission_host_set = set(self.submissionHost_list)
        for condorHostConfig in self.condorHostConfig_list:
            try:
                with open(condorHostConfig, "r") as f:
                    condor_host_config_map = json.load(f)
                for _schedd, _cm in condor_host_config_map.items():
                    _pool = _cm["pool"]
                    submissionHost = f"{_schedd},{_pool}"
                    submission_host_set.add(submissionHost)
            except Exception as e:
                err_str = f"failed to parse condorHostConfig {condorHostConfig}; {e.__class__.__name__}: {e}"
                tmpLog.error(err_str)
                continue
        # query all condor jobs on every submission host, refreshing the cache
        job_ads_all_dict = {}
        for submissionHost in submission_host_set:
            try:
                job_query = CondorJobQuery(
                    cacheEnable=self.cacheEnable, cacheRefreshInterval=self.cacheRefreshInterval, useCondorHistory=self.useCondorHistory, id=submissionHost
                )
                job_ads_all_dict.update(job_query.get_all(batchIDs_dict=None, allJobs=True, to_update_cache=True))
                tmpLog.debug(f"got information of condor jobs on {submissionHost}")
            except Exception as e:
                ret_err_str = f"Exception {e.__class__.__name__}: {e}"
                tmpLog.error(ret_err_str)
        # pick workers whose status changed within the time window
        workers_to_check_list = []
        for condor_job_id, job_ads in job_ads_all_dict.items():
            job_EnteredCurrentStatus = job_ads.get("EnteredCurrentStatus")
            # skip ads without EnteredCurrentStatus; comparing None against a
            # float would raise TypeError and abort the whole report
            if job_EnteredCurrentStatus is None or not (job_EnteredCurrentStatus > timeNow - time_window):
                continue
            workerid = job_ads.get("harvesterWorkerID")
            batch_status = job_ads.get("JobStatus")
            if workerid is None:
                # condor job not submitted by harvester
                continue
            workerid = int(workerid)
            workers_to_check_list.append((workerid, job_EnteredCurrentStatus))
            tmpLog.debug(f"workerID={workerid} got batchStatus={batch_status} at ts={job_EnteredCurrentStatus}")
        tmpLog.debug(f"got {len(workers_to_check_list)} workers")
        tmpLog.debug("done")
        return workers_to_check_list