File indexing completed on 2026-04-10 07:58:41
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 import logging
0012 import uuid
0013
0014 from idds.common.constants import Sections
0015 from idds.common.config import config_has_section, config_list_options
0016
0017
0018 from .baseeventbusbackendopt import BaseEventBusBackendOpt
0019 from .dbeventbusbackend import DBEventBusBackend
0020 from .msgeventbusbackend import MsgEventBusBackend
0021
0022
class Singleton(object):
    """
    Base class that hands out a single shared instance per concrete class.

    The instance is cached on the class attribute ``_instance``. A freshly
    created instance gets ``_initialized = False`` so that subclasses can
    guard their ``__init__`` body and run it only once, even though Python
    calls ``__init__`` on every ``Class(...)`` invocation.
    """

    # Cached instance; assigned on the requesting class the first time
    # that class is instantiated.
    _instance = None

    def __new__(class_, *args, **kwargs):
        """
        Return the cached instance of ``class_``, creating it on first use.

        Constructor arguments are accepted but not used here; they are
        still passed to ``__init__`` by Python.
        """
        cached = class_._instance
        # The isinstance check (rather than "is not None") ensures that an
        # instance inherited from a parent class is not reused for a subclass.
        if isinstance(cached, class_):
            return cached

        fresh = super(Singleton, class_).__new__(class_)
        fresh._initialized = False
        class_._instance = fresh
        return fresh
0032
0033
class EventBus(Singleton):
    """
    Event Bus

    Singleton facade in front of an event bus backend. The backend is
    selected from the ``backend`` option of the EventBus config section
    ('message' or 'database'); when no backend is configured a
    ``BaseEventBusBackendOpt`` is used. A separate ``BaseEventBusBackendOpt``
    is kept as a backup and is used whenever the original backend reports
    that it is not ok (see the ``backend`` property).
    """

    def __init__(self, logger=None):
        """
        Set up the event bus the first time the singleton is constructed.

        Later constructions return immediately, because ``Singleton.__new__``
        hands back the same instance and ``_initialized`` is already True.

        :param logger: optional logger; when omitted a logger named after
                       the class is used.
        """
        if self._initialized:
            return
        self._initialized = True

        super(EventBus, self).__init__()
        self._id = str(uuid.uuid4())[:8]
        self.setup_logger(logger)
        self.config_section = Sections.EventBus
        attrs = self.load_attributes()
        self.attrs = attrs
        self._backend = None
        self._orig_backend = None
        self._backup_backend = BaseEventBusBackendOpt(logger=self.logger, **attrs)
        if 'backend' in attrs:
            if attrs['backend'] == 'message':
                self.backend = MsgEventBusBackend(logger=self.logger, **attrs)
            elif attrs['backend'] == "database":
                # Default 'to_archive' to True for the database backend
                # unless the configuration sets it explicitly.
                if 'to_archive' not in attrs:
                    attrs['to_archive'] = True
                self.backend = DBEventBusBackend(**attrs)
        if self.backend is None:
            # No (or an unrecognised) backend configured: use the base backend.
            self.backend = BaseEventBusBackendOpt(logger=self.logger, **attrs)
        self.logger.info("EventBus backend : %s", self.backend)
        self._orig_backend = self.backend
        self.backend.start()

    @property
    def backend(self):
        """
        Return the backend currently in use, switching if needed.

        If the active backend is the original one and its ``is_ok()`` is
        false, the backup backend becomes active. If another backend is
        active and the original's ``is_ok()`` is true again, the original
        backend is restored. Either switch is logged.
        """
        if self._backend and self._backend == self._orig_backend and not self._backend.is_ok():
            self._backend = self._backup_backend
            # Pass the backends as logging arguments so they are actually
            # rendered in the message.
            self.logger.critical("Original backend <%s> failed, switch to use backup backend <%s>",
                                 self._orig_backend, self._backup_backend)
        elif self._orig_backend and self._orig_backend != self._backend and self._orig_backend.is_ok():
            self.logger.critical("Original backend <%s> is ok, switch back to use it",
                                 self._orig_backend)
            self._backend = self._orig_backend

        return self._backend

    @backend.setter
    def backend(self, value):
        """
        Set the active backend.

        :param value: backend object to use.
        """
        self._backend = value

    def setup_logger(self, logger=None):
        """
        Setup logger

        :param logger: logger to use; when falsy, a logger named after the
                       class is obtained from ``logging.getLogger``.
        """
        if logger:
            self.logger = logger
        else:
            self.logger = logging.getLogger(self.get_class_name())

    def get_class_name(self):
        """
        Return the name of this object's class.
        """
        return self.__class__.__name__

    def load_attributes(self):
        """
        Load the options of ``self.config_section`` into a dict.

        String values equal to 'true' or 'false' (case-insensitive) are
        converted to the corresponding bool; other values are kept as-is.

        :returns: dict mapping option name to value; empty when the config
                  section does not exist.
        """
        self.logger.info("Loading config for section: %s", self.config_section)
        attrs = {}
        if config_has_section(self.config_section):
            options = config_list_options(self.config_section)
            for option, value in options:
                if isinstance(value, str):
                    lowered = value.lower()
                    if lowered == 'true':
                        value = True
                    elif lowered == 'false':
                        value = False
                attrs[option] = value
        return attrs

    def publish_event(self, event):
        """
        Send one event through the active backend.

        :param event: event to send.
        """
        self.backend.send(event)

    def get_event(self, event_type, num_events=1, wait=5, callback=None):
        """
        Fetch event(s) of the given type from the active backend.

        All arguments are forwarded unchanged to the backend's ``get``.

        :param event_type: type of event to fetch.
        :param num_events: forwarded as ``num_events`` (default 1).
        :param wait: forwarded as ``wait`` (default 5).
        :param callback: forwarded as ``callback`` (default None).

        :returns: whatever the backend's ``get`` returns.
        """
        event = self.backend.get(event_type, num_events=num_events, wait=wait, callback=callback)
        return event

    def get(self, event_type, num_events=1, wait=5, callback=None):
        """
        Alias of ``get_event``.
        """
        return self.get_event(event_type, num_events=num_events, wait=wait, callback=callback)

    def send(self, event):
        """
        Alias of ``publish_event``.
        """
        return self.publish_event(event)

    def send_bulk(self, events):
        """
        Send several events through the active backend.

        :param events: events to send.
        """
        self.backend.send_bulk(events)

    def send_report(self, event, status, start_time, end_time, source, result):
        """
        Forward a report about an event to the active backend.

        All arguments are passed positionally to the backend's
        ``send_report``.

        :returns: whatever the backend's ``send_report`` returns.
        """
        return self.backend.send_report(event, status, start_time, end_time, source, result)

    def clean_event(self, event):
        """
        Ask the active backend to clean the given event.
        """
        self.backend.clean_event(event)

    def fail_event(self, event):
        """
        Ask the active backend to mark the given event as failed.
        """
        self.backend.fail_event(event)

    def set_manager(self, manager):
        """
        Set the manager on the active backend and on the original backend.

        :param manager: manager object to hand to the backends.
        """
        self.backend.set_manager(manager)
        if self._orig_backend:
            self._orig_backend.set_manager(manager)

    def get_manager(self):
        """
        Return the manager, preferring the original backend's.

        The active backend is asked only when there is no original backend.
        """
        if self._orig_backend:
            return self._orig_backend.get_manager()
        return self.backend.get_manager()

    def get_coordinator(self):
        """
        Return the coordinator reported by the active backend.
        """
        return self.backend.get_coordinator()

    def set_coordinator(self, coordinator):
        """
        Set the coordinator on the active backend.

        :param coordinator: coordinator object to hand to the backend.
        """
        self.backend.set_coordinator(coordinator)

    def stop(self):
        """
        Stop the active backend.
        """
        self.backend.stop()