File indexing completed on 2026-04-20 07:58:59
0001 import subprocess
0002
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestercore.plugin_base import PluginBase
0005 from pandaharvester.harvestercore.work_spec import WorkSpec
0006
0007
# Module-level base logger registered under the name "slurm_monitor"; the
# monitor methods below pass it to make_logger() to obtain their own logger.
0008 baseLogger = core_utils.setup_logger("slurm_monitor")
0009
0010
0011
class SlurmBulkMonitor(PluginBase):
    """Monitor plugin that polls Slurm for the state of many workers per command.

    Batch IDs are queried in chunks with either ``sacct -X`` (the default) or
    ``squeue -t all`` (when ``use_squeue_monitor`` is true), and the Slurm job
    states found in the command output are translated into ``WorkSpec``
    worker statuses.
    """

    # Slurm job states, in the long form printed by sacct and the compact form
    # printed by squeue, grouped by the worker status they translate to.
    # Any state that is not listed here is reported as WorkSpec.ST_failed.
    _RUNNING_STATES = frozenset({"RUNNING", "COMPLETING", "STOPPED", "SUSPENDED", "R", "CG", "ST", "S"})
    _FINISHED_STATES = frozenset({"COMPLETED", "PREEMPTED", "TIMEOUT", "CD", "PR", "TO"})
    _CANCELLED_STATES = frozenset({"CANCELLED", "CA"})
    _SUBMITTED_STATES = frozenset({"CONFIGURING", "PENDING", "CF", "PD"})

    # Output lines starting with one of these are table headers or separators.
    _HEADER_PREFIXES = ("JobID", "JOBID", "--")

    # Error text that marks the queried jobs as failed instead of unchanged.
    _INVALID_JOB_ID_MSG = "slurm_load_jobs error: Invalid job id specified"

    # Chunk size used when the caller passes a non-positive bulk_size.
    _DEFAULT_BULK_SIZE = 100

    def __init__(self, **kwarg):
        """Initialise the plugin.

        ``use_squeue_monitor`` defaults to ``False`` when the base class did
        not set it, and is always coerced to ``bool``.
        """
        PluginBase.__init__(self, **kwarg)
        self.use_squeue_monitor = bool(getattr(self, "use_squeue_monitor", False))

    def check_workers(self, workspec_list, bulk_size=100):
        """Check the workers with squeue or sacct, depending on configuration.

        :param workspec_list: worker specs; ``batchID``, ``status`` and
            ``workerID`` are read from each of them.
        :param bulk_size: maximum number of batch IDs per Slurm command.
        :return: ``(True, [(worker_status, diagnostic_message), ...])`` with
            one tuple per entry of ``workspec_list``, in the same order.
        """
        if self.use_squeue_monitor:
            return self.check_workers_squeue(workspec_list, bulk_size)
        return self.check_workers_sacct(workspec_list, bulk_size)

    def check_workers_sacct(self, workspec_list, bulk_size=100):
        """Check the workers with ``sacct -X --jobs=<ids>``.

        Parameters and return value are the same as for ``check_workers``.
        """
        return self._check_workers_bulk(workspec_list, bulk_size, ("sacct", "-X"), self._sacct_state)

    def check_workers_squeue(self, workspec_list, bulk_size=100):
        """Check the workers with ``squeue -t all --jobs=<ids>``.

        Parameters and return value are the same as for ``check_workers``.
        """
        return self._check_workers_bulk(workspec_list, bulk_size, ("squeue", "-t", "all"), self._squeue_state)

    def _check_workers_bulk(self, workspec_list, bulk_size, command, pick_state):
        """Run ``command`` once per chunk of batch IDs and collect worker statuses.

        :param command: argv prefix of the Slurm command; ``--jobs=<ids>`` is appended.
        :param pick_state: callable that takes the whitespace-split fields of
            one output line and returns the job state, or ``None`` when the
            line has too few fields.
        """
        tmpLog = self.make_logger(baseLogger, "bulkWorkers", method_name="check_workers")
        # A zero step makes range() raise and a negative one yields no chunks,
        # which would mark every worker as failed; use the default size instead.
        if not bulk_size or bulk_size < 1:
            bulk_size = self._DEFAULT_BULK_SIZE

        batch_id_status_map = {}
        for start in range(0, len(workspec_list), bulk_size):
            chunk = workspec_list[start : start + bulk_size]
            batch_id_list_str = ",".join(str(workSpec.batchID) for workSpec in chunk)
            # Build argv directly rather than splitting a formatted string.
            argv = [*command, f"--jobs={batch_id_list_str}"]
            comStr = " ".join(argv)

            tmpLog.debug(f"check with {comStr}")
            p = subprocess.Popen(argv, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdOut, stdErr = p.communicate()
            retCode = p.returncode
            tmpLog.debug(f"retCode={retCode}")
            stdOut_str = self._to_text(stdOut)
            stdErr_str = self._to_text(stdErr)
            tmpLog.debug(f"stdout={stdOut_str}")
            tmpLog.debug(f"stderr={stdErr_str}")

            if retCode == 0:
                for batchID, batchStatus in self._parse_job_states(stdOut_str, pick_state, tmpLog):
                    newStatus = self._to_worker_status(batchStatus)
                    tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
                    batch_id_status_map[batchID] = (newStatus, stdErr_str)
            else:
                errStr = f"{stdOut_str} {stdErr_str}"
                tmpLog.error(errStr)
                job_ids_invalid = self._INVALID_JOB_ID_MSG in errStr
                # When the command fails, each worker keeps its OWN current
                # status (not the status of whichever worker happened to be
                # last in the chunk), unless Slurm reports the job ID as invalid.
                for workSpec in chunk:
                    newStatus = WorkSpec.ST_failed if job_ids_invalid else workSpec.status
                    batch_id_status_map[str(workSpec.batchID)] = (newStatus, errStr)

        retList = []
        unknown = (WorkSpec.ST_failed, "Unknown batchID")
        for workSpec in workspec_list:
            # Jobs that the command did not report at all are treated as failed.
            newStatus, errStr = batch_id_status_map.get(str(workSpec.batchID), unknown)
            retList.append((newStatus, errStr))
            tmpLog.debug(f"Worker {workSpec.workerID} -> workerStatus {newStatus} errStr {errStr}")
        return True, retList

    @classmethod
    def _parse_job_states(cls, output, pick_state, tmpLog):
        """Yield ``(batch_id, slurm_state)`` for every job row in ``output``."""
        for raw_line in output.split("\n"):
            line = raw_line.strip()
            if not line or line.startswith(cls._HEADER_PREFIXES):
                continue
            fields = line.split()
            state = pick_state(fields)
            if state is None:
                # Too few columns to locate the state; skip the row rather
                # than abort the whole monitoring cycle with an IndexError.
                tmpLog.debug(f"skipped unparsable line: {line}")
                continue
            yield fields[0], state

    @classmethod
    def _to_worker_status(cls, batchStatus):
        """Translate a Slurm job state into a ``WorkSpec`` worker status."""
        # sacct marks a truncated field with a trailing "+", e.g.
        # "CANCELLED by <uid>" is printed as "CANCELLED+" at the default width.
        state = batchStatus.rstrip("+")
        if state in cls._RUNNING_STATES:
            return WorkSpec.ST_running
        if state in cls._FINISHED_STATES:
            return WorkSpec.ST_finished
        if state in cls._CANCELLED_STATES:
            return WorkSpec.ST_cancelled
        if state in cls._SUBMITTED_STATES:
            return WorkSpec.ST_submitted
        return WorkSpec.ST_failed

    @staticmethod
    def _sacct_state(fields):
        """Return the state column of a split sacct row, or ``None`` if it is missing."""
        # Rows with fewer than six columns are read with the state at index 3,
        # otherwise at index 5.
        index = 3 if len(fields) < 6 else 5
        return fields[index] if len(fields) > index else None

    @staticmethod
    def _squeue_state(fields):
        """Return the state column (index 4) of a split squeue row, or ``None`` if it is missing."""
        return fields[4] if len(fields) > 4 else None

    @staticmethod
    def _to_text(stream):
        """Return subprocess output as ``str``; ``None`` becomes an empty string."""
        if stream is None:
            return ""
        if isinstance(stream, str):
            return stream
        # Undecodable bytes (e.g. in job names) must not break monitoring.
        return stream.decode(errors="replace")