![]() |
|
|||
File indexing completed on 2025-08-28 08:26:53
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

#include "arrow/acero/schema_util.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/row/row_encoder_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type.h"

// This file contains hash join logic related to handling of dictionary encoded key
// columns.
//
// A key column from probe side of the join can be matched against a key column from build
// side of the join, as long as the underlying value types are equal. That means that:
// - both scalars and arrays can be used and even mixed in the same column
// - dictionary column can be matched against non-dictionary column if underlying value
// types are equal
// - dictionary column can be matched against dictionary column with a different index
// type, and potentially using a different dictionary, if underlying value types are equal
//
// We currently require in hash join that for all dictionary encoded columns, the same
// dictionary is used in all input exec batches.
//
// In order to allow matching columns with different dictionaries, different dictionary
// index types, and dictionary key against non-dictionary key, internally comparisons will
// be evaluated after remapping values on both sides of the join to a common
// representation (which will be called "unified representation"). This common
// representation is a column of int32() type (not a dictionary column). It represents an
// index in the unified dictionary computed for the (only) dictionary present on build
// side (an empty dictionary is still created for an empty build side). Null value is
// always represented in this common representation as null int32 value, unified
// dictionary will never contain a null value (so there is no ambiguity of representing
// nulls as either index to a null entry in the dictionary or null index).
//
// Unified dictionary represents values present on build side. There may be values on
// probe side that are not present in it. All such values, that are not null, are mapped
// in the common representation to a special constant kMissingValueId.
0058 // 0059 0060 namespace arrow { 0061 0062 using compute::ExecBatch; 0063 using compute::ExecContext; 0064 using compute::internal::RowEncoder; 0065 0066 namespace acero { 0067 0068 /// Helper class with operations that are stateless and common to processing of dictionary 0069 /// keys on both build and probe side. 0070 class HashJoinDictUtil { 0071 public: 0072 // Null values in unified representation are always represented as null that has 0073 // corresponding integer set to this constant 0074 static constexpr int32_t kNullId = 0; 0075 // Constant representing a value, that is not null, missing on the build side, in 0076 // unified representation. 0077 static constexpr int32_t kMissingValueId = -1; 0078 0079 // Check if data types of corresponding pair of key column on build and probe side are 0080 // compatible 0081 static bool KeyDataTypesValid(const std::shared_ptr<DataType>& probe_data_type, 0082 const std::shared_ptr<DataType>& build_data_type); 0083 0084 // Input must be dictionary array or dictionary scalar. 0085 // A precomputed and provided here lookup table in the form of int32() array will be 0086 // used to remap input indices to unified representation. 0087 // 0088 static Result<std::shared_ptr<ArrayData>> IndexRemapUsingLUT( 0089 ExecContext* ctx, const Datum& indices, int64_t batch_length, 0090 const std::shared_ptr<ArrayData>& map_array, 0091 const std::shared_ptr<DataType>& data_type); 0092 0093 // Return int32() array that contains indices of input dictionary array or scalar after 0094 // type casting. 0095 static Result<std::shared_ptr<ArrayData>> ConvertToInt32( 0096 const std::shared_ptr<DataType>& from_type, const Datum& input, 0097 int64_t batch_length, ExecContext* ctx); 0098 0099 // Return an array that contains elements of input int32() array after casting to a 0100 // given integer type. 
This is used for mapping unified representation stored in the 0101 // hash table on build side back to original input data type of hash join, when 0102 // outputting hash join results to parent exec node. 0103 // 0104 static Result<std::shared_ptr<ArrayData>> ConvertFromInt32( 0105 const std::shared_ptr<DataType>& to_type, const Datum& input, int64_t batch_length, 0106 ExecContext* ctx); 0107 0108 // Return dictionary referenced in either dictionary array or dictionary scalar 0109 static std::shared_ptr<Array> ExtractDictionary(const Datum& data); 0110 }; 0111 0112 /// Implements processing of dictionary arrays/scalars in key columns on the build side of 0113 /// a hash join. 0114 /// Each instance of this class corresponds to a single column and stores and 0115 /// processes only the information related to that column. 0116 /// Const methods are thread-safe, non-const methods are not (the caller must make sure 0117 /// that only one thread at any time will access them). 0118 /// 0119 class HashJoinDictBuild { 0120 public: 0121 // Returns true if the key column (described in input by its data type) requires any 0122 // pre- or post-processing related to handling dictionaries. 0123 // 0124 static bool KeyNeedsProcessing(const std::shared_ptr<DataType>& build_data_type) { 0125 return (build_data_type->id() == Type::DICTIONARY); 0126 } 0127 0128 // Data type of unified representation 0129 static std::shared_ptr<DataType> DataTypeAfterRemapping() { return int32(); } 0130 0131 // Should be called only once in hash join, before processing any build or probe 0132 // batches. 0133 // 0134 // Takes a pointer to the dictionary for a corresponding key column on the build side as 0135 // an input. If the build side is empty, it still needs to be called, but with 0136 // dictionary pointer set to null. 0137 // 0138 // Currently it is required that all input batches on build side share the same 0139 // dictionary. 
For each input batch during its pre-processing, dictionary will be 0140 // checked and error will be returned if it is different then the one provided in the 0141 // call to this method. 0142 // 0143 // Unifies the dictionary. The order of the values is still preserved. 0144 // Null and duplicate entries are removed. If the dictionary is already unified, its 0145 // copy will be produced and stored within this class. 0146 // 0147 // Prepares the mapping from ids within original dictionary to the ids in the resulting 0148 // dictionary. This is used later on to pre-process (map to unified representation) key 0149 // column on build side. 0150 // 0151 // Prepares the reverse mapping (in the form of hash table) from values to the ids in 0152 // the resulting dictionary. This will be used later on to pre-process (map to unified 0153 // representation) key column on probe side. Values on probe side that are not present 0154 // in the original dictionary will be mapped to a special constant kMissingValueId. The 0155 // exception is made for nulls, which get always mapped to nulls (both when null is 0156 // represented as a dictionary id pointing to a null and a null dictionary id). 0157 // 0158 Status Init(ExecContext* ctx, std::shared_ptr<Array> dictionary, 0159 std::shared_ptr<DataType> index_type, std::shared_ptr<DataType> value_type); 0160 0161 // Remap array or scalar values into unified representation (array of int32()). 0162 // Outputs kMissingValueId if input value is not found in the unified dictionary. 0163 // Outputs null for null input value (with corresponding data set to kNullId). 0164 // 0165 Result<std::shared_ptr<ArrayData>> RemapInputValues(ExecContext* ctx, 0166 const Datum& values, 0167 int64_t batch_length) const; 0168 0169 // Remap dictionary array or dictionary scalar on build side to unified representation. 0170 // Dictionary referenced in the input must match the dictionary that was 0171 // given during initialization. 
0172 // The output is a dictionary array that references unified dictionary. 0173 // 0174 Result<std::shared_ptr<ArrayData>> RemapInput( 0175 ExecContext* ctx, const Datum& indices, int64_t batch_length, 0176 const std::shared_ptr<DataType>& data_type) const; 0177 0178 // Outputs dictionary array referencing unified dictionary, given an array with 32-bit 0179 // ids. 0180 // Used to post-process values looked up in a hash table on build side of the hash join 0181 // before outputting to the parent exec node. 0182 // 0183 Result<std::shared_ptr<ArrayData>> RemapOutput(const ArrayData& indices32Bit, 0184 ExecContext* ctx) const; 0185 0186 // Release shared pointers and memory 0187 void CleanUp(); 0188 0189 private: 0190 // Data type of dictionary ids for the input dictionary on build side 0191 std::shared_ptr<DataType> index_type_; 0192 // Data type of values for the input dictionary on build side 0193 std::shared_ptr<DataType> value_type_; 0194 // Mapping from (encoded as string) values to the ids in unified dictionary 0195 std::unordered_map<std::string, int32_t> hash_table_; 0196 // Mapping from input dictionary ids to unified dictionary ids 0197 std::shared_ptr<ArrayData> remapped_ids_; 0198 // Input dictionary 0199 std::shared_ptr<Array> dictionary_; 0200 // Unified dictionary 0201 std::shared_ptr<ArrayData> unified_dictionary_; 0202 }; 0203 0204 /// Implements processing of dictionary arrays/scalars in key columns on the probe side of 0205 /// a hash join. 0206 /// Each instance of this class corresponds to a single column and stores and 0207 /// processes only the information related to that column. 0208 /// It is not thread-safe - every participating thread should use its own instance of 0209 /// this class. 
0210 /// 0211 class HashJoinDictProbe { 0212 public: 0213 static bool KeyNeedsProcessing(const std::shared_ptr<DataType>& probe_data_type, 0214 const std::shared_ptr<DataType>& build_data_type); 0215 0216 // Data type of the result of remapping input key column. 0217 // 0218 // The result of remapping is what is used in hash join for matching keys on build and 0219 // probe side. The exact data types may be different, as described below, and therefore 0220 // a common representation is needed for simplifying comparisons of pairs of keys on 0221 // both sides. 0222 // 0223 // We support matching key that is of non-dictionary type with key that is of dictionary 0224 // type, as long as the underlying value types are equal. We support matching when both 0225 // keys are of dictionary type, regardless whether underlying dictionary index types are 0226 // the same or not. 0227 // 0228 static std::shared_ptr<DataType> DataTypeAfterRemapping( 0229 const std::shared_ptr<DataType>& build_data_type); 0230 0231 // Should only be called if KeyNeedsProcessing method returns true for a pair of 0232 // corresponding key columns from build and probe side. 0233 // Converts values in order to match the common representation for 0234 // both build and probe side used in hash table comparison. 0235 // Supports arrays and scalars as input. 0236 // Argument opt_build_side should be null if dictionary key on probe side is matched 0237 // with non-dictionary key on build side. 0238 // 0239 Result<std::shared_ptr<ArrayData>> RemapInput( 0240 const HashJoinDictBuild* opt_build_side, const Datum& data, int64_t batch_length, 0241 const std::shared_ptr<DataType>& probe_data_type, 0242 const std::shared_ptr<DataType>& build_data_type, ExecContext* ctx); 0243 0244 void CleanUp(); 0245 0246 private: 0247 // May be null if probe side key is non-dictionary. Otherwise it is used to verify that 0248 // only a single dictionary is referenced in exec batch on probe side of hash join. 
0249 std::shared_ptr<Array> dictionary_; 0250 // Mapping from dictionary on probe side of hash join (if it is used) to unified 0251 // representation. 0252 std::shared_ptr<ArrayData> remapped_ids_; 0253 // Encoder of key columns that uses unified representation instead of original data type 0254 // for key columns that need to use it (have dictionaries on either side of the join). 0255 RowEncoder encoder_; 0256 }; 0257 0258 // Encapsulates dictionary handling logic for build side of hash join. 0259 // 0260 class HashJoinDictBuildMulti { 0261 public: 0262 Status Init(const SchemaProjectionMaps<HashJoinProjection>& proj_map, 0263 const ExecBatch* opt_non_empty_batch, ExecContext* ctx); 0264 static void InitEncoder(const SchemaProjectionMaps<HashJoinProjection>& proj_map, 0265 RowEncoder* encoder, ExecContext* ctx); 0266 Status EncodeBatch(size_t thread_index, 0267 const SchemaProjectionMaps<HashJoinProjection>& proj_map, 0268 const ExecBatch& batch, RowEncoder* encoder, ExecContext* ctx) const; 0269 Status PostDecode(const SchemaProjectionMaps<HashJoinProjection>& proj_map, 0270 ExecBatch* decoded_key_batch, ExecContext* ctx); 0271 const HashJoinDictBuild& get_dict_build(int icol) const { return remap_imp_[icol]; } 0272 0273 private: 0274 std::vector<bool> needs_remap_; 0275 std::vector<HashJoinDictBuild> remap_imp_; 0276 }; 0277 0278 // Encapsulates dictionary handling logic for probe side of hash join 0279 // 0280 class HashJoinDictProbeMulti { 0281 public: 0282 void Init(size_t num_threads); 0283 bool BatchRemapNeeded(size_t thread_index, 0284 const SchemaProjectionMaps<HashJoinProjection>& proj_map_probe, 0285 const SchemaProjectionMaps<HashJoinProjection>& proj_map_build, 0286 ExecContext* ctx); 0287 Status EncodeBatch(size_t thread_index, 0288 const SchemaProjectionMaps<HashJoinProjection>& proj_map_probe, 0289 const SchemaProjectionMaps<HashJoinProjection>& proj_map_build, 0290 const HashJoinDictBuildMulti& dict_build, const ExecBatch& batch, 0291 RowEncoder** 
out_encoder, ExecBatch* opt_out_key_batch, 0292 ExecContext* ctx); 0293 0294 private: 0295 void InitLocalStateIfNeeded( 0296 size_t thread_index, const SchemaProjectionMaps<HashJoinProjection>& proj_map_probe, 0297 const SchemaProjectionMaps<HashJoinProjection>& proj_map_build, ExecContext* ctx); 0298 static void InitEncoder(const SchemaProjectionMaps<HashJoinProjection>& proj_map_probe, 0299 const SchemaProjectionMaps<HashJoinProjection>& proj_map_build, 0300 RowEncoder* encoder, ExecContext* ctx); 0301 struct ThreadLocalState { 0302 bool is_initialized; 0303 // Whether any key column needs remapping (because of dictionaries used) before doing 0304 // join hash table lookups 0305 bool any_needs_remap; 0306 // Whether each key column needs remapping before doing join hash table lookups 0307 std::vector<bool> needs_remap; 0308 std::vector<HashJoinDictProbe> remap_imp; 0309 // Encoder of key columns that uses unified representation instead of original data 0310 // type for key columns that need to use it (have dictionaries on either side of the 0311 // join). 0312 RowEncoder post_remap_encoder; 0313 }; 0314 std::vector<ThreadLocalState> local_states_; 0315 }; 0316 0317 } // namespace acero 0318 } // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |