Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-17 08:28:54

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <stdio.h>
0021 
0022 #include <cstdint>
0023 #include <memory>
0024 #include <ostream>
0025 #include <string>
0026 #include <utility>
0027 #include <vector>
0028 
0029 #include "parquet/column_reader.h"
0030 #include "parquet/exception.h"
0031 #include "parquet/platform.h"
0032 #include "parquet/schema.h"
0033 #include "parquet/types.h"
0034 
0035 namespace parquet {
0036 
// Default number of levels requested per batch; used as the default `batch_size`
// argument of Scanner, TypedScanner and Scanner::Make.
constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE{128};
0038 
0039 class PARQUET_EXPORT Scanner {
0040  public:
0041   explicit Scanner(std::shared_ptr<ColumnReader> reader,
0042                    int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
0043                    ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
0044       : batch_size_(batch_size),
0045         level_offset_(0),
0046         levels_buffered_(0),
0047         value_buffer_(AllocateBuffer(pool)),
0048         value_offset_(0),
0049         values_buffered_(0),
0050         reader_(std::move(reader)) {
0051     def_levels_.resize(
0052         descr()->max_definition_level() > 0 ? static_cast<size_t>(batch_size_) : 0);
0053     rep_levels_.resize(
0054         descr()->max_repetition_level() > 0 ? static_cast<size_t>(batch_size_) : 0);
0055   }
0056 
0057   virtual ~Scanner() {}
0058 
0059   static std::shared_ptr<Scanner> Make(
0060       std::shared_ptr<ColumnReader> col_reader,
0061       int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
0062       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
0063 
0064   virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) = 0;
0065 
0066   bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); }
0067 
0068   const ColumnDescriptor* descr() const { return reader_->descr(); }
0069 
0070   int64_t batch_size() const { return batch_size_; }
0071 
0072   void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; }
0073 
0074  protected:
0075   int64_t batch_size_;
0076 
0077   std::vector<int16_t> def_levels_;
0078   std::vector<int16_t> rep_levels_;
0079   int level_offset_;
0080   int levels_buffered_;
0081 
0082   std::shared_ptr<ResizableBuffer> value_buffer_;
0083   int value_offset_;
0084   int64_t values_buffered_;
0085   std::shared_ptr<ColumnReader> reader_;
0086 };
0087 
0088 template <typename DType>
0089 class PARQUET_TEMPLATE_CLASS_EXPORT TypedScanner : public Scanner {
0090  public:
0091   typedef typename DType::c_type T;
0092 
0093   explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
0094                         int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
0095                         ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
0096       : Scanner(std::move(reader), batch_size, pool) {
0097     typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader_.get());
0098     int value_byte_size = type_traits<DType::type_num>::value_byte_size;
0099     PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size));
0100     values_ = reinterpret_cast<T*>(value_buffer_->mutable_data());
0101   }
0102 
0103   virtual ~TypedScanner() {}
0104 
0105   bool NextLevels(int16_t* def_level, int16_t* rep_level) {
0106     if (level_offset_ == levels_buffered_) {
0107       levels_buffered_ = static_cast<int>(
0108           typed_reader_->ReadBatch(static_cast<int>(batch_size_), def_levels_.data(),
0109                                    rep_levels_.data(), values_, &values_buffered_));
0110 
0111       value_offset_ = 0;
0112       level_offset_ = 0;
0113       if (!levels_buffered_) {
0114         return false;
0115       }
0116     }
0117     *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0;
0118     *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0;
0119     level_offset_++;
0120     return true;
0121   }
0122 
0123   bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) {
0124     if (level_offset_ == levels_buffered_) {
0125       if (!HasNext()) {
0126         // Out of data pages
0127         return false;
0128       }
0129     }
0130 
0131     NextLevels(def_level, rep_level);
0132     *is_null = *def_level < descr()->max_definition_level();
0133 
0134     if (*is_null) {
0135       return true;
0136     }
0137 
0138     if (value_offset_ == values_buffered_) {
0139       throw ParquetException("Value was non-null, but has not been buffered");
0140     }
0141     *val = values_[value_offset_++];
0142     return true;
0143   }
0144 
0145   // Returns true if there is a next value
0146   bool NextValue(T* val, bool* is_null) {
0147     if (level_offset_ == levels_buffered_) {
0148       if (!HasNext()) {
0149         // Out of data pages
0150         return false;
0151       }
0152     }
0153 
0154     // Out of values
0155     int16_t def_level = -1;
0156     int16_t rep_level = -1;
0157     NextLevels(&def_level, &rep_level);
0158     *is_null = def_level < descr()->max_definition_level();
0159 
0160     if (*is_null) {
0161       return true;
0162     }
0163 
0164     if (value_offset_ == values_buffered_) {
0165       throw ParquetException("Value was non-null, but has not been buffered");
0166     }
0167     *val = values_[value_offset_++];
0168     return true;
0169   }
0170 
0171   virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) {
0172     T val{};
0173     int16_t def_level = -1;
0174     int16_t rep_level = -1;
0175     bool is_null = false;
0176     char buffer[80];
0177 
0178     if (!Next(&val, &def_level, &rep_level, &is_null)) {
0179       throw ParquetException("No more values buffered");
0180     }
0181 
0182     if (with_levels) {
0183       out << "  D:" << def_level << " R:" << rep_level << " ";
0184       if (!is_null) {
0185         out << "V:";
0186       }
0187     }
0188 
0189     if (is_null) {
0190       std::string null_fmt = format_fwf<ByteArrayType>(width);
0191       snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL");
0192     } else {
0193       FormatValue(&val, buffer, sizeof(buffer), width);
0194     }
0195     out << buffer;
0196   }
0197 
0198  private:
0199   // The ownership of this object is expressed through the reader_ variable in the base
0200   TypedColumnReader<DType>* typed_reader_;
0201 
0202   inline void FormatValue(void* val, char* buffer, int bufsize, int width);
0203 
0204   T* values_;
0205 };
0206 
0207 template <typename DType>
0208 inline void TypedScanner<DType>::FormatValue(void* val, char* buffer, int bufsize,
0209                                              int width) {
0210   std::string fmt = format_fwf<DType>(width);
0211   snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val));
0212 }
0213 
0214 template <>
0215 inline void TypedScanner<Int96Type>::FormatValue(void* val, char* buffer, int bufsize,
0216                                                  int width) {
0217   std::string fmt = format_fwf<Int96Type>(width);
0218   std::string result = Int96ToString(*reinterpret_cast<Int96*>(val));
0219   snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
0220 }
0221 
0222 template <>
0223 inline void TypedScanner<ByteArrayType>::FormatValue(void* val, char* buffer, int bufsize,
0224                                                      int width) {
0225   std::string fmt = format_fwf<ByteArrayType>(width);
0226   std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val));
0227   snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
0228 }
0229 
0230 template <>
0231 inline void TypedScanner<FLBAType>::FormatValue(void* val, char* buffer, int bufsize,
0232                                                 int width) {
0233   std::string fmt = format_fwf<FLBAType>(width);
0234   std::string result = FixedLenByteArrayToString(
0235       *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length());
0236   snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
0237 }
0238 
0239 typedef TypedScanner<BooleanType> BoolScanner;
0240 typedef TypedScanner<Int32Type> Int32Scanner;
0241 typedef TypedScanner<Int64Type> Int64Scanner;
0242 typedef TypedScanner<Int96Type> Int96Scanner;
0243 typedef TypedScanner<FloatType> FloatScanner;
0244 typedef TypedScanner<DoubleType> DoubleScanner;
0245 typedef TypedScanner<ByteArrayType> ByteArrayScanner;
0246 typedef TypedScanner<FLBAType> FixedLenByteArrayScanner;
0247 
0248 template <typename RType>
0249 int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
0250                 uint8_t* values, int64_t* values_buffered,
0251                 parquet::ColumnReader* reader) {
0252   typedef typename RType::T Type;
0253   auto typed_reader = static_cast<RType*>(reader);
0254   auto vals = reinterpret_cast<Type*>(&values[0]);
0255   return typed_reader->ReadBatch(batch_size, def_levels, rep_levels, vals,
0256                                  values_buffered);
0257 }
0258 
0259 int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels,
0260                                      int16_t* rep_levels, uint8_t* values,
0261                                      int64_t* values_buffered,
0262                                      parquet::ColumnReader* reader);
0263 
0264 }  // namespace parquet