File indexing completed on 2025-08-28 08:26:58
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020 #pragma once
0021
0022 #include <memory>
0023 #include <optional>
0024 #include <string>
0025 #include <unordered_set>
0026 #include <utility>
0027 #include <vector>
0028
0029 #include "arrow/dataset/discovery.h"
0030 #include "arrow/dataset/file_base.h"
0031 #include "arrow/dataset/type_fwd.h"
0032 #include "arrow/dataset/visibility.h"
0033 #include "arrow/io/caching.h"
0034
// Forward declarations of Parquet C++ library types referenced by this header.
// Only pointers/references to these appear below, so no parquet headers are
// included here.
namespace parquet {

// File reading and metadata.
class ParquetFileReader;
class Statistics;
class ColumnChunkMetaData;
class RowGroupMetaData;
class FileMetaData;

// Encryption / decryption properties.
class FileDecryptionProperties;
class FileEncryptionProperties;

// Reader configuration.
class ReaderProperties;
class ArrowReaderProperties;

// Writer configuration.
class WriterProperties;
class ArrowWriterProperties;

// Arrow-facing Parquet reader/writer types.
namespace arrow {
class FileReader;
class FileWriter;
struct SchemaManifest;
}  // namespace arrow

}  // namespace parquet
0056
0057 namespace arrow {
0058 namespace dataset {
0059
// Parquet encryption configuration types. They are only held through
// std::shared_ptr in this header, so incomplete declarations suffice.
struct ParquetDecryptionConfig;  // used by ParquetFragmentScanOptions
struct ParquetEncryptionConfig;  // used by ParquetFileWriteOptions
0062
0063
0064
0065
0066
/// \brief Type name reported by the Parquet format classes (returned by the inline
/// `type_name()` overrides of ParquetFileFormat and ParquetFragmentScanOptions).
///
/// Declared `inline` so that every translation unit shares a single definition. A
/// plain namespace-scope `constexpr` variable has internal linkage, which gives each
/// TU its own copy. The inline `type_name()` bodies in this header odr-use the array
/// (array-to-pointer decay), so without `inline` they would refer to a different
/// object in every TU.
inline constexpr char kParquetTypeName[] = "parquet";
0068
0069
0070 class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
0071 public:
0072 ParquetFileFormat();
0073
0074
0075
0076 explicit ParquetFileFormat(const parquet::ReaderProperties& reader_properties);
0077
0078 std::string type_name() const override { return kParquetTypeName; }
0079
0080 bool Equals(const FileFormat& other) const override;
0081
0082 struct ReaderOptions {
0083
0084
0085
0086
0087
0088
0089
0090
0091 std::unordered_set<std::string> dict_columns;
0092 arrow::TimeUnit::type coerce_int96_timestamp_unit = arrow::TimeUnit::NANO;
0093
0094 } reader_options;
0095
0096 Result<bool> IsSupported(const FileSource& source) const override;
0097
0098
0099 Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;
0100
0101 Result<RecordBatchGenerator> ScanBatchesAsync(
0102 const std::shared_ptr<ScanOptions>& options,
0103 const std::shared_ptr<FileFragment>& file) const override;
0104
0105 Future<std::optional<int64_t>> CountRows(
0106 const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
0107 const std::shared_ptr<ScanOptions>& options) override;
0108
0109 using FileFormat::MakeFragment;
0110
0111
0112 Result<std::shared_ptr<FileFragment>> MakeFragment(
0113 FileSource source, compute::Expression partition_expression,
0114 std::shared_ptr<Schema> physical_schema) override;
0115
0116
0117 Result<std::shared_ptr<ParquetFileFragment>> MakeFragment(
0118 FileSource source, compute::Expression partition_expression,
0119 std::shared_ptr<Schema> physical_schema, std::vector<int> row_groups);
0120
0121
0122 Result<std::shared_ptr<parquet::arrow::FileReader>> GetReader(
0123 const FileSource& source, const std::shared_ptr<ScanOptions>& options) const;
0124
0125 Result<std::shared_ptr<parquet::arrow::FileReader>> GetReader(
0126 const FileSource& source, const std::shared_ptr<ScanOptions>& options,
0127 const std::shared_ptr<parquet::FileMetaData>& metadata) const;
0128
0129 Future<std::shared_ptr<parquet::arrow::FileReader>> GetReaderAsync(
0130 const FileSource& source, const std::shared_ptr<ScanOptions>& options) const;
0131
0132 Future<std::shared_ptr<parquet::arrow::FileReader>> GetReaderAsync(
0133 const FileSource& source, const std::shared_ptr<ScanOptions>& options,
0134 const std::shared_ptr<parquet::FileMetaData>& metadata) const;
0135
0136 Result<std::shared_ptr<FileWriter>> MakeWriter(
0137 std::shared_ptr<io::OutputStream> destination, std::shared_ptr<Schema> schema,
0138 std::shared_ptr<FileWriteOptions> options,
0139 fs::FileLocator destination_locator) const override;
0140
0141 std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
0142 };
0143
0144
0145
0146
0147
0148
0149
0150
0151
0152
0153
0154
0155
0156 class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
0157 public:
0158 Result<FragmentVector> SplitByRowGroup(compute::Expression predicate);
0159
0160
0161 const std::vector<int>& row_groups() const {
0162 if (row_groups_) return *row_groups_;
0163 static std::vector<int> empty;
0164 return empty;
0165 }
0166
0167
0168 std::shared_ptr<parquet::FileMetaData> metadata();
0169
0170
0171 Status EnsureCompleteMetadata(parquet::arrow::FileReader* reader = NULLPTR);
0172
0173
0174 Result<std::shared_ptr<Fragment>> Subset(compute::Expression predicate);
0175 Result<std::shared_ptr<Fragment>> Subset(std::vector<int> row_group_ids);
0176
0177 static std::optional<compute::Expression> EvaluateStatisticsAsExpression(
0178 const Field& field, const parquet::Statistics& statistics);
0179
0180 static std::optional<compute::Expression> EvaluateStatisticsAsExpression(
0181 const Field& field, const FieldRef& field_ref,
0182 const parquet::Statistics& statistics);
0183
0184 private:
0185 ParquetFileFragment(FileSource source, std::shared_ptr<FileFormat> format,
0186 compute::Expression partition_expression,
0187 std::shared_ptr<Schema> physical_schema,
0188 std::optional<std::vector<int>> row_groups);
0189
0190 Status SetMetadata(std::shared_ptr<parquet::FileMetaData> metadata,
0191 std::shared_ptr<parquet::arrow::SchemaManifest> manifest,
0192 std::shared_ptr<parquet::FileMetaData> original_metadata = {});
0193
0194
0195 Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override {
0196 ARROW_RETURN_NOT_OK(EnsureCompleteMetadata());
0197 return physical_schema_;
0198 }
0199
0200
0201 Result<std::vector<int>> FilterRowGroups(compute::Expression predicate);
0202
0203 Result<std::vector<compute::Expression>> TestRowGroups(compute::Expression predicate);
0204
0205
0206
0207 Result<std::optional<int64_t>> TryCountRows(compute::Expression predicate);
0208
0209 ParquetFileFormat& parquet_format_;
0210
0211
0212
0213 std::optional<std::vector<int>> row_groups_;
0214
0215
0216
0217 std::vector<compute::Expression> statistics_expressions_;
0218
0219
0220 std::vector<bool> statistics_expressions_complete_;
0221 std::shared_ptr<parquet::FileMetaData> metadata_;
0222 std::shared_ptr<parquet::arrow::SchemaManifest> manifest_;
0223
0224 std::shared_ptr<parquet::FileMetaData> original_metadata_;
0225
0226 friend class ParquetFileFormat;
0227 friend class ParquetDatasetFactory;
0228 };
0229
0230
0231 class ARROW_DS_EXPORT ParquetFragmentScanOptions : public FragmentScanOptions {
0232 public:
0233 ParquetFragmentScanOptions();
0234 std::string type_name() const override { return kParquetTypeName; }
0235
0236
0237
0238 std::shared_ptr<parquet::ReaderProperties> reader_properties;
0239
0240
0241
0242 std::shared_ptr<parquet::ArrowReaderProperties> arrow_reader_properties;
0243
0244 std::shared_ptr<ParquetDecryptionConfig> parquet_decryption_config = NULLPTR;
0245 };
0246
0247 class ARROW_DS_EXPORT ParquetFileWriteOptions : public FileWriteOptions {
0248 public:
0249
0250 std::shared_ptr<parquet::WriterProperties> writer_properties;
0251
0252
0253 std::shared_ptr<parquet::ArrowWriterProperties> arrow_writer_properties;
0254
0255
0256 std::shared_ptr<ParquetEncryptionConfig> parquet_encryption_config = NULLPTR;
0257
0258 protected:
0259 explicit ParquetFileWriteOptions(std::shared_ptr<FileFormat> format)
0260 : FileWriteOptions(std::move(format)) {}
0261
0262 friend class ParquetFileFormat;
0263 };
0264
0265 class ARROW_DS_EXPORT ParquetFileWriter : public FileWriter {
0266 public:
0267 const std::shared_ptr<parquet::arrow::FileWriter>& parquet_writer() const {
0268 return parquet_writer_;
0269 }
0270
0271 Status Write(const std::shared_ptr<RecordBatch>& batch) override;
0272
0273 private:
0274 ParquetFileWriter(std::shared_ptr<io::OutputStream> destination,
0275 std::shared_ptr<parquet::arrow::FileWriter> writer,
0276 std::shared_ptr<ParquetFileWriteOptions> options,
0277 fs::FileLocator destination_locator);
0278
0279 Future<> FinishInternal() override;
0280
0281 std::shared_ptr<parquet::arrow::FileWriter> parquet_writer_;
0282
0283 friend class ParquetFileFormat;
0284 };
0285
0286
0287 struct ParquetFactoryOptions {
0288
0289
0290
0291
0292
0293
0294
0295
0296 PartitioningOrFactory partitioning{Partitioning::Default()};
0297
0298
0299
0300
0301
0302
0303
0304
0305
0306
0307
0308
0309
0310
0311
0312 std::string partition_base_dir;
0313
0314
0315
0316
0317
0318
0319 bool validate_column_chunk_paths = false;
0320 };
0321
0322
0323
0324
0325
0326
0327
0328
0329
0330
0331 class ARROW_DS_EXPORT ParquetDatasetFactory : public DatasetFactory {
0332 public:
0333
0334
0335
0336
0337
0338
0339
0340
0341
0342 static Result<std::shared_ptr<DatasetFactory>> Make(
0343 const std::string& metadata_path, std::shared_ptr<fs::FileSystem> filesystem,
0344 std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions options);
0345
0346
0347
0348
0349
0350
0351
0352
0353
0354
0355
0356
0357 static Result<std::shared_ptr<DatasetFactory>> Make(
0358 const FileSource& metadata, const std::string& base_path,
0359 std::shared_ptr<fs::FileSystem> filesystem,
0360 std::shared_ptr<ParquetFileFormat> format, ParquetFactoryOptions options);
0361
0362 Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
0363 InspectOptions options) override;
0364
0365 Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;
0366
0367 protected:
0368 ParquetDatasetFactory(
0369 std::shared_ptr<fs::FileSystem> filesystem,
0370 std::shared_ptr<ParquetFileFormat> format,
0371 std::shared_ptr<parquet::FileMetaData> metadata,
0372 std::shared_ptr<parquet::arrow::SchemaManifest> manifest,
0373 std::shared_ptr<Schema> physical_schema, std::string base_path,
0374 ParquetFactoryOptions options,
0375 std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids)
0376 : filesystem_(std::move(filesystem)),
0377 format_(std::move(format)),
0378 metadata_(std::move(metadata)),
0379 manifest_(std::move(manifest)),
0380 physical_schema_(std::move(physical_schema)),
0381 base_path_(std::move(base_path)),
0382 options_(std::move(options)),
0383 paths_with_row_group_ids_(std::move(paths_with_row_group_ids)) {}
0384
0385 std::shared_ptr<fs::FileSystem> filesystem_;
0386 std::shared_ptr<ParquetFileFormat> format_;
0387 std::shared_ptr<parquet::FileMetaData> metadata_;
0388 std::shared_ptr<parquet::arrow::SchemaManifest> manifest_;
0389 std::shared_ptr<Schema> physical_schema_;
0390 std::string base_path_;
0391 ParquetFactoryOptions options_;
0392 std::vector<std::pair<std::string, std::vector<int>>> paths_with_row_group_ids_;
0393
0394 private:
0395 Result<std::vector<std::shared_ptr<FileFragment>>> CollectParquetFragments(
0396 const Partitioning& partitioning);
0397
0398 Result<std::shared_ptr<Schema>> PartitionSchema();
0399 };
0400
0401
0402
0403 }
0404 }