Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-20 07:36:36

0001 // This file is part of the ACTS project.
0002 //
0003 // Copyright (C) 2016 CERN for the benefit of the ACTS project
0004 //
0005 // This Source Code Form is subject to the terms of the Mozilla Public
0006 // License, v. 2.0. If a copy of the MPL was not distributed with this
0007 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
0008 
0009 #include "ActsExamples/Io/Parquet/ParquetReader.hpp"
0010 
0011 #include "Acts/Utilities/ScopedTimer.hpp"
0012 #include "ActsExamples/Framework/DataHandle.hpp"
0013 #include "ActsPlugins/Arrow/ArrowUtil.hpp"
0014 
0015 #include <format>
0016 #include <stdexcept>
0017 #include <unordered_map>
0018 #include <unordered_set>
0019 
0020 #include <arrow/api.h>
0021 
0022 namespace ActsExamples {
0023 
/// Private implementation of ParquetReader.
///
/// Owns one Parquet dataset reader and one whiteboard write handle per
/// configured collection, plus the event range shared by all collections.
class ParquetReader::Impl {
 public:
  /// State kept for a single configured collection.
  struct CollectionState {
    /// Collection name; also the key the write handle is initialized with.
    std::string name;
    /// Reader over the collection's Parquet dataset directory.
    std::unique_ptr<ActsPlugins::ArrowUtil::ParquetDatasetReader> reader;
    /// Handle through which the per-event table is written.
    std::unique_ptr<WriteDataHandle<ActsPlugins::ArrowUtil::ArrowTable>> handle;
  };

  /// Validate @p cfg, open a dataset reader for every collection and register
  /// one write handle per collection on @p parent.
  ///
  /// Every configuration check, including that each input directory exists,
  /// runs before the first dataset reader is constructed, so configuration
  /// mistakes fail before any dataset is opened.
  ///
  /// @param cfg    reader configuration; moved into this object
  /// @param parent owning reader that the write handles are attached to
  /// @throws std::invalid_argument on an inconsistent configuration, a
  ///         missing input directory, or collections that disagree on the
  ///         number of events
  explicit Impl(ParquetReader::Config cfg, ParquetReader& parent)
      : m_cfg(std::move(cfg)) {
    const auto directories = validateAndResolve(m_cfg);

    // Event count of the first opened collection; every other collection
    // must report the same count.
    std::int64_t referenceEvents = -1;
    std::string referenceName;

    for (const auto& [name, directory] : directories) {
      auto state = std::make_unique<CollectionState>();
      state->name = name;
      state->reader =
          std::make_unique<ActsPlugins::ArrowUtil::ParquetDatasetReader>(
              directory, m_cfg.expectedSchemas.at(name).schema());

      const auto events = state->reader->numEvents();
      if (referenceEvents < 0) {
        referenceEvents = events;
        referenceName = name;
      } else if (events != referenceEvents) {
        throw std::invalid_argument(std::format(
            "ParquetReader: event count mismatch across collections. '{}' has "
            "{} events, '{}' has {}. All collections must describe the same "
            "events.",
            referenceName, referenceEvents, name, events));
      }

      state->handle =
          std::make_unique<WriteDataHandle<ActsPlugins::ArrowUtil::ArrowTable>>(
              &parent, name);
      state->handle->initialize(name);
      m_collectionStates.push_back(std::move(state));
    }

    // Validation guarantees at least one collection; the negative guard is
    // kept so an empty loop still yields an empty range.
    m_eventsRange = {
        0, referenceEvents < 0 ? 0 : static_cast<std::size_t>(referenceEvents)};
  }

  ParquetReader::Config m_cfg;
  std::vector<std::unique_ptr<CollectionState>> m_collectionStates;
  std::pair<std::size_t, std::size_t> m_eventsRange{0, 0};

 private:
  /// A collection name paired with its resolved input directory.
  using ResolvedCollection = std::pair<std::string, std::filesystem::path>;

  /// Resolve a configured collection path. Absolute paths are used as-is and
  /// relative paths are interpreted relative to `cfg.inputDir`.
  static std::filesystem::path resolveDirectory(
      const ParquetReader::Config& cfg, const std::filesystem::path& rawPath) {
    return rawPath.is_absolute() ? rawPath : cfg.inputDir / rawPath;
  }

  /// Check the whole configuration and return every collection together with
  /// its resolved directory, in the iteration order of `cfg.collections`.
  ///
  /// @throws std::invalid_argument describing the first problem found
  static std::vector<ResolvedCollection> validateAndResolve(
      const ParquetReader::Config& cfg) {
    if (cfg.collections.empty()) {
      throw std::invalid_argument("ParquetReader: no collections configured");
    }

    std::vector<ResolvedCollection> resolvedCollections;
    resolvedCollections.reserve(cfg.collections.size());
    // Lexically normalized directory strings, used to reject two collections
    // that point to the same directory.
    std::unordered_set<std::string> seenDirs;

    for (const auto& [name, rawPath] : cfg.collections) {
      if (name.empty()) {
        throw std::invalid_argument("ParquetReader: empty collection name");
      }
      if (rawPath.empty()) {
        throw std::invalid_argument(std::format(
            "ParquetReader: empty input directory for collection '{}'", name));
      }
      std::filesystem::path resolved = resolveDirectory(cfg, rawPath);
      if (!seenDirs.insert(resolved.lexically_normal().string()).second) {
        throw std::invalid_argument(
            std::format("ParquetReader: duplicate input directory '{}'",
                        resolved.string()));
      }
      auto schemaIt = cfg.expectedSchemas.find(name);
      if (schemaIt == cfg.expectedSchemas.end() || !schemaIt->second) {
        throw std::invalid_argument(std::format(
            "ParquetReader: collection '{}' has no expected schema. Every "
            "configured collection must declare an expected schema; the "
            "scanner uses it as the dataset's target so missing columns "
            "become nulls and extras are dropped.",
            name));
      }
      resolvedCollections.emplace_back(name, std::move(resolved));
    }

    for (const auto& [name, _] : cfg.expectedSchemas) {
      if (!cfg.collections.contains(name)) {
        throw std::invalid_argument(std::format(
            "ParquetReader: expectedSchemas has entry for '{}' but no matching "
            "collection",
            name));
      }
    }

    // Existence is checked last, after the purely structural checks, but
    // still before any dataset reader is constructed.
    for (const auto& [name, directory] : resolvedCollections) {
      if (!std::filesystem::exists(directory)) {
        throw std::invalid_argument(std::format(
            "ParquetReader: missing directory '{}'", directory.string()));
      }
    }

    return resolvedCollections;
  }
};
0116 
0117 ParquetReader::ParquetReader(const Config& config,
0118                              std::unique_ptr<const Acts::Logger> logger)
0119     : m_impl(std::make_unique<Impl>(config, *this)),
0120       m_logger(std::move(logger)) {}
0121 
0122 ParquetReader::ParquetReader(const Config& config, Acts::Logging::Level level)
0123     : ParquetReader(config, Acts::getDefaultLogger("ParquetReader", level)) {}
0124 
0125 ParquetReader::~ParquetReader() = default;
0126 
0127 std::string ParquetReader::name() const {
0128   return "ParquetReader";
0129 }
0130 
0131 std::pair<std::size_t, std::size_t> ParquetReader::availableEvents() const {
0132   return m_impl->m_eventsRange;
0133 }
0134 
0135 ProcessCode ParquetReader::read(const AlgorithmContext& context) {
0136   Acts::ScopedTimer timer("Reading Parquet inputs", logger(),
0137                           Acts::Logging::DEBUG);
0138 
0139   for (const auto& state : m_impl->m_collectionStates) {
0140     auto table = state->reader->readEvent(
0141         static_cast<std::uint64_t>(context.eventNumber));
0142     if (table == nullptr || table->num_rows() == 0) {
0143       ACTS_ERROR("ParquetReader: no row matched event "
0144                  << context.eventNumber << " in collection '" << state->name
0145                  << "'");
0146       return ProcessCode::ABORT;
0147     }
0148     (*state->handle)(context,
0149                      ActsPlugins::ArrowUtil::ArrowTable{std::move(table)});
0150   }
0151 
0152   return ProcessCode::SUCCESS;
0153 }
0154 
0155 const ParquetReader::Config& ParquetReader::config() const {
0156   return m_impl->m_cfg;
0157 }
0158 
0159 }  // namespace ActsExamples