Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:58

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 // This API is EXPERIMENTAL.
0019 
0020 #pragma once
0021 
0022 #include <functional>
0023 #include <iosfwd>
0024 #include <memory>
0025 #include <optional>
0026 #include <string>
0027 #include <unordered_map>
0028 #include <utility>
0029 #include <vector>
0030 
0031 #include "arrow/compute/expression.h"
0032 #include "arrow/dataset/type_fwd.h"
0033 #include "arrow/dataset/visibility.h"
0034 #include "arrow/util/compare.h"
0035 
0036 namespace arrow {
0037 
0038 namespace dataset {
0039 
/// \brief Separator character used between partition segments encoded in a
/// file name (presumably consumed by FilenamePartitioning).
///
/// Declared `inline constexpr` so every translation unit that includes this
/// header shares a single definition.
inline constexpr char kFilenamePartitionSep = '_';
0041 
0042 struct ARROW_DS_EXPORT PartitionPathFormat {
0043   std::string directory, filename;
0044 };
0045 
0046 // ----------------------------------------------------------------------
0047 // Partitioning
0048 
0049 /// \defgroup dataset-partitioning Partitioning API
0050 ///
0051 /// @{
0052 
0053 /// \brief Interface for parsing partition expressions from string partition
0054 /// identifiers.
0055 ///
0056 /// For example, the identifier "foo=5" might be parsed to an equality expression
0057 /// between the "foo" field and the value 5.
0058 ///
0059 /// Some partitionings may store the field names in a metadata
0060 /// store instead of in file paths, for example
0061 /// dataset_root/2009/11/... could be used when the partition fields
0062 /// are "year" and "month"
0063 ///
0064 /// Paths are consumed from left to right. Paths must be relative to
0065 /// the root of a partition; path prefixes must be removed before passing
0066 /// the path to a partitioning for parsing.
0067 class ARROW_DS_EXPORT Partitioning : public util::EqualityComparable<Partitioning> {
0068  public:
0069   virtual ~Partitioning() = default;
0070 
0071   /// \brief The name identifying the kind of partitioning
0072   virtual std::string type_name() const = 0;
0073 
0074   //// \brief Return whether the partitionings are equal
0075   virtual bool Equals(const Partitioning& other) const {
0076     return schema_->Equals(other.schema_, /*check_metadata=*/false);
0077   }
0078 
0079   /// \brief If the input batch shares any fields with this partitioning,
0080   /// produce sub-batches which satisfy mutually exclusive Expressions.
0081   struct PartitionedBatches {
0082     RecordBatchVector batches;
0083     std::vector<compute::Expression> expressions;
0084   };
0085   virtual Result<PartitionedBatches> Partition(
0086       const std::shared_ptr<RecordBatch>& batch) const = 0;
0087 
0088   /// \brief Parse a path into a partition expression
0089   virtual Result<compute::Expression> Parse(const std::string& path) const = 0;
0090 
0091   virtual Result<PartitionPathFormat> Format(const compute::Expression& expr) const = 0;
0092 
0093   /// \brief A default Partitioning which is a DirectoryPartitioning
0094   /// with an empty schema.
0095   static std::shared_ptr<Partitioning> Default();
0096 
0097   /// \brief The partition schema.
0098   const std::shared_ptr<Schema>& schema() const { return schema_; }
0099 
0100  protected:
0101   explicit Partitioning(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}
0102 
0103   std::shared_ptr<Schema> schema_;
0104 };
0105 
/// \brief How individual partition segment values are encoded in a path.
enum class SegmentEncoding : int8_t {
  None = 0,  ///< Segment values are stored verbatim; no encoding.
  Uri = 1,   ///< Segment values are URL-encoded.
};
0113 
0114 ARROW_DS_EXPORT
0115 std::ostream& operator<<(std::ostream& os, SegmentEncoding segment_encoding);
0116 
0117 /// \brief Options for key-value based partitioning (hive/directory).
0118 struct ARROW_DS_EXPORT KeyValuePartitioningOptions {
0119   /// After splitting a path into components, decode the path components
0120   /// before parsing according to this scheme.
0121   SegmentEncoding segment_encoding = SegmentEncoding::Uri;
0122 };
0123 
0124 /// \brief Options for inferring a partitioning.
0125 struct ARROW_DS_EXPORT PartitioningFactoryOptions {
0126   /// When inferring a schema for partition fields, yield dictionary encoded types
0127   /// instead of plain. This can be more efficient when materializing virtual
0128   /// columns, and Expressions parsed by the finished Partitioning will include
0129   /// dictionaries of all unique inspected values for each field.
0130   bool infer_dictionary = false;
0131   /// Optionally, an expected schema can be provided, in which case inference
0132   /// will only check discovered fields against the schema and update internal
0133   /// state (such as dictionaries).
0134   std::shared_ptr<Schema> schema;
0135   /// After splitting a path into components, decode the path components
0136   /// before parsing according to this scheme.
0137   SegmentEncoding segment_encoding = SegmentEncoding::Uri;
0138 
0139   KeyValuePartitioningOptions AsPartitioningOptions() const;
0140 };
0141 
0142 /// \brief Options for inferring a hive-style partitioning.
0143 struct ARROW_DS_EXPORT HivePartitioningFactoryOptions : PartitioningFactoryOptions {
0144   /// The hive partitioning scheme maps null to a hard coded fallback string.
0145   std::string null_fallback;
0146 
0147   HivePartitioningOptions AsHivePartitioningOptions() const;
0148 };
0149 
0150 /// \brief PartitioningFactory provides creation of a partitioning  when the
0151 /// specific schema must be inferred from available paths (no explicit schema is known).
0152 class ARROW_DS_EXPORT PartitioningFactory {
0153  public:
0154   virtual ~PartitioningFactory() = default;
0155 
0156   /// \brief The name identifying the kind of partitioning
0157   virtual std::string type_name() const = 0;
0158 
0159   /// Get the schema for the resulting Partitioning.
0160   /// This may reset internal state, for example dictionaries of unique representations.
0161   virtual Result<std::shared_ptr<Schema>> Inspect(
0162       const std::vector<std::string>& paths) = 0;
0163 
0164   /// Create a partitioning using the provided schema
0165   /// (fields may be dropped).
0166   virtual Result<std::shared_ptr<Partitioning>> Finish(
0167       const std::shared_ptr<Schema>& schema) const = 0;
0168 };
0169 
0170 /// \brief Subclass for the common case of a partitioning which yields an equality
0171 /// expression for each segment
0172 class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
0173  public:
0174   /// An unconverted equality expression consisting of a field name and the representation
0175   /// of a scalar value
0176   struct Key {
0177     std::string name;
0178     std::optional<std::string> value;
0179   };
0180 
0181   Result<PartitionedBatches> Partition(
0182       const std::shared_ptr<RecordBatch>& batch) const override;
0183 
0184   Result<compute::Expression> Parse(const std::string& path) const override;
0185 
0186   Result<PartitionPathFormat> Format(const compute::Expression& expr) const override;
0187 
0188   const ArrayVector& dictionaries() const { return dictionaries_; }
0189 
0190   SegmentEncoding segment_encoding() const { return options_.segment_encoding; }
0191 
0192   bool Equals(const Partitioning& other) const override;
0193 
0194  protected:
0195   KeyValuePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries,
0196                        KeyValuePartitioningOptions options)
0197       : Partitioning(std::move(schema)),
0198         dictionaries_(std::move(dictionaries)),
0199         options_(options) {
0200     if (dictionaries_.empty()) {
0201       dictionaries_.resize(schema_->num_fields());
0202     }
0203   }
0204 
0205   virtual Result<std::vector<Key>> ParseKeys(const std::string& path) const = 0;
0206 
0207   virtual Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const = 0;
0208 
0209   /// Convert a Key to a full expression.
0210   Result<compute::Expression> ConvertKey(const Key& key) const;
0211 
0212   Result<std::vector<std::string>> FormatPartitionSegments(
0213       const ScalarVector& values) const;
0214   Result<std::vector<Key>> ParsePartitionSegments(
0215       const std::vector<std::string>& segments) const;
0216 
0217   ArrayVector dictionaries_;
0218   KeyValuePartitioningOptions options_;
0219 };
0220 
0221 /// \brief DirectoryPartitioning parses one segment of a path for each field in its
0222 /// schema. All fields are required, so paths passed to DirectoryPartitioning::Parse
0223 /// must contain segments for each field.
0224 ///
0225 /// For example given schema<year:int16, month:int8> the path "/2009/11" would be
0226 /// parsed to ("year"_ == 2009 and "month"_ == 11)
0227 class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
0228  public:
0229   /// If a field in schema is of dictionary type, the corresponding element of
0230   /// dictionaries must be contain the dictionary of values for that field.
0231   explicit DirectoryPartitioning(std::shared_ptr<Schema> schema,
0232                                  ArrayVector dictionaries = {},
0233                                  KeyValuePartitioningOptions options = {});
0234 
0235   std::string type_name() const override { return "directory"; }
0236 
0237   bool Equals(const Partitioning& other) const override;
0238 
0239   /// \brief Create a factory for a directory partitioning.
0240   ///
0241   /// \param[in] field_names The names for the partition fields. Types will be
0242   ///     inferred.
0243   static std::shared_ptr<PartitioningFactory> MakeFactory(
0244       std::vector<std::string> field_names, PartitioningFactoryOptions = {});
0245 
0246  private:
0247   Result<std::vector<Key>> ParseKeys(const std::string& path) const override;
0248 
0249   Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const override;
0250 };
0251 
/// \brief The default fallback used for null values in a Hive-style partitioning.
///
/// Declared `inline constexpr` rather than `static` so that every translation
/// unit including this header shares one definition instead of getting its own
/// internal-linkage copy.
inline constexpr char kDefaultHiveNullFallback[] = "__HIVE_DEFAULT_PARTITION__";
0254 
0255 struct ARROW_DS_EXPORT HivePartitioningOptions : public KeyValuePartitioningOptions {
0256   std::string null_fallback = kDefaultHiveNullFallback;
0257 
0258   static HivePartitioningOptions DefaultsWithNullFallback(std::string fallback) {
0259     HivePartitioningOptions options;
0260     options.null_fallback = std::move(fallback);
0261     return options;
0262   }
0263 };
0264 
0265 /// \brief Multi-level, directory based partitioning
0266 /// originating from Apache Hive with all data files stored in the
0267 /// leaf directories. Data is partitioned by static values of a
0268 /// particular column in the schema. Partition keys are represented in
0269 /// the form $key=$value in directory names.
0270 /// Field order is ignored, as are missing or unrecognized field names.
0271 ///
0272 /// For example given schema<year:int16, month:int8, day:int8> the path
0273 /// "/day=321/ignored=3.4/year=2009" parses to ("year"_ == 2009 and "day"_ == 321)
0274 class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning {
0275  public:
0276   /// If a field in schema is of dictionary type, the corresponding element of
0277   /// dictionaries must be contain the dictionary of values for that field.
0278   explicit HivePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries = {},
0279                             std::string null_fallback = kDefaultHiveNullFallback)
0280       : KeyValuePartitioning(std::move(schema), std::move(dictionaries),
0281                              KeyValuePartitioningOptions()),
0282         hive_options_(
0283             HivePartitioningOptions::DefaultsWithNullFallback(std::move(null_fallback))) {
0284   }
0285 
0286   explicit HivePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries,
0287                             HivePartitioningOptions options)
0288       : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options),
0289         hive_options_(options) {}
0290 
0291   std::string type_name() const override { return "hive"; }
0292   std::string null_fallback() const { return hive_options_.null_fallback; }
0293   const HivePartitioningOptions& options() const { return hive_options_; }
0294 
0295   static Result<std::optional<Key>> ParseKey(const std::string& segment,
0296                                              const HivePartitioningOptions& options);
0297 
0298   bool Equals(const Partitioning& other) const override;
0299 
0300   /// \brief Create a factory for a hive partitioning.
0301   static std::shared_ptr<PartitioningFactory> MakeFactory(
0302       HivePartitioningFactoryOptions = {});
0303 
0304  private:
0305   const HivePartitioningOptions hive_options_;
0306   Result<std::vector<Key>> ParseKeys(const std::string& path) const override;
0307 
0308   Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const override;
0309 };
0310 
0311 /// \brief Implementation provided by lambda or other callable
0312 class ARROW_DS_EXPORT FunctionPartitioning : public Partitioning {
0313  public:
0314   using ParseImpl = std::function<Result<compute::Expression>(const std::string&)>;
0315 
0316   using FormatImpl =
0317       std::function<Result<PartitionPathFormat>(const compute::Expression&)>;
0318 
0319   FunctionPartitioning(std::shared_ptr<Schema> schema, ParseImpl parse_impl,
0320                        FormatImpl format_impl = NULLPTR, std::string name = "function")
0321       : Partitioning(std::move(schema)),
0322         parse_impl_(std::move(parse_impl)),
0323         format_impl_(std::move(format_impl)),
0324         name_(std::move(name)) {}
0325 
0326   std::string type_name() const override { return name_; }
0327 
0328   bool Equals(const Partitioning& other) const override { return false; }
0329 
0330   Result<compute::Expression> Parse(const std::string& path) const override {
0331     return parse_impl_(path);
0332   }
0333 
0334   Result<PartitionPathFormat> Format(const compute::Expression& expr) const override {
0335     if (format_impl_) {
0336       return format_impl_(expr);
0337     }
0338     return Status::NotImplemented("formatting paths from ", type_name(), " Partitioning");
0339   }
0340 
0341   Result<PartitionedBatches> Partition(
0342       const std::shared_ptr<RecordBatch>& batch) const override {
0343     return Status::NotImplemented("partitioning batches from ", type_name(),
0344                                   " Partitioning");
0345   }
0346 
0347  private:
0348   ParseImpl parse_impl_;
0349   FormatImpl format_impl_;
0350   std::string name_;
0351 };
0352 
0353 class ARROW_DS_EXPORT FilenamePartitioning : public KeyValuePartitioning {
0354  public:
0355   /// \brief Construct a FilenamePartitioning from its components.
0356   ///
0357   /// If a field in schema is of dictionary type, the corresponding element of
0358   /// dictionaries must be contain the dictionary of values for that field.
0359   explicit FilenamePartitioning(std::shared_ptr<Schema> schema,
0360                                 ArrayVector dictionaries = {},
0361                                 KeyValuePartitioningOptions options = {});
0362 
0363   std::string type_name() const override { return "filename"; }
0364 
0365   /// \brief Create a factory for a filename partitioning.
0366   ///
0367   /// \param[in] field_names The names for the partition fields. Types will be
0368   ///     inferred.
0369   static std::shared_ptr<PartitioningFactory> MakeFactory(
0370       std::vector<std::string> field_names, PartitioningFactoryOptions = {});
0371 
0372   bool Equals(const Partitioning& other) const override;
0373 
0374  private:
0375   Result<std::vector<Key>> ParseKeys(const std::string& path) const override;
0376 
0377   Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const override;
0378 };
0379 
0380 ARROW_DS_EXPORT std::string StripPrefix(const std::string& path,
0381                                         const std::string& prefix);
0382 
0383 /// \brief Extracts the directory and filename and removes the prefix of a path
0384 ///
0385 /// e.g., `StripPrefixAndFilename("/data/year=2019/c.txt", "/data") ->
0386 /// {"year=2019","c.txt"}`
0387 ARROW_DS_EXPORT std::string StripPrefixAndFilename(const std::string& path,
0388                                                    const std::string& prefix);
0389 
0390 /// \brief Vector version of StripPrefixAndFilename.
0391 ARROW_DS_EXPORT std::vector<std::string> StripPrefixAndFilename(
0392     const std::vector<std::string>& paths, const std::string& prefix);
0393 
0394 /// \brief Vector version of StripPrefixAndFilename.
0395 ARROW_DS_EXPORT std::vector<std::string> StripPrefixAndFilename(
0396     const std::vector<fs::FileInfo>& files, const std::string& prefix);
0397 
0398 /// \brief Either a Partitioning or a PartitioningFactory
0399 class ARROW_DS_EXPORT PartitioningOrFactory {
0400  public:
0401   explicit PartitioningOrFactory(std::shared_ptr<Partitioning> partitioning)
0402       : partitioning_(std::move(partitioning)) {}
0403 
0404   explicit PartitioningOrFactory(std::shared_ptr<PartitioningFactory> factory)
0405       : factory_(std::move(factory)) {}
0406 
0407   PartitioningOrFactory& operator=(std::shared_ptr<Partitioning> partitioning) {
0408     return *this = PartitioningOrFactory(std::move(partitioning));
0409   }
0410 
0411   PartitioningOrFactory& operator=(std::shared_ptr<PartitioningFactory> factory) {
0412     return *this = PartitioningOrFactory(std::move(factory));
0413   }
0414 
0415   /// \brief The partitioning (if given).
0416   const std::shared_ptr<Partitioning>& partitioning() const { return partitioning_; }
0417 
0418   /// \brief The partition factory (if given).
0419   const std::shared_ptr<PartitioningFactory>& factory() const { return factory_; }
0420 
0421   /// \brief Get the partition schema, inferring it with the given factory if needed.
0422   Result<std::shared_ptr<Schema>> GetOrInferSchema(const std::vector<std::string>& paths);
0423 
0424  private:
0425   std::shared_ptr<PartitioningFactory> factory_;
0426   std::shared_ptr<Partitioning> partitioning_;
0427 };
0428 
0429 /// @}
0430 
0431 }  // namespace dataset
0432 }  // namespace arrow