//  lock-free single-producer/single-consumer ringbuffer
//  this algorithm is implemented in various projects (e.g. the Linux kernel)
//
//  Copyright (C) 2009-2013 Tim Blechmann
//
//  Distributed under the Boost Software License, Version 1.0. (See
//  accompanying file LICENSE_1_0.txt or copy at
//  http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
#define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED

#include <boost/config.hpp>
#ifdef BOOST_HAS_PRAGMA_ONCE
#    pragma once
#endif

#include <algorithm>
#include <memory>
#include <type_traits>

#include <boost/aligned_storage.hpp>
#include <boost/assert.hpp>
#include <boost/core/allocator_access.hpp>
#include <boost/core/span.hpp>
#include <boost/parameter/optional.hpp>
#include <boost/parameter/parameters.hpp>
#include <boost/static_assert.hpp>

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/copy_payload.hpp>
#include <boost/lockfree/detail/parameter.hpp>
#include <boost/lockfree/detail/prefix.hpp>
#include <boost/lockfree/detail/uses_optional.hpp>
#include <boost/lockfree/lockfree_forward.hpp>

namespace boost { namespace lockfree {
namespace detail {

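/* synchronization scheme (as implemented below): the producer thread owns
 * write_index_ (it is the only thread that stores to it) and the consumer
 * thread owns read_index_; each side release-stores its own index after
 * touching the buffer and acquire-loads the other side's index before
 * touching it */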
template < typename T >
class ringbuffer_base
{
#ifndef BOOST_DOXYGEN_INVOKED
protected:
    typedef std::size_t  size_t;
    static constexpr int padding_size = cacheline_bytes - sizeof( size_t );
    atomic< size_t >     write_index_;
    char                 padding1[ padding_size ]; /* force read_index and write_index to different cache lines */
    atomic< size_t >     read_index_;

protected:
    ringbuffer_base( void ) :
        write_index_( 0 ),
        read_index_( 0 )
    {}

    static size_t next_index( size_t arg, size_t max_size )
    {
        size_t ret = arg + 1;
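        // at most one subtraction is needed here, since arg < max_size implies
        // ret <= max_size; the branch is typically cheaper than a modulo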
        while ( BOOST_UNLIKELY( ret >= max_size ) )
            ret -= max_size;
        return ret;
    }

    static size_t read_available( size_t write_index, size_t read_index, size_t max_size )
    {
        if ( write_index >= read_index )
            return write_index - read_index;

        const size_t ret = write_index + max_size - read_index;
        return ret;
    }

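    /* one slot is always kept empty: a completely full buffer would otherwise
     * be indistinguishable from an empty one (write_index == read_index),
     * hence the -1 below and the "+ 1" on the capacity in the derived classes */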
    static size_t write_available( size_t write_index, size_t read_index, size_t max_size )
    {
        size_t ret = read_index - write_index - 1;
        if ( write_index >= read_index )
            ret += max_size;
        return ret;
    }

    size_t read_available( size_t max_size ) const
    {
        size_t       write_index = write_index_.load( memory_order_acquire );
        const size_t read_index  = read_index_.load( memory_order_relaxed );
        return read_available( write_index, read_index, max_size );
    }

    size_t write_available( size_t max_size ) const
    {
        size_t       write_index = write_index_.load( memory_order_relaxed );
        const size_t read_index  = read_index_.load( memory_order_acquire );
        return write_available( write_index, read_index, max_size );
    }


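    /* push: the element is constructed first, then write_index_ is
     * release-stored, so a consumer that acquire-loads write_index_ is
     * guaranteed to see the fully constructed element; the pop path mirrors
     * this with read_index_, so the producer knows when a slot may be reused */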
    bool push( const T& t, T* buffer, size_t max_size )
    {
        const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
        const size_t next        = next_index( write_index, max_size );

        if ( next == read_index_.load( memory_order_acquire ) )
            return false;                    /* ringbuffer is full */

        new ( buffer + write_index ) T( t ); // copy-construct

        write_index_.store( next, memory_order_release );

        return true;
    }

    bool push( T&& t, T* buffer, size_t max_size )
    {
        const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
        const size_t next        = next_index( write_index, max_size );

        if ( next == read_index_.load( memory_order_acquire ) )
            return false;                                         /* ringbuffer is full */

        new ( buffer + write_index ) T( std::forward< T >( t ) ); // move-construct

        write_index_.store( next, memory_order_release );

        return true;
    }

    size_t push( const T* input_buffer, size_t input_count, T* internal_buffer, size_t max_size )
    {
        return push( input_buffer, input_buffer + input_count, internal_buffer, max_size ) - input_buffer;
    }

    template < typename ConstIterator >
    ConstIterator push( ConstIterator begin, ConstIterator end, T* internal_buffer, size_t max_size )
    {
        // FIXME: avoid std::distance

        const size_t write_index = write_index_.load( memory_order_relaxed ); // only written from push thread
        const size_t read_index  = read_index_.load( memory_order_acquire );
        const size_t avail       = write_available( write_index, read_index, max_size );

        if ( avail == 0 )
            return begin;

        size_t input_count = std::distance( begin, end );
        input_count        = ( std::min )( input_count, avail );

        size_t new_write_index = write_index + input_count;

        const ConstIterator last = std::next( begin, input_count );

        if ( write_index + input_count > max_size ) {
            /* copy data in two sections */
            const size_t        count0   = max_size - write_index;
            const ConstIterator midpoint = std::next( begin, count0 );

            std::uninitialized_copy( begin, midpoint, internal_buffer + write_index );
            std::uninitialized_copy( midpoint, last, internal_buffer );
            new_write_index -= max_size;
        } else {
            std::uninitialized_copy( begin, last, internal_buffer + write_index );

            if ( new_write_index == max_size )
                new_write_index = 0;
        }

        write_index_.store( new_write_index, memory_order_release );
        return last;
    }

    template < typename Functor >
    bool consume_one( Functor&& functor, T* buffer, size_t max_size )
    {
        const size_t write_index = write_index_.load( memory_order_acquire );
        const size_t read_index  = read_index_.load( memory_order_relaxed ); // only written from pop thread
        if ( empty( write_index, read_index ) )
            return false;

        T& object_to_consume = buffer[ read_index ];
        functor( std::move( object_to_consume ) );
        object_to_consume.~T();

        size_t next = next_index( read_index, max_size );
        read_index_.store( next, memory_order_release );
        return true;
    }

    template < typename Functor >
    size_t consume_all( Functor&& functor, T* internal_buffer, size_t max_size )
    {
        const size_t write_index = write_index_.load( memory_order_acquire );
        const size_t read_index  = read_index_.load( memory_order_relaxed ); // only written from pop thread

        const size_t avail = read_available( write_index, read_index, max_size );

        if ( avail == 0 )
            return 0;

        const size_t output_count = avail;

        size_t new_read_index = read_index + output_count;

        if ( read_index + output_count > max_size ) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            run_functor_and_delete( internal_buffer + read_index, internal_buffer + max_size, functor );
            run_functor_and_delete( internal_buffer, internal_buffer + count1, functor );

            new_read_index -= max_size;
        } else {
            run_functor_and_delete( internal_buffer + read_index, internal_buffer + read_index + output_count, functor );

            if ( new_read_index == max_size )
                new_read_index = 0;
        }

        read_index_.store( new_read_index, memory_order_release );
        return output_count;
    }

    size_t pop( T* output_buffer, size_t output_count, T* internal_buffer, size_t max_size )
    {
        const size_t write_index = write_index_.load( memory_order_acquire );
        const size_t read_index  = read_index_.load( memory_order_relaxed ); // only written from pop thread

        const size_t avail = read_available( write_index, read_index, max_size );

        if ( avail == 0 )
            return 0;

        output_count = ( std::min )( output_count, avail );

        size_t new_read_index = read_index + output_count;

        if ( read_index + output_count > max_size ) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = output_count - count0;

            move_and_delete( internal_buffer + read_index, internal_buffer + max_size, output_buffer );
            move_and_delete( internal_buffer, internal_buffer + count1, output_buffer + count0 );

            new_read_index -= max_size;
        } else {
            move_and_delete( internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer );
            if ( new_read_index == max_size )
                new_read_index = 0;
        }

        read_index_.store( new_read_index, memory_order_release );
        return output_count;
    }

    template < typename OutputIterator >
    size_t pop_to_output_iterator( OutputIterator it, T* internal_buffer, size_t max_size )
    {
        const size_t write_index = write_index_.load( memory_order_acquire );
        const size_t read_index  = read_index_.load( memory_order_relaxed ); // only written from pop thread

        const size_t avail = read_available( write_index, read_index, max_size );
        if ( avail == 0 )
            return 0;

        size_t new_read_index = read_index + avail;

        if ( read_index + avail > max_size ) {
            /* copy data in two sections */
            const size_t count0 = max_size - read_index;
            const size_t count1 = avail - count0;

            it = move_and_delete( internal_buffer + read_index, internal_buffer + max_size, it );
            move_and_delete( internal_buffer, internal_buffer + count1, it );

            new_read_index -= max_size;
        } else {
            move_and_delete( internal_buffer + read_index, internal_buffer + read_index + avail, it );
            if ( new_read_index == max_size )
                new_read_index = 0;
        }

        read_index_.store( new_read_index, memory_order_release );
        return avail;
    }

    const T& front( const T* internal_buffer ) const
    {
        const size_t read_index = read_index_.load( memory_order_relaxed ); // only written from pop thread
        return *( internal_buffer + read_index );
    }

    T& front( T* internal_buffer )
    {
        const size_t read_index = read_index_.load( memory_order_relaxed ); // only written from pop thread
        return *( internal_buffer + read_index );
    }
#endif


public:
    /** reset the ringbuffer
     *
     * \note Not thread-safe
     * */
    void reset( void )
    {
        if ( !std::is_trivially_destructible< T >::value ) {
            // make sure to call all destructors!
            consume_all( []( const T& ) {} );
        } else {
            write_index_.store( 0, memory_order_relaxed );
            read_index_.store( 0, memory_order_release );
        }
    }

    /** Check if the ringbuffer is empty
     *
     * \return true, if the ringbuffer is empty, false otherwise
     * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
     * */
    bool empty( void )
    {
        return empty( write_index_.load( memory_order_relaxed ), read_index_.load( memory_order_relaxed ) );
    }

    /**
     * \return true, if implementation is lock-free.
     *
     * */
    bool is_lock_free( void ) const
    {
        return write_index_.is_lock_free() && read_index_.is_lock_free();
    }

private:
    bool empty( size_t write_index, size_t read_index )
    {
        return write_index == read_index;
    }

    template < class OutputIterator >
    OutputIterator move_and_delete( T* first, T* last, OutputIterator out )
    {
        if ( std::is_trivially_destructible< T >::value ) {
            return std::copy( first, last, out ); // will use memcpy if possible
        } else {
            for ( ; first != last; ++first, ++out ) {
                *out = std::move( *first );
                first->~T();
            }
            return out;
        }
    }

    template < class Functor >
    void run_functor_and_delete( T* first, T* last, Functor&& functor )
    {
        for ( ; first != last; ++first ) {
            functor( std::move( *first ) );
            first->~T();
        }
    }
};

template < typename T, std::size_t MaxSize >
class compile_time_sized_ringbuffer : public ringbuffer_base< T >
{
    typedef std::size_t          size_type;
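    // one additional slot: the buffer counts as full when
    // next(write_index) == read_index, so MaxSize usable elements need MaxSize + 1 slots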
    static constexpr std::size_t max_size = MaxSize + 1;

    typedef
        typename boost::aligned_storage< max_size * sizeof( T ), boost::alignment_of< T >::value >::type storage_type;

    storage_type storage_;

    T* data()
    {
        return static_cast< T* >( storage_.address() );
    }

    const T* data() const
    {
        return static_cast< const T* >( storage_.address() );
    }

protected:
    size_type max_number_of_elements() const
    {
        return max_size;
    }

    ~compile_time_sized_ringbuffer( void )
    {
        // destroy all remaining items
        consume_all( []( const T& ) {} );
    }

public:
    bool push( const T& t )
    {
        return ringbuffer_base< T >::push( t, data(), max_size );
    }

    bool push( T&& t )
    {
        return ringbuffer_base< T >::push( std::forward< T >( t ), data(), max_size );
    }

    template < typename Functor >
    bool consume_one( Functor&& f )
    {
        return ringbuffer_base< T >::consume_one( f, data(), max_size );
    }

    template < typename Functor >
    size_type consume_all( Functor&& f )
    {
        return ringbuffer_base< T >::consume_all( f, data(), max_size );
    }

    size_type push( T const* t, size_type size )
    {
        return ringbuffer_base< T >::push( t, size, data(), max_size );
    }

    template < size_type size >
    size_type push( T const ( &t )[ size ] )
    {
        return push( t, size );
    }

    template < typename ConstIterator >
    ConstIterator push( ConstIterator begin, ConstIterator end )
    {
        return ringbuffer_base< T >::push( begin, end, data(), max_size );
    }

    size_type pop( T* ret, size_type size )
    {
        return ringbuffer_base< T >::pop( ret, size, data(), max_size );
    }

    template < typename OutputIterator >
    size_type pop_to_output_iterator( OutputIterator it )
    {
        return ringbuffer_base< T >::pop_to_output_iterator( it, data(), max_size );
    }

    const T& front( void ) const
    {
        return ringbuffer_base< T >::front( data() );
    }

    T& front( void )
    {
        return ringbuffer_base< T >::front( data() );
    }
};

template < typename T, typename Alloc >
class runtime_sized_ringbuffer : public ringbuffer_base< T >, private Alloc
{
    typedef std::size_t                        size_type;
    size_type                                  max_elements_;
    typedef std::allocator_traits< Alloc >     allocator_traits;
    typedef typename allocator_traits::pointer pointer;
    pointer                                    array_;

protected:
    size_type max_number_of_elements() const
    {
        return max_elements_;
    }

public:
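    // max_elements + 1 slots are allocated: one slot always stays empty (see
    // the comment above write_available() in ringbuffer_base)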
    explicit runtime_sized_ringbuffer( size_type max_elements ) :
        max_elements_( max_elements + 1 )
    {
        Alloc& alloc = *this;
        array_       = allocator_traits::allocate( alloc, max_elements_ );
    }

    template < typename U >
    runtime_sized_ringbuffer( typename boost::allocator_rebind< Alloc, U >::type const& alloc, size_type max_elements ) :
        Alloc( alloc ),
        max_elements_( max_elements + 1 )
    {
        Alloc& allocator = *this;
        array_           = allocator_traits::allocate( allocator, max_elements_ );
    }

    runtime_sized_ringbuffer( Alloc const& alloc, size_type max_elements ) :
        Alloc( alloc ),
        max_elements_( max_elements + 1 )
    {
        Alloc& allocator = *this;
        array_           = allocator_traits::allocate( allocator, max_elements_ );
    }

    ~runtime_sized_ringbuffer( void )
    {
        // destroy all remaining items
        consume_all( []( const T& ) {} );

        Alloc& allocator = *this;
        allocator_traits::deallocate( allocator, array_, max_elements_ );
    }

    bool push( const T& t )
    {
        return ringbuffer_base< T >::push( t, &*array_, max_elements_ );
    }

    bool push( T&& t )
    {
        return ringbuffer_base< T >::push( std::forward< T >( t ), &*array_, max_elements_ );
    }

    template < typename Functor >
    bool consume_one( Functor&& f )
    {
        return ringbuffer_base< T >::consume_one( f, &*array_, max_elements_ );
    }

    template < typename Functor >
    size_type consume_all( Functor&& f )
    {
        return ringbuffer_base< T >::consume_all( f, &*array_, max_elements_ );
    }

    size_type push( T const* t, size_type size )
    {
        return ringbuffer_base< T >::push( t, size, &*array_, max_elements_ );
    }

    template < size_type size >
    size_type push( T const ( &t )[ size ] )
    {
        return push( t, size );
    }

    template < typename ConstIterator >
    ConstIterator push( ConstIterator begin, ConstIterator end )
    {
        return ringbuffer_base< T >::push( begin, end, &*array_, max_elements_ );
    }

    size_type pop( T* ret, size_type size )
    {
        return ringbuffer_base< T >::pop( ret, size, &*array_, max_elements_ );
    }

    template < typename OutputIterator >
    size_type pop_to_output_iterator( OutputIterator it )
    {
        return ringbuffer_base< T >::pop_to_output_iterator( it, &*array_, max_elements_ );
    }

    const T& front( void ) const
    {
        return ringbuffer_base< T >::front( &*array_ );
    }

    T& front( void )
    {
        return ringbuffer_base< T >::front( &*array_ );
    }
};

typedef parameter::parameters< boost::parameter::optional< tag::capacity >, boost::parameter::optional< tag::allocator > >
    ringbuffer_signature;

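/* metafunction: selects compile_time_sized_ringbuffer when a capacity<> option
 * is supplied, runtime_sized_ringbuffer otherwise; a custom allocator is only
 * meaningful for the runtime-sized variant (enforced by the static assert) */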
template < typename T, typename... Options >
struct make_ringbuffer
{
    typedef typename ringbuffer_signature::bind< Options... >::type bound_args;

    typedef extract_capacity< bound_args > extract_capacity_t;

    static constexpr bool   runtime_sized = !extract_capacity_t::has_capacity;
    static constexpr size_t capacity      = extract_capacity_t::capacity;

    typedef extract_allocator< bound_args, T > extract_allocator_t;
    static constexpr bool                      has_allocator = extract_allocator_t::has_allocator;
    typedef typename extract_allocator_t::type allocator;

    static constexpr bool signature_is_valid = runtime_sized ? true : !has_allocator;
    BOOST_STATIC_ASSERT( signature_is_valid );

    typedef std::conditional_t< runtime_sized,
                                runtime_sized_ringbuffer< T, allocator >,
                                compile_time_sized_ringbuffer< T, capacity > >
        ringbuffer_type;
};


} /* namespace detail */


/** The spsc_queue class provides a single-writer/single-reader FIFO queue; pushing and popping are wait-free.
 *
 *  \b Policies:
 *  - \c boost::lockfree::capacity<>, optional <br>
 *    If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
 *
 *  - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
 *    Specifies the allocator that is used to allocate the ringbuffer. This option is only valid if the ringbuffer is
 *    configured to be sized at run-time.
 *
 *  \b Requirements:
 *  - T must have a default constructor
 *  - T must be copyable or movable
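 *
 *  \b Example: <br>
 *    A minimal sketch of the intended usage; process() stands in for a
 *    hypothetical consumer-side handler:
 *  \code
 *  boost::lockfree::spsc_queue< int, boost::lockfree::capacity< 1024 > > queue;
 *
 *  // producer thread
 *  queue.push( 42 );
 *
 *  // consumer thread
 *  int value;
 *  while ( queue.pop( value ) )
 *      process( value );
 *  \endcode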
 * */
template < typename T, typename... Options >
#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS )
    requires( std::is_default_constructible_v< T >, std::is_move_assignable_v< T > || std::is_copy_assignable_v< T > )
#endif
class spsc_queue : public detail::make_ringbuffer< T, Options... >::ringbuffer_type
{
private:
#ifndef BOOST_DOXYGEN_INVOKED
    typedef typename detail::make_ringbuffer< T, Options... >::ringbuffer_type base_type;
    static constexpr bool runtime_sized = detail::make_ringbuffer< T, Options... >::runtime_sized;
    typedef typename detail::make_ringbuffer< T, Options... >::allocator allocator_arg;

    struct implementation_defined
    {
        typedef allocator_arg allocator;
        typedef std::size_t   size_type;
    };
#endif

public:
    typedef T                                          value_type;
    typedef typename implementation_defined::allocator allocator;
    typedef typename implementation_defined::size_type size_type;

    /** Constructs a spsc_queue
     *
     *  \pre spsc_queue must be configured to be sized at compile-time
     */
    spsc_queue( void )
#if !defined( BOOST_NO_CXX20_HDR_CONCEPTS )
        requires( !runtime_sized )
#endif
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT( !runtime_sized );
    }

    /** Constructs a spsc_queue with a custom allocator
     *
     *  \pre spsc_queue must be configured to be sized at compile-time
     *
     *  \note This is just for API compatibility: an allocator isn't actually needed
     */
    template < typename U, typename Enabler = std::enable_if< !runtime_sized > >
    explicit spsc_queue( typename boost::allocator_rebind< allocator, U >::type const& )
    {}

    /** Constructs a spsc_queue with a custom allocator
     *
     *  \pre spsc_queue must be configured to be sized at compile-time
     *
     *  \note This is just for API compatibility: an allocator isn't actually needed
     */
    template < typename Enabler = std::enable_if< !runtime_sized > >
    explicit spsc_queue( allocator const& )
    {}

    /** Constructs a spsc_queue for element_count elements
     *
     *  \pre spsc_queue must be configured to be sized at run-time
     */
    template < typename Enabler = std::enable_if< runtime_sized > >
    explicit spsc_queue( size_type element_count ) :
        base_type( element_count )
    {}

    /** Constructs a spsc_queue for element_count elements with a custom allocator
     *
     *  \pre spsc_queue must be configured to be sized at run-time
     */
    template < typename U, typename Enabler = std::enable_if< runtime_sized > >
    spsc_queue( size_type element_count, typename boost::allocator_rebind< allocator, U >::type const& alloc ) :
        base_type( alloc, element_count )
    {}

    /** Constructs a spsc_queue for element_count elements with a custom allocator
     *
     *  \pre spsc_queue must be configured to be sized at run-time
     */
    template < typename Enabler = std::enable_if< runtime_sized > >
    spsc_queue( size_type element_count, allocator_arg const& alloc ) :
        base_type( alloc, element_count )
    {}

    spsc_queue( const spsc_queue& )            = delete;
    spsc_queue& operator=( const spsc_queue& ) = delete;
    spsc_queue( spsc_queue&& )                 = delete;
    spsc_queue& operator=( spsc_queue&& )      = delete;

    /** Pushes object t to the ringbuffer.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \post object will be pushed to the spsc_queue, unless it is full.
     * \return true, if the push operation is successful.
     *
     * \note Thread-safe and wait-free
     * */
    bool push( const T& t )
    {
        return base_type::push( t );
    }

    /// \copydoc boost::lockfree::spsc_queue::push(const T& t)
    bool push( T&& t )
    {
        return base_type::push( std::forward< T >( t ) );
    }

    /** Pops one object from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \post if ringbuffer is not empty, object will be discarded.
     * \return true, if the pop operation is successful, false if ringbuffer was empty.
     *
     * \note Thread-safe and wait-free
     */
    bool pop()
    {
        return consume_one( []( const T& ) {} );
    }

    /** Pops one object from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \post if ringbuffer is not empty, object will be copied to ret.
     * \return true, if the pop operation is successful, false if ringbuffer was empty.
     *
     * \note Thread-safe and wait-free
     */
    template < typename U, typename Enabler = std::enable_if< std::is_convertible< T, U >::value > >
    bool pop( U& ret )
    {
        return consume_one( [ & ]( T&& t ) {
            ret = std::forward< T >( t );
        } );
    }

#if !defined( BOOST_NO_CXX17_HDR_OPTIONAL ) || defined( BOOST_DOXYGEN_INVOKED )
    /** Pops object from spsc_queue, returning a std::optional<>
     *
     * \returns `std::optional` with value if successful, `std::nullopt` if spsc_queue is empty.
     *
     * \note Thread-safe and non-blocking
     *
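     * \b Example: <br>
     *   a short sketch, using the boost::lockfree::uses_optional tag and a
     *   hypothetical handle() function:
     * \code
     * if ( auto value = queue.pop( boost::lockfree::uses_optional ) )
     *     handle( *value );
     * \endcode
     *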
     * */
    std::optional< T > pop( uses_optional_t )
    {
        T to_dequeue;
        if ( pop( to_dequeue ) )
            return to_dequeue;
        else
            return std::nullopt;
    }

    /** Pops object from spsc_queue, returning a std::optional<>
     *
     * \pre type T must be convertible to U
     * \returns `std::optional` with value if successful, `std::nullopt` if spsc_queue is empty.
     *
     * \note Thread-safe and non-blocking
     *
     * */
    template < typename U >
    std::optional< U > pop( uses_optional_t )
    {
        U to_dequeue;
        if ( pop( to_dequeue ) )
            return to_dequeue;
        else
            return std::nullopt;
    }
#endif

    /** Pushes as many objects from the array t as there is space.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return number of pushed items
     *
     * \note Thread-safe and wait-free
     */
    size_type push( T const* t, size_type size )
    {
        return base_type::push( t, size );
    }

    /** Pushes as many objects from the array t as there is space available.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return number of pushed items
     *
     * \note Thread-safe and wait-free
     */
    template < size_type size >
    size_type push( T const ( &t )[ size ] )
    {
        return push( t, size );
    }

    /** Pushes as many objects from the span t as there is space available.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return number of pushed items
     *
     * \note Thread-safe and wait-free
     */
    template < std::size_t Extent >
    size_type push( boost::span< const T, Extent > t )
    {
        return push( t.data(), t.size() );
    }

    /** Pushes as many objects from the range [begin, end) as there is space available.
     *
     * \pre only one thread is allowed to push data to the spsc_queue
     * \return iterator to the first element that has not been pushed
     *
     * \note Thread-safe and wait-free
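     *
     * \b Example: <br>
     *   a sketch of draining a std::vector into the queue; exactly the items
     *   that were pushed are removed from the vector:
     * \code
     * std::vector< T > pending;
     * // ... fill pending ...
     * auto it = queue.push( pending.begin(), pending.end() );
     * pending.erase( pending.begin(), it ); // erase the pushed items
     * \endcode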
     */
    template < typename ConstIterator >
    ConstIterator push( ConstIterator begin, ConstIterator end )
    {
        return base_type::push( begin, end );
    }

    /** Pops a maximum of size objects from ringbuffer.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \return number of popped items
     *
     * \note Thread-safe and wait-free
     * */
    size_type pop( T* ret, size_type size )
    {
        return base_type::pop( ret, size );
    }

    /** Pops a maximum of size objects from spsc_queue.
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \return number of popped items
     *
     * \note Thread-safe and wait-free
     * */
    template < size_type size >
    size_type pop( T ( &ret )[ size ] )
    {
        return pop( ret, size );
    }

    /** Pops objects to the output iterator it
     *
     * \pre only one thread is allowed to pop data from the spsc_queue
     * \return number of popped items
     *
     * \note Thread-safe and wait-free
     * */
    template < typename OutputIterator >
    typename std::enable_if< !std::is_convertible< T, OutputIterator >::value, size_type >::type pop( OutputIterator it )
    {
        return base_type::pop_to_output_iterator( it );
    }

    /** consumes one element via a functor
     *
     *  pops one element from the queue and applies the functor on this object
     *
     * \returns true, if one element was consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
     * */
    template < typename Functor >
    bool consume_one( Functor&& f )
    {
        return base_type::consume_one( f );
    }

    /** consumes all elements via a functor
     *
     * sequentially pops all elements from the queue and applies the functor on each object
     *
     * \returns number of elements that are consumed
     *
     * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
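     *
     * \b Example: <br>
     *   a sketch that drains the queue into a std::vector on the consumer thread:
     * \code
     * std::vector< T > drained;
     * queue.consume_all( [ & ]( T&& element ) {
     *     drained.push_back( std::move( element ) );
     * } );
     * \endcode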
     * */
    template < typename Functor >
    size_type consume_all( Functor&& f )
    {
        return base_type::consume_all( f );
    }

    /** get number of elements that are available for read
     *
     * \return number of available elements that can be popped from the spsc_queue
     *
     * \note Thread-safe and wait-free, should only be called from the consumer thread
     * */
    size_type read_available() const
    {
        return base_type::read_available( base_type::max_number_of_elements() );
    }

    /** get write space to write elements
     *
     * \return number of elements that can be pushed to the spsc_queue
     *
     * \note Thread-safe and wait-free, should only be called from the producer thread
     * */
    size_type write_available() const
    {
        return base_type::write_available( base_type::max_number_of_elements() );
    }

    /** get reference to element in the front of the queue
     *
     * Availability of front element can be checked using read_available().
     *
     * \pre only a consuming thread is allowed to check front element
     * \pre read_available() > 0. If ringbuffer is empty, it's undefined behaviour to invoke this method.
     * \return reference to the first element in the queue
     *
     * \note Thread-safe and wait-free
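     *
     * \b Example: <br>
     *   a sketch of peek-then-pop on the consumer thread; should_process() is
     *   a hypothetical predicate:
     * \code
     * if ( queue.read_available() > 0 ) {
     *     const T& next = queue.front();
     *     if ( should_process( next ) )
     *         queue.pop();
     * }
     * \endcode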
     */
    const T& front() const
    {
        BOOST_ASSERT( read_available() > 0 );
        return base_type::front();
    }

    /// \copydoc boost::lockfree::spsc_queue::front() const
    T& front()
    {
        BOOST_ASSERT( read_available() > 0 );
        return base_type::front();
    }

    /** reset the ringbuffer
     *
     * \note Not thread-safe
     * */
    void reset( void )
    {
        if ( !std::is_trivially_destructible< T >::value ) {
            // make sure to call all destructors!
            consume_all( []( const T& ) {} );
        } else {
            base_type::write_index_.store( 0, memory_order_relaxed );
            base_type::read_index_.store( 0, memory_order_release );
        }
    }
};

}} // namespace boost::lockfree


#endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */