Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import json
0002 import time
0003 from concurrent.futures import ThreadPoolExecutor as Pool
0004 
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.pilot_errors import PilotErrors
0008 from pandaharvester.harvestercore.plugin_base import PluginBase
0009 from pandaharvester.harvestercore.work_spec import WorkSpec
0010 from pandaharvester.harvestercore.worker_errors import WorkerErrors
0011 from pandaharvester.harvestermisc.htcondor_utils import (
0012     CondorJobManage,
0013     CondorJobQuery,
0014     condor_job_id_from_workspec,
0015     get_host_batchid_map,
0016 )
0017 from pandaharvester.harvestermonitor.monitor_common import get_payload_errstr_from_ec
0018 
0019 # logger
0020 baseLogger = core_utils.setup_logger("htcondor_monitor")
0021 
0022 
0023 # Native HTCondor job status map
0024 CONDOR_JOB_STATUS_MAP = {
0025     "1": "idle",
0026     "2": "running",
0027     "3": "removed",
0028     "4": "completed",
0029     "5": "held",
0030     "6": "transferring_output",
0031     "7": "suspended",
0032 }
0033 
0034 
0035 # Condor jobs held with these reasons should be killed
0036 TO_KILL_HOLD_REASONS = [
0037     "Job not found",
0038     "Failed to start GAHP",
0039 ]
0040 
0041 
0042 # pilot error object
0043 PILOT_ERRORS = PilotErrors()
0044 
0045 
0046 # Check one worker
0047 def _check_one_worker(workspec, job_ads_all_dict, cancel_unknown=False, held_timeout=3600, payload_type=None):
0048     # Make logger for one single worker
0049     tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="_check_one_worker")
0050     # Initialize newStatus
0051     newStatus = workspec.status
0052     errStr = ""
0053     try:
0054         job_ads_dict = job_ads_all_dict[condor_job_id_from_workspec(workspec)]
0055     except KeyError:
0056         got_job_ads = False
0057     except Exception as e:
0058         got_job_ads = False
0059         tmpLog.error(f"With error {e}")
0060     else:
0061         got_job_ads = True
0062     # Parse job ads
0063     if got_job_ads:
0064         # Check JobStatus
0065         try:
0066             batchStatus = str(job_ads_dict["JobStatus"])
0067         except KeyError:
0068             # Propagate native condor job status as unknown
0069             workspec.nativeStatus = "unknown"
0070             if cancel_unknown:
0071                 newStatus = WorkSpec.ST_cancelled
0072                 errStr = f"cannot get JobStatus of job submissionHost={workspec.submissionHost} batchID={workspec.batchID}. Regard the worker as canceled"
0073                 tmpLog.error(errStr)
0074             else:
0075                 newStatus = None
0076                 errStr = f"cannot get JobStatus of job submissionHost={workspec.submissionHost} batchID={workspec.batchID}. Skipped"
0077                 tmpLog.warning(errStr)
0078         else:
0079             # Try to get LastJobStatus
0080             lastBatchStatus = str(job_ads_dict.get("LastJobStatus", ""))
0081             # Set batchStatus if lastBatchStatus is terminated status
0082             if (lastBatchStatus in ["3", "4"] and batchStatus not in ["3", "4"]) or (lastBatchStatus in ["4"] and batchStatus in ["3"]):
0083                 batchStatus = lastBatchStatus
0084                 tmpLog.warning(
0085                     "refer to LastJobStatus={0} as new status of job submissionHost={1} batchID={2} to avoid reversal in status (Jobstatus={3})".format(
0086                         lastBatchStatus, workspec.submissionHost, workspec.batchID, str(job_ads_dict["JobStatus"])
0087                     )
0088                 )
0089             # Propagate native condor job status
0090             workspec.nativeStatus = CONDOR_JOB_STATUS_MAP.get(batchStatus, "unexpected")
0091             if batchStatus in ["2", "6"]:
0092                 # 2 running, 6 transferring output
0093                 newStatus = WorkSpec.ST_running
0094             elif batchStatus in ["1", "7"]:
0095                 # 1 idle, 7 suspended
0096                 if job_ads_dict.get("JobStartDate"):
0097                     newStatus = WorkSpec.ST_idle
0098                 else:
0099                     newStatus = WorkSpec.ST_submitted
0100             elif batchStatus in ["3"]:
0101                 # 3 removed
0102                 if not errStr:
0103                     errStr = f"Condor HoldReason: {job_ads_dict.get('LastHoldReason')} ; Condor RemoveReason: {job_ads_dict.get('RemoveReason')} "
0104                 newStatus = WorkSpec.ST_cancelled
0105             elif batchStatus in ["5"]:
0106                 # 5 held
0107                 hold_reason = job_ads_dict.get("HoldReason")
0108                 errStr = f"Condor HoldReason: {hold_reason} "
0109                 if hold_reason in TO_KILL_HOLD_REASONS or int(time.time()) - int(job_ads_dict.get("EnteredCurrentStatus", 0)) > held_timeout:
0110                     # Kill the job if held too long or other reasons
0111                     if hold_reason in TO_KILL_HOLD_REASONS:
0112                         tmpLog.debug(f"trying to kill job submissionHost={workspec.submissionHost} batchID={workspec.batchID} due to HoldReason: {hold_reason}")
0113                     else:
0114                         tmpLog.debug(f"trying to kill job submissionHost={workspec.submissionHost} batchID={workspec.batchID} due to held too long")
0115                     for submissionHost, batchIDs_dict in get_host_batchid_map([workspec]).items():
0116                         batchIDs_list = list(batchIDs_dict.keys())
0117                         condor_job_manage = CondorJobManage(id=workspec.submissionHost)
0118                         try:
0119                             ret_map = condor_job_manage.remove(batchIDs_list)
0120                         except Exception as e:
0121                             ret_map = {}
0122                             ret_err_str = f"failed to kill job. Exception {e.__class__.__name__}: {e}"
0123                             tmpLog.error(ret_err_str)
0124                         else:
0125                             ret = ret_map.get(condor_job_id_from_workspec(workspec))
0126                             if ret and ret[0]:
0127                                 tmpLog.info(f"killed held job submissionHost={workspec.submissionHost} batchID={workspec.batchID}")
0128                             else:
0129                                 tmpLog.error(f"cannot kill held job submissionHost={workspec.submissionHost} batchID={workspec.batchID}")
0130                     newStatus = WorkSpec.ST_cancelled
0131                     errStr += " ; Worker canceled by harvester due to held too long or not found"
0132                     # Mark the PanDA job as closed instead of failed
0133                     workspec.set_pilot_closed()
0134                     tmpLog.debug("Called workspec set_pilot_closed")
0135                 else:
0136                     if job_ads_dict.get("JobStartDate"):
0137                         newStatus = WorkSpec.ST_idle
0138                     else:
0139                         newStatus = WorkSpec.ST_submitted
0140             elif batchStatus in ["4"]:
0141                 # 4 completed
0142                 try:
0143                     payloadExitCode_str = str(job_ads_dict["ExitCode"])
0144                     payloadExitCode = int(payloadExitCode_str)
0145                 except KeyError:
0146                     errStr = f"cannot get ExitCode of job submissionHost={workspec.submissionHost} batchID={workspec.batchID}. Regard the worker as failed"
0147                     tmpLog.warning(errStr)
0148                     newStatus = WorkSpec.ST_failed
0149                 except ValueError:
0150                     errStr = "got invalid ExitCode {0} of job submissionHost={1} batchID={2}. Regard the worker as failed".format(
0151                         payloadExitCode_str, workspec.submissionHost, workspec.batchID
0152                     )
0153                     tmpLog.warning(errStr)
0154                     newStatus = WorkSpec.ST_failed
0155                 else:
0156                     # Propagate condor return code
0157                     workspec.nativeExitCode = payloadExitCode
0158                     if payloadExitCode == 0:
0159                         # Payload should return 0 after successful run
0160                         newStatus = WorkSpec.ST_finished
0161                     else:
0162                         # Other return codes are considered failed
0163                         newStatus = WorkSpec.ST_failed
0164                         errStr = ""
0165                         if payload_type:
0166                             errStr = get_payload_errstr_from_ec(payload_type, payloadExitCode)
0167                         if not errStr:
0168                             errStr = f"Payload execution error: returned non-zero {payloadExitCode}"
0169                         tmpLog.debug(errStr)
0170                         # Map return code to Pilot error code
0171                         reduced_exit_code = payloadExitCode // 256 if (payloadExitCode % 256 == 0) else payloadExitCode
0172                         pilot_error_code, pilot_error_diag = PILOT_ERRORS.convertToPilotErrors(reduced_exit_code)
0173                         if pilot_error_code is not None:
0174                             workspec.set_pilot_error(pilot_error_code, pilot_error_diag)
0175                     tmpLog.info(f"Payload return code = {payloadExitCode}")
0176             else:
0177                 errStr = "cannot get reasonable JobStatus of job submissionHost={0} batchID={1}. Regard the worker as failed by default".format(
0178                     workspec.submissionHost, workspec.batchID
0179                 )
0180                 tmpLog.error(errStr)
0181                 newStatus = WorkSpec.ST_failed
0182             tmpLog.info(f"submissionHost={workspec.submissionHost} batchID={workspec.batchID} : batchStatus {batchStatus} -> workerStatus {newStatus}")
0183     else:
0184         # Propagate native condor job status as unknown
0185         workspec.nativeStatus = "unknown"
0186         if cancel_unknown:
0187             errStr = f"condor job submissionHost={workspec.submissionHost} batchID={workspec.batchID} not found. Regard the worker as canceled by default"
0188             tmpLog.error(errStr)
0189             newStatus = WorkSpec.ST_cancelled
0190             tmpLog.info(f"submissionHost={workspec.submissionHost} batchID={workspec.batchID} : batchStatus 3 -> workerStatus {newStatus}")
0191         else:
0192             errStr = f"condor job submissionHost={workspec.submissionHost} batchID={workspec.batchID} not found. Skipped"
0193             tmpLog.warning(errStr)
0194             newStatus = None
0195     # Set supplemental error message
0196     error_code = None
0197     if errStr:
0198         # Check if the error message matches any known patterns
0199         error_code = WorkerErrors.htcondor_message_pattern_handler.get_error_code(errStr)
0200         if error_code is None:
0201             # If no pattern matched, use a general error code
0202             error_code = WorkerErrors.error_codes.get("GENERAL_ERROR")
0203         tmpLog.debug(f"errorCode={error_code} errorDiag={errStr}")
0204     else:
0205         # No error string; set to succeeded
0206         error_code = WorkerErrors.error_codes.get("SUCCEEDED")
0207     workspec.set_supplemental_error(error_code=error_code, error_diag=errStr)
0208     # Return
0209     return (newStatus, errStr)
0210 
0211 
0212 # monitor for HTCONDOR batch system
0213 class HTCondorMonitor(PluginBase):
0214     # constructor
0215     def __init__(self, **kwarg):
0216         PluginBase.__init__(self, **kwarg)
0217         extra_plugin_configs = {}
0218         try:
0219             extra_plugin_configs = harvester_config.master.extraPluginConfigs["HTCondorMonitor"]
0220         except AttributeError:
0221             pass
0222         except KeyError:
0223             pass
0224         # number of processes to use for checking workers
0225         self.nProcesses = getattr(self, "nProcesses", 4)
0226         # whether to cancel unknown workers
0227         self.cancelUnknown = getattr(self, "cancelUnknown", False)
0228         self.cancelUnknown = bool(self.cancelUnknown)
0229         # timeout for held condor jobs in seconds
0230         self.heldTimeout = getattr(self, "heldTimeout", 3600)
0231         # whether to use cache for condor job query
0232         self.cacheEnable = False
0233         try:
0234             self.cacheEnable = harvester_config.monitor.pluginCacheEnable
0235         except AttributeError:
0236             pass
0237         # cache refresh interval in seconds
0238         self.cacheRefreshInterval = 240
0239         try:
0240             self.cacheRefreshInterval = harvester_config.monitor.pluginCacheRefreshInterval
0241         except AttributeError:
0242             self.cacheRefreshInterval = harvester_config.monitor.checkInterval
0243         # whether to use condor history
0244         self.useCondorHistory = getattr(self, "useCondorHistory", True)
0245         if extra_plugin_configs.get("use_condor_history") is False:
0246             self.useCondorHistory = False
0247         # max age of workers in seconds (since last status update) that are allowed to queyr with condor history
0248         self.useCondorHistoryMaxAge = getattr(self, "useCondorHistoryMaxAge", 7200)
0249         # submission hosts
0250         self.submissionHost_list = getattr(self, "submissionHost_list", [])
0251         # condor host config files
0252         self.condorHostConfig_list = getattr(self, "condorHostConfig_list", [])
0253         # payload type
0254         self.payloadType = getattr(self, "payloadType", None)
0255 
0256     # check workers
0257     def check_workers(self, workspec_list):
0258         # Make logger for batch job query
0259         tmpLog = self.make_logger(baseLogger, "batch job query", method_name="check_workers")
0260         tmpLog.debug("start")
0261         # Loop over submissionHost
0262         job_ads_all_dict = {}
0263         for submissionHost, batchIDs_dict in get_host_batchid_map(workspec_list).items():
0264             # Record batch job query result to this dict, with key = batchID
0265             try:
0266                 job_query = CondorJobQuery(
0267                     cacheEnable=self.cacheEnable,
0268                     cacheRefreshInterval=self.cacheRefreshInterval,
0269                     useCondorHistory=self.useCondorHistory,
0270                     id=submissionHost,
0271                     useCondorHistoryMaxAge=self.useCondorHistoryMaxAge,
0272                 )
0273                 host_job_ads_dict = job_query.get_all(batchIDs_dict=batchIDs_dict)
0274             except Exception as e:
0275                 host_job_ads_dict = {}
0276                 ret_err_str = f"Exception {e.__class__.__name__}: {e}"
0277                 tmpLog.error(ret_err_str)
0278             job_ads_all_dict.update(host_job_ads_dict)
0279         # Check for all workers
0280         with Pool(self.nProcesses) as _pool:
0281             retIterator = _pool.map(
0282                 lambda _x: _check_one_worker(
0283                     _x, job_ads_all_dict, cancel_unknown=self.cancelUnknown, held_timeout=self.heldTimeout, payload_type=self.payloadType
0284                 ),
0285                 workspec_list,
0286             )
0287         retList = list(retIterator)
0288         tmpLog.debug("done")
0289         return True, retList
0290 
0291     # report updated workers info to monitor to check
0292     def report_updated_workers(self, time_window):
0293         # Make logger for batch job query
0294         tmpLog = self.make_logger(baseLogger, method_name="report_updated_workers")
0295         tmpLog.debug("start")
0296         # Get now timestamp
0297         timeNow = time.time()
0298         # Set of submission hosts
0299         submission_host_set = set()
0300         for submissionHost in self.submissionHost_list:
0301             submission_host_set.add(submissionHost)
0302         for condorHostConfig in self.condorHostConfig_list:
0303             try:
0304                 with open(condorHostConfig, "r") as f:
0305                     condor_host_config_map = json.load(f)
0306                 for _schedd, _cm in condor_host_config_map.items():
0307                     _pool = _cm["pool"]
0308                     submissionHost = f"{_schedd},{_pool}"
0309                     submission_host_set.add(submissionHost)
0310             except Exception as e:
0311                 err_str = f"failed to parse condorHostConfig {condorHostConfig}; {e.__class__.__name__}: {e}"
0312                 tmpLog.error(err_str)
0313                 continue
0314         # Loop over submissionHost and get all jobs
0315         job_ads_all_dict = {}
0316         for submissionHost in submission_host_set:
0317             try:
0318                 job_query = CondorJobQuery(
0319                     cacheEnable=self.cacheEnable, cacheRefreshInterval=self.cacheRefreshInterval, useCondorHistory=self.useCondorHistory, id=submissionHost
0320                 )
0321                 job_ads_all_dict.update(job_query.get_all(batchIDs_dict=None, allJobs=True, to_update_cache=True))
0322                 tmpLog.debug(f"got information of condor jobs on {submissionHost}")
0323             except Exception as e:
0324                 ret_err_str = f"Exception {e.__class__.__name__}: {e}"
0325                 tmpLog.error(ret_err_str)
0326         # Choose workers updated within a time window
0327         workers_to_check_list = []
0328         for condor_job_id, job_ads in job_ads_all_dict.items():
0329             # put in worker cache fifo, with lock mechanism
0330             job_EnteredCurrentStatus = job_ads.get("EnteredCurrentStatus")
0331             if not (job_EnteredCurrentStatus > timeNow - time_window):
0332                 continue
0333             workerid = job_ads.get("harvesterWorkerID")
0334             batch_status = job_ads.get("JobStatus")
0335             if workerid is None:
0336                 continue
0337             else:
0338                 workerid = int(workerid)
0339             workers_to_check_list.append((workerid, job_EnteredCurrentStatus))
0340             tmpLog.debug(f"workerID={workerid} got batchStatus={batch_status} at ts={job_EnteredCurrentStatus}")
0341         tmpLog.debug(f"got {len(workers_to_check_list)} workers")
0342         tmpLog.debug("done")
0343         return workers_to_check_list