File indexing completed on 2025-01-30 10:01:07
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_THREAD_SYNC_PRIORITY_QUEUE
0009 #define BOOST_THREAD_SYNC_PRIORITY_QUEUE
0010
0011 #include <boost/thread/detail/config.hpp>
0012
0013 #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
0014 #include <boost/thread/concurrent_queues/queue_op_status.hpp>
0015 #include <boost/thread/condition_variable.hpp>
0016 #include <boost/thread/csbl/vector.hpp>
0017 #include <boost/thread/detail/move.hpp>
0018 #include <boost/thread/mutex.hpp>
0019
0020 #include <boost/atomic.hpp>
0021 #include <boost/chrono/duration.hpp>
0022 #include <boost/chrono/time_point.hpp>
0023
0024 #include <exception>
0025 #include <queue>
0026 #include <utility>
0027
0028 #include <boost/config/abi_prefix.hpp>
0029
0030 namespace boost
0031 {
0032 namespace detail {
0033
0034 template <
0035 class Type,
0036 class Container = csbl::vector<Type>,
0037 class Compare = std::less<Type>
0038 >
0039 class priority_queue
0040 {
0041 private:
0042 Container _elements;
0043 Compare _compare;
0044 public:
0045 typedef Type value_type;
0046 typedef typename Container::size_type size_type;
0047
0048 explicit priority_queue(const Compare& compare = Compare())
0049 : _elements(), _compare(compare)
0050 { }
0051
0052 size_type size() const
0053 {
0054 return _elements.size();
0055 }
0056
0057 bool empty() const
0058 {
0059 return _elements.empty();
0060 }
0061
0062 void push(Type const& element)
0063 {
0064 _elements.push_back(element);
0065 std::push_heap(_elements.begin(), _elements.end(), _compare);
0066 }
0067 void push(BOOST_RV_REF(Type) element)
0068 {
0069 _elements.push_back(boost::move(element));
0070 std::push_heap(_elements.begin(), _elements.end(), _compare);
0071 }
0072
0073 void pop()
0074 {
0075 std::pop_heap(_elements.begin(), _elements.end(), _compare);
0076 _elements.pop_back();
0077 }
0078 Type pull()
0079 {
0080 Type result = boost::move(_elements.front());
0081 pop();
0082 return boost::move(result);
0083 }
0084
0085 Type const& top() const
0086 {
0087 return _elements.front();
0088 }
0089 };
0090 }
0091
0092 namespace concurrent
0093 {
0094 template <class ValueType,
0095 class Container = csbl::vector<ValueType>,
0096 class Compare = std::less<typename Container::value_type> >
0097 class sync_priority_queue
0098 : public detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> >
0099 {
0100 typedef detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> > super;
0101
0102 public:
0103 typedef ValueType value_type;
0104
0105 typedef typename super::underlying_queue_type underlying_queue_type;
0106 typedef typename super::size_type size_type;
0107 typedef typename super::op_status op_status;
0108
0109 typedef chrono::steady_clock clock;
0110 protected:
0111
0112 public:
0113 sync_priority_queue() {}
0114
0115 ~sync_priority_queue()
0116 {
0117 if(!super::closed())
0118 {
0119 super::close();
0120 }
0121 }
0122
0123 void push(const ValueType& elem);
0124 void push(BOOST_THREAD_RV_REF(ValueType) elem);
0125
0126 queue_op_status try_push(const ValueType& elem);
0127 queue_op_status try_push(BOOST_THREAD_RV_REF(ValueType) elem);
0128
0129 ValueType pull();
0130
0131 void pull(ValueType&);
0132
0133 template <class WClock, class Duration>
0134 queue_op_status pull_until(const chrono::time_point<WClock,Duration>&, ValueType&);
0135 template <class Rep, class Period>
0136 queue_op_status pull_for(const chrono::duration<Rep,Period>&, ValueType&);
0137
0138 queue_op_status try_pull(ValueType& elem);
0139 queue_op_status wait_pull(ValueType& elem);
0140 queue_op_status nonblocking_pull(ValueType&);
0141
0142 private:
0143 void push(unique_lock<mutex>&, const ValueType& elem);
0144 void push(lock_guard<mutex>&, const ValueType& elem);
0145 void push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
0146 void push(lock_guard<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
0147
0148 queue_op_status try_push(unique_lock<mutex>&, const ValueType& elem);
0149 queue_op_status try_push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
0150
0151 ValueType pull(unique_lock<mutex>&);
0152 ValueType pull(lock_guard<mutex>&);
0153
0154 void pull(unique_lock<mutex>&, ValueType&);
0155 void pull(lock_guard<mutex>&, ValueType&);
0156
0157 queue_op_status try_pull(lock_guard<mutex>& lk, ValueType& elem);
0158 queue_op_status try_pull(unique_lock<mutex>& lk, ValueType& elem);
0159
0160 queue_op_status wait_pull(unique_lock<mutex>& lk, ValueType& elem);
0161
0162 queue_op_status nonblocking_pull(unique_lock<mutex>& lk, ValueType&);
0163
0164 sync_priority_queue(const sync_priority_queue&);
0165 sync_priority_queue& operator= (const sync_priority_queue&);
0166 sync_priority_queue(BOOST_THREAD_RV_REF(sync_priority_queue));
0167 sync_priority_queue& operator= (BOOST_THREAD_RV_REF(sync_priority_queue));
0168 };
0169
0170
0171
0172 template <class T, class Container,class Cmp>
0173 void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, const T& elem)
0174 {
0175 super::throw_if_closed(lk);
0176 super::data_.push(elem);
0177 super::notify_elem_added(lk);
0178 }
0179 template <class T, class Container,class Cmp>
0180 void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem)
0181 {
0182 super::throw_if_closed(lk);
0183 super::data_.push(elem);
0184 super::notify_elem_added(lk);
0185 }
0186 template <class T, class Container,class Cmp>
0187 void sync_priority_queue<T,Container,Cmp>::push(const T& elem)
0188 {
0189 lock_guard<mutex> lk(super::mtx_);
0190 push(lk, elem);
0191 }
0192
0193
0194 template <class T, class Container,class Cmp>
0195 void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
0196 {
0197 super::throw_if_closed(lk);
0198 super::data_.push(boost::move(elem));
0199 super::notify_elem_added(lk);
0200 }
0201 template <class T, class Container,class Cmp>
0202 void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
0203 {
0204 super::throw_if_closed(lk);
0205 super::data_.push(boost::move(elem));
0206 super::notify_elem_added(lk);
0207 }
0208 template <class T, class Container,class Cmp>
0209 void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem)
0210 {
0211 lock_guard<mutex> lk(super::mtx_);
0212 push(lk, boost::move(elem));
0213 }
0214
0215
0216 template <class T, class Container,class Cmp>
0217 queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(const T& elem)
0218 {
0219 lock_guard<mutex> lk(super::mtx_);
0220 if (super::closed(lk)) return queue_op_status::closed;
0221 push(lk, elem);
0222 return queue_op_status::success;
0223 }
0224
0225
0226 template <class T, class Container,class Cmp>
0227 queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(BOOST_THREAD_RV_REF(T) elem)
0228 {
0229 lock_guard<mutex> lk(super::mtx_);
0230 if (super::closed(lk)) return queue_op_status::closed;
0231 push(lk, boost::move(elem));
0232
0233 return queue_op_status::success;
0234 }
0235
0236
0237 template <class T,class Container, class Cmp>
0238 T sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&)
0239 {
0240 return super::data_.pull();
0241 }
0242 template <class T,class Container, class Cmp>
0243 T sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&)
0244 {
0245 return super::data_.pull();
0246 }
0247
0248 template <class T,class Container, class Cmp>
0249 T sync_priority_queue<T,Container,Cmp>::pull()
0250 {
0251 unique_lock<mutex> lk(super::mtx_);
0252 const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
0253 if (has_been_closed) super::throw_if_closed(lk);
0254 return pull(lk);
0255 }
0256
0257
0258 template <class T,class Container, class Cmp>
0259 void sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&, T& elem)
0260 {
0261 elem = super::data_.pull();
0262 }
0263 template <class T,class Container, class Cmp>
0264 void sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&, T& elem)
0265 {
0266 elem = super::data_.pull();
0267 }
0268
0269 template <class T,class Container, class Cmp>
0270 void sync_priority_queue<T,Container,Cmp>::pull(T& elem)
0271 {
0272 unique_lock<mutex> lk(super::mtx_);
0273 const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
0274 if (has_been_closed) super::throw_if_closed(lk);
0275 pull(lk, elem);
0276 }
0277
0278
0279 template <class T, class Cont,class Cmp>
0280 template <class WClock, class Duration>
0281 queue_op_status
0282 sync_priority_queue<T,Cont,Cmp>::pull_until(const chrono::time_point<WClock,Duration>& tp, T& elem)
0283 {
0284 unique_lock<mutex> lk(super::mtx_);
0285 const queue_op_status rc = super::wait_until_not_empty_or_closed_until(lk, tp);
0286 if (rc == queue_op_status::success) pull(lk, elem);
0287 return rc;
0288 }
0289
0290
0291 template <class T, class Cont,class Cmp>
0292 template <class Rep, class Period>
0293 queue_op_status
0294 sync_priority_queue<T,Cont,Cmp>::pull_for(const chrono::duration<Rep,Period>& dura, T& elem)
0295 {
0296 return pull_until(chrono::steady_clock::now() + dura, elem);
0297 }
0298
0299
0300 template <class T, class Container,class Cmp>
0301 queue_op_status
0302 sync_priority_queue<T,Container,Cmp>::try_pull(unique_lock<mutex>& lk, T& elem)
0303 {
0304 if (super::empty(lk))
0305 {
0306 if (super::closed(lk)) return queue_op_status::closed;
0307 return queue_op_status::empty;
0308 }
0309 pull(lk, elem);
0310 return queue_op_status::success;
0311 }
0312
0313 template <class T, class Container,class Cmp>
0314 queue_op_status
0315 sync_priority_queue<T,Container,Cmp>::try_pull(lock_guard<mutex>& lk, T& elem)
0316 {
0317 if (super::empty(lk))
0318 {
0319 if (super::closed(lk)) return queue_op_status::closed;
0320 return queue_op_status::empty;
0321 }
0322 pull(lk, elem);
0323 return queue_op_status::success;
0324 }
0325
0326 template <class T, class Container,class Cmp>
0327 queue_op_status
0328 sync_priority_queue<T,Container,Cmp>::try_pull(T& elem)
0329 {
0330 lock_guard<mutex> lk(super::mtx_);
0331 return try_pull(lk, elem);
0332 }
0333
0334
0335 template <class T,class Container, class Cmp>
0336 queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem)
0337 {
0338 const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
0339 if (has_been_closed) return queue_op_status::closed;
0340 pull(lk, elem);
0341 return queue_op_status::success;
0342 }
0343
0344 template <class T,class Container, class Cmp>
0345 queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(T& elem)
0346 {
0347 unique_lock<mutex> lk(super::mtx_);
0348 return wait_pull(lk, elem);
0349 }
0350
0351
0352 template <class T,class Container, class Cmp>
0353 queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem)
0354 {
0355 unique_lock<mutex> lk(super::mtx_, try_to_lock);
0356 if (!lk.owns_lock()) return queue_op_status::busy;
0357 return try_pull(lk, elem);
0358 }
0359
0360
0361
0362 }
0363
0364 using concurrent::sync_priority_queue;
0365
0366 }
0367 #include <boost/config/abi_suffix.hpp>
0368
0369 #endif