Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-16 09:23:55

0001 // This file is part of the ACTS project.
0002 //
0003 // Copyright (C) 2016 CERN for the benefit of the ACTS project
0004 //
0005 // This Source Code Form is subject to the terms of the Mozilla Public
0006 // License, v. 2.0. If a copy of the MPL was not distributed with this
0007 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
0008 
0009 #include "ActsExamples/Io/Podio/PodioWriter.hpp"
0010 
0011 #include "Acts/Utilities/Logger.hpp"
0012 #include "ActsExamples/Framework/DataHandle.hpp"
0013 #include "ActsPlugins/EDM4hep/PodioUtil.hpp"
0014 
0015 #include <algorithm>
0016 #include <filesystem>
0017 #include <list>
0018 #include <mutex>
0019 
0020 #include <podio/CollectionBase.h>
0021 #include <podio/Frame.h>
0022 #include <tbb/enumerable_thread_specific.h>
0023 
0024 namespace ActsExamples {
0025 namespace detail {
0026 
0027 using CollectionHandle =
0028     ConsumeDataHandle<std::unique_ptr<podio::CollectionBase>>;
0029 
0030 class PodioWriterImpl {
0031  public:
0032   PodioWriterImpl(const PodioWriter::Config& config, PodioWriter& parent)
0033       : m_cfg(config),
0034         m_inputPodioFrame(&parent, "InputPodioFrame"),
0035         m_singleWriter(
0036             config.separateFilesPerThread
0037                 ? nullptr
0038                 : std::make_unique<ActsPlugins::PodioUtil::ROOTWriter>(
0039                       config.outputPath)),
0040         m_useThreadLocalWriters(config.separateFilesPerThread) {}
0041 
0042   PodioWriter::Config m_cfg;
0043 
0044   ConsumeDataHandle<podio::Frame> m_inputPodioFrame;
0045 
0046   std::vector<std::unique_ptr<CollectionHandle>> m_collections;
0047 
0048   std::unique_ptr<ActsPlugins::PodioUtil::ROOTWriter> m_singleWriter;
0049   tbb::enumerable_thread_specific<
0050       std::unique_ptr<ActsPlugins::PodioUtil::ROOTWriter>>
0051       m_threadLocalWriters;
0052 
0053   std::mutex m_writeMutex;
0054   bool m_useThreadLocalWriters{};
0055 
0056   std::string threadLocalFileName(std::size_t threadId) const {
0057     namespace fs = std::filesystem;
0058     const fs::path basePath{m_cfg.outputPath};
0059     const fs::path directory = basePath.parent_path();
0060     const std::string stem = basePath.stem().string();
0061     const std::string extension = basePath.extension().string();
0062 
0063     std::string threadStem = stem.empty() ? basePath.filename().string() : stem;
0064     threadStem += "_thread" + std::to_string(threadId);
0065 
0066     fs::path threadFile = directory / (threadStem + extension);
0067     return threadFile.string();
0068   }
0069 
0070   ActsPlugins::PodioUtil::ROOTWriter& writerForThread(std::size_t threadId) {
0071     if (!m_useThreadLocalWriters) {
0072       return *m_singleWriter;
0073     }
0074     auto& localWriter = m_threadLocalWriters.local();
0075     if (!localWriter) {
0076       localWriter = std::make_unique<ActsPlugins::PodioUtil::ROOTWriter>(
0077           threadLocalFileName(threadId));
0078     }
0079     return *localWriter;
0080   }
0081 };
0082 }  // namespace detail
0083 
0084 PodioWriter::PodioWriter(const Config& config, Acts::Logging::Level level)
0085     : m_logger(Acts::getDefaultLogger("PodioWriter", level)),
0086       m_impl(std::make_unique<detail::PodioWriterImpl>(config, *this)) {
0087   ACTS_DEBUG("Creating output file " << config.outputPath);
0088 
0089   if (m_impl->m_cfg.category.empty()) {
0090     throw std::invalid_argument("Category name is not set");
0091   }
0092   if (!m_impl->m_cfg.inputFrame.has_value()) {
0093     ACTS_DEBUG("No input frame name set, will create a new one");
0094   } else {
0095     m_impl->m_inputPodioFrame.initialize(m_impl->m_cfg.inputFrame.value());
0096   }
0097 
0098   std::set<std::string> collections;
0099   std::ranges::copy(m_impl->m_cfg.collections,
0100                     std::inserter(collections, collections.end()));
0101   if (collections.size() != m_impl->m_cfg.collections.size()) {
0102     throw std::invalid_argument(
0103         "Duplicate collection names in config, please use check your "
0104         "configuration");
0105   }
0106   if (std::ranges::any_of(m_impl->m_cfg.collections,
0107                           [](const auto& c) { return c.empty(); })) {
0108     throw std::invalid_argument("Collection name is empty");
0109   }
0110 
0111   ACTS_DEBUG("Adding " << m_impl->m_cfg.collections.size() << " collections");
0112   for (const auto& collection : m_impl->m_cfg.collections) {
0113     ACTS_DEBUG("- " << collection);
0114     auto up = std::make_unique<detail::CollectionHandle>(this, collection);
0115     m_impl->m_collections.push_back(std::move(up));
0116 
0117     m_impl->m_collections.back()->initialize(collection);
0118   }
0119 }
0120 
0121 std::string PodioWriter::name() const {
0122   return "PodioWriter";
0123 }
0124 
0125 ProcessCode PodioWriter::write(const AlgorithmContext& ctx) {
0126   ACTS_DEBUG("PodioWriter::write");
0127   podio::Frame frame = [this, &ctx]() -> podio::Frame {
0128     if (m_impl->m_inputPodioFrame.isInitialized()) {
0129       ACTS_VERBOSE(
0130           "PodioWriter::write: taking inputPodioFrame from WhiteBoard");
0131       return m_impl->m_inputPodioFrame(ctx);
0132     } else {
0133       ACTS_VERBOSE("PodioWriter::write: creating new inputPodioFrame");
0134       return {};
0135     }
0136   }();
0137 
0138   std::unique_lock<std::mutex> guard;
0139   if (!m_impl->m_useThreadLocalWriters) {
0140     guard = std::unique_lock<std::mutex>(m_impl->m_writeMutex);
0141   }
0142   for (const auto& handle : m_impl->m_collections) {
0143     auto collectionPtr = (*handle)(ctx);
0144     if (!collectionPtr) {
0145       ACTS_ERROR("PodioWriter::write: collection is not initialized");
0146       return ProcessCode::ABORT;
0147     }
0148     ACTS_VERBOSE("PodioWriter::write: adding collection " << handle->name()
0149                                                           << " to frame");
0150     frame.put(std::move(collectionPtr), handle->name());
0151   }
0152   auto& writer = m_impl->writerForThread(ctx.threadId);
0153   writer.writeFrame(frame, m_impl->m_cfg.category);
0154 
0155   return ProcessCode::SUCCESS;
0156 }
0157 
0158 ProcessCode PodioWriter::finalize() {
0159   if (m_impl->m_useThreadLocalWriters) {
0160     for (auto& tlsWriter : m_impl->m_threadLocalWriters) {
0161       if (tlsWriter) {
0162         tlsWriter->finish();
0163       }
0164     }
0165   } else if (m_impl->m_singleWriter) {
0166     m_impl->m_singleWriter->finish();
0167   }
0168 
0169   return ProcessCode::SUCCESS;
0170 }
0171 
0172 PodioWriter::~PodioWriter() = default;
0173 
0174 const PodioWriter::Config& PodioWriter::config() const {
0175   return m_impl->m_cfg;
0176 }
0177 
0178 }  // namespace ActsExamples