File indexing completed on 2026-04-20 07:59:00
0001 import os
0002 import shutil
0003
0004 from act.atlas.aCTDBPanda import aCTDBPanda
0005 from act.common.aCTConfig import aCTConfigARC
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.plugin_base import PluginBase
0008
0009
# Module-level logger named "act_sweeper"; the per-call loggers created inside
# ACTSweeper are derived from it via core_utils.make_logger.
baseLogger = core_utils.setup_logger("act_sweeper")
0011
0012
0013
class ACTSweeper(PluginBase):
    """Sweeper plugin for workers submitted through aCT.

    ``kill_worker`` flags the corresponding aCT job as ``tobekilled`` in the
    aCT database and ``sweep_worker`` removes the worker's access point
    directory from local disk.
    """

    def __init__(self, **kwarg):
        """Initialise the plugin and try to open the aCT database.

        A failed connection is logged and leaves ``self.actDB`` set to
        ``None`` rather than raising, so the plugin can still be instantiated;
        ``kill_worker`` then reports the missing connection as a failure.

        :param kwarg: plugin configuration forwarded to ``PluginBase``
        """
        super().__init__(**kwarg)

        self.log = core_utils.make_logger(baseLogger, "aCT sweeper", method_name="__init__")
        try:
            self.actDB = aCTDBPanda(self.log)
        except Exception as e:
            self.log.error(f"Could not connect to aCT database: {e}")
            self.actDB = None

    def kill_worker(self, workspec):
        """Mark aCT job as tobekilled.

        Only jobs whose ``actpandastatus`` is still ``sent``, ``starting`` or
        ``running`` are updated. A worker without a batch ID is assumed never
        to have been submitted and is reported as success.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="kill_worker")

        if workspec.batchID is None:
            tmpLog.info(f"workerID={workspec.workerID} has no batch ID so assume was not submitted - skipped")
            return True, ""

        # The DB handle is None when the connection attempt in __init__ failed.
        # Report that explicitly instead of relying on an AttributeError from
        # calling a method on None.
        if self.actDB is None:
            errStr = "no connection to aCT database"
            tmpLog.error(f"Failed to cancel job {workspec.batchID} in aCT: {errStr}")
            return False, errStr

        # NOTE(review): batchID is interpolated directly into the SQL
        # condition; presumably it is the numeric aCT job id recorded at
        # submission - confirm it can never carry arbitrary text.
        condition = f"id={workspec.batchID} AND actpandastatus IN ('sent', 'starting', 'running')"
        try:
            self.actDB.updateJobs(condition, {"actpandastatus": "tobekilled", "pandastatus": None})
        except Exception as e:
            tmpLog.error(f"Failed to cancel job {workspec.batchID} in aCT: {e}")
            return False, str(e)

        tmpLog.info(f"Job {workspec.batchID} cancelled in aCT")
        return True, ""

    def sweep_worker(self, workspec):
        """Clean up access point. aCT takes care of archiving its own jobs.

        A missing (or unset) access point counts as already cleaned. Failure
        to delete an existing access point is logged and returned as an error
        instead of being raised.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        tmpLog = core_utils.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="sweep_worker")

        accessPoint = workspec.accessPoint
        # An unset access point would make os.path.exists raise TypeError;
        # there is nothing on disk to remove in that case.
        if not accessPoint or not os.path.exists(accessPoint):
            tmpLog.info(f"access point {accessPoint} already removed.")
            return True, ""

        try:
            shutil.rmtree(accessPoint)
        except FileNotFoundError:
            # The directory vanished between the existence check and rmtree.
            tmpLog.info(f"access point {accessPoint} already removed.")
        except OSError as e:
            tmpLog.error(f"Failed to remove {accessPoint}: {e}")
            return False, str(e)
        else:
            tmpLog.info(f"removed {accessPoint}")

        return True, ""