File indexing completed on 2026-04-20 07:58:59
0001 import json
0002 import os
0003 import re
0004 import subprocess
0005
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 from pandaharvester.harvestercore.work_spec import WorkSpec
0009
0010
# Module-level logger for this plugin; check_workers derives a per-worker
# logger from it via self.make_logger(baseLogger, ...).
baseLogger = core_utils.setup_logger("slurm_squeue_monitor")
0012
0013
0014
class SlurmSqueueMonitor(PluginBase):
    """Worker monitor for SLURM that combines a postmortem file with ``squeue``.

    For every worker, a JSON postmortem file named ``FINISHED`` inside the
    worker's ``accessPoint`` directory is consulted first. If it reports a
    ``worker_status`` of "finished" or "failed", that status is used as-is.
    Otherwise the job state is looked up with ``squeue -j <batchID>``.
    """

    # Postmortem file name, looked up inside workSpec.accessPoint.
    _HARVESTER_POSTMORTEM_FILENAME = "FINISHED"

    # squeue job states grouped by the harvester worker status they map to.
    # squeue's default output format prints the compact state code (%t, e.g.
    # "CG"), while formats using %T print the long name (e.g. "COMPLETING"),
    # so both spellings are listed. Any state not listed maps to ST_failed.
    _SQUEUE_RUNNING_STATES = frozenset({"R", "RUNNING", "CG", "COMPLETING", "ST", "STOPPED", "S", "SUSPENDED"})
    _SQUEUE_FINISHED_STATES = frozenset({"CD", "COMPLETED", "PR", "PREEMPTED", "TO", "TIMEOUT"})
    _SQUEUE_CANCELLED_STATES = frozenset({"CA", "CANCELLED"})
    _SQUEUE_SUBMITTED_STATES = frozenset({"PD", "PENDING", "CF", "CONFIGURING"})

    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)

    def check_workers(self, workspec_list):
        """Return the current status of each worker.

        :param workspec_list: iterable of WorkSpec objects to check.
        :return: ``(True, retList)`` where ``retList`` holds one
            ``(status, diagnostic string)`` tuple per worker, in input order.
        """
        retList = []
        for workSpec in workspec_list:
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")

            postmortem_fname = f"{workSpec.accessPoint}/{self._HARVESTER_POSTMORTEM_FILENAME}"
            worker_status = self._read_postmortem_status(postmortem_fname, tmpLog)
            if worker_status is not None:
                # A terminal status written by the worker itself wins over squeue.
                retList.append((worker_status, ""))
                continue

            retList.append(self._query_squeue(workSpec, tmpLog))
        return True, retList

    def _read_postmortem_status(self, postmortem_fname, tmpLog):
        """Return the terminal status recorded in a postmortem file, if any.

        :param postmortem_fname: path of the postmortem JSON file.
        :param tmpLog: per-worker logger.
        :return: ``WorkSpec.ST_finished`` or ``WorkSpec.ST_failed`` when the
            file exists, holds a JSON object, and its "worker_status" is
            "finished" or "failed". Otherwise ``None``, meaning the caller
            should fall back to squeue.
        """
        if not os.path.exists(postmortem_fname):
            return None
        try:
            with open(postmortem_fname, encoding="utf-8") as postmortem:
                worker_status_json = json.load(postmortem)
        except ValueError:
            # Covers json.JSONDecodeError and undecodable bytes (UnicodeDecodeError).
            tmpLog.debug(f"Not able to parse JSON in postmortem for a worker: {postmortem_fname}, continuing with SLURM CLI")
            return None
        except OSError:
            # The file can vanish or be unreadable between exists() and open().
            tmpLog.debug(f"Not able to read postmortem for a worker: {postmortem_fname}, continuing with SLURM CLI")
            return None

        if not isinstance(worker_status_json, dict):
            return None
        reported_status = worker_status_json.get("worker_status")
        if reported_status == "finished":
            return WorkSpec.ST_finished
        if reported_status == "failed":
            return WorkSpec.ST_failed
        return None

    def _query_squeue(self, workSpec, tmpLog):
        """Ask ``squeue`` for the state of a worker's batch job.

        :param workSpec: worker whose ``batchID`` is queried.
        :param tmpLog: per-worker logger.
        :return: ``(newStatus, errStr)``.
            - Zero exit code with a matching job line: the status mapped from
              the state column, plus the raw squeue line.
            - Zero exit code with no matching line: the worker's current
              status and "".
            - Non-zero exit code: ``WorkSpec.ST_finished`` and the combined
              stdout/stderr.
        """
        comStr = f"squeue -j {workSpec.batchID}"
        tmpLog.debug(f"check with {comStr}")
        proc = subprocess.run(comStr.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        retCode = proc.returncode
        tmpLog.debug(f"retCode={retCode}")
        stdOut_str = proc.stdout.decode(errors="replace")
        stdErr_str = proc.stderr.decode(errors="replace")

        if retCode != 0:
            # Any non-zero exit is reported as finished (presumably because
            # squeue rejects a job id the controller no longer knows).
            # NOTE(review): this also masks genuine squeue failures - confirm intended.
            errStr = f"{stdOut_str} {stdErr_str}"
            tmpLog.error(errStr)
            return WorkSpec.ST_finished, errStr

        batchID = str(workSpec.batchID)
        for tmpLine in stdOut_str.splitlines():
            fields = tmpLine.split()
            # Default format: JOBID is column 1 and the state (ST) is column 5.
            # The header line starts with "JOBID" and is skipped here.
            # NOTE(review): a job NAME containing whitespace would shift columns.
            if len(fields) < 5 or fields[0] != batchID:
                continue
            batchStatus = fields[4]
            newStatus = self._map_batch_status(batchStatus)
            tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
            return newStatus, tmpLine

        # Job not listed in the output: keep whatever status the worker already has.
        return workSpec.status, ""

    @classmethod
    def _map_batch_status(cls, batchStatus):
        """Translate a squeue state (compact code or long name) to a worker status."""
        if batchStatus in cls._SQUEUE_RUNNING_STATES:
            return WorkSpec.ST_running
        if batchStatus in cls._SQUEUE_FINISHED_STATES:
            return WorkSpec.ST_finished
        if batchStatus in cls._SQUEUE_CANCELLED_STATES:
            return WorkSpec.ST_cancelled
        if batchStatus in cls._SQUEUE_SUBMITTED_STATES:
            return WorkSpec.ST_submitted
        return WorkSpec.ST_failed

    def _get_worker_completion_details(self):
        """Placeholder for collecting worker completion details; not implemented yet."""
        pass