File indexing completed on 2025-01-30 10:01:08
0001
0002
0003
0004
0005
0006
0007
0008 #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
0009 #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
0010
0011 #include <boost/thread/detail/config.hpp>
0012
0013 #include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
0014 #include <boost/chrono/duration.hpp>
0015 #include <boost/chrono/time_point.hpp>
0016 #include <boost/chrono/system_clocks.hpp>
0017 #include <boost/chrono/chrono_io.hpp>
0018
0019 #include <algorithm> // std::min
0020
0021 #include <boost/config/abi_prefix.hpp>
0022
0023 namespace boost
0024 {
0025 namespace concurrent
0026 {
0027 namespace detail
0028 {
0029
0030 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
0031 struct scheduled_type
0032 {
0033 typedef T value_type;
0034 typedef Clock clock;
0035 typedef TimePoint time_point;
0036 T data;
0037 time_point time;
0038
0039 BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
0040
0041 scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
0042 scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
0043
0044 scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
0045 scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
0046 data = other.data;
0047 time = other.time;
0048 return *this;
0049 }
0050
0051 scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
0052 scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
0053 data = boost::move(other.data);
0054 time = other.time;
0055 return *this;
0056 }
0057
0058 bool operator <(const scheduled_type & other) const
0059 {
0060 return this->time > other.time;
0061 }
0062 };
0063
0064 template <class Duration>
0065 chrono::time_point<chrono::steady_clock,Duration>
0066 limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
0067 {
0068
0069 return tp;
0070 }
0071
0072 template <class Clock, class Duration>
0073 chrono::time_point<Clock,Duration>
0074 limit_timepoint(chrono::time_point<Clock,Duration> const& tp)
0075 {
0076
0077
0078
0079 const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)));
0080 return (std::min)(tp, tpmax);
0081 }
0082
0083 template <class Duration>
0084 chrono::steady_clock::time_point
0085 convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
0086 {
0087
0088 return chrono::time_point_cast<chrono::steady_clock::duration>(tp);
0089 }
0090
0091 template <class Clock, class Duration>
0092 chrono::steady_clock::time_point
0093 convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp)
0094 {
0095
0096
0097
0098 const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now()));
0099 const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS));
0100 return chrono::steady_clock::now() + (std::min)(dura, duramax);
0101 }
0102
0103 }
0104
0105 template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
0106 class sync_timed_queue
0107 : private sync_priority_queue<detail::scheduled_type<T, Clock, TimePoint> >
0108 {
0109 typedef detail::scheduled_type<T, Clock, TimePoint> stype;
0110 typedef sync_priority_queue<stype> super;
0111 public:
0112 typedef T value_type;
0113 typedef Clock clock;
0114 typedef typename clock::duration duration;
0115 typedef typename clock::time_point time_point;
0116 typedef typename super::underlying_queue_type underlying_queue_type;
0117 typedef typename super::size_type size_type;
0118 typedef typename super::op_status op_status;
0119
0120 sync_timed_queue() : super() {};
0121 ~sync_timed_queue() {}
0122
0123 using super::size;
0124 using super::empty;
0125 using super::full;
0126 using super::close;
0127 using super::closed;
0128
0129 T pull();
0130 void pull(T& elem);
0131
0132 template <class Duration>
0133 queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);
0134 template <class Rep, class Period>
0135 queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
0136
0137 queue_op_status try_pull(T& elem);
0138 queue_op_status wait_pull(T& elem);
0139 queue_op_status nonblocking_pull(T& elem);
0140
0141 template <class Duration>
0142 void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
0143 template <class Rep, class Period>
0144 void push(const T& elem, chrono::duration<Rep,Period> const& dura);
0145
0146 template <class Duration>
0147 void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
0148 template <class Rep, class Period>
0149 void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
0150
0151 template <class Duration>
0152 queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
0153 template <class Rep, class Period>
0154 queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
0155
0156 template <class Duration>
0157 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
0158 template <class Rep, class Period>
0159 queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
0160
0161 private:
0162 inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const;
0163 inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
0164
0165 bool wait_to_pull(unique_lock<mutex>&);
0166 queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp);
0167 template <class Rep, class Period>
0168 queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura);
0169
0170 T pull(unique_lock<mutex>&);
0171 T pull(lock_guard<mutex>&);
0172
0173 void pull(unique_lock<mutex>&, T& elem);
0174 void pull(lock_guard<mutex>&, T& elem);
0175
0176 queue_op_status try_pull(unique_lock<mutex>&, T& elem);
0177 queue_op_status try_pull(lock_guard<mutex>&, T& elem);
0178
0179 queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
0180
0181 sync_timed_queue(const sync_timed_queue&);
0182 sync_timed_queue& operator=(const sync_timed_queue&);
0183 sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
0184 sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
0185 };
0186
0187
0188 template <class T, class Clock, class TimePoint>
0189 template <class Duration>
0190 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
0191 {
0192 super::push(stype(elem,tp));
0193 }
0194
0195 template <class T, class Clock, class TimePoint>
0196 template <class Rep, class Period>
0197 void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
0198 {
0199 push(elem, clock::now() + dura);
0200 }
0201
0202 template <class T, class Clock, class TimePoint>
0203 template <class Duration>
0204 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
0205 {
0206 super::push(stype(boost::move(elem),tp));
0207 }
0208
0209 template <class T, class Clock, class TimePoint>
0210 template <class Rep, class Period>
0211 void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
0212 {
0213 push(boost::move(elem), clock::now() + dura);
0214 }
0215
0216
0217
0218 template <class T, class Clock, class TimePoint>
0219 template <class Duration>
0220 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
0221 {
0222 return super::try_push(stype(elem,tp));
0223 }
0224
0225 template <class T, class Clock, class TimePoint>
0226 template <class Rep, class Period>
0227 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
0228 {
0229 return try_push(elem,clock::now() + dura);
0230 }
0231
0232 template <class T, class Clock, class TimePoint>
0233 template <class Duration>
0234 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
0235 {
0236 return super::try_push(stype(boost::move(elem), tp));
0237 }
0238
0239 template <class T, class Clock, class TimePoint>
0240 template <class Rep, class Period>
0241 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
0242 {
0243 return try_push(boost::move(elem), clock::now() + dura);
0244 }
0245
0246
0247 template <class T, class Clock, class TimePoint>
0248 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const
0249 {
0250 return ! super::empty(lk) && clock::now() >= super::data_.top().time;
0251 }
0252
0253 template <class T, class Clock, class TimePoint>
0254 bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const
0255 {
0256 return ! super::empty(lk) && clock::now() >= super::data_.top().time;
0257 }
0258
0259
0260 template <class T, class Clock, class TimePoint>
0261 bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk)
0262 {
0263 for (;;)
0264 {
0265 if (not_empty_and_time_reached(lk)) return false;
0266 if (super::closed(lk)) return true;
0267
0268 super::wait_until_not_empty_or_closed(lk);
0269
0270 if (not_empty_and_time_reached(lk)) return false;
0271 if (super::closed(lk)) return true;
0272
0273 const time_point tpmin(detail::limit_timepoint(super::data_.top().time));
0274 super::cond_.wait_until(lk, tpmin);
0275 }
0276 }
0277
0278 template <class T, class Clock, class TimePoint>
0279 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp)
0280 {
0281 for (;;)
0282 {
0283 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
0284 if (super::closed(lk)) return queue_op_status::closed;
0285 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
0286
0287 super::wait_until_not_empty_or_closed_until(lk, tp);
0288
0289 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
0290 if (super::closed(lk)) return queue_op_status::closed;
0291 if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
0292
0293 const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time)));
0294 super::cond_.wait_until(lk, tpmin);
0295 }
0296 }
0297
0298 template <class T, class Clock, class TimePoint>
0299 template <class Rep, class Period>
0300 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura)
0301 {
0302 const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura));
0303 for (;;)
0304 {
0305 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
0306 if (super::closed(lk)) return queue_op_status::closed;
0307 if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
0308
0309 super::wait_until_not_empty_or_closed_until(lk, tp);
0310
0311 if (not_empty_and_time_reached(lk)) return queue_op_status::success;
0312 if (super::closed(lk)) return queue_op_status::closed;
0313 if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
0314
0315 const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time)));
0316 super::cond_.wait_until(lk, tpmin);
0317 }
0318 }
0319
0320
0321 template <class T, class Clock, class TimePoint>
0322 T sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&)
0323 {
0324 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
0325 return boost::move(super::data_.pull().data);
0326 #else
0327 return super::data_.pull().data;
0328 #endif
0329 }
0330
0331 template <class T, class Clock, class TimePoint>
0332 T sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&)
0333 {
0334 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
0335 return boost::move(super::data_.pull().data);
0336 #else
0337 return super::data_.pull().data;
0338 #endif
0339 }
0340 template <class T, class Clock, class TimePoint>
0341 T sync_timed_queue<T, Clock, TimePoint>::pull()
0342 {
0343 unique_lock<mutex> lk(super::mtx_);
0344 const bool has_been_closed = wait_to_pull(lk);
0345 if (has_been_closed) super::throw_if_closed(lk);
0346 return pull(lk);
0347 }
0348
0349
0350 template <class T, class Clock, class TimePoint>
0351 void sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&, T& elem)
0352 {
0353 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
0354 elem = boost::move(super::data_.pull().data);
0355 #else
0356 elem = super::data_.pull().data;
0357 #endif
0358 }
0359
0360 template <class T, class Clock, class TimePoint>
0361 void sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&, T& elem)
0362 {
0363 #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
0364 elem = boost::move(super::data_.pull().data);
0365 #else
0366 elem = super::data_.pull().data;
0367 #endif
0368 }
0369
0370 template <class T, class Clock, class TimePoint>
0371 void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem)
0372 {
0373 unique_lock<mutex> lk(super::mtx_);
0374 const bool has_been_closed = wait_to_pull(lk);
0375 if (has_been_closed) super::throw_if_closed(lk);
0376 pull(lk, elem);
0377 }
0378
0379
0380 template <class T, class Clock, class TimePoint>
0381 template <class Duration>
0382 queue_op_status
0383 sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)
0384 {
0385 unique_lock<mutex> lk(super::mtx_);
0386 const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp));
0387 if (rc == queue_op_status::success) pull(lk, elem);
0388 return rc;
0389 }
0390
0391
0392 template <class T, class Clock, class TimePoint>
0393 template <class Rep, class Period>
0394 queue_op_status
0395 sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
0396 {
0397 unique_lock<mutex> lk(super::mtx_);
0398 const queue_op_status rc = wait_to_pull_for(lk, dura);
0399 if (rc == queue_op_status::success) pull(lk, elem);
0400 return rc;
0401 }
0402
0403
0404 template <class T, class Clock, class TimePoint>
0405 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem)
0406 {
0407 if (not_empty_and_time_reached(lk))
0408 {
0409 pull(lk, elem);
0410 return queue_op_status::success;
0411 }
0412 if (super::closed(lk)) return queue_op_status::closed;
0413 if (super::empty(lk)) return queue_op_status::empty;
0414 return queue_op_status::not_ready;
0415 }
0416 template <class T, class Clock, class TimePoint>
0417 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem)
0418 {
0419 if (not_empty_and_time_reached(lk))
0420 {
0421 pull(lk, elem);
0422 return queue_op_status::success;
0423 }
0424 if (super::closed(lk)) return queue_op_status::closed;
0425 if (super::empty(lk)) return queue_op_status::empty;
0426 return queue_op_status::not_ready;
0427 }
0428
0429 template <class T, class Clock, class TimePoint>
0430 queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(T& elem)
0431 {
0432 lock_guard<mutex> lk(super::mtx_);
0433 return try_pull(lk, elem);
0434 }
0435
0436
0437 template <class T, class Clock, class TimePoint>
0438 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem)
0439 {
0440 const bool has_been_closed = wait_to_pull(lk);
0441 if (has_been_closed) return queue_op_status::closed;
0442 pull(lk, elem);
0443 return queue_op_status::success;
0444 }
0445
0446 template <class T, class Clock, class TimePoint>
0447 queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)
0448 {
0449 unique_lock<mutex> lk(super::mtx_);
0450 return wait_pull(lk, elem);
0451 }
0452
0453
0454 template <class T, class Clock, class TimePoint>
0455 queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem)
0456 {
0457 unique_lock<mutex> lk(super::mtx_, try_to_lock);
0458 if (! lk.owns_lock()) return queue_op_status::busy;
0459 return try_pull(lk, elem);
0460 }
0461
0462 }
0463
0464 using concurrent::sync_timed_queue;
0465
0466 }
0467 #include <boost/config/abi_suffix.hpp>
0468
0469 #endif