File indexing completed on 2026-05-09 09:02:00
0001
0002
0003
0004
0005
0006 #include <JANA/JApplication.h>
0007 #include <JANA/JEventSource.h>
0008 #include <JANA/Topology/JSourceArrow.h>
0009
0010
0011
0012 JSourceArrow::JSourceArrow(std::string name, JEventLevel level, std::vector<JEventSource*> sources)
0013 : m_sources(sources) {
0014 SetName(name);
0015 SetIsSource(true);
0016 AddPort("in", level).SetSkipFinishEvent(true);
0017 AddPort("out", level).SetEstablishesOrdering(true);
0018
0019
0020
0021
0022
0023
0024
0025 }
0026
0027
0028 void JSourceArrow::Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) {
0029
0030 LOG_DEBUG(m_logger) << "Executing arrow " << GetName() << LOG_END;
0031
0032
0033 if (m_barrier_active) {
0034
0035 auto emitted_event_count = m_sources[m_current_source]->GetEmittedEventCount();
0036 auto finished_event_count = m_sources[m_current_source]->GetProcessedEventCount();
0037
0038
0039 if (m_pending_barrier_event != nullptr) {
0040
0041
0042 if ((emitted_event_count - finished_event_count) == 1) {
0043 LOG_DEBUG(m_logger) << "JSourceArrow: Barrier event is in-flight" << LOG_END;
0044
0045
0046
0047 outputs[0] = {m_pending_barrier_event, 1};
0048 output_count = 1;
0049 status = JArrow::FireResult::KeepGoing;
0050
0051 m_pending_barrier_event = nullptr;
0052
0053
0054
0055 return;
0056 }
0057 else {
0058
0059 LOG_DEBUG(m_logger) << "JSourceArrow: Waiting on pending barrier event. Emitted = " << emitted_event_count << ", Finished = " << finished_event_count << LOG_END;
0060 LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result ComeBackLater"<< LOG_END;
0061
0062 assert(event == nullptr);
0063 output_count = 0;
0064 status = JArrow::FireResult::ComeBackLater;
0065 return;
0066 }
0067 }
0068 else {
0069
0070
0071 if (finished_event_count == emitted_event_count) {
0072
0073
0074 LOG_DEBUG(m_logger) << "JSourceArrow: Barrier event finished, returning to normal operation" << LOG_END;
0075 m_barrier_active = false;
0076 m_next_input_port = 0;
0077
0078 output_count = 0;
0079 status = JArrow::FireResult::KeepGoing;
0080 return;
0081 }
0082 else {
0083
0084 LOG_DEBUG(m_logger) << "JSourceArrow: Waiting on in-flight barrier event" << LOG_END;
0085 LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result ComeBackLater"<< LOG_END;
0086 assert(event == nullptr);
0087 output_count = 0;
0088 status = JArrow::FireResult::ComeBackLater;
0089 return;
0090 }
0091 }
0092 }
0093
0094 while (m_current_source < m_sources.size()) {
0095
0096 auto source_status = m_sources[m_current_source]->DoNext(event->shared_from_this());
0097
0098 if (source_status == JEventSource::Result::FailureFinished) {
0099 LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result FailureFinished"<< LOG_END;
0100 m_current_source++;
0101
0102 }
0103 else if (source_status == JEventSource::Result::FailureTryAgain){
0104
0105 LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result ComeBackLater"<< LOG_END;
0106 outputs[0] = {event, 0};
0107 output_count = 1;
0108 status = JArrow::FireResult::ComeBackLater;
0109 return;
0110 }
0111 else if (event->GetSequential()){
0112
0113 LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result Success, holding back barrier event# " << event->GetEventNumber() << LOG_END;
0114 m_pending_barrier_event = event;
0115 m_barrier_active = true;
0116 m_next_input_port = -1;
0117
0118
0119 output_count = 0;
0120 status = JArrow::FireResult::KeepGoing;
0121 return;
0122 }
0123 else {
0124
0125 LOG_DEBUG(m_logger) << "Executed arrow " << GetName() << " with result Success, emitting event# " << event->GetEventNumber() << LOG_END;
0126 outputs[0] = {event, 1};
0127 output_count = 1;
0128 status = JArrow::FireResult::KeepGoing;
0129 return;
0130 }
0131 }
0132
0133
0134 outputs[0] = {event, 0};
0135 output_count = 1;
0136 status = JArrow::FireResult::Finished;
0137 }
0138
0139 void JSourceArrow::Initialize() {
0140
0141 for (JEventSource* source : m_sources) {
0142 source->DoInit();
0143 LOG_INFO(m_logger) << "Initialized JEventSource '" << source->GetTypeName() << "' ('" << source->GetResourceName() << "')" << LOG_END;
0144 }
0145 }
0146
0147 void JSourceArrow::Finalize() {
0148
0149
0150
0151 for (JEventSource* source : m_sources) {
0152 source->DoClose();
0153 }
0154 }