Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:01:39

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 
0006 #pragma once
0007 
0008 #include <JANA/JEvent.h>
0009 #include <JANA/JEventSource.h>
0010 #include <JANA/Streaming/JTransport.h>
0011 #include <JANA/Streaming/JTrigger.h>
0012 #include <JANA/Streaming/JDiscreteJoin.h>
0013 #include <JANA/Streaming/JWindow.h>
0014 
0015 #include <cstdint>
0016 #include <cstddef>
0017 #include <memory>
0018 #include <queue>
0019 
0020 
0021 
0022 /// JEventBuilder pulls JMessages off of a user-specified JTransport, aggregates them into
0023 /// JEvents using the JWindow of their choice, and decides which to keep via a user-specified
0024 /// JTrigger.
0025 
0026 template <typename T>
0027 class JEventBuilder : public JEventSource {
0028 public:
0029 
0030     JEventBuilder(std::unique_ptr<JTransport>&& transport,
0031                   std::unique_ptr<JTrigger>&& trigger = std::unique_ptr<JTrigger>(new JTrigger()),
0032                   std::unique_ptr<JWindow<T>>&& window = std::unique_ptr<JSessionWindow<T>>(new JSessionWindow<T>()))
0033 
0034         : JEventSource("JEventBuilder")
0035         , m_transport(std::move(transport))
0036         , m_trigger(std::move(trigger))
0037         , m_window(std::move(window)) {
0038             SetCallbackStyle(CallbackStyle::ExpertMode);
0039     }
0040 
0041     void addJoin(std::unique_ptr<JDiscreteJoin<T>>&& join) {
0042         m_joins.push_back(std::move(join));
0043     }
0044 
0045     void Open() override {
0046         for (auto join : m_joins) {
0047             join->Open();
0048         }
0049     }
0050 
0051     static std::string GetDescription() {
0052         return "JEventBuilder";
0053     }
0054 
0055     Result Emit(JEvent& event) override {
0056 
0057         auto item = new T();  // This is why T requires a zero-arg ctor
0058         auto result = m_transport->receive(*item);
0059         switch (result) {
0060             case JTransport::Result::FINISHED:
0061                 return Result::FailureFinished;
0062             case JTransport::Result::TRY_AGAIN:
0063                 return Result::FailureTryAgain;
0064             case JTransport::Result::FAILURE:
0065                 throw JException("Transport failure!");
0066         }
0067         // At this point, we know that item contains a valid Sample<T>
0068 
0069         event.SetEventNumber(m_next_id);
0070         m_next_id += 1;
0071         event.Insert<T>(item);
0072 
0073         /// This is really bad because we have to worry about downstream HitSource returning TryAgainLater
0074         /// and we really don't want to block here
0075         for (auto join : m_joins) {
0076             join->GetEvent(event);
0077         }
0078         std::cout << "Emit: " << *item << std::endl;
0079         return Result::Success;
0080     }
0081 
0082 
0083 private:
0084     std::unique_ptr<JTransport> m_transport;
0085     std::unique_ptr<JWindow<T>> m_window;
0086     std::unique_ptr<JTrigger> m_trigger;
0087 
0088     // Downstream joins should probably be managed externally,
0089     // since we will want these with regular EventSources as well
0090     std::vector<std::unique_ptr<JDiscreteJoin<T>>> m_joins;
0091 
0092     uint64_t m_delay_ms;
0093     uint64_t m_next_id = 0;
0094 
0095 };
0096 
0097