File indexing completed on 2026-04-09 07:58:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import time
0012 import threading
0013 import traceback
0014 try:
0015
0016 from queue import Queue
0017 except ImportError:
0018
0019 from Queue import Queue
0020
0021 from idds.common.constants import Sections, ReturnCode
0022 from idds.common.exceptions import AgentPluginError, IDDSException
0023 from idds.common.utils import setup_logging, get_logger
0024 from idds.common.utils import json_dumps
0025 from idds.core import messages as core_messages, catalog as core_catalog
0026 from idds.core import health as core_health
0027 from idds.agents.common.baseagent import BaseAgent
0028
0029 from idds.agents.common.eventbus.event import (EventType, MessageEvent,
0030 TriggerProcessingEvent)
0031
0032 from .utils import handle_messages_processing
0033 from .iutils import handle_messages_asyncresult
0034
# Initialise logging for this module through idds.common.utils.setup_logging
# before the agent class below is instantiated.
setup_logging(__name__)
0036
0037
class Receiver(BaseAgent):
    """
    Receiver works to receive workload management messages to update task/job status.

    A ``receiver`` plugin pushes incoming messages onto ``self.message_queue``.
    The main loop (:meth:`run`) drains that queue in batches of at most
    ``bulk_message_size`` messages, grouped by each message's ``'name'`` field,
    wraps every batch in a :class:`MessageEvent` and submits it to the executor,
    where :meth:`process_messages_event` turns the messages into stored messages,
    content updates and ``TriggerProcessingEvent`` notifications.

    In ``mode='single'`` a scheduled task asks ``core_health.select_agent`` which
    Receiver is selected; this instance suspends its receiver plugin whenever it
    is not the selected one and resumes it when it is.
    """

    # Seconds the main loop waits after a polling round that fetched no messages,
    # so an empty queue does not make the loop spin the CPU.
    idle_wait_time = 0.1

    def __init__(self, receiver_num_threads=8, num_threads=1, bulk_message_delay=30, bulk_message_size=2000,
                 random_delay=None, use_process_pool=False, update_processing_interval=300, mode='single',
                 separate_logger=False, **kwargs):
        """
        :param receiver_num_threads: worker count, passed to BaseAgent as ``num_threads``.
        :param num_threads: accepted for configuration compatibility; not used by this class.
        :param bulk_message_delay: seconds between receiver-selection checks in :meth:`run`.
        :param bulk_message_size: maximum number of messages taken from the queue per batch;
            also used as the bulk size when storing outgoing messages.
        :param random_delay: accepted for configuration compatibility; not used by this class.
        :param use_process_pool: forwarded to BaseAgent.
        :param update_processing_interval: forwarded to the message handlers; falsy values
            fall back to 300.
        :param mode: ``'single'`` enables the periodic receiver-selection task.
        :param separate_logger: if true, use a logger named after this class.
        """
        super(Receiver, self).__init__(num_threads=receiver_num_threads, name='Receiver',
                                       use_process_pool=use_process_pool, **kwargs)
        self.config_section = Sections.Carrier
        self.bulk_message_delay = int(bulk_message_delay)
        self.bulk_message_size = int(bulk_message_size)
        self.message_queue = Queue()
        if separate_logger:
            self.logger = get_logger(self.__class__.__name__)
        if update_processing_interval:
            self.update_processing_interval = int(update_processing_interval)
        else:
            self.update_processing_interval = 300

        self.mode = mode
        # Last value returned by is_selected(); None until the first check.
        self.selected = None
        # Agent chosen by core_health.select_agent (refreshed by monitor_receiver).
        self.selected_receiver = None

        self.log_prefix = ''

        # Serialises draining of message_queue between callers of get_output_messages.
        self._lock = threading.RLock()

    def __del__(self):
        # Best-effort cleanup: stop the receiver plugin if it is still attached.
        self.stop_receiver()

    def start_receiver(self):
        """
        Attach the ``receiver`` plugin to ``self.message_queue`` and start it.

        :raises AgentPluginError: if no ``receiver`` plugin has been loaded.
        """
        if 'receiver' not in self.plugins:
            raise AgentPluginError('Plugin receiver is required')
        self.receiver = self.plugins['receiver']

        self.logger.info("Starting receiver: %s" % self.receiver)
        self.receiver.set_output_queue(self.message_queue)
        # NOTE(review): this is invoked on the agent itself, not on the plugin;
        # confirm whether ``self.receiver.setup_logger(self.logger)`` was intended.
        self.setup_logger(self.logger)
        self.receiver.start()

    def stop_receiver(self):
        """Stop the receiver plugin (if any) and detach it from this agent."""
        receiver = getattr(self, 'receiver', None)
        if receiver:
            self.logger.info("Stopping receiver: %s" % receiver)
            receiver.stop()
            self.receiver = None

    def suspend_receiver(self):
        """Suspend the receiver plugin (if any); it stays attached and can be resumed."""
        receiver = getattr(self, 'receiver', None)
        if receiver:
            self.logger.info("Suspending receiver: %s" % receiver)
            receiver.suspend()

    def resume_receiver(self):
        """Resume a previously suspended receiver plugin (if any)."""
        receiver = getattr(self, 'receiver', None)
        if receiver:
            self.logger.info("Resuming receiver: %s" % receiver)
            receiver.resume()

    def is_receiver_started(self):
        """Return True if a receiver plugin is attached and reports ``is_processing()``."""
        receiver = getattr(self, 'receiver', None)
        return bool(receiver and receiver.is_processing())

    def get_num_queued_messages(self):
        """Return the approximate number of messages waiting in the queue (``Queue.qsize``)."""
        return self.message_queue.qsize()

    def get_output_messages(self):
        """
        Drain up to ``bulk_message_size`` messages from ``self.message_queue``.

        Falsy queue items are skipped. Messages are grouped by their ``'name'``
        field; an item without a usable ``'name'`` is logged and dropped so that
        it does not stop the rest of the batch from being collected.

        :returns: dict mapping message name to the list of messages (empty if none).
        """
        with self._lock:
            msgs = {}
            msg_size = 0
            try:
                while not self.message_queue.empty():
                    msg = self.message_queue.get(False)
                    if not msg:
                        continue
                    if msg_size < 10:
                        self.logger.debug("Received message(only log first 10 messages): %s" % str(msg))
                    try:
                        name = msg['name']
                    except (KeyError, IndexError, TypeError):
                        self.logger.error("Dropping message without a usable name: %s" % str(msg))
                        continue
                    msgs.setdefault(name, []).append(msg)
                    msg_size += 1
                    # Checked after appending so at least one message is taken per call.
                    if msg_size >= self.bulk_message_size:
                        break
            except Exception as error:
                self.logger.error("Failed to get output messages: %s, %s" % (error, traceback.format_exc()))
            if msgs:
                total_msgs = self.get_num_queued_messages()
                self.logger.info("process_messages: Get %s messages, left %s messages" % (msg_size, total_msgs))
            return msgs

    def is_selected(self):
        """
        Return whether this agent should run its receiver.

        True when no receiver has been selected yet, otherwise ``self.is_self(selected_receiver)``.
        Changes of the result are logged and cached in ``self.selected``.
        """
        if not self.selected_receiver:
            selected = True
        else:
            selected = self.is_self(self.selected_receiver)
        if self.selected is None or self.selected != selected:
            self.logger.info("is_selected changed from %s to %s" % (self.selected, selected))
            self.selected = selected
        return self.selected

    def monitor_receiver(self):
        """
        In single mode, refresh ``selected_receiver`` via ``core_health.select_agent``
        (passing ``newer_than=2 * heartbeat_delay``).
        """
        if self.mode == "single":
            self.logger.info("Receiver single mode")
            self.selected_receiver = core_health.select_agent(name='Receiver', newer_than=self.heartbeat_delay * 2)
            self.logger.debug("Selected receiver: %s" % self.selected_receiver)

    def add_receiver_monitor_task(self):
        """Schedule :meth:`monitor_receiver` with ``delay_time=heartbeat_delay`` and priority 1."""
        task = self.create_task(task_func=self.monitor_receiver, task_output_queue=None,
                                task_args=tuple(), task_kwargs={}, delay_time=self.heartbeat_delay,
                                priority=1)
        self.add_task(task)

    def handle_messages(self, output_messages, log_prefix):
        """
        Handle the messages of one channel.

        Stores follow-up messages and content updates returned by
        ``handle_messages_processing`` and publishes one ``TriggerProcessingEvent``
        per affected processing.

        :param output_messages: received messages; each is expected to carry its
            payload at ``msg['msg']['body']``.
        :param log_prefix: prefix for log lines; ``None`` is treated as ``''``.
        """
        if log_prefix is None:
            log_prefix = ''
        bodies = [msg['msg']['body'] for msg in output_messages]
        ret_msg_handle = handle_messages_processing(bodies,
                                                    logger=self.logger,
                                                    log_prefix=log_prefix,
                                                    update_processing_interval=self.update_processing_interval)

        update_processings, update_processings_by_job, terminated_processings, update_contents, msgs = ret_msg_handle
        if msgs:
            core_messages.add_messages(msgs, bulk_size=self.bulk_message_size)

        num_to_update_contents = 0
        if update_contents:
            self.logger.info(log_prefix + "update_contents[:3]: %s" % json_dumps(update_contents[:3]))
            core_catalog.add_contents_update(update_contents)
            num_to_update_contents = len(update_contents)

        # A set of processing ids already triggered in an earlier loop; the later
        # loops skip those ids so a processing gets at most one event per category order.
        pr_id_triggered = set()
        for pr_id in update_processings_by_job:
            self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id)
            event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id)
            self.event_bus.send(event)
            pr_id_triggered.add(pr_id)

        for pr_id in update_processings:
            if pr_id in pr_id_triggered:
                continue
            self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id)
            event = TriggerProcessingEvent(publisher_id=self.id, processing_id=pr_id,
                                           content={'num_to_update_contents': num_to_update_contents})
            event.set_has_updates()
            self.event_bus.send(event)
            pr_id_triggered.add(pr_id)

        for pr_id in terminated_processings:
            if pr_id in pr_id_triggered:
                continue
            self.logger.info(log_prefix + "TriggerProcessingEvent(processing_id: %s)" % pr_id)
            event = TriggerProcessingEvent(publisher_id=self.id,
                                           processing_id=pr_id,
                                           content={'Terminated': True, 'source': 'Receiver'})
            event.set_terminating()
            self.event_bus.send(event)
            pr_id_triggered.add(pr_id)

    def handle_messages_asyncresult(self, output_messages, log_prefix):
        """Delegate 'asyncresult' channel messages to the module-level handler of the same name."""
        handle_messages_asyncresult(output_messages,
                                    logger=self.logger,
                                    log_prefix=log_prefix,
                                    update_processing_interval=self.update_processing_interval)

    def handle_messages_channels(self, output_messages, log_prefix):
        """
        Dispatch each channel of ``output_messages`` (a mapping of channel name to
        messages) to the async-result handler or to :meth:`handle_messages`.
        """
        for channel in output_messages:
            if channel in ['asyncresult', 'AsyncResult']:
                self.handle_messages_asyncresult(output_messages[channel], log_prefix)
            else:
                self.handle_messages(output_messages[channel], log_prefix)

    @staticmethod
    def _count_messages(output_messages):
        """Return the total number of messages across all channels of ``output_messages``."""
        try:
            return sum(len(channel_msgs) for channel_msgs in output_messages.values())
        except (AttributeError, TypeError):
            # Not a mapping of sized values: fall back to the number of entries.
            return len(output_messages)

    def process_messages(self, log_prefix=None):
        """
        Fetch one batch from the queue and handle it in the calling thread.

        :param log_prefix: prefix for log lines; defaults to ``self.log_prefix``.
        :returns: True if a non-empty batch was handled, otherwise False.
        """
        if log_prefix is None:
            log_prefix = self.log_prefix
        output_messages = self.get_output_messages()
        if not output_messages:
            return False
        num_msgs = self._count_messages(output_messages)
        self.logger.info("process_messages: Received %s messages" % num_msgs)
        self.handle_messages_channels(output_messages, log_prefix=log_prefix)
        self.logger.info("process_messages: Handled %s messages" % num_msgs)
        return True

    def worker(self, log_prefix):
        """
        Process queued messages in this thread until ``graceful_stop`` is set,
        sleeping one second whenever a round found nothing to handle.
        """
        while not self.graceful_stop.is_set():
            try:
                has_messages = self.process_messages(log_prefix)
                if not has_messages:
                    time.sleep(1)
            except IDDSException as error:
                self.logger.error("Worker thread IDDSException: %s" % str(error))
            except Exception as error:
                self.logger.critical("Worker thread exception: %s\n%s" % (str(error), traceback.format_exc()))

    def is_ok_to_run_more_workers(self):
        """Pre-check for message events: True when the executors report free workers."""
        return bool(self.executors.has_free_workers())

    def process_messages_event(self, event):
        """
        Executor entry point: handle the message batch carried by ``event``.

        :returns: ``ReturnCode.Ok.value`` on success or when there is nothing to do,
                  ``ReturnCode.Failed.value`` if handling raised.
        """
        try:
            pro_ret = ReturnCode.Ok.value
            if event:
                output_messages = event.get_message()
                if output_messages:
                    num_msgs = self._count_messages(output_messages)
                    self.logger.info("process_messages: Received %s messages" % num_msgs)
                    self.handle_messages_channels(output_messages, log_prefix=self.log_prefix)
                    self.logger.info("process_messages: Handled %s messages" % num_msgs)
        except Exception as ex:
            self.logger.error(ex)
            self.logger.error(traceback.format_exc())
            pro_ret = ReturnCode.Failed.value
        return pro_ret

    def init_event_function_map(self):
        """Register the handler and pre-check used for ``EventType.Message`` events."""
        self.event_func_map = {
            EventType.Message: {
                'pre_check': self.is_ok_to_run_more_workers,
                'exec_func': self.process_messages_event
            }
        }

    def _update_receiver_state(self):
        """Resume or suspend the receiver plugin according to one selection snapshot."""
        selected = self.is_selected()
        started = self.is_receiver_started()
        self.logger.info(f"is_selected: {selected}, is_receiver_started: {started}")
        if selected and not started:
            self.resume_receiver()
        elif not selected and started:
            self.suspend_receiver()

    def run(self):
        """
        Main run function.

        Sets up scheduled tasks and plugins, starts the receiver, then loops until
        ``graceful_stop`` is set: runs due schedules, re-checks receiver selection
        every ``bulk_message_delay`` seconds, and submits each fetched batch to the
        executor as a :class:`MessageEvent`.
        """
        try:
            self.logger.info("Starting main thread")
            self.init_thread_info()

            self.add_default_tasks()

            task = self.create_task(task_func=self.load_min_request_id, task_output_queue=None,
                                    task_args=tuple(), task_kwargs={}, delay_time=600, priority=1)
            self.add_task(task)

            if self.mode == "single":
                self.logger.debug("single mode")
                self.add_receiver_monitor_task()

            self.load_plugins()

            self.add_health_message_task()

            self.log_prefix = "<Message>"

            self.init_event_function_map()

            self.start_receiver()

            time_start = None
            while not self.graceful_stop.is_set():
                try:
                    self.execute_schedules()

                    if not time_start or time.time() > time_start + self.bulk_message_delay:
                        self._update_receiver_state()
                        time_start = time.time()

                    # NOTE(review): the queue is drained on every iteration; if batching
                    # was meant to be throttled by bulk_message_delay, move this under
                    # the check above.
                    msg = self.get_output_messages()
                    if msg:
                        event = MessageEvent(message=msg)
                        self.submit(self.process_messages_event, event=event)
                        # More messages may be waiting: only yield briefly.
                        self.graceful_stop.wait(0.00001)
                    else:
                        # Queue was empty: back off instead of busy-spinning.
                        self.graceful_stop.wait(self.idle_wait_time)
                except IDDSException as error:
                    self.logger.error("Main thread IDDSException: %s" % str(error))
                except Exception as error:
                    self.logger.critical("Main thread exception: %s\n%s" % (str(error), traceback.format_exc()))
        except KeyboardInterrupt:
            self.stop()

    def stop(self):
        """Stop the agent via BaseAgent.stop(), then stop the receiver plugin."""
        super(Receiver, self).stop()
        self.stop_receiver()
0337
0338
if __name__ == '__main__':
    # Script entry point: build a Receiver with default settings and invoke it.
    receiver_agent = Receiver()
    receiver_agent()