Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 import os
0002 import shutil
0003 
0004 import saga
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestercore.plugin_base import PluginBase
0007 from pandaharvester.harvestersubmitter.saga_submitter import SAGASubmitter
0008 
# Module-level logger for the SAGA sweeper; the SAGASweeper methods derive
# their per-call loggers from it via make_logger().
baseLogger = core_utils.setup_logger("saga_sweeper")
0011 
0012 
# Sweeper plugin that cancels workers through a SAGA job service and removes
# their work directories.
class SAGASweeper(PluginBase):
    """Sweeper plugin backed by SAGA.

    ``self.adaptor`` is passed to ``saga.job.Service`` and is also used as the
    prefix of SAGA submission IDs (``[<adaptor>]-[<batchID>]``).  It is
    presumably populated by PluginBase from the plugin configuration kwargs
    -- TODO confirm.
    """

    # constructor
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)
        tmpLog = core_utils.make_logger(baseLogger, method_name="__init__")
        tmpLog.info(f"[{self.adaptor}] SAGA adaptor will be used")

    # kill a worker
    def kill_worker(self, workspec):
        """Kill a worker in a scheduling system like batch systems and computing elements.

        The worker's SAGA job is looked up by ``[<adaptor>]-[<batchID>]`` and
        cancelled if it is still pending or running.  Workers without a
        batchID are skipped without contacting SAGA.  SAGA errors are logged
        and returned as the error dialog, but the kill is still reported as
        successful (best-effort semantics, unchanged from the original).

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="kill_worker")
        tmpLog.info(f"[{self.adaptor}] SAGA adaptor will be used to kill worker {workspec.workerID} with batchid {workspec.batchID}")
        errStr = ""

        if not workspec.batchID:
            # nothing was submitted for this worker, so there is no job to cancel
            return True, errStr

        saga_submission_id = f"[{self.adaptor}]-[{workspec.batchID}]"
        job_service = saga.job.Service(self.adaptor)
        try:
            worker = job_service.get_job(saga_submission_id)
            tmpLog.info(f"SAGA State for submission with batchid: {workspec.batchID} is: {worker.state}")
            harvester_job_state = SAGASubmitter.status_translator(worker.state)
            tmpLog.info(f"Worker state with batchid: {workspec.batchID} is: {harvester_job_state}")
            # only jobs that have not reached a final state can be cancelled
            if worker.state in [saga.job.PENDING, saga.job.RUNNING]:
                worker.cancel()
                tmpLog.info(f"Worker {workspec.workerID} with batchid {workspec.batchID} canceled")
        except saga.SagaException as ex:
            errStr = ex.get_message()
            tmpLog.info(f"An exception occured during canceling of worker: {errStr}")
        finally:
            # close the job service on every path, including unexpected
            # non-SAGA exceptions that propagate to the caller
            job_service.close()

        return True, errStr

    # cleanup for a worker
    def sweep_worker(self, workspec):
        """Perform cleanup procedures for a worker, such as deletion of work directory.

        Recursively deletes the worker's access point.  An unset or already
        deleted access point is not treated as an error.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="sweep_worker")

        access_point = workspec.accessPoint
        if not access_point:
            # os.path/shutil reject None, so guard explicitly instead of crashing
            tmpLog.info("access point is not set; nothing to remove")
            return True, ""

        # EAFP: deleting directly avoids the race between an existence check
        # and the removal itself
        try:
            shutil.rmtree(access_point)
        except FileNotFoundError:
            tmpLog.info("access point already removed.")
        else:
            tmpLog.info(f" removed {access_point}")
        return True, ""