Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import re
0002 import subprocess
0003 
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestercore.plugin_base import PluginBase
0006 from pandaharvester.harvestercore.work_spec import WorkSpec
0007 
# module-level logger named "slurm_monitor"; check_workers passes it to
# make_logger to build a per-worker logger for each worker it inspects
baseLogger = core_utils.setup_logger("slurm_monitor")
0010 
0011 
0012 # monitor for SLURM batch system
class SlurmMonitor(PluginBase):
    """Monitor plugin that derives worker status from SLURM's ``sacct`` output.

    For every worker, ``sacct --jobs=<batchID>`` is executed and the row whose
    JobID column equals the worker's batch ID is translated into a ``WorkSpec``
    status.
    """

    # Index of the State column in a whitespace-split sacct row. This relies on
    # sacct's default column layout
    # (JobID JobName Partition Account AllocCPUS State ExitCode).
    _STATE_COLUMN = 5

    # sacct job states grouped by the worker status they translate to.
    # Any state not listed here is treated as a failure.
    _RUNNING_STATES = frozenset(["RUNNING", "COMPLETING", "STOPPED", "SUSPENDED"])
    _FINISHED_STATES = frozenset(["COMPLETED", "PREEMPTED", "TIMEOUT"])
    _CANCELLED_STATES = frozenset(["CANCELLED"])
    _SUBMITTED_STATES = frozenset(["CONFIGURING", "PENDING"])

    # constructor
    def __init__(self, **kwarg):
        """Forward all keyword arguments to ``PluginBase``."""
        PluginBase.__init__(self, **kwarg)

    @staticmethod
    def _to_text(raw):
        """Return subprocess output as ``str``.

        Bytes are decoded with ``errors="replace"`` so that unexpected bytes in
        the scheduler output cannot raise ``UnicodeDecodeError``; ``None``
        becomes an empty string.
        """
        if raw is None:
            return ""
        if isinstance(raw, str):
            return raw
        return raw.decode(errors="replace")

    @classmethod
    def _to_worker_status(cls, batchStatus):
        """Translate a sacct job state into the corresponding ``WorkSpec`` status."""
        if batchStatus in cls._RUNNING_STATES:
            return WorkSpec.ST_running
        if batchStatus in cls._FINISHED_STATES:
            return WorkSpec.ST_finished
        if batchStatus in cls._CANCELLED_STATES:
            return WorkSpec.ST_cancelled
        if batchStatus in cls._SUBMITTED_STATES:
            return WorkSpec.ST_submitted
        return WorkSpec.ST_failed

    # check workers
    def check_workers(self, workspec_list):
        """Check the batch status of each worker with ``sacct``.

        :param workspec_list: iterable of worker specs; ``workerID``,
            ``batchID`` and ``status`` are read from each one.
        :return: ``(True, retList)`` where ``retList`` holds one
            ``(newStatus, message)`` tuple per worker, in input order. The
            status is left at the worker's current value when no matching
            sacct row is found, when the row cannot be parsed, or when sacct
            fails with an error other than an invalid job ID.
        """
        retList = []
        for workSpec in workspec_list:
            # make logger
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")
            # command
            comStr = f"sacct --jobs={workSpec.batchID}"
            # check
            tmpLog.debug(f"check with {comStr}")
            p = subprocess.Popen(comStr.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdOut, stdErr = p.communicate()
            retCode = p.returncode
            tmpLog.debug(f"retCode={retCode}")
            stdOut_str = self._to_text(stdOut)
            stdErr_str = self._to_text(stdErr)
            # keep the current status unless sacct tells us otherwise
            newStatus = workSpec.status
            errStr = ""
            if retCode == 0:
                batchID = str(workSpec.batchID).strip()
                for tmpLine in stdOut_str.splitlines():
                    fields = tmpLine.split()
                    # Match on the JobID column itself rather than searching the
                    # whole line, so that a longer job ID ending in the same
                    # digits, or the same text in another column, is not picked up.
                    if not fields or fields[0] != batchID:
                        continue
                    errStr = tmpLine
                    if len(fields) <= self._STATE_COLUMN:
                        # a short row has no State column; do not guess a status
                        tmpLog.error(f"unexpected sacct output: {tmpLine}")
                        break
                    # sacct marks a truncated column value with a trailing "+",
                    # e.g. "CANCELLED by <uid>" is printed as "CANCELLED+".
                    batchStatus = fields[self._STATE_COLUMN].rstrip("+")
                    newStatus = self._to_worker_status(batchStatus)
                    tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
                    break
            else:
                # failed
                errStr = f"{stdOut_str} {stdErr_str}"
                tmpLog.error(errStr)
                if "slurm_load_jobs error: Invalid job id specified" in errStr:
                    newStatus = WorkSpec.ST_failed
            retList.append((newStatus, errStr))
        return True, retList