Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:52

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <cstddef>
0021 #include <cstdint>
0022 #include <functional>
0023 #include <memory>
0024 #include <optional>
0025 #include <string>
0026 #include <utility>
0027 #include <vector>
0028 
0029 #include "arrow/acero/type_fwd.h"
0030 #include "arrow/acero/visibility.h"
0031 #include "arrow/compute/api_vector.h"
0032 #include "arrow/compute/exec.h"
0033 #include "arrow/compute/ordering.h"
0034 #include "arrow/type_fwd.h"
0035 #include "arrow/util/future.h"
0036 #include "arrow/util/macros.h"
0037 #include "arrow/util/tracing.h"
0038 #include "arrow/util/type_fwd.h"
0039 
0040 namespace arrow {
0041 
0042 using compute::ExecBatch;
0043 using compute::ExecContext;
0044 using compute::FunctionRegistry;
0045 using compute::GetFunctionRegistry;
0046 using compute::Ordering;
0047 using compute::threaded_exec_context;
0048 
0049 namespace acero {
0050 
0051 /// \addtogroup acero-internals
0052 /// @{
0053 
0054 class ARROW_ACERO_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
0055  public:
0056   // This allows operators to rely on signed 16-bit indices
0057   static const uint32_t kMaxBatchSize = 1 << 15;
0058   using NodeVector = std::vector<ExecNode*>;
0059 
0060   virtual ~ExecPlan() = default;
0061 
0062   QueryContext* query_context();
0063 
0064   /// \brief retrieve the nodes in the plan
0065   const NodeVector& nodes() const;
0066 
0067   /// Make an empty exec plan
0068   static Result<std::shared_ptr<ExecPlan>> Make(
0069       QueryOptions options, ExecContext exec_context = *threaded_exec_context(),
0070       std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR);
0071 
0072   static Result<std::shared_ptr<ExecPlan>> Make(
0073       ExecContext exec_context = *threaded_exec_context(),
0074       std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR);
0075 
0076   static Result<std::shared_ptr<ExecPlan>> Make(
0077       QueryOptions options, ExecContext* exec_context,
0078       std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR);
0079 
0080   static Result<std::shared_ptr<ExecPlan>> Make(
0081       ExecContext* exec_context,
0082       std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR);
0083 
0084   ExecNode* AddNode(std::unique_ptr<ExecNode> node);
0085 
0086   template <typename Node, typename... Args>
0087   Node* EmplaceNode(Args&&... args) {
0088     std::unique_ptr<Node> node{new Node{std::forward<Args>(args)...}};
0089     auto out = node.get();
0090     AddNode(std::move(node));
0091     return out;
0092   }
0093 
0094   Status Validate();
0095 
0096   /// \brief Start producing on all nodes
0097   ///
0098   /// Nodes are started in reverse topological order, such that any node
0099   /// is started before all of its inputs.
0100   void StartProducing();
0101 
0102   /// \brief Stop producing on all nodes
0103   ///
0104   /// Triggers all sources to stop producing new data.  In order to cleanly stop the plan
0105   /// will continue to run any tasks that are already in progress.  The caller should
0106   /// still wait for `finished` to complete before destroying the plan.
0107   void StopProducing();
0108 
0109   /// \brief A future which will be marked finished when all tasks have finished.
0110   Future<> finished();
0111 
0112   /// \brief Return whether the plan has non-empty metadata
0113   bool HasMetadata() const;
0114 
0115   /// \brief Return the plan's attached metadata
0116   std::shared_ptr<const KeyValueMetadata> metadata() const;
0117 
0118   std::string ToString() const;
0119 };
0120 
0121 // Acero can be extended by providing custom implementations of ExecNode.  The methods
0122 // below are documented in detail and provide careful instruction on how to fulfill the
0123 // ExecNode contract.  It's suggested you familiarize yourself with the Acero
0124 // documentation in the C++ user guide.
0125 class ARROW_ACERO_EXPORT ExecNode {
0126  public:
0127   using NodeVector = std::vector<ExecNode*>;
0128 
0129   virtual ~ExecNode() = default;
0130 
0131   virtual const char* kind_name() const = 0;
0132 
0133   // The number of inputs expected by this node
0134   int num_inputs() const { return static_cast<int>(inputs_.size()); }
0135 
0136   /// This node's predecessors in the exec plan
0137   const NodeVector& inputs() const { return inputs_; }
0138 
0139   /// True if the plan has no output schema (is a sink)
0140   bool is_sink() const { return !output_schema_; }
0141 
0142   /// \brief Labels identifying the function of each input.
0143   const std::vector<std::string>& input_labels() const { return input_labels_; }
0144 
0145   /// This node's successor in the exec plan
0146   const ExecNode* output() const { return output_; }
0147 
0148   /// The datatypes for batches produced by this node
0149   const std::shared_ptr<Schema>& output_schema() const { return output_schema_; }
0150 
0151   /// This node's exec plan
0152   ExecPlan* plan() { return plan_; }
0153 
0154   /// \brief An optional label, for display and debugging
0155   ///
0156   /// There is no guarantee that this value is non-empty or unique.
0157   const std::string& label() const { return label_; }
0158   void SetLabel(std::string label) { label_ = std::move(label); }
0159 
0160   virtual Status Validate() const;
0161 
0162   /// \brief the ordering of the output batches
0163   ///
0164   /// This does not guarantee the batches will be emitted by this node
0165   /// in order.  Instead it guarantees that the batches will have their
0166   /// ExecBatch::index property set in a way that respects this ordering.
0167   ///
0168   /// In other words, given the ordering {{"x", SortOrder::Ascending}} we
0169   /// know that all values of x in a batch with index N will be less than
0170   /// or equal to all values of x in a batch with index N+k (assuming k > 0).
0171   /// Furthermore, we also know that values will be sorted within a batch.
0172   /// Any row N will have a value of x that is less than the value for
0173   /// any row N+k.
0174   ///
0175   /// Note that an ordering can be both Ordering::Unordered and Ordering::Implicit.
0176   /// A node's output should be marked Ordering::Unordered if the order is
0177   /// non-deterministic.  For example, a hash-join has no predictable output order.
0178   ///
0179   /// If the ordering is Ordering::Implicit then there is a meaningful order but that
0180   /// ordering is not represented by any column in the data.  The most common case for
0181   /// this is when reading data from an in-memory table.  The data has an implicit "row
0182   /// order" which is not necessarily represented in the data set.
0183   ///
0184   /// A filter or project node will not modify the ordering.  Nothing needs to be done
0185   /// other than ensure the index assigned to output batches is the same as the
0186   /// input batch that was mapped.
0187   ///
0188   /// Other nodes may introduce order.  For example, an order-by node will emit
0189   /// a brand new ordering independent of the input ordering.
0190   ///
0191   /// Finally, as described above, such as a hash-join or aggregation may may
0192   /// destroy ordering (although these nodes could also choose to establish a
0193   /// new ordering based on the hash keys).
0194   ///
0195   /// Some nodes will require an ordering.  For example, a fetch node or an
0196   /// asof join node will only function if the input data is ordered (for fetch
0197   /// it is enough to be implicitly ordered.  For an asof join the ordering must
0198   /// be explicit and compatible with the on key.)
0199   ///
0200   /// Nodes that maintain ordering should be careful to avoid introducing gaps
0201   /// in the batch index.  This may require emitting empty batches in order to
0202   /// maintain continuity.
0203   virtual const Ordering& ordering() const;
0204 
0205   /// Upstream API:
0206   /// These functions are called by input nodes that want to inform this node
0207   /// about an updated condition (a new input batch or an impending
0208   /// end of stream).
0209   ///
0210   /// Implementation rules:
0211   /// - these may be called anytime after StartProducing() has succeeded
0212   ///   (and even during or after StopProducing())
0213   /// - these may be called concurrently
0214   /// - these are allowed to call back into PauseProducing(), ResumeProducing()
0215   ///   and StopProducing()
0216 
0217   /// Transfer input batch to ExecNode
0218   ///
0219   /// A node will typically perform some kind of operation on the batch
0220   /// and then call InputReceived on its outputs with the result.
0221   ///
0222   /// Other nodes may need to accumulate some number of inputs before any
0223   /// output can be produced.  These nodes will add the batch to some kind
0224   /// of in-memory accumulation queue and return.
0225   virtual Status InputReceived(ExecNode* input, ExecBatch batch) = 0;
0226 
0227   /// Mark the inputs finished after the given number of batches.
0228   ///
0229   /// This may be called before all inputs are received.  This simply fixes
0230   /// the total number of incoming batches for an input, so that the ExecNode
0231   /// knows when it has received all input, regardless of order.
0232   virtual Status InputFinished(ExecNode* input, int total_batches) = 0;
0233 
0234   /// \brief Perform any needed initialization
0235   ///
0236   /// This hook performs any actions in between creation of ExecPlan and the call to
0237   /// StartProducing. An example could be Bloom filter pushdown. The order of ExecNodes
0238   /// that executes this method is undefined, but the calls are made synchronously.
0239   ///
0240   /// At this point a node can rely on all inputs & outputs (and the input schemas)
0241   /// being well defined.
0242   virtual Status Init();
0243 
0244   /// Lifecycle API:
0245   /// - start / stop to initiate and terminate production
0246   /// - pause / resume to apply backpressure
0247   ///
0248   /// Implementation rules:
0249   /// - StartProducing() should not recurse into the inputs, as it is
0250   ///   handled by ExecPlan::StartProducing()
0251   /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
0252   ///   concurrently, potentially even before the call to StartProducing
0253   ///   has finished.
0254   /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
0255   ///   by the downstream nodes' InputReceived(), InputFinished() methods
0256   ///
0257   /// StopProducing may be called due to an error, by the user (e.g. cancel), or
0258   /// because a node has all the data it needs (e.g. limit, top-k on sorted data).
0259   /// This means the method may be called multiple times and we have the following
0260   /// additional rules
0261   /// - StopProducing() must be idempotent
0262   /// - StopProducing() must be forwarded to inputs (this is needed for the limit/top-k
0263   ///     case because we may not be stopping the entire plan)
0264 
0265   // Right now, since synchronous calls happen in both directions (input to
0266   // output and then output to input), a node must be careful to be reentrant
0267   // against synchronous calls from its output, *and* also concurrent calls from
0268   // other threads.  The most reliable solution is to update the internal state
0269   // first, and notify outputs only at the end.
0270   //
0271   // Concurrent calls to PauseProducing and ResumeProducing can be hard to sequence
0272   // as they may travel at different speeds through the plan.
0273   //
0274   // For example, consider a resume that comes quickly after a pause.  If the source
0275   // receives the resume before the pause the source may think the destination is full
0276   // and halt production which would lead to deadlock.
0277   //
0278   // To resolve this a counter is sent for all calls to pause/resume.  Only the call with
0279   // the highest counter value is valid.  So if a call to PauseProducing(5) comes after
0280   // a call to ResumeProducing(6) then the source should continue producing.
0281 
0282   /// \brief Start producing
0283   ///
0284   /// This must only be called once.
0285   ///
0286   /// This is typically called automatically by ExecPlan::StartProducing().
0287   virtual Status StartProducing() = 0;
0288 
0289   /// \brief Pause producing temporarily
0290   ///
0291   /// \param output Pointer to the output that is full
0292   /// \param counter Counter used to sequence calls to pause/resume
0293   ///
0294   /// This call is a hint that an output node is currently not willing
0295   /// to receive data.
0296   ///
0297   /// This may be called any number of times.
0298   /// However, the node is still free to produce data (which may be difficult
0299   /// to prevent anyway if data is produced using multiple threads).
0300   virtual void PauseProducing(ExecNode* output, int32_t counter) = 0;
0301 
0302   /// \brief Resume producing after a temporary pause
0303   ///
0304   /// \param output Pointer to the output that is now free
0305   /// \param counter Counter used to sequence calls to pause/resume
0306   ///
0307   /// This call is a hint that an output node is willing to receive data again.
0308   ///
0309   /// This may be called any number of times.
0310   virtual void ResumeProducing(ExecNode* output, int32_t counter) = 0;
0311 
0312   /// \brief Stop producing new data
0313   ///
0314   /// If this node is a source then the source should stop generating data
0315   /// as quickly as possible.  If this node is not a source then there is typically
0316   /// nothing that needs to be done although a node may choose to start ignoring incoming
0317   /// data.
0318   ///
0319   /// This method will be called when an error occurs in the plan
0320   /// This method may also be called by the user if they wish to end a plan early
0321   /// Finally, this method may be called if a node determines it no longer needs any more
0322   /// input (for example, a limit node).
0323   ///
0324   /// This method may be called multiple times.
0325   ///
0326   /// This is not a pause.  There will be no way to start the source again after this has
0327   /// been called.
0328   virtual Status StopProducing();
0329 
0330   std::string ToString(int indent = 0) const;
0331 
0332  protected:
0333   ExecNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> input_labels,
0334            std::shared_ptr<Schema> output_schema);
0335 
0336   virtual Status StopProducingImpl() = 0;
0337 
0338   /// Provide extra info to include in the string representation.
0339   virtual std::string ToStringExtra(int indent = 0) const;
0340 
0341   std::atomic<bool> stopped_;
0342   ExecPlan* plan_;
0343   std::string label_;
0344 
0345   NodeVector inputs_;
0346   std::vector<std::string> input_labels_;
0347 
0348   std::shared_ptr<Schema> output_schema_;
0349   ExecNode* output_ = NULLPTR;
0350 };
0351 
0352 /// \brief An extensible registry for factories of ExecNodes
0353 class ARROW_ACERO_EXPORT ExecFactoryRegistry {
0354  public:
0355   using Factory = std::function<Result<ExecNode*>(ExecPlan*, std::vector<ExecNode*>,
0356                                                   const ExecNodeOptions&)>;
0357 
0358   virtual ~ExecFactoryRegistry() = default;
0359 
0360   /// \brief Get the named factory from this registry
0361   ///
0362   /// will raise if factory_name is not found
0363   virtual Result<Factory> GetFactory(const std::string& factory_name) = 0;
0364 
0365   /// \brief Add a factory to this registry with the provided name
0366   ///
0367   /// will raise if factory_name is already in the registry
0368   virtual Status AddFactory(std::string factory_name, Factory factory) = 0;
0369 };
0370 
0371 /// The default registry, which includes built-in factories.
0372 ARROW_ACERO_EXPORT
0373 ExecFactoryRegistry* default_exec_factory_registry();
0374 
0375 /// \brief Construct an ExecNode using the named factory
0376 inline Result<ExecNode*> MakeExecNode(
0377     const std::string& factory_name, ExecPlan* plan, std::vector<ExecNode*> inputs,
0378     const ExecNodeOptions& options,
0379     ExecFactoryRegistry* registry = default_exec_factory_registry()) {
0380   ARROW_ASSIGN_OR_RAISE(auto factory, registry->GetFactory(factory_name));
0381   return factory(plan, std::move(inputs), options);
0382 }
0383 
0384 /// @}
0385 
0386 /// \addtogroup acero-api
0387 /// @{
0388 
0389 /// \brief Helper class for declaring execution nodes
0390 ///
0391 /// A Declaration represents an unconstructed ExecNode (and potentially an entire graph
0392 /// since its inputs may also be Declarations)
0393 ///
0394 /// A Declaration can be converted to a plan and executed using one of the
0395 /// DeclarationToXyz methods.
0396 ///
0397 /// For more direct control, a Declaration can be added to an existing execution
0398 /// plan with Declaration::AddToPlan, which will recursively construct any inputs as
0399 /// necessary.
0400 struct ARROW_ACERO_EXPORT Declaration {
0401   using Input = std::variant<ExecNode*, Declaration>;
0402 
0403   Declaration() {}
0404 
0405   /// \brief construct a declaration
0406   /// \param factory_name the name of the exec node to construct.  The node must have
0407   ///                     been added to the exec node registry with this name.
0408   /// \param inputs the inputs to the node, these should be other declarations
0409   /// \param options options that control the behavior of the node.  You must use
0410   ///                the appropriate subclass.  For example, if `factory_name` is
0411   ///                "project" then `options` should be ProjectNodeOptions.
0412   /// \param label a label to give the node.  Can be used to distinguish it from other
0413   ///              nodes of the same type in the plan.
0414   Declaration(std::string factory_name, std::vector<Input> inputs,
0415               std::shared_ptr<ExecNodeOptions> options, std::string label)
0416       : factory_name{std::move(factory_name)},
0417         inputs{std::move(inputs)},
0418         options{std::move(options)},
0419         label{std::move(label)} {}
0420 
0421   template <typename Options>
0422   Declaration(std::string factory_name, std::vector<Input> inputs, Options options,
0423               std::string label)
0424       : Declaration{std::move(factory_name), std::move(inputs),
0425                     std::shared_ptr<ExecNodeOptions>(
0426                         std::make_shared<Options>(std::move(options))),
0427                     std::move(label)} {}
0428 
0429   template <typename Options>
0430   Declaration(std::string factory_name, std::vector<Input> inputs, Options options)
0431       : Declaration{std::move(factory_name), std::move(inputs), std::move(options),
0432                     /*label=*/""} {}
0433 
0434   template <typename Options>
0435   Declaration(std::string factory_name, Options options)
0436       : Declaration{std::move(factory_name), {}, std::move(options), /*label=*/""} {}
0437 
0438   template <typename Options>
0439   Declaration(std::string factory_name, Options options, std::string label)
0440       : Declaration{std::move(factory_name), {}, std::move(options), std::move(label)} {}
0441 
0442   /// \brief Convenience factory for the common case of a simple sequence of nodes.
0443   ///
0444   /// Each of decls will be appended to the inputs of the subsequent declaration,
0445   /// and the final modified declaration will be returned.
0446   ///
0447   /// Without this convenience factory, constructing a sequence would require explicit,
0448   /// difficult-to-read nesting:
0449   ///
0450   ///     Declaration{"n3",
0451   ///                   {
0452   ///                       Declaration{"n2",
0453   ///                                   {
0454   ///                                       Declaration{"n1",
0455   ///                                                   {
0456   ///                                                       Declaration{"n0", N0Opts{}},
0457   ///                                                   },
0458   ///                                                   N1Opts{}},
0459   ///                                   },
0460   ///                                   N2Opts{}},
0461   ///                   },
0462   ///                   N3Opts{}};
0463   ///
0464   /// An equivalent Declaration can be constructed more tersely using Sequence:
0465   ///
0466   ///     Declaration::Sequence({
0467   ///         {"n0", N0Opts{}},
0468   ///         {"n1", N1Opts{}},
0469   ///         {"n2", N2Opts{}},
0470   ///         {"n3", N3Opts{}},
0471   ///     });
0472   static Declaration Sequence(std::vector<Declaration> decls);
0473 
0474   /// \brief add the declaration to an already created execution plan
0475   /// \param plan the plan to add the node to
0476   /// \param registry the registry to use to lookup the node factory
0477   ///
0478   /// This method will recursively call AddToPlan on all of the declaration's inputs.
0479   /// This method is only for advanced use when the DeclarationToXyz methods are not
0480   /// sufficient.
0481   ///
0482   /// \return the instantiated execution node
0483   Result<ExecNode*> AddToPlan(ExecPlan* plan, ExecFactoryRegistry* registry =
0484                                                   default_exec_factory_registry()) const;
0485 
0486   // Validate a declaration
0487   bool IsValid(ExecFactoryRegistry* registry = default_exec_factory_registry()) const;
0488 
0489   /// \brief the name of the factory to use when creating a node
0490   std::string factory_name;
0491   /// \brief the declarations's inputs
0492   std::vector<Input> inputs;
0493   /// \brief options to control the behavior of the node
0494   std::shared_ptr<ExecNodeOptions> options;
0495   /// \brief a label to give the node in the plan
0496   std::string label;
0497 };
0498 
0499 /// \brief How to handle unaligned buffers
0500 enum class UnalignedBufferHandling { kWarn, kIgnore, kReallocate, kError };
0501 
0502 /// \brief get the default behavior of unaligned buffer handling
0503 ///
0504 /// This is configurable via the ACERO_ALIGNMENT_HANDLING environment variable which
0505 /// can be set to "warn", "ignore", "reallocate", or "error".  If the environment
0506 /// variable is not set, or is set to an invalid value, this will return kWarn
0507 UnalignedBufferHandling GetDefaultUnalignedBufferHandling();
0508 
0509 /// \brief plan-wide options that can be specified when executing an execution plan
0510 struct ARROW_ACERO_EXPORT QueryOptions {
0511   /// \brief Should the plan use a legacy batching strategy
0512   ///
0513   /// This is currently in place only to support the Scanner::ToTable
0514   /// method.  This method relies on batch indices from the scanner
0515   /// remaining consistent.  This is impractical in the ExecPlan which
0516   /// might slice batches as needed (e.g. for a join)
0517   ///
0518   /// However, it still works for simple plans and this is the only way
0519   /// we have at the moment for maintaining implicit order.
0520   bool use_legacy_batching = false;
0521 
0522   /// If the output has a meaningful order then sequence the output of the plan
0523   ///
0524   /// The default behavior (std::nullopt) will sequence output batches if there
0525   /// is a meaningful ordering in the final node and will emit batches immediately
0526   /// otherwise.
0527   ///
0528   /// If explicitly set to true then plan execution will fail if there is no
0529   /// meaningful ordering.  This can be useful to validate a query that should
0530   /// be emitting ordered results.
0531   ///
0532   /// If explicitly set to false then batches will be emit immediately even if there
0533   /// is a meaningful ordering.  This could cause batches to be emit out of order but
0534   /// may offer a small decrease to latency.
0535   std::optional<bool> sequence_output = std::nullopt;
0536 
0537   /// \brief should the plan use multiple background threads for CPU-intensive work
0538   ///
0539   /// If this is false then all CPU work will be done on the calling thread.  I/O tasks
0540   /// will still happen on the I/O executor and may be multi-threaded (but should not use
0541   /// significant CPU resources).
0542   ///
0543   /// Will be ignored if custom_cpu_executor is set
0544   bool use_threads = true;
0545 
0546   /// \brief custom executor to use for CPU-intensive work
0547   ///
0548   /// Must be null or remain valid for the duration of the plan.  If this is null then
0549   /// a default thread pool will be chosen whose behavior will be controlled by
0550   /// the `use_threads` option.
0551   ::arrow::internal::Executor* custom_cpu_executor = NULLPTR;
0552 
0553   /// \brief custom executor to use for IO work
0554   ///
0555   /// Must be null or remain valid for the duration of the plan.  If this is null then
0556   /// the global io thread pool will be chosen whose behavior will be controlled by
0557   /// the "ARROW_IO_THREADS" environment.
0558   ::arrow::internal::Executor* custom_io_executor = NULLPTR;
0559 
0560   /// \brief a memory pool to use for allocations
0561   ///
0562   /// Must remain valid for the duration of the plan.
0563   MemoryPool* memory_pool = default_memory_pool();
0564 
0565   /// \brief a function registry to use for the plan
0566   ///
0567   /// Must remain valid for the duration of the plan.
0568   FunctionRegistry* function_registry = GetFunctionRegistry();
0569   /// \brief the names of the output columns
0570   ///
0571   /// If this is empty then names will be generated based on the input columns
0572   ///
0573   /// If set then the number of names must equal the number of output columns
0574   std::vector<std::string> field_names;
0575 
0576   /// \brief Policy for unaligned buffers in source data
0577   ///
0578   /// Various compute functions and acero internals will type pun array
0579   /// buffers from uint8_t* to some kind of value type (e.g. we might
0580   /// cast to int32_t* to add two int32 arrays)
0581   ///
0582   /// If the buffer is poorly aligned (e.g. an int32 array is not aligned
0583   /// on a 4-byte boundary) then this is technically undefined behavior in C++.
0584   /// However, most modern compilers and CPUs are fairly tolerant of this
0585   /// behavior and nothing bad (beyond a small hit to performance) is likely
0586   /// to happen.
0587   ///
0588   /// Note that this only applies to source buffers.  All buffers allocated internally
0589   /// by Acero will be suitably aligned.
0590   ///
0591   /// If this field is set to kWarn then Acero will check if any buffers are unaligned
0592   /// and, if they are, will emit a warning.
0593   ///
0594   /// If this field is set to kReallocate then Acero will allocate a new, suitably aligned
0595   /// buffer and copy the contents from the old buffer into this new buffer.
0596   ///
0597   /// If this field is set to kError then Acero will gracefully abort the plan instead.
0598   ///
0599   /// If this field is set to kIgnore then Acero will not even check if the buffers are
0600   /// unaligned.
0601   ///
0602   /// If this field is not set then it will be treated as kWarn unless overridden
0603   /// by the ACERO_ALIGNMENT_HANDLING environment variable
0604   std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
0605 };
0606 
0607 /// \brief Calculate the output schema of a declaration
0608 ///
0609 /// This does not actually execute the plan.  This operation may fail if the
0610 /// declaration represents an invalid plan (e.g. a project node with multiple inputs)
0611 ///
0612 /// \param declaration A declaration describing an execution plan
0613 /// \param function_registry The function registry to use for function execution.  If null
0614 ///                          then the default function registry will be used.
0615 ///
0616 /// \return the schema that batches would have after going through the execution plan
0617 ARROW_ACERO_EXPORT Result<std::shared_ptr<Schema>> DeclarationToSchema(
0618     const Declaration& declaration, FunctionRegistry* function_registry = NULLPTR);
0619 
0620 /// \brief Create a string representation of a plan
0621 ///
0622 /// This representation is for debug purposes only.
0623 ///
0624 /// Conversion to a string may fail if the declaration represents an
0625 /// invalid plan.
0626 ///
0627 /// Use Substrait for complete serialization of plans
0628 ///
0629 /// \param declaration A declaration describing an execution plan
0630 /// \param function_registry The function registry to use for function execution.  If null
0631 ///                          then the default function registry will be used.
0632 ///
0633 /// \return a string representation of the plan suitable for debugging output
0634 ARROW_ACERO_EXPORT Result<std::string> DeclarationToString(
0635     const Declaration& declaration, FunctionRegistry* function_registry = NULLPTR);
0636 
0637 /// \brief Utility method to run a declaration and collect the results into a table
0638 ///
0639 /// \param declaration A declaration describing the plan to run
0640 /// \param use_threads If `use_threads` is false then all CPU work will be done on the
0641 ///                    calling thread.  I/O tasks will still happen on the I/O executor
0642 ///                    and may be multi-threaded (but should not use significant CPU
0643 ///                    resources).
0644 /// \param memory_pool The memory pool to use for allocations made while running the plan.
0645 /// \param function_registry The function registry to use for function execution.  If null
0646 ///                          then the default function registry will be used.
0647 ///
0648 /// This method will add a sink node to the declaration to collect results into a
0649 /// table.  It will then create an ExecPlan from the declaration, start the exec plan,
0650 /// block until the plan has finished, and return the created table.
0651 ARROW_ACERO_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(
0652     Declaration declaration, bool use_threads = true,
0653     MemoryPool* memory_pool = default_memory_pool(),
0654     FunctionRegistry* function_registry = NULLPTR);
0655 
0656 ARROW_ACERO_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(
0657     Declaration declaration, QueryOptions query_options);
0658 
0659 /// \brief Asynchronous version of \see DeclarationToTable
0660 ///
0661 /// \param declaration A declaration describing the plan to run
0662 /// \param use_threads The behavior of use_threads is slightly different than the
0663 ///                    synchronous version since we cannot run synchronously on the
0664 ///                    calling thread. Instead, if use_threads=false then a new thread
0665 ///                    pool will be created with a single thread and this will be used for
0666 ///                    all compute work.
0667 /// \param memory_pool The memory pool to use for allocations made while running the plan.
0668 /// \param function_registry The function registry to use for function execution. If null
0669 ///                          then the default function registry will be used.
0670 ARROW_ACERO_EXPORT Future<std::shared_ptr<Table>> DeclarationToTableAsync(
0671     Declaration declaration, bool use_threads = true,
0672     MemoryPool* memory_pool = default_memory_pool(),
0673     FunctionRegistry* function_registry = NULLPTR);
0674 
0675 /// \brief Overload of \see DeclarationToTableAsync accepting a custom exec context
0676 ///
0677 /// The executor must be specified (cannot be null) and must be kept alive until the
0678 /// returned future finishes.
0679 ARROW_ACERO_EXPORT Future<std::shared_ptr<Table>> DeclarationToTableAsync(
0680     Declaration declaration, ExecContext custom_exec_context);
0681 
0682 /// \brief a collection of exec batches with a common schema
0683 struct BatchesWithCommonSchema {
0684   std::vector<ExecBatch> batches;
0685   std::shared_ptr<Schema> schema;
0686 };
0687 
0688 /// \brief Utility method to run a declaration and collect the results into ExecBatch
0689 /// vector
0690 ///
0691 /// \see DeclarationToTable for details on threading & execution
0692 ARROW_ACERO_EXPORT Result<BatchesWithCommonSchema> DeclarationToExecBatches(
0693     Declaration declaration, bool use_threads = true,
0694     MemoryPool* memory_pool = default_memory_pool(),
0695     FunctionRegistry* function_registry = NULLPTR);
0696 
0697 ARROW_ACERO_EXPORT Result<BatchesWithCommonSchema> DeclarationToExecBatches(
0698     Declaration declaration, QueryOptions query_options);
0699 
0700 /// \brief Asynchronous version of \see DeclarationToExecBatches
0701 ///
0702 /// \see DeclarationToTableAsync for details on threading & execution
0703 ARROW_ACERO_EXPORT Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(
0704     Declaration declaration, bool use_threads = true,
0705     MemoryPool* memory_pool = default_memory_pool(),
0706     FunctionRegistry* function_registry = NULLPTR);
0707 
0708 /// \brief Overload of \see DeclarationToExecBatchesAsync accepting a custom exec context
0709 ///
0710 /// \see DeclarationToTableAsync for details on threading & execution
0711 ARROW_ACERO_EXPORT Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(
0712     Declaration declaration, ExecContext custom_exec_context);
0713 
0714 /// \brief Utility method to run a declaration and collect the results into a vector
0715 ///
0716 /// \see DeclarationToTable for details on threading & execution
0717 ARROW_ACERO_EXPORT Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
0718     Declaration declaration, bool use_threads = true,
0719     MemoryPool* memory_pool = default_memory_pool(),
0720     FunctionRegistry* function_registry = NULLPTR);
0721 
0722 ARROW_ACERO_EXPORT Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
0723     Declaration declaration, QueryOptions query_options);
0724 
0725 /// \brief Asynchronous version of \see DeclarationToBatches
0726 ///
0727 /// \see DeclarationToTableAsync for details on threading & execution
0728 ARROW_ACERO_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>>
0729 DeclarationToBatchesAsync(Declaration declaration, bool use_threads = true,
0730                           MemoryPool* memory_pool = default_memory_pool(),
0731                           FunctionRegistry* function_registry = NULLPTR);
0732 
0733 /// \brief Overload of \see DeclarationToBatchesAsync accepting a custom exec context
0734 ///
0735 /// \see DeclarationToTableAsync for details on threading & execution
0736 ARROW_ACERO_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>>
0737 DeclarationToBatchesAsync(Declaration declaration, ExecContext exec_context);
0738 
0739 /// \brief Utility method to run a declaration and return results as a RecordBatchReader
0740 ///
0741 /// If an exec context is not provided then a default exec context will be used based
0742 /// on the value of `use_threads`.  If `use_threads` is false then the CPU executor will
0743 /// be a serial executor and all CPU work will be done on the calling thread.  I/O tasks
0744 /// will still happen on the I/O executor and may be multi-threaded.
0745 ///
0746 /// If `use_threads` is false then all CPU work will happen during the calls to
0747 /// RecordBatchReader::Next and no CPU work will happen in the background.  If
0748 /// `use_threads` is true then CPU work will happen on the CPU thread pool and tasks may
0749 /// run in between calls to RecordBatchReader::Next.  If the returned reader is not
0750 /// consumed quickly enough then the plan will eventually pause as the backpressure queue
0751 /// fills up.
0752 ///
0753 /// If a custom exec context is provided then the value of `use_threads` will be ignored.
0754 ///
0755 /// The returned RecordBatchReader can be closed early to cancel the computation of record
0756 /// batches. In this case, only errors encountered by the computation may be reported. In
0757 /// particular, no cancellation error may be reported.
0758 ARROW_ACERO_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
0759     Declaration declaration, bool use_threads = true,
0760     MemoryPool* memory_pool = default_memory_pool(),
0761     FunctionRegistry* function_registry = NULLPTR);
0762 
0763 ARROW_ACERO_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
0764     Declaration declaration, QueryOptions query_options);
0765 
0766 /// \brief Utility method to run a declaration and ignore results
0767 ///
0768 /// This can be useful when the data are consumed as part of the plan itself, for
0769 /// example, when the plan ends with a write node.
0770 ///
0771 /// \see DeclarationToTable for details on threading & execution
0772 ARROW_ACERO_EXPORT Status
0773 DeclarationToStatus(Declaration declaration, bool use_threads = true,
0774                     MemoryPool* memory_pool = default_memory_pool(),
0775                     FunctionRegistry* function_registry = NULLPTR);
0776 
0777 ARROW_ACERO_EXPORT Status DeclarationToStatus(Declaration declaration,
0778                                               QueryOptions query_options);
0779 
0780 /// \brief Asynchronous version of \see DeclarationToStatus
0781 ///
0782 /// This can be useful when the data are consumed as part of the plan itself, for
0783 /// example, when the plan ends with a write node.
0784 ///
0785 /// \see DeclarationToTableAsync for details on threading & execution
0786 ARROW_ACERO_EXPORT Future<> DeclarationToStatusAsync(
0787     Declaration declaration, bool use_threads = true,
0788     MemoryPool* memory_pool = default_memory_pool(),
0789     FunctionRegistry* function_registry = NULLPTR);
0790 
0791 /// \brief Overload of \see DeclarationToStatusAsync accepting a custom exec context
0792 ///
0793 /// \see DeclarationToTableAsync for details on threading & execution
0794 ARROW_ACERO_EXPORT Future<> DeclarationToStatusAsync(Declaration declaration,
0795                                                      ExecContext exec_context);
0796 
0797 /// @}
0798 
0799 /// \brief Wrap an ExecBatch generator in a RecordBatchReader.
0800 ///
0801 /// The RecordBatchReader does not impose any ordering on emitted batches.
0802 ARROW_ACERO_EXPORT
0803 std::shared_ptr<RecordBatchReader> MakeGeneratorReader(
0804     std::shared_ptr<Schema>, std::function<Future<std::optional<ExecBatch>>()>,
0805     MemoryPool*);
0806 
0807 constexpr int kDefaultBackgroundMaxQ = 32;
0808 constexpr int kDefaultBackgroundQRestart = 16;
0809 
0810 /// \brief Make a generator of RecordBatchReaders
0811 ///
0812 /// Useful as a source node for an Exec plan
0813 ARROW_ACERO_EXPORT
0814 Result<std::function<Future<std::optional<ExecBatch>>()>> MakeReaderGenerator(
0815     std::shared_ptr<RecordBatchReader> reader, arrow::internal::Executor* io_executor,
0816     int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart);
0817 
0818 }  // namespace acero
0819 }  // namespace arrow