File indexing completed on 2025-01-18 10:17:36
0001
0002
0003
0004
0005
#include <JANA/JApplication.h>
#include <JANA/JEventSource.h>
#include <JANA/Topology/JEventSourceArrow.h>

#include <utility>
0009
0010
0011
0012 JEventSourceArrow::JEventSourceArrow(std::string name, std::vector<JEventSource*> sources)
0013 : m_sources(sources) {
0014 set_name(name);
0015 set_is_source(true);
0016 create_ports(1, 1);
0017 }
0018
0019
0020 void JEventSourceArrow::fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) {
0021
0022 LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << LOG_END;
0023
0024
0025 if (m_barrier_active) {
0026
0027 auto emitted_event_count = m_sources[m_current_source]->GetEmittedEventCount();
0028 auto finished_event_count = m_sources[m_current_source]->GetFinishedEventCount();
0029
0030
0031 if (m_pending_barrier_event != nullptr) {
0032
0033
0034 if ((emitted_event_count - finished_event_count) == 1) {
0035 LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event is in-flight" << LOG_END;
0036
0037
0038
0039 outputs[0] = {m_pending_barrier_event, 1};
0040 output_count = 1;
0041 status = JArrow::FireResult::KeepGoing;
0042
0043 m_pending_barrier_event = nullptr;
0044
0045
0046
0047 return;
0048 }
0049 else {
0050
0051 LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on pending barrier event. Emitted = " << emitted_event_count << ", Finished = " << finished_event_count << LOG_END;
0052 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
0053
0054 assert(event == nullptr);
0055 output_count = 0;
0056 status = JArrow::FireResult::ComeBackLater;
0057 return;
0058 }
0059 }
0060 else {
0061
0062
0063 if (finished_event_count == emitted_event_count) {
0064
0065
0066 LOG_DEBUG(m_logger) << "JEventSourceArrow: Barrier event finished, returning to normal operation" << LOG_END;
0067 m_barrier_active = false;
0068 m_next_input_port = 0;
0069
0070 output_count = 0;
0071 status = JArrow::FireResult::KeepGoing;
0072 return;
0073 }
0074 else {
0075
0076 LOG_DEBUG(m_logger) << "JEventSourceArrow: Waiting on in-flight barrier event" << LOG_END;
0077 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
0078 assert(event == nullptr);
0079 output_count = 0;
0080 status = JArrow::FireResult::ComeBackLater;
0081 return;
0082 }
0083 }
0084 }
0085
0086 while (m_current_source < m_sources.size()) {
0087
0088 auto source_status = m_sources[m_current_source]->DoNext(event->shared_from_this());
0089
0090 if (source_status == JEventSource::Result::FailureFinished) {
0091 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result FailureFinished"<< LOG_END;
0092 m_current_source++;
0093
0094 }
0095 else if (source_status == JEventSource::Result::FailureTryAgain){
0096
0097 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result ComeBackLater"<< LOG_END;
0098 outputs[0] = {event, 0};
0099 output_count = 1;
0100 status = JArrow::FireResult::ComeBackLater;
0101 return;
0102 }
0103 else if (event->GetSequential()){
0104
0105 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, holding back barrier event# " << event->GetEventNumber() << LOG_END;
0106 m_pending_barrier_event = event;
0107 m_barrier_active = true;
0108 m_next_input_port = -1;
0109
0110
0111 output_count = 0;
0112 status = JArrow::FireResult::KeepGoing;
0113 return;
0114 }
0115 else {
0116
0117 LOG_DEBUG(m_logger) << "Executed arrow " << get_name() << " with result Success, emitting event# " << event->GetEventNumber() << LOG_END;
0118 outputs[0] = {event, 1};
0119 output_count = 1;
0120 status = JArrow::FireResult::KeepGoing;
0121 return;
0122 }
0123 }
0124
0125
0126 outputs[0] = {event, 0};
0127 output_count = 1;
0128 status = JArrow::FireResult::Finished;
0129 }
0130
0131 void JEventSourceArrow::initialize() {
0132
0133 for (JEventSource* source : m_sources) {
0134 source->DoInit();
0135 }
0136 }
0137
0138 void JEventSourceArrow::finalize() {
0139
0140
0141
0142 for (JEventSource* source : m_sources) {
0143 source->DoClose();
0144 }
0145 }