File indexing completed on 2026-04-17 08:28:54
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
0020 #include <cstdint>
0021 #include <cstring>
0022 #include <memory>
0023
0024 #include "arrow/type_fwd.h"
0025 #include "arrow/util/compression.h"
0026 #include "parquet/exception.h"
0027 #include "parquet/platform.h"
0028 #include "parquet/types.h"
0029
// Forward declarations of the Arrow types that this header names.
namespace arrow {

class Array;

namespace bit_util {
class BitWriter;
}  // namespace bit_util

namespace util {
class RleBitPackedEncoder;
class CodecOptions;
}  // namespace util

}  // namespace arrow
0044
0045 namespace parquet {
0046
// Forward declarations of Parquet types that the interfaces below use only
// through pointers, references or smart pointers.
struct ArrowWriteContext;
class ColumnChunkMetaDataBuilder;
class ColumnDescriptor;
class ColumnIndexBuilder;
class DataPage;
class DictionaryPage;
class Encryptor;
class OffsetIndexBuilder;
class WriterProperties;
0056
0057 class PARQUET_EXPORT LevelEncoder {
0058 public:
0059 LevelEncoder();
0060 ~LevelEncoder();
0061
0062 static int MaxBufferSize(Encoding::type encoding, int16_t max_level,
0063 int num_buffered_values);
0064
0065
0066 void Init(Encoding::type encoding, int16_t max_level, int num_buffered_values,
0067 uint8_t* data, int data_size);
0068
0069
0070 int Encode(int batch_size, const int16_t* levels);
0071
0072 int32_t len() {
0073 if (encoding_ != Encoding::RLE) {
0074 throw ParquetException("Only implemented for RLE encoding");
0075 }
0076 return rle_length_;
0077 }
0078
0079 private:
0080 int bit_width_;
0081 int rle_length_;
0082 Encoding::type encoding_;
0083 std::unique_ptr<::arrow::util::RleBitPackedEncoder> rle_encoder_;
0084 std::unique_ptr<::arrow::bit_util::BitWriter> bit_packed_encoder_;
0085 };
0086
0087 class PARQUET_EXPORT PageWriter {
0088 public:
0089 virtual ~PageWriter() {}
0090
0091 static std::unique_ptr<PageWriter> Open(
0092 std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
0093 ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal = -1,
0094 int16_t column_chunk_ordinal = -1,
0095 ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
0096 bool buffered_row_group = false,
0097 std::shared_ptr<Encryptor> header_encryptor = NULLPTR,
0098 std::shared_ptr<Encryptor> data_encryptor = NULLPTR,
0099 bool page_write_checksum_enabled = false,
0100
0101 ColumnIndexBuilder* column_index_builder = NULLPTR,
0102
0103 OffsetIndexBuilder* offset_index_builder = NULLPTR,
0104 const CodecOptions& codec_options = CodecOptions{});
0105
0106
0107
0108
0109 virtual void Close(bool has_dictionary, bool fallback) = 0;
0110
0111
0112 virtual int64_t WriteDataPage(const DataPage& page) = 0;
0113
0114
0115 virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
0116
0117
0118
0119 virtual int64_t total_compressed_bytes_written() const = 0;
0120
0121 virtual bool has_compressor() = 0;
0122
0123 virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0;
0124 };
0125
0126 class PARQUET_EXPORT ColumnWriter {
0127 public:
0128 virtual ~ColumnWriter() = default;
0129
0130 static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
0131 std::unique_ptr<PageWriter>,
0132 const WriterProperties* properties);
0133
0134
0135
0136 virtual int64_t Close() = 0;
0137
0138
0139 virtual Type::type type() const = 0;
0140
0141
0142 virtual const ColumnDescriptor* descr() const = 0;
0143
0144
0145 virtual int64_t rows_written() const = 0;
0146
0147
0148
0149
0150
0151 virtual int64_t total_compressed_bytes() const = 0;
0152
0153
0154
0155
0156 virtual int64_t total_bytes_written() const = 0;
0157
0158
0159
0160
0161
0162 virtual int64_t total_compressed_bytes_written() const = 0;
0163
0164
0165 virtual int64_t estimated_buffered_value_bytes() const = 0;
0166
0167
0168 virtual const WriterProperties* properties() = 0;
0169
0170
0171
0172
0173
0174 virtual void AddKeyValueMetadata(
0175 const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) = 0;
0176
0177
0178
0179 virtual void ResetKeyValueMetadata() = 0;
0180
0181
0182
0183
0184
0185
0186
0187
0188 virtual ::arrow::Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
0189 int64_t num_levels, const ::arrow::Array& leaf_array,
0190 ArrowWriteContext* ctx,
0191 bool leaf_field_nullable) = 0;
0192 };
0193
0194
0195 template <typename DType>
0196 class TypedColumnWriter : public ColumnWriter {
0197 public:
0198 using T = typename DType::c_type;
0199
0200
0201
0202
0203
0204
0205
0206
0207
0208
0209
0210 virtual int64_t WriteBatch(int64_t num_values, const int16_t* def_levels,
0211 const int16_t* rep_levels, const T* values) = 0;
0212
0213
0214
0215
0216
0217
0218
0219
0220
0221
0222
0223
0224
0225
0226
0227
0228
0229
0230
0231
0232
0233
0234
0235
0236
0237
0238
0239 virtual void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
0240 const int16_t* rep_levels, const uint8_t* valid_bits,
0241 int64_t valid_bits_offset, const T* values) = 0;
0242 };
0243
0244 using BoolWriter = TypedColumnWriter<BooleanType>;
0245 using Int32Writer = TypedColumnWriter<Int32Type>;
0246 using Int64Writer = TypedColumnWriter<Int64Type>;
0247 using Int96Writer = TypedColumnWriter<Int96Type>;
0248 using FloatWriter = TypedColumnWriter<FloatType>;
0249 using DoubleWriter = TypedColumnWriter<DoubleType>;
0250 using ByteArrayWriter = TypedColumnWriter<ByteArrayType>;
0251 using FixedLenByteArrayWriter = TypedColumnWriter<FLBAType>;
0252
0253 namespace internal {
0254
0255
0256
0257
0258 constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
0259
0260 template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
0261 inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
0262 int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
0263 (*impala_timestamp).value[2] = (uint32_t)julian_days;
0264
0265 int64_t last_day_units = time % UnitPerDay;
0266 auto last_day_nanos = last_day_units * NanosecondsPerUnit;
0267
0268
0269 std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t));
0270 }
0271
0272 constexpr int64_t kSecondsInNanos = INT64_C(1000000000);
0273
0274 inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) {
0275 ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds,
0276 impala_timestamp);
0277 }
0278
0279 constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000);
0280
0281 inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds,
0282 Int96* impala_timestamp) {
0283 ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>(
0284 milliseconds, impala_timestamp);
0285 }
0286
0287 constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000);
0288
0289 inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds,
0290 Int96* impala_timestamp) {
0291 ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>(
0292 microseconds, impala_timestamp);
0293 }
0294
0295 constexpr int64_t kNanosecondsInNanos = INT64_C(1);
0296
0297 inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
0298 Int96* impala_timestamp) {
0299 ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>(
0300 nanoseconds, impala_timestamp);
0301 }
0302
0303 }
0304 }