File indexing completed on 2025-08-28 09:13:20
0001 #include "JANA/JEventSource.h"
0002 #include "JANA/Topology/JArrow.h"
0003 #include "JANA/Utils/JEventLevel.h"
0004 #include <JANA/Topology/JMultilevelSourceArrow.h>
0005
0006
0007 void JMultilevelSourceArrow::SetEventSource(JEventSource* source) {
0008 m_source = source;
0009 m_levels = source->GetEventLevels();
0010 m_child_event_level = m_levels.back();
0011 m_next_input_port = 0;
0012
0013 size_t input_port_count = 0;
0014 size_t output_port_count = 0;
0015 for (auto level : m_levels) {
0016 m_port_lookup[{level, Direction::In}] = input_port_count++;
0017 }
0018 for (auto level : m_levels) {
0019 m_port_lookup[{level, Direction::Out}] = input_port_count + output_port_count++;
0020 }
0021
0022 create_ports(input_port_count, output_port_count);
0023 }
0024
0025 const std::vector<JEventLevel>& JMultilevelSourceArrow::GetLevels() const {
0026 return m_levels;
0027 }
0028
0029 size_t JMultilevelSourceArrow::GetPortIndex(JEventLevel level, Direction direction) const {
0030 return m_port_lookup.at({level, direction});
0031 };
0032
0033 void JMultilevelSourceArrow::initialize() {
0034
0035 m_source->DoInit();
0036 }
0037
0038 void JMultilevelSourceArrow::finalize() {
0039
0040
0041
0042 m_source->DoClose();
0043 }
0044
0045 void JMultilevelSourceArrow::EvictNextParent(OutputData& outputs, size_t& output_count) {
0046
0047
0048
0049
0050 auto it = m_pending_parents.find(m_next_input_level);
0051 if (it != m_pending_parents.end()) {
0052 if (it->second.first != nullptr) {
0053
0054 size_t parent_output_port = GetPortIndex(m_next_input_level, Direction::Out);
0055 LOG_DEBUG(get_logger()) << "JMultilevelSourceArrow: Evicting parent " << it->second.first->GetEventStamp() << " to port " << parent_output_port;
0056 outputs.at(output_count++) = {it->second.first, parent_output_port};
0057 it->second.first = nullptr;
0058 }
0059 }
0060 }
0061
0062 void JMultilevelSourceArrow::fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) {
0063
0064 if (!m_finish_in_progress) {
0065
0066 LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << LOG_END;
0067 auto result = m_source->DoNext(input->shared_from_this());
0068 m_next_input_level = m_source->GetNextInputLevel();
0069 m_next_input_port = GetPortIndex(m_next_input_level, Direction::In);
0070
0071 LOG_DEBUG(get_logger()) << "JMultilevelSourceArrow: Returned from DoNext(" << toString(input->GetLevel()) << "). Next input level is " << toString(m_next_input_level);
0072
0073 if (result == JEventSource::Result::Success) {
0074
0075
0076 if (input->GetLevel() == m_child_event_level) {
0077
0078
0079 for (auto [level, parent_pair] : m_pending_parents) {
0080
0081
0082 if (parent_pair.first != nullptr) {
0083 LOG_DEBUG(get_logger()) << "JMultilevelSourceArrow: Attaching parent: " << parent_pair.first->GetEventStamp() << " to event " << input->GetEventStamp();
0084 input->SetParent(parent_pair.first);
0085 }
0086 }
0087 outputs.at(output_count++) = {input, GetPortIndex(m_child_event_level, Direction::Out)};
0088
0089 if (m_next_input_level != m_child_event_level) {
0090
0091 EvictNextParent(outputs, output_count);
0092 }
0093 status = JArrow::FireResult::KeepGoing;
0094 return;
0095 }
0096 else {
0097
0098
0099 if (m_next_input_level != m_child_event_level) {
0100
0101 EvictNextParent(outputs, output_count);
0102 }
0103
0104 auto it = m_pending_parents.find(input->GetLevel());
0105 if (it != m_pending_parents.end()) {
0106
0107 if (it->second.first != nullptr) {
0108 throw JException("Found a parent event we weren't expecting");
0109 }
0110 it->second.first = input;
0111 it->second.second += 1;
0112 status = JArrow::FireResult::KeepGoing;
0113 return;
0114 }
0115 else {
0116 m_pending_parents[input->GetLevel()] = {input, 1};
0117 status = JArrow::FireResult::KeepGoing;
0118 return;
0119 }
0120 }
0121 }
0122 else if (result == JEventSource::Result::FailureTryAgain) {
0123 if (m_next_input_level != m_child_event_level) {
0124 EvictNextParent(outputs, output_count);
0125 }
0126
0127 outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), Direction::In)};
0128 status = JArrow::FireResult::ComeBackLater;
0129 return;
0130 }
0131 else if (result == JEventSource::Result::FailureLevelChange) {
0132 if (m_next_input_level != m_child_event_level) {
0133 EvictNextParent(outputs, output_count);
0134 }
0135
0136 outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), Direction::In)};
0137
0138 status = JArrow::FireResult::KeepGoing;
0139 return;
0140 }
0141 else if (result == JEventSource::Result::FailureFinished) {
0142
0143 outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), Direction::In)};
0144 m_finish_in_progress = true;
0145
0146 }
0147 }
0148
0149
0150 bool no_more_parents = false;
0151 while (output_count < 2 && !no_more_parents) {
0152
0153 auto it = m_pending_parents.begin();
0154 if (it != m_pending_parents.end()) {
0155
0156 auto parent = it->second.first;
0157 if (parent != nullptr) {
0158 outputs.at(output_count++) = {parent, GetPortIndex(parent->GetLevel(), Direction::Out)};
0159 }
0160 m_pending_parents.erase(it);
0161 }
0162 else {
0163 no_more_parents = true;
0164 }
0165 }
0166 status = (no_more_parents) ? JArrow::FireResult::Finished : JArrow::FireResult::KeepGoing;
0167 return;
0168 }