Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-17 08:28:54

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <cstdint>
0021 #include <cstring>
0022 #include <memory>
0023 
0024 #include "arrow/type_fwd.h"
0025 #include "arrow/util/compression.h"
0026 #include "parquet/exception.h"
0027 #include "parquet/platform.h"
0028 #include "parquet/types.h"
0029 
// Forward declarations of the Arrow types that this header mentions by name.
namespace arrow {

class Array;

namespace bit_util {

class BitWriter;

}  // namespace bit_util

namespace util {

class CodecOptions;
class RleBitPackedEncoder;

}  // namespace util

}  // namespace arrow
0044 
0045 namespace parquet {
0046 
// Forward declarations of the Parquet types that the writer interfaces below
// only use through pointers, references or smart pointers.
class ColumnChunkMetaDataBuilder;
class ColumnDescriptor;
class ColumnIndexBuilder;
class DataPage;
class DictionaryPage;
class Encryptor;
class OffsetIndexBuilder;
class WriterProperties;
struct ArrowWriteContext;
0056 
0057 class PARQUET_EXPORT LevelEncoder {
0058  public:
0059   LevelEncoder();
0060   ~LevelEncoder();
0061 
0062   static int MaxBufferSize(Encoding::type encoding, int16_t max_level,
0063                            int num_buffered_values);
0064 
0065   // Initialize the LevelEncoder.
0066   void Init(Encoding::type encoding, int16_t max_level, int num_buffered_values,
0067             uint8_t* data, int data_size);
0068 
0069   // Encodes a batch of levels from an array and returns the number of levels encoded
0070   int Encode(int batch_size, const int16_t* levels);
0071 
0072   int32_t len() {
0073     if (encoding_ != Encoding::RLE) {
0074       throw ParquetException("Only implemented for RLE encoding");
0075     }
0076     return rle_length_;
0077   }
0078 
0079  private:
0080   int bit_width_;
0081   int rle_length_;
0082   Encoding::type encoding_;
0083   std::unique_ptr<::arrow::util::RleBitPackedEncoder> rle_encoder_;
0084   std::unique_ptr<::arrow::bit_util::BitWriter> bit_packed_encoder_;
0085 };
0086 
0087 class PARQUET_EXPORT PageWriter {
0088  public:
0089   virtual ~PageWriter() {}
0090 
0091   static std::unique_ptr<PageWriter> Open(
0092       std::shared_ptr<ArrowOutputStream> sink, Compression::type codec,
0093       ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal = -1,
0094       int16_t column_chunk_ordinal = -1,
0095       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
0096       bool buffered_row_group = false,
0097       std::shared_ptr<Encryptor> header_encryptor = NULLPTR,
0098       std::shared_ptr<Encryptor> data_encryptor = NULLPTR,
0099       bool page_write_checksum_enabled = false,
0100       // column_index_builder MUST outlive the PageWriter
0101       ColumnIndexBuilder* column_index_builder = NULLPTR,
0102       // offset_index_builder MUST outlive the PageWriter
0103       OffsetIndexBuilder* offset_index_builder = NULLPTR,
0104       const CodecOptions& codec_options = CodecOptions{});
0105 
0106   // The Column Writer decides if dictionary encoding is used if set and
0107   // if the dictionary encoding has fallen back to default encoding on reaching dictionary
0108   // page limit
0109   virtual void Close(bool has_dictionary, bool fallback) = 0;
0110 
0111   // Return the number of uncompressed bytes written (including header size)
0112   virtual int64_t WriteDataPage(const DataPage& page) = 0;
0113 
0114   // Return the number of uncompressed bytes written (including header size)
0115   virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
0116 
0117   /// \brief The total number of bytes written as serialized data and
0118   /// dictionary pages to the sink so far.
0119   virtual int64_t total_compressed_bytes_written() const = 0;
0120 
0121   virtual bool has_compressor() = 0;
0122 
0123   virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0;
0124 };
0125 
0126 class PARQUET_EXPORT ColumnWriter {
0127  public:
0128   virtual ~ColumnWriter() = default;
0129 
0130   static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
0131                                             std::unique_ptr<PageWriter>,
0132                                             const WriterProperties* properties);
0133 
0134   /// \brief Closes the ColumnWriter, commits any buffered values to pages.
0135   /// \return Total size of the column in bytes
0136   virtual int64_t Close() = 0;
0137 
0138   /// \brief The physical Parquet type of the column
0139   virtual Type::type type() const = 0;
0140 
0141   /// \brief The schema for the column
0142   virtual const ColumnDescriptor* descr() const = 0;
0143 
0144   /// \brief The number of rows written so far
0145   virtual int64_t rows_written() const = 0;
0146 
0147   /// \brief The total size of the compressed pages + page headers. Values
0148   /// are still buffered and not written to a pager yet
0149   ///
0150   /// So in un-buffered mode, it always returns 0
0151   virtual int64_t total_compressed_bytes() const = 0;
0152 
0153   /// \brief The total number of bytes written as serialized data and
0154   /// dictionary pages to the ColumnChunk so far
0155   /// These bytes are uncompressed bytes.
0156   virtual int64_t total_bytes_written() const = 0;
0157 
0158   /// \brief The total number of bytes written as serialized data and
0159   /// dictionary pages to the ColumnChunk so far.
0160   /// If the column is uncompressed, the value would be equal to
0161   /// total_bytes_written().
0162   virtual int64_t total_compressed_bytes_written() const = 0;
0163 
0164   /// \brief Estimated size of the values that are not written to a page yet.
0165   virtual int64_t estimated_buffered_value_bytes() const = 0;
0166 
0167   /// \brief The file-level writer properties
0168   virtual const WriterProperties* properties() = 0;
0169 
0170   /// \brief Add key-value metadata to the ColumnChunk.
0171   /// \param[in] key_value_metadata the metadata to add.
0172   /// \note This will overwrite any existing metadata with the same key.
0173   /// \throw ParquetException if Close() has been called.
0174   virtual void AddKeyValueMetadata(
0175       const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) = 0;
0176 
0177   /// \brief Reset the ColumnChunk key-value metadata.
0178   /// \throw ParquetException if Close() has been called.
0179   virtual void ResetKeyValueMetadata() = 0;
0180 
0181   /// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns
0182   /// error status if the array data type is not compatible with the concrete
0183   /// writer type.
0184   ///
0185   /// leaf_array is always a primitive (possibly dictionary encoded type).
0186   /// Leaf_field_nullable indicates whether the leaf array is considered nullable
0187   /// according to its schema in a Table or its parent array.
0188   virtual ::arrow::Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
0189                                      int64_t num_levels, const ::arrow::Array& leaf_array,
0190                                      ArrowWriteContext* ctx,
0191                                      bool leaf_field_nullable) = 0;
0192 };
0193 
0194 // API to write values to a single column. This is the main client facing API.
0195 template <typename DType>
0196 class TypedColumnWriter : public ColumnWriter {
0197  public:
0198   using T = typename DType::c_type;
0199 
0200   // Write a batch of repetition levels, definition levels, and values to the
0201   // column.
0202   // `num_values` is the number of logical leaf values.
0203   // `def_levels` (resp. `rep_levels`) can be null if the column's max definition level
0204   // (resp. max repetition level) is 0.
0205   // If not null, each of `def_levels` and `rep_levels` must have at least
0206   // `num_values`.
0207   //
0208   // The number of physical values written (taken from `values`) is returned.
0209   // It can be smaller than `num_values` is there are some undefined values.
0210   virtual int64_t WriteBatch(int64_t num_values, const int16_t* def_levels,
0211                              const int16_t* rep_levels, const T* values) = 0;
0212 
0213   /// Write a batch of repetition levels, definition levels, and values to the
0214   /// column.
0215   ///
0216   /// In comparison to WriteBatch the length of repetition and definition levels
0217   /// is the same as of the number of values read for max_definition_level == 1.
0218   /// In the case of max_definition_level > 1, the repetition and definition
0219   /// levels are larger than the values but the values include the null entries
0220   /// with definition_level == (max_definition_level - 1). Thus we have to differentiate
0221   /// in the parameters of this function if the input has the length of num_values or the
0222   /// _number of rows in the lowest nesting level_.
0223   ///
0224   /// In the case that the most inner node in the Parquet is required, the _number of rows
0225   /// in the lowest nesting level_ is equal to the number of non-null values. If the
0226   /// inner-most schema node is optional, the _number of rows in the lowest nesting level_
0227   /// also includes all values with definition_level == (max_definition_level - 1).
0228   ///
0229   /// @param num_values number of levels to write.
0230   /// @param def_levels The Parquet definition levels, length is num_values
0231   /// @param rep_levels The Parquet repetition levels, length is num_values
0232   /// @param valid_bits Bitmap that indicates if the row is null on the lowest nesting
0233   ///   level. The length is number of rows in the lowest nesting level.
0234   /// @param valid_bits_offset The offset in bits of the valid_bits where the
0235   ///   first relevant bit resides.
0236   /// @param values The values in the lowest nested level including
0237   ///   spacing for nulls on the lowest levels; input has the length
0238   ///   of the number of rows on the lowest nesting level.
0239   virtual void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
0240                                 const int16_t* rep_levels, const uint8_t* valid_bits,
0241                                 int64_t valid_bits_offset, const T* values) = 0;
0242 };
0243 
0244 using BoolWriter = TypedColumnWriter<BooleanType>;
0245 using Int32Writer = TypedColumnWriter<Int32Type>;
0246 using Int64Writer = TypedColumnWriter<Int64Type>;
0247 using Int96Writer = TypedColumnWriter<Int96Type>;
0248 using FloatWriter = TypedColumnWriter<FloatType>;
0249 using DoubleWriter = TypedColumnWriter<DoubleType>;
0250 using ByteArrayWriter = TypedColumnWriter<ByteArrayType>;
0251 using FixedLenByteArrayWriter = TypedColumnWriter<FLBAType>;
0252 
0253 namespace internal {
0254 
// Timestamp conversion constants.

/// Day offset added to a timestamp's whole-day count to obtain the Julian day
/// stored in an INT96 timestamp (2440588 is the Julian Day Number of 1970-01-01;
/// this assumes the input timestamps count from the Unix epoch).
constexpr int64_t kJulianEpochOffsetDays{INT64_C(2440588)};
0259 
0260 template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
0261 inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
0262   int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
0263   (*impala_timestamp).value[2] = (uint32_t)julian_days;
0264 
0265   int64_t last_day_units = time % UnitPerDay;
0266   auto last_day_nanos = last_day_units * NanosecondsPerUnit;
0267   // impala_timestamp will be unaligned every other entry so do memcpy instead
0268   // of assign and reinterpret cast to avoid undefined behavior.
0269   std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t));
0270 }
0271 
0272 constexpr int64_t kSecondsInNanos = INT64_C(1000000000);
0273 
0274 inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) {
0275   ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds,
0276                                                                    impala_timestamp);
0277 }
0278 
0279 constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000);
0280 
0281 inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds,
0282                                           Int96* impala_timestamp) {
0283   ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>(
0284       milliseconds, impala_timestamp);
0285 }
0286 
0287 constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000);
0288 
0289 inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds,
0290                                           Int96* impala_timestamp) {
0291   ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>(
0292       microseconds, impala_timestamp);
0293 }
0294 
0295 constexpr int64_t kNanosecondsInNanos = INT64_C(1);
0296 
0297 inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
0298                                          Int96* impala_timestamp) {
0299   ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>(
0300       nanoseconds, impala_timestamp);
0301 }
0302 
0303 }  // namespace internal
0304 }  // namespace parquet