File indexing completed on 2025-01-30 09:35:34
0001
0002
0003
0004
0005
0006
0007 #ifndef BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
0008 #define BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
0009
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>
0016
0017 #include <boost/assert.hpp>
0018 #include <boost/config.hpp>
0019
0020 #include <boost/fiber/detail/config.hpp>
0021 #include <boost/fiber/context.hpp>
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033 #if BOOST_COMP_CLANG
0034 #pragma clang diagnostic push
0035 #pragma clang diagnostic ignored "-Wunused-private-field"
0036 #endif
0037
0038 namespace boost {
0039 namespace fibers {
0040 namespace detail {
0041
0042 class context_spmc_queue {
0043 private:
0044 class array {
0045 private:
0046 typedef std::atomic< context * > atomic_type;
0047 typedef atomic_type storage_type;
0048
0049 std::size_t capacity_;
0050 storage_type * storage_;
0051
0052 public:
0053 array( std::size_t capacity) :
0054 capacity_{ capacity },
0055 storage_{ new storage_type[capacity_] } {
0056 for ( std::size_t i = 0; i < capacity_; ++i) {
0057 ::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr };
0058 }
0059 }
0060
0061 ~array() {
0062 for ( std::size_t i = 0; i < capacity_; ++i) {
0063 reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type();
0064 }
0065 delete [] storage_;
0066 }
0067
0068 std::size_t capacity() const noexcept {
0069 return capacity_;
0070 }
0071
0072 void push( std::size_t bottom, context * ctx) noexcept {
0073 reinterpret_cast< atomic_type * >(
0074 std::addressof( storage_[bottom % capacity_]) )
0075 ->store( ctx, std::memory_order_relaxed);
0076 }
0077
0078 context * pop( std::size_t top) noexcept {
0079 return reinterpret_cast< atomic_type * >(
0080 std::addressof( storage_[top % capacity_]) )
0081 ->load( std::memory_order_relaxed);
0082 }
0083
0084 array * resize( std::size_t bottom, std::size_t top) {
0085 std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } };
0086 for ( std::size_t i = top; i != bottom; ++i) {
0087 tmp->push( i, pop( i) );
0088 }
0089 return tmp.release();
0090 }
0091 };
0092
0093 std::atomic< std::size_t > top_{ 0 };
0094 std::atomic< std::size_t > bottom_{ 0 };
0095 std::atomic< array * > array_;
0096 std::vector< array * > old_arrays_{};
0097 char padding_[cacheline_length];
0098
0099 public:
0100 context_spmc_queue( std::size_t capacity = 4096) :
0101 array_{ new array{ capacity } } {
0102 old_arrays_.reserve( 32);
0103 }
0104
0105 ~context_spmc_queue() {
0106 for ( array * a : old_arrays_) {
0107 delete a;
0108 }
0109 delete array_.load();
0110 }
0111
0112 context_spmc_queue( context_spmc_queue const&) = delete;
0113 context_spmc_queue & operator=( context_spmc_queue const&) = delete;
0114
0115 bool empty() const noexcept {
0116 std::size_t bottom = bottom_.load( std::memory_order_relaxed);
0117 std::size_t top = top_.load( std::memory_order_relaxed);
0118 return bottom <= top;
0119 }
0120
0121 void push( context * ctx) {
0122 std::size_t bottom = bottom_.load( std::memory_order_relaxed);
0123 std::size_t top = top_.load( std::memory_order_acquire);
0124 array * a = array_.load( std::memory_order_relaxed);
0125 if ( (a->capacity() - 1) < (bottom - top) ) {
0126
0127
0128 array * tmp = a->resize( bottom, top);
0129 old_arrays_.push_back( a);
0130 std::swap( a, tmp);
0131 array_.store( a, std::memory_order_relaxed);
0132 }
0133 a->push( bottom, ctx);
0134 std::atomic_thread_fence( std::memory_order_release);
0135 bottom_.store( bottom + 1, std::memory_order_relaxed);
0136 }
0137
0138 context * pop() {
0139 std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
0140 array * a = array_.load( std::memory_order_relaxed);
0141 bottom_.store( bottom, std::memory_order_relaxed);
0142 std::atomic_thread_fence( std::memory_order_seq_cst);
0143 std::size_t top = top_.load( std::memory_order_relaxed);
0144 context * ctx = nullptr;
0145 if ( top <= bottom) {
0146
0147 ctx = a->pop( bottom);
0148 BOOST_ASSERT( nullptr != ctx);
0149 if ( top == bottom) {
0150
0151 if ( ! top_.compare_exchange_strong( top, top + 1,
0152 std::memory_order_seq_cst,
0153 std::memory_order_relaxed) ) {
0154
0155 ctx = nullptr;
0156 }
0157 bottom_.store( bottom + 1, std::memory_order_relaxed);
0158 }
0159 } else {
0160
0161 bottom_.store( bottom + 1, std::memory_order_relaxed);
0162 }
0163 return ctx;
0164 }
0165
0166 context * steal() {
0167 std::size_t top = top_.load( std::memory_order_acquire);
0168 std::atomic_thread_fence( std::memory_order_seq_cst);
0169 std::size_t bottom = bottom_.load( std::memory_order_acquire);
0170 context * ctx = nullptr;
0171 if ( top < bottom) {
0172
0173 array * a = array_.load( std::memory_order_consume);
0174 ctx = a->pop( top);
0175 BOOST_ASSERT( nullptr != ctx);
0176
0177 if ( ctx->is_context( type::pinned_context) ) {
0178 return nullptr;
0179 }
0180 if ( ! top_.compare_exchange_strong( top, top + 1,
0181 std::memory_order_seq_cst,
0182 std::memory_order_relaxed) ) {
0183
0184 return nullptr;
0185 }
0186 }
0187 return ctx;
0188 }
0189 };
0190
0191 }}}
0192
0193 #if BOOST_COMP_CLANG
0194 #pragma clang diagnostic pop
0195 #endif
0196
0197 #endif