Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-17 08:28:55

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <array>
0021 #include <chrono>
0022 #include <cstdint>
0023 #include <cstring>
0024 #include <memory>
0025 #include <optional>
0026 #include <string>
0027 #include <vector>
0028 
0029 #include "parquet/column_reader.h"
0030 #include "parquet/file_reader.h"
0031 #include "parquet/stream_writer.h"
0032 
0033 namespace parquet {
0034 
0035 /// \brief A class for reading Parquet files using an output stream type API.
0036 ///
0037 /// The values given must be of the correct type i.e. the type must
0038 /// match the file schema exactly otherwise a ParquetException will be
0039 /// thrown.
0040 ///
0041 /// The user must explicitly advance to the next row using the
0042 /// EndRow() function or EndRow input manipulator.
0043 ///
0044 /// Required and optional fields are supported:
0045 /// - Required fields are read using operator>>(T)
0046 /// - Optional fields are read with
0047 ///   operator>>(std::optional<T>)
0048 ///
0049 /// Note that operator>>(std::optional<T>) can be used to read
0050 /// required fields.
0051 ///
0052 /// Similarly operator>>(T) can be used to read optional fields.
0053 /// However, if the value is not present then a ParquetException will
0054 /// be raised.
0055 ///
0056 /// Currently there is no support for repeated fields.
0057 ///
0058 class PARQUET_EXPORT StreamReader {
0059  public:
0060   template <typename T>
0061   using optional = ::std::optional<T>;
0062 
0063   // N.B. Default constructed objects are not usable.  This
0064   //      constructor is provided so that the object may be move
0065   //      assigned afterwards.
0066   StreamReader() = default;
0067 
0068   explicit StreamReader(std::unique_ptr<ParquetFileReader> reader);
0069 
0070   ~StreamReader() = default;
0071 
0072   bool eof() const { return eof_; }
0073 
0074   int current_column() const { return column_index_; }
0075 
0076   int64_t current_row() const { return current_row_; }
0077 
0078   int num_columns() const;
0079 
0080   int64_t num_rows() const;
0081 
0082   // Moving is possible.
0083   StreamReader(StreamReader&&) = default;
0084   StreamReader& operator=(StreamReader&&) = default;
0085 
0086   // Copying is not allowed.
0087   StreamReader(const StreamReader&) = delete;
0088   StreamReader& operator=(const StreamReader&) = delete;
0089 
0090   StreamReader& operator>>(bool& v);
0091 
0092   StreamReader& operator>>(int8_t& v);
0093 
0094   StreamReader& operator>>(uint8_t& v);
0095 
0096   StreamReader& operator>>(int16_t& v);
0097 
0098   StreamReader& operator>>(uint16_t& v);
0099 
0100   StreamReader& operator>>(int32_t& v);
0101 
0102   StreamReader& operator>>(uint32_t& v);
0103 
0104   StreamReader& operator>>(int64_t& v);
0105 
0106   StreamReader& operator>>(uint64_t& v);
0107 
0108   StreamReader& operator>>(std::chrono::milliseconds& v);
0109 
0110   StreamReader& operator>>(std::chrono::microseconds& v);
0111 
0112   StreamReader& operator>>(float& v);
0113 
0114   StreamReader& operator>>(double& v);
0115 
0116   StreamReader& operator>>(char& v);
0117 
0118   template <int N>
0119   StreamReader& operator>>(char (&v)[N]) {
0120     ReadFixedLength(v, N);
0121     return *this;
0122   }
0123 
0124   template <std::size_t N>
0125   StreamReader& operator>>(std::array<char, N>& v) {
0126     ReadFixedLength(v.data(), static_cast<int>(N));
0127     return *this;
0128   }
0129 
0130   // N.B. Cannot allow for reading to a arbitrary char pointer as the
0131   //      length cannot be verified.  Also it would overshadow the
0132   //      char[N] input operator.
0133   // StreamReader& operator>>(char * v);
0134 
0135   StreamReader& operator>>(std::string& v);
0136 
0137   StreamReader& operator>>(::arrow::Decimal128& v);
0138 
0139   // Input operators for optional fields.
0140 
0141   StreamReader& operator>>(optional<bool>& v);
0142 
0143   StreamReader& operator>>(optional<int8_t>& v);
0144 
0145   StreamReader& operator>>(optional<uint8_t>& v);
0146 
0147   StreamReader& operator>>(optional<int16_t>& v);
0148 
0149   StreamReader& operator>>(optional<uint16_t>& v);
0150 
0151   StreamReader& operator>>(optional<int32_t>& v);
0152 
0153   StreamReader& operator>>(optional<uint32_t>& v);
0154 
0155   StreamReader& operator>>(optional<int64_t>& v);
0156 
0157   StreamReader& operator>>(optional<uint64_t>& v);
0158 
0159   StreamReader& operator>>(optional<float>& v);
0160 
0161   StreamReader& operator>>(optional<double>& v);
0162 
0163   StreamReader& operator>>(optional<std::chrono::milliseconds>& v);
0164 
0165   StreamReader& operator>>(optional<std::chrono::microseconds>& v);
0166 
0167   StreamReader& operator>>(optional<char>& v);
0168 
0169   StreamReader& operator>>(optional<std::string>& v);
0170 
0171   StreamReader& operator>>(optional<::arrow::Decimal128>& v);
0172 
0173   template <std::size_t N>
0174   StreamReader& operator>>(optional<std::array<char, N>>& v) {
0175     CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, N);
0176     FixedLenByteArray flba;
0177     if (ReadOptional(&flba)) {
0178       v = std::array<char, N>{};
0179       std::memcpy(v->data(), flba.ptr, N);
0180     } else {
0181       v.reset();
0182     }
0183     return *this;
0184   }
0185 
0186   /// \brief Terminate current row and advance to next one.
0187   /// \throws ParquetException if all columns in the row were not
0188   /// read or skipped.
0189   void EndRow();
0190 
0191   /// \brief Skip the data in the next columns.
0192   /// If the number of columns exceeds the columns remaining on the
0193   /// current row then skipping is terminated - it does _not_ continue
0194   /// skipping columns on the next row.
0195   /// Skipping of columns still requires the use 'EndRow' even if all
0196   /// remaining columns were skipped.
0197   /// \return Number of columns actually skipped.
0198   int64_t SkipColumns(int64_t num_columns_to_skip);
0199 
0200   /// \brief Skip the data in the next rows.
0201   /// Skipping of rows is not allowed if reading of data for the
0202   /// current row is not finished.
0203   /// Skipping of rows will be terminated if the end of file is
0204   /// reached.
0205   /// \return Number of rows actually skipped.
0206   int64_t SkipRows(int64_t num_rows_to_skip);
0207 
0208  protected:
0209   [[noreturn]] void ThrowReadFailedException(
0210       const std::shared_ptr<schema::PrimitiveNode>& node);
0211 
0212   template <typename ReaderType, typename T>
0213   void Read(T* v) {
0214     const auto& node = nodes_[column_index_];
0215     auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get());
0216     int16_t def_level;
0217     int16_t rep_level;
0218     int64_t values_read;
0219 
0220     reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, v, &values_read);
0221 
0222     if (values_read != 1) {
0223       ThrowReadFailedException(node);
0224     }
0225   }
0226 
0227   template <typename ReaderType, typename ReadType, typename T>
0228   void Read(T* v) {
0229     const auto& node = nodes_[column_index_];
0230     auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get());
0231     int16_t def_level;
0232     int16_t rep_level;
0233     ReadType tmp;
0234     int64_t values_read;
0235 
0236     reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read);
0237 
0238     if (values_read == 1) {
0239       *v = tmp;
0240     } else {
0241       ThrowReadFailedException(node);
0242     }
0243   }
0244 
0245   template <typename ReaderType, typename ReadType = typename ReaderType::T, typename T>
0246   void ReadOptional(optional<T>* v) {
0247     const auto& node = nodes_[column_index_];
0248     auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get());
0249     int16_t def_level;
0250     int16_t rep_level;
0251     ReadType tmp;
0252     int64_t values_read;
0253 
0254     reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read);
0255 
0256     if (values_read == 1) {
0257       *v = T(tmp);
0258     } else if ((values_read == 0) && (def_level == 0)) {
0259       v->reset();
0260     } else {
0261       ThrowReadFailedException(node);
0262     }
0263   }
0264 
0265   void ReadFixedLength(char* ptr, int len);
0266 
0267   void Read(ByteArray* v);
0268 
0269   void Read(FixedLenByteArray* v);
0270 
0271   bool ReadOptional(ByteArray* v);
0272 
0273   bool ReadOptional(FixedLenByteArray* v);
0274 
0275   void NextRowGroup();
0276 
0277   void CheckColumn(Type::type physical_type, ConvertedType::type converted_type,
0278                    int length = 0);
0279 
0280   void SkipRowsInColumn(ColumnReader* reader, int64_t num_rows_to_skip);
0281 
0282   void SetEof();
0283 
0284  private:
0285   std::unique_ptr<ParquetFileReader> file_reader_;
0286   std::shared_ptr<FileMetaData> file_metadata_;
0287   std::shared_ptr<RowGroupReader> row_group_reader_;
0288   std::vector<std::shared_ptr<ColumnReader>> column_readers_;
0289   std::vector<std::shared_ptr<schema::PrimitiveNode>> nodes_;
0290 
0291   bool eof_{true};
0292   int row_group_index_{0};
0293   int column_index_{0};
0294   int64_t current_row_{0};
0295   int64_t row_group_row_offset_{0};
0296 
0297   static constexpr int64_t kBatchSizeOne = 1;
0298 };  // namespace parquet
0299 
0300 PARQUET_EXPORT
0301 StreamReader& operator>>(StreamReader&, EndRowType);
0302 
0303 }  // namespace parquet