File indexing completed on 2026-04-10 07:58:41
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import logging
0012 import time
0013 import threading
0014 import traceback
0015 import uuid
0016
0017 from .event import StateClaimEvent, EventBusState
0018
0019
class BaseEventBusBackend(threading.Thread):
    """
    Base Event Bus Backend

    A thread that keeps events in local in-memory queues, one queue per
    event type. When a coordinator has been set with set_coordinator(),
    send/send_bulk/get/send_report are delegated to it and the local queues
    are not used.
    """

    def __init__(self, logger=None, **kwargs):
        """
        Init the backend.

        :param logger: logger to use. When not set, a logger named after the
                       class is created.
        :param kwargs: accepted but not used by this base class.
        """
        super(BaseEventBusBackend, self).__init__()
        self._id = str(uuid.uuid4())[:8]
        self._state_claim_wait = 60
        self._state_claim = StateClaimEvent(self._id, EventBusState.New, time.time())

        # Set by stop() to make execute() leave its loop.
        self.graceful_stop = threading.Event()

        # event type -> {event id: event}
        self._events = {}
        # event type -> [event id, ...] in arrival order
        self._events_index = {}

        # Re-entrant lock protecting the two structures above.
        self._lock = threading.RLock()

        self.setup_logger(logger)

        self.coordinator = None

    def setup_logger(self, logger=None):
        """
        Setup logger

        :param logger: logger to use. When not set, a logger named after the
                       class is created.
        """
        if logger:
            self.logger = logger
        else:
            self.logger = logging.getLogger(self.get_class_name())

    def get_class_name(self):
        """
        Return the name of the concrete class of this object.
        """
        return self.__class__.__name__

    def stop(self, signum=None, frame=None):
        """
        Ask the thread to stop by setting the graceful_stop flag.

        :param signum: not used.
        :param frame: not used.
        """
        self.graceful_stop.set()

    def _store_event(self, event):
        """
        Put one event into the local queues. The caller must hold self._lock.

        An event whose id is already queued for its type replaces the stored
        event and keeps its position in the queue. The id is not queued a
        second time, otherwise get() would later find an id in the index
        with no matching stored event.
        """
        event_type = event._event_type
        stored = self._events.setdefault(event_type, {})
        order = self._events_index.setdefault(event_type, [])
        if event._id not in stored:
            order.append(event._id)
        stored[event._id] = event

    def send(self, event):
        """
        Send one event.

        :param event: object with `_event_type` and `_id` attributes.
        :returns: the result of the coordinator's send() when a coordinator
                  is set, otherwise None.
        """
        coordinator = self.get_coordinator()
        if coordinator:
            return coordinator.send(event)
        with self._lock:
            self._store_event(event)

    def send_bulk(self, events):
        """
        Send several events.

        :param events: iterable of objects with `_event_type` and `_id`
                       attributes.
        :returns: the result of the coordinator's send_bulk() when a
                  coordinator is set, otherwise None.
        """
        coordinator = self.get_coordinator()
        if coordinator:
            return coordinator.send_bulk(events)
        with self._lock:
            for event in events:
                self._store_event(event)

    def get(self, event_type, num_events=1, wait=0, callback=None):
        """
        Get up to `num_events` events of one type, oldest first.

        :param event_type: the type of the events to get.
        :param num_events: maximum number of events to return.
        :param wait: passed to the coordinator when one is set; not used for
                     the local queues.
        :param callback: when set, called with each returned event. For the
                         local queues it is called while the lock is held.
        :returns: the result of the coordinator's get() when a coordinator
                  is set, otherwise a list of events (possibly empty). The
                  returned events are removed from the local queues.
        """
        coordinator = self.get_coordinator()
        if coordinator:
            return coordinator.get(event_type, num_events=num_events, wait=wait, callback=callback)
        with self._lock:
            events = []
            order = self._events_index.get(event_type)
            stored = self._events.get(event_type, {})
            while order and len(events) < num_events:
                event_id = order.pop(0)
                # Skip ids without a stored event instead of raising KeyError.
                event = stored.pop(event_id, None)
                if event is not None:
                    events.append(event)
            if callback:
                for event in events:
                    callback(event)
            return events

    def send_report(self, event, status, start_time, end_time, source, result):
        """
        Forward a report to the coordinator.

        All arguments are passed unchanged to the coordinator's
        send_report(). Without a coordinator nothing is done.

        :returns: the result of the coordinator's send_report() when a
                  coordinator is set, otherwise None.
        """
        coordinator = self.get_coordinator()
        if coordinator:
            return coordinator.send_report(event, status, start_time, end_time, source, result)

    def clean_event(self, event):
        """
        Do nothing in the base backend.
        """
        pass

    def fail_event(self, event):
        """
        Do nothing in the base backend.
        """
        pass

    def set_manager(self, manager):
        """
        Do nothing in the base backend; the manager is not stored.
        """
        pass

    def get_manager(self):
        """
        Return None; the base backend has no manager.
        """
        return None

    def set_coordinator(self, coordinator):
        """
        Set the coordinator to which the event operations are delegated.
        """
        self.coordinator = coordinator

    def get_coordinator(self):
        """
        Return the coordinator, or None when no coordinator is set.
        """
        return self.coordinator

    def is_ok(self):
        """
        Return True; the base backend always reports itself as ok.
        """
        return True

    def execute(self):
        """
        Idle until graceful_stop is set, checking the flag every 0.1 seconds.
        """
        while not self.graceful_stop.is_set():
            try:
                self.graceful_stop.wait(0.1)
            except Exception as error:
                self.logger.critical("Caught an exception: %s\n%s" % (str(error), traceback.format_exc()))

    def run(self):
        """
        Thread entry point; runs execute().
        """
        self.execute()