File indexing completed on 2025-08-28 08:26:57
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020 #pragma once
0021
0022 #include <functional>
0023 #include <memory>
0024 #include <optional>
0025 #include <string>
0026 #include <utility>
0027 #include <vector>
0028
0029 #include "arrow/compute/expression.h"
0030 #include "arrow/dataset/type_fwd.h"
0031 #include "arrow/dataset/visibility.h"
0032 #include "arrow/util/async_generator_fwd.h"
0033 #include "arrow/util/future.h"
0034 #include "arrow/util/macros.h"
0035 #include "arrow/util/mutex.h"
0036
0037 namespace arrow {
0038 // Forward declaration only: in this header Executor is used solely as a pointer parameter type.
0039 namespace internal {
0040 class Executor;
0041 }
0042
0043 namespace dataset {
0044 /// A no-argument callable; each call returns a Future of a shared RecordBatch pointer.
0045 using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;
0046
0047 /// \brief Description of a single column to load from a fragment
0048 struct ARROW_DS_EXPORT FragmentSelectionColumn {
0049 /// Path identifying the column (presumably within the fragment's own schema -- confirm)
0050 FieldPath path;
0051
0052 /// Non-owning pointer to the type requested for this column.
0053 ///
0054 /// Defaults to nullptr so that a default-initialized instance never holds
0055 /// an indeterminate pointer; aggregate initialization with an explicit
0056 /// type is unaffected.
0057 ///
0058 /// NOTE(review): how a null or mismatching type is treated by scanners is
0059 /// not visible in this header -- confirm with the format implementations.
0060 DataType* requested_type = nullptr;
0061 };
0062
0063
0064
0065 /// \brief The set of columns to load from a fragment
0066 ///
0067 /// The column list is fixed at construction and only exposed read-only
0068 /// through columns(). The destructor is virtual, so instances may be
0069 /// destroyed through a FragmentSelection pointer (this presumably allows
0070 /// format-specific subclasses -- confirm against the implementations).
0071 class ARROW_DS_EXPORT FragmentSelection {
0072 public:
0073 explicit FragmentSelection(std::vector<FragmentSelectionColumn> columns)
0074 : columns_(std::move(columns)) {}
0075 virtual ~FragmentSelection() = default;
0076 /// The columns supplied at construction, in the order given
0077 const std::vector<FragmentSelectionColumn>& columns() const { return columns_; }
0078
0079 private:
0080 std::vector<FragmentSelectionColumn> columns_;
0081 };
0082
0083
0084
0085 /// \brief Parameters describing one scan of a fragment
0086 ///
0087 /// Bundles the filter, the column selection and format-specific options that are passed to Fragment::BeginScan.
0088 struct ARROW_DS_EXPORT FragmentScanRequest {
0089
0090 /// Filter expression for the scan; defaults to literal(true).
0091 ///
0092 /// NOTE(review): whether a scanner must apply the filter exactly or may
0093 /// treat it as a best-effort hint is not established by this header -- confirm.
0094 compute::Expression filter = compute::literal(true);
0095
0096
0097
0098
0099
0100
0101
0102 /// The columns to load, shared with the caller.
0103 ///
0104 /// Default-constructed as an empty shared_ptr.
0105 /// NOTE(review): the meaning of an empty selection (load nothing vs. load
0106 /// everything) is not visible here -- confirm before relying on it.
0107 std::shared_ptr<FragmentSelection> fragment_selection;
0108 /// Non-owning pointer to format-specific options; nullptr (the default) means none were supplied.
0109 const FragmentScanOptions* format_scan_options = nullptr;
0110 };
0111
0112 /// \brief Abstract interface for reading a fragment one numbered batch at a time
0113 class ARROW_DS_EXPORT FragmentScanner {
0114 public:
0115
0116
0117
0118
0119
0120 virtual ~FragmentScanner() = default;
0121
0122 /// \brief Asynchronously produce the record batch identified by `batch_number`
0123 virtual Future<std::shared_ptr<RecordBatch>> ScanBatch(int batch_number) = 0;
0124
0125 /// \brief Estimated size, in bytes, of the data of batch `batch_number`
0126 ///
0127 /// NOTE(review): presumably `batch_number` must lie in [0, NumBatches()) -- confirm.
0128 virtual int64_t EstimatedDataBytes(int batch_number) = 0;
0129 /// \brief The number of batches this scanner can produce
0130 virtual int NumBatches() = 0;
0131 };
0132
0133
0134
0135
0136
0137 /// \brief Information gathered by inspecting a fragment
0138 ///
0139 /// Holds only a list of column names. It is the result type of
0140 /// Fragment::InspectFragment and an input to Fragment::BeginScan and
0141 /// DatasetEvolutionStrategy::GetStrategy.
0142 struct ARROW_DS_EXPORT InspectedFragment {
0143 explicit InspectedFragment(std::vector<std::string> column_names)
0144 : column_names(std::move(column_names)) {}
0145 std::vector<std::string> column_names;
0146 };
0147
0148
0149
0150 /// \brief Abstract base class for a unit of data that can be scanned
0151 ///
0152 /// A Fragment carries a partition expression and a physical schema pointer,
0153 /// and exposes asynchronous scanning, inspection and row counting.
0154 /// It derives from enable_shared_from_this, so instances are expected to be
0155 /// owned by std::shared_ptr.
0156 class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
0157 public:
0158 /// Expression constant defined out of line; presumably the partition expression of fragments without partition information -- confirm.
0159 static const compute::Expression kNoPartitionInformation;
0160
0161 /// \brief Return the physical schema of this fragment
0162 ///
0163 /// Non-virtual entry point; subclasses supply ReadPhysicalSchemaImpl().
0164 /// NOTE(review): presumably caches the result in physical_schema_ under
0165 /// physical_schema_mutex_ -- the definition is not visible here; confirm.
0166 Result<std::shared_ptr<Schema>> ReadPhysicalSchema();
0167
0168 /// \brief Return a generator of record batches for the given scan options
0169 virtual Result<RecordBatchGenerator> ScanBatchesAsync(
0170 const std::shared_ptr<ScanOptions>& options) = 0;
0171
0172 /// \brief Asynchronously inspect this fragment
0173 ///
0174 /// \param format_options format-specific options (non-owning pointer)
0175 /// \param exec_context execution context (non-owning pointer)
0176 /// \return a future resolving to a shared InspectedFragment
0177 virtual Future<std::shared_ptr<InspectedFragment>> InspectFragment(
0178 const FragmentScanOptions* format_options, compute::ExecContext* exec_context);
0179
0180 /// \brief Asynchronously start a scan, yielding a FragmentScanner for `request`
0181 virtual Future<std::shared_ptr<FragmentScanner>> BeginScan(
0182 const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
0183 const FragmentScanOptions* format_options, compute::ExecContext* exec_context);
0184
0185 /// \brief Asynchronously count the rows matching `predicate`
0186 ///
0187 /// The future resolves to an optional count.
0188 /// NOTE(review): an empty optional presumably means the count cannot be
0189 /// obtained without a full scan -- confirm against the implementations.
0190 virtual Future<std::optional<int64_t>> CountRows(
0191 compute::Expression predicate, const std::shared_ptr<ScanOptions>& options);
0192 /// Name identifying the concrete fragment type; ToString() returns it unless overridden.
0193 virtual std::string type_name() const = 0;
0194 virtual std::string ToString() const { return type_name(); }
0195
0196 /// \brief The partition expression stored in this fragment
0197 /// (literal(true) unless a constructor sets it otherwise)
0198 const compute::Expression& partition_expression() const {
0199 return partition_expression_;
0200 }
0201
0202 virtual ~Fragment() = default;
0203
0204 protected:
0205 Fragment() = default;
0206 explicit Fragment(compute::Expression partition_expression,
0207 std::shared_ptr<Schema> physical_schema);
0208 /// Subclass hook that actually produces the physical schema.
0209 virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() = 0;
0210 // NOTE(review): the mutex presumably guards physical_schema_ only -- confirm in the .cc file.
0211 util::Mutex physical_schema_mutex_;
0212 compute::Expression partition_expression_ = compute::literal(true);
0213 std::shared_ptr<Schema> physical_schema_;
0214 };
0215
0216
0217
0218
0219
0220 /// \brief Abstract base class for format-specific scan options
0221 ///
0222 /// Declares no options itself; it only identifies the concrete options type
0223 /// through type_name(), which ToString() returns unless overridden.
0224 class ARROW_DS_EXPORT FragmentScanOptions {
0225 public:
0226 virtual std::string type_name() const = 0;
0227 virtual std::string ToString() const { return type_name(); }
0228 virtual ~FragmentScanOptions() = default;
0229 };
0230
0231
0232
0233 /// \brief A Fragment backed by a vector of record batches held in memory
0234 ///
0235 /// Overrides every scanning entry point of Fragment and reports the type
0236 /// name "in-memory". The nested Scanner class is only declared here.
0237 class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
0238 public:
0239 class Scanner;
0240 InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches,
0241 compute::Expression = compute::literal(true));
0242 explicit InMemoryFragment(RecordBatchVector record_batches,
0243 compute::Expression = compute::literal(true));
0244 // NOTE(review): the schema-less constructor presumably derives the schema from the batches -- confirm.
0245 Result<RecordBatchGenerator> ScanBatchesAsync(
0246 const std::shared_ptr<ScanOptions>& options) override;
0247 Future<std::optional<int64_t>> CountRows(
0248 compute::Expression predicate,
0249 const std::shared_ptr<ScanOptions>& options) override;
0250
0251 Future<std::shared_ptr<InspectedFragment>> InspectFragment(
0252 const FragmentScanOptions* format_options,
0253 compute::ExecContext* exec_context) override;
0254 Future<std::shared_ptr<FragmentScanner>> BeginScan(
0255 const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
0256 const FragmentScanOptions* format_options,
0257 compute::ExecContext* exec_context) override;
0258
0259 std::string type_name() const override { return "in-memory"; }
0260
0261 protected:
0262 Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override;
0263 /// The batches supplied at construction
0264 RecordBatchVector record_batches_;
0265 };
0266
0267
0268 /// Asynchronous generator whose items are shared Fragment pointers.
0269 using FragmentGenerator = AsyncGenerator<std::shared_ptr<Fragment>>;
0270
0271 /// \brief Abstract interface translating between a dataset's schema and one fragment's schema
0272 class ARROW_DS_EXPORT FragmentEvolutionStrategy {
0273 public:
0274
0275
0276 virtual ~FragmentEvolutionStrategy() = default;
0277
0278
0279
0280
0281
0282 /// \brief Return an expression that is guaranteed for the selected columns
0283 ///
0284 /// \param dataset_schema_selection paths of the selected dataset-schema fields
0285 /// NOTE(review): presumably describes values known without reading the
0286 /// fragment (e.g. columns it lacks) -- confirm with the implementations.
0287 virtual Result<compute::Expression> GetGuarantee(
0288 const std::vector<FieldPath>& dataset_schema_selection) const = 0;
0289
0290
0291 /// \brief Convert a dataset-schema selection into a FragmentSelection
0292 ///
0293 /// \param dataset_schema_selection paths of the selected dataset-schema fields
0294 /// \return a newly allocated selection owned by the caller
0295 virtual Result<std::unique_ptr<FragmentSelection>> DevolveSelection(
0296 const std::vector<FieldPath>& dataset_schema_selection) const = 0;
0297
0298
0299
0300
0301
0302
0303
0304 /// \brief Convert a filter expression for use against the fragment
0305 ///
0306 /// NOTE(review): presumably `filter` is bound to the dataset schema and the
0307 /// result refers to fragment columns -- confirm with the implementations.
0308 virtual Result<compute::Expression> DevolveFilter(
0309 const compute::Expression& filter) const = 0;
0310
0311
0312
0313
0314
0315 /// \brief Convert a batch read from the fragment into an ExecBatch
0316 ///
0317 /// \param batch the batch produced by the fragment scan
0318 /// \param dataset_selection paths of the selected dataset-schema fields
0319 /// \param selection the fragment selection (presumably the one used to read `batch` -- confirm)
0320 virtual Result<compute::ExecBatch> EvolveBatch(
0321 const std::shared_ptr<RecordBatch>& batch,
0322 const std::vector<FieldPath>& dataset_selection,
0323 const FragmentSelection& selection) const = 0;
0324
0325 /// \brief A string representation of this strategy
0326 virtual std::string ToString() const = 0;
0327 };
0328
0329 /// \brief Abstract factory producing a FragmentEvolutionStrategy for a given fragment
0330 class ARROW_DS_EXPORT DatasetEvolutionStrategy {
0331 public:
0332 virtual ~DatasetEvolutionStrategy() = default;
0333
0334 /// \brief Create the strategy for `fragment` of `dataset`; the caller owns the result
0335 virtual std::unique_ptr<FragmentEvolutionStrategy> GetStrategy(
0336 const Dataset& dataset, const Fragment& fragment,
0337 const InspectedFragment& inspected_fragment) = 0;
0338
0339 /// \brief A string representation of this strategy
0340 virtual std::string ToString() const = 0;
0341 };
0342 /// Factory for the strategy that Dataset installs by default (see Dataset::evolution_strategy_).
0343 ARROW_DS_EXPORT std::unique_ptr<DatasetEvolutionStrategy>
0344 MakeBasicDatasetEvolutionStrategy();
0345
0346
0347
0348 /// \brief Abstract base class for a collection of fragments sharing one schema
0349 ///
0350 /// Derives from enable_shared_from_this, so instances are expected to be owned by std::shared_ptr.
0351 class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
0352 public:
0353 /// \brief Create a ScannerBuilder (presumably targeting this dataset -- confirm)
0354 Result<std::shared_ptr<ScannerBuilder>> NewScan();
0355
0356 /// \brief Iterate fragments; the no-argument overload presumably uses literal(true) -- confirm
0357 Result<FragmentIterator> GetFragments(compute::Expression predicate);
0358 Result<FragmentIterator> GetFragments();
0359
0360 /// \brief Asynchronous counterparts of GetFragments, returning a FragmentGenerator
0361 Result<FragmentGenerator> GetFragmentsAsync(compute::Expression predicate);
0362 Result<FragmentGenerator> GetFragmentsAsync();
0363 /// The schema stored in this dataset
0364 const std::shared_ptr<Schema>& schema() const { return schema_; }
0365
0366 /// \brief The partition expression stored in this dataset
0367 /// (literal(true) unless a constructor sets it otherwise)
0368 const compute::Expression& partition_expression() const {
0369 return partition_expression_;
0370 }
0371
0372 /// \brief Name identifying the concrete dataset type
0373 virtual std::string type_name() const = 0;
0374
0375 /// \brief Return a dataset that uses `schema`
0376 ///
0377 /// Const: this dataset itself is not modified.
0378 /// NOTE(review): presumably fails if `schema` is incompatible -- confirm with the overrides.
0379 virtual Result<std::shared_ptr<Dataset>> ReplaceSchema(
0380 std::shared_ptr<Schema> schema) const = 0;
0381
0382 /// \brief Non-owning pointer to the evolution strategy; the dataset keeps ownership
0383 DatasetEvolutionStrategy* evolution_strategy() { return evolution_strategy_.get(); }
0384
0385 virtual ~Dataset() = default;
0386
0387 protected:
0388 explicit Dataset(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}
0389
0390 Dataset(std::shared_ptr<Schema> schema, compute::Expression partition_expression);
0391 /// Subclass hook producing the fragment iterator for `predicate`.
0392 virtual Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) = 0;
0393
0394
0395
0396
0397
0398
0399 /// \brief Overridable asynchronous fragment listing
0400 ///
0401 /// Not pure virtual: a default implementation is defined out of line.
0402 /// NOTE(review): the default presumably adapts GetFragmentsImpl using `executor` -- confirm in the .cc file.
0403 virtual Result<FragmentGenerator> GetFragmentsAsyncImpl(
0404 compute::Expression predicate, arrow::internal::Executor* executor);
0405
0406 std::shared_ptr<Schema> schema_;
0407 compute::Expression partition_expression_ = compute::literal(true);
0408 std::unique_ptr<DatasetEvolutionStrategy> evolution_strategy_ =
0409 MakeBasicDatasetEvolutionStrategy();
0410 };
0411
0412
0413
0414 /// \brief A Dataset whose batches come from an in-memory source
0415 ///
0416 /// Note that the nested RecordBatchGenerator class hides the namespace-level
0417 /// alias of the same name inside this class: here it is an interface whose
0418 /// Get() returns a RecordBatchIterator.
0419 class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
0420 public:
0421 class RecordBatchGenerator {
0422 public:
0423 virtual ~RecordBatchGenerator() = default;
0424 virtual RecordBatchIterator Get() const = 0;
0425 };
0426
0427 /// Construct from a schema and a shared generator object, which is stored as given
0428 InMemoryDataset(std::shared_ptr<Schema> schema,
0429 std::shared_ptr<RecordBatchGenerator> get_batches)
0430 : Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {}
0431
0432 /// Construct from a schema and a vector of record batches (defined out of line)
0433 InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchVector batches);
0434
0435 /// Construct from a table (defined out of line)
0436 explicit InMemoryDataset(std::shared_ptr<Table> table);
0437
0438 std::string type_name() const override { return "in-memory"; }
0439
0440 Result<std::shared_ptr<Dataset>> ReplaceSchema(
0441 std::shared_ptr<Schema> schema) const override;
0442
0443 protected:
0444 Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override;
0445 /// Source of batches for this dataset
0446 std::shared_ptr<RecordBatchGenerator> get_batches_;
0447 };
0448
0449 /// \brief A Dataset made of child datasets; instances are created through Make()
0450 class ARROW_DS_EXPORT UnionDataset : public Dataset {
0451 public:
0452
0453 /// \brief Create a UnionDataset from a schema and child datasets
0454 ///
0455 /// NOTE(review): returns a Result, so construction can fail; presumably
0456 /// when a child's schema does not match `schema` -- confirm in the .cc file.
0457 static Result<std::shared_ptr<UnionDataset>> Make(std::shared_ptr<Schema> schema,
0458 DatasetVector children);
0459 /// The child datasets supplied at construction
0460 const DatasetVector& children() const { return children_; }
0461
0462 std::string type_name() const override { return "union"; }
0463
0464 Result<std::shared_ptr<Dataset>> ReplaceSchema(
0465 std::shared_ptr<Schema> schema) const override;
0466
0467 protected:
0468 Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) override;
0469 // Protected: only Make(), subclasses and the friend factory can construct directly.
0470 explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
0471 : Dataset(std::move(schema)), children_(std::move(children)) {}
0472
0473 DatasetVector children_;
0474
0475 friend class UnionDatasetFactory;
0476 };
0477
0478
0479
0480 }
0481 }