Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-03-13 09:27:47

0001 // Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
0002 // Distributed under the MIT License (http://opensource.org/licenses/MIT)
0003 
0004 #pragma once
0005 
0006 // multi producer-multi consumer blocking queue.
0007 // enqueue(..) - will block until room found to put the new message.
0008 // enqueue_nowait(..) - will return immediately with false if no room left in
0009 // the queue.
0010 // dequeue_for(..) - will block until the queue is not empty or timeout have
0011 // passed.
0012 
0013 #include <spdlog/details/circular_q.h>
0014 
0015 #include <condition_variable>
0016 #include <mutex>
0017 
0018 namespace spdlog {
0019 namespace details {
0020 
0021 template<typename T>
0022 class mpmc_blocking_queue
0023 {
0024 public:
0025     using item_type = T;
0026     explicit mpmc_blocking_queue(size_t max_items)
0027         : q_(max_items)
0028     {}
0029 
0030 #ifndef __MINGW32__
0031     // try to enqueue and block if no room left
0032     void enqueue(T &&item)
0033     {
0034         {
0035             std::unique_lock<std::mutex> lock(queue_mutex_);
0036             pop_cv_.wait(lock, [this] { return !this->q_.full(); });
0037             q_.push_back(std::move(item));
0038         }
0039         push_cv_.notify_one();
0040     }
0041 
0042     // enqueue immediately. overrun oldest message in the queue if no room left.
0043     void enqueue_nowait(T &&item)
0044     {
0045         {
0046             std::unique_lock<std::mutex> lock(queue_mutex_);
0047             q_.push_back(std::move(item));
0048         }
0049         push_cv_.notify_one();
0050     }
0051 
0052     // try to dequeue item. if no item found. wait up to timeout and try again
0053     // Return true, if succeeded dequeue item, false otherwise
0054     bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
0055     {
0056         {
0057             std::unique_lock<std::mutex> lock(queue_mutex_);
0058             if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
0059             {
0060                 return false;
0061             }
0062             popped_item = std::move(q_.front());
0063             q_.pop_front();
0064         }
0065         pop_cv_.notify_one();
0066         return true;
0067     }
0068 
0069 #else
0070     // apparently mingw deadlocks if the mutex is released before cv.notify_one(),
0071     // so release the mutex at the very end each function.
0072 
0073     // try to enqueue and block if no room left
0074     void enqueue(T &&item)
0075     {
0076         std::unique_lock<std::mutex> lock(queue_mutex_);
0077         pop_cv_.wait(lock, [this] { return !this->q_.full(); });
0078         q_.push_back(std::move(item));
0079         push_cv_.notify_one();
0080     }
0081 
0082     // enqueue immediately. overrun oldest message in the queue if no room left.
0083     void enqueue_nowait(T &&item)
0084     {
0085         std::unique_lock<std::mutex> lock(queue_mutex_);
0086         q_.push_back(std::move(item));
0087         push_cv_.notify_one();
0088     }
0089 
0090     // try to dequeue item. if no item found. wait up to timeout and try again
0091     // Return true, if succeeded dequeue item, false otherwise
0092     bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
0093     {
0094         std::unique_lock<std::mutex> lock(queue_mutex_);
0095         if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
0096         {
0097             return false;
0098         }
0099         popped_item = std::move(q_.front());
0100         q_.pop_front();
0101         pop_cv_.notify_one();
0102         return true;
0103     }
0104 
0105 #endif
0106 
0107     size_t overrun_counter()
0108     {
0109         std::unique_lock<std::mutex> lock(queue_mutex_);
0110         return q_.overrun_counter();
0111     }
0112 
0113     size_t size()
0114     {
0115         std::unique_lock<std::mutex> lock(queue_mutex_);
0116         return q_.size();
0117     }
0118 
0119     void reset_overrun_counter()
0120     {
0121         std::unique_lock<std::mutex> lock(queue_mutex_);
0122         q_.reset_overrun_counter();
0123     }
0124 
0125 private:
0126     std::mutex queue_mutex_;
0127     std::condition_variable push_cv_;
0128     std::condition_variable pop_cv_;
0129     spdlog::details::circular_q<T> q_;
0130 };
0131 } // namespace details
0132 } // namespace spdlog