File indexing completed on 2025-01-18 10:01:39
0001
0002
0003
0004
0005 #pragma once
0006
0007 #include <cstdint>
0008 #include <cstddef>
0009 #include <memory>
0010 #include <queue>
0011
0012 #include <JANA/JEvent.h>
0013 #include <JANA/JEventSource.h>
0014 #include <JANA/Streaming/JTransport.h>
0015 #include <JANA/Streaming/JTrigger.h>
0016
0017
0018
0019
0020
0021
0022 template <typename T>
0023 class JDiscreteJoin : public JEventSource {
0024 public:
0025
0026 JDiscreteJoin(std::unique_ptr<JTransport>&& transport,
0027 std::unique_ptr<JTrigger>&& trigger = std::unique_ptr<JTrigger>(new JTrigger()))
0028
0029 : JEventSource("JEventBuilder")
0030 , m_transport(std::move(transport))
0031 , m_trigger(std::move(trigger))
0032 {
0033 SetCallbackStyle(CallbackStyle::ExpertMode);
0034 }
0035
0036 void Open() override {
0037 m_transport->initialize();
0038 }
0039
0040 Result Emit(JEvent& event) override {
0041
0042 auto item = new T();
0043 auto result = m_transport->receive(*item);
0044 switch (result) {
0045 case JTransport::Result::FINISHED:
0046 return Result::FailureFinished;
0047 case JTransport::Result::TRY_AGAIN:
0048 return Result::FailureTryAgain;
0049 case JTransport::Result::FAILURE:
0050 throw JException("Transport failure!");
0051 }
0052
0053
0054 event.SetEventNumber(m_next_id);
0055 m_next_id += 1;
0056 event.Insert<T>(item);
0057 std::cout << "Emit: " << *item << std::endl;
0058 return Result::Success;
0059 }
0060
0061 static std::string GetDescription() {
0062 return "JEventBuilder";
0063 }
0064
0065
0066 private:
0067
0068 std::unique_ptr<JTransport> m_transport;
0069 std::unique_ptr<JTrigger> m_trigger;
0070 uint64_t m_delay_ms;
0071 uint64_t m_next_id = 0;
0072 };
0073
0074