# File indexing completed on 2026-04-20 07:58:57
0001 import os
0002 from os import walk
0003
0004 from pandaharvester.harvesterbody.agent_base import AgentBase
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.command_spec import CommandSpec
0008 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0009 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0010
0011
# module-scope logger shared by all Sweeper instances
_logger = core_utils.setup_logger("sweeper")
0013
0014
0015
class Sweeper(AgentBase):
    """Agent for cleanup: kills workers flagged for termination, sweeps and
    deletes workers whose retention period expired, purges old job records,
    and optionally frees disk space in configured directories.
    """

    def __init__(self, queue_config_mapper, single_mode=False):
        """Set up DB access, queue configuration lookup, and the plugin factory.

        :param queue_config_mapper: mapper from queue name / configID to queue configuration
        :param single_mode: if True the agent runs a single cycle (handled by AgentBase)
        """
        AgentBase.__init__(self, single_mode)
        self.dbProxy = DBProxy()
        self.queueConfigMapper = queue_config_mapper
        self.pluginFactory = PluginFactory()
        # identifier used in log prefixes and DB process locks; set at the start of run()
        self.lockedBy = None

    def _process_one_kill_command_type(self, tmp_log, command_string, mark_function):
        """Fetch all pending commands of one type and mark matching workers to be killed.

        :param tmp_log: logger used to report progress
        :param command_string: command type to fetch (a CommandSpec.COM_* constant)
        :param mark_function: DB-proxy method that marks workers given command_spec.params
        """
        stopwatch = core_utils.get_stopwatch()
        tmp_log.debug(f"try to get {command_string} commands")
        command_specs = self.dbProxy.get_commands_for_receiver("sweeper", command_string)
        tmp_log.debug(f"got {len(command_specs)} {command_string} commands")
        for command_spec in command_specs:
            n_to_kill = mark_function(command_spec.params)
            tmp_log.debug(f"will kill {n_to_kill} workers with {command_spec.params}")
        tmp_log.debug(f"done handling {command_string} commands took {stopwatch.get_elapsed_time()}")

    def process_kill_commands(self):
        """Process commands from the server that request worker kills."""
        tmp_log = self.make_logger(_logger, f"id={self.lockedBy}", method_name="process_commands")
        # killWorkers commands select the workers to kill via a DB query on the params
        self._process_one_kill_command_type(tmp_log, CommandSpec.COM_killWorkers, self.dbProxy.mark_workers_to_kill_by_query)
        # syncWorkersKill commands carry explicit worker IDs in the params
        self._process_one_kill_command_type(tmp_log, CommandSpec.COM_syncWorkersKill, self.dbProxy.mark_workers_to_kill_by_workerids)

    def _kill_flagged_workers(self, main_log):
        """Kill workers that were flagged for termination, grouped by queue and configID."""
        sw_kill = core_utils.get_stopwatch()
        main_log.debug("try to get workers to kill")
        workers_to_kill = self.dbProxy.get_workers_to_kill(harvester_config.sweeper.maxWorkers, harvester_config.sweeper.checkInterval)
        main_log.debug(f"got {len(workers_to_kill)} queues to kill workers")
        sw = core_utils.get_stopwatch()
        for queue_name, config_id_map in workers_to_kill.items():
            for configID, workspec_list in config_id_map.items():
                # resolve the queue configuration; skip the group if it is unknown
                if not self.queueConfigMapper.has_queue(queue_name, configID):
                    main_log.error(f"queue config for {queue_name}/{configID} not found")
                    continue
                queue_config = self.queueConfigMapper.get_queue(queue_name, configID)
                try:
                    sweeper_core = self.pluginFactory.get_plugin(queue_config.sweeper)
                except Exception:
                    main_log.error(f"failed to launch sweeper plugin for {queue_name}/{configID}")
                    core_utils.dump_error_message(main_log)
                    continue
                sw.reset()
                n_workers = len(workspec_list)
                try:
                    # prefer the bulk interface when the plugin implements kill_workers
                    tmp_log = self.make_logger(_logger, f"id={self.lockedBy}", method_name="run")
                    tmp_log.debug("start killing")
                    tmp_list = sweeper_core.kill_workers(workspec_list)
                except AttributeError:
                    # plugin has no kill_workers; fall back to killing one worker at a time
                    for workspec in workspec_list:
                        tmp_log = self.make_logger(_logger, f"workerID={workspec.workerID}", method_name="run")
                        try:
                            tmp_log.debug("start killing one worker")
                            tmp_stat, tmp_out = sweeper_core.kill_worker(workspec)
                            tmp_log.debug(f"done killing with status={tmp_stat} diag={tmp_out}")
                        except Exception:
                            core_utils.dump_error_message(tmp_log)
                except Exception:
                    core_utils.dump_error_message(main_log)
                else:
                    # bulk kill succeeded; log per-worker results and the success count
                    n_killed = 0
                    for workspec, (tmp_stat, tmp_out) in zip(workspec_list, tmp_list):
                        tmp_log.debug(f"done killing workerID={workspec.workerID} with status={tmp_stat} diag={tmp_out}")
                        if tmp_stat:
                            n_killed += 1
                    tmp_log.debug(f"killed {n_killed}/{n_workers} workers")
                main_log.debug(f"done killing {n_workers} workers" + sw.get_elapsed_time())
        main_log.debug("done all killing" + sw_kill.get_elapsed_time())

    def _make_status_timeout_map(self):
        """Return the mapping from worker status to retention period (hours).

        keepMissed / keepPending fall back to 24 hours when not configured.
        """
        return {
            "finished": harvester_config.sweeper.keepFinished,
            "failed": harvester_config.sweeper.keepFailed,
            "cancelled": harvester_config.sweeper.keepCancelled,
            "missed": getattr(harvester_config.sweeper, "keepMissed", 24),
            "pending": getattr(harvester_config.sweeper, "keepPending", 24),
        }

    def _cleanup_expired_workers(self, main_log, status_timeout_map):
        """Sweep and delete workers whose retention period expired.

        :param status_timeout_map: worker status -> retention period in hours
        """
        sw_cleanup = core_utils.get_stopwatch()
        workersForCleanup = self.dbProxy.get_workers_for_cleanup(harvester_config.sweeper.maxWorkers, status_timeout_map)
        main_log.debug(f"got {len(workersForCleanup)} queues for workers cleanup")
        sw = core_utils.get_stopwatch()
        for queue_name, config_id_map in workersForCleanup.items():
            for configID, workspec_list in config_id_map.items():
                # resolve the queue configuration; skip the group if it is unknown
                if not self.queueConfigMapper.has_queue(queue_name, configID):
                    main_log.error(f"queue config for {queue_name}/{configID} not found")
                    continue
                queue_config = self.queueConfigMapper.get_queue(queue_name, configID)
                sweeper_core = self.pluginFactory.get_plugin(queue_config.sweeper)
                messenger = self.pluginFactory.get_plugin(queue_config.messenger)
                sw.reset()
                n_workers = len(workspec_list)
                # re-kill before sweeping so no worker is still running when its
                # resources are removed
                main_log.debug("making sure workers to clean up are all terminated")
                try:
                    # bulk interface first, as in the kill phase
                    sweeper_core.kill_workers(workspec_list)
                except AttributeError:
                    # plugin has no kill_workers; kill one worker at a time
                    for workspec in workspec_list:
                        tmp_log = self.make_logger(_logger, f"workerID={workspec.workerID}", method_name="run")
                        try:
                            tmp_stat, tmp_out = sweeper_core.kill_worker(workspec)
                        except Exception:
                            core_utils.dump_error_message(tmp_log)
                except Exception:
                    core_utils.dump_error_message(main_log)
                main_log.debug("made sure workers to clean up are all terminated")
                # sweep each worker's resources, clean the messenger area, and
                # delete the DB record only when the sweep succeeded
                for workspec in workspec_list:
                    tmp_log = self.make_logger(_logger, f"workerID={workspec.workerID}", method_name="run")
                    try:
                        tmp_log.debug("start cleaning up one worker")
                        tmp_stat, tmp_out = sweeper_core.sweep_worker(workspec)
                        tmp_log.debug(f"swept_worker with status={tmp_stat} diag={tmp_out}")
                        tmp_log.debug("start messenger cleanup")
                        mc_tmp_stat, mc_tmp_out = messenger.clean_up(workspec)
                        tmp_log.debug(f"messenger cleaned up with status={mc_tmp_stat} diag={mc_tmp_out}")
                        if tmp_stat:
                            self.dbProxy.delete_worker(workspec.workerID)
                    except Exception:
                        core_utils.dump_error_message(tmp_log)
                main_log.debug(f"done cleaning up {n_workers} workers" + sw.get_elapsed_time())
        main_log.debug("done all cleanup" + sw_cleanup.get_elapsed_time())

    def _delete_old_jobs(self, main_log, status_timeout_map):
        """Delete job records older than the longest worker retention period."""
        sw_delete = core_utils.get_stopwatch()
        main_log.debug("delete old jobs")
        # +1 hour so jobs always outlive the workers that reference them
        jobTimeout = max(status_timeout_map.values()) + 1
        self.dbProxy.delete_old_jobs(jobTimeout)
        # also remove job info rows that no longer have a parent job
        self.dbProxy.delete_orphaned_job_info()
        main_log.debug("done deletion of old jobs" + sw_delete.get_elapsed_time())

    def _clean_disk(self, main_log):
        """Delete the oldest non-active files in watched directories above their watermark.

        Runs only when diskCleanUpInterval and diskHighWatermark are configured,
        and only on the instance holding the process lock for the interval.
        """
        if not (hasattr(harvester_config.sweeper, "diskCleanUpInterval") and hasattr(harvester_config.sweeper, "diskHighWatermark")):
            return
        # only one sweeper process performs disk cleanup per interval
        locked = self.dbProxy.get_process_lock("sweeper", self.get_pid(), harvester_config.sweeper.diskCleanUpInterval * 60 * 60)
        if not locked:
            return
        try:
            all_active_files = None
            for item in harvester_config.sweeper.diskHighWatermark.split(","):
                # each item is "<directory>|<watermark in GB>"
                dir_name, watermark = item.split("|")
                main_log.debug(f"checking {dir_name} for cleanup with watermark {watermark} GB")
                watermark = int(watermark) * 10**9
                total_size = 0
                file_dict = {}
                # scan the directory tree, grouping files by modification time
                for root, dirs, filenames in walk(dir_name):
                    for base_name in filenames:
                        full_name = os.path.join(root, base_name)
                        f_size = os.path.getsize(full_name)
                        total_size += f_size
                        mtime = os.path.getmtime(full_name)
                        file_dict.setdefault(mtime, set())
                        file_dict[mtime].add((base_name, full_name, f_size))
                if total_size < watermark:
                    main_log.debug(f"skip cleanup {dir_name} due to total_size {total_size // 10 ** 9} GB < watermark {watermark // 10 ** 9} GB")
                    continue
                main_log.debug(f"cleanup {dir_name} due to total_size {total_size // 10 ** 9} GB >= watermark {watermark // 10 ** 9} GB")
                # fetch the set of in-use input files lazily, at most once per cycle
                if all_active_files is None:
                    all_active_files = self.dbProxy.get_all_active_input_files()
                deleted_size = 0
                # delete oldest files first until usage drops below the watermark
                for mtime in sorted(file_dict.keys()):
                    for base_name, full_name, f_size in file_dict[mtime]:
                        # never delete files still referenced by active jobs
                        if base_name in all_active_files:
                            continue
                        try:
                            os.remove(full_name)
                        except Exception:
                            core_utils.dump_error_message(main_log)
                            # removal failed: the space was not freed, so do not
                            # count it towards deleted_size (the original code did,
                            # which stopped cleanup before reaching the watermark)
                            continue
                        deleted_size += f_size
                        if total_size - deleted_size < watermark:
                            break
                    if total_size - deleted_size < watermark:
                        break
        except Exception:
            core_utils.dump_error_message(main_log)

    def run(self):
        """Main agent loop: run sweeper cycles until the agent is terminated."""
        self.lockedBy = f"sweeper-{self.get_pid()}"
        while True:
            sw_main = core_utils.get_stopwatch()
            main_log = self.make_logger(_logger, f"id={self.lockedBy}", method_name="run")

            # process kill commands received from the server
            try:
                self.process_kill_commands()
            except Exception:
                core_utils.dump_error_message(main_log)

            # kill workers flagged for termination
            self._kill_flagged_workers(main_log)

            # clean up workers past their retention period, then old jobs
            status_timeout_map = self._make_status_timeout_map()
            self._cleanup_expired_workers(main_log, status_timeout_map)
            self._delete_old_jobs(main_log, status_timeout_map)

            # optional disk cleanup
            self._clean_disk(main_log)

            main_log.debug("done a sweeper cycle" + sw_main.get_elapsed_time())

            # sleep between cycles; exit when the agent is asked to terminate
            if self.terminated(harvester_config.sweeper.sleepTime):
                main_log.debug("terminated")
                return