Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 import os
0002 import shutil
0003 
0004 from act.atlas.aCTDBPanda import aCTDBPanda
0005 from act.common.aCTConfig import aCTConfigARC
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008 
# logger: module-level base logger; each method of the plugin derives its own
# per-call logger from it via core_utils.make_logger(baseLogger, ...).
baseLogger = core_utils.setup_logger("act_sweeper")
0011 
0012 
# plugin for aCT sweeper
class ACTSweeper(PluginBase):
    """Harvester sweeper plugin backed by the aCT database.

    Killing a worker flags its aCT job as ``tobekilled`` in the aCT DB;
    sweeping a worker removes the worker's local access-point directory.
    aCT takes care of archiving its own jobs.
    """

    # constructor
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)

        self.log = core_utils.make_logger(baseLogger, "aCT sweeper", method_name="__init__")
        try:
            self.actDB = aCTDBPanda(self.log)
        except Exception as e:
            # Keep the plugin constructible without a DB; kill_worker checks
            # for actDB being None and reports the failure explicitly.
            self.log.error(f"Could not connect to aCT database: {str(e)}")
            self.actDB = None

    # kill a worker

    def kill_worker(self, workspec):
        """Mark aCT job as tobekilled.

        Only jobs whose ``actpandastatus`` is still one of 'sent', 'starting'
        or 'running' are updated; their ``pandastatus`` is cleared.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        # make logger
        tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="kill_worker")

        if workspec.batchID is None:
            tmpLog.info(f"workerID={workspec.workerID} has no batch ID so assume was not submitted - skipped")
            return True, ""

        # Without this check a failed DB connection surfaced only as an
        # unlogged AttributeError on None.
        if self.actDB is None:
            errStr = f"Cannot cancel job {workspec.batchID} in aCT: no connection to aCT database"
            tmpLog.error(errStr)
            return False, errStr

        # The selection compares the unquoted aCT "id" column, so batchID must
        # be numeric; casting keeps arbitrary text out of the SQL string.
        try:
            actID = int(workspec.batchID)
        except (TypeError, ValueError):
            errStr = f"Invalid aCT job ID {workspec.batchID!r} for workerID={workspec.workerID}"
            tmpLog.error(errStr)
            return False, errStr

        try:
            # Only kill jobs which are still active
            self.actDB.updateJobs(
                f"id={actID} AND actpandastatus IN ('sent', 'starting', 'running')", {"actpandastatus": "tobekilled", "pandastatus": None}
            )
        except Exception as e:
            tmpLog.error(f"Failed to cancel job {workspec.batchID} in aCT: {str(e)}")
            return False, str(e)

        tmpLog.info(f"Job {workspec.batchID} cancelled in aCT")
        return True, ""

    # cleanup for a worker

    def sweep_worker(self, workspec):
        """Clean up access point. aCT takes care of archiving its own jobs.

        A missing (or unset) access point counts as success. Any other
        filesystem error during removal is reported rather than raised.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        # make logger
        tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="sweep_worker")

        accessPoint = workspec.accessPoint
        if not accessPoint:
            # os.path.exists(None) would raise TypeError; nothing to clean up.
            tmpLog.info("no access point set - nothing to remove")
            return True, ""

        # clean up worker directory. Try the removal directly instead of
        # checking existence first, so a concurrent removal is not an error.
        try:
            shutil.rmtree(accessPoint)
        except FileNotFoundError:
            tmpLog.info(f"access point {accessPoint} already removed.")
        except OSError as e:
            errStr = f"failed to remove access point {accessPoint}: {str(e)}"
            tmpLog.error(errStr)
            return False, errStr
        else:
            tmpLog.info(f"removed {accessPoint}")
        # return
        return True, ""