File indexing completed on 2026-04-20 07:58:59
0001 import requests
0002 import json
0003 import os
0004
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007 from pandaharvester.harvestercore.work_spec import WorkSpec
0008 from pandaharvester.harvestermisc.superfacility_utils import SuperfacilityClient
0009
0010
# Module-level logger for this plugin; per-worker child loggers are derived
# from it via self.make_logger() inside check_workers().
baseLogger = core_utils.setup_logger("superfacility_monitor")
0012
0013
class SuperfacilityMonitor(PluginBase):
    """Harvester monitor plugin that polls the NERSC Superfacility API
    for the Slurm status of each worker's batch job on Perlmutter.
    """

    # Slurm job state (upper-case) -> harvester worker state.
    # Any state not listed here is treated as a failure.
    SLURM_TO_WORKER_STATUS = {
        "RUNNING": WorkSpec.ST_running,
        "COMPLETING": WorkSpec.ST_running,
        "STOPPED": WorkSpec.ST_running,
        "SUSPENDED": WorkSpec.ST_running,
        "COMPLETED": WorkSpec.ST_finished,
        "PREEMPTED": WorkSpec.ST_finished,
        "TIMEOUT": WorkSpec.ST_finished,
        "CANCELLED": WorkSpec.ST_cancelled,
        "CONFIGURING": WorkSpec.ST_submitted,
        "PENDING": WorkSpec.ST_submitted,
    }

    def __init__(self, **kwarg):
        """Set up the Superfacility API client.

        Reads ``superfacility_cred_dir`` from the plugin configuration:
        the directory holding the Superfacility API credentials.
        """
        PluginBase.__init__(self, **kwarg)
        self.cred_dir = kwarg.get("superfacility_cred_dir")
        self.sf_client = SuperfacilityClient(self.cred_dir)

    def check_workers(self, workspec_list):
        """Check the Slurm job status of each worker.

        :param workspec_list: list of WorkSpec objects to check
        :return: tuple of (True, list of (worker status, error message)
                 pairs, one per input workspec, in the same order)
        """
        retList = []
        for workSpec in workspec_list:
            tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")

            jobid = workSpec.batchID
            if not jobid:
                # Nothing was ever submitted for this worker
                errMsg = "no batchID, job is not submitted!"
                tmpLog.error(errMsg)
                retList.append((WorkSpec.ST_failed, errMsg))
                continue

            try:
                r = self.sf_client.get(f"/compute/jobs/perlmutter/{jobid}?sacct=true&cached=false")
                data = r.json()
                # sacct output is a list of records; the first entry is the job itself.
                # Kept inside the try so a malformed/empty response fails only this
                # worker instead of aborting the whole check_workers call.
                batchStatus = data["output"][0]["state"].upper()
            # RequestException covers HTTPError plus connection errors and
            # timeouts; ValueError covers JSON decode failures from r.json().
            except (requests.RequestException, ValueError) as e:
                errMsg = f"can not get query slurm job {jobid} due to {e}"
                tmpLog.error(errMsg)
                retList.append((WorkSpec.ST_failed, errMsg))
                continue
            except (KeyError, IndexError, TypeError) as e:
                errMsg = f"unexpected response for slurm job {jobid} due to {e}"
                tmpLog.error(errMsg)
                retList.append((WorkSpec.ST_failed, errMsg))
                continue

            # Unknown states fall through to ST_failed
            newStatus = self.SLURM_TO_WORKER_STATUS.get(batchStatus, WorkSpec.ST_failed)
            tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
            retList.append((newStatus, ""))
        return True, retList