File indexing completed on 2025-01-18 10:12:53
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB_concurrent_priority_queue_H
0018 #define __TBB_concurrent_priority_queue_H
0019
0020 #define __TBB_concurrent_priority_queue_H_include_area
0021 #include "internal/_warning_suppress_enable_notice.h"
0022
0023 #include "atomic.h"
0024 #include "cache_aligned_allocator.h"
0025 #include "tbb_exception.h"
0026 #include "tbb_stddef.h"
0027 #include "tbb_profiling.h"
0028 #include "internal/_aggregator_impl.h"
0029 #include "internal/_template_helpers.h"
0030 #include "internal/_allocator_traits.h"
0031 #include <vector>
0032 #include <iterator>
0033 #include <functional>
0034 #include __TBB_STD_SWAP_HEADER
0035
0036 #if __TBB_INITIALIZER_LISTS_PRESENT
0037 #include <initializer_list>
0038 #endif
0039
0040 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
0041 #include <type_traits>
0042 #endif
0043
0044 namespace tbb {
0045 namespace interface5 {
0046 namespace internal {
0047 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
0048 template<typename T, bool C = std::is_copy_constructible<T>::value>
0049 struct use_element_copy_constructor {
0050 typedef tbb::internal::true_type type;
0051 };
0052 template<typename T>
0053 struct use_element_copy_constructor <T,false> {
0054 typedef tbb::internal::false_type type;
0055 };
0056 #else
0057 template<typename>
0058 struct use_element_copy_constructor {
0059 typedef tbb::internal::true_type type;
0060 };
0061 #endif
0062 }
0063
0064 using namespace tbb::internal;
0065
0066
0067 template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
0068 class concurrent_priority_queue {
0069 public:
0070
0071 typedef T value_type;
0072
0073
0074 typedef T& reference;
0075
0076
0077 typedef const T& const_reference;
0078
0079
0080 typedef size_t size_type;
0081
0082
0083 typedef ptrdiff_t difference_type;
0084
0085
0086 typedef A allocator_type;
0087
0088
0089 explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), compare(), data(a)
0090 {
0091 my_aggregator.initialize_handler(my_functor_t(this));
0092 }
0093
0094
0095 explicit concurrent_priority_queue(const Compare& c, const allocator_type& a = allocator_type()) : mark(0), my_size(0), compare(c), data(a)
0096 {
0097 my_aggregator.initialize_handler(my_functor_t(this));
0098 }
0099
0100
0101 explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
0102 mark(0), my_size(0), compare(), data(a)
0103 {
0104 data.reserve(init_capacity);
0105 my_aggregator.initialize_handler(my_functor_t(this));
0106 }
0107
0108
0109 explicit concurrent_priority_queue(size_type init_capacity, const Compare& c, const allocator_type& a = allocator_type()) :
0110 mark(0), my_size(0), compare(c), data(a)
0111 {
0112 data.reserve(init_capacity);
0113 my_aggregator.initialize_handler(my_functor_t(this));
0114 }
0115
0116
0117 template<typename InputIterator>
0118 concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
0119 mark(0), compare(), data(begin, end, a)
0120 {
0121 my_aggregator.initialize_handler(my_functor_t(this));
0122 heapify();
0123 my_size = data.size();
0124 }
0125
0126
0127 template<typename InputIterator>
0128 concurrent_priority_queue(InputIterator begin, InputIterator end, const Compare& c, const allocator_type& a = allocator_type()) :
0129 mark(0), compare(c), data(begin, end, a)
0130 {
0131 my_aggregator.initialize_handler(my_functor_t(this));
0132 heapify();
0133 my_size = data.size();
0134 }
0135
0136 #if __TBB_INITIALIZER_LISTS_PRESENT
0137
0138 concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type()) :
0139 mark(0), compare(), data(init_list.begin(), init_list.end(), a)
0140 {
0141 my_aggregator.initialize_handler(my_functor_t(this));
0142 heapify();
0143 my_size = data.size();
0144 }
0145
0146
0147 concurrent_priority_queue(std::initializer_list<T> init_list, const Compare& c, const allocator_type &a = allocator_type()) :
0148 mark(0), compare(c), data(init_list.begin(), init_list.end(), a)
0149 {
0150 my_aggregator.initialize_handler(my_functor_t(this));
0151 heapify();
0152 my_size = data.size();
0153 }
0154 #endif
0155
0156
0157
0158 concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
0159 my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
0160 {
0161 my_aggregator.initialize_handler(my_functor_t(this));
0162 heapify();
0163 }
0164
0165
0166
0167 concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
0168 my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
0169 {
0170 my_aggregator.initialize_handler(my_functor_t(this));
0171 heapify();
0172 }
0173
0174
0175
0176 concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
0177 if (this != &src) {
0178 vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
0179 mark = src.mark;
0180 my_size = src.my_size;
0181 }
0182 return *this;
0183 }
0184
0185 #if __TBB_CPP11_RVALUE_REF_PRESENT
0186
0187
0188 concurrent_priority_queue(concurrent_priority_queue&& src) : mark(src.mark),
0189 my_size(src.my_size), data(std::move(src.data))
0190 {
0191 my_aggregator.initialize_handler(my_functor_t(this));
0192 }
0193
0194
0195
0196 concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark),
0197 my_size(src.my_size),
0198 #if __TBB_ALLOCATOR_TRAITS_PRESENT
0199 data(std::move(src.data), a)
0200 #else
0201
0202
0203 data(a)
0204 #endif
0205 {
0206 my_aggregator.initialize_handler(my_functor_t(this));
0207 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
0208 if (a != src.data.get_allocator()){
0209 data.reserve(src.data.size());
0210 data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
0211 }else{
0212 data = std::move(src.data);
0213 }
0214 #endif
0215 }
0216
0217
0218
0219 concurrent_priority_queue& operator=( concurrent_priority_queue&& src) {
0220 if (this != &src) {
0221 mark = src.mark;
0222 my_size = src.my_size;
0223 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
0224 if (data.get_allocator() != src.data.get_allocator()){
0225 vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
0226 }else
0227 #endif
0228 {
0229 data = std::move(src.data);
0230 }
0231 }
0232 return *this;
0233 }
0234 #endif
0235
0236
0237 template<typename InputIterator>
0238 void assign(InputIterator begin, InputIterator end) {
0239 vector_t(begin, end, data.get_allocator()).swap(data);
0240 mark = 0;
0241 my_size = data.size();
0242 heapify();
0243 }
0244
0245 #if __TBB_INITIALIZER_LISTS_PRESENT
0246
0247 void assign(std::initializer_list<T> il) { this->assign(il.begin(), il.end()); }
0248
0249
0250 concurrent_priority_queue& operator=(std::initializer_list<T> il) {
0251 this->assign(il.begin(), il.end());
0252 return *this;
0253 }
0254 #endif
0255
0256
0257
0258
0259 bool empty() const { return size()==0; }
0260
0261
0262
0263
0264 size_type size() const { return __TBB_load_with_acquire(my_size); }
0265
0266
0267
0268 void push(const_reference elem) {
0269 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
0270 __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." );
0271 #endif
0272 cpq_operation op_data(elem, PUSH_OP);
0273 my_aggregator.execute(&op_data);
0274 if (op_data.status == FAILED)
0275 throw_exception(eid_bad_alloc);
0276 }
0277
0278 #if __TBB_CPP11_RVALUE_REF_PRESENT
0279
0280
0281 void push(value_type &&elem) {
0282 cpq_operation op_data(elem, PUSH_RVALUE_OP);
0283 my_aggregator.execute(&op_data);
0284 if (op_data.status == FAILED)
0285 throw_exception(eid_bad_alloc);
0286 }
0287
0288 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
0289
0290
0291 template<typename... Args>
0292 void emplace(Args&&... args) {
0293 push(value_type(std::forward<Args>(args)...));
0294 }
0295 #endif
0296 #endif
0297
0298
0299
0300
0301
0302 bool try_pop(reference elem) {
0303 cpq_operation op_data(POP_OP);
0304 op_data.elem = &elem;
0305 my_aggregator.execute(&op_data);
0306 return op_data.status==SUCCEEDED;
0307 }
0308
0309
0310
0311
0312
0313 void clear() {
0314 data.clear();
0315 mark = 0;
0316 my_size = 0;
0317 }
0318
0319
0320
0321 void swap(concurrent_priority_queue& q) {
0322 using std::swap;
0323 data.swap(q.data);
0324 swap(mark, q.mark);
0325 swap(my_size, q.my_size);
0326 }
0327
0328
0329 allocator_type get_allocator() const { return data.get_allocator(); }
0330
0331 private:
0332 enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
0333 enum operation_status { WAIT=0, SUCCEEDED, FAILED };
0334
0335 class cpq_operation : public aggregated_operation<cpq_operation> {
0336 public:
0337 operation_type type;
0338 union {
0339 value_type *elem;
0340 size_type sz;
0341 };
0342 cpq_operation(const_reference e, operation_type t) :
0343 type(t), elem(const_cast<value_type*>(&e)) {}
0344 cpq_operation(operation_type t) : type(t) {}
0345 };
0346
0347 class my_functor_t {
0348 concurrent_priority_queue<T, Compare, A> *cpq;
0349 public:
0350 my_functor_t() {}
0351 my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
0352 void operator()(cpq_operation* op_list) {
0353 cpq->handle_operations(op_list);
0354 }
0355 };
0356
0357 typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t;
0358 aggregator_t my_aggregator;
0359
0360 char padding1[NFS_MaxLineSize - sizeof(aggregator_t)];
0361
0362 size_type mark;
0363 __TBB_atomic size_type my_size;
0364 Compare compare;
0365
0366 char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
0367
0368
0369
0370
0371
0372
0373
0374
0375
0376
0377
0378
0379
0380
0381
0382
0383
0384
0385 typedef std::vector<value_type, allocator_type> vector_t;
0386 vector_t data;
0387
0388 void handle_operations(cpq_operation *op_list) {
0389 cpq_operation *tmp, *pop_list=NULL;
0390
0391 __TBB_ASSERT(mark == data.size(), NULL);
0392
0393
0394 while (op_list) {
0395
0396
0397
0398
0399
0400
0401
0402 call_itt_notify(acquired, &(op_list->status));
0403 __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
0404 tmp = op_list;
0405 op_list = itt_hide_load_word(op_list->next);
0406 if (tmp->type == POP_OP) {
0407 if (mark < data.size() &&
0408 compare(data[0], data[data.size()-1])) {
0409
0410
0411 *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
0412 __TBB_store_with_release(my_size, my_size-1);
0413 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
0414 data.pop_back();
0415 __TBB_ASSERT(mark<=data.size(), NULL);
0416 }
0417 else {
0418 itt_hide_store_word(tmp->next, pop_list);
0419 pop_list = tmp;
0420 }
0421 } else {
0422 __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
0423 __TBB_TRY{
0424 if (tmp->type == PUSH_OP) {
0425 push_back_helper(*(tmp->elem), typename internal::use_element_copy_constructor<value_type>::type());
0426 } else {
0427 data.push_back(tbb::internal::move(*(tmp->elem)));
0428 }
0429 __TBB_store_with_release(my_size, my_size + 1);
0430 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
0431 } __TBB_CATCH(...) {
0432 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
0433 }
0434 }
0435 }
0436
0437
0438 while (pop_list) {
0439 tmp = pop_list;
0440 pop_list = itt_hide_load_word(pop_list->next);
0441 __TBB_ASSERT(tmp->type == POP_OP, NULL);
0442 if (data.empty()) {
0443 itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
0444 }
0445 else {
0446 __TBB_ASSERT(mark<=data.size(), NULL);
0447 if (mark < data.size() &&
0448 compare(data[0], data[data.size()-1])) {
0449
0450
0451 *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
0452 __TBB_store_with_release(my_size, my_size-1);
0453 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
0454 data.pop_back();
0455 }
0456 else {
0457 *(tmp->elem) = tbb::internal::move(data[0]);
0458 __TBB_store_with_release(my_size, my_size-1);
0459 itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
0460 reheap();
0461 }
0462 }
0463 }
0464
0465
0466
0467 if (mark<data.size()) heapify();
0468 __TBB_ASSERT(mark == data.size(), NULL);
0469 }
0470
0471
0472 void heapify() {
0473 if (!mark && data.size()>0) mark = 1;
0474 for (; mark<data.size(); ++mark) {
0475
0476 size_type cur_pos = mark;
0477 value_type to_place = tbb::internal::move(data[mark]);
0478 do {
0479 size_type parent = (cur_pos-1)>>1;
0480 if (!compare(data[parent], to_place)) break;
0481 data[cur_pos] = tbb::internal::move(data[parent]);
0482 cur_pos = parent;
0483 } while( cur_pos );
0484 data[cur_pos] = tbb::internal::move(to_place);
0485 }
0486 }
0487
0488
0489
0490 void reheap() {
0491 size_type cur_pos=0, child=1;
0492
0493 while (child < mark) {
0494 size_type target = child;
0495 if (child+1 < mark && compare(data[child], data[child+1]))
0496 ++target;
0497
0498 if (compare(data[target], data[data.size()-1])) break;
0499 data[cur_pos] = tbb::internal::move(data[target]);
0500 cur_pos = target;
0501 child = (cur_pos<<1)+1;
0502 }
0503 if (cur_pos != data.size()-1)
0504 data[cur_pos] = tbb::internal::move(data[data.size()-1]);
0505 data.pop_back();
0506 if (mark > data.size()) mark = data.size();
0507 }
0508
0509 void push_back_helper(const T& t, tbb::internal::true_type) {
0510 data.push_back(t);
0511 }
0512
0513 void push_back_helper(const T&, tbb::internal::false_type) {
0514 __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." );
0515 }
0516 };
0517
0518 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
0519 namespace internal {
0520
0521 template<typename T, typename... Args>
0522 using priority_queue_t = concurrent_priority_queue<
0523 T,
0524 std::conditional_t< (sizeof...(Args)>0) && !is_allocator_v< pack_element_t<0, Args...> >,
0525 pack_element_t<0, Args...>, std::less<T> >,
0526 std::conditional_t< (sizeof...(Args)>0) && is_allocator_v< pack_element_t<sizeof...(Args)-1, Args...> >,
0527 pack_element_t<sizeof...(Args)-1, Args...>, cache_aligned_allocator<T> >
0528 >;
0529 }
0530
0531
0532 template<typename InputIterator,
0533 typename T = typename std::iterator_traits<InputIterator>::value_type,
0534 typename... Args
0535 > concurrent_priority_queue(InputIterator, InputIterator, Args...)
0536 -> internal::priority_queue_t<T, Args...>;
0537
0538 template<typename T, typename CompareOrAllocalor>
0539 concurrent_priority_queue(std::initializer_list<T> init_list, CompareOrAllocalor)
0540 -> internal::priority_queue_t<T, CompareOrAllocalor>;
0541
0542 #endif
0543 }
0544
0545 using interface5::concurrent_priority_queue;
0546
0547 }
0548
0549 #include "internal/_warning_suppress_disable_notice.h"
0550 #undef __TBB_concurrent_priority_queue_H_include_area
0551
0552 #endif