Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-17 08:35:01

0001 //  lock-free single-producer/single-consumer value
0002 //  implemented via a triple buffer
0003 //
0004 //  Copyright (C) 2023-2024 Tim Blechmann
0005 //
0006 //  Distributed under the Boost Software License, Version 1.0. (See
0007 //  accompanying file LICENSE_1_0.txt or copy at
0008 //  http://www.boost.org/LICENSE_1_0.txt)
0009 
0010 #ifndef BOOST_LOCKFREE_SPSC_VALUE_HPP_INCLUDED
0011 #define BOOST_LOCKFREE_SPSC_VALUE_HPP_INCLUDED
0012 
0013 #include <boost/config.hpp>
0014 
0015 #ifdef BOOST_HAS_PRAGMA_ONCE
0016 #    pragma once
0017 #endif
0018 
0019 #include <boost/lockfree/detail/atomic.hpp>
0020 #include <boost/lockfree/detail/parameter.hpp>
0021 #include <boost/lockfree/detail/uses_optional.hpp>
0022 #include <boost/lockfree/lockfree_forward.hpp>
0023 #include <boost/lockfree/policies.hpp>
0024 
0025 #include <boost/parameter/optional.hpp>
0026 #include <boost/parameter/parameters.hpp>
0027 
0028 #include <array>
0029 #include <cstdint>
0030 
0031 #ifndef BOOST_DOXYGEN_INVOKED
0032 
0033 #    ifdef BOOST_NO_CXX17_IF_CONSTEXPR
0034 #        define ifconstexpr
0035 #    else
0036 #        define ifconstexpr constexpr
0037 #    endif
0038 
0039 #endif
0040 
0041 namespace boost { namespace lockfree {
0042 
0043 /** The spcs_value provides a single-writer/single-reader value, implemented by a triple buffer
0044  *
0045  *  \b Policies:
0046  *  - \ref boost::lockfree::allow_multiple_reads, defaults to
0047  *    \ref boost::lockfree::allow_multiple_reads "boost::lockfree::allow_multiple_reads<false>" \n
0048  *    If multiple reads are allowed, a value written to the spsc_value can be read multiple times, but not moved out
0049  *    of the instance. If multiple reads are not allowed, the class works as single-element queue that overwrites on
0050  *    write
0051  *
0052  * */
0053 template < typename T, typename... Options >
0054 struct spsc_value
0055 {
0056 #ifndef BOOST_DOXYGEN_INVOKED
0057 private:
0058     using spsc_value_signature = parameter::parameters< boost::parameter::optional< tag::allow_multiple_reads > >;
0059     using bound_args           = typename spsc_value_signature::bind< Options... >::type;
0060 
0061     static const bool allow_multiple_reads = detail::extract_allow_multiple_reads< bound_args >::value;
0062 
0063 public:
0064 #endif
0065 
0066     /** Construct a \ref boost::lockfree::spsc_value "spsc_value"
0067      *
0068      *  If configured with \ref boost::lockfree::allow_multiple_reads "boost::lockfree::allow_multiple_reads<true>" it
0069      *  is initialized to a default-constructed value
0070      *
0071      * */
0072     explicit spsc_value()
0073     {
0074         if ifconstexpr ( allow_multiple_reads ) {
0075             // populate initial reader
0076             m_write_index = tagged_index {
0077                 1,
0078             };
0079             m_available_index.store(
0080                 tagged_index {
0081                     0,
0082                     true,
0083                 },
0084                 std::memory_order_relaxed );
0085             m_buffer[ 0 ].value = {};
0086         }
0087     }
0088 
0089     /** Construct a \ref boost::lockfree::spsc_value "spsc_value", initialized to a value
0090      * */
0091     explicit spsc_value( T value ) :
0092         m_write_index {
0093             1,
0094         },
0095         m_available_index {
0096             tagged_index {
0097                 0,
0098                 true,
0099             },
0100         }
0101     {
0102         m_buffer[ 0 ].value = std::move( value );
0103     }
0104 
0105     /** Writes `value` to the \ref boost::lockfree::spsc_value "spsc_value"
0106      *
0107      * \pre  only one thread is allowed to write data to the \ref boost::lockfree::spsc_value "spsc_value"
0108      * \post object will be written to the \ref boost::lockfree::spsc_value "spsc_value"
0109      *
0110      * \note Thread-safe and wait-free
0111      * */
0112     void write( T&& value )
0113     {
0114         m_buffer[ m_write_index.index() ].value = std::forward< T >( value );
0115         swap_write_buffer();
0116     }
0117 
0118     /// \copydoc boost::lockfree::spsc_value::write(T&& value)
0119     void write( const T& value )
0120     {
0121         m_buffer[ m_write_index.index() ].value = value;
0122         swap_write_buffer();
0123     }
0124 
0125     /** Reads content of the \ref boost::lockfree::spsc_value "spsc_value"
0126      *
0127      * \pre     only one thread is allowed to write data to the \ref boost::lockfree::spsc_value "spsc_value"
0128      * \post    if read operation is successful, object will be copied to `ret`.
0129      * \returns `true`, if the read operation is successful, false if the \ref boost::lockfree::spsc_value "spsc_value" is
0130      *          configured with \ref boost::lockfree::allow_multiple_reads
0131      *          "boost::lockfree::allow_multiple_reads<false>" and no value is available for reading
0132      *
0133      * \note Thread-safe and wait-free
0134      * */
0135     bool read( T& ret )
0136     {
0137 #ifndef BOOST_NO_CXX17_IF_CONSTEXPR
0138         bool read_index_updated = swap_read_buffer();
0139 
0140         if constexpr ( allow_multiple_reads ) {
0141             ret = m_buffer[ m_read_index.index() ].value;
0142         } else {
0143             if ( !read_index_updated )
0144                 return false;
0145             ret = std::move( m_buffer[ m_read_index.index() ].value );
0146         }
0147 
0148         return true;
0149 #else
0150         return read_helper( ret, std::integral_constant< bool, allow_multiple_reads > {} );
0151 #endif
0152     }
0153 
0154 #if !defined( BOOST_NO_CXX17_HDR_OPTIONAL ) || defined( BOOST_DOXYGEN_INVOKED )
0155     /** Reads content of the \ref boost::lockfree::spsc_value "spsc_value", returning an optional
0156      *
0157      * \pre     only one thread is allowed to write data to the \ref boost::lockfree::spsc_value "spsc_value"
0158      * \returns `std::optional` with value if successful, `std::nullopt` if spsc_value is configured with \ref
0159      *          boost::lockfree::allow_multiple_reads "boost::lockfree::allow_multiple_reads<false>" and no value is
0160      *          available for reading
0161      *
0162      * \note Thread-safe and wait-free
0163      * */
0164     std::optional< T > read( uses_optional_t )
0165     {
0166         T to_dequeue;
0167         if ( read( to_dequeue ) )
0168             return to_dequeue;
0169         else
0170             return std::nullopt;
0171     }
0172 #endif
0173 
0174     /** consumes value via a functor
0175      *
0176      *  reads element from the spsc_value and applies the functor on this object
0177      *
0178      * \returns `true`, if element was consumed
0179      *
0180      * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
0181      * */
0182 
0183     template < typename Functor >
0184     bool consume( Functor&& f )
0185     {
0186 #ifndef BOOST_NO_CXX17_IF_CONSTEXPR
0187         bool read_index_updated = swap_read_buffer();
0188 
0189         if constexpr ( allow_multiple_reads ) {
0190             f( m_buffer[ m_read_index.index() ].value );
0191         } else {
0192             if ( !read_index_updated )
0193                 return false;
0194             f( std::move( m_buffer[ m_read_index.index() ].value ) );
0195         }
0196 
0197         return true;
0198 #else
0199         return consume_helper( f, std::integral_constant< bool, allow_multiple_reads > {} );
0200 #endif
0201     }
0202 
0203 private:
0204 #ifndef BOOST_DOXYGEN_INVOKED
0205     using allow_multiple_reads_true  = std::true_type;
0206     using allow_multiple_reads_false = std::false_type;
0207 
0208 #    ifdef BOOST_NO_CXX17_IF_CONSTEXPR
0209     template < typename Functor >
0210     bool consume_helper( Functor&& f, allow_multiple_reads_true = {} )
0211     {
0212         swap_read_buffer();
0213         f( m_buffer[ m_read_index.index() ].value );
0214         return true;
0215     }
0216 
0217     template < typename Functor >
0218     bool consume_helper( Functor&& f, allow_multiple_reads_false = {} )
0219     {
0220         bool read_index_updated = swap_read_buffer();
0221         if ( !read_index_updated )
0222             return false;
0223         f( std::move( m_buffer[ m_read_index.index() ].value ) );
0224         return true;
0225     }
0226 
0227     template < typename TT >
0228     bool read_helper( TT& ret, allow_multiple_reads_true = {} )
0229     {
0230         swap_read_buffer();
0231         ret = m_buffer[ m_read_index.index() ].value;
0232         return true;
0233     }
0234 
0235     template < typename TT >
0236     bool read_helper( TT& ret, allow_multiple_reads_false = {} )
0237     {
0238         bool read_index_updated = swap_read_buffer();
0239         if ( !read_index_updated )
0240             return false;
0241         ret = std::move( m_buffer[ m_read_index.index() ].value );
0242         return true;
0243     }
0244 #    endif
0245 
0246     void swap_write_buffer()
0247     {
0248         tagged_index old_avail_index = m_available_index.exchange(
0249             tagged_index {
0250                 m_write_index.index(),
0251                 true,
0252             },
0253             std::memory_order_release );
0254         m_write_index.set_tag_and_index( old_avail_index.index(), false );
0255     }
0256 
0257     bool swap_read_buffer()
0258     {
0259         constexpr bool use_compare_exchange = false; // exchange is most likely faster
0260 
0261         if ifconstexpr ( use_compare_exchange ) {
0262             tagged_index new_avail_index = m_read_index;
0263 
0264             tagged_index current_avail_index_with_tag = tagged_index {
0265                 m_available_index.load( std::memory_order_acquire ).index(),
0266                 true,
0267             };
0268 
0269             if ( m_available_index.compare_exchange_strong( current_avail_index_with_tag,
0270                                                             new_avail_index,
0271                                                             std::memory_order_acquire ) ) {
0272                 m_read_index = tagged_index( current_avail_index_with_tag.index(), false );
0273                 return true;
0274             } else
0275                 return false;
0276         } else {
0277             tagged_index new_avail_index = m_read_index;
0278 
0279             tagged_index current_avail_index = m_available_index.load( std::memory_order_acquire );
0280             if ( !current_avail_index.is_consumable() )
0281                 return false;
0282 
0283             current_avail_index = m_available_index.exchange( new_avail_index, std::memory_order_acquire );
0284             m_read_index        = tagged_index {
0285                 current_avail_index.index(),
0286                 false,
0287             };
0288             return true;
0289         }
0290     }
0291 
0292     struct tagged_index
0293     {
0294         tagged_index( uint8_t index, bool tag = false )
0295         {
0296             set_tag_and_index( index, tag );
0297         }
0298 
0299         uint8_t index() const
0300         {
0301             return byte & 0x07;
0302         }
0303 
0304         bool is_consumable() const
0305         {
0306             return byte & 0x08;
0307         }
0308 
0309         void set_tag_and_index( uint8_t index, bool tag )
0310         {
0311             byte = index | ( tag ? 0x08 : 0x00 );
0312         }
0313 
0314         uint8_t byte;
0315     };
0316 
0317     static constexpr size_t cacheline_bytes = detail::cacheline_bytes;
0318 
0319     struct alignas( cacheline_bytes ) cache_aligned_value
0320     {
0321         T value;
0322     };
0323 
0324     std::array< cache_aligned_value, 3 > m_buffer;
0325 
0326     alignas( cacheline_bytes ) tagged_index m_write_index { 0 };
0327     alignas( cacheline_bytes ) detail::atomic< tagged_index > m_available_index { 1 };
0328     alignas( cacheline_bytes ) tagged_index m_read_index { 2 };
0329 #endif
0330 };
0331 
0332 }} // namespace boost::lockfree
0333 
0334 #undef ifconstexpr
0335 
0336 #endif /* BOOST_LOCKFREE_SPSC_VALUE_HPP_INCLUDED */