Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-07-01 08:44:16

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
#include <array>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <JANA/JLogger.h>
#include <JANA/JException.h>
#include <JANA/Topology/JEventQueue.h>
#include <JANA/Topology/JEventPool.h>
0013 
0014 
0015 class JArrow {
0016     friend class JTopologyBuilder;
0017 
0018 public:
0019     using OutputData = std::array<std::pair<JEvent*, int>, 2>;
0020     enum class FireResult {NotRunYet, KeepGoing, ComeBackLater, Finished};
0021     enum class PortDirection { In, Out };
0022 
0023     class Port {
0024         std::string m_name;
0025         std::vector<JEventLevel> m_levels;
0026         JEventQueue* m_queue = nullptr;
0027         JEventPool* m_pool = nullptr;
0028         bool m_skip_finish_event = false;
0029         bool m_establishes_ordering = false;
0030         bool m_enforces_ordering = false;
0031 
0032     public:
0033         Port(std::string name, std::vector<JEventLevel> levels): m_name(name), m_levels(levels) {};
0034 
0035         Port(std::string name, JEventLevel level): m_name(name) {
0036             m_levels.push_back(level);
0037         };
0038 
0039         const std::string& GetName() { return m_name; }
0040         const std::vector<JEventLevel>& GetLevels() { return m_levels; }
0041         bool GetEstablishesOrdering() { return m_establishes_ordering; }
0042         bool GetEnforcesOrdering() { return m_enforces_ordering; }
0043         bool GetSkipFinishEvent() { return m_skip_finish_event; }
0044 
0045         Port& SetEstablishesOrdering(bool establishes) { 
0046             m_establishes_ordering = establishes; 
0047             return *this;
0048         }
0049 
0050         Port& SetEnforcesOrdering(bool enforces) { 
0051             m_enforces_ordering = enforces;
0052             return *this;
0053         }
0054 
0055         Port& SetSkipFinishEvent(bool skip_finish_event) {
0056             this->m_skip_finish_event = skip_finish_event;
0057             return *this;
0058         }
0059 
0060         inline JEventPool* GetPool() { return m_pool; }
0061         inline JEventQueue* GetQueue() { return m_queue; }
0062 
0063         void Attach(JEventQueue* queue) {
0064             this->m_pool = nullptr;
0065             this->m_queue = queue;
0066         }
0067 
0068         void Attach(JEventPool* pool) {
0069             this->m_pool = pool;
0070             this->m_queue = nullptr;
0071         }
0072     };
0073 
0074 private:
0075     std::string m_name;            // Used for human understanding
0076     int m_id;                      // Used internally
0077     bool m_is_parallel = false;    // Whether or not it is safe to parallelize
0078     bool m_is_source = false;      // Whether or not this arrow should activate/drain the topology
0079     bool m_is_sink = false;        // Whether or not tnis arrow contributes to the final event count
0080 
0081 protected:
0082     using clock_t = std::chrono::steady_clock;
0083 
0084     int m_next_input_port=0; // -1 denotes "no input necessary", e.g. for barrier events
0085     clock_t::time_point m_next_visit_time=clock_t::now();
0086     std::vector<std::unique_ptr<Port>> m_ports;
0087     std::map<std::string, int> m_port_lookup;
0088     std::map<std::pair<JEventLevel, PortDirection>, int> m_auto_port_lookup;
0089     JLogger m_logger;
0090 
0091 public:
0092     JArrow() = default;
0093 
0094     virtual ~JArrow() = default;
0095 
0096     virtual void Initialize() {};
0097 
0098     virtual void Fire(JEvent*, OutputData&, size_t&, FireResult&) {};
0099 
0100     virtual void Finalize() {};
0101 
0102 
0103     FireResult Execute(size_t location_id);
0104 
0105     JEvent* Pull(size_t input_port, size_t location_id);
0106 
0107     void Push(OutputData& outputs, size_t output_count, size_t location_id);
0108 
0109 
0110     const std::string& GetName() { return m_name; }
0111     int GetId() { return m_id; }
0112     JLogger& GetLogger() { return m_logger; }
0113     bool IsParallel() { return m_is_parallel; }
0114     bool IsSource() { return m_is_source; }
0115     bool IsSink() { return m_is_sink; }
0116     int GetNextPortIndex() { return m_next_input_port; }
0117 
0118     void SetName(std::string name) { m_name = name; }
0119     void SetId(int id) { m_id = id; }
0120     void SetLogger(JLogger logger) { m_logger = logger; }
0121     void SetIsParallel(bool is_parallel) { m_is_parallel = is_parallel; }
0122     void SetIsSource(bool is_source) { m_is_source = is_source; }
0123     void SetIsSink(bool is_sink) { m_is_sink = is_sink; }
0124 
0125     Port& AddPort(std::string port_name, JEventLevel level, PortDirection direction);
0126     Port& GetPort(size_t port_index) { return *m_ports.at(port_index); }
0127     Port& GetPort(JEventLevel level, PortDirection direction) { return *m_ports.at(m_auto_port_lookup.at({level, direction})); }
0128 
0129     int GetPortIndex(JEventLevel level, PortDirection direction);
0130     int GetPortIndex(const std::string& port_name);
0131     void SetNextPortIndex(int input_port) { m_next_input_port = input_port; }
0132 
0133 };
0134 
0135 
// String conversions for the two JArrow enums (FireResult and PortDirection).
// Only declared here; the exact text returned is determined by the definitions elsewhere.
0136 std::string ToString(JArrow::FireResult r);
0137 std::string ToString(JArrow::PortDirection d);