File indexing completed on 2025-01-18 10:17:19
0001
0002
0003
0004
0005 #ifndef _INDRAMessage_h_
0006 #define _INDRAMessage_h_
0007
0008 #include <string>
0009 #include <chrono>
0010 #include <cstring>
0011 #include <sstream>
0012
0013 #include <JANA/JException.h>
0014 #include <JANA/JApplication.h>
0015 #include <JANA/Streaming/JMessage.h>
0016
0017
/// Fixed-size header of an INDRA stream record, immediately followed in memory
/// by the record payload.
///
/// DASEventMessage overlays this struct on the start of its raw byte buffer,
/// so the order and types of the members must not be changed.
struct INDRAMessage {

    uint32_t source_id = 0;          ///< NOTE(review): presumably identifies the data source -- confirm
    uint32_t total_bytes = 0;        ///< NOTE(review): presumably the size of the whole record -- confirm
    uint32_t payload_bytes = 0;      ///< Number of payload bytes that follow this header
    uint32_t compressed_bytes = 0;   ///< NOTE(review): presumably the compressed payload size -- confirm
    uint32_t magic = 0;              ///< NOTE(review): presumably a format marker -- confirm
    uint32_t format_version = 0;     ///< NOTE(review): presumably the wire-format version -- confirm
    uint32_t flags = 0;              ///< DASEventMessage treats the value 1 as "end of stream"
    uint64_t record_counter = 0;     ///< DASEventMessage reports this as the event number
    struct timespec timestamp = {};  ///< NOTE(review): presumably the record creation time -- confirm
    uint32_t payload[];              ///< Flexible array member (compiler extension) marking where the payload starts

};
0032
0033
0034 class DASEventMessage : public JEventMessage {
0035
0036 public:
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048 explicit DASEventMessage(JApplication *app) {
0049
0050
0051
0052
0053
0054 m_sample_count = app->GetParameterValue<size_t>("streamDet:nsamples");
0055 m_channel_count = app->GetParameterValue<size_t>("streamDet:nchannels");
0056
0057
0058 m_print_freq = app->GetParameterValue<size_t>("streamDet:msg_print_freq");
0059 m_sub_socket = app->GetParameterValue<std::string>("streamDet:sub_socket");
0060
0061
0062
0063
0064
0065
0066 m_buffer_capacity = sizeof(INDRAMessage) + 5 * (m_sample_count * m_channel_count);
0067 m_buffer = new char[m_buffer_capacity];
0068
0069 }
0070
0071
0072
0073 explicit DASEventMessage(size_t payload_bytes) {
0074 m_buffer_capacity = sizeof(INDRAMessage) + payload_bytes;
0075 m_buffer = new char[m_buffer_capacity];
0076 }
0077
0078 DASEventMessage(const DASEventMessage &) = delete;
0079
0080 DASEventMessage(DASEventMessage &&) = delete;
0081
0082 ~DASEventMessage() override {
0083 delete[] m_buffer;
0084 }
0085
0086
0087
0088
0089
0090
0091
0092
0093
0094
0095
0096 size_t get_event_number() const override { return as_indra_message()->record_counter; }
0097 size_t get_run_number() const override { return 1; }
0098 bool is_end_of_stream() const override { return as_indra_message()->flags == 1; }
0099 char *as_buffer() override { return m_buffer; }
0100 const char *as_buffer() const override { return m_buffer; }
0101 size_t get_buffer_capacity() const override { return m_buffer_capacity; }
0102 size_t get_buffer_size() const override { return sizeof(INDRAMessage) + as_indra_message()->payload_bytes; }
0103
0104
0105
0106
0107
0108 void set_end_of_stream() { as_indra_message()->flags = 1; }
0109 void set_event_number(size_t event_number) { as_indra_message()->record_counter = event_number; }
0110 static void set_run_number(size_t ) { ; }
0111
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121
0122
0123
0124
0125
0126
0127 INDRAMessage *as_indra_message() { return reinterpret_cast<INDRAMessage *>(m_buffer); }
0128 const INDRAMessage *as_indra_message() const { return reinterpret_cast<INDRAMessage *>(m_buffer); }
0129
0130
0131 void as_payload(const char **payload, size_t *payload_bytes) const {
0132
0133 *payload = m_buffer + sizeof(INDRAMessage);
0134 *payload_bytes = as_indra_message()->payload_bytes;
0135 }
0136 void as_payload(char **payload, size_t *payload_bytes, size_t *payload_capacity) {
0137
0138 *payload = m_buffer + sizeof(INDRAMessage);
0139 *payload_bytes = as_indra_message()->payload_bytes;
0140 *payload_capacity = m_buffer_capacity - sizeof(INDRAMessage);
0141 }
0142
0143
0144 void set_payload_size(uint32_t payload_bytes) {
0145
0146 if (payload_bytes > m_buffer_capacity) {
0147 throw JException("set_payload_size: desired size exceeds buffer capacity!");
0148 }
0149 as_indra_message()->payload_bytes = payload_bytes;
0150 }
0151
0152
0153 size_t get_sample_count() const { return m_sample_count; }
0154 size_t get_channel_count() const { return m_channel_count; }
0155 size_t get_message_print_freq() const { return m_print_freq; }
0156 std::string get_sub_socket() const { return m_sub_socket; }
0157
0158 private:
0159
0160 size_t m_sample_count{};
0161 size_t m_channel_count{};
0162 char *m_buffer;
0163 size_t m_buffer_capacity{};
0164 size_t m_print_freq{};
0165 std::string m_sub_socket{};
0166
0167 };
0168
0169
0170 inline std::ostream& operator<< (std::ostream& os, const DASEventMessage& message) {
0171
0172 std::stringstream ss;
0173 const char* payload;
0174 size_t length;
0175 message.as_payload(&payload, &length);
0176 size_t eventNum = message.get_event_number();
0177 size_t msgFreq = message.get_message_print_freq();
0178 size_t buffSize = message.get_buffer_size();
0179 std::string subSocket = message.get_sub_socket();
0180 ss << "INDRA Message received on " << subSocket
0181 << " -> Event " << eventNum
0182 << ", mess. freq. = " << msgFreq
0183 << ", buffer size = " << buffSize
0184 << ", payload = ";
0185 for (int i = 0; i < 10 && i < (int) length; ++i) {
0186 ss << payload[i] << ", ";
0187 }
0188 ss << "...";
0189 os << ss.str();
0190 return os;
0191 }
0192
0193 #endif