#ifndef __TBB__concurrent_queue_impl_H
#define __TBB__concurrent_queue_impl_H

#ifndef __TBB_concurrent_queue_H
#error Do not #include this internal file directly; use public TBB headers instead.
#endif

#include "../tbb_stddef.h"
#include "../tbb_machine.h"
#include "../atomic.h"
#include "../spin_mutex.h"
#include "../cache_aligned_allocator.h"
#include "../tbb_exception.h"
#include "../tbb_profiling.h"
#include <new>
#include __TBB_STD_SWAP_HEADER
#include <iterator>

namespace tbb {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN

namespace strict_ppl {
template<typename T, typename A> class concurrent_queue;
}

template<typename T, typename A> class concurrent_bounded_queue;

#endif

namespace strict_ppl {

namespace internal {

using namespace tbb::internal;

typedef size_t ticket;

template<typename T> class micro_queue;
template<typename T> class micro_queue_pop_finalizer;
template<typename T> class concurrent_queue_base_v3;
template<typename T> struct concurrent_queue_rep;

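//! Parts of concurrent_queue_rep that do not have references to micro_queue.
/** For internal use only. */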
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;

protected:
    //! Approximately n_queue/golden ratio
    static const size_t phi = 3;

public:
    //! Must be power of 2
    static const size_t n_queue = 8;

    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    atomic<ticket> head_counter;
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    //! Number of invalid entries in the queue
    atomic<size_t> n_invalid_entries;

    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
};

inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
    return uintptr_t(p)>1;
}

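//! Abstract class to define interface for page allocation/deallocation.
/** For internal use only. */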
class concurrent_queue_page_allocator {
    template<typename T> friend class micro_queue;
    template<typename T> friend class micro_queue_pop_finalizer;
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
};

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unary minus operator applied to unsigned type, result still unsigned
#pragma warning( push )
#pragma warning( disable: 4146 )
#endif

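//! A queue using simple locking.
/** For efficiency, this class has no constructor.
    The caller is expected to zero-initialize it. */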
template<typename T>
class micro_queue : no_copy {
public:
    typedef void (*item_constructor_t)(T* location, const void* src);
private:
    typedef concurrent_queue_rep_base::page page;

    //! Class used to ensure exception-safety of method "pop"
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };

    void copy_item( page& dst, size_t dindex, const void* src, item_constructor_t construct_item ) {
        construct_item( &get_ref(dst, dindex), src );
    }

    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex,
                    item_constructor_t construct_item )
    {
        T& src_item = get_ref( const_cast<page&>(src), sindex );
        construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
    }

    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = tbb::internal::move( from );
    }

    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const;

public:
    friend class micro_queue_pop_finalizer<T>;

    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

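    //! Map a page and item index to a reference to the stored item.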
    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }

    atomic<page*> head_page;
    atomic<ticket> head_counter;

    atomic<page*> tail_page;
    atomic<ticket> tail_counter;

    spin_mutex page_mutex;

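    //! Construct the item for ticket k at the tail of this lane; allocates a new page on page boundaries.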
    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
               item_constructor_t construct_item );

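    //! Pop the item for ticket k into dst; returns false if that entry was invalidated.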
    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base );

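    //! Copy the contents of src into this lane (not thread safe).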
    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base,
                         item_constructor_t construct_item );

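    //! Allocate a fresh page and copy items [begin_in_page, end_in_page) of src_page into it.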
    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page,
                     size_t end_in_page, ticket& g_index, item_constructor_t construct_item );

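    //! Mark this lane's tail page invalid after a failed push, then rethrow the pending exception.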
    void invalidate_page_and_rethrow( ticket k );
};

template<typename T>
void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
    for( atomic_backoff b(true);;b.pause() ) {
        ticket c = counter;
        if( c==k ) return;
        else if( c&1 ) {
            // An odd counter marks the lane invalidated by a failed push; account for it and bail out.
            ++rb.n_invalid_entries;
            throw_exception( eid_bad_last_alloc );
        }
    }
}

template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
                           item_constructor_t construct_item )
{
    k &= -concurrent_queue_rep_base::n_queue;
    page* p = NULL;
    size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
    if( !index ) {
        __TBB_TRY {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } __TBB_CATCH (...) {
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k );
        }
        p->mask = 0;
        p->next = NULL;
    }

    if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
    call_itt_notify(acquired, &tail_counter);

    if( p ) {
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }

    __TBB_TRY {
        copy_item( *p, index, item, construct_item );
        // If no exception was thrown, mark item as present.
        itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index);
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
    } __TBB_CATCH (...) {
        ++base.my_rep->n_invalid_entries;
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
        __TBB_RETHROW();
    }
}

template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    call_itt_notify(acquired, &head_counter);
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    call_itt_notify(acquired, &tail_counter);
    page *p = head_page;
    __TBB_ASSERT( p, NULL );
    size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
    bool success = false;
    {
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? p : NULL );
        if( p->mask & uintptr_t(1)<<index ) {
            success = true;
            assign_and_destroy_item( dst, *p, index );
        } else {
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}

template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base,
                                        item_constructor_t construct_item )
{
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;

    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;

            head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
            page* cur_page = head_page;

            if( srcp != src.tail_page ) {
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index, construct_item );
                    cur_page = cur_page->next;
                }

                __TBB_ASSERT( srcp==src.tail_page, NULL );
                size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
                if( last_index==0 ) last_index = base.my_rep->items_per_page;

                cur_page->next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        head_page = tail_page = NULL;
    }
    return *this;
}

template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at address 1 so that no more pushes are allowed.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    __TBB_RETHROW();
}

template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base,
        const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page,
        ticket& g_index, item_constructor_t construct_item )
{
    concurrent_queue_page_allocator& pa = base;
    page* new_page = pa.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask;
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
    return new_page;
}

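//! RAII helper that advances head_counter and releases the consumed page when a pop finishes.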
template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;
    micro_queue<T>& my_queue;
    page* my_page;
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer();
};

template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( is_valid_page(p) ) {
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            my_queue.tail_page = NULL;
        }
    }
    itt_store_word_with_release(my_queue.head_counter, my_ticket);
    if( is_valid_page(p) ) {
        allocator.deallocate_page( p );
    }
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif

template<typename T> class concurrent_queue_iterator_rep;
template<typename T> class concurrent_queue_iterator_base_v3;

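//! Internal representation of an unbounded concurrent queue: n_queue micro-queue lanes.
/** For internal use only. */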
template<typename T>
struct concurrent_queue_rep : public concurrent_queue_rep_base {
    micro_queue<T> array[n_queue];

    //! Map ticket to an array index
    static size_t index( ticket k ) {
        return k*phi%n_queue;
    }

    micro_queue<T>& choose( ticket k ) {
        // The formula here approximates LRU in a cache-oblivious way.
        return array[index(k)];
    }
};

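//! Internals of the unbounded concurrent_queue: page allocation plus push/pop plumbing.
/** For internal use only. */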
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
private:
    //! Internal representation
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;

protected:
    typedef typename concurrent_queue_rep<T>::page page;

private:
    typedef typename micro_queue<T>::padded_page padded_page;
    typedef typename micro_queue<T>::item_constructor_t item_constructor_t;

    virtual page *allocate_page() __TBB_override {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block( n ));
    }

    virtual void deallocate_page( concurrent_queue_rep_base::page *p ) __TBB_override {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }

    //! custom allocator
    virtual void *allocate_block( size_t n ) = 0;

    //! custom de-allocator
    virtual void deallocate_block( void *p, size_t n ) = 0;

protected:
    concurrent_queue_base_v3();

    virtual ~concurrent_queue_base_v3() {
#if TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }

    //! Enqueue item at tail of queue
    void internal_push( const void* src, item_constructor_t construct_item ) {
        concurrent_queue_rep<T>& r = *my_rep;
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this, construct_item );
    }

    //! Attempt to dequeue item from queue.
    /** NULL if there was no item to dequeue. */
    bool internal_try_pop( void* dst );

    //! Get size of queue; result may be invalid if queue is modified concurrently
    size_t internal_size() const;

    //! Check if the queue is empty; thread safe
    bool internal_empty() const;

    //! Free any remaining pages
    /** Assumes that the queue is empty or being destroyed. */
    void internal_finish_clear();

    //! Throw an exception
    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! Copy internal representation
    void assign( const concurrent_queue_base_v3& src, item_constructor_t construct_item );

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! Swap internal representation
    void internal_swap( concurrent_queue_base_v3& src ) {
        std::swap( my_rep, src.my_rep );
    }
#endif
};

template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    my_rep->items_per_page = item_size<= 8 ? 32 :
                             item_size<= 16 ? 16 :
                             item_size<= 32 ? 8 :
                             item_size<= 64 ? 4 :
                             item_size<=128 ? 2 :
                             1;
}

template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked. Attempt to get that item.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
#pragma warning (push)
#pragma warning (disable: 4267)
#endif
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
#pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another thread snatched the item, retry.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}

template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}

template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries;
}

template<typename T>
void concurrent_queue_base_v3<T>::internal_finish_clear() {
    concurrent_queue_rep<T>& r = *my_rep;
    size_t nq = r.n_queue;
    for( size_t i=0; i<nq; ++i ) {
        page* tp = r.array[i].tail_page;
        if( is_valid_page(tp) ) {
            __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
            deallocate_page( tp );
            r.array[i].tail_page = NULL;
        } else
            __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
    }
}

template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src,
                                          item_constructor_t construct_item )
{
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;

    // copy concurrent_queue_rep data
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;

    // copy micro_queues
    for( size_t i = 0; i < r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this, construct_item);

    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
                  "the source concurrent queue should not be concurrently modified." );
}

template<typename Container, typename Value> class concurrent_queue_iterator;

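//! Snapshot of a queue's head pages, used by iterators.
/** For internal use only. */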
template<typename T>
class concurrent_queue_iterator_rep: no_assign {
    typedef typename micro_queue<T>::padded_page padded_page;
public:
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }

    //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
    bool get_item( T*& item, size_t k );
};

template<typename T>
bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
    if( k==my_queue.my_rep->tail_counter ) {
        item = NULL;
        return true;
    } else {
        typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
        __TBB_ASSERT(p,NULL);
        size_t i = modulo_power_of_two( k/concurrent_queue_rep<T>::n_queue, my_queue.my_rep->items_per_page );
        item = &micro_queue<T>::get_ref(*p,i);
        return (p->mask & uintptr_t(1)<<i)!=0;
    }
}

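//! Constness-independent portion of concurrent_queue_iterator.
/** @ingroup containers */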
template<typename Value>
class concurrent_queue_iterator_base_v3 {
    //! Represents concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item
    Value* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
        __TBB_compiler_fence();
#endif
    }

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
        : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    concurrent_queue_iterator_base_v3& operator=( const concurrent_queue_iterator_base_v3& i ) {
        assign(i);
        return *this;
    }

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue );

    //! Assignment
    void assign( const concurrent_queue_iterator_base_v3<Value>& other );

    //! Advance iterator one step towards tail of queue.
    void advance();

    //! Destructor
    ~concurrent_queue_iterator_base_v3() {
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};

template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    if( !my_rep->get_item(my_item, k) ) advance();
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        if( my_rep ) {
            cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
        }
    }
    my_item = other.my_item;
}

template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif
    size_t i = modulo_power_of_two( k/concurrent_queue_rep<Value>::n_queue, queue.my_rep->items_per_page );
    if( i==queue.my_rep->items_per_page-1 ) {
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // advance k
    my_rep->head_counter = ++k;
    if( !my_rep->get_item(my_item, k) ) advance();
}

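//! Similar to C++0x std::remove_cv
/** "tbb_" prefix added to avoid overload confusion with C++0x implementations. */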
template<typename T> struct tbb_remove_cv {typedef T type;};
template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};

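//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container. */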
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public:
#endif

    //! Construct iterator pointing to head of queue.
    explicit concurrent_queue_iterator( const concurrent_queue_base_v3<typename tbb_remove_cv<Value>::type>& queue ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
    {
    }

public:
    concurrent_queue_iterator() {}

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) {
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>::operator=(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
};

template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

} // namespace strict_ppl

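// Implementation machinery shared with concurrent_bounded_queue; exported from the TBB library.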
namespace internal {

class concurrent_queue_rep;
class concurrent_queue_iterator_rep;
class concurrent_queue_iterator_base_v3;
template<typename Container, typename Value> class concurrent_queue_iterator;

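//! For internal use only.
/** Type-independent portion of concurrent_bounded_queue.
    @ingroup containers */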
class concurrent_queue_base_v3: no_copy {
private:
    //! Internal representation
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;
protected:
    //! Prefix on a page
    struct page {
        page* next;
        uintptr_t mask;
    };

    //! Capacity of the queue
    ptrdiff_t my_capacity;

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    enum copy_specifics { copy, move };

#if __TBB_PROTECTED_NESTED_CLASS_BROKEN
public:
#endif
    template<typename T>
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

private:
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    //! Enqueue item at tail of queue
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Abort all pending queue operations
    void __TBB_EXPORTED_METHOD internal_abort();

    //! Attempt to enqueue item at tail of queue; fails if the queue is full.
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item from queue.
    /** NULL if there was no item to dequeue. */
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Get size of queue
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! Check if the queue is empty
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the queue capacity
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );

    //! custom allocator
    virtual page *allocate_page() = 0;

    //! custom de-allocator
    virtual void deallocate_page( page *p ) = 0;

    //! free any remaining pages
    /** Assumes that the queue is empty or being destroyed. */
    void __TBB_EXPORTED_METHOD internal_finish_clear();

    //! Throws an exception
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! Copy internal representation
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src );

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! Swap internal representation
    void internal_swap( concurrent_queue_base_v3& src ) {
        std::swap( my_capacity, src.my_capacity );
        std::swap( items_per_page, src.items_per_page );
        std::swap( item_size, src.item_size );
        std::swap( my_rep, src.my_rep );
    }
#endif

    //! Enqueues item at tail of queue using specified operation (copy or move)
    void internal_insert_item( const void* src, copy_specifics op_type );

    //! Attempts to enqueue at tail of queue using specified operation (copy or move)
    bool internal_insert_if_not_full( const void* src, copy_specifics op_type );

    //! Assigns one queue to another using specified operation (copy or move)
    void internal_assign( const concurrent_queue_base_v3& src, copy_specifics op_type );
private:
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};

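//! For internal use only.
/** Backward-compatible extension of concurrent_queue_base_v3 that adds move operations.
    @ingroup containers */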
class concurrent_queue_base_v8: public concurrent_queue_base_v3 {
protected:
    concurrent_queue_base_v8( size_t item_sz ) : concurrent_queue_base_v3( item_sz ) {}

    //! Move items from src to this queue
    void __TBB_EXPORTED_METHOD move_content( concurrent_queue_base_v8& src );

    //! Attempt to enqueue item onto queue using move operation
    bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full( const void* src );

    //! Enqueue item at tail of queue using move operation
    void __TBB_EXPORTED_METHOD internal_push_move( const void* src );
private:
    friend struct micro_queue;
    virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
    virtual void move_item( page& dst, size_t index, const void* src ) = 0;
};

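//! Type-independent portion of concurrent_queue_iterator.
/** @ingroup containers */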
class concurrent_queue_iterator_base_v3 {
    //! Concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
protected:
    //! Pointer to current item
    void* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    concurrent_queue_iterator_base_v3& operator=( const concurrent_queue_iterator_base_v3& i ) {
        assign(i);
        return *this;
    }

    //! Obsolete entry point for constructing iterator pointing to head of queue; retained for binary compatibility.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Construct iterator pointing to head of queue.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );

    //! Assignment
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance iterator one step towards tail of queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};

typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;

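//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container. */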
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {

#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;
#else
public:
#endif

    //! Construct iterator pointing to head of queue.
    explicit concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
    {
    }

public:
    concurrent_queue_iterator() {}

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) {
        concurrent_queue_iterator_base_v3::operator=(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }

    //! Post increment
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
};

template<typename C, typename T, typename U>
bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item==j.my_item;
}

template<typename C, typename T, typename U>
bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
    return i.my_item!=j.my_item;
}

} // namespace internal

} // namespace tbb

#endif /* __TBB__concurrent_queue_impl_H */