Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:36

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 
#include <JANA/JApplication.h>
#include <JANA/JEventSource.h>
#include <JANA/Topology/JEventSourceArrow.h>

#include <cassert>
#include <utility>
0009 
0010 
0011 
0012 JEventSourceArrow::JEventSourceArrow(std::string name, std::vector<JEventSource*> sources)
0013     : m_sources(sources) {
0014     set_name(name);
0015     set_is_source(true);
0016     create_ports(1, 1);
0017 }
0018 
0019 
0020 void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) {
0021 
0022     LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << LOG_END;
0023 
0024     // First check to see if we need to handle a barrier event before attempting to emit another event
0025     if (m_barrier_active) {
0026 
0027         auto emitted_event_count = m_sources[m_current_source]->GetEmittedEventCount();
0028         auto finished_event_count = m_sources[m_current_source]->GetFinishedEventCount();
0029 
0030         // A barrier event has been emitted by the source.
0031         if (m_pending_barrier_event != nullptr) {
0032 
0033             // This barrier event is pending until the topology drains
0034             if ((emitted_event_count - finished_event_count) == 1) {
0035                 LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event is in-flight" << LOG_END;
0036 
0037                 // Topology has drained; only remaining in-flight event is the barrier event itself,
0038                 // which we have held on to until now
0039                 outputs[0] = {m_pending_barrier_event, 1};
0040                 output_count = 1;
0041                 status = JArrow::FireResult::KeepGoing;
0042 
0043                 m_pending_barrier_event = nullptr;
0044                 // There's not much for the thread team to do while the barrier event makes its way through the topology.
0045                 // Eventually we might be able to use this to communicate to the scheduler to not wake threads whose only
0046                 // available action is to hammer the JEventSourceArrow
0047                 return;
0048             }
0049             else {
0050                 // Topology has _not_ finished draining, all we can do is wait
0051                 LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event. Emitted = " << emitted_event_count << ", Finished = " << finished_event_count << LOG_END;
0052                 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
0053 
0054                 assert(event == nullptr);
0055                 output_count = 0;
0056                 status = JArrow::FireResult::ComeBackLater;
0057                 return;
0058             }
0059         }
0060         else {
0061             // This barrier event has already been sent into the topology and we need to wait
0062             // until it is finished before emitting any more events
0063             if (finished_event_count == emitted_event_count) {
0064 
0065                 // Barrier event has finished.
0066                 LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event finished, returning to normal operation" << LOG_END;
0067                 m_barrier_active = false;
0068                 m_next_input_port = 0;
0069 
0070                 output_count = 0;
0071                 status = JArrow::FireResult::KeepGoing;
0072                 return;
0073             }
0074             else {
0075                 // Barrier event has NOT finished
0076                 LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on in-flight barrier event" << LOG_END;
0077                 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
0078                 assert(event == nullptr);
0079                 output_count = 0;
0080                 status = JArrow::FireResult::ComeBackLater;
0081                 return;
0082             }
0083         }
0084     }
0085 
0086     while (m_current_source < m_sources.size()) {
0087 
0088         auto source_status = m_sources[m_current_source]->DoNext(event->shared_from_this());
0089 
0090         if (source_status == JEventSource::Result::FailureFinished) {
0091             LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result FailureFinished"<< LOG_END;
0092             m_current_source++;
0093             // TODO: Adjust nskip and nevents for the new source
0094         }
0095         else if (source_status == JEventSource::Result::FailureTryAgain){
0096             // This JEventSource isn't finished yet, so we obtained either Success or TryAgainLater
0097             LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
0098             outputs[0] = {event, 0}; // Reject
0099             output_count = 1;
0100             status = JArrow::FireResult::ComeBackLater;
0101             return;
0102         }
0103         else if (event->GetSequential()){
0104             // Source succeeded, but returned a barrier event
0105             LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, holding back barrier event# " << event->GetEventNumber() << LOG_END;
0106             m_pending_barrier_event = event;
0107             m_barrier_active = true;
0108             m_next_input_port = -1; // Stop popping events from the input queue until barrier event has finished
0109             
0110             // Arrow hangs on to the barrier event until the topology fully drains
0111             output_count = 0;
0112             status = JArrow::FireResult::KeepGoing; // Mysteriously livelocks if we set this to ComeBackLater??
0113             return;
0114         }
0115         else {
0116             // Source succeeded, did NOT return a barrier event
0117             LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, emitting event# " << event->GetEventNumber() << LOG_END;
0118             outputs[0] = {event, 1}; // SUCCESS!
0119             output_count = 1;
0120             status = JArrow::FireResult::KeepGoing;
0121             return;
0122         }
0123     }
0124 
0125     // All event sources have finished now
0126     outputs[0] = {event, 0}; // Reject event
0127     output_count = 1;
0128     status = JArrow::FireResult::Finished;
0129 }
0130 
0131 void JEventSourceArrow::initialize() {
0132     // We initialize everything immediately, but don't open any resources until we absolutely have to; see process(): source->DoNext()
0133     for (JEventSource* source : m_sources) {
0134         source->DoInit();
0135     }
0136 }
0137 
0138 void JEventSourceArrow::finalize() {
0139     // Generally JEventSources finalize themselves as soon as they detect that they have run out of events.
0140     // However, we can't rely on the JEventSources turning themselves off since execution can be externally paused.
0141     // Instead we leave everything open until we finalize the whole topology, and finalize remaining event sources then.
0142     for (JEventSource* source : m_sources) {
0143         source->DoClose();
0144     }
0145 }