Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-20 07:58:56

0001 from pandaharvester.harvesterbody.agent_base import AgentBase
0002 from pandaharvester.harvesterconfig import harvester_config
0003 from pandaharvester.harvestercore import core_utils
0004 from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
0005 from pandaharvester.harvestercore.plugin_factory import PluginFactory
0006 from pandaharvester.harvestercore.work_spec import WorkSpec
0007 
0008 # logger
0009 _logger = core_utils.setup_logger("event_feeder")
0010 
0011 
0012 # class to feed events to workers
0013 class EventFeeder(AgentBase):
0014     # constructor
0015     def __init__(self, communicator, queue_config_mapper, single_mode=False):
0016         AgentBase.__init__(self, single_mode)
0017         self.dbProxy = DBProxy()
0018         self.queueConfigMapper = queue_config_mapper
0019         self.communicator = communicator
0020         self.pluginFactory = PluginFactory()
0021 
0022     # main loop
0023     def run(self):
0024         lockedBy = f"eventfeeder-{self.get_pid()}"
0025         while True:
0026             mainLog = self.make_logger(_logger, f"id={lockedBy}", method_name="run")
0027             mainLog.debug("getting workers to feed events")
0028             workSpecsPerQueue = self.dbProxy.get_workers_to_feed_events(
0029                 harvester_config.eventfeeder.maxWorkers, harvester_config.eventfeeder.lockInterval, lockedBy
0030             )
0031             mainLog.debug(f"got {len(workSpecsPerQueue)} queues")
0032             # loop over all workers
0033             for queueName, workSpecList in workSpecsPerQueue.items():
0034                 tmpQueLog = self.make_logger(_logger, f"queue={queueName}", method_name="run")
0035                 # check queue
0036                 if not self.queueConfigMapper.has_queue(queueName):
0037                     tmpQueLog.error("config not found")
0038                     continue
0039                 # get queue
0040                 queueConfig = self.queueConfigMapper.get_queue(queueName)
0041                 if hasattr(queueConfig, "scatteredEvents") and queueConfig.scatteredEvents:
0042                     scattered = True
0043                 else:
0044                     scattered = False
0045                 # get plugin
0046                 messenger = self.pluginFactory.get_plugin(queueConfig.messenger)
0047                 # loop over all workers
0048                 for workSpec in workSpecList:
0049                     tmpLog = core_utils.make_logger(_logger, f"workerID={workSpec.workerID}", method_name="run")
0050                     # lock worker again
0051                     lockedFlag = self.dbProxy.lock_worker_again_to_feed_events(workSpec.workerID, lockedBy)
0052                     if not lockedFlag:
0053                         tmpLog.debug("skipped since locked by another")
0054                         continue
0055                     # get events
0056                     tmpLog.debug("get events")
0057                     tmpStat, events = self.communicator.get_event_ranges(workSpec.eventsRequestParams, scattered, workSpec.get_access_point())
0058                     # failed
0059                     if tmpStat is False:
0060                         tmpLog.error(f"failed to get events with {events}")
0061                         continue
0062                     # lock worker again
0063                     lockedFlag = self.dbProxy.lock_worker_again_to_feed_events(workSpec.workerID, lockedBy)
0064                     if not lockedFlag:
0065                         tmpLog.debug("skipped before feeding since locked by another")
0066                         continue
0067                     tmpStat = messenger.feed_events(workSpec, events)
0068                     # failed
0069                     if tmpStat is False:
0070                         tmpLog.error("failed to feed events")
0071                         continue
0072                     # dump
0073                     for pandaID, eventList in events.items():
0074                         try:
0075                             nRanges = workSpec.eventsRequestParams[pandaID]["nRanges"]
0076                         except Exception:
0077                             nRanges = None
0078                         tmpLog.debug(f"got {len(eventList)} events for PandaID={pandaID} while getting {nRanges} events")
0079                         # disable multi workers
0080                         if workSpec.mapType == WorkSpec.MT_MultiWorkers:
0081                             if len(eventList) == 0 or (nRanges is not None and len(eventList) < nRanges):
0082                                 tmpStat = self.dbProxy.disable_multi_workers(pandaID)
0083                                 if tmpStat == 1:
0084                                     tmpStr = f"disabled MultiWorkers for PandaID={pandaID}"
0085                                     tmpLog.debug(tmpStr)
0086                     # update worker
0087                     workSpec.eventsRequest = WorkSpec.EV_useEvents
0088                     workSpec.eventsRequestParams = None
0089                     workSpec.eventFeedTime = None
0090                     workSpec.eventFeedLock = None
0091                     # update local database
0092                     tmpStat = self.dbProxy.update_worker(workSpec, {"eventFeedLock": lockedBy})
0093                     tmpLog.debug(f"done with {tmpStat}")
0094                 tmpQueLog.debug("done")
0095             mainLog.debug("done")
0096             # check if being terminated
0097             if self.terminated(harvester_config.eventfeeder.sleepTime):
0098                 mainLog.debug("terminated")
0099                 return