File indexing completed on 2026-06-17 07:50:56
0001 #include "JEventSourceManagedPODIO.h"
0002
0003 #include <JANA/JApplication.h>
0004 #include <JANA/Utils/JTypeInfo.h>
0005 #include <fmt/format.h>
0006 #include <podio/Reader.h>
0007 #include <spdlog/logger.h>
0008 #include <exception>
0009 #include <filesystem>
0010 #include <memory>
0011 #include <stdexcept>
0012
0013 #include "services/io/podio/JEventSourcePODIO.h"
0014 #include "services/log/Log_service.h"
0015
0016 JEventSourceManagedPODIO::JEventSourceManagedPODIO(std::string resource_name, JApplication* app)
0017 : JEventSourcePODIO(resource_name, app) {
0018 SetTypeName(NAME_OF_THIS);
0019
0020 m_log = GetApplication()->GetService<Log_service>()->logger("JEventSourceManagedPODIO");
0021 }
0022
0023 JEventSourceManagedPODIO::~JEventSourceManagedPODIO() {}
0024
0025 void JEventSourceManagedPODIO::Open() {
0026 m_log->info("Opening managed PODIO source - waiting for file requests");
0027 }
0028
0029 void JEventSourceManagedPODIO::Close() {
0030 m_log->info("Closing Managed Event Source");
0031 m_closing = true;
0032 m_file_cv.notify_all();
0033 }
0034
0035 JEventSourceManagedPODIO::Result JEventSourceManagedPODIO::Emit(JEvent& event) {
0036 std::unique_lock<std::mutex> lock(m_file_mutex);
0037
0038 while (!m_file_available || !m_reader) {
0039 if (m_closing) {
0040 return Result::FailureFinished;
0041 }
0042 m_log->info("Waiting for the next file...");
0043 m_file_cv.wait(lock, [this] { return m_file_available.load() || m_closing.load(); });
0044 }
0045
0046 if (m_closing) {
0047 return Result::FailureFinished;
0048 }
0049
0050
0051 if (Nevents_read >= Nevents_in_file) {
0052 m_log->info("No more events available in current file, waiting for next file");
0053 m_file_processing_complete = true;
0054 m_file_available = false;
0055 return Result::FailureTryAgain;
0056 }
0057
0058
0059 Result result = JEventSourcePODIO::Emit(event);
0060
0061
0062 if (Nevents_read >= Nevents_in_file) {
0063 m_log->info("Finished reading all events from file: {}", m_current_input_file);
0064 m_file_processing_complete = true;
0065 }
0066
0067 return result;
0068 }
0069
0070 std::string JEventSourceManagedPODIO::GetDescription() {
0071 return "Managed PODIO source (waits for external file requests)";
0072 }
0073
0074 void JEventSourceManagedPODIO::SetCurrentFile(const std::string& input_file) {
0075 std::lock_guard<std::mutex> lock(m_file_mutex);
0076
0077 m_current_input_file = input_file;
0078
0079 m_file_processing_complete = false;
0080
0081 try {
0082 m_log->info("Opening file for processing: {}", m_current_input_file);
0083
0084
0085 if (!std::filesystem::exists(m_current_input_file)) {
0086 throw std::runtime_error(fmt::format("Input file does not exist: {}", m_current_input_file));
0087 }
0088
0089
0090 m_use_event_headers = true;
0091
0092
0093 SetResourceName(m_current_input_file);
0094 JEventSourcePODIO::Open();
0095
0096 Nevents_in_file = m_reader->getEntries("events");
0097 Nevents_read = 0;
0098
0099 m_log->info("Opened PODIO file \"{}\" with {} events", m_current_input_file, Nevents_in_file);
0100
0101 } catch (const std::exception& e) {
0102 m_log->error("Failed to open file {}: {}", m_current_input_file, e.what());
0103 throw;
0104 }
0105
0106 m_file_available = true;
0107 m_file_cv.notify_all();
0108 }
0109
0110 void JEventSourceManagedPODIO::ResetReader() {
0111 std::lock_guard<std::mutex> lock(m_file_mutex);
0112 m_reader.reset();
0113 m_file_available = false;
0114 }