Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:35

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
0006 #include <cstdint>
0007 #include <cstddef>
0008 #include <memory>
0009 #include <queue>
0010 
0011 #include <JANA/JEventSource.h>
0012 #include <JANA/Streaming/JTransport.h>
0013 
0014 /// JStreamingEventSource is a class template which simplifies streaming events into JANA.
0015 ///
0016 /// JStreamingEventSource makes it convenient to stream existing events into JANA by handling transport and message
0017 /// format as separate, orthogonal concerns. The user need only implement classes satisfying the JTransport and
0018 /// JEventMessage interfaces, and then they get a JStreamingEventSource 'for free'.
0019 ///
0020 /// JStreamingEventSource<T> is templated on some message type T, which is a subclass of JEventMessage.
0021 /// This is needed so that it can construct new message objects of the appropriate subclass.
0022 /// As a result, JStreamingEventSource<T> only emits JEvents containing JEventMessages of type T,
0023 /// which means that every message produced upstream must be compatible with that message format.
0024 /// This shouldn't be a problem: it is always possible to make the message format more flexible, and any associated
0025 /// complexity is fundamentally a property of the message format anyway. However, if we are using JStreamingEventSource,
0026 /// it is essential that each message corresponds to one JEvent.
0027 ///
0028 /// The JStreamingEventSource owns its JTransport, but passes ownership of each JMessage to its enclosing JEvent.
0029 
0030 template <class MessageT>
0031 class JStreamingEventSource : public JEventSource {
0032 
0033     std::unique_ptr<JTransport> m_transport;   ///< Pointer to underlying transport
0034     MessageT* m_next_item;     ///< An empty message buffer kept in reserve for when the next receive() succeeds
0035     size_t m_next_evt_nr = 1;  ///< If the event number is not encoded in the message payload, be able to assign one
0036 
0037 public:
0038 
0039     /// The constructor requires a unique pointer to a JTransport implementation. This is a reasonable assumption to
0040     /// make because each JEventSource already corresponds to some unique resource. JStreamingEventSource should be free
0041     /// to destroy its transport object whenever it likes, so try to keep the JTransport free of weird shared state.
0042 
0043     explicit JStreamingEventSource(std::unique_ptr<JTransport>&& transport)
0044         : JEventSource("JStreamingEventSource")
0045         , m_transport(std::move(transport))
0046         , m_next_item(nullptr)
0047     {
0048         SetCallbackStyle(CallbackStyle::ExpertMode);
0049     }
0050 
0051     /// Open delegates down to the transport, which will open a network socket or similar.
0052 
0053     void Open() override {
0054         m_transport->initialize();
0055     }
0056 
0057     /// GetEvent attempts to receive a JEventMessage. If it succeeds, it inserts it into the JEvent and sets
0058     /// the event number and run number appropriately.
0059 
0060     Result Emit(JEvent& event) override {
0061 
0062         if (m_next_item == nullptr) {
0063             m_next_item = new MessageT(GetApplication());
0064         }
0065 
0066         auto result = m_transport->receive(*m_next_item);
0067         switch (result) {
0068             case JTransport::Result::FINISHED:   return Result::FailureFinished;
0069             case JTransport::Result::TRY_AGAIN:  return Result::FailureTryAgain;
0070             case JTransport::Result::FAILURE:    throw JException("Transport failure");
0071             default:                             break;
0072         }
0073 
0074         // At this point, we know that item contains a valid JEventMessage
0075         MessageT* item = m_next_item;
0076         m_next_item = nullptr;
0077 
0078         size_t evt_nr = item->get_event_number();
0079         event.SetEventNumber(evt_nr == 0 ? m_next_evt_nr++ : evt_nr);
0080         event.SetRunNumber(item->get_run_number());
0081         event.Insert<MessageT>(item);
0082         std::cout << "JStreamingEventSource: Emitting " << *item << std::endl;
0083         return Result::Success;
0084     }
0085 
0086     static std::string GetDescription() {
0087         return "JStreamingEventSource";
0088     }
0089 };
0090