Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 09:13:20

0001 #include "JANA/JEventSource.h"
0002 #include "JANA/Topology/JArrow.h"
0003 #include "JANA/Utils/JEventLevel.h"
0004 #include <JANA/Topology/JMultilevelSourceArrow.h>
0005 
0006 
0007 void JMultilevelSourceArrow::SetEventSource(JEventSource* source) {
0008     m_source = source;
0009     m_levels = source->GetEventLevels();
0010     m_child_event_level = m_levels.back();
0011     m_next_input_port = 0;
0012 
0013     size_t input_port_count = 0;
0014     size_t output_port_count = 0;
0015     for (auto level : m_levels) {
0016         m_port_lookup[{level, Direction::In}] = input_port_count++;
0017     }
0018     for (auto level : m_levels) {
0019         m_port_lookup[{level, Direction::Out}] = input_port_count + output_port_count++;
0020     }
0021 
0022     create_ports(input_port_count, output_port_count);
0023 }
0024 
0025 const std::vector<JEventLevel>& JMultilevelSourceArrow::GetLevels() const {
0026     return m_levels;
0027 }
0028 
0029 size_t JMultilevelSourceArrow::GetPortIndex(JEventLevel level, Direction direction) const {
0030     return m_port_lookup.at({level, direction});
0031 };
0032 
0033 void JMultilevelSourceArrow::initialize() {
0034     // We initialize everything immediately, but don't open any resources until we absolutely have to; see process(): source->DoNext()
0035     m_source->DoInit();
0036 }
0037 
0038 void JMultilevelSourceArrow::finalize() {
0039     // Generally JEventSources finalize themselves as soon as they detect that they have run out of events.
0040     // However, we can't rely on the JEventSources turning themselves off since execution can be externally paused.
0041     // Instead we leave everything open until we finalize the whole topology, and finalize remaining event sources then.
0042     m_source->DoClose();
0043 }
0044 
0045 void JMultilevelSourceArrow::EvictNextParent(OutputData& outputs, size_t& output_count) {
0046     // This is a little bit tricky: Ideally we would be able to constrain max_inflight_events:$PARENT_LEVEL to be 1, 
0047     // which would essentially behave like (nicer) barrier events. However, if there is only 1 event
0048     // inflight at $PARENT_LEVEL, we have to evict immediately so that we don't deadlock.
0049 
0050     auto it = m_pending_parents.find(m_next_input_level);
0051     if (it != m_pending_parents.end()) {
0052         if (it->second.first != nullptr) {
0053             // There IS an old parent
0054             size_t parent_output_port = GetPortIndex(m_next_input_level, Direction::Out);
0055             LOG_DEBUG(get_logger()) << "JMultilevelSourceArrow: Evicting parent " << it->second.first->GetEventStamp() << " to port " << parent_output_port;
0056             outputs.at(output_count++) = {it->second.first, parent_output_port};
0057             it->second.first = nullptr;
0058         }
0059     }
0060 }
0061 
0062 void JMultilevelSourceArrow::fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) {
0063 
0064     if (!m_finish_in_progress) {
0065 
0066         LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << LOG_END;
0067         auto result = m_source->DoNext(input->shared_from_this());
0068         m_next_input_level = m_source->GetNextInputLevel();
0069         m_next_input_port = GetPortIndex(m_next_input_level, Direction::In);
0070 
0071         LOG_DEBUG(get_logger()) << "JMultilevelSourceArrow: Returned from DoNext(" << toString(input->GetLevel()) << "). Next input level is " << toString(m_next_input_level);
0072 
0073         if (result == JEventSource::Result::Success) {
0074             // We have a newly filled event we have to do something with
0075 
0076             if (input->GetLevel() == m_child_event_level) {
0077                 // We acquired a child! Attach it to its parents and push it into the big wide world
0078 
0079                 for (auto [level, parent_pair] : m_pending_parents) {
0080                     // Note that this only attaches parents that we already, so if the parents arrive in the wrong order they
0081                     // will just be missing. If this is expected behavior, you'll need to set your downstream parent inputs to be optional.
0082                     if (parent_pair.first != nullptr) {
0083                         LOG_DEBUG(get_logger()) << "JMultilevelSourceArrow: Attaching parent: " << parent_pair.first->GetEventStamp() << " to event " << input->GetEventStamp();
0084                         input->SetParent(parent_pair.first);
0085                     }
0086                 }
0087                 outputs.at(output_count++) = {input, GetPortIndex(m_child_event_level, Direction::Out)};
0088 
0089                 if (m_next_input_level != m_child_event_level) {
0090                     // We have to evict the parent AFTER the successful child because the child still needs the references to that parent
0091                     EvictNextParent(outputs, output_count);
0092                 }
0093                 status = JArrow::FireResult::KeepGoing;
0094                 return;
0095             }
0096             else {
0097                 // We acquired a parent event! It's predecessor SHOULD have already been evicted during the previous call to fire()
0098 
0099                 if (m_next_input_level != m_child_event_level) {
0100                     // Evict the _subsequent_ parent
0101                     EvictNextParent(outputs, output_count);
0102                 }
0103 
0104                 auto it = m_pending_parents.find(input->GetLevel());
0105                 if (it != m_pending_parents.end()) {
0106                     // There IS an old parent
0107                     if (it->second.first != nullptr) { 
0108                         throw JException("Found a parent event we weren't expecting"); 
0109                     }
0110                     it->second.first = input;
0111                     it->second.second += 1; // Increment parent count
0112                     status = JArrow::FireResult::KeepGoing;
0113                     return;
0114                 }
0115                 else {
0116                     m_pending_parents[input->GetLevel()] = {input, 1};
0117                     status = JArrow::FireResult::KeepGoing;
0118                     return;
0119                 }
0120             }
0121         }
0122         else if (result == JEventSource::Result::FailureTryAgain) {
0123             if (m_next_input_level != m_child_event_level) {
0124                 EvictNextParent(outputs, output_count);
0125             }
0126             // Return this event to the pool with no further action
0127             outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), Direction::In)};
0128             status = JArrow::FireResult::ComeBackLater;
0129             return;
0130         }
0131         else if (result == JEventSource::Result::FailureLevelChange) {
0132             if (m_next_input_level != m_child_event_level) {
0133                 EvictNextParent(outputs, output_count);
0134             }
0135             // Return this input event to the pool
0136             outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), Direction::In)};
0137 
0138             status = JArrow::FireResult::KeepGoing;
0139             return;
0140         }
0141         else if (result == JEventSource::Result::FailureFinished) {
0142             // Return this input event to the pool
0143             outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), Direction::In)};
0144             m_finish_in_progress = true;
0145             // Fall-through to if (finish_in_progress) below
0146         }
0147     }
0148     // At this point the only thing left to do is to evict ALL parents using as many fire() calls as necessary
0149 
0150     bool no_more_parents = false;
0151     while (output_count < 2 && !no_more_parents) {
0152         // We improvise our own m_pending_parents.pop()
0153         auto it = m_pending_parents.begin();
0154         if (it != m_pending_parents.end()) {
0155             // Found a parent
0156             auto parent = it->second.first;
0157             if (parent != nullptr) {
0158                 outputs.at(output_count++) = {parent, GetPortIndex(parent->GetLevel(), Direction::Out)};
0159             }
0160             m_pending_parents.erase(it);
0161         }
0162         else {
0163             no_more_parents = true;
0164         }
0165     }
0166     status = (no_more_parents) ? JArrow::FireResult::Finished : JArrow::FireResult::KeepGoing;
0167     return;
0168 }