File indexing completed on 2025-01-18 10:12:46
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB__flow_graph_cache_impl_H
0018 #define __TBB__flow_graph_cache_impl_H
0019
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023
0024
0025
0026 namespace internal {
0027
0028
0029 template< typename T, typename M=spin_mutex >
0030 class node_cache {
0031 public:
0032
0033 typedef size_t size_type;
0034
0035 bool empty() {
0036 typename mutex_type::scoped_lock lock( my_mutex );
0037 return internal_empty();
0038 }
0039
0040 void add( T &n ) {
0041 typename mutex_type::scoped_lock lock( my_mutex );
0042 internal_push(n);
0043 }
0044
0045 void remove( T &n ) {
0046 typename mutex_type::scoped_lock lock( my_mutex );
0047 for ( size_t i = internal_size(); i != 0; --i ) {
0048 T &s = internal_pop();
0049 if ( &s == &n ) return;
0050 internal_push(s);
0051 }
0052 }
0053
0054 void clear() {
0055 while( !my_q.empty()) (void)my_q.pop();
0056 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0057 my_built_predecessors.clear();
0058 #endif
0059 }
0060
0061 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0062 typedef edge_container<T> built_predecessors_type;
0063 built_predecessors_type &built_predecessors() { return my_built_predecessors; }
0064
0065 typedef typename edge_container<T>::edge_list_type predecessor_list_type;
0066 void internal_add_built_predecessor( T &n ) {
0067 typename mutex_type::scoped_lock lock( my_mutex );
0068 my_built_predecessors.add_edge(n);
0069 }
0070
0071 void internal_delete_built_predecessor( T &n ) {
0072 typename mutex_type::scoped_lock lock( my_mutex );
0073 my_built_predecessors.delete_edge(n);
0074 }
0075
0076 void copy_predecessors( predecessor_list_type &v) {
0077 typename mutex_type::scoped_lock lock( my_mutex );
0078 my_built_predecessors.copy_edges(v);
0079 }
0080
0081 size_t predecessor_count() {
0082 typename mutex_type::scoped_lock lock(my_mutex);
0083 return (size_t)(my_built_predecessors.edge_count());
0084 }
0085 #endif
0086
0087 protected:
0088
0089 typedef M mutex_type;
0090 mutex_type my_mutex;
0091 std::queue< T * > my_q;
0092 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0093 built_predecessors_type my_built_predecessors;
0094 #endif
0095
0096
0097 inline bool internal_empty( ) {
0098 return my_q.empty();
0099 }
0100
0101
0102 inline size_type internal_size( ) {
0103 return my_q.size();
0104 }
0105
0106
0107 inline void internal_push( T &n ) {
0108 my_q.push(&n);
0109 }
0110
0111
0112 inline T &internal_pop() {
0113 T *v = my_q.front();
0114 my_q.pop();
0115 return *v;
0116 }
0117
0118 };
0119
0120
0121 template< typename T, typename M=spin_mutex >
0122 #if __TBB_PREVIEW_ASYNC_MSG
0123
0124 class predecessor_cache : public node_cache< untyped_sender, M > {
0125 #else
0126 class predecessor_cache : public node_cache< sender<T>, M > {
0127 #endif
0128 public:
0129 typedef M mutex_type;
0130 typedef T output_type;
0131 #if __TBB_PREVIEW_ASYNC_MSG
0132 typedef untyped_sender predecessor_type;
0133 typedef untyped_receiver successor_type;
0134 #else
0135 typedef sender<output_type> predecessor_type;
0136 typedef receiver<output_type> successor_type;
0137 #endif
0138
0139 predecessor_cache( ) : my_owner( NULL ) { }
0140
0141 void set_owner( successor_type *owner ) { my_owner = owner; }
0142
0143 bool get_item( output_type &v ) {
0144
0145 bool msg = false;
0146
0147 do {
0148 predecessor_type *src;
0149 {
0150 typename mutex_type::scoped_lock lock(this->my_mutex);
0151 if ( this->internal_empty() ) {
0152 break;
0153 }
0154 src = &this->internal_pop();
0155 }
0156
0157
0158 msg = src->try_get( v );
0159
0160 if (msg == false) {
0161
0162 if (my_owner)
0163 src->register_successor( *my_owner );
0164 } else {
0165
0166 this->add(*src);
0167 }
0168 } while ( msg == false );
0169 return msg;
0170 }
0171
0172
0173 void reset() {
0174 if (my_owner) {
0175 for(;;) {
0176 predecessor_type *src;
0177 {
0178 if (this->internal_empty()) break;
0179 src = &this->internal_pop();
0180 }
0181 src->register_successor( *my_owner );
0182 }
0183 }
0184 }
0185
0186 protected:
0187
0188 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0189 using node_cache< predecessor_type, M >::my_built_predecessors;
0190 #endif
0191 successor_type *my_owner;
0192 };
0193
0194
0195
0196 template< typename T, typename M=spin_mutex >
0197 class reservable_predecessor_cache : public predecessor_cache< T, M > {
0198 public:
0199 typedef M mutex_type;
0200 typedef T output_type;
0201 #if __TBB_PREVIEW_ASYNC_MSG
0202 typedef untyped_sender predecessor_type;
0203 typedef untyped_receiver successor_type;
0204 #else
0205 typedef sender<T> predecessor_type;
0206 typedef receiver<T> successor_type;
0207 #endif
0208
0209 reservable_predecessor_cache( ) : reserved_src(NULL) { }
0210
0211 bool
0212 try_reserve( output_type &v ) {
0213 bool msg = false;
0214
0215 do {
0216 {
0217 typename mutex_type::scoped_lock lock(this->my_mutex);
0218 if ( reserved_src || this->internal_empty() )
0219 return false;
0220
0221 reserved_src = &this->internal_pop();
0222 }
0223
0224
0225 msg = reserved_src->try_reserve( v );
0226
0227 if (msg == false) {
0228 typename mutex_type::scoped_lock lock(this->my_mutex);
0229
0230 reserved_src->register_successor( *this->my_owner );
0231 reserved_src = NULL;
0232 } else {
0233
0234 this->add( *reserved_src );
0235 }
0236 } while ( msg == false );
0237
0238 return msg;
0239 }
0240
0241 bool
0242 try_release( ) {
0243 reserved_src->try_release( );
0244 reserved_src = NULL;
0245 return true;
0246 }
0247
0248 bool
0249 try_consume( ) {
0250 reserved_src->try_consume( );
0251 reserved_src = NULL;
0252 return true;
0253 }
0254
0255 void reset( ) {
0256 reserved_src = NULL;
0257 predecessor_cache<T,M>::reset( );
0258 }
0259
0260 void clear() {
0261 reserved_src = NULL;
0262 predecessor_cache<T,M>::clear();
0263 }
0264
0265 private:
0266 predecessor_type *reserved_src;
0267 };
0268
0269
0270
0271
0272 template<typename T, typename M=spin_rw_mutex >
0273 class successor_cache : tbb::internal::no_copy {
0274 protected:
0275
0276 typedef M mutex_type;
0277 mutex_type my_mutex;
0278
0279 #if __TBB_PREVIEW_ASYNC_MSG
0280 typedef untyped_receiver successor_type;
0281 typedef untyped_receiver *pointer_type;
0282 typedef untyped_sender owner_type;
0283 #else
0284 typedef receiver<T> successor_type;
0285 typedef receiver<T> *pointer_type;
0286 typedef sender<T> owner_type;
0287 #endif
0288 typedef std::list< pointer_type > successors_type;
0289 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0290 edge_container<successor_type> my_built_successors;
0291 #endif
0292 successors_type my_successors;
0293
0294 owner_type *my_owner;
0295
0296 public:
0297 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0298 typedef typename edge_container<successor_type>::edge_list_type successor_list_type;
0299
0300 edge_container<successor_type> &built_successors() { return my_built_successors; }
0301
0302 void internal_add_built_successor( successor_type &r) {
0303 typename mutex_type::scoped_lock l(my_mutex, true);
0304 my_built_successors.add_edge( r );
0305 }
0306
0307 void internal_delete_built_successor( successor_type &r) {
0308 typename mutex_type::scoped_lock l(my_mutex, true);
0309 my_built_successors.delete_edge(r);
0310 }
0311
0312 void copy_successors( successor_list_type &v) {
0313 typename mutex_type::scoped_lock l(my_mutex, false);
0314 my_built_successors.copy_edges(v);
0315 }
0316
0317 size_t successor_count() {
0318 typename mutex_type::scoped_lock l(my_mutex,false);
0319 return my_built_successors.edge_count();
0320 }
0321
0322 #endif
0323
0324 successor_cache( ) : my_owner(NULL) {}
0325
0326 void set_owner( owner_type *owner ) { my_owner = owner; }
0327
0328 virtual ~successor_cache() {}
0329
0330 void register_successor( successor_type &r ) {
0331 typename mutex_type::scoped_lock l(my_mutex, true);
0332 my_successors.push_back( &r );
0333 }
0334
0335 void remove_successor( successor_type &r ) {
0336 typename mutex_type::scoped_lock l(my_mutex, true);
0337 for ( typename successors_type::iterator i = my_successors.begin();
0338 i != my_successors.end(); ++i ) {
0339 if ( *i == & r ) {
0340 my_successors.erase(i);
0341 break;
0342 }
0343 }
0344 }
0345
0346 bool empty() {
0347 typename mutex_type::scoped_lock l(my_mutex, false);
0348 return my_successors.empty();
0349 }
0350
0351 void clear() {
0352 my_successors.clear();
0353 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0354 my_built_successors.clear();
0355 #endif
0356 }
0357
0358 #if !__TBB_PREVIEW_ASYNC_MSG
0359 virtual task * try_put_task( const T &t ) = 0;
0360 #endif
0361 };
0362
0363
0364 template<typename M>
0365 class successor_cache< continue_msg, M > : tbb::internal::no_copy {
0366 protected:
0367
0368 typedef M mutex_type;
0369 mutex_type my_mutex;
0370
0371 #if __TBB_PREVIEW_ASYNC_MSG
0372 typedef untyped_receiver successor_type;
0373 typedef untyped_receiver *pointer_type;
0374 #else
0375 typedef receiver<continue_msg> successor_type;
0376 typedef receiver<continue_msg> *pointer_type;
0377 #endif
0378 typedef std::list< pointer_type > successors_type;
0379 successors_type my_successors;
0380 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0381 edge_container<successor_type> my_built_successors;
0382 typedef edge_container<successor_type>::edge_list_type successor_list_type;
0383 #endif
0384
0385 sender<continue_msg> *my_owner;
0386
0387 public:
0388
0389 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0390
0391 edge_container<successor_type> &built_successors() { return my_built_successors; }
0392
0393 void internal_add_built_successor( successor_type &r) {
0394 typename mutex_type::scoped_lock l(my_mutex, true);
0395 my_built_successors.add_edge( r );
0396 }
0397
0398 void internal_delete_built_successor( successor_type &r) {
0399 typename mutex_type::scoped_lock l(my_mutex, true);
0400 my_built_successors.delete_edge(r);
0401 }
0402
0403 void copy_successors( successor_list_type &v) {
0404 typename mutex_type::scoped_lock l(my_mutex, false);
0405 my_built_successors.copy_edges(v);
0406 }
0407
0408 size_t successor_count() {
0409 typename mutex_type::scoped_lock l(my_mutex,false);
0410 return my_built_successors.edge_count();
0411 }
0412
0413 #endif
0414
0415 successor_cache( ) : my_owner(NULL) {}
0416
0417 void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
0418
0419 virtual ~successor_cache() {}
0420
0421 void register_successor( successor_type &r ) {
0422 typename mutex_type::scoped_lock l(my_mutex, true);
0423 my_successors.push_back( &r );
0424 if ( my_owner && r.is_continue_receiver() ) {
0425 r.register_predecessor( *my_owner );
0426 }
0427 }
0428
0429 void remove_successor( successor_type &r ) {
0430 typename mutex_type::scoped_lock l(my_mutex, true);
0431 for ( successors_type::iterator i = my_successors.begin();
0432 i != my_successors.end(); ++i ) {
0433 if ( *i == & r ) {
0434
0435
0436 if ( my_owner )
0437 r.remove_predecessor( *my_owner );
0438 my_successors.erase(i);
0439 break;
0440 }
0441 }
0442 }
0443
0444 bool empty() {
0445 typename mutex_type::scoped_lock l(my_mutex, false);
0446 return my_successors.empty();
0447 }
0448
0449 void clear() {
0450 my_successors.clear();
0451 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0452 my_built_successors.clear();
0453 #endif
0454 }
0455
0456 #if !__TBB_PREVIEW_ASYNC_MSG
0457 virtual task * try_put_task( const continue_msg &t ) = 0;
0458 #endif
0459
0460 };
0461
0462
0463
0464 template<typename T, typename M=spin_rw_mutex>
0465 class broadcast_cache : public successor_cache<T, M> {
0466 typedef M mutex_type;
0467 typedef typename successor_cache<T,M>::successors_type successors_type;
0468
0469 public:
0470
0471 broadcast_cache( ) {}
0472
0473
0474 #if __TBB_PREVIEW_ASYNC_MSG
0475 template<typename X>
0476 task * try_put_task( const X &t ) {
0477 #else
0478 task * try_put_task( const T &t ) __TBB_override {
0479 #endif
0480 task * last_task = NULL;
0481 bool upgraded = true;
0482 typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
0483 typename successors_type::iterator i = this->my_successors.begin();
0484 while ( i != this->my_successors.end() ) {
0485 task *new_task = (*i)->try_put_task(t);
0486
0487 graph& graph_ref = (*i)->graph_reference();
0488 last_task = combine_tasks(graph_ref, last_task, new_task);
0489 if(new_task) {
0490 ++i;
0491 }
0492 else {
0493 if ( (*i)->register_predecessor(*this->my_owner) ) {
0494 if (!upgraded) {
0495 l.upgrade_to_writer();
0496 upgraded = true;
0497 }
0498 i = this->my_successors.erase(i);
0499 } else {
0500 ++i;
0501 }
0502 }
0503 }
0504 return last_task;
0505 }
0506
0507
0508 #if __TBB_PREVIEW_ASYNC_MSG
0509 template<typename X>
0510 bool gather_successful_try_puts( const X &t, task_list &tasks ) {
0511 #else
0512 bool gather_successful_try_puts( const T &t, task_list &tasks ) {
0513 #endif
0514 bool upgraded = true;
0515 bool is_at_least_one_put_successful = false;
0516 typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
0517 typename successors_type::iterator i = this->my_successors.begin();
0518 while ( i != this->my_successors.end() ) {
0519 task * new_task = (*i)->try_put_task(t);
0520 if(new_task) {
0521 ++i;
0522 if(new_task != SUCCESSFULLY_ENQUEUED) {
0523 tasks.push_back(*new_task);
0524 }
0525 is_at_least_one_put_successful = true;
0526 }
0527 else {
0528 if ( (*i)->register_predecessor(*this->my_owner) ) {
0529 if (!upgraded) {
0530 l.upgrade_to_writer();
0531 upgraded = true;
0532 }
0533 i = this->my_successors.erase(i);
0534 } else {
0535 ++i;
0536 }
0537 }
0538 }
0539 return is_at_least_one_put_successful;
0540 }
0541 };
0542
0543
0544
0545 template<typename T, typename M=spin_rw_mutex >
0546 class round_robin_cache : public successor_cache<T, M> {
0547 typedef size_t size_type;
0548 typedef M mutex_type;
0549 typedef typename successor_cache<T,M>::successors_type successors_type;
0550
0551 public:
0552
0553 round_robin_cache( ) {}
0554
0555 size_type size() {
0556 typename mutex_type::scoped_lock l(this->my_mutex, false);
0557 return this->my_successors.size();
0558 }
0559
0560 #if __TBB_PREVIEW_ASYNC_MSG
0561 template<typename X>
0562 task * try_put_task( const X &t ) {
0563 #else
0564 task *try_put_task( const T &t ) __TBB_override {
0565 #endif
0566 bool upgraded = true;
0567 typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
0568 typename successors_type::iterator i = this->my_successors.begin();
0569 while ( i != this->my_successors.end() ) {
0570 task *new_task = (*i)->try_put_task(t);
0571 if ( new_task ) {
0572 return new_task;
0573 } else {
0574 if ( (*i)->register_predecessor(*this->my_owner) ) {
0575 if (!upgraded) {
0576 l.upgrade_to_writer();
0577 upgraded = true;
0578 }
0579 i = this->my_successors.erase(i);
0580 }
0581 else {
0582 ++i;
0583 }
0584 }
0585 }
0586 return NULL;
0587 }
0588 };
0589
0590 }
0591
0592 #endif