Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:53

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <functional>
#include <random>

#include "arrow/acero/util.h"
#include "arrow/buffer.h"
#include "arrow/util/pcg_random.h"
0028 
0029 namespace arrow {
0030 namespace acero {
0031 
0032 class PartitionSort {
0033  public:
0034   /// \brief Bucket sort rows on partition ids in O(num_rows) time.
0035   ///
0036   /// Include in the output exclusive cumulative sum of bucket sizes.
0037   /// This corresponds to ranges in the sorted array containing all row ids for
0038   /// each of the partitions.
0039   ///
0040   /// prtn_ranges must be initialized and have at least prtn_ranges + 1 elements
0041   /// when this method returns prtn_ranges[i] will contains the total number of
0042   /// elements in partitions 0 through i.  prtn_ranges[0] will be 0.
0043   ///
0044   /// prtn_id_impl must be a function that takes in a row id (int) and returns
0045   /// a partition id (int).  The returned partition id must be between 0 and
0046   /// num_prtns (exclusive).
0047   ///
0048   /// output_pos_impl is a function that takes in a row id (int) and a position (int)
0049   /// in the bucket sorted output.  The function should insert the row in the
0050   /// output.
0051   ///
0052   /// For example:
0053   ///
0054   /// in_arr: [5, 7, 2, 3, 5, 4]
0055   /// num_prtns: 3
0056   /// prtn_id_impl: [&in_arr] (int row_id) { return in_arr[row_id] / 3; }
0057   /// output_pos_impl: [&sorted_row_ids] (int row_id, int pos) { sorted_row_ids[pos] =
0058   /// row_id; }
0059   ///
0060   /// After Execution
0061   /// sorted_row_ids: [2, 0, 3, 4, 5, 1]
0062   /// prtn_ranges: [0, 1, 5, 6]
0063   template <class INPUT_PRTN_ID_FN, class OUTPUT_POS_FN>
0064   static void Eval(int64_t num_rows, int num_prtns, uint16_t* prtn_ranges,
0065                    INPUT_PRTN_ID_FN prtn_id_impl, OUTPUT_POS_FN output_pos_impl) {
0066     ARROW_DCHECK(num_rows > 0 && num_rows <= (1 << 15));
0067     ARROW_DCHECK(num_prtns >= 1 && num_prtns <= (1 << 15));
0068 
0069     memset(prtn_ranges, 0, (num_prtns + 1) * sizeof(uint16_t));
0070 
0071     for (int64_t i = 0; i < num_rows; ++i) {
0072       int prtn_id = static_cast<int>(prtn_id_impl(i));
0073       ++prtn_ranges[prtn_id + 1];
0074     }
0075 
0076     uint16_t sum = 0;
0077     for (int i = 0; i < num_prtns; ++i) {
0078       uint16_t sum_next = sum + prtn_ranges[i + 1];
0079       prtn_ranges[i + 1] = sum;
0080       sum = sum_next;
0081     }
0082 
0083     for (int64_t i = 0; i < num_rows; ++i) {
0084       int prtn_id = static_cast<int>(prtn_id_impl(i));
0085       int pos = prtn_ranges[prtn_id + 1]++;
0086       output_pos_impl(i, pos);
0087     }
0088   }
0089 };
0090 
0091 /// \brief A control for synchronizing threads on a partitionable workload
0092 class PartitionLocks {
0093  public:
0094   PartitionLocks();
0095   ~PartitionLocks();
0096   /// \brief Initializes the control, must be called before use
0097   ///
0098   /// \param num_threads Maximum number of threads that will access the partitions
0099   /// \param num_prtns Number of partitions to synchronize
0100   void Init(size_t num_threads, int num_prtns);
0101   /// \brief Cleans up the control, it should not be used after this call
0102   void CleanUp();
0103   /// \brief Acquire a partition to work on one
0104   ///
0105   /// \param thread_id The index of the thread trying to acquire the partition lock
0106   /// \param num_prtns Length of prtns_to_try, must be <= num_prtns used in Init
0107   /// \param prtns_to_try An array of partitions that still have remaining work
0108   /// \param limit_retries If false, this method will spinwait forever until success
0109   /// \param max_retries Max times to attempt checking out work before returning false
0110   /// \param[out] locked_prtn_id The id of the partition locked
0111   /// \param[out] locked_prtn_id_pos The index of the partition locked in prtns_to_try
0112   /// \return True if a partition was locked, false if max_retries was attempted
0113   ///         without successfully acquiring a lock
0114   ///
0115   /// This method is thread safe
0116   bool AcquirePartitionLock(size_t thread_id, int num_prtns, const int* prtns_to_try,
0117                             bool limit_retries, int max_retries, int* locked_prtn_id,
0118                             int* locked_prtn_id_pos);
0119   /// \brief Release a partition so that other threads can work on it
0120   void ReleasePartitionLock(int prtn_id);
0121 
0122   // Executes (synchronously and using current thread) the same operation on a set of
0123   // multiple partitions. Tries to minimize partition locking overhead by randomizing and
0124   // adjusting order in which partitions are processed.
0125   //
0126   // PROCESS_PRTN_FN is a callback which will be executed for each partition after
0127   // acquiring the lock for that partition. It gets partition id as an argument.
0128   // IS_PRTN_EMPTY_FN is a callback which filters out (when returning true) partitions
0129   // with specific ids from processing.
0130   //
0131   template <typename IS_PRTN_EMPTY_FN, typename PROCESS_PRTN_FN>
0132   Status ForEachPartition(size_t thread_id,
0133                           /*scratch space buffer with space for one element per partition;
0134                              dirty in and dirty out*/
0135                           int* temp_unprocessed_prtns, IS_PRTN_EMPTY_FN is_prtn_empty_fn,
0136                           PROCESS_PRTN_FN process_prtn_fn) {
0137     int num_unprocessed_partitions = 0;
0138     for (int i = 0; i < num_prtns_; ++i) {
0139       bool is_prtn_empty = is_prtn_empty_fn(i);
0140       if (!is_prtn_empty) {
0141         temp_unprocessed_prtns[num_unprocessed_partitions++] = i;
0142       }
0143     }
0144     while (num_unprocessed_partitions > 0) {
0145       int locked_prtn_id;
0146       int locked_prtn_id_pos;
0147       AcquirePartitionLock(thread_id, num_unprocessed_partitions, temp_unprocessed_prtns,
0148                            /*limit_retries=*/false, /*max_retries=*/-1, &locked_prtn_id,
0149                            &locked_prtn_id_pos);
0150       {
0151         class AutoReleaseLock {
0152          public:
0153           AutoReleaseLock(PartitionLocks* locks, int prtn_id)
0154               : locks(locks), prtn_id(prtn_id) {}
0155           ~AutoReleaseLock() { locks->ReleasePartitionLock(prtn_id); }
0156           PartitionLocks* locks;
0157           int prtn_id;
0158         } auto_release_lock(this, locked_prtn_id);
0159         ARROW_RETURN_NOT_OK(process_prtn_fn(locked_prtn_id));
0160       }
0161       if (locked_prtn_id_pos < num_unprocessed_partitions - 1) {
0162         temp_unprocessed_prtns[locked_prtn_id_pos] =
0163             temp_unprocessed_prtns[num_unprocessed_partitions - 1];
0164       }
0165       --num_unprocessed_partitions;
0166     }
0167     return Status::OK();
0168   }
0169 
0170  private:
0171   std::atomic<bool>* lock_ptr(int prtn_id);
0172   int random_int(size_t thread_id, int num_values);
0173 
0174   struct PartitionLock {
0175     static constexpr int kCacheLineBytes = 64;
0176     std::atomic<bool> lock;
0177     uint8_t padding[kCacheLineBytes];
0178   };
0179   int num_prtns_;
0180   std::unique_ptr<PartitionLock[]> locks_;
0181   std::unique_ptr<arrow::random::pcg32_fast[]> rngs_;
0182 };
0183 
0184 }  // namespace acero
0185 }  // namespace arrow