File indexing completed on 2026-04-17 08:28:55
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
0020 #include <array>
0021 #include <chrono>
0022 #include <cstdint>
0023 #include <cstring>
0024 #include <memory>
0025 #include <optional>
0026 #include <string>
0027 #include <vector>
0028
0029 #include "parquet/column_reader.h"
0030 #include "parquet/file_reader.h"
0031 #include "parquet/stream_writer.h"
0032
0033 namespace parquet {
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058 class PARQUET_EXPORT StreamReader {
0059 public:
0060 template <typename T>
0061 using optional = ::std::optional<T>;
0062
0063
0064
0065
0066 StreamReader() = default;
0067
0068 explicit StreamReader(std::unique_ptr<ParquetFileReader> reader);
0069
0070 ~StreamReader() = default;
0071
0072 bool eof() const { return eof_; }
0073
0074 int current_column() const { return column_index_; }
0075
0076 int64_t current_row() const { return current_row_; }
0077
0078 int num_columns() const;
0079
0080 int64_t num_rows() const;
0081
0082
0083 StreamReader(StreamReader&&) = default;
0084 StreamReader& operator=(StreamReader&&) = default;
0085
0086
0087 StreamReader(const StreamReader&) = delete;
0088 StreamReader& operator=(const StreamReader&) = delete;
0089
0090 StreamReader& operator>>(bool& v);
0091
0092 StreamReader& operator>>(int8_t& v);
0093
0094 StreamReader& operator>>(uint8_t& v);
0095
0096 StreamReader& operator>>(int16_t& v);
0097
0098 StreamReader& operator>>(uint16_t& v);
0099
0100 StreamReader& operator>>(int32_t& v);
0101
0102 StreamReader& operator>>(uint32_t& v);
0103
0104 StreamReader& operator>>(int64_t& v);
0105
0106 StreamReader& operator>>(uint64_t& v);
0107
0108 StreamReader& operator>>(std::chrono::milliseconds& v);
0109
0110 StreamReader& operator>>(std::chrono::microseconds& v);
0111
0112 StreamReader& operator>>(float& v);
0113
0114 StreamReader& operator>>(double& v);
0115
0116 StreamReader& operator>>(char& v);
0117
0118 template <int N>
0119 StreamReader& operator>>(char (&v)[N]) {
0120 ReadFixedLength(v, N);
0121 return *this;
0122 }
0123
0124 template <std::size_t N>
0125 StreamReader& operator>>(std::array<char, N>& v) {
0126 ReadFixedLength(v.data(), static_cast<int>(N));
0127 return *this;
0128 }
0129
0130
0131
0132
0133
0134
0135 StreamReader& operator>>(std::string& v);
0136
0137 StreamReader& operator>>(::arrow::Decimal128& v);
0138
0139
0140
0141 StreamReader& operator>>(optional<bool>& v);
0142
0143 StreamReader& operator>>(optional<int8_t>& v);
0144
0145 StreamReader& operator>>(optional<uint8_t>& v);
0146
0147 StreamReader& operator>>(optional<int16_t>& v);
0148
0149 StreamReader& operator>>(optional<uint16_t>& v);
0150
0151 StreamReader& operator>>(optional<int32_t>& v);
0152
0153 StreamReader& operator>>(optional<uint32_t>& v);
0154
0155 StreamReader& operator>>(optional<int64_t>& v);
0156
0157 StreamReader& operator>>(optional<uint64_t>& v);
0158
0159 StreamReader& operator>>(optional<float>& v);
0160
0161 StreamReader& operator>>(optional<double>& v);
0162
0163 StreamReader& operator>>(optional<std::chrono::milliseconds>& v);
0164
0165 StreamReader& operator>>(optional<std::chrono::microseconds>& v);
0166
0167 StreamReader& operator>>(optional<char>& v);
0168
0169 StreamReader& operator>>(optional<std::string>& v);
0170
0171 StreamReader& operator>>(optional<::arrow::Decimal128>& v);
0172
0173 template <std::size_t N>
0174 StreamReader& operator>>(optional<std::array<char, N>>& v) {
0175 CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, N);
0176 FixedLenByteArray flba;
0177 if (ReadOptional(&flba)) {
0178 v = std::array<char, N>{};
0179 std::memcpy(v->data(), flba.ptr, N);
0180 } else {
0181 v.reset();
0182 }
0183 return *this;
0184 }
0185
0186
0187
0188
0189 void EndRow();
0190
0191
0192
0193
0194
0195
0196
0197
0198 int64_t SkipColumns(int64_t num_columns_to_skip);
0199
0200
0201
0202
0203
0204
0205
0206 int64_t SkipRows(int64_t num_rows_to_skip);
0207
0208 protected:
0209 [[noreturn]] void ThrowReadFailedException(
0210 const std::shared_ptr<schema::PrimitiveNode>& node);
0211
0212 template <typename ReaderType, typename T>
0213 void Read(T* v) {
0214 const auto& node = nodes_[column_index_];
0215 auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get());
0216 int16_t def_level;
0217 int16_t rep_level;
0218 int64_t values_read;
0219
0220 reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, v, &values_read);
0221
0222 if (values_read != 1) {
0223 ThrowReadFailedException(node);
0224 }
0225 }
0226
0227 template <typename ReaderType, typename ReadType, typename T>
0228 void Read(T* v) {
0229 const auto& node = nodes_[column_index_];
0230 auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get());
0231 int16_t def_level;
0232 int16_t rep_level;
0233 ReadType tmp;
0234 int64_t values_read;
0235
0236 reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read);
0237
0238 if (values_read == 1) {
0239 *v = tmp;
0240 } else {
0241 ThrowReadFailedException(node);
0242 }
0243 }
0244
0245 template <typename ReaderType, typename ReadType = typename ReaderType::T, typename T>
0246 void ReadOptional(optional<T>* v) {
0247 const auto& node = nodes_[column_index_];
0248 auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get());
0249 int16_t def_level;
0250 int16_t rep_level;
0251 ReadType tmp;
0252 int64_t values_read;
0253
0254 reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read);
0255
0256 if (values_read == 1) {
0257 *v = T(tmp);
0258 } else if ((values_read == 0) && (def_level == 0)) {
0259 v->reset();
0260 } else {
0261 ThrowReadFailedException(node);
0262 }
0263 }
0264
0265 void ReadFixedLength(char* ptr, int len);
0266
0267 void Read(ByteArray* v);
0268
0269 void Read(FixedLenByteArray* v);
0270
0271 bool ReadOptional(ByteArray* v);
0272
0273 bool ReadOptional(FixedLenByteArray* v);
0274
0275 void NextRowGroup();
0276
0277 void CheckColumn(Type::type physical_type, ConvertedType::type converted_type,
0278 int length = 0);
0279
0280 void SkipRowsInColumn(ColumnReader* reader, int64_t num_rows_to_skip);
0281
0282 void SetEof();
0283
0284 private:
0285 std::unique_ptr<ParquetFileReader> file_reader_;
0286 std::shared_ptr<FileMetaData> file_metadata_;
0287 std::shared_ptr<RowGroupReader> row_group_reader_;
0288 std::vector<std::shared_ptr<ColumnReader>> column_readers_;
0289 std::vector<std::shared_ptr<schema::PrimitiveNode>> nodes_;
0290
0291 bool eof_{true};
0292 int row_group_index_{0};
0293 int column_index_{0};
0294 int64_t current_row_{0};
0295 int64_t row_group_row_offset_{0};
0296
0297 static constexpr int64_t kBatchSizeOne = 1;
0298 };
0299
0300 PARQUET_EXPORT
0301 StreamReader& operator>>(StreamReader&, EndRowType);
0302
0303 }