File indexing completed on 2024-11-15 09:05:27
0001
0002
0003
0004
0005
0006
0007 #ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
0008 #define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
0009
0010 #include <atomic>
0011 #include <chrono>
0012 #include <cstddef>
0013 #include <cstdint>
0014 #include <memory>
0015 #include <vector>
0016
0017 #include <boost/config.hpp>
0018
0019 #include <boost/fiber/channel_op_status.hpp>
0020 #include <boost/fiber/context.hpp>
0021 #include <boost/fiber/detail/config.hpp>
0022 #include <boost/fiber/detail/convert.hpp>
0023 #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
0024 #include <boost/fiber/detail/exchange.hpp>
0025 #endif
0026 #include <boost/fiber/detail/spinlock.hpp>
0027 #include <boost/fiber/exceptions.hpp>
0028 #include <boost/fiber/waker.hpp>
0029
0030 #ifdef BOOST_HAS_ABI_HEADERS
0031 # include BOOST_ABI_PREFIX
0032 #endif
0033
0034 namespace boost {
0035 namespace fibers {
0036
0037 template< typename T >
0038 class unbuffered_channel {
0039 public:
0040 using value_type = typename std::remove_reference<T>::type;
0041
0042 private:
0043 struct slot {
0044 value_type value;
0045 waker w;
0046
0047 slot( value_type const& value_, waker && w) :
0048 value{ value_ },
0049 w{ std::move(w) } {
0050 }
0051
0052 slot( value_type && value_, waker && w) :
0053 value{ std::move( value_) },
0054 w{ std::move(w) } {
0055 }
0056 };
0057
0058
0059 std::atomic< slot * > slot_{ nullptr };
0060
0061 std::atomic_bool closed_{ false };
0062 mutable detail::spinlock splk_producers_{};
0063 wait_queue waiting_producers_{};
0064 mutable detail::spinlock splk_consumers_{};
0065 wait_queue waiting_consumers_{};
0066 char pad_[cacheline_length];
0067
0068 bool is_empty_() {
0069 return nullptr == slot_.load( std::memory_order_acquire);
0070 }
0071
0072 bool try_push_( slot * own_slot) {
0073 for (;;) {
0074 slot * s = slot_.load( std::memory_order_acquire);
0075 if ( nullptr == s) {
0076 if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
0077 continue;
0078 }
0079 return true;
0080 }
0081 return false;
0082 }
0083 }
0084
0085 slot * try_pop_() {
0086 slot * nil_slot = nullptr;
0087 for (;;) {
0088 slot * s = slot_.load( std::memory_order_acquire);
0089 if ( nullptr != s) {
0090 if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
0091 continue;}
0092 }
0093 return s;
0094 }
0095 }
0096
0097 public:
0098 unbuffered_channel() = default;
0099
0100 ~unbuffered_channel() {
0101 close();
0102 }
0103
0104 unbuffered_channel( unbuffered_channel const&) = delete;
0105 unbuffered_channel & operator=( unbuffered_channel const&) = delete;
0106
0107 bool is_closed() const noexcept {
0108 return closed_.load( std::memory_order_acquire);
0109 }
0110
0111 void close() noexcept {
0112
0113 if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
0114
0115 slot * s = slot_.load( std::memory_order_acquire);
0116 if ( nullptr != s) {
0117
0118 s->w.wake();
0119 }
0120 detail::spinlock_lock lk1{ splk_producers_ };
0121 waiting_producers_.notify_all();
0122
0123 detail::spinlock_lock lk2{ splk_consumers_ };
0124 waiting_consumers_.notify_all();
0125 }
0126 }
0127
0128 channel_op_status push( value_type const& value) {
0129 context * active_ctx = context::active();
0130 slot s{ value, {} };
0131 for (;;) {
0132 if ( BOOST_UNLIKELY( is_closed() ) ) {
0133 return channel_op_status::closed;
0134 }
0135 s.w = active_ctx->create_waker();
0136 if ( try_push_( & s) ) {
0137 detail::spinlock_lock lk{ splk_consumers_ };
0138 waiting_consumers_.notify_one();
0139
0140 active_ctx->suspend( lk);
0141
0142 if ( BOOST_UNLIKELY( is_closed() ) ) {
0143
0144 return channel_op_status::closed;
0145 }
0146
0147 return channel_op_status::success;
0148 }
0149 detail::spinlock_lock lk{ splk_producers_ };
0150 if ( BOOST_UNLIKELY( is_closed() ) ) {
0151 return channel_op_status::closed;
0152 }
0153 if ( is_empty_() ) {
0154 continue;
0155 }
0156
0157 waiting_producers_.suspend_and_wait( lk, active_ctx);
0158
0159 }
0160 }
0161
0162 channel_op_status push( value_type && value) {
0163 context * active_ctx = context::active();
0164 slot s{ std::move( value), {} };
0165 for (;;) {
0166 if ( BOOST_UNLIKELY( is_closed() ) ) {
0167 return channel_op_status::closed;
0168 }
0169 s.w = active_ctx->create_waker();
0170 if ( try_push_( & s) ) {
0171 detail::spinlock_lock lk{ splk_consumers_ };
0172 waiting_consumers_.notify_one();
0173
0174 active_ctx->suspend( lk);
0175
0176 if ( BOOST_UNLIKELY( is_closed() ) ) {
0177
0178 return channel_op_status::closed;
0179 }
0180
0181 return channel_op_status::success;
0182 }
0183 detail::spinlock_lock lk{ splk_producers_ };
0184 if ( BOOST_UNLIKELY( is_closed() ) ) {
0185 return channel_op_status::closed;
0186 }
0187 if ( is_empty_() ) {
0188 continue;
0189 }
0190 waiting_producers_.suspend_and_wait( lk, active_ctx);
0191
0192 }
0193 }
0194
0195 template< typename Rep, typename Period >
0196 channel_op_status push_wait_for( value_type const& value,
0197 std::chrono::duration< Rep, Period > const& timeout_duration) {
0198 return push_wait_until( value,
0199 std::chrono::steady_clock::now() + timeout_duration);
0200 }
0201
0202 template< typename Rep, typename Period >
0203 channel_op_status push_wait_for( value_type && value,
0204 std::chrono::duration< Rep, Period > const& timeout_duration) {
0205 return push_wait_until( std::forward< value_type >( value),
0206 std::chrono::steady_clock::now() + timeout_duration);
0207 }
0208
0209 template< typename Clock, typename Duration >
0210 channel_op_status push_wait_until( value_type const& value,
0211 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
0212 context * active_ctx = context::active();
0213 slot s{ value, {} };
0214 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
0215 for (;;) {
0216 if ( BOOST_UNLIKELY( is_closed() ) ) {
0217 return channel_op_status::closed;
0218 }
0219 s.w = active_ctx->create_waker();
0220 if ( try_push_( & s) ) {
0221 detail::spinlock_lock lk{ splk_consumers_ };
0222 waiting_consumers_.notify_one();
0223
0224 if ( ! active_ctx->wait_until(timeout_time, lk, waker(s.w))) {
0225
0226 slot * nil_slot = nullptr, * own_slot = & s;
0227 slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
0228
0229 return channel_op_status::timeout;
0230 }
0231
0232 if ( BOOST_UNLIKELY( is_closed() ) ) {
0233
0234 return channel_op_status::closed;
0235 }
0236
0237 return channel_op_status::success;
0238 }
0239 detail::spinlock_lock lk{ splk_producers_ };
0240 if ( BOOST_UNLIKELY( is_closed() ) ) {
0241 return channel_op_status::closed;
0242 }
0243 if ( is_empty_() ) {
0244 continue;
0245 }
0246
0247 if (! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time))
0248 {
0249 return channel_op_status::timeout;
0250 }
0251
0252 }
0253 }
0254
0255 template< typename Clock, typename Duration >
0256 channel_op_status push_wait_until( value_type && value,
0257 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
0258 context * active_ctx = context::active();
0259 slot s{ std::move( value), {} };
0260 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
0261 for (;;) {
0262 if ( BOOST_UNLIKELY( is_closed() ) ) {
0263 return channel_op_status::closed;
0264 }
0265 s.w = active_ctx->create_waker();
0266 if ( try_push_( & s) ) {
0267 detail::spinlock_lock lk{ splk_consumers_ };
0268 waiting_consumers_.notify_one();
0269
0270 if ( ! active_ctx->wait_until(timeout_time, lk, waker(s.w))) {
0271
0272 slot * nil_slot = nullptr, * own_slot = & s;
0273 slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
0274
0275 return channel_op_status::timeout;
0276 }
0277
0278 if ( BOOST_UNLIKELY( is_closed() ) ) {
0279
0280 return channel_op_status::closed;
0281 }
0282
0283 return channel_op_status::success;
0284 }
0285 detail::spinlock_lock lk{ splk_producers_ };
0286 if ( BOOST_UNLIKELY( is_closed() ) ) {
0287 return channel_op_status::closed;
0288 }
0289 if ( is_empty_() ) {
0290 continue;
0291 }
0292 if (! waiting_producers_.suspend_and_wait_until( lk, active_ctx, timeout_time))
0293 {
0294 return channel_op_status::timeout;
0295 }
0296
0297 }
0298 }
0299
0300 channel_op_status pop( value_type & value) {
0301 context * active_ctx = context::active();
0302 slot * s = nullptr;
0303 for (;;) {
0304 if ( nullptr != ( s = try_pop_() ) ) {
0305 {
0306 detail::spinlock_lock lk{ splk_producers_ };
0307 waiting_producers_.notify_one();
0308 }
0309 value = std::move( s->value);
0310
0311 s->w.wake();
0312 return channel_op_status::success;
0313 }
0314 detail::spinlock_lock lk{ splk_consumers_ };
0315 if ( BOOST_UNLIKELY( is_closed() ) ) {
0316 return channel_op_status::closed;
0317 }
0318 if ( ! is_empty_() ) {
0319 continue;
0320 }
0321 waiting_consumers_.suspend_and_wait( lk, active_ctx);
0322
0323 }
0324 }
0325
0326 value_type value_pop() {
0327 context * active_ctx = context::active();
0328 slot * s = nullptr;
0329 for (;;) {
0330 if ( nullptr != ( s = try_pop_() ) ) {
0331 {
0332 detail::spinlock_lock lk{ splk_producers_ };
0333 waiting_producers_.notify_one();
0334 }
0335
0336 value_type value = std::move( s->value);
0337
0338 s->w.wake();
0339 return std::move( value);
0340 }
0341 detail::spinlock_lock lk{ splk_consumers_ };
0342 if ( BOOST_UNLIKELY( is_closed() ) ) {
0343 throw fiber_error{
0344 std::make_error_code( std::errc::operation_not_permitted),
0345 "boost fiber: channel is closed" };
0346 }
0347 if ( ! is_empty_() ) {
0348 continue;
0349 }
0350 waiting_consumers_.suspend_and_wait( lk, active_ctx);
0351
0352 }
0353 }
0354
0355 template< typename Rep, typename Period >
0356 channel_op_status pop_wait_for( value_type & value,
0357 std::chrono::duration< Rep, Period > const& timeout_duration) {
0358 return pop_wait_until( value,
0359 std::chrono::steady_clock::now() + timeout_duration);
0360 }
0361
0362 template< typename Clock, typename Duration >
0363 channel_op_status pop_wait_until( value_type & value,
0364 std::chrono::time_point< Clock, Duration > const& timeout_time_) {
0365 context * active_ctx = context::active();
0366 slot * s = nullptr;
0367 std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
0368 for (;;) {
0369 if ( nullptr != ( s = try_pop_() ) ) {
0370 {
0371 detail::spinlock_lock lk{ splk_producers_ };
0372 waiting_producers_.notify_one();
0373 }
0374
0375 value = std::move( s->value);
0376
0377 s->w.wake();
0378 return channel_op_status::success;
0379 }
0380 detail::spinlock_lock lk{ splk_consumers_ };
0381 if ( BOOST_UNLIKELY( is_closed() ) ) {
0382 return channel_op_status::closed;
0383 }
0384 if ( ! is_empty_() ) {
0385 continue;
0386 }
0387 if ( ! waiting_consumers_.suspend_and_wait_until( lk, active_ctx, timeout_time)) {
0388 return channel_op_status::timeout;
0389 }
0390 }
0391 }
0392
0393 class iterator {
0394 private:
0395 typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
0396
0397 unbuffered_channel * chan_{ nullptr };
0398 storage_type storage_;
0399
0400 void increment_( bool initial = false) {
0401 BOOST_ASSERT( nullptr != chan_);
0402 try {
0403 if ( ! initial) {
0404 reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
0405 }
0406 ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
0407 } catch ( fiber_error const&) {
0408 chan_ = nullptr;
0409 }
0410 }
0411
0412 public:
0413 using iterator_category = std::input_iterator_tag;
0414 using difference_type = std::ptrdiff_t;
0415 using pointer = value_type *;
0416 using reference = value_type &;
0417
0418 using pointer_t = pointer;
0419 using reference_t = reference;
0420
0421 iterator() = default;
0422
0423 explicit iterator( unbuffered_channel< T > * chan) noexcept :
0424 chan_{ chan } {
0425 increment_( true);
0426 }
0427
0428 iterator( iterator const& other) noexcept :
0429 chan_{ other.chan_ } {
0430 }
0431
0432 iterator & operator=( iterator const& other) noexcept {
0433 if ( this == & other) return * this;
0434 chan_ = other.chan_;
0435 return * this;
0436 }
0437
0438 bool operator==( iterator const& other) const noexcept {
0439 return other.chan_ == chan_;
0440 }
0441
0442 bool operator!=( iterator const& other) const noexcept {
0443 return other.chan_ != chan_;
0444 }
0445
0446 iterator & operator++() {
0447 reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
0448 increment_();
0449 return * this;
0450 }
0451
0452 const iterator operator++( int) = delete;
0453
0454 reference_t operator*() noexcept {
0455 return * reinterpret_cast< value_type * >( std::addressof( storage_) );
0456 }
0457
0458 pointer_t operator->() noexcept {
0459 return reinterpret_cast< value_type * >( std::addressof( storage_) );
0460 }
0461 };
0462
0463 friend class iterator;
0464 };
0465
0466 template< typename T >
0467 typename unbuffered_channel< T >::iterator
0468 begin( unbuffered_channel< T > & chan) {
0469 return typename unbuffered_channel< T >::iterator( & chan);
0470 }
0471
0472 template< typename T >
0473 typename unbuffered_channel< T >::iterator
0474 end( unbuffered_channel< T > &) {
0475 return typename unbuffered_channel< T >::iterator();
0476 }
0477
0478 }}
0479
0480 #ifdef BOOST_HAS_ABI_HEADERS
0481 # include BOOST_ABI_SUFFIX
0482 #endif
0483
0484 #endif