File indexing completed on 2026-06-17 07:50:56
0001 #include "JEventProcessorManagedPODIO.h"
0002
0003 #include <JANA/JApplication.h>
0004 #include <JANA/JApplicationFwd.h>
0005 #include <JANA/JEventSource.h>
0006 #include <JANA/Services/JComponentManager.h>
0007 #include <JANA/Utils/JTypeInfo.h>
0008 #include <errno.h>
0009 #include <fmt/format.h>
0010 #include <nlohmann/detail/json_ref.hpp>
0011 #include <nlohmann/json.hpp>
0012 #include <podio/Writer.h>
0013 #include <spdlog/logger.h>
0014 #include <zmq.h>
0015 #include <zmq.hpp>
0016 #include <algorithm>
0017 #include <cctype>
0018 #include <chrono>
0019 #include <cstring>
0020 #include <exception>
0021 #include <filesystem>
0022 #include <map>
0023 #include <stdexcept>
0024 #include <string_view>
0025 #include <thread>
0026 #include <utility>
0027 #include <vector>
0028
0029 #include "services/io/podio/JEventProcessorPODIO.h"
0030 #include "services/io/podio/JEventSourceManagedPODIO.h"
0031 #include "services/log/Log_service.h"
0032
0033 JEventProcessorManagedPODIO::JEventProcessorManagedPODIO() : JEventProcessorPODIO() {
0034 SetTypeName(NAME_OF_THIS);
0035
0036 japp->SetDefaultParameter("podio:managed_socket_path", m_socket_path,
0037 "UNIX socket path for managed PODIO processing");
0038 }
0039
0040 JEventProcessorManagedPODIO::~JEventProcessorManagedPODIO() {
0041 m_should_stop = true;
0042 if (m_listener_thread && m_listener_thread->joinable()) {
0043 m_listener_thread->join();
0044 }
0045 }
0046
0047 void JEventProcessorManagedPODIO::Init() {
0048 auto* app = GetApplication();
0049 m_log = app->GetService<Log_service>()->logger("JEventProcessorManagedPODIO");
0050
0051 m_log->info("Initializing managed PODIO processor with socket: {}", m_socket_path);
0052
0053
0054 try {
0055 m_zmq_context = std::make_unique<zmq::context_t>(1);
0056 m_zmq_socket = std::make_unique<zmq::socket_t>(*m_zmq_context, ZMQ_REP);
0057
0058
0059 if (std::filesystem::exists(m_socket_path)) {
0060 if (std::filesystem::is_socket(m_socket_path)) {
0061 std::filesystem::remove(m_socket_path);
0062 m_log->debug("Removed existing socket file: {}", m_socket_path);
0063 } else {
0064 throw std::runtime_error(fmt::format("Path exists but is not a socket: {}", m_socket_path));
0065 }
0066 }
0067
0068
0069 std::string bind_address = "ipc://" + m_socket_path;
0070 m_zmq_socket->bind(bind_address);
0071
0072 m_log->info("ZeroMQ socket bound to: {}", bind_address);
0073
0074
0075 m_listener_thread =
0076 std::make_unique<std::thread>(&JEventProcessorManagedPODIO::ListenForMessages, this);
0077
0078 } catch (const std::exception& e) {
0079 throw std::runtime_error(fmt::format("Failed to initialize ZeroMQ: {}", e.what()));
0080 }
0081
0082
0083 }
0084
0085 void JEventProcessorManagedPODIO::ListenForMessages() {
0086 m_log->info("Started listening for messages on socket: {}", m_socket_path);
0087
0088 while (!m_should_stop) {
0089 try {
0090
0091 {
0092 std::lock_guard<std::mutex> lock(m_file_mutex);
0093 if (m_queued_response.has_value()) {
0094 nlohmann::json response = std::move(*m_queued_response);
0095 m_queued_response.reset();
0096 SendResponse(response);
0097 m_awaiting_reply = false;
0098 continue;
0099 }
0100 }
0101
0102
0103
0104 if (m_awaiting_reply) {
0105 std::unique_lock<std::mutex> lock(m_file_mutex);
0106 m_response_cv.wait_for(lock, std::chrono::milliseconds(100), [this] {
0107 return m_queued_response.has_value() || m_should_stop.load();
0108 });
0109 continue;
0110 }
0111
0112
0113 zmq::pollitem_t items[] = {{*m_zmq_socket, 0, ZMQ_POLLIN, 0}};
0114 int rc = zmq::poll(items, 1, std::chrono::milliseconds(1000));
0115
0116 if (rc > 0 && (items[0].revents & ZMQ_POLLIN)) {
0117 zmq::message_t request;
0118 auto result = m_zmq_socket->recv(request, zmq::recv_flags::dontwait);
0119 if (result) {
0120 std::string request_str(static_cast<char*>(request.data()), request.size());
0121 m_log->debug("Received message: {}", request_str);
0122
0123 try {
0124 nlohmann::json request_json = nlohmann::json::parse(request_str);
0125 ProcessFileRequest(request_json);
0126 } catch (const std::exception& e) {
0127 m_log->error("Failed to parse JSON request: {}", e.what());
0128 SendResponse(
0129 {{"status", "error"}, {"message", fmt::format("Invalid JSON: {}", e.what())}});
0130 }
0131 }
0132 }
0133 } catch (const zmq::error_t& e) {
0134 if (e.num() != EAGAIN && e.num() != EINTR) {
0135 m_log->error("ZeroMQ error in listener: {}", e.what());
0136 }
0137 } catch (const std::exception& e) {
0138 m_log->error("Unexpected error in listener: {}", e.what());
0139 }
0140 }
0141
0142 m_log->info("Message listener thread stopped");
0143 }
0144
0145 void JEventProcessorManagedPODIO::ProcessFileRequest(const nlohmann::json& request) {
0146 try {
0147 if (!request.contains("input_file") || !request.contains("output_file")) {
0148 SendResponse({{"status", "error"},
0149 {"message", "Request must contain 'input_file' and 'output_file' fields"}});
0150 return;
0151 }
0152
0153 std::string input_file = request["input_file"];
0154 std::string output_file = request["output_file"];
0155
0156 m_log->info("Processing request: {} -> {}", input_file, output_file);
0157
0158
0159 if (!std::filesystem::exists(input_file)) {
0160 SendResponse({{"status", "error"},
0161 {"message", fmt::format("Input file does not exist: {}", input_file)}});
0162 return;
0163 }
0164
0165 {
0166 std::lock_guard<std::mutex> lock(m_file_mutex);
0167 m_current_input_file = input_file;
0168 m_current_output_file = output_file;
0169 m_events_processed = 0;
0170
0171
0172
0173
0174 m_collections_to_write.clear();
0175 std::destroy_at(&m_is_first_event);
0176 std::construct_at(&m_is_first_event);
0177 OpenOutputFile(output_file);
0178
0179
0180 m_file_processing_active = true;
0181 }
0182
0183
0184 NotifySourceNewFile(input_file);
0185
0186 m_log->info("Started processing file: {} -> {}", input_file, output_file);
0187
0188
0189
0190 if (GetNeventsInCurrentFile() == 0) {
0191 m_log->info("File has zero events, completing immediately");
0192 nlohmann::json response = CloseOutputFile();
0193 {
0194 std::lock_guard<std::mutex> lock(m_file_mutex);
0195 m_file_processing_active = false;
0196 }
0197 SendResponse(response);
0198 return;
0199 }
0200
0201
0202 m_awaiting_reply = true;
0203
0204 } catch (const std::exception& e) {
0205 m_log->error("Error processing file request: {}", e.what());
0206 SendResponse({{"status", "error"}, {"message", fmt::format("Processing error: {}", e.what())}});
0207 }
0208 }
0209
0210 void JEventProcessorManagedPODIO::SendResponse(const nlohmann::json& response) {
0211 try {
0212 std::string response_str = response.dump();
0213 zmq::message_t reply(response_str.size());
0214 memcpy(reply.data(), response_str.c_str(), response_str.size());
0215 auto sent = m_zmq_socket->send(reply, zmq::send_flags::none);
0216 if (!sent) {
0217 throw std::runtime_error("ZeroMQ send failed");
0218 }
0219 m_log->debug("Sent response: {}", response_str);
0220 } catch (const std::exception& e) {
0221 m_log->error("Failed to send response: {}", e.what());
0222 }
0223 }
0224
0225 void JEventProcessorManagedPODIO::QueueResponse(const nlohmann::json& response) {
0226 {
0227 std::lock_guard<std::mutex> lock(m_file_mutex);
0228 m_queued_response = response;
0229 }
0230 m_response_cv.notify_one();
0231 }
0232
0233 void JEventProcessorManagedPODIO::OpenOutputFile(const std::string& output_file) {
0234 std::string backend_lower = m_output_backend;
0235 std::transform(backend_lower.begin(), backend_lower.end(), backend_lower.begin(),
0236 [](unsigned char c) { return std::tolower(c); });
0237
0238 m_log->info("Opening output file: {} with backend: {}", output_file, backend_lower);
0239
0240 try {
0241 m_writer = std::make_unique<podio::Writer>(podio::makeWriter(output_file, backend_lower));
0242 } catch (const std::exception& e) {
0243 throw std::runtime_error(
0244 fmt::format("Failed to create writer for file '{}' with backend '{}': {}", output_file,
0245 m_output_backend, e.what()));
0246 }
0247 }
0248
0249 nlohmann::json JEventProcessorManagedPODIO::CloseOutputFile() {
0250 if (!m_writer) {
0251 return {{"status", "error"}, {"message", "No active writer to close"}};
0252 }
0253
0254 try {
0255
0256
0257
0258 auto* app = GetApplication();
0259 auto event_sources = app->GetService<JComponentManager>()->get_evt_srces();
0260 for (auto* source : event_sources) {
0261 auto* managed_source = dynamic_cast<JEventSourceManagedPODIO*>(source);
0262 if (managed_source == nullptr) {
0263 continue;
0264 }
0265 for (const auto& _category : managed_source->getAvailableCategories()) {
0266 std::string category{_category};
0267 if (category == "events") {
0268 continue;
0269 }
0270 std::size_t n = managed_source->getEntries(category);
0271 for (std::size_t i = 0; i < n; ++i) {
0272 m_writer->writeFrame(managed_source->getFrame(category, i), category);
0273 }
0274 m_log->info("Propagated {} '{}' frame(s) to output file", n, category);
0275 }
0276
0277
0278 managed_source->ResetReader();
0279 }
0280
0281 m_writer->finish();
0282 m_writer.reset();
0283
0284 std::string current_output_file;
0285 std::string current_input_file;
0286 {
0287 std::lock_guard<std::mutex> lock(m_file_mutex);
0288 current_output_file = m_current_output_file;
0289 current_input_file = m_current_input_file;
0290 }
0291
0292 m_log->info("Closed output file: {}", current_output_file);
0293
0294 return {{"status", "completed"},
0295 {"input_file", current_input_file},
0296 {"output_file", current_output_file},
0297 {"events_processed", m_events_processed.load()}};
0298
0299 } catch (const std::exception& e) {
0300 m_log->error("Error closing output file: {}", e.what());
0301 m_writer.reset();
0302 return {{"status", "error"}, {"message", fmt::format("Error closing file: {}", e.what())}};
0303 }
0304 }
0305
0306 void JEventProcessorManagedPODIO::Process(const std::shared_ptr<const JEvent>& event) {
0307 bool should_close = false;
0308 {
0309 std::lock_guard<std::mutex> lock(m_file_mutex);
0310
0311 if (!m_file_processing_active || !m_writer) {
0312 return;
0313 }
0314
0315
0316 JEventProcessorPODIO::Process(event);
0317
0318 m_events_processed++;
0319
0320
0321
0322 if (IsCurrentFileComplete()) {
0323 m_file_processing_active = false;
0324 should_close = true;
0325 }
0326 }
0327
0328
0329 if (should_close) {
0330 m_log->info("File processing completed, closing output file");
0331 nlohmann::json response = CloseOutputFile();
0332 QueueResponse(response);
0333 }
0334 }
0335
0336 void JEventProcessorManagedPODIO::Finish() {
0337 m_should_stop = true;
0338
0339 bool should_close_file = false;
0340 {
0341 std::lock_guard<std::mutex> lock(m_file_mutex);
0342 should_close_file = m_file_processing_active;
0343
0344
0345 m_file_processing_active = false;
0346 }
0347
0348 if (should_close_file) {
0349 CloseOutputFile();
0350 }
0351
0352 if (m_listener_thread && m_listener_thread->joinable()) {
0353 m_listener_thread->join();
0354 }
0355
0356
0357 if (std::filesystem::exists(m_socket_path) && std::filesystem::is_socket(m_socket_path)) {
0358 std::filesystem::remove(m_socket_path);
0359 m_log->debug("Cleaned up socket file: {}", m_socket_path);
0360 }
0361
0362 m_log->info("Managed PODIO processor finished");
0363 }
0364
0365 void JEventProcessorManagedPODIO::NotifySourceNewFile(const std::string& input_file) {
0366
0367 auto* app = GetApplication();
0368 auto event_sources = app->GetService<JComponentManager>()->get_evt_srces();
0369
0370 for (auto* source : event_sources) {
0371 auto* managed_source = dynamic_cast<JEventSourceManagedPODIO*>(source);
0372 if (managed_source != nullptr) {
0373 m_log->debug("Notifying managed source of new file: {}", input_file);
0374 managed_source->SetCurrentFile(input_file);
0375 break;
0376 }
0377 }
0378 }
0379
0380 bool JEventProcessorManagedPODIO::IsCurrentFileComplete() {
0381 auto* app = GetApplication();
0382 auto event_sources = app->GetService<JComponentManager>()->get_evt_srces();
0383
0384 for (auto* source : event_sources) {
0385 auto* managed_source = dynamic_cast<JEventSourceManagedPODIO*>(source);
0386 if (managed_source != nullptr) {
0387 return managed_source->IsFileProcessingComplete();
0388 }
0389 }
0390 return false;
0391 }
0392
0393 std::size_t JEventProcessorManagedPODIO::GetNeventsInCurrentFile() {
0394 auto* app = GetApplication();
0395 auto event_sources = app->GetService<JComponentManager>()->get_evt_srces();
0396
0397 for (auto* source : event_sources) {
0398 auto* managed_source = dynamic_cast<JEventSourceManagedPODIO*>(source);
0399 if (managed_source != nullptr) {
0400 return managed_source->GetNeventsInFile();
0401 }
0402 }
0403 return 0;
0404 }