File indexing completed on 2025-08-28 08:27:07
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #include <memory>
0019 #include <string>
0020 #include <utility>
0021 #include <vector>
0022
0023 #include "arrow/array.h"
0024 #include "arrow/chunked_array.h"
0025 #include "arrow/status.h"
0026 #include "arrow/type.h"
0027 #include "arrow/type_traits.h"
0028 #include "arrow/util/checked_cast.h"
0029 #include "arrow/visit_type_inline.h"
0030
0031 namespace arrow {
0032 namespace internal {
0033
0034 template <typename BaseConverter, template <typename...> class ConverterTrait>
0035 static Result<std::unique_ptr<BaseConverter>> MakeConverter(
0036 std::shared_ptr<DataType> type, typename BaseConverter::OptionsType options,
0037 MemoryPool* pool);
0038
0039 template <typename Input, typename Options>
0040 class Converter {
0041 public:
0042 using Self = Converter<Input, Options>;
0043 using InputType = Input;
0044 using OptionsType = Options;
0045
0046 virtual ~Converter() = default;
0047
0048 Status Construct(std::shared_ptr<DataType> type, OptionsType options,
0049 MemoryPool* pool) {
0050 type_ = std::move(type);
0051 options_ = std::move(options);
0052 return Init(pool);
0053 }
0054
0055 virtual Status Append(InputType value) { return Status::NotImplemented("Append"); }
0056
0057 virtual Status Extend(InputType values, int64_t size, int64_t offset = 0) {
0058 return Status::NotImplemented("Extend");
0059 }
0060
0061 virtual Status ExtendMasked(InputType values, InputType mask, int64_t size,
0062 int64_t offset = 0) {
0063 return Status::NotImplemented("ExtendMasked");
0064 }
0065
0066 const std::shared_ptr<ArrayBuilder>& builder() const { return builder_; }
0067
0068 const std::shared_ptr<DataType>& type() const { return type_; }
0069
0070 OptionsType options() const { return options_; }
0071
0072 bool may_overflow() const { return may_overflow_; }
0073
0074 bool rewind_on_overflow() const { return rewind_on_overflow_; }
0075
0076 virtual Status Reserve(int64_t additional_capacity) {
0077 return builder_->Reserve(additional_capacity);
0078 }
0079
0080 Status AppendNull() { return builder_->AppendNull(); }
0081
0082 virtual Result<std::shared_ptr<Array>> ToArray() { return builder_->Finish(); }
0083
0084 virtual Result<std::shared_ptr<Array>> ToArray(int64_t length) {
0085 ARROW_ASSIGN_OR_RAISE(auto arr, this->ToArray());
0086 return arr->Slice(0, length);
0087 }
0088
0089 virtual Result<std::shared_ptr<ChunkedArray>> ToChunkedArray() {
0090 ARROW_ASSIGN_OR_RAISE(auto array, ToArray());
0091 std::vector<std::shared_ptr<Array>> chunks = {std::move(array)};
0092 return std::make_shared<ChunkedArray>(chunks);
0093 }
0094
0095 protected:
0096 virtual Status Init(MemoryPool* pool) { return Status::OK(); }
0097
0098 std::shared_ptr<DataType> type_;
0099 std::shared_ptr<ArrayBuilder> builder_;
0100 OptionsType options_;
0101 bool may_overflow_ = false;
0102 bool rewind_on_overflow_ = false;
0103 };
0104
0105 template <typename ArrowType, typename BaseConverter>
0106 class PrimitiveConverter : public BaseConverter {
0107 public:
0108 using BuilderType = typename TypeTraits<ArrowType>::BuilderType;
0109
0110 protected:
0111 Status Init(MemoryPool* pool) override {
0112 this->builder_ = std::make_shared<BuilderType>(this->type_, pool);
0113
0114 this->may_overflow_ = is_binary_like(this->type_->id());
0115 primitive_type_ = checked_cast<const ArrowType*>(this->type_.get());
0116 primitive_builder_ = checked_cast<BuilderType*>(this->builder_.get());
0117 return Status::OK();
0118 }
0119
0120 const ArrowType* primitive_type_;
0121 BuilderType* primitive_builder_;
0122 };
0123
0124 template <typename ArrowType, typename BaseConverter,
0125 template <typename...> class ConverterTrait>
0126 class ListConverter : public BaseConverter {
0127 public:
0128 using BuilderType = typename TypeTraits<ArrowType>::BuilderType;
0129 using ConverterType = typename ConverterTrait<ArrowType>::type;
0130
0131 protected:
0132 Status Init(MemoryPool* pool) override {
0133 list_type_ = checked_cast<const ArrowType*>(this->type_.get());
0134 ARROW_ASSIGN_OR_RAISE(value_converter_,
0135 (MakeConverter<BaseConverter, ConverterTrait>(
0136 list_type_->value_type(), this->options_, pool)));
0137 this->builder_ =
0138 std::make_shared<BuilderType>(pool, value_converter_->builder(), this->type_);
0139 list_builder_ = checked_cast<BuilderType*>(this->builder_.get());
0140
0141 this->may_overflow_ = this->rewind_on_overflow_ =
0142 sizeof(typename ArrowType::offset_type) < sizeof(int64_t);
0143 return Status::OK();
0144 }
0145
0146 const ArrowType* list_type_;
0147 BuilderType* list_builder_;
0148 std::unique_ptr<BaseConverter> value_converter_;
0149 };
0150
0151 template <typename BaseConverter, template <typename...> class ConverterTrait>
0152 class StructConverter : public BaseConverter {
0153 public:
0154 using ConverterType = typename ConverterTrait<StructType>::type;
0155
0156 Status Reserve(int64_t additional_capacity) override {
0157 ARROW_RETURN_NOT_OK(this->builder_->Reserve(additional_capacity));
0158 for (const auto& child : children_) {
0159 ARROW_RETURN_NOT_OK(child->Reserve(additional_capacity));
0160 }
0161 return Status::OK();
0162 }
0163
0164 protected:
0165 Status Init(MemoryPool* pool) override {
0166 std::unique_ptr<BaseConverter> child_converter;
0167 std::vector<std::shared_ptr<ArrayBuilder>> child_builders;
0168
0169 struct_type_ = checked_cast<const StructType*>(this->type_.get());
0170 for (const auto& field : struct_type_->fields()) {
0171 ARROW_ASSIGN_OR_RAISE(child_converter,
0172 (MakeConverter<BaseConverter, ConverterTrait>(
0173 field->type(), this->options_, pool)));
0174 this->may_overflow_ |= child_converter->may_overflow();
0175 this->rewind_on_overflow_ = this->may_overflow_;
0176 child_builders.push_back(child_converter->builder());
0177 children_.push_back(std::move(child_converter));
0178 }
0179
0180 this->builder_ =
0181 std::make_shared<StructBuilder>(this->type_, pool, std::move(child_builders));
0182 struct_builder_ = checked_cast<StructBuilder*>(this->builder_.get());
0183
0184 return Status::OK();
0185 }
0186
0187 const StructType* struct_type_;
0188 StructBuilder* struct_builder_;
0189 std::vector<std::unique_ptr<BaseConverter>> children_;
0190 };
0191
0192 template <typename ValueType, typename BaseConverter>
0193 class DictionaryConverter : public BaseConverter {
0194 public:
0195 using BuilderType = DictionaryBuilder<ValueType>;
0196
0197 protected:
0198 Status Init(MemoryPool* pool) override {
0199 std::unique_ptr<ArrayBuilder> builder;
0200 ARROW_RETURN_NOT_OK(MakeDictionaryBuilder(pool, this->type_, NULLPTR, &builder));
0201 this->builder_ = std::move(builder);
0202 this->may_overflow_ = false;
0203 dict_type_ = checked_cast<const DictionaryType*>(this->type_.get());
0204 value_type_ = checked_cast<const ValueType*>(dict_type_->value_type().get());
0205 value_builder_ = checked_cast<BuilderType*>(this->builder_.get());
0206 return Status::OK();
0207 }
0208
0209 const DictionaryType* dict_type_;
0210 const ValueType* value_type_;
0211 BuilderType* value_builder_;
0212 };
0213
0214 template <typename BaseConverter, template <typename...> class ConverterTrait>
0215 struct MakeConverterImpl {
0216 template <typename T, typename ConverterType = typename ConverterTrait<T>::type>
0217 Status Visit(const T&) {
0218 out.reset(new ConverterType());
0219 return out->Construct(std::move(type), std::move(options), pool);
0220 }
0221
0222 Status Visit(const DictionaryType& t) {
0223 switch (t.value_type()->id()) {
0224 #define DICTIONARY_CASE(TYPE) \
0225 case TYPE::type_id: \
0226 out = std::make_unique< \
0227 typename ConverterTrait<DictionaryType>::template dictionary_type<TYPE>>(); \
0228 break;
0229 DICTIONARY_CASE(BooleanType);
0230 DICTIONARY_CASE(Int8Type);
0231 DICTIONARY_CASE(Int16Type);
0232 DICTIONARY_CASE(Int32Type);
0233 DICTIONARY_CASE(Int64Type);
0234 DICTIONARY_CASE(UInt8Type);
0235 DICTIONARY_CASE(UInt16Type);
0236 DICTIONARY_CASE(UInt32Type);
0237 DICTIONARY_CASE(UInt64Type);
0238 DICTIONARY_CASE(FloatType);
0239 DICTIONARY_CASE(DoubleType);
0240 DICTIONARY_CASE(BinaryType);
0241 DICTIONARY_CASE(StringType);
0242 DICTIONARY_CASE(FixedSizeBinaryType);
0243 #undef DICTIONARY_CASE
0244 default:
0245 return Status::NotImplemented("DictionaryArray converter for type ", t.ToString(),
0246 " not implemented");
0247 }
0248 return out->Construct(std::move(type), std::move(options), pool);
0249 }
0250
0251 Status Visit(const DataType& t) { return Status::NotImplemented(t.name()); }
0252
0253 std::shared_ptr<DataType> type;
0254 typename BaseConverter::OptionsType options;
0255 MemoryPool* pool;
0256 std::unique_ptr<BaseConverter> out;
0257 };
0258
0259 template <typename BaseConverter, template <typename...> class ConverterTrait>
0260 static Result<std::unique_ptr<BaseConverter>> MakeConverter(
0261 std::shared_ptr<DataType> type, typename BaseConverter::OptionsType options,
0262 MemoryPool* pool) {
0263 MakeConverterImpl<BaseConverter, ConverterTrait> visitor{
0264 std::move(type), std::move(options), pool, NULLPTR};
0265 ARROW_RETURN_NOT_OK(VisitTypeInline(*visitor.type, &visitor));
0266 return std::move(visitor.out);
0267 }
0268
0269 template <typename Converter>
0270 class Chunker {
0271 public:
0272 using InputType = typename Converter::InputType;
0273
0274 explicit Chunker(std::unique_ptr<Converter> converter)
0275 : converter_(std::move(converter)) {}
0276
0277 Status Reserve(int64_t additional_capacity) {
0278 ARROW_RETURN_NOT_OK(converter_->Reserve(additional_capacity));
0279 reserved_ += additional_capacity;
0280 return Status::OK();
0281 }
0282
0283 Status AppendNull() {
0284 auto status = converter_->AppendNull();
0285 if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
0286 if (converter_->builder()->length() == 0) {
0287
0288
0289 return status;
0290 }
0291 ARROW_RETURN_NOT_OK(FinishChunk());
0292 return converter_->AppendNull();
0293 }
0294 ++length_;
0295 return status;
0296 }
0297
0298 Status Append(InputType value) {
0299 auto status = converter_->Append(value);
0300 if (ARROW_PREDICT_FALSE(status.IsCapacityError())) {
0301 if (converter_->builder()->length() == 0) {
0302 return status;
0303 }
0304 ARROW_RETURN_NOT_OK(FinishChunk());
0305 return Append(value);
0306 }
0307 ++length_;
0308 return status;
0309 }
0310
0311 Status Extend(InputType values, int64_t size, int64_t offset = 0) {
0312 while (offset < size) {
0313 auto length_before = converter_->builder()->length();
0314 auto status = converter_->Extend(values, size, offset);
0315 auto length_after = converter_->builder()->length();
0316 auto num_converted = length_after - length_before;
0317
0318 offset += num_converted;
0319 length_ += num_converted;
0320
0321 if (status.IsCapacityError()) {
0322 if (converter_->builder()->length() == 0) {
0323
0324
0325 return status;
0326 } else if (converter_->rewind_on_overflow()) {
0327
0328
0329
0330
0331
0332 length_ -= 1;
0333 offset -= 1;
0334 }
0335 ARROW_RETURN_NOT_OK(FinishChunk());
0336 } else if (!status.ok()) {
0337 return status;
0338 }
0339 }
0340 return Status::OK();
0341 }
0342
0343 Status ExtendMasked(InputType values, InputType mask, int64_t size,
0344 int64_t offset = 0) {
0345 while (offset < size) {
0346 auto length_before = converter_->builder()->length();
0347 auto status = converter_->ExtendMasked(values, mask, size, offset);
0348 auto length_after = converter_->builder()->length();
0349 auto num_converted = length_after - length_before;
0350
0351 offset += num_converted;
0352 length_ += num_converted;
0353
0354 if (status.IsCapacityError()) {
0355 if (converter_->builder()->length() == 0) {
0356
0357
0358 return status;
0359 } else if (converter_->rewind_on_overflow()) {
0360
0361
0362
0363
0364
0365 length_ -= 1;
0366 offset -= 1;
0367 }
0368 ARROW_RETURN_NOT_OK(FinishChunk());
0369 } else if (!status.ok()) {
0370 return status;
0371 }
0372 }
0373 return Status::OK();
0374 }
0375
0376 Status FinishChunk() {
0377 ARROW_ASSIGN_OR_RAISE(auto chunk, converter_->ToArray(length_));
0378 chunks_.push_back(chunk);
0379
0380
0381
0382 auto remaining = reserved_ - length_;
0383 Reset();
0384 return Reserve(remaining);
0385 }
0386
0387 Result<std::shared_ptr<ChunkedArray>> ToChunkedArray() {
0388 ARROW_RETURN_NOT_OK(FinishChunk());
0389 return std::make_shared<ChunkedArray>(chunks_);
0390 }
0391
0392 protected:
0393 void Reset() {
0394 converter_->builder()->Reset();
0395 length_ = 0;
0396 reserved_ = 0;
0397 }
0398
0399 int64_t length_ = 0;
0400 int64_t reserved_ = 0;
0401 std::unique_ptr<Converter> converter_;
0402 std::vector<std::shared_ptr<Array>> chunks_;
0403 };
0404
0405 template <typename T>
0406 static Result<std::unique_ptr<Chunker<T>>> MakeChunker(std::unique_ptr<T> converter) {
0407 return std::make_unique<Chunker<T>>(std::move(converter));
0408 }
0409
0410 }
0411 }