Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 07:58:41

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2019 - 2023
0010 
0011 
0012 import logging
0013 import random
0014 import socket
0015 import threading
0016 import time
0017 import traceback
0018 import stomp
0019 
0020 from idds.common.plugin.plugin_base import PluginBase
0021 from idds.common.utils import setup_logging, get_logger, json_dumps, json_loads
0022 
0023 
# Initialise logging for this module through the iDDS helper (runs at import time).
setup_logging(__name__)
# Raise the threshold of the "stomp" logger to CRITICAL so that only critical
# records emitted under that logger name get through.
logging.getLogger("stomp").setLevel(logging.CRITICAL)
0026 
0027 
class MessagingListener(stomp.ConnectionListener):
    '''
    Messaging Listener

    Forwards every frame received on a connection to an output queue as a
    dict with the keys 'name', 'from_idds' and 'msg'.
    '''
    def __init__(self, broker, output_queue, name=None, logger=None):
        '''
        __init__

        :param broker: identifier of the broker this listener is attached to;
                       only used in log lines.
        :param output_queue: object whose ``put`` method receives one dict per message.
        :param name: label copied into every queued item; 'default' when empty.
        :param logger: logger to use; one is obtained from ``get_logger`` when omitted.
        '''
        self.name = name or 'default'
        self.__broker = broker
        self.__output_queue = output_queue
        self.logger = logger or get_logger(self.__class__.__name__)

    def on_error(self, frame):
        '''
        Error handler: log the body of the error frame.
        '''
        self.logger.error('[broker] [%s]: %s', self.__broker, frame.body)

    def on_message(self, frame):
        '''
        Message handler: decode the frame body and put it on the output queue.

        The 'from_idds' header (default 'false') is copied to the top level of
        the queued item. A body that cannot be decoded is logged and dropped so
        that one malformed message cannot raise out of this callback into the
        thread that invoked it.
        '''
        self.logger.debug(f'[broker] {self.name} [{self.__broker}]: headers: {frame.headers}, body: {frame.body}')
        headers = frame.headers
        try:
            body = json_loads(frame.body)
        except Exception as error:
            self.logger.error('[broker] %s [%s]: failed to decode message body, message dropped: %s, %s',
                              self.name, self.__broker, error, traceback.format_exc())
            return

        self.__output_queue.put({'name': self.name,
                                 'from_idds': headers.get('from_idds', 'false'),
                                 'msg': {'headers': headers, 'body': body}})
0060 
0061 
class MessagingSender(PluginBase, threading.Thread):
    '''
    Thread that takes messages from ``request_queue``, publishes them to STOMP
    brokers and puts every handled message on ``response_queue``.

    ``self.channels`` maps a channel name to a dict. The keys read here are
    'brokers' (list/tuple or comma separated "host:port" string),
    'broker_timeout' (optional), 'destination', 'username' and 'password'.
    '''
    def __init__(self, name="MessagingSender", logger=None, **kwargs):
        '''
        :param name: thread / plugin name.
        :param logger: optional logger overriding the one set by the base class.
        :param kwargs: forwarded to ``PluginBase``. ``channels`` (JSON text) must
                       end up as an attribute, presumably set from kwargs by
                       PluginBase - TODO confirm. ``timetolive`` is optional.
        :raises Exception: when no ``channels`` attribute is defined.
        '''
        threading.Thread.__init__(self, name=name)
        super(MessagingSender, self).__init__(name=name, logger=logger, **kwargs)

        if logger:
            self.logger = logger
        self.graceful_stop = threading.Event()
        self.graceful_suspend = threading.Event()

        self.request_queue = None
        self.output_queue = None
        self.response_queue = None

        if not hasattr(self, 'channels'):
            raise Exception('"channels" is required but not defined.')
        self.channels = json_loads(self.channels)

        # Fallback connection timeout for channels that do not define 'broker_timeout'.
        self.broker_timeout = 3600

        if not hasattr(self, 'timetolive'):
            self.timetolive = 12 * 3600 * 1000     # milliseconds
        else:
            self.timetolive = int(self.timetolive)

        # channel name -> list of connections (None for a channel without brokers).
        self.conns = {}

    def setup_logger(self, logger):
        '''Use ``logger`` when given, otherwise create one named after the class.'''
        if logger:
            self.logger = logger
        else:
            self.logger = get_logger(self.__class__.__name__)

    def set_logger(self, logger):
        '''Replace the logger unconditionally.'''
        self.logger = logger

    def get_logger(self):
        '''Return the logger currently in use.'''
        return self.logger

    def stop(self):
        '''Ask the worker loop to terminate.'''
        self.graceful_stop.set()

    def suspend(self):
        '''Set the suspend flag.'''
        self.graceful_suspend.set()

    def resume(self):
        '''Clear the suspend flag.'''
        self.graceful_suspend.clear()

    def is_processing(self):
        '''Return True when neither the stop flag nor the suspend flag is set.'''
        return (not self.graceful_stop.is_set()) and (not self.graceful_suspend.is_set())

    def set_request_queue(self, request_queue):
        '''Set the queue messages to send are read from.'''
        self.request_queue = request_queue

    def set_output_queue(self, output_queue):
        '''Set the queue handed to listeners for received messages.'''
        self.output_queue = output_queue

    def set_response_queue(self, response_queue):
        '''Set the queue handled messages are put on.'''
        self.response_queue = response_queue

    def connect_to_messaging_brokers(self, sender=True):
        '''
        Resolve the brokers of every channel and create (unconnected) connections.

        :param sender: unused; kept for interface compatibility.
        :returns: dict mapping channel name to a list of ``stomp.Connection12``
                  objects, or to None for a channel without a 'brokers' entry.
        '''
        channel_conns = {}
        for name in self.channels:
            channel = self.channels[name]
            if not channel or 'brokers' not in channel:
                channel_conns[name] = None
                continue

            brokers = channel['brokers']
            if not isinstance(brokers, (list, tuple)):
                brokers = brokers.split(",")
            # Channels without their own timeout fall back to the instance default.
            timeout = channel.get('broker_timeout', self.broker_timeout)

            broker_addresses = []
            for broker in brokers:
                try:
                    host, port = broker.strip().rsplit(":", 1)
                    port = int(port)
                except ValueError as error:
                    # A single malformed entry must not abort the remaining brokers/channels.
                    self.logger.error('Invalid broker "%s" (expected host:port): %s' % (broker, str(error)))
                    continue

                try:
                    addrinfos = socket.getaddrinfo(host, 0, socket.AF_INET, 0, socket.IPPROTO_TCP)
                except socket.gaierror as error:
                    self.logger.error('Cannot resolve hostname %s: %s' % (host, str(error)))
                    continue

                # One connection per resolved IPv4 address of the host.
                for addrinfo in addrinfos:
                    broker_addresses.append((addrinfo[4][0], port))

            self.logger.info("Resolved broker addresses for channel %s: %s" % (name, broker_addresses))

            channel_conns[name] = [stomp.Connection12(host_and_ports=[(broker, port)],
                                                      keepalive=True,
                                                      heartbeats=(30000, 30000),     # half minute = num / 1000
                                                      timeout=timeout)
                                   for broker, port in broker_addresses]
        return channel_conns

    def disconnect(self, conns):
        '''
        Best-effort disconnect of every connected connection in ``conns``.

        :param conns: mapping of channel name to a list of connections (or None).
        '''
        if not conns:
            return
        for name in conns:
            for conn in conns[name] or []:
                try:
                    if conn.is_connected():
                        conn.disconnect()
                except Exception as error:
                    # Disconnecting is best effort; keep going with the other connections.
                    self.logger.debug("Failed to disconnect from broker of channel %s: %s" % (name, str(error)))

    def _connect_channel(self, destination):
        '''Pick a random connection of channel ``destination`` and make sure it is connected.'''
        channel = self.channels[destination]
        conn = random.choice(self.conns[destination])
        queue_dest = channel['destination']
        username = channel['username']
        password = channel['password']
        if not conn.is_connected():
            conn.connect(username, password, wait=True)
        return conn, queue_dest, destination

    def get_connection(self, destination):
        '''
        Return a connected connection for a channel.

        An unknown ``destination`` falls back to the 'default' channel. When the
        first attempt fails, all connections are dropped, the brokers are
        re-resolved and the connection is attempted once more.

        :param destination: channel name.
        :returns: tuple ``(conn, queue_dest, destination)``; ``conn`` and
                  ``queue_dest`` are None when the channel has no brokers.
        :raises Exception: when the second attempt fails as well.
        '''
        try:
            if destination not in self.conns:
                destination = 'default'
            if self.conns[destination]:
                return self._connect_channel(destination)
            elif self.conns[destination] is None:
                return None, None, destination
            # An empty connection list falls through to re-resolving the brokers.
        except Exception as error:
            self.logger.error("Failed to connect to message broker(will re-resolve brokers): %s" % str(error))

        self.disconnect(self.conns)

        try:
            self.conns = self.connect_to_messaging_brokers(sender=True)
            if destination not in self.conns:
                destination = 'default'
            if self.conns[destination] is None:
                return None, None, destination
            return self._connect_channel(destination)
        except Exception as error:
            self.logger.error("Failed to connect to message broker after re-resolving brokers: %s" % str(error))
            # Propagate the real cause instead of implicitly returning None,
            # which callers would fail to unpack.
            raise

    def send_message(self, msg):
        '''
        Send one message to the channel named by ``msg['destination']`` ('default' if absent).

        ``msg`` must provide 'msg_id' and 'msg_content'; 'msg_type' is required
        unless 'msg_content' is a dict with both 'headers' and 'body', in which
        case those are sent as given (with 'from_idds' added to the headers).
        The message is discarded when the channel has no brokers.
        '''
        destination = msg['destination'] if 'destination' in msg else 'default'
        conn, queue_dest, destination = self.get_connection(destination)

        from_idds = 'false'
        if 'from_idds' in msg and msg['from_idds']:
            from_idds = 'true'

        if not conn:
            self.logger.info("No brokers defined, discard(%s): %s" % (destination, msg['msg_id']))
            return

        content = msg['msg_content']
        self.logger.info("Sending message to message broker(%s): %s" % (destination, msg['msg_id']))
        self.logger.debug("Sending message to message broker(%s): %s" % (destination, json_dumps(content)))
        if isinstance(content, dict) and 'headers' in content and 'body' in content:
            content['headers']['from_idds'] = from_idds
            conn.send(body=json_dumps(content['body']),
                      headers=content['headers'],
                      destination=queue_dest,
                      id='atlas-idds-messaging',
                      ack='auto')
        else:
            conn.send(body=json_dumps(content),
                      destination=queue_dest,
                      id='atlas-idds-messaging',
                      ack='auto',
                      headers={'persistent': 'true',
                               'ttl': self.timetolive,
                               'vo': 'atlas',
                               'from_idds': from_idds,
                               'msg_type': str(msg['msg_type']).lower()})

    def execute_send(self):
        '''
        Worker loop: send queued messages until the stop flag is set.
        '''
        try:
            self.conns = self.connect_to_messaging_brokers(sender=True)
        except Exception as error:
            self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc()))

        while not self.graceful_stop.is_set():
            try:
                if not self.request_queue.empty():
                    msg = self.request_queue.get(False)
                    if msg:
                        self.send_message(msg)
                        if self.response_queue:
                            self.response_queue.put(msg)
                else:
                    time.sleep(0.1)
            except Exception as error:
                self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc()))
                # Do not spin at full speed while the error condition persists.
                time.sleep(0.1)

    def run(self):
        '''Thread entry point.'''
        try:
            self.execute_send()
        except Exception as error:
            self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc()))

    def __call__(self):
        self.run()
0267 
0268 
class MessagingReceiver(MessagingSender):
    '''
    Thread that subscribes to the destination of every channel and hands the
    received messages to ``output_queue`` through ``MessagingListener``.
    '''
    def __init__(self, name="MessagingReceiver", logger=None, **kwargs):
        super(MessagingReceiver, self).__init__(name=name, logger=logger, **kwargs)
        # channel name -> MessagingListener
        self.listener = {}
        # channel name -> list of connections (None for a channel without brokers).
        self.receiver_conns = {}

    def get_listener(self, broker, name):
        '''
        Return the listener of channel ``name``, creating it on first use.

        :param broker: broker identifier given to a newly created listener.
        :param name: channel name; one listener is kept per channel.
        '''
        if self.listener is None:
            self.listener = {}
        if name not in self.listener:
            self.listener[name] = MessagingListener(broker, self.output_queue, name=name, logger=self.logger)
        return self.listener[name]

    def _connect_and_subscribe(self, conn, name):
        '''Attach the listener of channel ``name`` to ``conn``, connect and subscribe.'''
        channel = self.channels[name]
        conn.set_listener('message-receiver', self.get_listener(conn.transport._Transport__host_and_ports[0], name))
        conn.connect(channel['username'], channel['password'], wait=True)
        conn.subscribe(destination=channel['destination'], id='atlas-idds-messaging', ack='auto')

    def subscribe(self):
        '''
        (Re)create the connections of all channels and subscribe on each of them.

        Channels without brokers (value None) are skipped.
        '''
        self.receiver_conns = self.connect_to_messaging_brokers()

        for name in self.receiver_conns:
            for conn in self.receiver_conns[name] or []:
                self.logger.info(f'connecting to {name}: {conn.transport._Transport__host_and_ports[0][0]}')
                self._connect_and_subscribe(conn, name)

    def execute_subscribe(self):
        '''
        Worker loop: keep all subscriptions alive until the stop flag is set.

        While the suspend flag is set the connections are disconnected. Otherwise
        dropped connections are reconnected; if that fails (or there are no
        connections at all) everything is torn down and re-subscribed.
        '''
        try:
            self.subscribe()
        except Exception as error:
            self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

        sleep_count = 0
        while not self.graceful_stop.is_set():
            if self.graceful_suspend.is_set():
                try:
                    self.disconnect(self.receiver_conns)
                except Exception as error:
                    self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))
                time.sleep(1)
                sleep_count += 1
                if sleep_count > 300:
                    self.logger.info("graceful_suspend is set. sleeping")
                    sleep_count = 0
                continue

            sleep_count = 0
            has_failed_connection = False
            try:
                for name in self.receiver_conns:
                    for conn in self.receiver_conns[name] or []:
                        if not conn.is_connected():
                            self._connect_and_subscribe(conn, name)
            except Exception as error:
                self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))
                has_failed_connection = True

            if has_failed_connection or len(self.receiver_conns) == 0:
                try:
                    # re-subscribe
                    self.disconnect(self.receiver_conns)
                    self.subscribe()
                except Exception as error:
                    self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

            # Pause on every iteration, including the failure path, so a
            # persistent error cannot turn into a busy reconnect loop.
            time.sleep(0.1)

        self.logger.info('receiver graceful stop requested')

        self.disconnect(self.receiver_conns)

    def run(self):
        '''Thread entry point.'''
        try:
            self.execute_subscribe()
        except Exception as error:
            self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

    def __call__(self):
        self.run()
0346 
0347 
class MessagingMessager(MessagingReceiver):
    '''
    Thread that both sends the messages found on ``request_queue`` and keeps
    the receiving subscriptions alive, in a single loop.
    '''
    def __init__(self, name="MessagingMessager", logger=None, **kwargs):
        super(MessagingMessager, self).__init__(name=name, logger=logger, **kwargs)

    def execute_send_subscribe(self):
        '''
        Worker loop: alternate between draining the request queue and checking
        the receiver connections until the stop flag is set.
        '''
        try:
            self.conns = self.connect_to_messaging_brokers(sender=True)
            self.subscribe()
        except Exception as error:
            self.logger.error("Messaging sender_subscriber throws an exception: %s, %s" % (error, traceback.format_exc()))

        while not self.graceful_stop.is_set():
            # send
            while not self.graceful_stop.is_set():
                try:
                    if self.request_queue.empty():
                        break
                    msg = self.request_queue.get(False)
                    if msg:
                        self.send_message(msg)
                        if self.response_queue:
                            self.response_queue.put(msg)
                except Exception as error:
                    self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc()))
                    # Leave the drain loop so that a persistent failure (for
                    # example an unset request queue) cannot spin here forever;
                    # draining resumes on the next outer iteration.
                    break

            # subscribe
            has_failed_connection = False
            try:
                for name in self.receiver_conns:
                    # A channel without brokers is stored as None.
                    for conn in self.receiver_conns[name] or []:
                        if not conn.is_connected():
                            # get_listener needs the channel name as well as the broker.
                            conn.set_listener('message-receiver',
                                              self.get_listener(conn.transport._Transport__host_and_ports[0], name))
                            # conn.start()
                            conn.connect(self.channels[name]['username'], self.channels[name]['password'], wait=True)
                            conn.subscribe(destination=self.channels[name]['destination'], id='atlas-idds-messaging', ack='auto')
            except Exception as error:
                self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))
                has_failed_connection = True

            if has_failed_connection or len(self.receiver_conns) == 0:
                try:
                    # re-subscribe
                    self.disconnect(self.receiver_conns)
                    self.subscribe()
                except Exception as error:
                    self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

            time.sleep(0.1)

        self.logger.info('sender_receiver graceful stop requested')
        self.disconnect(self.conns)
        self.disconnect(self.receiver_conns)

    def run(self):
        '''Thread entry point.'''
        try:
            self.execute_send_subscribe()
        except Exception as error:
            self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

    def __call__(self):
        self.run()