// Source listing: include/oneapi/tbb/concurrent_priority_queue.h (oneTBB)

/*
    Copyright (c) 2005-2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_concurrent_priority_queue_H
#define __TBB_concurrent_priority_queue_H

#include "detail/_namespace_injection.h"
#include "detail/_aggregator.h"
#include "detail/_template_helpers.h"
#include "detail/_allocator_traits.h"
#include "detail/_range_common.h"
#include "detail/_exception.h"
#include "detail/_utils.h"
#include "detail/_containers_helpers.h"
#include "cache_aligned_allocator.h"
#include <vector>
#include <iterator>
#include <functional>
#include <utility>
#include <initializer_list>
#include <type_traits>

namespace tbb {
namespace detail {
namespace d1 {

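// Illustrative sketch (not part of the upstream oneTBB header): the class template below is
// parameterized by element type, comparator and allocator, with std::less and
// cache_aligned_allocator as the defaults, so with the defaults the largest element is popped
// first, as with std::priority_queue. Assuming only the public interface declared in this file,
// a declaration might look like the following; int and std::greater are arbitrary example choices.
//
//     tbb::concurrent_priority_queue<int> max_queue;                     // largest element popped first
//     tbb::concurrent_priority_queue<int, std::greater<int>> min_queue;  // smallest element popped first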
template <typename T, typename Compare = std::less<T>, typename Allocator = cache_aligned_allocator<T>>
class concurrent_priority_queue {
public:
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;

    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

    using allocator_type = Allocator;

    concurrent_priority_queue() : concurrent_priority_queue(allocator_type{}) {}

    explicit concurrent_priority_queue( const allocator_type& alloc )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    explicit concurrent_priority_queue( size_type init_capacity, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_size(0), my_compare(compare), data(alloc)
    {
        data.reserve(init_capacity);
        my_aggregator.initialize_handler(functor{this});
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : mark(0), my_compare(compare), data(begin, end, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
        heapify();
        my_size.store(data.size(), std::memory_order_relaxed);
    }

    template <typename InputIterator>
    concurrent_priority_queue( InputIterator begin, InputIterator end, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(begin, end, Compare(), alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const Compare& compare, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init.begin(), init.end(), compare, alloc) {}

    concurrent_priority_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() )
        : concurrent_priority_queue(init, Compare(), alloc) {}

    concurrent_priority_queue( const concurrent_priority_queue& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( const concurrent_priority_queue& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(other.data, alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data))
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue( concurrent_priority_queue&& other, const allocator_type& alloc )
        : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
          data(std::move(other.data), alloc)
    {
        my_aggregator.initialize_handler(functor{this});
    }

    concurrent_priority_queue& operator=( const concurrent_priority_queue& other ) {
        if (this != &other) {
            data = other.data;
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( concurrent_priority_queue&& other ) {
        if (this != &other) {
            // TODO: check if exceptions from std::vector::operator=(vector&&) should be handled separately
            data = std::move(other.data);
            mark = other.mark;
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
        }
        return *this;
    }

    concurrent_priority_queue& operator=( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
        return *this;
    }

    template <typename InputIterator>
    void assign( InputIterator begin, InputIterator end ) {
        data.assign(begin, end);
        mark = 0;
        my_size.store(data.size(), std::memory_order_relaxed);
        heapify();
    }

    void assign( std::initializer_list<value_type> init ) {
        assign(init.begin(), init.end());
    }

    /* Returned value may not reflect results of pending operations.
       This operation reads shared data and will trigger a race condition. */
    __TBB_nodiscard bool empty() const { return size() == 0; }

    // Returns the current number of elements contained in the queue
    /* Returned value may not reflect results of pending operations.
       This operation reads shared data and will trigger a race condition. */
    size_type size() const { return my_size.load(std::memory_order_relaxed); }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( const value_type& value ) {
        cpq_operation op_data(value, PUSH_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    void push( value_type&& value ) {
        cpq_operation op_data(value, PUSH_RVALUE_OP);
        my_aggregator.execute(&op_data);
        if (op_data.status == FAILED)
            throw_exception(exception_id::bad_alloc);
    }

    /* This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    template <typename... Args>
    void emplace( Args&&... args ) {
        // TODO: support uses allocator construction in this place
        push(value_type(std::forward<Args>(args)...));
    }

    // Extracts and removes the highest priority element
    /* If a highest priority element was found, moves it into value and returns true,
       otherwise returns false.
       This operation can be safely used concurrently with other push, try_pop or emplace operations. */
    bool try_pop( value_type& value ) {
        cpq_operation op_data(value, POP_OP);
        my_aggregator.execute(&op_data);
        return op_data.status == SUCCEEDED;
    }

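    // Illustrative sketch (not part of the upstream oneTBB header): push(), emplace() and
    // try_pop() are the operations documented above as safe to call concurrently with each
    // other. Since there is no blocking pop, a consumer typically polls with try_pop();
    // the values below are arbitrary.
    //
    //     tbb::concurrent_priority_queue<int> queue;
    //     queue.push(10);      // may run on one thread...
    //     queue.emplace(20);   // ...while another thread pushes or pops concurrently
    //
    //     int item;
    //     while (queue.try_pop(item)) {
    //         // process item; under the default std::less, the largest remaining value comes out first
    //     }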
    // This operation affects the whole container => it is not thread-safe
    void clear() {
        data.clear();
        mark = 0;
        my_size.store(0, std::memory_order_relaxed);
    }

    // This operation affects the whole container => it is not thread-safe
    void swap( concurrent_priority_queue& other ) {
        if (this != &other) {
            using std::swap;
            swap(data, other.data);
            swap(mark, other.mark);

            size_type sz = my_size.load(std::memory_order_relaxed);
            my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
            other.my_size.store(sz, std::memory_order_relaxed);
        }
    }

    allocator_type get_allocator() const { return data.get_allocator(); }
private:
    enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
    enum operation_status {WAIT = 0, SUCCEEDED, FAILED};

    class cpq_operation : public aggregated_operation<cpq_operation> {
    public:
        operation_type type;
        union {
            value_type* elem;
            size_type sz;
        };
        cpq_operation( const value_type& value, operation_type t )
            : type(t), elem(const_cast<value_type*>(&value)) {}
    }; // class cpq_operation

    class functor {
        concurrent_priority_queue* my_cpq;
    public:
        functor() : my_cpq(nullptr) {}
        functor( concurrent_priority_queue* cpq ) : my_cpq(cpq) {}

        void operator()(cpq_operation* op_list) {
            __TBB_ASSERT(my_cpq != nullptr, "Invalid functor");
            my_cpq->handle_operations(op_list);
        }
    }; // class functor

    void handle_operations( cpq_operation* op_list ) {
        call_itt_notify(acquired, this);
        cpq_operation* tmp, *pop_list = nullptr;
        __TBB_ASSERT(mark == data.size(), nullptr);

        // First pass handles all pushes (amortized constant time; reallocation may happen)
        // and the pops that can be satisfied in constant time; other pops are postponed.
        while(op_list) {
            // ITT note: &(op_list->status) tag is used to cover accesses to op_list
            // node. This thread is going to handle the operation, and so will acquire it
            // and perform the associated operation w/o triggering a race condition; the
            // thread that created the operation is waiting on the status field, so when
            // this thread is done with the operation, it will perform a
            // store_with_release to give control back to the waiting thread in
            // aggregator::insert_operation.
            // TODO: enable
            call_itt_notify(acquired, &(op_list->status));
            __TBB_ASSERT(op_list->type != INVALID_OP, nullptr);

            tmp = op_list;
            op_list = op_list->next.load(std::memory_order_relaxed);
            if (tmp->type == POP_OP) {
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elems and the last one is higher than top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);

                    data.pop_back();
                    __TBB_ASSERT(mark <= data.size(), nullptr);
                } else { // no convenient item to pop; postpone
                    tmp->next.store(pop_list, std::memory_order_relaxed);
                    pop_list = tmp;
                }
            } else { // PUSH_OP or PUSH_RVALUE_OP
                __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation");
#if TBB_USE_EXCEPTIONS
                try
#endif
                {
                    if (tmp->type == PUSH_OP) {
                        push_back_helper(*(tmp->elem));
                    } else {
                        data.push_back(std::move(*(tmp->elem)));
                    }
                    my_size.store(my_size.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                }
#if TBB_USE_EXCEPTIONS
                catch(...) {
                    tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
                }
#endif
            }
        }

        // Second pass processes pop operations
        while(pop_list) {
            tmp = pop_list;
            pop_list = pop_list->next.load(std::memory_order_relaxed);
            __TBB_ASSERT(tmp->type == POP_OP, nullptr);
            if (data.empty()) {
                tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
            } else {
                __TBB_ASSERT(mark <= data.size(), nullptr);
                if (mark < data.size() &&
                    my_compare(data[0], data.back()))
                {
                    // there are newly pushed elems and the last one is higher than top
                    *(tmp->elem) = std::move(data.back());
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    data.pop_back();
                } else { // extract top and push last element down heap
                    *(tmp->elem) = std::move(data[0]);
                    my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
                    tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
                    reheap();
                }
            }
        }

        // heapify any leftover pushed elements before doing the next
        // batch of operations
        if (mark < data.size()) heapify();
        __TBB_ASSERT(mark == data.size(), nullptr);
        call_itt_notify(releasing, this);
    }

    // Merge unsorted elements into heap
    void heapify() {
        if (!mark && data.size() > 0) mark = 1;
        for (; mark < data.size(); ++mark) {
            // for each unheapified element under size
            size_type cur_pos = mark;
            value_type to_place = std::move(data[mark]);
            do { // push to_place up the heap
                size_type parent = (cur_pos - 1) >> 1;
                if (!my_compare(data[parent], to_place))
                    break;
                data[cur_pos] = std::move(data[parent]);
                cur_pos = parent;
            } while(cur_pos);
            data[cur_pos] = std::move(to_place);
        }
    }

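    // Illustrative trace (not part of the upstream oneTBB header): heapify() above sifts each
    // unheapified element up the heap. With the default std::less, data == {5, 3, 9} and
    // mark == 2, the element 9 at index 2 is compared with its parent at index (2 - 1) >> 1 == 0;
    // my_compare(5, 9) holds, so 5 moves down to index 2, 9 is placed at index 0, and the loop
    // advances mark to 3, leaving the valid heap {9, 3, 5}.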
    // Re-heapify after an extraction: push the last element down the heap from the root
    void reheap() {
        size_type cur_pos = 0, child = 1;

        while(child < mark) {
            size_type target = child;
            if (child + 1 < mark && my_compare(data[child], data[child + 1]))
                ++target;
            // target now has the higher priority child
            if (my_compare(data[target], data.back()))
                break;
            data[cur_pos] = std::move(data[target]);
            cur_pos = target;
            child = (cur_pos << 1) + 1;
        }
        if (cur_pos != data.size() - 1)
            data[cur_pos] = std::move(data.back());
        data.pop_back();
        if (mark > data.size()) mark = data.size();
    }

    void push_back_helper( const T& value ) {
        push_back_helper_impl(value, std::is_copy_constructible<T>{});
    }

    void push_back_helper_impl( const T& value, /*is_copy_constructible = */std::true_type ) {
        data.push_back(value);
    }

    void push_back_helper_impl( const T&, /*is_copy_constructible = */std::false_type ) {
        __TBB_ASSERT(false, "error: calling tbb::concurrent_priority_queue.push(const value_type&) for move-only type");
    }

    using aggregator_type = aggregator<functor, cpq_operation>;

    aggregator_type my_aggregator;
    // Padding added to avoid false sharing
    char padding1[max_nfs_size - sizeof(aggregator_type)];
    // The point at which unsorted elements begin
    size_type mark;
    std::atomic<size_type> my_size;
    Compare my_compare;

    // Padding added to avoid false sharing
    char padding2[max_nfs_size - (2*sizeof(size_type)) - sizeof(Compare)];
    //! Storage for the heap of elements in queue, plus unheapified elements
    /** data has the following structure:

         binary unheapified
          heap   elements
        ____|_______|____
        |       |       |
        v       v       v
        [_|...|_|_|...|_| |...| ]
         0       ^       ^       ^
                 |       |       |__capacity
                 |       |__my_size
                 |__mark

        Thus, data stores the binary heap starting at position 0 through
        mark-1 (it may be empty).  Then there are 0 or more elements
        that have not yet been inserted into the heap, in positions
        mark through my_size-1. */
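    /* Illustrative instance of the layout above (not part of the upstream oneTBB header):
       with my_size == 6 and mark == 4, data[0..3] hold the binary heap and data[4..5] hold
       elements that were pushed but not yet merged into the heap; handle_operations() ends
       each batch by calling heapify(), which restores the invariant mark == data.size(). */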

    using vector_type = std::vector<value_type, allocator_type>;
    vector_type data;

    friend bool operator==( const concurrent_priority_queue& lhs,
                            const concurrent_priority_queue& rhs )
    {
        return lhs.data == rhs.data;
    }

#if !__TBB_CPP20_COMPARISONS_PRESENT
    friend bool operator!=( const concurrent_priority_queue& lhs,
                            const concurrent_priority_queue& rhs )
    {
        return !(lhs == rhs);
    }
#endif
}; // class concurrent_priority_queue

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
template <typename It,
          typename Comp = std::less<iterator_value_t<It>>,
          typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>,
          typename = std::enable_if_t<!is_allocator_v<Comp>>>
concurrent_priority_queue( It, It, Comp = Comp(), Alloc = Alloc() )
-> concurrent_priority_queue<iterator_value_t<It>, Comp, Alloc>;

template <typename It, typename Alloc,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_priority_queue( It, It, Alloc )
-> concurrent_priority_queue<iterator_value_t<It>, std::less<iterator_value_t<It>>, Alloc>;

template <typename T,
          typename Comp = std::less<T>,
          typename Alloc = tbb::cache_aligned_allocator<T>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>,
          typename = std::enable_if_t<!is_allocator_v<Comp>>>
concurrent_priority_queue( std::initializer_list<T>, Comp = Comp(), Alloc = Alloc() )
-> concurrent_priority_queue<T, Comp, Alloc>;

template <typename T, typename Alloc,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_priority_queue( std::initializer_list<T>, Alloc )
-> concurrent_priority_queue<T, std::less<T>, Alloc>;

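// Illustrative sketch (not part of the upstream oneTBB header): with the deduction guides
// above, the template arguments can be deduced from an iterator pair or an initializer_list
// in C++17 and later; the container and values below are arbitrary.
//
//     std::vector<double> values{3.0, 1.0, 2.0};
//     tbb::concurrent_priority_queue cpq1(values.begin(), values.end());
//         // deduces concurrent_priority_queue<double, std::less<double>, tbb::cache_aligned_allocator<double>>
//     tbb::concurrent_priority_queue cpq2{1, 2, 3};
//         // deduces concurrent_priority_queue<int, std::less<int>, tbb::cache_aligned_allocator<int>>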
#endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT

template <typename T, typename Compare, typename Allocator>
void swap( concurrent_priority_queue<T, Compare, Allocator>& lhs,
           concurrent_priority_queue<T, Compare, Allocator>& rhs )
{
    lhs.swap(rhs);
}

} // namespace d1
} // namespace detail
inline namespace v1 {
using detail::d1::concurrent_priority_queue;

} // inline namespace v1
} // namespace tbb

#endif // __TBB_concurrent_priority_queue_H