File indexing completed on 2025-08-28 08:26:52
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
#include <atomic>
#include <cstdint>
#include <memory>
#include <vector>

#include "arrow/acero/partition_util.h"
#include "arrow/acero/util.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/simd.h"
0030
0031 namespace arrow {
0032 namespace acero {
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042 struct ARROW_ACERO_EXPORT BloomFilterMasks {
0043
0044
0045
0046
0047
0048 BloomFilterMasks();
0049
0050 inline uint64_t mask(int bit_offset) {
0051 #if ARROW_LITTLE_ENDIAN
0052 return (arrow::util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8) >>
0053 (bit_offset % 8)) &
0054 kFullMask;
0055 #else
0056 return (BYTESWAP(arrow::util::SafeLoadAs<uint64_t>(masks_ + bit_offset / 8)) >>
0057 (bit_offset % 8)) &
0058 kFullMask;
0059 #endif
0060 }
0061
0062
0063
0064
0065 static constexpr int kBitsPerMask = 57;
0066 static constexpr uint64_t kFullMask = (1ULL << kBitsPerMask) - 1;
0067
0068
0069
0070
0071
0072
0073 static constexpr int kMinBitsSet = 4;
0074 static constexpr int kMaxBitsSet = 5;
0075
0076
0077
0078
0079
0080
0081
0082
0083 static constexpr int kLogNumMasks = 10;
0084 static constexpr int kNumMasks = 1 << kLogNumMasks;
0085
0086
0087
0088
0089 static constexpr int kTotalBytes = (kNumMasks + 64) / 8;
0090 uint8_t masks_[kTotalBytes];
0091 };
0092
0093
0094
0095
0096
0097
0098
0099
0100
0101
0102
0103
0104
0105
0106 class ARROW_ACERO_EXPORT BlockedBloomFilter {
0107 friend class BloomFilterBuilder_SingleThreaded;
0108 friend class BloomFilterBuilder_Parallel;
0109
0110 public:
0111 BlockedBloomFilter() : log_num_blocks_(0), num_blocks_(0), blocks_(NULLPTR) {}
0112
0113 inline bool Find(uint64_t hash) const {
0114 uint64_t m = mask(hash);
0115 uint64_t b = blocks_[block_id(hash)];
0116 return (b & m) == m;
0117 }
0118
0119
0120
0121
0122 void Find(int64_t hardware_flags, int64_t num_rows, const uint32_t* hashes,
0123 uint8_t* result_bit_vector, bool enable_prefetch = true) const;
0124 void Find(int64_t hardware_flags, int64_t num_rows, const uint64_t* hashes,
0125 uint8_t* result_bit_vector, bool enable_prefetch = true) const;
0126
0127 int log_num_blocks() const { return log_num_blocks_; }
0128
0129 int NumHashBitsUsed() const;
0130
0131 bool IsSameAs(const BlockedBloomFilter* other) const;
0132
0133 int64_t NumBitsSet() const;
0134
0135
0136
0137
0138
0139
0140
0141
0142
0143
0144
0145
0146
0147
0148
0149
0150
0151
0152
0153
0154
0155
0156
0157
0158 void Fold();
0159
0160 private:
0161 Status CreateEmpty(int64_t num_rows_to_insert, MemoryPool* pool);
0162
0163 inline void Insert(uint64_t hash) {
0164 uint64_t m = mask(hash);
0165 uint64_t& b = blocks_[block_id(hash)];
0166 b |= m;
0167 }
0168
0169 void Insert(int64_t hardware_flags, int64_t num_rows, const uint32_t* hashes);
0170 void Insert(int64_t hardware_flags, int64_t num_rows, const uint64_t* hashes);
0171
0172 inline uint64_t mask(uint64_t hash) const {
0173
0174
0175 int mask_id = static_cast<int>(hash & (BloomFilterMasks::kNumMasks - 1));
0176 uint64_t result = masks_.mask(mask_id);
0177
0178
0179
0180
0181 int rotation = (hash >> BloomFilterMasks::kLogNumMasks) & 63;
0182 result = ROTL64(result, rotation);
0183
0184 return result;
0185 }
0186
0187 inline int64_t block_id(uint64_t hash) const {
0188
0189
0190
0191
0192 return (hash >> (BloomFilterMasks::kLogNumMasks + 6)) & (num_blocks_ - 1);
0193 }
0194
0195 template <typename T>
0196 inline void InsertImp(int64_t num_rows, const T* hashes);
0197
0198 template <typename T>
0199 inline void FindImp(int64_t num_rows, const T* hashes, uint8_t* result_bit_vector,
0200 bool enable_prefetch) const;
0201
0202 void SingleFold(int num_folds);
0203
0204 #if defined(ARROW_HAVE_RUNTIME_AVX2)
0205 inline __m256i mask_avx2(__m256i hash) const;
0206 inline __m256i block_id_avx2(__m256i hash) const;
0207 int64_t Insert_avx2(int64_t num_rows, const uint32_t* hashes);
0208 int64_t Insert_avx2(int64_t num_rows, const uint64_t* hashes);
0209 template <typename T>
0210 int64_t InsertImp_avx2(int64_t num_rows, const T* hashes);
0211 int64_t Find_avx2(int64_t num_rows, const uint32_t* hashes,
0212 uint8_t* result_bit_vector) const;
0213 int64_t Find_avx2(int64_t num_rows, const uint64_t* hashes,
0214 uint8_t* result_bit_vector) const;
0215 template <typename T>
0216 int64_t FindImp_avx2(int64_t num_rows, const T* hashes,
0217 uint8_t* result_bit_vector) const;
0218 #endif
0219
0220 bool UsePrefetch() const {
0221 return num_blocks_ * sizeof(uint64_t) > kPrefetchLimitBytes;
0222 }
0223
0224 static constexpr int64_t kPrefetchLimitBytes = 256 * 1024;
0225
0226 static BloomFilterMasks masks_;
0227
0228
0229
0230
0231 int log_num_blocks_;
0232 int64_t num_blocks_;
0233
0234
0235
0236 std::shared_ptr<Buffer> buf_;
0237
0238
0239 uint64_t* blocks_;
0240 };
0241
0242
0243
0244
0245
0246
0247
0248
0249
0250
/// Selects which BloomFilterBuilder implementation to use (presumably mapped
/// by BloomFilterBuilder::Make in the .cc file).
enum class BloomFilterBuildStrategy : int {
  /// BloomFilterBuilder_SingleThreaded.
  SINGLE_THREADED = 0,
  /// BloomFilterBuilder_Parallel (hashes are partitioned and inserted under
  /// per-partition locks).
  PARALLEL = 1,
};
0255
0256 class ARROW_ACERO_EXPORT BloomFilterBuilder {
0257 public:
0258 virtual ~BloomFilterBuilder() = default;
0259 virtual Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
0260 int64_t num_rows, int64_t num_batches,
0261 BlockedBloomFilter* build_target) = 0;
0262 virtual int64_t num_tasks() const { return 0; }
0263 virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
0264 const uint32_t* hashes) = 0;
0265 virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
0266 const uint64_t* hashes) = 0;
0267 virtual void CleanUp() {}
0268 static std::unique_ptr<BloomFilterBuilder> Make(BloomFilterBuildStrategy strategy);
0269 };
0270
0271 class ARROW_ACERO_EXPORT BloomFilterBuilder_SingleThreaded : public BloomFilterBuilder {
0272 public:
0273 Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
0274 int64_t num_rows, int64_t num_batches,
0275 BlockedBloomFilter* build_target) override;
0276
0277 Status PushNextBatch(size_t , int64_t num_rows,
0278 const uint32_t* hashes) override;
0279
0280 Status PushNextBatch(size_t , int64_t num_rows,
0281 const uint64_t* hashes) override;
0282
0283 private:
0284 template <typename T>
0285 void PushNextBatchImp(int64_t num_rows, const T* hashes);
0286
0287 int64_t hardware_flags_;
0288 BlockedBloomFilter* build_target_;
0289 };
0290
0291 class ARROW_ACERO_EXPORT BloomFilterBuilder_Parallel : public BloomFilterBuilder {
0292 public:
0293 Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
0294 int64_t num_rows, int64_t num_batches,
0295 BlockedBloomFilter* build_target) override;
0296
0297 Status PushNextBatch(size_t thread_id, int64_t num_rows,
0298 const uint32_t* hashes) override;
0299
0300 Status PushNextBatch(size_t thread_id, int64_t num_rows,
0301 const uint64_t* hashes) override;
0302
0303 void CleanUp() override;
0304
0305 private:
0306 template <typename T>
0307 void PushNextBatchImp(size_t thread_id, int64_t num_rows, const T* hashes);
0308
0309 int64_t hardware_flags_;
0310 BlockedBloomFilter* build_target_;
0311 int log_num_prtns_;
0312 struct ThreadLocalState {
0313 std::vector<uint32_t> partitioned_hashes_32;
0314 std::vector<uint64_t> partitioned_hashes_64;
0315 std::vector<uint16_t> partition_ranges;
0316 std::vector<int> unprocessed_partition_ids;
0317 };
0318 std::vector<ThreadLocalState> thread_local_states_;
0319 PartitionLocks prtn_locks_;
0320 };
0321
0322 }
0323 }