/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_detail__concurrent_queue_base_H
#define __TBB_detail__concurrent_queue_base_H

#include "_utils.h"
#include "_exception.h"
#include "_machine.h"
#include "_allocator_traits.h"

#include "../profiling.h"
#include "../spin_mutex.h"
#include "../cache_aligned_allocator.h"

#include <atomic>

namespace tbb {
namespace detail {
namespace d2 {

using ticket_type = std::size_t;

template <typename Page>
inline bool is_valid_page(const Page p) {
    return reinterpret_cast<std::uintptr_t>(p) > 1;
}

template <typename T, typename Allocator>
struct concurrent_queue_rep;

template <typename Container, typename T, typename Allocator>
class micro_queue_pop_finalizer;

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif

// A queue using simple locking.
// For efficiency, this class has no constructor.
// The caller is expected to zero-initialize it.
template <typename T, typename Allocator>
class micro_queue {
private:
    using queue_rep_type = concurrent_queue_rep<T, Allocator>;
    using self_type = micro_queue<T, Allocator>;
public:
    using size_type = std::size_t;
    using value_type = T;
    using reference = value_type&;
    using const_reference = const value_type&;

    using allocator_type = Allocator;
    using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_rep_type>;

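    // Number of items per page, chosen so that a page's item payload is at most
    // 256 bytes: 32 items of up to 8 bytes each, halving as the item size doubles,
    // down to one item per page once sizeof(T) exceeds 128 bytes.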
    static constexpr size_type item_size = sizeof(T);
    static constexpr size_type items_per_page = item_size <=   8 ? 32 :
                                                item_size <=  16 ? 16 :
                                                item_size <=  32 ?  8 :
                                                item_size <=  64 ?  4 :
                                                item_size <= 128 ?  2 : 1;

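    // A page of items for one micro-queue. The items live inside an anonymous union
    // (combined with the empty constructor and destructor) so that creating a page
    // neither default-constructs nor destroys any T. Bit i of `mask` is set once
    // slot i holds a successfully constructed item.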
    struct padded_page {
        padded_page() {}
        ~padded_page() {}

        reference operator[] (std::size_t index) {
            __TBB_ASSERT(index < items_per_page, "Index out of range");
            return items[index];
        }

        const_reference operator[] (std::size_t index) const {
            __TBB_ASSERT(index < items_per_page, "Index out of range");
            return items[index];
        }

        padded_page* next{ nullptr };
        std::atomic<std::uintptr_t> mask{};

        union {
            value_type items[items_per_page];
        };
    }; // struct padded_page

    using page_allocator_type = typename allocator_traits_type::template rebind_alloc<padded_page>;
protected:
    using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;

public:
    using item_constructor_type = void (*)(value_type* location, const void* src);
    micro_queue() = default;
    micro_queue( const micro_queue& ) = delete;
    micro_queue& operator=( const micro_queue& ) = delete;

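    // Prepares the page for ticket k. If k falls on a page boundary, a fresh page is
    // allocated up front; the method then waits for k's turn on this micro-queue and
    // links the new page in under page_mutex (or reuses the current tail page).
    // Returns the slot index of k within the page.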
    size_type prepare_page( ticket_type k, queue_rep_type& base, page_allocator_type page_allocator,
                            padded_page*& p ) {
        __TBB_ASSERT(p == nullptr, "Invalid page argument for prepare_page");
        k &= -queue_rep_type::n_queue;
        size_type index = modulo_power_of_two(k / queue_rep_type::n_queue, items_per_page);
        if (!index) {
            try_call( [&] {
                p = page_allocator_traits::allocate(page_allocator, 1);
            }).on_exception( [&] {
                ++base.n_invalid_entries;
                invalidate_page( k );
            });
            page_allocator_traits::construct(page_allocator, p);
        }

        spin_wait_until_my_turn(tail_counter, k, base);
        d1::call_itt_notify(d1::acquired, &tail_counter);

        if (p) {
            spin_mutex::scoped_lock lock( page_mutex );
            padded_page* q = tail_page.load(std::memory_order_relaxed);
            if (is_valid_page(q)) {
                q->next = p;
            } else {
                head_page.store(p, std::memory_order_relaxed);
            }
            tail_page.store(p, std::memory_order_relaxed);
        } else {
            p = tail_page.load(std::memory_order_relaxed);
        }
        return index;
    }

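    // Constructs a new item in the slot that ticket k maps to. If construction throws,
    // the RAII guard still advances tail_counter and counts the slot as an invalid
    // entry so that concurrent operations are not blocked.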
    template<typename... Args>
    void push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator, Args&&... args )
    {
        padded_page* p = nullptr;
        page_allocator_type page_allocator(allocator);
        size_type index = prepare_page(k, base, page_allocator, p);
        __TBB_ASSERT(p != nullptr, "Page was not prepared");

        // try_call API is not convenient here due to broken
        // variadic capture on GCC 4.8.5
        auto value_guard = make_raii_guard([&] {
            ++base.n_invalid_entries;
            d1::call_itt_notify(d1::releasing, &tail_counter);
            tail_counter.fetch_add(queue_rep_type::n_queue);
        });

        page_allocator_traits::construct(page_allocator, &(*p)[index], std::forward<Args>(args)...);
        // If no exception was thrown, mark item as present.
        p->mask.store(p->mask.load(std::memory_order_relaxed) | uintptr_t(1) << index, std::memory_order_relaxed);
        d1::call_itt_notify(d1::releasing, &tail_counter);

        value_guard.dismiss();
        tail_counter.fetch_add(queue_rep_type::n_queue);
    }

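    // Claims the slot for ticket k without constructing an item, counts it as an
    // invalid entry, and advances tail_counter so that later operations do not wait
    // on it.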
    void abort_push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
        padded_page* p = nullptr;
        prepare_page(k, base, allocator, p);
        ++base.n_invalid_entries;
        tail_counter.fetch_add(queue_rep_type::n_queue);
    }

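    // Removes the item that ticket k maps to, moving it into *dst. Waits until the
    // matching push has completed. Returns false if the slot turned out to be an
    // invalid entry (its push failed or was aborted), in which case nothing is copied.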
    bool pop( void* dst, ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
        k &= -queue_rep_type::n_queue;
        spin_wait_until_eq(head_counter, k);
        d1::call_itt_notify(d1::acquired, &head_counter);
        spin_wait_while_eq(tail_counter, k);
        d1::call_itt_notify(d1::acquired, &tail_counter);
        padded_page *p = head_page.load(std::memory_order_relaxed);
        __TBB_ASSERT( p, nullptr );
        size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page );
        bool success = false;
        {
            page_allocator_type page_allocator(allocator);
            micro_queue_pop_finalizer<self_type, value_type, page_allocator_type> finalizer(*this, page_allocator,
                k + queue_rep_type::n_queue, index == items_per_page - 1 ? p : nullptr );
            if (p->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
                success = true;
                assign_and_destroy_item(dst, *p, index);
            } else {
                --base.n_invalid_entries;
            }
        }
        return success;
    }

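    // Deep-copies the contents of another micro-queue, using construct_item to copy
    // (or move) each present element. On failure the destination is left empty or
    // marked invalid rather than partially copied.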
    micro_queue& assign( const micro_queue& src, queue_allocator_type& allocator,
        item_constructor_type construct_item )
    {
        head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
        tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);

        const padded_page* srcp = src.head_page.load(std::memory_order_relaxed);
        if( is_valid_page(srcp) ) {
            ticket_type g_index = head_counter.load(std::memory_order_relaxed);
            size_type n_items  = (tail_counter.load(std::memory_order_relaxed) - head_counter.load(std::memory_order_relaxed))
                / queue_rep_type::n_queue;
            size_type index = modulo_power_of_two(head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page);
            size_type end_in_first_page = (index+n_items < items_per_page) ? (index + n_items) : items_per_page;

            try_call( [&] {
                head_page.store(make_copy(allocator, srcp, index, end_in_first_page, g_index, construct_item), std::memory_order_relaxed);
            }).on_exception( [&] {
                head_counter.store(0, std::memory_order_relaxed);
                tail_counter.store(0, std::memory_order_relaxed);
            });
            padded_page* cur_page = head_page.load(std::memory_order_relaxed);

            try_call( [&] {
                if (srcp != src.tail_page.load(std::memory_order_relaxed)) {
                    for (srcp = srcp->next; srcp != src.tail_page.load(std::memory_order_relaxed); srcp=srcp->next ) {
                        cur_page->next = make_copy( allocator, srcp, 0, items_per_page, g_index, construct_item );
                        cur_page = cur_page->next;
                    }

                    __TBB_ASSERT(srcp == src.tail_page.load(std::memory_order_relaxed), nullptr );
                    size_type last_index = modulo_power_of_two(tail_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page);
                    if( last_index==0 ) last_index = items_per_page;

                    cur_page->next = make_copy( allocator, srcp, 0, last_index, g_index, construct_item );
                    cur_page = cur_page->next;
                }
                tail_page.store(cur_page, std::memory_order_relaxed);
            }).on_exception( [&] {
                padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
                tail_page.store(invalid_page, std::memory_order_relaxed);
            });
        } else {
            head_page.store(nullptr, std::memory_order_relaxed);
            tail_page.store(nullptr, std::memory_order_relaxed);
        }
        return *this;
    }

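    // Allocates one new page and copies the slots [begin_in_page, end_in_page) from
    // src_page into it, using construct_item for each slot whose mask bit is set.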
    padded_page* make_copy( queue_allocator_type& allocator, const padded_page* src_page, size_type begin_in_page,
        size_type end_in_page, ticket_type& g_index, item_constructor_type construct_item )
    {
        page_allocator_type page_allocator(allocator);
        padded_page* new_page = page_allocator_traits::allocate(page_allocator, 1);
        new_page->next = nullptr;
        new_page->mask.store(src_page->mask.load(std::memory_order_relaxed), std::memory_order_relaxed);
        for (; begin_in_page!=end_in_page; ++begin_in_page, ++g_index) {
            if (new_page->mask.load(std::memory_order_relaxed) & uintptr_t(1) << begin_in_page) {
                copy_item(*new_page, begin_in_page, *src_page, begin_in_page, construct_item);
            }
        }
        return new_page;
    }

    void invalidate_page( ticket_type k )  {
        // Append an invalid page at address 1 so that no more pushes are allowed.
        padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
        {
            spin_mutex::scoped_lock lock( page_mutex );
            tail_counter.store(k + queue_rep_type::n_queue + 1, std::memory_order_relaxed);
            padded_page* q = tail_page.load(std::memory_order_relaxed);
            if (is_valid_page(q)) {
                q->next = invalid_page;
            } else {
                head_page.store(invalid_page, std::memory_order_relaxed);
            }
            tail_page.store(invalid_page, std::memory_order_relaxed);
        }
    }

    padded_page* get_head_page() {
        return head_page.load(std::memory_order_relaxed);
    }

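    // Destroys all remaining items and releases every page, then resets the counters
    // and installs the given head/tail pages (nullptr by default, or the sentinel
    // invalid page when called from clear_and_invalidate).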
    void clear(queue_allocator_type& allocator, padded_page* new_head = nullptr, padded_page* new_tail = nullptr) {
        padded_page* curr_page = get_head_page();
        size_type index = (head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue) % items_per_page;
        page_allocator_type page_allocator(allocator);

        while (curr_page && is_valid_page(curr_page)) {
            while (index != items_per_page) {
                if (curr_page->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
                    page_allocator_traits::destroy(page_allocator, &curr_page->operator[](index));
                }
                ++index;
            }

            index = 0;
            padded_page* next_page = curr_page->next;
            page_allocator_traits::destroy(page_allocator, curr_page);
            page_allocator_traits::deallocate(page_allocator, curr_page, 1);
            curr_page = next_page;
        }
        head_counter.store(0, std::memory_order_relaxed);
        tail_counter.store(0, std::memory_order_relaxed);
        head_page.store(new_head, std::memory_order_relaxed);
        tail_page.store(new_tail, std::memory_order_relaxed);
    }

    void clear_and_invalidate(queue_allocator_type& allocator) {
        padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
        clear(allocator, invalid_page, invalid_page);
    }

private:
    // template <typename U, typename A>
    friend class micro_queue_pop_finalizer<self_type, value_type, page_allocator_type>;

    // Class used to ensure exception-safety of method "pop"
    class destroyer  {
        value_type& my_value;
    public:
        destroyer( reference value ) : my_value(value) {}
        destroyer( const destroyer& ) = delete;
        destroyer& operator=( const destroyer& ) = delete;
        ~destroyer() {my_value.~T();}
    }; // class destroyer

    void copy_item( padded_page& dst, size_type dindex, const padded_page& src, size_type sindex,
        item_constructor_type construct_item )
    {
        auto& src_item = src[sindex];
        construct_item( &dst[dindex], static_cast<const void*>(&src_item) );
    }

    void assign_and_destroy_item( void* dst, padded_page& src, size_type index ) {
        auto& from = src[index];
        destroyer d(from);
        *static_cast<T*>(dst) = std::move(from);
    }

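    // Spins until tail_counter reaches ticket k. An odd counter value means the
    // micro-queue was invalidated after a failed page allocation, so the waiter
    // records an invalid entry and throws bad_last_alloc.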
    void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const {
        for (atomic_backoff b{};; b.pause()) {
            ticket_type c = counter.load(std::memory_order_acquire);
            if (c == k) return;
            else if (c & 1) {
                ++rb.n_invalid_entries;
                throw_exception( exception_id::bad_last_alloc);
            }
        }
    }

    std::atomic<padded_page*> head_page{};
    std::atomic<ticket_type> head_counter{};

    std::atomic<padded_page*> tail_page{};
    std::atomic<ticket_type> tail_counter{};

    spin_mutex page_mutex{};
}; // class micro_queue

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif // warning 4146 is back

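// RAII helper used by micro_queue::pop(): its destructor publishes the new
// head_counter value and, when the last slot of a page was consumed, unlinks
// that page from the queue and destroys/deallocates it.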
template <typename Container, typename T, typename Allocator>
class micro_queue_pop_finalizer {
public:
    using padded_page = typename Container::padded_page;
    using allocator_type = Allocator;
    using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;

    micro_queue_pop_finalizer( Container& queue, Allocator& alloc, ticket_type k, padded_page* p ) :
        my_ticket_type(k), my_queue(queue), my_page(p), allocator(alloc)
    {}

    micro_queue_pop_finalizer( const micro_queue_pop_finalizer& ) = delete;
    micro_queue_pop_finalizer& operator=( const micro_queue_pop_finalizer& ) = delete;

    ~micro_queue_pop_finalizer() {
        padded_page* p = my_page;
        if( is_valid_page(p) ) {
            spin_mutex::scoped_lock lock( my_queue.page_mutex );
            padded_page* q = p->next;
            my_queue.head_page.store(q, std::memory_order_relaxed);
            if( !is_valid_page(q) ) {
                my_queue.tail_page.store(nullptr, std::memory_order_relaxed);
            }
        }
        my_queue.head_counter.store(my_ticket_type, std::memory_order_release);
        if ( is_valid_page(p) ) {
            allocator_traits_type::destroy(allocator, static_cast<padded_page*>(p));
            allocator_traits_type::deallocate(allocator, static_cast<padded_page*>(p), 1);
        }
    }
private:
    ticket_type my_ticket_type;
    Container& my_queue;
    padded_page* my_page;
    Allocator& allocator;
}; // class micro_queue_pop_finalizer

#if _MSC_VER && !defined(__INTEL_COMPILER)
// structure was padded due to alignment specifier
#pragma warning( push )
#pragma warning( disable: 4324 )
#endif

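// Shared representation behind the concurrent queue: n_queue micro-queues plus the
// global head/tail ticket counters. Tickets are scattered across the micro-queues
// (see index()) so that concurrent pushes and pops mostly touch different queues.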
template <typename T, typename Allocator>
struct concurrent_queue_rep {
    using self_type = concurrent_queue_rep<T, Allocator>;
    using size_type = std::size_t;
    using micro_queue_type = micro_queue<T, Allocator>;
    using allocator_type = Allocator;
    using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
    using padded_page = typename micro_queue_type::padded_page;
    using page_allocator_type = typename micro_queue_type::page_allocator_type;
    using item_constructor_type = typename micro_queue_type::item_constructor_type;
private:
    using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;
    using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<self_type>;

public:
    // must be power of 2
    static constexpr size_type n_queue = 8;
    // Approximately n_queue/golden ratio
    static constexpr size_type phi = 3;
    static constexpr size_type item_size = micro_queue_type::item_size;
    static constexpr size_type items_per_page = micro_queue_type::items_per_page;

    concurrent_queue_rep() {}

    concurrent_queue_rep( const concurrent_queue_rep& ) = delete;
    concurrent_queue_rep& operator=( const concurrent_queue_rep& ) = delete;

    void clear( queue_allocator_type& alloc ) {
        for (size_type index = 0; index < n_queue; ++index) {
            array[index].clear(alloc);
        }
        head_counter.store(0, std::memory_order_relaxed);
        tail_counter.store(0, std::memory_order_relaxed);
        n_invalid_entries.store(0, std::memory_order_relaxed);
    }

    void assign( const concurrent_queue_rep& src, queue_allocator_type& alloc, item_constructor_type construct_item ) {
        head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
        tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
        n_invalid_entries.store(src.n_invalid_entries.load(std::memory_order_relaxed), std::memory_order_relaxed);

        // copy or move micro_queues
        size_type queue_idx = 0;
        try_call( [&] {
            for (; queue_idx < n_queue; ++queue_idx) {
                array[queue_idx].assign(src.array[queue_idx], alloc, construct_item);
            }
        }).on_exception( [&] {
            for (size_type i = 0; i < queue_idx + 1; ++i) {
                array[i].clear_and_invalidate(alloc);
            }
            head_counter.store(0, std::memory_order_relaxed);
            tail_counter.store(0, std::memory_order_relaxed);
            n_invalid_entries.store(0, std::memory_order_relaxed);
        });

        __TBB_ASSERT(head_counter.load(std::memory_order_relaxed) == src.head_counter.load(std::memory_order_relaxed) &&
                     tail_counter.load(std::memory_order_relaxed) == src.tail_counter.load(std::memory_order_relaxed),
                     "the source concurrent queue should not be concurrently modified." );
    }

    bool empty() const {
        ticket_type tc = tail_counter.load(std::memory_order_acquire);
        ticket_type hc = head_counter.load(std::memory_order_relaxed);
        // if tc differs from the second read of tail_counter, the queue was not empty at some point between the two reads.
        return tc == tail_counter.load(std::memory_order_relaxed) &&
               std::ptrdiff_t(tc - hc - n_invalid_entries.load(std::memory_order_relaxed)) <= 0;
    }

    std::ptrdiff_t size() const {
        __TBB_ASSERT(sizeof(std::ptrdiff_t) <= sizeof(size_type), nullptr);
        std::ptrdiff_t hc = head_counter.load(std::memory_order_acquire);
        std::ptrdiff_t tc = tail_counter.load(std::memory_order_relaxed);
        std::ptrdiff_t nie = n_invalid_entries.load(std::memory_order_relaxed);

        return tc - hc - nie;
    }

    friend class micro_queue<T, Allocator>;

    // Map ticket_type to an array index
    static size_type index( ticket_type k ) {
        return k * phi % n_queue;
    }

    micro_queue_type& choose( ticket_type k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }

    alignas(max_nfs_size) micro_queue_type array[n_queue];

    alignas(max_nfs_size) std::atomic<ticket_type> head_counter{};
    alignas(max_nfs_size) std::atomic<ticket_type> tail_counter{};
    alignas(max_nfs_size) std::atomic<size_type> n_invalid_entries{};
}; // class concurrent_queue_rep

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif

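// Common state for queue iterators: a pointer to the queue representation, the ticket
// of the current element, and the current page of each micro-queue. The pages and
// masks are read without synchronization, so the iterator is only meaningful while
// the queue is not being modified concurrently.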
template <typename Value, typename Allocator>
class concurrent_queue_iterator_base {
    using queue_rep_type = concurrent_queue_rep<Value, Allocator>;
    using padded_page = typename queue_rep_type::padded_page;
protected:
    concurrent_queue_iterator_base() = default;

    concurrent_queue_iterator_base( const concurrent_queue_iterator_base& other ) {
        assign(other);
    }

    concurrent_queue_iterator_base( queue_rep_type* queue_rep )
        : my_queue_rep(queue_rep),
          my_head_counter(my_queue_rep->head_counter.load(std::memory_order_relaxed))
    {
        for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) {
            my_array[i] = my_queue_rep->array[i].get_head_page();
        }

        if (!get_item(my_item, my_head_counter)) advance();
    }

    void assign( const concurrent_queue_iterator_base& other ) {
        my_item = other.my_item;
        my_queue_rep = other.my_queue_rep;

        if (my_queue_rep != nullptr) {
            my_head_counter = other.my_head_counter;

            for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) {
                my_array[i] = other.my_array[i];
            }
        }
    }

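    // Moves to the next element: follows the page chain when the current ticket was
    // the last slot of its page, then skips over any invalid entries (slots whose
    // construction failed) until a present item or the end of the queue is reached.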
    void advance() {
        __TBB_ASSERT(my_item, "Attempt to increment iterator past end of the queue");
        std::size_t k = my_head_counter;
#if TBB_USE_ASSERT
        Value* tmp;
        get_item(tmp, k);
        __TBB_ASSERT(my_item == tmp, nullptr);
#endif
        std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page);
        if (i == my_queue_rep->items_per_page - 1) {
            padded_page*& root = my_array[queue_rep_type::index(k)];
            root = root->next;
        }
        // Advance k
        my_head_counter = ++k;
        if (!get_item(my_item, k)) advance();
    }

    concurrent_queue_iterator_base& operator=( const concurrent_queue_iterator_base& other ) {
        this->assign(other);
        return *this;
    }

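    // Loads the element that ticket k refers to. Returns true with item == nullptr at
    // the end of the queue, true with a valid pointer when the slot holds an item, and
    // false when the slot is an invalid entry that the caller should skip.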
    bool get_item( Value*& item, std::size_t k ) {
        if (k == my_queue_rep->tail_counter.load(std::memory_order_relaxed)) {
            item = nullptr;
            return true;
        } else {
            padded_page* p = my_array[queue_rep_type::index(k)];
            __TBB_ASSERT(p, nullptr);
            std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page);
            item = &(*p)[i];
            return (p->mask & uintptr_t(1) << i) != 0;
        }
    }

    Value* my_item{ nullptr };
    queue_rep_type* my_queue_rep{ nullptr };
    ticket_type my_head_counter{};
    padded_page* my_array[queue_rep_type::n_queue]{};
}; // class concurrent_queue_iterator_base

struct concurrent_queue_iterator_provider {
    template <typename Iterator, typename Container>
    static Iterator get( const Container& container ) {
        return Iterator(container);
    }
}; // struct concurrent_queue_iterator_provider

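// Forward iterator over a concurrent queue. The mutable and const variants share this
// template; the converting constructor below allows iterator-to-const_iterator
// conversion but not the reverse.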
template <typename Container, typename Value, typename Allocator>
class concurrent_queue_iterator : public concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator> {
    using base_type = concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator>;
public:
    using value_type = Value;
    using pointer = value_type*;
    using reference = value_type&;
    using difference_type = std::ptrdiff_t;
    using iterator_category = std::forward_iterator_tag;

    concurrent_queue_iterator() = default;

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other )
        : base_type(other) {}

private:
    concurrent_queue_iterator( const Container& container )
        : base_type(container.my_queue_representation) {}
public:
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other ) {
        this->assign(other);
        return *this;
    }

    reference operator*() const {
        return *static_cast<pointer>(this->my_item);
    }

    pointer operator->() const { return &operator*(); }

    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    concurrent_queue_iterator operator++(int) {
        concurrent_queue_iterator tmp = *this;
        ++*this;
        return tmp;
    }

    friend bool operator==( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) {
        return lhs.my_item == rhs.my_item;
    }

    friend bool operator!=( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) {
        return lhs.my_item != rhs.my_item;
    }
private:
    friend struct concurrent_queue_iterator_provider;
}; // class concurrent_queue_iterator

} // namespace d2
} // namespace detail
} // tbb

#endif // __TBB_detail__concurrent_queue_base_H