Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:56

0001 import datetime
0002 import socket
0003 
0004 from pandaharvester import commit_timestamp, panda_pkg_info
0005 from pandaharvester.harvesterbody.agent_base import AgentBase
0006 from pandaharvester.harvesterconfig import harvester_config
0007 from pandaharvester.harvestercore import core_utils
0008 from pandaharvester.harvestercore.command_spec import CommandSpec
0009 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0010 
0011 # logger
0012 _logger = core_utils.setup_logger("command_manager")
0013 
0014 
0015 # class to retrieve commands from panda server
class CommandManager(AgentBase):
    """Agent that pulls commands from the panda server and acknowledges them.

    Each cycle of ``run`` fetches commands through the communicator, stores
    the ones that have a known receiver in the internal DB via ``db_proxy``,
    posts acknowledgements for the command ids returned by
    ``db_proxy.get_commands_ack()`` and then cleans up processed commands.
    """

    def __init__(self, communicator, queue_config_mapper, single_mode=False):
        """Set up the agent.

        :param communicator: object used to talk to the panda server; this
            class calls ``is_alive``, ``get_commands`` and ``ack_commands``
            on it.
        :param queue_config_mapper: object whose ``get_active_queues()`` is
            iterated once at start-up to build the list of commands to receive.
        :param single_mode: forwarded unchanged to ``AgentBase.__init__``.
        """
        super().__init__(single_mode)
        self.db_proxy = DBProxy()
        self.communicator = communicator
        self.queueConfigMapper = queue_config_mapper
        self.nodeName = socket.gethostname()
        # time of the last periodic heartbeat; None until the first one is sent
        self.lastHeartbeat = None

    def set_single_mode(self, single_mode):
        """Overwrite the ``singleMode`` flag.

        When the flag is truthy, ``run`` executes its polling cycle even if
        the process lock could not be obtained.
        """
        self.singleMode = single_mode

    def convert_to_command_specs(self, commands):
        """Turn raw commands into ``CommandSpec`` objects that have a receiver.

        :param commands: iterable of command payloads, each one handed to
            ``CommandSpec.convert_command_json``.
        :return: list of specs, in input order, restricted to those whose
            ``receiver`` is not None after the prefix lookup.
        """
        accepted = []
        for raw_command in commands:
            spec = CommandSpec()
            spec.convert_command_json(raw_command)
            # the first entry of the receiver map whose key prefixes the command name wins
            for prefix, receiver in CommandSpec.receiver_map.items():
                if not spec.command.startswith(prefix):
                    continue
                spec.receiver = receiver
                break
            # commands nobody is registered to receive are dropped
            if spec.receiver is None:
                continue
            accepted.append(spec)
        return accepted

    def _grab_process_lock(self):
        """Ask the DB for the "commandmanager" process lock and return the outcome."""
        return self.db_proxy.get_process_lock("commandmanager", self.get_pid(), harvester_config.commandmanager.sleepTime)

    def _announce_startup(self, main_log):
        """Send the start-up ``is_alive`` message, with the commands to receive if any.

        Only queues whose ``runMode`` is "slave" contribute commands.
        """
        seen_sites = set()
        to_receive = []
        for _queue_name, queue_config in self.queueConfigMapper.get_active_queues().items():
            if queue_config is None or queue_config.runMode != "slave":
                continue
            site = queue_config.siteName
            resource_type = queue_config.resourceType
            # one command for all queues in one site
            if site not in seen_sites:
                seen_sites.add(site)
                to_receive.append(
                    {
                        "command": CommandSpec.COM_reportWorkerStats,
                        "computingSite": site,
                        "resourceType": resource_type,
                    }
                )
            # one command for each queue
            to_receive.append(
                {
                    "command": CommandSpec.COM_setNWorkers,
                    "computingSite": site,
                    "resourceType": resource_type,
                }
            )
        data = {
            "startTime": core_utils.naive_utcnow(),
            "sw_version": panda_pkg_info.release_version,
            "commit_stamp": commit_timestamp.timestamp,
        }
        if to_receive:
            main_log.debug("sending command list to receive")
            data["commands"] = to_receive
        self.communicator.is_alive(data)

    def _beat_if_due(self):
        """Send an empty ``is_alive`` unless one was sent within the last 10 minutes."""
        previous = self.lastHeartbeat
        if previous is not None:
            oldest_fresh = core_utils.naive_utcnow() - datetime.timedelta(minutes=10)
            if previous >= oldest_fresh:
                return
        # the timestamp is recorded before the call is made
        self.lastHeartbeat = core_utils.naive_utcnow()
        self.communicator.is_alive({})

    def _sync_commands_once(self, main_log, bulk_size):
        """Run one fetch/store/acknowledge/clean round.

        :return: number of commands obtained from the panda server in this round.
        """
        # get commands from panda server for this harvester instance
        commands = self.communicator.get_commands(bulk_size)
        main_log.debug(f"got {len(commands)} commands (bulk size: {bulk_size})")
        specs = self.convert_to_command_specs(commands)

        # cache commands in internal DB
        self.db_proxy.store_commands(specs)
        main_log.debug(f"cached {len(specs)} commands in internal DB")

        # acknowledge processed commands in chunks of bulk_size and drop each chunk afterwards
        ids_to_ack = self.db_proxy.get_commands_ack()
        for shard in core_utils.create_shards(ids_to_ack, bulk_size):
            self.communicator.ack_commands(shard)
            main_log.debug(f"acknowledged {len(shard)} commands to panda server")
            self.db_proxy.clean_commands_by_id(shard)

        # clean commands that have been processed and do not need acknowledgement
        self.db_proxy.clean_processed_commands()
        return len(commands)

    def run(self):
        """Main entry point of the agent.

        Sends a start-up message if the process lock is obtained, then cycles:
        take the lock (or proceed regardless in single mode), send a heartbeat
        when due, exchange commands with the panda server until a fetch
        returns fewer than ``commands_bulk_size`` items, and finally wait via
        ``self.terminated`` with the configured sleep time. Returns once
        ``self.terminated`` reports true.
        """
        main_log = self.make_logger(_logger, f"id={self.get_pid()}", method_name="run")
        bulk_size = harvester_config.commandmanager.commands_bulk_size

        if self._grab_process_lock():
            self._announce_startup(main_log)

        # main loop
        while True:
            if self._grab_process_lock() or self.singleMode:
                main_log.debug("polling commands loop")
                self._beat_if_due()
                # as long as a full bulk comes back, keep retrieving;
                # a short bulk means the server has nothing more for now
                while True:
                    if self._sync_commands_once(main_log, bulk_size) < bulk_size:
                        break

            # check if being terminated
            if self.terminated(harvester_config.commandmanager.sleepTime, randomize=False):
                break
        main_log.debug("terminated")