Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-05-09 09:02:00

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 
0006 #include <JANA/JApplication.h>
0007 #include <JANA/JEventSource.h>
0008 #include <JANA/Topology/JSourceArrow.h>
0009 
0010 
0011 
0012 JSourceArrow::JSourceArrow(std::string name, JEventLevel level, std::vector<JEventSource*> sources)
0013     : m_sources(sources) {
0014     SetName(name);
0015     SetIsSource(true);
0016     AddPort("in", level).SetSkipFinishEvent(true);
0017     AddPort("out", level).SetEstablishesOrdering(true);
0018 
0019     // All event sources establish their own ordering by default,
0020     // which is sufficient for the kinds of topologies we can create
0021     // using JTopologyBuilder. In the future we may need something
0022     // more sophisticated. Note that establishing the ordering here is a
0023     // no-op if there are no components in the topology that need it enforced,
0024     // so it doesn't hurt NUMA performance.
0025 }
0026 
0027 
0028 void JSourceArrow::Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) {
0029 
0030     LOG_DEBUG(m_logger) << "Executing arrow " << GetName() << LOG_END;
0031 
0032     // First check to see if we need to handle a barrier event before attempting to emit another event
0033     if (m_barrier_active) {
0034 
0035         auto emitted_event_count = m_sources[m_current_source]->GetEmittedEventCount();
0036         auto finished_event_count = m_sources[m_current_source]->GetProcessedEventCount();
0037 
0038         // A barrier event has been emitted by the source.
0039         if (m_pending_barrier_event != nullptr) {
0040 
0041             // This barrier event is pending until the topology drains
0042             if ((emitted_event_count - finished_event_count) == 1) {
0043                 LOG_DEBUG(m_logger) << "JSourceArrow: Barrier event is in-flight" << LOG_END;
0044 
0045                 // Topology has drained; only remaining in-flight event is the barrier event itself,
0046                 // which we have held on to until now
0047                 outputs[0] = {m_pending_barrier_event, 1};
0048                 output_count = 1;
0049                 status = JArrow::FireResult::KeepGoing;
0050 
0051                 m_pending_barrier_event = nullptr;
0052                 // There's not much for the thread team to do while the barrier event makes its way through the topology.
0053                 // Eventually we might be able to use this to communicate to the scheduler to not wake threads whose only
0054                 // available action is to hammer the JSourceArrow
0055                 return;
0056             }
0057             else {
0058                 // Topology has _not_ finished draining, all we can do is wait
0059                 LOG_DEBUG(m_logger) << "JSourceArrow: Waiting on pending barrier event. Emitted = " << emitted_event_count << ", Finished = " << finished_event_count << LOG_END;
0060                 LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result ComeBackLater"<< LOG_END;
0061 
0062                 assert(event == nullptr);
0063                 output_count = 0;
0064                 status = JArrow::FireResult::ComeBackLater;
0065                 return;
0066             }
0067         }
0068         else {
0069             // This barrier event has already been sent into the topology and we need to wait
0070             // until it is finished before emitting any more events
0071             if (finished_event_count == emitted_event_count) {
0072 
0073                 // Barrier event has finished.
0074                 LOG_DEBUG(m_logger) << "JSourceArrow: Barrier event finished, returning to normal operation" << LOG_END;
0075                 m_barrier_active = false;
0076                 m_next_input_port = 0;
0077 
0078                 output_count = 0;
0079                 status = JArrow::FireResult::KeepGoing;
0080                 return;
0081             }
0082             else {
0083                 // Barrier event has NOT finished
0084                 LOG_DEBUG(m_logger) << "JSourceArrow: Waiting on in-flight barrier event" << LOG_END;
0085                 LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result ComeBackLater"<< LOG_END;
0086                 assert(event == nullptr);
0087                 output_count = 0;
0088                 status = JArrow::FireResult::ComeBackLater;
0089                 return;
0090             }
0091         }
0092     }
0093 
0094     while (m_current_source < m_sources.size()) {
0095 
0096         auto source_status = m_sources[m_current_source]->DoNext(event->shared_from_this());
0097 
0098         if (source_status == JEventSource::Result::FailureFinished) {
0099             LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result FailureFinished"<< LOG_END;
0100             m_current_source++;
0101             // TODO: Adjust nskip and nevents for the new source
0102         }
0103         else if (source_status == JEventSource::Result::FailureTryAgain){
0104             // This JEventSource isn't finished yet, so we obtained either Success or TryAgainLater
0105             LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result ComeBackLater"<< LOG_END;
0106             outputs[0] = {event, 0}; // Reject
0107             output_count = 1;
0108             status = JArrow::FireResult::ComeBackLater;
0109             return;
0110         }
0111         else if (event->GetSequential()){
0112             // Source succeeded, but returned a barrier event
0113             LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result Success, holding back barrier event# " << event->GetEventNumber() << LOG_END;
0114             m_pending_barrier_event = event;
0115             m_barrier_active = true;
0116             m_next_input_port = -1; // Stop popping events from the input queue until barrier event has finished
0117             
0118             // Arrow hangs on to the barrier event until the topology fully drains
0119             output_count = 0;
0120             status = JArrow::FireResult::KeepGoing; // Mysteriously livelocks if we set this to ComeBackLater??
0121             return;
0122         }
0123         else {
0124             // Source succeeded, did NOT return a barrier event
0125             LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result Success, emitting event# " << event->GetEventNumber() << LOG_END;
0126             outputs[0] = {event, 1}; // SUCCESS!
0127             output_count = 1;
0128             status = JArrow::FireResult::KeepGoing;
0129             return;
0130         }
0131     }
0132 
0133     // All event sources have finished now
0134     outputs[0] = {event, 0}; // Reject event
0135     output_count = 1;
0136     status = JArrow::FireResult::Finished;
0137 }
0138 
0139 void JSourceArrow::Initialize() {
0140     // We initialize everything immediately, but don't open any resources until we absolutely have to; see process(): source->DoNext()
0141     for (JEventSource* source : m_sources) {
0142         source->DoInit();
0143         LOG_INFO(m_logger) << "Initialized JEventSource '" << source->GetTypeName() << "' ('" << source->GetResourceName() << "')" << LOG_END;
0144     }
0145 }
0146 
0147 void JSourceArrow::Finalize() {
0148     // Generally JEventSources finalize themselves as soon as they detect that they have run out of events.
0149     // However, we can't rely on the JEventSources turning themselves off since execution can be externally paused.
0150     // Instead we leave everything open until we finalize the whole topology, and finalize remaining event sources then.
0151     for (JEventSource* source : m_sources) {
0152         source->DoClose();
0153     }
0154 }