|
|
|||
File indexing completed on 2026-04-17 08:28:53
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
// N.B. we don't include async_generator.h as it's relatively heavy
#include <functional>
#include <memory>
#include <vector>

#include "parquet/file_reader.h"
#include "parquet/platform.h"
#include "parquet/properties.h"

// Forward declarations of the Arrow types this reader API traffics in, so
// that including this header does not pull in the full Arrow headers.
namespace arrow {

class ChunkedArray;
class KeyValueMetadata;
class RecordBatchReader;
struct Scalar;
class Schema;
class Table;
class RecordBatch;

}  // namespace arrow

namespace parquet {

class FileMetaData;
class SchemaDescriptor;

namespace arrow {

class ColumnChunkReader;
class ColumnReader;
struct SchemaManifest;
class RowGroupReader;

/// \brief Arrow read adapter class for deserializing Parquet files as Arrow row batches.
///
/// This interfaces caters for different use cases and thus provides different
/// interfaces. In its most simplistic form, we cater for a user that wants to
/// read the whole Parquet at once with the `FileReader::ReadTable` method.
///
/// More advanced users that also want to implement parallelism on top of each
/// single Parquet files should do this on the RowGroup level. For this, they can
/// call `FileReader::RowGroup(i)->ReadTable` to receive only the specified
/// RowGroup as a table.
///
/// In the most advanced situation, where a consumer wants to independently read
/// RowGroups in parallel and consume each column individually, they can call
/// `FileReader::RowGroup(i)->Column(j)->Read` and receive an `arrow::Column`
/// instance.
///
/// Finally, one can also get a stream of record batches using
/// `FileReader::GetRecordBatchReader()`. This can internally decode columns
/// in parallel if use_threads was enabled in the ArrowReaderProperties.
///
/// The parquet format supports an optional integer field_id which can be assigned
/// to a field. Arrow will convert these field IDs to a metadata key named
/// PARQUET:field_id on the appropriate field.
// TODO(wesm): nested data does not always make sense with this user
// interface unless you are only reading a single leaf node from a branch of
// a table. For example:
//
// repeated group data {
//   optional group record {
//     optional int32 val1;
//     optional byte_array val2;
//     optional bool val3;
//   }
//   optional int32 val4;
// }
//
// In the Parquet file, there are 4 leaf nodes:
//
// * data.record.val1
// * data.record.val2
// * data.record.val3
// * data.val4
//
// When materializing this data in an Arrow array, we would have:
//
// data: list<struct<
//   record: struct<
//    val1: int32,
//    val2: string (= list<uint8>),
//    val3: bool,
//   >,
//   val4: int32
// >>
//
// However, in the Parquet format, each leaf node has its own repetition and
// definition levels describing the structure of the intermediate nodes in
// this array structure. Thus, we will need to scan the leaf data for a group
// of leaf nodes part of the same type tree to create a single result Arrow
// nested array structure.
//
// This is additionally complicated "chunky" repeated fields or very large byte
// arrays
class PARQUET_EXPORT FileReader {
 public:
  /// Factory function to create a FileReader from a ParquetFileReader and properties
  static ::arrow::Status Make(::arrow::MemoryPool* pool,
                              std::unique_ptr<ParquetFileReader> reader,
                              const ArrowReaderProperties& properties,
                              std::unique_ptr<FileReader>* out);

  /// Factory function to create a FileReader from a ParquetFileReader
  static ::arrow::Status Make(::arrow::MemoryPool* pool,
                              std::unique_ptr<ParquetFileReader> reader,
                              std::unique_ptr<FileReader>* out);

  // Since the distribution of columns amongst a Parquet file's row groups may
  // be uneven (the number of values in each column chunk can be different), we
  // provide a column-oriented read interface. The ColumnReader hides the
  // details of paging through the file's row groups and yielding
  // fully-materialized arrow::Array instances
  //
  // Returns error status if the column of interest is not flat.
  // The indicated column index is relative to the schema
  virtual ::arrow::Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) = 0;

  /// \brief Return arrow schema for all the columns.
  virtual ::arrow::Status GetSchema(std::shared_ptr<::arrow::Schema>* out) = 0;

  /// \brief Read column as a whole into a chunked array.
  ///
  /// The index i refers the index of the top level schema field, which may
  /// be nested or flat - e.g.
  ///
  /// 0 foo.bar
  ///   foo.bar.baz
  ///   foo.qux
  /// 1 foo2
  /// 2 foo3
  ///
  /// i=0 will read the entire foo struct, i=1 the foo2 primitive column etc
  virtual ::arrow::Status ReadColumn(int i,
                                     std::shared_ptr<::arrow::ChunkedArray>* out) = 0;

  /// \brief Return a RecordBatchReader of all row groups and columns.
  virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
  GetRecordBatchReader() = 0;

  /// \brief Return a RecordBatchReader of row groups selected from row_group_indices.
  ///
  /// Note that the ordering in row_group_indices matters. FileReaders must outlive
  /// their RecordBatchReaders.
  ///
  /// \returns error Result if row_group_indices contains an invalid index
  virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
  GetRecordBatchReader(const std::vector<int>& row_group_indices) = 0;

  /// \brief Return a RecordBatchReader of row groups selected from
  /// row_group_indices, whose columns are selected by column_indices.
  ///
  /// Note that the ordering in row_group_indices and column_indices
  /// matter. FileReaders must outlive their RecordBatchReaders.
  ///
  /// \returns error Result if either row_group_indices or column_indices
  /// contains an invalid index
  virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
  GetRecordBatchReader(const std::vector<int>& row_group_indices,
                       const std::vector<int>& column_indices) = 0;

  /// \brief Return a RecordBatchReader of row groups selected from
  /// row_group_indices, whose columns are selected by column_indices.
  ///
  /// Note that the ordering in row_group_indices and column_indices
  /// matter. FileReaders must outlive their RecordBatchReaders.
  ///
  /// \param row_group_indices which row groups to read (order determines read order).
  /// \param column_indices which columns to read (order determines output schema).
  /// \param[out] out record batch stream from parquet data.
  ///
  /// \returns error Status if either row_group_indices or column_indices
  /// contains an invalid index
  /// \deprecated Deprecated in 21.0.0. Use arrow::Result version instead.
  ARROW_DEPRECATED("Deprecated in 21.0.0. Use arrow::Result version instead.")
  ::arrow::Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
                                       const std::vector<int>& column_indices,
                                       std::shared_ptr<::arrow::RecordBatchReader>* out);

  /// \deprecated Deprecated in 21.0.0. Use arrow::Result version instead.
  ARROW_DEPRECATED("Deprecated in 21.0.0. Use arrow::Result version instead.")
  ::arrow::Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
                                       std::shared_ptr<::arrow::RecordBatchReader>* out);

  /// \deprecated Deprecated in 21.0.0. Use arrow::Result version instead.
  ARROW_DEPRECATED("Deprecated in 21.0.0. Use arrow::Result version instead.")
  ::arrow::Status GetRecordBatchReader(std::shared_ptr<::arrow::RecordBatchReader>* out);

  /// \brief Return a generator of record batches.
  ///
  /// The FileReader must outlive the generator, so this requires that you pass in a
  /// shared_ptr.
  ///
  /// \returns error Result if either row_group_indices or column_indices contains an
  /// invalid index
  virtual ::arrow::Result<
      std::function<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>()>>
  GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
                          const std::vector<int> row_group_indices,
                          const std::vector<int> column_indices,
                          ::arrow::internal::Executor* cpu_executor = NULLPTR,
                          int64_t rows_to_readahead = 0) = 0;

  /// Read all columns into a Table
  virtual ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out) = 0;

  /// \brief Read the given columns into a Table
  ///
  /// The indicated column indices are relative to the internal representation
  /// of the parquet table. For instance :
  /// 0 foo.bar
  ///   foo.bar.baz  0
  ///   foo.bar.baz2 1
  ///   foo.qux      2
  /// 1 foo2         3
  /// 2 foo3         4
  ///
  /// i=0 will read foo.bar.baz, i=1 will read only foo.bar.baz2 and so on.
  /// Only leaf fields have indices; foo itself doesn't have an index.
  /// To get the index for a particular leaf field, one can use
  /// manifest().schema_fields to get the top level fields, and then walk the
  /// tree to identify the relevant leaf fields and access its column_index.
  /// To get the total number of leaf fields, use FileMetadata.num_columns().
  virtual ::arrow::Status ReadTable(const std::vector<int>& column_indices,
                                    std::shared_ptr<::arrow::Table>* out) = 0;

  /// \brief Read the given columns of a single row group into a Table.
  ///
  /// Column indices follow the same leaf-field numbering as
  /// `ReadTable(column_indices, out)` above.
  virtual ::arrow::Status ReadRowGroup(int i, const std::vector<int>& column_indices,
                                       std::shared_ptr<::arrow::Table>* out) = 0;

  /// \brief Read all columns of a single row group into a Table.
  virtual ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out) = 0;

  /// \brief Read the given columns of the selected row groups into a Table.
  virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
                                        const std::vector<int>& column_indices,
                                        std::shared_ptr<::arrow::Table>* out) = 0;

  /// \brief Read all columns of the selected row groups into a Table.
  virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
                                        std::shared_ptr<::arrow::Table>* out) = 0;

  /// \brief Scan file contents with one thread, return number of rows
  virtual ::arrow::Status ScanContents(std::vector<int> columns,
                                       const int32_t column_batch_size,
                                       int64_t* num_rows) = 0;

  /// \brief Return a reader for the RowGroup, this object must not outlive the
  /// FileReader.
  virtual std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) = 0;

  /// \brief The number of row groups in the file
  virtual int num_row_groups() const = 0;

  /// \brief Return the underlying ParquetFileReader (non-owning pointer).
  virtual ParquetFileReader* parquet_reader() const = 0;

  /// Set whether to use multiple threads during reads of multiple columns.
  /// By default only one thread is used.
  virtual void set_use_threads(bool use_threads) = 0;

  /// Set number of records to read per batch for the RecordBatchReader.
  virtual void set_batch_size(int64_t batch_size) = 0;

  /// \brief The ArrowReaderProperties this reader was constructed with.
  virtual const ArrowReaderProperties& properties() const = 0;

  /// \brief The SchemaManifest mapping Parquet leaf columns to Arrow fields.
  virtual const SchemaManifest& manifest() const = 0;

  virtual ~FileReader() = default;
};

/// \brief Reader for a single row group of a Parquet file, obtained from
/// FileReader::RowGroup(). Must not outlive its FileReader.
class RowGroupReader {
 public:
  virtual ~RowGroupReader() = default;
  /// \brief Return a reader for the given column chunk of this row group.
  virtual std::shared_ptr<ColumnChunkReader> Column(int column_index) = 0;
  /// \brief Read the given columns of this row group into a Table.
  virtual ::arrow::Status ReadTable(const std::vector<int>& column_indices,
                                    std::shared_ptr<::arrow::Table>* out) = 0;
  /// \brief Read all columns of this row group into a Table.
  virtual ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out) = 0;

 private:
  struct Iterator;
};

/// \brief Reader for a single column chunk within one row group.
class ColumnChunkReader {
 public:
  virtual ~ColumnChunkReader() = default;
  /// \brief Read the whole column chunk into a chunked array.
  virtual ::arrow::Status Read(std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
};

// At this point, the column reader is a stream iterator. It only knows how to
// read the next batch of values for a particular column from the file until it
// runs out.
//
// We also do not expose any internal Parquet details, such as row groups. This
// might change in the future.
class PARQUET_EXPORT ColumnReader {
 public:
  virtual ~ColumnReader() = default;

  // Scan the next array of the indicated size. The actual size of the
  // returned array may be less than the passed size depending how much data is
  // available in the file.
  //
  // When all the data in the file has been exhausted, the result is set to
  // nullptr.
  //
  // Returns Status::OK on a successful read, including if you have exhausted
  // the data available in the file.
  virtual ::arrow::Status NextBatch(int64_t batch_size,
                                    std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
};

/// \brief Experimental helper class for bindings (like Python) that struggle
/// either with std::move or C++ exceptions
class PARQUET_EXPORT FileReaderBuilder {
 public:
  FileReaderBuilder();

  /// Create FileReaderBuilder from Arrow file and optional properties / metadata
  ::arrow::Status Open(std::shared_ptr<::arrow::io::RandomAccessFile> file,
                       const ReaderProperties& properties = default_reader_properties(),
                       std::shared_ptr<FileMetaData> metadata = NULLPTR);

  /// Create FileReaderBuilder from file path and optional properties / metadata
  ::arrow::Status OpenFile(const std::string& path, bool memory_map = false,
                           const ReaderProperties& props = default_reader_properties(),
                           std::shared_ptr<FileMetaData> metadata = NULLPTR);

  /// \brief Access the underlying ParquetFileReader (non-owning pointer).
  ParquetFileReader* raw_reader() { return raw_reader_.get(); }

  /// Set Arrow MemoryPool for memory allocation
  FileReaderBuilder* memory_pool(::arrow::MemoryPool* pool);
  /// Set Arrow reader properties
  FileReaderBuilder* properties(const ArrowReaderProperties& arg_properties);
  /// Build FileReader instance
  ::arrow::Status Build(std::unique_ptr<FileReader>* out);
  ::arrow::Result<std::unique_ptr<FileReader>> Build();

 private:
  ::arrow::MemoryPool* pool_;
  ArrowReaderProperties properties_;
  std::unique_ptr<ParquetFileReader> raw_reader_;
};

/// \defgroup parquet-arrow-reader-factories Factory functions for Parquet Arrow readers
///
/// @{

/// \brief Build FileReader from Arrow file and MemoryPool
///
/// Advanced settings are supported through the FileReaderBuilder class.
PARQUET_EXPORT
::arrow::Result<std::unique_ptr<FileReader>> OpenFile(
    std::shared_ptr<::arrow::io::RandomAccessFile>, ::arrow::MemoryPool* allocator);

/// @}

/// \brief Convert a column chunk's Statistics min/max into Arrow Scalars.
PARQUET_EXPORT
::arrow::Status StatisticsAsScalars(const Statistics& Statistics,
                                    std::shared_ptr<::arrow::Scalar>* min,
                                    std::shared_ptr<::arrow::Scalar>* max);

namespace internal {

/// \brief Entry point for fuzzing: attempt to read `size` bytes of `data`
/// as a Parquet file.
PARQUET_EXPORT
::arrow::Status FuzzReader(const uint8_t* data, int64_t size);

}  // namespace internal
}  // namespace arrow
}  // namespace parquet
| [ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
|
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
|