Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 09:13:20

0001 // Copyright 2025, Jefferson Science Associates, LLC.
0002 // Subject to the terms in the LICENSE file found in the top-level directory.
0003 // Author: Nathan Brei
0004 
0005 #include "JANA/JEvent.h"
0006 #include "JANA/Utils/JEventLevel.h"
0007 #include <JANA/Topology/JEventPool.h>
0008 
0009 
0010 JEventPool::JEventPool(std::shared_ptr<JComponentManager> component_manager,
0011                        size_t max_inflight_events,
0012                        size_t location_count,
0013                        JEventLevel level)
0014         : JEventQueue(max_inflight_events, location_count)
0015         , m_component_manager(component_manager)
0016         , m_level(level) {
0017 
0018     // Create JEvents and distribute them among the pools.
0019     m_owned_events.reserve(max_inflight_events);
0020     for (size_t evt_idx=0; evt_idx<max_inflight_events; evt_idx++) {
0021 
0022         m_owned_events.push_back(std::make_shared<JEvent>());
0023         auto evt = &m_owned_events.back(); 
0024         (*evt)->SetLevel(m_level); // Level needs to be set before factories get added in configure_event
0025         m_component_manager->configure_event(**evt);
0026         Push(evt->get(), evt_idx % location_count);
0027     }
0028 }
0029 
0030 void JEventPool::AttachForwardingPool(JEventPool* pool) {
0031     m_parent_pools[pool->m_level] = pool;
0032 }
0033 
0034 void JEventPool::Scale(size_t capacity) {
0035     auto old_capacity = m_capacity;
0036     if (capacity < old_capacity) {
0037         // Downscaling a JEventPool is tricky because even draining the topology
0038         // doesn't guarantee that all JEvents have been returned to the JEventPools by
0039         // the time the topology pauses. Consider JEventUnfolder, which may very well hold
0040         // on to a child event that it won't need because the parent event source has 
0041         // paused or finished.
0042         // For now we don't reduce the size of the pools or queues because there's little
0043         // penalty to keeping them around (the main penalty comes from event creation time 
0044         // and memory footprint, but if we are downscaling we already paid that)
0045         return;
0046     }
0047 
0048     // Resize queues to fit new capacity
0049     m_capacity = capacity;
0050     for (auto& local_queue: m_local_queues) {
0051         local_queue->ringbuffer.resize(capacity, nullptr);
0052         local_queue->capacity = capacity;
0053     }
0054 
0055     // Create new JEvents, add to owned_events, and distribute to queues
0056     m_owned_events.reserve(capacity);
0057 
0058     for (size_t evt_idx=old_capacity; evt_idx<capacity; evt_idx++) {
0059         m_owned_events.push_back(std::make_shared<JEvent>());
0060         auto evt = &m_owned_events.back(); 
0061         (*evt)->SetLevel(m_level); // Level needs to be set before factories get added in configure_event
0062         m_component_manager->configure_event(**evt);
0063         Push(evt->get(), evt_idx % GetLocationCount());
0064     }
0065 }
0066 
0067 void JEventPool::Ingest(JEvent* event, size_t location) {
0068 
0069     // Check if event even belongs here. If not, forward to the correct pool
0070     // This is necessary for interleaved events
0071     auto incoming_event_level = event->GetLevel();
0072     if (incoming_event_level != m_level) {
0073         //LOG << "Pool " << toString(m_level) << " forwarding event " << event->GetEventNumber() << " to parent pool " << toString(event->GetLevel());
0074         m_parent_pools.at(incoming_event_level)->Ingest(event, location);
0075         return;
0076     }
0077 
0078     // Detach and foward parents
0079     auto finished_parents = event->ReleaseAllParents();
0080     // TODO: I'd prefer to not have to do an allocation each time, but this will work for now
0081     for (auto* parent : finished_parents) {
0082         //LOG << "JEventPool::Ingest: Found finished parent of level " << toString(parent->GetLevel());
0083         m_parent_pools.at(parent->GetLevel())->NotifyThatAllChildrenFinished(parent, location);
0084         // TODO: This is likely the wrong location. Obtain from parent event?
0085     }
0086 
0087     if (event->GetChildCount() == 0) {
0088         // There's no way for additional children to appear because Ingest takes the "original" parent
0089         //LOG << "JEventPool::Ingest: " << toString(m_level) << " event is pushed";
0090         Push(event, location);
0091     }
0092     else {
0093         // We've received the original but we can't push it until all children have been pushed to 
0094         // _their_ pools, in which case we push once we receive the notification
0095         //LOG << "JEventPool::Ingest: " << toString(m_level) << " event is pending";
0096         m_pending.insert(event);
0097     }
0098 }
0099 
0100 
0101 void JEventPool::NotifyThatAllChildrenFinished(JEvent* event, size_t location) {
0102     //LOG << "JEventPool::Notify called for level " << toString(m_level);
0103     size_t was_present = m_pending.erase(event);
0104     if (was_present == 1) {
0105         Push(event, location);
0106         //LOG << "JEventPool at level " << toString(m_level) << " has pushed a parent event";
0107     }
0108 }
0109 
0110 
0111 void JEventPool::Finalize() {
0112     for (auto& evt : m_owned_events) {
0113         evt->Finish();
0114     }
0115 }
0116 
0117