Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:17

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2025
0010 
0011 import time
0012 import threading
0013 import traceback
0014 try:
0015     # python 3
0016     from queue import Queue
0017 except ImportError:
0018     # Python 2
0019     from Queue import Queue
0020 
0021 from idds.common.constants import Sections, ReturnCode
0022 from idds.common.exceptions import AgentPluginError, IDDSException
0023 from idds.common.utils import setup_logging, get_logger
0024 from idds.common.utils import json_dumps
0025 from idds.core import messages as core_messages, catalog as core_catalog
0026 from idds.core import health as core_health
0027 from idds.agents.common.baseagent import BaseAgent
0028 # from idds.agents.common.eventbus.event import TerminatedProcessingEvent
0029 from idds.agents.common.eventbus.event import (EventType, MessageEvent,
0030                                                TriggerProcessingEvent)
0031 
0032 from .utils import handle_messages_processing
0033 from .iutils import handle_messages_asyncresult
0034 
# Module-level call: runs once at import time and hands this module's name
# to the project's logging setup helper.
setup_logging(__name__)
0036 
0037 
0038 class Receiver(BaseAgent):
0039     """
0040     Receiver works to receive workload management messages to update task/job status.
0041     """
0042 
0043     def __init__(self, receiver_num_threads=8, num_threads=1, bulk_message_delay=30, bulk_message_size=2000,
0044                  random_delay=None, use_process_pool=False, update_processing_interval=300, mode='single',
0045                  separate_logger=False, **kwargs):
0046         super(Receiver, self).__init__(num_threads=receiver_num_threads, name='Receiver', use_process_pool=use_process_pool, **kwargs)
0047         self.config_section = Sections.Carrier
0048         self.bulk_message_delay = int(bulk_message_delay)
0049         self.bulk_message_size = int(bulk_message_size)
0050         self.message_queue = Queue()
0051         if separate_logger:
0052             self.logger = get_logger(self.__class__.__name__)
0053         self.update_processing_interval = update_processing_interval
0054         if self.update_processing_interval:
0055             self.update_processing_interval = int(self.update_processing_interval)
0056         else:
0057             self.update_processing_interval = 300
0058 
0059         self.mode = mode
0060         self.selected = None
0061         self.selected_receiver = None
0062 
0063         self.log_prefix = ''
0064 
0065         self._lock = threading.RLock()
0066 
0067     def __del__(self):
0068         self.stop_receiver()
0069 
0070     def start_receiver(self):
0071         if 'receiver' not in self.plugins:
0072             raise AgentPluginError('Plugin receiver is required')
0073         self.receiver = self.plugins['receiver']
0074 
0075         self.logger.info("Starting receiver: %s" % self.receiver)
0076         self.receiver.set_output_queue(self.message_queue)
0077         self.setup_logger(self.logger)
0078         self.receiver.start()
0079 
0080     def stop_receiver(self):
0081         if hasattr(self, 'receiver') and self.receiver:
0082             self.logger.info("Stopping receiver: %s" % self.receiver)
0083             self.receiver.stop()
0084             self.receiver = None
0085 
0086     def suspend_receiver(self):
0087         if hasattr(self, 'receiver') and self.receiver:
0088             self.logger.info("Stopping receiver: %s" % self.receiver)
0089             self.receiver.suspend()
0090 
0091     def resume_receiver(self):
0092         if hasattr(self, 'receiver') and self.receiver:
0093             self.logger.info("Resuming receiver: %s" % self.receiver)
0094             self.receiver.resume()
0095 
0096     def is_receiver_started(self):
0097         if hasattr(self, 'receiver') and self.receiver and self.receiver.is_processing():
0098             return True
0099         return False
0100 
0101     def get_num_queued_messages(self):
0102         return self.message_queue.qsize()
0103 
0104     def get_output_messages(self):
0105         with self._lock:
0106             msgs = {}
0107             try:
0108                 msg_size = 0
0109                 while not self.message_queue.empty():
0110                     msg = self.message_queue.get(False)
0111                     if msg:
0112                         if msg_size < 10:
0113                             self.logger.debug("Received message(only log first 10 messages): %s" % str(msg))
0114                         name = msg['name']
0115                         # headers = msg['headers']
0116                         # body = msg['body']
0117                         # from_idds = msg['from_idds']
0118                         if name not in msgs:
0119                             msgs[name] = []
0120                         msgs[name].append(msg)
0121                         msg_size += 1
0122                         if msg_size >= self.bulk_message_size:
0123                             break
0124             except Exception as error:
0125                 self.logger.error("Failed to get output messages: %s, %s" % (error, traceback.format_exc()))
0126             if msgs:
0127                 total_msgs = self.get_num_queued_messages()
0128                 got_msgs = 0
0129                 for name in msgs:
0130                     got_msgs += len(msgs[name])
0131                 self.logger.info("process_messages: Get %s messages, left %s messages" % (got_msgs, total_msgs))
0132             return msgs
0133 
0134     def is_selected(self):
0135         selected = None
0136         if not self.selected_receiver:
0137             selected = True
0138         else:
0139             selected = self.is_self(self.selected_receiver)
0140         if self.selected is None or self.selected != selected:
0141             self.logger.info("is_selected changed from %s to %s" % (self.selected, selected))
0142         self.selected = selected
0143         return self.selected
0144 
0145     def monitor_receiver(self):
0146         if self.mode == "single":
0147             self.logger.info("Receiver single mode")
0148             self.selected_receiver = core_health.select_agent(name='Receiver', newer_than=self.heartbeat_delay * 2)
0149             self.logger.debug("Selected receiver: %s" % self.selected_receiver)
0150 
0151     def add_receiver_monitor_task(self):
0152         task = self.create_task(task_func=self.monitor_receiver, task_output_queue=None,
0153                                 task_args=tuple(), task_kwargs={}, delay_time=self.heartbeat_delay,
0154                                 priority=1)
0155         self.add_task(task)
0156 
0157     def handle_messages(self, output_messages, log_prefix):
0158         output_messages_new = []
0159         for msg in output_messages:
0160             output_messages_new.append(msg['msg']['body'])
0161         ret_msg_handle = handle_messages_processing(output_messages_new,
0162                                                     logger=self.logger,
0163                                                     log_prefix=log_prefix,
0164                                                     update_processing_interval=self.update_processing_interval)
0165 
0166         update_processings, update_processings_by_job, terminated_processings, update_contents, msgs = ret_msg_handle
0167         if msgs:
0168             # self.logger.debug(log_prefix + "adding messages[:3]: %s" % json_dumps(msgs[:3]))
0169             core_messages.add_messages(msgs, bulk_size=self.bulk_message_size)
0170 
0171         num_to_update_contents = 0
0172         if update_contents:
0173             self.logger.info(log_prefix + "update_contents[:3]: %s" % json_dumps(update_contents[:3]))
0174             # instead of update contents directly, add contents to contents_update table.
0175             # core_catalog.update_contents(update_contents)
0176             core_catalog.add_contents_update(update_contents)
0177             num_to_update_contents = len(update_contents)
0178 
0179         pr_id_triggered = []
0180         for pr_id in update_processings_by_job:
0181             # self.logger.info(log_prefix + "TerminatedProcessingEvent(processing_id: %s)" % pr_id)
0182             # event = TerminatedProcessingEvent(publisher_id=self.id, processing_id=pr_id)
0183             # self.logger.info(log_prefix + "MsgTriggerProcessingEvent(processing_id: %s)" % pr_id)
0184             self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id)
0185             event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id)
0186             self.event_bus.send(event)
0187             pr_id_triggered.append(pr_id)
0188 
0189         for pr_id in update_processings:
0190             if pr_id in pr_id_triggered:
0191                 continue
0192             # self.logger.info(log_prefix + "TerminatedProcessingEvent(processing_id: %s)" % pr_id)
0193             # event = TerminatedProcessingEvent(publisher_id=self.id, processing_id=pr_id)
0194             self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id)
0195             event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id,
0196                                            content={'num_to_update_contents': num_to_update_contents})
0197             event.set_has_updates()
0198             self.event_bus.send(event)
0199             pr_id_triggered.append(pr_id)
0200 
0201         for pr_id in terminated_processings:
0202             if pr_id in pr_id_triggered:
0203                 continue
0204             self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id)
0205             event = TriggerProcessingEvent(publisher_id=self.id,
0206                                            processing_id=pr_id,
0207                                            content={'Terminated': True, 'source': 'Receiver'})
0208             event.set_terminating()
0209             self.event_bus.send(event)
0210             pr_id_triggered.append(pr_id)
0211 
0212     def handle_messages_asyncresult(self, output_messages, log_prefix):
0213         handle_messages_asyncresult(output_messages,
0214                                     logger=self.logger,
0215                                     log_prefix=log_prefix,
0216                                     update_processing_interval=self.update_processing_interval)
0217 
0218     def handle_messages_channels(self, output_messages, log_prefix):
0219         for channel in output_messages:
0220             if channel in ['asyncresult', 'AsyncResult']:
0221                 self.handle_messages_asyncresult(output_messages[channel], log_prefix)
0222             else:
0223                 self.handle_messages(output_messages[channel], log_prefix)
0224 
0225     def process_messages(self, log_prefix=None):
0226         output_messages = self.get_output_messages()
0227         has_messages = False
0228         if output_messages:
0229             self.logger.info("process_messages: Received %s messages" % (len(output_messages)))
0230             self.handle_messages_channels(output_messages, log_prefix=log_prefix)
0231             self.logger.info("process_messages: Handled %s messages" % len(output_messages))
0232             has_messages = True
0233         return has_messages
0234 
0235     def worker(self, log_prefix):
0236         while not self.graceful_stop.is_set():
0237             try:
0238                 has_messages = self.process_messages(log_prefix)
0239                 if not has_messages:
0240                     time.sleep(1)
0241             except IDDSException as error:
0242                 self.logger.error("Worker thread IDDSException: %s" % str(error))
0243             except Exception as error:
0244                 self.logger.critical("Worker thread exception: %s\n%s" % (str(error), traceback.format_exc()))
0245 
0246     def is_ok_to_run_more_workers(self):
0247         if self.executors.has_free_workers():
0248             return True
0249         return False
0250 
0251     def process_messages_event(self, event):
0252         try:
0253             pro_ret = ReturnCode.Ok.value
0254             if event:
0255                 output_messages = event.get_message()
0256                 if output_messages:
0257                     self.logger.info("process_messages: Received %s messages" % (len(output_messages)))
0258                     self.handle_messages_channels(output_messages, log_prefix=self.log_prefix)
0259                     self.logger.info("process_messages: Handled %s messages" % len(output_messages))
0260         except Exception as ex:
0261             self.logger.error(ex)
0262             self.logger.error(traceback.format_exc())
0263             pro_ret = ReturnCode.Failed.value
0264         return pro_ret
0265 
0266     def init_event_function_map(self):
0267         self.event_func_map = {
0268             EventType.Message: {
0269                 'pre_check': self.is_ok_to_run_more_workers,
0270                 'exec_func': self.process_messages_event
0271             }
0272         }
0273 
    def run(self):
        """
        Main run function.

        Set-up phase: registers the scheduled tasks (the default tasks, a
        ``load_min_request_id`` task with ``delay_time=600``, the receiver
        monitor task when in "single" mode, and the health message task), loads
        the plugins, builds the event function map and starts the receiver.

        Main loop, until ``graceful_stop`` is set: runs the scheduled tasks on
        every pass and, at most once every ``bulk_message_delay`` seconds,
        resumes or suspends the receiver depending on ``is_selected()`` and
        submits the drained messages to ``process_messages_event``.

        Exceptions raised inside one pass are logged and the loop continues;
        a KeyboardInterrupt stops the agent.
        """
        try:
            self.logger.info("Starting main thread")
            self.init_thread_info()

            self.add_default_tasks()

            # Scheduled task calling load_min_request_id, registered with delay_time=600.
            task = self.create_task(task_func=self.load_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=600, priority=1)
            self.add_task(task)

            if self.mode == "single":
                self.logger.debug("single mode")
                self.add_receiver_monitor_task()

            # Plugins must be loaded before start_receiver() looks up 'receiver'.
            self.load_plugins()

            self.add_health_message_task()

            log_prefix = "<Message>"
            self.log_prefix = log_prefix

            # [self.executors.submit(self.worker, log_prefix) for i in range(self.executors.get_max_workers())]
            self.init_event_function_map()

            self.start_receiver()

            # Time of the last queue drain; None forces a drain on the first pass.
            time_start = None
            while not self.graceful_stop.is_set():
                try:
                    self.execute_schedules()

                    if not time_start or time.time() > time_start + self.bulk_message_delay:
                        self.logger.info(f"is_selected: {self.is_selected()}, is_receiver_started: {self.is_receiver_started()}")
                        # Selected agent: make sure the receiver is running.
                        if self.is_selected():
                            if not self.is_receiver_started():
                                self.resume_receiver()

                        # Not selected: make sure the receiver is suspended.
                        if not self.is_selected():
                            if self.is_receiver_started():
                                self.suspend_receiver()

                        time_start = time.time()
                        # Messages already queued are drained whether or not this agent is selected.
                        msg = self.get_output_messages()
                        if msg:
                            event = MessageEvent(message=msg)
                            # self.event_bus.send(event)
                            # self.process_messages_event(event)
                            self.submit(self.process_messages_event, **{"event": event})

                    # Very short wait that returns early once graceful_stop is set.
                    self.graceful_stop.wait(0.00001)
                except IDDSException as error:
                    self.logger.error("Main thread IDDSException: %s" % str(error))
                except Exception as error:
                    self.logger.critical("Main thread exception: %s\n%s" % (str(error), traceback.format_exc()))
        except KeyboardInterrupt:
            self.stop()
0333 
0334     def stop(self):
0335         super(Receiver, self).stop()
0336         self.stop_receiver()
0337 
0338 
if __name__ == '__main__':
    # Script entry point: build a Receiver with default arguments and invoke it.
    # NOTE(review): calling the instance relies on a __call__ defined outside
    # this file (presumably on BaseAgent) - confirm.
    agent = Receiver()
    agent()