![]() |
|
|||
File indexing completed on 2025-08-28 08:26:53
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 0018 #pragma once 0019 0020 #include <atomic> 0021 #include <cassert> 0022 #include <cstdint> 0023 #include <functional> 0024 #include <random> 0025 #include "arrow/acero/util.h" 0026 #include "arrow/buffer.h" 0027 #include "arrow/util/pcg_random.h" 0028 0029 namespace arrow { 0030 namespace acero { 0031 0032 class PartitionSort { 0033 public: 0034 /// \brief Bucket sort rows on partition ids in O(num_rows) time. 0035 /// 0036 /// Include in the output exclusive cumulative sum of bucket sizes. 0037 /// This corresponds to ranges in the sorted array containing all row ids for 0038 /// each of the partitions. 0039 /// 0040 /// prtn_ranges must be initialized and have at least prtn_ranges + 1 elements 0041 /// when this method returns prtn_ranges[i] will contains the total number of 0042 /// elements in partitions 0 through i. prtn_ranges[0] will be 0. 0043 /// 0044 /// prtn_id_impl must be a function that takes in a row id (int) and returns 0045 /// a partition id (int). The returned partition id must be between 0 and 0046 /// num_prtns (exclusive). 0047 /// 0048 /// output_pos_impl is a function that takes in a row id (int) and a position (int) 0049 /// in the bucket sorted output. The function should insert the row in the 0050 /// output. 0051 /// 0052 /// For example: 0053 /// 0054 /// in_arr: [5, 7, 2, 3, 5, 4] 0055 /// num_prtns: 3 0056 /// prtn_id_impl: [&in_arr] (int row_id) { return in_arr[row_id] / 3; } 0057 /// output_pos_impl: [&sorted_row_ids] (int row_id, int pos) { sorted_row_ids[pos] = 0058 /// row_id; } 0059 /// 0060 /// After Execution 0061 /// sorted_row_ids: [2, 0, 3, 4, 5, 1] 0062 /// prtn_ranges: [0, 1, 5, 6] 0063 template <class INPUT_PRTN_ID_FN, class OUTPUT_POS_FN> 0064 static void Eval(int64_t num_rows, int num_prtns, uint16_t* prtn_ranges, 0065 INPUT_PRTN_ID_FN prtn_id_impl, OUTPUT_POS_FN output_pos_impl) { 0066 ARROW_DCHECK(num_rows > 0 && num_rows <= (1 << 15)); 0067 ARROW_DCHECK(num_prtns >= 1 && num_prtns <= (1 << 15)); 0068 0069 memset(prtn_ranges, 0, (num_prtns + 1) * sizeof(uint16_t)); 0070 0071 for (int64_t i = 0; i < num_rows; ++i) { 0072 int prtn_id = static_cast<int>(prtn_id_impl(i)); 0073 ++prtn_ranges[prtn_id + 1]; 0074 } 0075 0076 uint16_t sum = 0; 0077 for (int i = 0; i < num_prtns; ++i) { 0078 uint16_t sum_next = sum + prtn_ranges[i + 1]; 0079 prtn_ranges[i + 1] = sum; 0080 sum = sum_next; 0081 } 0082 0083 for (int64_t i = 0; i < num_rows; ++i) { 0084 int prtn_id = static_cast<int>(prtn_id_impl(i)); 0085 int pos = prtn_ranges[prtn_id + 1]++; 0086 output_pos_impl(i, pos); 0087 } 0088 } 0089 }; 0090 0091 /// \brief A control for synchronizing threads on a partitionable workload 0092 class PartitionLocks { 0093 public: 0094 PartitionLocks(); 0095 ~PartitionLocks(); 0096 /// \brief Initializes the control, must be called before use 0097 /// 0098 /// \param num_threads Maximum number of threads that will access the partitions 0099 /// \param num_prtns Number of partitions to synchronize 0100 void Init(size_t num_threads, int num_prtns); 0101 /// \brief Cleans up the control, it should not be used after this call 0102 void CleanUp(); 0103 /// \brief Acquire a partition to work on one 0104 /// 0105 /// \param thread_id The index of the thread trying to acquire the partition lock 0106 /// \param num_prtns Length of prtns_to_try, must be <= num_prtns used in Init 0107 /// \param prtns_to_try An array of partitions that still have remaining work 0108 /// \param limit_retries If false, this method will spinwait forever until success 0109 /// \param max_retries Max times to attempt checking out work before returning false 0110 /// \param[out] locked_prtn_id The id of the partition locked 0111 /// \param[out] locked_prtn_id_pos The index of the partition locked in prtns_to_try 0112 /// \return True if a partition was locked, false if max_retries was attempted 0113 /// without successfully acquiring a lock 0114 /// 0115 /// This method is thread safe 0116 bool AcquirePartitionLock(size_t thread_id, int num_prtns, const int* prtns_to_try, 0117 bool limit_retries, int max_retries, int* locked_prtn_id, 0118 int* locked_prtn_id_pos); 0119 /// \brief Release a partition so that other threads can work on it 0120 void ReleasePartitionLock(int prtn_id); 0121 0122 // Executes (synchronously and using current thread) the same operation on a set of 0123 // multiple partitions. Tries to minimize partition locking overhead by randomizing and 0124 // adjusting order in which partitions are processed. 0125 // 0126 // PROCESS_PRTN_FN is a callback which will be executed for each partition after 0127 // acquiring the lock for that partition. It gets partition id as an argument. 0128 // IS_PRTN_EMPTY_FN is a callback which filters out (when returning true) partitions 0129 // with specific ids from processing. 0130 // 0131 template <typename IS_PRTN_EMPTY_FN, typename PROCESS_PRTN_FN> 0132 Status ForEachPartition(size_t thread_id, 0133 /*scratch space buffer with space for one element per partition; 0134 dirty in and dirty out*/ 0135 int* temp_unprocessed_prtns, IS_PRTN_EMPTY_FN is_prtn_empty_fn, 0136 PROCESS_PRTN_FN process_prtn_fn) { 0137 int num_unprocessed_partitions = 0; 0138 for (int i = 0; i < num_prtns_; ++i) { 0139 bool is_prtn_empty = is_prtn_empty_fn(i); 0140 if (!is_prtn_empty) { 0141 temp_unprocessed_prtns[num_unprocessed_partitions++] = i; 0142 } 0143 } 0144 while (num_unprocessed_partitions > 0) { 0145 int locked_prtn_id; 0146 int locked_prtn_id_pos; 0147 AcquirePartitionLock(thread_id, num_unprocessed_partitions, temp_unprocessed_prtns, 0148 /*limit_retries=*/false, /*max_retries=*/-1, &locked_prtn_id, 0149 &locked_prtn_id_pos); 0150 { 0151 class AutoReleaseLock { 0152 public: 0153 AutoReleaseLock(PartitionLocks* locks, int prtn_id) 0154 : locks(locks), prtn_id(prtn_id) {} 0155 ~AutoReleaseLock() { locks->ReleasePartitionLock(prtn_id); } 0156 PartitionLocks* locks; 0157 int prtn_id; 0158 } auto_release_lock(this, locked_prtn_id); 0159 ARROW_RETURN_NOT_OK(process_prtn_fn(locked_prtn_id)); 0160 } 0161 if (locked_prtn_id_pos < num_unprocessed_partitions - 1) { 0162 temp_unprocessed_prtns[locked_prtn_id_pos] = 0163 temp_unprocessed_prtns[num_unprocessed_partitions - 1]; 0164 } 0165 --num_unprocessed_partitions; 0166 } 0167 return Status::OK(); 0168 } 0169 0170 private: 0171 std::atomic<bool>* lock_ptr(int prtn_id); 0172 int random_int(size_t thread_id, int num_values); 0173 0174 struct PartitionLock { 0175 static constexpr int kCacheLineBytes = 64; 0176 std::atomic<bool> lock; 0177 uint8_t padding[kCacheLineBytes]; 0178 }; 0179 int num_prtns_; 0180 std::unique_ptr<PartitionLock[]> locks_; 0181 std::unique_ptr<arrow::random::pcg32_fast[]> rngs_; 0182 }; 0183 0184 } // namespace acero 0185 } // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |