Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 07:58:41

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2023 - 2025
0010 
0011 import logging
0012 import time
0013 import threading
0014 import traceback
0015 import uuid
0016 
0017 from .event import StateClaimEvent, EventBusState
0018 
0019 
class BaseEventBusBackend(threading.Thread):
    """
    Base Event Bus Backend

    A thread-based backend with two modes of operation:

    * When a coordinator has been set (see ``set_coordinator``), ``send``,
      ``send_bulk``, ``get`` and ``send_report`` are forwarded to it.
    * Otherwise events are kept in an in-memory store, grouped by
      ``event._event_type`` and handed out in the order they were first sent.

    The in-memory store is protected by a re-entrant lock.
    """

    def __init__(self, logger=None, **kwargs):
        """
        Initialize the backend.

        :param logger: logger to use; when falsy, a logger named after the
                       class is created.
        :param kwargs: accepted but not used by this base class.
        """
        super(BaseEventBusBackend, self).__init__()
        # Short identifier for this backend instance (first 8 chars of a UUID4).
        self._id = str(uuid.uuid4())[:8]
        # NOTE(review): presumably a wait interval in seconds - confirm with users
        # of this attribute; it is not read anywhere in this class.
        self._state_claim_wait = 60
        self._state_claim = StateClaimEvent(self._id, EventBusState.New, time.time())

        # Set by stop(); polled by execute() to leave the loop.
        self.graceful_stop = threading.Event()

        # event_type -> {event_id: event}
        self._events = {}
        # event_type -> [event_id, ...] in arrival order
        self._events_index = {}

        self._lock = threading.RLock()

        self.setup_logger(logger)

        self.coordinator = None

    def setup_logger(self, logger=None):
        """
        Setup logger

        :param logger: logger to use; when falsy, fall back to
                       ``logging.getLogger(<class name>)``.
        """
        self.logger = logger if logger else logging.getLogger(self.get_class_name())

    def get_class_name(self):
        """
        Return the name of the concrete class of this instance.
        """
        return self.__class__.__name__

    def stop(self, signum=None, frame=None):
        """
        Request the ``execute`` loop to terminate.

        :param signum: unused; lets the method be registered as a signal handler.
        :param frame: unused; lets the method be registered as a signal handler.
        """
        self.graceful_stop.set()

    def _store_event(self, event):
        """
        Put one event into the in-memory store. The caller must hold ``self._lock``.

        An event whose id is already stored for its type replaces the stored
        event and keeps its original queue position. Adding a second index
        entry would leave an id without a matching event once the first one
        has been consumed.
        """
        per_type = self._events.setdefault(event._event_type, {})
        order = self._events_index.setdefault(event._event_type, [])
        if event._id not in per_type:
            order.append(event._id)
        per_type[event._id] = event

    def send(self, event):
        """
        Send one event.

        :param event: object exposing ``_event_type`` and ``_id``.
        :returns: the coordinator's result when a coordinator is set,
                  otherwise None (the event is stored locally).
        """
        coordinator = self.get_coordinator()
        if coordinator:
            return coordinator.send(event)
        with self._lock:
            self._store_event(event)

    def send_bulk(self, events):
        """
        Send several events at once.

        :param events: iterable of objects exposing ``_event_type`` and ``_id``.
        :returns: the coordinator's result when a coordinator is set,
                  otherwise None (the events are stored locally).
        """
        coordinator = self.get_coordinator()
        if coordinator:
            return coordinator.send_bulk(events)
        # One lock acquisition for the whole batch.
        with self._lock:
            for event in events:
                self._store_event(event)

    def get(self, event_type, num_events=1, wait=0, callback=None):
        """
        Fetch up to ``num_events`` events of the given type.

        :param event_type: type key of the events to fetch.
        :param num_events: maximum number of events to return.
        :param wait: forwarded to the coordinator; ignored by the local store.
        :param callback: forwarded to the coordinator; in local mode it is
                         called once per returned event, while the lock is held.
        :returns: the coordinator's result when a coordinator is set,
                  otherwise a (possibly empty) list of events that have been
                  removed from the local store.
        """
        coordinator = self.get_coordinator()
        if coordinator:
            return coordinator.get(event_type, num_events=num_events, wait=wait, callback=callback)

        with self._lock:
            events = []
            order = self._events_index.get(event_type)
            per_type = self._events.get(event_type, {})
            while order and len(events) < num_events:
                event_id = order.pop(0)
                # Skip index entries that no longer have a stored event
                # instead of failing with KeyError.
                if event_id in per_type:
                    events.append(per_type.pop(event_id))
            if callback:
                for event in events:
                    callback(event)
            return events

    def send_report(self, event, status, start_time, end_time, source, result):
        """
        Forward a processing report to the coordinator.

        Does nothing and returns None when no coordinator is set.
        """
        coordinator = self.get_coordinator()
        if coordinator:
            return coordinator.send_report(event, status, start_time, end_time, source, result)

    def clean_event(self, event):
        """
        No-op in the base class.
        """
        pass

    def fail_event(self, event):
        """
        No-op in the base class.
        """
        pass

    def set_manager(self, manager):
        """
        No-op in the base class; the manager is not stored.
        """
        pass

    def get_manager(self):
        """
        Always returns None in the base class.
        """
        return None

    def set_coordinator(self, coordinator):
        """
        Set the coordinator that send/get/report operations are forwarded to.
        """
        self.coordinator = coordinator

    def get_coordinator(self):
        """
        Return the current coordinator, or None when none is set.
        """
        return self.coordinator

    def is_ok(self):
        """
        Always returns True in the base class.
        """
        return True

    def execute(self):
        """
        Idle until ``stop`` is called, waking up every 0.1 seconds.

        Exceptions raised inside the loop are logged at critical level and the
        loop continues.
        """
        while not self.graceful_stop.is_set():
            try:
                self.graceful_stop.wait(0.1)
            except Exception as error:
                self.logger.critical("Caught an exception: %s\n%s", str(error), traceback.format_exc())

    def run(self):
        """
        Thread entry point; runs ``execute``.
        """
        self.execute()