File indexing completed on 2026-04-20 07:58:59
0001 import re
0002 import subprocess
0003
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006 from pandaharvester.harvestercore.work_spec import WorkSpec
0007
0008
# Module-level logger named "slurm_monitor"; check_workers passes it to
# make_logger to build a per-worker log wrapper.
baseLogger = core_utils.setup_logger("slurm_monitor")
0010
0011
0012
class SlurmMonitor(PluginBase):
    """Monitor plugin that derives worker status from the output of ``sacct``."""

    # Index of the whitespace-separated field of a sacct row that is read as
    # the batch status.
    _STATE_FIELD = 5

    def __init__(self, **kwarg):
        """Forward all keyword arguments to ``PluginBase``."""
        PluginBase.__init__(self, **kwarg)

    @staticmethod
    def _to_text(stream):
        """Return subprocess output as ``str`` (``None`` becomes an empty string)."""
        if stream is None:
            return ""
        if isinstance(stream, str):
            return stream
        # Undecodable bytes must not abort the status check.
        return stream.decode(errors="replace")

    @staticmethod
    def _find_job_fields(sacct_output, batch_id):
        """Locate the row of ``sacct_output`` whose first field equals ``batch_id``.

        :param sacct_output: text captured from the sacct command
        :param batch_id: batch job identifier of the worker
        :return: ``(line, fields)`` for the first matching row, or
                 ``(None, None)`` when no row matches
        """
        job_id = str(batch_id).strip()
        for line in sacct_output.splitlines():
            fields = line.split()
            # Compare the first field exactly: a substring search would also
            # match longer job IDs that merely end in the same characters.
            if fields and fields[0] == job_id:
                return line, fields
        return None, None

    @staticmethod
    def _to_worker_status(batch_status):
        """Translate a batch status string into a ``WorkSpec`` status.

        Any status that is not explicitly listed is reported as failed.
        """
        if batch_status in ("RUNNING", "COMPLETING", "STOPPED", "SUSPENDED"):
            return WorkSpec.ST_running
        if batch_status in ("COMPLETED", "PREEMPTED", "TIMEOUT"):
            return WorkSpec.ST_finished
        if batch_status == "CANCELLED":
            return WorkSpec.ST_cancelled
        if batch_status in ("CONFIGURING", "PENDING"):
            return WorkSpec.ST_submitted
        return WorkSpec.ST_failed

    def check_workers(self, workspec_list):
        """Check the status of each worker by running ``sacct --jobs=<batchID>``.

        A worker keeps its current status when the command cannot be started,
        when no row for its batch ID is found, or when the matching row has
        too few fields to contain a status.

        :param workspec_list: iterable of work specs providing ``workerID``,
                              ``batchID`` and ``status``
        :return: ``(True, results)`` where ``results`` holds one
                 ``(status, diagnostic message)`` tuple per work spec, in
                 input order
        """
        retList = []
        for workSpec in workspec_list:
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")
            newStatus = workSpec.status
            errStr = ""

            comStr = f"sacct --jobs={workSpec.batchID}"
            # Build the argument vector directly so that unexpected whitespace
            # in the batch ID cannot inject extra arguments.
            comArgs = ["sacct", f"--jobs={workSpec.batchID}"]
            tmpLog.debug(f"check with {comStr}")
            try:
                p = subprocess.Popen(comArgs, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                stdOut, stdErr = p.communicate()
            except OSError as exc:
                # e.g. sacct missing: report it for this worker and carry on
                # with the remaining ones instead of aborting the whole check.
                errStr = f"failed to execute {comStr}: {exc}"
                tmpLog.error(errStr)
                retList.append((newStatus, errStr))
                continue

            retCode = p.returncode
            tmpLog.debug(f"retCode={retCode}")
            stdOut_str = self._to_text(stdOut)
            stdErr_str = self._to_text(stdErr)

            if retCode == 0:
                jobLine, fields = self._find_job_fields(stdOut_str, workSpec.batchID)
                if fields is not None:
                    errStr = jobLine
                    if len(fields) > self._STATE_FIELD:
                        batchStatus = fields[self._STATE_FIELD]
                        newStatus = self._to_worker_status(batchStatus)
                        tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
                    else:
                        # Too few fields to hold a status: keep the current one.
                        tmpLog.error(f"unexpected sacct output line: {jobLine}")
            else:
                # Command failed: pass both streams back as the diagnostic.
                errStr = f"{stdOut_str} {stdErr_str}"
                tmpLog.error(errStr)
                if "slurm_load_jobs error: Invalid job id specified" in errStr:
                    newStatus = WorkSpec.ST_failed
            retList.append((newStatus, errStr))
        return True, retList