Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import json
0002 import os
0003 import re
0004 import subprocess
0005 
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 from pandaharvester.harvestercore.work_spec import WorkSpec
0009 
# logger
# Module-level base logger named "slurm_squeue_monitor"; check_workers passes it
# to make_logger to build a logger tagged with the worker ID for each worker.
baseLogger = core_utils.setup_logger("slurm_squeue_monitor")
0012 
0013 
0014 # monitor for SLURM batch system with squeue
class SlurmSqueueMonitor(PluginBase):
    """Monitor plugin that derives worker status from SLURM's ``squeue``.

    For each worker, a JSON postmortem file in the worker's access point is
    consulted first. Only when that file does not yield a final status is
    ``squeue -j <batchID>`` executed and its output parsed.
    """

    _HARVESTER_POSTMORTEM_FILENAME = "FINISHED"

    # Values of the squeue state column grouped by the worker status they map
    # to. Both the compact codes (squeue's default output) and the long state
    # names are accepted; any state not listed here is treated as failed.
    _RUNNING_STATES = frozenset({"R", "RUNNING", "CG", "COMPLETING", "ST", "STOPPED", "S", "SUSPENDED"})
    _FINISHED_STATES = frozenset({"CD", "COMPLETED", "PR", "PREEMPTED", "TO", "TIMEOUT"})
    _CANCELLED_STATES = frozenset({"CA", "CANCELLED"})
    _SUBMITTED_STATES = frozenset({"PD", "PENDING", "CF", "CONFIGURING"})

    # constructor
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)

    # check workers
    def check_workers(self, workspec_list):
        """Determine the current status of every worker in ``workspec_list``.

        :param workspec_list: iterable of work specs; each must provide
            ``workerID``, ``accessPoint``, ``batchID`` and ``status``.
        :return: ``(True, results)`` where ``results`` holds one
            ``(status, diagnostic_message)`` tuple per worker, in input order.
        """
        retList = []
        for workSpec in workspec_list:
            # make logger
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")

            # a postmortem file written into the access point takes precedence over squeue
            postmortem_fname = f"{workSpec.accessPoint}/{SlurmSqueueMonitor._HARVESTER_POSTMORTEM_FILENAME}"
            postmortem_status = self._read_postmortem_status(postmortem_fname, tmpLog)
            if postmortem_status is not None:
                retList.append((postmortem_status, ""))
                continue

            retList.append(self._check_with_squeue(workSpec, tmpLog))
        return True, retList

    @staticmethod
    def _read_postmortem_status(postmortem_fname, tmpLog):
        """Return the final status recorded in the postmortem file, or ``None``.

        ``None`` means the caller should fall back to ``squeue``: the file is
        missing, unreadable, not valid JSON, not a JSON object, or its
        ``worker_status`` is neither ``"finished"`` nor ``"failed"``.
        """
        try:
            with open(postmortem_fname) as postmortem:
                worker_status_json = json.load(postmortem)
        except FileNotFoundError:
            # no postmortem yet: the normal case for a worker that is still active
            return None
        except json.JSONDecodeError:
            tmpLog.debug(f"Not able to parse JSON in postmortem for a worker: {postmortem_fname}, continung with SLURM CLI")
            return None
        except (OSError, UnicodeDecodeError) as exc:
            # an unreadable file must not abort the check of the remaining workers
            tmpLog.debug(f"Not able to read postmortem for a worker: {postmortem_fname} ({exc}), continuing with SLURM CLI")
            return None

        # valid JSON is not necessarily an object; only a dict can carry the key
        if not isinstance(worker_status_json, dict):
            return None
        reported = worker_status_json.get("worker_status")
        if reported == "finished":
            return WorkSpec.ST_finished
        if reported == "failed":
            return WorkSpec.ST_failed
        return None

    @classmethod
    def _status_from_batch_state(cls, batchStatus):
        """Map a value of the squeue state column to a worker status."""
        if batchStatus in cls._RUNNING_STATES:
            return WorkSpec.ST_running
        if batchStatus in cls._FINISHED_STATES:
            return WorkSpec.ST_finished
        if batchStatus in cls._CANCELLED_STATES:
            return WorkSpec.ST_cancelled
        if batchStatus in cls._SUBMITTED_STATES:
            return WorkSpec.ST_submitted
        return WorkSpec.ST_failed

    def _check_with_squeue(self, workSpec, tmpLog):
        """Run ``squeue -j <batchID>`` and return ``(status, diagnostic_message)``.

        On exit code 0 the first output line containing the batch ID decides
        the status; if no line matches, the worker keeps ``workSpec.status``.
        On any other exit code the worker is reported as finished and the
        combined stdout/stderr is returned as the diagnostic message.
        """
        # command
        comStr = f"squeue -j {workSpec.batchID}"
        # check
        tmpLog.debug(f"check with {comStr}")
        p = subprocess.Popen(comStr.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdOut, stdErr = p.communicate()
        retCode = p.returncode
        tmpLog.debug(f"retCode={retCode}")
        # never let undecodable bytes in the tool output raise
        stdOut_str = stdOut if isinstance(stdOut, str) else stdOut.decode(errors="replace")
        stdErr_str = stdErr if isinstance(stdErr, str) else stdErr.decode(errors="replace")

        if retCode != 0:
            # squeue does not show finished jobs, gives return code 1
            # Assume finished for now. Maybe look in workdir.
            # NOTE: this also covers 'slurm_load_jobs error: Invalid job id specified'
            # and every other squeue failure; none of them is reported as failed.
            errStr = f"{stdOut_str} {stdErr_str}"
            tmpLog.error(errStr)
            return WorkSpec.ST_finished, errStr

        newStatus = workSpec.status
        errStr = ""
        # The ID is escaped and must start a whitespace-delimited token, so that
        # e.g. batch ID 23 does not match the line of job 123.
        batch_id_pattern = re.compile(rf"(?<!\S){re.escape(str(workSpec.batchID))} ")
        for tmpLine in stdOut_str.split("\n"):
            if batch_id_pattern.search(tmpLine) is None:
                continue
            fields = tmpLine.split()
            if len(fields) < 5:
                # the state is read from the fifth column; skip lines that lack it
                tmpLog.debug(f"unexpected squeue line: {tmpLine}")
                continue
            errStr = tmpLine
            batchStatus = fields[4]
            newStatus = self._status_from_batch_state(batchStatus)
            tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
            break
        return newStatus, errStr

    @staticmethod
    def _get_worker_completion_details():
        """Placeholder, not implemented yet.

        Declared static because it takes no arguments; without the decorator
        a call through an instance would raise ``TypeError``.
        """
        # try to open FINISHED file
        pass