Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-17 07:50:56

0001 #include "JEventProcessorManagedPODIO.h"
0002 
0003 #include <JANA/JApplication.h>
0004 #include <JANA/JApplicationFwd.h>
0005 #include <JANA/JEventSource.h>
0006 #include <JANA/Services/JComponentManager.h>
0007 #include <JANA/Utils/JTypeInfo.h>
0008 #include <errno.h>
0009 #include <fmt/format.h>
0010 #include <nlohmann/detail/json_ref.hpp>
0011 #include <nlohmann/json.hpp>
0012 #include <podio/Writer.h>
0013 #include <spdlog/logger.h>
0014 #include <zmq.h>
0015 #include <zmq.hpp>
0016 #include <algorithm>
0017 #include <cctype>
0018 #include <chrono>
0019 #include <cstring>
0020 #include <exception>
0021 #include <filesystem>
0022 #include <map>
0023 #include <stdexcept>
0024 #include <string_view>
0025 #include <thread>
0026 #include <utility>
0027 #include <vector>
0028 
0029 #include "services/io/podio/JEventProcessorPODIO.h"
0030 #include "services/io/podio/JEventSourceManagedPODIO.h"
0031 #include "services/log/Log_service.h"
0032 
0033 JEventProcessorManagedPODIO::JEventProcessorManagedPODIO() : JEventProcessorPODIO() {
0034   SetTypeName(NAME_OF_THIS);
0035 
0036   japp->SetDefaultParameter("podio:managed_socket_path", m_socket_path,
0037                             "UNIX socket path for managed PODIO processing");
0038 }
0039 
0040 JEventProcessorManagedPODIO::~JEventProcessorManagedPODIO() {
0041   m_should_stop = true;
0042   if (m_listener_thread && m_listener_thread->joinable()) {
0043     m_listener_thread->join();
0044   }
0045 }
0046 
0047 void JEventProcessorManagedPODIO::Init() {
0048   auto* app = GetApplication();
0049   m_log     = app->GetService<Log_service>()->logger("JEventProcessorManagedPODIO");
0050 
0051   m_log->info("Initializing managed PODIO processor with socket: {}", m_socket_path);
0052 
0053   // Initialize ZeroMQ
0054   try {
0055     m_zmq_context = std::make_unique<zmq::context_t>(1);
0056     m_zmq_socket  = std::make_unique<zmq::socket_t>(*m_zmq_context, ZMQ_REP);
0057 
0058     // Remove existing socket file if it exists and is actually a socket
0059     if (std::filesystem::exists(m_socket_path)) {
0060       if (std::filesystem::is_socket(m_socket_path)) {
0061         std::filesystem::remove(m_socket_path);
0062         m_log->debug("Removed existing socket file: {}", m_socket_path);
0063       } else {
0064         throw std::runtime_error(fmt::format("Path exists but is not a socket: {}", m_socket_path));
0065       }
0066     }
0067 
0068     // Bind to UNIX socket
0069     std::string bind_address = "ipc://" + m_socket_path;
0070     m_zmq_socket->bind(bind_address);
0071 
0072     m_log->info("ZeroMQ socket bound to: {}", bind_address);
0073 
0074     // Start listener thread
0075     m_listener_thread =
0076         std::make_unique<std::thread>(&JEventProcessorManagedPODIO::ListenForMessages, this);
0077 
0078   } catch (const std::exception& e) {
0079     throw std::runtime_error(fmt::format("Failed to initialize ZeroMQ: {}", e.what()));
0080   }
0081 
0082   // Don't call parent Init() since we'll manage the writer ourselves
0083 }
0084 
0085 void JEventProcessorManagedPODIO::ListenForMessages() {
0086   m_log->info("Started listening for messages on socket: {}", m_socket_path);
0087 
0088   while (!m_should_stop) {
0089     try {
0090       // Drain any queued response deposited by the JANA thread.
0091       {
0092         std::lock_guard<std::mutex> lock(m_file_mutex);
0093         if (m_queued_response.has_value()) {
0094           nlohmann::json response = std::move(*m_queued_response);
0095           m_queued_response.reset();
0096           SendResponse(response);
0097           m_awaiting_reply = false;
0098           continue;
0099         }
0100       }
0101 
0102       // While the JANA thread is still working, wait instead of recv()ing
0103       // (ZMQ_REP requires strict recv-send alternation).
0104       if (m_awaiting_reply) {
0105         std::unique_lock<std::mutex> lock(m_file_mutex);
0106         m_response_cv.wait_for(lock, std::chrono::milliseconds(100), [this] {
0107           return m_queued_response.has_value() || m_should_stop.load();
0108         });
0109         continue;
0110       }
0111 
0112       // Poll for messages with timeout
0113       zmq::pollitem_t items[] = {{*m_zmq_socket, 0, ZMQ_POLLIN, 0}};
0114       int rc                  = zmq::poll(items, 1, std::chrono::milliseconds(1000));
0115 
0116       if (rc > 0 && (items[0].revents & ZMQ_POLLIN)) {
0117         zmq::message_t request;
0118         auto result = m_zmq_socket->recv(request, zmq::recv_flags::dontwait);
0119         if (result) {
0120           std::string request_str(static_cast<char*>(request.data()), request.size());
0121           m_log->debug("Received message: {}", request_str);
0122 
0123           try {
0124             nlohmann::json request_json = nlohmann::json::parse(request_str);
0125             ProcessFileRequest(request_json);
0126           } catch (const std::exception& e) {
0127             m_log->error("Failed to parse JSON request: {}", e.what());
0128             SendResponse(
0129                 {{"status", "error"}, {"message", fmt::format("Invalid JSON: {}", e.what())}});
0130           }
0131         }
0132       }
0133     } catch (const zmq::error_t& e) {
0134       if (e.num() != EAGAIN && e.num() != EINTR) {
0135         m_log->error("ZeroMQ error in listener: {}", e.what());
0136       }
0137     } catch (const std::exception& e) {
0138       m_log->error("Unexpected error in listener: {}", e.what());
0139     }
0140   }
0141 
0142   m_log->info("Message listener thread stopped");
0143 }
0144 
0145 void JEventProcessorManagedPODIO::ProcessFileRequest(const nlohmann::json& request) {
0146   try {
0147     if (!request.contains("input_file") || !request.contains("output_file")) {
0148       SendResponse({{"status", "error"},
0149                     {"message", "Request must contain 'input_file' and 'output_file' fields"}});
0150       return;
0151     }
0152 
0153     std::string input_file  = request["input_file"];
0154     std::string output_file = request["output_file"];
0155 
0156     m_log->info("Processing request: {} -> {}", input_file, output_file);
0157 
0158     // Check if input file exists
0159     if (!std::filesystem::exists(input_file)) {
0160       SendResponse({{"status", "error"},
0161                     {"message", fmt::format("Input file does not exist: {}", input_file)}});
0162       return;
0163     }
0164 
0165     {
0166       std::lock_guard<std::mutex> lock(m_file_mutex);
0167       m_current_input_file  = input_file;
0168       m_current_output_file = output_file;
0169       m_events_processed    = 0;
0170 
0171       // Reset per-file writer state and open the output file while holding
0172       // m_file_mutex so that no JANA worker thread can observe
0173       // m_file_processing_active==true before m_writer is fully initialised.
0174       m_collections_to_write.clear();
0175       std::destroy_at(&m_is_first_event);
0176       std::construct_at(&m_is_first_event);
0177       OpenOutputFile(output_file);
0178 
0179       // Signal readiness only after the writer is in place.
0180       m_file_processing_active = true;
0181     }
0182 
0183     // Signal the event source that a new file is available
0184     NotifySourceNewFile(input_file);
0185 
0186     m_log->info("Started processing file: {} -> {}", input_file, output_file);
0187 
0188     // Zero-event files must be completed here because JANA will never call
0189     // Process(), so the completion check there would never run.
0190     if (GetNeventsInCurrentFile() == 0) {
0191       m_log->info("File has zero events, completing immediately");
0192       nlohmann::json response = CloseOutputFile();
0193       {
0194         std::lock_guard<std::mutex> lock(m_file_mutex);
0195         m_file_processing_active = false;
0196       }
0197       SendResponse(response);
0198       return;
0199     }
0200 
0201     // Reply will arrive later via QueueResponse() from the JANA thread.
0202     m_awaiting_reply = true;
0203 
0204   } catch (const std::exception& e) {
0205     m_log->error("Error processing file request: {}", e.what());
0206     SendResponse({{"status", "error"}, {"message", fmt::format("Processing error: {}", e.what())}});
0207   }
0208 }
0209 
0210 void JEventProcessorManagedPODIO::SendResponse(const nlohmann::json& response) {
0211   try {
0212     std::string response_str = response.dump();
0213     zmq::message_t reply(response_str.size());
0214     memcpy(reply.data(), response_str.c_str(), response_str.size());
0215     auto sent = m_zmq_socket->send(reply, zmq::send_flags::none);
0216     if (!sent) {
0217       throw std::runtime_error("ZeroMQ send failed");
0218     }
0219     m_log->debug("Sent response: {}", response_str);
0220   } catch (const std::exception& e) {
0221     m_log->error("Failed to send response: {}", e.what());
0222   }
0223 }
0224 
0225 void JEventProcessorManagedPODIO::QueueResponse(const nlohmann::json& response) {
0226   {
0227     std::lock_guard<std::mutex> lock(m_file_mutex);
0228     m_queued_response = response;
0229   }
0230   m_response_cv.notify_one();
0231 }
0232 
0233 void JEventProcessorManagedPODIO::OpenOutputFile(const std::string& output_file) {
0234   std::string backend_lower = m_output_backend;
0235   std::transform(backend_lower.begin(), backend_lower.end(), backend_lower.begin(),
0236                  [](unsigned char c) { return std::tolower(c); });
0237 
0238   m_log->info("Opening output file: {} with backend: {}", output_file, backend_lower);
0239 
0240   try {
0241     m_writer = std::make_unique<podio::Writer>(podio::makeWriter(output_file, backend_lower));
0242   } catch (const std::exception& e) {
0243     throw std::runtime_error(
0244         fmt::format("Failed to create writer for file '{}' with backend '{}': {}", output_file,
0245                     m_output_backend, e.what()));
0246   }
0247 }
0248 
0249 nlohmann::json JEventProcessorManagedPODIO::CloseOutputFile() {
0250   if (!m_writer) {
0251     return {{"status", "error"}, {"message", "No active writer to close"}};
0252   }
0253 
0254   try {
0255     // Propagate non-"events" frames (e.g. "runs", "metadata") to the output
0256     // and then release the reader.  Emit() no longer resets m_reader on EOF
0257     // precisely so that this code can still read from it safely.
0258     auto* app          = GetApplication();
0259     auto event_sources = app->GetService<JComponentManager>()->get_evt_srces();
0260     for (auto* source : event_sources) {
0261       auto* managed_source = dynamic_cast<JEventSourceManagedPODIO*>(source);
0262       if (managed_source == nullptr) {
0263         continue;
0264       }
0265       for (const auto& _category : managed_source->getAvailableCategories()) {
0266         std::string category{_category};
0267         if (category == "events") {
0268           continue;
0269         }
0270         std::size_t n = managed_source->getEntries(category);
0271         for (std::size_t i = 0; i < n; ++i) {
0272           m_writer->writeFrame(managed_source->getFrame(category, i), category);
0273         }
0274         m_log->info("Propagated {} '{}' frame(s) to output file", n, category);
0275       }
0276       // Now that all frames are written, release the reader so it doesn't
0277       // hold the input file open until the next SetCurrentFile() call.
0278       managed_source->ResetReader();
0279     }
0280 
0281     m_writer->finish();
0282     m_writer.reset();
0283 
0284     std::string current_output_file;
0285     std::string current_input_file;
0286     {
0287       std::lock_guard<std::mutex> lock(m_file_mutex);
0288       current_output_file = m_current_output_file;
0289       current_input_file  = m_current_input_file;
0290     }
0291 
0292     m_log->info("Closed output file: {}", current_output_file);
0293 
0294     return {{"status", "completed"},
0295             {"input_file", current_input_file},
0296             {"output_file", current_output_file},
0297             {"events_processed", m_events_processed.load()}};
0298 
0299   } catch (const std::exception& e) {
0300     m_log->error("Error closing output file: {}", e.what());
0301     m_writer.reset();
0302     return {{"status", "error"}, {"message", fmt::format("Error closing file: {}", e.what())}};
0303   }
0304 }
0305 
0306 void JEventProcessorManagedPODIO::Process(const std::shared_ptr<const JEvent>& event) {
0307   bool should_close = false;
0308   {
0309     std::lock_guard<std::mutex> lock(m_file_mutex);
0310 
0311     if (!m_file_processing_active || !m_writer) {
0312       return; // No active file processing
0313     }
0314 
0315     // Call parent class implementation
0316     JEventProcessorPODIO::Process(event);
0317 
0318     m_events_processed++;
0319 
0320     // Check completion while we still hold the lock so that concurrent
0321     // Process() threads cannot race into a second close.
0322     if (IsCurrentFileComplete()) {
0323       m_file_processing_active = false;
0324       should_close             = true;
0325     }
0326   }
0327 
0328   // CloseOutputFile() acquires m_file_mutex internally, so call it outside our lock.
0329   if (should_close) {
0330     m_log->info("File processing completed, closing output file");
0331     nlohmann::json response = CloseOutputFile();
0332     QueueResponse(response);
0333   }
0334 }
0335 
0336 void JEventProcessorManagedPODIO::Finish() {
0337   m_should_stop = true;
0338 
0339   bool should_close_file = false;
0340   {
0341     std::lock_guard<std::mutex> lock(m_file_mutex);
0342     should_close_file = m_file_processing_active;
0343     // Clear the flag under the same lock so that a concurrent Process()
0344     // thread cannot also see it as true and race into a second close.
0345     m_file_processing_active = false;
0346   }
0347 
0348   if (should_close_file) {
0349     CloseOutputFile();
0350   }
0351 
0352   if (m_listener_thread && m_listener_thread->joinable()) {
0353     m_listener_thread->join();
0354   }
0355 
0356   // Clean up socket file if it exists and is a socket
0357   if (std::filesystem::exists(m_socket_path) && std::filesystem::is_socket(m_socket_path)) {
0358     std::filesystem::remove(m_socket_path);
0359     m_log->debug("Cleaned up socket file: {}", m_socket_path);
0360   }
0361 
0362   m_log->info("Managed PODIO processor finished");
0363 }
0364 
0365 void JEventProcessorManagedPODIO::NotifySourceNewFile(const std::string& input_file) {
0366   // Find the managed event source and notify it of the new file
0367   auto* app          = GetApplication();
0368   auto event_sources = app->GetService<JComponentManager>()->get_evt_srces();
0369 
0370   for (auto* source : event_sources) {
0371     auto* managed_source = dynamic_cast<JEventSourceManagedPODIO*>(source);
0372     if (managed_source != nullptr) {
0373       m_log->debug("Notifying managed source of new file: {}", input_file);
0374       managed_source->SetCurrentFile(input_file);
0375       break;
0376     }
0377   }
0378 }
0379 
0380 bool JEventProcessorManagedPODIO::IsCurrentFileComplete() {
0381   auto* app          = GetApplication();
0382   auto event_sources = app->GetService<JComponentManager>()->get_evt_srces();
0383 
0384   for (auto* source : event_sources) {
0385     auto* managed_source = dynamic_cast<JEventSourceManagedPODIO*>(source);
0386     if (managed_source != nullptr) {
0387       return managed_source->IsFileProcessingComplete();
0388     }
0389   }
0390   return false;
0391 }
0392 
0393 std::size_t JEventProcessorManagedPODIO::GetNeventsInCurrentFile() {
0394   auto* app          = GetApplication();
0395   auto event_sources = app->GetService<JComponentManager>()->get_evt_srces();
0396 
0397   for (auto* source : event_sources) {
0398     auto* managed_source = dynamic_cast<JEventSourceManagedPODIO*>(source);
0399     if (managed_source != nullptr) {
0400       return managed_source->GetNeventsInFile();
0401     }
0402   }
0403   return 0;
0404 }