Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:57

0001 import os
0002 from os import walk
0003 
0004 from pandaharvester.harvesterbody.agent_base import AgentBase
0005 from pandaharvester.harvesterconfig import harvester_config
0006 from pandaharvester.harvestercore import core_utils
0007 from pandaharvester.harvestercore.command_spec import CommandSpec
0008 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0009 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0010 
0011 # logger
0012 _logger = core_utils.setup_logger("sweeper")
0013 
0014 
0015 # class for cleanup
class Sweeper(AgentBase):
    """Agent that kills workers marked for termination and cleans up terminated ones.

    Each cycle of :meth:`run` goes through these stages in order:

    1. process kill commands (mark workers to be killed in the DB),
    2. kill the marked workers through the per-queue sweeper plugin,
    3. sweep workers eligible for cleanup and delete them from the DB,
    4. delete old jobs and orphaned job info,
    5. optionally free disk space in the configured directories.
    """

    # constructor
    def __init__(self, queue_config_mapper, single_mode=False):
        """Create the DB proxy and plugin factory and remember the queue-config mapper.

        :param queue_config_mapper: object asked via has_queue()/get_queue() for
            the configuration of a (queue name, config ID) pair
        :param single_mode: passed through unchanged to AgentBase.__init__
        """
        AgentBase.__init__(self, single_mode)
        self.dbProxy = DBProxy()
        self.queueConfigMapper = queue_config_mapper
        self.pluginFactory = PluginFactory()
        # identifier used in log prefixes; set at the start of run()
        self.lockedBy = None

    def process_kill_commands(self):
        """Process the commands that mark workers to be killed."""
        # NOTE(review): the logger label is "process_commands" although the method is
        # process_kill_commands; kept as-is so existing log filters keep matching
        tmp_log = self.make_logger(_logger, f"id={self.lockedBy}", method_name="process_commands")

        # 1. KILL_WORKER commands that were sent to panda server and forwarded to harvester
        self._handle_kill_commands(tmp_log, CommandSpec.COM_killWorkers, self.dbProxy.mark_workers_to_kill_by_query)

        # 2. SYNC_WORKERS_KILL commands from comparing worker status provided by pilot and harvester
        self._handle_kill_commands(tmp_log, CommandSpec.COM_syncWorkersKill, self.dbProxy.mark_workers_to_kill_by_workerids)

    def _handle_kill_commands(self, tmp_log, command_string, mark_workers):
        """Fetch the sweeper commands of one type and mark the workers they select.

        :param tmp_log: logger to report progress to
        :param command_string: command type handed to get_commands_for_receiver
        :param mark_workers: callable that receives the params of one command
            and returns the number of workers it marked
        """
        stopwatch = core_utils.get_stopwatch()
        tmp_log.debug(f"try to get {command_string} commands")
        command_specs = self.dbProxy.get_commands_for_receiver("sweeper", command_string)
        tmp_log.debug(f"got {len(command_specs)} {command_string} commands")
        for command_spec in command_specs:
            n_to_kill = mark_workers(command_spec.params)
            tmp_log.debug(f"will kill {n_to_kill} workers with {command_spec.params}")
        tmp_log.debug(f"done handling {command_string} commands took {stopwatch.get_elapsed_time()}")

    # main loop
    def run(self):
        """Run sweeper cycles until self.terminated() reports that the agent should stop."""
        self.lockedBy = f"sweeper-{self.get_pid()}"
        while True:
            sw_main = core_utils.get_stopwatch()
            main_log = self.make_logger(_logger, f"id={self.lockedBy}", method_name="run")

            # process commands that mark workers to be killed
            try:
                self.process_kill_commands()
            except Exception:
                core_utils.dump_error_message(main_log)

            # actual killing stage
            self._kill_marked_workers(main_log)

            # cleanup stage
            status_timeout_map = self._cleanup_terminated_workers(main_log)

            # old-job-deletion stage
            self._delete_old_jobs(main_log, status_timeout_map)

            # disk cleanup
            self._cleanup_disk(main_log)

            # time the cycle
            main_log.debug("done a sweeper cycle" + sw_main.get_elapsed_time())
            # check if being terminated
            if self.terminated(harvester_config.sweeper.sleepTime):
                main_log.debug("terminated")
                return

    def _kill_marked_workers(self, main_log):
        """Kill the workers that the DB reports as to-be-killed, queue by queue.

        The bulk plugin method kill_workers() is tried first; when the plugin
        raises AttributeError the single-worker kill_worker() is used instead.

        :param main_log: logger of the current cycle
        """
        sw_kill = core_utils.get_stopwatch()
        main_log.debug("try to get workers to kill")
        # get workers to kill
        workers_to_kill = self.dbProxy.get_workers_to_kill(harvester_config.sweeper.maxWorkers, harvester_config.sweeper.checkInterval)
        main_log.debug(f"got {len(workers_to_kill)} queues to kill workers")
        # loop over all workers
        sw = core_utils.get_stopwatch()
        for queue_name, config_id_workspec_list in workers_to_kill.items():
            for config_id, workspec_list in config_id_workspec_list.items():
                # get sweeper
                if not self.queueConfigMapper.has_queue(queue_name, config_id):
                    main_log.error(f"queue config for {queue_name}/{config_id} not found")
                    continue
                queue_config = self.queueConfigMapper.get_queue(queue_name, config_id)
                try:
                    sweeper_core = self.pluginFactory.get_plugin(queue_config.sweeper)
                except Exception:
                    main_log.error(f"failed to launch sweeper plugin for {queue_name}/{config_id}")
                    core_utils.dump_error_message(main_log)
                    continue
                sw.reset()
                n_workers = len(workspec_list)
                try:
                    # try bulk method
                    tmp_log = self.make_logger(_logger, f"id={self.lockedBy}", method_name="run")
                    tmp_log.debug("start killing")
                    tmp_list = sweeper_core.kill_workers(workspec_list)
                except AttributeError:
                    # fall back to single-worker method
                    for workspec in workspec_list:
                        tmp_log = self.make_logger(_logger, f"workerID={workspec.workerID}", method_name="run")
                        try:
                            tmp_log.debug("start killing one worker")
                            tmp_stat, tmp_out = sweeper_core.kill_worker(workspec)
                            tmp_log.debug(f"done killing with status={tmp_stat} diag={tmp_out}")
                        except Exception:
                            core_utils.dump_error_message(tmp_log)
                except Exception:
                    core_utils.dump_error_message(main_log)
                else:
                    # bulk method: one (status, diag) pair per workspec
                    n_killed = 0
                    for workspec, (tmp_stat, tmp_out) in zip(workspec_list, tmp_list):
                        tmp_log.debug(f"done killing workerID={workspec.workerID} with status={tmp_stat} diag={tmp_out}")
                        if tmp_stat:
                            n_killed += 1
                    tmp_log.debug(f"killed {n_killed}/{n_workers} workers")
                main_log.debug(f"done killing {n_workers} workers" + sw.get_elapsed_time())
        main_log.debug("done all killing" + sw_kill.get_elapsed_time())

    def _cleanup_terminated_workers(self, main_log):
        """Sweep the workers eligible for cleanup and delete them from the DB.

        A worker is deleted from the DB only when the plugin's sweep_worker()
        reports a truthy status for it.

        :param main_log: logger of the current cycle
        :return: the status -> timeout map that was used to select the workers
        """
        sw_cleanup = core_utils.get_stopwatch()
        # timeout for missed
        try:
            keep_missed = harvester_config.sweeper.keepMissed
        except Exception:
            keep_missed = 24
        try:
            keep_pending = harvester_config.sweeper.keepPending
        except Exception:
            keep_pending = 24
        # get workers for cleanup
        status_timeout_map = {
            "finished": harvester_config.sweeper.keepFinished,
            "failed": harvester_config.sweeper.keepFailed,
            "cancelled": harvester_config.sweeper.keepCancelled,
            "missed": keep_missed,
            "pending": keep_pending,
        }
        workers_for_cleanup = self.dbProxy.get_workers_for_cleanup(harvester_config.sweeper.maxWorkers, status_timeout_map)
        main_log.debug(f"got {len(workers_for_cleanup)} queues for workers cleanup")
        sw = core_utils.get_stopwatch()
        for queue_name, config_id_workspec_list in workers_for_cleanup.items():
            for config_id, workspec_list in config_id_workspec_list.items():
                # get sweeper
                if not self.queueConfigMapper.has_queue(queue_name, config_id):
                    main_log.error(f"queue config for {queue_name}/{config_id} not found")
                    continue
                queue_config = self.queueConfigMapper.get_queue(queue_name, config_id)
                # a broken plugin config for one queue must not take down the whole agent loop
                try:
                    sweeper_core = self.pluginFactory.get_plugin(queue_config.sweeper)
                    messenger = self.pluginFactory.get_plugin(queue_config.messenger)
                except Exception:
                    main_log.error(f"failed to launch sweeper or messenger plugin for {queue_name}/{config_id}")
                    core_utils.dump_error_message(main_log)
                    continue
                sw.reset()
                n_workers = len(workspec_list)
                # make sure workers to clean up are all terminated
                main_log.debug("making sure workers to clean up are all terminated")
                try:
                    # try bulk method
                    sweeper_core.kill_workers(workspec_list)
                except AttributeError:
                    # fall back to single-worker method
                    for workspec in workspec_list:
                        tmp_log = self.make_logger(_logger, f"workerID={workspec.workerID}", method_name="run")
                        try:
                            sweeper_core.kill_worker(workspec)
                        except Exception:
                            core_utils.dump_error_message(tmp_log)
                except Exception:
                    core_utils.dump_error_message(main_log)
                main_log.debug("made sure workers to clean up are all terminated")
                # start cleanup
                for workspec in workspec_list:
                    tmp_log = self.make_logger(_logger, f"workerID={workspec.workerID}", method_name="run")
                    try:
                        tmp_log.debug("start cleaning up one worker")
                        # sweep worker
                        tmp_stat, tmp_out = sweeper_core.sweep_worker(workspec)
                        tmp_log.debug(f"swept_worker with status={tmp_stat} diag={tmp_out}")
                        tmp_log.debug("start messenger cleanup")
                        mc_tmp_stat, mc_tmp_out = messenger.clean_up(workspec)
                        tmp_log.debug(f"messenger cleaned up with status={mc_tmp_stat} diag={mc_tmp_out}")
                        if tmp_stat:
                            self.dbProxy.delete_worker(workspec.workerID)
                    except Exception:
                        core_utils.dump_error_message(tmp_log)
                main_log.debug(f"done cleaning up {n_workers} workers" + sw.get_elapsed_time())
        main_log.debug("done all cleanup" + sw_cleanup.get_elapsed_time())
        return status_timeout_map

    def _delete_old_jobs(self, main_log, status_timeout_map):
        """Delete old jobs and orphaned job info from the DB.

        :param main_log: logger of the current cycle
        :param status_timeout_map: status -> timeout map; the job timeout is
            its largest value plus one
        """
        sw_delete = core_utils.get_stopwatch()
        main_log.debug("delete old jobs")
        job_timeout = max(status_timeout_map.values()) + 1
        self.dbProxy.delete_old_jobs(job_timeout)
        # delete orphaned job info
        self.dbProxy.delete_orphaned_job_info()
        main_log.debug("done deletion of old jobs" + sw_delete.get_elapsed_time())

    def _cleanup_disk(self, main_log):
        """Free disk space in the directories listed in sweeper.diskHighWatermark.

        Does nothing unless both diskCleanUpInterval and diskHighWatermark are
        configured and the "sweeper" process lock is obtained. diskHighWatermark
        is parsed as comma-separated "dir|watermark" items, watermark in GB.

        :param main_log: logger of the current cycle
        """
        sweeper_conf = harvester_config.sweeper
        if not (hasattr(sweeper_conf, "diskCleanUpInterval") and hasattr(sweeper_conf, "diskHighWatermark")):
            return
        locked = self.dbProxy.get_process_lock("sweeper", self.get_pid(), sweeper_conf.diskCleanUpInterval * 60 * 60)
        if not locked:
            return
        try:
            # fetched lazily, at most once per cycle, only if some directory needs cleanup
            all_active_files = None
            for item in sweeper_conf.diskHighWatermark.split(","):
                # dir name and watermark in GB
                dir_name, watermark = item.split("|")
                main_log.debug(f"checking {dir_name} for cleanup with watermark {watermark} GB")
                watermark = int(watermark) * 10**9
                # scan dir
                total_size, file_dict = self._scan_directory(dir_name, main_log)
                # delete if necessary
                if total_size < watermark:
                    main_log.debug(f"skip cleanup {dir_name} due to total_size {total_size // 10 ** 9} GB < watermark {watermark // 10 ** 9} GB")
                    continue
                main_log.debug(f"cleanup {dir_name} due to total_size {total_size // 10 ** 9} GB >= watermark {watermark // 10 ** 9} GB")
                # get active input files
                if all_active_files is None:
                    all_active_files = self.dbProxy.get_all_active_input_files()
                self._delete_oldest_files(file_dict, all_active_files, total_size, watermark, main_log)
        except Exception:
            core_utils.dump_error_message(main_log)

    @staticmethod
    def _scan_directory(dir_name, main_log):
        """Walk dir_name and collect the size and modification time of every file.

        Files whose size or mtime cannot be read (OSError) are skipped so that
        a single unreadable entry does not abort the whole scan.

        :param dir_name: directory to walk recursively
        :param main_log: logger to report skipped files to
        :return: (total size in bytes, dict mapping mtime to a set of
            (base name, full path, size) tuples)
        """
        total_size = 0
        file_dict = {}
        for root, _dirs, filenames in walk(dir_name):
            for base_name in filenames:
                full_name = os.path.join(root, base_name)
                try:
                    f_size = os.path.getsize(full_name)
                    mtime = os.path.getmtime(full_name)
                except OSError:
                    main_log.debug(f"skip {full_name} since its size or mtime cannot be read")
                    continue
                total_size += f_size
                file_dict.setdefault(mtime, set()).add((base_name, full_name, f_size))
        return total_size, file_dict

    @staticmethod
    def _delete_oldest_files(file_dict, active_files, total_size, watermark, main_log):
        """Remove files in ascending mtime order until the remaining size is below watermark.

        Files whose base name is in active_files are kept. Only files that were
        actually removed count towards the freed size; a failed removal is
        logged and the file is skipped.

        :param file_dict: mtime -> set of (base name, full path, size) tuples
        :param active_files: container of base names that must not be removed
        :param total_size: size in bytes of all scanned files
        :param watermark: size in bytes to get below
        :param main_log: logger to report removal errors to
        :return: number of bytes freed
        """
        deleted_size = 0
        for mtime in sorted(file_dict):
            for base_name, full_name, f_size in file_dict[mtime]:
                # keep if active
                if base_name in active_files:
                    continue
                try:
                    os.remove(full_name)
                except Exception:
                    core_utils.dump_error_message(main_log)
                    continue
                deleted_size += f_size
                if total_size - deleted_size < watermark:
                    return deleted_size
        return deleted_size