File indexing completed on 2025-01-18 10:17:19
0001
0002
0003
0004
0005
0006 #include <JANA/Streaming/JStreamingEventSource.h>
0007 #include <JANA/JEventSourceGeneratorT.h>
0008 #include <JANA/Utils/JBenchUtils.h>
0009
0010
0011 #include "RootProcessor.h"
0012 #include "MonitoringProcessor.h"
0013 #include "JFactoryGenerator_streamDet.h"
0014 #include "DecodeDASSource.h"
0015 #include "ADCSampleFactory.h"
0016 #include "INDRAMessage.h"
0017 #include "ZmqTransport.h"
0018
0019 void dummy_publisher_loop(JApplication* app) {
0020
0021 auto params = app->GetJParameterManager();
0022
0023 JBenchUtils bench_utils = JBenchUtils();
0024 size_t delay_ms = 1;
0025 auto logger = params->GetLogger("dummy_publisher_loop");
0026 bench_utils.set_seed(7, "InteractiveStreamingExample.cc:dummy_publisher_loop");
0027
0028 std::this_thread::yield();
0029
0030 LOG_INFO(logger) << "Starting producer loop" << LOG_END;
0031
0032 ZmqTransport transport {app->GetParameterValue<std::string>("streamDet:sub_socket"), true};
0033 transport.initialize();
0034
0035 DASEventMessage message(app);
0036
0037 size_t current_event_number = 1;
0038
0039 FILE* f = fopen(app->GetParameterValue<std::string>("streamDet:data_file").c_str(), "r");
0040 LOG_INFO(logger) << "streamDet::dummy_publisher_loop -> Reading data from file "
0041 << app->GetParameterValue<std::string>("streamDet:data_file").c_str() << LOG_END;
0042 if (f == nullptr) {
0043 LOG_FATAL(logger) << "Unable to open file, exiting." << LOG_END;
0044 exit(0);
0045 }
0046
0047 char* payload = nullptr;
0048 size_t payload_capacity;
0049 size_t payload_length;
0050 message.as_payload(&payload, &payload_length, &payload_capacity);
0051
0052 while (fread(payload, 1, payload_capacity, f) == payload_capacity) {
0053 message.as_indra_message()->source_id = 0;
0054 message.set_event_number(current_event_number++);
0055 message.set_payload_size(static_cast<uint32_t>(payload_capacity));
0056
0057 std::cout << "dummy_producer_loop: Sending '" << message << "' (" << message.get_buffer_size() << " bytes)" << std::endl;
0058 transport.send(message);
0059 bench_utils.consume_cpu_ms(delay_ms, 0, false);
0060 std::this_thread::yield();
0061 }
0062
0063
0064 message.set_payload_size(0);
0065 message.set_end_of_stream();
0066 transport.send(message);
0067 LOG_INFO(logger) << "Send: end-of-stream" << LOG_END;
0068
0069 }
0070
0071 extern "C" {
0072
0073 void InitPlugin(JApplication* app) {
0074
0075 InitJANAPlugin(app);
0076 auto logger = app->GetJParameterManager()->GetLogger("streamDet");
0077
0078 app->SetParameterValue("nthreads", 4);
0079 app->SetParameterValue("jana:extended_report", true);
0080 app->SetParameterValue("jana:max_inflight_events", 16);
0081
0082
0083
0084
0085 bool use_zmq = true;
0086 bool use_dummy_publisher = false;
0087 size_t nchannels = 80;
0088 size_t nsamples = 1024;
0089 size_t msg_print_freq = 10;
0090 std::string sub_socket_name = "tcp://127.0.0.1:5556";
0091 std::string pub_socket_name = "tcp://127.0.0.1:5557";
0092 std::string data_file_name = "run-5-mhz-80-chan-100-ev.dat";
0093
0094 LOG_INFO(logger) << "Subscribing to INDRA messages via ZMQ on socket " << sub_socket_name << LOG_END;
0095 LOG_DEBUG(logger) << "Publishing JObjects via ZMQ on socket " << pub_socket_name << LOG_END;
0096
0097 app->SetDefaultParameter("streamDet:use_zmq", use_zmq);
0098 app->SetDefaultParameter("streamDet:data_file", data_file_name);
0099 app->SetDefaultParameter("streamDet:use_dummy_publisher", use_dummy_publisher);
0100 app->SetDefaultParameter("streamDet:nchannels", nchannels);
0101 app->SetDefaultParameter("streamDet:nsamples", nsamples);
0102 app->SetDefaultParameter("streamDet:msg_print_freq", msg_print_freq);
0103 app->SetDefaultParameter("streamDet:sub_socket", sub_socket_name);
0104 app->SetDefaultParameter("streamDet:pub_socket", pub_socket_name);
0105
0106 if (use_zmq) {
0107 auto transport = std::unique_ptr<ZmqTransport>(new ZmqTransport(sub_socket_name));
0108 app->Add(new JStreamingEventSource<DASEventMessage>(std::move(transport)));
0109 if (use_dummy_publisher) {
0110 new std::thread(dummy_publisher_loop, app);
0111 }
0112 }
0113 else {
0114 app->Add(app->GetParameterValue<std::string>("streamDet:data_file"));
0115 app->Add(new JEventSourceGeneratorT<DecodeDASSource>());
0116 app->Add(new JFactoryGenerator_streamDet());
0117 }
0118
0119 app->Add(new RootProcessor());
0120 app->Add(new MonitoringProcessor());
0121 app->Add(new JFactoryGeneratorT<ADCSampleFactory>());
0122
0123 }
0124
0125 }
0126
0127