# File indexing completed on 2026-04-20 07:58:56
from pandaharvester.harvesterbody.agent_base import AgentBase
from pandaharvester.harvesterconfig import harvester_config
from pandaharvester.harvestercore import core_utils
from pandaharvester.harvestercore.db_proxy_pool import DBProxyPool as DBProxy
from pandaharvester.harvestercore.plugin_factory import PluginFactory
from pandaharvester.harvestercore.work_spec import WorkSpec
0007
0008
# module-level logger shared by all EventFeeder instances
_logger = core_utils.setup_logger("event_feeder")
0010
0011
0012
class EventFeeder(AgentBase):
    """Agent that feeds event ranges to workers.

    Periodically collects workers which requested events, asks the
    communicator (PanDA server) for event ranges, and delivers them to
    each worker through the queue's messenger plugin.
    """

    def __init__(self, communicator, queue_config_mapper, single_mode=False):
        """Set up DB access, queue configuration, communicator and plugin factory.

        :param communicator: interface to the PanDA server used to fetch event ranges
        :param queue_config_mapper: mapper from queue name to queue configuration
        :param single_mode: run a single cycle only (forwarded to AgentBase)
        """
        AgentBase.__init__(self, single_mode)
        self.dbProxy = DBProxy()
        self.queueConfigMapper = queue_config_mapper
        self.communicator = communicator
        self.pluginFactory = PluginFactory()

    def run(self):
        """Main loop: feed events to locked workers until the agent is terminated."""
        # unique lock identity so concurrent feeder instances do not grab the same worker
        lockedBy = f"eventfeeder-{self.get_pid()}"
        while True:
            mainLog = self.make_logger(_logger, f"id={lockedBy}", method_name="run")
            mainLog.debug("getting workers to feed events")
            workSpecsPerQueue = self.dbProxy.get_workers_to_feed_events(
                harvester_config.eventfeeder.maxWorkers, harvester_config.eventfeeder.lockInterval, lockedBy
            )
            mainLog.debug(f"got {len(workSpecsPerQueue)} queues")

            for queueName, workSpecList in workSpecsPerQueue.items():
                tmpQueLog = self.make_logger(_logger, f"queue={queueName}", method_name="run")

                # skip queues without configuration; their workers stay untouched
                if not self.queueConfigMapper.has_queue(queueName):
                    tmpQueLog.error("config not found")
                    continue

                queueConfig = self.queueConfigMapper.get_queue(queueName)
                # getattr with a default replaces the verbose hasattr/if-else;
                # an unset or falsy scatteredEvents means non-scattered mode
                scattered = bool(getattr(queueConfig, "scatteredEvents", False))

                messenger = self.pluginFactory.get_plugin(queueConfig.messenger)

                for workSpec in workSpecList:
                    # use self.make_logger for consistency with mainLog/tmpQueLog above
                    # (the original called core_utils.make_logger directly here)
                    tmpLog = self.make_logger(_logger, f"workerID={workSpec.workerID}", method_name="run")

                    # re-take the lock right before the (potentially slow) server call
                    lockedFlag = self.dbProxy.lock_worker_again_to_feed_events(workSpec.workerID, lockedBy)
                    if not lockedFlag:
                        tmpLog.debug("skipped since locked by another")
                        continue

                    tmpLog.debug("get events")
                    tmpStat, events = self.communicator.get_event_ranges(workSpec.eventsRequestParams, scattered, workSpec.get_access_point())

                    if tmpStat is False:
                        tmpLog.error(f"failed to get events with {events}")
                        continue

                    # the server call may have taken a while; confirm the lock again
                    # before handing events to the messenger
                    lockedFlag = self.dbProxy.lock_worker_again_to_feed_events(workSpec.workerID, lockedBy)
                    if not lockedFlag:
                        tmpLog.debug("skipped before feeding since locked by another")
                        continue
                    tmpStat = messenger.feed_events(workSpec, events)

                    if tmpStat is False:
                        tmpLog.error("failed to feed events")
                        continue

                    for pandaID, eventList in events.items():
                        try:
                            nRanges = workSpec.eventsRequestParams[pandaID]["nRanges"]
                        except Exception:
                            # the PandaID may be absent from the request params; report None
                            nRanges = None
                        tmpLog.debug(f"got {len(eventList)} events for PandaID={pandaID} while getting {nRanges} events")

                        if workSpec.mapType == WorkSpec.MT_MultiWorkers:
                            # fewer events than requested presumably means the job is
                            # running out of events, so stop using multiple workers for it
                            if len(eventList) == 0 or (nRanges is not None and len(eventList) < nRanges):
                                tmpStat = self.dbProxy.disable_multi_workers(pandaID)
                                if tmpStat == 1:
                                    tmpStr = f"disabled MultiWorkers for PandaID={pandaID}"
                                    tmpLog.debug(tmpStr)

                    # clear the event request fields so this worker is not picked up again
                    workSpec.eventsRequest = WorkSpec.EV_useEvents
                    workSpec.eventsRequestParams = None
                    workSpec.eventFeedTime = None
                    workSpec.eventFeedLock = None

                    tmpStat = self.dbProxy.update_worker(workSpec, {"eventFeedLock": lockedBy})
                    tmpLog.debug(f"done with {tmpStat}")
                tmpQueLog.debug("done")
            mainLog.debug("done")

            # sleep between cycles; exit cleanly when the agent is asked to stop
            if self.terminated(harvester_config.eventfeeder.sleepTime):
                mainLog.debug("terminated")
                return