//  lock-free queue from
//  Michael, M. M. and Scott, M. L.,
//  "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"
//
//  Copyright (C) 2008-2013 Tim Blechmann
//
//  Distributed under the Boost Software License, Version 1.0. (See
//  accompanying file LICENSE_1_0.txt or copy at
//  http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_LOCKFREE_FIFO_HPP_INCLUDED
#define BOOST_LOCKFREE_FIFO_HPP_INCLUDED

#include <boost/assert.hpp>
#include <boost/static_assert.hpp>
#include <boost/core/allocator_access.hpp>
#include <boost/type_traits/has_trivial_assign.hpp>
#include <boost/type_traits/has_trivial_destructor.hpp>
#include <boost/config.hpp> // for BOOST_LIKELY & BOOST_ALIGNMENT

#include <boost/lockfree/detail/atomic.hpp>
#include <boost/lockfree/detail/copy_payload.hpp>
#include <boost/lockfree/detail/freelist.hpp>
#include <boost/lockfree/detail/parameter.hpp>
#include <boost/lockfree/detail/tagged_ptr.hpp>

#include <boost/lockfree/lockfree_forward.hpp>

#ifdef BOOST_HAS_PRAGMA_ONCE
#pragma once
#endif


#if defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable: 4324) // structure was padded due to __declspec(align())
#endif

#if defined(BOOST_INTEL) && (BOOST_INTEL_CXX_VERSION > 1000)
#pragma warning(push)
#pragma warning(disable:488) // template parameter unused in declaring parameter types;
                             // gets erroneously triggered by the queue constructor, which
                             // takes an allocator of another type and rebinds it
#endif



namespace boost    {
namespace lockfree {
namespace detail   {

typedef parameter::parameters<boost::parameter::optional<tag::allocator>,
                              boost::parameter::optional<tag::capacity>
                             > queue_signature;

} /* namespace detail */


/** The queue class provides a multi-writer/multi-reader queue whose push and pop operations are lock-free;
 *  construction and destruction must be synchronized. It uses a freelist for memory management:
 *  freed nodes are pushed to the freelist and are not returned to the OS before the queue is destroyed.
 *
 *  \b Policies:
 *  - \ref boost::lockfree::fixed_sized, defaults to \c boost::lockfree::fixed_sized<false> \n
 *    Can be used to completely disable dynamic memory allocations during push in order to ensure lock-free behavior. \n
 *    If the data structure is configured as fixed-sized, the internal nodes are stored inside an array and are addressed
 *    by array indexing. This limits the possible size of the queue to the number of elements that can be addressed by the index
 *    type (usually 2**16-2), but on platforms that lack double-width compare-and-exchange instructions, this is the best way
 *    to achieve lock-freedom.
 *
 *  - \ref boost::lockfree::capacity, optional \n
 *    If this template argument is passed to the options, the size of the queue is set at compile-time. \n
 *    This option implies \c fixed_sized<true>.
 *
 *  - \ref boost::lockfree::allocator, defaults to \c boost::lockfree::allocator<std::allocator<void>> \n
 *    Specifies the allocator that is used for the internal freelist.
 *
 *  \b Requirements:
 *   - T must have a copy constructor
 *   - T must have a trivial assignment operator
 *   - T must have a trivial destructor
 *
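 *  \b Example: \n
 *  A minimal usage sketch (illustrative only; the element type \c int and the node count 128 are arbitrary choices):
 *  \code
 *  boost::lockfree::queue<int> q(128); // preallocate 128 nodes on the freelist
 *  q.push(42);
 *  int out;
 *  if (q.pop(out)) {
 *      // out == 42
 *  }
 *  \endcode
 *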
 * */
#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
template <typename T, class A0, class A1, class A2>
#else
template <typename T, typename ...Options>
#endif
class queue
{
private:
#ifndef BOOST_DOXYGEN_INVOKED

#ifdef BOOST_HAS_TRIVIAL_DESTRUCTOR
    BOOST_STATIC_ASSERT((boost::has_trivial_destructor<T>::value));
#endif

#ifdef BOOST_HAS_TRIVIAL_ASSIGN
    BOOST_STATIC_ASSERT((boost::has_trivial_assign<T>::value));
#endif

#ifdef BOOST_NO_CXX11_VARIADIC_TEMPLATES
    typedef typename detail::queue_signature::bind<A0, A1, A2>::type bound_args;
#else
    typedef typename detail::queue_signature::bind<Options...>::type bound_args;
#endif

    static const bool has_capacity = detail::extract_capacity<bound_args>::has_capacity;
    static const size_t capacity = detail::extract_capacity<bound_args>::capacity + 1; // the queue uses one dummy node
    static const bool fixed_sized = detail::extract_fixed_sized<bound_args>::value;
    static const bool node_based = !(has_capacity || fixed_sized);
    static const bool compile_time_sized = has_capacity;

    struct BOOST_ALIGNMENT(BOOST_LOCKFREE_CACHELINE_BYTES) node
    {
        typedef typename detail::select_tagged_handle<node, node_based>::tagged_handle_type tagged_node_handle;
        typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;

        node(T const & v, handle_type null_handle):
            data(v)
        {
            /* increment tag to avoid ABA problem */
            tagged_node_handle old_next = next.load(memory_order_relaxed);
            tagged_node_handle new_next (null_handle, old_next.get_next_tag());
            next.store(new_next, memory_order_release);
        }

        node (handle_type null_handle):
            next(tagged_node_handle(null_handle, 0))
        {}

        node(void)
        {}

        atomic<tagged_node_handle> next;
        T data;
    };

    typedef typename detail::extract_allocator<bound_args, node>::type node_allocator;
    typedef typename detail::select_freelist<node, node_allocator, compile_time_sized, fixed_sized, capacity>::type pool_t;
    typedef typename pool_t::tagged_node_handle tagged_node_handle;
    typedef typename detail::select_tagged_handle<node, node_based>::handle_type handle_type;

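    /* A Michael/Scott queue always contains one dummy node: head_ and tail_ both
     * start out pointing at it, and the first real element (if any) is reached
     * through the dummy node's next pointer. */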
    void initialize(void)
    {
        node * n = pool.template construct<true, false>(pool.null_handle());
        tagged_node_handle dummy_node(pool.get_handle(n), 0);
        head_.store(dummy_node, memory_order_relaxed);
        tail_.store(dummy_node, memory_order_release);
    }

    struct implementation_defined
    {
        typedef node_allocator allocator;
        typedef std::size_t size_type;
    };

#endif

    BOOST_DELETED_FUNCTION(queue(queue const&))
    BOOST_DELETED_FUNCTION(queue& operator= (queue const&))

public:
    typedef T value_type;
    typedef typename implementation_defined::allocator allocator;
    typedef typename implementation_defined::size_type size_type;

    /**
     * \return true, if the implementation is lock-free.
     *
     * \warning It only checks whether the queue head, the queue tail, and the freelist can be modified in a lock-free manner.
     *       On most platforms, the whole implementation is lock-free if this is true. Using C++11-style atomics, there is
     *       no way to provide a completely accurate implementation, because one would need to test every internal
     *       node, which is impossible if further nodes will be allocated from the operating system.
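     *
     *  A short illustrative check (hypothetical usage; the result depends on the platform and configuration):
     *  \code
     *  boost::lockfree::queue<int> q(16);
     *  if (!q.is_lock_free()) {
     *      // fall back to a lock-based queue, or accept potentially blocking operations
     *  }
     *  \endcode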
     * */
    bool is_lock_free (void) const
    {
        return head_.is_lock_free() && tail_.is_lock_free() && pool.is_lock_free();
    }

    /** Construct a fixed-sized queue
     *
     *  \pre Must specify a capacity<> argument
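     *
     *  An illustrative sketch (the capacity of 1024 is an arbitrary example value):
     *  \code
     *  boost::lockfree::queue<int, boost::lockfree::capacity<1024> > q; // fixed-sized, default-constructed
     *  \endcode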
     * */
    queue(void):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(node_allocator(), capacity)
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT(has_capacity);
        initialize();
    }

    /** Construct a fixed-sized queue with a custom allocator
     *
     *  \pre Must specify a capacity<> argument
     * */
    template <typename U>
    explicit queue(typename boost::allocator_rebind<node_allocator, U>::type const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, capacity)
    {
        BOOST_STATIC_ASSERT(has_capacity);
        initialize();
    }

    /** Construct a fixed-sized queue with a custom allocator
     *
     *  \pre Must specify a capacity<> argument
     * */
    explicit queue(allocator const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, capacity)
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT(has_capacity);
        initialize();
    }

    /** Construct a variable-sized queue
     *
     *  Allocate n nodes initially for the freelist
     *
     *  \pre Must \b not specify a capacity<> argument
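     *
     *  An illustrative sketch (the initial node count of 128 is an arbitrary example value):
     *  \code
     *  boost::lockfree::queue<int> q(128); // freelist starts with 128 nodes and may grow
     *  \endcode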
     * */
    explicit queue(size_type n):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(node_allocator(), n + 1)
    {
        // Don't use BOOST_STATIC_ASSERT() here since it will be evaluated when compiling
        // this function and this function may be compiled even when it isn't being used.
        BOOST_ASSERT(!has_capacity);
        initialize();
    }

    /** Construct a variable-sized queue with a custom allocator
     *
     *  Allocate n nodes initially for the freelist
     *
     *  \pre Must \b not specify a capacity<> argument
     * */
    template <typename U>
    queue(size_type n, typename boost::allocator_rebind<node_allocator, U>::type const & alloc):
        head_(tagged_node_handle(0, 0)),
        tail_(tagged_node_handle(0, 0)),
        pool(alloc, n + 1)
    {
        BOOST_STATIC_ASSERT(!has_capacity);
        initialize();
    }

    /** \copydoc boost::lockfree::stack::reserve
     * */
    void reserve(size_type n)
    {
        pool.template reserve<true>(n);
    }

    /** \copydoc boost::lockfree::stack::reserve_unsafe
     * */
    void reserve_unsafe(size_type n)
    {
        pool.template reserve<false>(n);
    }

    /** Destroys the queue and frees all nodes from the freelist.
     * */
    ~queue(void)
    {
        T dummy;
        while(unsynchronized_pop(dummy))
        {}

        pool.template destruct<false>(head_.load(memory_order_relaxed));
    }

    /** Check if the queue is empty
     *
     * \return true, if the queue is empty, false otherwise
     * \note The result is only accurate if no other thread modifies the queue. It is therefore rarely practical to use this
     *       value in program logic.
     * */
    bool empty(void) const
    {
        return pool.get_handle(head_.load()) == pool.get_handle(tail_.load());
    }

    /** Pushes object t to the queue.
     *
     * \post object will be pushed to the queue, if an internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Thread-safe. If the internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated
     *                    from the OS. This may not be lock-free.
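     *
     *  An illustrative sketch contrasting the two push variants (hypothetical queue \c q of \c int):
     *  \code
     *  q.push(1);         // may allocate a new node from the OS if the pool is exhausted
     *  q.bounded_push(2); // never allocates; fails instead when the pool is exhausted
     *  \endcode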
     * */
    bool push(T const & t)
    {
        return do_push<false>(t);
    }

    /** Pushes object t to the queue.
     *
     * \post object will be pushed to the queue, if an internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Thread-safe and non-blocking. If the internal memory pool is exhausted, the operation will fail
     * \throws if memory allocator throws
     * */
    bool bounded_push(T const & t)
    {
        return do_push<true>(t);
    }


private:
#ifndef BOOST_DOXYGEN_INVOKED
    template <bool Bounded>
    bool do_push(T const & t)
    {
        node * n = pool.template construct<true, Bounded>(t, pool.null_handle());
        handle_type node_handle = pool.get_handle(n);

        if (n == NULL)
            return false;

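        /* Michael/Scott enqueue: read tail and its next pointer, re-read tail to
         * check consistency, then either link the new node at the end of the list
         * (next is null) or help a concurrent push by swinging tail_ forward.
         * A failed tail_ CAS after a successful link is harmless: another thread
         * may already have advanced tail_ for us. */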
        for (;;) {
            tagged_node_handle tail = tail_.load(memory_order_acquire);
            node * tail_node = pool.get_pointer(tail);
            tagged_node_handle next = tail_node->next.load(memory_order_acquire);
            node * next_ptr = pool.get_pointer(next);

            tagged_node_handle tail2 = tail_.load(memory_order_acquire);
            if (BOOST_LIKELY(tail == tail2)) {
                if (next_ptr == 0) {
                    tagged_node_handle new_tail_next(node_handle, next.get_next_tag());
                    if ( tail_node->next.compare_exchange_weak(next, new_tail_next) ) {
                        tagged_node_handle new_tail(node_handle, tail.get_next_tag());
                        tail_.compare_exchange_strong(tail, new_tail);
                        return true;
                    }
                }
                else {
                    tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag());
                    tail_.compare_exchange_strong(tail, new_tail);
                }
            }
        }
    }
#endif

public:

    /** Pushes object t to the queue.
     *
     * \post object will be pushed to the queue, if an internal node can be allocated
     * \returns true, if the push operation is successful.
     *
     * \note Not thread-safe. If the internal memory pool is exhausted and the memory pool is not fixed-sized, a new node will be allocated
     *       from the OS. This may not be lock-free.
     * \throws if memory allocator throws
     * */
    bool unsynchronized_push(T const & t)
    {
        node * n = pool.template construct<false, false>(t, pool.null_handle());

        if (n == NULL)
            return false;

        for (;;) {
            tagged_node_handle tail = tail_.load(memory_order_relaxed);
            node * tail_node = pool.get_pointer(tail);
            tagged_node_handle next = tail_node->next.load(memory_order_relaxed);
            node * next_ptr = pool.get_pointer(next);

            if (next_ptr == 0) {
                tail_node->next.store(tagged_node_handle(pool.get_handle(n), next.get_next_tag()), memory_order_relaxed);
                tail_.store(tagged_node_handle(pool.get_handle(n), tail.get_next_tag()), memory_order_relaxed);
                return true;
            }
            else
                tail_.store(tagged_node_handle(pool.get_handle(next_ptr), tail.get_next_tag()), memory_order_relaxed);
        }
    }

    /** Pops object from queue.
     *
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Thread-safe and non-blocking
     * */
    bool pop (T & ret)
    {
        return pop<T>(ret);
    }

    /** Pops object from queue.
     *
     * \pre type U must be constructible by T and copyable, or T must be convertible to U
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Thread-safe and non-blocking
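     *
     *  An illustrative sketch (draining a hypothetical queue \c q of \c int into a wider type):
     *  \code
     *  long long out;
     *  while (q.pop(out)) {
     *      // process out
     *  }
     *  \endcode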
     * */
    template <typename U>
    bool pop (U & ret)
    {
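        /* Michael/Scott dequeue: when head and tail coincide, the queue is either
         * empty (next is null) or tail_ is lagging behind and we help advance it.
         * Otherwise, copy the payload out of head->next before trying to swing
         * head_ forward; the node is recycled only after a successful CAS. */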
        for (;;) {
            tagged_node_handle head = head_.load(memory_order_acquire);
            node * head_ptr = pool.get_pointer(head);

            tagged_node_handle tail = tail_.load(memory_order_acquire);
            tagged_node_handle next = head_ptr->next.load(memory_order_acquire);
            node * next_ptr = pool.get_pointer(next);

            tagged_node_handle head2 = head_.load(memory_order_acquire);
            if (BOOST_LIKELY(head == head2)) {
                if (pool.get_handle(head) == pool.get_handle(tail)) {
                    if (next_ptr == 0)
                        return false;

                    tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
                    tail_.compare_exchange_strong(tail, new_tail);

                } else {
                    if (next_ptr == 0)
                        /* this check is not part of the original algorithm as published by Michael and Scott
                         *
                         * however, we reuse the tagged_ptr part for the freelist and clear the next part during node
                         * allocation, so we can observe a null pointer here.
                         * */
                        continue;
                    detail::copy_payload(next_ptr->data, ret);

                    tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
                    if (head_.compare_exchange_weak(head, new_head)) {
                        pool.template destruct<true>(head);
                        return true;
                    }
                }
            }
        }
    }

    /** Pops object from queue.
     *
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Not thread-safe, but non-blocking
     *
     * */
    bool unsynchronized_pop (T & ret)
    {
        return unsynchronized_pop<T>(ret);
    }

    /** Pops object from queue.
     *
     * \pre type U must be constructible by T and copyable, or T must be convertible to U
     * \post if pop operation is successful, object will be copied to ret.
     * \returns true, if the pop operation is successful, false if queue was empty.
     *
     * \note Not thread-safe, but non-blocking
     *
     * */
    template <typename U>
    bool unsynchronized_pop (U & ret)
    {
        for (;;) {
            tagged_node_handle head = head_.load(memory_order_relaxed);
            node * head_ptr = pool.get_pointer(head);
            tagged_node_handle tail = tail_.load(memory_order_relaxed);
            tagged_node_handle next = head_ptr->next.load(memory_order_relaxed);
            node * next_ptr = pool.get_pointer(next);

            if (pool.get_handle(head) == pool.get_handle(tail)) {
                if (next_ptr == 0)
                    return false;

                tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
                tail_.store(new_tail);
            } else {
                if (next_ptr == 0)
                    /* this check is not part of the original algorithm as published by Michael and Scott
                     *
                     * however, we reuse the tagged_ptr part for the freelist and clear the next part during node
                     * allocation, so we can observe a null pointer here.
                     * */
                    continue;
                detail::copy_payload(next_ptr->data, ret);
                tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
                head_.store(new_head);
                pool.template destruct<false>(head);
                return true;
            }
        }
    }

    /** Consumes one element via a functor
     *
     *  Pops one element from the queue and applies the functor to it
     *
     * \returns true, if one element was consumed
     *
     * \note Thread-safe and non-blocking, if the functor is thread-safe and non-blocking
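     *
     *  An illustrative sketch (hypothetical queue \c q of \c int, using a lambda as the functor):
     *  \code
     *  q.consume_one([](int v) {
     *      // process v
     *  });
     *  \endcode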
     * */
    template <typename Functor>
    bool consume_one(Functor & f)
    {
        T element;
        bool success = pop(element);
        if (success)
            f(element);

        return success;
    }

    /// \copydoc boost::lockfree::queue::consume_one(Functor & rhs)
    template <typename Functor>
    bool consume_one(Functor const & f)
    {
        T element;
        bool success = pop(element);
        if (success)
            f(element);

        return success;
    }

    /** Consumes all elements via a functor
     *
     * Sequentially pops all elements from the queue and applies the functor to each object
     *
     * \returns number of elements that were consumed
     *
     * \note Thread-safe and non-blocking, if the functor is thread-safe and non-blocking
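     *
     *  An illustrative sketch (accumulating the remaining elements of a hypothetical queue \c q of \c int):
     *  \code
     *  int sum = 0;
     *  size_t count = q.consume_all([&sum](int v) {
     *      sum += v;
     *  });
     *  \endcode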
     * */
    template <typename Functor>
    size_t consume_all(Functor & f)
    {
        size_t element_count = 0;
        while (consume_one(f))
            element_count += 1;

        return element_count;
    }

    /// \copydoc boost::lockfree::queue::consume_all(Functor & rhs)
    template <typename Functor>
    size_t consume_all(Functor const & f)
    {
        size_t element_count = 0;
        while (consume_one(f))
            element_count += 1;

        return element_count;
    }

private:
#ifndef BOOST_DOXYGEN_INVOKED
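    /* head_ and tail_ are padded out to separate cache lines so that producers
     * (which write tail_) and consumers (which write head_) do not suffer from
     * false sharing */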
    atomic<tagged_node_handle> head_;
    static const int padding_size = BOOST_LOCKFREE_CACHELINE_BYTES - sizeof(tagged_node_handle);
    char padding1[padding_size];
    atomic<tagged_node_handle> tail_;
    char padding2[padding_size];

    pool_t pool;
#endif
};

} /* namespace lockfree */
} /* namespace boost */

#if defined(BOOST_INTEL) && (BOOST_INTEL_CXX_VERSION > 1000)
#pragma warning(pop)
#endif

#if defined(_MSC_VER)
#pragma warning(pop)
#endif

#endif /* BOOST_LOCKFREE_FIFO_HPP_INCLUDED */