Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import subprocess
0002 
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestercore.plugin_base import PluginBase
0005 from pandaharvester.harvestercore.work_spec import WorkSpec
0006 
# logger
# Module-level base logger registered under the name "slurm_monitor"; the
# monitor's check methods hand it to make_logger() to obtain the logger they
# write their debug/error messages to.
baseLogger = core_utils.setup_logger("slurm_monitor")
0009 
0010 
0011 # monitor for SLURM batch system
class SlurmBulkMonitor(PluginBase):
    """Monitor plugin that checks the SLURM state of many workers at once.

    Batch IDs are grouped into chunks of ``bulk_size`` and every chunk is
    queried with a single ``sacct`` call (default) or a single ``squeue`` call
    (when the ``use_squeue_monitor`` attribute is truthy).
    """

    # Error text that makes a failed query mark every worker of the chunk as failed.
    _INVALID_JOB_ID_MSG = "slurm_load_jobs error: Invalid job id specified"

    # Long state names printed by sacct -> worker status.
    # Any state missing from the table is treated as failed.
    _SACCT_STATE_MAP = {
        "RUNNING": WorkSpec.ST_running,
        "COMPLETING": WorkSpec.ST_running,
        "STOPPED": WorkSpec.ST_running,
        "SUSPENDED": WorkSpec.ST_running,
        "COMPLETED": WorkSpec.ST_finished,
        "PREEMPTED": WorkSpec.ST_finished,
        "TIMEOUT": WorkSpec.ST_finished,
        "CANCELLED": WorkSpec.ST_cancelled,
        "CONFIGURING": WorkSpec.ST_submitted,
        "PENDING": WorkSpec.ST_submitted,
    }

    # Compact state codes printed by squeue -> worker status.
    # Any code missing from the table is treated as failed.
    _SQUEUE_STATE_MAP = {
        "R": WorkSpec.ST_running,
        "CG": WorkSpec.ST_running,
        "ST": WorkSpec.ST_running,
        "S": WorkSpec.ST_running,
        "CD": WorkSpec.ST_finished,
        "PR": WorkSpec.ST_finished,
        "TO": WorkSpec.ST_finished,
        "CA": WorkSpec.ST_cancelled,
        "CF": WorkSpec.ST_submitted,
        "PD": WorkSpec.ST_submitted,
    }

    # constructor
    def __init__(self, **kwarg):
        """Initialise the plugin; ``use_squeue_monitor`` defaults to False."""
        PluginBase.__init__(self, **kwarg)
        self.use_squeue_monitor = bool(getattr(self, "use_squeue_monitor", False))

    # check workers
    def check_workers(self, workspec_list, bulk_size=100):
        """Check the batch status of the given workers.

        :param workspec_list: worker specs; ``batchID``, ``status`` and ``workerID`` are read
        :param bulk_size: maximum number of batch IDs passed to one command invocation
        :return: ``(True, [(new_status, message), ...])`` with one tuple per
                 entry of ``workspec_list``, in the same order
        """
        if self.use_squeue_monitor:
            return self.check_workers_squeue(workspec_list, bulk_size)
        return self.check_workers_sacct(workspec_list, bulk_size)

    # check workers sacct
    def check_workers_sacct(self, workspec_list, bulk_size=100):
        """Check workers with ``sacct -X``; same contract as :meth:`check_workers`."""
        return self._check_workers_bulk(workspec_list, bulk_size, ["sacct", "-X"], self._parse_sacct_columns)

    # check workers squeue
    def check_workers_squeue(self, workspec_list, bulk_size=100):
        """Check workers with ``squeue -t all``; same contract as :meth:`check_workers`."""
        return self._check_workers_bulk(workspec_list, bulk_size, ["squeue", "-t", "all"], self._parse_squeue_columns)

    def _parse_sacct_columns(self, columns):
        """Return ``(batch_id, worker_status, state)`` for one sacct row, or None if it is too short."""
        if len(columns) < 4:
            return None
        # The state is taken from the 6th column; rows with fewer than six
        # columns are assumed to have blank fields that shift it to the 4th
        # (heuristic kept from the original implementation -- TODO confirm).
        state = columns[3] if len(columns) < 6 else columns[5]
        state = self._expand_truncated_state(state, self._SACCT_STATE_MAP)
        return columns[0], self._SACCT_STATE_MAP.get(state, WorkSpec.ST_failed), state

    def _parse_squeue_columns(self, columns):
        """Return ``(batch_id, worker_status, state)`` for one squeue row, or None if it is too short."""
        if len(columns) < 5:
            return None
        # the compact state code is read from the 5th column
        state = columns[4]
        return columns[0], self._SQUEUE_STATE_MAP.get(state, WorkSpec.ST_failed), state

    @staticmethod
    def _expand_truncated_state(state, state_map):
        """Expand a state that sacct cut to its column width (marked by a trailing ``+``).

        E.g. ``CANCELLED+`` becomes ``CANCELLED``. The value is returned
        unchanged unless exactly one known state starts with the visible prefix.
        """
        if not state.endswith("+"):
            return state
        prefix = state[:-1]
        candidates = [name for name in state_map if name.startswith(prefix)]
        return candidates[0] if len(candidates) == 1 else state

    @staticmethod
    def _to_text(stream):
        """Decode subprocess output to ``str``; ``str`` and ``None`` pass through unchanged."""
        if stream is None or isinstance(stream, str):
            return stream
        return stream.decode()

    def _check_workers_bulk(self, workspec_list, bulk_size, base_command, parse_columns):
        """Query SLURM chunk by chunk and translate the output into worker statuses.

        :param workspec_list: worker specs to check
        :param bulk_size: chunk size; a value below 1 puts all workers in one chunk
        :param base_command: command and fixed options as a list of arguments
        :param parse_columns: callable turning the whitespace-split columns of an
                              output row into ``(batch_id, worker_status, state)`` or None
        :return: ``(True, [(new_status, message), ...])`` aligned with ``workspec_list``
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, "bulkWorkers", method_name="check_workers")

        # range() rejects a zero step and a negative one would produce no chunks at all
        if not bulk_size or bulk_size < 1:
            bulk_size = max(len(workspec_list), 1)

        batch_id_status_map = {}
        for start in range(0, len(workspec_list), bulk_size):
            workspec_list_chunk = workspec_list[start : start + bulk_size]
            batch_id_list = [str(workSpec.batchID) for workSpec in workspec_list_chunk]
            batch_id_list_str = ",".join(batch_id_list)
            # command
            command = list(base_command) + [f"--jobs={batch_id_list_str}"]
            comStr = " ".join(command)
            # check
            tmpLog.debug(f"check with {comStr}")
            p = subprocess.Popen(command, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # check return code
            stdOut, stdErr = p.communicate()
            retCode = p.returncode
            tmpLog.debug(f"retCode={retCode}")
            stdOut_str = self._to_text(stdOut)
            stdErr_str = self._to_text(stdErr)
            tmpLog.debug(f"stdout={stdOut_str}")
            tmpLog.debug(f"stderr={stdErr_str}")

            if retCode == 0:
                for rawLine in (stdOut_str or "").split("\n"):
                    tmpLine = rawLine.strip()
                    # skip blanks, the header row and the dashed separator
                    if not tmpLine or tmpLine.startswith("--") or tmpLine.upper().startswith("JOBID"):
                        continue
                    parsed = parse_columns(tmpLine.split())
                    if parsed is None:
                        # a malformed row must not abort the whole monitoring cycle
                        tmpLog.error(f"skipped unparsable line: {rawLine}")
                        continue
                    batchID, newStatus, batchStatus = parsed
                    tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
                    batch_id_status_map[batchID] = (newStatus, stdErr_str)
            else:
                # failed
                errStr = f"{stdOut_str} {stdErr_str}"
                tmpLog.error(errStr)
                invalid_job_id = self._INVALID_JOB_ID_MSG in errStr
                for workSpec in workspec_list_chunk:
                    # without fresh information each worker keeps its OWN current status
                    newStatus = WorkSpec.ST_failed if invalid_job_id else workSpec.status
                    batch_id_status_map[str(workSpec.batchID)] = (newStatus, errStr)

        retList = []
        for workSpec in workspec_list:
            # jobs the command did not report at all are considered failed
            newStatus, errStr = batch_id_status_map.get(str(workSpec.batchID), (WorkSpec.ST_failed, "Unknown batchID"))
            retList.append((newStatus, errStr))
            tmpLog.debug(f"Worker {workSpec.workerID} -> workerStatus {newStatus} errStr {errStr}")
        return True, retList