Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:00

0001 # === Imports ==================================================
0002 
0003 import os
0004 import shutil
0005 import subprocess
0006 
0007 from pandaharvester.harvestercore import core_utils
0008 from pandaharvester.harvestercore.plugin_base import PluginBase
0009 
0010 # ==============================================================
0011 
0012 # === Definitions ==============================================
0013 
# Module-level logger registered under the name "cobalt_sweeper"; the plugin
# methods pass it to make_logger() to build their per-call loggers.
baseLogger = core_utils.setup_logger("cobalt_sweeper")
0016 
0017 # ==============================================================
0018 
0019 # === Functions ================================================
0020 
0021 
def _runShell(cmd):
    """Run a command without a shell and capture its output.

    The command is converted to ``str`` and tokenized with shell-like quoting
    rules, so quoted arguments containing whitespace stay together as a
    single argument.

    :param cmd: command line to execute
    :type cmd: str (any object is accepted and converted with ``str()``)
    :return: A tuple of return code, captured stdout and captured stderr
    :rtype: (int, bytes, bytes)
    """
    # Imported locally so this function does not depend on the module-level imports
    import shlex

    # A plain str.split() would break quoted arguments apart at every space
    args = shlex.split(str(cmd))
    p = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # communicate() waits for the process to exit and drains both pipes
    stdOut, stdErr = p.communicate()
    return (p.returncode, stdOut, stdErr)
0028 
0029 
0030 # ==============================================================
0031 
0032 # === Classes ==================================================
0033 
0034 # sweeper plugin that kills workers with qdel and removes their access points
0035 
0036 
class CobaltSweeper(PluginBase):
    """Sweeper plugin that kills workers with ``qdel`` and removes their access point."""

    # constructor
    def __init__(self, **kwarg):
        PluginBase.__init__(self, **kwarg)

    # kill a worker
    def kill_worker(self, workspec):
        """Kill a worker in a scheduling system like batch systems and computing elements.

        Runs ``qdel <batchID>`` and treats any non-zero exit code as a failure.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """

        # Make logger
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="kill_worker")

        # Kill command
        comStr = f"qdel {workspec.batchID}"
        (retCode, stdOut, stdErr) = _runShell(comStr)
        if retCode != 0:
            # Command failed: report both output streams to the caller
            errStr = f'command "{comStr}" failed, retCode={retCode}, error: {stdOut} {stdErr}'
            tmpLog.error(errStr)
            return False, errStr

        # Log the batch ID that was actually passed to qdel
        tmpLog.info(f"Succeeded to kill workerID={workspec.workerID} batchID={workspec.batchID}")

        # Return
        return True, ""

    # cleanup for a worker
    def sweep_worker(self, workspec):
        """Perform cleanup procedures for a worker, such as deletion of work directory.

        Removes the directory tree at ``workspec.accessPoint`` if it exists.
        A failure to remove it is reported through the return value instead of
        being raised.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """

        # Make logger
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="sweep_worker")

        # Clean up worker directory
        accessPoint = workspec.accessPoint
        if os.path.exists(accessPoint):
            try:
                shutil.rmtree(accessPoint)
            except OSError as exc:
                # Honor the (False, message) contract rather than letting the error escape
                errStr = f"failed to remove {accessPoint}: {exc}"
                tmpLog.error(errStr)
                return False, errStr
            tmpLog.info(f" removed {accessPoint}")
        else:
            tmpLog.info("access point already removed.")

        # Return
        return True, ""
0090 
0091 
0092 # ==============================================================