File indexing completed on 2025-12-18 10:24:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB__flow_graph_cache_impl_H
0018 #define __TBB__flow_graph_cache_impl_H
0019
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023
0024
0025
0026
0027 template< typename T, typename M=spin_mutex >
0028 class node_cache {
0029 public:
0030
0031 typedef size_t size_type;
0032
0033 bool empty() {
0034 typename mutex_type::scoped_lock lock( my_mutex );
0035 return internal_empty();
0036 }
0037
0038 void add( T &n ) {
0039 typename mutex_type::scoped_lock lock( my_mutex );
0040 internal_push(n);
0041 }
0042
0043 void remove( T &n ) {
0044 typename mutex_type::scoped_lock lock( my_mutex );
0045 for ( size_t i = internal_size(); i != 0; --i ) {
0046 T &s = internal_pop();
0047 if ( &s == &n )
0048 break;
0049 internal_push(s);
0050 }
0051 }
0052
0053 void clear() {
0054 while( !my_q.empty()) (void)my_q.pop();
0055 }
0056
0057 protected:
0058
0059 typedef M mutex_type;
0060 mutex_type my_mutex;
0061 std::queue< T * > my_q;
0062
0063
0064 inline bool internal_empty( ) {
0065 return my_q.empty();
0066 }
0067
0068
0069 inline size_type internal_size( ) {
0070 return my_q.size();
0071 }
0072
0073
0074 inline void internal_push( T &n ) {
0075 my_q.push(&n);
0076 }
0077
0078
0079 inline T &internal_pop() {
0080 T *v = my_q.front();
0081 my_q.pop();
0082 return *v;
0083 }
0084
0085 };
0086
0087
0088 template< typename T, typename M=spin_mutex >
0089 class predecessor_cache : public node_cache< sender<T>, M > {
0090 public:
0091 typedef M mutex_type;
0092 typedef T output_type;
0093 typedef sender<output_type> predecessor_type;
0094 typedef receiver<output_type> successor_type;
0095
0096 predecessor_cache( successor_type* owner ) : my_owner( owner ) {
0097 __TBB_ASSERT( my_owner, "predecessor_cache should have an owner." );
0098
0099 }
0100
0101 private:
0102 bool get_item_impl( output_type& v
0103 __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo_ptr = nullptr) )
0104 {
0105
0106 bool successful_get = false;
0107
0108 do {
0109 predecessor_type *src;
0110 {
0111 typename mutex_type::scoped_lock lock(this->my_mutex);
0112 if ( this->internal_empty() ) {
0113 break;
0114 }
0115 src = &this->internal_pop();
0116 }
0117
0118
0119 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0120 if (metainfo_ptr) {
0121 successful_get = src->try_get( v, *metainfo_ptr );
0122 } else
0123 #endif
0124 {
0125 successful_get = src->try_get( v );
0126 }
0127
0128 if (successful_get == false) {
0129
0130 register_successor(*src, *my_owner);
0131 } else {
0132
0133 this->add(*src);
0134 }
0135 } while ( successful_get == false );
0136 return successful_get;
0137 }
0138 public:
0139 bool get_item( output_type& v ) {
0140 return get_item_impl(v);
0141 }
0142
0143 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0144 bool get_item( output_type& v, message_metainfo& metainfo ) {
0145 return get_item_impl(v, &metainfo);
0146 }
0147 #endif
0148
0149
0150 void reset() {
0151 for(;;) {
0152 predecessor_type *src;
0153 {
0154 if (this->internal_empty()) break;
0155 src = &this->internal_pop();
0156 }
0157 register_successor(*src, *my_owner);
0158 }
0159 }
0160
0161 protected:
0162 successor_type* my_owner;
0163 };
0164
0165
0166 template< typename T, typename M=spin_mutex >
0167 class reservable_predecessor_cache : public predecessor_cache< T, M > {
0168 public:
0169 typedef M mutex_type;
0170 typedef T output_type;
0171 typedef sender<T> predecessor_type;
0172 typedef receiver<T> successor_type;
0173
0174 reservable_predecessor_cache( successor_type* owner )
0175 : predecessor_cache<T,M>(owner), reserved_src(nullptr)
0176 {
0177
0178 }
0179
0180 private:
0181 bool try_reserve_impl( output_type &v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo) ) {
0182 bool successful_reserve = false;
0183
0184 do {
0185 predecessor_type* pred = nullptr;
0186 {
0187 typename mutex_type::scoped_lock lock(this->my_mutex);
0188 if ( reserved_src.load(std::memory_order_relaxed) || this->internal_empty() )
0189 return false;
0190
0191 pred = &this->internal_pop();
0192 reserved_src.store(pred, std::memory_order_relaxed);
0193 }
0194
0195
0196 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0197 if (metainfo) {
0198 successful_reserve = pred->try_reserve( v, *metainfo );
0199 } else
0200 #endif
0201 {
0202 successful_reserve = pred->try_reserve( v );
0203 }
0204
0205 if (successful_reserve == false) {
0206 typename mutex_type::scoped_lock lock(this->my_mutex);
0207
0208 register_successor( *pred, *this->my_owner );
0209 reserved_src.store(nullptr, std::memory_order_relaxed);
0210 } else {
0211
0212 this->add( *pred);
0213 }
0214 } while ( successful_reserve == false );
0215
0216 return successful_reserve;
0217 }
0218 public:
0219 bool try_reserve( output_type& v ) {
0220 return try_reserve_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(nullptr));
0221 }
0222
0223 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0224 bool try_reserve( output_type& v, message_metainfo& metainfo ) {
0225 return try_reserve_impl(v, &metainfo);
0226 }
0227 #endif
0228
0229 bool try_release() {
0230 reserved_src.load(std::memory_order_relaxed)->try_release();
0231 reserved_src.store(nullptr, std::memory_order_relaxed);
0232 return true;
0233 }
0234
0235 bool try_consume() {
0236 reserved_src.load(std::memory_order_relaxed)->try_consume();
0237 reserved_src.store(nullptr, std::memory_order_relaxed);
0238 return true;
0239 }
0240
0241 void reset() {
0242 reserved_src.store(nullptr, std::memory_order_relaxed);
0243 predecessor_cache<T, M>::reset();
0244 }
0245
0246 void clear() {
0247 reserved_src.store(nullptr, std::memory_order_relaxed);
0248 predecessor_cache<T, M>::clear();
0249 }
0250
0251 private:
0252 std::atomic<predecessor_type*> reserved_src;
0253 };
0254
0255
0256
0257 template<typename T, typename M=spin_rw_mutex >
0258 class successor_cache : no_copy {
0259 protected:
0260
0261 typedef M mutex_type;
0262 mutex_type my_mutex;
0263
0264 typedef receiver<T> successor_type;
0265 typedef receiver<T>* pointer_type;
0266 typedef sender<T> owner_type;
0267
0268 typedef std::list< pointer_type > successors_type;
0269 successors_type my_successors;
0270
0271 owner_type* my_owner;
0272
0273 public:
0274 successor_cache( owner_type* owner ) : my_owner(owner) {
0275
0276 }
0277
0278 virtual ~successor_cache() {}
0279
0280 void register_successor( successor_type& r ) {
0281 typename mutex_type::scoped_lock l(my_mutex, true);
0282 if( r.priority() != no_priority )
0283 my_successors.push_front( &r );
0284 else
0285 my_successors.push_back( &r );
0286 }
0287
0288 void remove_successor( successor_type& r ) {
0289 typename mutex_type::scoped_lock l(my_mutex, true);
0290 for ( typename successors_type::iterator i = my_successors.begin();
0291 i != my_successors.end(); ++i ) {
0292 if ( *i == & r ) {
0293 my_successors.erase(i);
0294 break;
0295 }
0296 }
0297 }
0298
0299 bool empty() {
0300 typename mutex_type::scoped_lock l(my_mutex, false);
0301 return my_successors.empty();
0302 }
0303
0304 void clear() {
0305 my_successors.clear();
0306 }
0307
0308 virtual graph_task* try_put_task( const T& t ) = 0;
0309 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0310 virtual graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) = 0;
0311 #endif
0312 };
0313
0314
0315 template<typename M>
0316 class successor_cache< continue_msg, M > : no_copy {
0317 protected:
0318
0319 typedef M mutex_type;
0320 mutex_type my_mutex;
0321
0322 typedef receiver<continue_msg> successor_type;
0323 typedef receiver<continue_msg>* pointer_type;
0324 typedef sender<continue_msg> owner_type;
0325 typedef std::list< pointer_type > successors_type;
0326 successors_type my_successors;
0327 owner_type* my_owner;
0328
0329 public:
0330 successor_cache( sender<continue_msg>* owner ) : my_owner(owner) {
0331
0332 }
0333
0334 virtual ~successor_cache() {}
0335
0336 void register_successor( successor_type& r ) {
0337 typename mutex_type::scoped_lock l(my_mutex, true);
0338 if( r.priority() != no_priority )
0339 my_successors.push_front( &r );
0340 else
0341 my_successors.push_back( &r );
0342 __TBB_ASSERT( my_owner, "Cache of successors must have an owner." );
0343 if ( r.is_continue_receiver() ) {
0344 r.register_predecessor( *my_owner );
0345 }
0346 }
0347
0348 void remove_successor( successor_type& r ) {
0349 typename mutex_type::scoped_lock l(my_mutex, true);
0350 for ( successors_type::iterator i = my_successors.begin(); i != my_successors.end(); ++i ) {
0351 if ( *i == &r ) {
0352 __TBB_ASSERT(my_owner, "Cache of successors must have an owner.");
0353
0354 r.remove_predecessor( *my_owner );
0355 my_successors.erase(i);
0356 break;
0357 }
0358 }
0359 }
0360
0361 bool empty() {
0362 typename mutex_type::scoped_lock l(my_mutex, false);
0363 return my_successors.empty();
0364 }
0365
0366 void clear() {
0367 my_successors.clear();
0368 }
0369
0370 virtual graph_task* try_put_task( const continue_msg& t ) = 0;
0371 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0372 virtual graph_task* try_put_task( const continue_msg& t, const message_metainfo& metainfo ) = 0;
0373 #endif
0374 };
0375
0376
0377 template<typename T, typename M=spin_rw_mutex>
0378 class broadcast_cache : public successor_cache<T, M> {
0379 typedef successor_cache<T, M> base_type;
0380 typedef M mutex_type;
0381 typedef typename successor_cache<T,M>::successors_type successors_type;
0382
0383 graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
0384 graph_task * last_task = nullptr;
0385 typename mutex_type::scoped_lock l(this->my_mutex, true);
0386 typename successors_type::iterator i = this->my_successors.begin();
0387 while ( i != this->my_successors.end() ) {
0388 graph_task *new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0389
0390 graph& graph_ref = (*i)->graph_reference();
0391 last_task = combine_tasks(graph_ref, last_task, new_task);
0392 if(new_task) {
0393 ++i;
0394 }
0395 else {
0396 if ( (*i)->register_predecessor(*this->my_owner) ) {
0397 i = this->my_successors.erase(i);
0398 } else {
0399 ++i;
0400 }
0401 }
0402 }
0403 return last_task;
0404 }
0405 public:
0406
0407 broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
0408
0409 }
0410
0411 graph_task* try_put_task( const T &t ) override {
0412 return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
0413 }
0414
0415 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0416 graph_task* try_put_task( const T &t, const message_metainfo& metainfo ) override {
0417 return try_put_task_impl(t, metainfo);
0418 }
0419 #endif
0420
0421
0422 bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
0423 bool is_at_least_one_put_successful = false;
0424 typename mutex_type::scoped_lock l(this->my_mutex, true);
0425 typename successors_type::iterator i = this->my_successors.begin();
0426 while ( i != this->my_successors.end() ) {
0427 graph_task * new_task = (*i)->try_put_task(t);
0428 if(new_task) {
0429 ++i;
0430 if(new_task != SUCCESSFULLY_ENQUEUED) {
0431 tasks.push_back(*new_task);
0432 }
0433 is_at_least_one_put_successful = true;
0434 }
0435 else {
0436 if ( (*i)->register_predecessor(*this->my_owner) ) {
0437 i = this->my_successors.erase(i);
0438 } else {
0439 ++i;
0440 }
0441 }
0442 }
0443 return is_at_least_one_put_successful;
0444 }
0445 };
0446
0447
0448 template<typename T, typename M=spin_rw_mutex >
0449 class round_robin_cache : public successor_cache<T, M> {
0450 typedef successor_cache<T, M> base_type;
0451 typedef size_t size_type;
0452 typedef M mutex_type;
0453 typedef typename successor_cache<T,M>::successors_type successors_type;
0454
0455 public:
0456
0457 round_robin_cache( typename base_type::owner_type* owner ): base_type(owner) {
0458
0459 }
0460
0461 size_type size() {
0462 typename mutex_type::scoped_lock l(this->my_mutex, false);
0463 return this->my_successors.size();
0464 }
0465
0466 private:
0467
0468 graph_task* try_put_task_impl( const T &t
0469 __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
0470 {
0471 typename mutex_type::scoped_lock l(this->my_mutex, true);
0472 typename successors_type::iterator i = this->my_successors.begin();
0473 while ( i != this->my_successors.end() ) {
0474 graph_task* new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0475 if ( new_task ) {
0476 return new_task;
0477 } else {
0478 if ( (*i)->register_predecessor(*this->my_owner) ) {
0479 i = this->my_successors.erase(i);
0480 }
0481 else {
0482 ++i;
0483 }
0484 }
0485 }
0486 return nullptr;
0487 }
0488
0489 public:
0490 graph_task* try_put_task(const T& t) override {
0491 return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
0492 }
0493
0494 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0495 graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) override {
0496 return try_put_task_impl(t, metainfo);
0497 }
0498 #endif
0499 };
0500
0501 #endif