File indexing completed on 2026-04-10 07:58:41
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import logging
0012 import time
0013 import threading
0014 import uuid
0015
0016 from idds.common.event import StateClaimEvent, EventBusState
0017 from idds.core import events as core_events
0018
0019 from .baseeventbusbackend import BaseEventBusBackend
0020
0021
class DBEventBusBackend(BaseEventBusBackend):
    """
    Database Event Bus Backend

    Event bus backend that delegates all event handling to
    ``idds.core.events``: events are stored with ``add_event``, fetched with
    ``get_event_for_processing`` and finished with ``clean_event`` or
    ``fail_event``.
    """

    def __init__(self, logger=None, to_archive=True, **kwargs):
        """
        Initialize the backend state.

        :param logger: logger to use; when falsy, a logger named after the
                       class is created (see ``setup_logger``).
        :param to_archive: value forwarded as ``to_archive`` to
                           ``core_events.clean_event`` and
                           ``core_events.fail_event``.
        :param kwargs: accepted but not used by this constructor.
        """
        # NOTE: neither `logger` nor `kwargs` is forwarded to the base class.
        super(DBEventBusBackend, self).__init__()

        # Short identifier of this backend instance: first 8 characters of a
        # random UUID4 string.
        self._id = str(uuid.uuid4())[:8]
        # Presumably a waiting period in seconds -- TODO confirm; it is not
        # read by any method of this class.
        self._state_claim_wait = 60
        # Initial state claim of this instance, stamped with the creation time.
        self._state_claim = StateClaimEvent(self._id, EventBusState.New, time.time())

        # Set by stop() to signal that the backend should shut down.
        self.graceful_stop = threading.Event()

        # In-memory bookkeeping containers. None of the methods of this class
        # read them; presumably they are kept for the base class or other
        # users of the backend -- TODO confirm.
        self._events = {}
        self._events_index = {}
        self._events_act_id_index = {}
        self._events_history = {}
        self._events_history_clean_time = time.time()
        self._events_insert_time = {}
        self._lock = threading.RLock()

        # Presumably a delay limit in seconds -- TODO confirm; it is not read
        # by any method of this class.
        self.max_delay = 180

        self.to_archive = to_archive

        self.setup_logger(logger)

    def setup_logger(self, logger=None):
        """
        Setup logger

        :param logger: logger to use; when falsy, ``logging.getLogger`` is
                       called with the class name.
        """
        if logger:
            self.logger = logger
        else:
            self.logger = logging.getLogger(self.get_class_name())

    def get_class_name(self):
        """
        Return the name of the instance's class.
        """
        return self.__class__.__name__

    def stop(self, signum=None, frame=None):
        """
        Request a graceful stop by setting ``graceful_stop``.

        :param signum: ignored.
        :param frame: ignored.
        """
        self.graceful_stop.set()

    def send(self, event):
        """
        Add one event through ``core_events.add_event`` and log the result.

        :param event: the event to add.
        """
        ret = core_events.add_event(event)
        # Hand the values to the logger as arguments so that the message is
        # only formatted when the record is actually emitted.
        self.logger.info("add event: %s, ret: %s", event, ret)

    def send_bulk(self, events):
        """
        Add several events, one ``core_events.add_event`` call per event,
        logging each result.

        :param events: iterable of events to add.
        """
        for event in events:
            ret = core_events.add_event(event)
            # Logger arguments instead of eager '%' formatting, as in send().
            self.logger.info("add event: %s, ret: %s", event, ret)

    def get(self, event_type, num_events=1, wait=0, callback=None):
        """
        Fetch events through ``core_events.get_event_for_processing``.

        :param event_type: event type passed to the core call.
        :param num_events: number of events passed to the core call.
        :param wait: accepted but not used by this backend.
        :param callback: optional callable invoked once per fetched event
                         before the events are returned.

        :returns: whatever ``core_events.get_event_for_processing`` returned.
        """
        events = core_events.get_event_for_processing(event_type=event_type, num_events=num_events)
        if callback:
            for event in events:
                callback(event)

        return events

    def clean_event(self, event):
        """
        Call ``core_events.clean_event`` for the event, forwarding
        ``self.to_archive``.

        :param event: the event to clean.
        """
        core_events.clean_event(event, to_archive=self.to_archive)

    def fail_event(self, event):
        """
        Call ``core_events.fail_event`` for the event, forwarding
        ``self.to_archive``.

        :param event: the event to mark as failed.
        """
        core_events.fail_event(event, to_archive=self.to_archive)