Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 07:58:41

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0OA
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2023 - 2025
0010 
0011 import logging
0012 import time
0013 import threading
0014 import uuid
0015 
0016 from idds.common.event import StateClaimEvent, EventBusState
0017 from idds.core import events as core_events
0018 
0019 from .baseeventbusbackend import BaseEventBusBackend
0020 
0021 
0022 class DBEventBusBackend(BaseEventBusBackend):
0023     """
0024     Database Event Bus Backend
0025     """
0026 
0027     def __init__(self, logger=None, to_archive=True, **kwargs):
0028         super(DBEventBusBackend, self).__init__()
0029         self._id = str(uuid.uuid4())[:8]
0030         self._state_claim_wait = 60
0031         self._state_claim = StateClaimEvent(self._id, EventBusState.New, time.time())
0032 
0033         self.graceful_stop = threading.Event()
0034 
0035         self._events = {}
0036         self._events_index = {}
0037         self._events_act_id_index = {}
0038         self._events_history = {}
0039         self._events_history_clean_time = time.time()
0040         self._events_insert_time = {}
0041         self._lock = threading.RLock()
0042 
0043         self.max_delay = 180
0044 
0045         self.to_archive = to_archive
0046 
0047         self.setup_logger(logger)
0048 
0049     def setup_logger(self, logger=None):
0050         """
0051         Setup logger
0052         """
0053         if logger:
0054             self.logger = logger
0055         else:
0056             self.logger = logging.getLogger(self.get_class_name())
0057 
0058     def get_class_name(self):
0059         return self.__class__.__name__
0060 
0061     def stop(self, signum=None, frame=None):
0062         self.graceful_stop.set()
0063 
0064     def send(self, event):
0065         ret = core_events.add_event(event)
0066         self.logger.info("add event: %s, ret: %s" % (event, ret))
0067 
0068     def send_bulk(self, events):
0069         for event in events:
0070             ret = core_events.add_event(event)
0071             self.logger.info("add event: %s, ret: %s" % (event, ret))
0072 
0073     def get(self, event_type, num_events=1, wait=0, callback=None):
0074         events = core_events.get_event_for_processing(event_type=event_type, num_events=num_events)
0075         if callback:
0076             for event in events:
0077                 callback(event)
0078 
0079         return events
0080 
0081     def clean_event(self, event):
0082         core_events.clean_event(event, to_archive=self.to_archive)
0083 
0084     def fail_event(self, event):
0085         core_events.fail_event(event, to_archive=self.to_archive)