// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/acero/options.h"
#include "arrow/compute/expression.h"
#include "arrow/compute/type_fwd.h"
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/projector.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/iterator.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/type_fwd.h"

namespace arrow {

using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;

namespace dataset {

/// \defgroup dataset-scanning Scanning API
///
/// @{

constexpr int64_t kDefaultBatchSize = 1 << 17;  // 128Ki rows
// With batch readahead (16) and fragment readahead (4) this will yield up to
// 64 batches in flight ~ 8Mi rows
constexpr int32_t kDefaultBatchReadahead = 16;
constexpr int32_t kDefaultFragmentReadahead = 4;
constexpr int32_t kDefaultBytesReadahead = 1 << 25;  // 32MiB

/// Scan-specific options, which can be changed between scans of the same dataset.
struct ARROW_DS_EXPORT ScanOptions {
  /// A row filter (which will be pushed down to partitioning/reading if supported).
  compute::Expression filter = compute::literal(true);
  /// A projection expression (which can add/remove/rename columns).
  compute::Expression projection;

  /// Schema with which batches will be read from fragments. This is also known as the
  /// "reader schema"; it will be used (for example) when constructing CSV file readers
  /// to identify column types for parsing. Usually only a subset of its fields (see
  /// MaterializedFields) will be materialized during a scan.
  std::shared_ptr<Schema> dataset_schema;

  /// Schema of projected record batches. This is independent of dataset_schema as its
  /// fields are derived from the projection. For example, let
  ///
  ///   dataset_schema = {"a": int32, "b": int32, "id": utf8}
  ///   projection = project({add(field_ref("a"), field_ref("b"))}, {"a_plus_b"})
  ///
  /// (no filter specified). In this case, the projected_schema would be
  ///
  ///   {"a_plus_b": int32}
  std::shared_ptr<Schema> projected_schema;

  /// Maximum row count for scanned batches.
  int64_t batch_size = kDefaultBatchSize;

  /// How many batches to read ahead within a fragment.
  ///
  /// Set to 0 to disable batch readahead
  ///
  /// Note: May not be supported by all formats
  /// Note: Will be ignored if use_threads is set to false
  int32_t batch_readahead = kDefaultBatchReadahead;

  /// How many files to read ahead
  ///
  /// Set to 0 to disable fragment readahead
  ///
  /// Note: May not be enforced by all scanners
  /// Note: Will be ignored if use_threads is set to false
  int32_t fragment_readahead = kDefaultFragmentReadahead;

  /// A pool from which materialized and scanned arrays will be allocated.
  MemoryPool* pool = arrow::default_memory_pool();

  /// IOContext for any IO tasks
  ///
  /// Note: The IOContext executor will be ignored if use_threads is set to false
  io::IOContext io_context;

  /// If true the scanner will scan in parallel
  ///
  /// Note: If true, this will use threads from both the cpu_executor and the
  /// io_context.executor
  /// Note: This must be true in order for any readahead to happen
  bool use_threads = false;

  /// If true the scanner will add augmented fields to the output schema.
  bool add_augmented_fields = true;

  /// Fragment-specific scan options.
  std::shared_ptr<FragmentScanOptions> fragment_scan_options;

  /// Return a vector of FieldRefs that require materialization.
  ///
  /// This is usually the union of the fields referenced in the projection and the
  /// filter expression. Examples:
  ///
  /// - `SELECT a, b WHERE a < 2 && c > 1` => ["a", "b", "a", "c"]
  /// - `SELECT a + b < 3 WHERE a > 1` => ["a", "b", "a"]
  /// This is needed for expressions where a field may not be directly
  /// used in the final projection but is still required to evaluate the
  /// expression.
  ///
  /// This is used by Fragment implementations to apply the column
  /// sub-selection optimization.
  std::vector<FieldRef> MaterializedFields() const;

  /// Parameters which control when the plan should pause for a slow consumer
  acero::BackpressureOptions backpressure =
      acero::BackpressureOptions::DefaultBackpressure();
};
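
// A minimal sketch of filling in ScanOptions by hand, assuming `dataset` is an
// already-constructed std::shared_ptr<Dataset> (ScannerBuilder, below, is the
// usual way to build these options):
//
//   auto options = std::make_shared<ScanOptions>();
//   options->dataset_schema = dataset->schema();
//   // Pushed down to partitioning/reading where supported
//   options->filter = compute::less(compute::field_ref("a"), compute::literal(2));
//   options->batch_size = 64 * 1024;  // smaller batches => lower peak memory
//   options->use_threads = true;      // required for any readahead to happen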

/// Scan-specific options, which can be changed between scans of the same dataset.
///
/// A dataset consists of one or more individual fragments.  A fragment is anything
/// that is independently scannable, often a file.
///
/// Batches from all fragments will be converted to a single schema. This unified
/// schema is referred to as the "dataset schema" and is the output schema for
/// this node.
///
/// Individual fragments may have schemas that are different from the dataset
/// schema.  This is sometimes referred to as the physical or fragment schema.
/// Conversion from the fragment schema to the dataset schema is a process
/// known as evolution.
struct ARROW_DS_EXPORT ScanV2Options : public acero::ExecNodeOptions {
  explicit ScanV2Options(std::shared_ptr<Dataset> dataset)
      : dataset(std::move(dataset)) {}

  /// \brief The dataset to scan
  std::shared_ptr<Dataset> dataset;
  /// \brief A row filter
  ///
  /// The filter expression should be written against the dataset schema.
  /// The filter must be unbound.
  ///
  /// This is an opportunistic pushdown filter.  Filtering capabilities will
  /// vary between formats.  If a format is not capable of applying the filter
  /// then it will ignore it.
  ///
  /// Each fragment will do its best to filter the data based on the information
  /// (partitioning guarantees, statistics) available to it.  If it is able to
  /// apply some filtering then it will indicate what filtering it was able to
  /// apply by attaching a guarantee to the batch.
  ///
  /// For example, if a filter is x < 50 && y > 40 then a batch may be able to
  /// apply a guarantee x < 50.  Post-scan filtering would then only need to
  /// consider y > 40 (for this specific batch).  The next batch may not be able
  /// to attach any guarantee and both clauses would need to be applied to that batch.
  ///
  /// A single guarantee-aware filtering operation should generally be applied to all
  /// resulting batches.  The scan node is not responsible for this.
  ///
  /// Fields that are referenced by the filter should be included in the `columns`
  /// vector.  The scan node will not automatically fetch fields referenced by the
  /// filter expression. \see AddFieldsNeededForFilter
  ///
  /// If the filter references fields that are not included in `columns` this may or may
  /// not be an error, depending on the format.
  compute::Expression filter = compute::literal(true);

  /// \brief The columns to scan
  ///
  /// This is not a simple list of top-level column indices but instead a set of paths
  /// allowing for partial selection of columns
  ///
  /// These paths refer to the dataset schema
  ///
  /// For example, consider the following dataset schema:
  ///   schema({
  ///     field("score", int32()),
  ///     field("marker", struct_({
  ///         field("color", utf8()),
  ///         field("location", struct_({
  ///             field("x", float64()),
  ///             field("y", float64())
  ///         }))
  ///     }))
  ///   })
  ///
  /// If `columns` is {{0}, {1,1,0}} then the output schema is:
  ///   schema({field("score", int32()), field("x", float64())})
  ///
  /// If `columns` is {{1,1,1}, {1,1}} then the output schema is:
  ///   schema({
  ///       field("y", float64()),
  ///       field("location", struct_({
  ///           field("x", float64()),
  ///           field("y", float64())
  ///       }))
  ///   })
  std::vector<FieldPath> columns;

  /// \brief Target number of bytes to read ahead in a fragment
  ///
  /// This limit involves some amount of estimation.  Formats typically only know
  /// batch boundaries in terms of rows (not decoded bytes) and so an estimation
  /// must be done to guess the average row size.  Other formats like CSV and JSON
  /// must make even more generalized guesses.
  ///
  /// This is a best-effort guide.  Some formats may need to read ahead further,
  /// for example, if scanning a parquet file that has batches with 100MiB of data
  /// then the actual readahead will be at least 100MiB
  ///
  /// Set to 0 to disable readahead.  When disabled, the scanner will read the
  /// dataset one batch at a time
  ///
  /// This limit applies across all fragments.  If the limit is 32MiB and the
  /// fragment readahead allows for 20 fragments to be read at once then the
  /// total readahead will still be 32MiB and NOT 20 * 32MiB.
  int32_t target_bytes_readahead = kDefaultBytesReadahead;

  /// \brief Number of fragments to read ahead
  ///
  /// Higher readahead will potentially lead to more efficient I/O but will lead
  /// to the scan operation using more RAM.  The default is fairly conservative
  /// and designed for fast local disks (or slow local spinning disks which cannot
  /// handle much parallelism anyways).  When using a highly parallel remote filesystem
  /// you will likely want to increase these values.
  ///
  /// Set to 0 to disable fragment readahead.  When disabled the dataset will be scanned
  /// one fragment at a time.
  int32_t fragment_readahead = kDefaultFragmentReadahead;
  /// \brief Options specific to the file format
  const FragmentScanOptions* format_options = NULLPTR;

  /// \brief Utility method to get a selection representing all columns in a dataset
  static std::vector<FieldPath> AllColumns(const Schema& dataset_schema);

  /// \brief Utility method to add fields needed for the current filter
  ///
  /// This method adds any fields that are needed by `filter` which are not already
  /// included in the list of columns.  Any new fields added will be added to the end
  /// in no particular order.
  static Status AddFieldsNeededForFilter(ScanV2Options* options);
};
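
// A minimal sketch of building a ScanV2Options, assuming `dataset` is an
// already-constructed std::shared_ptr<Dataset> with the example schema above:
//
//   ScanV2Options options(dataset);
//   // Select "score" and the nested "marker.location.x" by path
//   options.columns = {FieldPath({0}), FieldPath({1, 1, 0})};
//   options.filter = compute::greater(compute::field_ref("score"),
//                                     compute::literal(10));
//   // The filter references "score"; ensure it is fetched (a no-op here since
//   // path {0} is already selected)
//   ARROW_RETURN_NOT_OK(ScanV2Options::AddFieldsNeededForFilter(&options));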

/// \brief Describes a projection
struct ARROW_DS_EXPORT ProjectionDescr {
  /// \brief The projection expression itself
  /// This expression must be a call to make_struct
  compute::Expression expression;

  /// \brief The output schema of the projection.
  ///
  /// This can be calculated from the input schema and the expression but it
  /// is cached here for convenience.
  std::shared_ptr<Schema> schema;

  /// \brief Create a ProjectionDescr by binding an expression to the dataset schema
  ///
  /// expression must return a struct type
  static Result<ProjectionDescr> FromStructExpression(
      const compute::Expression& expression, const Schema& dataset_schema);

  /// \brief Create a ProjectionDescr from expressions/names for each field
  static Result<ProjectionDescr> FromExpressions(std::vector<compute::Expression> exprs,
                                                 std::vector<std::string> names,
                                                 const Schema& dataset_schema);

  /// \brief Create a default projection referencing fields in the dataset schema
  static Result<ProjectionDescr> FromNames(std::vector<std::string> names,
                                           const Schema& dataset_schema,
                                           bool add_augmented_fields = true);

  /// \brief Make a projection that projects every field in the dataset schema
  static Result<ProjectionDescr> Default(const Schema& dataset_schema,
                                         bool add_augmented_fields = true);
};

/// \brief Utility method to set the projection expression and schema
ARROW_DS_EXPORT void SetProjection(ScanOptions* options, ProjectionDescr projection);
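
// A minimal sketch of attaching a projection to existing options, assuming
// `options` is a std::shared_ptr<ScanOptions> whose dataset_schema has been set:
//
//   ARROW_ASSIGN_OR_RAISE(
//       auto projection,
//       ProjectionDescr::FromNames({"a", "b"}, *options->dataset_schema));
//   SetProjection(options.get(), std::move(projection));
//   // options->projection and options->projected_schema now agree with each other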

/// \brief Combines a record batch with the fragment that the record batch originated
/// from
///
/// Knowing the source fragment can be useful for debugging & understanding loaded
/// data
struct TaggedRecordBatch {
  std::shared_ptr<RecordBatch> record_batch;
  std::shared_ptr<Fragment> fragment;
};
using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;

/// \brief Combines a tagged batch with positional information
///
/// This is returned when scanning batches in an unordered fashion.  This information is
/// needed if you ever want to reassemble the batches in order
struct EnumeratedRecordBatch {
  Enumerated<std::shared_ptr<RecordBatch>> record_batch;
  Enumerated<std::shared_ptr<Fragment>> fragment;
};
using EnumeratedRecordBatchGenerator = std::function<Future<EnumeratedRecordBatch>()>;
using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;
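
// The positional information is what makes later reassembly possible.  A minimal
// sketch of reading one unordered batch and recovering where it belongs, assuming
// `scanner` is an already-constructed Scanner and Enumerated (from
// "arrow/util/async_generator.h") exposes value/index/last:
//
//   ARROW_ASSIGN_OR_RAISE(auto it, scanner->ScanBatchesUnordered());
//   ARROW_ASSIGN_OR_RAISE(EnumeratedRecordBatch next, it.Next());
//   int fragment_index = next.fragment.index;   // fragment position in dataset order
//   int batch_index = next.record_batch.index;  // batch position within its fragment
//   bool last_in_fragment = next.record_batch.last;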

/// @}

}  // namespace dataset

template <>
struct IterationTraits<dataset::TaggedRecordBatch> {
  static dataset::TaggedRecordBatch End() {
    return dataset::TaggedRecordBatch{NULLPTR, NULLPTR};
  }
  static bool IsEnd(const dataset::TaggedRecordBatch& val) {
    return val.record_batch == NULLPTR;
  }
};

template <>
struct IterationTraits<dataset::EnumeratedRecordBatch> {
  static dataset::EnumeratedRecordBatch End() {
    return dataset::EnumeratedRecordBatch{
        IterationEnd<Enumerated<std::shared_ptr<RecordBatch>>>(),
        IterationEnd<Enumerated<std::shared_ptr<dataset::Fragment>>>()};
  }
  static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
    return IsIterationEnd(val.fragment);
  }
};

namespace dataset {

/// \defgroup dataset-scanning Scanning API
///
/// @{

/// \brief A scanner glues together several dataset classes to load in data.
/// The dataset contains a collection of fragments and partitioning rules.
///
/// The fragments identify independently loadable units of data (i.e. each fragment has
/// a potentially unique schema and possibly even format.  It should be possible to read
/// fragments in parallel if desired).
///
/// The fragment's format contains the logic necessary to actually create a task to load
/// the fragment into memory.  That task may or may not support parallel execution of
/// its own.
///
/// The scanner is then responsible for creating scan tasks from every fragment in the
/// dataset and (potentially) sequencing the loaded record batches together.
///
/// The scanner should not buffer the entire dataset in memory (unless asked to);
/// instead it yields record batches as soon as they are ready to scan.  Various
/// readahead properties control how much data is allowed to be scanned before
/// pausing to let a slow consumer catch up.
///
/// Today the scanner also handles projection & filtering although that may change in
/// the future.
class ARROW_DS_EXPORT Scanner {
 public:
  virtual ~Scanner() = default;

  /// \brief Apply a visitor to each RecordBatch as it is scanned. If multiple threads
  /// are used (via use_threads), the visitor will be invoked from those threads and is
  /// responsible for any synchronization.
  virtual Status Scan(std::function<Status(TaggedRecordBatch)> visitor) = 0;
  /// \brief Convert a Scanner into a Table.
  ///
  /// Use this convenience utility with care. This will serially materialize the
  /// Scan result in memory before creating the Table.
  virtual Result<std::shared_ptr<Table>> ToTable() = 0;
  /// \brief Scan the dataset into a stream of record batches.  Each batch is tagged
  /// with the fragment it originated from.  The batches will arrive in order.  The
  /// order of fragments is determined by the dataset.
  ///
  /// Note: The scanner will perform some readahead but will avoid materializing too
  /// much in memory (this is governed by the readahead options and the use_threads
  /// option).  If the readahead queue fills up then I/O will pause until the calling
  /// thread catches up.
  virtual Result<TaggedRecordBatchIterator> ScanBatches() = 0;
  virtual Result<TaggedRecordBatchGenerator> ScanBatchesAsync() = 0;
  virtual Result<TaggedRecordBatchGenerator> ScanBatchesAsync(
      ::arrow::internal::Executor* cpu_thread_pool) = 0;
  /// \brief Scan the dataset into a stream of record batches.  Unlike ScanBatches this
  /// method may allow record batches to be returned out of order.  This allows for more
  /// efficient scanning: some fragments may be accessed more quickly than others (e.g.
  /// may be cached in RAM or just happen to get scheduled earlier by the I/O)
  ///
  /// To make up for the out-of-order iteration each batch is further tagged with
  /// positional information.
  virtual Result<EnumeratedRecordBatchIterator> ScanBatchesUnordered() = 0;
  virtual Result<EnumeratedRecordBatchGenerator> ScanBatchesUnorderedAsync() = 0;
  virtual Result<EnumeratedRecordBatchGenerator> ScanBatchesUnorderedAsync(
      ::arrow::internal::Executor* cpu_thread_pool) = 0;
  /// \brief A convenience to synchronously load the given rows by index.
  ///
  /// Will only consume as many batches as needed from ScanBatches().
  virtual Result<std::shared_ptr<Table>> TakeRows(const Array& indices) = 0;
  /// \brief Get the first N rows.
  virtual Result<std::shared_ptr<Table>> Head(int64_t num_rows) = 0;
  /// \brief Count rows matching a predicate.
  ///
  /// This method will push down the predicate and compute the result based on fragment
  /// metadata if possible.
  virtual Result<int64_t> CountRows() = 0;
  virtual Future<int64_t> CountRowsAsync() = 0;
  /// \brief Convert the Scanner to a RecordBatchReader so it can be
  /// easily used with APIs that expect a reader.
  virtual Result<std::shared_ptr<RecordBatchReader>> ToRecordBatchReader() = 0;

  /// \brief Get the options for this scan.
  const std::shared_ptr<ScanOptions>& options() const { return scan_options_; }
  /// \brief Get the dataset that this scanner will scan
  virtual const std::shared_ptr<Dataset>& dataset() const = 0;

 protected:
  explicit Scanner(std::shared_ptr<ScanOptions> scan_options)
      : scan_options_(std::move(scan_options)) {}

  Result<EnumeratedRecordBatchIterator> AddPositioningToInOrderScan(
      TaggedRecordBatchIterator scan);

  const std::shared_ptr<ScanOptions> scan_options_;
};
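
// A minimal sketch of draining a Scanner synchronously, assuming `scanner` is an
// already-constructed std::shared_ptr<Scanner>:
//
//   ARROW_ASSIGN_OR_RAISE(auto batches, scanner->ScanBatches());
//   int64_t total_rows = 0;
//   while (true) {
//     ARROW_ASSIGN_OR_RAISE(TaggedRecordBatch tagged, batches.Next());
//     if (IsIterationEnd(tagged)) break;  // end signaled via IterationTraits above
//     total_rows += tagged.record_batch->num_rows();
//   }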

/// \brief ScannerBuilder is a factory class to construct a Scanner. It is used
/// to pass information, notably a potential filter expression and a subset of
/// columns to materialize.
class ARROW_DS_EXPORT ScannerBuilder {
 public:
  explicit ScannerBuilder(std::shared_ptr<Dataset> dataset);

  ScannerBuilder(std::shared_ptr<Dataset> dataset,
                 std::shared_ptr<ScanOptions> scan_options);

  ScannerBuilder(std::shared_ptr<Schema> schema, std::shared_ptr<Fragment> fragment,
                 std::shared_ptr<ScanOptions> scan_options);

  /// \brief Make a scanner from a record batch reader.
  ///
  /// The resulting scanner can be scanned only once. This is intended
  /// to support writing data from streaming sources or other sources
  /// that can be iterated only once.
  static std::shared_ptr<ScannerBuilder> FromRecordBatchReader(
      std::shared_ptr<RecordBatchReader> reader);

  /// \brief Set the subset of columns to materialize.
  ///
  /// Columns which are not referenced may not be read from fragments.
  ///
  /// \param[in] columns list of columns to project. Order and duplicates will
  ///            be preserved.
  ///
  /// \return Failure if any column name does not exist in the dataset's
  ///         Schema.
  Status Project(std::vector<std::string> columns);

  /// \brief Set expressions which will be evaluated to produce the materialized
  /// columns.
  ///
  /// Columns which are not referenced may not be read from fragments.
  ///
  /// \param[in] exprs expressions to evaluate to produce columns.
  /// \param[in] names list of names for the resulting columns.
  ///
  /// \return Failure if any referenced column does not exist in the dataset's
  ///         Schema.
  Status Project(std::vector<compute::Expression> exprs, std::vector<std::string> names);

  /// \brief Set the filter expression to return only rows matching the filter.
  ///
  /// The predicate will be passed down to Sources and corresponding
  /// Fragments to exploit predicate pushdown if possible using
  /// partition information or Fragment internal metadata, e.g. Parquet statistics.
  /// Columns which are not referenced may not be read from fragments.
  ///
  /// \param[in] filter expression to filter rows with.
  ///
  /// \return Failure if any referenced column does not exist in the dataset's
  ///         Schema.
  Status Filter(const compute::Expression& filter);

  /// \brief Indicate if the Scanner should make use of the available
  ///        ThreadPool found in ScanOptions.
  Status UseThreads(bool use_threads = true);

  /// \brief Set the maximum number of rows per RecordBatch.
  ///
  /// \param[in] batch_size the maximum number of rows.
  /// \returns An error if the batch size is not greater than 0.
  ///
  /// This option provides a control limiting the memory owned by any RecordBatch.
  Status BatchSize(int64_t batch_size);

  /// \brief Set the number of batches to read ahead within a fragment.
  ///
  /// \param[in] batch_readahead How many batches to read ahead within a fragment
  /// \returns an error if this number is less than 0.
  ///
  /// This option provides a control on the RAM vs I/O tradeoff.
  /// It might not be supported by all file formats, in which case it will
  /// simply be ignored.
  Status BatchReadahead(int32_t batch_readahead);

  /// \brief Set the number of fragments to read ahead
  ///
  /// \param[in] fragment_readahead How many fragments to read ahead
  /// \returns an error if this number is less than 0.
  ///
  /// This option provides a control on the RAM vs I/O tradeoff.
  Status FragmentReadahead(int32_t fragment_readahead);

  /// \brief Set the pool from which materialized and scanned arrays will be allocated.
  Status Pool(MemoryPool* pool);

  /// \brief Set fragment-specific scan options.
  Status FragmentScanOptions(std::shared_ptr<FragmentScanOptions> fragment_scan_options);

  /// \brief Override default backpressure configuration
  Status Backpressure(acero::BackpressureOptions backpressure);

  /// \brief Return the current scan options for the builder.
  Result<std::shared_ptr<ScanOptions>> GetScanOptions();

  /// \brief Return the constructed now-immutable Scanner object
  Result<std::shared_ptr<Scanner>> Finish();

  const std::shared_ptr<Schema>& schema() const;
  const std::shared_ptr<Schema>& projected_schema() const;

 private:
  std::shared_ptr<Dataset> dataset_;
  std::shared_ptr<ScanOptions> scan_options_ = std::make_shared<ScanOptions>();
};
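
// A minimal sketch of the typical builder flow, assuming `dataset` is an
// already-constructed std::shared_ptr<Dataset>:
//
//   ScannerBuilder builder(dataset);
//   ARROW_RETURN_NOT_OK(builder.Project({"a", "b"}));
//   ARROW_RETURN_NOT_OK(builder.Filter(
//       compute::less(compute::field_ref("a"), compute::literal(2))));
//   ARROW_RETURN_NOT_OK(builder.UseThreads(true));
//   ARROW_ASSIGN_OR_RAISE(auto scanner, builder.Finish());
//   ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());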

/// \brief Construct a source ExecNode which yields batches from a dataset scan.
///
/// Does not construct associated filter or project nodes.
/// Yielded batches will be augmented with fragment/batch indices to enable stable
/// ordering for simple ExecPlans.
class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions {
 public:
  explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
                           std::shared_ptr<ScanOptions> scan_options,
                           bool require_sequenced_output = false)
      : dataset(std::move(dataset)),
        scan_options(std::move(scan_options)),
        require_sequenced_output(require_sequenced_output) {}

  std::shared_ptr<Dataset> dataset;
  std::shared_ptr<ScanOptions> scan_options;
  bool require_sequenced_output;
};
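
// A minimal sketch of using this as the source of an Acero plan, assuming `dataset`
// and `scan_options` are already constructed and that acero::Declaration and
// acero::DeclarationToTable (from "arrow/acero/exec_plan.h") are available; "scan"
// is the factory name registered by InitializeScanner below:
//
//   acero::Declaration scan{"scan", ScanNodeOptions{dataset, scan_options}};
//   ARROW_ASSIGN_OR_RAISE(auto table, acero::DeclarationToTable(std::move(scan)));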

/// @}

namespace internal {
ARROW_DS_EXPORT void InitializeScanner(arrow::acero::ExecFactoryRegistry* registry);
ARROW_DS_EXPORT void InitializeScannerV2(arrow::acero::ExecFactoryRegistry* registry);
}  // namespace internal
}  // namespace dataset
}  // namespace arrow