// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include "arrow/acero/type_fwd.h"
#include "arrow/acero/visibility.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/ordering.h"
#include "arrow/type_fwd.h"
#include "arrow/util/future.h"
#include "arrow/util/macros.h"
#include "arrow/util/tracing.h"
#include "arrow/util/type_fwd.h"

namespace arrow {

using compute::ExecBatch;
using compute::ExecContext;
using compute::FunctionRegistry;
using compute::GetFunctionRegistry;
using compute::Ordering;
using compute::threaded_exec_context;

namespace acero {

/// \addtogroup acero-internals
/// @{

class ARROW_ACERO_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
 public:
  // This allows operators to rely on signed 16-bit indices
  static const uint32_t kMaxBatchSize = 1 << 15;
  using NodeVector = std::vector<ExecNode*>;

  virtual ~ExecPlan() = default;

  QueryContext* query_context();

  /// \brief retrieve the nodes in the plan
  const NodeVector& nodes() const;

  /// Make an empty exec plan
  static Result<std::shared_ptr<ExecPlan>> Make(
      QueryOptions options, ExecContext exec_context = *threaded_exec_context(),
      std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR);

  static Result<std::shared_ptr<ExecPlan>> Make(
      ExecContext exec_context = *threaded_exec_context(),
      std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR);

  static Result<std::shared_ptr<ExecPlan>> Make(
      QueryOptions options, ExecContext* exec_context,
      std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR);

  static Result<std::shared_ptr<ExecPlan>> Make(
      ExecContext* exec_context,
      std::shared_ptr<const KeyValueMetadata> metadata = NULLPTR);

  ExecNode* AddNode(std::unique_ptr<ExecNode> node);

  template <typename Node, typename... Args>
  Node* EmplaceNode(Args&&... args) {
    std::unique_ptr<Node> node{new Node{std::forward<Args>(args)...}};
    auto out = node.get();
    AddNode(std::move(node));
    return out;
  }

  Status Validate();

  /// \brief Start producing on all nodes
  ///
  /// Nodes are started in reverse topological order, such that any node
  /// is started before all of its inputs.
  void StartProducing();

  /// \brief Stop producing on all nodes
  ///
  /// Triggers all sources to stop producing new data. In order to stop cleanly, the
  /// plan will continue to run any tasks that are already in progress. The caller
  /// should still wait for `finished` to complete before destroying the plan.
  void StopProducing();

  /// \brief A future which will be marked finished when all tasks have finished.
  Future<> finished();

  /// \brief Return whether the plan has non-empty metadata
  bool HasMetadata() const;

  /// \brief Return the plan's attached metadata
  std::shared_ptr<const KeyValueMetadata> metadata() const;

  std::string ToString() const;
};
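// A minimal sketch of driving a plan manually (most applications should prefer the
// DeclarationToXyz helpers declared later in this file). Node construction is elided
// here; nodes would typically be added via Declaration::AddToPlan or MakeExecNode.
//
//   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan, ExecPlan::Make());
//   // ... add source, compute, and sink nodes to `plan` ...
//   ARROW_RETURN_NOT_OK(plan->Validate());
//   plan->StartProducing();
//   ARROW_RETURN_NOT_OK(plan->finished().status());  // block until all tasks finish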
// Acero can be extended by providing custom implementations of ExecNode. The methods
// below are documented in detail and provide careful instruction on how to fulfill the
// ExecNode contract. It's suggested you familiarize yourself with the Acero
// documentation in the C++ user guide.
class ARROW_ACERO_EXPORT ExecNode {
 public:
  using NodeVector = std::vector<ExecNode*>;

  virtual ~ExecNode() = default;

  virtual const char* kind_name() const = 0;

  // The number of inputs expected by this node
  int num_inputs() const { return static_cast<int>(inputs_.size()); }

  /// This node's predecessors in the exec plan
  const NodeVector& inputs() const { return inputs_; }

  /// True if the plan has no output schema (is a sink)
  bool is_sink() const { return !output_schema_; }

  /// \brief Labels identifying the function of each input.
  const std::vector<std::string>& input_labels() const { return input_labels_; }

  /// This node's successor in the exec plan
  const ExecNode* output() const { return output_; }

  /// The datatypes for batches produced by this node
  const std::shared_ptr<Schema>& output_schema() const { return output_schema_; }

  /// This node's exec plan
  ExecPlan* plan() { return plan_; }

  /// \brief An optional label, for display and debugging
  ///
  /// There is no guarantee that this value is non-empty or unique.
  const std::string& label() const { return label_; }
  void SetLabel(std::string label) { label_ = std::move(label); }

  virtual Status Validate() const;

  /// \brief the ordering of the output batches
  ///
  /// This does not guarantee the batches will be emitted by this node
  /// in order. Instead it guarantees that the batches will have their
  /// ExecBatch::index property set in a way that respects this ordering.
  ///
  /// In other words, given the ordering {{"x", SortOrder::Ascending}} we
  /// know that all values of x in a batch with index N will be less than
  /// or equal to all values of x in a batch with index N+k (assuming k > 0).
  /// Furthermore, we also know that values will be sorted within a batch.
  /// Any row N will have a value of x that is less than the value for
  /// any row N+k.
  ///
  /// Note that an ordering can be both Ordering::Unordered and Ordering::Implicit.
  /// A node's output should be marked Ordering::Unordered if the order is
  /// non-deterministic. For example, a hash-join has no predictable output order.
  ///
  /// If the ordering is Ordering::Implicit then there is a meaningful order but that
  /// ordering is not represented by any column in the data.
  /// The most common case for this is when reading data from an in-memory table.
  /// The data has an implicit "row order" which is not necessarily represented in
  /// the data set.
  ///
  /// A filter or project node will not modify the ordering. Nothing needs to be done
  /// other than ensure the index assigned to output batches is the same as the
  /// input batch that was mapped.
  ///
  /// Other nodes may introduce order. For example, an order-by node will emit
  /// a brand new ordering independent of the input ordering.
  ///
  /// Finally, as described above, nodes such as a hash-join or aggregation may
  /// destroy ordering (although these nodes could also choose to establish a
  /// new ordering based on the hash keys).
  ///
  /// Some nodes will require an ordering. For example, a fetch node or an
  /// asof join node will only function if the input data is ordered (for fetch
  /// it is enough to be implicitly ordered; for an asof join the ordering must
  /// be explicit and compatible with the on key).
  ///
  /// Nodes that maintain ordering should be careful to avoid introducing gaps
  /// in the batch index. This may require emitting empty batches in order to
  /// maintain continuity.
  virtual const Ordering& ordering() const;

  /// Upstream API:
  /// These functions are called by input nodes that want to inform this node
  /// about an updated condition (a new input batch or an impending
  /// end of stream).
  ///
  /// Implementation rules:
  /// - these may be called anytime after StartProducing() has succeeded
  ///   (and even during or after StopProducing())
  /// - these may be called concurrently
  /// - these are allowed to call back into PauseProducing(), ResumeProducing()
  ///   and StopProducing()

  /// Transfer input batch to ExecNode
  ///
  /// A node will typically perform some kind of operation on the batch
  /// and then call InputReceived on its outputs with the result.
  ///
  /// Other nodes may need to accumulate some number of inputs before any
  /// output can be produced. These nodes will add the batch to some kind
  /// of in-memory accumulation queue and return.
  virtual Status InputReceived(ExecNode* input, ExecBatch batch) = 0;

  /// Mark the inputs finished after the given number of batches.
  ///
  /// This may be called before all inputs are received. This simply fixes
  /// the total number of incoming batches for an input, so that the ExecNode
  /// knows when it has received all input, regardless of order.
  virtual Status InputFinished(ExecNode* input, int total_batches) = 0;

  /// \brief Perform any needed initialization
  ///
  /// This hook performs any actions in between creation of ExecPlan and the call to
  /// StartProducing. An example could be Bloom filter pushdown. The order in which
  /// ExecNodes execute this method is undefined, but the calls are made synchronously.
  ///
  /// At this point a node can rely on all inputs & outputs (and the input schemas)
  /// being well defined.
  virtual Status Init();

  /// Lifecycle API:
  /// - start / stop to initiate and terminate production
  /// - pause / resume to apply backpressure
  ///
  /// Implementation rules:
  /// - StartProducing() should not recurse into the inputs, as it is
  ///   handled by ExecPlan::StartProducing()
  /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
  ///   concurrently, potentially even before the call to StartProducing
  ///   has finished.
  /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
  ///   by the downstream nodes' InputReceived(), InputFinished() methods
  ///
  /// StopProducing may be called due to an error, by the user (e.g. cancel), or
  /// because a node has all the data it needs (e.g. limit, top-k on sorted data).
  /// This means the method may be called multiple times and we have the following
  /// additional rules:
  /// - StopProducing() must be idempotent
  /// - StopProducing() must be forwarded to inputs (this is needed for the limit/top-k
  ///   case because we may not be stopping the entire plan)

  // Right now, since synchronous calls happen in both directions (input to
  // output and then output to input), a node must be careful to be reentrant
  // against synchronous calls from its output, *and* also concurrent calls from
  // other threads. The most reliable solution is to update the internal state
  // first, and notify outputs only at the end.
  //
  // Concurrent calls to PauseProducing and ResumeProducing can be hard to sequence
  // as they may travel at different speeds through the plan.
  //
  // For example, consider a resume that comes quickly after a pause. If the source
  // receives the resume before the pause, the source may think the destination is full
  // and halt production, which would lead to deadlock.
  //
  // To resolve this, a counter is sent with all calls to pause/resume. Only the call
  // with the highest counter value is valid. So if a call to PauseProducing(5) comes
  // after a call to ResumeProducing(6) then the source should continue producing.
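  // As an illustration only (not part of the ExecNode contract), a source node could
  // sequence pause/resume requests by remembering the highest counter seen so far and
  // ignoring any request that carries a stale (lower) counter. Member names below are
  // hypothetical:
  //
  //   std::mutex pause_mutex_;
  //   int32_t last_pause_counter_ = 0;
  //   bool paused_ = false;
  //
  //   void PauseProducing(ExecNode* output, int32_t counter) override {
  //     std::lock_guard<std::mutex> lock(pause_mutex_);
  //     if (counter > last_pause_counter_) {
  //       last_pause_counter_ = counter;
  //       paused_ = true;
  //     }
  //   }
  //
  //   void ResumeProducing(ExecNode* output, int32_t counter) override {
  //     std::lock_guard<std::mutex> lock(pause_mutex_);
  //     if (counter > last_pause_counter_) {
  //       last_pause_counter_ = counter;
  //       paused_ = false;
  //     }
  //   }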
  /// \brief Start producing
  ///
  /// This must only be called once.
  ///
  /// This is typically called automatically by ExecPlan::StartProducing().
  virtual Status StartProducing() = 0;

  /// \brief Pause producing temporarily
  ///
  /// \param output Pointer to the output that is full
  /// \param counter Counter used to sequence calls to pause/resume
  ///
  /// This call is a hint that an output node is currently not willing
  /// to receive data.
  ///
  /// This may be called any number of times.
  /// However, the node is still free to produce data (which may be difficult
  /// to prevent anyway if data is produced using multiple threads).
  virtual void PauseProducing(ExecNode* output, int32_t counter) = 0;

  /// \brief Resume producing after a temporary pause
  ///
  /// \param output Pointer to the output that is now free
  /// \param counter Counter used to sequence calls to pause/resume
  ///
  /// This call is a hint that an output node is willing to receive data again.
  ///
  /// This may be called any number of times.
  virtual void ResumeProducing(ExecNode* output, int32_t counter) = 0;

  /// \brief Stop producing new data
  ///
  /// If this node is a source then the source should stop generating data
  /// as quickly as possible. If this node is not a source then there is typically
  /// nothing that needs to be done, although a node may choose to start ignoring
  /// incoming data.
  ///
  /// This method will be called when an error occurs in the plan.
  /// This method may also be called by the user if they wish to end a plan early.
  /// Finally, this method may be called if a node determines it no longer needs any
  /// more input (for example, a limit node).
  ///
  /// This method may be called multiple times.
  ///
  /// This is not a pause. There will be no way to start the source again after this
  /// has been called.
  virtual Status StopProducing();

  std::string ToString(int indent = 0) const;

 protected:
  ExecNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> input_labels,
           std::shared_ptr<Schema> output_schema);

  virtual Status StopProducingImpl() = 0;

  /// Provide extra info to include in the string representation.
  virtual std::string ToStringExtra(int indent = 0) const;

  std::atomic<bool> stopped_;
  ExecPlan* plan_;
  std::string label_;

  NodeVector inputs_;
  std::vector<std::string> input_labels_;

  std::shared_ptr<Schema> output_schema_;
  ExecNode* output_ = NULLPTR;
};

/// \brief An extensible registry for factories of ExecNodes
class ARROW_ACERO_EXPORT ExecFactoryRegistry {
 public:
  using Factory = std::function<Result<ExecNode*>(ExecPlan*, std::vector<ExecNode*>,
                                                  const ExecNodeOptions&)>;

  virtual ~ExecFactoryRegistry() = default;

  /// \brief Get the named factory from this registry
  ///
  /// will raise if factory_name is not found
  virtual Result<Factory> GetFactory(const std::string& factory_name) = 0;

  /// \brief Add a factory to this registry with the provided name
  ///
  /// will raise if factory_name is already in the registry
  virtual Status AddFactory(std::string factory_name, Factory factory) = 0;
};

/// The default registry, which includes built-in factories.
ARROW_ACERO_EXPORT
ExecFactoryRegistry* default_exec_factory_registry();

/// \brief Construct an ExecNode using the named factory
inline Result<ExecNode*> MakeExecNode(
    const std::string& factory_name, ExecPlan* plan, std::vector<ExecNode*> inputs,
    const ExecNodeOptions& options,
    ExecFactoryRegistry* registry = default_exec_factory_registry()) {
  ARROW_ASSIGN_OR_RAISE(auto factory, registry->GetFactory(factory_name));
  return factory(plan, std::move(inputs), options);
}
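// A hedged sketch of extending Acero with a custom node (illustrative only): the names
// "MyNode", "MyNodeOptions", and the factory name "my_node" are hypothetical and not
// provided by this header. The factory constructs the node inside the plan via
// ExecPlan::EmplaceNode and returns it; the cast uses arrow::internal::checked_cast.
//
//   Status RegisterMyNode() {
//     return default_exec_factory_registry()->AddFactory(
//         "my_node",
//         [](ExecPlan* plan, std::vector<ExecNode*> inputs,
//            const ExecNodeOptions& options) -> Result<ExecNode*> {
//           const auto& my_options =
//               arrow::internal::checked_cast<const MyNodeOptions&>(options);
//           return plan->EmplaceNode<MyNode>(plan, std::move(inputs), my_options);
//         });
//   }
//
// Once registered, the node can be created by name with MakeExecNode("my_node", ...)
// or referenced from a Declaration using the same factory name.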
/// @}

/// \addtogroup acero-api
/// @{

/// \brief Helper class for declaring execution nodes
///
/// A Declaration represents an unconstructed ExecNode (and potentially an entire graph
/// since its inputs may also be Declarations)
///
/// A Declaration can be converted to a plan and executed using one of the
/// DeclarationToXyz methods.
///
/// For more direct control, a Declaration can be added to an existing execution
/// plan with Declaration::AddToPlan, which will recursively construct any inputs as
/// necessary.
struct ARROW_ACERO_EXPORT Declaration {
  using Input = std::variant<ExecNode*, Declaration>;

  Declaration() {}

  /// \brief construct a declaration
  /// \param factory_name the name of the exec node to construct. The node must have
  ///        been added to the exec node registry with this name.
  /// \param inputs the inputs to the node, these should be other declarations
  /// \param options options that control the behavior of the node. You must use
  ///        the appropriate subclass. For example, if `factory_name` is
  ///        "project" then `options` should be ProjectNodeOptions.
  /// \param label a label to give the node. Can be used to distinguish it from other
  ///        nodes of the same type in the plan.
  Declaration(std::string factory_name, std::vector<Input> inputs,
              std::shared_ptr<ExecNodeOptions> options, std::string label)
      : factory_name{std::move(factory_name)},
        inputs{std::move(inputs)},
        options{std::move(options)},
        label{std::move(label)} {}

  template <typename Options>
  Declaration(std::string factory_name, std::vector<Input> inputs, Options options,
              std::string label)
      : Declaration{std::move(factory_name), std::move(inputs),
                    std::shared_ptr<ExecNodeOptions>(
                        std::make_shared<Options>(std::move(options))),
                    std::move(label)} {}

  template <typename Options>
  Declaration(std::string factory_name, std::vector<Input> inputs, Options options)
      : Declaration{std::move(factory_name), std::move(inputs), std::move(options),
                    /*label=*/""} {}

  template <typename Options>
  Declaration(std::string factory_name, Options options)
      : Declaration{std::move(factory_name), {}, std::move(options), /*label=*/""} {}

  template <typename Options>
  Declaration(std::string factory_name, Options options, std::string label)
      : Declaration{std::move(factory_name), {}, std::move(options), std::move(label)} {}

  /// \brief Convenience factory for the common case of a simple sequence of nodes.
  ///
  /// Each of decls will be appended to the inputs of the subsequent declaration,
  /// and the final modified declaration will be returned.
  ///
  /// Without this convenience factory, constructing a sequence would require explicit,
  /// difficult-to-read nesting:
  ///
  ///     Declaration{"n3",
  ///                 {
  ///                     Declaration{"n2",
  ///                                 {
  ///                                     Declaration{"n1",
  ///                                                 {
  ///                                                     Declaration{"n0", N0Opts{}},
  ///                                                 },
  ///                                                 N1Opts{}},
  ///                                 },
  ///                                 N2Opts{}},
  ///                 },
  ///                 N3Opts{}};
  ///
  /// An equivalent Declaration can be constructed more tersely using Sequence:
  ///
  ///     Declaration::Sequence({
  ///         {"n0", N0Opts{}},
  ///         {"n1", N1Opts{}},
  ///         {"n2", N2Opts{}},
  ///         {"n3", N3Opts{}},
  ///     });
  static Declaration Sequence(std::vector<Declaration> decls);

  /// \brief add the declaration to an already created execution plan
  /// \param plan the plan to add the node to
  /// \param registry the registry to use to lookup the node factory
  ///
  /// This method will recursively call AddToPlan on all of the declaration's inputs.
  /// This method is only for advanced use when the DeclarationToXyz methods are not
  /// sufficient.
  ///
  /// \return the instantiated execution node
  Result<ExecNode*> AddToPlan(ExecPlan* plan, ExecFactoryRegistry* registry =
                                                  default_exec_factory_registry()) const;

  // Validate a declaration
  bool IsValid(ExecFactoryRegistry* registry = default_exec_factory_registry()) const;

  /// \brief the name of the factory to use when creating a node
  std::string factory_name;
  /// \brief the declaration's inputs
  std::vector<Input> inputs;
  /// \brief options to control the behavior of the node
  std::shared_ptr<ExecNodeOptions> options;
  /// \brief a label to give the node in the plan
  std::string label;
};

/// \brief How to handle unaligned buffers
enum class UnalignedBufferHandling { kWarn, kIgnore, kReallocate, kError };

/// \brief get the default behavior of unaligned buffer handling
///
/// This is configurable via the ACERO_ALIGNMENT_HANDLING environment variable which
/// can be set to "warn", "ignore", "reallocate", or "error". If the environment
/// variable is not set, or is set to an invalid value, this will return kWarn.
UnalignedBufferHandling GetDefaultUnalignedBufferHandling();

/// \brief plan-wide options that can be specified when executing an execution plan
struct ARROW_ACERO_EXPORT QueryOptions {
  /// \brief Should the plan use a legacy batching strategy
  ///
  /// This is currently in place only to support the Scanner::ToTable
  /// method. This method relies on batch indices from the scanner
  /// remaining consistent. This is impractical in the ExecPlan which
  /// might slice batches as needed (e.g. for a join).
  ///
  /// However, it still works for simple plans and this is the only way
  /// we have at the moment for maintaining implicit order.
  bool use_legacy_batching = false;

  /// If the output has a meaningful order then sequence the output of the plan
  ///
  /// The default behavior (std::nullopt) will sequence output batches if there
  /// is a meaningful ordering in the final node and will emit batches immediately
  /// otherwise.
  ///
  /// If explicitly set to true then plan execution will fail if there is no
  /// meaningful ordering. This can be useful to validate a query that should
  /// be emitting ordered results.
  ///
  /// If explicitly set to false then batches will be emitted immediately even if there
  /// is a meaningful ordering. This could cause batches to be emitted out of order but
  /// may offer a small decrease to latency.
  std::optional<bool> sequence_output = std::nullopt;

  /// \brief should the plan use multiple background threads for CPU-intensive work
  ///
  /// If this is false then all CPU work will be done on the calling thread. I/O tasks
  /// will still happen on the I/O executor and may be multi-threaded (but should not
  /// use significant CPU resources).
  ///
  /// Will be ignored if custom_cpu_executor is set
  bool use_threads = true;

  /// \brief custom executor to use for CPU-intensive work
  ///
  /// Must be null or remain valid for the duration of the plan. If this is null then
  /// a default thread pool will be chosen whose behavior will be controlled by
  /// the `use_threads` option.
  ::arrow::internal::Executor* custom_cpu_executor = NULLPTR;

  /// \brief custom executor to use for IO work
  ///
  /// Must be null or remain valid for the duration of the plan. If this is null then
  /// the global io thread pool will be chosen whose behavior will be controlled by
  /// the "ARROW_IO_THREADS" environment variable.
  ::arrow::internal::Executor* custom_io_executor = NULLPTR;

  /// \brief a memory pool to use for allocations
  ///
  /// Must remain valid for the duration of the plan.
  MemoryPool* memory_pool = default_memory_pool();

  /// \brief a function registry to use for the plan
  ///
  /// Must remain valid for the duration of the plan.
  FunctionRegistry* function_registry = GetFunctionRegistry();

  /// \brief the names of the output columns
  ///
  /// If this is empty then names will be generated based on the input columns
  ///
  /// If set then the number of names must equal the number of output columns
  std::vector<std::string> field_names;

  /// \brief Policy for unaligned buffers in source data
  ///
  /// Various compute functions and acero internals will type pun array
  /// buffers from uint8_t* to some kind of value type (e.g. we might
  /// cast to int32_t* to add two int32 arrays)
  ///
  /// If the buffer is poorly aligned (e.g. an int32 array is not aligned
  /// on a 4-byte boundary) then this is technically undefined behavior in C++.
  /// However, most modern compilers and CPUs are fairly tolerant of this
  /// behavior and nothing bad (beyond a small hit to performance) is likely
  /// to happen.
  ///
  /// Note that this only applies to source buffers. All buffers allocated internally
  /// by Acero will be suitably aligned.
  ///
  /// If this field is set to kWarn then Acero will check if any buffers are unaligned
  /// and, if they are, will emit a warning.
  ///
  /// If this field is set to kReallocate then Acero will allocate a new, suitably
  /// aligned buffer and copy the contents from the old buffer into this new buffer.
  ///
  /// If this field is set to kError then Acero will gracefully abort the plan instead.
  ///
  /// If this field is set to kIgnore then Acero will not even check if the buffers are
  /// unaligned.
  ///
  /// If this field is not set then it will be treated as kWarn unless overridden
  /// by the ACERO_ALIGNMENT_HANDLING environment variable.
  std::optional<UnalignedBufferHandling> unaligned_buffer_handling;
};
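// A minimal usage sketch (assuming a Declaration `plan` built elsewhere): run the plan
// on the calling thread only and require the output to be sequenced. The
// DeclarationToTable overload accepting QueryOptions is declared below.
//
//   QueryOptions query_options;
//   query_options.use_threads = false;
//   query_options.sequence_output = true;
//   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
//                         DeclarationToTable(std::move(plan), query_options));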
/// \brief Calculate the output schema of a declaration
///
/// This does not actually execute the plan. This operation may fail if the
/// declaration represents an invalid plan (e.g. a project node with multiple inputs)
///
/// \param declaration A declaration describing an execution plan
/// \param function_registry The function registry to use for function execution. If
///        null then the default function registry will be used.
///
/// \return the schema that batches would have after going through the execution plan
ARROW_ACERO_EXPORT Result<std::shared_ptr<Schema>> DeclarationToSchema(
    const Declaration& declaration, FunctionRegistry* function_registry = NULLPTR);

/// \brief Create a string representation of a plan
///
/// This representation is for debug purposes only.
///
/// Conversion to a string may fail if the declaration represents an
/// invalid plan.
///
/// Use Substrait for complete serialization of plans
///
/// \param declaration A declaration describing an execution plan
/// \param function_registry The function registry to use for function execution. If
///        null then the default function registry will be used.
///
/// \return a string representation of the plan suitable for debugging output
ARROW_ACERO_EXPORT Result<std::string> DeclarationToString(
    const Declaration& declaration, FunctionRegistry* function_registry = NULLPTR);

/// \brief Utility method to run a declaration and collect the results into a table
///
/// \param declaration A declaration describing the plan to run
/// \param use_threads If `use_threads` is false then all CPU work will be done on the
///        calling thread. I/O tasks will still happen on the I/O executor
///        and may be multi-threaded (but should not use significant CPU
///        resources).
/// \param memory_pool The memory pool to use for allocations made while running the
///        plan.
/// \param function_registry The function registry to use for function execution. If
///        null then the default function registry will be used.
///
/// This method will add a sink node to the declaration to collect results into a
/// table. It will then create an ExecPlan from the declaration, start the exec plan,
/// block until the plan has finished, and return the created table.
ARROW_ACERO_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

ARROW_ACERO_EXPORT Result<std::shared_ptr<Table>> DeclarationToTable(
    Declaration declaration, QueryOptions query_options);
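// A hedged end-to-end sketch of DeclarationToTable. It assumes an existing
// std::shared_ptr<Table> named `input`, the node options declared in
// "arrow/acero/options.h" (TableSourceNodeOptions, FilterNodeOptions), and the
// expression helpers from "arrow/compute/expression.h"; none of those headers are
// included by this file.
//
//   Declaration plan = Declaration::Sequence({
//       {"table_source", TableSourceNodeOptions{input}},
//       {"filter",
//        FilterNodeOptions{
//            compute::greater(compute::field_ref("x"), compute::literal(3))}},
//   });
//   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> output,
//                         DeclarationToTable(std::move(plan)));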
/// \brief Asynchronous version of \see DeclarationToTable
///
/// \param declaration A declaration describing the plan to run
/// \param use_threads The behavior of use_threads is slightly different than the
///        synchronous version since we cannot run synchronously on the
///        calling thread. Instead, if use_threads=false then a new thread
///        pool will be created with a single thread and this will be used for
///        all compute work.
/// \param memory_pool The memory pool to use for allocations made while running the
///        plan.
/// \param function_registry The function registry to use for function execution. If
///        null then the default function registry will be used.
ARROW_ACERO_EXPORT Future<std::shared_ptr<Table>> DeclarationToTableAsync(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToTableAsync accepting a custom exec context
///
/// The executor must be specified (cannot be null) and must be kept alive until the
/// returned future finishes.
ARROW_ACERO_EXPORT Future<std::shared_ptr<Table>> DeclarationToTableAsync(
    Declaration declaration, ExecContext custom_exec_context);

/// \brief a collection of exec batches with a common schema
struct BatchesWithCommonSchema {
  std::vector<ExecBatch> batches;
  std::shared_ptr<Schema> schema;
};

/// \brief Utility method to run a declaration and collect the results into an
/// ExecBatch vector
///
/// \see DeclarationToTable for details on threading & execution
ARROW_ACERO_EXPORT Result<BatchesWithCommonSchema> DeclarationToExecBatches(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

ARROW_ACERO_EXPORT Result<BatchesWithCommonSchema> DeclarationToExecBatches(
    Declaration declaration, QueryOptions query_options);

/// \brief Asynchronous version of \see DeclarationToExecBatches
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_ACERO_EXPORT Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToExecBatchesAsync accepting a custom exec context
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_ACERO_EXPORT Future<BatchesWithCommonSchema> DeclarationToExecBatchesAsync(
    Declaration declaration, ExecContext custom_exec_context);

/// \brief Utility method to run a declaration and collect the results into a vector
///
/// \see DeclarationToTable for details on threading & execution
ARROW_ACERO_EXPORT Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

ARROW_ACERO_EXPORT Result<std::vector<std::shared_ptr<RecordBatch>>> DeclarationToBatches(
    Declaration declaration, QueryOptions query_options);

/// \brief Asynchronous version of \see DeclarationToBatches
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_ACERO_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>>
DeclarationToBatchesAsync(Declaration declaration, bool use_threads = true,
                          MemoryPool* memory_pool = default_memory_pool(),
                          FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToBatchesAsync accepting a custom exec context
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_ACERO_EXPORT Future<std::vector<std::shared_ptr<RecordBatch>>>
DeclarationToBatchesAsync(Declaration declaration, ExecContext exec_context);

/// \brief Utility method to run a declaration and return results as a RecordBatchReader
///
/// If an exec context is not provided then a default exec context will be used based
/// on the value of `use_threads`. If `use_threads` is false then the CPU executor will
/// be a serial executor and all CPU work will be done on the calling thread. I/O tasks
/// will still happen on the I/O executor and may be multi-threaded.
///
/// If `use_threads` is false then all CPU work will happen during the calls to
/// RecordBatchReader::Next and no CPU work will happen in the background. If
/// `use_threads` is true then CPU work will happen on the CPU thread pool and tasks may
/// run in between calls to RecordBatchReader::Next. If the returned reader is not
/// consumed quickly enough then the plan will eventually pause as the backpressure queue
/// fills up.
///
/// If a custom exec context is provided then the value of `use_threads` will be ignored.
///
/// The returned RecordBatchReader can be closed early to cancel the computation of
/// record batches. In this case, only errors encountered by the computation may be
/// reported. In particular, no cancellation error may be reported.
ARROW_ACERO_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

ARROW_ACERO_EXPORT Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(
    Declaration declaration, QueryOptions query_options);
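// A brief usage sketch (assuming a Declaration `plan` built elsewhere): pull batches
// one at a time, doing all CPU work on the calling thread.
//
//   ARROW_ASSIGN_OR_RAISE(std::unique_ptr<RecordBatchReader> reader,
//                         DeclarationToReader(std::move(plan), /*use_threads=*/false));
//   while (true) {
//     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> batch, reader->Next());
//     if (!batch) break;  // end of stream
//     // ... process batch ...
//   }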
/// \brief Utility method to run a declaration and ignore results
///
/// This can be useful when the data are consumed as part of the plan itself, for
/// example, when the plan ends with a write node.
///
/// \see DeclarationToTable for details on threading & execution
ARROW_ACERO_EXPORT Status
DeclarationToStatus(Declaration declaration, bool use_threads = true,
                    MemoryPool* memory_pool = default_memory_pool(),
                    FunctionRegistry* function_registry = NULLPTR);

ARROW_ACERO_EXPORT Status DeclarationToStatus(Declaration declaration,
                                              QueryOptions query_options);

/// \brief Asynchronous version of \see DeclarationToStatus
///
/// This can be useful when the data are consumed as part of the plan itself, for
/// example, when the plan ends with a write node.
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_ACERO_EXPORT Future<> DeclarationToStatusAsync(
    Declaration declaration, bool use_threads = true,
    MemoryPool* memory_pool = default_memory_pool(),
    FunctionRegistry* function_registry = NULLPTR);

/// \brief Overload of \see DeclarationToStatusAsync accepting a custom exec context
///
/// \see DeclarationToTableAsync for details on threading & execution
ARROW_ACERO_EXPORT Future<> DeclarationToStatusAsync(Declaration declaration,
                                                     ExecContext exec_context);

/// @}

/// \brief Wrap an ExecBatch generator in a RecordBatchReader.
///
/// The RecordBatchReader does not impose any ordering on emitted batches.
ARROW_ACERO_EXPORT
std::shared_ptr<RecordBatchReader> MakeGeneratorReader(
    std::shared_ptr<Schema>, std::function<Future<std::optional<ExecBatch>>()>,
    MemoryPool*);

constexpr int kDefaultBackgroundMaxQ = 32;
constexpr int kDefaultBackgroundQRestart = 16;

/// \brief Make an ExecBatch generator from a RecordBatchReader
///
/// Useful as a source node for an Exec plan
ARROW_ACERO_EXPORT
Result<std::function<Future<std::optional<ExecBatch>>()>> MakeReaderGenerator(
    std::shared_ptr<RecordBatchReader> reader, arrow::internal::Executor* io_executor,
    int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart);

}  // namespace acero
}  // namespace arrow