File indexing completed on 2026-07-01 08:44:16
0001
0002
0003
0004
0005 #pragma once
#include <array>
#include <cassert>
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <JANA/JLogger.h>
#include <JANA/JException.h>
#include <JANA/Topology/JEventQueue.h>
#include <JANA/Topology/JEventPool.h>
0013
0014
0015 class JArrow {
0016 friend class JTopologyBuilder;
0017
0018 public:
0019 using OutputData = std::array<std::pair<JEvent*, int>, 2>;
0020 enum class FireResult {NotRunYet, KeepGoing, ComeBackLater, Finished};
0021 enum class PortDirection { In, Out };
0022
0023 class Port {
0024 std::string m_name;
0025 std::vector<JEventLevel> m_levels;
0026 JEventQueue* m_queue = nullptr;
0027 JEventPool* m_pool = nullptr;
0028 bool m_skip_finish_event = false;
0029 bool m_establishes_ordering = false;
0030 bool m_enforces_ordering = false;
0031
0032 public:
0033 Port(std::string name, std::vector<JEventLevel> levels): m_name(name), m_levels(levels) {};
0034
0035 Port(std::string name, JEventLevel level): m_name(name) {
0036 m_levels.push_back(level);
0037 };
0038
0039 const std::string& GetName() { return m_name; }
0040 const std::vector<JEventLevel>& GetLevels() { return m_levels; }
0041 bool GetEstablishesOrdering() { return m_establishes_ordering; }
0042 bool GetEnforcesOrdering() { return m_enforces_ordering; }
0043 bool GetSkipFinishEvent() { return m_skip_finish_event; }
0044
0045 Port& SetEstablishesOrdering(bool establishes) {
0046 m_establishes_ordering = establishes;
0047 return *this;
0048 }
0049
0050 Port& SetEnforcesOrdering(bool enforces) {
0051 m_enforces_ordering = enforces;
0052 return *this;
0053 }
0054
0055 Port& SetSkipFinishEvent(bool skip_finish_event) {
0056 this->m_skip_finish_event = skip_finish_event;
0057 return *this;
0058 }
0059
0060 inline JEventPool* GetPool() { return m_pool; }
0061 inline JEventQueue* GetQueue() { return m_queue; }
0062
0063 void Attach(JEventQueue* queue) {
0064 this->m_pool = nullptr;
0065 this->m_queue = queue;
0066 }
0067
0068 void Attach(JEventPool* pool) {
0069 this->m_pool = pool;
0070 this->m_queue = nullptr;
0071 }
0072 };
0073
0074 private:
0075 std::string m_name;
0076 int m_id;
0077 bool m_is_parallel = false;
0078 bool m_is_source = false;
0079 bool m_is_sink = false;
0080
0081 protected:
0082 using clock_t = std::chrono::steady_clock;
0083
0084 int m_next_input_port=0;
0085 clock_t::time_point m_next_visit_time=clock_t::now();
0086 std::vector<std::unique_ptr<Port>> m_ports;
0087 std::map<std::string, int> m_port_lookup;
0088 std::map<std::pair<JEventLevel, PortDirection>, int> m_auto_port_lookup;
0089 JLogger m_logger;
0090
0091 public:
0092 JArrow() = default;
0093
0094 virtual ~JArrow() = default;
0095
0096 virtual void Initialize() {};
0097
0098 virtual void Fire(JEvent*, OutputData&, size_t&, FireResult&) {};
0099
0100 virtual void Finalize() {};
0101
0102
0103 FireResult Execute(size_t location_id);
0104
0105 JEvent* Pull(size_t input_port, size_t location_id);
0106
0107 void Push(OutputData& outputs, size_t output_count, size_t location_id);
0108
0109
0110 const std::string& GetName() { return m_name; }
0111 int GetId() { return m_id; }
0112 JLogger& GetLogger() { return m_logger; }
0113 bool IsParallel() { return m_is_parallel; }
0114 bool IsSource() { return m_is_source; }
0115 bool IsSink() { return m_is_sink; }
0116 int GetNextPortIndex() { return m_next_input_port; }
0117
0118 void SetName(std::string name) { m_name = name; }
0119 void SetId(int id) { m_id = id; }
0120 void SetLogger(JLogger logger) { m_logger = logger; }
0121 void SetIsParallel(bool is_parallel) { m_is_parallel = is_parallel; }
0122 void SetIsSource(bool is_source) { m_is_source = is_source; }
0123 void SetIsSink(bool is_sink) { m_is_sink = is_sink; }
0124
0125 Port& AddPort(std::string port_name, JEventLevel level, PortDirection direction);
0126 Port& GetPort(size_t port_index) { return *m_ports.at(port_index); }
0127 Port& GetPort(JEventLevel level, PortDirection direction) { return *m_ports.at(m_auto_port_lookup.at({level, direction})); }
0128
0129 int GetPortIndex(JEventLevel level, PortDirection direction);
0130 int GetPortIndex(const std::string& port_name);
0131 void SetNextPortIndex(int input_port) { m_next_input_port = input_port; }
0132
0133 };
0134
0135
0136 std::string ToString(JArrow::FireResult r);  // Declaration only (defined elsewhere); presumably yields a human-readable name for a FireResult value -- confirm against the definition.
0137 std::string ToString(JArrow::PortDirection d);  // Declaration only (defined elsewhere); presumably yields a human-readable name for a PortDirection value -- confirm against the definition.