File indexing completed on 2026-04-10 07:58:41
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import logging
0012 import time
0013 import threading
0014 import uuid
0015
0016 from .event import StateClaimEvent, EventBusState
0017 from .baseeventbusbackend import BaseEventBusBackend
0018
0019
class BaseEventBusBackendOpt(BaseEventBusBackend):
    """
    Base Event Bus Backend

    Keeps events in memory, grouped by event type. For every event type the
    backend maintains:

    * ``_events``: event ``_id`` -> event object, for events still queued.
    * ``_events_index``: ordered list of queued event ``_id`` values;
      ``get`` consumes it from the front.
    * ``_events_act_id_index``: ``event.get_event_id()`` -> list of event
      ``_id`` values that were queued under that id (pruned lazily).
    * ``_events_history``: ``event.get_event_id()`` -> time the id was last
      handed out by ``get``.
    * ``_events_insert_time``: event ``_id`` -> time the event was queued.

    When ``get_coordinator()`` (provided outside this class) returns a truthy
    object, ``send``, ``send_bulk`` and ``get`` delegate to it instead of
    using the local queues.
    """

    def __init__(self, logger=None, **kwargs):
        """
        Create an empty backend.

        :param logger: logger to use; when falsy, a logger named after the
                       class is created (see ``setup_logger``).
        :param kwargs: accepted but not used by this constructor.
        """
        super(BaseEventBusBackendOpt, self).__init__()
        # Short identifier: first 8 characters of a random UUID.
        self._id = str(uuid.uuid4())[:8]
        # NOTE(review): not read anywhere in this class; presumably seconds - confirm.
        self._state_claim_wait = 60
        self._state_claim = StateClaimEvent(self._id, EventBusState.New, time.time())

        self.graceful_stop = threading.Event()

        self._events = {}
        self._events_index = {}
        self._events_act_id_index = {}
        self._events_history = {}
        self._events_history_clean_time = time.time()
        self._events_insert_time = {}
        # Re-entrant lock guarding the local queues in send/send_bulk/get.
        self._lock = threading.RLock()

        # Seconds; a queued event older than this stops the backward scan
        # performed when positioning a new event (see _find_insert_location).
        self.max_delay = 180

        self.setup_logger(logger)

    def setup_logger(self, logger=None):
        """
        Setup logger

        :param logger: logger to use; when falsy, ``logging.getLogger`` is
                       called with the class name.
        """
        if logger:
            self.logger = logger
        else:
            self.logger = logging.getLogger(self.get_class_name())

    def get_class_name(self):
        """Return the name of the instance's class."""
        return self.__class__.__name__

    def insert_event(self, event):
        """
        Queue ``event``, or merge it into an already queued event.

        The event is first compared with the queued events that share its
        ``get_event_id()``; whenever ``event.able_to_merge(old_event)`` is
        true, ``old_event.merge(event)`` is called and the new event is not
        queued. Otherwise the event is queued: at the front when its id has
        no entry in the history, else at the position computed by
        ``_find_insert_location``.

        This method does not take ``self._lock``; ``send`` and ``send_bulk``
        call it with the lock held.

        :param event: object exposing ``_event_type``, ``_id``,
                      ``get_event_id()``, ``able_to_merge()`` and ``merge()``.
        """
        event_type = event._event_type
        if event_type not in self._events:
            self._events[event_type] = {}
            self._events_index[event_type] = []
            self._events_act_id_index[event_type] = {}
            self._events_history[event_type] = {}
            self._events_insert_time[event_type] = {}

        # Lazy logging arguments: the whole store is only rendered to text
        # when debug logging is actually enabled.
        self.logger.debug("All events: %s", self._events)

        queued = self._events[event_type]
        act_id_index = self._events_act_id_index[event_type]

        merged = False
        event_act_id = event.get_event_id()
        if event_act_id not in act_id_index:
            act_id_index[event_act_id] = [event._id]
        else:
            known_ids = act_id_index[event_act_id]
            # Iterate over a copy because stale ids are removed on the way.
            for old_event_id in known_ids.copy():
                if old_event_id not in queued:
                    # Already consumed by get(); drop the stale reference.
                    known_ids.remove(old_event_id)
                else:
                    old_event = queued[old_event_id]
                    if event.able_to_merge(old_event):
                        # NOTE(review): the loop continues after a merge, so
                        # the event is merged into every compatible queued
                        # event - confirm this is intended.
                        old_event.merge(event)
                        queued[old_event_id] = old_event
                        self.logger.debug("New event %s is merged to old event %s", event, old_event)
                        merged = True
            if not merged:
                known_ids.append(event._id)

        if merged:
            return

        if event_act_id not in self._events_history[event_type]:
            # Id has no history entry: put the event at the front.
            insert_loc = 0
        else:
            insert_loc = self._find_insert_location(event_type, event_act_id)

        queued[event._id] = event
        self._events_index[event_type].insert(insert_loc, event._id)
        self._events_insert_time[event_type][event._id] = time.time()
        self.logger.debug("Insert new event: %s", event)

    def _find_insert_location(self, event_type, event_act_id):
        """
        Return the queue position for an event whose id is in the history.

        Starting at the end of the queue and walking backwards, the position
        moves one step towards the front for every queued event whose id was
        handed out by ``get`` more recently than ``event_act_id``. The walk
        stops at the first queued event that has no history entry, that has
        been queued for more than ``max_delay`` seconds, or whose id was not
        handed out more recently.

        :param event_type: key of the queue to inspect.
        :param event_act_id: ``get_event_id()`` of the event being placed;
                             must be a key of the history for ``event_type``.
        :returns: index suitable for ``list.insert`` on the queue.
        """
        history = self._events_history[event_type]
        insert_times = self._events_insert_time[event_type]
        queued = self._events[event_type]
        index = self._events_index[event_type]

        hist_time = history[event_act_id]
        insert_loc = len(index)
        for q_event_id in reversed(index):
            q_event_act_id = queued[q_event_id].get_event_id()
            if (q_event_act_id not in history
                    or insert_times[q_event_id] + self.max_delay < time.time()):
                break
            if history[q_event_act_id] > hist_time:
                insert_loc -= 1
            else:
                break
        return insert_loc

    def clean_events(self):
        """
        Prune bookkeeping that no longer refers to queued events.

        Runs at most once every 4 hours (``3600 * 4`` seconds); other calls
        return immediately. For each event type it removes history entries
        whose id belongs to no queued event, drops event ids that are no
        longer queued from ``_events_act_id_index``, and deletes index
        entries that become empty.
        """
        if self._events_history_clean_time + 3600 * 4 >= time.time():
            return

        self._events_history_clean_time = time.time()
        for event_type, index in self._events_index.items():
            queued = self._events[event_type]
            # Sets give O(1) membership tests in the loops below.
            live_event_ids = set(index)
            live_act_ids = {queued[event_id].get_event_id() for event_id in index}

            history = self._events_history[event_type]
            for key in list(history):
                if key not in live_act_ids:
                    del history[key]

            act_id_index = self._events_act_id_index[event_type]
            for act_id_key in list(act_id_index):
                ids = act_id_index[act_id_key]
                # Slice assignment keeps the same list object, as before.
                ids[:] = [q_id for q_id in ids if q_id in live_event_ids]
                if not ids:
                    del act_id_index[act_id_key]

    def send(self, event):
        """
        Submit one event.

        :param event: event to queue.
        :returns: the coordinator's ``send`` result when a coordinator is
                  available; otherwise ``None`` after queueing locally.
        """
        coordinator = self.get_coordinator()
        if coordinator:
            return coordinator.send(event)

        with self._lock:
            self.insert_event(event)
            self.clean_events()

    def send_bulk(self, events):
        """
        Submit several events.

        :param events: iterable of events to queue.
        :returns: the coordinator's ``send_bulk`` result when a coordinator
                  is available; otherwise ``None`` after queueing locally.
        """
        coordinator = self.get_coordinator()
        if coordinator:
            return coordinator.send_bulk(events)

        with self._lock:
            for event in events:
                self.insert_event(event)
            self.clean_events()

    def get(self, event_type, num_events=1, wait=0, callback=None):
        """
        Take up to ``num_events`` events of ``event_type`` from the queue.

        Each returned event is removed from the queue and the current time
        is recorded in the history under its ``get_event_id()``.

        :param event_type: key of the queue to read.
        :param num_events: maximum number of events to take.
        :param wait: forwarded to the coordinator; unused for the local queue.
        :param callback: optional callable invoked with each returned event
                         while the lock is still held.
        :returns: the coordinator's ``get`` result when a coordinator is
                  available; otherwise a list of events, possibly empty.
        """
        coordinator = self.get_coordinator()
        if coordinator:
            return coordinator.get(event_type, num_events=num_events, wait=wait, callback=callback)

        with self._lock:
            events = []
            index = self._events_index.get(event_type)
            for _ in range(num_events):
                if not index:
                    # Unknown event type or nothing left in the queue.
                    break
                event_id = index.pop(0)
                event = self._events[event_type][event_id]
                self._events_history[event_type][event.get_event_id()] = time.time()
                del self._events[event_type][event_id]
                # Use the queue keys, matching the other containers.
                del self._events_insert_time[event_type][event_id]
                events.append(event)
            if callback:
                for event in events:
                    callback(event)
            return events