Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 07:58:41

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2023 - 2025
0010 
0011 import logging
0012 import random
0013 import socket
0014 import string
0015 import time
0016 import threading
0017 import traceback
0018 import uuid
0019 
0020 import zmq
0021 from zmq.auth.thread import ThreadAuthenticator
0022 
0023 from idds.common.utils import json_dumps, json_loads
0024 
0025 from .event import StateClaimEvent, EventBusState, TestEvent
0026 from .baseeventbusbackend import BaseEventBusBackend
0027 
0028 
class MsgEventBusBackendReceiver(threading.Thread):
    """
    Thread that serves event-bus requests arriving on the coordinator REP socket.

    Each request is a JSON string whose 'type' is 'send_event', 'send_bulk' or
    'get_event'. When a coordinator is attached the request is forwarded to it;
    otherwise it is served from the in-memory queues kept by this object.
    Every received request gets exactly one JSON reply, as a REP socket requires.
    """

    def __init__(self, name="MsgEventBusBackendReceiver", logger=None, debug=False,
                 graceful_stop=None, coordinator=None, coordinator_socket=None,
                 poll_timeout=1000, **kwargs):
        """
        :param name: thread name.
        :param logger: logger to use; defaults to ``logging.getLogger(name)``.
        :param debug: when True, every request and reply is logged.
        :param graceful_stop: shared ``threading.Event`` that ends the serving loop;
            a private one is created when not given.
        :param coordinator: optional object with send/send_bulk/get; when set,
            requests are forwarded to it instead of the local queues.
        :param coordinator_socket: the bound ZMQ REP socket to serve.
        :param poll_timeout: milliseconds to wait for a request before the stop
            flags are checked again.
        """
        threading.Thread.__init__(self, name=name)
        self.logger = logger if logger else logging.getLogger(name)
        self.graceful_stop = graceful_stop if graceful_stop is not None else threading.Event()
        self.coordinator = coordinator
        self.coordinator_socket = coordinator_socket
        self.poll_timeout = int(poll_timeout)

        # All per-event-type dicts below are keyed by ``event._event_type``.
        # {event_type: {event._id: event}}
        self._events = {}
        # {event_type: [event._id, ...]} - queue order; get() pops from the front.
        self._events_index = {}
        # {event_type: {act_id: [event._id, ...]}}, act_id = event.get_event_id()
        self._events_act_id_index = {}
        # {event_type: {act_id: time the act id was last handed out by get()}}
        self._events_history = {}
        self._events_history_clean_time = time.time()
        # {event_type: {event._id: time the event was queued}}
        self._events_insert_time = {}

        self.max_delay = 180

        # Must not be called ``_stop``: CPython's threading.Thread has a private
        # ``_stop()`` method used by join()/is_alive() (at least up to 3.12).
        self._stop_event = threading.Event()
        self._lock = threading.RLock()

        self.debug = debug

    def set_coordinator(self, coordinator):
        """Forward subsequent requests to ``coordinator`` (None serves them locally)."""
        self.coordinator = coordinator

    def stop(self):
        """Ask the serving loop to exit at its next check."""
        self._stop_event.set()

    def _close_socket(self):
        """Close the coordinator socket, logging (not raising) any error."""
        try:
            self.coordinator_socket.close()
        except Exception as error:
            self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))

    def _handle_request(self, req):
        """
        Execute one decoded request and return the reply dict.

        :param req: dict with a 'type' key plus the fields that type needs.
        :returns: ``{'type': '<type>_ret', 'ret': ...}``, or ``{'ret': None}``
            for an unknown type.
        :raises KeyError: if a required field is missing.
        """
        # The coordinator and this object expose the same send/send_bulk/get signatures.
        target = self.coordinator if self.coordinator else self
        req_type = req['type']
        if req_type == 'send_event':
            return {'type': 'send_event_ret', 'ret': target.send(req['event'])}
        if req_type == 'send_bulk':
            return {'type': 'send_bulk_ret', 'ret': target.send_bulk(req['events'])}
        if req_type == 'get_event':
            ret = target.get(req['event_type'], num_events=req['num_events'], wait=req['wait'])
            return {'type': 'get_event_ret', 'ret': ret}
        return {'ret': None}

    def run(self):
        """
        Serve requests until ``stop()`` is called or ``graceful_stop`` is set.

        If the socket fails while receiving or replying, it is closed and the
        thread exits; the owning backend checks ``coordinator_socket.closed`` and
        re-creates the socket together with a new receiver.
        """
        while not self.graceful_stop.is_set() and not self._stop_event.is_set():
            try:
                # Poll with a timeout: a bare recv_string() blocks forever and would
                # keep the thread from ever noticing a stop request.
                if not self.coordinator_socket.poll(self.poll_timeout):
                    continue
                req = self.coordinator_socket.recv_string()
                if self.debug:
                    self.logger.debug("MsgEventBusBackendReceiver received: %s" % req)
            except Exception as error:
                self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
                # Nothing was received, so there is nothing to answer (and no stale
                # request must be processed again); this socket is unusable now.
                self._close_socket()
                return

            try:
                reply = self._handle_request(json_loads(req))
            except Exception as error:
                self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
                reply = {'type': 'error', 'ret': None}

            try:
                reply = json_dumps(reply)
            except Exception as error:
                self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
                # A REP socket must answer every request, even if the result
                # cannot be serialized.
                reply = json_dumps({'type': 'error', 'ret': None})

            try:
                if self.debug:
                    self.logger.debug("MsgEventBusBackendReceiver reply: %s" % reply)
                self.coordinator_socket.send_string(reply)
            except Exception as error:
                self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
                self._close_socket()
                return

            self.graceful_stop.wait(0.1)

    def insert_event(self, event):
        """
        Queue ``event`` under its event type, or merge it into a queued event.

        Not locked itself; send()/send_bulk() call it while holding ``self._lock``.

        - When a queued event with the same act id (``event.get_event_id()``) is
          accepted by ``event.able_to_merge(old_event)``, ``old_event.merge(event)``
          is called and the new event is not queued separately.
        - An act id never handed out by get() goes to the front of the queue.
        - An act id handed out before goes towards the back of the queue, as
          computed by ``_history_insert_loc``.
        """
        event_type = event._event_type
        if event_type not in self._events:
            self._events[event_type] = {}
            self._events_index[event_type] = []
            self._events_act_id_index[event_type] = {}
            self._events_history[event_type] = {}
            self._events_insert_time[event_type] = {}

        self.logger.debug("All events: %s", self._events)

        events = self._events[event_type]
        act_id_index = self._events_act_id_index[event_type]

        merged = False
        event_act_id = event.get_event_id()
        if event_act_id not in act_id_index:
            act_id_index[event_act_id] = [event._id]
        else:
            for old_event_id in list(act_id_index[event_act_id]):
                if old_event_id not in events:
                    # Already handed out by get(): drop the stale id.
                    act_id_index[event_act_id].remove(old_event_id)
                    continue
                old_event = events[old_event_id]
                # NOTE(review): the loop does not stop at the first merge, so the new
                # event can be merged into several queued events - confirm intended.
                if event.able_to_merge(old_event):
                    old_event.merge(event)
                    self.logger.debug("New event %s is merged to old event %s" % (event, old_event))
                    merged = True
            if not merged:
                act_id_index[event_act_id].append(event._id)

        if merged:
            return

        history = self._events_history[event_type]
        if event_act_id not in history:
            insert_loc = 0
        else:
            insert_loc = self._history_insert_loc(event_type, history[event_act_id])
        events[event._id] = event
        self._events_index[event_type].insert(insert_loc, event._id)
        self._events_insert_time[event_type][event._id] = time.time()
        self.logger.debug("Insert new event: %s" % event)

    def _history_insert_loc(self, event_type, hist_time):
        """
        Return the queue position for an event whose act id was last handed out at ``hist_time``.

        Walking from the back of the queue, the event moves ahead of queued events
        whose act id was handed out more recently than ``hist_time``. It stops at
        the first queued event that has no history, has waited more than
        ``max_delay`` seconds, or was handed out at or before ``hist_time``.
        """
        history = self._events_history[event_type]
        events = self._events[event_type]
        insert_times = self._events_insert_time[event_type]
        index = self._events_index[event_type]
        now = time.time()
        insert_loc = len(index)
        for q_event_id in reversed(index):
            q_act_id = events[q_event_id].get_event_id()
            if q_act_id not in history or insert_times[q_event_id] + self.max_delay < now:
                break
            if history[q_act_id] <= hist_time:
                break
            insert_loc -= 1
        return insert_loc

    def clean_events(self):
        """
        Prune bookkeeping at most once every 4 hours.

        Drops history entries whose act id is no longer queued, and act-id index
        entries that point only at events already handed out.
        """
        now = time.time()
        if self._events_history_clean_time + 3600 * 4 >= now:
            return
        self._events_history_clean_time = now

        for event_type, index in self._events_index.items():
            queued_ids = set(index)
            queued_act_ids = {self._events[event_type][event_id].get_event_id() for event_id in index}

            history = self._events_history[event_type]
            for act_id in [key for key in history if key not in queued_act_ids]:
                del history[act_id]

            act_id_index = self._events_act_id_index[event_type]
            for act_id in list(act_id_index):
                remaining = [q_id for q_id in act_id_index[act_id] if q_id in queued_ids]
                if remaining:
                    act_id_index[act_id] = remaining
                else:
                    del act_id_index[act_id]

    def send(self, event):
        """Queue one event locally (thread-safe). Returns None."""
        with self._lock:
            self.insert_event(event)
            self.clean_events()

    def send_bulk(self, events):
        """Queue several events locally under a single lock hold. Returns None."""
        with self._lock:
            for event in events:
                self.insert_event(event)
            self.clean_events()

    def get(self, event_type, num_events=1, wait=0, callback=None):
        """
        Pop up to ``num_events`` events of ``event_type`` from the front of its queue.

        :param wait: accepted for interface compatibility; not used.
        :param callback: if given, called with each returned event (under the lock).
        :returns: list of events, possibly empty.
        """
        with self._lock:
            events = []
            index = self._events_index.get(event_type)
            while index and len(events) < num_events:
                event_id = index.pop(0)
                event = self._events[event_type].pop(event_id)
                # Remember when this act id was served; insert_event() uses it to
                # push later events with the same act id towards the back.
                self._events_history[event_type][event.get_event_id()] = time.time()
                del self._events_insert_time[event_type][event_id]
                events.append(event)
            if callback:
                for event in events:
                    callback(event)

            return events
0228 
0229 
0230 class MsgEventBusBackend(BaseEventBusBackend):
0231     """
0232     Msg Event Bus Backend
0233     """
0234 
0235     def __init__(self, logger=None, coordinator_port=5556, socket_timeout=10, debug=False,
0236                  timeout_threshold=5, failure_threshold=5, failure_timeout=180,
0237                  num_of_set_failed_at_threshold=10, connection_retries=3, **kwargs):
0238         super(MsgEventBusBackend, self).__init__()
0239         self._id = str(uuid.uuid4())[:8]
0240         self._state_claim_wait = 60
0241         self._state_claim = StateClaimEvent(self._id, EventBusState.New, time.time())
0242 
0243         self.graceful_stop = threading.Event()
0244 
0245         self._lock = threading.RLock()
0246 
0247         self.max_delay = 180
0248 
0249         self._username = 'idds'
0250         self._password = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(20))
0251 
0252         self._is_ok = True
0253         self._is_bad = False
0254         self._failed_at = None
0255         self._failure_timeout = int(failure_timeout)
0256         self._num_of_set_failed_at = 0
0257         self._num_of_set_failed_at_threshold = int(num_of_set_failed_at_threshold)
0258         self.num_success = 0
0259         self.num_failures = 0
0260         self.num_timeout = 0
0261         self.cache_events = []
0262 
0263         self.setup_logger(logger)
0264 
0265         self.socket_timeout = int(socket_timeout)
0266         self.timeout_threshold = int(timeout_threshold)
0267         self.failure_threshold = int(failure_threshold)
0268 
0269         self.coordinator_port = int(coordinator_port)
0270         self.context = None
0271         self.auth = None
0272         self.coordinator_socket = None
0273         self.coordinator_con_string = None
0274 
0275         self.processor = None
0276 
0277         self.manager = None
0278         self.manager_socket = None
0279 
0280         self.debug = debug
0281 
0282         self.connection_retries = connection_retries
0283 
0284         self.init_msg_channel()
0285 
0286     def setup_logger(self, logger=None):
0287         """
0288         Setup logger
0289         """
0290         if logger:
0291             self.logger = logger
0292         else:
0293             self.logger = logging.getLogger(self.get_class_name())
0294 
0295     def get_class_name(self):
0296         return self.__class__.__name__
0297 
0298     def stop(self, signum=None, frame=None):
0299         self.logger.debug("graceful stop")
0300         self.graceful_stop.set()
0301         if self.auth:
0302             self.logger.debug("auth stop")
0303             self.auth.stop()
0304 
0305     def init_msg_channel(self):
0306         with self._lock:
0307             for i in range(self.connection_retries):
0308                 try:
0309                     if not self.context:
0310                         self.context = zmq.Context()
0311                         if self.auth:
0312                             self.auth.stop()
0313 
0314                         self.auth = ThreadAuthenticator(self.context)
0315                         self.auth.start()
0316                         # self.auth.allow('127.0.0.1')
0317                         self.auth.allow()
0318                         # Instruct authenticator to handle PLAIN requests
0319                         self.auth.configure_plain(domain='*', passwords={self._username: self._password})
0320 
0321                     if not self.coordinator_socket or self.coordinator_socket.closed:
0322                         self.coordinator_socket = self.context.socket(zmq.REP)
0323                         self.coordinator_socket.plain_server = True
0324                         self.coordinator_socket.bind("tcp://*:%s" % self.coordinator_port)
0325 
0326                         hostname = socket.getfqdn()
0327                         self.coordinator_con_string = "tcp://%s:%s" % (hostname, self.coordinator_port)
0328 
0329                         if self.processor:
0330                             self.processor.stop()
0331 
0332                         self.processor = MsgEventBusBackendReceiver(logger=self.logger,
0333                                                                     graceful_stop=self.graceful_stop,
0334                                                                     debug=self.debug,
0335                                                                     coordinator_socket=self.coordinator_socket,
0336                                                                     coordinator=self.coordinator)
0337                         self.processor.start()
0338 
0339                     self._is_bad = False
0340                     # break from the loop
0341                     break
0342                 except (zmq.error.ZMQError, zmq.Again) as error:
0343                     self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
0344                     self.num_failures += 1
0345                     self._is_bad = True
0346                     if 'Address already in use' in str(error):
0347                         self.coordinator_port = self.coordinator_port + random.randint(1, 100)
0348                         self.logger.info(f"Address already in use, switch to new port: {self.coordinator_port}")
0349                 except Exception as error:
0350                     self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
0351                     self.num_failures += 1
0352                     self._is_bad = True
0353 
0354             try:
0355                 if not self.manager_socket or self.manager_socket.closed:
0356                     manager = self.get_manager()
0357                     if manager['username'] and manager['password'] and manager['connect']:
0358                         self.manager_socket = self.context.socket(zmq.REQ)
0359                         self.manager_socket.plain_username = manager['username'].encode('utf-8')
0360                         self.manager_socket.plain_password = manager['password'].encode('utf-8')
0361                         self.manager_socket.connect(manager['connect'])
0362                     else:
0363                         self._is_bad = True
0364             except (zmq.error.ZMQError, zmq.Again) as error:
0365                 self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
0366                 self.num_failures += 1
0367                 self._is_bad = True
0368             except Exception as error:
0369                 self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
0370                 self.num_failures += 1
0371                 self._is_bad = True
0372 
0373     def set_manager(self, manager):
0374         if not manager:
0375             manager = self.get_manager()
0376 
0377         if (not self.manager and not manager['connect'] and not manager['username'] and not manager['password']):
0378             if (self.manager['connect'] != manager['connect']
0379                 or self.manager['username'] != manager['username']                        # noqa W503, E129
0380                 or self.manager['password'] != manager['password']):                      # noqa W503, E129
0381                 with self._lock:
0382                     try:
0383                         self.manager = manager
0384                         self.manager_socket = self.context.socket(zmq.REQ)
0385                         self.manager_socket.plain_username = manager['username'].encode('utf-8')
0386                         self.manager_socket.plain_password = manager['password'].encode('utf-8')
0387                         self.manager_socket.connect(manager['connect'])
0388                     except (zmq.error.ZMQError, zmq.Again) as error:
0389                         self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
0390                         self.num_failures += 1
0391                     except Exception as error:
0392                         self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
0393                         self.num_failures += 1
0394 
0395     def get_manager(self, myself=False):
0396         if myself:
0397             if self.coordinator_con_string and self._username and self._password:
0398                 manager = {'connect': self.coordinator_con_string,
0399                            'username': self._username,
0400                            'password': self._password}
0401                 return manager
0402 
0403         if (self.manager and self.manager['connect'] and self.manager['username'] and self.manager['password']):
0404             return self.manager
0405 
0406         if self.coordinator_con_string and self._username and self._password:
0407             manager = {'connect': self.coordinator_con_string,
0408                        'username': self._username,
0409                        'password': self._password}
0410             return manager
0411         return None
0412 
0413     def set_coordinator(self, coordinator):
0414         self.coordinator = coordinator
0415         if self.processor:
0416             self.processor.set_coordinator(coordinator)
0417 
0418     def get_coordinator(self):
0419         return self.coordinator
0420 
0421     def send(self, event):
0422         with self._lock:
0423             try:
0424                 req = {'type': 'send_event', 'event': event}
0425                 req = json_dumps(req)
0426                 # self.logger.debug("send:send %s" % req)
0427                 if self.debug:
0428                     self.logger.debug("MsgEventBusBackend send event: %s" % req)
0429 
0430                 if not self.manager_socket or self.manager_socket.closed:
0431                     self.init_msg_channel()
0432 
0433                 self.manager_socket.send_string(req)
0434                 if self.manager_socket.poll(self.socket_timeout * 1000):
0435                     reply = self.manager_socket.recv_string()
0436                     # self.logger.debug("send:recv %s" % reply)
0437                     if self.debug:
0438                         self.logger.debug("MsgEventBusBackend send event reply: %s" % reply)
0439                     reply = json_loads(reply)
0440                     ret = reply['ret']
0441 
0442                     # refresh failures when there are successful requests
0443                     self.num_failures = 0
0444                     self.num_timeout = 0
0445                     self.num_success += 1
0446                 else:
0447                     ret = None
0448                     self.cache_events.append(event)
0449                     self.num_timeout += 1
0450                     self.logger.critical("timeout to receive a message")
0451 
0452                 return ret
0453             except (zmq.error.ZMQError, zmq.Again) as error:
0454                 if not self.graceful_stop.is_set():
0455                     self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
0456                 self.manager_socket.close()
0457                 self.cache_events.append(event)
0458                 self.num_failures += 1
0459             except Exception as error:
0460                 if not self.graceful_stop.is_set():
0461                     self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
0462                 self.manager_socket.close()
0463                 self.cache_events.append(event)
0464                 self.num_failures += 1
0465 
0466     def send_bulk(self, events):
0467         with self._lock:
0468             try:
0469                 req = {'type': 'send_bulk', 'events': events}
0470                 req = json_dumps(req)
0471                 # self.logger.debug("send:send %s" % req)
0472                 if self.debug:
0473                     self.logger.debug("MsgEventBusBackend send bulk event: %s" % req)
0474 
0475                 if not self.manager_socket or self.manager_socket.closed:
0476                     self.init_msg_channel()
0477 
0478                 self.manager_socket.send_string(req)
0479                 if self.manager_socket.poll(self.socket_timeout * 1000):
0480                     reply = self.manager_socket.recv_string()
0481                     # self.logger.debug("send:recv %s" % reply)
0482                     if self.debug:
0483                         self.logger.debug("MsgEventBusBackend send bulk event reply: %s" % reply)
0484                     reply = json_loads(reply)
0485                     ret = reply['ret']
0486 
0487                     # refresh failures when there are successful requests
0488                     self.num_failures = 0
0489                     self.num_timeout = 0
0490                     self.num_success += 1
0491                 else:
0492                     ret = None
0493                     for event in events:
0494                         self.cache_events.append(event)
0495                     self.num_timeout += 1
0496                     self.logger.critical("timeout to receive a message")
0497 
0498                 return ret
0499             except (zmq.error.ZMQError, zmq.Again) as error:
0500                 if not self.graceful_stop.is_set():
0501                     self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
0502                 self.manager_socket.close()
0503                 for event in events:
0504                     self.cache_events.append(event)
0505                 self.num_failures += 1
0506             except Exception as error:
0507                 if not self.graceful_stop.is_set():
0508                     self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
0509                 self.manager_socket.close()
0510                 for event in events:
0511                     self.cache_events.append(event)
0512                 self.num_failures += 1
0513 
0514     def get(self, event_type, num_events=1, wait=0):
0515         with self._lock:
0516             try:
0517                 req = {'type': 'get_event', 'event_type': event_type, 'num_events': num_events, 'wait': wait}
0518                 req = json_dumps(req)
0519                 # self.logger.debug("get:send %s" % req)
0520 
0521                 if self.debug:
0522                     self.logger.debug("MsgEventBusBackend get event: %s" % req)
0523 
0524                 if not self.manager_socket or self.manager_socket.closed:
0525                     self.init_msg_channel()
0526 
0527                 self.manager_socket.send_string(req)
0528 
0529                 if self.manager_socket.poll(10 * 1000):
0530                     reply = self.manager_socket.recv_string()
0531                     # self.logger.debug("send:recv %s" % reply)
0532                     if self.debug:
0533                         self.logger.debug("MsgEventBusBackend get event reply: %s" % reply)
0534                     reply = json_loads(reply)
0535                     ret = reply['ret']
0536 
0537                     # refresh failures when there are successful requests
0538                     self.num_failures = 0
0539                     self.num_success += 1
0540                     self.num_timeout = 0
0541                 else:
0542                     ret = None
0543                     self.num_timeout += 1
0544                     self.logger.critical("timeout to receive a message")
0545 
0546                 return ret
0547             except (zmq.error.ZMQError, zmq.Again) as error:
0548                 if not self.graceful_stop.is_set():
0549                     self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
0550                 self.manager_socket.close()
0551                 self.num_failures += 1
0552             except Exception as error:
0553                 if not self.graceful_stop.is_set():
0554                     self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
0555                 self.manager_socket.close()
0556                 self.num_failures += 1
0557         return []
0558 
0559     def test(self):
0560         if self.num_failures > 0 or self.num_timeout > 0:
0561             event = TestEvent()
0562             self.send(event)
0563             self.get(event._event_type)
0564         if self.num_timeout > 5:
0565             if not self.manager_socket.closed:
0566                 self.logger.critical("The number of timeout reached a threshold, close connection.")
0567                 self.manager_socket.close()
0568 
0569     def send_report(self, event, status, start_time, end_time, source, result):
0570         if self.get_coordinator():
0571             return self.get_coordinator().send_report(event, status, start_time, end_time, source, result)
0572 
0573     def clean_event(self, event):
0574         pass
0575 
0576     def fail_event(self, event):
0577         pass
0578 
0579     def is_ok(self):
0580         if self._is_bad:
0581             self._is_ok = False
0582         elif self._num_of_set_failed_at < self._num_of_set_failed_at_threshold and self._failed_at and self._failed_at + self._failure_timeout < time.time():
0583             self._is_ok = True
0584             self._failed_at = None
0585             self.num_failures = 0
0586             self.num_timeout = 0
0587         elif self.num_failures > self.failure_threshold or self.num_timeout > self.timeout_threshold:
0588             self._is_ok = False
0589             if not self._failed_at:
0590                 self._failed_at = time.time()
0591                 self._num_of_set_failed_at += 1
0592         else:
0593             self._is_ok = True
0594         return self._is_ok
0595 
0596     def replay_cache_events(self):
0597         cache_events = self.cache_events
0598         self.cache_events = []
0599         for event in cache_events:
0600             self.send(event)
0601 
0602     def execute(self):
0603         while not self.graceful_stop.is_set():
0604             try:
0605                 self.init_msg_channel()
0606                 self.test()
0607                 if self.is_ok():
0608                     self.replay_cache_events()
0609                     self.graceful_stop.wait(1)
0610                 else:
0611                     if self.num_failures > 20:
0612                         self.graceful_stop.wait(300)
0613                     else:
0614                         self.graceful_stop.wait(60)
0615             except Exception as error:
0616                 self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))
0617         self.stop()
0618 
0619     def run(self):
0620         self.execute()