Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-17 07:50:56

0001 #include "JEventSourceManagedPODIO.h"
0002 
0003 #include <JANA/JApplication.h>
0004 #include <JANA/Utils/JTypeInfo.h>
0005 #include <fmt/format.h>
0006 #include <podio/Reader.h>
0007 #include <spdlog/logger.h>
0008 #include <exception>
0009 #include <filesystem>
0010 #include <memory>
0011 #include <stdexcept>
0012 
0013 #include "services/io/podio/JEventSourcePODIO.h"
0014 #include "services/log/Log_service.h"
0015 
0016 JEventSourceManagedPODIO::JEventSourceManagedPODIO(std::string resource_name, JApplication* app)
0017     : JEventSourcePODIO(resource_name, app) {
0018   SetTypeName(NAME_OF_THIS);
0019 
0020   m_log = GetApplication()->GetService<Log_service>()->logger("JEventSourceManagedPODIO");
0021 }
0022 
0023 JEventSourceManagedPODIO::~JEventSourceManagedPODIO() {}
0024 
0025 void JEventSourceManagedPODIO::Open() {
0026   m_log->info("Opening managed PODIO source - waiting for file requests");
0027 }
0028 
0029 void JEventSourceManagedPODIO::Close() {
0030   m_log->info("Closing Managed Event Source");
0031   m_closing = true;
0032   m_file_cv.notify_all();
0033 }
0034 
0035 JEventSourceManagedPODIO::Result JEventSourceManagedPODIO::Emit(JEvent& event) {
0036   std::unique_lock<std::mutex> lock(m_file_mutex);
0037 
0038   while (!m_file_available || !m_reader) {
0039     if (m_closing) {
0040       return Result::FailureFinished;
0041     }
0042     m_log->info("Waiting for the next file...");
0043     m_file_cv.wait(lock, [this] { return m_file_available.load() || m_closing.load(); });
0044   }
0045 
0046   if (m_closing) {
0047     return Result::FailureFinished;
0048   }
0049 
0050   // Check if we have events left to read
0051   if (Nevents_read >= Nevents_in_file) {
0052     m_log->info("No more events available in current file, waiting for next file");
0053     m_file_processing_complete = true;
0054     m_file_available           = false;
0055     return Result::FailureTryAgain;
0056   }
0057 
0058   // Use parent class logic to read the event
0059   Result result = JEventSourcePODIO::Emit(event);
0060 
0061   // Check if this was the last event
0062   if (Nevents_read >= Nevents_in_file) {
0063     m_log->info("Finished reading all events from file: {}", m_current_input_file);
0064     m_file_processing_complete = true;
0065   }
0066 
0067   return result;
0068 }
0069 
0070 std::string JEventSourceManagedPODIO::GetDescription() {
0071   return "Managed PODIO source (waits for external file requests)";
0072 }
0073 
0074 void JEventSourceManagedPODIO::SetCurrentFile(const std::string& input_file) {
0075   std::lock_guard<std::mutex> lock(m_file_mutex);
0076 
0077   m_current_input_file = input_file;
0078 
0079   m_file_processing_complete = false;
0080 
0081   try {
0082     m_log->info("Opening file for processing: {}", m_current_input_file);
0083 
0084     // Check if input file exists
0085     if (!std::filesystem::exists(m_current_input_file)) {
0086       throw std::runtime_error(fmt::format("Input file does not exist: {}", m_current_input_file));
0087     }
0088 
0089     // Reset per-file state before opening the new file
0090     m_use_event_headers = true;
0091 
0092     // Use parent class method to open the file
0093     SetResourceName(m_current_input_file);
0094     JEventSourcePODIO::Open();
0095 
0096     Nevents_in_file = m_reader->getEntries("events");
0097     Nevents_read    = 0;
0098 
0099     m_log->info("Opened PODIO file \"{}\" with {} events", m_current_input_file, Nevents_in_file);
0100 
0101   } catch (const std::exception& e) {
0102     m_log->error("Failed to open file {}: {}", m_current_input_file, e.what());
0103     throw;
0104   }
0105 
0106   m_file_available = true;
0107   m_file_cv.notify_all();
0108 }
0109 
0110 void JEventSourceManagedPODIO::ResetReader() {
0111   std::lock_guard<std::mutex> lock(m_file_mutex);
0112   m_reader.reset();
0113   m_file_available = false;
0114 }