Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:53

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "arrow/acero/schema_util.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/row/row_encoder_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type.h"
0029 
0030 // This file contains hash join logic related to handling of dictionary encoded key
0031 // columns.
0032 //
0033 // A key column from probe side of the join can be matched against a key column from build
0034 // side of the join, as long as the underlying value types are equal. That means that:
0035 // - both scalars and arrays can be used and even mixed in the same column
0036 // - dictionary column can be matched against non-dictionary column if underlying value
0037 // types are equal
0038 // - dictionary column can be matched against dictionary column with a different index
0039 // type, and potentially using a different dictionary, if underlying value types are equal
0040 //
0041 // We currently require in hash join that for all dictionary encoded columns, the same
0042 // dictionary is used in all input exec batches.
0043 //
0044 // In order to allow matching columns with different dictionaries, different dictionary
0045 // index types, and dictionary key against non-dictionary key, internally comparisons will
0046 // be evaluated after remapping values on both sides of the join to a common
0047 // representation (which will be called "unified representation"). This common
0048 // representation is a column of int32() type (not a dictionary column). It represents an
0049 // index in the unified dictionary computed for the (only) dictionary present on build
// side (an empty dictionary is still created for an empty build side). A null value is
// always represented in this common representation as a null int32 value; the unified
// dictionary never contains a null value (so there is no ambiguity between representing
// a null as an index to a null entry in the dictionary and as a null index).
0054 //
// The unified dictionary represents the values present on the build side. There may be
// values on the probe side that are not present in it. All such non-null values are
// mapped in the common representation to the special constant kMissingValueId.
0058 //
0059 
0060 namespace arrow {
0061 
0062 using compute::ExecBatch;
0063 using compute::ExecContext;
0064 using compute::internal::RowEncoder;
0065 
0066 namespace acero {
0067 
0068 /// Helper class with operations that are stateless and common to processing of dictionary
0069 /// keys on both build and probe side.
0070 class HashJoinDictUtil {
0071  public:
0072   // Null values in unified representation are always represented as null that has
0073   // corresponding integer set to this constant
0074   static constexpr int32_t kNullId = 0;
0075   // Constant representing a value, that is not null, missing on the build side, in
0076   // unified representation.
0077   static constexpr int32_t kMissingValueId = -1;
0078 
0079   // Check if data types of corresponding pair of key column on build and probe side are
0080   // compatible
0081   static bool KeyDataTypesValid(const std::shared_ptr<DataType>& probe_data_type,
0082                                 const std::shared_ptr<DataType>& build_data_type);
0083 
0084   // Input must be dictionary array or dictionary scalar.
0085   // A precomputed and provided here lookup table in the form of int32() array will be
0086   // used to remap input indices to unified representation.
0087   //
0088   static Result<std::shared_ptr<ArrayData>> IndexRemapUsingLUT(
0089       ExecContext* ctx, const Datum& indices, int64_t batch_length,
0090       const std::shared_ptr<ArrayData>& map_array,
0091       const std::shared_ptr<DataType>& data_type);
0092 
0093   // Return int32() array that contains indices of input dictionary array or scalar after
0094   // type casting.
0095   static Result<std::shared_ptr<ArrayData>> ConvertToInt32(
0096       const std::shared_ptr<DataType>& from_type, const Datum& input,
0097       int64_t batch_length, ExecContext* ctx);
0098 
0099   // Return an array that contains elements of input int32() array after casting to a
0100   // given integer type. This is used for mapping unified representation stored in the
0101   // hash table on build side back to original input data type of hash join, when
0102   // outputting hash join results to parent exec node.
0103   //
0104   static Result<std::shared_ptr<ArrayData>> ConvertFromInt32(
0105       const std::shared_ptr<DataType>& to_type, const Datum& input, int64_t batch_length,
0106       ExecContext* ctx);
0107 
0108   // Return dictionary referenced in either dictionary array or dictionary scalar
0109   static std::shared_ptr<Array> ExtractDictionary(const Datum& data);
0110 };
0111 
0112 /// Implements processing of dictionary arrays/scalars in key columns on the build side of
0113 /// a hash join.
0114 /// Each instance of this class corresponds to a single column and stores and
0115 /// processes only the information related to that column.
0116 /// Const methods are thread-safe, non-const methods are not (the caller must make sure
0117 /// that only one thread at any time will access them).
0118 ///
0119 class HashJoinDictBuild {
0120  public:
0121   // Returns true if the key column (described in input by its data type) requires any
0122   // pre- or post-processing related to handling dictionaries.
0123   //
0124   static bool KeyNeedsProcessing(const std::shared_ptr<DataType>& build_data_type) {
0125     return (build_data_type->id() == Type::DICTIONARY);
0126   }
0127 
0128   // Data type of unified representation
0129   static std::shared_ptr<DataType> DataTypeAfterRemapping() { return int32(); }
0130 
0131   // Should be called only once in hash join, before processing any build or probe
0132   // batches.
0133   //
0134   // Takes a pointer to the dictionary for a corresponding key column on the build side as
0135   // an input. If the build side is empty, it still needs to be called, but with
0136   // dictionary pointer set to null.
0137   //
0138   // Currently it is required that all input batches on build side share the same
0139   // dictionary. For each input batch during its pre-processing, dictionary will be
0140   // checked and error will be returned if it is different then the one provided in the
0141   // call to this method.
0142   //
0143   // Unifies the dictionary. The order of the values is still preserved.
0144   // Null and duplicate entries are removed. If the dictionary is already unified, its
0145   // copy will be produced and stored within this class.
0146   //
0147   // Prepares the mapping from ids within original dictionary to the ids in the resulting
0148   // dictionary. This is used later on to pre-process (map to unified representation) key
0149   // column on build side.
0150   //
0151   // Prepares the reverse mapping (in the form of hash table) from values to the ids in
0152   // the resulting dictionary. This will be used later on to pre-process (map to unified
0153   // representation) key column on probe side. Values on probe side that are not present
0154   // in the original dictionary will be mapped to a special constant kMissingValueId. The
0155   // exception is made for nulls, which get always mapped to nulls (both when null is
0156   // represented as a dictionary id pointing to a null and a null dictionary id).
0157   //
0158   Status Init(ExecContext* ctx, std::shared_ptr<Array> dictionary,
0159               std::shared_ptr<DataType> index_type, std::shared_ptr<DataType> value_type);
0160 
0161   // Remap array or scalar values into unified representation (array of int32()).
0162   // Outputs kMissingValueId if input value is not found in the unified dictionary.
0163   // Outputs null for null input value (with corresponding data set to kNullId).
0164   //
0165   Result<std::shared_ptr<ArrayData>> RemapInputValues(ExecContext* ctx,
0166                                                       const Datum& values,
0167                                                       int64_t batch_length) const;
0168 
0169   // Remap dictionary array or dictionary scalar on build side to unified representation.
0170   // Dictionary referenced in the input must match the dictionary that was
0171   // given during initialization.
0172   // The output is a dictionary array that references unified dictionary.
0173   //
0174   Result<std::shared_ptr<ArrayData>> RemapInput(
0175       ExecContext* ctx, const Datum& indices, int64_t batch_length,
0176       const std::shared_ptr<DataType>& data_type) const;
0177 
0178   // Outputs dictionary array referencing unified dictionary, given an array with 32-bit
0179   // ids.
0180   // Used to post-process values looked up in a hash table on build side of the hash join
0181   // before outputting to the parent exec node.
0182   //
0183   Result<std::shared_ptr<ArrayData>> RemapOutput(const ArrayData& indices32Bit,
0184                                                  ExecContext* ctx) const;
0185 
0186   // Release shared pointers and memory
0187   void CleanUp();
0188 
0189  private:
0190   // Data type of dictionary ids for the input dictionary on build side
0191   std::shared_ptr<DataType> index_type_;
0192   // Data type of values for the input dictionary on build side
0193   std::shared_ptr<DataType> value_type_;
0194   // Mapping from (encoded as string) values to the ids in unified dictionary
0195   std::unordered_map<std::string, int32_t> hash_table_;
0196   // Mapping from input dictionary ids to unified dictionary ids
0197   std::shared_ptr<ArrayData> remapped_ids_;
0198   // Input dictionary
0199   std::shared_ptr<Array> dictionary_;
0200   // Unified dictionary
0201   std::shared_ptr<ArrayData> unified_dictionary_;
0202 };
0203 
0204 /// Implements processing of dictionary arrays/scalars in key columns on the probe side of
0205 /// a hash join.
0206 /// Each instance of this class corresponds to a single column and stores and
0207 /// processes only the information related to that column.
0208 /// It is not thread-safe - every participating thread should use its own instance of
0209 /// this class.
0210 ///
0211 class HashJoinDictProbe {
0212  public:
0213   static bool KeyNeedsProcessing(const std::shared_ptr<DataType>& probe_data_type,
0214                                  const std::shared_ptr<DataType>& build_data_type);
0215 
0216   // Data type of the result of remapping input key column.
0217   //
0218   // The result of remapping is what is used in hash join for matching keys on build and
0219   // probe side. The exact data types may be different, as described below, and therefore
0220   // a common representation is needed for simplifying comparisons of pairs of keys on
0221   // both sides.
0222   //
0223   // We support matching key that is of non-dictionary type with key that is of dictionary
0224   // type, as long as the underlying value types are equal. We support matching when both
0225   // keys are of dictionary type, regardless whether underlying dictionary index types are
0226   // the same or not.
0227   //
0228   static std::shared_ptr<DataType> DataTypeAfterRemapping(
0229       const std::shared_ptr<DataType>& build_data_type);
0230 
0231   // Should only be called if KeyNeedsProcessing method returns true for a pair of
0232   // corresponding key columns from build and probe side.
0233   // Converts values in order to match the common representation for
0234   // both build and probe side used in hash table comparison.
0235   // Supports arrays and scalars as input.
0236   // Argument opt_build_side should be null if dictionary key on probe side is matched
0237   // with non-dictionary key on build side.
0238   //
0239   Result<std::shared_ptr<ArrayData>> RemapInput(
0240       const HashJoinDictBuild* opt_build_side, const Datum& data, int64_t batch_length,
0241       const std::shared_ptr<DataType>& probe_data_type,
0242       const std::shared_ptr<DataType>& build_data_type, ExecContext* ctx);
0243 
0244   void CleanUp();
0245 
0246  private:
0247   // May be null if probe side key is non-dictionary. Otherwise it is used to verify that
0248   // only a single dictionary is referenced in exec batch on probe side of hash join.
0249   std::shared_ptr<Array> dictionary_;
0250   // Mapping from dictionary on probe side of hash join (if it is used) to unified
0251   // representation.
0252   std::shared_ptr<ArrayData> remapped_ids_;
0253   // Encoder of key columns that uses unified representation instead of original data type
0254   // for key columns that need to use it (have dictionaries on either side of the join).
0255   RowEncoder encoder_;
0256 };
0257 
0258 // Encapsulates dictionary handling logic for build side of hash join.
0259 //
0260 class HashJoinDictBuildMulti {
0261  public:
0262   Status Init(const SchemaProjectionMaps<HashJoinProjection>& proj_map,
0263               const ExecBatch* opt_non_empty_batch, ExecContext* ctx);
0264   static void InitEncoder(const SchemaProjectionMaps<HashJoinProjection>& proj_map,
0265                           RowEncoder* encoder, ExecContext* ctx);
0266   Status EncodeBatch(size_t thread_index,
0267                      const SchemaProjectionMaps<HashJoinProjection>& proj_map,
0268                      const ExecBatch& batch, RowEncoder* encoder, ExecContext* ctx) const;
0269   Status PostDecode(const SchemaProjectionMaps<HashJoinProjection>& proj_map,
0270                     ExecBatch* decoded_key_batch, ExecContext* ctx);
0271   const HashJoinDictBuild& get_dict_build(int icol) const { return remap_imp_[icol]; }
0272 
0273  private:
0274   std::vector<bool> needs_remap_;
0275   std::vector<HashJoinDictBuild> remap_imp_;
0276 };
0277 
0278 // Encapsulates dictionary handling logic for probe side of hash join
0279 //
0280 class HashJoinDictProbeMulti {
0281  public:
0282   void Init(size_t num_threads);
0283   bool BatchRemapNeeded(size_t thread_index,
0284                         const SchemaProjectionMaps<HashJoinProjection>& proj_map_probe,
0285                         const SchemaProjectionMaps<HashJoinProjection>& proj_map_build,
0286                         ExecContext* ctx);
0287   Status EncodeBatch(size_t thread_index,
0288                      const SchemaProjectionMaps<HashJoinProjection>& proj_map_probe,
0289                      const SchemaProjectionMaps<HashJoinProjection>& proj_map_build,
0290                      const HashJoinDictBuildMulti& dict_build, const ExecBatch& batch,
0291                      RowEncoder** out_encoder, ExecBatch* opt_out_key_batch,
0292                      ExecContext* ctx);
0293 
0294  private:
0295   void InitLocalStateIfNeeded(
0296       size_t thread_index, const SchemaProjectionMaps<HashJoinProjection>& proj_map_probe,
0297       const SchemaProjectionMaps<HashJoinProjection>& proj_map_build, ExecContext* ctx);
0298   static void InitEncoder(const SchemaProjectionMaps<HashJoinProjection>& proj_map_probe,
0299                           const SchemaProjectionMaps<HashJoinProjection>& proj_map_build,
0300                           RowEncoder* encoder, ExecContext* ctx);
0301   struct ThreadLocalState {
0302     bool is_initialized;
0303     // Whether any key column needs remapping (because of dictionaries used) before doing
0304     // join hash table lookups
0305     bool any_needs_remap;
0306     // Whether each key column needs remapping before doing join hash table lookups
0307     std::vector<bool> needs_remap;
0308     std::vector<HashJoinDictProbe> remap_imp;
0309     // Encoder of key columns that uses unified representation instead of original data
0310     // type for key columns that need to use it (have dictionaries on either side of the
0311     // join).
0312     RowEncoder post_remap_encoder;
0313   };
0314   std::vector<ThreadLocalState> local_states_;
0315 };
0316 
0317 }  // namespace acero
0318 }  // namespace arrow