File indexing completed on 2025-07-30 08:46:14
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB_detail__concurrent_queue_base_H
0018 #define __TBB_detail__concurrent_queue_base_H
0019
0020 #include "_utils.h"
0021 #include "_exception.h"
0022 #include "_machine.h"
0023 #include "_allocator_traits.h"
0024
0025 #include "../profiling.h"
0026 #include "../spin_mutex.h"
0027 #include "../cache_aligned_allocator.h"
0028
0029 #include <atomic>
0030
0031 namespace tbb {
0032 namespace detail {
0033 namespace d2 {
0034
0035 using ticket_type = std::size_t;
0036
0037 template <typename Page>
0038 inline bool is_valid_page(const Page p) {
0039 return reinterpret_cast<std::uintptr_t>(p) > 1;
0040 }
0041
0042 template <typename T, typename Allocator>
0043 struct concurrent_queue_rep;
0044
0045 template <typename Container, typename T, typename Allocator>
0046 class micro_queue_pop_finalizer;
0047
0048 #if _MSC_VER && !defined(__INTEL_COMPILER)
0049
0050 #pragma warning( push )
0051 #pragma warning( disable: 4146 )
0052 #endif
0053
0054
0055
0056
0057 template <typename T, typename Allocator>
0058 class micro_queue {
0059 private:
0060 using queue_rep_type = concurrent_queue_rep<T, Allocator>;
0061 using self_type = micro_queue<T, Allocator>;
0062 public:
0063 using size_type = std::size_t;
0064 using value_type = T;
0065 using reference = value_type&;
0066 using const_reference = const value_type&;
0067
0068 using allocator_type = Allocator;
0069 using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
0070 using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_rep_type>;
0071
0072 static constexpr size_type item_size = sizeof(T);
0073 static constexpr size_type items_per_page = item_size <= 8 ? 32 :
0074 item_size <= 16 ? 16 :
0075 item_size <= 32 ? 8 :
0076 item_size <= 64 ? 4 :
0077 item_size <= 128 ? 2 : 1;
0078
0079 struct padded_page {
0080 padded_page() {}
0081 ~padded_page() {}
0082
0083 reference operator[] (std::size_t index) {
0084 __TBB_ASSERT(index < items_per_page, "Index out of range");
0085 return items[index];
0086 }
0087
0088 const_reference operator[] (std::size_t index) const {
0089 __TBB_ASSERT(index < items_per_page, "Index out of range");
0090 return items[index];
0091 }
0092
0093 padded_page* next{ nullptr };
0094 std::atomic<std::uintptr_t> mask{};
0095
0096 union {
0097 value_type items[items_per_page];
0098 };
0099 };
0100
0101 using page_allocator_type = typename allocator_traits_type::template rebind_alloc<padded_page>;
0102 protected:
0103 using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;
0104
0105 public:
0106 using item_constructor_type = void (*)(value_type* location, const void* src);
0107 micro_queue() = default;
0108 micro_queue( const micro_queue& ) = delete;
0109 micro_queue& operator=( const micro_queue& ) = delete;
0110
0111 size_type prepare_page( ticket_type k, queue_rep_type& base, page_allocator_type page_allocator,
0112 padded_page*& p ) {
0113 __TBB_ASSERT(p == nullptr, "Invalid page argument for prepare_page");
0114 k &= -queue_rep_type::n_queue;
0115 size_type index = modulo_power_of_two(k / queue_rep_type::n_queue, items_per_page);
0116 if (!index) {
0117 try_call( [&] {
0118 p = page_allocator_traits::allocate(page_allocator, 1);
0119 }).on_exception( [&] {
0120 ++base.n_invalid_entries;
0121 invalidate_page( k );
0122 });
0123 page_allocator_traits::construct(page_allocator, p);
0124 }
0125
0126 spin_wait_until_my_turn(tail_counter, k, base);
0127 d1::call_itt_notify(d1::acquired, &tail_counter);
0128
0129 if (p) {
0130 spin_mutex::scoped_lock lock( page_mutex );
0131 padded_page* q = tail_page.load(std::memory_order_relaxed);
0132 if (is_valid_page(q)) {
0133 q->next = p;
0134 } else {
0135 head_page.store(p, std::memory_order_relaxed);
0136 }
0137 tail_page.store(p, std::memory_order_relaxed);
0138 } else {
0139 p = tail_page.load(std::memory_order_relaxed);
0140 }
0141 return index;
0142 }
0143
0144 template<typename... Args>
0145 void push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator, Args&&... args )
0146 {
0147 padded_page* p = nullptr;
0148 page_allocator_type page_allocator(allocator);
0149 size_type index = prepare_page(k, base, page_allocator, p);
0150 __TBB_ASSERT(p != nullptr, "Page was not prepared");
0151
0152
0153
0154 auto value_guard = make_raii_guard([&] {
0155 ++base.n_invalid_entries;
0156 d1::call_itt_notify(d1::releasing, &tail_counter);
0157 tail_counter.fetch_add(queue_rep_type::n_queue);
0158 });
0159
0160 page_allocator_traits::construct(page_allocator, &(*p)[index], std::forward<Args>(args)...);
0161
0162 p->mask.store(p->mask.load(std::memory_order_relaxed) | uintptr_t(1) << index, std::memory_order_relaxed);
0163 d1::call_itt_notify(d1::releasing, &tail_counter);
0164
0165 value_guard.dismiss();
0166 tail_counter.fetch_add(queue_rep_type::n_queue);
0167 }
0168
0169 void abort_push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
0170 padded_page* p = nullptr;
0171 prepare_page(k, base, allocator, p);
0172 ++base.n_invalid_entries;
0173 tail_counter.fetch_add(queue_rep_type::n_queue);
0174 }
0175
0176 bool pop( void* dst, ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
0177 k &= -queue_rep_type::n_queue;
0178 spin_wait_until_eq(head_counter, k);
0179 d1::call_itt_notify(d1::acquired, &head_counter);
0180 spin_wait_while_eq(tail_counter, k);
0181 d1::call_itt_notify(d1::acquired, &tail_counter);
0182 padded_page *p = head_page.load(std::memory_order_relaxed);
0183 __TBB_ASSERT( p, nullptr );
0184 size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page );
0185 bool success = false;
0186 {
0187 page_allocator_type page_allocator(allocator);
0188 micro_queue_pop_finalizer<self_type, value_type, page_allocator_type> finalizer(*this, page_allocator,
0189 k + queue_rep_type::n_queue, index == items_per_page - 1 ? p : nullptr );
0190 if (p->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
0191 success = true;
0192 assign_and_destroy_item(dst, *p, index);
0193 } else {
0194 --base.n_invalid_entries;
0195 }
0196 }
0197 return success;
0198 }
0199
0200 micro_queue& assign( const micro_queue& src, queue_allocator_type& allocator,
0201 item_constructor_type construct_item )
0202 {
0203 head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
0204 tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
0205
0206 const padded_page* srcp = src.head_page.load(std::memory_order_relaxed);
0207 if( is_valid_page(srcp) ) {
0208 ticket_type g_index = head_counter.load(std::memory_order_relaxed);
0209 size_type n_items = (tail_counter.load(std::memory_order_relaxed) - head_counter.load(std::memory_order_relaxed))
0210 / queue_rep_type::n_queue;
0211 size_type index = modulo_power_of_two(head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page);
0212 size_type end_in_first_page = (index+n_items < items_per_page) ? (index + n_items) : items_per_page;
0213
0214 try_call( [&] {
0215 head_page.store(make_copy(allocator, srcp, index, end_in_first_page, g_index, construct_item), std::memory_order_relaxed);
0216 }).on_exception( [&] {
0217 head_counter.store(0, std::memory_order_relaxed);
0218 tail_counter.store(0, std::memory_order_relaxed);
0219 });
0220 padded_page* cur_page = head_page.load(std::memory_order_relaxed);
0221
0222 try_call( [&] {
0223 if (srcp != src.tail_page.load(std::memory_order_relaxed)) {
0224 for (srcp = srcp->next; srcp != src.tail_page.load(std::memory_order_relaxed); srcp=srcp->next ) {
0225 cur_page->next = make_copy( allocator, srcp, 0, items_per_page, g_index, construct_item );
0226 cur_page = cur_page->next;
0227 }
0228
0229 __TBB_ASSERT(srcp == src.tail_page.load(std::memory_order_relaxed), nullptr );
0230 size_type last_index = modulo_power_of_two(tail_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page);
0231 if( last_index==0 ) last_index = items_per_page;
0232
0233 cur_page->next = make_copy( allocator, srcp, 0, last_index, g_index, construct_item );
0234 cur_page = cur_page->next;
0235 }
0236 tail_page.store(cur_page, std::memory_order_relaxed);
0237 }).on_exception( [&] {
0238 padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
0239 tail_page.store(invalid_page, std::memory_order_relaxed);
0240 });
0241 } else {
0242 head_page.store(nullptr, std::memory_order_relaxed);
0243 tail_page.store(nullptr, std::memory_order_relaxed);
0244 }
0245 return *this;
0246 }
0247
0248 padded_page* make_copy( queue_allocator_type& allocator, const padded_page* src_page, size_type begin_in_page,
0249 size_type end_in_page, ticket_type& g_index, item_constructor_type construct_item )
0250 {
0251 page_allocator_type page_allocator(allocator);
0252 padded_page* new_page = page_allocator_traits::allocate(page_allocator, 1);
0253 new_page->next = nullptr;
0254 new_page->mask.store(src_page->mask.load(std::memory_order_relaxed), std::memory_order_relaxed);
0255 for (; begin_in_page!=end_in_page; ++begin_in_page, ++g_index) {
0256 if (new_page->mask.load(std::memory_order_relaxed) & uintptr_t(1) << begin_in_page) {
0257 copy_item(*new_page, begin_in_page, *src_page, begin_in_page, construct_item);
0258 }
0259 }
0260 return new_page;
0261 }
0262
0263 void invalidate_page( ticket_type k ) {
0264
0265 padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
0266 {
0267 spin_mutex::scoped_lock lock( page_mutex );
0268 tail_counter.store(k + queue_rep_type::n_queue + 1, std::memory_order_relaxed);
0269 padded_page* q = tail_page.load(std::memory_order_relaxed);
0270 if (is_valid_page(q)) {
0271 q->next = invalid_page;
0272 } else {
0273 head_page.store(invalid_page, std::memory_order_relaxed);
0274 }
0275 tail_page.store(invalid_page, std::memory_order_relaxed);
0276 }
0277 }
0278
0279 padded_page* get_head_page() {
0280 return head_page.load(std::memory_order_relaxed);
0281 }
0282
0283 void clear(queue_allocator_type& allocator, padded_page* new_head = nullptr, padded_page* new_tail = nullptr) {
0284 padded_page* curr_page = get_head_page();
0285 size_type index = (head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue) % items_per_page;
0286 page_allocator_type page_allocator(allocator);
0287
0288 while (curr_page && is_valid_page(curr_page)) {
0289 while (index != items_per_page) {
0290 if (curr_page->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
0291 page_allocator_traits::destroy(page_allocator, &curr_page->operator[](index));
0292 }
0293 ++index;
0294 }
0295
0296 index = 0;
0297 padded_page* next_page = curr_page->next;
0298 page_allocator_traits::destroy(page_allocator, curr_page);
0299 page_allocator_traits::deallocate(page_allocator, curr_page, 1);
0300 curr_page = next_page;
0301 }
0302 head_counter.store(0, std::memory_order_relaxed);
0303 tail_counter.store(0, std::memory_order_relaxed);
0304 head_page.store(new_head, std::memory_order_relaxed);
0305 tail_page.store(new_tail, std::memory_order_relaxed);
0306 }
0307
0308 void clear_and_invalidate(queue_allocator_type& allocator) {
0309 padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
0310 clear(allocator, invalid_page, invalid_page);
0311 }
0312
0313 private:
0314
0315 friend class micro_queue_pop_finalizer<self_type, value_type, page_allocator_type>;
0316
0317
0318 class destroyer {
0319 value_type& my_value;
0320 public:
0321 destroyer( reference value ) : my_value(value) {}
0322 destroyer( const destroyer& ) = delete;
0323 destroyer& operator=( const destroyer& ) = delete;
0324 ~destroyer() {my_value.~T();}
0325 };
0326
0327 void copy_item( padded_page& dst, size_type dindex, const padded_page& src, size_type sindex,
0328 item_constructor_type construct_item )
0329 {
0330 auto& src_item = src[sindex];
0331 construct_item( &dst[dindex], static_cast<const void*>(&src_item) );
0332 }
0333
0334 void assign_and_destroy_item( void* dst, padded_page& src, size_type index ) {
0335 auto& from = src[index];
0336 destroyer d(from);
0337 *static_cast<T*>(dst) = std::move(from);
0338 }
0339
0340 void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const {
0341 for (atomic_backoff b{};; b.pause()) {
0342 ticket_type c = counter.load(std::memory_order_acquire);
0343 if (c == k) return;
0344 else if (c & 1) {
0345 ++rb.n_invalid_entries;
0346 throw_exception( exception_id::bad_last_alloc);
0347 }
0348 }
0349 }
0350
0351 std::atomic<padded_page*> head_page{};
0352 std::atomic<ticket_type> head_counter{};
0353
0354 std::atomic<padded_page*> tail_page{};
0355 std::atomic<ticket_type> tail_counter{};
0356
0357 spin_mutex page_mutex{};
0358 };
0359
0360 #if _MSC_VER && !defined(__INTEL_COMPILER)
0361 #pragma warning( pop )
0362 #endif
0363
0364 template <typename Container, typename T, typename Allocator>
0365 class micro_queue_pop_finalizer {
0366 public:
0367 using padded_page = typename Container::padded_page;
0368 using allocator_type = Allocator;
0369 using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
0370
0371 micro_queue_pop_finalizer( Container& queue, Allocator& alloc, ticket_type k, padded_page* p ) :
0372 my_ticket_type(k), my_queue(queue), my_page(p), allocator(alloc)
0373 {}
0374
0375 micro_queue_pop_finalizer( const micro_queue_pop_finalizer& ) = delete;
0376 micro_queue_pop_finalizer& operator=( const micro_queue_pop_finalizer& ) = delete;
0377
0378 ~micro_queue_pop_finalizer() {
0379 padded_page* p = my_page;
0380 if( is_valid_page(p) ) {
0381 spin_mutex::scoped_lock lock( my_queue.page_mutex );
0382 padded_page* q = p->next;
0383 my_queue.head_page.store(q, std::memory_order_relaxed);
0384 if( !is_valid_page(q) ) {
0385 my_queue.tail_page.store(nullptr, std::memory_order_relaxed);
0386 }
0387 }
0388 my_queue.head_counter.store(my_ticket_type, std::memory_order_release);
0389 if ( is_valid_page(p) ) {
0390 allocator_traits_type::destroy(allocator, static_cast<padded_page*>(p));
0391 allocator_traits_type::deallocate(allocator, static_cast<padded_page*>(p), 1);
0392 }
0393 }
0394 private:
0395 ticket_type my_ticket_type;
0396 Container& my_queue;
0397 padded_page* my_page;
0398 Allocator& allocator;
0399 };
0400
0401 #if _MSC_VER && !defined(__INTEL_COMPILER)
0402
0403 #pragma warning( push )
0404 #pragma warning( disable: 4324 )
0405 #endif
0406
0407 template <typename T, typename Allocator>
0408 struct concurrent_queue_rep {
0409 using self_type = concurrent_queue_rep<T, Allocator>;
0410 using size_type = std::size_t;
0411 using micro_queue_type = micro_queue<T, Allocator>;
0412 using allocator_type = Allocator;
0413 using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
0414 using padded_page = typename micro_queue_type::padded_page;
0415 using page_allocator_type = typename micro_queue_type::page_allocator_type;
0416 using item_constructor_type = typename micro_queue_type::item_constructor_type;
0417 private:
0418 using page_allocator_traits = tbb::detail::allocator_traits<page_allocator_type>;
0419 using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<self_type>;
0420
0421 public:
0422
0423 static constexpr size_type n_queue = 8;
0424
0425 static constexpr size_type phi = 3;
0426 static constexpr size_type item_size = micro_queue_type::item_size;
0427 static constexpr size_type items_per_page = micro_queue_type::items_per_page;
0428
0429 concurrent_queue_rep() {}
0430
0431 concurrent_queue_rep( const concurrent_queue_rep& ) = delete;
0432 concurrent_queue_rep& operator=( const concurrent_queue_rep& ) = delete;
0433
0434 void clear( queue_allocator_type& alloc ) {
0435 for (size_type index = 0; index < n_queue; ++index) {
0436 array[index].clear(alloc);
0437 }
0438 head_counter.store(0, std::memory_order_relaxed);
0439 tail_counter.store(0, std::memory_order_relaxed);
0440 n_invalid_entries.store(0, std::memory_order_relaxed);
0441 }
0442
0443 void assign( const concurrent_queue_rep& src, queue_allocator_type& alloc, item_constructor_type construct_item ) {
0444 head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
0445 tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
0446 n_invalid_entries.store(src.n_invalid_entries.load(std::memory_order_relaxed), std::memory_order_relaxed);
0447
0448
0449 size_type queue_idx = 0;
0450 try_call( [&] {
0451 for (; queue_idx < n_queue; ++queue_idx) {
0452 array[queue_idx].assign(src.array[queue_idx], alloc, construct_item);
0453 }
0454 }).on_exception( [&] {
0455 for (size_type i = 0; i < queue_idx + 1; ++i) {
0456 array[i].clear_and_invalidate(alloc);
0457 }
0458 head_counter.store(0, std::memory_order_relaxed);
0459 tail_counter.store(0, std::memory_order_relaxed);
0460 n_invalid_entries.store(0, std::memory_order_relaxed);
0461 });
0462
0463 __TBB_ASSERT(head_counter.load(std::memory_order_relaxed) == src.head_counter.load(std::memory_order_relaxed) &&
0464 tail_counter.load(std::memory_order_relaxed) == src.tail_counter.load(std::memory_order_relaxed),
0465 "the source concurrent queue should not be concurrently modified." );
0466 }
0467
0468 bool empty() const {
0469 ticket_type tc = tail_counter.load(std::memory_order_acquire);
0470 ticket_type hc = head_counter.load(std::memory_order_relaxed);
0471
0472 return tc == tail_counter.load(std::memory_order_relaxed) &&
0473 std::ptrdiff_t(tc - hc - n_invalid_entries.load(std::memory_order_relaxed)) <= 0;
0474 }
0475
0476 std::ptrdiff_t size() const {
0477 __TBB_ASSERT(sizeof(std::ptrdiff_t) <= sizeof(size_type), nullptr);
0478 std::ptrdiff_t hc = head_counter.load(std::memory_order_acquire);
0479 std::ptrdiff_t tc = tail_counter.load(std::memory_order_relaxed);
0480 std::ptrdiff_t nie = n_invalid_entries.load(std::memory_order_relaxed);
0481
0482 return tc - hc - nie;
0483 }
0484
0485 friend class micro_queue<T, Allocator>;
0486
0487
0488 static size_type index( ticket_type k ) {
0489 return k * phi % n_queue;
0490 }
0491
0492 micro_queue_type& choose( ticket_type k ) {
0493
0494 return array[index(k)];
0495 }
0496
0497 alignas(max_nfs_size) micro_queue_type array[n_queue];
0498
0499 alignas(max_nfs_size) std::atomic<ticket_type> head_counter{};
0500 alignas(max_nfs_size) std::atomic<ticket_type> tail_counter{};
0501 alignas(max_nfs_size) std::atomic<size_type> n_invalid_entries{};
0502 };
0503
0504 #if _MSC_VER && !defined(__INTEL_COMPILER)
0505 #pragma warning( pop )
0506 #endif
0507
0508 template <typename Value, typename Allocator>
0509 class concurrent_queue_iterator_base {
0510 using queue_rep_type = concurrent_queue_rep<Value, Allocator>;
0511 using padded_page = typename queue_rep_type::padded_page;
0512 protected:
0513 concurrent_queue_iterator_base() = default;
0514
0515 concurrent_queue_iterator_base( const concurrent_queue_iterator_base& other ) {
0516 assign(other);
0517 }
0518
0519 concurrent_queue_iterator_base( queue_rep_type* queue_rep )
0520 : my_queue_rep(queue_rep),
0521 my_head_counter(my_queue_rep->head_counter.load(std::memory_order_relaxed))
0522 {
0523 for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) {
0524 my_array[i] = my_queue_rep->array[i].get_head_page();
0525 }
0526
0527 if (!get_item(my_item, my_head_counter)) advance();
0528 }
0529
0530 void assign( const concurrent_queue_iterator_base& other ) {
0531 my_item = other.my_item;
0532 my_queue_rep = other.my_queue_rep;
0533
0534 if (my_queue_rep != nullptr) {
0535 my_head_counter = other.my_head_counter;
0536
0537 for (std::size_t i = 0; i < queue_rep_type::n_queue; ++i) {
0538 my_array[i] = other.my_array[i];
0539 }
0540 }
0541 }
0542
0543 void advance() {
0544 __TBB_ASSERT(my_item, "Attempt to increment iterator past end of the queue");
0545 std::size_t k = my_head_counter;
0546 #if TBB_USE_ASSERT
0547 Value* tmp;
0548 get_item(tmp, k);
0549 __TBB_ASSERT(my_item == tmp, nullptr);
0550 #endif
0551 std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page);
0552 if (i == my_queue_rep->items_per_page - 1) {
0553 padded_page*& root = my_array[queue_rep_type::index(k)];
0554 root = root->next;
0555 }
0556
0557 my_head_counter = ++k;
0558 if (!get_item(my_item, k)) advance();
0559 }
0560
0561 concurrent_queue_iterator_base& operator=( const concurrent_queue_iterator_base& other ) {
0562 this->assign(other);
0563 return *this;
0564 }
0565
0566 bool get_item( Value*& item, std::size_t k ) {
0567 if (k == my_queue_rep->tail_counter.load(std::memory_order_relaxed)) {
0568 item = nullptr;
0569 return true;
0570 } else {
0571 padded_page* p = my_array[queue_rep_type::index(k)];
0572 __TBB_ASSERT(p, nullptr);
0573 std::size_t i = modulo_power_of_two(k / queue_rep_type::n_queue, my_queue_rep->items_per_page);
0574 item = &(*p)[i];
0575 return (p->mask & uintptr_t(1) << i) != 0;
0576 }
0577 }
0578
0579 Value* my_item{ nullptr };
0580 queue_rep_type* my_queue_rep{ nullptr };
0581 ticket_type my_head_counter{};
0582 padded_page* my_array[queue_rep_type::n_queue]{};
0583 };
0584
0585 struct concurrent_queue_iterator_provider {
0586 template <typename Iterator, typename Container>
0587 static Iterator get( const Container& container ) {
0588 return Iterator(container);
0589 }
0590 };
0591
0592 template <typename Container, typename Value, typename Allocator>
0593 class concurrent_queue_iterator : public concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator> {
0594 using base_type = concurrent_queue_iterator_base<typename std::remove_cv<Value>::type, Allocator>;
0595 public:
0596 using value_type = Value;
0597 using pointer = value_type*;
0598 using reference = value_type&;
0599 using difference_type = std::ptrdiff_t;
0600 using iterator_category = std::forward_iterator_tag;
0601
0602 concurrent_queue_iterator() = default;
0603
0604
0605
0606 concurrent_queue_iterator( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other )
0607 : base_type(other) {}
0608
0609 private:
0610 concurrent_queue_iterator( const Container& container )
0611 : base_type(container.my_queue_representation) {}
0612 public:
0613 concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container, typename Container::value_type, Allocator>& other ) {
0614 this->assign(other);
0615 return *this;
0616 }
0617
0618 reference operator*() const {
0619 return *static_cast<pointer>(this->my_item);
0620 }
0621
0622 pointer operator->() const { return &operator*(); }
0623
0624 concurrent_queue_iterator& operator++() {
0625 this->advance();
0626 return *this;
0627 }
0628
0629 concurrent_queue_iterator operator++(int) {
0630 concurrent_queue_iterator tmp = *this;
0631 ++*this;
0632 return tmp;
0633 }
0634
0635 friend bool operator==( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) {
0636 return lhs.my_item == rhs.my_item;
0637 }
0638
0639 friend bool operator!=( const concurrent_queue_iterator& lhs, const concurrent_queue_iterator& rhs ) {
0640 return lhs.my_item != rhs.my_item;
0641 }
0642 private:
0643 friend struct concurrent_queue_iterator_provider;
0644 };
0645
0646 }
0647 }
0648 }
0649
0650 #endif