// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/acero/options.h"
#include "arrow/compute/expression.h"
#include "arrow/compute/type_fwd.h"
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/projector.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/iterator.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/type_fwd.h"

namespace arrow {

using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;

namespace dataset {

/// \defgroup dataset-scanning Scanning API
///
/// @{

constexpr int64_t kDefaultBatchSize = 1 << 17;  // 128Ki rows
// Combined with the default fragment readahead this yields up to 64 batches
// (~8Mi rows) in flight
constexpr int32_t kDefaultBatchReadahead = 16;
constexpr int32_t kDefaultFragmentReadahead = 4;
constexpr int32_t kDefaultBytesReadahead = 1 << 25;  // 32MiB

/// Scan-specific options, which can be changed between scans of the same dataset.
struct ARROW_DS_EXPORT ScanOptions {
  /// A row filter (which will be pushed down to partitioning/reading if supported).
  compute::Expression filter = compute::literal(true);
  /// A projection expression (which can add/remove/rename columns).
  compute::Expression projection;

  /// Schema with which batches will be read from fragments. This is also known as the
  /// "reader schema"; it will be used (for example) in constructing CSV file readers to
  /// identify column types for parsing. Usually only a subset of its fields (see
  /// MaterializedFields) will be materialized during a scan.
  std::shared_ptr<Schema> dataset_schema;

  /// Schema of projected record batches. This is independent of dataset_schema as its
  /// fields are derived from the projection. For example, let
  ///
  ///   dataset_schema = {"a": int32, "b": int32, "id": utf8}
  ///   projection = project({add(field_ref("a"), field_ref("b"))}, {"a_plus_b"})
  ///
  /// (no filter specified). In this case, the projected_schema would be
  ///
  ///   {"a_plus_b": int32}
  std::shared_ptr<Schema> projected_schema;

  /// Maximum row count for scanned batches.
  int64_t batch_size = kDefaultBatchSize;

  /// How many batches to read ahead within a fragment.
  ///
  /// Set to 0 to disable batch readahead.
  ///
  /// Note: May not be supported by all formats.
  /// Note: Will be ignored if use_threads is set to false.
  int32_t batch_readahead = kDefaultBatchReadahead;

  /// How many files to read ahead.
  ///
  /// Set to 0 to disable fragment readahead.
  ///
  /// Note: May not be enforced by all scanners.
  /// Note: Will be ignored if use_threads is set to false.
  int32_t fragment_readahead = kDefaultFragmentReadahead;

  /// A pool from which materialized and scanned arrays will be allocated.
  MemoryPool* pool = arrow::default_memory_pool();

  /// IOContext for any IO tasks.
  ///
  /// Note: The IOContext executor will be ignored if use_threads is set to false.
  io::IOContext io_context;

  /// If true the scanner will scan in parallel.
  ///
  /// Note: If true, this will use threads from both the cpu_executor and the
  /// io_context.executor.
  /// Note: This must be true in order for any readahead to happen.
  bool use_threads = false;

  /// If true the scanner will add augmented fields to the output schema.
  bool add_augmented_fields = true;

  /// Fragment-specific scan options.
  std::shared_ptr<FragmentScanOptions> fragment_scan_options;

  /// Return a vector of FieldRefs that require materialization.
  ///
  /// This is usually the union of the fields referenced in the projection and the
  /// filter expression. Examples:
  ///
  /// - `SELECT a, b WHERE a < 2 && c > 1` => ["a", "b", "a", "c"]
  /// - `SELECT a + b < 3 WHERE a > 1` => ["a", "b", "a"]
  ///
  /// This is needed for expressions where a field may not be directly
  /// used in the final projection but is still required to evaluate the
  /// expression.
  ///
  /// This is used by Fragment implementations to apply the column
  /// sub-selection optimization.
  std::vector<FieldRef> MaterializedFields() const;

  /// Parameters which control when the plan should pause for a slow consumer.
  acero::BackpressureOptions backpressure =
      acero::BackpressureOptions::DefaultBackpressure();
};
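
// A minimal usage sketch (illustrative, not part of the API above): configuring
// ScanOptions for a filtered, parallel scan. It assumes the dataset schema has
// an int32 column "a"; the expression helpers come from
// arrow/compute/expression.h.
//
//   auto options = std::make_shared<ScanOptions>();
//   options->filter = compute::greater(compute::field_ref("a"), compute::literal(0));
//   options->use_threads = true;      // required for any readahead to happen
//   options->batch_readahead = 32;    // read further ahead within each fragment
//   options->fragment_readahead = 8;  // scan more files concurrently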

/// Scan-specific options, which can be changed between scans of the same dataset.
///
/// A dataset consists of one or more individual fragments. A fragment is anything
/// that is independently scannable, often a file.
///
/// Batches from all fragments will be converted to a single schema. This unified
/// schema is referred to as the "dataset schema" and is the output schema for
/// this node.
///
/// Individual fragments may have schemas that are different from the dataset
/// schema. This is sometimes referred to as the physical or fragment schema.
/// Conversion from the fragment schema to the dataset schema is a process
/// known as evolution.
struct ARROW_DS_EXPORT ScanV2Options : public acero::ExecNodeOptions {
  explicit ScanV2Options(std::shared_ptr<Dataset> dataset)
      : dataset(std::move(dataset)) {}

  /// \brief The dataset to scan
  std::shared_ptr<Dataset> dataset;
  /// \brief A row filter
  ///
  /// The filter expression should be written against the dataset schema.
  /// The filter must be unbound.
  ///
  /// This is an opportunistic pushdown filter. Filtering capabilities will
  /// vary between formats. If a format is not capable of applying the filter
  /// then it will ignore it.
  ///
  /// Each fragment will do its best to filter the data based on the information
  /// (partitioning guarantees, statistics) available to it. If it is able to
  /// apply some filtering then it will indicate what filtering it was able to
  /// apply by attaching a guarantee to the batch.
  ///
  /// For example, if a filter is x < 50 && y > 40 then a batch may be able to
  /// apply a guarantee x < 50. Post-scan filtering would then only need to
  /// consider y > 40 (for this specific batch). The next batch may not be able
  /// to attach any guarantee, and both clauses would need to be applied to that
  /// batch.
  ///
  /// A single guarantee-aware filtering operation should generally be applied to all
  /// resulting batches. The scan node is not responsible for this.
  ///
  /// Fields that are referenced by the filter should be included in the `columns`
  /// vector. The scan node will not automatically fetch fields referenced by the
  /// filter expression. \see AddFieldsNeededForFilter
  ///
  /// If the filter references fields that are not included in `columns`, this may
  /// or may not be an error, depending on the format.
  compute::Expression filter = compute::literal(true);
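
  // A sketch of the guarantee-aware post-scan filtering described above
  // (illustrative; not part of this struct). compute::SimplifyWithGuarantee
  // comes from arrow/compute/expression.h; `batch_guarantee` stands in for
  // whatever guarantee a fragment attached to a scanned batch.
  //
  //   compute::Expression filter = compute::and_(
  //       compute::less(compute::field_ref("x"), compute::literal(50)),
  //       compute::greater(compute::field_ref("y"), compute::literal(40)));
  //   ARROW_ASSIGN_OR_RAISE(
  //       compute::Expression residual,
  //       compute::SimplifyWithGuarantee(filter, batch_guarantee));
  //   // With a guarantee of x < 50 the residual reduces to y > 40, so only
  //   // that clause must be evaluated against the batch.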

  /// \brief The columns to scan
  ///
  /// This is not a simple list of top-level column indices but instead a set of paths
  /// allowing for partial selection of columns.
  ///
  /// These paths refer to the dataset schema.
  ///
  /// For example, consider the following dataset schema:
  ///
  ///   schema({
  ///     field("score", int32()),
  ///     field("marker", struct_({
  ///       field("color", utf8()),
  ///       field("location", struct_({
  ///         field("x", float64()),
  ///         field("y", float64())
  ///       }))
  ///     }))
  ///   })
  ///
  /// If `columns` is {{0}, {1,1,0}} then the output schema is:
  ///
  ///   schema({field("score", int32()), field("x", float64())})
  ///
  /// If `columns` is {{1,1,1}, {1,1}} then the output schema is:
  ///
  ///   schema({
  ///     field("y", float64()),
  ///     field("location", struct_({
  ///       field("x", float64()),
  ///       field("y", float64())
  ///     }))
  ///   })
  std::vector<FieldPath> columns;

  /// \brief Target number of bytes to read ahead in a fragment
  ///
  /// This limit involves some amount of estimation. Formats typically only know
  /// batch boundaries in terms of rows (not decoded bytes) and so an estimation
  /// must be done to guess the average row size. Other formats like CSV and JSON
  /// must make even more generalized guesses.
  ///
  /// This is a best-effort guide. Some formats may need to read ahead further;
  /// for example, if scanning a parquet file that has batches with 100MiB of data
  /// then the actual readahead will be at least 100MiB.
  ///
  /// Set to 0 to disable readahead. When disabled, the scanner will read the
  /// dataset one batch at a time.
  ///
  /// This limit applies across all fragments. If the limit is 32MiB and the
  /// fragment readahead allows for 20 fragments to be read at once then the
  /// total readahead will still be 32MiB and NOT 20 * 32MiB.
  int32_t target_bytes_readahead = kDefaultBytesReadahead;

  /// \brief Number of fragments to read ahead
  ///
  /// Higher readahead will potentially lead to more efficient I/O but will lead
  /// to the scan operation using more RAM. The default is fairly conservative
  /// and designed for fast local disks (or slow local spinning disks which cannot
  /// handle much parallelism anyway). When using a highly parallel remote filesystem
  /// you will likely want to increase these values.
  ///
  /// Set to 0 to disable fragment readahead. When disabled the dataset will be scanned
  /// one fragment at a time.
  int32_t fragment_readahead = kDefaultFragmentReadahead;

  /// \brief Options specific to the file format
  const FragmentScanOptions* format_options = NULLPTR;

  /// \brief Utility method to get a selection representing all columns in a dataset
  static std::vector<FieldPath> AllColumns(const Schema& dataset_schema);

  /// \brief Utility method to add fields needed for the current filter
  ///
  /// This method adds any fields that are needed by `filter` which are not already
  /// included in the list of columns. Any new fields added will be added to the end
  /// in no particular order.
  static Status AddFieldsNeededForFilter(ScanV2Options* options);
};
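
// An illustrative sketch (not from the upstream docs): selecting nested columns
// with FieldPath and letting AddFieldsNeededForFilter pull in anything the
// filter references. Assumes a dataset with the example schema shown above.
//
//   ScanV2Options options(dataset);
//   options.columns = {FieldPath({0}),         // score
//                      FieldPath({1, 1, 0})};  // marker.location.x
//   options.filter = compute::greater(compute::field_ref("score"),
//                                     compute::literal(10));
//   // "score" is already selected so this is a no-op here; in general it
//   // appends any filter-referenced fields missing from `columns`.
//   ARROW_RETURN_NOT_OK(ScanV2Options::AddFieldsNeededForFilter(&options));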

/// \brief Describes a projection
struct ARROW_DS_EXPORT ProjectionDescr {
  /// \brief The projection expression itself
  /// This expression must be a call to make_struct
  compute::Expression expression;

  /// \brief The output schema of the projection.
  ///
  /// This can be calculated from the input schema and the expression but it
  /// is cached here for convenience.
  std::shared_ptr<Schema> schema;

  /// \brief Create a ProjectionDescr by binding an expression to the dataset schema
  ///
  /// expression must return a struct type
  static Result<ProjectionDescr> FromStructExpression(
      const compute::Expression& expression, const Schema& dataset_schema);

  /// \brief Create a ProjectionDescr from expressions/names for each field
  static Result<ProjectionDescr> FromExpressions(std::vector<compute::Expression> exprs,
                                                 std::vector<std::string> names,
                                                 const Schema& dataset_schema);

  /// \brief Create a default projection referencing fields in the dataset schema
  static Result<ProjectionDescr> FromNames(std::vector<std::string> names,
                                           const Schema& dataset_schema,
                                           bool add_augmented_fields = true);

  /// \brief Make a projection that projects every field in the dataset schema
  static Result<ProjectionDescr> Default(const Schema& dataset_schema,
                                         bool add_augmented_fields = true);
};

/// \brief Utility method to set the projection expression and schema
ARROW_DS_EXPORT void SetProjection(ScanOptions* options, ProjectionDescr projection);
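
// An illustrative sketch: building a projection from column names and attaching
// it to scan options. Assumes `options` is a std::shared_ptr<ScanOptions> whose
// dataset_schema contains columns "a" and "b".
//
//   ARROW_ASSIGN_OR_RAISE(
//       auto projection,
//       ProjectionDescr::FromNames({"a", "b"}, *options->dataset_schema));
//   SetProjection(options.get(), std::move(projection));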

/// \brief Combines a record batch with the fragment that the record batch originated
/// from
///
/// Knowing the source fragment can be useful for debugging & understanding loaded
/// data
struct TaggedRecordBatch {
  std::shared_ptr<RecordBatch> record_batch;
  std::shared_ptr<Fragment> fragment;
};
using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;

/// \brief Combines a tagged batch with positional information
///
/// This is returned when scanning batches in an unordered fashion. This information is
/// needed if you ever want to reassemble the batches in order
struct EnumeratedRecordBatch {
  Enumerated<std::shared_ptr<RecordBatch>> record_batch;
  Enumerated<std::shared_ptr<Fragment>> fragment;
};
using EnumeratedRecordBatchGenerator = std::function<Future<EnumeratedRecordBatch>()>;
using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;

/// @}

}  // namespace dataset

template <>
struct IterationTraits<dataset::TaggedRecordBatch> {
  static dataset::TaggedRecordBatch End() {
    return dataset::TaggedRecordBatch{NULLPTR, NULLPTR};
  }
  static bool IsEnd(const dataset::TaggedRecordBatch& val) {
    return val.record_batch == NULLPTR;
  }
};

template <>
struct IterationTraits<dataset::EnumeratedRecordBatch> {
  static dataset::EnumeratedRecordBatch End() {
    return dataset::EnumeratedRecordBatch{
        IterationEnd<Enumerated<std::shared_ptr<RecordBatch>>>(),
        IterationEnd<Enumerated<std::shared_ptr<dataset::Fragment>>>()};
  }
  static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
    return IsIterationEnd(val.fragment);
  }
};

namespace dataset {

/// \defgroup dataset-scanning Scanning API
///
/// @{

/// \brief A scanner glues together several dataset classes to load in data.
/// The dataset contains a collection of fragments and partitioning rules.
///
/// The fragments identify independently loadable units of data (i.e. each fragment has
/// a potentially unique schema and possibly even format; it should be possible to read
/// fragments in parallel if desired).
///
/// The fragment's format contains the logic necessary to actually create a task to load
/// the fragment into memory. That task may or may not support parallel execution of
/// its own.
///
/// The scanner is then responsible for creating scan tasks from every fragment in the
/// dataset and (potentially) sequencing the loaded record batches together.
///
/// The scanner should not buffer the entire dataset in memory (unless asked); instead
/// it yields record batches as soon as they are ready to scan. Various readahead
/// properties control how much data is allowed to be scanned before pausing to let a
/// slow consumer catch up.
///
/// Today the scanner also handles projection & filtering, although that may change in
/// the future.
class ARROW_DS_EXPORT Scanner {
 public:
  virtual ~Scanner() = default;

  /// \brief Apply a visitor to each RecordBatch as it is scanned. If multiple threads
  /// are used (via use_threads), the visitor will be invoked from those threads and is
  /// responsible for any synchronization.
  virtual Status Scan(std::function<Status(TaggedRecordBatch)> visitor) = 0;
  /// \brief Convert a Scanner into a Table.
  ///
  /// Use this convenience utility with care. This will serially materialize the
  /// Scan result in memory before creating the Table.
  virtual Result<std::shared_ptr<Table>> ToTable() = 0;
  /// \brief Scan the dataset into a stream of record batches. Each batch is tagged
  /// with the fragment it originated from. The batches will arrive in order. The
  /// order of fragments is determined by the dataset.
  ///
  /// Note: The scanner will perform some readahead but will avoid materializing too
  /// much in memory (this is governed by the readahead options and use_threads option).
  /// If the readahead queue fills up then I/O will pause until the calling thread
  /// catches up.
  virtual Result<TaggedRecordBatchIterator> ScanBatches() = 0;
  virtual Result<TaggedRecordBatchGenerator> ScanBatchesAsync() = 0;
  virtual Result<TaggedRecordBatchGenerator> ScanBatchesAsync(
      ::arrow::internal::Executor* cpu_thread_pool) = 0;
  /// \brief Scan the dataset into a stream of record batches. Unlike ScanBatches this
  /// method may allow record batches to be returned out of order. This allows for more
  /// efficient scanning: some fragments may be accessed more quickly than others (e.g.
  /// may be cached in RAM or just happen to get scheduled earlier by the I/O).
  ///
  /// To make up for the out-of-order iteration each batch is further tagged with
  /// positional information.
  virtual Result<EnumeratedRecordBatchIterator> ScanBatchesUnordered() = 0;
  virtual Result<EnumeratedRecordBatchGenerator> ScanBatchesUnorderedAsync() = 0;
  virtual Result<EnumeratedRecordBatchGenerator> ScanBatchesUnorderedAsync(
      ::arrow::internal::Executor* cpu_thread_pool) = 0;
  /// \brief A convenience to synchronously load the given rows by index.
  ///
  /// Will only consume as many batches as needed from ScanBatches().
  virtual Result<std::shared_ptr<Table>> TakeRows(const Array& indices) = 0;
  /// \brief Get the first N rows.
  virtual Result<std::shared_ptr<Table>> Head(int64_t num_rows) = 0;
  /// \brief Count rows matching a predicate.
  ///
  /// This method will push down the predicate and compute the result based on fragment
  /// metadata if possible.
  virtual Result<int64_t> CountRows() = 0;
  virtual Future<int64_t> CountRowsAsync() = 0;
  /// \brief Convert the Scanner to a RecordBatchReader so it can be
  /// easily used with APIs that expect a reader.
  virtual Result<std::shared_ptr<RecordBatchReader>> ToRecordBatchReader() = 0;

  /// \brief Get the options for this scan.
  const std::shared_ptr<ScanOptions>& options() const { return scan_options_; }
  /// \brief Get the dataset that this scanner will scan.
  virtual const std::shared_ptr<Dataset>& dataset() const = 0;

 protected:
  explicit Scanner(std::shared_ptr<ScanOptions> scan_options)
      : scan_options_(std::move(scan_options)) {}

  Result<EnumeratedRecordBatchIterator> AddPositioningToInOrderScan(
      TaggedRecordBatchIterator scan);

  const std::shared_ptr<ScanOptions> scan_options_;
};
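
// An illustrative sketch: consuming a scan as an ordered stream of batches.
// Assumes `scanner` is a std::shared_ptr<Scanner> produced by the
// ScannerBuilder defined below.
//
//   ARROW_ASSIGN_OR_RAISE(auto batch_it, scanner->ScanBatches());
//   while (true) {
//     ARROW_ASSIGN_OR_RAISE(TaggedRecordBatch tagged, batch_it.Next());
//     if (IsIterationEnd(tagged)) break;
//     // tagged.record_batch holds the data; tagged.fragment identifies its
//     // source fragment.
//   }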

/// \brief ScannerBuilder is a factory class to construct a Scanner. It is used
/// to pass information, notably a potential filter expression and a subset of
/// columns to materialize.
class ARROW_DS_EXPORT ScannerBuilder {
 public:
  explicit ScannerBuilder(std::shared_ptr<Dataset> dataset);

  ScannerBuilder(std::shared_ptr<Dataset> dataset,
                 std::shared_ptr<ScanOptions> scan_options);

  ScannerBuilder(std::shared_ptr<Schema> schema, std::shared_ptr<Fragment> fragment,
                 std::shared_ptr<ScanOptions> scan_options);

  /// \brief Make a scanner from a record batch reader.
  ///
  /// The resulting scanner can be scanned only once. This is intended
  /// to support writing data from streaming sources or other sources
  /// that can be iterated only once.
  static std::shared_ptr<ScannerBuilder> FromRecordBatchReader(
      std::shared_ptr<RecordBatchReader> reader);

  /// \brief Set the subset of columns to materialize.
  ///
  /// Columns which are not referenced may not be read from fragments.
  ///
  /// \param[in] columns list of columns to project. Order and duplicates will
  ///            be preserved.
  ///
  /// \return Failure if any column name does not exist in the dataset's
  ///         Schema.
  Status Project(std::vector<std::string> columns);

  /// \brief Set expressions which will be evaluated to produce the materialized
  /// columns.
  ///
  /// Columns which are not referenced may not be read from fragments.
  ///
  /// \param[in] exprs expressions to evaluate to produce columns.
  /// \param[in] names list of names for the resulting columns.
  ///
  /// \return Failure if any referenced column does not exist in the dataset's
  ///         Schema.
  Status Project(std::vector<compute::Expression> exprs, std::vector<std::string> names);

  /// \brief Set the filter expression to return only rows matching the filter.
  ///
  /// The predicate will be passed down to Sources and corresponding
  /// Fragments to exploit predicate pushdown if possible using
  /// partition information or Fragment internal metadata, e.g. Parquet statistics.
  /// Columns which are not referenced may not be read from fragments.
  ///
  /// \param[in] filter expression to filter rows with.
  ///
  /// \return Failure if any referenced column does not exist in the dataset's
  ///         Schema.
  Status Filter(const compute::Expression& filter);

  /// \brief Indicate if the Scanner should make use of the available
  /// ThreadPool found in ScanOptions.
  Status UseThreads(bool use_threads = true);

  /// \brief Set the maximum number of rows per RecordBatch.
  ///
  /// \param[in] batch_size the maximum number of rows.
  /// \returns An error if the number for batch is not greater than 0.
  ///
  /// This option provides a control limiting the memory owned by any RecordBatch.
  Status BatchSize(int64_t batch_size);

  /// \brief Set the number of batches to read ahead within a fragment.
  ///
  /// \param[in] batch_readahead How many batches to read ahead within a fragment
  /// \returns an error if this number is less than 0.
  ///
  /// This option provides a control on the RAM vs I/O tradeoff.
  /// It might not be supported by all file formats, in which case it will
  /// simply be ignored.
  Status BatchReadahead(int32_t batch_readahead);

  /// \brief Set the number of fragments to read ahead.
  ///
  /// \param[in] fragment_readahead How many fragments to read ahead
  /// \returns an error if this number is less than 0.
  ///
  /// This option provides a control on the RAM vs I/O tradeoff.
  Status FragmentReadahead(int32_t fragment_readahead);

  /// \brief Set the pool from which materialized and scanned arrays will be allocated.
  Status Pool(MemoryPool* pool);

  /// \brief Set fragment-specific scan options.
  Status FragmentScanOptions(std::shared_ptr<FragmentScanOptions> fragment_scan_options);

  /// \brief Override default backpressure configuration.
  Status Backpressure(acero::BackpressureOptions backpressure);

  /// \brief Return the current scan options for the builder.
  Result<std::shared_ptr<ScanOptions>> GetScanOptions();

  /// \brief Return the constructed now-immutable Scanner object.
  Result<std::shared_ptr<Scanner>> Finish();

  const std::shared_ptr<Schema>& schema() const;
  const std::shared_ptr<Schema>& projected_schema() const;

 private:
  std::shared_ptr<Dataset> dataset_;
  std::shared_ptr<ScanOptions> scan_options_ = std::make_shared<ScanOptions>();
};
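
// An illustrative sketch of the typical build-then-scan flow. Assumes `dataset`
// is a std::shared_ptr<Dataset> whose schema has an int32 column "a" and a
// column "b".
//
//   ScannerBuilder builder(dataset);
//   ARROW_RETURN_NOT_OK(builder.Project({"a", "b"}));
//   ARROW_RETURN_NOT_OK(builder.Filter(
//       compute::greater(compute::field_ref("a"), compute::literal(0))));
//   ARROW_RETURN_NOT_OK(builder.UseThreads(true));
//   ARROW_ASSIGN_OR_RAISE(auto scanner, builder.Finish());
//   ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());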

/// \brief Construct a source ExecNode which yields batches from a dataset scan.
///
/// Does not construct associated filter or project nodes.
/// Yielded batches will be augmented with fragment/batch indices to enable stable
/// ordering for simple ExecPlans.
class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions {
 public:
  explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
                           std::shared_ptr<ScanOptions> scan_options,
                           bool require_sequenced_output = false)
      : dataset(std::move(dataset)),
        scan_options(std::move(scan_options)),
        require_sequenced_output(require_sequenced_output) {}

  std::shared_ptr<Dataset> dataset;
  std::shared_ptr<ScanOptions> scan_options;
  bool require_sequenced_output;
};

/// @}

namespace internal {
ARROW_DS_EXPORT void InitializeScanner(arrow::acero::ExecFactoryRegistry* registry);
ARROW_DS_EXPORT void InitializeScannerV2(arrow::acero::ExecFactoryRegistry* registry);
}  // namespace internal
}  // namespace dataset
}  // namespace arrow
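
// An illustrative sketch of driving the scan node from an Acero plan (assumes
// arrow/acero/exec_plan.h for Declaration/DeclarationToTable, plus a ready
// std::shared_ptr<Dataset> and std::shared_ptr<ScanOptions>):
//
//   acero::Declaration scan{"scan",
//                           dataset::ScanNodeOptions{dataset, scan_options}};
//   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table,
//                         acero::DeclarationToTable(std::move(scan)));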