Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-30 10:01:08

0001 // Copyright (C) 2014 Ian Forbed
0002 // Copyright (C) 2014-2017 Vicente J. Botet Escriba
0003 //
0004 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
0005 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0006 //
0007 
0008 #ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
0009 #define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP
0010 
0011 #include <boost/thread/detail/config.hpp>
0012 
0013 #include <boost/thread/concurrent_queues/sync_priority_queue.hpp>
0014 #include <boost/chrono/duration.hpp>
0015 #include <boost/chrono/time_point.hpp>
0016 #include <boost/chrono/system_clocks.hpp>
0017 #include <boost/chrono/chrono_io.hpp>
0018 
0019 #include <algorithm> // std::min
0020 
0021 #include <boost/config/abi_prefix.hpp>
0022 
0023 namespace boost
0024 {
0025 namespace concurrent
0026 {
0027 namespace detail
0028 {
0029   // FIXME: shouldn't the time point type be configurable?
0030   template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
       // Value wrapper that pairs a payload of type T with the time point at
       // which it is scheduled. This is the element type stored by
       // sync_timed_queue (via its sync_priority_queue base).
       //
       // T         - payload type.
       // Clock     - clock associated with the schedule (default: steady_clock).
       // TimePoint - type used to store the scheduled time (default: Clock::time_point).
0031   struct scheduled_type
0032   {
0033     typedef T value_type;
0034     typedef Clock clock;
0035     typedef TimePoint time_point;
         // The payload and the time point it is scheduled for.
0036     T data;
0037     time_point time;
0038 
         // NOTE(review): Boost.Thread macro; presumably enables the copy/move
         // emulation used by the BOOST_THREAD_RV_REF overloads below - confirm.
0039     BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)
0040 
         // Construct from a payload (copied or moved in) and its scheduled time.
0041     scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}
0042     scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}
0043 
         // Copy construction / copy assignment: member-wise copy of data and time.
0044     scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}
0045     scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {
0046       data = other.data;
0047       time = other.time;
0048       return *this;
0049     }
0050 
         // Move construction / move assignment: the payload is moved, the time
         // point is copied.
0051     scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}
0052     scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {
0053       data = boost::move(other.data);
0054       time = other.time;
0055       return *this;
0056     }
0057 
         // Ordering is inverted with respect to time: an element compares
         // "less" when its time is LATER than the other's.
         // NOTE(review): presumably so that a largest-first priority queue
         // exposes the earliest-scheduled element at top() - confirm against
         // sync_priority_queue.
0058     bool operator <(const scheduled_type & other) const
0059     {
0060       return this->time > other.time;
0061     }
0062   }; //end struct
0063 
0064   template <class Duration>
       // Overload for time points measured on chrono::steady_clock.
       // Returns tp unchanged: no polling cap is applied to the deadline.
0065   chrono::time_point<chrono::steady_clock,Duration>
0066   limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
0067   {
0068     // Clock == chrono::steady_clock
0069     return tp;
0070   }
0071 
0072   template <class Clock, class Duration>
       // Overload for time points measured on any other clock.
       // Returns the earlier of tp and (Clock::now() + the poll interval), so a
       // caller waiting until the returned time point wakes up at least once
       // per BOOST_THREAD_POLL_INTERVAL_MILLISECONDS.
0073   chrono::time_point<Clock,Duration>
0074   limit_timepoint(chrono::time_point<Clock,Duration> const& tp)
0075   {
0076     // Clock != chrono::steady_clock
0077     // The system time may jump while wait_until() is waiting. To compensate for this and time out near
0078     // the correct time, we limit how long wait_until() can wait before going around the loop again.
         // tpmax is "now + poll interval", expressed in the caller's Duration.
0079     const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)));
         // Parenthesised (std::min) prevents expansion of a function-like min macro.
0080     return (std::min)(tp, tpmax);
0081   }
0082 
0083   template <class Duration>
       // Overload for time points already measured on chrono::steady_clock.
       // Only the duration representation is converted (time_point_cast to
       // steady_clock::duration); the instant itself is not adjusted.
0084   chrono::steady_clock::time_point
0085   convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)
0086   {
0087     // Clock == chrono::steady_clock
0088     return chrono::time_point_cast<chrono::steady_clock::duration>(tp);
0089   }
0090 
0091   template <class Clock, class Duration>
       // Overload for time points measured on any other clock.
       // Translates tp into a steady_clock time point by taking the time
       // remaining until tp on Clock, capping it at the poll interval, and
       // adding the result to steady_clock::now(). The returned time point is
       // therefore never more than BOOST_THREAD_POLL_INTERVAL_MILLISECONDS
       // in the future.
0092   chrono::steady_clock::time_point
0093   convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp)
0094   {
0095     // Clock != chrono::steady_clock
0096     // The system time may jump while wait_until() is waiting. To compensate for this and time out near
0097     // the correct time, we limit how long wait_until() can wait before going around the loop again.
         // dura: time left until tp as seen by Clock, in steady_clock ticks.
0098     const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now()));
         // duramax: the poll interval, i.e. the longest single wait allowed.
0099     const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS));
0100     return chrono::steady_clock::now() + (std::min)(dura, duramax);
0101   }
0102 
0103 } //end detail namespace
0104 
0105   template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>
       // Queue of T values in which every element is pushed together with a
       // time point (or a delay relative to clock::now()); the pull operations
       // only hand out an element once clock::now() has reached the time stored
       // in the element at the top of the underlying queue.
       //
       // Implemented by privately inheriting from a sync_priority_queue of
       // detail::scheduled_type<T, Clock, TimePoint>; the mutex, condition
       // variable and storage used by the member definitions (super::mtx_,
       // super::cond_, super::data_) come from that base.
0106   class sync_timed_queue
0107     : private sync_priority_queue<detail::scheduled_type<T, Clock, TimePoint> >
0108   {
         // stype: stored element type (payload + scheduled time); super: the base queue.
0109     typedef detail::scheduled_type<T, Clock, TimePoint> stype;
0110     typedef sync_priority_queue<stype> super;
0111   public:
0112     typedef T value_type;
0113     typedef Clock clock;
0114     typedef typename clock::duration duration;
         // Note: always Clock::time_point, independent of the TimePoint parameter.
0115     typedef typename clock::time_point time_point;
0116     typedef typename super::underlying_queue_type underlying_queue_type;
0117     typedef typename super::size_type size_type;
0118     typedef typename super::op_status op_status;
0119 
0120     sync_timed_queue() : super() {};
0121     ~sync_timed_queue() {}
0122 
         // Operations re-exported unchanged from the private base.
0123     using super::size;
0124     using super::empty;
0125     using super::full;
0126     using super::close;
0127     using super::closed;
0128 
         // Blocking pulls: wait until an element is ready; if the wait ends
         // because the queue was closed, super::throw_if_closed() is invoked.
0129     T pull();
0130     void pull(T& elem);
0131 
         // Bounded-wait pulls: like pull(T&) but give up at tp / after dura and
         // report the outcome as a queue_op_status instead.
0132     template <class Duration>
0133     queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);
0134     template <class Rep, class Period>
0135     queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);
0136 
         // try_pull: never waits for an element to become ready.
         // wait_pull: waits like pull(T&) but reports closure via the status.
         // nonblocking_pull: additionally returns busy if the mutex is contended.
0137     queue_op_status try_pull(T& elem);
0138     queue_op_status wait_pull(T& elem);
0139     queue_op_status nonblocking_pull(T& elem);
0140 
         // push: schedule elem for time point tp, or for clock::now() + dura.
0141     template <class Duration>
0142     void push(const T& elem, chrono::time_point<clock,Duration> const& tp);
0143     template <class Rep, class Period>
0144     void push(const T& elem, chrono::duration<Rep,Period> const& dura);
0145 
0146     template <class Duration>
0147     void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
0148     template <class Rep, class Period>
0149     void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
0150 
         // try_push: same scheduling as push, forwarded to super::try_push and
         // returning its status.
0151     template <class Duration>
0152     queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);
0153     template <class Rep, class Period>
0154     queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);
0155 
0156     template <class Duration>
0157     queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);
0158     template <class Rep, class Period>
0159     queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);
0160 
0161   private:
         // Helpers below take the caller's lock object as a parameter; the
         // public members construct that lock on super::mtx_ before calling them.
0162     inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const;
0163     inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;
0164 
         // wait_to_pull returns true when the wait ended because the queue is
         // closed, false when an element is ready.
0165     bool wait_to_pull(unique_lock<mutex>&);
0166     queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp);
0167     template <class Rep, class Period>
0168     queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura);
0169 
0170     T pull(unique_lock<mutex>&);
0171     T pull(lock_guard<mutex>&);
0172 
0173     void pull(unique_lock<mutex>&, T& elem);
0174     void pull(lock_guard<mutex>&, T& elem);
0175 
0176     queue_op_status try_pull(unique_lock<mutex>&, T& elem);
0177     queue_op_status try_pull(lock_guard<mutex>&, T& elem);
0178 
0179     queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);
0180 
         // Copy/move operations are declared private and no definition appears
         // in this file. NOTE(review): presumably the pre-C++11 idiom for making
         // the queue non-copyable and non-movable - confirm.
0181     sync_timed_queue(const sync_timed_queue&);
0182     sync_timed_queue& operator=(const sync_timed_queue&);
0183     sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));
0184     sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));
0185   }; //end class
0186 
0187 
0188   template <class T, class Clock, class TimePoint>
       // Schedules a copy of elem for time point tp: wraps both in an stype and
       // forwards it to super::push.
0189   template <class Duration>
0190   void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)
0191   {
0192     super::push(stype(elem,tp));
0193   }
0194 
0195   template <class T, class Clock, class TimePoint>
       // Schedules a copy of elem for dura after the current time: computes
       // clock::now() + dura and delegates to the time-point overload of push.
0196   template <class Rep, class Period>
0197   void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::duration<Rep,Period> const& dura)
0198   {
0199     push(elem, clock::now() + dura);
0200   }
0201 
0202   template <class T, class Clock, class TimePoint>
       // Move-enabled variant: schedules elem for time point tp, moving elem
       // into the stype that is forwarded to super::push.
0203   template <class Duration>
0204   void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
0205   {
0206     super::push(stype(boost::move(elem),tp));
0207   }
0208 
0209   template <class T, class Clock, class TimePoint>
       // Move-enabled variant: schedules elem for dura after the current time by
       // delegating to the time-point overload with clock::now() + dura.
0210   template <class Rep, class Period>
0211   void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
0212   {
0213     push(boost::move(elem), clock::now() + dura);
0214   }
0215 
0216 
0217 
0218   template <class T, class Clock, class TimePoint>
       // Wraps a copy of elem and tp in an stype and forwards it to
       // super::try_push, returning that call's status unchanged.
0219   template <class Duration>
0220   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)
0221   {
0222     return super::try_push(stype(elem,tp));
0223   }
0224 
0225   template <class T, class Clock, class TimePoint>
       // Delay-based try_push: converts dura to the absolute time
       // clock::now() + dura and delegates to the time-point overload.
0226   template <class Rep, class Period>
0227   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)
0228   {
0229     return try_push(elem,clock::now() + dura);
0230   }
0231 
0232   template <class T, class Clock, class TimePoint>
       // Move-enabled variant: moves elem into an stype scheduled for tp and
       // forwards it to super::try_push, returning that call's status.
0233   template <class Duration>
0234   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)
0235   {
0236     return super::try_push(stype(boost::move(elem), tp));
0237   }
0238 
0239   template <class T, class Clock, class TimePoint>
       // Move-enabled, delay-based try_push: delegates to the time-point
       // overload with clock::now() + dura.
0240   template <class Rep, class Period>
0241   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)
0242   {
0243     return try_push(boost::move(elem), clock::now() + dura);
0244   }
0245 
0246   ///////////////////////////
0247   template <class T, class Clock, class TimePoint>
       // Returns true when the queue is not empty and clock::now() has reached
       // the time stored in the element at super::data_.top().
       // The emptiness test comes first, so top() is only evaluated on a
       // non-empty queue.
       // NOTE(review): lk is only passed through to super::empty(lk);
       // presumably it attests that the caller holds the queue mutex - confirm.
0248   bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const
0249   {
0250     return ! super::empty(lk) && clock::now() >= super::data_.top().time;
0251   }
0252 
0253   template <class T, class Clock, class TimePoint>
       // lock_guard flavour of the readiness test: true when the queue is not
       // empty and clock::now() has reached the time of the top element.
       // Short-circuit evaluation keeps top() from being called on an empty queue.
0254   bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const
0255   {
0256     return ! super::empty(lk) && clock::now() >= super::data_.top().time;
0257   }
0258 
0259   ///////////////////////////
0260   template <class T, class Clock, class TimePoint>
       // Waits, with no overall deadline, until the top element's time has been
       // reached or the queue is closed.
       //
       // lk - the caller's unique_lock (the public callers lock super::mtx_).
       // Returns false when an element is ready to be pulled, true when the
       // queue is closed and no element is ready. Readiness is tested before
       // closure, so a ready element is still reported on a closed queue.
0261   bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk)
0262   {
0263     for (;;)
0264     {
0265       if (not_empty_and_time_reached(lk)) return false; // success
0266       if (super::closed(lk)) return true; // closed
0267 
           // NOTE(review): base-class helper; by its name it blocks until the
           // queue has an element or is closed - confirm in sync_priority_queue.
0268       super::wait_until_not_empty_or_closed(lk);
0269 
           // Re-check after waking: state may have changed while waiting.
0270       if (not_empty_and_time_reached(lk)) return false; // success
0271       if (super::closed(lk)) return true; // closed
0272 
           // Not ready and not closed: wait on the condition variable until the
           // top element's time, capped by limit_timepoint() for non-steady
           // clocks, then loop to re-evaluate everything.
           // NOTE(review): top() here relies on the queue being non-empty after
           // the helper above returned without closure - confirm.
0273       const time_point tpmin(detail::limit_timepoint(super::data_.top().time));
0274       super::cond_.wait_until(lk, tpmin);
0275     }
0276   }
0277 
0278   template <class T, class Clock, class TimePoint>
       // Waits until the top element's time has been reached, the queue is
       // closed, or the deadline tp (measured with clock::now()) has passed.
       //
       // Returns:
       //   success   - an element is ready to be pulled;
       //   closed    - the queue is closed and no element is ready;
       //   timeout   - the deadline passed and the queue is empty;
       //   not_ready - the deadline passed and the queue holds elements whose
       //               time has not been reached.
       // The checks run in that order, so readiness wins over closure and
       // closure wins over the deadline.
0279   queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp)
0280   {
0281     for (;;)
0282     {
0283       if (not_empty_and_time_reached(lk)) return queue_op_status::success;
0284       if (super::closed(lk)) return queue_op_status::closed;
0285       if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
0286 
           // NOTE(review): base-class helper; by its name it blocks until the
           // queue has an element, is closed, or tp is reached - confirm.
0287       super::wait_until_not_empty_or_closed_until(lk, tp);
0288 
           // Re-check after waking: state may have changed while waiting.
0289       if (not_empty_and_time_reached(lk)) return queue_op_status::success;
0290       if (super::closed(lk)) return queue_op_status::closed;
0291       if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
0292 
           // Sleep until whichever comes first: the caller's deadline or the
           // (poll-capped) time of the top element, then loop again.
0293       const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time)));
0294       super::cond_.wait_until(lk, tpmin);
0295     }
0296   }
0297 
0298   template <class T, class Clock, class TimePoint>
       // Relative-timeout counterpart of wait_to_pull_until(). The deadline is
       // computed once, on chrono::steady_clock, as now + dura, and all deadline
       // checks use steady_clock; element readiness is still judged against
       // clock::now() by not_empty_and_time_reached().
       //
       // Returns success, closed, timeout (deadline passed, queue empty) or
       // not_ready (deadline passed, elements pending), checked in that order.
0299   template <class Rep, class Period>
0300   queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura)
0301   {
         // Absolute deadline on the steady clock.
0302     const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura));
0303     for (;;)
0304     {
0305       if (not_empty_and_time_reached(lk)) return queue_op_status::success;
0306       if (super::closed(lk)) return queue_op_status::closed;
0307       if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
0308 
           // NOTE(review): base-class helper; by its name it blocks until the
           // queue has an element, is closed, or tp is reached - confirm.
0309       super::wait_until_not_empty_or_closed_until(lk, tp);
0310 
           // Re-check after waking: state may have changed while waiting.
0311       if (not_empty_and_time_reached(lk)) return queue_op_status::success;
0312       if (super::closed(lk)) return queue_op_status::closed;
0313       if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;
0314 
           // Sleep until the earlier of the deadline and the top element's time
           // translated to steady_clock (that translation is capped at the poll
           // interval for non-steady clocks), then loop again.
0315       const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time)));
0316       super::cond_.wait_until(lk, tpmin);
0317     }
0318   }
0319 
0320   ///////////////////////////
0321   template <class T, class Clock, class TimePoint>
       // Removes the top element via super::data_.pull() and returns its payload.
       // Performs no readiness or emptiness check of its own. The unnamed lock
       // parameter is unused in the body.
       // NOTE(review): the lock parameter presumably documents that the caller
       // already holds the queue mutex - confirm.
0322   T sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&)
0323   {
         // With rvalue-reference support the payload is moved out of the
         // temporary; otherwise it is copied.
0324 #if ! defined  BOOST_NO_CXX11_RVALUE_REFERENCES
0325     return boost::move(super::data_.pull().data);
0326 #else
0327     return super::data_.pull().data;
0328 #endif
0329   }
0330 
0331   template <class T, class Clock, class TimePoint>
       // lock_guard flavour: removes the top element via super::data_.pull() and
       // returns its payload, without any readiness or emptiness check. The
       // unnamed lock parameter is unused in the body.
0332   T sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&)
0333   {
         // Move the payload when rvalue references are available, copy otherwise.
0334 #if ! defined  BOOST_NO_CXX11_RVALUE_REFERENCES
0335     return boost::move(super::data_.pull().data);
0336 #else
0337     return super::data_.pull().data;
0338 #endif
0339   }
0340   template <class T, class Clock, class TimePoint>
       // Blocking pull: locks super::mtx_, waits until an element is ready or the
       // queue is closed, then removes and returns the top element's payload.
0341   T sync_timed_queue<T, Clock, TimePoint>::pull()
0342   {
0343     unique_lock<mutex> lk(super::mtx_);
0344     const bool has_been_closed = wait_to_pull(lk);
         // NOTE(review): throw_if_closed is a base-class helper; presumably it
         // throws so the pull below is not reached on a closed queue - confirm.
0345     if (has_been_closed) super::throw_if_closed(lk);
0346     return pull(lk);
0347   }
0348 
0349   ///////////////////////////
0350   template <class T, class Clock, class TimePoint>
       // Removes the top element via super::data_.pull() and assigns its payload
       // to elem. Performs no readiness or emptiness check of its own. The
       // unnamed lock parameter is unused in the body.
0351   void sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&, T& elem)
0352   {
         // Move-assign when rvalue references are available, copy-assign otherwise.
0353 #if ! defined  BOOST_NO_CXX11_RVALUE_REFERENCES
0354     elem = boost::move(super::data_.pull().data);
0355 #else
0356     elem = super::data_.pull().data;
0357 #endif
0358   }
0359 
0360   template <class T, class Clock, class TimePoint>
       // lock_guard flavour: removes the top element via super::data_.pull() and
       // assigns its payload to elem, without any readiness or emptiness check.
       // The unnamed lock parameter is unused in the body.
0361   void sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&, T& elem)
0362   {
         // Move-assign when rvalue references are available, copy-assign otherwise.
0363 #if ! defined  BOOST_NO_CXX11_RVALUE_REFERENCES
0364     elem = boost::move(super::data_.pull().data);
0365 #else
0366     elem = super::data_.pull().data;
0367 #endif
0368   }
0369 
0370   template <class T, class Clock, class TimePoint>
       // Blocking pull into an out-parameter: locks super::mtx_, waits until an
       // element is ready or the queue is closed, then stores the top element's
       // payload in elem.
0371   void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem)
0372   {
0373     unique_lock<mutex> lk(super::mtx_);
0374     const bool has_been_closed = wait_to_pull(lk);
         // NOTE(review): throw_if_closed is a base-class helper; presumably it
         // throws so the pull below is not reached on a closed queue - confirm.
0375     if (has_been_closed) super::throw_if_closed(lk);
0376     pull(lk, elem);
0377   }
0378 
0379   //////////////////////
0380   template <class T, class Clock, class TimePoint>
       // Pull with an absolute deadline. Locks super::mtx_ and waits via
       // wait_to_pull_until(); elem is written only when the result is success.
       // Returns the status produced by wait_to_pull_until() (success, closed,
       // timeout or not_ready).
0381   template <class Duration>
0382   queue_op_status
0383   sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)
0384   {
0385     unique_lock<mutex> lk(super::mtx_);
         // The deadline is converted to the resolution of clock::time_point
         // before being handed to the wait helper.
0386     const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp));
0387     if (rc == queue_op_status::success) pull(lk, elem);
0388     return rc;
0389   }
0390 
0391   //////////////////////
0392   template <class T, class Clock, class TimePoint>
       // Pull with a relative timeout. Locks super::mtx_ and waits via
       // wait_to_pull_for(); elem is written only when the result is success.
       // Returns the status produced by wait_to_pull_for() (success, closed,
       // timeout or not_ready).
0393   template <class Rep, class Period>
0394   queue_op_status
0395   sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)
0396   {
0397     unique_lock<mutex> lk(super::mtx_);
0398     const queue_op_status rc = wait_to_pull_for(lk, dura);
0399     if (rc == queue_op_status::success) pull(lk, elem);
0400     return rc;
0401   }
0402 
0403   ///////////////////////////
0404   template <class T, class Clock, class TimePoint>
       // Non-waiting pull using the caller's unique_lock.
       // Returns:
       //   success   - the top element was ready; its payload is now in elem;
       //   closed    - nothing ready and the queue is closed;
       //   empty     - nothing ready, queue open and empty;
       //   not_ready - queue open and non-empty, but the top element's time has
       //               not been reached.
       // elem is left untouched for every result other than success.
0405   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem)
0406   {
         // Readiness is tested first, so a ready element is delivered even if
         // the queue has been closed.
0407     if (not_empty_and_time_reached(lk))
0408     {
0409       pull(lk, elem);
0410       return queue_op_status::success;
0411     }
0412     if (super::closed(lk)) return queue_op_status::closed;
0413     if (super::empty(lk)) return queue_op_status::empty;
0414     return queue_op_status::not_ready;
0415   }
0416   template <class T, class Clock, class TimePoint>
       // Non-waiting pull using the caller's lock_guard. Returns success (payload
       // stored in elem), closed, empty, or not_ready (elements pending but the
       // top element's time has not been reached), tested in that order.
       // elem is left untouched unless the result is success.
0417   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem)
0418   {
         // Readiness is tested first, so a ready element is delivered even if
         // the queue has been closed.
0419     if (not_empty_and_time_reached(lk))
0420     {
0421       pull(lk, elem);
0422       return queue_op_status::success;
0423     }
0424     if (super::closed(lk)) return queue_op_status::closed;
0425     if (super::empty(lk)) return queue_op_status::empty;
0426     return queue_op_status::not_ready;
0427   }
0428 
0429   template <class T, class Clock, class TimePoint>
       // Public non-waiting pull: acquires super::mtx_ with a lock_guard (this
       // acquisition itself may block) and delegates to the lock_guard overload
       // of try_pull, returning its status.
0430   queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(T& elem)
0431   {
0432     lock_guard<mutex> lk(super::mtx_);
0433     return try_pull(lk, elem);
0434   }
0435 
0436   ///////////////////////////
0437   template <class T, class Clock, class TimePoint>
       // Waits via wait_to_pull() using the caller's lock. Returns closed
       // (leaving elem untouched) if the wait ended because the queue is closed;
       // otherwise stores the top element's payload in elem and returns success.
0438   queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem)
0439   {
0440     const bool has_been_closed = wait_to_pull(lk);
0441     if (has_been_closed) return queue_op_status::closed;
0442     pull(lk, elem);
0443     return queue_op_status::success;
0444   }
0445 
0446   template <class T, class Clock, class TimePoint>
       // Public waiting pull that reports closure through its return value:
       // locks super::mtx_ and delegates to the unique_lock overload of
       // wait_pull, returning success or closed.
0447   queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)
0448   {
0449     unique_lock<mutex> lk(super::mtx_);
0450     return wait_pull(lk, elem);
0451   }
0452 
0453   ///////////////////////////
0454   template <class T, class Clock, class TimePoint>
       // Pull that blocks neither on the mutex nor on element readiness.
       // Returns busy if super::mtx_ could not be acquired immediately;
       // otherwise returns the result of try_pull() under that lock
       // (success, closed, empty or not_ready).
0455   queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem)
0456   {
         // try_to_lock: attempt the acquisition without waiting for the mutex.
0457     unique_lock<mutex> lk(super::mtx_, try_to_lock);
0458     if (! lk.owns_lock()) return queue_op_status::busy;
0459     return try_pull(lk, elem);
0460   }
0461 
0462 } //end concurrent namespace
0463 
0464 using concurrent::sync_timed_queue; // also expose the class template as boost::sync_timed_queue
0465 
0466 } //end boost namespace
0467 #include <boost/config/abi_suffix.hpp>
0468 
0469 #endif