Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:57

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 /// Logic for automatically determining the structure of a multi-file
0019 /// dataset, with possible partitioning according to the available
0020 /// partitioning scheme
0021 
0022 // This API is EXPERIMENTAL.
0023 
0024 #pragma once
0025 
0026 #include <memory>
0027 #include <string>
0028 #include <variant>
0029 #include <vector>
0030 
0031 #include "arrow/dataset/partition.h"
0032 #include "arrow/dataset/type_fwd.h"
0033 #include "arrow/dataset/visibility.h"
0034 #include "arrow/filesystem/type_fwd.h"
0035 #include "arrow/result.h"
0036 #include "arrow/util/macros.h"
0037 
0038 namespace arrow {
0039 namespace dataset {
0040 
0041 /// \defgroup dataset-discovery Discovery API
0042 ///
0043 /// @{
0044 
0045 struct InspectOptions {
0046   /// Sentinel value for `fragments` requesting inspection of every fragment.
0047   static constexpr int kInspectAllFragments = -1;
0048 
0049   /// Indicate how many fragments should be inspected to infer the unified dataset
0050   /// schema. Limiting the number of fragments accessed improves the latency of
0051   /// the discovery process when dealing with a high number of fragments and/or
0052   /// high-latency file systems.
0053   ///
0054   /// The default value of `1` inspects the schema of the first (in no particular
0055   /// order) fragment only. If the dataset has a uniform schema for all fragments,
0056   /// this default is the optimal value. In order to inspect all fragments and
0057   /// robustly unify their potentially varying schemas, set this option to
0058   /// `kInspectAllFragments`. A value of `0` disables inspection of fragments
0059   /// altogether, so only the partitioning schema will be inspected.
0060   int fragments = 1;
0061 
0062   /// Control how field types are unified. By default, types are merged strictly
0063   /// (the type must match exactly, except nulls can be merged with other types).
0064   Field::MergeOptions field_merge_options = Field::MergeOptions::Defaults();
0065 };
0066 
0067 struct FinishOptions {
0068   /// Finalize the dataset with this given schema. If the schema is not
0069   /// provided, it is inferred through inspection; see the `inspect_options`
0070   /// property.
0071   std::shared_ptr<Schema> schema = NULLPTR;
0072 
0073   /// If the schema is not provided, it will be discovered by passing the
0074   /// following options to `DatasetFactory::Inspect`.
0075   InspectOptions inspect_options{};
0076 
0077   /// Indicate whether the given schema (when specified) should be validated
0078   /// against the fragments' schemas. `inspect_options` will control how many
0079   /// fragments are checked.
0080   bool validate_fragments = false;
0081 };
0082 
0083 /// \brief DatasetFactory provides a way to inspect/discover a Dataset's expected
0084 /// schema before materializing said Dataset.
0085 class ARROW_DS_EXPORT DatasetFactory {
0086  public:
0087   /// \brief Get the schemas of the Fragments and Partitioning.
0088   virtual Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
0089       InspectOptions options) = 0;
0090 
0091   /// \brief Get unified schema for the resulting Dataset.
0092   Result<std::shared_ptr<Schema>> Inspect(InspectOptions options = {});
0093 
0094   /// \brief Create a Dataset
0095   Result<std::shared_ptr<Dataset>> Finish();
0096   /// \brief Create a Dataset with the given schema (see \a FinishOptions::schema)
0097   Result<std::shared_ptr<Dataset>> Finish(std::shared_ptr<Schema> schema);
0098   /// \brief Create a Dataset with the given options
0099   virtual Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) = 0;
0100 
0101   /// \brief Optional root partition for the resulting Dataset.
0102   const compute::Expression& root_partition() const { return root_partition_; }
0103   /// \brief Set the root partition for the resulting Dataset (always returns OK).
0104   Status SetRootPartition(compute::Expression partition) {
0105     root_partition_ = std::move(partition);
0106     return Status::OK();
0107   }
0108 
0109   virtual ~DatasetFactory() = default;
0110 
0111  protected:
0112   DatasetFactory();
0113 
0114   compute::Expression root_partition_;
0115 };
0116 
0117 /// @}
0118 
0119 /// \brief UnionDatasetFactory is a DatasetFactory built from a list of child
0120 /// DatasetFactory instances.
0121 /// \ingroup dataset-implementations
0122 class ARROW_DS_EXPORT UnionDatasetFactory : public DatasetFactory {
0123  public:
0124   static Result<std::shared_ptr<DatasetFactory>> Make(
0125       std::vector<std::shared_ptr<DatasetFactory>> factories);
0126 
0127   /// \brief Return the list of child DatasetFactory
0128   const std::vector<std::shared_ptr<DatasetFactory>>& factories() const {
0129     return factories_;
0130   }
0131 
0132   /// \brief Get the schemas of the Datasets.
0133   ///
0134   /// Instead of applying options globally, it applies at each child factory.
0135   /// This will not respect `options.fragments` exactly, but will respect the
0136   /// spirit of peeking the first fragments or all of them.
0137   Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
0138       InspectOptions options) override;
0139 
0140   /// \brief Create a Dataset.
0141   Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;
0142 
0143  protected:
0144   explicit UnionDatasetFactory(std::vector<std::shared_ptr<DatasetFactory>> factories);
0145 
0146   std::vector<std::shared_ptr<DatasetFactory>> factories_;
0147 };
0148 
0149 /// \ingroup dataset-filesystem
0150 struct FileSystemFactoryOptions {
0151   /// Either an explicit Partitioning or a PartitioningFactory to discover one.
0152   ///
0153   /// If a factory is provided, it will be used to infer a schema for partition fields
0154   /// based on file and directory paths then construct a Partitioning. The default
0155   /// is a Partitioning which will yield no partition information.
0156   ///
0157   /// The (explicit or discovered) partitioning will be applied to discovered files
0158   /// and the resulting partition information embedded in the Dataset.
0159   PartitioningOrFactory partitioning{Partitioning::Default()};
0160 
0161   /// For the purposes of applying the partitioning, paths will be stripped
0162   /// of the partition_base_dir. Files not matching the partition_base_dir
0163   /// prefix will be skipped for partition discovery. The ignored files will still
0164   /// be part of the Dataset, but will not have partition information.
0165   ///
0166   /// Example:
0167   /// partition_base_dir = "/dataset";
0168   ///
0169   /// - "/dataset/US/sales.csv" -> "US/sales.csv" will be given to the partitioning
0170   ///
0171   /// - "/home/john/late_sales.csv" -> Will be ignored for partition discovery.
0172   ///
0173   /// This is useful for partitionings that parse directory segments where
0174   /// ordering is important, e.g. DirectoryPartitioning.
0175   std::string partition_base_dir;
0176 
0177   /// If true, invalid files (whether found via selector or listed explicitly) are
0178   /// excluded by checking with the FileFormat::IsSupported method. This will incur
0179   /// IO for each file in a serial and single threaded fashion. Disabling this
0180   /// feature will skip the IO, but unsupported files may be present in the Dataset
0181   /// (resulting in an error at scan time).
0182   bool exclude_invalid_files = false;
0183 
0184   /// When discovering from a Selector (and not from an explicit file list), ignore
0185   /// files and directories matching any of these prefixes.
0186   ///
0187   /// Example (with selector = "/dataset/**"):
0188   /// selector_ignore_prefixes = {"_", ".DS_STORE"};
0189   ///
0190   /// - "/dataset/data.csv" -> not ignored
0191   /// - "/dataset/_metadata" -> ignored
0192   /// - "/dataset/.DS_STORE" -> ignored
0193   /// - "/dataset/_hidden/dat" -> ignored
0194   /// - "/dataset/nested/.DS_STORE" -> ignored
0195   std::vector<std::string> selector_ignore_prefixes = {
0196       ".",
0197       "_",
0198   };
0199 };
0200 
0201 /// \brief FileSystemDatasetFactory creates a Dataset from a vector of
0202 /// fs::FileInfo or a fs::FileSelector.
0203 /// \ingroup dataset-filesystem
0204 class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory {
0205  public:
0206   /// \brief Build a FileSystemDatasetFactory from an explicit list of
0207   /// paths.
0208   ///
0209   /// \param[in] filesystem passed to FileSystemDataset
0210   /// \param[in] paths passed to FileSystemDataset
0211   /// \param[in] format passed to FileSystemDataset
0212   /// \param[in] options see FileSystemFactoryOptions for more information.
0213   static Result<std::shared_ptr<DatasetFactory>> Make(
0214       std::shared_ptr<fs::FileSystem> filesystem, const std::vector<std::string>& paths,
0215       std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options);
0216 
0217   /// \brief Build a FileSystemDatasetFactory from a fs::FileSelector.
0218   ///
0219   /// The selector will expand to a vector of FileInfo. The expansion/crawling
0220   /// is performed in this function call. Thus, the finalized Dataset is
0221   /// working with a snapshot of the filesystem.
0222   ///
0223   /// If options.partition_base_dir is not provided, it will be overwritten
0224   /// with selector.base_dir.
0225   ///
0226   /// \param[in] filesystem passed to FileSystemDataset
0227   /// \param[in] selector used to crawl and search files
0228   /// \param[in] format passed to FileSystemDataset
0229   /// \param[in] options see FileSystemFactoryOptions for more information.
0230   static Result<std::shared_ptr<DatasetFactory>> Make(
0231       std::shared_ptr<fs::FileSystem> filesystem, fs::FileSelector selector,
0232       std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options);
0233 
0234   /// \brief Build a FileSystemDatasetFactory from a URI including filesystem
0235   /// information.
0236   ///
0237   /// \param[in] uri passed to FileSystemDataset
0238   /// \param[in] format passed to FileSystemDataset
0239   /// \param[in] options see FileSystemFactoryOptions for more information.
0240   static Result<std::shared_ptr<DatasetFactory>> Make(std::string uri,
0241                                                       std::shared_ptr<FileFormat> format,
0242                                                       FileSystemFactoryOptions options);
0243 
0244   /// \brief Build a FileSystemDatasetFactory from an explicit list of
0245   /// file information.
0246   ///
0247   /// \param[in] filesystem passed to FileSystemDataset
0248   /// \param[in] files passed to FileSystemDataset
0249   /// \param[in] format passed to FileSystemDataset
0250   /// \param[in] options see FileSystemFactoryOptions for more information.
0251   static Result<std::shared_ptr<DatasetFactory>> Make(
0252       std::shared_ptr<fs::FileSystem> filesystem, const std::vector<fs::FileInfo>& files,
0253       std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options);
0254 
0255   Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
0256       InspectOptions options) override;
0257 
0258   Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;
0259 
0260  protected:
0261   FileSystemDatasetFactory(std::vector<fs::FileInfo> files,
0262                            std::shared_ptr<fs::FileSystem> filesystem,
0263                            std::shared_ptr<FileFormat> format,
0264                            FileSystemFactoryOptions options);
0265 
0266   Result<std::shared_ptr<Schema>> PartitionSchema();
0267 
0268   std::vector<fs::FileInfo> files_;
0269   std::shared_ptr<fs::FileSystem> fs_;
0270   std::shared_ptr<FileFormat> format_;
0271   FileSystemFactoryOptions options_;
0272 };
0273 
0274 }  // namespace dataset
0275 }  // namespace arrow