File indexing completed on 2025-11-02 10:01:47
0001
0002
0003
0004
0005 #pragma once
#include <array>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

#include <JANA/JLogger.h>
#include <JANA/JException.h>
#include <JANA/Topology/JEventQueue.h>
#include <JANA/Topology/JEventPool.h>
0013
0014
0015 class JArrow {
0016 friend class JTopologyBuilder;
0017
0018 public:
0019 using OutputData = std::array<std::pair<JEvent*, int>, 2>;
0020 enum class FireResult {NotRunYet, KeepGoing, ComeBackLater, Finished};
0021
0022 struct Port {
0023 JEventQueue* queue = nullptr;
0024 JEventPool* pool = nullptr;
0025 bool is_input = false;
0026 bool establishes_ordering = false;
0027 bool enforces_ordering = false;
0028 };
0029
0030 private:
0031 std::string m_name;
0032 bool m_is_parallel;
0033 bool m_is_source;
0034 bool m_is_sink;
0035
0036 protected:
0037 using clock_t = std::chrono::steady_clock;
0038
0039 int m_next_input_port=0;
0040 clock_t::time_point m_next_visit_time=clock_t::now();
0041 std::vector<Port> m_ports;
0042 JLogger m_logger;
0043
0044 public:
0045 const std::string& get_name() { return m_name; }
0046 JLogger& get_logger() { return m_logger; }
0047 bool is_parallel() { return m_is_parallel; }
0048 bool is_source() { return m_is_source; }
0049 bool is_sink() { return m_is_sink; }
0050 Port& get_port(size_t port_index) { return m_ports.at(port_index); }
0051 int get_next_port_index() { return m_next_input_port; }
0052
0053 void set_name(std::string name) { m_name = name; }
0054 void set_logger(JLogger logger) { m_logger = logger; }
0055 void set_is_parallel(bool is_parallel) { m_is_parallel = is_parallel; }
0056 void set_is_source(bool is_source) { m_is_source = is_source; }
0057 void set_is_sink(bool is_sink) { m_is_sink = is_sink; }
0058
0059
0060 JArrow() {
0061 m_is_parallel = false;
0062 m_is_source = false;
0063 m_is_sink = false;
0064 }
0065
0066 JArrow(std::string name, bool is_parallel, bool is_source, bool is_sink) :
0067 m_name(std::move(name)), m_is_parallel(is_parallel), m_is_source(is_source), m_is_sink(is_sink) {
0068 };
0069
0070 virtual ~JArrow() = default;
0071
0072 virtual void initialize() { };
0073
0074 virtual FireResult execute(size_t location_id);
0075
0076 virtual void fire(JEvent*, OutputData&, size_t&, FireResult&) {};
0077
0078 virtual void finalize() {};
0079
0080 void create_ports(size_t inputs, size_t outputs);
0081
0082 void attach(JEventQueue* queue, size_t port);
0083 void attach(JEventPool* pool, size_t port);
0084
0085 JEvent* pull(size_t input_port, size_t location_id);
0086 void push(OutputData& outputs, size_t output_count, size_t location_id);
0087 };
0088
0089
/// Converts a JArrow::FireResult to a std::string. Declared here only; the
/// definition lives elsewhere.
/// NOTE(review): presumably yields a human-readable name for each enumerator
/// (e.g. for logging) -- confirm against the definition.
0090 std::string to_string(JArrow::FireResult r);