File indexing completed on 2026-04-17 08:28:55
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
0020 #include <array>
0021 #include <chrono>
0022 #include <cstdint>
0023 #include <memory>
0024 #include <optional>
0025 #include <string>
0026 #include <string_view>
0027 #include <vector>
0028
0029 #include "arrow/util/span.h"
0030
0031 #include "parquet/column_writer.h"
0032 #include "parquet/file_writer.h"
0033
0034 namespace parquet {
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
/// \brief Streaming, row-by-row writer for Parquet files.
///
/// Values are written with `operator<<`, one column at a time; `EndRow()`
/// finishes the current row and `EndRowGroup()` the current row group. The
/// writer takes ownership of the `ParquetFileWriter` handed to its
/// constructor (held in `file_writer_`). It is movable but not copyable.
///
/// NOTE(review): the column-order and type checks performed by the
/// non-template output operators live in the .cc file and are not visible
/// here — confirm details there.
class PARQUET_EXPORT StreamWriter {
 public:
  /// Alias for `std::optional`, used by the optional-field output operator.
  template <typename T>
  using optional = ::std::optional<T>;

  // A default-constructed writer has no file writer and a null
  // `row_group_writer_` (the inline `Write` helper below does not check for
  // this), so it is only useful as a move-assignment target.
  StreamWriter() = default;

  /// \brief Create a writer that takes ownership of `writer`.
  explicit StreamWriter(std::unique_ptr<ParquetFileWriter> writer);

  ~StreamWriter() = default;

  /// \brief Set the static default from which `max_row_group_size_` is
  /// initialized in newly constructed writers.
  /// NOTE(review): assumes the constructors do not override that member
  /// initializer — confirm in the .cc file.
  static void SetDefaultMaxRowGroupSize(int64_t max_size);

  /// \brief Set this writer's row-group size threshold. Row-group size is
  /// only accumulated by `Write` when this value is greater than zero.
  void SetMaxRowGroupSize(int64_t max_size);

  /// \brief Index of the column the next value will be written to.
  int current_column() const { return column_index_; }

  /// \brief Value of the current row counter.
  int64_t current_row() const { return current_row_; }

  /// \brief Number of columns (presumably in the file schema — confirm).
  int num_columns() const;

  // Moving is allowed.
  StreamWriter(StreamWriter&&) = default;
  StreamWriter& operator=(StreamWriter&&) = default;

  // Copying is not allowed: the writer uniquely owns its file writer.
  StreamWriter(const StreamWriter&) = delete;
  StreamWriter& operator=(const StreamWriter&) = delete;

  /// \brief Output operators for required fields (also usable for optional
  /// fields when a value is present). Each writes one value to the current
  /// column. NOTE(review): the C++ type is presumably validated against the
  /// column's physical/converted type via `CheckColumn` — confirm in .cc.
  StreamWriter& operator<<(bool v);

  StreamWriter& operator<<(int8_t v);

  StreamWriter& operator<<(uint8_t v);

  StreamWriter& operator<<(int16_t v);

  StreamWriter& operator<<(uint16_t v);

  StreamWriter& operator<<(int32_t v);

  StreamWriter& operator<<(uint32_t v);

  StreamWriter& operator<<(int64_t v);

  StreamWriter& operator<<(uint64_t v);

  StreamWriter& operator<<(const std::chrono::milliseconds& v);

  StreamWriter& operator<<(const std::chrono::microseconds& v);

  StreamWriter& operator<<(float v);

  StreamWriter& operator<<(double v);

  StreamWriter& operator<<(char v);

  /// \brief Non-owning pointer/length pair for writing fixed-length string
  /// data through `operator<<(FixedStringView)`. `std::string_view` is taken
  /// by the variable-length operator instead.
  struct PARQUET_EXPORT FixedStringView {
    FixedStringView() = default;

    // NOTE(review): presumably derives `size` from a NUL-terminated string
    // — confirm in the .cc file.
    explicit FixedStringView(const char* data_ptr);

    FixedStringView(const char* data_ptr, std::size_t data_len);

    const char* data{NULLPTR};
    std::size_t size{0};
  };

  /// \brief Output operators for fixed-length strings.
  ///
  /// The `char` array overload passes the full array extent `N` as the
  /// length, so any terminating '\0' counts toward it. Because a non-template
  /// overload wins an exact-match tie, a string literal (`const char[N]`)
  /// binds to `operator<<(const char*)` below, not to this template; this
  /// template is selected for non-const `char` arrays.
  template <int N>
  StreamWriter& operator<<(const char (&v)[N]) {
    return WriteFixedLength(v, N);
  }
  // Writes all N bytes of the array.
  template <std::size_t N>
  StreamWriter& operator<<(const std::array<char, N>& v) {
    return WriteFixedLength(v.data(), N);
  }
  StreamWriter& operator<<(FixedStringView v);

  /// \brief Output operators for variable-length strings.
  StreamWriter& operator<<(const char* v);
  StreamWriter& operator<<(const std::string& v);
  StreamWriter& operator<<(::std::string_view v);

  /// \brief Non-owning view over raw bytes.
  using RawDataView = ::arrow::util::span<const uint8_t>;

  /// \brief Output operator for raw byte data.
  /// NOTE(review): presumably targets a byte-array column — confirm in .cc.
  StreamWriter& operator<<(RawDataView v);

  /// \brief Output operator for optional fields: writes `*v` when `v` holds
  /// a value, otherwise skips the current column via `SkipOptionalColumn()`.
  template <typename T>
  StreamWriter& operator<<(const optional<T>& v) {
    if (v) {
      return operator<<(*v);
    }
    SkipOptionalColumn();
    return *this;
  }

  /// \brief Skip up to `num_columns_to_skip` columns.
  /// \return Presumably the number of columns actually skipped (inferred
  /// from the return type — confirm in the .cc file).
  int64_t SkipColumns(int num_columns_to_skip);

  /// \brief Terminate the current row and advance to the next one.
  void EndRow();

  /// \brief Terminate the current row group.
  void EndRowGroup();

 protected:
  // Write one non-null value `v` to the column at `column_index_`, then
  // advance `column_index_` to the next column. `WriterType` is the typed
  // column writer the generic column pointer is cast to; no type check is
  // done here, so callers must have validated the column beforehand.
  template <typename WriterType, typename T>
  StreamWriter& Write(const T v) {
    auto writer = static_cast<WriterType*>(row_group_writer_->column(column_index_++));

    // Batch of one value, definition level 1, repetition level 0.
    writer->WriteBatch(kBatchSizeOne, &kDefLevelOne, &kRepLevelZero, &v);

    // Only track row-group size when a maximum has been configured.
    if (max_row_group_size_ > 0) {
      row_group_size_ += writer->estimated_buffered_value_bytes();
    }
    return *this;
  }

  // Write a variable-length value of `data_len` bytes, tagged with
  // `converted_type`.
  StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len,
                                    ConvertedType::type converted_type);

  // Write a fixed-length value of `data_len` bytes.
  StreamWriter& WriteFixedLength(const char* data_ptr, std::size_t data_len);

  // Validate the current column against the expected physical/converted
  // type. NOTE(review): `length = -1` presumably means "no length check" —
  // confirm in the .cc file.
  void CheckColumn(Type::type physical_type, ConvertedType::type converted_type,
                   int length = -1);

  // Skip the current column, which (per its name) must be optional.
  void SkipOptionalColumn();

  // Write a null entry to `writer`.
  void WriteNullValue(ColumnWriter* writer);

 private:
  using node_ptr_type = std::shared_ptr<schema::PrimitiveNode>;

  // No-op deleter: `row_group_writer_` never deletes the RowGroupWriter it
  // points to. NOTE(review): presumably that object is owned by
  // `file_writer_` — confirm.
  struct null_deleter {
    void operator()(void*) {}
  };

  int32_t column_index_{0};
  int64_t current_row_{0};
  // Running size estimate, only updated when max_row_group_size_ > 0.
  int64_t row_group_size_{0};
  int64_t max_row_group_size_{default_row_group_size_};

  std::unique_ptr<ParquetFileWriter> file_writer_;
  // Non-owning (see null_deleter).
  std::unique_ptr<RowGroupWriter, null_deleter> row_group_writer_;
  std::vector<node_ptr_type> nodes_;

  // Level/batch constants passed by address to WriteBatch.
  static constexpr int16_t kDefLevelZero = 0;
  static constexpr int16_t kDefLevelOne = 1;
  static constexpr int16_t kRepLevelZero = 0;
  static constexpr int64_t kBatchSizeOne = 1;

  // Shared default for max_row_group_size_ (see SetDefaultMaxRowGroupSize).
  static int64_t default_row_group_size_;
};
0238 };
0239
0240 struct PARQUET_EXPORT EndRowType {};
0241 constexpr EndRowType EndRow = {};
0242
0243 struct PARQUET_EXPORT EndRowGroupType {};
0244 constexpr EndRowGroupType EndRowGroup = {};
0245
0246 PARQUET_EXPORT
0247 StreamWriter& operator<<(StreamWriter&, EndRowType);
0248
0249 PARQUET_EXPORT
0250 StreamWriter& operator<<(StreamWriter&, EndRowGroupType);
0251
0252 }