File indexing completed on 2026-04-17 08:28:54
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
0020 #include <stdio.h>
0021
0022 #include <cstdint>
0023 #include <memory>
0024 #include <ostream>
0025 #include <string>
0026 #include <utility>
0027 #include <vector>
0028
0029 #include "parquet/column_reader.h"
0030 #include "parquet/exception.h"
0031 #include "parquet/platform.h"
0032 #include "parquet/schema.h"
0033 #include "parquet/types.h"
0034
0035 namespace parquet {
0036
// Default number of level slots a Scanner requests from its ColumnReader per
// ReadBatch call. (A namespace-scope constexpr variable already has internal
// linkage, so no `static` is needed for use in a header.)
constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
0038
0039 class PARQUET_EXPORT Scanner {
0040 public:
0041 explicit Scanner(std::shared_ptr<ColumnReader> reader,
0042 int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
0043 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
0044 : batch_size_(batch_size),
0045 level_offset_(0),
0046 levels_buffered_(0),
0047 value_buffer_(AllocateBuffer(pool)),
0048 value_offset_(0),
0049 values_buffered_(0),
0050 reader_(std::move(reader)) {
0051 def_levels_.resize(
0052 descr()->max_definition_level() > 0 ? static_cast<size_t>(batch_size_) : 0);
0053 rep_levels_.resize(
0054 descr()->max_repetition_level() > 0 ? static_cast<size_t>(batch_size_) : 0);
0055 }
0056
0057 virtual ~Scanner() {}
0058
0059 static std::shared_ptr<Scanner> Make(
0060 std::shared_ptr<ColumnReader> col_reader,
0061 int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
0062 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
0063
0064 virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) = 0;
0065
0066 bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); }
0067
0068 const ColumnDescriptor* descr() const { return reader_->descr(); }
0069
0070 int64_t batch_size() const { return batch_size_; }
0071
0072 void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; }
0073
0074 protected:
0075 int64_t batch_size_;
0076
0077 std::vector<int16_t> def_levels_;
0078 std::vector<int16_t> rep_levels_;
0079 int level_offset_;
0080 int levels_buffered_;
0081
0082 std::shared_ptr<ResizableBuffer> value_buffer_;
0083 int value_offset_;
0084 int64_t values_buffered_;
0085 std::shared_ptr<ColumnReader> reader_;
0086 };
0087
0088 template <typename DType>
0089 class PARQUET_TEMPLATE_CLASS_EXPORT TypedScanner : public Scanner {
0090 public:
0091 typedef typename DType::c_type T;
0092
0093 explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
0094 int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
0095 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
0096 : Scanner(std::move(reader), batch_size, pool) {
0097 typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader_.get());
0098 int value_byte_size = type_traits<DType::type_num>::value_byte_size;
0099 PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size));
0100 values_ = reinterpret_cast<T*>(value_buffer_->mutable_data());
0101 }
0102
0103 virtual ~TypedScanner() {}
0104
0105 bool NextLevels(int16_t* def_level, int16_t* rep_level) {
0106 if (level_offset_ == levels_buffered_) {
0107 levels_buffered_ = static_cast<int>(
0108 typed_reader_->ReadBatch(static_cast<int>(batch_size_), def_levels_.data(),
0109 rep_levels_.data(), values_, &values_buffered_));
0110
0111 value_offset_ = 0;
0112 level_offset_ = 0;
0113 if (!levels_buffered_) {
0114 return false;
0115 }
0116 }
0117 *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0;
0118 *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0;
0119 level_offset_++;
0120 return true;
0121 }
0122
0123 bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) {
0124 if (level_offset_ == levels_buffered_) {
0125 if (!HasNext()) {
0126
0127 return false;
0128 }
0129 }
0130
0131 NextLevels(def_level, rep_level);
0132 *is_null = *def_level < descr()->max_definition_level();
0133
0134 if (*is_null) {
0135 return true;
0136 }
0137
0138 if (value_offset_ == values_buffered_) {
0139 throw ParquetException("Value was non-null, but has not been buffered");
0140 }
0141 *val = values_[value_offset_++];
0142 return true;
0143 }
0144
0145
0146 bool NextValue(T* val, bool* is_null) {
0147 if (level_offset_ == levels_buffered_) {
0148 if (!HasNext()) {
0149
0150 return false;
0151 }
0152 }
0153
0154
0155 int16_t def_level = -1;
0156 int16_t rep_level = -1;
0157 NextLevels(&def_level, &rep_level);
0158 *is_null = def_level < descr()->max_definition_level();
0159
0160 if (*is_null) {
0161 return true;
0162 }
0163
0164 if (value_offset_ == values_buffered_) {
0165 throw ParquetException("Value was non-null, but has not been buffered");
0166 }
0167 *val = values_[value_offset_++];
0168 return true;
0169 }
0170
0171 virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) {
0172 T val{};
0173 int16_t def_level = -1;
0174 int16_t rep_level = -1;
0175 bool is_null = false;
0176 char buffer[80];
0177
0178 if (!Next(&val, &def_level, &rep_level, &is_null)) {
0179 throw ParquetException("No more values buffered");
0180 }
0181
0182 if (with_levels) {
0183 out << " D:" << def_level << " R:" << rep_level << " ";
0184 if (!is_null) {
0185 out << "V:";
0186 }
0187 }
0188
0189 if (is_null) {
0190 std::string null_fmt = format_fwf<ByteArrayType>(width);
0191 snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL");
0192 } else {
0193 FormatValue(&val, buffer, sizeof(buffer), width);
0194 }
0195 out << buffer;
0196 }
0197
0198 private:
0199
0200 TypedColumnReader<DType>* typed_reader_;
0201
0202 inline void FormatValue(void* val, char* buffer, int bufsize, int width);
0203
0204 T* values_;
0205 };
0206
0207 template <typename DType>
0208 inline void TypedScanner<DType>::FormatValue(void* val, char* buffer, int bufsize,
0209 int width) {
0210 std::string fmt = format_fwf<DType>(width);
0211 snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val));
0212 }
0213
0214 template <>
0215 inline void TypedScanner<Int96Type>::FormatValue(void* val, char* buffer, int bufsize,
0216 int width) {
0217 std::string fmt = format_fwf<Int96Type>(width);
0218 std::string result = Int96ToString(*reinterpret_cast<Int96*>(val));
0219 snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
0220 }
0221
0222 template <>
0223 inline void TypedScanner<ByteArrayType>::FormatValue(void* val, char* buffer, int bufsize,
0224 int width) {
0225 std::string fmt = format_fwf<ByteArrayType>(width);
0226 std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val));
0227 snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
0228 }
0229
0230 template <>
0231 inline void TypedScanner<FLBAType>::FormatValue(void* val, char* buffer, int bufsize,
0232 int width) {
0233 std::string fmt = format_fwf<FLBAType>(width);
0234 std::string result = FixedLenByteArrayToString(
0235 *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length());
0236 snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
0237 }
0238
0239 typedef TypedScanner<BooleanType> BoolScanner;
0240 typedef TypedScanner<Int32Type> Int32Scanner;
0241 typedef TypedScanner<Int64Type> Int64Scanner;
0242 typedef TypedScanner<Int96Type> Int96Scanner;
0243 typedef TypedScanner<FloatType> FloatScanner;
0244 typedef TypedScanner<DoubleType> DoubleScanner;
0245 typedef TypedScanner<ByteArrayType> ByteArrayScanner;
0246 typedef TypedScanner<FLBAType> FixedLenByteArrayScanner;
0247
0248 template <typename RType>
0249 int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
0250 uint8_t* values, int64_t* values_buffered,
0251 parquet::ColumnReader* reader) {
0252 typedef typename RType::T Type;
0253 auto typed_reader = static_cast<RType*>(reader);
0254 auto vals = reinterpret_cast<Type*>(&values[0]);
0255 return typed_reader->ReadBatch(batch_size, def_levels, rep_levels, vals,
0256 values_buffered);
0257 }
0258
0259 int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels,
0260 int16_t* rep_levels, uint8_t* values,
0261 int64_t* values_buffered,
0262 parquet::ColumnReader* reader);
0263
0264 }