File indexing completed on 2026-04-10 07:58:41
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import logging
0012 import random
0013 import socket
0014 import string
0015 import time
0016 import threading
0017 import traceback
0018 import uuid
0019
0020 import zmq
0021 from zmq.auth.thread import ThreadAuthenticator
0022
0023 from idds.common.utils import json_dumps, json_loads
0024
0025 from .event import StateClaimEvent, EventBusState, TestEvent
0026 from .baseeventbusbackend import BaseEventBusBackend
0027
0028
class MsgEventBusBackendReceiver(threading.Thread):
    """
    Thread that serves event-bus requests arriving on a ZeroMQ REP socket.

    Requests are JSON strings (decoded with ``json_loads``) whose ``type`` is
    ``send_event``, ``send_bulk`` or ``get_event``. If a coordinator is set the
    request is forwarded to it; otherwise it is served from the in-memory queues
    kept by this object. Every received request gets exactly one reply, as the
    REP socket's strict receive/send alternation requires.
    """

    # Milliseconds a single poll of the socket may block before the stop flags are re-checked.
    poll_timeout_ms = 1000
    # Seconds between two clean_events() passes over the bookkeeping dicts.
    history_clean_interval = 3600 * 4

    def __init__(self, name="MsgEventBusBackendReceiver", logger=None, debug=False,
                 graceful_stop=None, coordinator=None, coordinator_socket=None, **kwargs):
        threading.Thread.__init__(self, name=name)
        self.logger = logger if logger is not None else logging.getLogger(name)
        # Shared stop flag; a private one is created when the caller does not supply it.
        self.graceful_stop = graceful_stop if graceful_stop is not None else threading.Event()
        self.coordinator = coordinator
        self.coordinator_socket = coordinator_socket

        # All dicts below are keyed first by event type (event._event_type).
        # event id -> queued event object
        self._events = {}
        # queued event ids in serving order; get() pops from the front
        self._events_index = {}
        # event.get_event_id() -> ids of queued events sharing it (merge candidates)
        self._events_act_id_index = {}
        # event.get_event_id() -> time.time() when an event with that id was last served by get()
        self._events_history = {}
        self._events_history_clean_time = time.time()
        # event id -> time.time() when the event was queued
        self._events_insert_time = {}

        # Seconds after which a queued event can no longer be overtaken by a
        # re-queued event (see insert_event()).
        self.max_delay = 180

        # Deliberately not named ``_stop``: that would shadow the private
        # threading.Thread._stop() method which Thread.join() relies on.
        self._stop_event = threading.Event()
        self._lock = threading.RLock()

        self.debug = debug

    def set_coordinator(self, coordinator):
        """Forward subsequent requests to ``coordinator`` (a falsy value serves them locally)."""
        self.coordinator = coordinator

    def stop(self):
        """Ask run() to return; it notices within about ``poll_timeout_ms`` milliseconds."""
        self._stop_event.set()

    def _should_run(self):
        """Return True while neither graceful_stop nor stop() has been signalled."""
        return not (self.graceful_stop.is_set() or self._stop_event.is_set())

    def _close_socket(self):
        """Close the coordinator socket without raising (best effort)."""
        try:
            self.coordinator_socket.close(linger=0)
        except Exception as error:
            self.logger.warning("Failed to close coordinator socket: %s" % str(error))

    def run(self):
        """
        Serve requests until graceful_stop or stop() is signalled.

        If receiving or replying fails, the socket is closed and the thread
        exits: a REP socket that missed a receive/send step cannot be reused.
        MsgEventBusBackend.init_msg_channel() binds a new socket and starts a new
        receiver when it finds the coordinator socket closed.
        """
        while self._should_run():
            try:
                # Poll with a timeout instead of blocking in recv, so the stop
                # flags are re-checked regularly even when no request arrives.
                if not self.coordinator_socket.poll(self.poll_timeout_ms):
                    continue
                req = self.coordinator_socket.recv_string()
                if self.debug:
                    self.logger.debug("MsgEventBusBackendReceiver received: %s" % req)
            except Exception as error:
                self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
                self._close_socket()
                return

            reply = self._handle_request(req)

            try:
                if self.debug:
                    self.logger.debug("MsgEventBusBackendReceiver reply: %s" % reply)
                self.coordinator_socket.send_string(reply)
            except Exception as error:
                self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
                self._close_socket()
                return

    def _handle_request(self, req):
        """Decode one request string, execute it and return the JSON reply string; never raises."""
        try:
            return json_dumps(self._dispatch(json_loads(req)))
        except Exception as error:
            self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
            # A reply must still be sent, otherwise the REP socket is stuck.
            return json_dumps({'type': 'error', 'ret': None})

    def _dispatch(self, req):
        """Execute a decoded request on the coordinator if set, else on this receiver; return the reply dict."""
        target = self.coordinator if self.coordinator else self
        req_type = req['type']
        if req_type == 'send_event':
            return {'type': 'send_event_ret', 'ret': target.send(req['event'])}
        if req_type == 'send_bulk':
            return {'type': 'send_bulk_ret', 'ret': target.send_bulk(req['events'])}
        if req_type == 'get_event':
            ret = target.get(req['event_type'], num_events=req['num_events'], wait=req['wait'])
            return {'type': 'get_event_ret', 'ret': ret}
        # Unknown request types still get a reply so the REP cycle completes.
        return {'ret': None}

    def insert_event(self, event):
        """
        Queue ``event`` or merge it into a queued event with the same get_event_id().

        The new event is merged into the first queued event with the same
        get_event_id() for which ``event.able_to_merge(old_event)`` is true, via
        ``old_event.merge(event)``, and is then not queued itself. Otherwise:

        * if no event with this get_event_id() was served before (or its history
          was cleaned), the event goes to the front of the queue;
        * else it goes towards the back, ahead only of the trailing queued events
          that were served more recently than it and have been queued for less
          than ``max_delay`` seconds.

        Callers are expected to hold ``self._lock`` (send()/send_bulk() do).
        """
        event_type = event._event_type
        if event_type not in self._events:
            self._events[event_type] = {}
            self._events_index[event_type] = []
            self._events_act_id_index[event_type] = {}
            self._events_history[event_type] = {}
            self._events_insert_time[event_type] = {}

        # Lazy formatting: the whole event dict is only rendered when debug logging is enabled.
        self.logger.debug("All events: %s", self._events)

        events = self._events[event_type]
        queue = self._events_index[event_type]
        act_id_index = self._events_act_id_index[event_type]
        history = self._events_history[event_type]
        insert_times = self._events_insert_time[event_type]

        merged = False
        event_act_id = event.get_event_id()
        if event_act_id not in act_id_index:
            act_id_index[event_act_id] = [event._id]
        else:
            same_act_ids = act_id_index[event_act_id]
            # Forget ids of events that have already been served.
            same_act_ids[:] = [old_id for old_id in same_act_ids if old_id in events]
            for old_event_id in same_act_ids:
                old_event = events[old_event_id]
                if event.able_to_merge(old_event):
                    old_event.merge(event)
                    self.logger.debug("New event %s is merged to old event %s" % (event, old_event))
                    merged = True
                    # Merge into a single event only; merging into several would duplicate it.
                    break
            if not merged:
                same_act_ids.append(event._id)

        if merged:
            return

        now = time.time()
        if event_act_id not in history:
            insert_loc = 0
        else:
            hist_time = history[event_act_id]
            insert_loc = len(queue)
            for q_event_id in reversed(queue):
                q_act_id = events[q_event_id].get_event_id()
                if q_act_id not in history or insert_times[q_event_id] + self.max_delay < now:
                    break
                if history[q_act_id] <= hist_time:
                    break
                insert_loc -= 1

        events[event._id] = event
        queue.insert(insert_loc, event._id)
        insert_times[event._id] = now
        self.logger.debug("Insert new event: %s" % event)

    def clean_events(self):
        """
        At most once per ``history_clean_interval`` seconds, prune bookkeeping.

        Drops history entries whose get_event_id() has no queued event, and
        merge-candidate ids that are no longer queued.
        """
        now = time.time()
        if self._events_history_clean_time + self.history_clean_interval >= now:
            return
        self._events_history_clean_time = now

        for event_type, queue in self._events_index.items():
            events = self._events[event_type]
            queued_ids = set(queue)
            queued_act_ids = {events[event_id].get_event_id() for event_id in queue}

            history = self._events_history[event_type]
            for act_id in [key for key in history if key not in queued_act_ids]:
                del history[act_id]

            act_id_index = self._events_act_id_index[event_type]
            for act_id in list(act_id_index):
                still_queued = [q_id for q_id in act_id_index[act_id] if q_id in queued_ids]
                if still_queued:
                    act_id_index[act_id][:] = still_queued
                else:
                    del act_id_index[act_id]

    def send(self, event):
        """Queue (or merge) one event; returns None."""
        with self._lock:
            self.insert_event(event)
            self.clean_events()

    def send_bulk(self, events):
        """Queue (or merge) each event of ``events`` in order; returns None."""
        with self._lock:
            for event in events:
                self.insert_event(event)
            self.clean_events()

    def get(self, event_type, num_events=1, wait=0, callback=None):
        """
        Pop up to ``num_events`` queued events of ``event_type`` from the front of the queue.

        Each served event's get_event_id() is recorded in the history with the
        current time. ``wait`` is accepted for interface compatibility and is not
        used. If ``callback`` is given it is called with every returned event
        (while the lock is held).

        :returns: list of events, possibly empty.
        """
        with self._lock:
            events = []
            queue = self._events_index.get(event_type)
            while queue and len(events) < num_events:
                event_id = queue.pop(0)
                event = self._events[event_type].pop(event_id)
                self._events_history[event_type][event.get_event_id()] = time.time()
                del self._events_insert_time[event_type][event_id]
                events.append(event)
            if callback:
                for event in events:
                    callback(event)

            return events
0228
0229
class MsgEventBusBackend(BaseEventBusBackend):
    """
    Msg Event Bus Backend

    ZeroMQ-based event bus backend. Each instance:

    * binds a PLAIN-authenticated REP "coordinator" socket on ``coordinator_port``
      (moving to a random higher port when the address is in use) and serves it
      with a MsgEventBusBackendReceiver thread;
    * sends its own requests over a REQ "manager" socket connected to the
      endpoint chosen by get_manager() (its own coordinator unless set_manager()
      selected another one);
    * caches events that could not be delivered and replays them from execute()
      once is_ok() reports the channel healthy.
    """

    # Keys a manager dict must carry, all non-empty, to be usable.
    _MANAGER_KEYS = ('connect', 'username', 'password')

    def __init__(self, logger=None, coordinator_port=5556, socket_timeout=10, debug=False,
                 timeout_threshold=5, failure_threshold=5, failure_timeout=180,
                 num_of_set_failed_at_threshold=10, connection_retries=3, **kwargs):
        # Local import: the PLAIN-auth password is a credential and must come from a CSPRNG.
        import secrets

        super(MsgEventBusBackend, self).__init__()
        self._id = str(uuid.uuid4())[:8]
        self._state_claim_wait = 60
        self._state_claim = StateClaimEvent(self._id, EventBusState.New, time.time())

        self.graceful_stop = threading.Event()

        self._lock = threading.RLock()

        self.max_delay = 180

        # Credentials peers need (see get_manager()) to log in to this coordinator.
        self._username = 'idds'
        alphabet = string.ascii_uppercase + string.ascii_lowercase + string.digits
        self._password = ''.join(secrets.choice(alphabet) for _ in range(20))

        # Health bookkeeping evaluated by is_ok().
        self._is_ok = True
        self._is_bad = False
        self._failed_at = None
        self._failure_timeout = int(failure_timeout)
        self._num_of_set_failed_at = 0
        self._num_of_set_failed_at_threshold = int(num_of_set_failed_at_threshold)
        self.num_success = 0
        self.num_failures = 0
        self.num_timeout = 0
        # Events whose delivery failed or timed out; replayed by replay_cache_events().
        self.cache_events = []

        self.setup_logger(logger)

        self.socket_timeout = int(socket_timeout)
        self.timeout_threshold = int(timeout_threshold)
        self.failure_threshold = int(failure_threshold)

        self.coordinator_port = int(coordinator_port)
        self.context = None
        self.auth = None
        self.coordinator_socket = None
        self.coordinator_con_string = None

        self.processor = None

        self.manager = None
        self.manager_socket = None

        self.debug = debug

        self.connection_retries = int(connection_retries)

        # init_msg_channel() hands self.coordinator to the receiver; keep a value
        # the base class may have set, otherwise start without a coordinator.
        self.coordinator = getattr(self, 'coordinator', None)

        self.init_msg_channel()

    def setup_logger(self, logger=None):
        """
        Setup logger
        """
        if logger:
            self.logger = logger
        else:
            self.logger = logging.getLogger(self.get_class_name())

    def get_class_name(self):
        return self.__class__.__name__

    def stop(self, signum=None, frame=None):
        """Signal every loop to stop and stop the receiver and authenticator threads."""
        self.logger.debug("graceful stop")
        self.graceful_stop.set()
        if self.processor:
            self.processor.stop()
        if self.auth:
            self.logger.debug("auth stop")
            self.auth.stop()

    def init_msg_channel(self):
        """
        (Re)create whatever part of the ZeroMQ channel is missing or closed.

        Safe to call repeatedly (execute() calls it on every loop): the context,
        coordinator socket and manager socket are only created when absent or
        closed. Failures are logged and reflected in ``num_failures`` and
        ``_is_bad`` rather than raised.
        """
        with self._lock:
            self._init_coordinator_channel()
            self._init_manager_channel()

    def _init_coordinator_channel(self):
        """Set up context/auth and the coordinator socket, trying up to ``connection_retries`` times."""
        for _ in range(self.connection_retries):
            try:
                self._init_context()
                if not self.coordinator_socket or self.coordinator_socket.closed:
                    self._bind_coordinator_socket()
                self._is_bad = False
                return
            except (zmq.error.ZMQError, zmq.Again) as error:
                self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
                self.num_failures += 1
                self._is_bad = True
                if 'Address already in use' in str(error):
                    self.coordinator_port = self.coordinator_port + random.randint(1, 100)
                    self.logger.info(f"Address already in use, switch to new port: {self.coordinator_port}")
            except Exception as error:
                self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
                self.num_failures += 1
                self._is_bad = True

    def _init_context(self):
        """Create the ZeroMQ context and its PLAIN authenticator on first use."""
        if self.context:
            return
        self.context = zmq.Context()
        if self.auth:
            self.auth.stop()

        self.auth = ThreadAuthenticator(self.context)
        self.auth.start()
        self.auth.allow()
        # Accept PLAIN logins only with this instance's generated credentials.
        self.auth.configure_plain(domain='*', passwords={self._username: self._password})

    def _bind_coordinator_socket(self):
        """Bind a new REP coordinator socket and start a receiver thread serving it."""
        coordinator_socket = self.context.socket(zmq.REP)
        try:
            coordinator_socket.plain_server = True
            coordinator_socket.bind("tcp://*:%s" % self.coordinator_port)
        except Exception:
            # Do not keep an unbound socket: the next retry would take it for a
            # working channel and never bind or start a receiver.
            coordinator_socket.close(linger=0)
            raise
        self.coordinator_socket = coordinator_socket

        hostname = socket.getfqdn()
        self.coordinator_con_string = "tcp://%s:%s" % (hostname, self.coordinator_port)

        if self.processor:
            self.processor.stop()

        self.processor = MsgEventBusBackendReceiver(logger=self.logger,
                                                    graceful_stop=self.graceful_stop,
                                                    debug=self.debug,
                                                    coordinator_socket=self.coordinator_socket,
                                                    coordinator=self.coordinator)
        self.processor.start()

    def _init_manager_channel(self):
        """Connect the manager REQ socket if it is missing or closed."""
        try:
            if self.manager_socket and not self.manager_socket.closed:
                return
            manager = self.get_manager()
            if self._is_complete_manager(manager):
                self.manager_socket = self._connect_manager_socket(manager)
            else:
                self._is_bad = True
        except (zmq.error.ZMQError, zmq.Again) as error:
            self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
            self.num_failures += 1
            self._is_bad = True
        except Exception as error:
            self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
            self.num_failures += 1
            self._is_bad = True

    def _connect_manager_socket(self, manager):
        """Return a new REQ socket using ``manager``'s PLAIN credentials, connected to ``manager['connect']``."""
        manager_socket = self.context.socket(zmq.REQ)
        try:
            manager_socket.plain_username = manager['username'].encode('utf-8')
            manager_socket.plain_password = manager['password'].encode('utf-8')
            manager_socket.connect(manager['connect'])
        except Exception:
            manager_socket.close(linger=0)
            raise
        return manager_socket

    def _close_manager_socket(self):
        """Close the manager socket if it is open, discarding unsent messages; never raises."""
        if self.manager_socket is None or self.manager_socket.closed:
            return
        try:
            self.manager_socket.close(linger=0)
        except Exception as error:
            self.logger.warning("Failed to close manager socket: %s" % str(error))

    @classmethod
    def _is_complete_manager(cls, manager):
        """Return True if ``manager`` is a dict with non-empty connect/username/password."""
        return bool(manager) and all(manager.get(key) for key in cls._MANAGER_KEYS)

    def set_manager(self, manager):
        """
        Direct subsequent requests to ``manager``.

        :param manager: dict with non-empty 'connect', 'username' and 'password'.
            A falsy value selects get_manager()'s choice; incomplete info is ignored.

        When the manager changes, the old manager socket is closed and a new one
        is connected. If connecting fails, the error is logged and the next
        init_msg_channel() call reconnects to the new manager.
        """
        if not manager:
            manager = self.get_manager()
        if not self._is_complete_manager(manager):
            return

        with self._lock:
            if self.manager and all(self.manager.get(key) == manager[key] for key in self._MANAGER_KEYS):
                return
            self.manager = manager
            # Drop the connection to the previous manager instead of leaking it.
            self._close_manager_socket()
            try:
                self.manager_socket = self._connect_manager_socket(manager)
            except (zmq.error.ZMQError, zmq.Again) as error:
                self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
                self.num_failures += 1
            except Exception as error:
                self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
                self.num_failures += 1

    def _own_manager(self):
        """Return connection info for this instance's own coordinator socket, or None if not bound yet."""
        if self.coordinator_con_string and self._username and self._password:
            return {'connect': self.coordinator_con_string,
                    'username': self._username,
                    'password': self._password}
        return None

    def get_manager(self, myself=False):
        """
        Return the manager connection info requests should go to.

        :param myself: prefer this instance's own coordinator over a configured manager.
        :returns: the configured manager if complete, else this instance's own
            coordinator info, else None.
        """
        if myself:
            own = self._own_manager()
            if own:
                return own

        if self._is_complete_manager(self.manager):
            return self.manager

        return self._own_manager()

    def set_coordinator(self, coordinator):
        self.coordinator = coordinator
        if self.processor:
            self.processor.set_coordinator(coordinator)

    def get_coordinator(self):
        return self.coordinator

    def _send_request(self, req, label, cache=(), error_ret=None):
        """
        Send ``req`` over the manager REQ socket and return the reply's ``ret``.

        :param req: request dict, serialized with json_dumps.
        :param label: used in debug logs ("MsgEventBusBackend <label>: ...").
        :param cache: events appended to ``cache_events`` if delivery fails or times out.
        :param error_ret: value returned when an exception occurs.
        :returns: the reply's ``ret``; None on timeout; ``error_ret`` on error.
        """
        try:
            payload = json_dumps(req)

            if self.debug:
                self.logger.debug("MsgEventBusBackend %s: %s" % (label, payload))

            if not self.manager_socket or self.manager_socket.closed:
                self.init_msg_channel()

            self.manager_socket.send_string(payload)
            if self.manager_socket.poll(self.socket_timeout * 1000):
                reply = self.manager_socket.recv_string()

                if self.debug:
                    self.logger.debug("MsgEventBusBackend %s reply: %s" % (label, reply))
                ret = json_loads(reply)['ret']

                self.num_failures = 0
                self.num_timeout = 0
                self.num_success += 1
                return ret

            self.cache_events.extend(cache)
            self.num_timeout += 1
            self.logger.critical("timeout to receive a message")
            # A REQ socket that did not get its reply cannot send again (EFSM);
            # close it so the next request reconnects ("Lazy Pirate" pattern).
            self._close_manager_socket()
            return None
        except Exception as error:
            if not self.graceful_stop.is_set():
                self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
            self._close_manager_socket()
            self.cache_events.extend(cache)
            self.num_failures += 1
            return error_ret

    def send(self, event):
        """Send one event; it is cached for replay if delivery fails. Returns the reply's ``ret`` or None."""
        with self._lock:
            req = {'type': 'send_event', 'event': event}
            return self._send_request(req, "send event", cache=[event])

    def send_bulk(self, events):
        """Send several events in one request; all are cached for replay if delivery fails."""
        with self._lock:
            req = {'type': 'send_bulk', 'events': events}
            return self._send_request(req, "send bulk event", cache=events)

    def get(self, event_type, num_events=1, wait=0):
        """Fetch up to ``num_events`` events of ``event_type``; None on timeout, [] on error."""
        with self._lock:
            req = {'type': 'get_event', 'event_type': event_type, 'num_events': num_events, 'wait': wait}
            return self._send_request(req, "get event", error_ret=[])

    def test(self):
        """After failures or timeouts, probe the channel with a TestEvent round trip."""
        if self.num_failures > 0 or self.num_timeout > 0:
            event = TestEvent()
            self.send(event)
            self.get(event._event_type)
            if self.num_timeout > self.timeout_threshold:
                if self.manager_socket is not None and not self.manager_socket.closed:
                    self.logger.critical("The number of timeout reached a threshold, close connection.")
                    self._close_manager_socket()

    def send_report(self, event, status, start_time, end_time, source, result):
        if self.get_coordinator():
            return self.get_coordinator().send_report(event, status, start_time, end_time, source, result)

    def clean_event(self, event):
        pass

    def fail_event(self, event):
        pass

    def is_ok(self):
        """
        Return whether the backend is considered healthy.

        Unhealthy while ``_is_bad`` is set, or once failures/timeouts exceed their
        thresholds. After ``failure_timeout`` seconds in the failed state the
        counters are reset, at most ``num_of_set_failed_at_threshold`` times.
        """
        if self._is_bad:
            self._is_ok = False
        elif (self._num_of_set_failed_at < self._num_of_set_failed_at_threshold
              and self._failed_at and self._failed_at + self._failure_timeout < time.time()):
            self._is_ok = True
            self._failed_at = None
            self.num_failures = 0
            self.num_timeout = 0
        elif self.num_failures > self.failure_threshold or self.num_timeout > self.timeout_threshold:
            self._is_ok = False
            if not self._failed_at:
                self._failed_at = time.time()
                self._num_of_set_failed_at += 1
        else:
            self._is_ok = True
        return self._is_ok

    def replay_cache_events(self):
        """Re-send all cached events; ones that fail again are cached again by send()."""
        # Swap under the lock so events cached concurrently by another sender are not lost.
        with self._lock:
            cache_events, self.cache_events = self.cache_events, []
        for event in cache_events:
            self.send(event)

    def execute(self):
        """Main loop: keep the channel up, probe it, replay cached events; back off when unhealthy."""
        while not self.graceful_stop.is_set():
            try:
                self.init_msg_channel()
                self.test()
                if self.is_ok():
                    self.replay_cache_events()
                    self.graceful_stop.wait(1)
                else:
                    if self.num_failures > 20:
                        self.graceful_stop.wait(300)
                    else:
                        self.graceful_stop.wait(60)
            except Exception as error:
                self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
        self.stop()

    def run(self):
        self.execute()