File indexing completed on 2025-08-28 08:26:58
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020 #pragma once
0021
0022 #include <functional>
0023 #include <iosfwd>
0024 #include <memory>
0025 #include <optional>
0026 #include <string>
0027 #include <unordered_map>
0028 #include <utility>
0029 #include <vector>
0030
0031 #include "arrow/compute/expression.h"
0032 #include "arrow/dataset/type_fwd.h"
0033 #include "arrow/dataset/visibility.h"
0034 #include "arrow/util/compare.h"
0035
0036 namespace arrow {
0037
0038 namespace dataset {
0039
/// Separator character placed between partition values inside a file name.
/// NOTE(review): presumably consumed by FilenamePartitioning; confirm in partition.cc.
constexpr char kFilenamePartitionSep{'_'};
0041
0042 struct ARROW_DS_EXPORT PartitionPathFormat {
0043 std::string directory, filename;
0044 };
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067 class ARROW_DS_EXPORT Partitioning : public util::EqualityComparable<Partitioning> {
0068 public:
0069 virtual ~Partitioning() = default;
0070
0071
0072 virtual std::string type_name() const = 0;
0073
0074
0075 virtual bool Equals(const Partitioning& other) const {
0076 return schema_->Equals(other.schema_, false);
0077 }
0078
0079
0080
0081 struct PartitionedBatches {
0082 RecordBatchVector batches;
0083 std::vector<compute::Expression> expressions;
0084 };
0085 virtual Result<PartitionedBatches> Partition(
0086 const std::shared_ptr<RecordBatch>& batch) const = 0;
0087
0088
0089 virtual Result<compute::Expression> Parse(const std::string& path) const = 0;
0090
0091 virtual Result<PartitionPathFormat> Format(const compute::Expression& expr) const = 0;
0092
0093
0094
0095 static std::shared_ptr<Partitioning> Default();
0096
0097
0098 const std::shared_ptr<Schema>& schema() const { return schema_; }
0099
0100 protected:
0101 explicit Partitioning(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}
0102
0103 std::shared_ptr<Schema> schema_;
0104 };
0105
0106
/// \brief How partition segment values are encoded inside paths.
enum class SegmentEncoding : int8_t {
  /// No encoding (presumably segment values are used verbatim).
  None = 0,
  /// URI encoding (presumably percent-encoding of segment values).
  Uri = 1,
};
0113
0114 ARROW_DS_EXPORT
0115 std::ostream& operator<<(std::ostream& os, SegmentEncoding segment_encoding);
0116
0117
0118 struct ARROW_DS_EXPORT KeyValuePartitioningOptions {
0119
0120
0121 SegmentEncoding segment_encoding = SegmentEncoding::Uri;
0122 };
0123
0124
0125 struct ARROW_DS_EXPORT PartitioningFactoryOptions {
0126
0127
0128
0129
0130 bool infer_dictionary = false;
0131
0132
0133
0134 std::shared_ptr<Schema> schema;
0135
0136
0137 SegmentEncoding segment_encoding = SegmentEncoding::Uri;
0138
0139 KeyValuePartitioningOptions AsPartitioningOptions() const;
0140 };
0141
0142
0143 struct ARROW_DS_EXPORT HivePartitioningFactoryOptions : PartitioningFactoryOptions {
0144
0145 std::string null_fallback;
0146
0147 HivePartitioningOptions AsHivePartitioningOptions() const;
0148 };
0149
0150
0151
0152 class ARROW_DS_EXPORT PartitioningFactory {
0153 public:
0154 virtual ~PartitioningFactory() = default;
0155
0156
0157 virtual std::string type_name() const = 0;
0158
0159
0160
0161 virtual Result<std::shared_ptr<Schema>> Inspect(
0162 const std::vector<std::string>& paths) = 0;
0163
0164
0165
0166 virtual Result<std::shared_ptr<Partitioning>> Finish(
0167 const std::shared_ptr<Schema>& schema) const = 0;
0168 };
0169
0170
0171
0172 class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
0173 public:
0174
0175
0176 struct Key {
0177 std::string name;
0178 std::optional<std::string> value;
0179 };
0180
0181 Result<PartitionedBatches> Partition(
0182 const std::shared_ptr<RecordBatch>& batch) const override;
0183
0184 Result<compute::Expression> Parse(const std::string& path) const override;
0185
0186 Result<PartitionPathFormat> Format(const compute::Expression& expr) const override;
0187
0188 const ArrayVector& dictionaries() const { return dictionaries_; }
0189
0190 SegmentEncoding segment_encoding() const { return options_.segment_encoding; }
0191
0192 bool Equals(const Partitioning& other) const override;
0193
0194 protected:
0195 KeyValuePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries,
0196 KeyValuePartitioningOptions options)
0197 : Partitioning(std::move(schema)),
0198 dictionaries_(std::move(dictionaries)),
0199 options_(options) {
0200 if (dictionaries_.empty()) {
0201 dictionaries_.resize(schema_->num_fields());
0202 }
0203 }
0204
0205 virtual Result<std::vector<Key>> ParseKeys(const std::string& path) const = 0;
0206
0207 virtual Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const = 0;
0208
0209
0210 Result<compute::Expression> ConvertKey(const Key& key) const;
0211
0212 Result<std::vector<std::string>> FormatPartitionSegments(
0213 const ScalarVector& values) const;
0214 Result<std::vector<Key>> ParsePartitionSegments(
0215 const std::vector<std::string>& segments) const;
0216
0217 ArrayVector dictionaries_;
0218 KeyValuePartitioningOptions options_;
0219 };
0220
0221
0222
0223
0224
0225
0226
0227 class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
0228 public:
0229
0230
0231 explicit DirectoryPartitioning(std::shared_ptr<Schema> schema,
0232 ArrayVector dictionaries = {},
0233 KeyValuePartitioningOptions options = {});
0234
0235 std::string type_name() const override { return "directory"; }
0236
0237 bool Equals(const Partitioning& other) const override;
0238
0239
0240
0241
0242
0243 static std::shared_ptr<PartitioningFactory> MakeFactory(
0244 std::vector<std::string> field_names, PartitioningFactoryOptions = {});
0245
0246 private:
0247 Result<std::vector<Key>> ParseKeys(const std::string& path) const override;
0248
0249 Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const override;
0250 };
0251
0252
/// \brief Default Hive placeholder string; presumably written in place of
/// null partition values.
///
/// Declared `inline constexpr` (C++17) rather than namespace-scope `static`:
/// the array is odr-used by inline code in this header (default member
/// initializers and default arguments convert it to std::string). An
/// internal-linkage copy per translation unit would make those inline
/// definitions refer to different entities in different TUs, which violates
/// the One Definition Rule. An inline variable is a single object program-wide.
inline constexpr char kDefaultHiveNullFallback[] = "__HIVE_DEFAULT_PARTITION__";
0254
0255 struct ARROW_DS_EXPORT HivePartitioningOptions : public KeyValuePartitioningOptions {
0256 std::string null_fallback = kDefaultHiveNullFallback;
0257
0258 static HivePartitioningOptions DefaultsWithNullFallback(std::string fallback) {
0259 HivePartitioningOptions options;
0260 options.null_fallback = std::move(fallback);
0261 return options;
0262 }
0263 };
0264
0265
0266
0267
0268
0269
0270
0271
0272
0273
0274 class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning {
0275 public:
0276
0277
0278 explicit HivePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries = {},
0279 std::string null_fallback = kDefaultHiveNullFallback)
0280 : KeyValuePartitioning(std::move(schema), std::move(dictionaries),
0281 KeyValuePartitioningOptions()),
0282 hive_options_(
0283 HivePartitioningOptions::DefaultsWithNullFallback(std::move(null_fallback))) {
0284 }
0285
0286 explicit HivePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries,
0287 HivePartitioningOptions options)
0288 : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options),
0289 hive_options_(options) {}
0290
0291 std::string type_name() const override { return "hive"; }
0292 std::string null_fallback() const { return hive_options_.null_fallback; }
0293 const HivePartitioningOptions& options() const { return hive_options_; }
0294
0295 static Result<std::optional<Key>> ParseKey(const std::string& segment,
0296 const HivePartitioningOptions& options);
0297
0298 bool Equals(const Partitioning& other) const override;
0299
0300
0301 static std::shared_ptr<PartitioningFactory> MakeFactory(
0302 HivePartitioningFactoryOptions = {});
0303
0304 private:
0305 const HivePartitioningOptions hive_options_;
0306 Result<std::vector<Key>> ParseKeys(const std::string& path) const override;
0307
0308 Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const override;
0309 };
0310
0311
0312 class ARROW_DS_EXPORT FunctionPartitioning : public Partitioning {
0313 public:
0314 using ParseImpl = std::function<Result<compute::Expression>(const std::string&)>;
0315
0316 using FormatImpl =
0317 std::function<Result<PartitionPathFormat>(const compute::Expression&)>;
0318
0319 FunctionPartitioning(std::shared_ptr<Schema> schema, ParseImpl parse_impl,
0320 FormatImpl format_impl = NULLPTR, std::string name = "function")
0321 : Partitioning(std::move(schema)),
0322 parse_impl_(std::move(parse_impl)),
0323 format_impl_(std::move(format_impl)),
0324 name_(std::move(name)) {}
0325
0326 std::string type_name() const override { return name_; }
0327
0328 bool Equals(const Partitioning& other) const override { return false; }
0329
0330 Result<compute::Expression> Parse(const std::string& path) const override {
0331 return parse_impl_(path);
0332 }
0333
0334 Result<PartitionPathFormat> Format(const compute::Expression& expr) const override {
0335 if (format_impl_) {
0336 return format_impl_(expr);
0337 }
0338 return Status::NotImplemented("formatting paths from ", type_name(), " Partitioning");
0339 }
0340
0341 Result<PartitionedBatches> Partition(
0342 const std::shared_ptr<RecordBatch>& batch) const override {
0343 return Status::NotImplemented("partitioning batches from ", type_name(),
0344 " Partitioning");
0345 }
0346
0347 private:
0348 ParseImpl parse_impl_;
0349 FormatImpl format_impl_;
0350 std::string name_;
0351 };
0352
0353 class ARROW_DS_EXPORT FilenamePartitioning : public KeyValuePartitioning {
0354 public:
0355
0356
0357
0358
0359 explicit FilenamePartitioning(std::shared_ptr<Schema> schema,
0360 ArrayVector dictionaries = {},
0361 KeyValuePartitioningOptions options = {});
0362
0363 std::string type_name() const override { return "filename"; }
0364
0365
0366
0367
0368
0369 static std::shared_ptr<PartitioningFactory> MakeFactory(
0370 std::vector<std::string> field_names, PartitioningFactoryOptions = {});
0371
0372 bool Equals(const Partitioning& other) const override;
0373
0374 private:
0375 Result<std::vector<Key>> ParseKeys(const std::string& path) const override;
0376
0377 Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const override;
0378 };
0379
0380 ARROW_DS_EXPORT std::string StripPrefix(const std::string& path,
0381 const std::string& prefix);
0382
0383
0384
0385
0386
0387 ARROW_DS_EXPORT std::string StripPrefixAndFilename(const std::string& path,
0388 const std::string& prefix);
0389
0390
0391 ARROW_DS_EXPORT std::vector<std::string> StripPrefixAndFilename(
0392 const std::vector<std::string>& paths, const std::string& prefix);
0393
0394
0395 ARROW_DS_EXPORT std::vector<std::string> StripPrefixAndFilename(
0396 const std::vector<fs::FileInfo>& files, const std::string& prefix);
0397
0398
0399 class ARROW_DS_EXPORT PartitioningOrFactory {
0400 public:
0401 explicit PartitioningOrFactory(std::shared_ptr<Partitioning> partitioning)
0402 : partitioning_(std::move(partitioning)) {}
0403
0404 explicit PartitioningOrFactory(std::shared_ptr<PartitioningFactory> factory)
0405 : factory_(std::move(factory)) {}
0406
0407 PartitioningOrFactory& operator=(std::shared_ptr<Partitioning> partitioning) {
0408 return *this = PartitioningOrFactory(std::move(partitioning));
0409 }
0410
0411 PartitioningOrFactory& operator=(std::shared_ptr<PartitioningFactory> factory) {
0412 return *this = PartitioningOrFactory(std::move(factory));
0413 }
0414
0415
0416 const std::shared_ptr<Partitioning>& partitioning() const { return partitioning_; }
0417
0418
0419 const std::shared_ptr<PartitioningFactory>& factory() const { return factory_; }
0420
0421
0422 Result<std::shared_ptr<Schema>> GetOrInferSchema(const std::vector<std::string>& paths);
0423
0424 private:
0425 std::shared_ptr<PartitioningFactory> factory_;
0426 std::shared_ptr<Partitioning> partitioning_;
0427 };
0428
0429
0430
0431 }
0432 }