Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:57

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 // This API is EXPERIMENTAL.
0019 
0020 #pragma once
0021 
0022 #include <functional>
0023 #include <memory>
0024 #include <optional>
0025 #include <string>
0026 #include <utility>
0027 #include <vector>
0028 
0029 #include "arrow/compute/expression.h"
0030 #include "arrow/dataset/type_fwd.h"
0031 #include "arrow/dataset/visibility.h"
0032 #include "arrow/util/async_generator_fwd.h"
0033 #include "arrow/util/future.h"
0034 #include "arrow/util/macros.h"
0035 #include "arrow/util/mutex.h"
0036 
0037 namespace arrow {
0038 
0039 namespace internal {
0040 class Executor;
0041 }  // namespace internal
0042 
0043 namespace dataset {
0044 
0045 using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;
0046 
0047 /// \brief Description of a column to scan
0048 struct ARROW_DS_EXPORT FragmentSelectionColumn {
0049   /// \brief The path to the column to load
0050   FieldPath path;
0051   /// \brief The type of the column in the dataset schema
0052   ///
0053   /// A format may choose to ignore this field completely.  For example, when
0054   /// reading from IPC the reader can just return the column in the data type
0055   /// that is stored on disk.  There is no point in doing anything special.
0056   ///
0057   /// However, some formats may be capable of casting on the fly.  For example,
0058   /// when reading from CSV, if we know the target type of the column, we can
0059   /// convert from string to the target type as we read.
0060   DataType* requested_type;
0061 };
0062 
0063 /// \brief A list of columns that should be loaded from a fragment
0064 ///
0065 /// The paths in this selection should be referring to the fragment schema.  This class
0066 /// contains a virtual destructor as it is expected evolution strategies will need to
0067 /// extend this to add any information needed to later evolve the batches.
0068 ///
0069 /// For example, in the basic evolution strategy, we keep track of which columns
0070 /// were missing from the file so that we can fill those in with null when evolving.
0071 class ARROW_DS_EXPORT FragmentSelection {
0072  public:
0073   explicit FragmentSelection(std::vector<FragmentSelectionColumn> columns)
0074       : columns_(std::move(columns)) {}
0075   virtual ~FragmentSelection() = default;
0076   /// The columns that should be loaded from the fragment
0077   const std::vector<FragmentSelectionColumn>& columns() const { return columns_; }
0078 
0079  private:
0080   std::vector<FragmentSelectionColumn> columns_;
0081 };
0082 
0083 /// \brief Instructions for scanning a particular fragment
0084 ///
0085 /// The fragment scan request is derived from ScanV2Options.  The main
0086 /// difference is that the scan options are based on the dataset schema
0087 /// while the fragment request is based on the fragment schema.
0088 struct ARROW_DS_EXPORT FragmentScanRequest {
0089   /// \brief A row filter
0090   ///
0091   /// The filter expression should be written against the fragment schema.
0092   ///
0093   /// \see ScanV2Options for details on how this filter should be applied
0094   compute::Expression filter = compute::literal(true);
0095 
0096   /// \brief The columns to scan
0097   ///
0098   /// These indices refer to the fragment schema
0099   ///
0100   /// Note: This is NOT a simple list of top-level column indices.
0101   /// For more details \see ScanV2Options
0102   ///
0103   /// If possible a fragment should only read from disk the data needed
0104   /// to satisfy these columns.  If a format cannot partially read a nested
0105   /// column (e.g. JSON) then it must apply the column selection (in memory)
0106   /// before returning the scanned batch.
0107   std::shared_ptr<FragmentSelection> fragment_selection;
0108   /// \brief Options specific to the format being scanned
0109   const FragmentScanOptions* format_scan_options;
0110 };
0111 
0112 /// \brief An iterator-like object that can yield batches created from a fragment
0113 class ARROW_DS_EXPORT FragmentScanner {
0114  public:
0115   /// This instance will only be destroyed after all ongoing scan futures
0116   /// have been completed.
0117   ///
0118   /// This means any callbacks created as part of the scan can safely
0119   /// capture `this`
0120   virtual ~FragmentScanner() = default;
0121   /// \brief Scan a batch of data from the file
0122   /// \param batch_number The index of the batch to read
0123   virtual Future<std::shared_ptr<RecordBatch>> ScanBatch(int batch_number) = 0;
0124   /// \brief Calculate an estimate of how many data bytes the given batch will represent
0125   ///
0126   /// "Data bytes" should be the total size of all the buffers once the data has been
0127   /// decoded into the Arrow format.
0128   virtual int64_t EstimatedDataBytes(int batch_number) = 0;
0129   /// \brief The number of batches in the fragment to scan
0130   virtual int NumBatches() = 0;
0131 };
0132 
0133 /// \brief Information learned about a fragment through inspection
0134 ///
0135 /// This information can be used to figure out which fields need
0136 /// to be read from a file and how the data read in should be evolved
0137 /// to match the dataset schema.
0138 ///
0139 /// For example, from a CSV file we can inspect and learn the column
0140 /// names and use those column names to determine which columns to load
0141 /// from the CSV file.
0142 struct ARROW_DS_EXPORT InspectedFragment {
0143   explicit InspectedFragment(std::vector<std::string> column_names)
0144       : column_names(std::move(column_names)) {}
0145   std::vector<std::string> column_names;
0146 };
0147 
0148 /// \brief A granular piece of a Dataset, such as an individual file.
0149 ///
0150 /// A Fragment can be read/scanned separately from other fragments. It yields a
0151 /// collection of RecordBatches when scanned
0152 ///
0153 /// Note that Fragments have well defined physical schemas which are reconciled by
0154 /// the Datasets which contain them; these physical schemas may differ from a parent
0155 /// Dataset's schema and the physical schemas of sibling Fragments.
0156 class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
0157  public:
0158   /// \brief An expression that represents no known partition information
0159   static const compute::Expression kNoPartitionInformation;
0160 
0161   /// \brief Return the physical schema of the Fragment.
0162   ///
0163   /// The physical schema is also called the writer schema.
0164   /// This method is blocking and may suffer from high latency filesystem.
0165   /// The schema is cached after being read once, or may be specified at construction.
0166   Result<std::shared_ptr<Schema>> ReadPhysicalSchema();
0167 
0168   /// An asynchronous version of Scan
0169   virtual Result<RecordBatchGenerator> ScanBatchesAsync(
0170       const std::shared_ptr<ScanOptions>& options) = 0;
0171 
0172   /// \brief Inspect a fragment to learn basic information
0173   ///
0174   /// This will be called before a scan and a fragment should attach whatever
0175   /// information will be needed to figure out an evolution strategy.  This information
0176   /// will then be passed to the call to BeginScan
0177   virtual Future<std::shared_ptr<InspectedFragment>> InspectFragment(
0178       const FragmentScanOptions* format_options, compute::ExecContext* exec_context);
0179 
0180   /// \brief Start a scan operation
0181   virtual Future<std::shared_ptr<FragmentScanner>> BeginScan(
0182       const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
0183       const FragmentScanOptions* format_options, compute::ExecContext* exec_context);
0184 
0185   /// \brief Count the number of rows in this fragment matching the filter using metadata
0186   /// only. That is, this method may perform I/O, but will not load data.
0187   ///
0188   /// If this is not possible, resolve with an empty optional. The fragment can perform
0189   /// I/O (e.g. to read metadata) before it deciding whether it can satisfy the request.
0190   virtual Future<std::optional<int64_t>> CountRows(
0191       compute::Expression predicate, const std::shared_ptr<ScanOptions>& options);
0192 
0193   virtual std::string type_name() const = 0;
0194   virtual std::string ToString() const { return type_name(); }
0195 
0196   /// \brief An expression which evaluates to true for all data viewed by this
0197   /// Fragment.
0198   const compute::Expression& partition_expression() const {
0199     return partition_expression_;
0200   }
0201 
0202   virtual ~Fragment() = default;
0203 
0204  protected:
0205   Fragment() = default;
0206   explicit Fragment(compute::Expression partition_expression,
0207                     std::shared_ptr<Schema> physical_schema);
0208 
0209   virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() = 0;
0210 
0211   util::Mutex physical_schema_mutex_;
0212   compute::Expression partition_expression_ = compute::literal(true);
0213   std::shared_ptr<Schema> physical_schema_;
0214 };
0215 
0216 /// \brief Per-scan options for fragment(s) in a dataset.
0217 ///
0218 /// These options are not intrinsic to the format or fragment itself, but do affect
0219 /// the results of a scan. These are options which make sense to change between
0220 /// repeated reads of the same dataset, such as format-specific conversion options
0221 /// (that do not affect the schema).
0222 ///
0223 /// \ingroup dataset-scanning
0224 class ARROW_DS_EXPORT FragmentScanOptions {
0225  public:
0226   virtual std::string type_name() const = 0;
0227   virtual std::string ToString() const { return type_name(); }
0228   virtual ~FragmentScanOptions() = default;
0229 };
0230 
0231 /// \defgroup dataset-implementations Concrete implementations
0232 ///
0233 /// @{
0234 
0235 /// \brief A trivial Fragment that yields ScanTask out of a fixed set of
0236 /// RecordBatch.
0237 class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
0238  public:
0239   class Scanner;
0240   InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches,
0241                    compute::Expression = compute::literal(true));
0242   explicit InMemoryFragment(RecordBatchVector record_batches,
0243                             compute::Expression = compute::literal(true));
0244 
0245   Result<RecordBatchGenerator> ScanBatchesAsync(
0246       const std::shared_ptr<ScanOptions>& options) override;
0247   Future<std::optional<int64_t>> CountRows(
0248       compute::Expression predicate,
0249       const std::shared_ptr<ScanOptions>& options) override;
0250 
0251   Future<std::shared_ptr<InspectedFragment>> InspectFragment(
0252       const FragmentScanOptions* format_options,
0253       compute::ExecContext* exec_context) override;
0254   Future<std::shared_ptr<FragmentScanner>> BeginScan(
0255       const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
0256       const FragmentScanOptions* format_options,
0257       compute::ExecContext* exec_context) override;
0258 
0259   std::string type_name() const override { return "in-memory"; }
0260 
0261  protected:
0262   Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override;
0263 
0264   RecordBatchVector record_batches_;
0265 };
0266 
0267 /// @}
0268 
0269 using FragmentGenerator = AsyncGenerator<std::shared_ptr<Fragment>>;
0270 
0271 /// \brief Rules for converting the dataset schema to and from fragment schemas
0272 class ARROW_DS_EXPORT FragmentEvolutionStrategy {
0273  public:
0274   /// This instance will only be destroyed when all scan operations for the
0275   /// fragment have completed.
0276   virtual ~FragmentEvolutionStrategy() = default;
0277   /// \brief A guarantee that applies to all batches of this fragment
0278   ///
0279   /// For example, if a fragment is missing one of the fields in the dataset
0280   /// schema then a typical evolution strategy is to set that field to null.
0281   ///
0282   /// So if the column at index 3 is missing then the guarantee is
0283   /// FieldRef(3) == null
0284   ///
0285   /// Individual field guarantees should be AND'd together and returned
0286   /// as a single expression.
0287   virtual Result<compute::Expression> GetGuarantee(
0288       const std::vector<FieldPath>& dataset_schema_selection) const = 0;
0289 
0290   /// \brief Return a fragment schema selection given a dataset schema selection
0291   ///
0292   /// For example, if the user wants fields 2 & 4 of the dataset schema and
0293   /// in this fragment the field 2 is missing and the field 4 is at index 1 then
0294   /// this should return {1}
0295   virtual Result<std::unique_ptr<FragmentSelection>> DevolveSelection(
0296       const std::vector<FieldPath>& dataset_schema_selection) const = 0;
0297 
0298   /// \brief Return a filter expression bound to the fragment schema given
0299   ///        a filter expression bound to the dataset schema
0300   ///
0301   /// The dataset scan filter will first be simplified by the guarantee returned
0302   /// by GetGuarantee.  This means an evolution that only handles dropping or casting
0303   /// fields doesn't need to do anything here except return the given filter.
0304   ///
0305   /// On the other hand, an evolution that is doing some kind of aliasing will likely
0306   /// need to convert field references in the filter to the aliased field references
0307   /// where appropriate.
0308   virtual Result<compute::Expression> DevolveFilter(
0309       const compute::Expression& filter) const = 0;
0310 
0311   /// \brief Convert a batch from the fragment schema to the dataset schema
0312   ///
0313   /// Typically this involves casting columns from the data type stored on disk
0314   /// to the data type of the dataset schema.  For example, this fragment might
0315   /// have columns stored as int32 and the dataset schema might have int64 for
0316   /// the column.  In this case we should cast the column from int32 to int64.
0317   ///
0318   /// Note: A fragment may perform this cast as the data is read from disk.  In
0319   /// that case a cast might not be needed.
0320   virtual Result<compute::ExecBatch> EvolveBatch(
0321       const std::shared_ptr<RecordBatch>& batch,
0322       const std::vector<FieldPath>& dataset_selection,
0323       const FragmentSelection& selection) const = 0;
0324 
0325   /// \brief Return a string description of this strategy
0326   virtual std::string ToString() const = 0;
0327 };
0328 
0329 /// \brief Lookup to create a FragmentEvolutionStrategy for a given fragment
0330 class ARROW_DS_EXPORT DatasetEvolutionStrategy {
0331  public:
0332   virtual ~DatasetEvolutionStrategy() = default;
0333   /// \brief Create a strategy for evolving from the given fragment
0334   ///        to the schema of the given dataset
0335   virtual std::unique_ptr<FragmentEvolutionStrategy> GetStrategy(
0336       const Dataset& dataset, const Fragment& fragment,
0337       const InspectedFragment& inspected_fragment) = 0;
0338 
0339   /// \brief Return a string description of this strategy
0340   virtual std::string ToString() const = 0;
0341 };
0342 
0343 ARROW_DS_EXPORT std::unique_ptr<DatasetEvolutionStrategy>
0344 MakeBasicDatasetEvolutionStrategy();
0345 
0346 /// \brief A container of zero or more Fragments.
0347 ///
0348 /// A Dataset acts as a union of Fragments, e.g. files deeply nested in a
0349 /// directory. A Dataset has a schema to which Fragments must align during a
0350 /// scan operation. This is analogous to Avro's reader and writer schema.
0351 class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
0352  public:
0353   /// \brief Begin to build a new Scan operation against this Dataset
0354   Result<std::shared_ptr<ScannerBuilder>> NewScan();
0355 
0356   /// \brief GetFragments returns an iterator of Fragments given a predicate.
0357   Result<FragmentIterator> GetFragments(compute::Expression predicate);
0358   Result<FragmentIterator> GetFragments();
0359 
0360   /// \brief Async versions of `GetFragments`.
0361   Result<FragmentGenerator> GetFragmentsAsync(compute::Expression predicate);
0362   Result<FragmentGenerator> GetFragmentsAsync();
0363 
0364   const std::shared_ptr<Schema>& schema() const { return schema_; }
0365 
0366   /// \brief An expression which evaluates to true for all data viewed by this Dataset.
0367   /// May be null, which indicates no information is available.
0368   const compute::Expression& partition_expression() const {
0369     return partition_expression_;
0370   }
0371 
0372   /// \brief The name identifying the kind of Dataset
0373   virtual std::string type_name() const = 0;
0374 
0375   /// \brief Return a copy of this Dataset with a different schema.
0376   ///
0377   /// The copy will view the same Fragments. If the new schema is not compatible with the
0378   /// original dataset's schema then an error will be raised.
0379   virtual Result<std::shared_ptr<Dataset>> ReplaceSchema(
0380       std::shared_ptr<Schema> schema) const = 0;
0381 
0382   /// \brief Rules used by this dataset to handle schema evolution
0383   DatasetEvolutionStrategy* evolution_strategy() { return evolution_strategy_.get(); }
0384 
0385   virtual ~Dataset() = default;
0386 
0387  protected:
0388   explicit Dataset(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}
0389 
0390   Dataset(std::shared_ptr<Schema> schema, compute::Expression partition_expression);
0391 
0392   virtual Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) = 0;
0393   /// \brief Default non-virtual implementation method for the base
0394   /// `GetFragmentsAsyncImpl` method, which creates a fragment generator for
0395   /// the dataset, possibly filtering results with a predicate (forwarding to
0396   /// the synchronous `GetFragmentsImpl` method and moving the computations
0397   /// to the background, using the IO thread pool).
0398   ///
0399   /// Currently, `executor` is always the same as `internal::GetCPUThreadPool()`,
0400   /// which means the results from the underlying fragment generator will be
0401   /// transferred to the default CPU thread pool. The generator itself is
0402   /// offloaded to run on the default IO thread pool.
0403   virtual Result<FragmentGenerator> GetFragmentsAsyncImpl(
0404       compute::Expression predicate, arrow::internal::Executor* executor);
0405 
0406   std::shared_ptr<Schema> schema_;
0407   compute::Expression partition_expression_ = compute::literal(true);
0408   std::unique_ptr<DatasetEvolutionStrategy> evolution_strategy_ =
0409       MakeBasicDatasetEvolutionStrategy();
0410 };
0411 
0412 /// \addtogroup dataset-implementations
0413 ///
0414 /// @{
0415 
0416 /// \brief A Source which yields fragments wrapping a stream of record batches.
0417 ///
0418 /// The record batches must match the schema provided to the source at construction.
0419 class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
0420  public:
0421   class RecordBatchGenerator {
0422    public:
0423     virtual ~RecordBatchGenerator() = default;
0424     virtual RecordBatchIterator Get() const = 0;
0425   };
0426 
0427   /// Construct a dataset from a schema and a factory of record batch iterators.
0428   InMemoryDataset(std::shared_ptr<Schema> schema,
0429                   std::shared_ptr<RecordBatchGenerator> get_batches)
0430       : Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {}
0431 
0432   /// Convenience constructor taking a fixed list of batches
0433   InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchVector batches);
0434 
0435   /// Convenience constructor taking a Table
0436   explicit InMemoryDataset(std::shared_ptr<Table> table);
0437 
0438   std::string type_name() const override { return "in-memory"; }
0439 
0440   Result<std::shared_ptr<Dataset>> ReplaceSchema(
0441       std::shared_ptr<Schema> schema) const override;
0442 
0443  protected:
0444   Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override;
0445 
0446   std::shared_ptr<RecordBatchGenerator> get_batches_;
0447 };
0448 
0449 /// \brief A Dataset wrapping child Datasets.
0450 class ARROW_DS_EXPORT UnionDataset : public Dataset {
0451  public:
0452   /// \brief Construct a UnionDataset wrapping child Datasets.
0453   ///
0454   /// \param[in] schema the schema of the resulting dataset.
0455   /// \param[in] children one or more child Datasets. Their schemas must be identical to
0456   /// schema.
0457   static Result<std::shared_ptr<UnionDataset>> Make(std::shared_ptr<Schema> schema,
0458                                                     DatasetVector children);
0459 
0460   const DatasetVector& children() const { return children_; }
0461 
0462   std::string type_name() const override { return "union"; }
0463 
0464   Result<std::shared_ptr<Dataset>> ReplaceSchema(
0465       std::shared_ptr<Schema> schema) const override;
0466 
0467  protected:
0468   Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override;
0469 
0470   explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
0471       : Dataset(std::move(schema)), children_(std::move(children)) {}
0472 
0473   DatasetVector children_;
0474 
0475   friend class UnionDatasetFactory;
0476 };
0477 
0478 /// @}
0479 
0480 }  // namespace dataset
0481 }  // namespace arrow