Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:59:01

0001 import os
0002 import shutil
0003 import subprocess
0004 
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestersweeper.base_sweeper import BaseSweeper
0007 
0008 # module-level logger named "slurm_sweeper"; passed to make_logger() by the sweeper methods below
0009 baseLogger = core_utils.setup_logger("slurm_sweeper")
0010 
0011 
0012 # plugin for sweeper with SLURM
class SlurmSweeper(BaseSweeper):
    """Sweeper plugin that cancels SLURM batch jobs and removes worker directories."""

    # constructor
    def __init__(self, **kwarg):
        """Forward all keyword arguments unchanged to ``BaseSweeper``."""
        BaseSweeper.__init__(self, **kwarg)

    # kill a worker
    def kill_worker(self, workspec):
        """Kill a worker in a scheduling system like batch systems and computing elements.

        Runs ``scancel <batchID>`` and reports the outcome. Failures, including
        the case where ``scancel`` cannot be launched at all, are returned as
        ``(False, message)`` rather than raised.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="kill_worker")
        # kill command
        comStr = f"scancel {workspec.batchID}"
        # execute
        try:
            # text mode with errors="replace" so the error dialog carries readable
            # output instead of bytes reprs and undecodable bytes cannot raise
            p = subprocess.Popen(
                comStr.split(),
                shell=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                errors="replace",
            )
            stdOut, stdErr = p.communicate()
        except OSError as exc:
            # e.g. scancel is not installed or not executable on this host
            errStr = f'command "{comStr}" could not be executed: {exc}'
            tmpLog.error(errStr)
            return False, errStr
        retCode = p.returncode
        if retCode != 0:
            # failed
            errStr = f'command "{comStr}" failed, retCode={retCode}, error: {stdOut} {stdErr}'
            tmpLog.error(errStr)
            return False, errStr
        # report the actual batch ID (the original message repeated the worker ID here)
        tmpLog.info(f"Succeeded to kill workerID={workspec.workerID} batchID={workspec.batchID}")
        # return
        return True, ""

    # cleanup for a worker
    def sweep_worker(self, workspec):
        """Perform cleanup procedures for a worker, such as deletion of work directory.

        Removes the directory tree at ``workspec.accessPoint``. A directory that
        is already gone counts as success; any other removal error is logged and
        returned as ``(False, message)``.

        :param workspec: worker specification
        :type workspec: WorkSpec
        :return: A tuple of return code (True for success, False otherwise) and error dialog
        :rtype: (bool, string)
        """
        # make logger
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="sweep_worker")
        # clean up worker directory
        if not os.path.exists(workspec.accessPoint):
            tmpLog.info("access point already removed.")
            return True, ""
        try:
            shutil.rmtree(workspec.accessPoint)
        except FileNotFoundError:
            # the directory vanished between the existence check and the removal
            tmpLog.info("access point already removed.")
        except OSError as exc:
            errStr = f"failed to remove {workspec.accessPoint}: {exc}"
            tmpLog.error(errStr)
            return False, errStr
        else:
            tmpLog.info(f"removed {workspec.accessPoint}")
        # return
        return True, ""