File indexing completed on 2025-01-18 10:17:36
0001
0002
0003
0004
0005 #pragma once
0006 #include <cassert>
0007 #include <vector>
0008
0009 #include <JANA/JLogger.h>
0010 #include <JANA/JException.h>
0011 #include <JANA/Topology/JEventQueue.h>
0012 #include <JANA/Topology/JEventPool.h>
0013
0014
0015 class JArrow {
0016 friend class JTopologyBuilder;
0017
0018 public:
0019 using OutputData = std::array<std::pair<JEvent*, int>, 2>;
0020 enum class FireResult {NotRunYet, KeepGoing, ComeBackLater, Finished};
0021
0022 struct Port {
0023 JEventQueue* queue = nullptr;
0024 JEventPool* pool = nullptr;
0025 bool is_input = false;
0026 };
0027
0028 private:
0029 std::string m_name;
0030 bool m_is_parallel;
0031 bool m_is_source;
0032 bool m_is_sink;
0033
0034 protected:
0035 using clock_t = std::chrono::steady_clock;
0036
0037 int m_next_input_port=0;
0038 clock_t::time_point m_next_visit_time=clock_t::now();
0039 std::vector<Port> m_ports;
0040 JLogger m_logger;
0041
0042 public:
0043 const std::string& get_name() { return m_name; }
0044 JLogger& get_logger() { return m_logger; }
0045 bool is_parallel() { return m_is_parallel; }
0046 bool is_source() { return m_is_source; }
0047 bool is_sink() { return m_is_sink; }
0048 Port& get_port(size_t port_index) { return m_ports.at(port_index); }
0049 int get_next_port_index() { return m_next_input_port; }
0050
0051 void set_name(std::string name) { m_name = name; }
0052 void set_logger(JLogger logger) { m_logger = logger; }
0053 void set_is_parallel(bool is_parallel) { m_is_parallel = is_parallel; }
0054 void set_is_source(bool is_source) { m_is_source = is_source; }
0055 void set_is_sink(bool is_sink) { m_is_sink = is_sink; }
0056
0057
0058 JArrow() {
0059 m_is_parallel = false;
0060 m_is_source = false;
0061 m_is_sink = false;
0062 }
0063
0064 JArrow(std::string name, bool is_parallel, bool is_source, bool is_sink) :
0065 m_name(std::move(name)), m_is_parallel(is_parallel), m_is_source(is_source), m_is_sink(is_sink) {
0066 };
0067
0068 virtual ~JArrow() = default;
0069
0070 virtual void initialize() { };
0071
0072 virtual FireResult execute(size_t location_id);
0073
0074 virtual void fire(JEvent*, OutputData&, size_t&, FireResult&) {};
0075
0076 virtual void finalize() {};
0077
0078 void create_ports(size_t inputs, size_t outputs);
0079
0080 void attach(JEventQueue* queue, size_t port);
0081 void attach(JEventPool* pool, size_t port);
0082
0083 JEvent* pull(size_t input_port, size_t location_id);
0084 void push(OutputData& outputs, size_t output_count, size_t location_id);
0085 };
0086
0087
0088 std::string to_string(JArrow::FireResult r);