File indexing completed on 2025-11-03 09:51:14
0001 
0002 
0003 
0004 
0005 
0006 #pragma once
0007 
0008 #include <JANA/JEvent.h>
0009 #include <JANA/JEventSource.h>
0010 #include <JANA/Streaming/JTransport.h>
0011 #include <JANA/Streaming/JTrigger.h>
0012 #include <JANA/Streaming/JDiscreteJoin.h>
0013 #include <JANA/Streaming/JWindow.h>
0014 
0015 #include <cstdint>
0016 #include <cstddef>
0017 #include <memory>
0018 #include <queue>
0019 
0020 
0021 
0022 
0023 
0024 
0025 
0026 template <typename T>
0027 class JEventBuilder : public JEventSource {
0028 public:
0029 
0030     JEventBuilder(std::unique_ptr<JTransport>&& transport,
0031                   std::unique_ptr<JTrigger>&& trigger = std::unique_ptr<JTrigger>(new JTrigger()),
0032                   std::unique_ptr<JWindow<T>>&& window = std::unique_ptr<JSessionWindow<T>>(new JSessionWindow<T>()))
0033 
0034         : JEventSource("JEventBuilder")
0035         , m_transport(std::move(transport))
0036         , m_trigger(std::move(trigger))
0037         , m_window(std::move(window)) {
0038             SetCallbackStyle(CallbackStyle::ExpertMode);
0039     }
0040 
0041     void addJoin(std::unique_ptr<JDiscreteJoin<T>>&& join) {
0042         m_joins.push_back(std::move(join));
0043     }
0044 
0045     void Open() override {
0046         for (auto join : m_joins) {
0047             join->Open();
0048         }
0049     }
0050 
0051     static std::string GetDescription() {
0052         return "JEventBuilder";
0053     }
0054 
0055     Result Emit(JEvent& event) override {
0056 
0057         auto item = new T();  
0058         auto result = m_transport->receive(*item);
0059         switch (result) {
0060             case JTransport::Result::FINISHED:
0061                 return Result::FailureFinished;
0062             case JTransport::Result::TRY_AGAIN:
0063                 return Result::FailureTryAgain;
0064             case JTransport::Result::FAILURE:
0065                 throw JException("Transport failure!");
0066         }
0067         
0068 
0069         event.SetEventNumber(m_next_id);
0070         m_next_id += 1;
0071         event.Insert<T>(item);
0072 
0073         
0074         
0075         for (auto join : m_joins) {
0076             join->GetEvent(event);
0077         }
0078         std::cout << "Emit: " << *item << std::endl;
0079         return Result::Success;
0080     }
0081 
0082 
0083 private:
0084     std::unique_ptr<JTransport> m_transport;
0085     std::unique_ptr<JWindow<T>> m_window;
0086     std::unique_ptr<JTrigger> m_trigger;
0087 
0088     
0089     
0090     std::vector<std::unique_ptr<JDiscreteJoin<T>>> m_joins;
0091 
0092     uint64_t m_delay_ms;
0093     uint64_t m_next_id = 0;
0094 
0095 };
0096 
0097