File indexing completed on 2026-04-20 07:59:00
0001
0002
0003 import os
0004 import shutil
0005 import subprocess
0006
0007 from pandaharvester.harvestercore import core_utils
0008 from pandaharvester.harvestercore.plugin_base import PluginBase
0009
0010
0011
0012
0013
0014
# Name under which this plugin's log records are emitted.
_LOGGER_NAME = "cobalt_sweeper"

# Module-wide base logger; each method derives a per-worker logger from it via make_logger.
baseLogger = core_utils.setup_logger(_LOGGER_NAME)
0016
0017
0018
0019
0020
0021
0022 def _runShell(cmd):
0023 cmd = str(cmd)
0024 p = subprocess.Popen(cmd.split(), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
0025 stdOut, stdErr = p.communicate()
0026 retCode = p.returncode
0027 return (retCode, stdOut, stdErr)
0028
0029
0030
0031
0032
0033
0034
0035
0036
class CobaltSweeper(PluginBase):
    """Sweeper plugin that kills workers with ``qdel`` and deletes their access-point directories."""

    def __init__(self, **kwarg):
        """Initialise the plugin; all keyword arguments are forwarded to PluginBase."""
        PluginBase.__init__(self, **kwarg)

    def kill_worker(self, workspec):
        """Kill a worker in a scheduling system like batch systems and computing elements.

        Runs ``qdel <batchID>`` for the worker's batch job.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="kill_worker")

        comStr = f"qdel {workspec.batchID}"
        retCode, stdOut, stdErr = _runShell(comStr)
        if retCode != 0:
            # Include both output streams so the failure reason is not lost.
            errStr = f'command "{comStr}" failed, retCode={retCode}, error: {stdOut} {stdErr}'
            tmpLog.error(errStr)
            return False, errStr

        # Log the batch ID that was actually passed to qdel (previously logged workerID twice).
        tmpLog.info(f"Succeeded to kill workerID={workspec.workerID} batchID={workspec.batchID}")
        return True, ""

    def sweep_worker(self, workspec):
        """Perform cleanup procedures for a worker, such as deletion of work directory.

        Recursively deletes ``workspec.accessPoint`` if it exists. A missing
        directory counts as success.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="sweep_worker")

        if not os.path.exists(workspec.accessPoint):
            tmpLog.info("access point already removed.")
            return True, ""

        try:
            shutil.rmtree(workspec.accessPoint)
        except OSError as exc:
            # Permission problems, a non-directory path, or a concurrent removal
            # are reported through the documented (False, message) result
            # instead of escaping as an exception.
            errStr = f"failed to remove {workspec.accessPoint}: {exc}"
            tmpLog.error(errStr)
            return False, errStr

        tmpLog.info(f" removed {workspec.accessPoint}")
        return True, ""
0090
0091
0092