File indexing completed on 2025-07-30 08:46:16
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB__flow_graph_cache_impl_H
0018 #define __TBB__flow_graph_cache_impl_H
0019
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023
0024
0025
0026
0027 template< typename T, typename M=spin_mutex >
0028 class node_cache {
0029 public:
0030
0031 typedef size_t size_type;
0032
0033 bool empty() {
0034 typename mutex_type::scoped_lock lock( my_mutex );
0035 return internal_empty();
0036 }
0037
0038 void add( T &n ) {
0039 typename mutex_type::scoped_lock lock( my_mutex );
0040 internal_push(n);
0041 }
0042
0043 void remove( T &n ) {
0044 typename mutex_type::scoped_lock lock( my_mutex );
0045 for ( size_t i = internal_size(); i != 0; --i ) {
0046 T &s = internal_pop();
0047 if ( &s == &n )
0048 break;
0049 internal_push(s);
0050 }
0051 }
0052
0053 void clear() {
0054 while( !my_q.empty()) (void)my_q.pop();
0055 }
0056
0057 protected:
0058
0059 typedef M mutex_type;
0060 mutex_type my_mutex;
0061 std::queue< T * > my_q;
0062
0063
0064 inline bool internal_empty( ) {
0065 return my_q.empty();
0066 }
0067
0068
0069 inline size_type internal_size( ) {
0070 return my_q.size();
0071 }
0072
0073
0074 inline void internal_push( T &n ) {
0075 my_q.push(&n);
0076 }
0077
0078
0079 inline T &internal_pop() {
0080 T *v = my_q.front();
0081 my_q.pop();
0082 return *v;
0083 }
0084
0085 };
0086
0087
0088 template< typename T, typename M=spin_mutex >
0089 class predecessor_cache : public node_cache< sender<T>, M > {
0090 public:
0091 typedef M mutex_type;
0092 typedef T output_type;
0093 typedef sender<output_type> predecessor_type;
0094 typedef receiver<output_type> successor_type;
0095
0096 predecessor_cache( successor_type* owner ) : my_owner( owner ) {
0097 __TBB_ASSERT( my_owner, "predecessor_cache should have an owner." );
0098
0099 }
0100
0101 bool get_item( output_type& v ) {
0102
0103 bool msg = false;
0104
0105 do {
0106 predecessor_type *src;
0107 {
0108 typename mutex_type::scoped_lock lock(this->my_mutex);
0109 if ( this->internal_empty() ) {
0110 break;
0111 }
0112 src = &this->internal_pop();
0113 }
0114
0115
0116 msg = src->try_get( v );
0117
0118 if (msg == false) {
0119
0120 register_successor(*src, *my_owner);
0121 } else {
0122
0123 this->add(*src);
0124 }
0125 } while ( msg == false );
0126 return msg;
0127 }
0128
0129
0130 void reset() {
0131 for(;;) {
0132 predecessor_type *src;
0133 {
0134 if (this->internal_empty()) break;
0135 src = &this->internal_pop();
0136 }
0137 register_successor(*src, *my_owner);
0138 }
0139 }
0140
0141 protected:
0142 successor_type* my_owner;
0143 };
0144
0145
0146 template< typename T, typename M=spin_mutex >
0147 class reservable_predecessor_cache : public predecessor_cache< T, M > {
0148 public:
0149 typedef M mutex_type;
0150 typedef T output_type;
0151 typedef sender<T> predecessor_type;
0152 typedef receiver<T> successor_type;
0153
0154 reservable_predecessor_cache( successor_type* owner )
0155 : predecessor_cache<T,M>(owner), reserved_src(nullptr)
0156 {
0157
0158 }
0159
0160 bool try_reserve( output_type &v ) {
0161 bool msg = false;
0162
0163 do {
0164 predecessor_type* pred = nullptr;
0165 {
0166 typename mutex_type::scoped_lock lock(this->my_mutex);
0167 if ( reserved_src.load(std::memory_order_relaxed) || this->internal_empty() )
0168 return false;
0169
0170 pred = &this->internal_pop();
0171 reserved_src.store(pred, std::memory_order_relaxed);
0172 }
0173
0174
0175 msg = pred->try_reserve( v );
0176
0177 if (msg == false) {
0178 typename mutex_type::scoped_lock lock(this->my_mutex);
0179
0180 register_successor( *pred, *this->my_owner );
0181 reserved_src.store(nullptr, std::memory_order_relaxed);
0182 } else {
0183
0184 this->add( *pred);
0185 }
0186 } while ( msg == false );
0187
0188 return msg;
0189 }
0190
0191 bool try_release() {
0192 reserved_src.load(std::memory_order_relaxed)->try_release();
0193 reserved_src.store(nullptr, std::memory_order_relaxed);
0194 return true;
0195 }
0196
0197 bool try_consume() {
0198 reserved_src.load(std::memory_order_relaxed)->try_consume();
0199 reserved_src.store(nullptr, std::memory_order_relaxed);
0200 return true;
0201 }
0202
0203 void reset() {
0204 reserved_src.store(nullptr, std::memory_order_relaxed);
0205 predecessor_cache<T, M>::reset();
0206 }
0207
0208 void clear() {
0209 reserved_src.store(nullptr, std::memory_order_relaxed);
0210 predecessor_cache<T, M>::clear();
0211 }
0212
0213 private:
0214 std::atomic<predecessor_type*> reserved_src;
0215 };
0216
0217
0218
0219 template<typename T, typename M=spin_rw_mutex >
0220 class successor_cache : no_copy {
0221 protected:
0222
0223 typedef M mutex_type;
0224 mutex_type my_mutex;
0225
0226 typedef receiver<T> successor_type;
0227 typedef receiver<T>* pointer_type;
0228 typedef sender<T> owner_type;
0229
0230 typedef std::list< pointer_type > successors_type;
0231 successors_type my_successors;
0232
0233 owner_type* my_owner;
0234
0235 public:
0236 successor_cache( owner_type* owner ) : my_owner(owner) {
0237
0238 }
0239
0240 virtual ~successor_cache() {}
0241
0242 void register_successor( successor_type& r ) {
0243 typename mutex_type::scoped_lock l(my_mutex, true);
0244 if( r.priority() != no_priority )
0245 my_successors.push_front( &r );
0246 else
0247 my_successors.push_back( &r );
0248 }
0249
0250 void remove_successor( successor_type& r ) {
0251 typename mutex_type::scoped_lock l(my_mutex, true);
0252 for ( typename successors_type::iterator i = my_successors.begin();
0253 i != my_successors.end(); ++i ) {
0254 if ( *i == & r ) {
0255 my_successors.erase(i);
0256 break;
0257 }
0258 }
0259 }
0260
0261 bool empty() {
0262 typename mutex_type::scoped_lock l(my_mutex, false);
0263 return my_successors.empty();
0264 }
0265
0266 void clear() {
0267 my_successors.clear();
0268 }
0269
0270 virtual graph_task* try_put_task( const T& t ) = 0;
0271 };
0272
0273
0274 template<typename M>
0275 class successor_cache< continue_msg, M > : no_copy {
0276 protected:
0277
0278 typedef M mutex_type;
0279 mutex_type my_mutex;
0280
0281 typedef receiver<continue_msg> successor_type;
0282 typedef receiver<continue_msg>* pointer_type;
0283 typedef sender<continue_msg> owner_type;
0284 typedef std::list< pointer_type > successors_type;
0285 successors_type my_successors;
0286 owner_type* my_owner;
0287
0288 public:
0289 successor_cache( sender<continue_msg>* owner ) : my_owner(owner) {
0290
0291 }
0292
0293 virtual ~successor_cache() {}
0294
0295 void register_successor( successor_type& r ) {
0296 typename mutex_type::scoped_lock l(my_mutex, true);
0297 if( r.priority() != no_priority )
0298 my_successors.push_front( &r );
0299 else
0300 my_successors.push_back( &r );
0301 __TBB_ASSERT( my_owner, "Cache of successors must have an owner." );
0302 if ( r.is_continue_receiver() ) {
0303 r.register_predecessor( *my_owner );
0304 }
0305 }
0306
0307 void remove_successor( successor_type& r ) {
0308 typename mutex_type::scoped_lock l(my_mutex, true);
0309 for ( successors_type::iterator i = my_successors.begin(); i != my_successors.end(); ++i ) {
0310 if ( *i == &r ) {
0311 __TBB_ASSERT(my_owner, "Cache of successors must have an owner.");
0312
0313 r.remove_predecessor( *my_owner );
0314 my_successors.erase(i);
0315 break;
0316 }
0317 }
0318 }
0319
0320 bool empty() {
0321 typename mutex_type::scoped_lock l(my_mutex, false);
0322 return my_successors.empty();
0323 }
0324
0325 void clear() {
0326 my_successors.clear();
0327 }
0328
0329 virtual graph_task* try_put_task( const continue_msg& t ) = 0;
0330 };
0331
0332
0333 template<typename T, typename M=spin_rw_mutex>
0334 class broadcast_cache : public successor_cache<T, M> {
0335 typedef successor_cache<T, M> base_type;
0336 typedef M mutex_type;
0337 typedef typename successor_cache<T,M>::successors_type successors_type;
0338
0339 public:
0340
0341 broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
0342
0343 }
0344
0345
0346 graph_task* try_put_task( const T &t ) override {
0347 graph_task * last_task = nullptr;
0348 typename mutex_type::scoped_lock l(this->my_mutex, true);
0349 typename successors_type::iterator i = this->my_successors.begin();
0350 while ( i != this->my_successors.end() ) {
0351 graph_task *new_task = (*i)->try_put_task(t);
0352
0353 graph& graph_ref = (*i)->graph_reference();
0354 last_task = combine_tasks(graph_ref, last_task, new_task);
0355 if(new_task) {
0356 ++i;
0357 }
0358 else {
0359 if ( (*i)->register_predecessor(*this->my_owner) ) {
0360 i = this->my_successors.erase(i);
0361 } else {
0362 ++i;
0363 }
0364 }
0365 }
0366 return last_task;
0367 }
0368
0369
0370 bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
0371 bool is_at_least_one_put_successful = false;
0372 typename mutex_type::scoped_lock l(this->my_mutex, true);
0373 typename successors_type::iterator i = this->my_successors.begin();
0374 while ( i != this->my_successors.end() ) {
0375 graph_task * new_task = (*i)->try_put_task(t);
0376 if(new_task) {
0377 ++i;
0378 if(new_task != SUCCESSFULLY_ENQUEUED) {
0379 tasks.push_back(*new_task);
0380 }
0381 is_at_least_one_put_successful = true;
0382 }
0383 else {
0384 if ( (*i)->register_predecessor(*this->my_owner) ) {
0385 i = this->my_successors.erase(i);
0386 } else {
0387 ++i;
0388 }
0389 }
0390 }
0391 return is_at_least_one_put_successful;
0392 }
0393 };
0394
0395
0396 template<typename T, typename M=spin_rw_mutex >
0397 class round_robin_cache : public successor_cache<T, M> {
0398 typedef successor_cache<T, M> base_type;
0399 typedef size_t size_type;
0400 typedef M mutex_type;
0401 typedef typename successor_cache<T,M>::successors_type successors_type;
0402
0403 public:
0404
0405 round_robin_cache( typename base_type::owner_type* owner ): base_type(owner) {
0406
0407 }
0408
0409 size_type size() {
0410 typename mutex_type::scoped_lock l(this->my_mutex, false);
0411 return this->my_successors.size();
0412 }
0413
0414 graph_task* try_put_task( const T &t ) override {
0415 typename mutex_type::scoped_lock l(this->my_mutex, true);
0416 typename successors_type::iterator i = this->my_successors.begin();
0417 while ( i != this->my_successors.end() ) {
0418 graph_task* new_task = (*i)->try_put_task(t);
0419 if ( new_task ) {
0420 return new_task;
0421 } else {
0422 if ( (*i)->register_predecessor(*this->my_owner) ) {
0423 i = this->my_successors.erase(i);
0424 }
0425 else {
0426 ++i;
0427 }
0428 }
0429 }
0430 return nullptr;
0431 }
0432 };
0433
0434 #endif