Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:12:53

0001 /*
0002     Copyright (c) 2005-2020 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB_concurrent_priority_queue_H
0018 #define __TBB_concurrent_priority_queue_H
0019 
0020 #define __TBB_concurrent_priority_queue_H_include_area
0021 #include "internal/_warning_suppress_enable_notice.h"
0022 
0023 #include "atomic.h"
0024 #include "cache_aligned_allocator.h"
0025 #include "tbb_exception.h"
0026 #include "tbb_stddef.h"
0027 #include "tbb_profiling.h"
0028 #include "internal/_aggregator_impl.h"
0029 #include "internal/_template_helpers.h"
0030 #include "internal/_allocator_traits.h"
0031 #include <vector>
0032 #include <iterator>
0033 #include <functional>
0034 #include __TBB_STD_SWAP_HEADER
0035 
0036 #if __TBB_INITIALIZER_LISTS_PRESENT
0037     #include <initializer_list>
0038 #endif
0039 
0040 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
0041     #include <type_traits>
0042 #endif
0043 
0044 namespace tbb {
0045 namespace interface5 {
0046 namespace internal {
0047 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
0048     template<typename T, bool C = std::is_copy_constructible<T>::value>
0049     struct use_element_copy_constructor {
0050         typedef tbb::internal::true_type type;
0051     };
0052     template<typename T>
0053     struct use_element_copy_constructor <T,false> {
0054         typedef tbb::internal::false_type type;
0055     };
0056 #else
0057     template<typename>
0058     struct use_element_copy_constructor {
0059         typedef tbb::internal::true_type type;
0060     };
0061 #endif
0062 } // namespace internal
0063 
0064 using namespace tbb::internal;
0065 
0066 //! Concurrent priority queue
0067 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
0068 class concurrent_priority_queue {
0069  public:
0070     //! Element type in the queue.
0071     typedef T value_type;
0072 
0073     //! Reference type
0074     typedef T& reference;
0075 
0076     //! Const reference type
0077     typedef const T& const_reference;
0078 
0079     //! Integral type for representing size of the queue.
0080     typedef size_t size_type;
0081 
0082     //! Difference type for iterator
0083     typedef ptrdiff_t difference_type;
0084 
0085     //! Allocator type
0086     typedef A allocator_type;
0087 
0088     //! Constructs a new concurrent_priority_queue with default capacity
0089     explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), compare(), data(a)
0090     {
0091         my_aggregator.initialize_handler(my_functor_t(this));
0092     }
0093 
0094     //! Constructs a new concurrent_priority_queue with default capacity
0095     explicit concurrent_priority_queue(const Compare& c, const allocator_type& a = allocator_type()) : mark(0), my_size(0), compare(c), data(a)
0096     {
0097         my_aggregator.initialize_handler(my_functor_t(this));
0098     }
0099 
0100     //! Constructs a new concurrent_priority_queue with init_sz capacity
0101     explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
0102         mark(0), my_size(0), compare(), data(a)
0103     {
0104         data.reserve(init_capacity);
0105         my_aggregator.initialize_handler(my_functor_t(this));
0106     }
0107 
0108     //! Constructs a new concurrent_priority_queue with init_sz capacity
0109     explicit concurrent_priority_queue(size_type init_capacity, const Compare& c, const allocator_type& a = allocator_type()) :
0110         mark(0), my_size(0), compare(c), data(a)
0111     {
0112         data.reserve(init_capacity);
0113         my_aggregator.initialize_handler(my_functor_t(this));
0114     }
0115 
0116     //! [begin,end) constructor
0117     template<typename InputIterator>
0118     concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
0119         mark(0), compare(), data(begin, end, a)
0120     {
0121         my_aggregator.initialize_handler(my_functor_t(this));
0122         heapify();
0123         my_size = data.size();
0124     }
0125 
0126     //! [begin,end) constructor
0127     template<typename InputIterator>
0128     concurrent_priority_queue(InputIterator begin, InputIterator end, const Compare& c, const allocator_type& a = allocator_type()) :
0129         mark(0), compare(c), data(begin, end, a)
0130     {
0131         my_aggregator.initialize_handler(my_functor_t(this));
0132         heapify();
0133         my_size = data.size();
0134     }
0135 
0136 #if __TBB_INITIALIZER_LISTS_PRESENT
0137     //! Constructor from std::initializer_list
0138     concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) :
0139         mark(0), compare(), data(init_list.begin(), init_list.end(), a)
0140     {
0141         my_aggregator.initialize_handler(my_functor_t(this));
0142         heapify();
0143         my_size = data.size();
0144     }
0145 
0146     //! Constructor from std::initializer_list
0147     concurrent_priority_queue(std::initializer_list<T> init_list, const Compare& c, const allocator_type &a = allocator_type()) :
0148         mark(0), compare(c), data(init_list.begin(), init_list.end(), a)
0149     {
0150         my_aggregator.initialize_handler(my_functor_t(this));
0151         heapify();
0152         my_size = data.size();
0153     }
0154 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
0155 
0156     //! Copy constructor
0157     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
0158     concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
0159         my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
0160     {
0161         my_aggregator.initialize_handler(my_functor_t(this));
0162         heapify();
0163     }
0164 
0165     //! Copy constructor with specific allocator
0166     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
0167     concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
0168         my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
0169     {
0170         my_aggregator.initialize_handler(my_functor_t(this));
0171         heapify();
0172     }
0173 
0174     //! Assignment operator
0175     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
0176     concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
0177         if (this != &src) {
0178             vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
0179             mark = src.mark;
0180             my_size = src.my_size;
0181         }
0182         return *this;
0183     }
0184 
0185 #if __TBB_CPP11_RVALUE_REF_PRESENT
0186     //! Move constructor
0187     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
0188     concurrent_priority_queue(concurrent_priority_queue&& src) : mark(src.mark),
0189         my_size(src.my_size), data(std::move(src.data))
0190     {
0191         my_aggregator.initialize_handler(my_functor_t(this));
0192     }
0193 
0194     //! Move constructor with specific allocator
0195     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
0196     concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark),
0197         my_size(src.my_size),
0198 #if __TBB_ALLOCATOR_TRAITS_PRESENT
0199         data(std::move(src.data), a)
0200 #else
0201     // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator).
0202     // It seems that the reason is absence of support of allocator_traits (stateful allocators).
0203         data(a)
0204 #endif //__TBB_ALLOCATOR_TRAITS_PRESENT
0205     {
0206         my_aggregator.initialize_handler(my_functor_t(this));
0207 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
0208         if (a != src.data.get_allocator()){
0209             data.reserve(src.data.size());
0210             data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
0211         }else{
0212             data = std::move(src.data);
0213         }
0214 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
0215     }
0216 
0217     //! Move assignment operator
0218     /** This operation is unsafe if there are pending concurrent operations on the src queue. */
0219     concurrent_priority_queue& operator=( concurrent_priority_queue&& src) {
0220         if (this != &src) {
0221             mark = src.mark;
0222             my_size = src.my_size;
0223 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
0224             if (data.get_allocator() != src.data.get_allocator()){
0225                 vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
0226             }else
0227 #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
0228             {
0229                 data = std::move(src.data);
0230             }
0231         }
0232         return *this;
0233     }
0234 #endif //__TBB_CPP11_RVALUE_REF_PRESENT
0235 
0236     //! Assign the queue from [begin,end) range, not thread-safe
0237     template<typename InputIterator>
0238     void assign(InputIterator begin, InputIterator end) {
0239         vector_t(begin, end, data.get_allocator()).swap(data);
0240         mark = 0;
0241         my_size = data.size();
0242         heapify();
0243     }
0244 
0245 #if __TBB_INITIALIZER_LISTS_PRESENT
0246     //! Assign the queue from std::initializer_list, not thread-safe
0247     void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }
0248 
0249     //! Assign from std::initializer_list, not thread-safe
0250     concurrent_priority_queue& operator=(std::initializer_list<T> il) {
0251         this->assign(il.begin(), il.end());
0252         return *this;
0253     }
0254 #endif //# __TBB_INITIALIZER_LISTS_PRESENT
0255 
0256     //! Returns true if empty, false otherwise
0257     /** Returned value may not reflect results of pending operations.
0258         This operation reads shared data and will trigger a race condition. */
0259     bool empty() const { return size()==0; }
0260 
0261     //! Returns the current number of elements contained in the queue
0262     /** Returned value may not reflect results of pending operations.
0263         This operation reads shared data and will trigger a race condition. */
0264     size_type size() const { return __TBB_load_with_acquire(my_size); }
0265 
0266     //! Pushes elem onto the queue, increasing capacity of queue if necessary
0267     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
0268     void push(const_reference elem) {
0269 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
0270         __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." );
0271 #endif
0272         cpq_operation op_data(elem, PUSH_OP);
0273         my_aggregator.execute(&op_data);
0274         if (op_data.status == FAILED) // exception thrown
0275             throw_exception(eid_bad_alloc);
0276     }
0277 
0278 #if __TBB_CPP11_RVALUE_REF_PRESENT
0279     //! Pushes elem onto the queue, increasing capacity of queue if necessary
0280     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
0281     void push(value_type &&elem) {
0282         cpq_operation op_data(elem, PUSH_RVALUE_OP);
0283         my_aggregator.execute(&op_data);
0284         if (op_data.status == FAILED) // exception thrown
0285             throw_exception(eid_bad_alloc);
0286     }
0287 
0288 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
0289     //! Constructs a new element using args as the arguments for its construction and pushes it onto the queue */
0290     /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
0291     template<typename... Args>
0292     void emplace(Args&&... args) {
0293         push(value_type(std::forward<Args>(args)...));
0294     }
0295 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
0296 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
0297 
0298     //! Gets a reference to and removes highest priority element
0299     /** If a highest priority element was found, sets elem and returns true,
0300         otherwise returns false.
0301         This operation can be safely used concurrently with other push, try_pop or emplace operations. */
0302     bool try_pop(reference elem) {
0303         cpq_operation op_data(POP_OP);
0304         op_data.elem = &elem;
0305         my_aggregator.execute(&op_data);
0306         return op_data.status==SUCCEEDED;
0307     }
0308 
0309     //! Clear the queue; not thread-safe
0310     /** This operation is unsafe if there are pending concurrent operations on the queue.
0311         Resets size, effectively emptying queue; does not free space.
0312         May not clear elements added in pending operations. */
0313     void clear() {
0314         data.clear();
0315         mark = 0;
0316         my_size = 0;
0317     }
0318 
0319     //! Swap this queue with another; not thread-safe
0320     /** This operation is unsafe if there are pending concurrent operations on the queue. */
0321     void swap(concurrent_priority_queue& q) {
0322         using std::swap;
0323         data.swap(q.data);
0324         swap(mark, q.mark);
0325         swap(my_size, q.my_size);
0326     }
0327 
0328     //! Return allocator object
0329     allocator_type get_allocator() const { return data.get_allocator(); }
0330 
0331  private:
0332     enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
0333     enum operation_status { WAIT=0, SUCCEEDED, FAILED };
0334 
0335     class cpq_operation : public aggregated_operation<cpq_operation> {
0336      public:
0337         operation_type type;
0338         union {
0339             value_type *elem;
0340             size_type sz;
0341         };
0342         cpq_operation(const_reference e, operation_type t) :
0343             type(t), elem(const_cast<value_type*>(&e)) {}
0344         cpq_operation(operation_type t) : type(t) {}
0345     };
0346 
0347     class my_functor_t {
0348         concurrent_priority_queue<T, Compare, A> *cpq;
0349      public:
0350         my_functor_t() {}
0351         my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
0352         void operator()(cpq_operation* op_list) {
0353             cpq->handle_operations(op_list);
0354         }
0355     };
0356 
0357     typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t;
0358     aggregator_t my_aggregator;
0359     //! Padding added to avoid false sharing
0360     char padding1[NFS_MaxLineSize - sizeof(aggregator_t)];
0361     //! The point at which unsorted elements begin
0362     size_type mark;
0363     __TBB_atomic size_type my_size;
0364     Compare compare;
0365     //! Padding added to avoid false sharing
0366     char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
0367     //! Storage for the heap of elements in queue, plus unheapified elements
0368     /** data has the following structure:
0369 
0370          binary unheapified
0371           heap   elements
0372         ____|_______|____
0373         |       |       |
0374         v       v       v
0375         [_|...|_|_|...|_| |...| ]
0376          0       ^       ^       ^
0377                  |       |       |__capacity
0378                  |       |__my_size
0379                  |__mark
0380 
0381         Thus, data stores the binary heap starting at position 0 through
0382         mark-1 (it may be empty).  Then there are 0 or more elements
0383         that have not yet been inserted into the heap, in positions
0384         mark through my_size-1. */
0385     typedef std::vector<value_type, allocator_type> vector_t;
0386     vector_t data;
0387 
0388     void handle_operations(cpq_operation *op_list) {
0389         cpq_operation *tmp, *pop_list=NULL;
0390 
0391         __TBB_ASSERT(mark == data.size(), NULL);
0392 
0393         // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
0394         while (op_list) {
0395             // ITT note: &(op_list->status) tag is used to cover accesses to op_list
0396             // node. This thread is going to handle the operation, and so will acquire it
0397             // and perform the associated operation w/o triggering a race condition; the
0398             // thread that created the operation is waiting on the status field, so when
0399             // this thread is done with the operation, it will perform a
0400             // store_with_release to give control back to the waiting thread in
0401             // aggregator::insert_operation.
0402             call_itt_notify(acquired, &(op_list->status));
0403             __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
0404             tmp = op_list;
0405             op_list = itt_hide_load_word(op_list->next);
0406             if (tmp->type == POP_OP) {
0407                 if (mark < data.size() &&
0408                     compare(data[0], data[data.size()-1])) {
0409                     // there are newly pushed elems and the last one
0410                     // is higher than top
0411                     *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
0412                     __TBB_store_with_release(my_size, my_size-1);
0413                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
0414                     data.pop_back();
0415                     __TBB_ASSERT(mark<=data.size(), NULL);
0416                 }
0417                 else { // no convenient item to pop; postpone
0418                     itt_hide_store_word(tmp->next, pop_list);
0419                     pop_list = tmp;
0420                 }
0421             } else { // PUSH_OP or PUSH_RVALUE_OP
0422                 __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
0423                 __TBB_TRY{
0424                     if (tmp->type == PUSH_OP) {
0425                         push_back_helper(*(tmp->elem), typename internal::use_element_copy_constructor<value_type>::type());
0426                     } else {
0427                         data.push_back(tbb::internal::move(*(tmp->elem)));
0428                     }
0429                     __TBB_store_with_release(my_size, my_size + 1);
0430                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
0431                 } __TBB_CATCH(...) {
0432                     itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
0433                 }
0434             }
0435         }
0436 
0437         // second pass processes pop operations
0438         while (pop_list) {
0439             tmp = pop_list;
0440             pop_list = itt_hide_load_word(pop_list->next);
0441             __TBB_ASSERT(tmp->type == POP_OP, NULL);
0442             if (data.empty()) {
0443                 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
0444             }
0445             else {
0446                 __TBB_ASSERT(mark<=data.size(), NULL);
0447                 if (mark < data.size() &&
0448                     compare(data[0], data[data.size()-1])) {
0449                     // there are newly pushed elems and the last one is
0450                     // higher than top
0451                     *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
0452                     __TBB_store_with_release(my_size, my_size-1);
0453                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
0454                     data.pop_back();
0455                 }
0456                 else { // extract top and push last element down heap
0457                     *(tmp->elem) = tbb::internal::move(data[0]);
0458                     __TBB_store_with_release(my_size, my_size-1);
0459                     itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
0460                     reheap();
0461                 }
0462             }
0463         }
0464 
0465         // heapify any leftover pushed elements before doing the next
0466         // batch of operations
0467         if (mark<data.size()) heapify();
0468         __TBB_ASSERT(mark == data.size(), NULL);
0469     }
0470 
0471     //! Merge unsorted elements into heap
0472     void heapify() {
0473         if (!mark && data.size()>0) mark = 1;
0474         for (; mark<data.size(); ++mark) {
0475             // for each unheapified element under size
0476             size_type cur_pos = mark;
0477             value_type to_place = tbb::internal::move(data[mark]);
0478             do { // push to_place up the heap
0479                 size_type parent = (cur_pos-1)>>1;
0480                 if (!compare(data[parent], to_place)) break;
0481                 data[cur_pos] = tbb::internal::move(data[parent]);
0482                 cur_pos = parent;
0483             } while( cur_pos );
0484             data[cur_pos] = tbb::internal::move(to_place);
0485         }
0486     }
0487 
0488     //! Re-heapify after an extraction
0489     /** Re-heapify by pushing last element down the heap from the root. */
0490     void reheap() {
0491         size_type cur_pos=0, child=1;
0492 
0493         while (child < mark) {
0494             size_type target = child;
0495             if (child+1 < mark && compare(data[child], data[child+1]))
0496                 ++target;
0497             // target now has the higher priority child
0498             if (compare(data[target], data[data.size()-1])) break;
0499             data[cur_pos] = tbb::internal::move(data[target]);
0500             cur_pos = target;
0501             child = (cur_pos<<1)+1;
0502         }
0503         if (cur_pos != data.size()-1)
0504             data[cur_pos] = tbb::internal::move(data[data.size()-1]);
0505         data.pop_back();
0506         if (mark > data.size()) mark = data.size();
0507     }
0508 
0509     void push_back_helper(const T& t, tbb::internal::true_type) {
0510         data.push_back(t);
0511     }
0512 
0513     void push_back_helper(const T&, tbb::internal::false_type) {
0514         __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." );
0515     }
0516 };
0517 
0518 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
0519 namespace internal {
0520 
0521 template<typename T, typename... Args>
0522 using priority_queue_t = concurrent_priority_queue<
0523     T,
0524     std::conditional_t< (sizeof...(Args)>0) && !is_allocator_v< pack_element_t<0, Args...> >,
0525                         pack_element_t<0, Args...>, std::less<T> >,
0526     std::conditional_t< (sizeof...(Args)>0) && is_allocator_v< pack_element_t<sizeof...(Args)-1, Args...> >,
0527                          pack_element_t<sizeof...(Args)-1, Args...>, cache_aligned_allocator<T> >
0528 >;
0529 }
0530 
0531 // Deduction guide for the constructor from two iterators
0532 template<typename InputIterator,
0533          typename T = typename std::iterator_traits<InputIterator>::value_type,
0534          typename... Args
0535 > concurrent_priority_queue(InputIterator, InputIterator, Args...)
0536 -> internal::priority_queue_t<T, Args...>;
0537 
0538 template<typename T, typename CompareOrAllocalor>
0539 concurrent_priority_queue(std::initializer_list<T> init_list, CompareOrAllocalor)
0540 -> internal::priority_queue_t<T, CompareOrAllocalor>;
0541 
0542 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
0543 } // namespace interface5
0544 
0545 using interface5::concurrent_priority_queue;
0546 
0547 } // namespace tbb
0548 
0549 #include "internal/_warning_suppress_disable_notice.h"
0550 #undef __TBB_concurrent_priority_queue_H_include_area
0551 
0552 #endif /* __TBB_concurrent_priority_queue_H */