Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import re
0002 
0003 from globus_compute_sdk import Executor, Client
0004 from globus_compute_sdk.sdk.shell_function import ShellFunction, ShellResult
0005 
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 from pandaharvester.harvestercore.work_spec import WorkSpec
0009 from globus_sdk.services.compute.errors import ComputeAPIError
0010 from globus_compute_sdk.errors.error_types import TaskExecutionFailed
0011 
# module-level logger; per-worker child loggers are derived from it via
# self.make_logger(baseLogger, ...) in check_workers()
baseLogger = core_utils.setup_logger("globus_compute_slurm_monitor")
0014 
0015 
# monitor for Globus Compute on SLURM system
class GlobusComputeSlurmMonitor(PluginBase):
    """Harvester monitor plugin for workers launched via Globus Compute onto a SLURM system.

    Each worker is tracked in two phases:

    1. Until the SLURM job id has been read back from the remote sandbox,
       the Globus Compute task status is used as a fallback
       (``check_workers`` -> ``gc_client.get_batch_result``).
    2. Once the SLURM job id is known (stored in ``work_spec.batchID``),
       the worker status is derived from ``sacct`` output
       (``get_status_sacct``).
    """

    # constructor
    def __init__(self, **kwargs):
        PluginBase.__init__(self, **kwargs)
        # Globus Compute endpoint id on which the helper shell functions run
        self.uep_id = kwargs.get("uep_id")
        self.gc_client = Client()
        self.gc_executor = Executor(endpoint_id=self.uep_id)

        # Shell snippet to read the SLURM job id recorded in the sandbox dir.
        # Exit code 1 means the file has not appeared yet (caller retries later);
        # on success the id is printed to stdout with no trailing newline.
        self.cmd_get_slurm_id = r'''
#!/usr/bin/env bash

if [ ! -e "{gc_sandbox_dir}/slurm_job_id" ]; then
  exit 1
fi

read -r slurm_id < "{gc_sandbox_dir}/slurm_job_id"
printf '%s' "$slurm_id"

exit 0
'''
        self.bf_get_slurm_id = ShellFunction(self.cmd_get_slurm_id)

        # sacct query used to map a SLURM job id to a batch status
        self.cmd_sacct = r"sacct --job={slurm_job_id}"
        self.bf_sacct = ShellFunction(self.cmd_sacct)

    # Helper function, given a slurm job id, return the status of slurm sandbox as in local mode
    def get_status_sacct(self, tmplog, slurm_job_id):
        """Run ``sacct`` on the endpoint and translate the batch state into a worker status.

        :param tmplog: logger to use for this worker
        :param slurm_job_id: SLURM job id to query
        :return: tuple of (WorkSpec.ST_* status, error string); the error string
                 carries the matched sacct line on success, stdout+stderr on failure
        """
        tmplog.debug(f"Running get_status_sacct() for slurm_job_id = {slurm_job_id}")
        future = self.gc_executor.submit(self.bf_sacct, slurm_job_id=slurm_job_id)
        sr = future.result()

        stdOut = sr.stdout
        stdErr = sr.stderr
        retCode = sr.returncode
        exception_name = sr.exception_name

        tmplog.debug(f"for slurm_job_id = {slurm_job_id}, stdOut={stdOut}")
        tmplog.debug(f"for slurm_job_id = {slurm_job_id}, stdErr={stdErr}")
        tmplog.debug(f"for slurm_job_id = {slurm_job_id}, retCode={retCode}")
        tmplog.debug(f"for slurm_job_id = {slurm_job_id}, exception_name={exception_name}")

        errStr = ""
        # stdout/stderr may come back as bytes; normalize to str (None passes through)
        stdOut_str = stdOut if (isinstance(stdOut, str) or stdOut is None) else stdOut.decode()
        stdErr_str = stdErr if (isinstance(stdErr, str) or stdErr is None) else stdErr.decode()

        newStatus = WorkSpec.ST_failed
        if retCode == 0:
            # Look for the line of the parent job: the trailing space in the pattern
            # excludes job steps such as "<id>.batch". Escape the id so the regex
            # stays a literal match; compile once outside the loop.
            job_id_re = re.compile(rf"{re.escape(str(slurm_job_id))} ")
            for tmpLine in (stdOut_str or "").split("\n"):
                tmpMatch = job_id_re.search(tmpLine)
                if tmpMatch is not None:
                    errStr = tmpLine
                    # 6th column of default sacct output is State
                    # (JobID JobName Partition Account AllocCPUS State ExitCode)
                    # — assumes the endpoint does not override SACCT_FORMAT; TODO confirm
                    batchStatus = tmpLine.split()[5]
                    if batchStatus in ["RUNNING", "COMPLETING", "STOPPED", "SUSPENDED"]:
                        newStatus = WorkSpec.ST_running
                    elif batchStatus in ["COMPLETED", "PREEMPTED", "TIMEOUT"]:
                        newStatus = WorkSpec.ST_finished
                    elif batchStatus in ["CANCELLED"]:
                        newStatus = WorkSpec.ST_cancelled
                    elif batchStatus in ["CONFIGURING", "PENDING"]:
                        newStatus = WorkSpec.ST_submitted
                    else:
                        newStatus = WorkSpec.ST_failed
                    tmplog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
                    break
            return newStatus, errStr
        else:
            # sacct itself failed; surface its output as the error string
            errStr = f"{stdOut_str} {stdErr_str}"
            tmplog.error(errStr)
            if "slurm_load_jobs error: Invalid job id specified" in errStr:
                newStatus = WorkSpec.ST_failed
            return newStatus, errStr

    # Helper function, given a gc sandbox dir which includes stdout/stderr/slurm_job_id, check if slurm_job_id exist and try to return slurm_job_id
    # return code = 0: success, stdout is the slurm_job_id
    # return code = 1: file does not exist, should try again later
    def get_slurm_job_id(self, tmplog, gc_sandbox_dir):
        """Try to read the SLURM job id file from the remote sandbox directory.

        :param tmplog: logger to use for this worker
        :param gc_sandbox_dir: sandbox directory containing the ``slurm_job_id`` file
        :return: tuple of (return code, stdout) from the remote shell function;
                 return code 0 means stdout holds the id, 1 means not written yet
        """
        tmplog.debug(f"Running get_slurm_job_id() for gc_sandbox_dir = {gc_sandbox_dir}")
        future = self.gc_executor.submit(self.bf_get_slurm_id, gc_sandbox_dir=gc_sandbox_dir)
        sr = future.result()

        stdOut = sr.stdout
        stdErr = sr.stderr
        retCode = sr.returncode
        exception_name = sr.exception_name

        tmplog.debug(f"for gc_sandbox_dir = {gc_sandbox_dir}, stdOut={stdOut}")
        tmplog.debug(f"for gc_sandbox_dir = {gc_sandbox_dir}, stdErr={stdErr}")
        tmplog.debug(f"for gc_sandbox_dir = {gc_sandbox_dir}, retCode={retCode}")
        tmplog.debug(f"for gc_sandbox_dir = {gc_sandbox_dir}, exception_name={exception_name}")

        return retCode, stdOut

    # check workers
    # First check if slurm_job_id is already fetched.
    # If already have, use sacct to query job info
    # If not, try to fetch slurm job id, if fetch successfully, skip the rest and continue, otherwise crosscheck with the status of the job using gc API
    def check_workers(self, workspec_list):
        """Check the status of a list of workers.

        :param workspec_list: list of WorkSpec instances to check
        :return: (True, list of (status, error string) tuples, one per worker)
        """
        retList = []
        for work_spec in workspec_list:
            tmplog = self.make_logger(baseLogger, f"workerID={work_spec.workerID}", method_name="check_workers")
            get_attr_status, globus_compute_attr_dict = work_spec.get_work_attribute("globus_compute_attr")
            if not get_attr_status:
                err_str = "Failed to get globus_compute_attr"
                tmplog.error(err_str)
                retList.append((WorkSpec.ST_failed, err_str))
                continue
            tmplog.debug(f"globus_compute_attr_dict obtained from work_spec = {globus_compute_attr_dict}")
            gc_task_id = globus_compute_attr_dict["gc_task_id"]
            sandbox_dir = globus_compute_attr_dict["sandbox_dir"]
            slurmID = work_spec.batchID
            if slurmID:
                # phase 2: id already known, query SLURM directly
                tmplog.debug(f"for workerID={work_spec.workerID}, slurm_job_id is fetched successfully with {slurmID}, so invoke get_status_sacct")
                new_status, err_str = self.get_status_sacct(tmplog, slurmID)
            else:
                # phase 1: try to fetch the id; fall back to the Globus Compute task status
                tmplog.debug(f"for workerID={work_spec.workerID}, slurm_job_id has not been fetched, so try to fetch it")
                retCode, stdOut = self.get_slurm_job_id(tmplog, sandbox_dir)
                new_status = None
                err_str = ""
                status_dict = None

                if retCode == 0:
                    # normalize possible bytes output before stripping,
                    # consistent with get_status_sacct()
                    stdOut_str = stdOut if isinstance(stdOut, str) else stdOut.decode()
                    slurmID = stdOut_str.strip()
                    work_spec.batchID = slurmID
                    work_spec.nativeStatus = f"update batchID to be slurm_id={slurmID}"
                    new_status = WorkSpec.ST_running
                    err_str = ""
                    tmplog.debug(f"retCode = 0, work_spec.batchID reset to {work_spec.batchID}")
                else:
                    if retCode == 1:
                        tmplog.debug(f"File {sandbox_dir} does not exist, need to check whether the job is still queued")
                    else:
                        tmplog.error(f"Unknown return code {retCode} for {sandbox_dir}, need to check")

                    task_list = [gc_task_id]
                    try:
                        status_dict = self.gc_client.get_batch_result(task_list)
                    except ComputeAPIError as e:
                        tmplog.error("ComputeAPIError occurred while retrieving batch results: %s", e)
                        new_status = WorkSpec.ST_failed
                        err_str = "ComputeAPIError"
                    except TaskExecutionFailed as e:
                        tmplog.error("TaskExecutionFailed occurred while retrieving batch results: %s", e)
                        new_status = WorkSpec.ST_failed
                        err_str = "TaskExecutionFailed"
                    except Exception:
                        tmplog.exception("An unexpected error occurred while retrieving batch results")
                        new_status = WorkSpec.ST_failed
                        err_str = "Unexpected error"

                    if status_dict:
                        tmplog.debug(f"Status dictionary received: {status_dict}")
                        worker_status = status_dict.get(gc_task_id, {})
                        pending = worker_status.get('pending')
                        status = worker_status.get('status')
                        err_str = ""
                        if pending:
                            new_status = WorkSpec.ST_running
                        elif status == "success":
                            new_status = WorkSpec.ST_finished
                        else:
                            tmplog.debug("Unexpected state: pending=%s, status=%s", pending, status)
                            new_status = WorkSpec.ST_running
                        tmplog.debug(f"Final status for worker {gc_task_id}: pending={pending}, status={status}, new_status={new_status}")
                    elif new_status is None:
                        # No exception was raised but the API returned an empty/None
                        # result. Guarding on new_status keeps the exception-specific
                        # err_str set above from being clobbered (bug fix: the old
                        # unconditional else overwrote it).
                        new_status = WorkSpec.ST_failed
                        err_str = "Careful! It might not finished! This is testing empty dict!"

            retList.append((new_status, err_str))

        return True, retList