Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 07:58:41

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2023 - 2025
0010 
0011 import logging
0012 import time
0013 import threading
0014 import uuid
0015 
0016 from .event import StateClaimEvent, EventBusState
0017 from .baseeventbusbackend import BaseEventBusBackend
0018 
0019 
0020 class BaseEventBusBackendOpt(BaseEventBusBackend):
0021     """
0022     Base Event Bus Backend
0023     """
0024 
0025     def __init__(self, logger=None, **kwargs):
0026         super(BaseEventBusBackendOpt, self).__init__()
0027         self._id = str(uuid.uuid4())[:8]
0028         self._state_claim_wait = 60
0029         self._state_claim = StateClaimEvent(self._id, EventBusState.New, time.time())
0030 
0031         self.graceful_stop = threading.Event()
0032 
0033         self._events = {}
0034         self._events_index = {}
0035         self._events_act_id_index = {}
0036         self._events_history = {}
0037         self._events_history_clean_time = time.time()
0038         self._events_insert_time = {}
0039         self._lock = threading.RLock()
0040 
0041         self.max_delay = 180
0042 
0043         self.setup_logger(logger)
0044 
0045     def setup_logger(self, logger=None):
0046         """
0047         Setup logger
0048         """
0049         if logger:
0050             self.logger = logger
0051         else:
0052             self.logger = logging.getLogger(self.get_class_name())
0053 
0054     def get_class_name(self):
0055         return self.__class__.__name__
0056 
0057     def insert_event(self, event):
0058         if event._event_type not in self._events:
0059             self._events[event._event_type] = {}
0060             self._events_index[event._event_type] = []
0061             self._events_act_id_index[event._event_type] = {}
0062             self._events_history[event._event_type] = {}
0063             self._events_insert_time[event._event_type] = {}
0064 
0065         self.logger.debug("All events: %s" % self._events)
0066 
0067         merged = False
0068         event_act_id = event.get_event_id()
0069         if event_act_id not in self._events_act_id_index[event._event_type]:
0070             self._events_act_id_index[event._event_type][event_act_id] = [event._id]
0071         else:
0072             old_event_ids = self._events_act_id_index[event._event_type][event_act_id].copy()
0073             for old_event_id in old_event_ids:
0074                 if old_event_id not in self._events[event._event_type]:
0075                     self._events_act_id_index[event._event_type][event_act_id].remove(old_event_id)
0076                 else:
0077                     old_event = self._events[event._event_type][old_event_id]
0078                     if event.able_to_merge(old_event):
0079                         old_event.merge(event)
0080                         self._events[event._event_type][old_event_id] = old_event
0081                         self.logger.debug("New event %s is merged to old event %s" % (event, old_event))
0082                         merged = True
0083             if not merged:
0084                 self._events_act_id_index[event._event_type][event_act_id].append(event._id)
0085 
0086         if not merged:
0087             if event_act_id not in self._events_history[event._event_type]:
0088                 self._events[event._event_type][event._id] = event
0089                 self._events_index[event._event_type].insert(0, event._id)
0090                 self._events_insert_time[event._event_type][event._id] = time.time()
0091                 self.logger.debug("Insert new event: %s" % event)
0092             else:
0093                 hist_time = self._events_history[event._event_type][event_act_id]
0094                 insert_loc = len(self._events_index[event._event_type])
0095                 q_event_ids = self._events_index[event._event_type].copy()
0096                 q_event_ids.reverse()
0097                 for q_event_id in q_event_ids:
0098                     q_event = self._events[event._event_type][q_event_id]
0099                     q_event_act_id = q_event.get_event_id()
0100                     if (q_event_act_id not in self._events_history[event._event_type] or self._events_insert_time[event._event_type][q_event_id] + self.max_delay < time.time()):
0101                         break
0102                     elif self._events_history[event._event_type][q_event_act_id] > hist_time:
0103                         insert_loc -= 1
0104                     else:
0105                         break
0106                 self._events[event._event_type][event._id] = event
0107                 self._events_index[event._event_type].insert(insert_loc, event._id)
0108                 self._events_insert_time[event._event_type][event._id] = time.time()
0109                 self.logger.debug("Insert new event: %s" % event)
0110 
0111     def clean_events(self):
0112         if self._events_history_clean_time + 3600 * 4 < time.time():
0113             self._events_history_clean_time = time.time()
0114             for event_type in self._events_index:
0115                 event_act_ids = []
0116                 for event_id in self._events_index[event_type]:
0117                     event = self._events[event_type][event_id]
0118                     act_id = event.get_event_id()
0119                     event_act_ids.append(act_id)
0120 
0121                 event_history_keys = list(self._events_history[event_type].keys())
0122                 for key in event_history_keys:
0123                     if key not in event_act_ids:
0124                         del self._events_history[event_type][key]
0125 
0126                 act_id_keys = list(self._events_act_id_index[event_type].keys())
0127                 for act_id_key in act_id_keys:
0128                     act_id2ids = self._events_act_id_index[event_type][act_id_key].copy()
0129                     for q_id in act_id2ids:
0130                         if q_id not in self._events_index[event_type]:
0131                             self._events_act_id_index[event_type][act_id_key].remove(q_id)
0132                     if not self._events_act_id_index[event_type][act_id_key]:
0133                         del self._events_act_id_index[event_type][act_id_key]
0134 
0135     def send(self, event):
0136         if self.get_coordinator():
0137             return self.get_coordinator().send(event)
0138         else:
0139             with self._lock:
0140                 self.insert_event(event)
0141                 self.clean_events()
0142 
0143     def send_bulk(self, events):
0144         if self.get_coordinator():
0145             return self.get_coordinator().send_bulk(events)
0146         else:
0147             with self._lock:
0148                 for event in events:
0149                     self.insert_event(event)
0150                 self.clean_events()
0151 
0152     def get(self, event_type, num_events=1, wait=0, callback=None):
0153         if self.get_coordinator():
0154             return self.get_coordinator().get(event_type, num_events=num_events, wait=wait, callback=callback)
0155         else:
0156             with self._lock:
0157                 events = []
0158                 for i in range(num_events):
0159                     if event_type in self._events_index and self._events_index[event_type]:
0160                         event_id = self._events_index[event_type].pop(0)
0161                         event = self._events[event_type][event_id]
0162                         event_act_id = event.get_event_id()
0163                         self._events_history[event_type][event_act_id] = time.time()
0164                         del self._events[event_type][event_id]
0165                         del self._events_insert_time[event._event_type][event._id]
0166                         events.append(event)
0167                     else:
0168                         break
0169                 if callback:
0170                     for event in events:
0171                         callback(event)
0172                 return events