Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:27:07

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #include <memory>
0019 #include <string>
0020 #include <utility>
0021 #include <vector>
0022 
0023 #include "arrow/array.h"
0024 #include "arrow/chunked_array.h"
0025 #include "arrow/status.h"
0026 #include "arrow/type.h"
0027 #include "arrow/type_traits.h"
0028 #include "arrow/util/checked_cast.h"
0029 #include "arrow/visit_type_inline.h"
0030 
0031 namespace arrow {
0032 namespace internal {
0033 
0034 template <typename BaseConverter, template <typename...> class ConverterTrait>
0035 static Result<std::unique_ptr<BaseConverter>> MakeConverter(
0036     std::shared_ptr<DataType> type, typename BaseConverter::OptionsType options,
0037     MemoryPool* pool);
0038 
0039 template <typename Input, typename Options>
0040 class Converter {
0041  public:
0042   using Self = Converter<Input, Options>;
0043   using InputType = Input;
0044   using OptionsType = Options;
0045 
0046   virtual ~Converter() = default;
0047 
0048   Status Construct(std::shared_ptr<DataType> type, OptionsType options,
0049                    MemoryPool* pool) {
0050     type_ = std::move(type);
0051     options_ = std::move(options);
0052     return Init(pool);
0053   }
0054 
0055   virtual Status Append(InputType value) { return Status::NotImplemented("Append"); }
0056 
0057   virtual Status Extend(InputType values, int64_t size, int64_t offset = 0) {
0058     return Status::NotImplemented("Extend");
0059   }
0060 
0061   virtual Status ExtendMasked(InputType values, InputType mask, int64_t size,
0062                               int64_t offset = 0) {
0063     return Status::NotImplemented("ExtendMasked");
0064   }
0065 
0066   const std::shared_ptr<ArrayBuilder>& builder() const { return builder_; }
0067 
0068   const std::shared_ptr<DataType>& type() const { return type_; }
0069 
0070   OptionsType options() const { return options_; }
0071 
0072   bool may_overflow() const { return may_overflow_; }
0073 
0074   bool rewind_on_overflow() const { return rewind_on_overflow_; }
0075 
0076   virtual Status Reserve(int64_t additional_capacity) {
0077     return builder_->Reserve(additional_capacity);
0078   }
0079 
0080   Status AppendNull() { return builder_->AppendNull(); }
0081 
0082   virtual Result<std::shared_ptr<Array>> ToArray() { return builder_->Finish(); }
0083 
0084   virtual Result<std::shared_ptr<Array>> ToArray(int64_t length) {
0085     ARROW_ASSIGN_OR_RAISE(auto arr, this->ToArray());
0086     return arr->Slice(0, length);
0087   }
0088 
0089   virtual Result<std::shared_ptr<ChunkedArray>> ToChunkedArray() {
0090     ARROW_ASSIGN_OR_RAISE(auto array, ToArray());
0091     std::vector<std::shared_ptr<Array>> chunks = {std::move(array)};
0092     return std::make_shared<ChunkedArray>(chunks);
0093   }
0094 
0095  protected:
0096   virtual Status Init(MemoryPool* pool) { return Status::OK(); }
0097 
0098   std::shared_ptr<DataType> type_;
0099   std::shared_ptr<ArrayBuilder> builder_;
0100   OptionsType options_;
0101   bool may_overflow_ = false;
0102   bool rewind_on_overflow_ = false;
0103 };
0104 
0105 template <typename ArrowType, typename BaseConverter>
0106 class PrimitiveConverter : public BaseConverter {
0107  public:
0108   using BuilderType = typename TypeTraits<ArrowType>::BuilderType;
0109 
0110  protected:
0111   Status Init(MemoryPool* pool) override {
0112     this->builder_ = std::make_shared<BuilderType>(this->type_, pool);
0113     // Narrow variable-sized binary types may overflow
0114     this->may_overflow_ = is_binary_like(this->type_->id());
0115     primitive_type_ = checked_cast<const ArrowType*>(this->type_.get());
0116     primitive_builder_ = checked_cast<BuilderType*>(this->builder_.get());
0117     return Status::OK();
0118   }
0119 
0120   const ArrowType* primitive_type_;
0121   BuilderType* primitive_builder_;
0122 };
0123 
0124 template <typename ArrowType, typename BaseConverter,
0125           template <typename...> class ConverterTrait>
0126 class ListConverter : public BaseConverter {
0127  public:
0128   using BuilderType = typename TypeTraits<ArrowType>::BuilderType;
0129   using ConverterType = typename ConverterTrait<ArrowType>::type;
0130 
0131  protected:
0132   Status Init(MemoryPool* pool) override {
0133     list_type_ = checked_cast<const ArrowType*>(this->type_.get());
0134     ARROW_ASSIGN_OR_RAISE(value_converter_,
0135                           (MakeConverter<BaseConverter, ConverterTrait>(
0136                               list_type_->value_type(), this->options_, pool)));
0137     this->builder_ =
0138         std::make_shared<BuilderType>(pool, value_converter_->builder(), this->type_);
0139     list_builder_ = checked_cast<BuilderType*>(this->builder_.get());
0140     // Narrow list types may overflow
0141     this->may_overflow_ = this->rewind_on_overflow_ =
0142         sizeof(typename ArrowType::offset_type) < sizeof(int64_t);
0143     return Status::OK();
0144   }
0145 
0146   const ArrowType* list_type_;
0147   BuilderType* list_builder_;
0148   std::unique_ptr<BaseConverter> value_converter_;
0149 };
0150 
0151 template <typename BaseConverter, template <typename...> class ConverterTrait>
0152 class StructConverter : public BaseConverter {
0153  public:
0154   using ConverterType = typename ConverterTrait<StructType>::type;
0155 
0156   Status Reserve(int64_t additional_capacity) override {
0157     ARROW_RETURN_NOT_OK(this->builder_->Reserve(additional_capacity));
0158     for (const auto& child : children_) {
0159       ARROW_RETURN_NOT_OK(child->Reserve(additional_capacity));
0160     }
0161     return Status::OK();
0162   }
0163 
0164  protected:
0165   Status Init(MemoryPool* pool) override {
0166     std::unique_ptr<BaseConverter> child_converter;
0167     std::vector<std::shared_ptr<ArrayBuilder>> child_builders;
0168 
0169     struct_type_ = checked_cast<const StructType*>(this->type_.get());
0170     for (const auto& field : struct_type_->fields()) {
0171       ARROW_ASSIGN_OR_RAISE(child_converter,
0172                             (MakeConverter<BaseConverter, ConverterTrait>(
0173                                 field->type(), this->options_, pool)));
0174       this->may_overflow_ |= child_converter->may_overflow();
0175       this->rewind_on_overflow_ = this->may_overflow_;
0176       child_builders.push_back(child_converter->builder());
0177       children_.push_back(std::move(child_converter));
0178     }
0179 
0180     this->builder_ =
0181         std::make_shared<StructBuilder>(this->type_, pool, std::move(child_builders));
0182     struct_builder_ = checked_cast<StructBuilder*>(this->builder_.get());
0183 
0184     return Status::OK();
0185   }
0186 
0187   const StructType* struct_type_;
0188   StructBuilder* struct_builder_;
0189   std::vector<std::unique_ptr<BaseConverter>> children_;
0190 };
0191 
0192 template <typename ValueType, typename BaseConverter>
0193 class DictionaryConverter : public BaseConverter {
0194  public:
0195   using BuilderType = DictionaryBuilder<ValueType>;
0196 
0197  protected:
0198   Status Init(MemoryPool* pool) override {
0199     std::unique_ptr<ArrayBuilder> builder;
0200     ARROW_RETURN_NOT_OK(MakeDictionaryBuilder(pool, this->type_, NULLPTR, &builder));
0201     this->builder_ = std::move(builder);
0202     this->may_overflow_ = false;
0203     dict_type_ = checked_cast<const DictionaryType*>(this->type_.get());
0204     value_type_ = checked_cast<const ValueType*>(dict_type_->value_type().get());
0205     value_builder_ = checked_cast<BuilderType*>(this->builder_.get());
0206     return Status::OK();
0207   }
0208 
0209   const DictionaryType* dict_type_;
0210   const ValueType* value_type_;
0211   BuilderType* value_builder_;
0212 };
0213 
0214 template <typename BaseConverter, template <typename...> class ConverterTrait>
0215 struct MakeConverterImpl {
0216   template <typename T, typename ConverterType = typename ConverterTrait<T>::type>
0217   Status Visit(const T&) {
0218     out.reset(new ConverterType());
0219     return out->Construct(std::move(type), std::move(options), pool);
0220   }
0221 
0222   Status Visit(const DictionaryType& t) {
0223     switch (t.value_type()->id()) {
0224 #define DICTIONARY_CASE(TYPE)                                                       \
0225   case TYPE::type_id:                                                               \
0226     out = std::make_unique<                                                         \
0227         typename ConverterTrait<DictionaryType>::template dictionary_type<TYPE>>(); \
0228     break;
0229       DICTIONARY_CASE(BooleanType);
0230       DICTIONARY_CASE(Int8Type);
0231       DICTIONARY_CASE(Int16Type);
0232       DICTIONARY_CASE(Int32Type);
0233       DICTIONARY_CASE(Int64Type);
0234       DICTIONARY_CASE(UInt8Type);
0235       DICTIONARY_CASE(UInt16Type);
0236       DICTIONARY_CASE(UInt32Type);
0237       DICTIONARY_CASE(UInt64Type);
0238       DICTIONARY_CASE(FloatType);
0239       DICTIONARY_CASE(DoubleType);
0240       DICTIONARY_CASE(BinaryType);
0241       DICTIONARY_CASE(StringType);
0242       DICTIONARY_CASE(FixedSizeBinaryType);
0243 #undef DICTIONARY_CASE
0244       default:
0245         return Status::NotImplemented("DictionaryArray converter for type ", t.ToString(),
0246                                       " not implemented");
0247     }
0248     return out->Construct(std::move(type), std::move(options), pool);
0249   }
0250 
0251   Status Visit(const DataType& t) { return Status::NotImplemented(t.name()); }
0252 
0253   std::shared_ptr<DataType> type;
0254   typename BaseConverter::OptionsType options;
0255   MemoryPool* pool;
0256   std::unique_ptr<BaseConverter> out;
0257 };
0258 
0259 template <typename BaseConverter, template <typename...> class ConverterTrait>
0260 static Result<std::unique_ptr<BaseConverter>> MakeConverter(
0261     std::shared_ptr<DataType> type, typename BaseConverter::OptionsType options,
0262     MemoryPool* pool) {
0263   MakeConverterImpl<BaseConverter, ConverterTrait> visitor{
0264       std::move(type), std::move(options), pool, NULLPTR};
0265   ARROW_RETURN_NOT_OK(VisitTypeInline(*visitor.type, &visitor));
0266   return std::move(visitor.out);
0267 }
0268 
0269 template <typename Converter>
0270 class Chunker {
0271  public:
0272   using InputType = typename Converter::InputType;
0273 
0274   explicit Chunker(std::unique_ptr<Converter> converter)
0275       : converter_(std::move(converter)) {}
0276 
0277   Status Reserve(int64_t additional_capacity) {
0278     ARROW_RETURN_NOT_OK(converter_->Reserve(additional_capacity));
0279     reserved_ += additional_capacity;
0280     return Status::OK();
0281   }
0282 
0283   Status AppendNull() {
0284     auto status = converter_->AppendNull();
0285     if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
0286       if (converter_->builder()->length() == 0) {
0287         // Builder length == 0 means the individual element is too large to append.
0288         // In this case, no need to try again.
0289         return status;
0290       }
0291       ARROW_RETURN_NOT_OK(FinishChunk());
0292       return converter_->AppendNull();
0293     }
0294     ++length_;
0295     return status;
0296   }
0297 
0298   Status Append(InputType value) {
0299     auto status = converter_->Append(value);
0300     if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
0301       if (converter_->builder()->length() == 0) {
0302         return status;
0303       }
0304       ARROW_RETURN_NOT_OK(FinishChunk());
0305       return Append(value);
0306     }
0307     ++length_;
0308     return status;
0309   }
0310 
0311   Status Extend(InputType values, int64_t size, int64_t offset = 0) {
0312     while (offset < size) {
0313       auto length_before = converter_->builder()->length();
0314       auto status = converter_->Extend(values, size, offset);
0315       auto length_after = converter_->builder()->length();
0316       auto num_converted = length_after - length_before;
0317 
0318       offset += num_converted;
0319       length_ += num_converted;
0320 
0321       if (status.IsCapacityError()) {
0322         if (converter_->builder()->length() == 0) {
0323           // Builder length == 0 means the individual element is too large to append.
0324           // In this case, no need to try again.
0325           return status;
0326         } else if (converter_->rewind_on_overflow()) {
0327           // The list-like and binary-like conversion paths may raise  a capacity error,
0328           // we need to handle them differently. While the binary-like converters check
0329           // the capacity before append/extend the list-like converters just check after
0330           // append/extend. Thus depending on the implementation semantics we may need
0331           // to rewind (slice) the output chunk by one.
0332           length_ -= 1;
0333           offset -= 1;
0334         }
0335         ARROW_RETURN_NOT_OK(FinishChunk());
0336       } else if (!status.ok()) {
0337         return status;
0338       }
0339     }
0340     return Status::OK();
0341   }
0342 
0343   Status ExtendMasked(InputType values, InputType mask, int64_t size,
0344                       int64_t offset = 0) {
0345     while (offset < size) {
0346       auto length_before = converter_->builder()->length();
0347       auto status = converter_->ExtendMasked(values, mask, size, offset);
0348       auto length_after = converter_->builder()->length();
0349       auto num_converted = length_after - length_before;
0350 
0351       offset += num_converted;
0352       length_ += num_converted;
0353 
0354       if (status.IsCapacityError()) {
0355         if (converter_->builder()->length() == 0) {
0356           // Builder length == 0 means the individual element is too large to append.
0357           // In this case, no need to try again.
0358           return status;
0359         } else if (converter_->rewind_on_overflow()) {
0360           // The list-like and binary-like conversion paths may raise  a capacity error,
0361           // we need to handle them differently. While the binary-like converters check
0362           // the capacity before append/extend the list-like converters just check after
0363           // append/extend. Thus depending on the implementation semantics we may need
0364           // to rewind (slice) the output chunk by one.
0365           length_ -= 1;
0366           offset -= 1;
0367         }
0368         ARROW_RETURN_NOT_OK(FinishChunk());
0369       } else if (!status.ok()) {
0370         return status;
0371       }
0372     }
0373     return Status::OK();
0374   }
0375 
0376   Status FinishChunk() {
0377     ARROW_ASSIGN_OR_RAISE(auto chunk, converter_->ToArray(length_));
0378     chunks_.push_back(chunk);
0379     // Reserve space for the remaining items.
0380     // Besides being an optimization, it is also required if the converter's
0381     // implementation relies on unsafe builder methods in converter->Append().
0382     auto remaining = reserved_ - length_;
0383     Reset();
0384     return Reserve(remaining);
0385   }
0386 
0387   Result<std::shared_ptr<ChunkedArray>> ToChunkedArray() {
0388     ARROW_RETURN_NOT_OK(FinishChunk());
0389     return std::make_shared<ChunkedArray>(chunks_);
0390   }
0391 
0392  protected:
0393   void Reset() {
0394     converter_->builder()->Reset();
0395     length_ = 0;
0396     reserved_ = 0;
0397   }
0398 
0399   int64_t length_ = 0;
0400   int64_t reserved_ = 0;
0401   std::unique_ptr<Converter> converter_;
0402   std::vector<std::shared_ptr<Array>> chunks_;
0403 };
0404 
0405 template <typename T>
0406 static Result<std::unique_ptr<Chunker<T>>> MakeChunker(std::unique_ptr<T> converter) {
0407   return std::make_unique<Chunker<T>>(std::move(converter));
0408 }
0409 
0410 }  // namespace internal
0411 }  // namespace arrow