Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-11-03 10:10:17

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 
#include <JANA/JApplication.h>
#include <JANA/JEventSource.h>
#include <JANA/Topology/JEventSourceArrow.h>

#include <utility>
0009 
0010 
0011 
0012 JEventSourceArrow::JEventSourceArrow(std::string name, std::vector<JEventSource*> sources)
0013     : m_sources(sources) {
0014     set_name(name);
0015     set_is_source(true);
0016     create_ports(1, 1);
0017     m_ports[EVENT_OUT].establishes_ordering = true;
0018     // All event sources establish their own ordering by default,
0019     // which is sufficient for the kinds of topologies we can create
0020     // using JTopologyBuilder. In the future we may need something
0021     // more sophisticated. Note that establishing the ordering here is a
0022     // no-op if there are no components in the topology that need it enforced,
0023     // so it doesn't hurt NUMA performance.
0024 }
0025 
0026 
0027 void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) {
0028 
0029     LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << LOG_END;
0030 
0031     // First check to see if we need to handle a barrier event before attempting to emit another event
0032     if (m_barrier_active) {
0033 
0034         auto emitted_event_count = m_sources[m_current_source]->GetEmittedEventCount();
0035         auto finished_event_count = m_sources[m_current_source]->GetProcessedEventCount();
0036 
0037         // A barrier event has been emitted by the source.
0038         if (m_pending_barrier_event != nullptr) {
0039 
0040             // This barrier event is pending until the topology drains
0041             if ((emitted_event_count - finished_event_count) == 1) {
0042                 LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event is in-flight" << LOG_END;
0043 
0044                 // Topology has drained; only remaining in-flight event is the barrier event itself,
0045                 // which we have held on to until now
0046                 outputs[0] = {m_pending_barrier_event, 1};
0047                 output_count = 1;
0048                 status = JArrow::FireResult::KeepGoing;
0049 
0050                 m_pending_barrier_event = nullptr;
0051                 // There's not much for the thread team to do while the barrier event makes its way through the topology.
0052                 // Eventually we might be able to use this to communicate to the scheduler to not wake threads whose only
0053                 // available action is to hammer the JEventSourceArrow
0054                 return;
0055             }
0056             else {
0057                 // Topology has _not_ finished draining, all we can do is wait
0058                 LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event. Emitted = " << emitted_event_count << ", Finished = " << finished_event_count << LOG_END;
0059                 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
0060 
0061                 assert(event == nullptr);
0062                 output_count = 0;
0063                 status = JArrow::FireResult::ComeBackLater;
0064                 return;
0065             }
0066         }
0067         else {
0068             // This barrier event has already been sent into the topology and we need to wait
0069             // until it is finished before emitting any more events
0070             if (finished_event_count == emitted_event_count) {
0071 
0072                 // Barrier event has finished.
0073                 LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event finished, returning to normal operation" << LOG_END;
0074                 m_barrier_active = false;
0075                 m_next_input_port = 0;
0076 
0077                 output_count = 0;
0078                 status = JArrow::FireResult::KeepGoing;
0079                 return;
0080             }
0081             else {
0082                 // Barrier event has NOT finished
0083                 LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on in-flight barrier event" << LOG_END;
0084                 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
0085                 assert(event == nullptr);
0086                 output_count = 0;
0087                 status = JArrow::FireResult::ComeBackLater;
0088                 return;
0089             }
0090         }
0091     }
0092 
0093     while (m_current_source < m_sources.size()) {
0094 
0095         auto source_status = m_sources[m_current_source]->DoNext(event->shared_from_this());
0096 
0097         if (source_status == JEventSource::Result::FailureFinished) {
0098             LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result FailureFinished"<< LOG_END;
0099             m_current_source++;
0100             // TODO: Adjust nskip and nevents for the new source
0101         }
0102         else if (source_status == JEventSource::Result::FailureTryAgain){
0103             // This JEventSource isn't finished yet, so we obtained either Success or TryAgainLater
0104             LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
0105             outputs[0] = {event, 0}; // Reject
0106             output_count = 1;
0107             status = JArrow::FireResult::ComeBackLater;
0108             return;
0109         }
0110         else if (event->GetSequential()){
0111             // Source succeeded, but returned a barrier event
0112             LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, holding back barrier event# " << event->GetEventNumber() << LOG_END;
0113             m_pending_barrier_event = event;
0114             m_barrier_active = true;
0115             m_next_input_port = -1; // Stop popping events from the input queue until barrier event has finished
0116             
0117             // Arrow hangs on to the barrier event until the topology fully drains
0118             output_count = 0;
0119             status = JArrow::FireResult::KeepGoing; // Mysteriously livelocks if we set this to ComeBackLater??
0120             return;
0121         }
0122         else {
0123             // Source succeeded, did NOT return a barrier event
0124             LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, emitting event# " << event->GetEventNumber() << LOG_END;
0125             outputs[0] = {event, 1}; // SUCCESS!
0126             output_count = 1;
0127             status = JArrow::FireResult::KeepGoing;
0128             return;
0129         }
0130     }
0131 
0132     // All event sources have finished now
0133     outputs[0] = {event, 0}; // Reject event
0134     output_count = 1;
0135     status = JArrow::FireResult::Finished;
0136 }
0137 
0138 void JEventSourceArrow::initialize() {
0139     // We initialize everything immediately, but don't open any resources until we absolutely have to; see process(): source->DoNext()
0140     for (JEventSource* source : m_sources) {
0141         source->DoInit();
0142         LOG_INFO(m_logger) << "Initialized JEventSource '" << source->GetTypeName() << "' ('" << source->GetResourceName() << "')" << LOG_END;
0143     }
0144 }
0145 
0146 void JEventSourceArrow::finalize() {
0147     // Generally JEventSources finalize themselves as soon as they detect that they have run out of events.
0148     // However, we can't rely on the JEventSources turning themselves off since execution can be externally paused.
0149     // Instead we leave everything open until we finalize the whole topology, and finalize remaining event sources then.
0150     for (JEventSource* source : m_sources) {
0151         source->DoClose();
0152     }
0153 }