Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:56

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 // NOTE: API is EXPERIMENTAL and will change without going through a
0019 // deprecation cycle
0020 
0021 #pragma once
0022 
0023 #include <atomic>
0024 #include <cstdint>
0025 #include <limits>
0026 #include <memory>
0027 #include <optional>
0028 #include <string>
0029 #include <utility>
0030 #include <vector>
0031 
0032 #include "arrow/array/data.h"
0033 #include "arrow/compute/expression.h"
0034 #include "arrow/compute/type_fwd.h"
0035 #include "arrow/datum.h"
0036 #include "arrow/result.h"
0037 #include "arrow/type_fwd.h"
0038 #include "arrow/util/macros.h"
0039 #include "arrow/util/type_fwd.h"
0040 #include "arrow/util/visibility.h"
0041 
0042 namespace arrow {
0043 namespace compute {
0044 
// Judging by the experience of other query processing systems, roughly 64K
// seems like a good default chunksize for execution, so that value is kept
// here. Note that contiguous arrays are currently NOT chunked by default;
// this may change in the future once parallel execution is implemented.
static constexpr int64_t kDefaultExecChunksize = std::numeric_limits<uint16_t>::max();
0050 
0051 /// \brief Context for expression-global variables and options used by
0052 /// function evaluation
0053 class ARROW_EXPORT ExecContext {
0054  public:
0055   // If no function registry passed, the default is used.
0056   explicit ExecContext(MemoryPool* pool = default_memory_pool(),
0057                        ::arrow::internal::Executor* executor = NULLPTR,
0058                        FunctionRegistry* func_registry = NULLPTR);
0059 
0060   /// \brief The MemoryPool used for allocations, default is
0061   /// default_memory_pool().
0062   MemoryPool* memory_pool() const { return pool_; }
0063 
0064   const ::arrow::internal::CpuInfo* cpu_info() const;
0065 
0066   /// \brief An Executor which may be used to parallelize execution.
0067   ::arrow::internal::Executor* executor() const { return executor_; }
0068 
0069   /// \brief The FunctionRegistry for looking up functions by name and
0070   /// selecting kernels for execution. Defaults to the library-global function
0071   /// registry provided by GetFunctionRegistry.
0072   FunctionRegistry* func_registry() const { return func_registry_; }
0073 
0074   // \brief Set maximum length unit of work for kernel execution. Larger
0075   // contiguous array inputs will be split into smaller chunks, and, if
0076   // possible and enabled, processed in parallel. The default chunksize is
0077   // INT64_MAX, so contiguous arrays are not split.
0078   void set_exec_chunksize(int64_t chunksize) { exec_chunksize_ = chunksize; }
0079 
0080   // \brief Maximum length for ExecBatch data chunks processed by
0081   // kernels. Contiguous array inputs with longer length will be split into
0082   // smaller chunks.
0083   int64_t exec_chunksize() const { return exec_chunksize_; }
0084 
0085   /// \brief Set whether to use multiple threads for function execution. This
0086   /// is not yet used.
0087   void set_use_threads(bool use_threads = true) { use_threads_ = use_threads; }
0088 
0089   /// \brief If true, then utilize multiple threads where relevant for function
0090   /// execution. This is not yet used.
0091   bool use_threads() const { return use_threads_; }
0092 
0093   // Set the preallocation strategy for kernel execution as it relates to
0094   // chunked execution. For chunked execution, whether via ChunkedArray inputs
0095   // or splitting larger Array arguments into smaller pieces, contiguous
0096   // allocation (if permitted by the kernel) will allocate one large array to
0097   // write output into yielding it to the caller at the end. If this option is
0098   // set to off, then preallocations will be performed independently for each
0099   // chunk of execution
0100   //
0101   // TODO: At some point we might want the limit the size of contiguous
0102   // preallocations. For example, even if the exec_chunksize is 64K or less, we
0103   // might limit contiguous allocations to 1M records, say.
0104   void set_preallocate_contiguous(bool preallocate) {
0105     preallocate_contiguous_ = preallocate;
0106   }
0107 
0108   /// \brief If contiguous preallocations should be used when doing chunked
0109   /// execution as specified by exec_chunksize(). See
0110   /// set_preallocate_contiguous() for more information.
0111   bool preallocate_contiguous() const { return preallocate_contiguous_; }
0112 
0113  private:
0114   MemoryPool* pool_;
0115   ::arrow::internal::Executor* executor_;
0116   FunctionRegistry* func_registry_;
0117   int64_t exec_chunksize_ = std::numeric_limits<int64_t>::max();
0118   bool preallocate_contiguous_ = true;
0119   bool use_threads_ = true;
0120 };
0121 
0122 // TODO: Consider standardizing on uint16 selection vectors and only use them
0123 // when we can ensure that each value is 64K length or smaller
0124 
0125 /// \brief Container for an array of value selection indices that were
0126 /// materialized from a filter.
0127 ///
0128 /// Columnar query engines (see e.g. [1]) have found that rather than
0129 /// materializing filtered data, the filter can instead be converted to an
0130 /// array of the "on" indices and then "fusing" these indices in operator
0131 /// implementations. This is especially relevant for aggregations but also
0132 /// applies to scalar operations.
0133 ///
0134 /// We are not yet using this so this is mostly a placeholder for now.
0135 ///
0136 /// [1]: http://cidrdb.org/cidr2005/papers/P19.pdf
0137 class ARROW_EXPORT SelectionVector {
0138  public:
0139   explicit SelectionVector(std::shared_ptr<ArrayData> data);
0140 
0141   explicit SelectionVector(const Array& arr);
0142 
0143   /// \brief Create SelectionVector from boolean mask
0144   static Result<std::shared_ptr<SelectionVector>> FromMask(const BooleanArray& arr);
0145 
0146   const int32_t* indices() const { return indices_; }
0147   int32_t length() const;
0148 
0149  private:
0150   std::shared_ptr<ArrayData> data_;
0151   const int32_t* indices_;
0152 };
0153 
/// Sentinel batch index meaning that a batch does not belong to an ordered
/// stream
constexpr int64_t kUnsequencedIndex{-1};
0156 
0157 /// \brief A unit of work for kernel execution. It contains a collection of
0158 /// Array and Scalar values and an optional SelectionVector indicating that
0159 /// there is an unmaterialized filter that either must be materialized, or (if
0160 /// the kernel supports it) pushed down into the kernel implementation.
0161 ///
0162 /// ExecBatch is semantically similar to RecordBatch in that in a SQL context
0163 /// it represents a collection of records, but constant "columns" are
0164 /// represented by Scalar values rather than having to be converted into arrays
0165 /// with repeated values.
0166 ///
0167 /// TODO: Datum uses arrow/util/variant.h which may be a bit heavier-weight
0168 /// than is desirable for this class. Microbenchmarks would help determine for
0169 /// sure. See ARROW-8928.
0170 
0171 /// \addtogroup acero-internals
0172 /// @{
0173 
0174 struct ARROW_EXPORT ExecBatch {
0175   ExecBatch() = default;
0176   ExecBatch(std::vector<Datum> values, int64_t length)
0177       : values(std::move(values)), length(length) {}
0178 
0179   explicit ExecBatch(const RecordBatch& batch);
0180 
0181   /// \brief Infer the ExecBatch length from values.
0182   static Result<int64_t> InferLength(const std::vector<Datum>& values);
0183 
0184   /// Creates an ExecBatch with length-validation.
0185   ///
0186   /// If any value is given, then all values must have a common length. If the given
0187   /// length is negative, then the length of the ExecBatch is set to this common length,
0188   /// or to 1 if no values are given. Otherwise, the given length must equal the common
0189   /// length, if any value is given.
0190   static Result<ExecBatch> Make(std::vector<Datum> values, int64_t length = -1);
0191 
0192   Result<std::shared_ptr<RecordBatch>> ToRecordBatch(
0193       std::shared_ptr<Schema> schema, MemoryPool* pool = default_memory_pool()) const;
0194 
0195   /// The values representing positional arguments to be passed to a kernel's
0196   /// exec function for processing.
0197   std::vector<Datum> values;
0198 
0199   /// A deferred filter represented as an array of indices into the values.
0200   ///
0201   /// For example, the filter [true, true, false, true] would be represented as
0202   /// the selection vector [0, 1, 3]. When the selection vector is set,
0203   /// ExecBatch::length is equal to the length of this array.
0204   std::shared_ptr<SelectionVector> selection_vector;
0205 
0206   /// A predicate Expression guaranteed to evaluate to true for all rows in this batch.
0207   Expression guarantee = literal(true);
0208 
0209   /// The semantic length of the ExecBatch. When the values are all scalars,
0210   /// the length should be set to 1 for non-aggregate kernels, otherwise the
0211   /// length is taken from the array values, except when there is a selection
0212   /// vector. When there is a selection vector set, the length of the batch is
0213   /// the length of the selection. Aggregate kernels can have an ExecBatch
0214   /// formed by projecting just the partition columns from a batch in which
0215   /// case, it would have scalar rows with length greater than 1.
0216   ///
0217   /// If the array values are of length 0 then the length is 0 regardless of
0218   /// whether any values are Scalar.
0219   int64_t length = 0;
0220 
0221   /// \brief index of this batch in a sorted stream of batches
0222   ///
0223   /// This index must be strictly monotonic starting at 0 without gaps or
0224   /// it can be set to kUnsequencedIndex if there is no meaningful order
0225   int64_t index = kUnsequencedIndex;
0226 
0227   /// \brief The sum of bytes in each buffer referenced by the batch
0228   ///
0229   /// Note: Scalars are not counted
0230   /// Note: Some values may referenced only part of a buffer, for
0231   ///       example, an array with an offset.  The actual data
0232   ///       visible to this batch will be smaller than the total
0233   ///       buffer size in this case.
0234   int64_t TotalBufferSize() const;
0235 
0236   /// \brief Return the value at the i-th index
0237   template <typename index_type>
0238   inline const Datum& operator[](index_type i) const {
0239     return values[i];
0240   }
0241 
0242   bool Equals(const ExecBatch& other) const;
0243 
0244   /// \brief A convenience for the number of values / arguments.
0245   int num_values() const { return static_cast<int>(values.size()); }
0246 
0247   ExecBatch Slice(int64_t offset, int64_t length) const;
0248 
0249   Result<ExecBatch> SelectValues(const std::vector<int>& ids) const;
0250 
0251   /// \brief A convenience for returning the types from the batch.
0252   std::vector<TypeHolder> GetTypes() const {
0253     std::vector<TypeHolder> result;
0254     for (const auto& value : this->values) {
0255       result.emplace_back(value.type());
0256     }
0257     return result;
0258   }
0259 
0260   std::string ToString() const;
0261 };
0262 
0263 inline bool operator==(const ExecBatch& l, const ExecBatch& r) { return l.Equals(r); }
0264 inline bool operator!=(const ExecBatch& l, const ExecBatch& r) { return !l.Equals(r); }
0265 
0266 ARROW_EXPORT void PrintTo(const ExecBatch&, std::ostream*);
0267 
0268 /// @}
0269 
0270 /// \defgroup compute-internals Utilities for calling functions, useful for those
0271 /// extending the function registry
0272 ///
0273 /// @{
0274 
0275 struct ExecValue {
0276   ArraySpan array = {};
0277   const Scalar* scalar = NULLPTR;
0278 
0279   ExecValue(Scalar* scalar)  // NOLINT implicit conversion
0280       : scalar(scalar) {}
0281 
0282   ExecValue(ArraySpan array)  // NOLINT implicit conversion
0283       : array(std::move(array)) {}
0284 
0285   ExecValue(const ArrayData& array) {  // NOLINT implicit conversion
0286     this->array.SetMembers(array);
0287   }
0288 
0289   ExecValue() = default;
0290   ExecValue(const ExecValue& other) = default;
0291   ExecValue& operator=(const ExecValue& other) = default;
0292   ExecValue(ExecValue&& other) = default;
0293   ExecValue& operator=(ExecValue&& other) = default;
0294 
0295   int64_t length() const { return this->is_array() ? this->array.length : 1; }
0296 
0297   bool is_array() const { return this->scalar == NULLPTR; }
0298   bool is_scalar() const { return !this->is_array(); }
0299 
0300   void SetArray(const ArrayData& array) {
0301     this->array.SetMembers(array);
0302     this->scalar = NULLPTR;
0303   }
0304 
0305   void SetScalar(const Scalar* scalar) { this->scalar = scalar; }
0306 
0307   template <typename ExactType>
0308   const ExactType& scalar_as() const {
0309     return ::arrow::internal::checked_cast<const ExactType&>(*this->scalar);
0310   }
0311 
0312   /// XXX: here temporarily for compatibility with datum, see
0313   /// e.g. MakeStructExec in scalar_nested.cc
0314   int64_t null_count() const {
0315     if (this->is_array()) {
0316       return this->array.GetNullCount();
0317     } else {
0318       return this->scalar->is_valid ? 0 : 1;
0319     }
0320   }
0321 
0322   const DataType* type() const {
0323     if (this->is_array()) {
0324       return array.type;
0325     } else {
0326       return scalar->type.get();
0327     }
0328   }
0329 };
0330 
0331 struct ARROW_EXPORT ExecResult {
0332   // The default value of the variant is ArraySpan
0333   std::variant<ArraySpan, std::shared_ptr<ArrayData>> value;
0334 
0335   int64_t length() const {
0336     if (this->is_array_span()) {
0337       return this->array_span()->length;
0338     } else {
0339       return this->array_data()->length;
0340     }
0341   }
0342 
0343   const DataType* type() const {
0344     if (this->is_array_span()) {
0345       return this->array_span()->type;
0346     } else {
0347       return this->array_data()->type.get();
0348     }
0349   }
0350 
0351   const ArraySpan* array_span() const { return &std::get<ArraySpan>(this->value); }
0352   ArraySpan* array_span_mutable() { return &std::get<ArraySpan>(this->value); }
0353 
0354   bool is_array_span() const { return this->value.index() == 0; }
0355 
0356   const std::shared_ptr<ArrayData>& array_data() const {
0357     return std::get<std::shared_ptr<ArrayData>>(this->value);
0358   }
0359   ArrayData* array_data_mutable() {
0360     return std::get<std::shared_ptr<ArrayData>>(this->value).get();
0361   }
0362 
0363   bool is_array_data() const { return this->value.index() == 1; }
0364 };
0365 
0366 /// \brief A "lightweight" column batch object which contains no
0367 /// std::shared_ptr objects and does not have any memory ownership
0368 /// semantics. Can represent a view onto an "owning" ExecBatch.
0369 struct ARROW_EXPORT ExecSpan {
0370   ExecSpan() = default;
0371   ExecSpan(const ExecSpan& other) = default;
0372   ExecSpan& operator=(const ExecSpan& other) = default;
0373   ExecSpan(ExecSpan&& other) = default;
0374   ExecSpan& operator=(ExecSpan&& other) = default;
0375 
0376   explicit ExecSpan(std::vector<ExecValue> values, int64_t length)
0377       : length(length), values(std::move(values)) {}
0378 
0379   explicit ExecSpan(const ExecBatch& batch) {
0380     this->length = batch.length;
0381     this->values.resize(batch.values.size());
0382     for (size_t i = 0; i < batch.values.size(); ++i) {
0383       const Datum& in_value = batch[i];
0384       ExecValue* out_value = &this->values[i];
0385       if (in_value.is_array()) {
0386         out_value->SetArray(*in_value.array());
0387       } else {
0388         out_value->SetScalar(in_value.scalar().get());
0389       }
0390     }
0391   }
0392 
0393   /// \brief Return the value at the i-th index
0394   template <typename index_type>
0395   inline const ExecValue& operator[](index_type i) const {
0396     return values[i];
0397   }
0398 
0399   /// \brief A convenience for the number of values / arguments.
0400   int num_values() const { return static_cast<int>(values.size()); }
0401 
0402   std::vector<TypeHolder> GetTypes() const {
0403     std::vector<TypeHolder> result;
0404     for (const auto& value : this->values) {
0405       result.emplace_back(value.type());
0406     }
0407     return result;
0408   }
0409 
0410   ExecBatch ToExecBatch() const {
0411     ExecBatch result;
0412     result.length = this->length;
0413     for (const ExecValue& value : this->values) {
0414       if (value.is_array()) {
0415         result.values.push_back(value.array.ToArrayData());
0416       } else {
0417         result.values.push_back(value.scalar->GetSharedPtr());
0418       }
0419     }
0420     return result;
0421   }
0422 
0423   int64_t length = 0;
0424   std::vector<ExecValue> values;
0425 };
0426 
0427 /// \defgroup compute-call-function One-shot calls to compute functions
0428 ///
0429 /// @{
0430 
0431 /// \brief One-shot invoker for all types of functions.
0432 ///
0433 /// Does kernel dispatch, argument checking, iteration of ChunkedArray inputs,
0434 /// and wrapping of outputs.
0435 ARROW_EXPORT
0436 Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,
0437                            const FunctionOptions* options, ExecContext* ctx = NULLPTR);
0438 
0439 /// \brief Variant of CallFunction which uses a function's default options.
0440 ///
0441 /// NB: Some functions require FunctionOptions be provided.
0442 ARROW_EXPORT
0443 Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,
0444                            ExecContext* ctx = NULLPTR);
0445 
0446 /// \brief One-shot invoker for all types of functions.
0447 ///
0448 /// Does kernel dispatch, argument checking, iteration of ChunkedArray inputs,
0449 /// and wrapping of outputs.
0450 ARROW_EXPORT
0451 Result<Datum> CallFunction(const std::string& func_name, const ExecBatch& batch,
0452                            const FunctionOptions* options, ExecContext* ctx = NULLPTR);
0453 
0454 /// \brief Variant of CallFunction which uses a function's default options.
0455 ///
0456 /// NB: Some functions require FunctionOptions be provided.
0457 ARROW_EXPORT
0458 Result<Datum> CallFunction(const std::string& func_name, const ExecBatch& batch,
0459                            ExecContext* ctx = NULLPTR);
0460 
0461 /// @}
0462 
0463 /// \defgroup compute-function-executor One-shot calls to obtain function executors
0464 ///
0465 /// @{
0466 
0467 /// \brief One-shot executor provider for all types of functions.
0468 ///
0469 /// This function creates and initializes a `FunctionExecutor` appropriate
0470 /// for the given function name, input types and function options.
0471 ARROW_EXPORT
0472 Result<std::shared_ptr<FunctionExecutor>> GetFunctionExecutor(
0473     const std::string& func_name, std::vector<TypeHolder> in_types,
0474     const FunctionOptions* options = NULLPTR, FunctionRegistry* func_registry = NULLPTR);
0475 
0476 /// \brief One-shot executor provider for all types of functions.
0477 ///
0478 /// This function creates and initializes a `FunctionExecutor` appropriate
0479 /// for the given function name, input types (taken from the Datum arguments)
0480 /// and function options.
0481 ARROW_EXPORT
0482 Result<std::shared_ptr<FunctionExecutor>> GetFunctionExecutor(
0483     const std::string& func_name, const std::vector<Datum>& args,
0484     const FunctionOptions* options = NULLPTR, FunctionRegistry* func_registry = NULLPTR);
0485 
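/// @}

// Closes the compute-internals group, whose opening marker (above ExecValue)
// otherwise has no matching end marker in this file.
/// @}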
0487 
0488 }  // namespace compute
0489 }  // namespace arrow