Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:19

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #ifndef _INDRAMessage_h_
0006 #define _INDRAMessage_h_
0007 
0008 #include <string>
0009 #include <chrono>
0010 #include <cstring>
0011 #include <sstream>
0012 
0013 #include <JANA/JException.h>
0014 #include <JANA/JApplication.h>
0015 #include <JANA/Streaming/JMessage.h>
0016 
/// Fixed-size header that sits at the very start of a message buffer, followed
/// directly by the payload.
///
/// This layout should be exactly the same as INDRA_Stream_Test's stream_buffer
/// struct. Do not reorder, resize, or retype any member: the bytes are shared
/// with the producer, and a differing layout would misinterpret the data.
struct INDRAMessage {

    uint32_t source_id{};
    uint32_t total_bytes{};
    uint32_t payload_bytes{};      // read by DASEventMessage as the count of valid payload bytes
    uint32_t compressed_bytes{};
    uint32_t magic{};
    uint32_t format_version{};
    uint32_t flags{};              // DASEventMessage treats the value 1 as end-of-stream
    uint64_t record_counter{};     // DASEventMessage reports this as the event number
    timespec timestamp{};
    uint32_t payload[];            // flexible array member: payload storage begins here

};
0032 
0033 
0034 class DASEventMessage : public JEventMessage {
0035 
0036 public:
0037 
0038     ////////////////////////////////////////////////////////////////////////////////////////
0039     /// DASEventMessage constructor/destructor
0040     ///
0041     /// These enforce the following invariant:
0042     ///  - A buffer of fixed capacity (determinable at startup time) is allocated
0043     ///  - Its total capacity is always known and never changes
0044     ///  - It is released when the message is destroyed
0045     ///  - The size of the valid data contained within is NOT protected, because it is a C-style array.
0046     ////////////////////////////////////////////////////////////////////////////////////////
0047 
0048     explicit DASEventMessage(JApplication *app) {
0049 
0050         // TODO: Right now we have to default these in the InitPlugin, as otherwise they won't show up in the table
0051         //       Consider this use case in the future when we try to clean up how we get our parameters
0052 
0053         // Extract any parameters needed to figure out the buffer size
0054         m_sample_count  = app->GetParameterValue<size_t>("streamDet:nsamples");
0055         m_channel_count = app->GetParameterValue<size_t>("streamDet:nchannels");
0056 
0057         // extract parameters useful for printing messages to screen
0058         m_print_freq    = app->GetParameterValue<size_t>("streamDet:msg_print_freq");
0059         m_sub_socket    = app->GetParameterValue<std::string>("streamDet:sub_socket");
0060 
0061         // Allocate the buffer. The buffer size must be determinable at startup time and
0062         // constant for the life of the program. If we are a consumer, the buffer should be large
0063         // enough to accept any message emitted from our producer. This object won't ever reallocate the buffer.
0064         // The factor five comes from the 4 ADC bytes per sample plus the 1 space delimiter or newline character
0065         // Each buffer contains 1024 samples (single readout window) from 80 channels, which is dubbed an 'event'
0066         m_buffer_capacity = sizeof(INDRAMessage) + 5 * (m_sample_count * m_channel_count);
0067         m_buffer = new char[m_buffer_capacity];
0068 
0069     }
0070 
0071     /// This constructor won't be used by JStreamingEventSource, but will instead be used when DASEventMessages are
0072     /// 'manually' created by DummyProducers and JEventProcessors
0073     explicit DASEventMessage(size_t payload_bytes) {
0074         m_buffer_capacity = sizeof(INDRAMessage) + payload_bytes;
0075         m_buffer = new char[m_buffer_capacity];
0076     }
0077 
0078     DASEventMessage(const DASEventMessage &) = delete;
0079 
0080     DASEventMessage(DASEventMessage &&)  = delete;
0081 
0082     ~DASEventMessage() override {
0083         delete[] m_buffer;
0084     }
0085 
0086     ////////////////////////////////////////////////////////////////////////////////////////
0087     /// Everything in this section is used by JStreamingEventSource in order to figure out
0088     /// how to emit the message as an Event.
0089     ///
0090     /// Exposing a byte array representation is necessary to acquire new messages from some
0091     /// arbitrary JTransport. We distinguish between the buffer size (bytes of memory reserved)
0092     /// and the data size (bytes of memory containing useful data). Buffer size is invariant;
0093     /// data size is not, and relies on the user correctly setting a payload_length parameter
0094     /// somewhere in the message.
0095 
0096     size_t get_event_number() const override { return as_indra_message()->record_counter; }
0097     size_t get_run_number() const override { return 1; }
0098     bool is_end_of_stream() const override { return as_indra_message()->flags == 1; }
0099     char *as_buffer() override { return m_buffer; }
0100     const char *as_buffer() const override { return m_buffer; }
0101     size_t get_buffer_capacity() const override { return m_buffer_capacity; }
0102     size_t get_buffer_size() const override { return sizeof(INDRAMessage) + as_indra_message()->payload_bytes; }
0103 
0104     ////////////////////////////////////////////////////////////////////////////////////////
0105     /// The following setters are NOT required by JStreamingEventSource, but useful for writing producers.
0106     /// It is always advisable to put the code for the setters close to the code for the getters.
0107 
0108     void set_end_of_stream() { as_indra_message()->flags = 1; }
0109     void set_event_number(size_t event_number) { as_indra_message()->record_counter = event_number; }
0110     static void set_run_number(size_t /* run_number */) { ; }
0111 
0112     ////////////////////////////////////////////////////////////////////////////////////////
0113     /// Everything in this section is used by user-defined JFactories and JEventProcessors to access
0114     /// whatever data was sent. This is a 'view' into the buffer which you can define however you like.
0115     ///
0116     /// The 'simple' way to do this is to simply cast the buffer as the correct type (in this case, INDRAMessage.)
0117     /// The 'correct' way to to do this is to write getters which memcpy data out of the
0118     /// buffer from the correct offsets and convert the endianness appropriately.
0119     ///
0120     /// INDRA_Stream_Test defines the payload as a flexible array member of type uint32.
0121     /// For now, we would much rather access it as an array of char. We do NOT muck with the INDRAMessage type --
0122     /// if we define it differently from the producer code, the compiler might align/pad it differently,
0123     /// and all our data gets corrupted. Instead, we write a 'as_payload' method which reinterprets the 'payload'
0124     /// region of the buffer as a char*, and converts the sizes (measured in counts of uint32_t) to and from bytes.
0125 
0126     /// Grants read/write access to any INDRAMessage members directly
0127     INDRAMessage *as_indra_message() { return reinterpret_cast<INDRAMessage *>(m_buffer); }
0128     const INDRAMessage *as_indra_message() const { return reinterpret_cast<INDRAMessage *>(m_buffer); }
0129 
0130     /// Grants read-only access to the message payload as a byte array, which we need because INDRAMessage uses uint32_t instead
0131     void as_payload(const char **payload, size_t *payload_bytes) const {
0132 
0133         *payload = m_buffer + sizeof(INDRAMessage);
0134         *payload_bytes = as_indra_message()->payload_bytes;
0135     }
0136     void as_payload(char **payload, size_t *payload_bytes, size_t *payload_capacity) {
0137 
0138         *payload = m_buffer + sizeof(INDRAMessage);
0139         *payload_bytes = as_indra_message()->payload_bytes;
0140         *payload_capacity = m_buffer_capacity - sizeof(INDRAMessage);
0141     }
0142 
0143     /// Sets a payload size, measured in bytes
0144     void set_payload_size(uint32_t payload_bytes) {
0145 
0146         if (payload_bytes > m_buffer_capacity) {
0147             throw JException("set_payload_size: desired size exceeds buffer capacity!");
0148         }
0149         as_indra_message()->payload_bytes = payload_bytes;
0150     }
0151 
0152     /// Conveniently access message properties
0153     size_t get_sample_count() const { return m_sample_count; }
0154     size_t get_channel_count() const { return m_channel_count; }
0155     size_t get_message_print_freq() const { return m_print_freq; }
0156     std::string get_sub_socket() const { return m_sub_socket; }
0157 
0158 private:
0159 
0160     size_t m_sample_count{};
0161     size_t m_channel_count{};
0162     char *m_buffer;
0163     size_t m_buffer_capacity{};
0164     size_t m_print_freq{};
0165     std::string m_sub_socket{};
0166 
0167 };
0168 
0169 /// Conveniently print a one-line summary of any DASEventMessage for debugging
0170 inline std::ostream& operator<< (std::ostream& os, const DASEventMessage& message) {
0171 
0172     std::stringstream ss;
0173     const char* payload;
0174     size_t length;
0175     message.as_payload(&payload, &length);
0176     size_t eventNum  = message.get_event_number();
0177     size_t msgFreq   = message.get_message_print_freq();
0178     size_t buffSize  = message.get_buffer_size();
0179     std::string subSocket = message.get_sub_socket();
0180     ss << "INDRA Message received on " << subSocket
0181        << " -> Event " << eventNum
0182        << ", mess. freq. = " << msgFreq
0183        << ", buffer size = " << buffSize
0184        << ", payload = ";
0185     for (int i = 0; i < 10 && i < (int) length; ++i) {
0186         ss << payload[i] << ", ";
0187     }
0188     ss << "...";
0189     os << ss.str();
0190     return os;
0191 }
0192 
0193 #endif // _INDRAMessage_h_