File indexing completed on 2025-01-18 10:17:35
0001
0002 #include <JANA/Topology/JArrow.h>
0003
0004 void JArrow::create_ports(size_t inputs, size_t outputs) {
0005 m_ports.clear();
0006 for (size_t i=0; i<inputs; ++i) {
0007 m_ports.push_back({nullptr, nullptr, true});
0008 }
0009 for (size_t i=0; i<outputs; ++i) {
0010 m_ports.push_back({nullptr, nullptr, false});
0011 }
0012 }
0013
0014
0015 void JArrow::attach(JEventQueue* queue, size_t port) {
0016
0017
0018 if (port >= m_ports.size()) {
0019 throw JException("Attempting to attach to a non-existent port! arrow=%s, port=%d", m_name.c_str(), port);
0020 }
0021 m_ports[port].queue = queue;
0022 }
0023
0024
0025 void JArrow::attach(JEventPool* pool, size_t port) {
0026
0027
0028 if (port >= m_ports.size()) {
0029 throw JException("Attempting to attach to a non-existent place! arrow=%s, port=%d", m_name.c_str(), port);
0030 }
0031 m_ports[port].pool = pool;
0032 }
0033
0034
0035 JEvent* JArrow::pull(size_t port_index, size_t location_id) {
0036 JEvent* event = nullptr;
0037 auto& port = m_ports.at(port_index);
0038 if (port.queue != nullptr) {
0039 event = port.queue->Pop(location_id);
0040 }
0041 else if (port.pool != nullptr){
0042 event = port.pool->Pop( location_id);
0043 }
0044 else {
0045 throw JException("Arrow %s: Port %d not wired!", m_name.c_str(), port_index);
0046 }
0047
0048 return event;
0049 }
0050
0051
0052 void JArrow::push(OutputData& outputs, size_t output_count, size_t location_id) {
0053 for (size_t output = 0; output < output_count; ++output) {
0054 JEvent* event = outputs[output].first;
0055 int port_index = outputs[output].second;
0056 Port& port = m_ports.at(port_index);
0057 if (port.queue != nullptr) {
0058 port.queue->Push(event, location_id);
0059 }
0060 else if (port.pool != nullptr) {
0061 if (!port.is_input) {
0062 event->Clear();
0063 }
0064 port.pool->Push(event, location_id);
0065 }
0066 else {
0067 throw JException("Arrow %s: Port %d not wired!", m_name.c_str(), port_index);
0068 }
0069 }
0070 }
0071
0072 JArrow::FireResult JArrow::execute(size_t location_id) {
0073
0074 auto start_total_time = std::chrono::steady_clock::now();
0075 if (m_next_visit_time > start_total_time) {
0076
0077 return FireResult::ComeBackLater;
0078 }
0079
0080 JEvent* input = nullptr;
0081 if (m_next_input_port != -1) {
0082 input = pull(m_next_input_port, location_id);
0083 }
0084
0085 if (input == nullptr && m_next_input_port != -1) {
0086
0087 return FireResult::NotRunYet;
0088 }
0089
0090
0091
0092
0093 OutputData outputs;
0094 size_t output_count;
0095 JArrow::FireResult result = JArrow::FireResult::KeepGoing;
0096
0097 fire(input, outputs, output_count, result);
0098
0099 push(outputs, output_count, location_id);
0100
0101 return result;
0102 }
0103
0104
0105 std::string to_string(JArrow::FireResult r) {
0106 switch (r) {
0107 case JArrow::FireResult::NotRunYet: return "NotRunYet";
0108 case JArrow::FireResult::KeepGoing: return "KeepGoing";
0109 case JArrow::FireResult::ComeBackLater: return "ComeBackLater";
0110 case JArrow::FireResult::Finished: return "Finished";
0111 default: return "Error";
0112 }
0113 }
0114
0115
0116