Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:52

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <atomic>
0021 #include <cstdint>
0022 #include <memory>
0023 
0024 #include "arrow/acero/partition_util.h"
0025 #include "arrow/acero/util.h"
0026 #include "arrow/memory_pool.h"
0027 #include "arrow/result.h"
0028 #include "arrow/status.h"
0029 #include "arrow/util/simd.h"
0030 
0031 namespace arrow {
0032 namespace acero {
0033 
0034 // A set of pre-generated bit masks from a 64-bit word.
0035 //
0036 // It is used to map selected bits of hash to a bit mask that will be used in
0037 // a Bloom filter.
0038 //
0039 // These bit masks need to look random and need to have a similar fractions of
0040 // bits set in order for a Bloom filter to have a low false positives rate.
0041 //
0042 struct ARROW_ACERO_EXPORT BloomFilterMasks {
0043   // Generate all masks as a single bit vector. Each bit offset in this bit
0044   // vector corresponds to a single mask.
0045   // In each consecutive kBitsPerMask bits, there must be between
0046   // kMinBitsSet and kMaxBitsSet bits set.
0047   //
0048   BloomFilterMasks();
0049 
0050   inline uint64_t mask(int bit_offset) {
0051 #if ARROW_LITTLE_ENDIAN
0052     return (arrow::util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8) >>
0053             (bit_offset % 8)) &
0054            kFullMask;
0055 #else
0056     return (BYTESWAP(arrow::util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8)) >>
0057             (bit_offset % 8)) &
0058            kFullMask;
0059 #endif
0060   }
0061 
0062   // Masks are 57 bits long because then they can be accessed at an
0063   // arbitrary bit offset using a single unaligned 64-bit load instruction.
0064   //
0065   static constexpr int kBitsPerMask = 57;
0066   static constexpr uint64_t kFullMask = (1ULL << kBitsPerMask) - 1;
0067 
0068   // Minimum and maximum number of bits set in each mask.
0069   // This constraint is enforced when generating the bit masks.
0070   // Values should be close to each other and chosen as to minimize a Bloom
0071   // filter false positives rate.
0072   //
0073   static constexpr int kMinBitsSet = 4;
0074   static constexpr int kMaxBitsSet = 5;
0075 
0076   // Number of generated masks.
0077   // Having more masks to choose will improve false positives rate of Bloom
0078   // filter but will also use more memory, which may lead to more CPU cache
0079   // misses.
0080   // The chosen value results in using only a few cache-lines for mask lookups,
0081   // while providing a good variety of available bit masks.
0082   //
0083   static constexpr int kLogNumMasks = 10;
0084   static constexpr int kNumMasks = 1 << kLogNumMasks;
0085 
0086   // Data of masks. Masks are stored in a single bit vector. Nth mask is
0087   // kBitsPerMask bits starting at bit offset N.
0088   //
0089   static constexpr int kTotalBytes = (kNumMasks + 64) / 8;
0090   uint8_t masks_[kTotalBytes];
0091 };
0092 
0093 // A variant of a blocked Bloom filter implementation.
0094 // A Bloom filter is a data structure that provides approximate membership test
0095 // functionality based only on the hash of the key. Membership test may return
0096 // false positives but not false negatives. Approximation of the result allows
0097 // in general case (for arbitrary data types of keys) to save on both memory and
0098 // lookup cost compared to the accurate membership test.
0099 // The accurate test may sometimes still be cheaper for a specific data types
0100 // and inputs, e.g. integers from a small range.
0101 //
0102 // This blocked Bloom filter is optimized for use in hash joins, to achieve a
0103 // good balance between the size of the filter, the cost of its building and
0104 // querying and the rate of false positives.
0105 //
0106 class ARROW_ACERO_EXPORT BlockedBloomFilter {
0107   friend class BloomFilterBuilder_SingleThreaded;
0108   friend class BloomFilterBuilder_Parallel;
0109 
0110  public:
0111   BlockedBloomFilter() : log_num_blocks_(0), num_blocks_(0), blocks_(NULLPTR) {}
0112 
0113   inline bool Find(uint64_t hash) const {
0114     uint64_t m = mask(hash);
0115     uint64_t b = blocks_[block_id(hash)];
0116     return (b & m) == m;
0117   }
0118 
0119   // Uses SIMD if available for smaller Bloom filters.
0120   // Uses memory prefetching for larger Bloom filters.
0121   //
0122   void Find(int64_t hardware_flags, int64_t num_rows, const uint32_t* hashes,
0123             uint8_t* result_bit_vector, bool enable_prefetch = true) const;
0124   void Find(int64_t hardware_flags, int64_t num_rows, const uint64_t* hashes,
0125             uint8_t* result_bit_vector, bool enable_prefetch = true) const;
0126 
0127   int log_num_blocks() const { return log_num_blocks_; }
0128 
0129   int NumHashBitsUsed() const;
0130 
0131   bool IsSameAs(const BlockedBloomFilter* other) const;
0132 
0133   int64_t NumBitsSet() const;
0134 
0135   // Folding of a block Bloom filter after the initial version
0136   // has been built.
0137   //
0138   // One of the parameters for creation of Bloom filter is the number
0139   // of bits allocated for it. The more bits allocated, the lower the
0140   // probability of false positives. A good heuristic is to aim for
0141   // half of the bits set in the constructed Bloom filter. This should
0142   // result in a good trade off between size (and following cost of
0143   // memory accesses) and false positives rate.
0144   //
0145   // There might have been many duplicate keys in the input provided
0146   // to Bloom filter builder. In that case the resulting bit vector
0147   // would be more sparse then originally intended. It is possible to
0148   // easily correct that and cut in half the size of Bloom filter
0149   // after it has already been constructed. The process to do that is
0150   // approximately equal to OR-ing bits from upper and lower half (the
0151   // way we address these bits when inserting or querying a hash makes
0152   // such folding in half possible).
0153   //
0154   // We will keep folding as long as the fraction of bits set is less
0155   // than 1/4. The resulting bit vector density should be in the [1/4,
0156   // 1/2) range.
0157   //
0158   void Fold();
0159 
0160  private:
0161   Status CreateEmpty(int64_t num_rows_to_insert, MemoryPool* pool);
0162 
0163   inline void Insert(uint64_t hash) {
0164     uint64_t m = mask(hash);
0165     uint64_t& b = blocks_[block_id(hash)];
0166     b |= m;
0167   }
0168 
0169   void Insert(int64_t hardware_flags, int64_t num_rows, const uint32_t* hashes);
0170   void Insert(int64_t hardware_flags, int64_t num_rows, const uint64_t* hashes);
0171 
0172   inline uint64_t mask(uint64_t hash) const {
0173     // The lowest bits of hash are used to pick mask index.
0174     //
0175     int mask_id = static_cast<int>(hash & (BloomFilterMasks::kNumMasks - 1));
0176     uint64_t result = masks_.mask(mask_id);
0177 
0178     // The next set of hash bits is used to pick the amount of bit
0179     // rotation of the mask.
0180     //
0181     int rotation = (hash >> BloomFilterMasks::kLogNumMasks) & 63;
0182     result = ROTL64(result, rotation);
0183 
0184     return result;
0185   }
0186 
0187   inline int64_t block_id(uint64_t hash) const {
0188     // The next set of hash bits following the bits used to select a
0189     // mask is used to pick block id (index of 64-bit word in a bit
0190     // vector).
0191     //
0192     return (hash >> (BloomFilterMasks::kLogNumMasks + 6)) & (num_blocks_ - 1);
0193   }
0194 
0195   template <typename T>
0196   inline void InsertImp(int64_t num_rows, const T* hashes);
0197 
0198   template <typename T>
0199   inline void FindImp(int64_t num_rows, const T* hashes, uint8_t* result_bit_vector,
0200                       bool enable_prefetch) const;
0201 
0202   void SingleFold(int num_folds);
0203 
0204 #if defined(ARROW_HAVE_RUNTIME_AVX2)
0205   inline __m256i mask_avx2(__m256i hash) const;
0206   inline __m256i block_id_avx2(__m256i hash) const;
0207   int64_t Insert_avx2(int64_t num_rows, const uint32_t* hashes);
0208   int64_t Insert_avx2(int64_t num_rows, const uint64_t* hashes);
0209   template <typename T>
0210   int64_t InsertImp_avx2(int64_t num_rows, const T* hashes);
0211   int64_t Find_avx2(int64_t num_rows, const uint32_t* hashes,
0212                     uint8_t* result_bit_vector) const;
0213   int64_t Find_avx2(int64_t num_rows, const uint64_t* hashes,
0214                     uint8_t* result_bit_vector) const;
0215   template <typename T>
0216   int64_t FindImp_avx2(int64_t num_rows, const T* hashes,
0217                        uint8_t* result_bit_vector) const;
0218 #endif
0219 
0220   bool UsePrefetch() const {
0221     return num_blocks_ * sizeof(uint64_t) > kPrefetchLimitBytes;
0222   }
0223 
0224   static constexpr int64_t kPrefetchLimitBytes = 256 * 1024;
0225 
0226   static BloomFilterMasks masks_;
0227 
0228   // Total number of bits used by block Bloom filter must be a power
0229   // of 2.
0230   //
0231   int log_num_blocks_;
0232   int64_t num_blocks_;
0233 
0234   // Buffer allocated to store an array of power of 2 64-bit blocks.
0235   //
0236   std::shared_ptr<Buffer> buf_;
0237   // Pointer to mutable data owned by Buffer
0238   //
0239   uint64_t* blocks_;
0240 };
0241 
0242 // We have two separate implementations of building a Bloom filter, multi-threaded and
0243 // single-threaded.
0244 //
0245 // Single threaded version is useful in two ways:
0246 // a) It allows to verify parallel implementation in tests (the single threaded one is
0247 // simpler and can be used as the source of truth).
0248 // b) It is preferred for small and medium size Bloom filters, because it skips extra
0249 // synchronization related steps from parallel variant (partitioning and taking locks).
0250 //
0251 enum class BloomFilterBuildStrategy {
0252   SINGLE_THREADED = 0,
0253   PARALLEL = 1,
0254 };
0255 
0256 class ARROW_ACERO_EXPORT BloomFilterBuilder {
0257  public:
0258   virtual ~BloomFilterBuilder() = default;
0259   virtual Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
0260                        int64_t num_rows, int64_t num_batches,
0261                        BlockedBloomFilter* build_target) = 0;
0262   virtual int64_t num_tasks() const { return 0; }
0263   virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
0264                                const uint32_t* hashes) = 0;
0265   virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
0266                                const uint64_t* hashes) = 0;
0267   virtual void CleanUp() {}
0268   static std::unique_ptr<BloomFilterBuilder> Make(BloomFilterBuildStrategy strategy);
0269 };
0270 
0271 class ARROW_ACERO_EXPORT BloomFilterBuilder_SingleThreaded : public BloomFilterBuilder {
0272  public:
0273   Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
0274                int64_t num_rows, int64_t num_batches,
0275                BlockedBloomFilter* build_target) override;
0276 
0277   Status PushNextBatch(size_t /*thread_index*/, int64_t num_rows,
0278                        const uint32_t* hashes) override;
0279 
0280   Status PushNextBatch(size_t /*thread_index*/, int64_t num_rows,
0281                        const uint64_t* hashes) override;
0282 
0283  private:
0284   template <typename T>
0285   void PushNextBatchImp(int64_t num_rows, const T* hashes);
0286 
0287   int64_t hardware_flags_;
0288   BlockedBloomFilter* build_target_;
0289 };
0290 
0291 class ARROW_ACERO_EXPORT BloomFilterBuilder_Parallel : public BloomFilterBuilder {
0292  public:
0293   Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
0294                int64_t num_rows, int64_t num_batches,
0295                BlockedBloomFilter* build_target) override;
0296 
0297   Status PushNextBatch(size_t thread_id, int64_t num_rows,
0298                        const uint32_t* hashes) override;
0299 
0300   Status PushNextBatch(size_t thread_id, int64_t num_rows,
0301                        const uint64_t* hashes) override;
0302 
0303   void CleanUp() override;
0304 
0305  private:
0306   template <typename T>
0307   void PushNextBatchImp(size_t thread_id, int64_t num_rows, const T* hashes);
0308 
0309   int64_t hardware_flags_;
0310   BlockedBloomFilter* build_target_;
0311   int log_num_prtns_;
0312   struct ThreadLocalState {
0313     std::vector<uint32_t> partitioned_hashes_32;
0314     std::vector<uint64_t> partitioned_hashes_64;
0315     std::vector<uint16_t> partition_ranges;
0316     std::vector<int> unprocessed_partition_ids;
0317   };
0318   std::vector<ThreadLocalState> thread_local_states_;
0319   PartitionLocks prtn_locks_;
0320 };
0321 
0322 }  // namespace acero
0323 }  // namespace arrow