File indexing completed on 2025-08-28 09:13:20
0001
0002
0003
0004
0005 #include "JANA/JEvent.h"
0006 #include "JANA/Utils/JEventLevel.h"
0007 #include <JANA/Topology/JEventPool.h>
0008
0009
0010 JEventPool::JEventPool(std::shared_ptr<JComponentManager> component_manager,
0011 size_t max_inflight_events,
0012 size_t location_count,
0013 JEventLevel level)
0014 : JEventQueue(max_inflight_events, location_count)
0015 , m_component_manager(component_manager)
0016 , m_level(level) {
0017
0018
0019 m_owned_events.reserve(max_inflight_events);
0020 for (size_t evt_idx=0; evt_idx<max_inflight_events; evt_idx++) {
0021
0022 m_owned_events.push_back(std::make_shared<JEvent>());
0023 auto evt = &m_owned_events.back();
0024 (*evt)->SetLevel(m_level);
0025 m_component_manager->configure_event(**evt);
0026 Push(evt->get(), evt_idx % location_count);
0027 }
0028 }
0029
0030 void JEventPool::AttachForwardingPool(JEventPool* pool) {
0031 m_parent_pools[pool->m_level] = pool;
0032 }
0033
0034 void JEventPool::Scale(size_t capacity) {
0035 auto old_capacity = m_capacity;
0036 if (capacity < old_capacity) {
0037
0038
0039
0040
0041
0042
0043
0044
0045 return;
0046 }
0047
0048
0049 m_capacity = capacity;
0050 for (auto& local_queue: m_local_queues) {
0051 local_queue->ringbuffer.resize(capacity, nullptr);
0052 local_queue->capacity = capacity;
0053 }
0054
0055
0056 m_owned_events.reserve(capacity);
0057
0058 for (size_t evt_idx=old_capacity; evt_idx<capacity; evt_idx++) {
0059 m_owned_events.push_back(std::make_shared<JEvent>());
0060 auto evt = &m_owned_events.back();
0061 (*evt)->SetLevel(m_level);
0062 m_component_manager->configure_event(**evt);
0063 Push(evt->get(), evt_idx % GetLocationCount());
0064 }
0065 }
0066
0067 void JEventPool::Ingest(JEvent* event, size_t location) {
0068
0069
0070
0071 auto incoming_event_level = event->GetLevel();
0072 if (incoming_event_level != m_level) {
0073
0074 m_parent_pools.at(incoming_event_level)->Ingest(event, location);
0075 return;
0076 }
0077
0078
0079 auto finished_parents = event->ReleaseAllParents();
0080
0081 for (auto* parent : finished_parents) {
0082
0083 m_parent_pools.at(parent->GetLevel())->NotifyThatAllChildrenFinished(parent, location);
0084
0085 }
0086
0087 if (event->GetChildCount() == 0) {
0088
0089
0090 Push(event, location);
0091 }
0092 else {
0093
0094
0095
0096 m_pending.insert(event);
0097 }
0098 }
0099
0100
0101 void JEventPool::NotifyThatAllChildrenFinished(JEvent* event, size_t location) {
0102
0103 size_t was_present = m_pending.erase(event);
0104 if (was_present == 1) {
0105 Push(event, location);
0106
0107 }
0108 }
0109
0110
0111 void JEventPool::Finalize() {
0112 for (auto& evt : m_owned_events) {
0113 evt->Finish();
0114 }
0115 }
0116
0117