Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:57

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 // This API is EXPERIMENTAL.
0019 
0020 #pragma once
0021 
0022 #include <functional>
0023 #include <memory>
0024 #include <string>
0025 #include <utility>
0026 #include <vector>
0027 
0028 #include "arrow/buffer.h"
0029 #include "arrow/dataset/dataset.h"
0030 #include "arrow/dataset/partition.h"
0031 #include "arrow/dataset/scanner.h"
0032 #include "arrow/dataset/type_fwd.h"
0033 #include "arrow/dataset/visibility.h"
0034 #include "arrow/filesystem/filesystem.h"
0035 #include "arrow/io/file.h"
0036 #include "arrow/type_fwd.h"
0037 #include "arrow/util/compression.h"
0038 
0039 namespace arrow {
0040 
0041 namespace dataset {
0042 
0043 /// \defgroup dataset-file-formats File formats for reading and writing datasets
0044 /// \defgroup dataset-filesystem File system datasets
0045 ///
0046 /// @{
0047 
0048 /// \brief The path and filesystem where an actual file is located or a buffer which can
0049 /// be read like a file
0050 class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {
0051  public:
0052   FileSource(std::string path, std::shared_ptr<fs::FileSystem> filesystem,
0053              Compression::type compression = Compression::UNCOMPRESSED)
0054       : file_info_(std::move(path)),
0055         filesystem_(std::move(filesystem)),
0056         compression_(compression) {}
0057 
0058   FileSource(fs::FileInfo info, std::shared_ptr<fs::FileSystem> filesystem,
0059              Compression::type compression = Compression::UNCOMPRESSED)
0060       : file_info_(std::move(info)),
0061         filesystem_(std::move(filesystem)),
0062         compression_(compression) {}
0063 
0064   explicit FileSource(std::shared_ptr<Buffer> buffer,
0065                       Compression::type compression = Compression::UNCOMPRESSED)
0066       : buffer_(std::move(buffer)), compression_(compression) {}
0067 
0068   using CustomOpen = std::function<Result<std::shared_ptr<io::RandomAccessFile>>()>;
0069   FileSource(CustomOpen open, int64_t size)
0070       : custom_open_(std::move(open)), custom_size_(size) {}
0071 
0072   using CustomOpenWithCompression =
0073       std::function<Result<std::shared_ptr<io::RandomAccessFile>>(Compression::type)>;
0074   FileSource(CustomOpenWithCompression open_with_compression, int64_t size,
0075              Compression::type compression = Compression::UNCOMPRESSED)
0076       : custom_open_(std::bind(std::move(open_with_compression), compression)),
0077         custom_size_(size),
0078         compression_(compression) {}
0079 
0080   FileSource(std::shared_ptr<io::RandomAccessFile> file, int64_t size,
0081              Compression::type compression = Compression::UNCOMPRESSED)
0082       : custom_open_([=] { return ToResult(file); }),
0083         custom_size_(size),
0084         compression_(compression) {}
0085 
0086   explicit FileSource(std::shared_ptr<io::RandomAccessFile> file,
0087                       Compression::type compression = Compression::UNCOMPRESSED);
0088 
0089   FileSource() : custom_open_(CustomOpen{&InvalidOpen}) {}
0090 
0091   static std::vector<FileSource> FromPaths(const std::shared_ptr<fs::FileSystem>& fs,
0092                                            std::vector<std::string> paths) {
0093     std::vector<FileSource> sources;
0094     for (auto&& path : paths) {
0095       sources.emplace_back(std::move(path), fs);
0096     }
0097     return sources;
0098   }
0099 
0100   /// \brief Return the type of raw compression on the file, if any.
0101   Compression::type compression() const { return compression_; }
0102 
0103   /// \brief Return the file path, if any. Only valid when file source wraps a path.
0104   const std::string& path() const {
0105     static std::string buffer_path = "<Buffer>";
0106     static std::string custom_open_path = "<Buffer>";
0107     return filesystem_ ? file_info_.path() : buffer_ ? buffer_path : custom_open_path;
0108   }
0109 
0110   /// \brief Return the filesystem, if any. Otherwise returns nullptr
0111   const std::shared_ptr<fs::FileSystem>& filesystem() const { return filesystem_; }
0112 
0113   /// \brief Return the buffer containing the file, if any. Otherwise returns nullptr
0114   const std::shared_ptr<Buffer>& buffer() const { return buffer_; }
0115 
0116   /// \brief Get a RandomAccessFile which views this file source
0117   Result<std::shared_ptr<io::RandomAccessFile>> Open() const;
0118   Future<std::shared_ptr<io::RandomAccessFile>> OpenAsync() const;
0119 
0120   /// \brief Get the size (in bytes) of the file or buffer
0121   /// If the file is compressed this should be the compressed (on-disk) size.
0122   int64_t Size() const;
0123 
0124   /// \brief Get an InputStream which views this file source (and decompresses if needed)
0125   /// \param[in] compression If nullopt, guess the compression scheme from the
0126   ///     filename, else decompress with the given codec
0127   Result<std::shared_ptr<io::InputStream>> OpenCompressed(
0128       std::optional<Compression::type> compression = std::nullopt) const;
0129 
0130   /// \brief equality comparison with another FileSource
0131   bool Equals(const FileSource& other) const;
0132 
0133  private:
0134   static Result<std::shared_ptr<io::RandomAccessFile>> InvalidOpen() {
0135     return Status::Invalid("Called Open() on an uninitialized FileSource");
0136   }
0137 
0138   fs::FileInfo file_info_;
0139   std::shared_ptr<fs::FileSystem> filesystem_;
0140   std::shared_ptr<Buffer> buffer_;
0141   CustomOpen custom_open_;
0142   int64_t custom_size_ = 0;
0143   Compression::type compression_ = Compression::UNCOMPRESSED;
0144 };
0145 
0146 /// \brief Base class for file format implementation
0147 class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileFormat> {
0148  public:
0149   /// Options affecting how this format is scanned.
0150   ///
0151   /// The options here can be overridden at scan time.
0152   std::shared_ptr<FragmentScanOptions> default_fragment_scan_options;
0153 
0154   virtual ~FileFormat() = default;
0155 
0156   /// \brief The name identifying the kind of file format
0157   virtual std::string type_name() const = 0;
0158 
0159   virtual bool Equals(const FileFormat& other) const = 0;
0160 
0161   /// \brief Indicate if the FileSource is supported/readable by this format.
0162   virtual Result<bool> IsSupported(const FileSource& source) const = 0;
0163 
0164   /// \brief Return the schema of the file if possible.
0165   virtual Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const = 0;
0166 
0167   /// \brief Learn what we need about the file before we start scanning it
0168   virtual Future<std::shared_ptr<InspectedFragment>> InspectFragment(
0169       const FileSource& source, const FragmentScanOptions* format_options,
0170       compute::ExecContext* exec_context) const;
0171 
0172   virtual Result<RecordBatchGenerator> ScanBatchesAsync(
0173       const std::shared_ptr<ScanOptions>& options,
0174       const std::shared_ptr<FileFragment>& file) const = 0;
0175 
0176   virtual Future<std::optional<int64_t>> CountRows(
0177       const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
0178       const std::shared_ptr<ScanOptions>& options);
0179 
0180   virtual Future<std::shared_ptr<FragmentScanner>> BeginScan(
0181       const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
0182       const FragmentScanOptions* format_options,
0183       compute::ExecContext* exec_context) const;
0184 
0185   /// \brief Open a fragment
0186   virtual Result<std::shared_ptr<FileFragment>> MakeFragment(
0187       FileSource source, compute::Expression partition_expression,
0188       std::shared_ptr<Schema> physical_schema);
0189 
0190   /// \brief Create a FileFragment for a FileSource.
0191   Result<std::shared_ptr<FileFragment>> MakeFragment(
0192       FileSource source, compute::Expression partition_expression);
0193 
0194   /// \brief Create a FileFragment for a FileSource.
0195   Result<std::shared_ptr<FileFragment>> MakeFragment(
0196       FileSource source, std::shared_ptr<Schema> physical_schema = NULLPTR);
0197 
0198   /// \brief Create a writer for this format.
0199   virtual Result<std::shared_ptr<FileWriter>> MakeWriter(
0200       std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
0201       std::shared_ptr<FileWriteOptions> options,
0202       fs::FileLocator destination_locator) const = 0;
0203 
0204   /// \brief Get default write options for this format.
0205   ///
0206   /// May return null shared_ptr if this file format does not yet support
0207   /// writing datasets.
0208   virtual std::shared_ptr<FileWriteOptions> DefaultWriteOptions() = 0;
0209 
0210  protected:
0211   explicit FileFormat(std::shared_ptr<FragmentScanOptions> default_fragment_scan_options)
0212       : default_fragment_scan_options(std::move(default_fragment_scan_options)) {}
0213 };
0214 
0215 /// \brief A Fragment that is stored in a file with a known format
0216 class ARROW_DS_EXPORT FileFragment : public Fragment,
0217                                      public util::EqualityComparable<FileFragment> {
0218  public:
0219   Result<RecordBatchGenerator> ScanBatchesAsync(
0220       const std::shared_ptr<ScanOptions>& options) override;
0221   Future<std::optional<int64_t>> CountRows(
0222       compute::Expression predicate,
0223       const std::shared_ptr<ScanOptions>& options) override;
0224   Future<std::shared_ptr<FragmentScanner>> BeginScan(
0225       const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
0226       const FragmentScanOptions* format_options,
0227       compute::ExecContext* exec_context) override;
0228   Future<std::shared_ptr<InspectedFragment>> InspectFragment(
0229       const FragmentScanOptions* format_options,
0230       compute::ExecContext* exec_context) override;
0231 
0232   std::string type_name() const override { return format_->type_name(); }
0233   std::string ToString() const override { return source_.path(); };
0234 
0235   const FileSource& source() const { return source_; }
0236   const std::shared_ptr<FileFormat>& format() const { return format_; }
0237 
0238   bool Equals(const FileFragment& other) const;
0239 
0240  protected:
0241   FileFragment(FileSource source, std::shared_ptr<FileFormat> format,
0242                compute::Expression partition_expression,
0243                std::shared_ptr<Schema> physical_schema)
0244       : Fragment(std::move(partition_expression), std::move(physical_schema)),
0245         source_(std::move(source)),
0246         format_(std::move(format)) {}
0247 
0248   Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override;
0249 
0250   FileSource source_;
0251   std::shared_ptr<FileFormat> format_;
0252 
0253   friend class FileFormat;
0254 };
0255 
0256 /// \brief A Dataset of FileFragments.
0257 ///
0258 /// A FileSystemDataset is composed of one or more FileFragment. The fragments
0259 /// are independent and don't need to share the same format and/or filesystem.
0260 class ARROW_DS_EXPORT FileSystemDataset : public Dataset {
0261  public:
0262   /// \brief Create a FileSystemDataset.
0263   ///
0264   /// \param[in] schema the schema of the dataset
0265   /// \param[in] root_partition the partition expression of the dataset
0266   /// \param[in] format the format of each FileFragment.
0267   /// \param[in] filesystem the filesystem of each FileFragment, or nullptr if the
0268   ///            fragments wrap buffers.
0269   /// \param[in] fragments list of fragments to create the dataset from.
0270   /// \param[in] partitioning the Partitioning object in case the dataset is created
0271   ///            with a known partitioning (e.g. from a discovered partitioning
0272   ///            through a DatasetFactory), or nullptr if not known.
0273   ///
0274   /// Note that fragments wrapping files resident in differing filesystems are not
0275   /// permitted; to work with multiple filesystems use a UnionDataset.
0276   ///
0277   /// \return A constructed dataset.
0278   static Result<std::shared_ptr<FileSystemDataset>> Make(
0279       std::shared_ptr<Schema> schema, compute::Expression root_partition,
0280       std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem,
0281       std::vector<std::shared_ptr<FileFragment>> fragments,
0282       std::shared_ptr<Partitioning> partitioning = NULLPTR);
0283 
0284   /// \brief Write a dataset.
0285   static Status Write(const FileSystemDatasetWriteOptions& write_options,
0286                       std::shared_ptr<Scanner> scanner);
0287 
0288   /// \brief Return the type name of the dataset.
0289   std::string type_name() const override { return "filesystem"; }
0290 
0291   /// \brief Replace the schema of the dataset.
0292   Result<std::shared_ptr<Dataset>> ReplaceSchema(
0293       std::shared_ptr<Schema> schema) const override;
0294 
0295   /// \brief Return the path of files.
0296   std::vector<std::string> files() const;
0297 
0298   /// \brief Return the format.
0299   const std::shared_ptr<FileFormat>& format() const { return format_; }
0300 
0301   /// \brief Return the filesystem. May be nullptr if the fragments wrap buffers.
0302   const std::shared_ptr<fs::FileSystem>& filesystem() const { return filesystem_; }
0303 
0304   /// \brief Return the partitioning. May be nullptr if the dataset was not constructed
0305   /// with a partitioning.
0306   const std::shared_ptr<Partitioning>& partitioning() const { return partitioning_; }
0307 
0308   std::string ToString() const;
0309 
0310  protected:
0311   struct FragmentSubtrees;
0312 
0313   explicit FileSystemDataset(std::shared_ptr<Schema> schema)
0314       : Dataset(std::move(schema)) {}
0315 
0316   FileSystemDataset(std::shared_ptr<Schema> schema,
0317                     compute::Expression partition_expression)
0318       : Dataset(std::move(schema), partition_expression) {}
0319 
0320   Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override;
0321 
0322   void SetupSubtreePruning();
0323 
0324   std::shared_ptr<FileFormat> format_;
0325   std::shared_ptr<fs::FileSystem> filesystem_;
0326   std::vector<std::shared_ptr<FileFragment>> fragments_;
0327   std::shared_ptr<Partitioning> partitioning_;
0328 
0329   std::shared_ptr<FragmentSubtrees> subtrees_;
0330 };
0331 
0332 /// \brief Options for writing a file of this format.
0333 class ARROW_DS_EXPORT FileWriteOptions {
0334  public:
0335   virtual ~FileWriteOptions() = default;
0336 
0337   const std::shared_ptr<FileFormat>& format() const { return format_; }
0338 
0339   std::string type_name() const { return format_->type_name(); }
0340 
0341  protected:
0342   explicit FileWriteOptions(std::shared_ptr<FileFormat> format)
0343       : format_(std::move(format)) {}
0344 
0345   std::shared_ptr<FileFormat> format_;
0346 };
0347 
0348 /// \brief A writer for this format.
0349 class ARROW_DS_EXPORT FileWriter {
0350  public:
0351   virtual ~FileWriter() = default;
0352 
0353   /// \brief Write the given batch.
0354   virtual Status Write(const std::shared_ptr<RecordBatch>& batch) = 0;
0355 
0356   /// \brief Write all batches from the reader.
0357   Status Write(RecordBatchReader* batches);
0358 
0359   /// \brief Indicate that writing is done.
0360   virtual Future<> Finish();
0361 
0362   const std::shared_ptr<FileFormat>& format() const { return options_->format(); }
0363   const std::shared_ptr<Schema>& schema() const { return schema_; }
0364   const std::shared_ptr<FileWriteOptions>& options() const { return options_; }
0365   const fs::FileLocator& destination() const { return destination_locator_; }
0366 
0367   /// \brief After Finish() is called, provides number of bytes written to file.
0368   Result<int64_t> GetBytesWritten() const;
0369 
0370  protected:
0371   FileWriter(std::shared_ptr<Schema> schema, std::shared_ptr<FileWriteOptions> options,
0372              std::shared_ptr<io::OutputStream> destination,
0373              fs::FileLocator destination_locator)
0374       : schema_(std::move(schema)),
0375         options_(std::move(options)),
0376         destination_(std::move(destination)),
0377         destination_locator_(std::move(destination_locator)) {}
0378 
0379   virtual Future<> FinishInternal() = 0;
0380 
0381   std::shared_ptr<Schema> schema_;
0382   std::shared_ptr<FileWriteOptions> options_;
0383   std::shared_ptr<io::OutputStream> destination_;
0384   fs::FileLocator destination_locator_;
0385   std::optional<int64_t> bytes_written_;
0386 };
0387 
0388 /// \brief Options for writing a dataset.
0389 struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
0390   /// Options for individual fragment writing.
0391   std::shared_ptr<FileWriteOptions> file_write_options;
0392 
0393   /// FileSystem into which a dataset will be written.
0394   std::shared_ptr<fs::FileSystem> filesystem;
0395 
0396   /// Root directory into which the dataset will be written.
0397   std::string base_dir;
0398 
0399   /// Partitioning used to generate fragment paths.
0400   std::shared_ptr<Partitioning> partitioning;
0401 
0402   /// Maximum number of partitions any batch may be written into, default is 1K.
0403   int max_partitions = 1024;
0404 
0405   /// Template string used to generate fragment basenames.
0406   /// {i} will be replaced by an auto incremented integer.
0407   std::string basename_template;
0408 
0409   /// A functor which will be applied on an incremented counter.  The result will be
0410   /// inserted into the basename_template in place of {i}.
0411   ///
0412   /// This can be used, for example, to left-pad the file counter.
0413   std::function<std::string(int)> basename_template_functor;
0414 
0415   /// If greater than 0 then this will limit the maximum number of files that can be left
0416   /// open. If an attempt is made to open too many files then the least recently used file
0417   /// will be closed.  If this setting is set too low you may end up fragmenting your data
0418   /// into many small files.
0419   ///
0420   /// The default is 900 which also allows some # of files to be open by the scanner
0421   /// before hitting the default Linux limit of 1024
0422   uint32_t max_open_files = 900;
0423 
0424   /// If greater than 0 then this will limit how many rows are placed in any single file.
0425   /// Otherwise there will be no limit and one file will be created in each output
0426   /// directory unless files need to be closed to respect max_open_files
0427   uint64_t max_rows_per_file = 0;
0428 
0429   /// If greater than 0 then this will cause the dataset writer to batch incoming data
0430   /// and only write the row groups to the disk when sufficient rows have accumulated.
0431   /// The final row group size may be less than this value and other options such as
0432   /// `max_open_files` or `max_rows_per_file` lead to smaller row group sizes.
0433   uint64_t min_rows_per_group = 0;
0434 
0435   /// If greater than 0 then the dataset writer may split up large incoming batches into
0436   /// multiple row groups.  If this value is set then min_rows_per_group should also be
0437   /// set or else you may end up with very small row groups (e.g. if the incoming row
0438   /// group size is just barely larger than this value).
0439   uint64_t max_rows_per_group = 1 << 20;
0440 
0441   /// Controls what happens if an output directory already exists.
0442   ExistingDataBehavior existing_data_behavior = ExistingDataBehavior::kError;
0443 
0444   /// \brief If false the dataset writer will not create directories
0445   /// This is mainly intended for filesystems that do not require directories such as S3.
0446   bool create_dir = true;
0447 
0448   /// Callback to be invoked against all FileWriters before
0449   /// they are finalized with FileWriter::Finish().
0450   std::function<Status(FileWriter*)> writer_pre_finish = [](FileWriter*) {
0451     return Status::OK();
0452   };
0453 
0454   /// Callback to be invoked against all FileWriters after they have
0455   /// called FileWriter::Finish().
0456   std::function<Status(FileWriter*)> writer_post_finish = [](FileWriter*) {
0457     return Status::OK();
0458   };
0459 
0460   const std::shared_ptr<FileFormat>& format() const {
0461     return file_write_options->format();
0462   }
0463 };
0464 
0465 /// \brief Wraps FileSystemDatasetWriteOptions for consumption as compute::ExecNodeOptions
0466 class ARROW_DS_EXPORT WriteNodeOptions : public acero::ExecNodeOptions {
0467  public:
0468   explicit WriteNodeOptions(
0469       FileSystemDatasetWriteOptions options,
0470       std::shared_ptr<const KeyValueMetadata> custom_metadata = NULLPTR)
0471       : write_options(std::move(options)), custom_metadata(std::move(custom_metadata)) {}
0472 
0473   /// \brief Options to control how to write the dataset
0474   FileSystemDatasetWriteOptions write_options;
0475   /// \brief Optional schema to attach to all written batches
0476   ///
0477   /// By default, we will use the output schema of the input.
0478   ///
0479   /// This can be used to alter schema metadata, field nullability, or field metadata.
0480   /// However, this cannot be used to change the type of data.  If the custom schema does
0481   /// not have the same number of fields and the same data types as the input then the
0482   /// plan will fail.
0483   std::shared_ptr<Schema> custom_schema;
0484   /// \brief Optional metadata to attach to written batches
0485   std::shared_ptr<const KeyValueMetadata> custom_metadata;
0486 };
0487 
0488 /// @}
0489 
0490 namespace internal {
0491 ARROW_DS_EXPORT void InitializeDatasetWriter(arrow::acero::ExecFactoryRegistry* registry);
0492 }
0493 
0494 }  // namespace dataset
0495 }  // namespace arrow