File indexing completed on 2026-04-10 07:58:41
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import logging
0013 import random
0014 import socket
0015 import threading
0016 import time
0017 import traceback
0018 import stomp
0019
0020 from idds.common.plugin.plugin_base import PluginBase
0021 from idds.common.utils import setup_logging, get_logger, json_dumps, json_loads
0022
0023
# Configure logging for this module through the shared iDDS helper.
setup_logging(__name__)
# Only let CRITICAL records from the "stomp" logger through, so the client
# library's lower-severity output stays out of the agent logs.
logging.getLogger("stomp").setLevel(logging.CRITICAL)
0026
0027
class MessagingListener(stomp.ConnectionListener):
    '''
    STOMP connection listener that forwards received frames to an output queue.
    '''

    def __init__(self, broker, output_queue, name=None, logger=None):
        '''
        Bind the listener to a broker and to the queue it feeds.

        :param broker: broker identifier; only used in log lines.
        :param output_queue: object whose ``put`` method receives each message.
        :param name: channel name; 'default' is used when it is empty.
        :param logger: logger to use; a logger named after the class is
                       created when none is given.
        '''
        self.name = name or 'default'
        self.__broker = broker
        self.__output_queue = output_queue
        self.logger = logger if logger else get_logger(self.__class__.__name__)

    def on_error(self, frame):
        '''
        Log the body of an error frame reported by the broker.
        '''
        self.logger.error('[broker] [%s]: %s', self.__broker, frame.body)

    def on_message(self, frame):
        '''
        Log the frame, decode its body and put the result on the output queue.

        The queued item carries the channel name, the 'from_idds' header
        ('false' when the header is absent) and the headers/decoded body.
        '''
        self.logger.debug(f'[broker] {self.name} [{self.__broker}]: headers: {frame.headers}, body: {frame.body}')
        from_idds = frame.headers.get('from_idds', 'false')
        payload = {'name': self.name,
                   'from_idds': from_idds,
                   'msg': {'headers': frame.headers, 'body': json_loads(frame.body)}}
        self.__output_queue.put(payload)
0060
0061
class MessagingSender(PluginBase, threading.Thread):
    '''
    Thread that drains a request queue and publishes each message to a STOMP
    broker selected by the message's destination channel.

    ``self.channels`` maps a channel name to its configuration. The keys read
    by this class are 'brokers', 'broker_timeout' (optional), 'destination',
    'username' and 'password'.
    '''

    def __init__(self, name="MessagingSender", logger=None, **kwargs):
        '''
        :param name: thread / plugin name.
        :param logger: optional logger overriding the one set by the base class.
        :param kwargs: forwarded to PluginBase; presumably this is where
                       'channels' and 'timetolive' come from -- TODO confirm.
        :raises Exception: when no 'channels' attribute is defined.
        '''
        threading.Thread.__init__(self, name=name)
        super(MessagingSender, self).__init__(name=name, logger=logger, **kwargs)

        if logger:
            self.logger = logger
        self.graceful_stop = threading.Event()
        self.graceful_suspend = threading.Event()

        self.request_queue = None
        self.output_queue = None
        self.response_queue = None

        if not hasattr(self, 'channels'):
            raise Exception('"channels" is required but not defined.')
        self.channels = json_loads(self.channels)

        # Fallback timeout for channels that do not define 'broker_timeout'.
        self.broker_timeout = 3600

        # Sent as the 'ttl' header; the default looks like 12 hours expressed
        # in milliseconds -- TODO confirm the unit against the broker setup.
        if not hasattr(self, 'timetolive'):
            self.timetolive = 12 * 3600 * 1000
        else:
            self.timetolive = int(self.timetolive)

        # channel name -> list of connections (None for a broker-less channel).
        # Every consumer indexes this by channel name, so it must be a dict.
        self.conns = {}

    def setup_logger(self, logger):
        '''Use the given logger, or create one named after the class.'''
        if logger:
            self.logger = logger
        else:
            self.logger = get_logger(self.__class__.__name__)

    def set_logger(self, logger):
        '''Replace the logger unconditionally.'''
        self.logger = logger

    def get_logger(self):
        '''Return the logger currently in use.'''
        return self.logger

    def stop(self):
        '''Ask the worker loop to terminate.'''
        self.graceful_stop.set()

    def suspend(self):
        '''Set the suspend flag.'''
        self.graceful_suspend.set()

    def resume(self):
        '''Clear the suspend flag.'''
        self.graceful_suspend.clear()

    def is_processing(self):
        '''Return True while neither the stop nor the suspend flag is set.'''
        return (not self.graceful_stop.is_set()) and (not self.graceful_suspend.is_set())

    def set_request_queue(self, request_queue):
        '''Set the queue the messages to send are read from.'''
        self.request_queue = request_queue

    def set_output_queue(self, output_queue):
        '''Set the queue handed to listeners for received messages.'''
        self.output_queue = output_queue

    def set_response_queue(self, response_queue):
        '''Set the queue on which sent messages are reported back.'''
        self.response_queue = response_queue

    def connect_to_messaging_brokers(self, sender=True):
        '''
        Resolve the brokers of every channel and build (unconnected) STOMP
        connections for them.

        :param sender: accepted for interface compatibility; not used here.
        :returns: dict mapping channel name to a list of connections, or to
                  None when the channel defines no brokers.
        '''
        channel_conns = {}
        for name in self.channels:
            channel = self.channels[name]
            if not channel or 'brokers' not in channel:
                channel_conns[name] = None
                continue

            brokers = channel['brokers']
            if not isinstance(brokers, (list, tuple)):
                brokers = brokers.split(",")

            # Fall back to the instance default instead of raising KeyError
            # when the channel does not configure its own timeout.
            timeout = channel.get('broker_timeout', self.broker_timeout)

            broker_addresses = []
            for broker in brokers:
                try:
                    host, port = broker.split(":")
                except ValueError:
                    # One malformed entry must not prevent the remaining
                    # brokers and channels from being set up.
                    self.logger.error('Invalid broker address %s: expected "host:port"' % broker)
                    continue
                try:
                    addrinfos = socket.getaddrinfo(host, 0, socket.AF_INET, 0, socket.IPPROTO_TCP)
                except socket.gaierror as error:
                    self.logger.error('Cannot resolve hostname %s: %s' % (host, str(error)))
                    continue
                # One connection per resolved IPv4 address (entry [4][0] of
                # each getaddrinfo result).
                broker_addresses.extend((addrinfo[4][0], port) for addrinfo in addrinfos)

            self.logger.info("Resolved broker addresses for channel %s: %s" % (name, broker_addresses))

            channel_conns[name] = [stomp.Connection12(host_and_ports=[(address, port)],
                                                      keepalive=True,
                                                      heartbeats=(30000, 30000),
                                                      timeout=timeout)
                                   for address, port in broker_addresses]
        return channel_conns

    def disconnect(self, conns):
        '''
        Best-effort disconnect of every connection in a channel -> connections
        mapping. Channels mapped to None or to an empty list are skipped.
        '''
        for name in conns:
            for conn in conns[name] or []:
                try:
                    if conn.is_connected():
                        conn.disconnect()
                except Exception as error:
                    # Deliberately best effort: a connection that fails to
                    # close must not stop the remaining ones being closed.
                    self.logger.debug("Failed to disconnect from %s: %s" % (name, str(error)))

    def _pick_connection(self, destination):
        '''Pick a random connection of the channel and connect it if needed.'''
        conn = random.choice(self.conns[destination])
        channel = self.channels[destination]
        queue_dest = channel['destination']
        username = channel['username']
        password = channel['password']
        if not conn.is_connected():
            conn.connect(username, password, wait=True)
        return conn, queue_dest, destination

    def get_connection(self, destination):
        '''
        Return a connected broker connection for a destination channel.

        Unknown channels are mapped to 'default'. When the first attempt fails
        (or the channel has no usable connection) all connections are dropped,
        the brokers are resolved again and one more attempt is made.

        :param destination: channel name.
        :returns: ``(conn, queue_destination, channel_name)``;
                  ``(None, None, channel_name)`` when the channel defines no
                  brokers; None when no connection could be established.
        '''
        try:
            if destination not in self.conns:
                destination = 'default'
            if self.conns[destination]:
                return self._pick_connection(destination)
            elif self.conns[destination] is None:
                return None, None, destination
            # An empty list means no broker could be resolved earlier:
            # fall through and resolve again.
        except Exception as error:
            self.logger.error("Failed to connect to message broker(will re-resolve brokers): %s" % str(error))

        self.disconnect(self.conns)

        try:
            self.conns = self.connect_to_messaging_brokers(sender=True)
            if destination not in self.conns:
                destination = 'default'
            if self.conns[destination] is None:
                # Same contract as the first attempt for broker-less channels.
                return None, None, destination
            # Use the per-channel credentials exactly like the first attempt;
            # the instance has no 'username'/'password' attributes of its own.
            return self._pick_connection(destination)
        except Exception as error:
            self.logger.error("Failed to connect to message broker after re-resolving brokers: %s" % str(error))
        return None

    def send_message(self, msg):
        '''
        Publish one message.

        :param msg: dict with 'msg_id' and 'msg_content'; 'msg_type' is needed
                    when 'msg_content' is not a dict with 'headers' and 'body';
                    'destination' (default 'default') and 'from_idds' are
                    optional.
        :raises RuntimeError: when no broker connection could be established,
                              so that the caller does not report the message
                              as sent.
        '''
        destination = msg['destination'] if 'destination' in msg else 'default'
        connection = self.get_connection(destination)
        if connection is None:
            raise RuntimeError("No usable connection to message broker(%s)" % destination)
        conn, queue_dest, destination = connection

        from_idds = 'false'
        if 'from_idds' in msg and msg['from_idds']:
            from_idds = 'true'

        if not conn:
            self.logger.info("No brokers defined, discard(%s): %s" % (destination, msg['msg_id']))
            return

        self.logger.info("Sending message to message broker(%s): %s" % (destination, msg['msg_id']))
        self.logger.debug("Sending message to message broker(%s): %s" % (destination, json_dumps(msg['msg_content'])))
        content = msg['msg_content']
        if isinstance(content, dict) and 'headers' in content and 'body' in content:
            # The content carries its own headers: forward them, tagged with
            # the origin flag.
            content['headers']['from_idds'] = from_idds
            conn.send(body=json_dumps(content['body']),
                      headers=content['headers'],
                      destination=queue_dest,
                      id='atlas-idds-messaging',
                      ack='auto')
        else:
            conn.send(body=json_dumps(content),
                      destination=queue_dest,
                      id='atlas-idds-messaging',
                      ack='auto',
                      headers={'persistent': 'true',
                               'ttl': self.timetolive,
                               'vo': 'atlas',
                               'from_idds': from_idds,
                               'msg_type': str(msg['msg_type']).lower()})

    def execute_send(self):
        '''
        Worker loop: send every queued request until a stop is requested.

        Successfully sent messages are put on the response queue when one is
        set; any failure is logged and the loop carries on.
        '''
        try:
            self.conns = self.connect_to_messaging_brokers(sender=True)
        except Exception as error:
            self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc()))

        while not self.graceful_stop.is_set():
            try:
                if not self.request_queue.empty():
                    msg = self.request_queue.get(False)
                    if msg:
                        self.send_message(msg)
                        if self.response_queue is not None:
                            self.response_queue.put(msg)
                else:
                    time.sleep(0.1)
            except Exception as error:
                self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc()))

    def run(self):
        '''Thread entry point: run the send loop and log anything escaping it.'''
        try:
            self.execute_send()
        except Exception as error:
            self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc()))

    def __call__(self):
        '''Run the plugin in the calling thread.'''
        self.run()
0267
0268
class MessagingReceiver(MessagingSender):
    '''
    Thread that subscribes to the destination of every configured channel and
    keeps the subscriptions alive until a stop is requested.
    '''

    def __init__(self, name="MessagingReceiver", logger=None, **kwargs):
        super(MessagingReceiver, self).__init__(name=name, logger=logger, **kwargs)
        # channel name -> MessagingListener
        self.listener = {}
        # channel name -> list of connections (None for a broker-less channel);
        # indexed by channel name everywhere, hence a dict.
        self.receiver_conns = {}

    def get_listener(self, broker, name):
        '''
        Return the listener of a channel, creating it on first use.

        :param broker: broker identifier handed to a newly created listener.
        :param name: channel name; one listener is cached per channel.
        '''
        if self.listener is None:
            self.listener = {}
        if name not in self.listener:
            self.listener[name] = MessagingListener(broker, self.output_queue, name=name, logger=self.logger)
        return self.listener[name]

    def _connect_and_subscribe(self, conn, name):
        '''Attach the channel listener to a connection, connect and subscribe.'''
        # NOTE: reads a name-mangled private attribute of the stomp transport
        # to recover the (host, port) pair the connection was built with.
        broker = conn.transport._Transport__host_and_ports[0]
        conn.set_listener('message-receiver', self.get_listener(broker, name))
        channel = self.channels[name]
        conn.connect(channel['username'], channel['password'], wait=True)
        conn.subscribe(destination=channel['destination'], id='atlas-idds-messaging', ack='auto')

    def subscribe(self):
        '''
        Re-resolve all brokers and subscribe to every channel destination.

        Channels without brokers (mapped to None) are skipped rather than
        aborting the subscription of the remaining channels.
        '''
        self.receiver_conns = self.connect_to_messaging_brokers()

        for name in self.receiver_conns:
            for conn in self.receiver_conns[name] or []:
                self.logger.info(f'connecting to {name}: {conn.transport._Transport__host_and_ports[0][0]}')
                self._connect_and_subscribe(conn, name)

    def execute_subscribe(self):
        '''
        Worker loop: keep all subscriptions connected until a stop is
        requested. While the suspend flag is set the connections are closed
        and the loop idles in 1 s steps.
        '''
        try:
            self.subscribe()
        except Exception as error:
            self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

        sleep_count = 0
        while not self.graceful_stop.is_set():
            if self.graceful_suspend.is_set():
                try:
                    self.disconnect(self.receiver_conns)
                except Exception as error:
                    self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))
                time.sleep(1)
                sleep_count += 1
                # Log only once every ~300 idle iterations to avoid flooding.
                if sleep_count > 300:
                    self.logger.info("graceful_suspend is set. sleeping")
                    sleep_count = 0
                continue

            sleep_count = 0
            has_failed_connection = False
            try:
                for name in self.receiver_conns:
                    for conn in self.receiver_conns[name] or []:
                        if not conn.is_connected():
                            self._connect_and_subscribe(conn, name)
            except Exception as error:
                self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))
                has_failed_connection = True

            if has_failed_connection or not self.receiver_conns:
                try:
                    # Start over from freshly resolved broker addresses.
                    self.disconnect(self.receiver_conns)
                    self.subscribe()
                except Exception as error:
                    self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

            # Pause on every pass, including failed ones, so that a persistent
            # error cannot turn the loop into a busy spin.
            time.sleep(0.1)

        self.logger.info('receiver graceful stop requested')

        self.disconnect(self.receiver_conns)

    def run(self):
        '''Thread entry point: run the subscribe loop and log anything escaping it.'''
        try:
            self.execute_subscribe()
        except Exception as error:
            self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

    def __call__(self):
        '''Run the plugin in the calling thread.'''
        self.run()
0346
0347
class MessagingMessager(MessagingReceiver):
    '''
    Thread that both sends queued requests and keeps the receiving
    subscriptions alive, alternating between the two in a single loop.
    '''

    def __init__(self, name="MessagingMessager", logger=None, **kwargs):
        super(MessagingMessager, self).__init__(name=name, logger=logger, **kwargs)

    def execute_send_subscribe(self):
        '''
        Worker loop. Each pass first drains the request queue (sent messages
        are put on the response queue when one is set), then reconnects any
        dropped receiver connection, then pauses for 0.1 s.
        '''
        try:
            self.conns = self.connect_to_messaging_brokers(sender=True)
            self.subscribe()
        except Exception as error:
            self.logger.error("Messaging sender_subscriber throws an exception: %s, %s" % (error, traceback.format_exc()))

        while not self.graceful_stop.is_set():
            # Send everything that is currently queued.
            while True:
                try:
                    if self.request_queue.empty():
                        break
                    msg = self.request_queue.get(False)
                    if msg:
                        self.send_message(msg)
                        if self.response_queue:
                            self.response_queue.put(msg)
                except Exception as error:
                    self.logger.error("Messaging sender throws an exception: %s, %s" % (error, traceback.format_exc()))
                    # Leave the drain loop after a failure: a persistent error
                    # must not spin here forever and must not prevent the stop
                    # flag from being checked.
                    break

            # Re-establish receiver connections that have dropped.
            has_failed_connection = False
            try:
                for name in self.receiver_conns:
                    # A channel without brokers is mapped to None.
                    for conn in self.receiver_conns[name] or []:
                        if not conn.is_connected():
                            # get_listener needs the channel name as well as
                            # the broker address.
                            broker = conn.transport._Transport__host_and_ports[0]
                            conn.set_listener('message-receiver', self.get_listener(broker, name))

                            conn.connect(self.channels[name]['username'], self.channels[name]['password'], wait=True)
                            conn.subscribe(destination=self.channels[name]['destination'], id='atlas-idds-messaging', ack='auto')
            except Exception as error:
                self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))
                has_failed_connection = True

            if has_failed_connection or not self.receiver_conns:
                try:
                    # Start over from freshly resolved broker addresses.
                    self.disconnect(self.receiver_conns)
                    self.subscribe()
                except Exception as error:
                    self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

            time.sleep(0.1)

        self.logger.info('sender_receiver graceful stop requested')
        self.disconnect(self.conns)
        self.disconnect(self.receiver_conns)

    def run(self):
        '''Thread entry point: run the combined loop and log anything escaping it.'''
        try:
            self.execute_send_subscribe()
        except Exception as error:
            self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc()))

    def __call__(self):
        '''Run the plugin in the calling thread.'''
        self.run()