Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2024-11-15 09:05:27

0001 
0002 //          Copyright Oliver Kowalke 2016.
0003 // Distributed under the Boost Software License, Version 1.0.
0004 //    (See accompanying file LICENSE_1_0.txt or copy at
0005 //          http://www.boost.org/LICENSE_1_0.txt)
0006 
0007 #ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
0008 #define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
0009 
0010 #include <atomic>
0011 #include <chrono>
0012 #include <cstddef>
0013 #include <cstdint>
0014 #include <memory>
0015 #include <vector>
0016 
0017 #include <boost/config.hpp>
0018 
0019 #include <boost/fiber/channel_op_status.hpp>
0020 #include <boost/fiber/context.hpp>
0021 #include <boost/fiber/detail/config.hpp>
0022 #include <boost/fiber/detail/convert.hpp>
0023 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
0024 #include <boost/fiber/detail/exchange.hpp>
0025 #endif
0026 #include <boost/fiber/detail/spinlock.hpp>
0027 #include <boost/fiber/exceptions.hpp>
0028 #include <boost/fiber/waker.hpp>
0029 
0030 #ifdef BOOST_HAS_ABI_HEADERS
0031 #  include BOOST_ABI_PREFIX
0032 #endif
0033 
0034 namespace boost {
0035 namespace fibers {
0036 
0037 template< typename T >
0038 class unbuffered_channel {
0039 public:
0040     using value_type = typename std::remove_reference<T>::type;
0041 
0042 private:
0043     struct slot {
0044         value_type  value;
0045         waker       w;
0046 
0047         slot( value_type const& value_, waker && w) :
0048             value{ value_ },
0049             w{ std::move(w) } {
0050         }
0051 
0052         slot( value_type && value_, waker && w) :
0053             value{ std::move( value_) },
0054             w{ std::move(w) } {
0055         }
0056     };
0057 
0058     // shared cacheline
0059     std::atomic< slot * >       slot_{ nullptr };
0060     // shared cacheline
0061     std::atomic_bool            closed_{ false };
0062     mutable detail::spinlock    splk_producers_{};
0063     wait_queue                  waiting_producers_{};
0064     mutable detail::spinlock    splk_consumers_{};
0065     wait_queue                  waiting_consumers_{};
0066     char                        pad_[cacheline_length];
0067 
0068     bool is_empty_() {
0069         return nullptr == slot_.load( std::memory_order_acquire);
0070     }
0071 
0072     bool try_push_( slot * own_slot) {
0073         for (;;) {
0074             slot * s = slot_.load( std::memory_order_acquire);
0075             if ( nullptr == s) {
0076                 if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
0077                     continue;
0078                 }
0079                 return true;
0080             }
0081             return false;
0082         }
0083     }
0084 
0085     slot * try_pop_() {
0086         slot * nil_slot = nullptr;
0087         for (;;) {
0088             slot * s = slot_.load( std::memory_order_acquire);
0089             if ( nullptr != s) {
0090                 if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
0091                     continue;}
0092             }
0093             return s;
0094         }
0095     }
0096 
0097 public:
0098     unbuffered_channel() = default;
0099 
0100     ~unbuffered_channel() {
0101         close();
0102     }
0103 
0104     unbuffered_channel( unbuffered_channel const&) = delete;
0105     unbuffered_channel & operator=( unbuffered_channel const&) = delete;
0106 
0107     bool is_closed() const noexcept {
0108         return closed_.load( std::memory_order_acquire);
0109     }
0110 
0111     void close() noexcept {
0112         // set flag
0113         if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
0114             // notify current waiting  
0115             slot * s = slot_.load( std::memory_order_acquire);
0116             if ( nullptr != s) {
0117                 // notify context
0118                 s->w.wake();
0119             }
0120             detail::spinlock_lock lk1{ splk_producers_ };
0121             waiting_producers_.notify_all();
0122 
0123             detail::spinlock_lock lk2{ splk_consumers_ };
0124             waiting_consumers_.notify_all();
0125         }
0126     }
0127 
0128     channel_op_status push( value_type const& value) {
0129         context * active_ctx = context::active();
0130         slot s{ value, {} };
0131         for (;;) {
0132             if ( BOOST_UNLIKELY( is_closed() ) ) {
0133                 return channel_op_status::closed;
0134             }
0135             s.w = active_ctx->create_waker();
0136             if ( try_push_( & s) ) {
0137                 detail::spinlock_lock lk{ splk_consumers_ };
0138                 waiting_consumers_.notify_one();
0139                 // suspend till value has been consumed
0140                 active_ctx->suspend( lk);
0141                 // resumed
0142                 if ( BOOST_UNLIKELY( is_closed() ) ) {
0143                     // channel was closed before value was consumed
0144                     return channel_op_status::closed;
0145                 }
0146                 // value has been consumed
0147                 return channel_op_status::success;
0148             }
0149             detail::spinlock_lock lk{ splk_producers_ };
0150             if ( BOOST_UNLIKELY( is_closed() ) ) {
0151                 return channel_op_status::closed;
0152             }
0153             if ( is_empty_() ) {
0154                 continue;
0155             }
0156 
0157             waiting_producers_.suspend_and_wait( lk, active_ctx);
0158             // resumed, slot mabye free
0159         }
0160     }
0161 
0162     channel_op_status push( value_type && value) {
0163         context * active_ctx = context::active();
0164         slot s{ std::move( value), {} };
0165         for (;;) {
0166             if ( BOOST_UNLIKELY( is_closed() ) ) {
0167                 return channel_op_status::closed;
0168             }
0169             s.w = active_ctx->create_waker();
0170             if ( try_push_( & s) ) {
0171                 detail::spinlock_lock lk{ splk_consumers_ };
0172                 waiting_consumers_.notify_one();
0173                 // suspend till value has been consumed
0174                 active_ctx->suspend( lk);
0175                 // resumed
0176                 if ( BOOST_UNLIKELY( is_closed() ) ) {
0177                     // channel was closed before value was consumed
0178                     return channel_op_status::closed;
0179                 }
0180                 // value has been consumed
0181                 return channel_op_status::success;
0182             }
0183             detail::spinlock_lock lk{ splk_producers_ };
0184             if ( BOOST_UNLIKELY( is_closed() ) ) {
0185                 return channel_op_status::closed;
0186             }
0187             if ( is_empty_() ) {
0188                 continue;
0189             }
0190             waiting_producers_.suspend_and_wait( lk, active_ctx);
0191             // resumed, slot mabye free
0192         }
0193     }
0194 
0195     template< typename Rep, typename Period >
0196     channel_op_status push_wait_for( value_type const& value,
0197                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
0198         return push_wait_until( value,
0199                                 std::chrono::steady_clock::now() + timeout_duration);
0200     }
0201 
0202     template< typename Rep, typename Period >
0203     channel_op_status push_wait_for( value_type && value,
0204                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
0205         return push_wait_until( std::forward< value_type >( value),
0206                                 std::chrono::steady_clock::now() + timeout_duration);
0207     }
0208 
0209     template< typename Clock, typename Duration >
0210     channel_op_status push_wait_until( value_type const& value,
0211                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
0212         context * active_ctx = context::active();
0213         slot s{ value, {} };
0214         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
0215         for (;;) {
0216             if ( BOOST_UNLIKELY( is_closed() ) ) {
0217                 return channel_op_status::closed;
0218             }
0219             s.w = active_ctx->create_waker();
0220             if ( try_push_( & s) ) {
0221                 detail::spinlock_lock lk{ splk_consumers_ };
0222                 waiting_consumers_.notify_one();
0223                 // suspend this producer
0224                 if ( ! active_ctx->wait_until(timeout_time, lk, waker(s.w))) {
0225                     // clear slot
0226                     slot * nil_slot = nullptr, * own_slot = & s;
0227                     slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
0228                     // resumed, value has not been consumed
0229                     return channel_op_status::timeout;
0230                 }
0231                 // resumed
0232                 if ( BOOST_UNLIKELY( is_closed() ) ) {
0233                     // channel was closed before value was consumed
0234                     return channel_op_status::closed;
0235                 }
0236                 // value has been consumed
0237                 return channel_op_status::success;
0238             }
0239             detail::spinlock_lock lk{ splk_producers_ };
0240             if ( BOOST_UNLIKELY( is_closed() ) ) {
0241                 return channel_op_status::closed;
0242             }
0243             if ( is_empty_() ) {
0244                 continue;
0245             }
0246 
0247             if (! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time))
0248             {
0249                 return channel_op_status::timeout;
0250             }
0251             // resumed, slot maybe free
0252         }
0253     }
0254 
0255     template< typename Clock, typename Duration >
0256     channel_op_status push_wait_until( value_type && value,
0257                                        std::chrono::time_point< Clock, Duration > const& timeout_time_) {
0258         context * active_ctx = context::active();
0259         slot s{ std::move( value), {} };
0260         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
0261         for (;;) {
0262             if ( BOOST_UNLIKELY( is_closed() ) ) {
0263                 return channel_op_status::closed;
0264             }
0265             s.w = active_ctx->create_waker();
0266             if ( try_push_( & s) ) {
0267                 detail::spinlock_lock lk{ splk_consumers_ };
0268                 waiting_consumers_.notify_one();
0269                 // suspend this producer
0270                 if ( ! active_ctx->wait_until(timeout_time, lk, waker(s.w))) {
0271                     // clear slot
0272                     slot * nil_slot = nullptr, * own_slot = & s;
0273                     slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
0274                     // resumed, value has not been consumed
0275                     return channel_op_status::timeout;
0276                 }
0277                 // resumed
0278                 if ( BOOST_UNLIKELY( is_closed() ) ) {
0279                     // channel was closed before value was consumed
0280                     return channel_op_status::closed;
0281                 }
0282                 // value has been consumed
0283                 return channel_op_status::success;
0284             }
0285             detail::spinlock_lock lk{ splk_producers_ };
0286             if ( BOOST_UNLIKELY( is_closed() ) ) {
0287                 return channel_op_status::closed;
0288             }
0289             if ( is_empty_() ) {
0290                 continue;
0291             }
0292             if (! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time))
0293             {
0294                 return channel_op_status::timeout;
0295             }
0296             // resumed, slot maybe free
0297         }
0298     }
0299 
0300     channel_op_status pop( value_type & value) {
0301         context * active_ctx = context::active();
0302         slot * s = nullptr;
0303         for (;;) {
0304             if ( nullptr != ( s = try_pop_() ) ) {
0305                 {
0306                     detail::spinlock_lock lk{ splk_producers_ };
0307                     waiting_producers_.notify_one();
0308                 }
0309                 value = std::move( s->value);
0310                 // notify context
0311                 s->w.wake();
0312                 return channel_op_status::success;
0313             }
0314             detail::spinlock_lock lk{ splk_consumers_ };
0315             if ( BOOST_UNLIKELY( is_closed() ) ) {
0316                 return channel_op_status::closed;
0317             }
0318             if ( ! is_empty_() ) {
0319                 continue;
0320             }
0321             waiting_consumers_.suspend_and_wait( lk, active_ctx);
0322             // resumed, slot mabye set
0323         }
0324     }
0325 
0326     value_type value_pop() {
0327         context * active_ctx = context::active();
0328         slot * s = nullptr;
0329         for (;;) {
0330             if ( nullptr != ( s = try_pop_() ) ) {
0331                 {
0332                     detail::spinlock_lock lk{ splk_producers_ };
0333                     waiting_producers_.notify_one();
0334                 }
0335                 // consume value
0336                 value_type value = std::move( s->value);
0337                 // notify context
0338                 s->w.wake();
0339                 return std::move( value);
0340             }
0341             detail::spinlock_lock lk{ splk_consumers_ };
0342             if ( BOOST_UNLIKELY( is_closed() ) ) {
0343                 throw fiber_error{
0344                         std::make_error_code( std::errc::operation_not_permitted),
0345                         "boost fiber: channel is closed" };
0346             }
0347             if ( ! is_empty_() ) {
0348                 continue;
0349             }
0350             waiting_consumers_.suspend_and_wait( lk, active_ctx);
0351             // resumed, slot mabye set
0352         }
0353     }
0354 
0355     template< typename Rep, typename Period >
0356     channel_op_status pop_wait_for( value_type & value,
0357                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
0358         return pop_wait_until( value,
0359                                std::chrono::steady_clock::now() + timeout_duration);
0360     }
0361 
0362     template< typename Clock, typename Duration >
0363     channel_op_status pop_wait_until( value_type & value,
0364                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
0365         context * active_ctx = context::active();
0366         slot * s = nullptr;
0367         std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
0368         for (;;) {
0369             if ( nullptr != ( s = try_pop_() ) ) {
0370                 {
0371                     detail::spinlock_lock lk{ splk_producers_ };
0372                     waiting_producers_.notify_one();
0373                 }
0374                 // consume value
0375                 value = std::move( s->value);
0376                 // notify context
0377                 s->w.wake();
0378                 return channel_op_status::success;
0379             }
0380             detail::spinlock_lock lk{ splk_consumers_ };
0381             if ( BOOST_UNLIKELY( is_closed() ) ) {
0382                 return channel_op_status::closed;
0383             }
0384             if ( ! is_empty_() ) {
0385                 continue;
0386             }
0387             if ( ! waiting_consumers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
0388                 return channel_op_status::timeout;
0389             }
0390         }
0391     }
0392 
0393     class iterator {
0394     private:
0395         typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type  storage_type;
0396 
0397         unbuffered_channel  *   chan_{ nullptr };
0398         storage_type            storage_;
0399 
0400         void increment_( bool initial = false) {
0401             BOOST_ASSERT( nullptr != chan_);
0402             try {
0403                 if ( ! initial) {
0404                     reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
0405                 }
0406                 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
0407             } catch ( fiber_error const&) {
0408                 chan_ = nullptr;
0409             }
0410         }
0411 
0412     public:
0413         using iterator_category = std::input_iterator_tag;
0414         using difference_type = std::ptrdiff_t;
0415         using pointer = value_type *;
0416         using reference = value_type &;
0417 
0418         using pointer_t = pointer;
0419         using reference_t = reference;
0420 
0421         iterator() = default;
0422 
0423         explicit iterator( unbuffered_channel< T > * chan) noexcept :
0424             chan_{ chan } {
0425             increment_( true);
0426         }
0427 
0428         iterator( iterator const& other) noexcept :
0429             chan_{ other.chan_ } {
0430         }
0431 
0432         iterator & operator=( iterator const& other) noexcept {
0433             if ( this == & other) return * this;
0434             chan_ = other.chan_;
0435             return * this;
0436         }
0437 
0438         bool operator==( iterator const& other) const noexcept {
0439             return other.chan_ == chan_;
0440         }
0441 
0442         bool operator!=( iterator const& other) const noexcept {
0443             return other.chan_ != chan_;
0444         }
0445 
0446         iterator & operator++() {
0447             reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
0448             increment_();
0449             return * this;
0450         }
0451 
0452         const iterator operator++( int) = delete;
0453 
0454         reference_t operator*() noexcept {
0455             return * reinterpret_cast< value_type * >( std::addressof( storage_) );
0456         }
0457 
0458         pointer_t operator->() noexcept {
0459             return reinterpret_cast< value_type * >( std::addressof( storage_) );
0460         }
0461     };
0462 
0463     friend class iterator;
0464 };
0465 
0466 template< typename T >
0467 typename unbuffered_channel< T >::iterator
0468 begin( unbuffered_channel< T > & chan) {
0469     return typename unbuffered_channel< T >::iterator( & chan);
0470 }
0471 
0472 template< typename T >
0473 typename unbuffered_channel< T >::iterator
0474 end( unbuffered_channel< T > &) {
0475     return typename unbuffered_channel< T >::iterator();
0476 }
0477 
0478 }}
0479 
0480 #ifdef BOOST_HAS_ABI_HEADERS
0481 #  include BOOST_ABI_SUFFIX
0482 #endif
0483 
0484 #endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H