Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-17 08:28:55

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <array>
0021 #include <chrono>
0022 #include <cstdint>
0023 #include <memory>
0024 #include <optional>
0025 #include <string>
0026 #include <string_view>
0027 #include <vector>
0028 
0029 #include "arrow/util/span.h"
0030 
0031 #include "parquet/column_writer.h"
0032 #include "parquet/file_writer.h"
0033 
0034 namespace parquet {
0035 
0036 /// \brief A class for writing Parquet files using an output stream type API.
0037 ///
0038 /// The values given must be of the correct type i.e. the type must
0039 /// match the file schema exactly otherwise a ParquetException will be
0040 /// thrown.
0041 ///
0042 /// The user must explicitly indicate the end of the row using the
0043 /// EndRow() function or EndRow output manipulator.
0044 ///
0045 /// A maximum row group size can be configured, the default size is
0046 /// 512MB.  Alternatively the row group size can be set to zero and the
0047 /// user can create new row groups by calling the EndRowGroup()
0048 /// function or using the EndRowGroup output manipulator.
0049 ///
0050 /// Required and optional fields are supported:
0051 /// - Required fields are written using operator<<(T)
0052 /// - Optional fields are written using
0053 ///   operator<<(std::optional<T>).
0054 ///
0055 /// Note that operator<<(T) can be used to write optional fields.
0056 ///
0057 /// Similarly, operator<<(std::optional<T>) can be used to
0058 /// write required fields.  However if the optional parameter does not
0059 /// have a value (i.e. it is nullopt) then a ParquetException will be
0060 /// raised.
0061 ///
0062 /// Currently there is no support for repeated fields.
0063 ///
0064 class PARQUET_EXPORT StreamWriter {
0065  public:
0066   template <typename T>
0067   using optional = ::std::optional<T>;
0068 
0069   // N.B. Default constructed objects are not usable.  This
0070   //      constructor is provided so that the object may be move
0071   //      assigned afterwards.
0072   StreamWriter() = default;
0073 
0074   explicit StreamWriter(std::unique_ptr<ParquetFileWriter> writer);
0075 
0076   ~StreamWriter() = default;
0077 
0078   static void SetDefaultMaxRowGroupSize(int64_t max_size);
0079 
0080   void SetMaxRowGroupSize(int64_t max_size);
0081 
0082   int current_column() const { return column_index_; }
0083 
0084   int64_t current_row() const { return current_row_; }
0085 
0086   int num_columns() const;
0087 
0088   // Moving is possible.
0089   StreamWriter(StreamWriter&&) = default;
0090   StreamWriter& operator=(StreamWriter&&) = default;
0091 
0092   // Copying is not allowed.
0093   StreamWriter(const StreamWriter&) = delete;
0094   StreamWriter& operator=(const StreamWriter&) = delete;
0095 
0096   /// \brief Output operators for required fields.
0097   /// These can also be used for optional fields when a value must be set.
0098   StreamWriter& operator<<(bool v);
0099 
0100   StreamWriter& operator<<(int8_t v);
0101 
0102   StreamWriter& operator<<(uint8_t v);
0103 
0104   StreamWriter& operator<<(int16_t v);
0105 
0106   StreamWriter& operator<<(uint16_t v);
0107 
0108   StreamWriter& operator<<(int32_t v);
0109 
0110   StreamWriter& operator<<(uint32_t v);
0111 
0112   StreamWriter& operator<<(int64_t v);
0113 
0114   StreamWriter& operator<<(uint64_t v);
0115 
0116   StreamWriter& operator<<(const std::chrono::milliseconds& v);
0117 
0118   StreamWriter& operator<<(const std::chrono::microseconds& v);
0119 
0120   StreamWriter& operator<<(float v);
0121 
0122   StreamWriter& operator<<(double v);
0123 
0124   StreamWriter& operator<<(char v);
0125 
0126   /// \brief Helper class to write fixed length strings.
0127   /// This is useful as the standard string view (such as
0128   /// std::string_view) is for variable length data.
0129   struct PARQUET_EXPORT FixedStringView {
0130     FixedStringView() = default;
0131 
0132     explicit FixedStringView(const char* data_ptr);
0133 
0134     FixedStringView(const char* data_ptr, std::size_t data_len);
0135 
0136     const char* data{NULLPTR};
0137     std::size_t size{0};
0138   };
0139 
0140   /// \brief Output operators for fixed length strings.
0141   template <int N>
0142   StreamWriter& operator<<(const char (&v)[N]) {
0143     return WriteFixedLength(v, N);
0144   }
0145   template <std::size_t N>
0146   StreamWriter& operator<<(const std::array<char, N>& v) {
0147     return WriteFixedLength(v.data(), N);
0148   }
0149   StreamWriter& operator<<(FixedStringView v);
0150 
0151   /// \brief Output operators for variable length strings.
0152   StreamWriter& operator<<(const char* v);
0153   StreamWriter& operator<<(const std::string& v);
0154   StreamWriter& operator<<(::std::string_view v);
0155 
0156   /// \brief Helper class to write variable length raw data.
0157   using RawDataView = ::arrow::util::span<const uint8_t>;
0158 
0159   /// \brief Output operators for variable length raw data.
0160   StreamWriter& operator<<(RawDataView v);
0161 
0162   /// \brief Output operator for optional fields.
0163   template <typename T>
0164   StreamWriter& operator<<(const optional<T>& v) {
0165     if (v) {
0166       return operator<<(*v);
0167     }
0168     SkipOptionalColumn();
0169     return *this;
0170   }
0171 
0172   /// \brief Skip the next N columns of optional data.  If there are
0173   /// less than N columns remaining then the excess columns are
0174   /// ignored.
0175   /// \throws ParquetException if there is an attempt to skip any
0176   /// required column.
0177   /// \return Number of columns actually skipped.
0178   int64_t SkipColumns(int num_columns_to_skip);
0179 
0180   /// \brief Terminate the current row and advance to next one.
0181   /// \throws ParquetException if all columns in the row were not
0182   /// written or skipped.
0183   void EndRow();
0184 
0185   /// \brief Terminate the current row group and create new one.
0186   void EndRowGroup();
0187 
0188  protected:
0189   template <typename WriterType, typename T>
0190   StreamWriter& Write(const T v) {
0191     auto writer = static_cast<WriterType*>(row_group_writer_->column(column_index_++));
0192 
0193     writer->WriteBatch(kBatchSizeOne, &kDefLevelOne, &kRepLevelZero, &v);
0194 
0195     if (max_row_group_size_ > 0) {
0196       row_group_size_ += writer->estimated_buffered_value_bytes();
0197     }
0198     return *this;
0199   }
0200 
0201   StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len,
0202                                     ConvertedType::type converted_type);
0203 
0204   StreamWriter& WriteFixedLength(const char* data_ptr, std::size_t data_len);
0205 
0206   void CheckColumn(Type::type physical_type, ConvertedType::type converted_type,
0207                    int length = -1);
0208 
0209   /// \brief Skip the next column which must be optional.
0210   /// \throws ParquetException if the next column does not exist or is
0211   /// not optional.
0212   void SkipOptionalColumn();
0213 
0214   void WriteNullValue(ColumnWriter* writer);
0215 
0216  private:
0217   using node_ptr_type = std::shared_ptr<schema::PrimitiveNode>;
0218 
0219   struct null_deleter {
0220     void operator()(void*) {}
0221   };
0222 
0223   int32_t column_index_{0};
0224   int64_t current_row_{0};
0225   int64_t row_group_size_{0};
0226   int64_t max_row_group_size_{default_row_group_size_};
0227 
0228   std::unique_ptr<ParquetFileWriter> file_writer_;
0229   std::unique_ptr<RowGroupWriter, null_deleter> row_group_writer_;
0230   std::vector<node_ptr_type> nodes_;
0231 
0232   static constexpr int16_t kDefLevelZero = 0;
0233   static constexpr int16_t kDefLevelOne = 1;
0234   static constexpr int16_t kRepLevelZero = 0;
0235   static constexpr int64_t kBatchSizeOne = 1;
0236 
0237   static int64_t default_row_group_size_;
0238 };
0239 
0240 struct PARQUET_EXPORT EndRowType {};
0241 constexpr EndRowType EndRow = {};
0242 
0243 struct PARQUET_EXPORT EndRowGroupType {};
0244 constexpr EndRowGroupType EndRowGroup = {};
0245 
0246 PARQUET_EXPORT
0247 StreamWriter& operator<<(StreamWriter&, EndRowType);
0248 
0249 PARQUET_EXPORT
0250 StreamWriter& operator<<(StreamWriter&, EndRowGroupType);
0251 
0252 }  // namespace parquet