Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 07:58:41

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2022 - 2023
0010 
0011 import logging
0012 import uuid
0013 
0014 from idds.common.constants import Sections
0015 from idds.common.config import config_has_section, config_list_options
0016 
0017 # from .localeventbusbackend import LocalEventBusBackend
0018 from .baseeventbusbackendopt import BaseEventBusBackendOpt
0019 from .dbeventbusbackend import DBEventBusBackend
0020 from .msgeventbusbackend import MsgEventBusBackend
0021 
0022 
class Singleton(object):
    """
    Base class that hands out a single shared instance per class.

    The shared instance is stored on the class in ``_instance``. A freshly
    created instance gets ``_initialized = False`` so that a subclass
    ``__init__`` can tell whether it still has to run its one-time setup.
    """

    _instance = None

    def __new__(cls, *args, **kwargs):
        """
        Return the cached instance of ``cls``, creating it on first use.

        Positional and keyword arguments are accepted and ignored here.
        """
        cached = cls._instance
        # The isinstance check (rather than "is not None") makes a subclass
        # build its own object instead of reusing an instance that was
        # cached on one of its parent classes.
        if isinstance(cached, cls):
            return cached

        created = super(Singleton, cls).__new__(cls)
        cls._instance = created
        created._initialized = False
        return created
0032 
0033 
class EventBus(Singleton):
    """
    Event Bus

    Singleton facade in front of an event bus backend. The backend is chosen
    from the ``backend`` option of the EventBus config section ('message' or
    'database'); when no backend is selected a BaseEventBusBackendOpt is used.
    A second BaseEventBusBackendOpt is kept as a backup which the ``backend``
    property switches to while the original backend reports it is not ok.
    """

    def __init__(self, logger=None):
        """
        Set up the event bus. Only the first construction does any work;
        later ones find ``_initialized`` already set and return immediately.

        :param logger: logger to use. When not set, a logger named after the
                       class is created.
        """
        if not self._initialized:
            self._initialized = True

            super(EventBus, self).__init__()
            self._id = str(uuid.uuid4())[:8]
            self.setup_logger(logger)
            self.config_section = Sections.EventBus
            attrs = self.load_attributes()
            self.attrs = attrs
            self._backend = None
            self._orig_backend = None
            self._backup_backend = BaseEventBusBackendOpt(logger=self.logger, **attrs)
            if 'backend' in attrs:
                if attrs['backend'] == 'message':
                    self.backend = MsgEventBusBackend(logger=self.logger, **attrs)
                elif attrs['backend'] == "database":
                    # The database backend archives by default unless the
                    # config says otherwise.
                    attrs.setdefault('to_archive', True)
                    self.backend = DBEventBusBackend(**attrs)
            if self.backend is None:
                self.backend = BaseEventBusBackendOpt(logger=self.logger, **attrs)
            self.logger.info("EventBus backend : %s" % self.backend)
            self._orig_backend = self.backend
            # NOTE(review): this goes through the failover-aware getter, so if
            # is_ok() is False before start() the backup would be started
            # instead of the original backend - confirm is_ok() semantics.
            self.backend.start()

    @property
    def backend(self):
        """
        Return the backend to use, failing over when needed.

        If the active backend is the original one and it reports not ok, the
        backup backend becomes active. If a different backend is active and
        the original reports ok again, the original becomes active again.
        """
        if self._backend and self._backend == self._orig_backend and not self._backend.is_ok():
            self._backend = self._backup_backend
            # Pass the backends as logging arguments so they are actually
            # rendered in the message.
            self.logger.critical("Original backend <%s> failed, switch to use backup backend <%s>",
                                 self._orig_backend, self._backup_backend)
        elif self._orig_backend and self._orig_backend != self._backend and self._orig_backend.is_ok():
            self.logger.critical("Original backend <%s> is ok, switch back to use it",
                                 self._orig_backend)
            self._backend = self._orig_backend
        return self._backend

    @backend.setter
    def backend(self, value):
        """
        Set the active backend.

        :param value: backend object to make active.
        """
        self._backend = value

    def setup_logger(self, logger=None):
        """
        Setup logger

        :param logger: logger to use. When not set, a logger named after the
                       class is fetched from the logging module.
        """
        if logger:
            self.logger = logger
        else:
            self.logger = logging.getLogger(self.get_class_name())

    def get_class_name(self):
        """
        Return the name of this object's class.
        """
        return self.__class__.__name__

    def load_attributes(self):
        """
        Load the options of the config section ``self.config_section``.

        String values equal to 'true' or 'false' (case-insensitive) are
        converted to booleans; all other values are kept unchanged.

        :returns: dict mapping option name to value; empty when the section
                  does not exist.
        """
        self.logger.info("Loading config for section: %s" % self.config_section)
        attrs = {}
        if config_has_section(self.config_section):
            options = config_list_options(self.config_section)
            for option, value in options:
                if isinstance(value, str) and value.lower() == 'true':
                    value = True
                elif isinstance(value, str) and value.lower() == 'false':
                    value = False
                attrs[option] = value
        return attrs

    def publish_event(self, event):
        """
        Send an event through the active backend.

        :param event: the event to send.
        """
        self.backend.send(event)

    def get_event(self, event_type, num_events=1, wait=5, callback=None):
        """
        Get events of a given type from the active backend.

        :param event_type: type of the events to get.
        :param num_events: passed to the backend as ``num_events``.
        :param wait: passed to the backend as ``wait``.
        :param callback: passed to the backend as ``callback``.

        :returns: whatever the backend's ``get`` returns.
        """
        event = self.backend.get(event_type, num_events=num_events, wait=wait, callback=callback)
        return event

    def get(self, event_type, num_events=1, wait=5, callback=None):
        """
        Alias of get_event.
        """
        return self.get_event(event_type, num_events=num_events, wait=wait, callback=callback)

    def send(self, event):
        """
        Alias of publish_event.
        """
        return self.publish_event(event)

    def send_bulk(self, events):
        """
        Send several events through the active backend.

        :param events: the events to send.
        """
        self.backend.send_bulk(events)

    def send_report(self, event, status, start_time, end_time, source, result):
        """
        Forward a report about an event to the active backend.

        All arguments are passed to the backend's ``send_report`` unchanged.

        :returns: whatever the backend's ``send_report`` returns.
        """
        return self.backend.send_report(event, status, start_time, end_time, source, result)

    def clean_event(self, event):
        """
        Ask the active backend to clean an event.

        :param event: the event to clean.
        """
        self.backend.clean_event(event)

    def fail_event(self, event):
        """
        Ask the active backend to fail an event.

        :param event: the event to fail.
        """
        self.backend.fail_event(event)

    def set_manager(self, manager):
        """
        Set the manager on the active backend and on the original backend.

        :param manager: the manager to set.
        """
        self.backend.set_manager(manager)
        # Also update the original backend, which may not be the active one
        # after a failover.
        if self._orig_backend:
            self._orig_backend.set_manager(manager)

    def get_manager(self):
        """
        Return the manager, preferring the original backend over the active one.
        """
        if self._orig_backend:
            return self._orig_backend.get_manager()
        return self.backend.get_manager()

    def get_coordinator(self):
        """
        Return the coordinator of the active backend.
        """
        return self.backend.get_coordinator()

    def set_coordinator(self, coordinator):
        """
        Set the coordinator on the active backend.

        :param coordinator: the coordinator to set.
        """
        self.backend.set_coordinator(coordinator)

    def stop(self):
        """
        Stop the active backend.
        """
        self.backend.stop()