// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.

#pragma once

#include <memory>
#include <optional>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "arrow/dataset/discovery.h"
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/io/caching.h"

namespace parquet {
class ParquetFileReader;
class Statistics;
class ColumnChunkMetaData;
class RowGroupMetaData;
class FileMetaData;
class FileDecryptionProperties;
class FileEncryptionProperties;

class ReaderProperties;
class ArrowReaderProperties;

class WriterProperties;
class ArrowWriterProperties;

namespace arrow {
class FileReader;
class FileWriter;
struct SchemaManifest;
}  // namespace arrow
}  // namespace parquet

namespace arrow {
namespace dataset {

struct ParquetDecryptionConfig;
struct ParquetEncryptionConfig;

/// \addtogroup dataset-file-formats
///
/// @{

constexpr char kParquetTypeName[] = "parquet";

/// \brief A FileFormat implementation that reads from Parquet files
class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
 public:
  ParquetFileFormat();

  /// Convenience constructor which copies properties from a parquet::ReaderProperties.
  /// memory_pool will be ignored.
  explicit ParquetFileFormat(const parquet::ReaderProperties& reader_properties);

  std::string type_name() const override { return kParquetTypeName; }

  bool Equals(const FileFormat& other) const override;

  struct ReaderOptions {
    /// \defgroup parquet-file-format-arrow-reader-properties properties which correspond
    /// to members of parquet::ArrowReaderProperties.
    ///
    /// We don't embed parquet::ReaderProperties directly because column names (rather
    /// than indices) are used to indicate dictionary columns, and other options are
    /// deferred to scan time.
    ///
    /// @{
    std::unordered_set<std::string> dict_columns;
    arrow::TimeUnit::type coerce_int96_timestamp_unit = arrow::TimeUnit::NANO;
    /// @}
  } reader_options;

  Result<bool> IsSupported(const FileSource& source) const override;

  /// \brief Return the schema of the file if possible.
  Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;

  Result<RecordBatchGenerator> ScanBatchesAsync(
      const std::shared_ptr<ScanOptions>& options,
      const std::shared_ptr<FileFragment>& file) const override;

  Future<std::optional<int64_t>> CountRows(
      const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
      const std::shared_ptr<ScanOptions>& options) override;

  using FileFormat::MakeFragment;

  /// \brief Create a Fragment targeting all RowGroups.
  Result<std::shared_ptr<FileFragment>> MakeFragment(
      FileSource source, compute::Expression partition_expression,
      std::shared_ptr<Schema> physical_schema) override;

  /// \brief Create a Fragment, restricted to the specified row groups.
  Result<std::shared_ptr<ParquetFileFragment>> MakeFragment(
      FileSource source, compute::Expression partition_expression,
      std::shared_ptr<Schema> physical_schema, std::vector<int> row_groups);

  /// \brief Return a FileReader on the given source.
  Result<std::shared_ptr<parquet::arrow::FileReader>> GetReader(
      const FileSource& source, const std::shared_ptr<ScanOptions>& options) const;

  Result<std::shared_ptr<parquet::arrow::FileReader>> GetReader(
      const FileSource& source, const std::shared_ptr<ScanOptions>& options,
      const std::shared_ptr<parquet::FileMetaData>& metadata) const;

  Future<std::shared_ptr<parquet::arrow::FileReader>> GetReaderAsync(
      const FileSource& source, const std::shared_ptr<ScanOptions>& options) const;

  Future<std::shared_ptr<parquet::arrow::FileReader>> GetReaderAsync(
      const FileSource& source, const std::shared_ptr<ScanOptions>& options,
      const std::shared_ptr<parquet::FileMetaData>& metadata) const;

  Result<std::shared_ptr<FileWriter>> MakeWriter(
      std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
      std::shared_ptr<FileWriteOptions> options,
      fs::FileLocator destination_locator) const override;

  std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
};
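
// Illustrative usage sketch, not part of this header: constructing a
// ParquetFileFormat and tuning its ReaderOptions. Only members declared above are
// used; the column name "tags" and the microsecond coercion are arbitrary
// assumptions for the example.
//
//   auto format = std::make_shared<ParquetFileFormat>();
//   // Read the "tags" column as dictionary-encoded Arrow data.
//   format->reader_options.dict_columns.insert("tags");
//   // Coerce INT96 timestamps to microsecond rather than nanosecond resolution.
//   format->reader_options.coerce_int96_timestamp_unit = arrow::TimeUnit::MICRO;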

/// \brief A FileFragment with parquet logic.
///
/// ParquetFileFragment provides a lazy (with respect to IO) interface to
/// scan parquet files. Any heavy IO calls are deferred to the Scan() method.
///
/// The caller can provide an optional list of selected RowGroups to limit the
/// number of scanned RowGroups, or to partition the scans across multiple
/// threads.
///
/// Metadata can be provided explicitly, enabling predicate pushdown benefits without
/// the potentially heavy IO of loading metadata from the file system. This can yield a
/// significant performance boost when scanning high-latency file systems.
class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
 public:
  Result<FragmentVector> SplitByRowGroup(compute::Expression predicate);

  /// \brief Return the RowGroups selected by this fragment.
  const std::vector<int>& row_groups() const {
    if (row_groups_) return *row_groups_;
    static std::vector<int> empty;
    return empty;
  }

  /// \brief Return the FileMetaData associated with this fragment.
  std::shared_ptr<parquet::FileMetaData> metadata();

  /// \brief Ensure this fragment's FileMetaData is in memory.
  Status EnsureCompleteMetadata(parquet::arrow::FileReader* reader = NULLPTR);

  /// \brief Return a fragment which selects a filtered subset of this fragment's
  /// RowGroups.
  Result<std::shared_ptr<Fragment>> Subset(compute::Expression predicate);
  Result<std::shared_ptr<Fragment>> Subset(std::vector<int> row_group_ids);

  static std::optional<compute::Expression> EvaluateStatisticsAsExpression(
      const Field& field, const parquet::Statistics& statistics);

  static std::optional<compute::Expression> EvaluateStatisticsAsExpression(
      const Field& field, const FieldRef& field_ref,
      const parquet::Statistics& statistics);

 private:
  ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
                      compute::Expression partition_expression,
                      std::shared_ptr<Schema> physical_schema,
                      std::optional<std::vector<int>> row_groups);

  Status SetMetadata(std::shared_ptr<parquet::FileMetaData> metadata,
                     std::shared_ptr<parquet::arrow::SchemaManifest> manifest,
                     std::shared_ptr<parquet::FileMetaData> original_metadata = {});

  // Overridden to opportunistically set metadata since a reader must be opened anyway.
  Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
    ARROW_RETURN_NOT_OK(EnsureCompleteMetadata());
    return physical_schema_;
  }

  /// Return a filtered subset of row group indices.
  Result<std::vector<int>> FilterRowGroups(compute::Expression predicate);
  /// Simplify the predicate against the statistics of each row group.
  Result<std::vector<compute::Expression>> TestRowGroups(compute::Expression predicate);
  /// Try to count rows matching the predicate using metadata. Expects
  /// metadata to be present, and expects the predicate to have been
  /// simplified against the partition expression already.
  Result<std::optional<int64_t>> TryCountRows(compute::Expression predicate);

  ParquetFileFormat& parquet_format_;

  /// Indices of row groups selected by this fragment,
  /// or std::nullopt if all row groups are selected.
  std::optional<std::vector<int>> row_groups_;

  // The expressions (combined for all columns for which statistics have been
  // processed) are stored per row group.
  std::vector<compute::Expression> statistics_expressions_;
  // Statistics completion is tracked per Parquet schema column index
  // (i.e. not per Arrow schema field index).
  std::vector<bool> statistics_expressions_complete_;
  std::shared_ptr<parquet::FileMetaData> metadata_;
  std::shared_ptr<parquet::arrow::SchemaManifest> manifest_;
  // The FileMetaData that owns the SchemaDescriptor pointed to by the SchemaManifest.
  std::shared_ptr<parquet::FileMetaData> original_metadata_;

  friend class ParquetFileFormat;
  friend class ParquetDatasetFactory;
};
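
// Illustrative usage sketch, not part of this header. A ParquetFileFragment is
// obtained through ParquetFileFormat::MakeFragment (its constructor is private);
// the filesystem "fs", the file path, and the predicate on a column "x" are
// assumptions for the example.
//
//   ParquetFileFormat format;
//   FileSource source("/data/file.parquet", fs);
//   ARROW_ASSIGN_OR_RAISE(auto schema, format.Inspect(source));
//   // Restrict the fragment to row groups 0 and 2 of the file.
//   ARROW_ASSIGN_OR_RAISE(
//       auto fragment,
//       format.MakeFragment(source, compute::literal(true), schema, {0, 2}));
//   // Prune the selection further using per-row-group statistics.
//   ARROW_ASSIGN_OR_RAISE(
//       auto subset,
//       fragment->Subset(compute::greater(compute::field_ref("x"), compute::literal(0))));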

/// \brief Per-scan options for Parquet fragments
class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
 public:
  ParquetFragmentScanOptions();
  std::string type_name() const override { return kParquetTypeName; }

  /// Reader properties. Not all properties are respected: memory_pool comes from
  /// ScanOptions.
  std::shared_ptr<parquet::ReaderProperties> reader_properties;
  /// Arrow reader properties. Not all properties are respected: batch_size comes from
  /// ScanOptions. Additionally, dictionary columns come from
  /// ParquetFileFormat::ReaderOptions::dict_columns.
  std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
  /// A configuration structure that provides decryption properties for a dataset
  std::shared_ptr<ParquetDecryptionConfig> parquet_decryption_config = NULLPTR;
};
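
// Illustrative usage sketch, not part of this header. Per-scan tuning of the
// Parquet reader; the buffer size is arbitrary, and attaching the options through
// ScannerBuilder::FragmentScanOptions is an assumption about the scanner API.
//
//   auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
//   // Read column chunks through a buffered input stream.
//   parquet_scan_options->reader_properties->enable_buffered_stream();
//   parquet_scan_options->reader_properties->set_buffer_size(1 << 20);  // 1 MiB
//   // Ask the Arrow-level reader to pre-buffer column data.
//   parquet_scan_options->arrow_reader_properties->set_pre_buffer(true);
//   // scanner_builder is assumed to be an arrow::dataset::ScannerBuilder.
//   ARROW_RETURN_NOT_OK(scanner_builder->FragmentScanOptions(parquet_scan_options));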

class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
 public:
  /// \brief Parquet writer properties.
  std::shared_ptr<parquet::WriterProperties> writer_properties;

  /// \brief Parquet Arrow writer properties.
  std::shared_ptr<parquet::ArrowWriterProperties> arrow_writer_properties;

  /// A configuration structure that provides encryption properties for a dataset
  std::shared_ptr<ParquetEncryptionConfig> parquet_encryption_config = NULLPTR;

 protected:
  explicit ParquetFileWriteOptions(std::shared_ptr<FileFormat> format)
      : FileWriteOptions(std::move(format)) {}

  friend class ParquetFileFormat;
};
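
// Illustrative usage sketch, not part of this header. The default write options of a
// ParquetFileFormat can be downcast and customized; the ZSTD compression choice is an
// arbitrary assumption for the example.
//
//   auto format = std::make_shared<ParquetFileFormat>();
//   auto write_options = std::static_pointer_cast<ParquetFileWriteOptions>(
//       format->DefaultWriteOptions());
//   write_options->writer_properties = parquet::WriterProperties::Builder()
//                                          .compression(arrow::Compression::ZSTD)
//                                          ->build();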

class ARROW_DS_EXPORT ParquetFileWriter : public FileWriter {
 public:
  const std::shared_ptr<parquet::arrow::FileWriter>& parquet_writer() const {
    return parquet_writer_;
  }

  Status Write(const std::shared_ptr<RecordBatch>& batch) override;

 private:
  ParquetFileWriter(std::shared_ptr<io::OutputStream> destination,
                    std::shared_ptr<parquet::arrow::FileWriter> writer,
                    std::shared_ptr<ParquetFileWriteOptions> options,
                    fs::FileLocator destination_locator);

  Future<> FinishInternal() override;

  std::shared_ptr<parquet::arrow::FileWriter> parquet_writer_;

  friend class ParquetFileFormat;
};
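
// Illustrative usage sketch, not part of this header. ParquetFileWriter instances
// are created through ParquetFileFormat::MakeWriter; "out_stream", "schema", "batch",
// the filesystem and destination path, and the aggregate layout of fs::FileLocator
// are assumptions for the example.
//
//   ARROW_ASSIGN_OR_RAISE(
//       auto writer,
//       format->MakeWriter(out_stream, schema, write_options,
//                          fs::FileLocator{filesystem, "/data/part-0.parquet"}));
//   ARROW_RETURN_NOT_OK(writer->Write(batch));
//   // FileWriter::Finish() returns a Future<>; block on it here for simplicity.
//   ARROW_RETURN_NOT_OK(writer->Finish().status());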

/// \brief Options for making a FileSystemDataset from a Parquet _metadata file.
struct ParquetFactoryOptions {
  /// Either an explicit Partitioning or a PartitioningFactory to discover one.
  ///
  /// If a factory is provided, it will be used to infer a schema for partition fields
  /// based on file and directory paths, and then construct a Partitioning. The default
  /// is a Partitioning which will yield no partition information.
  ///
  /// The (explicit or discovered) partitioning will be applied to discovered files
  /// and the resulting partition information embedded in the Dataset.
  PartitioningOrFactory partitioning{Partitioning::Default()};

  /// For the purposes of applying the partitioning, paths will be stripped
  /// of the partition_base_dir. Files not matching the partition_base_dir
  /// prefix will be skipped for partition discovery. The ignored files will still
  /// be part of the Dataset, but will not have partition information.
  ///
  /// Example:
  /// partition_base_dir = "/dataset";
  ///
  /// - "/dataset/US/sales.csv" -> "US/sales.csv" will be given to the partitioning
  ///
  /// - "/home/john/late_sales.csv" -> Will be ignored for partition discovery.
  ///
  /// This is useful for partitionings that parse directory segments where ordering
  /// is important, e.g. DirectoryPartitioning.
  std::string partition_base_dir;

  /// Assert that all ColumnChunk paths are consistent. The parquet spec allows for
  /// ColumnChunk data to be stored in multiple files, but ParquetDatasetFactory
  /// supports only a single file with all ColumnChunk data. If this flag is set,
  /// construction of a ParquetDatasetFactory will raise an error if ColumnChunk
  /// data is not resident in a single file.
  bool validate_column_chunk_paths = false;
};
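
// Illustrative configuration sketch, not part of this header. Hive-style partition
// discovery rooted at "/dataset" is an assumption for the example, as is the
// HivePartitioning::MakeFactory() helper from the partitioning API.
//
//   ParquetFactoryOptions factory_options;
//   factory_options.partitioning = HivePartitioning::MakeFactory();
//   factory_options.partition_base_dir = "/dataset";
//   // Fail fast if ColumnChunk data is spread across multiple files.
//   factory_options.validate_column_chunk_paths = true;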

/// \brief Create a FileSystemDataset from a custom `_metadata` cache file.
///
/// Dask and other systems will generate a cache metadata file by concatenating
/// the RowGroupMetaData of multiple parquet files into a single parquet file
/// that only contains metadata and no ColumnChunk data.
///
/// ParquetDatasetFactory creates a FileSystemDataset composed of
/// ParquetFileFragments, where each fragment is pre-populated with the exact
/// number of row groups and statistics for each column.
class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
 public:
  /// \brief Create a ParquetDatasetFactory from a metadata path.
  ///
  /// The `metadata_path` will be read from `filesystem`. Each RowGroup
  /// contained in the metadata file will be relative to `dirname(metadata_path)`.
  ///
  /// \param[in] metadata_path path of the metadata parquet file
  /// \param[in] filesystem from which to open/read the path
  /// \param[in] format to read the file with.
  /// \param[in] options see ParquetFactoryOptions
  static Result<std::shared_ptr<DatasetFactory>> Make(
      const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
      std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions options);

  /// \brief Create a ParquetDatasetFactory from a metadata source.
  ///
  /// Similar to the previous Make definition, but the metadata can be a Buffer
  /// and the base_path is explicit instead of inferred from the metadata
  /// path.
  ///
  /// \param[in] metadata source to open the metadata parquet file from
  /// \param[in] base_path used as the prefix of every parquet file referenced
  /// \param[in] filesystem from which to read the files referenced.
  /// \param[in] format to read the file with.
  /// \param[in] options see ParquetFactoryOptions
  static Result<std::shared_ptr<DatasetFactory>> Make(
      const FileSource& metadata, const std::string& base_path,
      std::shared_ptr<fs::FileSystem> filesystem,
      std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions options);

  Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
      InspectOptions options) override;

  Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;

 protected:
  ParquetDatasetFactory(
      std::shared_ptr<fs::FileSystem> filesystem,
      std::shared_ptr<ParquetFileFormat> format,
      std::shared_ptr<parquet::FileMetaData> metadata,
      std::shared_ptr<parquet::arrow::SchemaManifest> manifest,
      std::shared_ptr<Schema> physical_schema, std::string base_path,
      ParquetFactoryOptions options,
      std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids)
      : filesystem_(std::move(filesystem)),
        format_(std::move(format)),
        metadata_(std::move(metadata)),
        manifest_(std::move(manifest)),
        physical_schema_(std::move(physical_schema)),
        base_path_(std::move(base_path)),
        options_(std::move(options)),
        paths_with_row_group_ids_(std::move(paths_with_row_group_ids)) {}

  std::shared_ptr<fs::FileSystem> filesystem_;
  std::shared_ptr<ParquetFileFormat> format_;
  std::shared_ptr<parquet::FileMetaData> metadata_;
  std::shared_ptr<parquet::arrow::SchemaManifest> manifest_;
  std::shared_ptr<Schema> physical_schema_;
  std::string base_path_;
  ParquetFactoryOptions options_;
  std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids_;

 private:
  Result<std::vector<std::shared_ptr<FileFragment>>> CollectParquetFragments(
      const Partitioning& partitioning);

  Result<std::shared_ptr<Schema>> PartitionSchema();
};
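
// Illustrative usage sketch, not part of this header. Builds a dataset from a
// Dask-style "_metadata" file; the filesystem and the metadata path are assumptions
// for the example, and factory_options is a ParquetFactoryOptions as sketched above.
//
//   ARROW_ASSIGN_OR_RAISE(
//       auto factory,
//       ParquetDatasetFactory::Make("/dataset/_metadata", filesystem,
//                                   std::make_shared<ParquetFileFormat>(),
//                                   factory_options));
//   ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());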

/// @}

}  // namespace dataset
}  // namespace arrow