Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:53

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <functional>
0021 #include <memory>
0022 #include <optional>
0023 #include <string>
0024 #include <vector>
0025 
0026 #include "arrow/acero/type_fwd.h"
0027 #include "arrow/acero/visibility.h"
0028 #include "arrow/compute/api_aggregate.h"
0029 #include "arrow/compute/api_vector.h"
0030 #include "arrow/compute/exec.h"
0031 #include "arrow/compute/expression.h"
0032 #include "arrow/record_batch.h"
0033 #include "arrow/result.h"
0034 #include "arrow/util/async_generator.h"
0035 #include "arrow/util/async_util.h"
0036 
0037 namespace arrow {
0038 
0039 using compute::Aggregate;
0040 using compute::ExecBatch;
0041 using compute::Expression;
0042 using compute::literal;
0043 using compute::Ordering;
0044 using compute::SelectKOptions;
0045 using compute::SortOptions;
0046 
0047 namespace internal {
0048 
0049 class Executor;
0050 
0051 }  // namespace internal
0052 
0053 namespace acero {
0054 
0055 /// \brief This must not be used in release-mode
0056 struct DebugOptions;
0057 
0058 using AsyncExecBatchGenerator = AsyncGenerator<std::optional<ExecBatch>>;
0059 
0060 /// \addtogroup acero-nodes
0061 /// @{
0062 
0063 /// \brief A base class for all options objects
0064 ///
0065 /// The only time this is used directly is when a node has no configuration
0066 class ARROW_ACERO_EXPORT ExecNodeOptions {
0067  public:
0068   virtual ~ExecNodeOptions() = default;
0069 
0070   /// \brief This must not be used in release-mode
0071   std::shared_ptr<DebugOptions> debug_opts;
0072 };
0073 
0074 /// \brief A node representing a generic source of data for Acero
0075 ///
0076 /// The source node will start calling `generator` during StartProducing.  An initial
0077 /// task will be created that will call `generator`.  It will not call `generator`
0078 /// reentrantly.  If the source can be read in parallel then those details should be
0079 /// encapsulated within `generator`.
0080 ///
0081 /// For each batch received a new task will be created to push that batch downstream.
0082 /// This task will slice smaller units of size `ExecPlan::kMaxBatchSize` from the
0083 /// parent batch and call InputReceived.  Thus, if the `generator` yields a large
0084 /// batch it may result in several calls to InputReceived.
0085 ///
0086 /// The SourceNode will, by default, assign an implicit ordering to outgoing batches.
0087 /// This is valid as long as the generator generates batches in a deterministic fashion.
0088 /// Currently, the only way to override this is to subclass the SourceNode.
0089 ///
0090 /// This node is not generally used directly but can serve as the basis for various
0091 /// specialized nodes.
0092 class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
0093  public:
0094   /// Create an instance from values
0095   SourceNodeOptions(std::shared_ptr<Schema> output_schema,
0096                     std::function<Future<std::optional<ExecBatch>>()> generator,
0097                     Ordering ordering = Ordering::Unordered())
0098       : output_schema(std::move(output_schema)),
0099         generator(std::move(generator)),
0100         ordering(std::move(ordering)) {}
0101 
0102   /// \brief the schema for batches that will be generated by this source
0103   std::shared_ptr<Schema> output_schema;
0104   /// \brief an asynchronous stream of batches ending with std::nullopt
0105   std::function<Future<std::optional<ExecBatch>>()> generator;
0106 
0107   Ordering ordering = Ordering::Unordered();
0108 };
0109 
0110 /// \brief a node that generates data from a table already loaded in memory
0111 ///
0112 /// The table source node will slice off chunks, defined by `max_batch_size`
0113 /// for parallel processing.  The table source node extends source node and so these
0114 /// chunks will be iteratively processed in small batches.  \see SourceNodeOptions
0115 /// for details.
0116 class ARROW_ACERO_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
0117  public:
0118   static constexpr int64_t kDefaultMaxBatchSize = 1 << 20;
0119 
0120   /// Create an instance from values
0121   TableSourceNodeOptions(std::shared_ptr<Table> table,
0122                          int64_t max_batch_size = kDefaultMaxBatchSize)
0123       : table(std::move(table)), max_batch_size(max_batch_size) {}
0124 
0125   /// \brief a table which acts as the data source
0126   std::shared_ptr<Table> table;
0127   /// \brief size of batches to emit from this node
0128   /// If the table is larger the node will emit multiple batches from the
0129   /// the table to be processed in parallel.
0130   int64_t max_batch_size;
0131 };
0132 
0133 /// \brief define a lazily resolved Arrow table.
0134 ///
0135 /// The table uniquely identified by the names can typically be resolved at the time when
0136 /// the plan is to be consumed.
0137 ///
0138 /// This node is for serialization purposes only and can never be executed.
0139 class ARROW_ACERO_EXPORT NamedTableNodeOptions : public ExecNodeOptions {
0140  public:
0141   /// Create an instance from values
0142   NamedTableNodeOptions(std::vector<std::string> names, std::shared_ptr<Schema> schema)
0143       : names(std::move(names)), schema(std::move(schema)) {}
0144 
0145   /// \brief the names to put in the serialized plan
0146   std::vector<std::string> names;
0147   /// \brief the output schema of the table
0148   std::shared_ptr<Schema> schema;
0149 };
0150 
0151 /// \brief a source node which feeds data from a synchronous iterator of batches
0152 ///
0153 /// ItMaker is a maker of an iterator of tabular data.
0154 ///
0155 /// The node can be configured to use an I/O executor.  If set then each time the
0156 /// iterator is polled a new I/O thread task will be created to do the polling.  This
0157 /// allows a blocking iterator to stay off the CPU thread pool.
0158 template <typename ItMaker>
0159 class ARROW_ACERO_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {
0160  public:
0161   /// Create an instance that will create a new task on io_executor for each iteration
0162   SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker,
0163                           arrow::internal::Executor* io_executor)
0164       : schema(std::move(schema)),
0165         it_maker(std::move(it_maker)),
0166         io_executor(io_executor),
0167         requires_io(true) {}
0168 
0169   /// Create an instance that will either iterate synchronously or use the default I/O
0170   /// executor
0171   SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker,
0172                           bool requires_io = false)
0173       : schema(std::move(schema)),
0174         it_maker(std::move(it_maker)),
0175         io_executor(NULLPTR),
0176         requires_io(requires_io) {}
0177 
0178   /// \brief The schema of the record batches from the iterator
0179   std::shared_ptr<Schema> schema;
0180 
0181   /// \brief A maker of an iterator which acts as the data source
0182   ItMaker it_maker;
0183 
0184   /// \brief The executor to use for scanning the iterator
0185   ///
0186   /// Defaults to the default I/O executor.  Only used if requires_io is true.
0187   /// If requires_io is false then this MUST be nullptr.
0188   arrow::internal::Executor* io_executor;
0189 
0190   /// \brief If true then items will be fetched from the iterator on a dedicated I/O
0191   ///        thread to keep I/O off the CPU thread
0192   bool requires_io;
0193 };
0194 
0195 /// a source node that reads from a RecordBatchReader
0196 ///
0197 /// Each iteration of the RecordBatchReader will be run on a new thread task created
0198 /// on the I/O thread pool.
0199 class ARROW_ACERO_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions {
0200  public:
0201   /// Create an instance from values
0202   RecordBatchReaderSourceNodeOptions(std::shared_ptr<RecordBatchReader> reader,
0203                                      arrow::internal::Executor* io_executor = NULLPTR)
0204       : reader(std::move(reader)), io_executor(io_executor) {}
0205 
0206   /// \brief The RecordBatchReader which acts as the data source
0207   std::shared_ptr<RecordBatchReader> reader;
0208 
0209   /// \brief The executor to use for the reader
0210   ///
0211   /// Defaults to the default I/O executor.
0212   arrow::internal::Executor* io_executor;
0213 };
0214 
0215 /// a source node that reads from an iterator of array vectors
0216 using ArrayVectorIteratorMaker = std::function<Iterator<std::shared_ptr<ArrayVector>>()>;
0217 /// \brief An extended Source node which accepts a schema and array-vectors
0218 class ARROW_ACERO_EXPORT ArrayVectorSourceNodeOptions
0219     : public SchemaSourceNodeOptions<ArrayVectorIteratorMaker> {
0220   using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
0221 };
0222 
0223 /// a source node that reads from an iterator of ExecBatch
0224 using ExecBatchIteratorMaker = std::function<Iterator<std::shared_ptr<ExecBatch>>()>;
0225 /// \brief An extended Source node which accepts a schema and exec-batches
0226 class ARROW_ACERO_EXPORT ExecBatchSourceNodeOptions
0227     : public SchemaSourceNodeOptions<ExecBatchIteratorMaker> {
0228  public:
0229   using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
0230   ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema,
0231                              std::vector<ExecBatch> batches,
0232                              ::arrow::internal::Executor* io_executor);
0233   ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema,
0234                              std::vector<ExecBatch> batches, bool requires_io = false);
0235 };
0236 
0237 using RecordBatchIteratorMaker = std::function<Iterator<std::shared_ptr<RecordBatch>>()>;
0238 /// a source node that reads from an iterator of RecordBatch
0239 class ARROW_ACERO_EXPORT RecordBatchSourceNodeOptions
0240     : public SchemaSourceNodeOptions<RecordBatchIteratorMaker> {
0241   using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
0242 };
0243 
0244 /// \brief a node which excludes some rows from batches passed through it
0245 ///
0246 /// filter_expression will be evaluated against each batch which is pushed to
0247 /// this node. Any rows for which filter_expression does not evaluate to `true` will be
0248 /// excluded in the batch emitted by this node.
0249 ///
0250 /// This node will emit empty batches if all rows are excluded.  This is done
0251 /// to avoid gaps in the ordering.
0252 class ARROW_ACERO_EXPORT FilterNodeOptions : public ExecNodeOptions {
0253  public:
0254   /// \brief create an instance from values
0255   explicit FilterNodeOptions(Expression filter_expression)
0256       : filter_expression(std::move(filter_expression)) {}
0257 
0258   /// \brief the expression to filter batches
0259   ///
0260   /// The return type of this expression must be boolean
0261   Expression filter_expression;
0262 };
0263 
0264 /// \brief a node which selects a specified subset from the input
0265 class ARROW_ACERO_EXPORT FetchNodeOptions : public ExecNodeOptions {
0266  public:
0267   static constexpr std::string_view kName = "fetch";
0268   /// \brief create an instance from values
0269   FetchNodeOptions(int64_t offset, int64_t count) : offset(offset), count(count) {}
0270   /// \brief the number of rows to skip
0271   int64_t offset;
0272   /// \brief the number of rows to keep (not counting skipped rows)
0273   int64_t count;
0274 };
0275 
0276 /// \brief a node which executes expressions on input batches, producing batches
0277 /// of the same length with new columns.
0278 ///
0279 /// Each expression will be evaluated against each batch which is pushed to
0280 /// this node to produce a corresponding output column.
0281 ///
0282 /// If names are not provided, the string representations of exprs will be used.
0283 class ARROW_ACERO_EXPORT ProjectNodeOptions : public ExecNodeOptions {
0284  public:
0285   /// \brief create an instance from values
0286   explicit ProjectNodeOptions(std::vector<Expression> expressions,
0287                               std::vector<std::string> names = {})
0288       : expressions(std::move(expressions)), names(std::move(names)) {}
0289 
0290   /// \brief the expressions to run on the batches
0291   ///
0292   /// The output will have one column for each expression.  If you wish to keep any of
0293   /// the columns from the input then you should create a simple field_ref expression
0294   /// for that column.
0295   std::vector<Expression> expressions;
0296   /// \brief the names of the output columns
0297   ///
0298   /// If this is not specified then the result of calling ToString on the expression will
0299   /// be used instead
0300   ///
0301   /// This list should either be empty or have the same length as `expressions`
0302   std::vector<std::string> names;
0303 };
0304 
0305 /// \brief a node which aggregates input batches and calculates summary statistics
0306 ///
0307 /// The node can summarize the entire input or it can group the input with grouping keys
0308 /// and segment keys.
0309 ///
0310 /// By default, the aggregate node is a pipeline breaker.  It must accumulate all input
0311 /// before any output is produced.  Segment keys are a performance optimization.  If
0312 /// you know your input is already partitioned by one or more columns then you can
0313 /// specify these as segment keys.  At each change in the segment keys the node will
0314 /// emit values for all data seen so far.
0315 ///
0316 /// Segment keys are currently limited to single-threaded mode.
0317 ///
0318 /// Both keys and segment-keys determine the group.  However segment-keys are also used
0319 /// for determining grouping segments, which should be large, and allow streaming a
0320 /// partial aggregation result after processing each segment.  One common use-case for
0321 /// segment-keys is ordered aggregation, in which the segment-key attribute specifies a
0322 /// column with non-decreasing values or a lexicographically-ordered set of such columns.
0323 ///
0324 /// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is
0325 /// expected to be a HashAggregate function. If the keys attribute is an empty vector,
0326 /// then each aggregate is assumed to be a ScalarAggregate function.
0327 ///
0328 /// If the segment_keys attribute is a non-empty vector, then segmented aggregation, as
0329 /// described above, applies.
0330 ///
0331 /// The keys and segment_keys vectors must be disjoint.
0332 ///
0333 /// If no measures are provided then you will simply get the list of unique keys.
0334 ///
0335 /// This node outputs segment keys first, followed by regular keys, followed by one
0336 /// column for each aggregate.
0337 class ARROW_ACERO_EXPORT AggregateNodeOptions : public ExecNodeOptions {
0338  public:
0339   /// \brief create an instance from values
0340   explicit AggregateNodeOptions(std::vector<Aggregate> aggregates,
0341                                 std::vector<FieldRef> keys = {},
0342                                 std::vector<FieldRef> segment_keys = {})
0343       : aggregates(std::move(aggregates)),
0344         keys(std::move(keys)),
0345         segment_keys(std::move(segment_keys)) {}
0346 
0347   // aggregations which will be applied to the targeted fields
0348   std::vector<Aggregate> aggregates;
0349   // keys by which aggregations will be grouped (optional)
0350   std::vector<FieldRef> keys;
0351   // keys by which aggregations will be segmented (optional)
0352   std::vector<FieldRef> segment_keys;
0353 };
0354 
/// \brief a default value at which backpressure will be applied (1 GiB)
constexpr int32_t kDefaultBackpressureHighBytes = 1024 * 1024 * 1024;
/// \brief a default value at which backpressure will be removed (256 MiB)
constexpr int32_t kDefaultBackpressureLowBytes = 256 * 1024 * 1024;
0359 
0360 /// \brief an interface that can be queried for backpressure statistics
0361 class ARROW_ACERO_EXPORT BackpressureMonitor {
0362  public:
0363   virtual ~BackpressureMonitor() = default;
0364   /// \brief fetches the number of bytes currently queued up
0365   virtual uint64_t bytes_in_use() = 0;
0366   /// \brief checks to see if backpressure is currently applied
0367   virtual bool is_paused() = 0;
0368 };
0369 
0370 /// \brief Options to control backpressure behavior
0371 struct ARROW_ACERO_EXPORT BackpressureOptions {
0372   /// \brief Create default options that perform no backpressure
0373   BackpressureOptions() : resume_if_below(0), pause_if_above(0) {}
0374   /// \brief Create options that will perform backpressure
0375   ///
0376   /// \param resume_if_below The producer should resume producing if the backpressure
0377   ///                        queue has fewer than resume_if_below items.
0378   /// \param pause_if_above The producer should pause producing if the backpressure
0379   ///                       queue has more than pause_if_above items
0380   BackpressureOptions(uint64_t resume_if_below, uint64_t pause_if_above)
0381       : resume_if_below(resume_if_below), pause_if_above(pause_if_above) {}
0382 
0383   /// \brief create an instance using default values for backpressure limits
0384   static BackpressureOptions DefaultBackpressure() {
0385     return BackpressureOptions(kDefaultBackpressureLowBytes,
0386                                kDefaultBackpressureHighBytes);
0387   }
0388 
0389   /// \brief helper method to determine if backpressure is disabled
0390   /// \return true if pause_if_above is greater than zero, false otherwise
0391   bool should_apply_backpressure() const { return pause_if_above > 0; }
0392 
0393   /// \brief the number of bytes at which the producer should resume producing
0394   uint64_t resume_if_below;
0395   /// \brief the number of bytes at which the producer should pause producing
0396   ///
0397   /// If this is <= 0 then backpressure will be disabled
0398   uint64_t pause_if_above;
0399 };
0400 
0401 /// \brief a sink node which collects results in a queue
0402 ///
0403 /// Emitted batches will only be ordered if there is a meaningful ordering
0404 /// and sequence_output is not set to false.
0405 class ARROW_ACERO_EXPORT SinkNodeOptions : public ExecNodeOptions {
0406  public:
0407   explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator,
0408                            std::shared_ptr<Schema>* schema,
0409                            BackpressureOptions backpressure = {},
0410                            BackpressureMonitor** backpressure_monitor = NULLPTR,
0411                            std::optional<bool> sequence_output = std::nullopt)
0412       : generator(generator),
0413         schema(schema),
0414         backpressure(backpressure),
0415         backpressure_monitor(backpressure_monitor),
0416         sequence_output(sequence_output) {}
0417 
0418   explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator,
0419                            BackpressureOptions backpressure = {},
0420                            BackpressureMonitor** backpressure_monitor = NULLPTR,
0421                            std::optional<bool> sequence_output = std::nullopt)
0422       : generator(generator),
0423         schema(NULLPTR),
0424         backpressure(std::move(backpressure)),
0425         backpressure_monitor(backpressure_monitor),
0426         sequence_output(sequence_output) {}
0427 
0428   /// \brief A pointer to a generator of batches.
0429   ///
0430   /// This will be set when the node is added to the plan and should be used to consume
0431   /// data from the plan.  If this function is not called frequently enough then the sink
0432   /// node will start to accumulate data and may apply backpressure.
0433   std::function<Future<std::optional<ExecBatch>>()>* generator;
0434   /// \brief A pointer which will be set to the schema of the generated batches
0435   ///
0436   /// This is optional, if nullptr is passed in then it will be ignored.
0437   /// This will be set when the node is added to the plan, before StartProducing is called
0438   std::shared_ptr<Schema>* schema;
0439   /// \brief Options to control when to apply backpressure
0440   ///
0441   /// This is optional, the default is to never apply backpressure.  If the plan is not
0442   /// consumed quickly enough the system may eventually run out of memory.
0443   BackpressureOptions backpressure;
0444   /// \brief A pointer to a backpressure monitor
0445   ///
0446   /// This will be set when the node is added to the plan.  This can be used to inspect
0447   /// the amount of data currently queued in the sink node.  This is an optional utility
0448   /// and backpressure can be applied even if this is not used.
0449   BackpressureMonitor** backpressure_monitor;
0450   /// \brief Controls whether batches should be emitted immediately or sequenced in order
0451   ///
0452   /// \see QueryOptions for more details
0453   std::optional<bool> sequence_output;
0454 };
0455 
0456 /// \brief Control used by a SinkNodeConsumer to pause & resume
0457 ///
0458 /// Callers should ensure that they do not call Pause and Resume simultaneously and they
0459 /// should sequence things so that a call to Pause() is always followed by an eventual
0460 /// call to Resume()
0461 class ARROW_ACERO_EXPORT BackpressureControl {
0462  public:
0463   virtual ~BackpressureControl() = default;
0464   /// \brief Ask the input to pause
0465   ///
0466   /// This is best effort, batches may continue to arrive
0467   /// Must eventually be followed by a call to Resume() or deadlock will occur
0468   virtual void Pause() = 0;
0469   /// \brief Ask the input to resume
0470   virtual void Resume() = 0;
0471 };
0472 
0473 /// \brief a sink node that consumes the data as part of the plan using callbacks
0474 class ARROW_ACERO_EXPORT SinkNodeConsumer {
0475  public:
0476   virtual ~SinkNodeConsumer() = default;
0477   /// \brief Prepare any consumer state
0478   ///
0479   /// This will be run once the schema is finalized as the plan is starting and
0480   /// before any calls to Consume.  A common use is to save off the schema so that
0481   /// batches can be interpreted.
0482   virtual Status Init(const std::shared_ptr<Schema>& schema,
0483                       BackpressureControl* backpressure_control, ExecPlan* plan) = 0;
0484   /// \brief Consume a batch of data
0485   virtual Status Consume(ExecBatch batch) = 0;
0486   /// \brief Signal to the consumer that the last batch has been delivered
0487   ///
0488   /// The returned future should only finish when all outstanding tasks have completed
0489   ///
0490   /// If the plan is ended early or aborts due to an error then this will not be
0491   /// called.
0492   virtual Future<> Finish() = 0;
0493 };
0494 
0495 /// \brief Add a sink node which consumes data within the exec plan run
0496 class ARROW_ACERO_EXPORT ConsumingSinkNodeOptions : public ExecNodeOptions {
0497  public:
0498   explicit ConsumingSinkNodeOptions(std::shared_ptr<SinkNodeConsumer> consumer,
0499                                     std::vector<std::string> names = {},
0500                                     std::optional<bool> sequence_output = std::nullopt)
0501       : consumer(std::move(consumer)),
0502         names(std::move(names)),
0503         sequence_output(sequence_output) {}
0504 
0505   std::shared_ptr<SinkNodeConsumer> consumer;
0506   /// \brief Names to rename the sink's schema fields to
0507   ///
0508   /// If specified then names must be provided for all fields. Currently, only a flat
0509   /// schema is supported (see GH-31875).
0510   ///
0511   /// If not specified then names will be generated based on the source data.
0512   std::vector<std::string> names;
0513   /// \brief Controls whether batches should be emitted immediately or sequenced in order
0514   ///
0515   /// \see QueryOptions for more details
0516   std::optional<bool> sequence_output;
0517 };
0518 
0519 /// \brief Make a node which sorts rows passed through it
0520 ///
0521 /// All batches pushed to this node will be accumulated, then sorted, by the given
0522 /// fields. Then sorted batches will be forwarded to the generator in sorted order.
0523 class ARROW_ACERO_EXPORT OrderBySinkNodeOptions : public SinkNodeOptions {
0524  public:
0525   /// \brief create an instance from values
0526   explicit OrderBySinkNodeOptions(
0527       SortOptions sort_options,
0528       std::function<Future<std::optional<ExecBatch>>()>* generator)
0529       : SinkNodeOptions(generator), sort_options(std::move(sort_options)) {}
0530 
0531   /// \brief options describing which columns and direction to sort
0532   SortOptions sort_options;
0533 };
0534 
0535 /// \brief Apply a new ordering to data
0536 ///
0537 /// Currently this node works by accumulating all data, sorting, and then emitting
0538 /// the new data with an updated batch index.
0539 ///
0540 /// Larger-than-memory sort is not currently supported.
0541 class ARROW_ACERO_EXPORT OrderByNodeOptions : public ExecNodeOptions {
0542  public:
0543   static constexpr std::string_view kName = "order_by";
0544   explicit OrderByNodeOptions(Ordering ordering) : ordering(std::move(ordering)) {}
0545 
0546   /// \brief The new ordering to apply to outgoing data
0547   Ordering ordering;
0548 };
0549 
/// \brief The kinds of join that can be requested
///
/// The enumerator values are listed explicitly; they match the implicit numbering.
enum class JoinType {
  LEFT_SEMI = 0,
  RIGHT_SEMI = 1,
  LEFT_ANTI = 2,
  RIGHT_ANTI = 3,
  INNER = 4,
  LEFT_OUTER = 5,
  RIGHT_OUTER = 6,
  FULL_OUTER = 7
};

/// \brief Convert a JoinType to a string (defined out of line)
std::string ToString(JoinType t);
0562 
/// \brief The comparison applied to a pair of join keys
///
/// NOTE(review): presumably EQ never matches two null keys while IS does — confirm
/// against the join implementation.
enum class JoinKeyCmp {
  EQ = 0,
  IS = 1
};
0564 
0565 /// \brief a node which implements a join operation using a hash table
0566 class ARROW_ACERO_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
0567  public:
0568   static constexpr const char* default_output_suffix_for_left = "";
0569   static constexpr const char* default_output_suffix_for_right = "";
0570   /// \brief create an instance from values that outputs all columns
0571   HashJoinNodeOptions(
0572       JoinType in_join_type, std::vector<FieldRef> in_left_keys,
0573       std::vector<FieldRef> in_right_keys, Expression filter = literal(true),
0574       std::string output_suffix_for_left = default_output_suffix_for_left,
0575       std::string output_suffix_for_right = default_output_suffix_for_right,
0576       bool disable_bloom_filter = false)
0577       : join_type(in_join_type),
0578         left_keys(std::move(in_left_keys)),
0579         right_keys(std::move(in_right_keys)),
0580         output_all(true),
0581         output_suffix_for_left(std::move(output_suffix_for_left)),
0582         output_suffix_for_right(std::move(output_suffix_for_right)),
0583         filter(std::move(filter)),
0584         disable_bloom_filter(disable_bloom_filter) {
0585     this->key_cmp.resize(this->left_keys.size());
0586     for (size_t i = 0; i < this->left_keys.size(); ++i) {
0587       this->key_cmp[i] = JoinKeyCmp::EQ;
0588     }
0589   }
0590   /// \brief create an instance from keys
0591   ///
0592   /// This will create an inner join that outputs all columns and has no post join filter
0593   ///
0594   /// `in_left_keys` should have the same length and types as `in_right_keys`
0595   /// @param in_left_keys the keys in the left input
0596   /// @param in_right_keys the keys in the right input
0597   HashJoinNodeOptions(std::vector<FieldRef> in_left_keys,
0598                       std::vector<FieldRef> in_right_keys)
0599       : left_keys(std::move(in_left_keys)), right_keys(std::move(in_right_keys)) {
0600     this->join_type = JoinType::INNER;
0601     this->output_all = true;
0602     this->output_suffix_for_left = default_output_suffix_for_left;
0603     this->output_suffix_for_right = default_output_suffix_for_right;
0604     this->key_cmp.resize(this->left_keys.size());
0605     for (size_t i = 0; i < this->left_keys.size(); ++i) {
0606       this->key_cmp[i] = JoinKeyCmp::EQ;
0607     }
0608     this->filter = literal(true);
0609   }
0610   /// \brief create an instance from values using JoinKeyCmp::EQ for all comparisons
0611   HashJoinNodeOptions(
0612       JoinType join_type, std::vector<FieldRef> left_keys,
0613       std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output,
0614       std::vector<FieldRef> right_output, Expression filter = literal(true),
0615       std::string output_suffix_for_left = default_output_suffix_for_left,
0616       std::string output_suffix_for_right = default_output_suffix_for_right,
0617       bool disable_bloom_filter = false)
0618       : join_type(join_type),
0619         left_keys(std::move(left_keys)),
0620         right_keys(std::move(right_keys)),
0621         output_all(false),
0622         left_output(std::move(left_output)),
0623         right_output(std::move(right_output)),
0624         output_suffix_for_left(std::move(output_suffix_for_left)),
0625         output_suffix_for_right(std::move(output_suffix_for_right)),
0626         filter(std::move(filter)),
0627         disable_bloom_filter(disable_bloom_filter) {
0628     this->key_cmp.resize(this->left_keys.size());
0629     for (size_t i = 0; i < this->left_keys.size(); ++i) {
0630       this->key_cmp[i] = JoinKeyCmp::EQ;
0631     }
0632   }
0633   /// \brief create an instance from values
0634   HashJoinNodeOptions(
0635       JoinType join_type, std::vector<FieldRef> left_keys,
0636       std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output,
0637       std::vector<FieldRef> right_output, std::vector<JoinKeyCmp> key_cmp,
0638       Expression filter = literal(true),
0639       std::string output_suffix_for_left = default_output_suffix_for_left,
0640       std::string output_suffix_for_right = default_output_suffix_for_right,
0641       bool disable_bloom_filter = false)
0642       : join_type(join_type),
0643         left_keys(std::move(left_keys)),
0644         right_keys(std::move(right_keys)),
0645         output_all(false),
0646         left_output(std::move(left_output)),
0647         right_output(std::move(right_output)),
0648         key_cmp(std::move(key_cmp)),
0649         output_suffix_for_left(std::move(output_suffix_for_left)),
0650         output_suffix_for_right(std::move(output_suffix_for_right)),
0651         filter(std::move(filter)),
0652         disable_bloom_filter(disable_bloom_filter) {}
0653 
0654   HashJoinNodeOptions() = default;  // members keep the in-class defaults declared below
0655 
0656   // type of join (inner, left, semi...); an inner join unless set otherwise
0657   JoinType join_type = JoinType::INNER;
0658   // key fields from left input
0659   std::vector<FieldRef> left_keys;
0660   // key fields from right input
0661   std::vector<FieldRef> right_keys;
0662   // if set, all valid fields from both left and right input will be output
0663   // (and the field ref vectors for output fields will be ignored)
0664   bool output_all = false;
0665   // output fields passed from left input
0666   std::vector<FieldRef> left_output;
0667   // output fields passed from right input
0668   std::vector<FieldRef> right_output;
0669   // key comparison function (determines whether a null key is equal to another null
0670   // key or not)
0671   std::vector<JoinKeyCmp> key_cmp;
0672   // suffix added to names of output fields coming from left input (used to distinguish,
0673   // if necessary, between fields of the same name in left and right input; it can be left
0674   // empty if there are no name collisions)
0675   std::string output_suffix_for_left;
0676   // suffix added to names of output fields coming from right input
0677   std::string output_suffix_for_right;
0678   // residual filter which is applied to matching rows.  Rows that do not match
0679   // the filter are not included.  The filter is applied against the
0680   // concatenated input schema (left fields then right fields) and can reference
0681   // fields that are not included in the output.  Defaults to the literal `true`.
0682   Expression filter = literal(true);
0683   // whether or not to disable Bloom filters in this join (they are enabled by default)
0684   bool disable_bloom_filter = false;
0685 };
0686 
0687 /// \brief a node which implements the asof join operation
0688 ///
0689 /// Note, this API is experimental and will change in the future
0690 ///
0691 /// This node takes one left table and one or more right tables, and asof joins them
0692 /// together. Batches produced by each input must be ordered by the "on" key.
0693 /// This node will output one row for each row in the left table.
0694 class ARROW_ACERO_EXPORT AsofJoinNodeOptions : public ExecNodeOptions {
0695  public:
0696   /// \brief Keys for one input table of the AsofJoin operation
0697   ///
0698   /// The keys must be consistent across the input tables:
0699   /// Each "on" key must refer to a field of the same type and units across the tables.
0700   /// Each "by" key must refer to a list of fields of the same types across the tables.
0701   struct Keys {
0702     /// \brief "on" key for the join.
0703     ///
0704     /// The input table must be sorted by the "on" key. Must be a single field of a common
0705     /// type. Inexact match is used on the "on" key; the matching window (past, future or
0706     /// exact) is determined by AsofJoinNodeOptions::tolerance.
0707     /// Currently, the "on" key must be of an integer, date, or timestamp type.
0708     FieldRef on_key;
0709     /// \brief "by" key for the join.
0710     ///
0711     /// Each input table must have each field of the "by" key.  Exact equality is used for
0712     /// each field of the "by" key.
0713     /// Currently, each field of the "by" key must be of an integer, date, timestamp, or
0714     /// base-binary type.
0715     std::vector<FieldRef> by_key;
0716   };
0717   /// \brief create an instance from the per-input keys and the "on" key tolerance
0718   AsofJoinNodeOptions(std::vector<Keys> input_keys, int64_t tolerance)
0719       : input_keys(std::move(input_keys)), tolerance(tolerance) {}
0720 
0721   /// \brief AsofJoin keys per input table. At least two keys must be given. The first key
0722   /// corresponds to a left table and all other keys correspond to right tables for the
0723   /// as-of-join.
0724   ///
0725   /// \see `Keys` for details.
0726   std::vector<Keys> input_keys;
0727   /// \brief Tolerance for inexact "on" key matching. A right row is considered a match
0728   /// with the left row if `right.on - left.on <= tolerance`. The `tolerance` may be:
0729   /// - negative, in which case a past-as-of-join occurs;
0730   /// - or positive, in which case a future-as-of-join occurs;
0731   /// - or zero, in which case an exact-as-of-join occurs.
0732   ///
0733   /// The tolerance is interpreted in the same units as the "on" key.
0734   int64_t tolerance;
0735 };
0736 
0737 /// \brief a node which selects the top_k/bottom_k rows passed through it
0738 ///
0739 /// All batches pushed to this node will be accumulated, then rows are selected by the
0740 /// given fields. The selected rows are then forwarded to the generator in sorted order.
0741 class ARROW_ACERO_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
0742  public:
0743   explicit SelectKSinkNodeOptions(
0744       SelectKOptions select_k_options,
0745       std::function<Future<std::optional<ExecBatch>>()>* generator)  // passed to the base
0746       : SinkNodeOptions(generator), select_k_options(std::move(select_k_options)) {}
0747 
0748   /// \brief options describing which rows are selected
0749   SelectKOptions select_k_options;
0750 };
0751 
0752 /// \brief a sink node which accumulates all output into a table
0753 class ARROW_ACERO_EXPORT TableSinkNodeOptions : public ExecNodeOptions {
0754  public:
0755   /// \brief create an instance from an output location and an optional sequencing flag
0756   explicit TableSinkNodeOptions(std::shared_ptr<Table>* output_table,
0757                                 std::optional<bool> sequence_output = std::nullopt)
0758       : output_table(output_table), sequence_output(sequence_output) {}
0759 
0760   /// \brief an "out parameter" specifying the table that will be created
0761   ///
0762   /// Must not be null and must remain valid for the entirety of the plan execution.
0763   /// After the plan has completed this will be set to point to the result table
0764   std::shared_ptr<Table>* output_table;
0765   /// \brief Controls whether batches should be emitted immediately or sequenced in order
0766   ///
0767   /// \see QueryOptions for more details
0768   std::optional<bool> sequence_output;
0769   /// \brief Custom names to use for the columns.
0770   ///
0771   /// If specified then names must be provided for all fields. Currently, only a flat
0772   /// schema is supported (see GH-31875).
0773   /// This member is not set by the constructor; assign it after construction if needed.
0774   /// If not specified then names will be generated based on the source data.
0775   std::vector<std::string> names;
0776 };
0777 
0778 /// \brief a row template that describes one row that will be generated for each input row
0779 struct ARROW_ACERO_EXPORT PivotLongerRowTemplate {
0780   PivotLongerRowTemplate(std::vector<std::string> feature_values,
0781                          std::vector<std::optional<FieldRef>> measurement_values)
0782       : feature_values(std::move(feature_values)),
0783         measurement_values(std::move(measurement_values)) {}
0784   /// A (typically unique) set of feature values for the template, usually derived from a
0785   /// column name
0786   ///
0787   /// These will be used to populate the feature columns of the generated row
0788   std::vector<std::string> feature_values;
0789   /// The fields containing the measurements to use for this row
0790   ///
0791   /// These will be used to populate the measurement columns.  If an entry is nullopt
0792   /// then a null will be inserted for the corresponding measurement.
0793   std::vector<std::optional<FieldRef>> measurement_values;
0794 };
0795 
0796 /// \brief Reshape a table by turning some columns into additional rows
0797 ///
0798 /// This operation is sometimes also referred to as UNPIVOT
0799 ///
0800 /// This is typically done when there are multiple observations in each row in order to
0801 /// transform to a table containing a single observation per row.
0802 ///
0803 /// For example:
0804 ///
0805 /// | time | left_temp | right_temp |
0806 /// | ---- | --------- | ---------- |
0807 /// | 1    | 10        | 20         |
0808 /// | 2    | 15        | 18         |
0809 ///
0810 /// The above table contains two observations per row.  There is an implicit feature
0811 /// "location" (left vs right) and a measurement "temp".  What we really want is:
0812 ///
0813 /// | time | location | temp |
0814 /// | ---  | ---      | ---  |
0815 /// | 1    | left     | 10   |
0816 /// | 1    | right    | 20   |
0817 /// | 2    | left     | 15   |
0818 /// | 2    | right    | 18   |
0819 ///
0820 /// For a more complex example consider:
0821 ///
0822 /// | time | ax1 | ay1 | bx1 | ay2 |
0823 /// | ---- | --- | --- | --- | --- |
0824 /// | 0    | 1   | 2   | 3   | 4   |
0825 ///
0826 /// We can pretend a vs b and x vs y are features while 1 and 2 are two different
0827 /// kinds of measurements.  We thus want to pivot to:
0828 ///
0829 /// | time | a/b | x/y |  f1  |  f2  |
0830 /// | ---- | --- | --- | ---- | ---- |
0831 /// | 0    | a   | x   | 1    | null |
0832 /// | 0    | a   | y   | 2    | 4    |
0833 /// | 0    | b   | x   | 3    | null |
0834 ///
0835 /// To do this we create a row template for each combination of features.  One should
0836 /// be able to do this purely by looking at the column names.  For example, given the
0837 /// above columns "ax1", "ay1", "bx1", and "ay2" we know we have three feature
0838 /// combinations (a, x), (a, y), and (b, x).  Similarly, we know we have two possible
0839 /// measurements, "1" and "2".
0840 ///
0841 /// For each combination of features we create a row template.  In each row template we
0842 /// describe the combination and then list which columns to use for the measurements.
0843 /// If a measurement doesn't exist for a given combination then we use nullopt.
0844 ///
0845 /// So, for our above example, we have these row templates:
0846 ///
0847 /// (a, x): feature_values={"a", "x"}, measurement_values={"ax1", nullopt}
0848 /// (a, y): feature_values={"a", "y"}, measurement_values={"ay1", "ay2"}
0849 /// (b, x): feature_values={"b", "x"}, measurement_values={"bx1", nullopt}
0850 ///
0851 /// Finishing it off we name our new columns:
0852 /// feature_field_names={"a/b","x/y"}
0853 /// measurement_field_names={"f1", "f2"}
0854 class ARROW_ACERO_EXPORT PivotLongerNodeOptions : public ExecNodeOptions {
0855  public:
0856   static constexpr std::string_view kName = "pivot_longer";
0857   /// One or more row templates to create new output rows
0858   ///
0859   /// Normally there are at least two row templates.  The output # of rows
0860   /// will be the input # of rows * the number of row templates
0861   std::vector<PivotLongerRowTemplate> row_templates;
0862   /// The names of the columns which describe the new features
0863   std::vector<std::string> feature_field_names;
0864   /// The names of the columns which represent the measurements
0865   std::vector<std::string> measurement_field_names;
0866 };
0867 
0868 /// @}
0869 
0870 }  // namespace acero
0871 }  // namespace arrow