Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /include/oneapi/tbb/concurrent_queue.h was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 /*
0002     Copyright (c) 2005-2023 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB_concurrent_queue_H
0018 #define __TBB_concurrent_queue_H
0019 
0020 #include "detail/_namespace_injection.h"
0021 #include "detail/_concurrent_queue_base.h"
0022 #include "detail/_allocator_traits.h"
0023 #include "detail/_exception.h"
0024 #include "detail/_containers_helpers.h"
0025 #include "cache_aligned_allocator.h"
0026 
0027 namespace tbb {
0028 namespace detail {
0029 namespace d2 {
0030 
0031 template <typename QueueRep, typename Allocator>
0032 std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) {
0033     ticket_type ticket{};
0034     do {
0035         // Basically, we need to read `head_counter` before `tail_counter`. To achieve it we build happens-before on `head_counter`
0036         ticket = queue.head_counter.load(std::memory_order_acquire);
0037         do {
0038             if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
0039                 // Queue is empty
0040                 return { false, ticket };
0041             }
0042             // Queue had item with ticket k when we looked.  Attempt to get that item.
0043             // Another thread snatched the item, retry.
0044         } while (!queue.head_counter.compare_exchange_strong(ticket, ticket + 1));
0045     } while (!queue.choose(ticket).pop(dst, ticket, queue, alloc));
0046     return { true, ticket };
0047 }
0048 
0049 // A high-performance thread-safe non-blocking concurrent queue.
0050 // Multiple threads may each push and pop concurrently.
0051 // Assignment construction is not allowed.
0052 template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
0053 class concurrent_queue {
0054     using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
0055     using queue_representation_type = concurrent_queue_rep<T, Allocator>;
0056     using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
0057     using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
0058 public:
0059     using size_type = std::size_t;
0060     using value_type = T;
0061     using reference = T&;
0062     using const_reference = const T&;
0063     using difference_type = std::ptrdiff_t;
0064 
0065     using allocator_type = Allocator;
0066     using pointer = typename allocator_traits_type::pointer;
0067     using const_pointer = typename allocator_traits_type::const_pointer;
0068 
0069     using iterator = concurrent_queue_iterator<concurrent_queue, T, Allocator>;
0070     using const_iterator = concurrent_queue_iterator<concurrent_queue, const T, Allocator>;
0071 
0072     concurrent_queue() : concurrent_queue(allocator_type()) {}
0073 
0074     explicit concurrent_queue(const allocator_type& a) :
0075         my_allocator(a), my_queue_representation(nullptr)
0076     {
0077         my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
0078         queue_allocator_traits::construct(my_allocator, my_queue_representation);
0079 
0080         __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
0081         __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
0082         __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
0083         __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
0084     }
0085 
0086     template <typename InputIterator>
0087     concurrent_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
0088         concurrent_queue(a)
0089     {
0090         for (; begin != end; ++begin)
0091             push(*begin);
0092     }
0093 
0094     concurrent_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() ) :
0095         concurrent_queue(init.begin(), init.end(), alloc)
0096     {}
0097 
0098     concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
0099         concurrent_queue(a)
0100     {
0101         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
0102     }
0103 
0104     concurrent_queue(const concurrent_queue& src) :
0105         concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
0106     {
0107         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
0108     }
0109 
0110     // Move constructors
0111     concurrent_queue(concurrent_queue&& src) :
0112         concurrent_queue(std::move(src.my_allocator))
0113     {
0114         internal_swap(src);
0115     }
0116 
0117     concurrent_queue(concurrent_queue&& src, const allocator_type& a) :
0118         concurrent_queue(a)
0119     {
0120         // checking that memory allocated by one instance of allocator can be deallocated
0121         // with another
0122         if (my_allocator == src.my_allocator) {
0123             internal_swap(src);
0124         } else {
0125             // allocators are different => performing per-element move
0126             my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
0127             src.clear();
0128         }
0129     }
0130 
0131     // Destroy queue
0132     ~concurrent_queue() {
0133         clear();
0134         my_queue_representation->clear(my_allocator);
0135         queue_allocator_traits::destroy(my_allocator, my_queue_representation);
0136         r1::cache_aligned_deallocate(my_queue_representation);
0137     }
0138 
0139     concurrent_queue& operator=( const concurrent_queue& other ) {
0140         //TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
0141         if (my_queue_representation != other.my_queue_representation) {
0142             clear();
0143             my_allocator = other.my_allocator;
0144             my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
0145         }
0146         return *this;
0147     }
0148 
0149     concurrent_queue& operator=( concurrent_queue&& other ) {
0150         //TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
0151         if (my_queue_representation != other.my_queue_representation) {
0152             clear();
0153             if (my_allocator == other.my_allocator) {
0154                 internal_swap(other);
0155             } else {
0156                 my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
0157                 other.clear();
0158                 my_allocator = std::move(other.my_allocator);
0159             }
0160         }
0161         return *this;
0162     }
0163 
0164     concurrent_queue& operator=( std::initializer_list<value_type> init ) {
0165         assign(init);
0166         return *this;
0167     }
0168 
0169     template <typename InputIterator>
0170     void assign( InputIterator first, InputIterator last ) {
0171         concurrent_queue src(first, last);
0172         clear();
0173         my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
0174     }
0175 
0176     void assign( std::initializer_list<value_type> init ) {
0177         assign(init.begin(), init.end());
0178     }
0179 
0180     void swap ( concurrent_queue& other ) {
0181         //TODO: implement support for std::allocator_traits::propagate_on_container_swap
0182         __TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
0183         internal_swap(other);
0184     }
0185 
0186     // Enqueue an item at tail of queue.
0187     void push(const T& value) {
0188         internal_push(value);
0189     }
0190 
0191     void push(T&& value) {
0192         internal_push(std::move(value));
0193     }
0194 
0195     template <typename... Args>
0196     void emplace( Args&&... args ) {
0197         internal_push(std::forward<Args>(args)...);
0198     }
0199 
0200     // Attempt to dequeue an item from head of queue.
0201     /** Does not wait for item to become available.
0202         Returns true if successful; false otherwise. */
0203     bool try_pop( T& result ) {
0204         return internal_try_pop(&result);
0205     }
0206 
0207     // Return the number of items in the queue; thread unsafe
0208     size_type unsafe_size() const {
0209         std::ptrdiff_t size = my_queue_representation->size();
0210         return size < 0 ? 0 :  size_type(size);
0211     }
0212 
0213     // Equivalent to size()==0.
0214     __TBB_nodiscard bool empty() const {
0215         return my_queue_representation->empty();
0216     }
0217 
0218     // Clear the queue. not thread-safe.
0219     void clear() {
0220         my_queue_representation->clear(my_allocator);
0221     }
0222 
0223     // Return allocator object
0224     allocator_type get_allocator() const { return my_allocator; }
0225 
0226     //------------------------------------------------------------------------
0227     // The iterators are intended only for debugging.  They are slow and not thread safe.
0228     //------------------------------------------------------------------------
0229 
0230     iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
0231     iterator unsafe_end() { return iterator(); }
0232     const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
0233     const_iterator unsafe_end() const { return const_iterator(); }
0234     const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
0235     const_iterator unsafe_cend() const { return const_iterator(); }
0236 
0237 private:
0238     void internal_swap(concurrent_queue& src) {
0239         using std::swap;
0240         swap(my_queue_representation, src.my_queue_representation);
0241     }
0242 
0243     template <typename... Args>
0244     void internal_push( Args&&... args ) {
0245         ticket_type k = my_queue_representation->tail_counter++;
0246         my_queue_representation->choose(k).push(k, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
0247     }
0248 
0249     bool internal_try_pop( void* dst ) {
0250         return internal_try_pop_impl(dst, *my_queue_representation, my_allocator).first;
0251     }
0252 
0253     template <typename Container, typename Value, typename A>
0254     friend class concurrent_queue_iterator;
0255 
0256     static void copy_construct_item(T* location, const void* src) {
0257         // TODO: use allocator_traits for copy construction
0258         new (location) value_type(*static_cast<const value_type*>(src));
0259         // queue_allocator_traits::construct(my_allocator, location, *static_cast<const T*>(src));
0260     }
0261 
0262     static void move_construct_item(T* location, const void* src) {
0263         // TODO: use allocator_traits for move construction
0264         new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
0265     }
0266 
0267     queue_allocator_type my_allocator;
0268     queue_representation_type* my_queue_representation;
0269 
0270     friend void swap( concurrent_queue& lhs, concurrent_queue& rhs ) {
0271         lhs.swap(rhs);
0272     }
0273 
0274     friend bool operator==( const concurrent_queue& lhs, const concurrent_queue& rhs ) {
0275         return lhs.unsafe_size() == rhs.unsafe_size() && std::equal(lhs.unsafe_begin(), lhs.unsafe_end(), rhs.unsafe_begin());
0276     }
0277 
0278 #if !__TBB_CPP20_COMPARISONS_PRESENT
0279     friend bool operator!=( const concurrent_queue& lhs,  const concurrent_queue& rhs ) {
0280         return !(lhs == rhs);
0281     }
0282 #endif // __TBB_CPP20_COMPARISONS_PRESENT
0283 }; // class concurrent_queue
0284 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Class template argument deduction for the iterator-range constructor:
// the element type is taken from the iterator, the allocator defaults to cache_aligned_allocator.
// The guide only participates when the first type is an input iterator and the last one is an allocator.
template <typename InputIt,
          typename AllocatorT = tbb::cache_aligned_allocator<iterator_value_t<InputIt>>,
          typename = std::enable_if_t<is_input_iterator_v<InputIt>>,
          typename = std::enable_if_t<is_allocator_v<AllocatorT>>>
concurrent_queue( InputIt, InputIt, AllocatorT = AllocatorT() )
-> concurrent_queue<iterator_value_t<InputIt>, AllocatorT>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
0294 
class concurrent_monitor;

// Monitor tags used by concurrent_bounded_queue when waiting on / notifying its monitors:
// tag 0 is used by pushers waiting for a free slot, tag 1 by poppers waiting for an item.
static constexpr std::size_t cbq_slots_avail_tag{0};
static constexpr std::size_t cbq_items_avail_tag{1};
0300 } // namespace d2
0301 
0302 
0303 namespace r1 {
0304     class concurrent_monitor;
0305 
0306     TBB_EXPORT std::uint8_t* __TBB_EXPORTED_FUNC allocate_bounded_queue_rep( std::size_t queue_rep_size );
0307     TBB_EXPORT void __TBB_EXPORTED_FUNC deallocate_bounded_queue_rep( std::uint8_t* mem, std::size_t queue_rep_size );
0308     TBB_EXPORT void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors );
0309     TBB_EXPORT void __TBB_EXPORTED_FUNC notify_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag
0310                                                             , std::size_t ticket );
0311     TBB_EXPORT void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitors, std::size_t monitor_tag,
0312                                                             std::ptrdiff_t target, d1::delegate_base& predicate );
0313 } // namespace r1
0314 
0315 
0316 namespace d2 {
0317 // A high-performance thread-safe blocking concurrent bounded queue.
0318 // Supports boundedness and blocking semantics.
0319 // Multiple threads may each push and pop concurrently.
0320 // Assignment construction is not allowed.
0321 template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
0322 class concurrent_bounded_queue {
0323     using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
0324     using queue_representation_type = concurrent_queue_rep<T, Allocator>;
0325     using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_representation_type>;
0326     using queue_allocator_traits = tbb::detail::allocator_traits<queue_allocator_type>;
0327 
0328     template <typename FuncType>
0329     void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
0330         d1::delegated_function<FuncType> func(pred);
0331         r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
0332     }
0333 public:
0334     using size_type = std::ptrdiff_t;
0335     using value_type = T;
0336     using reference = T&;
0337     using const_reference = const T&;
0338     using difference_type = std::ptrdiff_t;
0339 
0340     using allocator_type = Allocator;
0341     using pointer = typename allocator_traits_type::pointer;
0342     using const_pointer = typename allocator_traits_type::const_pointer;
0343 
0344     using iterator = concurrent_queue_iterator<concurrent_bounded_queue, T, Allocator>;
0345     using const_iterator = concurrent_queue_iterator<concurrent_bounded_queue, const T, Allocator> ;
0346 
0347     concurrent_bounded_queue() : concurrent_bounded_queue(allocator_type()) {}
0348 
0349     explicit concurrent_bounded_queue( const allocator_type& a ) :
0350         my_allocator(a), my_capacity(0), my_abort_counter(0), my_queue_representation(nullptr)
0351     {
0352         my_queue_representation = reinterpret_cast<queue_representation_type*>(
0353             r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
0354         my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
0355         queue_allocator_traits::construct(my_allocator, my_queue_representation);
0356         my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);
0357 
0358         __TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
0359         __TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
0360         __TBB_ASSERT(is_aligned(&my_queue_representation->tail_counter, max_nfs_size), "alignment error" );
0361         __TBB_ASSERT(is_aligned(&my_queue_representation->array, max_nfs_size), "alignment error" );
0362     }
0363 
0364     template <typename InputIterator>
0365     concurrent_bounded_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type() ) :
0366         concurrent_bounded_queue(a)
0367     {
0368         for (; begin != end; ++begin)
0369             push(*begin);
0370     }
0371 
0372     concurrent_bounded_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() ):
0373         concurrent_bounded_queue(init.begin(), init.end(), alloc)
0374     {}
0375 
0376     concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
0377         concurrent_bounded_queue(a)
0378     {
0379         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
0380     }
0381 
0382     concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
0383         concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
0384     {
0385         my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
0386     }
0387 
0388     // Move constructors
0389     concurrent_bounded_queue( concurrent_bounded_queue&& src ) :
0390         concurrent_bounded_queue(std::move(src.my_allocator))
0391     {
0392         internal_swap(src);
0393     }
0394 
0395     concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a ) :
0396         concurrent_bounded_queue(a)
0397     {
0398         // checking that memory allocated by one instance of allocator can be deallocated
0399         // with another
0400         if (my_allocator == src.my_allocator) {
0401             internal_swap(src);
0402         } else {
0403             // allocators are different => performing per-element move
0404             my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
0405             src.clear();
0406         }
0407     }
0408 
0409     // Destroy queue
0410     ~concurrent_bounded_queue() {
0411         clear();
0412         my_queue_representation->clear(my_allocator);
0413         queue_allocator_traits::destroy(my_allocator, my_queue_representation);
0414         r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
0415                                          sizeof(queue_representation_type));
0416     }
0417 
0418     concurrent_bounded_queue& operator=( const concurrent_bounded_queue& other ) {
0419         //TODO: implement support for std::allocator_traits::propagate_on_container_copy_assignment
0420         if (my_queue_representation != other.my_queue_representation) {
0421             clear();
0422             my_allocator = other.my_allocator;
0423             my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
0424         }
0425         return *this;
0426     }
0427 
0428     concurrent_bounded_queue& operator=( concurrent_bounded_queue&& other ) {
0429         //TODO: implement support for std::allocator_traits::propagate_on_container_move_assignment
0430         if (my_queue_representation != other.my_queue_representation) {
0431             clear();
0432             if (my_allocator == other.my_allocator) {
0433                 internal_swap(other);
0434             } else {
0435                 my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
0436                 other.clear();
0437                 my_allocator = std::move(other.my_allocator);
0438             }
0439         }
0440         return *this;
0441     }
0442 
0443     concurrent_bounded_queue& operator=( std::initializer_list<value_type> init ) {
0444         assign(init);
0445         return *this;
0446     }
0447 
0448     template <typename InputIterator>
0449     void assign( InputIterator first, InputIterator last ) {
0450         concurrent_bounded_queue src(first, last);
0451         clear();
0452         my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
0453     }
0454 
0455     void assign( std::initializer_list<value_type> init ) {
0456         assign(init.begin(), init.end());
0457     }
0458 
0459     void swap ( concurrent_bounded_queue& other ) {
0460         //TODO: implement support for std::allocator_traits::propagate_on_container_swap
0461         __TBB_ASSERT(my_allocator == other.my_allocator, "unequal allocators");
0462         internal_swap(other);
0463     }
0464 
0465     // Enqueue an item at tail of queue.
0466     void push( const T& value ) {
0467         internal_push(value);
0468     }
0469 
0470     void push( T&& value ) {
0471         internal_push(std::move(value));
0472     }
0473 
0474     // Enqueue an item at tail of queue if queue is not already full.
0475     // Does not wait for queue to become not full.
0476     // Returns true if item is pushed; false if queue was already full.
0477     bool try_push( const T& value ) {
0478         return internal_push_if_not_full(value);
0479     }
0480 
0481     bool try_push( T&& value ) {
0482         return internal_push_if_not_full(std::move(value));
0483     }
0484 
0485     template <typename... Args>
0486     void emplace( Args&&... args ) {
0487         internal_push(std::forward<Args>(args)...);
0488     }
0489 
0490     template <typename... Args>
0491     bool try_emplace( Args&&... args ) {
0492         return internal_push_if_not_full(std::forward<Args>(args)...);
0493     }
0494 
0495     // Attempt to dequeue an item from head of queue.
0496     void pop( T& result ) {
0497         internal_pop(&result);
0498     }
0499 
0500     /** Does not wait for item to become available.
0501         Returns true if successful; false otherwise. */
0502     bool try_pop( T& result ) {
0503         return internal_pop_if_present(&result);
0504     }
0505 
0506     void abort() {
0507         internal_abort();
0508     }
0509 
0510     // Return the number of items in the queue; thread unsafe
0511     std::ptrdiff_t size() const {
0512         return my_queue_representation->size();
0513     }
0514 
0515     void set_capacity( size_type new_capacity ) {
0516         std::ptrdiff_t c = new_capacity < 0 ? infinite_capacity : new_capacity;
0517         my_capacity = c;
0518     }
0519 
0520     size_type capacity() const {
0521         return my_capacity;
0522     }
0523 
0524     // Equivalent to size()==0.
0525     __TBB_nodiscard bool empty() const {
0526         return my_queue_representation->empty();
0527     }
0528 
0529     // Clear the queue. not thread-safe.
0530     void clear() {
0531         my_queue_representation->clear(my_allocator);
0532     }
0533 
0534     // Return allocator object
0535     allocator_type get_allocator() const { return my_allocator; }
0536 
0537     //------------------------------------------------------------------------
0538     // The iterators are intended only for debugging.  They are slow and not thread safe.
0539     //------------------------------------------------------------------------
0540 
0541     iterator unsafe_begin() { return concurrent_queue_iterator_provider::get<iterator>(*this); }
0542     iterator unsafe_end() { return iterator(); }
0543     const_iterator unsafe_begin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
0544     const_iterator unsafe_end() const { return const_iterator(); }
0545     const_iterator unsafe_cbegin() const { return concurrent_queue_iterator_provider::get<const_iterator>(*this); }
0546     const_iterator unsafe_cend() const { return const_iterator(); }
0547 
0548 private:
0549     void internal_swap( concurrent_bounded_queue& src ) {
0550         std::swap(my_queue_representation, src.my_queue_representation);
0551         std::swap(my_monitors, src.my_monitors);
0552     }
0553 
0554     static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);
0555 
0556     template <typename... Args>
0557     void internal_push( Args&&... args ) {
0558         unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
0559         ticket_type ticket = my_queue_representation->tail_counter++;
0560         std::ptrdiff_t target = ticket - my_capacity;
0561 
0562         if (static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target) { // queue is full
0563             auto pred = [&] {
0564                 if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
0565                     throw_exception(exception_id::user_abort);
0566                 }
0567 
0568                 return static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) <= target;
0569             };
0570 
0571             try_call( [&] {
0572                 internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
0573             }).on_exception( [&] {
0574                 my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation, my_allocator);
0575             });
0576 
0577         }
0578         __TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
0579         my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
0580         r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
0581     }
0582 
0583     template <typename... Args>
0584     bool internal_push_if_not_full( Args&&... args ) {
0585         ticket_type ticket = my_queue_representation->tail_counter.load(std::memory_order_relaxed);
0586         do {
0587             if (static_cast<std::ptrdiff_t>(ticket - my_queue_representation->head_counter.load(std::memory_order_relaxed)) >= my_capacity) {
0588                 // Queue is full
0589                 return false;
0590             }
0591             // Queue had empty slot with ticket k when we looked. Attempt to claim that slot.
0592             // Another thread claimed the slot, so retry.
0593         } while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));
0594 
0595         my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
0596         r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
0597         return true;
0598     }
0599 
0600     void internal_pop( void* dst ) {
0601         std::ptrdiff_t target;
0602         // This loop is a single pop operation; abort_counter should not be re-read inside
0603         unsigned old_abort_counter = my_abort_counter.load(std::memory_order_relaxed);
0604 
0605         do {
0606             target = my_queue_representation->head_counter++;
0607             if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target) {
0608                 auto pred = [&] {
0609                     if (my_abort_counter.load(std::memory_order_relaxed) != old_abort_counter) {
0610                             throw_exception(exception_id::user_abort);
0611                     }
0612 
0613                     return static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) <= target;
0614                 };
0615 
0616                 try_call( [&] {
0617                     internal_wait(my_monitors, cbq_items_avail_tag, target, pred);
0618                 }).on_exception( [&] {
0619                     my_queue_representation->head_counter--;
0620                 });
0621             }
0622             __TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
0623         } while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation, my_allocator));
0624 
0625         r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
0626     }
0627 
0628     bool internal_pop_if_present( void* dst ) {
0629         bool present{};
0630         ticket_type ticket{};
0631         std::tie(present, ticket) = internal_try_pop_impl(dst, *my_queue_representation, my_allocator);
0632 
0633         if (present) {
0634             r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
0635         }
0636         return present;
0637     }
0638 
0639     void internal_abort() {
0640         ++my_abort_counter;
0641         r1::abort_bounded_queue_monitors(my_monitors);
0642     }
0643 
0644     static void copy_construct_item(T* location, const void* src) {
0645         // TODO: use allocator_traits for copy construction
0646         new (location) value_type(*static_cast<const value_type*>(src));
0647     }
0648 
0649     static void move_construct_item(T* location, const void* src) {
0650         // TODO: use allocator_traits for move construction
0651         new (location) value_type(std::move(*static_cast<value_type*>(const_cast<void*>(src))));
0652     }
0653 
0654     template <typename Container, typename Value, typename A>
0655     friend class concurrent_queue_iterator;
0656 
0657     queue_allocator_type my_allocator;
0658     std::ptrdiff_t my_capacity;
0659     std::atomic<unsigned> my_abort_counter;
0660     queue_representation_type* my_queue_representation;
0661 
0662     r1::concurrent_monitor* my_monitors;
0663 
0664     friend void swap( concurrent_bounded_queue& lhs, concurrent_bounded_queue& rhs ) {
0665         lhs.swap(rhs);
0666     }
0667 
0668     friend bool operator==( const concurrent_bounded_queue& lhs, const concurrent_bounded_queue& rhs ) {
0669         return lhs.size() == rhs.size() && std::equal(lhs.unsafe_begin(), lhs.unsafe_end(), rhs.unsafe_begin());
0670     }
0671 
0672 #if !__TBB_CPP20_COMPARISONS_PRESENT
0673     friend bool operator!=( const concurrent_bounded_queue& lhs, const concurrent_bounded_queue& rhs ) {
0674         return !(lhs == rhs);
0675     }
0676 #endif // __TBB_CPP20_COMPARISONS_PRESENT
0677 }; // class concurrent_bounded_queue
0678 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guide for the constructor from two iterators.
// Carries the same constraints as the concurrent_queue guide in this header: it only participates
// when `It` is an input iterator and `Alloc` is an allocator, so unrelated argument types
// do not select this guide.
template <typename It, typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_bounded_queue( It, It, Alloc = Alloc() )
-> concurrent_bounded_queue<iterator_value_t<It>, Alloc>;

#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
0686 
0687 } //namespace d2
0688 } // namespace detail
0689 
0690 inline namespace v1 {
0691 
0692 using detail::d2::concurrent_queue;
0693 using detail::d2::concurrent_bounded_queue;
0694 using detail::r1::user_abort;
0695 using detail::r1::bad_last_alloc;
0696 
0697 } // inline namespace v1
0698 } // namespace tbb
0699 
0700 #endif // __TBB_concurrent_queue_H