File indexing completed on 2026-04-20 07:59:00
import errno
import os
import shutil
import subprocess

from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestermisc.htcondor_utils import (
    CondorJobManage,
    _runShell,
    condor_job_id_from_workspec,
    get_host_batchid_map,
)
from pandaharvester.harvestersweeper.base_sweeper import BaseSweeper
0013
0014
0015 baseLogger = core_utils.setup_logger("htcondor_sweeper")
0016
0017
0018
class HTCondorSweeper(BaseSweeper):
    """Sweeper plugin for HTCondor.

    Removes the HTCondor batch jobs backing a set of workers (kill) and
    cleans up per-job working directories left behind by the preparator
    (sweep).
    """

    def __init__(self, **kwarg):
        BaseSweeper.__init__(self, **kwarg)

    def kill_workers(self, workspec_list):
        """Remove the HTCondor batch jobs of the given workers.

        Batch IDs are grouped by submission host so each host is contacted
        once. A failure to talk to one host is logged and tolerated; the
        workers on that host then fall through to the "not found in return
        map" failure entry below (best-effort semantics).

        :param workspec_list: list of WorkSpec objects to kill
        :return: list of (bool, str) tuples, one per workspec, in input order
        """
        tmpLog = self.make_logger(baseLogger, method_name="kill_workers")
        tmpLog.debug("start")

        all_job_ret_map = {}
        retList = []

        # One condor_rm per submission host covering all of its batch IDs
        for submissionHost, batchIDs_dict in get_host_batchid_map(workspec_list).items():
            batchIDs_list = list(batchIDs_dict.keys())
            try:
                condor_job_manage = CondorJobManage(id=submissionHost)
                ret_map = condor_job_manage.remove(batchIDs_list)
            except Exception as e:
                # Best-effort: log and continue with the other hosts
                ret_map = {}
                ret_err_str = f"Exception {e.__class__.__name__}: {e}"
                tmpLog.error(ret_err_str)
            all_job_ret_map.update(ret_map)

        # Assemble the per-worker result tuples in input order
        for workspec in workspec_list:
            if workspec.batchID is None:
                # Never submitted; nothing to remove
                ret = (True, "worker without batchID; skipped")
            else:
                ret = all_job_ret_map.get(condor_job_id_from_workspec(workspec), (False, "batch job not found in return map"))
            retList.append(ret)
        tmpLog.debug("done")

        return retList

    def sweep_worker(self, workspec):
        """Clean up everything related to a finished worker.

        Currently this removes the preparator working directory of every
        PanDA job attached to the worker (one subdirectory of
        ``self.preparatorBasePath`` per PandaID). If no preparator base
        path is configured the cleanup is skipped.

        :param workspec: WorkSpec of the worker to sweep
        :return: (bool, str) tuple; (True, "") on success, (False, errStr)
                 on failure
        """
        tmpLog = self.make_logger(baseLogger, f"workerID={workspec.workerID}", method_name="sweep_worker")
        tmpLog.debug("start")

        try:
            preparatorBasePath = self.preparatorBasePath
        except AttributeError:
            # Optional feature; nothing configured means nothing to clean
            tmpLog.debug("No preparator base directory is configured. Skipped cleaning up preparator directory")
        else:
            if not os.path.isdir(preparatorBasePath):
                errStr = f"Configuration error: Preparator base directory {preparatorBasePath} does not exist"
                tmpLog.error(errStr)
                return False, errStr
            if not workspec.get_jobspec_list():
                tmpLog.warning(f"No job PandaID found relate to workerID={workspec.workerID}. Skipped cleaning up preparator directory")
            else:
                for jobspec in workspec.get_jobspec_list():
                    preparator_dir_for_cleanup = os.path.join(preparatorBasePath, str(jobspec.PandaID))
                    # Safety guard: only remove an existing proper
                    # subdirectory, never the base path itself
                    if not os.path.isdir(preparator_dir_for_cleanup) or preparator_dir_for_cleanup == preparatorBasePath:
                        errStr = f"Failed to clean up preparator directory: {preparator_dir_for_cleanup} does not exist or invalid to be cleaned up"
                        tmpLog.error(errStr)
                        return False, errStr
                    try:
                        shutil.rmtree(preparator_dir_for_cleanup)
                    except OSError as _err:
                        # Compare errno instead of the locale-dependent (and
                        # possibly None) strerror text
                        if _err.errno == errno.ENOENT:
                            # Already gone (e.g. removed concurrently); fine
                            tmpLog.debug(f"Found that {_err.filename} was already removed")
                        else:
                            # FIX: previously any other OSError was silently
                            # swallowed and the sweep reported success
                            errStr = f"Failed to clean up preparator directory {preparator_dir_for_cleanup}: {_err}"
                            tmpLog.error(errStr)
                            return False, errStr
                    else:
                        tmpLog.info(f"Succeeded to clean up preparator directory: Removed {preparator_dir_for_cleanup}")
        tmpLog.info(f"Succeeded to clean up everything about workerID={workspec.workerID}")
        tmpLog.debug("done")

        return True, ""