EIC code displayed by LXR

// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_

namespace Eigen {

// RunQueue is a fixed-size, partially non-blocking deque of Work items.
// Operations on the front of the queue must be done by a single thread (owner),
// operations on the back of the queue can be done by multiple threads concurrently.
//
// Algorithm outline:
// All remote threads operating on the queue back are serialized by a mutex.
// This ensures that at most two threads access state: owner and one remote
// thread (Size aside). The algorithm ensures that the occupied region of the
// underlying array is logically contiguous (it can wrap around, but there are
// no stray occupied elements). The owner operates on one end of this region,
// the remote thread operates on the other end. Synchronization between these
// threads (potential consumption of the last element and take-up of the last
// empty element) happens by means of a state variable in each element. States
// are: empty, busy (in process of insertion or removal) and ready. Threads
// claim elements (empty->busy and ready->busy transitions) by means of a CAS
// operation. The finishing transitions (busy->empty and busy->ready) are done
// with a plain store, as the element is exclusively owned by the current thread.
//
// Note: we could permit only pointers as elements, then we would not need a
// separate state variable, as a null/non-null pointer value would serve as the
// state; but that would require a malloc/free per operation for large, complex
// values (and this is designed to store std::function<()>).
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  RunQueue() : front_(0), back_(0) {
    // require power-of-two for fast masking
    eigen_plain_assert((kSize & (kSize - 1)) == 0);
    eigen_plain_assert(kSize > 2);            // why would you do this?
    eigen_plain_assert(kSize <= (64 << 10));  // leave enough space for counter
    for (unsigned i = 0; i < kSize; i++)
      array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  ~RunQueue() { eigen_plain_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
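    // Advance the front index: the "+ 1" steps the position kept in the low
    // bits, while the "+ (kSize << 1)" increments the modification counter
    // kept in the higher bits (see the comment on front_/back_ below).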
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopFront removes and returns the first element in the queue.
  // If the queue was empty returns default-constructed Work.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
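    // Step the front position back by one within the low position bits
    // (kMask2), leaving the modification counter bits untouched.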
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If queue is full returns w, otherwise returns default-constructed Work.
  Work PushBack(Work w) {
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
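    // Move the back position one slot back within the position bits; the
    // counter bits are left unchanged here (same masking as in PopFront).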
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  // If the queue was empty returns default-constructed Work.
  Work PopBack() {
    if (Empty()) return Work();
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
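    // Advance the back position and bump the counter bits in back_, mirroring
    // what PushFront does for front_.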
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes and returns roughly half of the elements from the
  // back of the queue. Returns the number of elements removed.
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
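    // Walk from the midpoint of the occupied region down to back, i.e. over
    // the elements at indices back..mid, which form the back half of the queue.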
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        if (s != kReady || !e->state.compare_exchange_strong(
                               s, kBusy, std::memory_order_acquire))
          continue;
        start = mid;
      } else {
        // Note: no need to store a temporary kBusy, we exclusively own these
        // elements.
        eigen_plain_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    if (n != 0)
      back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }

  // Size returns current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const { return SizeOrNotEmpty<true>(); }

  // Empty tests whether container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return SizeOrNotEmpty<false>() == 0; }

  // Delete all the elements from the queue.
  void Flush() {
    while (!Empty()) {
      PopFront();
    }
  }

 private:
  static const unsigned kMask = kSize - 1;
  static const unsigned kMask2 = (kSize << 1) - 1;
  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };
  enum {
    kEmpty,
    kBusy,
    kReady,
  };
  std::mutex mutex_;
  // Low log(kSize) + 1 bits in front_ and back_ contain a rolling index of
  // front/back, respectively. The remaining bits contain modification counters
  // that are incremented by PushFront (for front_) and PopBack (for back_).
  // This allows us to (1) distinguish between empty and full conditions (if we
  // used only log(kSize) bits for the position, these conditions would be
  // indistinguishable); (2) obtain a consistent snapshot of front_/back_ for
  // the Size operation using the modification counters.
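  // For example, with kSize = 4 the position is tracked modulo 8 in the low
  // 3 bits: an empty queue has front == back (mod 8), while a full queue has
  // (front - back) mod 8 == 4. With only log(kSize) = 2 position bits, both
  // cases would collapse to front == back.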
  std::atomic<unsigned> front_;
  std::atomic<unsigned> back_;
  Elem array_[kSize];

  // SizeOrNotEmpty returns the current queue size; if NeedSizeEstimate is
  // false, only whether the size is 0 is guaranteed to be correct.
  // Can be called by any thread at any time.
  template <bool NeedSizeEstimate>
  unsigned SizeOrNotEmpty() const {
    // Emptiness plays a critical role in thread pool blocking. So we go to
    // great lengths not to produce false positives (claiming a non-empty
    // queue to be empty).
    unsigned front = front_.load(std::memory_order_acquire);
    for (;;) {
      // Capture a consistent snapshot of front/back.
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) {
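        // front_ moved while back_ was being read, so the (front, back) pair
        // is not a consistent snapshot; reload front and retry.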
        front = front1;
        std::atomic_thread_fence(std::memory_order_acquire);
        continue;
      }
      if (NeedSizeEstimate) {
        return CalculateSize(front, back);
      } else {
        // This value will be 0 if the queue is empty, and undefined otherwise.
        unsigned maybe_zero = ((front ^ back) & kMask2);
        // Queue size estimate must agree with maybe zero check on the queue
        // empty/non-empty state.
        eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
        return maybe_zero;
      }
    }
  }

  EIGEN_ALWAYS_INLINE
  unsigned CalculateSize(unsigned front, unsigned back) const {
    int size = (front & kMask2) - (back & kMask2);
    // Fix overflow.
    if (size < 0) size += 2 * kSize;
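    // E.g. with kSize = 4: front position 1 and back position 7 give
    // 1 - 7 = -6, and -6 + 8 = 2 occupied elements.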
    // Order of modification in push/pop is crafted to make the queue look
    // larger than it is during concurrent modifications. E.g. push can
    // increment size before the corresponding pop has decremented it.
    // So the computed size can be up to kSize + 1, fix it.
    if (size > static_cast<int>(kSize)) size = kSize;
    return static_cast<unsigned>(size);
  }

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
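
A minimal usage sketch, not part of RunQueue.h itself, illustrating the ownership rules spelled out in the class comment: the owner thread drives the front of the queue (PushFront/PopFront), while any other thread goes through the mutex-serialized back operations (PushBack/PopBack). The include path below assumes RunQueue is pulled in via Eigen's unsupported ThreadPool module header; adjust it to match your Eigen installation.

#include <unsupported/Eigen/CXX11/ThreadPool>
#include <cstdio>
#include <functional>
#include <thread>

int main() {
  // Work items are std::function<void()>; kSize must be a power of two.
  Eigen::RunQueue<std::function<void()>, 1024> q;

  // Owner thread: front operations are single-threaded by design.
  for (int i = 0; i < 4; i++) {
    std::function<void()> w = [i] { std::printf("task %d\n", i); };
    // PushFront hands w back if the queue is full.
    if (q.PushFront(std::move(w))) std::printf("queue full\n");
  }

  // A remote thread steals from the back; back operations are serialized
  // internally by a mutex, so any number of thieves is fine.
  std::thread thief([&q] {
    std::function<void()> w = q.PopBack();
    if (w) w();  // an empty Work means the queue had nothing to steal
  });
  thief.join();

  // Owner drains the remaining items from the front. The destructor asserts
  // that the queue is empty, so everything must be consumed before q dies.
  for (std::function<void()> w = q.PopFront(); w; w = q.PopFront()) w();
  return 0;
}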