File indexing completed on 2025-07-12 08:56:07
0001
0002 #include <JANA/Topology/JArrow.h>
0003
0004 void JArrow::create_ports(size_t inputs, size_t outputs) {
0005 m_ports.clear();
0006 for (size_t i=0; i<inputs; ++i) {
0007 m_ports.push_back({nullptr, nullptr, true});
0008 }
0009 for (size_t i=0; i<outputs; ++i) {
0010 m_ports.push_back({nullptr, nullptr, false});
0011 }
0012 }
0013
0014
0015 void JArrow::attach(JEventQueue* queue, size_t port) {
0016
0017
0018 if (port >= m_ports.size()) {
0019 throw JException("Attempting to attach to a non-existent port! arrow=%s, port=%d", m_name.c_str(), port);
0020 }
0021 m_ports[port].queue = queue;
0022 }
0023
0024
0025 void JArrow::attach(JEventPool* pool, size_t port) {
0026
0027
0028 if (port >= m_ports.size()) {
0029 throw JException("Attempting to attach to a non-existent place! arrow=%s, port=%d", m_name.c_str(), port);
0030 }
0031 m_ports[port].pool = pool;
0032 }
0033
0034
0035 JEvent* JArrow::pull(size_t port_index, size_t location_id) {
0036 JEvent* event = nullptr;
0037 auto& port = m_ports.at(port_index);
0038 if (port.queue != nullptr) {
0039 event = port.queue->Pop(location_id);
0040 }
0041 else if (port.pool != nullptr){
0042 event = port.pool->Pop( location_id);
0043 }
0044 else {
0045 throw JException("Arrow %s: Port %d not wired!", m_name.c_str(), port_index);
0046 }
0047
0048 return event;
0049 }
0050
0051
0052 void JArrow::push(OutputData& outputs, size_t output_count, size_t location_id) {
0053 for (size_t output = 0; output < output_count; ++output) {
0054 JEvent* event = outputs[output].first;
0055 int port_index = outputs[output].second;
0056 Port& port = m_ports.at(port_index);
0057 if (port.queue != nullptr) {
0058 port.queue->Push(event, location_id);
0059 }
0060 else if (port.pool != nullptr) {
0061 event->Clear(!port.is_input);
0062 port.pool->Push(event, location_id);
0063 }
0064 else {
0065 throw JException("Arrow %s: Port %d not wired!", m_name.c_str(), port_index);
0066 }
0067 }
0068 }
0069
0070 JArrow::FireResult JArrow::execute(size_t location_id) {
0071
0072 auto start_total_time = std::chrono::steady_clock::now();
0073 if (m_next_visit_time > start_total_time) {
0074
0075 return FireResult::ComeBackLater;
0076 }
0077
0078 JEvent* input = nullptr;
0079 if (m_next_input_port != -1) {
0080 input = pull(m_next_input_port, location_id);
0081 }
0082
0083 if (input == nullptr && m_next_input_port != -1) {
0084
0085 return FireResult::NotRunYet;
0086 }
0087
0088
0089
0090
0091 OutputData outputs;
0092 size_t output_count;
0093 JArrow::FireResult result = JArrow::FireResult::KeepGoing;
0094
0095 fire(input, outputs, output_count, result);
0096
0097 push(outputs, output_count, location_id);
0098
0099 return result;
0100 }
0101
0102
0103 std::string to_string(JArrow::FireResult r) {
0104 switch (r) {
0105 case JArrow::FireResult::NotRunYet: return "NotRunYet";
0106 case JArrow::FireResult::KeepGoing: return "KeepGoing";
0107 case JArrow::FireResult::ComeBackLater: return "ComeBackLater";
0108 case JArrow::FireResult::Finished: return "Finished";
0109 default: return "Error";
0110 }
0111 }
0112
0113
0114