Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-17 08:28:54

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <cstdint>
0021 #include <memory>
0022 #include <utility>
0023 #include <vector>
0024 
0025 #include "arrow/type_fwd.h"
0026 #include "arrow/util/macros.h"
0027 #include "parquet/exception.h"
0028 #include "parquet/level_conversion.h"
0029 #include "parquet/metadata.h"
0030 #include "parquet/platform.h"
0031 #include "parquet/properties.h"
0032 #include "parquet/schema.h"
0033 #include "parquet/types.h"
0034 
0035 namespace arrow {
0036 
0037 namespace bit_util {
0038 class BitReader;
0039 }  // namespace bit_util
0040 
0041 namespace util {
0042 template <typename T>
0043 class RleBitPackedDecoder;
0044 }  // namespace util
0045 
0046 }  // namespace arrow
0047 
0048 namespace parquet {
0049 
0050 class Decryptor;
0051 class Page;
0052 
0053 // 16 MB is the default maximum page header size
0054 static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
0055 
0056 // 16 KB is the default expected page header size
0057 static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
0058 
0059 // \brief DataPageStats stores encoded statistics and number of values/rows for
0060 // a page.
0061 struct PARQUET_EXPORT DataPageStats {
0062   DataPageStats(const EncodedStatistics* encoded_statistics, int32_t num_values,
0063                 std::optional<int32_t> num_rows)
0064       : encoded_statistics(encoded_statistics),
0065         num_values(num_values),
0066         num_rows(num_rows) {}
0067 
0068   // Encoded statistics extracted from the page header.
0069   // Nullptr if there are no statistics in the page header.
0070   const EncodedStatistics* encoded_statistics;
0071   // Number of values stored in the page. Filled for both V1 and V2 data pages.
0072   // For repeated fields, this can be greater than number of rows. For
0073   // non-repeated fields, this will be the same as the number of rows.
0074   int32_t num_values;
0075   // Number of rows stored in the page. std::nullopt if not available.
0076   std::optional<int32_t> num_rows;
0077 };
0078 
0079 class PARQUET_EXPORT LevelDecoder {
0080  public:
0081   LevelDecoder();
0082   ~LevelDecoder();
0083 
0084   // Initialize the LevelDecoder state with new data
0085   // and return the number of bytes consumed
0086   int SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values,
0087               const uint8_t* data, int32_t data_size);
0088 
0089   void SetDataV2(int32_t num_bytes, int16_t max_level, int num_buffered_values,
0090                  const uint8_t* data);
0091 
0092   // Decodes a batch of levels into an array and returns the number of levels decoded
0093   int Decode(int batch_size, int16_t* levels);
0094 
0095  private:
0096   int bit_width_;
0097   int num_values_remaining_;
0098   Encoding::type encoding_;
0099   std::unique_ptr<::arrow::util::RleBitPackedDecoder<int16_t>> rle_decoder_;
0100   std::unique_ptr<::arrow::bit_util::BitReader> bit_packed_decoder_;
0101   int16_t max_level_;
0102 };
0103 
0104 struct CryptoContext {
0105   bool start_decrypt_with_dictionary_page = false;
0106   int16_t row_group_ordinal = -1;
0107   int16_t column_ordinal = -1;
0108   std::function<std::unique_ptr<Decryptor>()> meta_decryptor_factory;
0109   std::function<std::unique_ptr<Decryptor>()> data_decryptor_factory;
0110 };
0111 
0112 // Abstract page iterator interface. This way, we can feed column pages to the
0113 // ColumnReader through whatever mechanism we choose
0114 class PARQUET_EXPORT PageReader {
0115   using DataPageFilter = std::function<bool(const DataPageStats&)>;
0116 
0117  public:
0118   virtual ~PageReader() = default;
0119 
0120   static std::unique_ptr<PageReader> Open(
0121       std::shared_ptr<ArrowInputStream> stream, int64_t total_num_values,
0122       Compression::type codec, bool always_compressed = false,
0123       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
0124       const CryptoContext* ctx = NULLPTR);
0125   static std::unique_ptr<PageReader> Open(std::shared_ptr<ArrowInputStream> stream,
0126                                           int64_t total_num_values,
0127                                           Compression::type codec,
0128                                           const ReaderProperties& properties,
0129                                           bool always_compressed = false,
0130                                           const CryptoContext* ctx = NULLPTR);
0131 
0132   // If data_page_filter is present (not null), NextPage() will call the
0133   // callback function exactly once per page in the order the pages appear in
0134   // the column. If the callback function returns true the page will be
0135   // skipped. The callback will be called only if the page type is DATA_PAGE or
0136   // DATA_PAGE_V2. Dictionary pages will not be skipped.
0137   // Caller is responsible for checking that statistics are correct using
0138   // ApplicationVersion::HasCorrectStatistics().
0139   // \note API EXPERIMENTAL
0140   void set_data_page_filter(DataPageFilter data_page_filter) {
0141     data_page_filter_ = std::move(data_page_filter);
0142   }
0143 
0144   // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
0145   // containing new Page otherwise
0146   //
0147   // The returned Page may contain references that aren't guaranteed to live
0148   // beyond the next call to NextPage().
0149   virtual std::shared_ptr<Page> NextPage() = 0;
0150 
0151   virtual void set_max_page_header_size(uint32_t size) = 0;
0152 
0153  protected:
0154   // Callback that decides if we should skip a page or not.
0155   DataPageFilter data_page_filter_;
0156 };
0157 
0158 class PARQUET_EXPORT ColumnReader {
0159  public:
0160   virtual ~ColumnReader() = default;
0161 
0162   static std::shared_ptr<ColumnReader> Make(
0163       const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager,
0164       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
0165 
0166   // Returns true if there are still values in this column.
0167   virtual bool HasNext() = 0;
0168 
0169   virtual Type::type type() const = 0;
0170 
0171   virtual const ColumnDescriptor* descr() const = 0;
0172 
0173   // Get the encoding that can be exposed by this reader. If it returns
0174   // dictionary encoding, then ReadBatchWithDictionary can be used to read data.
0175   //
0176   // \note API EXPERIMENTAL
0177   virtual ExposedEncoding GetExposedEncoding() = 0;
0178 
0179  protected:
0180   friend class RowGroupReader;
0181   // Set the encoding that can be exposed by this reader.
0182   //
0183   // \note API EXPERIMENTAL
0184   virtual void SetExposedEncoding(ExposedEncoding encoding) = 0;
0185 };
0186 
0187 // API to read values from a single column. This is a main client facing API.
0188 template <typename DType>
0189 class TypedColumnReader : public ColumnReader {
0190  public:
0191   using T = typename DType::c_type;
0192 
0193   // Read a batch of repetition levels, definition levels, and values from the
0194   // column.
0195   //
0196   // Since null values are not stored in the values, the number of values read
0197   // may be less than the number of repetition and definition levels. With
0198   // nested data this is almost certainly true.
0199   //
0200   // Set def_levels or rep_levels to nullptr if you want to skip reading them.
0201   // This is only safe if you know through some other source that there are no
0202   // undefined values.
0203   //
0204   // To fully exhaust a row group, you must read batches until the number of
0205   // values read reaches the number of stored values according to the metadata.
0206   //
0207   // This API is the same for both V1 and V2 of the DataPage
0208   //
0209   // @returns: actual number of levels read (see values_read for number of values read)
0210   virtual int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
0211                             T* values, int64_t* values_read) = 0;
0212 
0213   // Skip reading values. This method will work for both repeated and
0214   // non-repeated fields. Note that this method is skipping values and not
0215   // records. This distinction is important for repeated fields, meaning that
0216   // we are not skipping over the values to the next record. For example,
0217   // consider the following two consecutive records containing one repeated field:
0218   // {[1, 2, 3]}, {[4, 5]}. If we Skip(2), our next read value will be 3, which
0219   // is inside the first record.
0220   // Returns the number of values skipped.
0221   virtual int64_t Skip(int64_t num_values_to_skip) = 0;
0222 
0223   // Read a batch of repetition levels, definition levels, and indices from the
0224   // column. And read the dictionary if a dictionary page is encountered during
0225   // reading pages. This API is similar to ReadBatch(), with ability to read
0226   // dictionary and indices. It is only valid to call this method  when the reader can
0227   // expose dictionary encoding. (i.e., the reader's GetExposedEncoding() returns
0228   // DICTIONARY).
0229   //
0230   // The dictionary is read along with the data page. When there's no data page,
0231   // the dictionary won't be returned.
0232   //
0233   // @param batch_size The batch size to read
0234   // @param[out] def_levels The Parquet definition levels.
0235   // @param[out] rep_levels The Parquet repetition levels.
0236   // @param[out] indices The dictionary indices.
0237   // @param[out] indices_read The number of indices read.
0238   // @param[out] dict The pointer to dictionary values. It will return nullptr if
0239   // there's no data page. Each column chunk only has one dictionary page. The dictionary
0240   // is owned by the reader, so the caller is responsible for copying the dictionary
0241   // values before the reader gets destroyed.
0242   // @param[out] dict_len The dictionary length. It will return 0 if there's no data
0243   // page.
0244   // @returns: actual number of levels read (see indices_read for number of
0245   // indices read
0246   //
0247   // \note API EXPERIMENTAL
0248   virtual int64_t ReadBatchWithDictionary(int64_t batch_size, int16_t* def_levels,
0249                                           int16_t* rep_levels, int32_t* indices,
0250                                           int64_t* indices_read, const T** dict,
0251                                           int32_t* dict_len) = 0;
0252 };
0253 
0254 namespace internal {
0255 
0256 /// \brief Stateful column reader that delimits semantic records for both flat
0257 /// and nested columns
0258 ///
0259 /// \note API EXPERIMENTAL
0260 /// \since 1.3.0
0261 class PARQUET_EXPORT RecordReader {
0262  public:
0263   /// \brief Creates a record reader.
0264   /// @param descr Column descriptor
0265   /// @param leaf_info Level info, used to determine if a column is nullable or not
0266   /// @param pool Memory pool to use for buffering values and rep/def levels
0267   /// @param read_dictionary True if reading directly as Arrow dictionary-encoded
0268   /// @param read_dense_for_nullable True if reading dense and not leaving space for null
0269   /// values
0270   /// @param arrow_type Which type to read this column as (optional). Currently
0271   /// only used for byte array columns (see BinaryRecordReader::GetBuilderChunks).
0272   static std::shared_ptr<RecordReader> Make(
0273       const ColumnDescriptor* descr, LevelInfo leaf_info,
0274       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
0275       bool read_dictionary = false, bool read_dense_for_nullable = false,
0276       const std::shared_ptr<::arrow::DataType>& arrow_type = NULLPTR);
0277 
0278   virtual ~RecordReader() = default;
0279 
0280   /// \brief Attempt to read indicated number of records from column chunk
0281   /// Note that for repeated fields, a record may have more than one value
0282   /// and all of them are read. If read_dense_for_nullable() it will
0283   /// not leave any space for null values. Otherwise, it will read spaced.
0284   /// \return number of records read
0285   virtual int64_t ReadRecords(int64_t num_records) = 0;
0286 
0287   /// \brief Attempt to skip indicated number of records from column chunk.
0288   /// Note that for repeated fields, a record may have more than one value
0289   /// and all of them are skipped.
0290   /// \return number of records skipped
0291   virtual int64_t SkipRecords(int64_t num_records) = 0;
0292 
0293   /// \brief Pre-allocate space for data. Results in better flat read performance
0294   virtual void Reserve(int64_t num_values) = 0;
0295 
0296   /// \brief Clear consumed values and repetition/definition levels as the
0297   /// result of calling ReadRecords
0298   /// For FLBA and ByteArray types, call GetBuilderChunks() to reset them.
0299   virtual void Reset() = 0;
0300 
0301   /// \brief Transfer filled values buffer to caller. A new one will be
0302   /// allocated in subsequent ReadRecords calls
0303   virtual std::shared_ptr<ResizableBuffer> ReleaseValues() = 0;
0304 
0305   /// \brief Transfer filled validity bitmap buffer to caller. A new one will
0306   /// be allocated in subsequent ReadRecords calls
0307   virtual std::shared_ptr<ResizableBuffer> ReleaseIsValid() = 0;
0308 
0309   /// \brief Return true if the record reader has more internal data yet to
0310   /// process
0311   virtual bool HasMoreData() const = 0;
0312 
0313   /// \brief Advance record reader to the next row group. Must be set before
0314   /// any records could be read/skipped.
0315   /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader
0316   virtual void SetPageReader(std::unique_ptr<PageReader> reader) = 0;
0317 
0318   /// \brief Returns the underlying column reader's descriptor.
0319   virtual const ColumnDescriptor* descr() const = 0;
0320 
0321   virtual void DebugPrintState() = 0;
0322 
0323   /// \brief Returns the dictionary owned by the current decoder. Throws an
0324   /// exception if the current decoder is not for dictionary encoding. The caller is
0325   /// responsible for casting the returned pointer to proper type depending on the
0326   /// column's physical type. An example:
0327   ///   const ByteArray* dict = reinterpret_cast<const ByteArray*>(ReadDictionary(&len));
0328   /// or:
0329   ///   const float* dict = reinterpret_cast<const float*>(ReadDictionary(&len));
0330   /// \param[out] dictionary_length The number of dictionary entries.
0331   virtual const void* ReadDictionary(int32_t* dictionary_length) = 0;
0332 
0333   /// \brief Decoded definition levels
0334   int16_t* def_levels() const {
0335     return reinterpret_cast<int16_t*>(def_levels_->mutable_data());
0336   }
0337 
0338   /// \brief Decoded repetition levels
0339   int16_t* rep_levels() const {
0340     return reinterpret_cast<int16_t*>(rep_levels_->mutable_data());
0341   }
0342 
0343   /// \brief Decoded values, including nulls, if any
0344   /// FLBA and ByteArray types do not use this array and read into their own
0345   /// builders.
0346   uint8_t* values() const { return values_->mutable_data(); }
0347 
0348   /// \brief Number of values written, including space left for nulls if any.
0349   /// If this Reader was constructed with read_dense_for_nullable(), there is no space for
0350   /// nulls and null_count() will be 0. There is no read-ahead/buffering for values. For
0351   /// FLBA and ByteArray types this value reflects the values written with the last
0352   /// ReadRecords call since those readers will reset the values after each call.
0353   int64_t values_written() const { return values_written_; }
0354 
0355   /// \brief Number of definition / repetition levels (from those that have
0356   /// been decoded) that have been consumed inside the reader.
0357   int64_t levels_position() const { return levels_position_; }
0358 
0359   /// \brief Number of definition / repetition levels that have been written
0360   /// internally in the reader. This may be larger than values_written() because
0361   /// for repeated fields we need to look at the levels in advance to figure out
0362   /// the record boundaries.
0363   int64_t levels_written() const { return levels_written_; }
0364 
0365   /// \brief Number of nulls in the leaf that we have read so far into the
0366   /// values vector. This is only valid when !read_dense_for_nullable(). When
0367   /// read_dense_for_nullable() it will always be 0.
0368   int64_t null_count() const { return null_count_; }
0369 
0370   /// \brief True if the leaf values are nullable
0371   bool nullable_values() const { return nullable_values_; }
0372 
0373   /// \brief True if reading directly as Arrow dictionary-encoded
0374   bool read_dictionary() const { return read_dictionary_; }
0375 
0376   /// \brief True if reading dense for nullable columns.
0377   bool read_dense_for_nullable() const { return read_dense_for_nullable_; }
0378 
0379  protected:
0380   /// \brief Indicates if we can have nullable values. Note that repeated fields
0381   /// may or may not be nullable.
0382   bool nullable_values_;
0383 
0384   bool at_record_start_;
0385   int64_t records_read_;
0386 
0387   /// \brief Stores values. These values are populated based on each ReadRecords
0388   /// call. No extra values are buffered for the next call. SkipRecords will not
0389   /// add any value to this buffer.
0390   std::shared_ptr<::arrow::ResizableBuffer> values_;
0391   /// \brief False for FIXED_LEN_BYTE_ARRAY and BYTE_ARRAY, in which case we
0392   /// don't allocate the values buffer and we directly read into builder classes.
0393   bool uses_values_;
0394 
0395   /// \brief Values that we have read into 'values_' + 'null_count_'.
0396   int64_t values_written_;
0397   int64_t values_capacity_;
0398   int64_t null_count_;
0399 
0400   /// \brief Each bit corresponds to one element in 'values_' and specifies if it
0401   /// is null or not null.
0402   ///
0403   /// Not set if leaf type is not nullable or read_dense_for_nullable_ is true.
0404   std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;
0405 
0406   /// \brief Buffer for definition levels. May contain more levels than
0407   /// is actually read. This is because we read levels ahead to
0408   /// figure out record boundaries for repeated fields.
0409   /// For flat required fields, 'def_levels_' and 'rep_levels_' are not
0410   ///  populated. For non-repeated fields 'rep_levels_' is not populated.
0411   /// 'def_levels_' and 'rep_levels_' must be of the same size if present.
0412   std::shared_ptr<::arrow::ResizableBuffer> def_levels_;
0413   /// \brief Buffer for repetition levels. Only populated for repeated
0414   /// fields.
0415   std::shared_ptr<::arrow::ResizableBuffer> rep_levels_;
0416 
0417   /// \brief Number of definition / repetition levels that have been written
0418   /// internally in the reader. This may be larger than values_written() since
0419   /// for repeated fields we need to look at the levels in advance to figure out
0420   /// the record boundaries.
0421   int64_t levels_written_;
0422   /// \brief Position of the next level that should be consumed.
0423   int64_t levels_position_;
0424   int64_t levels_capacity_;
0425 
0426   bool read_dictionary_ = false;
0427   // If true, we will not leave any space for the null values in the values_
0428   // vector or fill nulls values in BinaryRecordReader/DictionaryRecordReader.
0429   //
0430   // If read_dense_for_nullable_ is true, the BinaryRecordReader/DictionaryRecordReader
0431   // might still populate the validity bitmap buffer.
0432   bool read_dense_for_nullable_ = false;
0433 };
0434 
0435 class BinaryRecordReader : virtual public RecordReader {
0436  public:
0437   virtual std::vector<std::shared_ptr<::arrow::Array>> GetBuilderChunks() = 0;
0438 };
0439 
0440 /// \brief Read records directly to dictionary-encoded Arrow form (int32
0441 /// indices). Only valid for BYTE_ARRAY columns
0442 class DictionaryRecordReader : virtual public RecordReader {
0443  public:
0444   virtual std::shared_ptr<::arrow::ChunkedArray> GetResult() = 0;
0445 };
0446 
0447 }  // namespace internal
0448 
0449 using BoolReader = TypedColumnReader<BooleanType>;
0450 using Int32Reader = TypedColumnReader<Int32Type>;
0451 using Int64Reader = TypedColumnReader<Int64Type>;
0452 using Int96Reader = TypedColumnReader<Int96Type>;
0453 using FloatReader = TypedColumnReader<FloatType>;
0454 using DoubleReader = TypedColumnReader<DoubleType>;
0455 using ByteArrayReader = TypedColumnReader<ByteArrayType>;
0456 using FixedLenByteArrayReader = TypedColumnReader<FLBAType>;
0457 
0458 }  // namespace parquet