File indexing completed on 2025-08-28 08:26:57
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020 #pragma once
0021
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/partition.h"
#include "arrow/dataset/scanner.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/io/file.h"
#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"
0038
0039 namespace arrow {
0040
0041 namespace dataset {
0042
0043
0044
0045
0046
0047
0048
0049
0050 class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {
0051 public:
0052 FileSource(std::string path, std::shared_ptr<fs::FileSystem> filesystem,
0053 Compression::type compression = Compression::UNCOMPRESSED)
0054 : file_info_(std::move(path)),
0055 filesystem_(std::move(filesystem)),
0056 compression_(compression) {}
0057
0058 FileSource(fs::FileInfo info, std::shared_ptr<fs::FileSystem> filesystem,
0059 Compression::type compression = Compression::UNCOMPRESSED)
0060 : file_info_(std::move(info)),
0061 filesystem_(std::move(filesystem)),
0062 compression_(compression) {}
0063
0064 explicit FileSource(std::shared_ptr<Buffer> buffer,
0065 Compression::type compression = Compression::UNCOMPRESSED)
0066 : buffer_(std::move(buffer)), compression_(compression) {}
0067
0068 using CustomOpen = std::function<Result<std::shared_ptr<io::RandomAccessFile>>()>;
0069 FileSource(CustomOpen open, int64_t size)
0070 : custom_open_(std::move(open)), custom_size_(size) {}
0071
0072 using CustomOpenWithCompression =
0073 std::function<Result<std::shared_ptr<io::RandomAccessFile>>(Compression::type)>;
0074 FileSource(CustomOpenWithCompression open_with_compression, int64_t size,
0075 Compression::type compression = Compression::UNCOMPRESSED)
0076 : custom_open_(std::bind(std::move(open_with_compression), compression)),
0077 custom_size_(size),
0078 compression_(compression) {}
0079
0080 FileSource(std::shared_ptr<io::RandomAccessFile> file, int64_t size,
0081 Compression::type compression = Compression::UNCOMPRESSED)
0082 : custom_open_([=] { return ToResult(file); }),
0083 custom_size_(size),
0084 compression_(compression) {}
0085
0086 explicit FileSource(std::shared_ptr<io::RandomAccessFile> file,
0087 Compression::type compression = Compression::UNCOMPRESSED);
0088
0089 FileSource() : custom_open_(CustomOpen{&InvalidOpen}) {}
0090
0091 static std::vector<FileSource> FromPaths(const std::shared_ptr<fs::FileSystem>& fs,
0092 std::vector<std::string> paths) {
0093 std::vector<FileSource> sources;
0094 for (auto&& path : paths) {
0095 sources.emplace_back(std::move(path), fs);
0096 }
0097 return sources;
0098 }
0099
0100
0101 Compression::type compression() const { return compression_; }
0102
0103
0104 const std::string& path() const {
0105 static std::string buffer_path = "<Buffer>";
0106 static std::string custom_open_path = "<Buffer>";
0107 return filesystem_ ? file_info_.path() : buffer_ ? buffer_path : custom_open_path;
0108 }
0109
0110
0111 const std::shared_ptr<fs::FileSystem>& filesystem() const { return filesystem_; }
0112
0113
0114 const std::shared_ptr<Buffer>& buffer() const { return buffer_; }
0115
0116
0117 Result<std::shared_ptr<io::RandomAccessFile>> Open() const;
0118 Future<std::shared_ptr<io::RandomAccessFile>> OpenAsync() const;
0119
0120
0121
0122 int64_t Size() const;
0123
0124
0125
0126
0127 Result<std::shared_ptr<io::InputStream>> OpenCompressed(
0128 std::optional<Compression::type> compression = std::nullopt) const;
0129
0130
0131 bool Equals(const FileSource& other) const;
0132
0133 private:
0134 static Result<std::shared_ptr<io::RandomAccessFile>> InvalidOpen() {
0135 return Status::Invalid("Called Open() on an uninitialized FileSource");
0136 }
0137
0138 fs::FileInfo file_info_;
0139 std::shared_ptr<fs::FileSystem> filesystem_;
0140 std::shared_ptr<Buffer> buffer_;
0141 CustomOpen custom_open_;
0142 int64_t custom_size_ = 0;
0143 Compression::type compression_ = Compression::UNCOMPRESSED;
0144 };
0145
0146
0147 class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileFormat> {
0148 public:
0149
0150
0151
0152 std::shared_ptr<FragmentScanOptions> default_fragment_scan_options;
0153
0154 virtual ~FileFormat() = default;
0155
0156
0157 virtual std::string type_name() const = 0;
0158
0159 virtual bool Equals(const FileFormat& other) const = 0;
0160
0161
0162 virtual Result<bool> IsSupported(const FileSource& source) const = 0;
0163
0164
0165 virtual Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const = 0;
0166
0167
0168 virtual Future<std::shared_ptr<InspectedFragment>> InspectFragment(
0169 const FileSource& source, const FragmentScanOptions* format_options,
0170 compute::ExecContext* exec_context) const;
0171
0172 virtual Result<RecordBatchGenerator> ScanBatchesAsync(
0173 const std::shared_ptr<ScanOptions>& options,
0174 const std::shared_ptr<FileFragment>& file) const = 0;
0175
0176 virtual Future<std::optional<int64_t>> CountRows(
0177 const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
0178 const std::shared_ptr<ScanOptions>& options);
0179
0180 virtual Future<std::shared_ptr<FragmentScanner>> BeginScan(
0181 const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
0182 const FragmentScanOptions* format_options,
0183 compute::ExecContext* exec_context) const;
0184
0185
0186 virtual Result<std::shared_ptr<FileFragment>> MakeFragment(
0187 FileSource source, compute::Expression partition_expression,
0188 std::shared_ptr<Schema> physical_schema);
0189
0190
0191 Result<std::shared_ptr<FileFragment>> MakeFragment(
0192 FileSource source, compute::Expression partition_expression);
0193
0194
0195 Result<std::shared_ptr<FileFragment>> MakeFragment(
0196 FileSource source, std::shared_ptr<Schema> physical_schema = NULLPTR);
0197
0198
0199 virtual Result<std::shared_ptr<FileWriter>> MakeWriter(
0200 std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
0201 std::shared_ptr<FileWriteOptions> options,
0202 fs::FileLocator destination_locator) const = 0;
0203
0204
0205
0206
0207
0208 virtual std::shared_ptr<FileWriteOptions> DefaultWriteOptions() = 0;
0209
0210 protected:
0211 explicit FileFormat(std::shared_ptr<FragmentScanOptions> default_fragment_scan_options)
0212 : default_fragment_scan_options(std::move(default_fragment_scan_options)) {}
0213 };
0214
0215
0216 class ARROW_DS_EXPORT FileFragment : public Fragment,
0217 public util::EqualityComparable<FileFragment> {
0218 public:
0219 Result<RecordBatchGenerator> ScanBatchesAsync(
0220 const std::shared_ptr<ScanOptions>& options) override;
0221 Future<std::optional<int64_t>> CountRows(
0222 compute::Expression predicate,
0223 const std::shared_ptr<ScanOptions>& options) override;
0224 Future<std::shared_ptr<FragmentScanner>> BeginScan(
0225 const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
0226 const FragmentScanOptions* format_options,
0227 compute::ExecContext* exec_context) override;
0228 Future<std::shared_ptr<InspectedFragment>> InspectFragment(
0229 const FragmentScanOptions* format_options,
0230 compute::ExecContext* exec_context) override;
0231
0232 std::string type_name() const override { return format_->type_name(); }
0233 std::string ToString() const override { return source_.path(); };
0234
0235 const FileSource& source() const { return source_; }
0236 const std::shared_ptr<FileFormat>& format() const { return format_; }
0237
0238 bool Equals(const FileFragment& other) const;
0239
0240 protected:
0241 FileFragment(FileSource source, std::shared_ptr<FileFormat> format,
0242 compute::Expression partition_expression,
0243 std::shared_ptr<Schema> physical_schema)
0244 : Fragment(std::move(partition_expression), std::move(physical_schema)),
0245 source_(std::move(source)),
0246 format_(std::move(format)) {}
0247
0248 Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override;
0249
0250 FileSource source_;
0251 std::shared_ptr<FileFormat> format_;
0252
0253 friend class FileFormat;
0254 };
0255
0256
0257
0258
0259
0260 class ARROW_DS_EXPORT FileSystemDataset : public Dataset {
0261 public:
0262
0263
0264
0265
0266
0267
0268
0269
0270
0271
0272
0273
0274
0275
0276
0277
0278 static Result<std::shared_ptr<FileSystemDataset>> Make(
0279 std::shared_ptr<Schema> schema, compute::Expression root_partition,
0280 std::shared_ptr<FileFormat> format, std::shared_ptr<fs::FileSystem> filesystem,
0281 std::vector<std::shared_ptr<FileFragment>> fragments,
0282 std::shared_ptr<Partitioning> partitioning = NULLPTR);
0283
0284
0285 static Status Write(const FileSystemDatasetWriteOptions& write_options,
0286 std::shared_ptr<Scanner> scanner);
0287
0288
0289 std::string type_name() const override { return "filesystem"; }
0290
0291
0292 Result<std::shared_ptr<Dataset>> ReplaceSchema(
0293 std::shared_ptr<Schema> schema) const override;
0294
0295
0296 std::vector<std::string> files() const;
0297
0298
0299 const std::shared_ptr<FileFormat>& format() const { return format_; }
0300
0301
0302 const std::shared_ptr<fs::FileSystem>& filesystem() const { return filesystem_; }
0303
0304
0305
0306 const std::shared_ptr<Partitioning>& partitioning() const { return partitioning_; }
0307
0308 std::string ToString() const;
0309
0310 protected:
0311 struct FragmentSubtrees;
0312
0313 explicit FileSystemDataset(std::shared_ptr<Schema> schema)
0314 : Dataset(std::move(schema)) {}
0315
0316 FileSystemDataset(std::shared_ptr<Schema> schema,
0317 compute::Expression partition_expression)
0318 : Dataset(std::move(schema), partition_expression) {}
0319
0320 Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override;
0321
0322 void SetupSubtreePruning();
0323
0324 std::shared_ptr<FileFormat> format_;
0325 std::shared_ptr<fs::FileSystem> filesystem_;
0326 std::vector<std::shared_ptr<FileFragment>> fragments_;
0327 std::shared_ptr<Partitioning> partitioning_;
0328
0329 std::shared_ptr<FragmentSubtrees> subtrees_;
0330 };
0331
0332
0333 class ARROW_DS_EXPORT FileWriteOptions {
0334 public:
0335 virtual ~FileWriteOptions() = default;
0336
0337 const std::shared_ptr<FileFormat>& format() const { return format_; }
0338
0339 std::string type_name() const { return format_->type_name(); }
0340
0341 protected:
0342 explicit FileWriteOptions(std::shared_ptr<FileFormat> format)
0343 : format_(std::move(format)) {}
0344
0345 std::shared_ptr<FileFormat> format_;
0346 };
0347
0348
0349 class ARROW_DS_EXPORT FileWriter {
0350 public:
0351 virtual ~FileWriter() = default;
0352
0353
0354 virtual Status Write(const std::shared_ptr<RecordBatch>& batch) = 0;
0355
0356
0357 Status Write(RecordBatchReader* batches);
0358
0359
0360 virtual Future<> Finish();
0361
0362 const std::shared_ptr<FileFormat>& format() const { return options_->format(); }
0363 const std::shared_ptr<Schema>& schema() const { return schema_; }
0364 const std::shared_ptr<FileWriteOptions>& options() const { return options_; }
0365 const fs::FileLocator& destination() const { return destination_locator_; }
0366
0367
0368 Result<int64_t> GetBytesWritten() const;
0369
0370 protected:
0371 FileWriter(std::shared_ptr<Schema> schema, std::shared_ptr<FileWriteOptions> options,
0372 std::shared_ptr<io::OutputStream> destination,
0373 fs::FileLocator destination_locator)
0374 : schema_(std::move(schema)),
0375 options_(std::move(options)),
0376 destination_(std::move(destination)),
0377 destination_locator_(std::move(destination_locator)) {}
0378
0379 virtual Future<> FinishInternal() = 0;
0380
0381 std::shared_ptr<Schema> schema_;
0382 std::shared_ptr<FileWriteOptions> options_;
0383 std::shared_ptr<io::OutputStream> destination_;
0384 fs::FileLocator destination_locator_;
0385 std::optional<int64_t> bytes_written_;
0386 };
0387
0388
0389 struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
0390
0391 std::shared_ptr<FileWriteOptions> file_write_options;
0392
0393
0394 std::shared_ptr<fs::FileSystem> filesystem;
0395
0396
0397 std::string base_dir;
0398
0399
0400 std::shared_ptr<Partitioning> partitioning;
0401
0402
0403 int max_partitions = 1024;
0404
0405
0406
0407 std::string basename_template;
0408
0409
0410
0411
0412
0413 std::function<std::string(int)> basename_template_functor;
0414
0415
0416
0417
0418
0419
0420
0421
0422 uint32_t max_open_files = 900;
0423
0424
0425
0426
0427 uint64_t max_rows_per_file = 0;
0428
0429
0430
0431
0432
0433 uint64_t min_rows_per_group = 0;
0434
0435
0436
0437
0438
0439 uint64_t max_rows_per_group = 1 << 20;
0440
0441
0442 ExistingDataBehavior existing_data_behavior = ExistingDataBehavior::kError;
0443
0444
0445
0446 bool create_dir = true;
0447
0448
0449
0450 std::function<Status(FileWriter*)> writer_pre_finish = [](FileWriter*) {
0451 return Status::OK();
0452 };
0453
0454
0455
0456 std::function<Status(FileWriter*)> writer_post_finish = [](FileWriter*) {
0457 return Status::OK();
0458 };
0459
0460 const std::shared_ptr<FileFormat>& format() const {
0461 return file_write_options->format();
0462 }
0463 };
0464
0465
0466 class ARROW_DS_EXPORT WriteNodeOptions : public acero::ExecNodeOptions {
0467 public:
0468 explicit WriteNodeOptions(
0469 FileSystemDatasetWriteOptions options,
0470 std::shared_ptr<const KeyValueMetadata> custom_metadata = NULLPTR)
0471 : write_options(std::move(options)), custom_metadata(std::move(custom_metadata)) {}
0472
0473
0474 FileSystemDatasetWriteOptions write_options;
0475
0476
0477
0478
0479
0480
0481
0482
0483 std::shared_ptr<Schema> custom_schema;
0484
0485 std::shared_ptr<const KeyValueMetadata> custom_metadata;
0486 };
0487
0488
0489
0490 namespace internal {
0491 ARROW_DS_EXPORT void InitializeDatasetWriter(arrow::acero::ExecFactoryRegistry* registry);
0492 }
0493
0494 }
0495 }