File indexing completed on 2025-01-18 10:01:39
0001
0002
0003
0004
0005
0006 #pragma once
0007
0008 #include <JANA/JEvent.h>
0009 #include <JANA/JEventSource.h>
0010 #include <JANA/Streaming/JTransport.h>
0011 #include <JANA/Streaming/JTrigger.h>
0012 #include <JANA/Streaming/JDiscreteJoin.h>
0013 #include <JANA/Streaming/JWindow.h>
0014
0015 #include <cstdint>
0016 #include <cstddef>
0017 #include <memory>
0018 #include <queue>
0019
0020
0021
0022
0023
0024
0025
0026 template <typename T>
0027 class JEventBuilder : public JEventSource {
0028 public:
0029
0030 JEventBuilder(std::unique_ptr<JTransport>&& transport,
0031 std::unique_ptr<JTrigger>&& trigger = std::unique_ptr<JTrigger>(new JTrigger()),
0032 std::unique_ptr<JWindow<T>>&& window = std::unique_ptr<JSessionWindow<T>>(new JSessionWindow<T>()))
0033
0034 : JEventSource("JEventBuilder")
0035 , m_transport(std::move(transport))
0036 , m_trigger(std::move(trigger))
0037 , m_window(std::move(window)) {
0038 SetCallbackStyle(CallbackStyle::ExpertMode);
0039 }
0040
0041 void addJoin(std::unique_ptr<JDiscreteJoin<T>>&& join) {
0042 m_joins.push_back(std::move(join));
0043 }
0044
0045 void Open() override {
0046 for (auto join : m_joins) {
0047 join->Open();
0048 }
0049 }
0050
0051 static std::string GetDescription() {
0052 return "JEventBuilder";
0053 }
0054
0055 Result Emit(JEvent& event) override {
0056
0057 auto item = new T();
0058 auto result = m_transport->receive(*item);
0059 switch (result) {
0060 case JTransport::Result::FINISHED:
0061 return Result::FailureFinished;
0062 case JTransport::Result::TRY_AGAIN:
0063 return Result::FailureTryAgain;
0064 case JTransport::Result::FAILURE:
0065 throw JException("Transport failure!");
0066 }
0067
0068
0069 event.SetEventNumber(m_next_id);
0070 m_next_id += 1;
0071 event.Insert<T>(item);
0072
0073
0074
0075 for (auto join : m_joins) {
0076 join->GetEvent(event);
0077 }
0078 std::cout << "Emit: " << *item << std::endl;
0079 return Result::Success;
0080 }
0081
0082
0083 private:
0084 std::unique_ptr<JTransport> m_transport;
0085 std::unique_ptr<JWindow<T>> m_window;
0086 std::unique_ptr<JTrigger> m_trigger;
0087
0088
0089
0090 std::vector<std::unique_ptr<JDiscreteJoin<T>>> m_joins;
0091
0092 uint64_t m_delay_ms;
0093 uint64_t m_next_id = 0;
0094
0095 };
0096
0097