Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:12:45

0001 /*
0002     Copyright (c) 2005-2020 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB__concurrent_queue_impl_H
0018 #define __TBB__concurrent_queue_impl_H
0019 
0020 #ifndef __TBB_concurrent_queue_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023 
0024 #include "../tbb_stddef.h"
0025 #include "../tbb_machine.h"
0026 #include "../atomic.h"
0027 #include "../spin_mutex.h"
0028 #include "../cache_aligned_allocator.h"
0029 #include "../tbb_exception.h"
0030 #include "../tbb_profiling.h"
0031 #include <new>
0032 #include __TBB_STD_SWAP_HEADER
0033 #include <iterator>
0034 
0035 namespace tbb {
0036 
0037 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
0038 
0039 // forward declaration
0040 namespace strict_ppl {
0041 template<typename T, typename A> class concurrent_queue;
0042 }
0043 
0044 template<typename T, typename A> class concurrent_bounded_queue;
0045 
0046 #endif
0047 
0048 //! For internal use only.
0049 namespace strict_ppl {
0050 
0051 //! @cond INTERNAL
0052 namespace internal {
0053 
0054 using namespace tbb::internal;
0055 
0056 typedef size_t ticket;
0057 
0058 template<typename T> class micro_queue ;
0059 template<typename T> class micro_queue_pop_finalizer ;
0060 template<typename T> class concurrent_queue_base_v3;
0061 template<typename T> struct concurrent_queue_rep;
0062 
0063 //! parts of concurrent_queue_rep that do not have references to micro_queue
0064 /**
0065  * For internal use only.
0066  */
0067 struct concurrent_queue_rep_base : no_copy {
0068     template<typename T> friend class micro_queue;
0069     template<typename T> friend class concurrent_queue_base_v3;
0070 
0071 protected:
0072     //! Approximately n_queue/golden ratio
0073     static const size_t phi = 3;
0074 
0075 public:
0076     // must be power of 2
0077     static const size_t n_queue = 8;
0078 
0079     //! Prefix on a page
0080     struct page {
0081         page* next;
0082         uintptr_t mask;
0083     };
0084 
0085     atomic<ticket> head_counter;
0086     char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
0087     atomic<ticket> tail_counter;
0088     char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];
0089 
0090     //! Always a power of 2
0091     size_t items_per_page;
0092 
0093     //! Size of an item
0094     size_t item_size;
0095 
0096     //! number of invalid entries in the queue
0097     atomic<size_t> n_invalid_entries;
0098 
0099     char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
0100 } ;
0101 
0102 inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
0103     return uintptr_t(p)>1;
0104 }
0105 
0106 //! Abstract class to define interface for page allocation/deallocation
0107 /**
0108  * For internal use only.
0109  */
0110 class concurrent_queue_page_allocator
0111 {
0112     template<typename T> friend class micro_queue ;
0113     template<typename T> friend class micro_queue_pop_finalizer ;
0114 protected:
0115     virtual ~concurrent_queue_page_allocator() {}
0116 private:
0117     virtual concurrent_queue_rep_base::page* allocate_page() = 0;
0118     virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
0119 } ;
0120 
0121 #if _MSC_VER && !defined(__INTEL_COMPILER)
0122 // unary minus operator applied to unsigned type, result still unsigned
0123 #pragma warning( push )
0124 #pragma warning( disable: 4146 )
0125 #endif
0126 
0127 //! A queue using simple locking.
0128 /** For efficiency, this class has no constructor.
0129     The caller is expected to zero-initialize it. */
0130 template<typename T>
0131 class micro_queue : no_copy {
0132 public:
0133     typedef void (*item_constructor_t)(T* location, const void* src);
0134 private:
0135     typedef concurrent_queue_rep_base::page page;
0136 
0137     //! Class used to ensure exception-safety of method "pop"
0138     class destroyer: no_copy {
0139         T& my_value;
0140     public:
0141         destroyer( T& value ) : my_value(value) {}
0142         ~destroyer() {my_value.~T();}
0143     };
0144 
0145     void copy_item( page& dst, size_t dindex, const void* src, item_constructor_t construct_item ) {
0146         construct_item( &get_ref(dst, dindex), src );
0147     }
0148 
0149     void copy_item( page& dst, size_t dindex, const page& src, size_t sindex,
0150         item_constructor_t construct_item )
0151     {
0152         T& src_item = get_ref( const_cast<page&>(src), sindex );
0153         construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
0154     }
0155 
0156     void assign_and_destroy_item( void* dst, page& src, size_t index ) {
0157         T& from = get_ref(src,index);
0158         destroyer d(from);
0159         *static_cast<T*>(dst) = tbb::internal::move( from );
0160     }
0161 
0162     void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;
0163 
0164 public:
0165     friend class micro_queue_pop_finalizer<T>;
0166 
0167     struct padded_page: page {
0168         //! Not defined anywhere - exists to quiet warnings.
0169         padded_page();
0170         //! Not defined anywhere - exists to quiet warnings.
0171         void operator=( const padded_page& );
0172         //! Must be last field.
0173         T last;
0174     };
0175 
0176     static T& get_ref( page& p, size_t index ) {
0177         return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
0178     }
0179 
0180     atomic<page*> head_page;
0181     atomic<ticket> head_counter;
0182 
0183     atomic<page*> tail_page;
0184     atomic<ticket> tail_counter;
0185 
0186     spin_mutex page_mutex;
0187 
0188     void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
0189         item_constructor_t construct_item ) ;
0190 
0191     bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;
0192 
0193     micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base,
0194         item_constructor_t construct_item ) ;
0195 
0196     page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page,
0197         size_t end_in_page, ticket& g_index, item_constructor_t construct_item ) ;
0198 
0199     void invalidate_page_and_rethrow( ticket k ) ;
0200 };
0201 
0202 template<typename T>
0203 void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
0204     for( atomic_backoff b(true);;b.pause() ) {
0205         ticket c = counter;
0206         if( c==k ) return;
0207         else if( c&1 ) {
0208             ++rb.n_invalid_entries;
0209             throw_exception( eid_bad_last_alloc );
0210         }
0211     }
0212 }
0213 
0214 template<typename T>
0215 void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
0216     item_constructor_t construct_item )
0217 {
0218     k &= -concurrent_queue_rep_base::n_queue;
0219     page* p = NULL;
0220     size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page);
0221     if( !index ) {
0222         __TBB_TRY {
0223             concurrent_queue_page_allocator& pa = base;
0224             p = pa.allocate_page();
0225         } __TBB_CATCH (...) {
0226             ++base.my_rep->n_invalid_entries;
0227             invalidate_page_and_rethrow( k );
0228         }
0229         p->mask = 0;
0230         p->next = NULL;
0231     }
0232 
0233     if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
0234     call_itt_notify(acquired, &tail_counter);
0235 
0236     if( p ) {
0237         spin_mutex::scoped_lock lock( page_mutex );
0238         page* q = tail_page;
0239         if( is_valid_page(q) )
0240             q->next = p;
0241         else
0242             head_page = p;
0243         tail_page = p;
0244     } else {
0245         p = tail_page;
0246     }
0247 
0248     __TBB_TRY {
0249         copy_item( *p, index, item, construct_item );
0250         // If no exception was thrown, mark item as present.
0251         itt_hide_store_word(p->mask,  p->mask | uintptr_t(1)<<index);
0252         call_itt_notify(releasing, &tail_counter);
0253         tail_counter += concurrent_queue_rep_base::n_queue;
0254     } __TBB_CATCH (...) {
0255         ++base.my_rep->n_invalid_entries;
0256         call_itt_notify(releasing, &tail_counter);
0257         tail_counter += concurrent_queue_rep_base::n_queue;
0258         __TBB_RETHROW();
0259     }
0260 }
0261 
0262 template<typename T>
0263 bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
0264     k &= -concurrent_queue_rep_base::n_queue;
0265     if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
0266     call_itt_notify(acquired, &head_counter);
0267     if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
0268     call_itt_notify(acquired, &tail_counter);
0269     page *p = head_page;
0270     __TBB_ASSERT( p, NULL );
0271     size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
0272     bool success = false;
0273     {
0274         micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? p : NULL );
0275         if( p->mask & uintptr_t(1)<<index ) {
0276             success = true;
0277             assign_and_destroy_item( dst, *p, index );
0278         } else {
0279             --base.my_rep->n_invalid_entries;
0280         }
0281     }
0282     return success;
0283 }
0284 
0285 template<typename T>
0286 micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base,
0287     item_constructor_t construct_item )
0288 {
0289     head_counter = src.head_counter;
0290     tail_counter = src.tail_counter;
0291 
0292     const page* srcp = src.head_page;
0293     if( is_valid_page(srcp) ) {
0294         ticket g_index = head_counter;
0295         __TBB_TRY {
0296             size_t n_items  = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
0297             size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
0298             size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;
0299 
0300             head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
0301             page* cur_page = head_page;
0302 
0303             if( srcp != src.tail_page ) {
0304                 for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
0305                     cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index, construct_item );
0306                     cur_page = cur_page->next;
0307                 }
0308 
0309                 __TBB_ASSERT( srcp==src.tail_page, NULL );
0310                 size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
0311                 if( last_index==0 ) last_index = base.my_rep->items_per_page;
0312 
0313                 cur_page->next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
0314                 cur_page = cur_page->next;
0315             }
0316             tail_page = cur_page;
0317         } __TBB_CATCH (...) {
0318             invalidate_page_and_rethrow( g_index );
0319         }
0320     } else {
0321         head_page = tail_page = NULL;
0322     }
0323     return *this;
0324 }
0325 
0326 template<typename T>
0327 void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
0328     // Append an invalid page at address 1 so that no more pushes are allowed.
0329     page* invalid_page = (page*)uintptr_t(1);
0330     {
0331         spin_mutex::scoped_lock lock( page_mutex );
0332         itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
0333         page* q = tail_page;
0334         if( is_valid_page(q) )
0335             q->next = invalid_page;
0336         else
0337             head_page = invalid_page;
0338         tail_page = invalid_page;
0339     }
0340     __TBB_RETHROW();
0341 }
0342 
0343 template<typename T>
0344 concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base,
0345     const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page,
0346     ticket& g_index, item_constructor_t construct_item )
0347 {
0348     concurrent_queue_page_allocator& pa = base;
0349     page* new_page = pa.allocate_page();
0350     new_page->next = NULL;
0351     new_page->mask = src_page->mask;
0352     for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
0353         if( new_page->mask & uintptr_t(1)<<begin_in_page )
0354             copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
0355     return new_page;
0356 }
0357 
0358 template<typename T>
0359 class micro_queue_pop_finalizer: no_copy {
0360     typedef concurrent_queue_rep_base::page page;
0361     ticket my_ticket;
0362     micro_queue<T>& my_queue;
0363     page* my_page;
0364     concurrent_queue_page_allocator& allocator;
0365 public:
0366     micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
0367         my_ticket(k), my_queue(queue), my_page(p), allocator(b)
0368     {}
0369     ~micro_queue_pop_finalizer() ;
0370 };
0371 
0372 template<typename T>
0373 micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
0374     page* p = my_page;
0375     if( is_valid_page(p) ) {
0376         spin_mutex::scoped_lock lock( my_queue.page_mutex );
0377         page* q = p->next;
0378         my_queue.head_page = q;
0379         if( !is_valid_page(q) ) {
0380             my_queue.tail_page = NULL;
0381         }
0382     }
0383     itt_store_word_with_release(my_queue.head_counter, my_ticket);
0384     if( is_valid_page(p) ) {
0385         allocator.deallocate_page( p );
0386     }
0387 }
0388 
0389 #if _MSC_VER && !defined(__INTEL_COMPILER)
0390 #pragma warning( pop )
0391 #endif // warning 4146 is back
0392 
0393 template<typename T> class concurrent_queue_iterator_rep ;
0394 template<typename T> class concurrent_queue_iterator_base_v3;
0395 
0396 //! representation of concurrent_queue_base
0397 /**
0398  * the class inherits from concurrent_queue_rep_base and defines an array of micro_queue<T>'s
0399  */
0400 template<typename T>
0401 struct concurrent_queue_rep : public concurrent_queue_rep_base {
0402     micro_queue<T> array[n_queue];
0403 
0404     //! Map ticket to an array index
0405     static size_t index( ticket k ) {
0406         return k*phi%n_queue;
0407     }
0408 
0409     micro_queue<T>& choose( ticket k ) {
0410         // The formula here approximates LRU in a cache-oblivious way.
0411         return array[index(k)];
0412     }
0413 };
0414 
0415 //! base class of concurrent_queue
0416 /**
0417  * The class implements the interface defined by concurrent_queue_page_allocator
0418  * and has a pointer to an instance of concurrent_queue_rep.
0419  */
0420 template<typename T>
0421 class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
0422 private:
0423     //! Internal representation
0424     concurrent_queue_rep<T>* my_rep;
0425 
0426     friend struct concurrent_queue_rep<T>;
0427     friend class micro_queue<T>;
0428     friend class concurrent_queue_iterator_rep<T>;
0429     friend class concurrent_queue_iterator_base_v3<T>;
0430 
0431 protected:
0432     typedef typename concurrent_queue_rep<T>::page page;
0433 
0434 private:
0435     typedef typename micro_queue<T>::padded_page padded_page;
0436     typedef typename micro_queue<T>::item_constructor_t item_constructor_t;
0437 
0438     virtual page *allocate_page() __TBB_override {
0439         concurrent_queue_rep<T>& r = *my_rep;
0440         size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
0441         return reinterpret_cast<page*>(allocate_block ( n ));
0442     }
0443 
0444     virtual void deallocate_page( concurrent_queue_rep_base::page *p ) __TBB_override {
0445         concurrent_queue_rep<T>& r = *my_rep;
0446         size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
0447         deallocate_block( reinterpret_cast<void*>(p), n );
0448     }
0449 
0450     //! custom allocator
0451     virtual void *allocate_block( size_t n ) = 0;
0452 
0453     //! custom de-allocator
0454     virtual void deallocate_block( void *p, size_t n ) = 0;
0455 
0456 protected:
0457     concurrent_queue_base_v3();
0458 
0459     virtual ~concurrent_queue_base_v3() {
0460 #if TBB_USE_ASSERT
0461         size_t nq = my_rep->n_queue;
0462         for( size_t i=0; i<nq; i++ )
0463             __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
0464 #endif /* TBB_USE_ASSERT */
0465         cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
0466     }
0467 
0468     //! Enqueue item at tail of queue
0469     void internal_push( const void* src, item_constructor_t construct_item ) {
0470          concurrent_queue_rep<T>& r = *my_rep;
0471          ticket k = r.tail_counter++;
0472          r.choose(k).push( src, k, *this, construct_item );
0473     }
0474 
0475     //! Attempt to dequeue item from queue.
0476     /** NULL if there was no item to dequeue. */
0477     bool internal_try_pop( void* dst ) ;
0478 
0479     //! Get size of queue; result may be invalid if queue is modified concurrently
0480     size_t internal_size() const ;
0481 
0482     //! check if the queue is empty; thread safe
0483     bool internal_empty() const ;
0484 
0485     //! free any remaining pages
0486     /* note that the name may be misleading, but it remains so due to a historical accident. */
0487     void internal_finish_clear() ;
0488 
0489     //! Obsolete
0490     void internal_throw_exception() const {
0491         throw_exception( eid_bad_alloc );
0492     }
0493 
0494     //! copy or move internal representation
0495     void assign( const concurrent_queue_base_v3& src, item_constructor_t construct_item ) ;
0496 
0497 #if __TBB_CPP11_RVALUE_REF_PRESENT
0498     //! swap internal representation
0499     void internal_swap( concurrent_queue_base_v3& src ) {
0500         std::swap( my_rep, src.my_rep );
0501     }
0502 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
0503 };
0504 
0505 template<typename T>
0506 concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
0507     const size_t item_size = sizeof(T);
0508     my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
0509     __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
0510     __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
0511     __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
0512     __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
0513     memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep<T>));
0514     my_rep->item_size = item_size;
0515     my_rep->items_per_page = item_size<=  8 ? 32 :
0516                              item_size<= 16 ? 16 :
0517                              item_size<= 32 ?  8 :
0518                              item_size<= 64 ?  4 :
0519                              item_size<=128 ?  2 :
0520                              1;
0521 }
0522 
0523 template<typename T>
0524 bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
0525     concurrent_queue_rep<T>& r = *my_rep;
0526     ticket k;
0527     do {
0528         k = r.head_counter;
0529         for(;;) {
0530             if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
0531                 // Queue is empty
0532                 return false;
0533             }
0534             // Queue had item with ticket k when we looked.  Attempt to get that item.
0535             ticket tk=k;
0536 #if defined(_MSC_VER) && defined(_Wp64)
0537     #pragma warning (push)
0538     #pragma warning (disable: 4267)
0539 #endif
0540             k = r.head_counter.compare_and_swap( tk+1, tk );
0541 #if defined(_MSC_VER) && defined(_Wp64)
0542     #pragma warning (pop)
0543 #endif
0544             if( k==tk )
0545                 break;
0546             // Another thread snatched the item, retry.
0547         }
0548     } while( !r.choose( k ).pop( dst, k, *this ) );
0549     return true;
0550 }
0551 
0552 template<typename T>
0553 size_t concurrent_queue_base_v3<T>::internal_size() const {
0554     concurrent_queue_rep<T>& r = *my_rep;
0555     __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
0556     ticket hc = r.head_counter;
0557     size_t nie = r.n_invalid_entries;
0558     ticket tc = r.tail_counter;
0559     __TBB_ASSERT( hc!=tc || !nie, NULL );
0560     ptrdiff_t sz = tc-hc-nie;
0561     return sz<0 ? 0 :  size_t(sz);
0562 }
0563 
0564 template<typename T>
0565 bool concurrent_queue_base_v3<T>::internal_empty() const {
0566     concurrent_queue_rep<T>& r = *my_rep;
0567     ticket tc = r.tail_counter;
0568     ticket hc = r.head_counter;
0569     // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
0570     return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
0571 }
0572 
0573 template<typename T>
0574 void concurrent_queue_base_v3<T>::internal_finish_clear() {
0575     concurrent_queue_rep<T>& r = *my_rep;
0576     size_t nq = r.n_queue;
0577     for( size_t i=0; i<nq; ++i ) {
0578         page* tp = r.array[i].tail_page;
0579         if( is_valid_page(tp) ) {
0580             __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
0581             deallocate_page( tp );
0582             r.array[i].tail_page = NULL;
0583         } else
0584             __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
0585     }
0586 }
0587 
0588 template<typename T>
0589 void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src,
0590     item_constructor_t construct_item )
0591 {
0592     concurrent_queue_rep<T>& r = *my_rep;
0593     r.items_per_page = src.my_rep->items_per_page;
0594 
0595     // copy concurrent_queue_rep data
0596     r.head_counter = src.my_rep->head_counter;
0597     r.tail_counter = src.my_rep->tail_counter;
0598     r.n_invalid_entries = src.my_rep->n_invalid_entries;
0599 
0600     // copy or move micro_queues
0601     for( size_t i = 0; i < r.n_queue; ++i )
0602         r.array[i].assign( src.my_rep->array[i], *this, construct_item);
0603 
0604     __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
0605             "the source concurrent queue should not be concurrently modified." );
0606 }
0607 
0608 template<typename Container, typename Value> class concurrent_queue_iterator;
0609 
0610 template<typename T>
0611 class concurrent_queue_iterator_rep: no_assign {
0612     typedef typename micro_queue<T>::padded_page padded_page;
0613 public:
0614     ticket head_counter;
0615     const concurrent_queue_base_v3<T>& my_queue;
0616     typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
0617     concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
0618         head_counter(queue.my_rep->head_counter),
0619         my_queue(queue)
0620     {
0621         for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
0622             array[k] = queue.my_rep->array[k].head_page;
0623     }
0624 
0625     //! Set item to point to kth element.  Return true if at end of queue or item is marked valid; false otherwise.
0626     bool get_item( T*& item, size_t k ) ;
0627 };
0628 
0629 template<typename T>
0630 bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
0631     if( k==my_queue.my_rep->tail_counter ) {
0632         item = NULL;
0633         return true;
0634     } else {
0635         typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
0636         __TBB_ASSERT(p,NULL);
0637         size_t i = modulo_power_of_two( k/concurrent_queue_rep<T>::n_queue, my_queue.my_rep->items_per_page );
0638         item = &micro_queue<T>::get_ref(*p,i);
0639         return (p->mask & uintptr_t(1)<<i)!=0;
0640     }
0641 }
0642 
0643 //! Constness-independent portion of concurrent_queue_iterator.
0644 /** @ingroup containers */
0645 template<typename Value>
0646 class concurrent_queue_iterator_base_v3 {
0647     //! Represents concurrent_queue over which we are iterating.
0648     /** NULL if one past last element in queue. */
0649     concurrent_queue_iterator_rep<Value>* my_rep;
0650 
0651     template<typename C, typename T, typename U>
0652     friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
0653 
0654     template<typename C, typename T, typename U>
0655     friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
0656 protected:
0657     //! Pointer to current item
0658     Value* my_item;
0659 
0660     //! Default constructor
0661     concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
0662 #if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
0663         __TBB_compiler_fence();
0664 #endif
0665     }
0666 
0667     //! Copy constructor
0668     concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
0669     : my_rep(NULL), my_item(NULL) {
0670         assign(i);
0671     }
0672 
0673     concurrent_queue_iterator_base_v3& operator=( const concurrent_queue_iterator_base_v3& i ) {
0674         assign(i);
0675         return *this;
0676     }
0677 
0678     //! Construct iterator pointing to head of queue.
0679     concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;
0680 
0681     //! Assignment
0682     void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;
0683 
0684     //! Advance iterator one step towards tail of queue.
0685     void advance() ;
0686 
0687     //! Destructor
0688     ~concurrent_queue_iterator_base_v3() {
0689         cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
0690         my_rep = NULL;
0691     }
0692 };
0693 
0694 template<typename Value>
0695 concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
0696     my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
0697     new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
0698     size_t k = my_rep->head_counter;
0699     if( !my_rep->get_item(my_item, k) ) advance();
0700 }
0701 
0702 template<typename Value>
0703 void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
0704     if( my_rep!=other.my_rep ) {
0705         if( my_rep ) {
0706             cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
0707             my_rep = NULL;
0708         }
0709         if( other.my_rep ) {
0710             my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
0711             new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
0712         }
0713     }
0714     my_item = other.my_item;
0715 }
0716 
0717 template<typename Value>
0718 void concurrent_queue_iterator_base_v3<Value>::advance() {
0719     __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
0720     size_t k = my_rep->head_counter;
0721     const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
0722 #if TBB_USE_ASSERT
0723     Value* tmp;
0724     my_rep->get_item(tmp,k);
0725     __TBB_ASSERT( my_item==tmp, NULL );
0726 #endif /* TBB_USE_ASSERT */
0727     size_t i = modulo_power_of_two( k/concurrent_queue_rep<Value>::n_queue, queue.my_rep->items_per_page );
0728     if( i==queue.my_rep->items_per_page-1 ) {
0729         typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
0730         root = root->next;
0731     }
0732     // advance k
0733     my_rep->head_counter = ++k;
0734     if( !my_rep->get_item(my_item, k) ) advance();
0735 }
0736 
//! Strips top-level const and/or volatile from T; similar to C++0x std::remove_cv.
/** "tbb_" prefix added to avoid overload confusion with C++0x implementations. */
template<typename T>
struct tbb_remove_cv {
    typedef T type;
};

//! Specialization removing const.
template<typename T>
struct tbb_remove_cv<const T> {
    typedef T type;
};

//! Specialization removing volatile.
template<typename T>
struct tbb_remove_cv<volatile T> {
    typedef T type;
};

//! Specialization removing const volatile.
template<typename T>
struct tbb_remove_cv<const volatile T> {
    typedef T type;
};
0743 
0744 //! Meets requirements of a forward iterator for STL.
0745 /** Value is either the T or const T type of the container.
0746     @ingroup containers */
0747 template<typename Container, typename Value>
0748 class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
0749         public std::iterator<std::forward_iterator_tag,Value> {
0750 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
0751     template<typename T, class A>
0752     friend class ::tbb::strict_ppl::concurrent_queue;
0753 #else
0754 public:
0755 #endif
0756     //! Construct iterator pointing to head of queue.
0757     explicit concurrent_queue_iterator( const concurrent_queue_base_v3<typename tbb_remove_cv<Value>::type>& queue ) :
0758         concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
0759     {
0760     }
0761 
0762 public:
0763     concurrent_queue_iterator() {}
0764 
0765     /** If Value==Container::value_type, then this routine is the copy constructor.
0766         If Value==const Container::value_type, then this routine is a conversion constructor. */
0767     concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
0768         concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
0769     {}
0770 
0771     //! Iterator assignment
0772     concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) {
0773         concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>::operator=(other);
0774         return *this;
0775     }
0776 
0777     //! Reference to current item
0778     Value& operator*() const {
0779         return *static_cast<Value*>(this->my_item);
0780     }
0781 
0782     Value* operator->() const {return &operator*();}
0783 
0784     //! Advance to next item in queue
0785     concurrent_queue_iterator& operator++() {
0786         this->advance();
0787         return *this;
0788     }
0789 
0790     //! Post increment
0791     Value* operator++(int) {
0792         Value* result = &operator*();
0793         operator++();
0794         return result;
0795     }
0796 }; // concurrent_queue_iterator
0797 
0798 
0799 template<typename C, typename T, typename U>
0800 bool operator==( const concurrent_queue_iterator<C,T>& lhs, const concurrent_queue_iterator<C,U>& rhs ) { // T and U are independent, so iterators with different Value types can be compared
0801     return lhs.my_item == rhs.my_item; // equal exactly when both iterators hold the same my_item
0802 }
0803 
0804 template<typename C, typename T, typename U>
0805 bool operator!=( const concurrent_queue_iterator<C,T>& lhs, const concurrent_queue_iterator<C,U>& rhs ) { // T and U are independent, so iterators with different Value types can be compared
0806     return lhs.my_item != rhs.my_item; // unequal exactly when the two my_item members differ
0807 }
0808 
0809 } // namespace internal
0810 
0811 //! @endcond
0812 
0813 } // namespace strict_ppl
0814 
0815 //! @cond INTERNAL
0816 namespace internal {
0817 
0818 class concurrent_queue_rep; // only used through a pointer (concurrent_queue_base_v3::my_rep) in the visible part of this header
0819 class concurrent_queue_iterator_rep; // only used through a pointer (concurrent_queue_iterator_base_v3::my_rep) in the visible part of this header
0820 class concurrent_queue_iterator_base_v3; // declared early so concurrent_queue_base_v3 can befriend it; defined further down
0821 template<typename Container, typename Value> class concurrent_queue_iterator; // typed iterator; defined further down
0822 
0823 //! For internal use only.
0824 /** Type-independent portion of concurrent_queue: keeps the untyped queue state and declares operations that take items as raw void pointers; item-type-specific work goes through the pure virtual hooks declared below.
0825     @ingroup containers */
0826 class concurrent_queue_base_v3: no_copy {
0827 private:
0828     //! Internal representation (held only as a pointer; the type is just forward-declared in the visible part of this header)
0829     concurrent_queue_rep* my_rep;
0830
0831     friend class concurrent_queue_rep; // the helper types befriended here get access to the private members of this class
0832     friend struct micro_queue;
0833     friend class micro_queue_pop_finalizer;
0834     friend class concurrent_queue_iterator_rep;
0835     friend class concurrent_queue_iterator_base_v3;
0836 protected:
0837     //! Prefix on a page: the header part that padded_page (below) extends with item storage
0838     struct page {
0839         page* next; //!< link to another page (NOTE(review): presumably the following page of the same chain - confirm in the implementation)
0840         uintptr_t mask; //!< per-page bit mask (NOTE(review): presumably flags which slots hold valid items - confirm in the implementation)
0841     };
0842
0843     //! Capacity of the queue (signed; set through internal_set_capacity)
0844     ptrdiff_t my_capacity;
0845
0846     //! Number of items stored per page. Always a power of 2
0847     size_t items_per_page;
0848
0849     //! Size of an item (NOTE(review): presumably in bytes - confirm)
0850     size_t item_size;
0851
0852     enum copy_specifics { copy, move }; // selects the copy or move flavor in internal_insert_item, internal_insert_if_not_full and internal_assign
0853
0854 #if __TBB_PROTECTED_NESTED_CLASS_BROKEN // workaround switch: padded_page is made public for compilers that set this macro
0855 public:
0856 #endif
0857     template<typename T> // page header followed by a T; concurrent_queue_iterator uses __TBB_offsetof(padded_page<Value>,last) as the offset of item data
0858     struct padded_page: page {
0859         //! Not defined anywhere - exists to quiet warnings.
0860         padded_page();
0861         //! Not defined anywhere - exists to quiet warnings.
0862         void operator=( const padded_page& );
0863         //! Must be last field.
0864         T last;
0865     };
0866
0867 private:
0868     virtual void copy_item( page& dst, size_t index, const void* src ) = 0; // type-specific hook (presumably copies *src into slot index of dst - confirm in the derived class)
0869     virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0; // type-specific hook (presumably moves slot index of src out to *dst and destroys the slot - confirm in the derived class)
0870 protected:
0871     __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size ); // like every __TBB_EXPORTED_METHOD member below, declared here but not defined in this header
0872     virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();
0873
0874     //! Enqueue item at tail of queue using copy operation
0875     void __TBB_EXPORTED_METHOD internal_push( const void* src );
0876
0877     //! Dequeue item from head of queue
0878     void __TBB_EXPORTED_METHOD internal_pop( void* dst );
0879
0880     //! Abort all pending queue operations
0881     void __TBB_EXPORTED_METHOD internal_abort();
0882
0883     //! Attempt to enqueue item onto queue using copy operation
0884     bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );
0885
0886     //! Attempt to dequeue item from queue.
0887     /** Returns false if there was no item to dequeue. */
0888     bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );
0889
0890     //! Get size of queue (signed result)
0891     ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;
0892
0893     //! Check if the queue is empty
0894     bool __TBB_EXPORTED_METHOD internal_empty() const;
0895
0896     //! Set the queue capacity; the element size is passed alongside the new capacity
0897     void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
0898
0899     //! custom allocator: pure virtual, so the derived class supplies the page storage
0900     virtual page *allocate_page() = 0;
0901
0902     //! custom de-allocator: pure virtual counterpart of allocate_page
0903     virtual void deallocate_page( page *p ) = 0;
0904
0905     //! free any remaining pages
0906     /* note that the name may be misleading, but it remains so due to a historical accident. */
0907     void __TBB_EXPORTED_METHOD internal_finish_clear() ;
0908
0909     //! throw an exception
0910     void __TBB_EXPORTED_METHOD internal_throw_exception() const;
0911
0912     //! copy internal representation
0913     void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;
0914
0915 #if __TBB_CPP11_RVALUE_REF_PRESENT
0916     //! swap queues: exchanges my_capacity, items_per_page, item_size and my_rep with src (no synchronization is performed here)
0917     void internal_swap( concurrent_queue_base_v3& src ) {
0918         std::swap( my_capacity, src.my_capacity );
0919         std::swap( items_per_page, src.items_per_page );
0920         std::swap( item_size, src.item_size );
0921         std::swap( my_rep, src.my_rep );
0922     }
0923 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
0924
0925     //! Enqueues item at tail of queue using specified operation (copy or move); not marked __TBB_EXPORTED_METHOD
0926     void internal_insert_item( const void* src, copy_specifics op_type );
0927
0928     //! Attempts to enqueue at tail of queue using specified operation (copy or move); not marked __TBB_EXPORTED_METHOD
0929     bool internal_insert_if_not_full( const void* src, copy_specifics op_type );
0930
0931     //! Assigns one queue to another using specified operation (copy or move); not marked __TBB_EXPORTED_METHOD
0932     void internal_assign( const concurrent_queue_base_v3& src, copy_specifics op_type );
0933 private:
0934     virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0; // type-specific page-to-page hook, declared after all other virtuals (NOTE(review): presumably to keep the earlier virtual-table layout - confirm)
0935 };
0936 
0937 //! For internal use only.
0938 /** Backward compatible modification of concurrent_queue_base_v3: derives from it and adds the move-based entry points and hooks declared below.
0939     @ingroup containers */
0940 class concurrent_queue_base_v8: public concurrent_queue_base_v3 {
0941 protected:
0942     concurrent_queue_base_v8( size_t item_sz ) : concurrent_queue_base_v3( item_sz ) {} // forwards the item size to the v3 base; nothing else to initialize
0943
0944     //! move items out of src (declared here, not defined in this header)
0945     void __TBB_EXPORTED_METHOD move_content( concurrent_queue_base_v8& src ) ;
0946
0947     //! Attempt to enqueue item onto queue using move operation
0948     bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full( const void* src );
0949
0950     //! Enqueue item at tail of queue using move operation
0951     void __TBB_EXPORTED_METHOD internal_push_move( const void* src );
0952 private:
0953     friend struct micro_queue; // friendship is not inherited, so it is granted again for the private hooks below
0954     virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0; // move counterpart of copy_page_item; implemented by the typed derived class
0955     virtual void move_item( page& dst, size_t index, const void* src ) = 0; // move counterpart of copy_item; src is const void* even though the operation is a move
0956 };
0957 
0958 //! Type-independent portion of concurrent_queue_iterator.
0959 /** Holds the untyped iteration state (my_rep, my_item); the typed concurrent_queue_iterator casts my_item to Value*. @ingroup containers */
0960 class concurrent_queue_iterator_base_v3 {
0961     //! concurrent_queue over which we are iterating.
0962     /** NULL if one past last element in queue. */
0963     concurrent_queue_iterator_rep* my_rep;
0964
0965     template<typename C, typename T, typename U> // the comparison operators are friends because they read the protected my_item
0966     friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
0967
0968     template<typename C, typename T, typename U>
0969     friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
0970
0971     void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data ); // private helper, not defined in this header (NOTE(review): presumably shared by the two queue constructors below - confirm)
0972 protected:
0973     //! Pointer to current item (untyped; compared by operator== / operator!=)
0974     void* my_item;
0975
0976     //! Default constructor: both my_rep and my_item start out NULL
0977     concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}
0978
0979     //! Copy constructor: starts from the NULL state, then delegates to assign()
0980     concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
0981         assign(i);
0982     }
0983
0984     concurrent_queue_iterator_base_v3& operator=( const concurrent_queue_iterator_base_v3& i ) { // copy assignment also delegates to assign(); no self-assignment check is made at this level
0985         assign(i);
0986         return *this;
0987     }
0988
0989     //! Obsolete entry point for constructing iterator pointing to head of queue.
0990     /** Does not work correctly for SSE types. */
0991     __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );
0992
0993     //! Construct iterator pointing to head of queue; offset_of_data is what concurrent_queue_iterator computes as __TBB_offsetof(padded_page<Value>,last).
0994     __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );
0995
0996     //! Assignment (declared here, not defined in this header)
0997     void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );
0998
0999     //! Advance iterator one step towards tail of queue.
1000     void __TBB_EXPORTED_METHOD advance();
1001
1002     //! Destructor (non-virtual and protected)
1003     __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
1004 };
1005
1006 typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base; // unversioned alias; concurrent_queue_iterator names its base through it
1007 
1008 //! Forward iterator over the items of a queue; satisfies the STL forward iterator requirements.
1009 /** Value is the container's element type T, or const T for the constant iterator.
1010     @ingroup containers */
1011 template<typename Container, typename Value>
1012 class concurrent_queue_iterator: public concurrent_queue_iterator_base,
1013         public std::iterator<std::forward_iterator_tag,Value> {
1014     typedef concurrent_queue_iterator_base base_type; // private shorthand for the type-independent base
1015 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
1016     template<typename T, class A>
1017     friend class ::tbb::concurrent_bounded_queue;
1018 #else
1019 public:
1020 #endif
1021
1022     //! Build an iterator positioned at the head of the given queue.
1023     explicit concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
1024         base_type( queue, __TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last) ) // offset of the item data inside a padded page
1025     {}
1026
1027
1028 public:
1029     concurrent_queue_iterator() : base_type() {}
1030
1031     /** Copy constructor when Value==Container::value_type;
1032         conversion constructor (non-const to const iterator) when Value==const Container::value_type. */
1033     concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
1034         base_type(other)
1035     {}
1036
1037     //! Assign from an iterator over non-const values
1038     concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) {
1039         this->base_type::operator=(other);
1040         return *this;
1041     }
1042
1043     //! Access the item the iterator currently refers to
1044     Value& operator*() const {
1045         return *static_cast<Value*>(this->my_item);
1046     }
1047
1048     Value* operator->() const {return &(**this);}
1049
1050     //! Pre-increment: step to the next item in the queue
1051     concurrent_queue_iterator& operator++() {
1052         this->advance();
1053         return *this;
1054     }
1055
1056     //! Post-increment: steps forward and returns a pointer to the item referred to before the step
1057     Value* operator++(int) {
1058         Value* previous = &(**this);
1059         ++(*this);
1060         return previous;
1061     }
1062 }; // concurrent_queue_iterator
1063 
1064 
1065 template<typename C, typename T, typename U>
1066 bool operator==( const concurrent_queue_iterator<C,T>& lhs, const concurrent_queue_iterator<C,U>& rhs ) { // T and U are independent, so a T iterator and a const T iterator can be compared
1067     return lhs.my_item == rhs.my_item; // equal exactly when both iterators hold the same my_item pointer
1068 }
1069 
1070 template<typename C, typename T, typename U>
1071 bool operator!=( const concurrent_queue_iterator<C,T>& lhs, const concurrent_queue_iterator<C,U>& rhs ) { // T and U are independent, so a T iterator and a const T iterator can be compared
1072     return lhs.my_item != rhs.my_item; // unequal exactly when the two my_item pointers differ
1073 }
1074 
1075 } // namespace internal
1076 
1077 //! @endcond
1078 
1079 } // namespace tbb
1080 
1081 #endif /* __TBB__concurrent_queue_impl_H */