![]() |
|
|||
File indexing completed on 2025-08-28 08:26:53
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 0018 #pragma once 0019 0020 #include <functional> 0021 #include <memory> 0022 #include <optional> 0023 #include <string> 0024 #include <vector> 0025 0026 #include "arrow/acero/type_fwd.h" 0027 #include "arrow/acero/visibility.h" 0028 #include "arrow/compute/api_aggregate.h" 0029 #include "arrow/compute/api_vector.h" 0030 #include "arrow/compute/exec.h" 0031 #include "arrow/compute/expression.h" 0032 #include "arrow/record_batch.h" 0033 #include "arrow/result.h" 0034 #include "arrow/util/async_generator.h" 0035 #include "arrow/util/async_util.h" 0036 0037 namespace arrow { 0038 0039 using compute::Aggregate; 0040 using compute::ExecBatch; 0041 using compute::Expression; 0042 using compute::literal; 0043 using compute::Ordering; 0044 using compute::SelectKOptions; 0045 using compute::SortOptions; 0046 0047 namespace internal { 0048 0049 class Executor; 0050 0051 } // namespace internal 0052 0053 namespace acero { 0054 0055 /// \brief This must not be used in release-mode 0056 struct DebugOptions; 0057 0058 using AsyncExecBatchGenerator = AsyncGenerator<std::optional<ExecBatch>>; 0059 0060 /// \addtogroup 
acero-nodes 0061 /// @{ 0062 0063 /// \brief A base class for all options objects 0064 /// 0065 /// The only time this is used directly is when a node has no configuration 0066 class ARROW_ACERO_EXPORT ExecNodeOptions { 0067 public: 0068 virtual ~ExecNodeOptions() = default; 0069 0070 /// \brief This must not be used in release-mode 0071 std::shared_ptr<DebugOptions> debug_opts; 0072 }; 0073 0074 /// \brief A node representing a generic source of data for Acero 0075 /// 0076 /// The source node will start calling `generator` during StartProducing. An initial 0077 /// task will be created that will call `generator`. It will not call `generator` 0078 /// reentrantly. If the source can be read in parallel then those details should be 0079 /// encapsulated within `generator`. 0080 /// 0081 /// For each batch received a new task will be created to push that batch downstream. 0082 /// This task will slice smaller units of size `ExecPlan::kMaxBatchSize` from the 0083 /// parent batch and call InputReceived. Thus, if the `generator` yields a large 0084 /// batch it may result in several calls to InputReceived. 0085 /// 0086 /// The SourceNode will, by default, assign an implicit ordering to outgoing batches. 0087 /// This is valid as long as the generator generates batches in a deterministic fashion. 0088 /// Currently, the only way to override this is to subclass the SourceNode. 0089 /// 0090 /// This node is not generally used directly but can serve as the basis for various 0091 /// specialized nodes. 
class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
 public:
  /// \brief Create an instance from values
  ///
  /// \param output_schema the schema of the batches produced by `generator`
  /// \param generator an asynchronous stream of batches ending with std::nullopt
  /// \param ordering the ordering stored in `ordering`; unordered if omitted
  SourceNodeOptions(std::shared_ptr<Schema> output_schema,
                    std::function<Future<std::optional<ExecBatch>>()> generator,
                    Ordering ordering = Ordering::Unordered())
      : output_schema(std::move(output_schema)),
        generator(std::move(generator)),
        ordering(std::move(ordering)) {}

  /// \brief the schema for batches that will be generated by this source
  std::shared_ptr<Schema> output_schema;
  /// \brief an asynchronous stream of batches ending with std::nullopt
  std::function<Future<std::optional<ExecBatch>>()> generator;

  /// \brief the ordering associated with this source; Ordering::Unordered() by default
  ///
  /// NOTE(review): presumably this describes the order of the batches yielded by
  /// `generator` -- confirm against the SourceNode implementation.
  Ordering ordering = Ordering::Unordered();
};

/// \brief a node that generates data from a table already loaded in memory
///
/// The table source node will slice off chunks, defined by `max_batch_size`
/// for parallel processing.  The table source node extends source node and so these
/// chunks will be iteratively processed in small batches.  \see SourceNodeOptions
/// for details.
class ARROW_ACERO_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
 public:
  /// \brief the value used for `max_batch_size` when none is given (1 << 20)
  static constexpr int64_t kDefaultMaxBatchSize = 1 << 20;

  /// \brief Create an instance from values
  ///
  /// \param table the in-memory table to read from
  /// \param max_batch_size the size of the batches to emit
  TableSourceNodeOptions(std::shared_ptr<Table> table,
                         int64_t max_batch_size = kDefaultMaxBatchSize)
      : table(std::move(table)), max_batch_size(max_batch_size) {}

  /// \brief a table which acts as the data source
  std::shared_ptr<Table> table;
  /// \brief size of batches to emit from this node
  ///
  /// If the table is larger the node will emit multiple batches from
  /// the table to be processed in parallel.
  int64_t max_batch_size;
};

/// \brief define a lazily resolved Arrow table.
///
/// The table uniquely identified by the names can typically be resolved at the time when
/// the plan is to be consumed.
///
/// This node is for serialization purposes only and can never be executed.
class ARROW_ACERO_EXPORT NamedTableNodeOptions : public ExecNodeOptions {
 public:
  /// \brief Create an instance from values
  ///
  /// \param names the names to put in the serialized plan
  /// \param schema the output schema of the table
  NamedTableNodeOptions(std::vector<std::string> names, std::shared_ptr<Schema> schema)
      : names(std::move(names)), schema(std::move(schema)) {}

  /// \brief the names to put in the serialized plan
  std::vector<std::string> names;
  /// \brief the output schema of the table
  std::shared_ptr<Schema> schema;
};

/// \brief a source node which feeds data from a synchronous iterator of batches
///
/// ItMaker is a maker of an iterator of tabular data.
///
/// The node can be configured to use an I/O executor.  If set then each time the
/// iterator is polled a new I/O thread task will be created to do the polling.  This
/// allows a blocking iterator to stay off the CPU thread pool.
template <typename ItMaker>
class ARROW_ACERO_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {
 public:
  /// \brief Create an instance that will create a new task on io_executor for each
  /// iteration
  ///
  /// This overload always sets `requires_io` to true.
  ///
  /// \param schema the schema of the data produced by the iterator
  /// \param it_maker a maker of the iterator which acts as the data source
  /// \param io_executor the executor on which the iterator will be polled
  SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker,
                          arrow::internal::Executor* io_executor)
      : schema(std::move(schema)),
        it_maker(std::move(it_maker)),
        io_executor(io_executor),
        requires_io(true) {}

  /// \brief Create an instance that will either iterate synchronously or use the
  /// default I/O executor
  ///
  /// This overload always sets `io_executor` to null.
  ///
  /// \param schema the schema of the data produced by the iterator
  /// \param it_maker a maker of the iterator which acts as the data source
  /// \param requires_io if true the iterator is polled on an I/O thread, otherwise
  ///        it is iterated synchronously
  SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker,
                          bool requires_io = false)
      : schema(std::move(schema)),
        it_maker(std::move(it_maker)),
        io_executor(NULLPTR),
        requires_io(requires_io) {}

  /// \brief The schema of the record batches from the iterator
  std::shared_ptr<Schema> schema;

  /// \brief A maker of an iterator which acts as the data source
  ItMaker it_maker;

  /// \brief The executor to use for scanning the iterator
  ///
  /// Defaults to the default I/O executor.  Only used if requires_io is true.
  /// If requires_io is false then this MUST be nullptr.
  arrow::internal::Executor* io_executor;

  /// \brief If true then items will be fetched from the iterator on a dedicated I/O
  /// thread to keep I/O off the CPU thread
  bool requires_io;
};

/// a source node that reads from a RecordBatchReader
///
/// Each iteration of the RecordBatchReader will be run on a new thread task created
/// on the I/O thread pool.
class ARROW_ACERO_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions {
 public:
  /// \brief Create an instance from values
  ///
  /// \param reader the RecordBatchReader which acts as the data source
  /// \param io_executor the executor to use for the reader; null if omitted
  RecordBatchReaderSourceNodeOptions(std::shared_ptr<RecordBatchReader> reader,
                                     arrow::internal::Executor* io_executor = NULLPTR)
      : reader(std::move(reader)), io_executor(io_executor) {}

  /// \brief The RecordBatchReader which acts as the data source
  std::shared_ptr<RecordBatchReader> reader;

  /// \brief The executor to use for the reader
  ///
  /// Defaults to the default I/O executor.
  /// NOTE(review): presumably a null pointer selects that default -- confirm.
  arrow::internal::Executor* io_executor;
};

/// \brief A maker of an iterator of array vectors
using ArrayVectorIteratorMaker = std::function<Iterator<std::shared_ptr<ArrayVector>>()>;
/// \brief An extended Source node which accepts a schema and array-vectors
///
/// The node reads from an iterator of array vectors.  All constructors are inherited
/// from SchemaSourceNodeOptions.
class ARROW_ACERO_EXPORT ArrayVectorSourceNodeOptions
    : public SchemaSourceNodeOptions<ArrayVectorIteratorMaker> {
  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
};

/// \brief A maker of an iterator of ExecBatch
using ExecBatchIteratorMaker = std::function<Iterator<std::shared_ptr<ExecBatch>>()>;
/// \brief An extended Source node which accepts a schema and exec-batches
///
/// The node reads from an iterator of ExecBatch.  In addition to the constructors
/// inherited from SchemaSourceNodeOptions, two overloads accept an already
/// materialized vector of batches.
class ARROW_ACERO_EXPORT ExecBatchSourceNodeOptions
    : public SchemaSourceNodeOptions<ExecBatchIteratorMaker> {
 public:
  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
  /// \brief Create an instance from a vector of batches and an explicit I/O executor
  ///
  /// Defined out of line; see the corresponding SchemaSourceNodeOptions overload for
  /// the meaning of `io_executor`.
  ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema,
                             std::vector<ExecBatch> batches,
                             ::arrow::internal::Executor* io_executor);
  /// \brief Create an instance from a vector of batches
  ///
  /// Defined out of line; see the corresponding SchemaSourceNodeOptions overload for
  /// the meaning of `requires_io`.
  ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema,
                             std::vector<ExecBatch> batches, bool requires_io = false);
};

/// \brief A maker of an iterator of RecordBatch
using RecordBatchIteratorMaker = std::function<Iterator<std::shared_ptr<RecordBatch>>()>;
/// \brief a source node that reads from an iterator of RecordBatch
///
/// All constructors are inherited from SchemaSourceNodeOptions.
class ARROW_ACERO_EXPORT RecordBatchSourceNodeOptions
    : public SchemaSourceNodeOptions<RecordBatchIteratorMaker> {
  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
};

/// \brief a node which excludes some rows from batches passed through it
///
/// filter_expression will be evaluated against each batch which is pushed to
/// this node.  Any rows for which filter_expression does not evaluate to `true` will be
/// excluded in the batch emitted by this node.
///
/// This node will emit empty batches if all rows are excluded.  This is done
/// to avoid gaps in the ordering.
class ARROW_ACERO_EXPORT FilterNodeOptions : public ExecNodeOptions {
 public:
  /// \brief create an instance from values
  ///
  /// \param filter_expression the expression used to filter batches
  explicit FilterNodeOptions(Expression filter_expression)
      : filter_expression(std::move(filter_expression)) {}

  /// \brief the expression to filter batches
  ///
  /// The return type of this expression must be boolean
  Expression filter_expression;
};

/// \brief a node which selects a specified subset from the input
class ARROW_ACERO_EXPORT FetchNodeOptions : public ExecNodeOptions {
 public:
  /// \brief the name "fetch"
  ///
  /// NOTE(review): presumably the name under which the node is registered -- confirm.
  static constexpr std::string_view kName = "fetch";
  /// \brief create an instance from values
  ///
  /// \param offset the number of rows to skip
  /// \param count the number of rows to keep (not counting skipped rows)
  FetchNodeOptions(int64_t offset, int64_t count) : offset(offset), count(count) {}
  /// \brief the number of rows to skip
  int64_t offset;
  /// \brief the number of rows to keep (not counting skipped rows)
  int64_t count;
};

/// \brief a node which executes expressions on input batches, producing batches
/// of the same length with new columns.
///
/// Each expression will be evaluated against each batch which is pushed to
/// this node to produce a corresponding output column.
///
/// If names are not provided, the string representations of exprs will be used.
0283 class ARROW_ACERO_EXPORT ProjectNodeOptions : public ExecNodeOptions { 0284 public: 0285 /// \brief create an instance from values 0286 explicit ProjectNodeOptions(std::vector<Expression> expressions, 0287 std::vector<std::string> names = {}) 0288 : expressions(std::move(expressions)), names(std::move(names)) {} 0289 0290 /// \brief the expressions to run on the batches 0291 /// 0292 /// The output will have one column for each expression. If you wish to keep any of 0293 /// the columns from the input then you should create a simple field_ref expression 0294 /// for that column. 0295 std::vector<Expression> expressions; 0296 /// \brief the names of the output columns 0297 /// 0298 /// If this is not specified then the result of calling ToString on the expression will 0299 /// be used instead 0300 /// 0301 /// This list should either be empty or have the same length as `expressions` 0302 std::vector<std::string> names; 0303 }; 0304 0305 /// \brief a node which aggregates input batches and calculates summary statistics 0306 /// 0307 /// The node can summarize the entire input or it can group the input with grouping keys 0308 /// and segment keys. 0309 /// 0310 /// By default, the aggregate node is a pipeline breaker. It must accumulate all input 0311 /// before any output is produced. Segment keys are a performance optimization. If 0312 /// you know your input is already partitioned by one or more columns then you can 0313 /// specify these as segment keys. At each change in the segment keys the node will 0314 /// emit values for all data seen so far. 0315 /// 0316 /// Segment keys are currently limited to single-threaded mode. 0317 /// 0318 /// Both keys and segment-keys determine the group. However segment-keys are also used 0319 /// for determining grouping segments, which should be large, and allow streaming a 0320 /// partial aggregation result after processing each segment. 
One common use-case for 0321 /// segment-keys is ordered aggregation, in which the segment-key attribute specifies a 0322 /// column with non-decreasing values or a lexicographically-ordered set of such columns. 0323 /// 0324 /// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is 0325 /// expected to be a HashAggregate function. If the keys attribute is an empty vector, 0326 /// then each aggregate is assumed to be a ScalarAggregate function. 0327 /// 0328 /// If the segment_keys attribute is a non-empty vector, then segmented aggregation, as 0329 /// described above, applies. 0330 /// 0331 /// The keys and segment_keys vectors must be disjoint. 0332 /// 0333 /// If no measures are provided then you will simply get the list of unique keys. 0334 /// 0335 /// This node outputs segment keys first, followed by regular keys, followed by one 0336 /// column for each aggregate. 0337 class ARROW_ACERO_EXPORT AggregateNodeOptions : public ExecNodeOptions { 0338 public: 0339 /// \brief create an instance from values 0340 explicit AggregateNodeOptions(std::vector<Aggregate> aggregates, 0341 std::vector<FieldRef> keys = {}, 0342 std::vector<FieldRef> segment_keys = {}) 0343 : aggregates(std::move(aggregates)), 0344 keys(std::move(keys)), 0345 segment_keys(std::move(segment_keys)) {} 0346 0347 // aggregations which will be applied to the targeted fields 0348 std::vector<Aggregate> aggregates; 0349 // keys by which aggregations will be grouped (optional) 0350 std::vector<FieldRef> keys; 0351 // keys by which aggregations will be segmented (optional) 0352 std::vector<FieldRef> segment_keys; 0353 }; 0354 0355 /// \brief a default value at which backpressure will be applied 0356 constexpr int32_t kDefaultBackpressureHighBytes = 1 << 30; // 1GiB 0357 /// \brief a default value at which backpressure will be removed 0358 constexpr int32_t kDefaultBackpressureLowBytes = 1 << 28; // 256MiB 0359 0360 /// \brief an interface that can be queried for 
backpressure statistics 0361 class ARROW_ACERO_EXPORT BackpressureMonitor { 0362 public: 0363 virtual ~BackpressureMonitor() = default; 0364 /// \brief fetches the number of bytes currently queued up 0365 virtual uint64_t bytes_in_use() = 0; 0366 /// \brief checks to see if backpressure is currently applied 0367 virtual bool is_paused() = 0; 0368 }; 0369 0370 /// \brief Options to control backpressure behavior 0371 struct ARROW_ACERO_EXPORT BackpressureOptions { 0372 /// \brief Create default options that perform no backpressure 0373 BackpressureOptions() : resume_if_below(0), pause_if_above(0) {} 0374 /// \brief Create options that will perform backpressure 0375 /// 0376 /// \param resume_if_below The producer should resume producing if the backpressure 0377 /// queue has fewer than resume_if_below items. 0378 /// \param pause_if_above The producer should pause producing if the backpressure 0379 /// queue has more than pause_if_above items 0380 BackpressureOptions(uint64_t resume_if_below, uint64_t pause_if_above) 0381 : resume_if_below(resume_if_below), pause_if_above(pause_if_above) {} 0382 0383 /// \brief create an instance using default values for backpressure limits 0384 static BackpressureOptions DefaultBackpressure() { 0385 return BackpressureOptions(kDefaultBackpressureLowBytes, 0386 kDefaultBackpressureHighBytes); 0387 } 0388 0389 /// \brief helper method to determine if backpressure is disabled 0390 /// \return true if pause_if_above is greater than zero, false otherwise 0391 bool should_apply_backpressure() const { return pause_if_above > 0; } 0392 0393 /// \brief the number of bytes at which the producer should resume producing 0394 uint64_t resume_if_below; 0395 /// \brief the number of bytes at which the producer should pause producing 0396 /// 0397 /// If this is <= 0 then backpressure will be disabled 0398 uint64_t pause_if_above; 0399 }; 0400 0401 /// \brief a sink node which collects results in a queue 0402 /// 0403 /// Emitted batches will only 
be ordered if there is a meaningful ordering 0404 /// and sequence_output is not set to false. 0405 class ARROW_ACERO_EXPORT SinkNodeOptions : public ExecNodeOptions { 0406 public: 0407 explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator, 0408 std::shared_ptr<Schema>* schema, 0409 BackpressureOptions backpressure = {}, 0410 BackpressureMonitor** backpressure_monitor = NULLPTR, 0411 std::optional<bool> sequence_output = std::nullopt) 0412 : generator(generator), 0413 schema(schema), 0414 backpressure(backpressure), 0415 backpressure_monitor(backpressure_monitor), 0416 sequence_output(sequence_output) {} 0417 0418 explicit SinkNodeOptions(std::function<Future<std::optional<ExecBatch>>()>* generator, 0419 BackpressureOptions backpressure = {}, 0420 BackpressureMonitor** backpressure_monitor = NULLPTR, 0421 std::optional<bool> sequence_output = std::nullopt) 0422 : generator(generator), 0423 schema(NULLPTR), 0424 backpressure(std::move(backpressure)), 0425 backpressure_monitor(backpressure_monitor), 0426 sequence_output(sequence_output) {} 0427 0428 /// \brief A pointer to a generator of batches. 0429 /// 0430 /// This will be set when the node is added to the plan and should be used to consume 0431 /// data from the plan. If this function is not called frequently enough then the sink 0432 /// node will start to accumulate data and may apply backpressure. 0433 std::function<Future<std::optional<ExecBatch>>()>* generator; 0434 /// \brief A pointer which will be set to the schema of the generated batches 0435 /// 0436 /// This is optional, if nullptr is passed in then it will be ignored. 0437 /// This will be set when the node is added to the plan, before StartProducing is called 0438 std::shared_ptr<Schema>* schema; 0439 /// \brief Options to control when to apply backpressure 0440 /// 0441 /// This is optional, the default is to never apply backpressure. 
If the plan is not 0442 /// consumed quickly enough the system may eventually run out of memory. 0443 BackpressureOptions backpressure; 0444 /// \brief A pointer to a backpressure monitor 0445 /// 0446 /// This will be set when the node is added to the plan. This can be used to inspect 0447 /// the amount of data currently queued in the sink node. This is an optional utility 0448 /// and backpressure can be applied even if this is not used. 0449 BackpressureMonitor** backpressure_monitor; 0450 /// \brief Controls whether batches should be emitted immediately or sequenced in order 0451 /// 0452 /// \see QueryOptions for more details 0453 std::optional<bool> sequence_output; 0454 }; 0455 0456 /// \brief Control used by a SinkNodeConsumer to pause & resume 0457 /// 0458 /// Callers should ensure that they do not call Pause and Resume simultaneously and they 0459 /// should sequence things so that a call to Pause() is always followed by an eventual 0460 /// call to Resume() 0461 class ARROW_ACERO_EXPORT BackpressureControl { 0462 public: 0463 virtual ~BackpressureControl() = default; 0464 /// \brief Ask the input to pause 0465 /// 0466 /// This is best effort, batches may continue to arrive 0467 /// Must eventually be followed by a call to Resume() or deadlock will occur 0468 virtual void Pause() = 0; 0469 /// \brief Ask the input to resume 0470 virtual void Resume() = 0; 0471 }; 0472 0473 /// \brief a sink node that consumes the data as part of the plan using callbacks 0474 class ARROW_ACERO_EXPORT SinkNodeConsumer { 0475 public: 0476 virtual ~SinkNodeConsumer() = default; 0477 /// \brief Prepare any consumer state 0478 /// 0479 /// This will be run once the schema is finalized as the plan is starting and 0480 /// before any calls to Consume. A common use is to save off the schema so that 0481 /// batches can be interpreted. 
0482 virtual Status Init(const std::shared_ptr<Schema>& schema, 0483 BackpressureControl* backpressure_control, ExecPlan* plan) = 0; 0484 /// \brief Consume a batch of data 0485 virtual Status Consume(ExecBatch batch) = 0; 0486 /// \brief Signal to the consumer that the last batch has been delivered 0487 /// 0488 /// The returned future should only finish when all outstanding tasks have completed 0489 /// 0490 /// If the plan is ended early or aborts due to an error then this will not be 0491 /// called. 0492 virtual Future<> Finish() = 0; 0493 }; 0494 0495 /// \brief Add a sink node which consumes data within the exec plan run 0496 class ARROW_ACERO_EXPORT ConsumingSinkNodeOptions : public ExecNodeOptions { 0497 public: 0498 explicit ConsumingSinkNodeOptions(std::shared_ptr<SinkNodeConsumer> consumer, 0499 std::vector<std::string> names = {}, 0500 std::optional<bool> sequence_output = std::nullopt) 0501 : consumer(std::move(consumer)), 0502 names(std::move(names)), 0503 sequence_output(sequence_output) {} 0504 0505 std::shared_ptr<SinkNodeConsumer> consumer; 0506 /// \brief Names to rename the sink's schema fields to 0507 /// 0508 /// If specified then names must be provided for all fields. Currently, only a flat 0509 /// schema is supported (see GH-31875). 0510 /// 0511 /// If not specified then names will be generated based on the source data. 0512 std::vector<std::string> names; 0513 /// \brief Controls whether batches should be emitted immediately or sequenced in order 0514 /// 0515 /// \see QueryOptions for more details 0516 std::optional<bool> sequence_output; 0517 }; 0518 0519 /// \brief Make a node which sorts rows passed through it 0520 /// 0521 /// All batches pushed to this node will be accumulated, then sorted, by the given 0522 /// fields. Then sorted batches will be forwarded to the generator in sorted order. 
0523 class ARROW_ACERO_EXPORT OrderBySinkNodeOptions : public SinkNodeOptions { 0524 public: 0525 /// \brief create an instance from values 0526 explicit OrderBySinkNodeOptions( 0527 SortOptions sort_options, 0528 std::function<Future<std::optional<ExecBatch>>()>* generator) 0529 : SinkNodeOptions(generator), sort_options(std::move(sort_options)) {} 0530 0531 /// \brief options describing which columns and direction to sort 0532 SortOptions sort_options; 0533 }; 0534 0535 /// \brief Apply a new ordering to data 0536 /// 0537 /// Currently this node works by accumulating all data, sorting, and then emitting 0538 /// the new data with an updated batch index. 0539 /// 0540 /// Larger-than-memory sort is not currently supported. 0541 class ARROW_ACERO_EXPORT OrderByNodeOptions : public ExecNodeOptions { 0542 public: 0543 static constexpr std::string_view kName = "order_by"; 0544 explicit OrderByNodeOptions(Ordering ordering) : ordering(std::move(ordering)) {} 0545 0546 /// \brief The new ordering to apply to outgoing data 0547 Ordering ordering; 0548 }; 0549 0550 enum class JoinType { 0551 LEFT_SEMI, 0552 RIGHT_SEMI, 0553 LEFT_ANTI, 0554 RIGHT_ANTI, 0555 INNER, 0556 LEFT_OUTER, 0557 RIGHT_OUTER, 0558 FULL_OUTER 0559 }; 0560 0561 std::string ToString(JoinType t); 0562 0563 enum class JoinKeyCmp { EQ, IS }; 0564 0565 /// \brief a node which implements a join operation using a hash table 0566 class ARROW_ACERO_EXPORT HashJoinNodeOptions : public ExecNodeOptions { 0567 public: 0568 static constexpr const char* default_output_suffix_for_left = ""; 0569 static constexpr const char* default_output_suffix_for_right = ""; 0570 /// \brief create an instance from values that outputs all columns 0571 HashJoinNodeOptions( 0572 JoinType in_join_type, std::vector<FieldRef> in_left_keys, 0573 std::vector<FieldRef> in_right_keys, Expression filter = literal(true), 0574 std::string output_suffix_for_left = default_output_suffix_for_left, 0575 std::string output_suffix_for_right = 
default_output_suffix_for_right, 0576 bool disable_bloom_filter = false) 0577 : join_type(in_join_type), 0578 left_keys(std::move(in_left_keys)), 0579 right_keys(std::move(in_right_keys)), 0580 output_all(true), 0581 output_suffix_for_left(std::move(output_suffix_for_left)), 0582 output_suffix_for_right(std::move(output_suffix_for_right)), 0583 filter(std::move(filter)), 0584 disable_bloom_filter(disable_bloom_filter) { 0585 this->key_cmp.resize(this->left_keys.size()); 0586 for (size_t i = 0; i < this->left_keys.size(); ++i) { 0587 this->key_cmp[i] = JoinKeyCmp::EQ; 0588 } 0589 } 0590 /// \brief create an instance from keys 0591 /// 0592 /// This will create an inner join that outputs all columns and has no post join filter 0593 /// 0594 /// `in_left_keys` should have the same length and types as `in_right_keys` 0595 /// @param in_left_keys the keys in the left input 0596 /// @param in_right_keys the keys in the right input 0597 HashJoinNodeOptions(std::vector<FieldRef> in_left_keys, 0598 std::vector<FieldRef> in_right_keys) 0599 : left_keys(std::move(in_left_keys)), right_keys(std::move(in_right_keys)) { 0600 this->join_type = JoinType::INNER; 0601 this->output_all = true; 0602 this->output_suffix_for_left = default_output_suffix_for_left; 0603 this->output_suffix_for_right = default_output_suffix_for_right; 0604 this->key_cmp.resize(this->left_keys.size()); 0605 for (size_t i = 0; i < this->left_keys.size(); ++i) { 0606 this->key_cmp[i] = JoinKeyCmp::EQ; 0607 } 0608 this->filter = literal(true); 0609 } 0610 /// \brief create an instance from values using JoinKeyCmp::EQ for all comparisons 0611 HashJoinNodeOptions( 0612 JoinType join_type, std::vector<FieldRef> left_keys, 0613 std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output, 0614 std::vector<FieldRef> right_output, Expression filter = literal(true), 0615 std::string output_suffix_for_left = default_output_suffix_for_left, 0616 std::string output_suffix_for_right = 
default_output_suffix_for_right, 0617 bool disable_bloom_filter = false) 0618 : join_type(join_type), 0619 left_keys(std::move(left_keys)), 0620 right_keys(std::move(right_keys)), 0621 output_all(false), 0622 left_output(std::move(left_output)), 0623 right_output(std::move(right_output)), 0624 output_suffix_for_left(std::move(output_suffix_for_left)), 0625 output_suffix_for_right(std::move(output_suffix_for_right)), 0626 filter(std::move(filter)), 0627 disable_bloom_filter(disable_bloom_filter) { 0628 this->key_cmp.resize(this->left_keys.size()); 0629 for (size_t i = 0; i < this->left_keys.size(); ++i) { 0630 this->key_cmp[i] = JoinKeyCmp::EQ; 0631 } 0632 } 0633 /// \brief create an instance from values 0634 HashJoinNodeOptions( 0635 JoinType join_type, std::vector<FieldRef> left_keys, 0636 std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output, 0637 std::vector<FieldRef> right_output, std::vector<JoinKeyCmp> key_cmp, 0638 Expression filter = literal(true), 0639 std::string output_suffix_for_left = default_output_suffix_for_left, 0640 std::string output_suffix_for_right = default_output_suffix_for_right, 0641 bool disable_bloom_filter = false) 0642 : join_type(join_type), 0643 left_keys(std::move(left_keys)), 0644 right_keys(std::move(right_keys)), 0645 output_all(false), 0646 left_output(std::move(left_output)), 0647 right_output(std::move(right_output)), 0648 key_cmp(std::move(key_cmp)), 0649 output_suffix_for_left(std::move(output_suffix_for_left)), 0650 output_suffix_for_right(std::move(output_suffix_for_right)), 0651 filter(std::move(filter)), 0652 disable_bloom_filter(disable_bloom_filter) {} 0653 0654 HashJoinNodeOptions() = default; 0655 0656 // type of join (inner, left, semi...) 
0657 JoinType join_type = JoinType::INNER; 0658 // key fields from left input 0659 std::vector<FieldRef> left_keys; 0660 // key fields from right input 0661 std::vector<FieldRef> right_keys; 0662 // if set all valid fields from both left and right input will be output 0663 // (and field ref vectors for output fields will be ignored) 0664 bool output_all = false; 0665 // output fields passed from left input 0666 std::vector<FieldRef> left_output; 0667 // output fields passed from right input 0668 std::vector<FieldRef> right_output; 0669 // key comparison function (determines whether a null key is equal another null 0670 // key or not) 0671 std::vector<JoinKeyCmp> key_cmp; 0672 // suffix added to names of output fields coming from left input (used to distinguish, 0673 // if necessary, between fields of the same name in left and right input and can be left 0674 // empty if there are no name collisions) 0675 std::string output_suffix_for_left; 0676 // suffix added to names of output fields coming from right input 0677 std::string output_suffix_for_right; 0678 // residual filter which is applied to matching rows. Rows that do not match 0679 // the filter are not included. The filter is applied against the 0680 // concatenated input schema (left fields then right fields) and can reference 0681 // fields that are not included in the output. 0682 Expression filter = literal(true); 0683 // whether or not to disable Bloom filters in this join 0684 bool disable_bloom_filter = false; 0685 }; 0686 0687 /// \brief a node which implements the asof join operation 0688 /// 0689 /// Note, this API is experimental and will change in the future 0690 /// 0691 /// This node takes one left table and any number of right tables, and asof joins them 0692 /// together. Batches produced by each input must be ordered by the "on" key. 0693 /// This node will output one row for each row in the left table. 
class ARROW_ACERO_EXPORT AsofJoinNodeOptions : public ExecNodeOptions {
 public:
  /// \brief Keys for one input table of the AsofJoin operation
  ///
  /// The keys must be consistent across the input tables:
  /// Each "on" key must refer to a field of the same type and units across the tables.
  /// Each "by" key must refer to a list of fields of the same types across the tables.
  struct Keys {
    /// \brief "on" key for the join.
    ///
    /// The input table must be sorted by the "on" key. Must be a single field of a common
    /// type. Inexact match is used on the "on" key. i.e., a row is considered a match iff
    /// left_on - tolerance <= right_on <= left_on.
    /// Currently, the "on" key must be of an integer, date, or timestamp type.
    ///
    /// NOTE(review): the inequality above assumes a non-negative tolerance looking into
    /// the past, whereas AsofJoinNodeOptions::tolerance is documented as signed with
    /// negative values meaning "past".  Confirm which description is current.
    FieldRef on_key;
    /// \brief "by" key for the join.
    ///
    /// Each input table must have each field of the "by" key.  Exact equality is used for
    /// each field of the "by" key.
    /// Currently, each field of the "by" key must be of an integer, date, timestamp, or
    /// base-binary type.
    std::vector<FieldRef> by_key;
  };

  /// \brief create an instance from values
  ///
  /// \param input_keys the keys for each input table, left table first
  /// \param tolerance the tolerance for inexact "on" key matching
  AsofJoinNodeOptions(std::vector<Keys> input_keys, int64_t tolerance)
      : input_keys(std::move(input_keys)), tolerance(tolerance) {}

  /// \brief AsofJoin keys per input table. At least two keys must be given. The first key
  /// corresponds to a left table and all other keys correspond to right tables for the
  /// as-of-join.
  ///
  /// \see `Keys` for details.
  std::vector<Keys> input_keys;
  /// \brief Tolerance for inexact "on" key matching. A right row is considered a match
  /// with the left row if `right.on - left.on <= tolerance`. The `tolerance` may be:
  /// - negative, in which case a past-as-of-join occurs;
  /// - or positive, in which case a future-as-of-join occurs;
  /// - or zero, in which case an exact-as-of-join occurs.
  ///
  /// The tolerance is interpreted in the same units as the "on" key.
  int64_t tolerance;
};

/// \brief a node which selects the top_k/bottom_k rows passed through it
///
/// All batches pushed to this node will be accumulated, then selected, by the given
/// fields.  Then sorted batches will be forwarded to the generator in sorted order.
class ARROW_ACERO_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
 public:
  /// \brief create an instance from values
  ///
  /// \param select_k_options the SelectK options to apply
  /// \param generator out-pointer forwarded to SinkNodeOptions
  explicit SelectKSinkNodeOptions(
      SelectKOptions select_k_options,
      std::function<Future<std::optional<ExecBatch>>()>* generator)
      : SinkNodeOptions(generator), select_k_options(std::move(select_k_options)) {}

  /// SelectK options
  SelectKOptions select_k_options;
};

/// \brief a sink node which accumulates all output into a table
class ARROW_ACERO_EXPORT TableSinkNodeOptions : public ExecNodeOptions {
 public:
  /// \brief create an instance from values
  ///
  /// `names` is not a constructor argument; it starts out empty and may be assigned
  /// after construction.
  ///
  /// \param output_table an "out parameter" specifying the table that will be created
  /// \param sequence_output whether to sequence output; unset if omitted
  explicit TableSinkNodeOptions(std::shared_ptr<Table>* output_table,
                                std::optional<bool> sequence_output = std::nullopt)
      : output_table(output_table), sequence_output(sequence_output) {}

  /// \brief an "out parameter" specifying the table that will be created
  ///
  /// Must not be null and remain valid for the entirety of the plan execution.  After the
  /// plan has completed this will be set to point to the result table
  std::shared_ptr<Table>* output_table;
  /// \brief Controls whether batches should be emitted immediately or sequenced in order
  ///
  /// \see QueryOptions for more details
  std::optional<bool> sequence_output;
  /// \brief Custom names to use for the columns.
  ///
  /// If specified then names must be provided for all fields. Currently, only a flat
  /// schema is supported (see GH-31875).
  ///
  /// If not specified then names will be generated based on the source data.
  std::vector<std::string> names;
};

/// \brief a row template that describes one row that will be generated for each input row
struct ARROW_ACERO_EXPORT PivotLongerRowTemplate {
  /// \brief create an instance from values
  ///
  /// \param feature_values the values used to populate the feature columns
  /// \param measurement_values the fields used to populate the measurement columns
  PivotLongerRowTemplate(std::vector<std::string> feature_values,
                         std::vector<std::optional<FieldRef>> measurement_values)
      : feature_values(std::move(feature_values)),
        measurement_values(std::move(measurement_values)) {}
  /// A (typically unique) set of feature values for the template, usually derived from a
  /// column name
  ///
  /// These will be used to populate the feature columns
  std::vector<std::string> feature_values;
  /// The fields containing the measurements to use for this row
  ///
  /// These will be used to populate the measurement columns.  If nullopt then nulls
  /// will be inserted for the given value.
  std::vector<std::optional<FieldRef>> measurement_values;
};

/// \brief Reshape a table by turning some columns into additional rows
///
/// This operation is sometimes also referred to as UNPIVOT
///
/// This is typically done when there are multiple observations in each row in order to
/// transform to a table containing a single observation per row.
///
/// For example:
///
/// | time | left_temp | right_temp |
/// | ---- | --------- | ---------- |
/// | 1    | 10        | 20         |
/// | 2    | 15        | 18         |
///
/// The above table contains two observations per row.  There is an implicit feature
/// "location" (left vs right) and a measurement "temp".
/// What we really want is:
///
/// | time | location | temp |
/// | ---- | -------- | ---- |
/// | 1    | left     | 10   |
/// | 1    | right    | 20   |
/// | 2    | left     | 15   |
/// | 2    | right    | 18   |
///
/// For a more complex example consider:
///
/// | time | ax1 | ay1 | bx1 | ay2 |
/// | ---- | --- | --- | --- | --- |
/// | 0    | 1   | 2   | 3   | 4   |
///
/// We can pretend a vs b and x vs y are features while 1 and 2 are two different
/// kinds of measurements.  We thus want to pivot to
///
/// | time | a/b | x/y | f1   | f2   |
/// | ---- | --- | --- | ---- | ---- |
/// | 0    | a   | x   | 1    | null |
/// | 0    | a   | y   | 2    | 4    |
/// | 0    | b   | x   | 3    | null |
///
/// To do this we create a row template for each combination of features.  One should
/// be able to do this purely by looking at the column names.  For example, given the
/// above columns "ax1", "ay1", "bx1", and "ay2" we know we have three feature
/// combinations (a, x), (a, y), and (b, x).  Similarly, we know we have two possible
/// measurements, "1" and "2".
///
/// For each combination of features we create a row template.  In each row template we
/// describe the combination and then list which columns to use for the measurements.
/// If a measurement doesn't exist for a given combination then we use nullopt.
0844 /// 0845 /// So, for our above example, we have: 0846 /// 0847 /// (a, x): names={"a", "x"}, values={"ax1", nullopt} 0848 /// (a, y): names={"a", "y"}, values={"ay1", "ay2"} 0849 /// (b, x): names={"b", "x"}, values={"bx1", nullopt} 0850 /// 0851 /// Finishing it off we name our new columns: 0852 /// feature_field_names={"a/b","x/y"} 0853 /// measurement_field_names={"f1", "f2"} 0854 class ARROW_ACERO_EXPORT PivotLongerNodeOptions : public ExecNodeOptions { 0855 public: 0856 static constexpr std::string_view kName = "pivot_longer"; 0857 /// One or more row templates to create new output rows 0858 /// 0859 /// Normally there are at least two row templates. The output # of rows 0860 /// will be the input # of rows * the number of row templates 0861 std::vector<PivotLongerRowTemplate> row_templates; 0862 /// The names of the columns which describe the new features 0863 std::vector<std::string> feature_field_names; 0864 /// The names of the columns which represent the measurements 0865 std::vector<std::string> measurement_field_names; 0866 }; 0867 0868 /// @} 0869 0870 } // namespace acero 0871 } // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |