|
||||
File indexing completed on 2025-01-18 10:17:35
0001 0002 // Copyright 2020, Jefferson Science Associates, LLC. 0003 // Subject to the terms in the LICENSE file found in the top-level directory. 0004 0005 #pragma once 0006 #include <cstdint> 0007 #include <cstddef> 0008 #include <memory> 0009 #include <queue> 0010 0011 #include <JANA/JEventSource.h> 0012 #include <JANA/Streaming/JTransport.h> 0013 0014 /// JStreamingEventSource is a class template which simplifies streaming events into JANA. 0015 /// 0016 /// JStreamingEventSource makes it convenient to stream existing events into JANA by handling transport and message 0017 /// format as separate, orthogonal concerns. The user need only implement classes satisfying the JTransport and 0018 /// JEventMessage interfaces, and then they get a JStreamingEventSource 'for free'. 0019 /// 0020 /// JStreamingEventSource<T> is templated on some message type T, which is a subclass of JEventMessage. 0021 /// This is needed so that it can construct new message objects of the appropriate subclass. 0022 /// As a result, JStreamingEventSource<T> only emits JEvents containing JEventMessages of type T, 0023 /// which means that every message produced upstream must be compatible with that message format. 0024 /// This shouldn't be a problem: it is always possible to make the message format more flexible, and any associated 0025 /// complexity is fundamentally a property of the message format anyway. However, if we are using JStreamingEventSource, 0026 /// it is essential that each message corresponds to one JEvent. 0027 /// 0028 /// The JStreamingEventSource owns its JTransport, but passes ownership of each JMessage to its enclosing JEvent. 0029 0030 template <class MessageT> 0031 class JStreamingEventSource : public JEventSource { 0032 0033 std::unique_ptr<JTransport> m_transport; ///< Pointer to underlying transport 0034 MessageT* m_next_item; ///< An empty message buffer kept in reserve for when the next receive() succeeds 0035 size_t m_next_evt_nr = 1; ///< If the event number is not encoded in the message payload, be able to assign one 0036 0037 public: 0038 0039 /// The constructor requires a unique pointer to a JTransport implementation. This is a reasonable assumption to 0040 /// make because each JEventSource already corresponds to some unique resource. JStreamingEventSource should be free 0041 /// to destroy its transport object whenever it likes, so try to keep the JTransport free of weird shared state. 0042 0043 explicit JStreamingEventSource(std::unique_ptr<JTransport>&& transport) 0044 : JEventSource("JStreamingEventSource") 0045 , m_transport(std::move(transport)) 0046 , m_next_item(nullptr) 0047 { 0048 SetCallbackStyle(CallbackStyle::ExpertMode); 0049 } 0050 0051 /// Open delegates down to the transport, which will open a network socket or similar. 0052 0053 void Open() override { 0054 m_transport->initialize(); 0055 } 0056 0057 /// GetEvent attempts to receive a JEventMessage. If it succeeds, it inserts it into the JEvent and sets 0058 /// the event number and run number appropriately. 0059 0060 Result Emit(JEvent& event) override { 0061 0062 if (m_next_item == nullptr) { 0063 m_next_item = new MessageT(GetApplication()); 0064 } 0065 0066 auto result = m_transport->receive(*m_next_item); 0067 switch (result) { 0068 case JTransport::Result::FINISHED: return Result::FailureFinished; 0069 case JTransport::Result::TRY_AGAIN: return Result::FailureTryAgain; 0070 case JTransport::Result::FAILURE: throw JException("Transport failure"); 0071 default: break; 0072 } 0073 0074 // At this point, we know that item contains a valid JEventMessage 0075 MessageT* item = m_next_item; 0076 m_next_item = nullptr; 0077 0078 size_t evt_nr = item->get_event_number(); 0079 event.SetEventNumber(evt_nr == 0 ? m_next_evt_nr++ : evt_nr); 0080 event.SetRunNumber(item->get_run_number()); 0081 event.Insert<MessageT>(item); 0082 std::cout << "JStreamingEventSource: Emitting " << *item << std::endl; 0083 return Result::Success; 0084 } 0085 0086 static std::string GetDescription() { 0087 return "JStreamingEventSource"; 0088 } 0089 }; 0090
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |