# File indexing completed on 2026-04-20 07:58:59
0001 import re
0002
0003 from globus_compute_sdk import Executor, Client
0004 from globus_compute_sdk.sdk.shell_function import ShellFunction, ShellResult
0005
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 from pandaharvester.harvestercore.work_spec import WorkSpec
0009 from globus_sdk.services.compute.errors import ComputeAPIError
0010 from globus_compute_sdk.errors.error_types import TaskExecutionFailed
0011
0012
0013 baseLogger = core_utils.setup_logger("globus_compute_slurm_monitor")
0014
0015
0016
class GlobusComputeSlurmMonitor(PluginBase):
    """Harvester monitor plugin for workers launched through Globus Compute onto Slurm.

    For each worker the plugin first tries to resolve the underlying Slurm job id,
    which the payload records in ``<sandbox_dir>/slurm_job_id`` on the remote
    endpoint. Once the id is known, the worker status is derived from the job's
    ``sacct`` state; until then, the status of the Globus Compute task itself is
    used as a fallback.
    """

    def __init__(self, **kwargs):
        PluginBase.__init__(self, **kwargs)
        # id of the Globus Compute endpoint on which the probe shell functions run
        self.uep_id = kwargs.get("uep_id")
        self.gc_client = Client()
        self.gc_executor = Executor(endpoint_id=self.uep_id)

        # Probe script: print the Slurm job id recorded by the payload in the
        # sandbox directory; exit 1 when the file does not exist (yet).
        self.cmd_get_slurm_id = r'''
#!/usr/bin/env bash

if [ ! -e "{gc_sandbox_dir}/slurm_job_id" ]; then
exit 1
fi

read -r slurm_id < "{gc_sandbox_dir}/slurm_job_id"
printf '%s' "$slurm_id"

exit 0
'''
        self.bf_get_slurm_id = ShellFunction(self.cmd_get_slurm_id)

        # sacct query for a single job; {slurm_job_id} is filled in at submit time
        self.cmd_sacct = r"sacct --job={slurm_job_id}"
        self.bf_sacct = ShellFunction(self.cmd_sacct)

    def get_status_sacct(self, tmplog, slurm_job_id):
        """Map the ``sacct`` state of *slurm_job_id* onto a harvester worker status.

        Runs ``sacct --job=<id>`` on the remote endpoint via Globus Compute and
        parses the output line whose first column is exactly the job id.

        :param tmplog: per-worker logger
        :param slurm_job_id: Slurm job id of the worker
        :return: tuple of (WorkSpec.ST_* status, error string)
        """
        tmplog.debug(f"Running get_status_sacct() for slurm_job_id = {slurm_job_id}")
        future = self.gc_executor.submit(self.bf_sacct, slurm_job_id=slurm_job_id)
        sr = future.result()

        stdOut = sr.stdout
        stdErr = sr.stderr
        retCode = sr.returncode
        exception_name = sr.exception_name

        tmplog.debug(f"for slurm_job_id = {slurm_job_id}, stdOut={stdOut}")
        tmplog.debug(f"for slurm_job_id = {slurm_job_id}, stdErr={stdErr}")
        tmplog.debug(f"for slurm_job_id = {slurm_job_id}, retCode={retCode}")
        tmplog.debug(f"for slurm_job_id = {slurm_job_id}, exception_name={exception_name}")

        errStr = ""
        # stdout/stderr may come back as bytes; normalize to str before parsing
        stdOut_str = stdOut if (isinstance(stdOut, str) or stdOut is None) else stdOut.decode()
        stdErr_str = stdErr if (isinstance(stdErr, str) or stdErr is None) else stdErr.decode()

        newStatus = WorkSpec.ST_failed
        if retCode == 0:
            for tmpLine in (stdOut_str or "").split("\n"):
                # Anchor to the start of the line: only the parent job row begins
                # with "<id> " (job steps appear as "<id>.batch" etc.), and an
                # unanchored search could false-match the id elsewhere in the row.
                if not tmpLine.startswith(f"{slurm_job_id} "):
                    continue
                errStr = tmpLine
                fields = tmpLine.split()
                if len(fields) < 6:
                    # truncated/unexpected row: keep ST_failed and report the row
                    break
                # With the default sacct output format the 6th column is State
                # (JobID JobName Partition Account AllocCPUS State ExitCode).
                batchStatus = fields[5]
                if batchStatus in ["RUNNING", "COMPLETING", "STOPPED", "SUSPENDED"]:
                    newStatus = WorkSpec.ST_running
                elif batchStatus in ["COMPLETED", "PREEMPTED", "TIMEOUT"]:
                    newStatus = WorkSpec.ST_finished
                elif batchStatus in ["CANCELLED"]:
                    newStatus = WorkSpec.ST_cancelled
                elif batchStatus in ["CONFIGURING", "PENDING"]:
                    newStatus = WorkSpec.ST_submitted
                else:
                    newStatus = WorkSpec.ST_failed
                tmplog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
                break
            return newStatus, errStr

        # sacct itself failed: surface its combined output as the error string
        errStr = f"{stdOut_str} {stdErr_str}"
        tmplog.error(errStr)
        # NOTE(review): defensive only — newStatus is already ST_failed here, and
        # this message looks like a squeue (not sacct) error; confirm it can occur.
        if "slurm_load_jobs error: Invalid job id specified" in errStr:
            newStatus = WorkSpec.ST_failed
        return newStatus, errStr

    def get_slurm_job_id(self, tmplog, gc_sandbox_dir):
        """Fetch the Slurm job id recorded under *gc_sandbox_dir* on the endpoint.

        :param tmplog: per-worker logger
        :param gc_sandbox_dir: sandbox directory of the worker on the remote side
        :return: tuple of (shell return code, stdout). Return code 0 means stdout
                 holds the Slurm job id; 1 means the slurm_job_id file is absent.
        """
        tmplog.debug(f"Running get_slurm_job_id() for gc_sandbox_dir = {gc_sandbox_dir}")
        future = self.gc_executor.submit(self.bf_get_slurm_id, gc_sandbox_dir=gc_sandbox_dir)
        sr = future.result()

        stdOut = sr.stdout
        stdErr = sr.stderr
        retCode = sr.returncode
        exception_name = sr.exception_name

        tmplog.debug(f"for gc_sandbox_dir = {gc_sandbox_dir}, stdOut={stdOut}")
        tmplog.debug(f"for gc_sandbox_dir = {gc_sandbox_dir}, stdErr={stdErr}")
        tmplog.debug(f"for gc_sandbox_dir = {gc_sandbox_dir}, retCode={retCode}")
        tmplog.debug(f"for gc_sandbox_dir = {gc_sandbox_dir}, exception_name={exception_name}")

        # normalize bytes to str so callers can store the job id as text (batchID)
        if not (isinstance(stdOut, str) or stdOut is None):
            stdOut = stdOut.decode()
        return retCode, stdOut

    def _check_gc_task(self, tmplog, gc_task_id):
        """Derive a worker status from the Globus Compute task state.

        Used as a fallback while the Slurm job id is not yet available.

        :param tmplog: per-worker logger
        :param gc_task_id: Globus Compute task id of the worker
        :return: tuple of (WorkSpec.ST_* status, error string)
        """
        try:
            status_dict = self.gc_client.get_batch_result([gc_task_id])
        except ComputeAPIError as e:
            tmplog.error("ComputeAPIError occurred while retrieving batch results: %s", e)
            return WorkSpec.ST_failed, "ComputeAPIError"
        except TaskExecutionFailed as e:
            tmplog.error("TaskExecutionFailed occurred while retrieving batch results: %s", e)
            return WorkSpec.ST_failed, "TaskExecutionFailed"
        except Exception:
            tmplog.exception("An unexpected error occurred while retrieving batch results")
            return WorkSpec.ST_failed, "Unexpected error"

        if not status_dict:
            # empty/None result without an exception: treat as failed for now
            return WorkSpec.ST_failed, "Careful! It might not finished! This is testing empty dict!"

        tmplog.debug(f"Status dictionary received: {status_dict}")
        worker_status = status_dict.get(gc_task_id, {})
        pending = worker_status.get('pending')
        status = worker_status.get('status')
        if pending:
            new_status = WorkSpec.ST_running
        elif status == "success":
            new_status = WorkSpec.ST_finished
        else:
            # not pending but not "success" either; keep polling as running
            tmplog.debug("Unexpected state: pending=%s, status=%s", pending, status)
            new_status = WorkSpec.ST_running
        tmplog.debug(f"Final status for worker {gc_task_id}: pending={pending}, status={status}, new_status={new_status}")
        return new_status, ""

    def check_workers(self, workspec_list):
        """Check the status of a list of workers.

        :param workspec_list: list of WorkSpec instances to check
        :return: tuple of (True, list of (status, error string) per worker, in order)
        """
        retList = []
        for work_spec in workspec_list:
            tmplog = self.make_logger(baseLogger, f"workerID={work_spec.workerID}", method_name="check_workers")
            get_attr_status, globus_compute_attr_dict = work_spec.get_work_attribute("globus_compute_attr")
            if not get_attr_status:
                err_str = "Failed to get globus_compute_attr"
                tmplog.error(err_str)
                retList.append((work_spec.ST_failed, err_str))
                continue

            tmplog.debug(f"globus_compute_attr_dict obtained from work_spec = {globus_compute_attr_dict}")
            gc_task_id = globus_compute_attr_dict["gc_task_id"]
            sandbox_dir = globus_compute_attr_dict["sandbox_dir"]
            slurmID = work_spec.batchID
            if slurmID:
                # Slurm job id already known: ask sacct directly
                tmplog.debug(f"for workerID={work_spec.workerID}, slurm_job_id is fetched successfully with {slurmID}, so invoke get_status_sacct")
                new_status, err_str = self.get_status_sacct(tmplog, slurmID)
            else:
                tmplog.debug(f"for workerID={work_spec.workerID}, slurm_job_id has not been fetched, so try to fetch it")
                retCode, stdOut = self.get_slurm_job_id(tmplog, sandbox_dir)
                if retCode == 0:
                    # id just became available: persist it on the work spec so the
                    # next cycle goes straight to sacct
                    slurmID = stdOut.strip()
                    work_spec.batchID = slurmID
                    work_spec.nativeStatus = f"update batchID to be slurm_id={slurmID}"
                    new_status = work_spec.ST_running
                    err_str = ""
                    tmplog.debug(f"retCode = 0, work_spec.batchID reset to {work_spec.batchID}")
                else:
                    if retCode == 1:
                        tmplog.debug(f"File {sandbox_dir} does not exist, need to check if the job is still queued")
                    else:
                        tmplog.error(f"Unknown return code {retCode} for {sandbox_dir}, need to check")
                    # no Slurm id yet: fall back to the Globus Compute task state
                    new_status, err_str = self._check_gc_task(tmplog, gc_task_id)

            retList.append((new_status, err_str))

        return True, retList