Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-06 08:52:51

0001 // Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
0002 // Distributed under the MIT License (http://opensource.org/licenses/MIT)
0003 
0004 #pragma once
0005 
0006 // multi producer-multi consumer blocking queue.
0007 // enqueue(..) - will block until room is found to put the new message.
0008 // enqueue_nowait(..) - will enqueue immediately, overrunning the oldest
0009 // message in the queue if no room is left.
0010 // dequeue_for(..) - will block until the queue is not empty or the timeout has
0011 // passed.
0012 
0013 #include <spdlog/details/circular_q.h>
0014 
0015 #include <atomic>
0016 #include <condition_variable>
0017 #include <mutex>
0018 
0019 namespace spdlog {
0020 namespace details {
0021 
0022 template <typename T>
0023 class mpmc_blocking_queue {
0024 public:
0025     using item_type = T;
0026     explicit mpmc_blocking_queue(size_t max_items)
0027         : q_(max_items) {}
0028 
0029 #ifndef __MINGW32__
0030     // try to enqueue and block if no room left
0031     void enqueue(T &&item) {
0032         {
0033             std::unique_lock<std::mutex> lock(queue_mutex_);
0034             pop_cv_.wait(lock, [this] { return !this->q_.full(); });
0035             q_.push_back(std::move(item));
0036         }
0037         push_cv_.notify_one();
0038     }
0039 
0040     // enqueue immediately. overrun oldest message in the queue if no room left.
0041     void enqueue_nowait(T &&item) {
0042         {
0043             std::unique_lock<std::mutex> lock(queue_mutex_);
0044             q_.push_back(std::move(item));
0045         }
0046         push_cv_.notify_one();
0047     }
0048 
0049     void enqueue_if_have_room(T &&item) {
0050         bool pushed = false;
0051         {
0052             std::unique_lock<std::mutex> lock(queue_mutex_);
0053             if (!q_.full()) {
0054                 q_.push_back(std::move(item));
0055                 pushed = true;
0056             }
0057         }
0058 
0059         if (pushed) {
0060             push_cv_.notify_one();
0061         } else {
0062             ++discard_counter_;
0063         }
0064     }
0065 
0066     // dequeue with a timeout.
0067     // Return true, if succeeded dequeue item, false otherwise
0068     bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) {
0069         {
0070             std::unique_lock<std::mutex> lock(queue_mutex_);
0071             if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); })) {
0072                 return false;
0073             }
0074             popped_item = std::move(q_.front());
0075             q_.pop_front();
0076         }
0077         pop_cv_.notify_one();
0078         return true;
0079     }
0080 
0081     // blocking dequeue without a timeout.
0082     void dequeue(T &popped_item) {
0083         {
0084             std::unique_lock<std::mutex> lock(queue_mutex_);
0085             push_cv_.wait(lock, [this] { return !this->q_.empty(); });
0086             popped_item = std::move(q_.front());
0087             q_.pop_front();
0088         }
0089         pop_cv_.notify_one();
0090     }
0091 
0092 #else
0093     // apparently mingw deadlocks if the mutex is released before cv.notify_one(),
0094     // so release the mutex at the very end each function.
0095 
0096     // try to enqueue and block if no room left
0097     void enqueue(T &&item) {
0098         std::unique_lock<std::mutex> lock(queue_mutex_);
0099         pop_cv_.wait(lock, [this] { return !this->q_.full(); });
0100         q_.push_back(std::move(item));
0101         push_cv_.notify_one();
0102     }
0103 
0104     // enqueue immediately. overrun oldest message in the queue if no room left.
0105     void enqueue_nowait(T &&item) {
0106         std::unique_lock<std::mutex> lock(queue_mutex_);
0107         q_.push_back(std::move(item));
0108         push_cv_.notify_one();
0109     }
0110 
0111     void enqueue_if_have_room(T &&item) {
0112         bool pushed = false;
0113         std::unique_lock<std::mutex> lock(queue_mutex_);
0114         if (!q_.full()) {
0115             q_.push_back(std::move(item));
0116             pushed = true;
0117         }
0118 
0119         if (pushed) {
0120             push_cv_.notify_one();
0121         } else {
0122             ++discard_counter_;
0123         }
0124     }
0125 
0126     // dequeue with a timeout.
0127     // Return true, if succeeded dequeue item, false otherwise
0128     bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration) {
0129         std::unique_lock<std::mutex> lock(queue_mutex_);
0130         if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); })) {
0131             return false;
0132         }
0133         popped_item = std::move(q_.front());
0134         q_.pop_front();
0135         pop_cv_.notify_one();
0136         return true;
0137     }
0138 
0139     // blocking dequeue without a timeout.
0140     void dequeue(T &popped_item) {
0141         std::unique_lock<std::mutex> lock(queue_mutex_);
0142         push_cv_.wait(lock, [this] { return !this->q_.empty(); });
0143         popped_item = std::move(q_.front());
0144         q_.pop_front();
0145         pop_cv_.notify_one();
0146     }
0147 
0148 #endif
0149 
0150     size_t overrun_counter() {
0151         std::unique_lock<std::mutex> lock(queue_mutex_);
0152         return q_.overrun_counter();
0153     }
0154 
0155     size_t discard_counter() { return discard_counter_.load(std::memory_order_relaxed); }
0156 
0157     size_t size() {
0158         std::unique_lock<std::mutex> lock(queue_mutex_);
0159         return q_.size();
0160     }
0161 
0162     void reset_overrun_counter() {
0163         std::unique_lock<std::mutex> lock(queue_mutex_);
0164         q_.reset_overrun_counter();
0165     }
0166 
0167     void reset_discard_counter() { discard_counter_.store(0, std::memory_order_relaxed); }
0168 
0169 private:
0170     std::mutex queue_mutex_;
0171     std::condition_variable push_cv_;
0172     std::condition_variable pop_cv_;
0173     spdlog::details::circular_q<T> q_;
0174     std::atomic<size_t> discard_counter_{0};
0175 };
0176 }  // namespace details
0177 }  // namespace spdlog