File indexing completed on 2025-01-18 10:17:19
0001
0002
0003
0004
0005 #include "MonitoringProcessor.h"
0006 #include "ADCSample.h"
0007
0008
0009
0010
0011 MonitoringProcessor::MonitoringProcessor() = default;
0012
0013
0014
0015
0016 MonitoringProcessor::~MonitoringProcessor() {
0017 delete m_transport;
0018 delete m_message;
0019 }
0020
0021
0022
0023
0024 void MonitoringProcessor::Init() {
0025
0026
0027
0028 auto app = GetApplication();
0029 m_message = new DASEventMessage(app);
0030 m_pub_socket = app->GetParameterValue<std::string>("streamDet:pub_socket");
0031 m_transport = new ZmqTransport(m_pub_socket, true);
0032 m_transport->initialize();
0033 std::cout << "MonitoringProccessor::Init -> Initialized ZMQ sink on socket " << m_pub_socket << std::endl;
0034 }
0035
0036
0037
0038
0039 void MonitoringProcessor::Process(const std::shared_ptr<const JEvent>& aEvent) {
0040
0041 auto oriMessage = aEvent->GetSingle<DASEventMessage>();
0042 std::lock_guard<std::mutex> lck(msgMutex);
0043 auto result = m_transport->send(*oriMessage);
0044 if (result == JTransport::SUCCESS) {
0045 std::cout << "MonitoringProcessor::Process: Success sending " << *oriMessage << std::endl;
0046 }
0047 else {
0048 std::cout << "MonitoringProcessor::Process: Failure sending " << *oriMessage << std::endl;
0049 }
0050 size_t eventNum = oriMessage->get_event_number();
0051 size_t buffSize = oriMessage->get_buffer_size();
0052 size_t msgFreq = oriMessage->get_message_print_freq();
0053 if (eventNum % msgFreq == 0) {
0054 std::cout << "MonitoringProcessor::Process -> Published event " << eventNum
0055 << " on socket " << m_pub_socket
0056 << " with buffer size = " << buffSize << " bytes" << std::endl;
0057 }
0058 }
0059
0060
0061
0062
0063 void MonitoringProcessor::Finish() {
0064
0065 }