Warning, file /include/oneapi/tbb/concurrent_priority_queue.h was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB_concurrent_priority_queue_H
0018 #define __TBB_concurrent_priority_queue_H
0019
0020 #include "detail/_namespace_injection.h"
0021 #include "detail/_aggregator.h"
0022 #include "detail/_template_helpers.h"
0023 #include "detail/_allocator_traits.h"
0024 #include "detail/_range_common.h"
0025 #include "detail/_exception.h"
0026 #include "detail/_utils.h"
0027 #include "detail/_containers_helpers.h"
0028 #include "cache_aligned_allocator.h"
0029 #include <vector>
0030 #include <iterator>
0031 #include <functional>
0032 #include <utility>
0033 #include <initializer_list>
0034 #include <type_traits>
0035
0036 namespace tbb {
0037 namespace detail {
0038 namespace d1 {
0039
0040 template <typename T, typename Compare = std::less<T>, typename Allocator = cache_aligned_allocator<T>>
0041 class concurrent_priority_queue {
0042 public:
0043 using value_type = T;
0044 using reference = T&;
0045 using const_reference = const T&;
0046
0047 using size_type = std::size_t;
0048 using difference_type = std::ptrdiff_t;
0049
0050 using allocator_type = Allocator;
0051
0052 concurrent_priority_queue() : concurrent_priority_queue(allocator_type{}) {}
0053
0054 explicit concurrent_priority_queue( const allocator_type& alloc )
0055 : mark(0), my_size(0), my_compare(), data(alloc)
0056 {
0057 my_aggregator.initialize_handler(functor{this});
0058 }
0059
0060 explicit concurrent_priority_queue( const Compare& compare, const allocator_type& alloc = allocator_type() )
0061 : mark(0), my_size(0), my_compare(compare), data(alloc)
0062 {
0063 my_aggregator.initialize_handler(functor{this});
0064 }
0065
0066 explicit concurrent_priority_queue( size_type init_capacity, const allocator_type& alloc = allocator_type() )
0067 : mark(0), my_size(0), my_compare(), data(alloc)
0068 {
0069 data.reserve(init_capacity);
0070 my_aggregator.initialize_handler(functor{this});
0071 }
0072
0073 explicit concurrent_priority_queue( size_type init_capacity, const Compare& compare, const allocator_type& alloc = allocator_type() )
0074 : mark(0), my_size(0), my_compare(compare), data(alloc)
0075 {
0076 data.reserve(init_capacity);
0077 my_aggregator.initialize_handler(functor{this});
0078 }
0079
0080 template <typename InputIterator>
0081 concurrent_priority_queue( InputIterator begin, InputIterator end, const Compare& compare, const allocator_type& alloc = allocator_type() )
0082 : mark(0), my_compare(compare), data(begin, end, alloc)
0083 {
0084 my_aggregator.initialize_handler(functor{this});
0085 heapify();
0086 my_size.store(data.size(), std::memory_order_relaxed);
0087 }
0088
0089 template <typename InputIterator>
0090 concurrent_priority_queue( InputIterator begin, InputIterator end, const allocator_type& alloc = allocator_type() )
0091 : concurrent_priority_queue(begin, end, Compare(), alloc) {}
0092
0093 concurrent_priority_queue( std::initializer_list<value_type> init, const Compare& compare, const allocator_type& alloc = allocator_type() )
0094 : concurrent_priority_queue(init.begin(), init.end(), compare, alloc) {}
0095
0096 concurrent_priority_queue( std::initializer_list<value_type> init, const allocator_type& alloc = allocator_type() )
0097 : concurrent_priority_queue(init, Compare(), alloc) {}
0098
0099 concurrent_priority_queue( const concurrent_priority_queue& other )
0100 : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
0101 data(other.data)
0102 {
0103 my_aggregator.initialize_handler(functor{this});
0104 }
0105
0106 concurrent_priority_queue( const concurrent_priority_queue& other, const allocator_type& alloc )
0107 : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
0108 data(other.data, alloc)
0109 {
0110 my_aggregator.initialize_handler(functor{this});
0111 }
0112
0113 concurrent_priority_queue( concurrent_priority_queue&& other )
0114 : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
0115 data(std::move(other.data))
0116 {
0117 my_aggregator.initialize_handler(functor{this});
0118 }
0119
0120 concurrent_priority_queue( concurrent_priority_queue&& other, const allocator_type& alloc )
0121 : mark(other.mark), my_size(other.my_size.load(std::memory_order_relaxed)), my_compare(other.my_compare),
0122 data(std::move(other.data), alloc)
0123 {
0124 my_aggregator.initialize_handler(functor{this});
0125 }
0126
0127 concurrent_priority_queue& operator=( const concurrent_priority_queue& other ) {
0128 if (this != &other) {
0129 data = other.data;
0130 mark = other.mark;
0131 my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
0132 }
0133 return *this;
0134 }
0135
0136 concurrent_priority_queue& operator=( concurrent_priority_queue&& other ) {
0137 if (this != &other) {
0138
0139 data = std::move(other.data);
0140 mark = other.mark;
0141 my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
0142 }
0143 return *this;
0144 }
0145
0146 concurrent_priority_queue& operator=( std::initializer_list<value_type> init ) {
0147 assign(init.begin(), init.end());
0148 return *this;
0149 }
0150
0151 template <typename InputIterator>
0152 void assign( InputIterator begin, InputIterator end ) {
0153 data.assign(begin, end);
0154 mark = 0;
0155 my_size.store(data.size(), std::memory_order_relaxed);
0156 heapify();
0157 }
0158
0159 void assign( std::initializer_list<value_type> init ) {
0160 assign(init.begin(), init.end());
0161 }
0162
0163
0164
0165 __TBB_nodiscard bool empty() const { return size() == 0; }
0166
0167
0168
0169
0170 size_type size() const { return my_size.load(std::memory_order_relaxed); }
0171
0172
0173 void push( const value_type& value ) {
0174 cpq_operation op_data(value, PUSH_OP);
0175 my_aggregator.execute(&op_data);
0176 if (op_data.status == FAILED)
0177 throw_exception(exception_id::bad_alloc);
0178 }
0179
0180
0181 void push( value_type&& value ) {
0182 cpq_operation op_data(value, PUSH_RVALUE_OP);
0183 my_aggregator.execute(&op_data);
0184 if (op_data.status == FAILED)
0185 throw_exception(exception_id::bad_alloc);
0186 }
0187
0188
0189 template <typename... Args>
0190 void emplace( Args&&... args ) {
0191
0192 push(value_type(std::forward<Args>(args)...));
0193 }
0194
0195
0196
0197
0198
0199 bool try_pop( value_type& value ) {
0200 cpq_operation op_data(value, POP_OP);
0201 my_aggregator.execute(&op_data);
0202 return op_data.status == SUCCEEDED;
0203 }
0204
0205
0206 void clear() {
0207 data.clear();
0208 mark = 0;
0209 my_size.store(0, std::memory_order_relaxed);
0210 }
0211
0212
0213 void swap( concurrent_priority_queue& other ) {
0214 if (this != &other) {
0215 using std::swap;
0216 swap(data, other.data);
0217 swap(mark, other.mark);
0218
0219 size_type sz = my_size.load(std::memory_order_relaxed);
0220 my_size.store(other.my_size.load(std::memory_order_relaxed), std::memory_order_relaxed);
0221 other.my_size.store(sz, std::memory_order_relaxed);
0222 }
0223 }
0224
0225 allocator_type get_allocator() const { return data.get_allocator(); }
0226 private:
//! Kind of request carried by a cpq_operation.
enum operation_type {
    INVALID_OP,     // never submitted; handle_operations() asserts against it
    PUSH_OP,        // issued by push(const value_type&)
    POP_OP,         // issued by try_pop()
    PUSH_RVALUE_OP  // issued by push(value_type&&)
};

//! Outcome of a request, compared against the operation's status field.
enum operation_status {
    WAIT = 0,   // no result has been stored yet
    SUCCEEDED,
    FAILED
};
0229
0230 class cpq_operation : public aggregated_operation<cpq_operation> {
0231 public:
0232 operation_type type;
0233 union {
0234 value_type* elem;
0235 size_type sz;
0236 };
0237 cpq_operation( const value_type& value, operation_type t )
0238 : type(t), elem(const_cast<value_type*>(&value)) {}
0239 };
0240
0241 class functor {
0242 concurrent_priority_queue* my_cpq;
0243 public:
0244 functor() : my_cpq(nullptr) {}
0245 functor( concurrent_priority_queue* cpq ) : my_cpq(cpq) {}
0246
0247 void operator()(cpq_operation* op_list) {
0248 __TBB_ASSERT(my_cpq != nullptr, "Invalid functor");
0249 my_cpq->handle_operations(op_list);
0250 }
0251 };
0252
0253 void handle_operations( cpq_operation* op_list ) {
0254 call_itt_notify(acquired, this);
0255 cpq_operation* tmp, *pop_list = nullptr;
0256 __TBB_ASSERT(mark == data.size(), nullptr);
0257
0258
0259 while(op_list) {
0260
0261
0262
0263
0264
0265
0266
0267
0268 call_itt_notify(acquired, &(op_list->status));
0269 __TBB_ASSERT(op_list->type != INVALID_OP, nullptr);
0270
0271 tmp = op_list;
0272 op_list = op_list->next.load(std::memory_order_relaxed);
0273 if (tmp->type == POP_OP) {
0274 if (mark < data.size() &&
0275 my_compare(data[0], data.back()))
0276 {
0277
0278 *(tmp->elem) = std::move(data.back());
0279 my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
0280 tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
0281
0282 data.pop_back();
0283 __TBB_ASSERT(mark <= data.size(), nullptr);
0284 } else {
0285 tmp->next.store(pop_list, std::memory_order_relaxed);
0286 pop_list = tmp;
0287 }
0288 } else {
0289 __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation");
0290 #if TBB_USE_EXCEPTIONS
0291 try
0292 #endif
0293 {
0294 if (tmp->type == PUSH_OP) {
0295 push_back_helper(*(tmp->elem));
0296 } else {
0297 data.push_back(std::move(*(tmp->elem)));
0298 }
0299 my_size.store(my_size.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
0300 tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
0301 }
0302 #if TBB_USE_EXCEPTIONS
0303 catch(...) {
0304 tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
0305 }
0306 #endif
0307 }
0308 }
0309
0310
0311 while(pop_list) {
0312 tmp = pop_list;
0313 pop_list = pop_list->next.load(std::memory_order_relaxed);
0314 __TBB_ASSERT(tmp->type == POP_OP, nullptr);
0315 if (data.empty()) {
0316 tmp->status.store(uintptr_t(FAILED), std::memory_order_release);
0317 } else {
0318 __TBB_ASSERT(mark <= data.size(), nullptr);
0319 if (mark < data.size() &&
0320 my_compare(data[0], data.back()))
0321 {
0322
0323 *(tmp->elem) = std::move(data.back());
0324 my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
0325 tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
0326 data.pop_back();
0327 } else {
0328 *(tmp->elem) = std::move(data[0]);
0329 my_size.store(my_size.load(std::memory_order_relaxed) - 1, std::memory_order_relaxed);
0330 tmp->status.store(uintptr_t(SUCCEEDED), std::memory_order_release);
0331 reheap();
0332 }
0333 }
0334 }
0335
0336
0337
0338 if (mark < data.size()) heapify();
0339 __TBB_ASSERT(mark == data.size(), nullptr);
0340 call_itt_notify(releasing, this);
0341 }
0342
0343
0344 void heapify() {
0345 if (!mark && data.size() > 0) mark = 1;
0346 for (; mark < data.size(); ++mark) {
0347
0348 size_type cur_pos = mark;
0349 value_type to_place = std::move(data[mark]);
0350 do {
0351 size_type parent = (cur_pos - 1) >> 1;
0352 if (!my_compare(data[parent], to_place))
0353 break;
0354 data[cur_pos] = std::move(data[parent]);
0355 cur_pos = parent;
0356 } while(cur_pos);
0357 data[cur_pos] = std::move(to_place);
0358 }
0359 }
0360
0361
0362
0363 void reheap() {
0364 size_type cur_pos = 0, child = 1;
0365
0366 while(child < mark) {
0367 size_type target = child;
0368 if (child + 1 < mark && my_compare(data[child], data[child + 1]))
0369 ++target;
0370
0371 if (my_compare(data[target], data.back()))
0372 break;
0373 data[cur_pos] = std::move(data[target]);
0374 cur_pos = target;
0375 child = (cur_pos << 1) + 1;
0376 }
0377 if (cur_pos != data.size() - 1)
0378 data[cur_pos] = std::move(data.back());
0379 data.pop_back();
0380 if (mark > data.size()) mark = data.size();
0381 }
0382
0383 void push_back_helper( const T& value ) {
0384 push_back_helper_impl(value, std::is_copy_constructible<T>{});
0385 }
0386
0387 void push_back_helper_impl( const T& value, std::true_type ) {
0388 data.push_back(value);
0389 }
0390
0391 void push_back_helper_impl( const T&, std::false_type ) {
0392 __TBB_ASSERT(false, "error: calling tbb::concurrent_priority_queue.push(const value_type&) for move-only type");
0393 }
0394
0395 using aggregator_type = aggregator<functor, cpq_operation>;
0396
0397 aggregator_type my_aggregator;
0398
0399 char padding1[max_nfs_size - sizeof(aggregator_type)];
0400
0401 size_type mark;
0402 std::atomic<size_type> my_size;
0403 Compare my_compare;
0404
0405
0406 char padding2[max_nfs_size - (2*sizeof(size_type)) - sizeof(Compare)];
0407
0408
0409
0410
0411
0412
0413
0414
0415
0416
0417
0418
0419
0420
0421
0422
0423
0424
0425
0426 using vector_type = std::vector<value_type, allocator_type>;
0427 vector_type data;
0428
0429 friend bool operator==( const concurrent_priority_queue& lhs,
0430 const concurrent_priority_queue& rhs )
0431 {
0432 return lhs.data == rhs.data;
0433 }
0434
0435 #if !__TBB_CPP20_COMPARISONS_PRESENT
0436 friend bool operator!=( const concurrent_priority_queue& lhs,
0437 const concurrent_priority_queue& rhs )
0438 {
0439 return !(lhs == rhs);
0440 }
0441 #endif
0442 };
0443
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
// Deduction guides. The enable_if constraints tell an allocator argument apart
// from a comparator argument.

//! (first, last [, compare [, alloc]]) - element type taken from the iterator.
template <typename It,
          typename Comp = std::less<iterator_value_t<It>>,
          typename Alloc = tbb::cache_aligned_allocator<iterator_value_t<It>>,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>,
          typename = std::enable_if_t<!is_allocator_v<Comp>>>
concurrent_priority_queue( It, It, Comp = Comp(), Alloc = Alloc() )
    -> concurrent_priority_queue<iterator_value_t<It>, Comp, Alloc>;

//! (first, last, alloc) - comparator defaults to std::less.
template <typename It, typename Alloc,
          typename = std::enable_if_t<is_input_iterator_v<It>>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_priority_queue( It, It, Alloc )
    -> concurrent_priority_queue<iterator_value_t<It>, std::less<iterator_value_t<It>>, Alloc>;

//! (initializer_list [, compare [, alloc]]).
template <typename T,
          typename Comp = std::less<T>,
          typename Alloc = tbb::cache_aligned_allocator<T>,
          typename = std::enable_if_t<is_allocator_v<Alloc>>,
          typename = std::enable_if_t<!is_allocator_v<Comp>>>
concurrent_priority_queue( std::initializer_list<T>, Comp = Comp(), Alloc = Alloc() )
    -> concurrent_priority_queue<T, Comp, Alloc>;

//! (initializer_list, alloc) - comparator defaults to std::less.
template <typename T, typename Alloc,
          typename = std::enable_if_t<is_allocator_v<Alloc>>>
concurrent_priority_queue( std::initializer_list<T>, Alloc )
    -> concurrent_priority_queue<T, std::less<T>, Alloc>;

#endif
0474
0475 template <typename T, typename Compare, typename Allocator>
0476 void swap( concurrent_priority_queue<T, Compare, Allocator>& lhs,
0477 concurrent_priority_queue<T, Compare, Allocator>& rhs )
0478 {
0479 lhs.swap(rhs);
0480 }
0481
0482 }
0483 }
0484 inline namespace v1 {
0485 using detail::d1::concurrent_priority_queue;
0486
0487 }
0488 }
0489
0490 #endif