Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:59

0001 import requests
0002 import json
0003 import os
0004 
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007 from pandaharvester.harvestercore.work_spec import WorkSpec
0008 from pandaharvester.harvestermisc.superfacility_utils import SuperfacilityClient
0009 
0010 # logger
0011 baseLogger = core_utils.setup_logger("superfacility_monitor")
0012 
0013 # monitor for SuperFacility API
0014 class SuperfacilityMonitor(PluginBase):
0015     # constructor
0016     def __init__(self, **kwarg):
0017         PluginBase.__init__(self, **kwarg)
0018         self.cred_dir = kwarg.get("superfacility_cred_dir")
0019         self.sf_client = SuperfacilityClient(self.cred_dir)
0020  
0021     def check_workers(self, workspec_list):
0022         retList = []
0023         for workSpec in workspec_list:
0024             # make logger
0025             tmpLog = self.make_logger(baseLogger, f"workerID={workSpec.workerID}", method_name="check_workers")
0026 
0027             jobid = workSpec.batchID
0028             if not jobid:
0029                 retList.append((WorkSpec.ST_failed, "no batchID, job is not submitted!"))
0030                 continue
0031 
0032             try:
0033                 r = self.sf_client.get(f"/compute/jobs/perlmutter/{jobid}?sacct=true&cached=false")
0034                 data = r.json()
0035             #FIXME: How to handle httperror?
0036             except requests.HTTPError as e:
0037                 newStatus = WorkSpec.ST_failed
0038                 retList.append((WorkSpec.ST_failed, f"can not get query slurm job {jobid} due to {e}"))
0039                 continue
0040             
0041             batchStatus = data["output"][0]['state'].upper()
0042             #FIXME: Are these mapping correct? Some do not exist, and some seem mismatch
0043             if batchStatus in ["RUNNING", "COMPLETING", "STOPPED", "SUSPENDED"]:
0044                 newStatus = WorkSpec.ST_running
0045             elif batchStatus in ["COMPLETED", "PREEMPTED", "TIMEOUT"]:
0046                 newStatus = WorkSpec.ST_finished
0047             elif batchStatus in ["CANCELLED"]:
0048                 newStatus = WorkSpec.ST_cancelled
0049             elif batchStatus in ["CONFIGURING", "PENDING"]:
0050                 newStatus = WorkSpec.ST_submitted
0051             else:
0052                 newStatus = WorkSpec.ST_failed
0053             tmpLog.debug(f"batchStatus {batchStatus} -> workerStatus {newStatus}")
0054             retList.append((newStatus, ""))
0055         return True, retList