File indexing completed on 2025-08-28 08:26:56
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021 #pragma once
0022
0023 #include <atomic>
0024 #include <cstdint>
0025 #include <limits>
0026 #include <memory>
0027 #include <optional>
0028 #include <string>
0029 #include <utility>
0030 #include <vector>
0031
0032 #include "arrow/array/data.h"
0033 #include "arrow/compute/expression.h"
0034 #include "arrow/compute/type_fwd.h"
0035 #include "arrow/datum.h"
0036 #include "arrow/result.h"
0037 #include "arrow/type_fwd.h"
0038 #include "arrow/util/macros.h"
0039 #include "arrow/util/type_fwd.h"
0040 #include "arrow/util/visibility.h"
0041
0042 namespace arrow {
0043 namespace compute {
0044
0045
0046
0047
0048
/// \brief Default execution chunk size constant: the largest value that fits
/// in an unsigned 16-bit integer (65535).
///
/// Declared `inline constexpr` so that every translation unit including this
/// header refers to one shared definition rather than a private internal copy.
///
/// NOTE(review): ExecContext in this header initialises its own chunk size to
/// the int64_t maximum, not to this constant — confirm where this is consumed.
inline constexpr int64_t kDefaultExecChunksize = std::numeric_limits<uint16_t>::max();
0050
0051
0052
0053 class ARROW_EXPORT ExecContext {
0054 public:
0055
0056 explicit ExecContext(MemoryPool* pool = default_memory_pool(),
0057 ::arrow::internal::Executor* executor = NULLPTR,
0058 FunctionRegistry* func_registry = NULLPTR);
0059
0060
0061
0062 MemoryPool* memory_pool() const { return pool_; }
0063
0064 const ::arrow::internal::CpuInfo* cpu_info() const;
0065
0066
0067 ::arrow::internal::Executor* executor() const { return executor_; }
0068
0069
0070
0071
0072 FunctionRegistry* func_registry() const { return func_registry_; }
0073
0074
0075
0076
0077
0078 void set_exec_chunksize(int64_t chunksize) { exec_chunksize_ = chunksize; }
0079
0080
0081
0082
0083 int64_t exec_chunksize() const { return exec_chunksize_; }
0084
0085
0086
0087 void set_use_threads(bool use_threads = true) { use_threads_ = use_threads; }
0088
0089
0090
0091 bool use_threads() const { return use_threads_; }
0092
0093
0094
0095
0096
0097
0098
0099
0100
0101
0102
0103
0104 void set_preallocate_contiguous(bool preallocate) {
0105 preallocate_contiguous_ = preallocate;
0106 }
0107
0108
0109
0110
0111 bool preallocate_contiguous() const { return preallocate_contiguous_; }
0112
0113 private:
0114 MemoryPool* pool_;
0115 ::arrow::internal::Executor* executor_;
0116 FunctionRegistry* func_registry_;
0117 int64_t exec_chunksize_ = std::numeric_limits<int64_t>::max();
0118 bool preallocate_contiguous_ = true;
0119 bool use_threads_ = true;
0120 };
0121
0122
0123
0124
0125
0126
0127
0128
0129
0130
0131
0132
0133
0134
0135
0136
0137 class ARROW_EXPORT SelectionVector {
0138 public:
0139 explicit SelectionVector(std::shared_ptr<ArrayData> data);
0140
0141 explicit SelectionVector(const Array& arr);
0142
0143
0144 static Result<std::shared_ptr<SelectionVector>> FromMask(const BooleanArray& arr);
0145
0146 const int32_t* indices() const { return indices_; }
0147 int32_t length() const;
0148
0149 private:
0150 std::shared_ptr<ArrayData> data_;
0151 const int32_t* indices_;
0152 };
0153
0154
/// \brief Sentinel index value (-1); used in this header as the default for
/// ExecBatch::index.
///
/// Declared `inline constexpr` so that all translation units including this
/// header share a single definition.
inline constexpr int64_t kUnsequencedIndex = -1;
0156
0157
0158
0159
0160
0161
0162
0163
0164
0165
0166
0167
0168
0169
0170
0171
0172
0173
0174 struct ARROW_EXPORT ExecBatch {
0175 ExecBatch() = default;
0176 ExecBatch(std::vector<Datum> values, int64_t length)
0177 : values(std::move(values)), length(length) {}
0178
0179 explicit ExecBatch(const RecordBatch& batch);
0180
0181
0182 static Result<int64_t> InferLength(const std::vector<Datum>& values);
0183
0184
0185
0186
0187
0188
0189
0190 static Result<ExecBatch> Make(std::vector<Datum> values, int64_t length = -1);
0191
0192 Result<std::shared_ptr<RecordBatch>> ToRecordBatch(
0193 std::shared_ptr<Schema> schema, MemoryPool* pool = default_memory_pool()) const;
0194
0195
0196
0197 std::vector<Datum> values;
0198
0199
0200
0201
0202
0203
0204 std::shared_ptr<SelectionVector> selection_vector;
0205
0206
0207 Expression guarantee = literal(true);
0208
0209
0210
0211
0212
0213
0214
0215
0216
0217
0218
0219 int64_t length = 0;
0220
0221
0222
0223
0224
0225 int64_t index = kUnsequencedIndex;
0226
0227
0228
0229
0230
0231
0232
0233
0234 int64_t TotalBufferSize() const;
0235
0236
0237 template <typename index_type>
0238 inline const Datum& operator[](index_type i) const {
0239 return values[i];
0240 }
0241
0242 bool Equals(const ExecBatch& other) const;
0243
0244
0245 int num_values() const { return static_cast<int>(values.size()); }
0246
0247 ExecBatch Slice(int64_t offset, int64_t length) const;
0248
0249 Result<ExecBatch> SelectValues(const std::vector<int>& ids) const;
0250
0251
0252 std::vector<TypeHolder> GetTypes() const {
0253 std::vector<TypeHolder> result;
0254 for (const auto& value : this->values) {
0255 result.emplace_back(value.type());
0256 }
0257 return result;
0258 }
0259
0260 std::string ToString() const;
0261 };
0262
0263 inline bool operator==(const ExecBatch& l, const ExecBatch& r) { return l.Equals(r); }
0264 inline bool operator!=(const ExecBatch& l, const ExecBatch& r) { return !l.Equals(r); }
0265
0266 ARROW_EXPORT void PrintTo(const ExecBatch&, std::ostream*);
0267
0268
0269
0270
0271
0272
0273
0274
0275 struct ExecValue {
0276 ArraySpan array = {};
0277 const Scalar* scalar = NULLPTR;
0278
0279 ExecValue(Scalar* scalar)
0280 : scalar(scalar) {}
0281
0282 ExecValue(ArraySpan array)
0283 : array(std::move(array)) {}
0284
0285 ExecValue(const ArrayData& array) {
0286 this->array.SetMembers(array);
0287 }
0288
0289 ExecValue() = default;
0290 ExecValue(const ExecValue& other) = default;
0291 ExecValue& operator=(const ExecValue& other) = default;
0292 ExecValue(ExecValue&& other) = default;
0293 ExecValue& operator=(ExecValue&& other) = default;
0294
0295 int64_t length() const { return this->is_array() ? this->array.length : 1; }
0296
0297 bool is_array() const { return this->scalar == NULLPTR; }
0298 bool is_scalar() const { return !this->is_array(); }
0299
0300 void SetArray(const ArrayData& array) {
0301 this->array.SetMembers(array);
0302 this->scalar = NULLPTR;
0303 }
0304
0305 void SetScalar(const Scalar* scalar) { this->scalar = scalar; }
0306
0307 template <typename ExactType>
0308 const ExactType& scalar_as() const {
0309 return ::arrow::internal::checked_cast<const ExactType&>(*this->scalar);
0310 }
0311
0312
0313
0314 int64_t null_count() const {
0315 if (this->is_array()) {
0316 return this->array.GetNullCount();
0317 } else {
0318 return this->scalar->is_valid ? 0 : 1;
0319 }
0320 }
0321
0322 const DataType* type() const {
0323 if (this->is_array()) {
0324 return array.type;
0325 } else {
0326 return scalar->type.get();
0327 }
0328 }
0329 };
0330
0331 struct ARROW_EXPORT ExecResult {
0332
0333 std::variant<ArraySpan, std::shared_ptr<ArrayData>> value;
0334
0335 int64_t length() const {
0336 if (this->is_array_span()) {
0337 return this->array_span()->length;
0338 } else {
0339 return this->array_data()->length;
0340 }
0341 }
0342
0343 const DataType* type() const {
0344 if (this->is_array_span()) {
0345 return this->array_span()->type;
0346 } else {
0347 return this->array_data()->type.get();
0348 }
0349 }
0350
0351 const ArraySpan* array_span() const { return &std::get<ArraySpan>(this->value); }
0352 ArraySpan* array_span_mutable() { return &std::get<ArraySpan>(this->value); }
0353
0354 bool is_array_span() const { return this->value.index() == 0; }
0355
0356 const std::shared_ptr<ArrayData>& array_data() const {
0357 return std::get<std::shared_ptr<ArrayData>>(this->value);
0358 }
0359 ArrayData* array_data_mutable() {
0360 return std::get<std::shared_ptr<ArrayData>>(this->value).get();
0361 }
0362
0363 bool is_array_data() const { return this->value.index() == 1; }
0364 };
0365
0366
0367
0368
0369 struct ARROW_EXPORT ExecSpan {
0370 ExecSpan() = default;
0371 ExecSpan(const ExecSpan& other) = default;
0372 ExecSpan& operator=(const ExecSpan& other) = default;
0373 ExecSpan(ExecSpan&& other) = default;
0374 ExecSpan& operator=(ExecSpan&& other) = default;
0375
0376 explicit ExecSpan(std::vector<ExecValue> values, int64_t length)
0377 : length(length), values(std::move(values)) {}
0378
0379 explicit ExecSpan(const ExecBatch& batch) {
0380 this->length = batch.length;
0381 this->values.resize(batch.values.size());
0382 for (size_t i = 0; i < batch.values.size(); ++i) {
0383 const Datum& in_value = batch[i];
0384 ExecValue* out_value = &this->values[i];
0385 if (in_value.is_array()) {
0386 out_value->SetArray(*in_value.array());
0387 } else {
0388 out_value->SetScalar(in_value.scalar().get());
0389 }
0390 }
0391 }
0392
0393
0394 template <typename index_type>
0395 inline const ExecValue& operator[](index_type i) const {
0396 return values[i];
0397 }
0398
0399
0400 int num_values() const { return static_cast<int>(values.size()); }
0401
0402 std::vector<TypeHolder> GetTypes() const {
0403 std::vector<TypeHolder> result;
0404 for (const auto& value : this->values) {
0405 result.emplace_back(value.type());
0406 }
0407 return result;
0408 }
0409
0410 ExecBatch ToExecBatch() const {
0411 ExecBatch result;
0412 result.length = this->length;
0413 for (const ExecValue& value : this->values) {
0414 if (value.is_array()) {
0415 result.values.push_back(value.array.ToArrayData());
0416 } else {
0417 result.values.push_back(value.scalar->GetSharedPtr());
0418 }
0419 }
0420 return result;
0421 }
0422
0423 int64_t length = 0;
0424 std::vector<ExecValue> values;
0425 };
0426
0427
0428
0429
0430
0431
0432
0433
0434
0435 ARROW_EXPORT
0436 Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,
0437 const FunctionOptions* options, ExecContext* ctx = NULLPTR);
0438
0439
0440
0441
0442 ARROW_EXPORT
0443 Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,
0444 ExecContext* ctx = NULLPTR);
0445
0446
0447
0448
0449
0450 ARROW_EXPORT
0451 Result<Datum> CallFunction(const std::string& func_name, const ExecBatch& batch,
0452 const FunctionOptions* options, ExecContext* ctx = NULLPTR);
0453
0454
0455
0456
0457 ARROW_EXPORT
0458 Result<Datum> CallFunction(const std::string& func_name, const ExecBatch& batch,
0459 ExecContext* ctx = NULLPTR);
0460
0461
0462
0463
0464
0465
0466
0467
0468
0469
0470
0471 ARROW_EXPORT
0472 Result<std::shared_ptr<FunctionExecutor>> GetFunctionExecutor(
0473 const std::string& func_name, std::vector<TypeHolder> in_types,
0474 const FunctionOptions* options = NULLPTR, FunctionRegistry* func_registry = NULLPTR);
0475
0476
0477
0478
0479
0480
0481 ARROW_EXPORT
0482 Result<std::shared_ptr<FunctionExecutor>> GetFunctionExecutor(
0483 const std::string& func_name, const std::vector<Datum>& args,
0484 const FunctionOptions* options = NULLPTR, FunctionRegistry* func_registry = NULLPTR);
0485
0486
0487
0488 }
0489 }