File indexing completed on 2026-04-20 07:59:01
0001 import os
0002 import shutil
0003
0004 from pandaharvester.harvestercore import core_utils
0005 from pandaharvester.harvestermisc.superfacility_utils import SuperfacilityClient
0006 from pandaharvester.harvestersweeper.base_sweeper import BaseSweeper
0007
# Module-level logger shared by all SuperfacilitySweeper methods
baseLogger = core_utils.setup_logger("superfacility_sweeper")
0009
0010
class SuperfacilitySweeper(BaseSweeper):
    """Sweeper plugin that cancels batch jobs through the NERSC Superfacility
    API and removes worker access-point directories from disk.
    """

    def __init__(self, **kwargs):
        BaseSweeper.__init__(self, **kwargs)
        # Directory holding Superfacility API credentials; comes from the
        # plugin configuration (key: superfacility_cred_dir). May be None.
        self.cred_dir = kwargs.get("superfacility_cred_dir")
        self.sf_client = SuperfacilityClient(self.cred_dir)

    def kill_worker(self, workspec):
        """Cancel the batch job backing a worker via the Superfacility API.

        :param workspec: worker specification; its batchID identifies the
                         Slurm job to cancel on perlmutter
        :return: tuple (True, "") on success, (False, error message) on failure
        """
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="kill_worker")
        jobid = workspec.batchID
        if not jobid:
            # Nothing was ever submitted for this worker, so nothing to cancel
            return False, "no batchID to kill"

        try:
            r = self.sf_client.delete(f"/compute/jobs/perlmutter/{jobid}")
            data = r.json()
        except Exception as e:
            errStr = f"Submission of a job cancelling fail for jobid = {jobid} with error: {e}"
            tmpLog.error(errStr)
            return False, errStr

        if data.get("status") == "success":
            # FIX: previously logged workerID twice (batchID={workspec.workerID});
            # report the actual batch job id instead
            tmpLog.info(f"Succeeded to kill workerID={workspec.workerID} batchID={jobid}")
        else:
            errStr = f"Failed to cancel job {jobid}: status: {data.get('status')}"
            tmpLog.error(errStr)
            return False, errStr
        return True, ""

    def sweep_worker(self, workspec):
        """Clean up a finished worker by deleting its access-point directory.

        :param workspec: worker specification; its accessPoint is the
                         directory to remove (may be None or already gone)
        :return: tuple (True, "") on success or nothing to do,
                 (False, error message) if removal failed
        """
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="sweep_worker")
        ap = workspec.accessPoint
        if ap and os.path.exists(ap):
            try:
                shutil.rmtree(ap)
                tmpLog.info(f"Removed directory {ap}")
            except Exception as e:
                err = f"Failed to remove {ap}: {e}"
                tmpLog.error(err)
                return False, err
        else:
            # Best-effort cleanup: a missing directory is not an error
            tmpLog.info("Access point already removed or none provided.")
        return True, ""