Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:39:18

0001 //  lock-free single-producer/single-consumer ringbuffer
0002 //  this algorithm is implemented in various projects (linux kernel)
0003 //
0004 //  Copyright (C) 2009-2013 Tim Blechmann
0005 //
0006 //  Distributed under the Boost Software License, Version 1.0. (See
0007 //  accompanying file LICENSE_1_0.txt or copy at
0008 //  http://www.boost.org/LICENSE_1_0.txt)
0009 
0010 #ifndef BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
0011 #define BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED
0012 
0013 #include <algorithm>
0014 #include <memory>
0015 
0016 #include <boost/aligned_storage.hpp>
0017 #include <boost/assert.hpp>
0018 #include <boost/static_assert.hpp>
0019 #include <boost/core/allocator_access.hpp>
0020 #include <boost/utility.hpp>
0021 #include <boost/next_prior.hpp>
0022 #include <boost/utility/enable_if.hpp>
0023 #include <boost/config.hpp> // for BOOST_LIKELY
0024 
0025 #include <boost/type_traits/has_trivial_destructor.hpp>
0026 #include <boost/type_traits/is_convertible.hpp>
0027 
0028 #include <boost/lockfree/detail/atomic.hpp>
0029 #include <boost/lockfree/detail/copy_payload.hpp>
0030 #include <boost/lockfree/detail/parameter.hpp>
0031 #include <boost/lockfree/detail/prefix.hpp>
0032 
0033 #include <boost/lockfree/lockfree_forward.hpp>
0034 
0035 #ifdef BOOST_HAS_PRAGMA_ONCE
0036 #pragma once
0037 #endif
0038 
0039 namespace boost    {
0040 namespace lockfree {
0041 namespace detail   {
0042 
0043 typedef parameter::parameters<boost::parameter::optional<tag::capacity>,
0044                               boost::parameter::optional<tag::allocator>
0045                              > ringbuffer_signature; // Boost.Parameter signature: the spsc_queue option pack may carry an optional capacity<> and an optional allocator<>
0046 
0047 template <typename T>
0048 class ringbuffer_base // lock-free SPSC core: index bookkeeping and element transfer; derived classes supply the storage
0049 {
0050 #ifndef BOOST_DOXYGEN_INVOKED
0051 protected:
0052     typedef std::size_t size_t;
0053     static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(size_t);
0054     atomic<size_t> write_index_; // modified only by the producer (push) thread
0055     char padding1[padding_size]; /* force read_index and write_index to different cache lines */
0056     atomic<size_t> read_index_; // modified only by the consumer (pop) thread
0057 
0058     BOOST_DELETED_FUNCTION(ringbuffer_base(ringbuffer_base const&))
0059     BOOST_DELETED_FUNCTION(ringbuffer_base& operator= (ringbuffer_base const&))
0060 
0061 protected:
0062     ringbuffer_base(void):
0063         write_index_(0), read_index_(0)
0064     {}
0065 
0066     static size_t next_index(size_t arg, size_t max_size) // increment an index with wrap-around
0067     {
0068         size_t ret = arg + 1;
0069         while (BOOST_UNLIKELY(ret >= max_size)) // conditional subtraction instead of % avoids an integer division
0070             ret -= max_size;
0071         return ret;
0072     }
0073 
0074     static size_t read_available(size_t write_index, size_t read_index, size_t max_size) // elements ready to be popped
0075     {
0076         if (write_index >= read_index)
0077             return write_index - read_index;
0078 
0079         const size_t ret = write_index + max_size - read_index; // write index has wrapped around past read index
0080         return ret;
0081     }
0082 
0083     static size_t write_available(size_t write_index, size_t read_index, size_t max_size) // free slots for pushing
0084     {
0085         size_t ret = read_index - write_index - 1; // -1: one slot is always kept empty so "full" differs from "empty"
0086         if (write_index >= read_index)
0087             ret += max_size;
0088         return ret;
0089     }
0090 
0091     size_t read_available(size_t max_size) const
0092     {
0093         size_t write_index = write_index_.load(memory_order_acquire); // acquire: observe elements published by the producer
0094         const size_t read_index  = read_index_.load(memory_order_relaxed);
0095         return read_available(write_index, read_index, max_size);
0096     }
0097 
0098     size_t write_available(size_t max_size) const
0099     {
0100         size_t write_index = write_index_.load(memory_order_relaxed);
0101         const size_t read_index  = read_index_.load(memory_order_acquire); // acquire: observe slots released by the consumer
0102         return write_available(write_index, read_index, max_size);
0103     }
0104 
0105     bool push(T const & t, T * buffer, size_t max_size) // single-element push; returns false when full
0106     {
0107         const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
0108         const size_t next = next_index(write_index, max_size);
0109 
0110         if (next == read_index_.load(memory_order_acquire))
0111             return false; /* ringbuffer is full */
0112 
0113         new (buffer + write_index) T(t); // copy-construct
0114 
0115         write_index_.store(next, memory_order_release); // release: publish the constructed element to the consumer
0116 
0117         return true;
0118     }
0119 
0120     size_t push(const T * input_buffer, size_t input_count, T * internal_buffer, size_t max_size) // returns number pushed
0121     {
0122         return push(input_buffer, input_buffer + input_count, internal_buffer, max_size) - input_buffer;
0123     }
0124 
0125     template <typename ConstIterator>
0126     ConstIterator push(ConstIterator begin, ConstIterator end, T * internal_buffer, size_t max_size) // returns iterator past last pushed element
0127     {
0128         // FIXME: avoid std::distance
0129 
0130         const size_t write_index = write_index_.load(memory_order_relaxed);  // only written from push thread
0131         const size_t read_index  = read_index_.load(memory_order_acquire);
0132         const size_t avail = write_available(write_index, read_index, max_size);
0133 
0134         if (avail == 0)
0135             return begin;
0136 
0137         size_t input_count = std::distance(begin, end);
0138         input_count = (std::min)(input_count, avail); // push only as many elements as fit
0139 
0140         size_t new_write_index = write_index + input_count;
0141 
0142         const ConstIterator last = boost::next(begin, input_count);
0143 
0144         if (write_index + input_count > max_size) { // range wraps past the end of the buffer
0145             /* copy data in two sections */
0146             const size_t count0 = max_size - write_index;
0147             const ConstIterator midpoint = boost::next(begin, count0);
0148 
0149             std::uninitialized_copy(begin, midpoint, internal_buffer + write_index);
0150             std::uninitialized_copy(midpoint, last, internal_buffer);
0151             new_write_index -= max_size;
0152         } else {
0153             std::uninitialized_copy(begin, last, internal_buffer + write_index);
0154 
0155             if (new_write_index == max_size)
0156                 new_write_index = 0;
0157         }
0158 
0159         write_index_.store(new_write_index, memory_order_release); // publish all newly constructed elements at once
0160         return last;
0161     }
0162 
0163     template <typename Functor>
0164     bool consume_one(Functor & functor, T * buffer, size_t max_size) // apply functor to front element, then destroy it
0165     {
0166         const size_t write_index = write_index_.load(memory_order_acquire);
0167         const size_t read_index  = read_index_.load(memory_order_relaxed); // only written from pop thread
0168         if ( empty(write_index, read_index) )
0169             return false;
0170 
0171         T & object_to_consume = buffer[read_index];
0172         functor( object_to_consume );
0173         object_to_consume.~T();
0174 
0175         size_t next = next_index(read_index, max_size);
0176         read_index_.store(next, memory_order_release); // release the slot back to the producer
0177         return true;
0178     }
0179 
0180     template <typename Functor>
0181     bool consume_one(Functor const & functor, T * buffer, size_t max_size) // const-functor twin of the overload above
0182     {
0183         const size_t write_index = write_index_.load(memory_order_acquire);
0184         const size_t read_index  = read_index_.load(memory_order_relaxed); // only written from pop thread
0185         if ( empty(write_index, read_index) )
0186             return false;
0187 
0188         T & object_to_consume = buffer[read_index];
0189         functor( object_to_consume );
0190         object_to_consume.~T();
0191 
0192         size_t next = next_index(read_index, max_size);
0193         read_index_.store(next, memory_order_release);
0194         return true;
0195     }
0196 
0197     template <typename Functor>
0198     size_t consume_all (Functor const & functor, T * internal_buffer, size_t max_size) // apply functor to every available element; returns count
0199     {
0200         const size_t write_index = write_index_.load(memory_order_acquire);
0201         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
0202 
0203         const size_t avail = read_available(write_index, read_index, max_size);
0204 
0205         if (avail == 0)
0206             return 0;
0207 
0208         const size_t output_count = avail;
0209 
0210         size_t new_read_index = read_index + output_count;
0211 
0212         if (read_index + output_count > max_size) { // range wraps past the end of the buffer
0213             /* copy data in two sections */
0214             const size_t count0 = max_size - read_index;
0215             const size_t count1 = output_count - count0;
0216 
0217             run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
0218             run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);
0219 
0220             new_read_index -= max_size;
0221         } else {
0222             run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);
0223 
0224             if (new_read_index == max_size)
0225                 new_read_index = 0;
0226         }
0227 
0228         read_index_.store(new_read_index, memory_order_release); // release all consumed slots at once
0229         return output_count;
0230     }
0231 
0232     template <typename Functor>
0233     size_t consume_all (Functor & functor, T * internal_buffer, size_t max_size) // non-const functor twin of consume_all above
0234     {
0235         const size_t write_index = write_index_.load(memory_order_acquire);
0236         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
0237 
0238         const size_t avail = read_available(write_index, read_index, max_size);
0239 
0240         if (avail == 0)
0241             return 0;
0242 
0243         const size_t output_count = avail;
0244 
0245         size_t new_read_index = read_index + output_count;
0246 
0247         if (read_index + output_count > max_size) {
0248             /* copy data in two sections */
0249             const size_t count0 = max_size - read_index;
0250             const size_t count1 = output_count - count0;
0251 
0252             run_functor_and_delete(internal_buffer + read_index, internal_buffer + max_size, functor);
0253             run_functor_and_delete(internal_buffer, internal_buffer + count1, functor);
0254 
0255             new_read_index -= max_size;
0256         } else {
0257             run_functor_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, functor);
0258 
0259             if (new_read_index == max_size)
0260                 new_read_index = 0;
0261         }
0262 
0263         read_index_.store(new_read_index, memory_order_release);
0264         return output_count;
0265     }
0266 
0267     size_t pop (T * output_buffer, size_t output_count, T * internal_buffer, size_t max_size) // copy up to output_count elements out; returns number popped
0268     {
0269         const size_t write_index = write_index_.load(memory_order_acquire);
0270         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
0271 
0272         const size_t avail = read_available(write_index, read_index, max_size);
0273 
0274         if (avail == 0)
0275             return 0;
0276 
0277         output_count = (std::min)(output_count, avail);
0278 
0279         size_t new_read_index = read_index + output_count;
0280 
0281         if (read_index + output_count > max_size) { // range wraps past the end of the buffer
0282             /* copy data in two sections */
0283             const size_t count0 = max_size - read_index;
0284             const size_t count1 = output_count - count0;
0285 
0286             copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, output_buffer);
0287             copy_and_delete(internal_buffer, internal_buffer + count1, output_buffer + count0);
0288 
0289             new_read_index -= max_size;
0290         } else {
0291             copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + output_count, output_buffer);
0292             if (new_read_index == max_size)
0293                 new_read_index = 0;
0294         }
0295 
0296         read_index_.store(new_read_index, memory_order_release);
0297         return output_count;
0298     }
0299 
0300     template <typename OutputIterator>
0301     size_t pop_to_output_iterator (OutputIterator it, T * internal_buffer, size_t max_size) // pop everything available into an output iterator
0302     {
0303         const size_t write_index = write_index_.load(memory_order_acquire);
0304         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
0305 
0306         const size_t avail = read_available(write_index, read_index, max_size);
0307         if (avail == 0)
0308             return 0;
0309 
0310         size_t new_read_index = read_index + avail;
0311 
0312         if (read_index + avail > max_size) {
0313             /* copy data in two sections */
0314             const size_t count0 = max_size - read_index;
0315             const size_t count1 = avail - count0;
0316 
0317             it = copy_and_delete(internal_buffer + read_index, internal_buffer + max_size, it); // continue second copy where the first left off
0318             copy_and_delete(internal_buffer, internal_buffer + count1, it);
0319 
0320             new_read_index -= max_size;
0321         } else {
0322             copy_and_delete(internal_buffer + read_index, internal_buffer + read_index + avail, it);
0323             if (new_read_index == max_size)
0324                 new_read_index = 0;
0325         }
0326 
0327         read_index_.store(new_read_index, memory_order_release);
0328         return avail;
0329     }
0330 
0331     const T& front(const T * internal_buffer) const // reference to oldest element; caller must ensure non-empty
0332     {
0333         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
0334         return *(internal_buffer + read_index);
0335     }
0336 
0337     T& front(T * internal_buffer)
0338     {
0339         const size_t read_index = read_index_.load(memory_order_relaxed); // only written from pop thread
0340         return *(internal_buffer + read_index);
0341     }
0342 #endif
0343 
0344 
0345 public:
0346     /** reset the ringbuffer
0347      *
0348      * \note Not thread-safe
0349      * */
0350     void reset(void)
0351     {
0352         if ( !boost::has_trivial_destructor<T>::value ) {
0353             // make sure to call all destructors!
0354 
0355             detail::consume_noop consume_functor;
0356             (void)consume_all( consume_functor ); // NOTE(review): one-argument consume_all is not declared in this base — presumably resolved in a derived class; confirm
0357         } else {
0358             write_index_.store(0, memory_order_relaxed); // trivially destructible: just rewind both indices
0359             read_index_.store(0, memory_order_release);
0360         }
0361     }
0362 
0363     /** Check if the ringbuffer is empty
0364      *
0365      * \return true, if the ringbuffer is empty, false otherwise
0366      * \note Due to the concurrent nature of the ringbuffer the result may be inaccurate.
0367      * */
0368     bool empty(void)
0369     {
0369a         // relaxed loads: cheap snapshot, hence the "may be inaccurate" caveat above
0370         return empty(write_index_.load(memory_order_relaxed), read_index_.load(memory_order_relaxed));
0371     }
0372 
0373     /**
0374      * \return true, if implementation is lock-free.
0375      *
0376      * */
0377     bool is_lock_free(void) const
0378     {
0379         return write_index_.is_lock_free() && read_index_.is_lock_free();
0380     }
0381 
0382 private:
0383     bool empty(size_t write_index, size_t read_index)
0384     {
0385         return write_index == read_index;
0386     }
0387 
0388     template< class OutputIterator >
0389     OutputIterator copy_and_delete( T * first, T * last, OutputIterator out ) // copy elements out, destroying the originals
0390     {
0391         if (boost::has_trivial_destructor<T>::value) {
0392             return std::copy(first, last, out); // will use memcpy if possible
0393         } else {
0394             for (; first != last; ++first, ++out) {
0395                 *out = *first;
0396                 first->~T();
0397             }
0398             return out;
0399         }
0400     }
0401 
0402     template< class Functor >
0403     void run_functor_and_delete( T * first, T * last, Functor & functor ) // apply functor to each element, then destroy it
0404     {
0405         for (; first != last; ++first) {
0406             functor(*first);
0407             first->~T();
0408         }
0409     }
0410 
0411     template< class Functor >
0412     void run_functor_and_delete( T * first, T * last, Functor const & functor ) // const-functor twin of the overload above
0413     {
0414         for (; first != last; ++first) {
0415             functor(*first);
0416             first->~T();
0417         }
0418     }
0419 };
0420 
0421 template <typename T, std::size_t MaxSize>
0422 class compile_time_sized_ringbuffer: // storage is embedded in the object; capacity fixed at compile time
0423     public ringbuffer_base<T>
0424 {
0425     typedef std::size_t size_type;
0426     static const std::size_t max_size = MaxSize + 1; // +1: one slot is always kept empty to distinguish full from empty
0427 
0428     typedef typename boost::aligned_storage<max_size * sizeof(T),
0429                                             boost::alignment_of<T>::value
0430                                            >::type storage_type;
0431 
0432     storage_type storage_; // raw, suitably aligned bytes; elements are constructed/destroyed in place by the base class
0433 
0434     T * data()
0435     {
0436         return static_cast<T*>(storage_.address());
0437     }
0438 
0439     const T * data() const
0440     {
0441         return static_cast<const T*>(storage_.address());
0442     }
0443 
0444 protected:
0445     size_type max_number_of_elements() const
0446     {
0447         return max_size;
0448     }
0449 
0450     ~compile_time_sized_ringbuffer(void)
0451     {
0452         // destroy all remaining items
0453         detail::consume_noop consume_functor;
0454         (void)consume_all(consume_functor);
0455     }
0456 
0457 public: // thin forwarders: bind the embedded storage pointer and capacity to the base-class algorithms
0458     bool push(T const & t)
0459     {
0460         return ringbuffer_base<T>::push(t, data(), max_size);
0461     }
0462 
0463     template <typename Functor>
0464     bool consume_one(Functor & f)
0465     {
0466         return ringbuffer_base<T>::consume_one(f, data(), max_size);
0467     }
0468 
0469     template <typename Functor>
0470     bool consume_one(Functor const & f)
0471     {
0472         return ringbuffer_base<T>::consume_one(f, data(), max_size);
0473     }
0474 
0475     template <typename Functor>
0476     size_type consume_all(Functor & f)
0477     {
0478         return ringbuffer_base<T>::consume_all(f, data(), max_size);
0479     }
0480 
0481     template <typename Functor>
0482     size_type consume_all(Functor const & f)
0483     {
0484         return ringbuffer_base<T>::consume_all(f, data(), max_size);
0485     }
0486 
0487     size_type push(T const * t, size_type size)
0488     {
0489         return ringbuffer_base<T>::push(t, size, data(), max_size);
0490     }
0491 
0492     template <size_type size>
0493     size_type push(T const (&t)[size]) // array overload: size deduced from the array bound
0494     {
0495         return push(t, size);
0496     }
0497 
0498     template <typename ConstIterator>
0499     ConstIterator push(ConstIterator begin, ConstIterator end)
0500     {
0501         return ringbuffer_base<T>::push(begin, end, data(), max_size);
0502     }
0503 
0504     size_type pop(T * ret, size_type size)
0505     {
0506         return ringbuffer_base<T>::pop(ret, size, data(), max_size);
0507     }
0508 
0509     template <typename OutputIterator>
0510     size_type pop_to_output_iterator(OutputIterator it)
0511     {
0512         return ringbuffer_base<T>::pop_to_output_iterator(it, data(), max_size);
0513     }
0514 
0515     const T& front(void) const
0516     {
0517         return ringbuffer_base<T>::front(data());
0518     }
0519 
0520     T& front(void)
0521     {
0522         return ringbuffer_base<T>::front(data());
0523     }
0524 };
0525 
0526 template <typename T, typename Alloc>
0527 class runtime_sized_ringbuffer: // storage allocated at construction; capacity chosen at run time
0528     public ringbuffer_base<T>,
0529     private Alloc // private inheritance: empty-base optimization for stateless allocators
0530 {
0531     typedef std::size_t size_type;
0532     size_type max_elements_; // requested capacity + 1 (one slot always kept empty)
0533 #ifdef BOOST_NO_CXX11_ALLOCATOR
0534     typedef typename Alloc::pointer pointer;
0535 #else
0536     typedef std::allocator_traits<Alloc> allocator_traits;
0537     typedef typename allocator_traits::pointer pointer;
0538 #endif
0539     pointer array_; // possibly a "fancy" pointer type supplied by the allocator
0540 
0541 protected:
0542     size_type max_number_of_elements() const
0543     {
0544         return max_elements_;
0545     }
0546 
0547 public:
0548     explicit runtime_sized_ringbuffer(size_type max_elements):
0549         max_elements_(max_elements + 1)
0550     {
0551 #ifdef BOOST_NO_CXX11_ALLOCATOR
0552         array_ = Alloc::allocate(max_elements_);
0553 #else
0554         Alloc& alloc = *this;
0555         array_ = allocator_traits::allocate(alloc, max_elements_);
0556 #endif
0557     }
0558 
0559     template <typename U>
0560     runtime_sized_ringbuffer(typename boost::allocator_rebind<Alloc, U>::type const & alloc, size_type max_elements): // accepts an allocator rebound to a different value type
0561         Alloc(alloc), max_elements_(max_elements + 1)
0562     {
0563 #ifdef BOOST_NO_CXX11_ALLOCATOR
0564         array_ = Alloc::allocate(max_elements_);
0565 #else
0566         Alloc& allocator = *this;
0567         array_ = allocator_traits::allocate(allocator, max_elements_);
0568 #endif
0569     }
0570 
0571     runtime_sized_ringbuffer(Alloc const & alloc, size_type max_elements):
0572         Alloc(alloc), max_elements_(max_elements + 1)
0573     {
0574 #ifdef BOOST_NO_CXX11_ALLOCATOR
0575         array_ = Alloc::allocate(max_elements_);
0576 #else
0577         Alloc& allocator = *this;
0578         array_ = allocator_traits::allocate(allocator, max_elements_);
0579 #endif
0580     }
0581 
0582     ~runtime_sized_ringbuffer(void)
0583     {
0584         // destroy all remaining items
0585         detail::consume_noop consume_functor;
0586         (void)consume_all(consume_functor);
0587 
0588 #ifdef BOOST_NO_CXX11_ALLOCATOR
0589         Alloc::deallocate(array_, max_elements_);
0590 #else
0591         Alloc& allocator = *this;
0592         allocator_traits::deallocate(allocator, array_, max_elements_);
0593 #endif
0594     }
0595 
0596     bool push(T const & t)
0597     {
0598         return ringbuffer_base<T>::push(t, &*array_, max_elements_); // &* unwraps a (possibly fancy) pointer to a raw T*
0599     }
0600 
0601     template <typename Functor>
0602     bool consume_one(Functor & f)
0603     {
0604         return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
0605     }
0606 
0607     template <typename Functor>
0608     bool consume_one(Functor const & f)
0609     {
0610         return ringbuffer_base<T>::consume_one(f, &*array_, max_elements_);
0611     }
0612 
0613     template <typename Functor>
0614     size_type consume_all(Functor & f)
0615     {
0616         return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
0617     }
0618 
0619     template <typename Functor>
0620     size_type consume_all(Functor const & f)
0621     {
0622         return ringbuffer_base<T>::consume_all(f, &*array_, max_elements_);
0623     }
0624 
0625     size_type push(T const * t, size_type size)
0626     {
0627         return ringbuffer_base<T>::push(t, size, &*array_, max_elements_);
0628     }
0629 
0630     template <size_type size>
0631     size_type push(T const (&t)[size]) // array overload: size deduced from the array bound
0632     {
0633         return push(t, size);
0634     }
0635 
0636     template <typename ConstIterator>
0637     ConstIterator push(ConstIterator begin, ConstIterator end)
0638     {
0639         return ringbuffer_base<T>::push(begin, end, &*array_, max_elements_);
0640     }
0641 
0642     size_type pop(T * ret, size_type size)
0643     {
0644         return ringbuffer_base<T>::pop(ret, size, &*array_, max_elements_);
0645     }
0646 
0647     template <typename OutputIterator>
0648     size_type pop_to_output_iterator(OutputIterator it)
0649     {
0650         return ringbuffer_base<T>::pop_to_output_iterator(it, &*array_, max_elements_);
0651     }
0652 
0653     const T& front(void) const
0654     {
0655         return ringbuffer_base<T>::front(&*array_);
0656     }
0657 
0658     T& front(void)
0659     {
0660         return ringbuffer_base<T>::front(&*array_);
0661     }
0662 };
0663 
0664 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
0665 template <typename T, typename A0, typename A1>
0666 #else
0667 template <typename T, typename ...Options>
0668 #endif
0669 struct make_ringbuffer // metafunction: picks the compile-time- or run-time-sized ringbuffer from the option pack
0670 {
0671 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
0672     typedef typename ringbuffer_signature::bind<A0, A1>::type bound_args;
0673 #else
0674     typedef typename ringbuffer_signature::bind<Options...>::type bound_args;
0675 #endif
0676 
0677     typedef extract_capacity<bound_args> extract_capacity_t;
0678 
0679     static const bool runtime_sized = !extract_capacity_t::has_capacity; // no capacity<> option means size is chosen at run time
0680     static const size_t capacity    =  extract_capacity_t::capacity;
0681 
0682     typedef extract_allocator<bound_args, T> extract_allocator_t;
0683     typedef typename extract_allocator_t::type allocator;
0684 
0685     // allocator argument is only sane, for run-time sized ringbuffers
0686     BOOST_STATIC_ASSERT((mpl::if_<mpl::bool_<!runtime_sized>,
0687                                   mpl::bool_<!extract_allocator_t::has_allocator>,
0688                                   mpl::true_
0689                                  >::type::value));
0690 
0691     typedef typename mpl::if_c<runtime_sized,
0692                                runtime_sized_ringbuffer<T, allocator>,
0693                                compile_time_sized_ringbuffer<T, capacity>
0694                               >::type ringbuffer_type;
0695 };
0696 
0697 
0698 } /* namespace detail */
0699 
0700 
0701 /** The spsc_queue class provides a single-writer/single-reader fifo queue, pushing and popping is wait-free.
0702  *
0703  *  \b Policies:
0704  *  - \c boost::lockfree::capacity<>, optional <br>
0705  *    If this template argument is passed to the options, the size of the ringbuffer is set at compile-time.
0706  *
0707  *  - \c boost::lockfree::allocator<>, defaults to \c boost::lockfree::allocator<std::allocator<T>> <br>
0708  *    Specifies the allocator that is used to allocate the ringbuffer. This option is only valid, if the ringbuffer is configured
0709  *    to be sized at run-time
0710  *
0711  *  \b Requirements:
0712  *  - T must have a default constructor
0713  *  - T must be copyable
0714  * */
0715 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
0716 template <typename T, class A0, class A1>
0717 #else
0718 template <typename T, typename ...Options>
0719 #endif
0720 class spsc_queue:
0721 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
0722     public detail::make_ringbuffer<T, A0, A1>::ringbuffer_type
0723 #else
0724     public detail::make_ringbuffer<T, Options...>::ringbuffer_type
0725 #endif
0726 {
0727 private:
0728 
0729 #ifndef BOOST_DOXYGEN_INVOKED
0730 
0731 #ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
0732     typedef typename detail::make_ringbuffer<T, A0, A1>::ringbuffer_type base_type;
0733     static const bool runtime_sized = detail::make_ringbuffer<T, A0, A1>::runtime_sized;
0734     typedef typename detail::make_ringbuffer<T, A0, A1>::allocator allocator_arg;
0735 #else
0736     typedef typename detail::make_ringbuffer<T, Options...>::ringbuffer_type base_type;
0737     static const bool runtime_sized = detail::make_ringbuffer<T, Options...>::runtime_sized;
0738     typedef typename detail::make_ringbuffer<T, Options...>::allocator allocator_arg;
0739 #endif
0740 
0741 
0742     struct implementation_defined
0743     {
0744         typedef allocator_arg allocator;
0745         typedef std::size_t size_type;
0746     };
0747 #endif
0748 
0749 public:
0750     typedef T value_type;
0751     typedef typename implementation_defined::allocator allocator;
0752     typedef typename implementation_defined::size_type size_type;
0753 
0754     /** Constructs a spsc_queue
0755      *
0756      *  \pre spsc_queue must be configured to be sized at compile-time
0757      */
0758     spsc_queue(void)
0759     {
0760         // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
0761         // this function and this function may be compiled even when it isn't being used.
0762         BOOST_ASSERT(!runtime_sized);
0763     }
0764 
0765     /** Constructs a spsc_queue with a custom allocator
0766      *
0767      *  \pre spsc_queue must be configured to be sized at compile-time
0768      *
0769      *  \note This is just for API compatibility: an allocator isn't actually needed
0770      */
0771     template <typename U>
0772     explicit spsc_queue(typename boost::allocator_rebind<allocator, U>::type const &)
0773     {
0774         BOOST_STATIC_ASSERT(!runtime_sized);
0775     }
0776 
0777     /** Constructs a spsc_queue with a custom allocator
0778      *
0779      *  \pre spsc_queue must be configured to be sized at compile-time
0780      *
0781      *  \note This is just for API compatibility: an allocator isn't actually needed
0782      */
0783     explicit spsc_queue(allocator const &)
0784     {
0785         // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
0786         // this function and this function may be compiled even when it isn't being used.
0787         BOOST_ASSERT(!runtime_sized);
0788     }
0789 
0790     /** Constructs a spsc_queue for element_count elements
0791      *
0792      *  \pre spsc_queue must be configured to be sized at run-time
0793      */
0794     explicit spsc_queue(size_type element_count):
0795         base_type(element_count)
0796     {
0797         // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
0798         // this function and this function may be compiled even when it isn't being used.
0799         BOOST_ASSERT(runtime_sized);
0800     }
0801 
0802     /** Constructs a spsc_queue for element_count elements with a custom allocator
0803      *
0804      *  \pre spsc_queue must be configured to be sized at run-time
0805      */
0806     template <typename U>
0807     spsc_queue(size_type element_count, typename boost::allocator_rebind<allocator, U>::type const & alloc):
0808         base_type(alloc, element_count)
0809     {
0810         BOOST_STATIC_ASSERT(runtime_sized);
0811     }
0812 
0813     /** Constructs a spsc_queue for element_count elements with a custom allocator
0814      *
0815      *  \pre spsc_queue must be configured to be sized at run-time
0816      */
0817     spsc_queue(size_type element_count, allocator_arg const & alloc):
0818         base_type(alloc, element_count)
0819     {
0820         // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
0821         // this function and this function may be compiled even when it isn't being used.
0822         BOOST_ASSERT(runtime_sized);
0823     }
0824 
0825     /** Pushes object t to the ringbuffer.
0826      *
0827      * \pre only one thread is allowed to push data to the spsc_queue
0828      * \post object will be pushed to the spsc_queue, unless it is full.
0829      * \return true, if the push operation is successful.
0830      *
0831      * \note Thread-safe and wait-free
0832      * */
0833     bool push(T const & t)
0834     {
0835         return base_type::push(t);
0836     }
0837 
0838     /** Pops one object from ringbuffer.
0839      *
0840      * \pre only one thread is allowed to pop data to the spsc_queue
0841      * \post if ringbuffer is not empty, object will be discarded.
0842      * \return true, if the pop operation is successful, false if ringbuffer was empty.
0843      *
0844      * \note Thread-safe and wait-free
0845      */
0846     bool pop ()
0847     {
0848         detail::consume_noop consume_functor;
0849         return consume_one( consume_functor );
0850     }
0851 
0852     /** Pops one object from ringbuffer.
0853      *
0854      * \pre only one thread is allowed to pop data to the spsc_queue
0855      * \post if ringbuffer is not empty, object will be copied to ret.
0856      * \return true, if the pop operation is successful, false if ringbuffer was empty.
0857      *
0858      * \note Thread-safe and wait-free
0859      */
0860     template <typename U>
0861     typename boost::enable_if<typename is_convertible<T, U>::type, bool>::type
0862     pop (U & ret)
0863     {
0864         detail::consume_via_copy<U> consume_functor(ret);
0865         return consume_one( consume_functor );
0866     }
0867 
0868     /** Pushes as many objects from the array t as there is space.
0869      *
0870      * \pre only one thread is allowed to push data to the spsc_queue
0871      * \return number of pushed items
0872      *
0873      * \note Thread-safe and wait-free
0874      */
0875     size_type push(T const * t, size_type size)
0876     {
0877         return base_type::push(t, size);
0878     }
0879 
0880     /** Pushes as many objects from the array t as there is space available.
0881      *
0882      * \pre only one thread is allowed to push data to the spsc_queue
0883      * \return number of pushed items
0884      *
0885      * \note Thread-safe and wait-free
0886      */
0887     template <size_type size>
0888     size_type push(T const (&t)[size])
0889     {
0890         return push(t, size);
0891     }
0892 
0893     /** Pushes as many objects from the range [begin, end) as there is space .
0894      *
0895      * \pre only one thread is allowed to push data to the spsc_queue
0896      * \return iterator to the first element, which has not been pushed
0897      *
0898      * \note Thread-safe and wait-free
0899      */
0900     template <typename ConstIterator>
0901     ConstIterator push(ConstIterator begin, ConstIterator end)
0902     {
0903         return base_type::push(begin, end);
0904     }
0905 
0906     /** Pops a maximum of size objects from ringbuffer.
0907      *
0908      * \pre only one thread is allowed to pop data to the spsc_queue
0909      * \return number of popped items
0910      *
0911      * \note Thread-safe and wait-free
0912      * */
0913     size_type pop(T * ret, size_type size)
0914     {
0915         return base_type::pop(ret, size);
0916     }
0917 
0918     /** Pops a maximum of size objects from spsc_queue.
0919      *
0920      * \pre only one thread is allowed to pop data to the spsc_queue
0921      * \return number of popped items
0922      *
0923      * \note Thread-safe and wait-free
0924      * */
0925     template <size_type size>
0926     size_type pop(T (&ret)[size])
0927     {
0928         return pop(ret, size);
0929     }
0930 
0931     /** Pops objects to the output iterator it
0932      *
0933      * \pre only one thread is allowed to pop data to the spsc_queue
0934      * \return number of popped items
0935      *
0936      * \note Thread-safe and wait-free
0937      * */
0938     template <typename OutputIterator>
0939     typename boost::disable_if<typename is_convertible<T, OutputIterator>::type, size_type>::type
0940     pop(OutputIterator it)
0941     {
0942         return base_type::pop_to_output_iterator(it);
0943     }
0944 
0945     /** consumes one element via a functor
0946      *
0947      *  pops one element from the queue and applies the functor on this object
0948      *
0949      * \returns true, if one element was consumed
0950      *
0951      * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
0952      * */
0953     template <typename Functor>
0954     bool consume_one(Functor & f)
0955     {
0956         return base_type::consume_one(f);
0957     }
0958 
0959     /// \copydoc boost::lockfree::spsc_queue::consume_one(Functor & rhs)
0960     template <typename Functor>
0961     bool consume_one(Functor const & f)
0962     {
0963         return base_type::consume_one(f);
0964     }
0965 
0966     /** consumes all elements via a functor
0967      *
0968      * sequentially pops all elements from the queue and applies the functor on each object
0969      *
0970      * \returns number of elements that are consumed
0971      *
0972      * \note Thread-safe and non-blocking, if functor is thread-safe and non-blocking
0973      * */
0974     template <typename Functor>
0975     size_type consume_all(Functor & f)
0976     {
0977         return base_type::consume_all(f);
0978     }
0979 
0980     /// \copydoc boost::lockfree::spsc_queue::consume_all(Functor & rhs)
0981     template <typename Functor>
0982     size_type consume_all(Functor const & f)
0983     {
0984         return base_type::consume_all(f);
0985     }
0986 
0987     /** get number of elements that are available for read
0988      *
0989      * \return number of available elements that can be popped from the spsc_queue
0990      *
0991      * \note Thread-safe and wait-free, should only be called from the consumer thread
0992      * */
0993     size_type read_available() const
0994     {
0995         return base_type::read_available(base_type::max_number_of_elements());
0996     }
0997 
0998     /** get write space to write elements
0999      *
1000      * \return number of elements that can be pushed to the spsc_queue
1001      *
1002      * \note Thread-safe and wait-free, should only be called from the producer thread
1003      * */
1004     size_type write_available() const
1005     {
1006         return base_type::write_available(base_type::max_number_of_elements());
1007     }
1008 
1009     /** get reference to element in the front of the queue
1010      *
1011      * Availability of front element can be checked using read_available().
1012      *
1013      * \pre only a consuming thread is allowed to check front element
1014      * \pre read_available() > 0. If ringbuffer is empty, it's undefined behaviour to invoke this method.
1015      * \return reference to the first element in the queue
1016      *
1017      * \note Thread-safe and wait-free
1018      */
1019     const T& front() const
1020     {
1021         BOOST_ASSERT(read_available() > 0);
1022         return base_type::front();
1023     }
1024 
1025     /// \copydoc boost::lockfree::spsc_queue::front() const
1026     T& front()
1027     {
1028         BOOST_ASSERT(read_available() > 0);
1029         return base_type::front();
1030     }
1031 
1032     /** reset the ringbuffer
1033      *
1034      * \note Not thread-safe
1035      * */
1036     void reset(void)
1037     {
1038         if ( !boost::has_trivial_destructor<T>::value ) {
1039             // make sure to call all destructors!
1040 
1041             detail::consume_noop consume_functor;
1042             (void)consume_all(consume_functor);
1043         } else {
1044             base_type::write_index_.store(0, memory_order_relaxed);
1045             base_type::read_index_.store(0, memory_order_release);
1046         }
1047    }
1048 };
1049 
1050 } /* namespace lockfree */
1051 } /* namespace boost */
1052 
1053 
1054 #endif /* BOOST_LOCKFREE_SPSC_QUEUE_HPP_INCLUDED */