EIC code displayed by LXR


//          Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//          http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H
#define BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

#include <boost/assert.hpp>
#include <boost/config.hpp>

#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/context.hpp>

// David Chase and Yossi Lev. Dynamic circular work-stealing deque.
// In SPAA ’05: Proceedings of the seventeenth annual ACM symposium
// on Parallelism in algorithms and architectures, pages 21–28,
// New York, NY, USA, 2005. ACM.
//
// Nhat Minh Lê, Antoniu Pop, Albert Cohen, and Francesco Zappa Nardelli. 2013.
// Correct and efficient work-stealing for weak memory models.
// In Proceedings of the 18th ACM SIGPLAN symposium on Principles and practice
// of parallel programming (PPoPP '13). ACM, New York, NY, USA, 69-80.

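// The class below implements the single-producer/multiple-consumer deque
// described in the papers above: the owning thread pushes and pops at the
// bottom end of the ring buffer, while other threads steal from the top end.
// bottom_ is written only by the owner; top_ is advanced by a successful
// compare-and-swap of whichever thread claims an element.
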
#if BOOST_COMP_CLANG
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-private-field"
#endif

namespace boost {
namespace fibers {
namespace detail {

class context_spmc_queue {
private:
    class array {
    private:
        typedef std::atomic< context * >                atomic_type;
        typedef atomic_type                             storage_type;

        std::size_t         capacity_;
        storage_type    *   storage_;

    public:
        array( std::size_t capacity) :
            capacity_{ capacity },
            storage_{ new storage_type[capacity_] } {
            for ( std::size_t i = 0; i < capacity_; ++i) {
                ::new ( static_cast< void * >( std::addressof( storage_[i]) ) ) atomic_type{ nullptr };
            }
        }

        ~array() {
            for ( std::size_t i = 0; i < capacity_; ++i) {
                reinterpret_cast< atomic_type * >( std::addressof( storage_[i]) )->~atomic_type();
            }
            delete [] storage_;
        }

        std::size_t capacity() const noexcept {
            return capacity_;
        }

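        // Individual slots are accessed with relaxed atomics; ordering
        // between the owner and stealing threads is established by the
        // fences and the CAS on top_ in the enclosing queue, not here.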
        void push( std::size_t bottom, context * ctx) noexcept {
            reinterpret_cast< atomic_type * >(
                std::addressof( storage_[bottom % capacity_]) )
                    ->store( ctx, std::memory_order_relaxed);
        }

        context * pop( std::size_t top) noexcept {
            return reinterpret_cast< atomic_type * >(
                std::addressof( storage_[top % capacity_]) )
                    ->load( std::memory_order_relaxed);
        }

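        // Allocate an array of twice the capacity and copy the live range
        // [top, bottom) into it; the caller retires the old array instead of
        // deleting it, because concurrent stealers may still be reading it.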
        array * resize( std::size_t bottom, std::size_t top) {
            std::unique_ptr< array > tmp{ new array{ 2 * capacity_ } };
            for ( std::size_t i = top; i != bottom; ++i) {
                tmp->push( i, pop( i) );
            }
            return tmp.release();
        }
    };

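    // Owner/stealer indices and the current ring buffer. Retired arrays are
    // kept in old_arrays_ until destruction because a stealing thread may
    // still hold a pointer to them; padding_ presumably pads the object out
    // to a cache-line boundary to reduce false sharing.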
    std::atomic< std::size_t >     top_{ 0 };
    std::atomic< std::size_t >     bottom_{ 0 };
    std::atomic< array * >         array_;
    std::vector< array * >                                  old_arrays_{};
    char                                                    padding_[cacheline_length];

public:
    context_spmc_queue( std::size_t capacity = 4096) :
        array_{ new array{ capacity } } {
        old_arrays_.reserve( 32);
    }

    ~context_spmc_queue() {
        for ( array * a : old_arrays_) {
            delete a;
        }
        delete array_.load();
    }

    context_spmc_queue( context_spmc_queue const&) = delete;
    context_spmc_queue & operator=( context_spmc_queue const&) = delete;

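    // Relaxed snapshot of both indices; the answer may already be stale by
    // the time the caller acts on it.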
    bool empty() const noexcept {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed);
        std::size_t top = top_.load( std::memory_order_relaxed);
        return bottom <= top;
    }

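    // Owner thread only. The release fence makes the stored context visible
    // to stealing threads before the new bottom_ value is published.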
    void push( context * ctx) {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed);
        std::size_t top = top_.load( std::memory_order_acquire);
        array * a = array_.load( std::memory_order_relaxed);
        if ( (a->capacity() - 1) < (bottom - top) ) {
            // queue is full
            // resize
            array * tmp = a->resize( bottom, top);
            old_arrays_.push_back( a);
            std::swap( a, tmp);
            array_.store( a, std::memory_order_relaxed);
        }
        a->push( bottom, ctx);
        std::atomic_thread_fence( std::memory_order_release);
        bottom_.store( bottom + 1, std::memory_order_relaxed);
    }

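    // Owner thread only; takes from the bottom end. The seq_cst fence orders
    // the speculative bottom_ decrement before the read of top_; when only
    // one element remains, the CAS on top_ decides whether the owner or a
    // concurrent stealer gets it.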
    context * pop() {
        std::size_t bottom = bottom_.load( std::memory_order_relaxed) - 1;
        array * a = array_.load( std::memory_order_relaxed);
        bottom_.store( bottom, std::memory_order_relaxed);
        std::atomic_thread_fence( std::memory_order_seq_cst);
        std::size_t top = top_.load( std::memory_order_relaxed);
        context * ctx = nullptr;
        if ( top <= bottom) {
            // queue is not empty
            ctx = a->pop( bottom);
            BOOST_ASSERT( nullptr != ctx);
            if ( top == bottom) {
                // last element dequeued
                if ( ! top_.compare_exchange_strong( top, top + 1,
                                                     std::memory_order_seq_cst,
                                                     std::memory_order_relaxed) ) {
                    // lost the race against a stealing thread
                    ctx = nullptr;
                }
                bottom_.store( bottom + 1, std::memory_order_relaxed);
            }
        } else {
            // queue is empty
            bottom_.store( bottom + 1, std::memory_order_relaxed);
        }
        return ctx;
    }

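    // Called from other threads; takes from the top end. The seq_cst fence
    // orders the read of top_ before the read of bottom_, and the CAS on
    // top_ claims the element against the owner and competing stealers.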
    context * steal() {
        std::size_t top = top_.load( std::memory_order_acquire);
        std::atomic_thread_fence( std::memory_order_seq_cst);
        std::size_t bottom = bottom_.load( std::memory_order_acquire);
        context * ctx = nullptr;
        if ( top < bottom) {
            // queue is not empty
            array * a = array_.load( std::memory_order_consume);
            ctx = a->pop( top);
            BOOST_ASSERT( nullptr != ctx);
            // do not steal pinned context (e.g. main-/dispatcher-context)
            if ( ctx->is_context( type::pinned_context) ) {
                return nullptr;
            }
            if ( ! top_.compare_exchange_strong( top, top + 1,
                                                 std::memory_order_seq_cst,
                                                 std::memory_order_relaxed) ) {
                // lost the race against the owner or another stealer
                return nullptr;
            }
        }
        return ctx;
    }
};
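
// Illustrative usage sketch (not part of the original header): a scheduler
// pushes and pops ready contexts on its own queue from its own thread, and
// other schedulers call steal() when they run out of work, e.g.
//
//   context_spmc_queue queue;
//   queue.push( ctx);                    // owner thread only
//   context * c = queue.pop();           // owner thread only
//   context * s = other_queue.steal();   // any other thread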

}}}

#if BOOST_COMP_CLANG
#pragma clang diagnostic pop
#endif

#endif // BOOST_FIBERS_DETAIL_CONTEXT_SPMC_QUEUE_H