File indexing completed on 2026-06-20 07:36:36
0001
0002
0003
0004
0005
0006
0007
0008
0009 #include "ActsExamples/Io/Parquet/ParquetReader.hpp"
0010
0011 #include "Acts/Utilities/ScopedTimer.hpp"
0012 #include "ActsExamples/Framework/DataHandle.hpp"
0013 #include "ActsPlugins/Arrow/ArrowUtil.hpp"
0014
0015 #include <format>
0016 #include <stdexcept>
0017 #include <unordered_map>
0018 #include <unordered_set>
0019
0020 #include <arrow/api.h>
0021
0022 namespace ActsExamples {
0023
/// Private implementation of ParquetReader.
///
/// On construction it validates the configuration, opens one
/// ParquetDatasetReader per configured collection and registers one
/// WriteDataHandle per collection on the owning ParquetReader.
class ParquetReader::Impl {
 public:
  /// Everything needed to read and publish one configured collection.
  struct CollectionState {
    /// Collection name; used as handle name and as the whiteboard key passed
    /// to WriteDataHandle::initialize.
    std::string name;
    /// Dataset reader opened on the collection's resolved input directory.
    std::unique_ptr<ActsPlugins::ArrowUtil::ParquetDatasetReader> reader;
    /// Handle through which each event's table is written out.
    std::unique_ptr<WriteDataHandle<ActsPlugins::ArrowUtil::ArrowTable>> handle;
  };

  /// @param cfg    reader configuration (moved into m_cfg)
  /// @param parent owning reader; every write handle is registered on it
  /// @throws std::invalid_argument if no collection is configured, a
  ///         collection has an empty name or empty path, two collections
  ///         resolve to the same directory, a collection lacks an expected
  ///         schema, an expected schema has no matching collection, a
  ///         directory does not exist, or the collections disagree on the
  ///         number of events.
  explicit Impl(ParquetReader::Config cfg, ParquetReader& parent)
      : m_cfg(std::move(cfg)) {
    validateConfig();
    openCollections(parent);
  }

  ParquetReader::Config m_cfg;
  std::vector<std::unique_ptr<CollectionState>> m_collectionStates;
  /// {0, N} where N is the event count shared by all collections.
  std::pair<std::size_t, std::size_t> m_eventsRange{0, 0};

 private:
  /// Resolves a configured collection path: absolute paths are used as-is,
  /// relative ones are interpreted relative to m_cfg.inputDir.
  std::filesystem::path resolveDirectory(
      const std::filesystem::path& rawPath) const {
    return rawPath.is_absolute() ? rawPath : m_cfg.inputDir / rawPath;
  }

  /// Comparison key used to detect two collections reading the same
  /// directory.
  static std::string directoryKey(const std::filesystem::path& directory) {
    std::filesystem::path normal = directory.lexically_normal();
    // lexically_normal() keeps a trailing separator ("a/b/" stays "a/b/"),
    // which would make "a/b" and "a/b/" look distinct. Drop it, but never
    // strip a bare root such as "/".
    if (!normal.has_filename() && normal.has_relative_path()) {
      normal = normal.parent_path();
    }
    // NOTE(review): symlinks and relative-vs-absolute spellings of the same
    // directory are still not unified (that would need filesystem access).
    return normal.string();
  }

  /// Purely configuration-level checks; touches no files.
  void validateConfig() const {
    if (m_cfg.collections.empty()) {
      throw std::invalid_argument("ParquetReader: no collections configured");
    }
    std::unordered_set<std::string> seenDirs;
    for (const auto& [name, rawPath] : m_cfg.collections) {
      if (name.empty()) {
        throw std::invalid_argument("ParquetReader: empty collection name");
      }
      if (rawPath.empty()) {
        throw std::invalid_argument(std::format(
            "ParquetReader: empty input directory for collection '{}'", name));
      }
      const std::filesystem::path resolved = resolveDirectory(rawPath);
      if (!seenDirs.insert(directoryKey(resolved)).second) {
        throw std::invalid_argument(
            std::format("ParquetReader: duplicate input directory '{}'",
                        resolved.string()));
      }
      auto schemaIt = m_cfg.expectedSchemas.find(name);
      if (schemaIt == m_cfg.expectedSchemas.end() || !schemaIt->second) {
        throw std::invalid_argument(std::format(
            "ParquetReader: collection '{}' has no expected schema. Every "
            "configured collection must declare an expected schema; the "
            "scanner uses it as the dataset's target so missing columns "
            "become nulls and extras are dropped.",
            name));
      }
    }
    // Reject stale schema entries so typos in collection names are caught.
    for (const auto& [name, _] : m_cfg.expectedSchemas) {
      if (!m_cfg.collections.contains(name)) {
        throw std::invalid_argument(std::format(
            "ParquetReader: expectedSchemas has entry for '{}' but no matching "
            "collection",
            name));
      }
    }
  }

  /// Opens every collection, checks that all report the same number of
  /// events, registers the write handles and sets m_eventsRange.
  void openCollections(ParquetReader& parent) {
    // -1 marks "no collection opened yet"; the first collection becomes the
    // reference the others are compared against.
    std::int64_t referenceEvents = -1;
    std::string referenceName;

    for (const auto& [name, rawPath] : m_cfg.collections) {
      std::filesystem::path directory = resolveDirectory(rawPath);
      if (!std::filesystem::exists(directory)) {
        throw std::invalid_argument(std::format(
            "ParquetReader: missing directory '{}'", directory.string()));
      }

      auto state = std::make_unique<CollectionState>();
      state->name = name;
      state->reader =
          std::make_unique<ActsPlugins::ArrowUtil::ParquetDatasetReader>(
              directory, m_cfg.expectedSchemas.at(name).schema());

      // Explicit conversion keeps the comparison below signed-vs-signed
      // regardless of numEvents()'s return type.
      const auto events =
          static_cast<std::int64_t>(state->reader->numEvents());
      if (referenceEvents < 0) {
        referenceEvents = events;
        referenceName = name;
      } else if (events != referenceEvents) {
        throw std::invalid_argument(std::format(
            "ParquetReader: event count mismatch across collections. '{}' has "
            "{} events, '{}' has {}. All collections must describe the same "
            "events.",
            referenceName, referenceEvents, name, events));
      }

      state->handle =
          std::make_unique<WriteDataHandle<ActsPlugins::ArrowUtil::ArrowTable>>(
              &parent, name);
      state->handle->initialize(name);
      m_collectionStates.push_back(std::move(state));
    }

    m_eventsRange = {
        0, referenceEvents < 0 ? 0 : static_cast<std::size_t>(referenceEvents)};
  }
};
0116
0117 ParquetReader::ParquetReader(const Config& config,
0118 std::unique_ptr<const Acts::Logger> logger)
0119 : m_impl(std::make_unique<Impl>(config, *this)),
0120 m_logger(std::move(logger)) {}
0121
0122 ParquetReader::ParquetReader(const Config& config, Acts::Logging::Level level)
0123 : ParquetReader(config, Acts::getDefaultLogger("ParquetReader", level)) {}
0124
// Defaulted out-of-line, in the translation unit where ParquetReader::Impl is
// a complete type, so that m_impl (presumably std::unique_ptr<Impl> — confirm
// in the header) can destroy the implementation object.
ParquetReader::~ParquetReader() = default;
0126
0127 std::string ParquetReader::name() const {
0128 return "ParquetReader";
0129 }
0130
0131 std::pair<std::size_t, std::size_t> ParquetReader::availableEvents() const {
0132 return m_impl->m_eventsRange;
0133 }
0134
0135 ProcessCode ParquetReader::read(const AlgorithmContext& context) {
0136 Acts::ScopedTimer timer("Reading Parquet inputs", logger(),
0137 Acts::Logging::DEBUG);
0138
0139 for (const auto& state : m_impl->m_collectionStates) {
0140 auto table = state->reader->readEvent(
0141 static_cast<std::uint64_t>(context.eventNumber));
0142 if (table == nullptr || table->num_rows() == 0) {
0143 ACTS_ERROR("ParquetReader: no row matched event "
0144 << context.eventNumber << " in collection '" << state->name
0145 << "'");
0146 return ProcessCode::ABORT;
0147 }
0148 (*state->handle)(context,
0149 ActsPlugins::ArrowUtil::ArrowTable{std::move(table)});
0150 }
0151
0152 return ProcessCode::SUCCESS;
0153 }
0154
0155 const ParquetReader::Config& ParquetReader::config() const {
0156 return m_impl->m_cfg;
0157 }
0158
0159 }