File indexing completed on 2025-11-03 10:10:17
0001
0002
0003
0004
0005
0006 #include <JANA/JApplication.h>
0007 #include <JANA/JEventSource.h>
0008 #include <JANA/Topology/JEventSourceArrow.h>
0009
0010
0011
0012 JEventSourceArrow::JEventSourceArrow(std::string name, std::vector<JEventSource*> sources)
0013 : m_sources(sources) {
0014 set_name(name);
0015 set_is_source(true);
0016 create_ports(1, 1);
0017 m_ports[EVENT_OUT].establishes_ordering = true;
0018
0019
0020
0021
0022
0023
0024 }
0025
0026
0027 void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) {
0028
0029 LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << LOG_END;
0030
0031
0032 if (m_barrier_active) {
0033
0034 auto emitted_event_count = m_sources[m_current_source]->GetEmittedEventCount();
0035 auto finished_event_count = m_sources[m_current_source]->GetProcessedEventCount();
0036
0037
0038 if (m_pending_barrier_event != nullptr) {
0039
0040
0041 if ((emitted_event_count - finished_event_count) == 1) {
0042 LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event is in-flight" << LOG_END;
0043
0044
0045
0046 outputs[0] = {m_pending_barrier_event, 1};
0047 output_count = 1;
0048 status = JArrow::FireResult::KeepGoing;
0049
0050 m_pending_barrier_event = nullptr;
0051
0052
0053
0054 return;
0055 }
0056 else {
0057
0058 LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event. Emitted = " << emitted_event_count << ", Finished = " << finished_event_count << LOG_END;
0059 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
0060
0061 assert(event == nullptr);
0062 output_count = 0;
0063 status = JArrow::FireResult::ComeBackLater;
0064 return;
0065 }
0066 }
0067 else {
0068
0069
0070 if (finished_event_count == emitted_event_count) {
0071
0072
0073 LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event finished, returning to normal operation" << LOG_END;
0074 m_barrier_active = false;
0075 m_next_input_port = 0;
0076
0077 output_count = 0;
0078 status = JArrow::FireResult::KeepGoing;
0079 return;
0080 }
0081 else {
0082
0083 LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on in-flight barrier event" << LOG_END;
0084 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
0085 assert(event == nullptr);
0086 output_count = 0;
0087 status = JArrow::FireResult::ComeBackLater;
0088 return;
0089 }
0090 }
0091 }
0092
0093 while (m_current_source < m_sources.size()) {
0094
0095 auto source_status = m_sources[m_current_source]->DoNext(event->shared_from_this());
0096
0097 if (source_status == JEventSource::Result::FailureFinished) {
0098 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result FailureFinished"<< LOG_END;
0099 m_current_source++;
0100
0101 }
0102 else if (source_status == JEventSource::Result::FailureTryAgain){
0103
0104 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
0105 outputs[0] = {event, 0};
0106 output_count = 1;
0107 status = JArrow::FireResult::ComeBackLater;
0108 return;
0109 }
0110 else if (event->GetSequential()){
0111
0112 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, holding back barrier event# " << event->GetEventNumber() << LOG_END;
0113 m_pending_barrier_event = event;
0114 m_barrier_active = true;
0115 m_next_input_port = -1;
0116
0117
0118 output_count = 0;
0119 status = JArrow::FireResult::KeepGoing;
0120 return;
0121 }
0122 else {
0123
0124 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, emitting event# " << event->GetEventNumber() << LOG_END;
0125 outputs[0] = {event, 1};
0126 output_count = 1;
0127 status = JArrow::FireResult::KeepGoing;
0128 return;
0129 }
0130 }
0131
0132
0133 outputs[0] = {event, 0};
0134 output_count = 1;
0135 status = JArrow::FireResult::Finished;
0136 }
0137
0138 void JEventSourceArrow::initialize() {
0139
0140 for (JEventSource* source : m_sources) {
0141 source->DoInit();
0142 LOG_INFO(m_logger) << "Initialized JEventSource '" << source->GetTypeName() << "' ('" << source->GetResourceName() << "')" << LOG_END;
0143 }
0144 }
0145
0146 void JEventSourceArrow::finalize() {
0147
0148
0149
0150 for (JEventSource* source : m_sources) {
0151 source->DoClose();
0152 }
0153 }