File indexing completed on 2025-12-16 09:23:55
0001
0002
0003
0004
0005
0006
0007
0008
0009 #include "ActsExamples/Io/Podio/PodioWriter.hpp"
0010
0011 #include "Acts/Utilities/Logger.hpp"
0012 #include "ActsExamples/Framework/DataHandle.hpp"
0013 #include "ActsPlugins/EDM4hep/PodioUtil.hpp"
0014
0015 #include <algorithm>
0016 #include <filesystem>
0017 #include <list>
0018 #include <mutex>
0019
0020 #include <podio/CollectionBase.h>
0021 #include <podio/Frame.h>
0022 #include <tbb/enumerable_thread_specific.h>
0023
0024 namespace ActsExamples {
0025 namespace detail {
0026
0027 using CollectionHandle =
0028 ConsumeDataHandle<std::unique_ptr<podio::CollectionBase>>;
0029
0030 class PodioWriterImpl {
0031 public:
0032 PodioWriterImpl(const PodioWriter::Config& config, PodioWriter& parent)
0033 : m_cfg(config),
0034 m_inputPodioFrame(&parent, "InputPodioFrame"),
0035 m_singleWriter(
0036 config.separateFilesPerThread
0037 ? nullptr
0038 : std::make_unique<ActsPlugins::PodioUtil::ROOTWriter>(
0039 config.outputPath)),
0040 m_useThreadLocalWriters(config.separateFilesPerThread) {}
0041
0042 PodioWriter::Config m_cfg;
0043
0044 ConsumeDataHandle<podio::Frame> m_inputPodioFrame;
0045
0046 std::vector<std::unique_ptr<CollectionHandle>> m_collections;
0047
0048 std::unique_ptr<ActsPlugins::PodioUtil::ROOTWriter> m_singleWriter;
0049 tbb::enumerable_thread_specific<
0050 std::unique_ptr<ActsPlugins::PodioUtil::ROOTWriter>>
0051 m_threadLocalWriters;
0052
0053 std::mutex m_writeMutex;
0054 bool m_useThreadLocalWriters{};
0055
0056 std::string threadLocalFileName(std::size_t threadId) const {
0057 namespace fs = std::filesystem;
0058 const fs::path basePath{m_cfg.outputPath};
0059 const fs::path directory = basePath.parent_path();
0060 const std::string stem = basePath.stem().string();
0061 const std::string extension = basePath.extension().string();
0062
0063 std::string threadStem = stem.empty() ? basePath.filename().string() : stem;
0064 threadStem += "_thread" + std::to_string(threadId);
0065
0066 fs::path threadFile = directory / (threadStem + extension);
0067 return threadFile.string();
0068 }
0069
0070 ActsPlugins::PodioUtil::ROOTWriter& writerForThread(std::size_t threadId) {
0071 if (!m_useThreadLocalWriters) {
0072 return *m_singleWriter;
0073 }
0074 auto& localWriter = m_threadLocalWriters.local();
0075 if (!localWriter) {
0076 localWriter = std::make_unique<ActsPlugins::PodioUtil::ROOTWriter>(
0077 threadLocalFileName(threadId));
0078 }
0079 return *localWriter;
0080 }
0081 };
0082 }
0083
0084 PodioWriter::PodioWriter(const Config& config, Acts::Logging::Level level)
0085 : m_logger(Acts::getDefaultLogger("PodioWriter", level)),
0086 m_impl(std::make_unique<detail::PodioWriterImpl>(config, *this)) {
0087 ACTS_DEBUG("Creating output file " << config.outputPath);
0088
0089 if (m_impl->m_cfg.category.empty()) {
0090 throw std::invalid_argument("Category name is not set");
0091 }
0092 if (!m_impl->m_cfg.inputFrame.has_value()) {
0093 ACTS_DEBUG("No input frame name set, will create a new one");
0094 } else {
0095 m_impl->m_inputPodioFrame.initialize(m_impl->m_cfg.inputFrame.value());
0096 }
0097
0098 std::set<std::string> collections;
0099 std::ranges::copy(m_impl->m_cfg.collections,
0100 std::inserter(collections, collections.end()));
0101 if (collections.size() != m_impl->m_cfg.collections.size()) {
0102 throw std::invalid_argument(
0103 "Duplicate collection names in config, please use check your "
0104 "configuration");
0105 }
0106 if (std::ranges::any_of(m_impl->m_cfg.collections,
0107 [](const auto& c) { return c.empty(); })) {
0108 throw std::invalid_argument("Collection name is empty");
0109 }
0110
0111 ACTS_DEBUG("Adding " << m_impl->m_cfg.collections.size() << " collections");
0112 for (const auto& collection : m_impl->m_cfg.collections) {
0113 ACTS_DEBUG("- " << collection);
0114 auto up = std::make_unique<detail::CollectionHandle>(this, collection);
0115 m_impl->m_collections.push_back(std::move(up));
0116
0117 m_impl->m_collections.back()->initialize(collection);
0118 }
0119 }
0120
0121 std::string PodioWriter::name() const {
0122 return "PodioWriter";
0123 }
0124
0125 ProcessCode PodioWriter::write(const AlgorithmContext& ctx) {
0126 ACTS_DEBUG("PodioWriter::write");
0127 podio::Frame frame = [this, &ctx]() -> podio::Frame {
0128 if (m_impl->m_inputPodioFrame.isInitialized()) {
0129 ACTS_VERBOSE(
0130 "PodioWriter::write: taking inputPodioFrame from WhiteBoard");
0131 return m_impl->m_inputPodioFrame(ctx);
0132 } else {
0133 ACTS_VERBOSE("PodioWriter::write: creating new inputPodioFrame");
0134 return {};
0135 }
0136 }();
0137
0138 std::unique_lock<std::mutex> guard;
0139 if (!m_impl->m_useThreadLocalWriters) {
0140 guard = std::unique_lock<std::mutex>(m_impl->m_writeMutex);
0141 }
0142 for (const auto& handle : m_impl->m_collections) {
0143 auto collectionPtr = (*handle)(ctx);
0144 if (!collectionPtr) {
0145 ACTS_ERROR("PodioWriter::write: collection is not initialized");
0146 return ProcessCode::ABORT;
0147 }
0148 ACTS_VERBOSE("PodioWriter::write: adding collection " << handle->name()
0149 << " to frame");
0150 frame.put(std::move(collectionPtr), handle->name());
0151 }
0152 auto& writer = m_impl->writerForThread(ctx.threadId);
0153 writer.writeFrame(frame, m_impl->m_cfg.category);
0154
0155 return ProcessCode::SUCCESS;
0156 }
0157
0158 ProcessCode PodioWriter::finalize() {
0159 if (m_impl->m_useThreadLocalWriters) {
0160 for (auto& tlsWriter : m_impl->m_threadLocalWriters) {
0161 if (tlsWriter) {
0162 tlsWriter->finish();
0163 }
0164 }
0165 } else if (m_impl->m_singleWriter) {
0166 m_impl->m_singleWriter->finish();
0167 }
0168
0169 return ProcessCode::SUCCESS;
0170 }
0171
0172 PodioWriter::~PodioWriter() = default;
0173
0174 const PodioWriter::Config& PodioWriter::config() const {
0175 return m_impl->m_cfg;
0176 }
0177
0178 }