File indexing completed on 2026-06-17 07:50:56
0001 #pragma once
0002
0003 #include <nlohmann/json.hpp>
0004 #include <nlohmann/json_fwd.hpp>
0005 #include <zmq.hpp>
0006 #include <atomic>
0007 #include <condition_variable>
0008 #include <cstddef>
0009 #include <memory>
0010 #include <mutex>
0011 #include <optional>
0012 #include <string>
0013 #include <thread>
0014
0015 #include "JEventProcessorPODIO.h"
0016
0017 class JEventProcessorManagedPODIO : public JEventProcessorPODIO {
0018 // Extends JEventProcessorPODIO with a ZeroMQ context/socket, a listener thread and per-file bookkeeping state. Only declarations are visible here; all member functions are defined out of line.
0019 public:
0020 JEventProcessorManagedPODIO();
0021 virtual ~JEventProcessorManagedPODIO(); // user-declared destructor, defined out of line
0022 // The three functions below override virtual functions inherited through JEventProcessorPODIO.
0023 void Init() override;
0024 void Process(const std::shared_ptr<const JEvent>& event) override; // takes the event as a shared_ptr to a const JEvent
0025 void Finish() override;
0026 // Private helpers follow. Their bodies are not in this header, so the notes on them are assumptions drawn from names and signatures only.
0027 private:
0028 void ListenForMessages(); // NOTE(review): presumably the function executed by m_listener_thread — confirm in the implementation file
0029 void ProcessFileRequest(const nlohmann::json& request); // takes a JSON request by const reference; NOTE(review): request schema is not visible here
0030 void SendResponse(const nlohmann::json& response); // takes a JSON response by const reference; NOTE(review): presumably writes to m_zmq_socket — confirm
0031 void QueueResponse(const nlohmann::json& response); // NOTE(review): presumably stores into m_queued_response and signals m_response_cv — confirm
0032 void OpenOutputFile(const std::string& output_file); // parameter is the output file name/path as a string
0033 nlohmann::json CloseOutputFile(); // returns a JSON value; NOTE(review): its contents are defined by the implementation, not shown here
0034 void NotifySourceNewFile(const std::string& input_file); // parameter is the input file name/path as a string
0035 bool IsCurrentFileComplete(); // non-const even though the name reads like a query; NOTE(review): completion criterion is not visible here
0036 std::size_t GetNeventsInCurrentFile(); // returns a count as std::size_t; non-const
0037 // ZeroMQ state. Members are destroyed in reverse declaration order, so m_zmq_socket is destroyed before m_zmq_context.
0038 // Both ZeroMQ objects are heap-held through unique_ptr and default to null until something assigns them.
0039 std::unique_ptr<zmq::context_t> m_zmq_context;
0040 std::unique_ptr<zmq::socket_t> m_zmq_socket;
0041 std::string m_socket_path = "/tmp/eicrecon_managed.sock"; // default path under /tmp; NOTE(review): presumably used to build the socket endpoint — confirm
0042 // Listener thread handle (null by default) and an atomic flag that starts out false.
0043 std::unique_ptr<std::thread> m_listener_thread;
0044 std::atomic<bool> m_should_stop{false}; // NOTE(review): presumably polled by the listener to request shutdown — confirm
0045 // Current-file bookkeeping: both names start empty and the active flag starts false.
0046 // NOTE(review): these three fields are not atomic; presumably they are guarded by m_file_mutex — confirm in the implementation file.
0047 std::string m_current_input_file;
0048 std::string m_current_output_file;
0049 bool m_file_processing_active = false;
0050 std::mutex m_file_mutex;
0051 // Atomic counter, initialized to zero.
0052 std::atomic<std::size_t> m_events_processed{0}; // NOTE(review): presumably incremented per processed event — confirm
0053 // Slot for one pending JSON response (empty by default) plus a condition variable.
0054 // NOTE(review): no dedicated mutex is declared for the condition variable; presumably it waits on m_file_mutex — confirm.
0055 std::optional<nlohmann::json> m_queued_response;
0056 std::condition_variable m_response_cv;
0057 // Plain (non-atomic) flag, initially false.
0058 // NOTE(review): if more than one thread reads or writes it, it needs a lock or std::atomic — confirm how it is accessed.
0059 bool m_awaiting_reply = false;
0060 };