Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-30 10:01:07

0001 // Copyright (C) 2014 Ian Forbed
0002 // Copyright (C) 2014-2017 Vicente J. Botet Escriba
0003 //
0004 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_THREAD_SYNC_PRIORITY_QUEUE
0009 #define BOOST_THREAD_SYNC_PRIORITY_QUEUE
0010 
0011 #include <boost/thread/detail/config.hpp>
0012 
0013 #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
0014 #include <boost/thread/concurrent_queues/queue_op_status.hpp>
0015 #include <boost/thread/condition_variable.hpp>
0016 #include <boost/thread/csbl/vector.hpp>
0017 #include <boost/thread/detail/move.hpp>
0018 #include <boost/thread/mutex.hpp>
0019 
0020 #include <boost/atomic.hpp>
0021 #include <boost/chrono/duration.hpp>
0022 #include <boost/chrono/time_point.hpp>
0023 
0024 #include <exception>
0025 #include <queue>
0026 #include <utility>
0027 
0028 #include <boost/config/abi_prefix.hpp>
0029 
0030 namespace boost
0031 {
0032 namespace detail {
0033 
0034   template <
0035     class Type,
0036     class Container = csbl::vector<Type>,
0037     class Compare = std::less<Type>
0038   >
0039   class priority_queue
0040   {
0041   private:
0042       Container _elements;
0043       Compare _compare;
0044   public:
0045       typedef Type value_type;
0046       typedef typename Container::size_type size_type;
0047 
0048       explicit priority_queue(const Compare& compare = Compare())
0049           : _elements(), _compare(compare)
0050       { }
0051 
0052       size_type size() const
0053       {
0054           return _elements.size();
0055       }
0056 
0057       bool empty() const
0058       {
0059           return _elements.empty();
0060       }
0061 
0062       void push(Type const& element)
0063       {
0064           _elements.push_back(element);
0065           std::push_heap(_elements.begin(), _elements.end(), _compare);
0066       }
0067       void push(BOOST_RV_REF(Type) element)
0068       {
0069           _elements.push_back(boost::move(element));
0070           std::push_heap(_elements.begin(), _elements.end(), _compare);
0071       }
0072 
0073       void pop()
0074       {
0075           std::pop_heap(_elements.begin(), _elements.end(), _compare);
0076           _elements.pop_back();
0077       }
0078       Type pull()
0079       {
0080           Type result = boost::move(_elements.front());
0081           pop();
0082           return boost::move(result);
0083       }
0084 
0085       Type const& top() const
0086       {
0087           return _elements.front();
0088       }
0089   };
0090 }
0091 
0092 namespace concurrent
0093 {
0094   template <class ValueType,
0095             class Container = csbl::vector<ValueType>,
0096             class Compare = std::less<typename Container::value_type> >
0097   class sync_priority_queue
0098     : public detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> >
0099   {
0100     typedef detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> >  super;
0101 
0102   public:
0103     typedef ValueType value_type;
0104     //typedef typename super::value_type value_type; // fixme
0105     typedef typename super::underlying_queue_type underlying_queue_type;
0106     typedef typename super::size_type size_type;
0107     typedef typename super::op_status op_status;
0108 
0109     typedef chrono::steady_clock clock;
0110   protected:
0111 
0112   public:
0113     sync_priority_queue() {}
0114 
0115     ~sync_priority_queue()
0116     {
0117       if(!super::closed())
0118       {
0119         super::close();
0120       }
0121     }
0122 
0123     void push(const ValueType& elem);
0124     void push(BOOST_THREAD_RV_REF(ValueType) elem);
0125 
0126     queue_op_status try_push(const ValueType& elem);
0127     queue_op_status try_push(BOOST_THREAD_RV_REF(ValueType) elem);
0128 
0129     ValueType pull();
0130 
0131     void pull(ValueType&);
0132 
0133     template <class WClock, class Duration>
0134     queue_op_status pull_until(const chrono::time_point<WClock,Duration>&, ValueType&);
0135     template <class Rep, class Period>
0136     queue_op_status pull_for(const chrono::duration<Rep,Period>&, ValueType&);
0137 
0138     queue_op_status try_pull(ValueType& elem);
0139     queue_op_status wait_pull(ValueType& elem);
0140     queue_op_status nonblocking_pull(ValueType&);
0141 
0142   private:
0143     void push(unique_lock<mutex>&, const ValueType& elem);
0144     void push(lock_guard<mutex>&, const ValueType& elem);
0145     void push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
0146     void push(lock_guard<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
0147 
0148     queue_op_status try_push(unique_lock<mutex>&, const ValueType& elem);
0149     queue_op_status try_push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
0150 
0151     ValueType pull(unique_lock<mutex>&);
0152     ValueType pull(lock_guard<mutex>&);
0153 
0154     void pull(unique_lock<mutex>&, ValueType&);
0155     void pull(lock_guard<mutex>&, ValueType&);
0156 
0157     queue_op_status try_pull(lock_guard<mutex>& lk, ValueType& elem);
0158     queue_op_status try_pull(unique_lock<mutex>& lk, ValueType& elem);
0159 
0160     queue_op_status wait_pull(unique_lock<mutex>& lk, ValueType& elem);
0161 
0162     queue_op_status nonblocking_pull(unique_lock<mutex>& lk, ValueType&);
0163 
0164     sync_priority_queue(const sync_priority_queue&);
0165     sync_priority_queue& operator= (const sync_priority_queue&);
0166     sync_priority_queue(BOOST_THREAD_RV_REF(sync_priority_queue));
0167     sync_priority_queue& operator= (BOOST_THREAD_RV_REF(sync_priority_queue));
0168   }; //end class
0169 
0170 
0171   //////////////////////
0172   template <class T, class Container,class Cmp>
0173   void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, const T& elem)
0174   {
0175     super::throw_if_closed(lk);
0176     super::data_.push(elem);
0177     super::notify_elem_added(lk);
0178   }
0179   template <class T, class Container,class Cmp>
0180   void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem)
0181   {
0182     super::throw_if_closed(lk);
0183     super::data_.push(elem);
0184     super::notify_elem_added(lk);
0185   }
0186   template <class T, class Container,class Cmp>
0187   void sync_priority_queue<T,Container,Cmp>::push(const T& elem)
0188   {
0189     lock_guard<mutex> lk(super::mtx_);
0190     push(lk, elem);
0191   }
0192 
0193   //////////////////////
0194   template <class T, class Container,class Cmp>
0195   void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
0196   {
0197     super::throw_if_closed(lk);
0198     super::data_.push(boost::move(elem));
0199     super::notify_elem_added(lk);
0200   }
0201   template <class T, class Container,class Cmp>
0202   void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
0203   {
0204     super::throw_if_closed(lk);
0205     super::data_.push(boost::move(elem));
0206     super::notify_elem_added(lk);
0207   }
0208   template <class T, class Container,class Cmp>
0209   void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem)
0210   {
0211     lock_guard<mutex> lk(super::mtx_);
0212     push(lk, boost::move(elem));
0213   }
0214 
0215   //////////////////////
0216   template <class T, class Container,class Cmp>
0217   queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(const T& elem)
0218   {
0219     lock_guard<mutex> lk(super::mtx_);
0220     if (super::closed(lk)) return queue_op_status::closed;
0221     push(lk, elem);
0222     return queue_op_status::success;
0223   }
0224 
0225   //////////////////////
0226   template <class T, class Container,class Cmp>
0227   queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(BOOST_THREAD_RV_REF(T) elem)
0228   {
0229     lock_guard<mutex> lk(super::mtx_);
0230     if (super::closed(lk)) return queue_op_status::closed;
0231     push(lk, boost::move(elem));
0232 
0233     return queue_op_status::success;
0234   }
0235 
0236   //////////////////////
0237   template <class T,class Container, class Cmp>
0238   T sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&)
0239   {
0240     return super::data_.pull();
0241   }
0242   template <class T,class Container, class Cmp>
0243   T sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&)
0244   {
0245     return super::data_.pull();
0246   }
0247 
0248   template <class T,class Container, class Cmp>
0249   T sync_priority_queue<T,Container,Cmp>::pull()
0250   {
0251     unique_lock<mutex> lk(super::mtx_);
0252     const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
0253     if (has_been_closed) super::throw_if_closed(lk);
0254     return pull(lk);
0255   }
0256 
0257   //////////////////////
0258   template <class T,class Container, class Cmp>
0259   void sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&, T& elem)
0260   {
0261     elem = super::data_.pull();
0262   }
0263   template <class T,class Container, class Cmp>
0264   void sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&, T& elem)
0265   {
0266     elem = super::data_.pull();
0267   }
0268 
0269   template <class T,class Container, class Cmp>
0270   void sync_priority_queue<T,Container,Cmp>::pull(T& elem)
0271   {
0272     unique_lock<mutex> lk(super::mtx_);
0273     const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
0274     if (has_been_closed) super::throw_if_closed(lk);
0275     pull(lk, elem);
0276   }
0277 
0278   //////////////////////
0279   template <class T, class Cont,class Cmp>
0280   template <class WClock, class Duration>
0281   queue_op_status
0282   sync_priority_queue<T,Cont,Cmp>::pull_until(const chrono::time_point<WClock,Duration>& tp, T& elem)
0283   {
0284     unique_lock<mutex> lk(super::mtx_);
0285     const queue_op_status rc = super::wait_until_not_empty_or_closed_until(lk, tp);
0286     if (rc == queue_op_status::success) pull(lk, elem);
0287     return rc;
0288   }
0289 
0290   //////////////////////
0291   template <class T, class Cont,class Cmp>
0292   template <class Rep, class Period>
0293   queue_op_status
0294   sync_priority_queue<T,Cont,Cmp>::pull_for(const chrono::duration<Rep,Period>& dura, T& elem)
0295   {
0296     return pull_until(chrono::steady_clock::now() + dura, elem);
0297   }
0298 
0299   //////////////////////
0300   template <class T, class Container,class Cmp>
0301   queue_op_status
0302   sync_priority_queue<T,Container,Cmp>::try_pull(unique_lock<mutex>& lk, T& elem)
0303   {
0304     if (super::empty(lk))
0305     {
0306       if (super::closed(lk)) return queue_op_status::closed;
0307       return queue_op_status::empty;
0308     }
0309     pull(lk, elem);
0310     return queue_op_status::success;
0311   }
0312 
0313   template <class T, class Container,class Cmp>
0314   queue_op_status
0315   sync_priority_queue<T,Container,Cmp>::try_pull(lock_guard<mutex>& lk, T& elem)
0316   {
0317     if (super::empty(lk))
0318     {
0319       if (super::closed(lk)) return queue_op_status::closed;
0320       return queue_op_status::empty;
0321     }
0322     pull(lk, elem);
0323     return queue_op_status::success;
0324   }
0325 
0326   template <class T, class Container,class Cmp>
0327   queue_op_status
0328   sync_priority_queue<T,Container,Cmp>::try_pull(T& elem)
0329   {
0330     lock_guard<mutex> lk(super::mtx_);
0331     return try_pull(lk, elem);
0332   }
0333 
0334   //////////////////////
0335   template <class T,class Container, class Cmp>
0336   queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem)
0337   {
0338     const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
0339     if (has_been_closed) return queue_op_status::closed;
0340     pull(lk, elem);
0341     return queue_op_status::success;
0342   }
0343 
0344   template <class T,class Container, class Cmp>
0345   queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(T& elem)
0346   {
0347     unique_lock<mutex> lk(super::mtx_);
0348     return wait_pull(lk, elem);
0349   }
0350 
0351   //////////////////////
0352   template <class T,class Container, class Cmp>
0353   queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem)
0354   {
0355     unique_lock<mutex> lk(super::mtx_, try_to_lock);
0356     if (!lk.owns_lock()) return queue_op_status::busy;
0357     return try_pull(lk, elem);
0358   }
0359 
0360 
0361 
0362 } //end concurrent namespace
0363 
0364 using concurrent::sync_priority_queue;
0365 
0366 } //end boost namespace
0367 #include <boost/config/abi_suffix.hpp>
0368 
0369 #endif