|
|
|||
File indexing completed on 2026-04-17 08:28:54
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 0018 #pragma once 0019 0020 #include <cstdint> 0021 #include <memory> 0022 #include <utility> 0023 #include <vector> 0024 0025 #include "arrow/type_fwd.h" 0026 #include "arrow/util/macros.h" 0027 #include "parquet/exception.h" 0028 #include "parquet/level_conversion.h" 0029 #include "parquet/metadata.h" 0030 #include "parquet/platform.h" 0031 #include "parquet/properties.h" 0032 #include "parquet/schema.h" 0033 #include "parquet/types.h" 0034 0035 namespace arrow { 0036 0037 namespace bit_util { 0038 class BitReader; 0039 } // namespace bit_util 0040 0041 namespace util { 0042 template <typename T> 0043 class RleBitPackedDecoder; 0044 } // namespace util 0045 0046 } // namespace arrow 0047 0048 namespace parquet { 0049 0050 class Decryptor; 0051 class Page; 0052 0053 // 16 MB is the default maximum page header size 0054 static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024; 0055 0056 // 16 KB is the default expected page header size 0057 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024; 0058 0059 // \brief DataPageStats stores encoded statistics and number of values/rows for 0060 // a page. 0061 struct PARQUET_EXPORT DataPageStats { 0062 DataPageStats(const EncodedStatistics* encoded_statistics, int32_t num_values, 0063 std::optional<int32_t> num_rows) 0064 : encoded_statistics(encoded_statistics), 0065 num_values(num_values), 0066 num_rows(num_rows) {} 0067 0068 // Encoded statistics extracted from the page header. 0069 // Nullptr if there are no statistics in the page header. 0070 const EncodedStatistics* encoded_statistics; 0071 // Number of values stored in the page. Filled for both V1 and V2 data pages. 0072 // For repeated fields, this can be greater than number of rows. For 0073 // non-repeated fields, this will be the same as the number of rows. 0074 int32_t num_values; 0075 // Number of rows stored in the page. std::nullopt if not available. 0076 std::optional<int32_t> num_rows; 0077 }; 0078 0079 class PARQUET_EXPORT LevelDecoder { 0080 public: 0081 LevelDecoder(); 0082 ~LevelDecoder(); 0083 0084 // Initialize the LevelDecoder state with new data 0085 // and return the number of bytes consumed 0086 int SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values, 0087 const uint8_t* data, int32_t data_size); 0088 0089 void SetDataV2(int32_t num_bytes, int16_t max_level, int num_buffered_values, 0090 const uint8_t* data); 0091 0092 // Decodes a batch of levels into an array and returns the number of levels decoded 0093 int Decode(int batch_size, int16_t* levels); 0094 0095 private: 0096 int bit_width_; 0097 int num_values_remaining_; 0098 Encoding::type encoding_; 0099 std::unique_ptr<::arrow::util::RleBitPackedDecoder<int16_t>> rle_decoder_; 0100 std::unique_ptr<::arrow::bit_util::BitReader> bit_packed_decoder_; 0101 int16_t max_level_; 0102 }; 0103 0104 struct CryptoContext { 0105 bool start_decrypt_with_dictionary_page = false; 0106 int16_t row_group_ordinal = -1; 0107 int16_t column_ordinal = -1; 0108 std::function<std::unique_ptr<Decryptor>()> meta_decryptor_factory; 0109 std::function<std::unique_ptr<Decryptor>()> data_decryptor_factory; 0110 }; 0111 0112 // Abstract page iterator interface. This way, we can feed column pages to the 0113 // ColumnReader through whatever mechanism we choose 0114 class PARQUET_EXPORT PageReader { 0115 using DataPageFilter = std::function<bool(const DataPageStats&)>; 0116 0117 public: 0118 virtual ~PageReader() = default; 0119 0120 static std::unique_ptr<PageReader> Open( 0121 std::shared_ptr<ArrowInputStream> stream, int64_t total_num_values, 0122 Compression::type codec, bool always_compressed = false, 0123 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), 0124 const CryptoContext* ctx = NULLPTR); 0125 static std::unique_ptr<PageReader> Open(std::shared_ptr<ArrowInputStream> stream, 0126 int64_t total_num_values, 0127 Compression::type codec, 0128 const ReaderProperties& properties, 0129 bool always_compressed = false, 0130 const CryptoContext* ctx = NULLPTR); 0131 0132 // If data_page_filter is present (not null), NextPage() will call the 0133 // callback function exactly once per page in the order the pages appear in 0134 // the column. If the callback function returns true the page will be 0135 // skipped. The callback will be called only if the page type is DATA_PAGE or 0136 // DATA_PAGE_V2. Dictionary pages will not be skipped. 0137 // Caller is responsible for checking that statistics are correct using 0138 // ApplicationVersion::HasCorrectStatistics(). 0139 // \note API EXPERIMENTAL 0140 void set_data_page_filter(DataPageFilter data_page_filter) { 0141 data_page_filter_ = std::move(data_page_filter); 0142 } 0143 0144 // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page> 0145 // containing new Page otherwise 0146 // 0147 // The returned Page may contain references that aren't guaranteed to live 0148 // beyond the next call to NextPage(). 0149 virtual std::shared_ptr<Page> NextPage() = 0; 0150 0151 virtual void set_max_page_header_size(uint32_t size) = 0; 0152 0153 protected: 0154 // Callback that decides if we should skip a page or not. 0155 DataPageFilter data_page_filter_; 0156 }; 0157 0158 class PARQUET_EXPORT ColumnReader { 0159 public: 0160 virtual ~ColumnReader() = default; 0161 0162 static std::shared_ptr<ColumnReader> Make( 0163 const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, 0164 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); 0165 0166 // Returns true if there are still values in this column. 0167 virtual bool HasNext() = 0; 0168 0169 virtual Type::type type() const = 0; 0170 0171 virtual const ColumnDescriptor* descr() const = 0; 0172 0173 // Get the encoding that can be exposed by this reader. If it returns 0174 // dictionary encoding, then ReadBatchWithDictionary can be used to read data. 0175 // 0176 // \note API EXPERIMENTAL 0177 virtual ExposedEncoding GetExposedEncoding() = 0; 0178 0179 protected: 0180 friend class RowGroupReader; 0181 // Set the encoding that can be exposed by this reader. 0182 // 0183 // \note API EXPERIMENTAL 0184 virtual void SetExposedEncoding(ExposedEncoding encoding) = 0; 0185 }; 0186 0187 // API to read values from a single column. This is a main client facing API. 0188 template <typename DType> 0189 class TypedColumnReader : public ColumnReader { 0190 public: 0191 using T = typename DType::c_type; 0192 0193 // Read a batch of repetition levels, definition levels, and values from the 0194 // column. 0195 // 0196 // Since null values are not stored in the values, the number of values read 0197 // may be less than the number of repetition and definition levels. With 0198 // nested data this is almost certainly true. 0199 // 0200 // Set def_levels or rep_levels to nullptr if you want to skip reading them. 0201 // This is only safe if you know through some other source that there are no 0202 // undefined values. 0203 // 0204 // To fully exhaust a row group, you must read batches until the number of 0205 // values read reaches the number of stored values according to the metadata. 0206 // 0207 // This API is the same for both V1 and V2 of the DataPage 0208 // 0209 // @returns: actual number of levels read (see values_read for number of values read) 0210 virtual int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, 0211 T* values, int64_t* values_read) = 0; 0212 0213 // Skip reading values. This method will work for both repeated and 0214 // non-repeated fields. Note that this method is skipping values and not 0215 // records. This distinction is important for repeated fields, meaning that 0216 // we are not skipping over the values to the next record. For example, 0217 // consider the following two consecutive records containing one repeated field: 0218 // {[1, 2, 3]}, {[4, 5]}. If we Skip(2), our next read value will be 3, which 0219 // is inside the first record. 0220 // Returns the number of values skipped. 0221 virtual int64_t Skip(int64_t num_values_to_skip) = 0; 0222 0223 // Read a batch of repetition levels, definition levels, and indices from the 0224 // column. And read the dictionary if a dictionary page is encountered during 0225 // reading pages. This API is similar to ReadBatch(), with ability to read 0226 // dictionary and indices. It is only valid to call this method when the reader can 0227 // expose dictionary encoding. (i.e., the reader's GetExposedEncoding() returns 0228 // DICTIONARY). 0229 // 0230 // The dictionary is read along with the data page. When there's no data page, 0231 // the dictionary won't be returned. 0232 // 0233 // @param batch_size The batch size to read 0234 // @param[out] def_levels The Parquet definition levels. 0235 // @param[out] rep_levels The Parquet repetition levels. 0236 // @param[out] indices The dictionary indices. 0237 // @param[out] indices_read The number of indices read. 0238 // @param[out] dict The pointer to dictionary values. It will return nullptr if 0239 // there's no data page. Each column chunk only has one dictionary page. The dictionary 0240 // is owned by the reader, so the caller is responsible for copying the dictionary 0241 // values before the reader gets destroyed. 0242 // @param[out] dict_len The dictionary length. It will return 0 if there's no data 0243 // page. 0244 // @returns: actual number of levels read (see indices_read for number of 0245 // indices read 0246 // 0247 // \note API EXPERIMENTAL 0248 virtual int64_t ReadBatchWithDictionary(int64_t batch_size, int16_t* def_levels, 0249 int16_t* rep_levels, int32_t* indices, 0250 int64_t* indices_read, const T** dict, 0251 int32_t* dict_len) = 0; 0252 }; 0253 0254 namespace internal { 0255 0256 /// \brief Stateful column reader that delimits semantic records for both flat 0257 /// and nested columns 0258 /// 0259 /// \note API EXPERIMENTAL 0260 /// \since 1.3.0 0261 class PARQUET_EXPORT RecordReader { 0262 public: 0263 /// \brief Creates a record reader. 0264 /// @param descr Column descriptor 0265 /// @param leaf_info Level info, used to determine if a column is nullable or not 0266 /// @param pool Memory pool to use for buffering values and rep/def levels 0267 /// @param read_dictionary True if reading directly as Arrow dictionary-encoded 0268 /// @param read_dense_for_nullable True if reading dense and not leaving space for null 0269 /// values 0270 /// @param arrow_type Which type to read this column as (optional). Currently 0271 /// only used for byte array columns (see BinaryRecordReader::GetBuilderChunks). 0272 static std::shared_ptr<RecordReader> Make( 0273 const ColumnDescriptor* descr, LevelInfo leaf_info, 0274 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), 0275 bool read_dictionary = false, bool read_dense_for_nullable = false, 0276 const std::shared_ptr<::arrow::DataType>& arrow_type = NULLPTR); 0277 0278 virtual ~RecordReader() = default; 0279 0280 /// \brief Attempt to read indicated number of records from column chunk 0281 /// Note that for repeated fields, a record may have more than one value 0282 /// and all of them are read. If read_dense_for_nullable() it will 0283 /// not leave any space for null values. Otherwise, it will read spaced. 0284 /// \return number of records read 0285 virtual int64_t ReadRecords(int64_t num_records) = 0; 0286 0287 /// \brief Attempt to skip indicated number of records from column chunk. 0288 /// Note that for repeated fields, a record may have more than one value 0289 /// and all of them are skipped. 0290 /// \return number of records skipped 0291 virtual int64_t SkipRecords(int64_t num_records) = 0; 0292 0293 /// \brief Pre-allocate space for data. Results in better flat read performance 0294 virtual void Reserve(int64_t num_values) = 0; 0295 0296 /// \brief Clear consumed values and repetition/definition levels as the 0297 /// result of calling ReadRecords 0298 /// For FLBA and ByteArray types, call GetBuilderChunks() to reset them. 0299 virtual void Reset() = 0; 0300 0301 /// \brief Transfer filled values buffer to caller. A new one will be 0302 /// allocated in subsequent ReadRecords calls 0303 virtual std::shared_ptr<ResizableBuffer> ReleaseValues() = 0; 0304 0305 /// \brief Transfer filled validity bitmap buffer to caller. A new one will 0306 /// be allocated in subsequent ReadRecords calls 0307 virtual std::shared_ptr<ResizableBuffer> ReleaseIsValid() = 0; 0308 0309 /// \brief Return true if the record reader has more internal data yet to 0310 /// process 0311 virtual bool HasMoreData() const = 0; 0312 0313 /// \brief Advance record reader to the next row group. Must be set before 0314 /// any records could be read/skipped. 0315 /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader 0316 virtual void SetPageReader(std::unique_ptr<PageReader> reader) = 0; 0317 0318 /// \brief Returns the underlying column reader's descriptor. 0319 virtual const ColumnDescriptor* descr() const = 0; 0320 0321 virtual void DebugPrintState() = 0; 0322 0323 /// \brief Returns the dictionary owned by the current decoder. Throws an 0324 /// exception if the current decoder is not for dictionary encoding. The caller is 0325 /// responsible for casting the returned pointer to proper type depending on the 0326 /// column's physical type. An example: 0327 /// const ByteArray* dict = reinterpret_cast<const ByteArray*>(ReadDictionary(&len)); 0328 /// or: 0329 /// const float* dict = reinterpret_cast<const float*>(ReadDictionary(&len)); 0330 /// \param[out] dictionary_length The number of dictionary entries. 0331 virtual const void* ReadDictionary(int32_t* dictionary_length) = 0; 0332 0333 /// \brief Decoded definition levels 0334 int16_t* def_levels() const { 0335 return reinterpret_cast<int16_t*>(def_levels_->mutable_data()); 0336 } 0337 0338 /// \brief Decoded repetition levels 0339 int16_t* rep_levels() const { 0340 return reinterpret_cast<int16_t*>(rep_levels_->mutable_data()); 0341 } 0342 0343 /// \brief Decoded values, including nulls, if any 0344 /// FLBA and ByteArray types do not use this array and read into their own 0345 /// builders. 0346 uint8_t* values() const { return values_->mutable_data(); } 0347 0348 /// \brief Number of values written, including space left for nulls if any. 0349 /// If this Reader was constructed with read_dense_for_nullable(), there is no space for 0350 /// nulls and null_count() will be 0. There is no read-ahead/buffering for values. For 0351 /// FLBA and ByteArray types this value reflects the values written with the last 0352 /// ReadRecords call since those readers will reset the values after each call. 0353 int64_t values_written() const { return values_written_; } 0354 0355 /// \brief Number of definition / repetition levels (from those that have 0356 /// been decoded) that have been consumed inside the reader. 0357 int64_t levels_position() const { return levels_position_; } 0358 0359 /// \brief Number of definition / repetition levels that have been written 0360 /// internally in the reader. This may be larger than values_written() because 0361 /// for repeated fields we need to look at the levels in advance to figure out 0362 /// the record boundaries. 0363 int64_t levels_written() const { return levels_written_; } 0364 0365 /// \brief Number of nulls in the leaf that we have read so far into the 0366 /// values vector. This is only valid when !read_dense_for_nullable(). When 0367 /// read_dense_for_nullable() it will always be 0. 0368 int64_t null_count() const { return null_count_; } 0369 0370 /// \brief True if the leaf values are nullable 0371 bool nullable_values() const { return nullable_values_; } 0372 0373 /// \brief True if reading directly as Arrow dictionary-encoded 0374 bool read_dictionary() const { return read_dictionary_; } 0375 0376 /// \brief True if reading dense for nullable columns. 0377 bool read_dense_for_nullable() const { return read_dense_for_nullable_; } 0378 0379 protected: 0380 /// \brief Indicates if we can have nullable values. Note that repeated fields 0381 /// may or may not be nullable. 0382 bool nullable_values_; 0383 0384 bool at_record_start_; 0385 int64_t records_read_; 0386 0387 /// \brief Stores values. These values are populated based on each ReadRecords 0388 /// call. No extra values are buffered for the next call. SkipRecords will not 0389 /// add any value to this buffer. 0390 std::shared_ptr<::arrow::ResizableBuffer> values_; 0391 /// \brief False for FIXED_LEN_BYTE_ARRAY and BYTE_ARRAY, in which case we 0392 /// don't allocate the values buffer and we directly read into builder classes. 0393 bool uses_values_; 0394 0395 /// \brief Values that we have read into 'values_' + 'null_count_'. 0396 int64_t values_written_; 0397 int64_t values_capacity_; 0398 int64_t null_count_; 0399 0400 /// \brief Each bit corresponds to one element in 'values_' and specifies if it 0401 /// is null or not null. 0402 /// 0403 /// Not set if leaf type is not nullable or read_dense_for_nullable_ is true. 0404 std::shared_ptr<::arrow::ResizableBuffer> valid_bits_; 0405 0406 /// \brief Buffer for definition levels. May contain more levels than 0407 /// is actually read. This is because we read levels ahead to 0408 /// figure out record boundaries for repeated fields. 0409 /// For flat required fields, 'def_levels_' and 'rep_levels_' are not 0410 /// populated. For non-repeated fields 'rep_levels_' is not populated. 0411 /// 'def_levels_' and 'rep_levels_' must be of the same size if present. 0412 std::shared_ptr<::arrow::ResizableBuffer> def_levels_; 0413 /// \brief Buffer for repetition levels. Only populated for repeated 0414 /// fields. 0415 std::shared_ptr<::arrow::ResizableBuffer> rep_levels_; 0416 0417 /// \brief Number of definition / repetition levels that have been written 0418 /// internally in the reader. This may be larger than values_written() since 0419 /// for repeated fields we need to look at the levels in advance to figure out 0420 /// the record boundaries. 0421 int64_t levels_written_; 0422 /// \brief Position of the next level that should be consumed. 0423 int64_t levels_position_; 0424 int64_t levels_capacity_; 0425 0426 bool read_dictionary_ = false; 0427 // If true, we will not leave any space for the null values in the values_ 0428 // vector or fill nulls values in BinaryRecordReader/DictionaryRecordReader. 0429 // 0430 // If read_dense_for_nullable_ is true, the BinaryRecordReader/DictionaryRecordReader 0431 // might still populate the validity bitmap buffer. 0432 bool read_dense_for_nullable_ = false; 0433 }; 0434 0435 class BinaryRecordReader : virtual public RecordReader { 0436 public: 0437 virtual std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks() = 0; 0438 }; 0439 0440 /// \brief Read records directly to dictionary-encoded Arrow form (int32 0441 /// indices). Only valid for BYTE_ARRAY columns 0442 class DictionaryRecordReader : virtual public RecordReader { 0443 public: 0444 virtual std::shared_ptr<::arrow::ChunkedArray> GetResult() = 0; 0445 }; 0446 0447 } // namespace internal 0448 0449 using BoolReader = TypedColumnReader<BooleanType>; 0450 using Int32Reader = TypedColumnReader<Int32Type>; 0451 using Int64Reader = TypedColumnReader<Int64Type>; 0452 using Int96Reader = TypedColumnReader<Int96Type>; 0453 using FloatReader = TypedColumnReader<FloatType>; 0454 using DoubleReader = TypedColumnReader<DoubleType>; 0455 using ByteArrayReader = TypedColumnReader<ByteArrayType>; 0456 using FixedLenByteArrayReader = TypedColumnReader<FLBAType>; 0457 0458 } // namespace parquet
| [ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
|
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
|