Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-17 07:50:56

0001 #pragma once
0002 
0003 #include <nlohmann/json.hpp>
0004 #include <nlohmann/json_fwd.hpp>
0005 #include <zmq.hpp>
0006 #include <atomic>
0007 #include <condition_variable>
0008 #include <cstddef>
0009 #include <memory>
0010 #include <mutex>
0011 #include <optional>
0012 #include <string>
0013 #include <thread>
0014 
0015 #include "JEventProcessorPODIO.h"
0016 
0017 class JEventProcessorManagedPODIO : public JEventProcessorPODIO {
0018 
0019 public:
0020   JEventProcessorManagedPODIO();
0021   virtual ~JEventProcessorManagedPODIO();
0022 
0023   void Init() override;
0024   void Process(const std::shared_ptr<const JEvent>& event) override;
0025   void Finish() override;
0026 
0027 private:
0028   void ListenForMessages();
0029   void ProcessFileRequest(const nlohmann::json& request);
0030   void SendResponse(const nlohmann::json& response);  // listener thread only
0031   void QueueResponse(const nlohmann::json& response); // any thread; wakes listener
0032   void OpenOutputFile(const std::string& output_file);
0033   nlohmann::json CloseOutputFile();
0034   void NotifySourceNewFile(const std::string& input_file);
0035   bool IsCurrentFileComplete();
0036   std::size_t GetNeventsInCurrentFile();
0037 
0038   // ZeroMQ components
0039   std::unique_ptr<zmq::context_t> m_zmq_context;
0040   std::unique_ptr<zmq::socket_t> m_zmq_socket;
0041   std::string m_socket_path = "/tmp/eicrecon_managed.sock";
0042 
0043   std::unique_ptr<std::thread> m_listener_thread;
0044   std::atomic<bool> m_should_stop{false};
0045 
0046   // File management (protected by m_file_mutex)
0047   std::string m_current_input_file;
0048   std::string m_current_output_file;
0049   bool m_file_processing_active = false;
0050   std::mutex m_file_mutex;
0051   // Event counting for current file
0052   std::atomic<std::size_t> m_events_processed{0};
0053 
0054   // Response queue (protected by m_file_mutex; m_response_cv wakes the listener)
0055   std::optional<nlohmann::json> m_queued_response;
0056   std::condition_variable m_response_cv;
0057 
0058   // True between the listener's recv() and its matching send() (ZMQ_REP protocol)
0059   bool m_awaiting_reply = false;
0060 };