File indexing completed on 2026-04-20 07:59:00
0001 import os
0002 import shutil
0003
0004 import saga
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007 from pandaharvester.harvestersubmitter.saga_submitter import SAGASubmitter
0008
0009
# Module-level logger named "saga_sweeper"; the SAGASweeper methods below derive
# their per-call loggers from it.
baseLogger = core_utils.setup_logger("saga_sweeper")
0011
0012
0013
class SAGASweeper(PluginBase):
    """Sweeper plugin that cancels workers through SAGA and removes their access points."""

    def __init__(self, **kwarg):
        """Initialise the plugin and log which SAGA adaptor is configured.

        :param kwarg: keyword arguments forwarded unchanged to ``PluginBase.__init__``
        """
        PluginBase.__init__(self, **kwarg)
        tmpLog = core_utils.make_logger(baseLogger, method_name="__init__")
        # NOTE(review): ``self.adaptor`` is never assigned in this class; presumably
        # PluginBase sets it from the plugin configuration -- confirm.
        tmpLog.info(f"[{self.adaptor}] SAGA adaptor will be used")

    def kill_worker(self, workspec):
        """Kill a worker in a scheduling system like batch systems and computing elements.

        The worker is cancelled only when SAGA reports it as pending or running.
        A ``saga.SagaException`` is logged and its message is returned as the error
        dialog; the return code is True in that case as well.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="kill_worker")
        tmpLog.info(f"[{self.adaptor}] SAGA adaptor will be used to kill worker {workspec.workerID} with batchid {workspec.batchID}")
        errStr = ""

        # Without a batch ID there is no submission to look up, so do not open a
        # job-service connection at all.
        if not workspec.batchID:
            return True, errStr

        job_service = saga.job.Service(self.adaptor)
        try:
            saga_submission_id = f"[{self.adaptor}]-[{workspec.batchID}]"
            worker = job_service.get_job(saga_submission_id)
            # Read the state once so the logged value and the value used for the
            # cancel decision cannot differ.
            saga_state = worker.state
            tmpLog.info(f"SAGA State for submission with batchid: {workspec.batchID} is: {saga_state}")
            harvester_job_state = SAGASubmitter.status_translator(saga_state)
            tmpLog.info(f"Worker state with batchid: {workspec.batchID} is: {harvester_job_state}")
            if saga_state in [saga.job.PENDING, saga.job.RUNNING]:
                worker.cancel()
                tmpLog.info(f"Worker {workspec.workerID} with batchid {workspec.batchID} canceled")
        except saga.SagaException as ex:
            errStr = ex.get_message()
            tmpLog.info(f"An exception occured during canceling of worker: {errStr}")
        finally:
            # Close the service even when an unexpected (non-SAGA) exception
            # propagates, so the handle is never leaked.
            job_service.close()

        return True, errStr

    def sweep_worker(self, workspec):
        """Perform cleanup procedures for a worker, such as deletion of work directory.

        Removes the directory tree at ``workspec.accessPoint`` if it exists.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="sweep_worker")
        access_point = workspec.accessPoint

        if not os.path.exists(access_point):
            tmpLog.info("access point already removed.")
            return True, ""

        try:
            shutil.rmtree(access_point)
        except OSError as exc:
            # Report the failure through the documented (False, dialog) contract
            # instead of letting the exception escape to the caller.
            errStr = f"failed to remove {access_point}: {exc}"
            tmpLog.error(errStr)
            return False, errStr

        tmpLog.info(f" removed {access_point}")
        return True, ""