# EIC code, captured via the LXR cross-referencer (navigation chrome removed).
# File indexing completed on 2026-04-20 07:59:00

0001 import os
0002 import shutil
0003 import subprocess
0004 
0005 from pandaharvester.harvestercore import core_utils
0006 from pandaharvester.harvestermisc.htcondor_utils import (
0007     CondorJobManage,
0008     _runShell,
0009     condor_job_id_from_workspec,
0010     get_host_batchid_map,
0011 )
0012 from pandaharvester.harvestersweeper.base_sweeper import BaseSweeper
0013 
0014 # Logger
0015 baseLogger = core_utils.setup_logger("htcondor_sweeper")
0016 
0017 
# sweeper for HTCONDOR batch system
class HTCondorSweeper(BaseSweeper):
    """Harvester sweeper plugin for the HTCondor batch system.

    Kills workers by removing their HTCondor batch jobs in bulk (one
    remove call per submission host) and cleans up the per-PandaID
    directories left under the preparator base path.
    """

    # constructor
    def __init__(self, **kwarg):
        BaseSweeper.__init__(self, **kwarg)

    # kill workers
    def kill_workers(self, workspec_list):
        """Remove the HTCondor batch jobs backing the given workers.

        Workers are grouped by submission host via get_host_batchid_map so a
        single CondorJobManage.remove call is issued per schedd. Workers with
        no batchID never reached the batch system and are reported as
        successfully killed.

        :param workspec_list: list of WorkSpec objects to kill
        :return: list of (bool, error_message) tuples, one per input
                 workspec, in the same order as workspec_list
        """
        # Make logger
        tmpLog = self.make_logger(baseLogger, method_name="kill_workers")
        tmpLog.debug("start")
        # Accumulated map of condor job id -> (bool, message) over all schedds
        all_job_ret_map = {}
        retList = []
        # Kill in bulk, one remove call per submission host
        for submissionHost, batchIDs_dict in get_host_batchid_map(workspec_list).items():
            batchIDs_list = list(batchIDs_dict.keys())
            try:
                condor_job_manage = CondorJobManage(id=submissionHost)
                ret_map = condor_job_manage.remove(batchIDs_list)
            except Exception as e:
                # Best-effort: log and continue with an empty map so the
                # affected workers fall through to the "not found" failure
                # in the per-worker loop below
                ret_map = {}
                ret_err_str = f"Exception {e.__class__.__name__}: {e}"
                tmpLog.error(ret_err_str)
            all_job_ret_map.update(ret_map)
        # Fill return list, preserving the order of workspec_list
        for workspec in workspec_list:
            if workspec.batchID is None:
                ret = (True, "worker without batchID; skipped")
            else:
                ret = all_job_ret_map.get(condor_job_id_from_workspec(workspec), (False, "batch job not found in return map"))
            retList.append(ret)
        tmpLog.debug("done")
        # Return
        return retList

    # cleanup for a worker
    def sweep_worker(self, workspec):
        """Clean up local files belonging to a single worker.

        If a preparator base directory is configured (self.preparatorBasePath),
        the per-PandaID subdirectory of every job associated with the worker
        is removed. When no base directory is configured the cleanup is
        skipped and the sweep still succeeds.

        :param workspec: WorkSpec of the worker to sweep
        :return: (bool, error_message) tuple; (True, "") on success
        """
        # Make logger
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="sweep_worker")
        tmpLog.debug("start")
        # Clean up preparator base directory (staged-in files); the attribute
        # is optional plugin configuration, hence the EAFP AttributeError probe
        try:
            preparatorBasePath = self.preparatorBasePath
        except AttributeError:
            tmpLog.debug("No preparator base directory is configured. Skipped cleaning up preparator directory")
        else:
            if os.path.isdir(preparatorBasePath):
                if not workspec.get_jobspec_list():
                    tmpLog.warning(f"No job PandaID found relate to workerID={workspec.workerID}. Skipped cleaning up preparator directory")
                else:
                    for jobspec in workspec.get_jobspec_list():
                        preparator_dir_for_cleanup = os.path.join(preparatorBasePath, str(jobspec.PandaID))
                        # Guard against removing the base path itself (e.g. empty PandaID)
                        if os.path.isdir(preparator_dir_for_cleanup) and preparator_dir_for_cleanup != preparatorBasePath:
                            try:
                                shutil.rmtree(preparator_dir_for_cleanup)
                            except OSError as _err:
                                # Tolerate concurrent removal; other OSErrors are
                                # deliberately swallowed as well (best-effort cleanup)
                                if "No such file or directory" in _err.strerror:
                                    tmpLog.debug(f"Found that {_err.filename} was already removed")
                                pass
                            tmpLog.info(f"Succeeded to clean up preparator directory: Removed {preparator_dir_for_cleanup}")
                        else:
                            errStr = f"Failed to clean up preparator directory: {preparator_dir_for_cleanup} does not exist or invalid to be cleaned up"
                            tmpLog.error(errStr)
                            return False, errStr
            else:
                errStr = f"Configuration error: Preparator base directory {preparatorBasePath} does not exist"
                tmpLog.error(errStr)
                return False, errStr
        tmpLog.info(f"Succeeded to clean up everything about workerID={workspec.workerID}")
        tmpLog.debug("done")
        # Return
        return True, ""