File indexing completed on 2026-04-20 07:58:56
0001 import datetime
0002 import socket
0003
0004 from pandaharvester import commit_timestamp, panda_pkg_info
0005 from pandaharvester.harvesterbody.agent_base import AgentBase
0006 from pandaharvester.harvesterconfig import harvester_config
0007 from pandaharvester.harvestercore import core_utils
0008 from pandaharvester.harvestercore.command_spec import CommandSpec
0009 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0010
0011
# Module-level logger; CommandManager.run() derives a per-process logger from it via make_logger.
_logger = core_utils.setup_logger("command_manager")
0013
0014
0015
class CommandManager(AgentBase):
    """
    Agent that polls the PanDA server for commands addressed to this harvester
    instance, caches them in the internal DB and acknowledges processed ones
    back to the server.
    """

    # Minimum interval between the empty heartbeats sent from the polling loop.
    _HEARTBEAT_INTERVAL = datetime.timedelta(minutes=10)

    def __init__(self, communicator, queue_config_mapper, single_mode=False):
        """
        :param communicator: server communicator; is_alive, get_commands and
            ack_commands are called on it
        :param queue_config_mapper: source of the active queue configurations
        :param single_mode: passed through to AgentBase
        """
        AgentBase.__init__(self, single_mode)
        self.db_proxy = DBProxy()
        self.communicator = communicator
        self.queueConfigMapper = queue_config_mapper
        self.nodeName = socket.gethostname()
        # Time of the last heartbeat sent from the polling loop; None until the first one.
        self.lastHeartbeat = None

    def set_single_mode(self, single_mode):
        """Set the flag that lets run() poll even when the process lock is not obtained."""
        self.singleMode = single_mode

    def convert_to_command_specs(self, commands):
        """
        Generates a list of CommandSpec objects.

        Each command is converted with CommandSpec.convert_command_json and gets
        the receiver of the first CommandSpec.receiver_map prefix its command
        string starts with. Only specs whose receiver is not None are returned
        (presumably CommandSpec() starts with receiver=None, so unmatched
        commands are dropped -- verify against CommandSpec).

        :param commands: raw commands as returned by communicator.get_commands
        :return: list of CommandSpec objects with a receiver
        """
        command_specs = []
        for command in commands:
            command_spec = CommandSpec()
            command_spec.convert_command_json(command)
            for com_str, receiver in CommandSpec.receiver_map.items():
                if command_spec.command.startswith(com_str):
                    command_spec.receiver = receiver
                    break
            if command_spec.receiver is not None:
                command_specs.append(command_spec)
        return command_specs

    def run(self):
        """
        Main loop.

        If the "commandmanager" process lock is obtained at start-up, announce
        this instance via communicator.is_alive, including command items for
        slave-mode queues. Then, until terminated: whenever the lock is held
        (or in single mode), send a heartbeat if one is due and drain commands
        from the server in batches of commands_bulk_size.
        """
        main_log = self.make_logger(_logger, f"id={self.get_pid()}", method_name="run")
        bulk_size = harvester_config.commandmanager.commands_bulk_size

        locked = self.db_proxy.get_process_lock("commandmanager", self.get_pid(), harvester_config.commandmanager.sleepTime)
        if locked:
            self._send_start_up_info(main_log)

        while True:
            locked = self.db_proxy.get_process_lock("commandmanager", self.get_pid(), harvester_config.commandmanager.sleepTime)
            if locked or self.singleMode:
                main_log.debug("polling commands loop")
                self._send_heartbeat_if_due()
                self._drain_commands(main_log, bulk_size)

            # Sleeps for sleepTime and reports whether the agent should stop.
            if self.terminated(harvester_config.commandmanager.sleepTime, randomize=False):
                main_log.debug("terminated")
                return

    def _build_start_up_commands(self):
        """Return reportWorkerStats items (one per site) and setNWorkers items (one per slave-mode queue)."""
        site_names = set()
        command_list = []
        for queue_config in self.queueConfigMapper.get_active_queues().values():
            if queue_config is None or queue_config.runMode != "slave":
                continue
            # One reportWorkerStats command for all queues of the same site.
            if queue_config.siteName not in site_names:
                command_list.append(
                    {
                        "command": CommandSpec.COM_reportWorkerStats,
                        "computingSite": queue_config.siteName,
                        "resourceType": queue_config.resourceType,
                    }
                )
                site_names.add(queue_config.siteName)
            # One setNWorkers command for each queue.
            command_list.append(
                {
                    "command": CommandSpec.COM_setNWorkers,
                    "computingSite": queue_config.siteName,
                    "resourceType": queue_config.resourceType,
                }
            )
        return command_list

    def _send_start_up_info(self, main_log):
        """Send start time, software version and any start-up command items via communicator.is_alive."""
        command_list = self._build_start_up_commands()
        data = {"startTime": core_utils.naive_utcnow(), "sw_version": panda_pkg_info.release_version, "commit_stamp": commit_timestamp.timestamp}
        if command_list:
            main_log.debug("sending command list to receive")
            data["commands"] = command_list
        self.communicator.is_alive(data)

    def _send_heartbeat_if_due(self):
        """Send an empty is_alive heartbeat if none was sent within _HEARTBEAT_INTERVAL."""
        now = core_utils.naive_utcnow()
        if self.lastHeartbeat is None or self.lastHeartbeat < now - self._HEARTBEAT_INTERVAL:
            self.lastHeartbeat = now
            self.communicator.is_alive({})

    def _drain_commands(self, main_log, bulk_size):
        """
        Fetch, cache and acknowledge commands batch by batch.

        Repeats while the server keeps returning full batches of bulk_size commands.
        """
        while True:
            # Get commands from the server for this harvester instance.
            commands = self.communicator.get_commands(bulk_size)
            main_log.debug(f"got {len(commands)} commands (bulk size: {bulk_size})")
            command_specs = self.convert_to_command_specs(commands)

            # Cache the commands in the internal DB.
            self.db_proxy.store_commands(command_specs)
            main_log.debug(f"cached {len(command_specs)} commands in internal DB")

            # Acknowledge processed commands in shards of bulk_size, then drop them locally.
            command_ids_ack = self.db_proxy.get_commands_ack()
            for shard in core_utils.create_shards(command_ids_ack, bulk_size):
                self.communicator.ack_commands(shard)
                main_log.debug(f"acknowledged {len(shard)} commands to panda server")
                self.db_proxy.clean_commands_by_id(shard)

            # Clean commands that were processed and need no acknowledgement.
            self.db_proxy.clean_processed_commands()

            # A short batch means the server has nothing more for now. A
            # non-positive bulk size can never yield a short batch, so stop after
            # one pass instead of spinning without ever reaching terminated().
            if bulk_size <= 0 or len(commands) < bulk_size:
                break