Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:19

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 
0006 #include <JANA/Streaming/JStreamingEventSource.h>
0007 #include <JANA/JEventSourceGeneratorT.h>
0008 #include <JANA/Utils/JBenchUtils.h>
0009 
0010 
0011 #include "RootProcessor.h"
0012 #include "MonitoringProcessor.h"
0013 #include "JFactoryGenerator_streamDet.h"
0014 #include "DecodeDASSource.h"
0015 #include "ADCSampleFactory.h"
0016 #include "INDRAMessage.h"
0017 #include "ZmqTransport.h"
0018 
0019 void dummy_publisher_loop(JApplication* app) {
0020 
0021     auto params = app->GetJParameterManager();
0022 
0023     JBenchUtils bench_utils = JBenchUtils();
0024     size_t delay_ms = 1;
0025     auto logger = params->GetLogger("dummy_publisher_loop");
0026     bench_utils.set_seed(7, "InteractiveStreamingExample.cc:dummy_publisher_loop");
0027 
0028     std::this_thread::yield();
0029     //std::this_thread::sleep_for(std::chrono::milliseconds(10));  // Wait for JANA to fire up so we don't lose data
0030     LOG_INFO(logger) << "Starting producer loop" << LOG_END;
0031 
0032     ZmqTransport transport {app->GetParameterValue<std::string>("streamDet:sub_socket"), true};
0033     transport.initialize();
0034 
0035     DASEventMessage message(app);
0036 
0037     size_t current_event_number = 1;
0038 
0039     FILE* f = fopen(app->GetParameterValue<std::string>("streamDet:data_file").c_str(), "r");
0040     LOG_INFO(logger) << "streamDet::dummy_publisher_loop -> Reading data from file "
0041               << app->GetParameterValue<std::string>("streamDet:data_file").c_str() << LOG_END;
0042     if (f == nullptr) {
0043         LOG_FATAL(logger) << "Unable to open file, exiting." << LOG_END;
0044         exit(0);
0045     }
0046 
0047     char* payload = nullptr;
0048     size_t payload_capacity;
0049     size_t payload_length;
0050     message.as_payload(&payload, &payload_length, &payload_capacity);
0051 
0052     while (fread(payload, 1, payload_capacity, f) == payload_capacity) {
0053         message.as_indra_message()->source_id = 0;
0054         message.set_event_number(current_event_number++);
0055         message.set_payload_size(static_cast<uint32_t>(payload_capacity));
0056         //LOG_DEBUG(logger) << "Send: " << message << " (" << message.get_buffer_size() << " bytes)" << LOG_END;
0057         std::cout << "dummy_producer_loop: Sending '" << message << "' (" << message.get_buffer_size() << " bytes)" << std::endl;
0058         transport.send(message);
0059         bench_utils.consume_cpu_ms(delay_ms, 0, false);
0060         std::this_thread::yield();
0061     }
0062 
0063     // Send an empty end-of-stream message
0064     message.set_payload_size(0);
0065     message.set_end_of_stream();
0066     transport.send(message);
0067     LOG_INFO(logger) << "Send: end-of-stream" << LOG_END;
0068 
0069 }
0070 
0071 extern "C" {
0072 
0073 void InitPlugin(JApplication* app) {
0074 
0075     InitJANAPlugin(app);
0076     auto logger = app->GetJParameterManager()->GetLogger("streamDet");
0077 
0078     app->SetParameterValue("nthreads", 4);
0079     app->SetParameterValue("jana:extended_report", true);
0080     app->SetParameterValue("jana:max_inflight_events", 16);
0081 
0082     // TODO: Consider making streamDet:sub_socket be the 'source_name', and use JESG to switch between JSES and DecodeDASSource
0083     // TODO: Improve parametermanager interface
0084 
0085     bool use_zmq = true;
0086     bool use_dummy_publisher = false;
0087     size_t nchannels = 80;
0088     size_t nsamples = 1024;
0089     size_t msg_print_freq = 10;
0090     std::string sub_socket_name = "tcp://127.0.0.1:5556";
0091     std::string pub_socket_name = "tcp://127.0.0.1:5557";
0092     std::string data_file_name = "run-5-mhz-80-chan-100-ev.dat";
0093 
0094     LOG_INFO(logger) << "Subscribing to INDRA messages via ZMQ on socket " << sub_socket_name << LOG_END;
0095     LOG_DEBUG(logger) << "Publishing JObjects via ZMQ on socket " << pub_socket_name << LOG_END;
0096 
0097     app->SetDefaultParameter("streamDet:use_zmq", use_zmq);
0098     app->SetDefaultParameter("streamDet:data_file", data_file_name);
0099     app->SetDefaultParameter("streamDet:use_dummy_publisher", use_dummy_publisher);
0100     app->SetDefaultParameter("streamDet:nchannels", nchannels);
0101     app->SetDefaultParameter("streamDet:nsamples", nsamples);
0102     app->SetDefaultParameter("streamDet:msg_print_freq", msg_print_freq);
0103     app->SetDefaultParameter("streamDet:sub_socket", sub_socket_name);
0104     app->SetDefaultParameter("streamDet:pub_socket", pub_socket_name);
0105 
0106     if (use_zmq) {
0107         auto transport = std::unique_ptr<ZmqTransport>(new ZmqTransport(sub_socket_name));
0108         app->Add(new JStreamingEventSource<DASEventMessage>(std::move(transport)));
0109         if (use_dummy_publisher) {
0110             new std::thread(dummy_publisher_loop, app);
0111         }
0112     }
0113     else {
0114         app->Add(app->GetParameterValue<std::string>("streamDet:data_file"));
0115         app->Add(new JEventSourceGeneratorT<DecodeDASSource>());
0116         app->Add(new JFactoryGenerator_streamDet());
0117     }
0118 
0119     app->Add(new RootProcessor());
0120     app->Add(new MonitoringProcessor());
0121     app->Add(new JFactoryGeneratorT<ADCSampleFactory>());
0122 
0123 }
0124 
0125 } // "C"
0126 
0127