Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:12:46

0001 /*
0002     Copyright (c) 2005-2020 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB__flow_graph_cache_impl_H
0018 #define __TBB__flow_graph_cache_impl_H
0019 
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023 
0024 // included in namespace tbb::flow::interfaceX (in flow_graph.h)
0025 
0026 namespace internal {
0027 
0028 //! A node_cache maintains a std::queue of elements of type T.  Each operation is protected by a lock.
0029 template< typename T, typename M=spin_mutex >
0030 class node_cache {
0031     public:
0032 
0033     typedef size_t size_type;
0034 
0035     bool empty() {
0036         typename mutex_type::scoped_lock lock( my_mutex );
0037         return internal_empty();
0038     }
0039 
0040     void add( T &n ) {
0041         typename mutex_type::scoped_lock lock( my_mutex );
0042         internal_push(n);
0043     }
0044 
0045     void remove( T &n ) {
0046         typename mutex_type::scoped_lock lock( my_mutex );
0047         for ( size_t i = internal_size(); i != 0; --i ) {
0048             T &s = internal_pop();
0049             if ( &s == &n )  return;  // only remove one predecessor per request
0050             internal_push(s);
0051         }
0052     }
0053 
0054     void clear() {
0055         while( !my_q.empty()) (void)my_q.pop();
0056 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0057         my_built_predecessors.clear();
0058 #endif
0059     }
0060 
0061 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0062     typedef edge_container<T> built_predecessors_type;
0063     built_predecessors_type &built_predecessors() { return my_built_predecessors; }
0064 
0065     typedef typename edge_container<T>::edge_list_type predecessor_list_type;
0066     void internal_add_built_predecessor( T &n ) {
0067         typename mutex_type::scoped_lock lock( my_mutex );
0068         my_built_predecessors.add_edge(n);
0069     }
0070 
0071     void internal_delete_built_predecessor( T &n ) {
0072         typename mutex_type::scoped_lock lock( my_mutex );
0073         my_built_predecessors.delete_edge(n);
0074     }
0075 
0076     void copy_predecessors( predecessor_list_type &v) {
0077         typename mutex_type::scoped_lock lock( my_mutex );
0078         my_built_predecessors.copy_edges(v);
0079     }
0080 
0081     size_t predecessor_count() {
0082         typename mutex_type::scoped_lock lock(my_mutex);
0083         return (size_t)(my_built_predecessors.edge_count());
0084     }
0085 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0086 
0087 protected:
0088 
0089     typedef M mutex_type;
0090     mutex_type my_mutex;
0091     std::queue< T * > my_q;
0092 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0093     built_predecessors_type my_built_predecessors;
0094 #endif
0095 
0096     // Assumes lock is held
0097     inline bool internal_empty( )  {
0098         return my_q.empty();
0099     }
0100 
0101     // Assumes lock is held
0102     inline size_type internal_size( )  {
0103         return my_q.size();
0104     }
0105 
0106     // Assumes lock is held
0107     inline void internal_push( T &n )  {
0108         my_q.push(&n);
0109     }
0110 
0111     // Assumes lock is held
0112     inline T &internal_pop() {
0113         T *v = my_q.front();
0114         my_q.pop();
0115         return *v;
0116     }
0117 
0118 };
0119 
//! A cache of predecessors that only supports try_get
template< typename T, typename M=spin_mutex >
#if __TBB_PREVIEW_ASYNC_MSG
// TODO: make predecessor_cache type T-independent when async_msg becomes regular feature
class predecessor_cache : public node_cache< untyped_sender, M > {
#else
class predecessor_cache : public node_cache< sender<T>, M > {
#endif // __TBB_PREVIEW_ASYNC_MSG
public:
    typedef M mutex_type;
    typedef T output_type;
#if __TBB_PREVIEW_ASYNC_MSG
    typedef untyped_sender predecessor_type;
    typedef untyped_receiver successor_type;
#else
    typedef sender<output_type> predecessor_type;
    typedef receiver<output_type> successor_type;
#endif // __TBB_PREVIEW_ASYNC_MSG

    predecessor_cache( ) : my_owner( NULL ) { }

    //! Sets the successor on whose behalf failed predecessors are re-registered.
    void set_owner( successor_type *owner ) { my_owner = owner; }

    //! Pops cached predecessors and asks each one for an item until a try_get
    //! succeeds or the cache is exhausted.  Returns true iff v was assigned.
    bool get_item( output_type &v ) {

        bool msg = false;

        do {
            predecessor_type *src;
            {
                // Hold the lock only while manipulating the queue; the call
                // out to the sender below runs without the lock held.
                typename mutex_type::scoped_lock lock(this->my_mutex);
                if ( this->internal_empty() ) {
                    break;
                }
                src = &this->internal_pop();
            }

            // Try to get from this sender
            msg = src->try_get( v );

            if (msg == false) {
                // Relinquish ownership of the edge
                if (my_owner)
                    src->register_successor( *my_owner );
            } else {
                // Retain ownership of the edge
                this->add(*src);
            }
        } while ( msg == false );
        return msg;
    }

    // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
    //! Re-registers my_owner with every cached predecessor and empties the cache.
    //! NOTE(review): the queue is inspected/popped here without taking my_mutex --
    //! presumably reset() runs only while the graph is quiescent; confirm.
    void reset() {
        if (my_owner) {
            for(;;) {
                predecessor_type *src;
                {
                    if (this->internal_empty()) break;
                    src = &this->internal_pop();
                }
                src->register_successor( *my_owner );
            }
        }
    }

protected:

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    using node_cache< predecessor_type, M >::my_built_predecessors;
#endif
    successor_type *my_owner;   // successor this cache serves; may be NULL until set_owner()
};
0193 
//! A cache of predecessors that supports requests and reservations
// TODO: make reservable_predecessor_cache type T-independent when async_msg becomes regular feature
template< typename T, typename M=spin_mutex >
class reservable_predecessor_cache : public predecessor_cache< T, M > {
public:
    typedef M mutex_type;
    typedef T output_type;
#if __TBB_PREVIEW_ASYNC_MSG
    typedef untyped_sender predecessor_type;
    typedef untyped_receiver successor_type;
#else
    typedef sender<T> predecessor_type;
    typedef receiver<T> successor_type;
#endif // __TBB_PREVIEW_ASYNC_MSG

    reservable_predecessor_cache( ) : reserved_src(NULL) { }

    //! Tries to reserve an item from some cached predecessor.
    //! At most one reservation can be outstanding: returns false immediately if
    //! one is already held (or the cache is empty); on success the reserving
    //! sender is remembered in reserved_src and v holds the reserved value.
    bool
    try_reserve( output_type &v ) {
        bool msg = false;

        do {
            {
                // Lock only around the queue manipulation and reserved_src check.
                typename mutex_type::scoped_lock lock(this->my_mutex);
                if ( reserved_src || this->internal_empty() )
                    return false;

                reserved_src = &this->internal_pop();
            }

            // Try to get from this sender
            msg = reserved_src->try_reserve( v );

            if (msg == false) {
                typename mutex_type::scoped_lock lock(this->my_mutex);
                // Relinquish ownership of the edge
                reserved_src->register_successor( *this->my_owner );
                reserved_src = NULL;
            } else {
                // Retain ownership of the edge
                this->add( *reserved_src );
            }
        } while ( msg == false );

        return msg;
    }

    //! Releases the outstanding reservation.
    //! Precondition: a successful try_reserve() preceded this call --
    //! reserved_src is dereferenced without a NULL check.
    bool
    try_release( ) {
        reserved_src->try_release( );
        reserved_src = NULL;
        return true;
    }

    //! Consumes the outstanding reservation.
    //! Precondition: a successful try_reserve() preceded this call --
    //! reserved_src is dereferenced without a NULL check.
    bool
    try_consume( ) {
        reserved_src->try_consume( );
        reserved_src = NULL;
        return true;
    }

    //! Drops any reservation and re-registers cached predecessors (see base reset()).
    //! NOTE(review): reserved_src is cleared without holding my_mutex --
    //! presumably callers guarantee quiescence here; confirm.
    void reset( ) {
        reserved_src = NULL;
        predecessor_cache<T,M>::reset( );
    }

    //! Drops any reservation and empties the cache without re-registering.
    void clear() {
        reserved_src = NULL;
        predecessor_cache<T,M>::clear();
    }

private:
    predecessor_type *reserved_src;   // sender holding the single outstanding reservation, or NULL
};
0268 
0269 
0270 //! An abstract cache of successors
0271 // TODO: make successor_cache type T-independent when async_msg becomes regular feature
0272 template<typename T, typename M=spin_rw_mutex >
0273 class successor_cache : tbb::internal::no_copy {
0274 protected:
0275 
0276     typedef M mutex_type;
0277     mutex_type my_mutex;
0278 
0279 #if __TBB_PREVIEW_ASYNC_MSG
0280     typedef untyped_receiver successor_type;
0281     typedef untyped_receiver *pointer_type;
0282     typedef untyped_sender owner_type;
0283 #else
0284     typedef receiver<T> successor_type;
0285     typedef receiver<T> *pointer_type;
0286     typedef sender<T> owner_type;
0287 #endif // __TBB_PREVIEW_ASYNC_MSG
0288     typedef std::list< pointer_type > successors_type;
0289 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0290     edge_container<successor_type> my_built_successors;
0291 #endif
0292     successors_type my_successors;
0293 
0294     owner_type *my_owner;
0295 
0296 public:
0297 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0298     typedef typename edge_container<successor_type>::edge_list_type successor_list_type;
0299 
0300     edge_container<successor_type> &built_successors() { return my_built_successors; }
0301 
0302     void internal_add_built_successor( successor_type &r) {
0303         typename mutex_type::scoped_lock l(my_mutex, true);
0304         my_built_successors.add_edge( r );
0305     }
0306 
0307     void internal_delete_built_successor( successor_type &r) {
0308         typename mutex_type::scoped_lock l(my_mutex, true);
0309         my_built_successors.delete_edge(r);
0310     }
0311 
0312     void copy_successors( successor_list_type &v) {
0313         typename mutex_type::scoped_lock l(my_mutex, false);
0314         my_built_successors.copy_edges(v);
0315     }
0316 
0317     size_t successor_count() {
0318         typename mutex_type::scoped_lock l(my_mutex,false);
0319         return my_built_successors.edge_count();
0320     }
0321 
0322 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
0323 
0324     successor_cache( ) : my_owner(NULL) {}
0325 
0326     void set_owner( owner_type *owner ) { my_owner = owner; }
0327 
0328     virtual ~successor_cache() {}
0329 
0330     void register_successor( successor_type &r ) {
0331         typename mutex_type::scoped_lock l(my_mutex, true);
0332         my_successors.push_back( &r );
0333     }
0334 
0335     void remove_successor( successor_type &r ) {
0336         typename mutex_type::scoped_lock l(my_mutex, true);
0337         for ( typename successors_type::iterator i = my_successors.begin();
0338               i != my_successors.end(); ++i ) {
0339             if ( *i == & r ) {
0340                 my_successors.erase(i);
0341                 break;
0342             }
0343         }
0344     }
0345 
0346     bool empty() {
0347         typename mutex_type::scoped_lock l(my_mutex, false);
0348         return my_successors.empty();
0349     }
0350 
0351     void clear() {
0352         my_successors.clear();
0353 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
0354         my_built_successors.clear();
0355 #endif
0356     }
0357 
0358 #if !__TBB_PREVIEW_ASYNC_MSG
0359     virtual task * try_put_task( const T &t ) = 0;
0360 #endif // __TBB_PREVIEW_ASYNC_MSG
0361  };  // successor_cache<T>
0362 
//! An abstract cache of successors, specialized to continue_msg
template<typename M>
class successor_cache< continue_msg, M > : tbb::internal::no_copy {
protected:

    typedef M mutex_type;
    mutex_type my_mutex;

#if __TBB_PREVIEW_ASYNC_MSG
    typedef untyped_receiver successor_type;
    typedef untyped_receiver *pointer_type;
#else
    typedef receiver<continue_msg> successor_type;
    typedef receiver<continue_msg> *pointer_type;
#endif // __TBB_PREVIEW_ASYNC_MSG
    typedef std::list< pointer_type > successors_type;
    successors_type my_successors;   // non-owning pointers to registered successors
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    edge_container<successor_type> my_built_successors;
    typedef edge_container<successor_type>::edge_list_type successor_list_type;
#endif

    sender<continue_msg> *my_owner;  // node this cache belongs to; may be NULL

public:

#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION

    edge_container<successor_type> &built_successors() { return my_built_successors; }

    //! Records an edge under a writer lock.
    void internal_add_built_successor( successor_type &r) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        my_built_successors.add_edge( r );
    }

    //! Removes a recorded edge under a writer lock.
    void internal_delete_built_successor( successor_type &r) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        my_built_successors.delete_edge(r);
    }

    //! Copies recorded edges into v under a reader lock.
    void copy_successors( successor_list_type &v) {
        typename mutex_type::scoped_lock l(my_mutex, false);
        my_built_successors.copy_edges(v);
    }

    //! Number of recorded edges, read under a reader lock.
    size_t successor_count() {
        typename mutex_type::scoped_lock l(my_mutex,false);
        return my_built_successors.edge_count();
    }

#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */

    successor_cache( ) : my_owner(NULL) {}

    void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }

    virtual ~successor_cache() {}

    //! Appends a successor under a writer lock.  Unlike the primary template,
    //! continue_receiver successors are also told about my_owner as a predecessor.
    void register_successor( successor_type &r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        my_successors.push_back( &r );
        if ( my_owner && r.is_continue_receiver() ) {
            r.register_predecessor( *my_owner );
        }
    }

    //! Erases the first cached entry equal to &r and undoes the predecessor
    //! registration made in register_successor().
    void remove_successor( successor_type &r ) {
        typename mutex_type::scoped_lock l(my_mutex, true);
        for ( successors_type::iterator i = my_successors.begin();
              i != my_successors.end(); ++i ) {
            if ( *i == & r ) {
                // TODO: Check if we need to test for continue_receiver before
                // removing from r.
                if ( my_owner )
                    r.remove_predecessor( *my_owner );
                my_successors.erase(i);
                break;
            }
        }
    }

    //! True when no successors are cached; read under a reader lock.
    bool empty() {
        typename mutex_type::scoped_lock l(my_mutex, false);
        return my_successors.empty();
    }

    //! Discards all cached successors.
    //! NOTE(review): no lock is taken here, and remove_predecessor is NOT called
    //! for the dropped successors (unlike remove_successor) -- presumably by design.
    void clear() {
        my_successors.clear();
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
        my_built_successors.clear();
#endif
    }

#if !__TBB_PREVIEW_ASYNC_MSG
    virtual task * try_put_task( const continue_msg &t ) = 0;
#endif // __TBB_PREVIEW_ASYNC_MSG

};  // successor_cache< continue_msg >
0461 
//! A cache of successors that are broadcast to
// TODO: make broadcast_cache type T-independent when async_msg becomes regular feature
template<typename T, typename M=spin_rw_mutex>
class broadcast_cache : public successor_cache<T, M> {
    typedef M mutex_type;
    typedef typename successor_cache<T,M>::successors_type successors_type;

public:

    broadcast_cache( ) {}

    // as above, but call try_put_task instead, and return the last task we received (if any)
    //! Offers t to every cached successor.  Successors that reject the item and
    //! accept re-registration as predecessors are dropped from the cache.
    //! Returns the combined/last spawned task, or NULL.
#if __TBB_PREVIEW_ASYNC_MSG
    template<typename X>
    task * try_put_task( const X &t ) {
#else
    task * try_put_task( const T &t ) __TBB_override {
#endif // __TBB_PREVIEW_ASYNC_MSG
        task * last_task = NULL;
        // NOTE(review): upgraded starts true, so the lock is acquired as a writer
        // up front and the upgrade_to_writer() branch below is never reached as
        // written -- presumably kept so the code can revert to reader-first locking.
        bool upgraded = true;
        typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
        typename successors_type::iterator i = this->my_successors.begin();
        while ( i != this->my_successors.end() ) {
            task *new_task = (*i)->try_put_task(t);
            // workaround for icc bug
            graph& graph_ref = (*i)->graph_reference();
            last_task = combine_tasks(graph_ref, last_task, new_task);  // enqueue if necessary
            if(new_task) {
                ++i;
            }
            else {  // failed
                if ( (*i)->register_predecessor(*this->my_owner) ) {
                    if (!upgraded) {
                        l.upgrade_to_writer();
                        upgraded = true;
                    }
                    i = this->my_successors.erase(i);
                } else {
                    ++i;
                }
            }
        }
        return last_task;
    }

    // call try_put_task and return list of received tasks
    //! Same broadcast loop as try_put_task, but collects each non-enqueued task
    //! into tasks instead of combining them.  Returns true iff at least one
    //! successor accepted the item.
#if __TBB_PREVIEW_ASYNC_MSG
    template<typename X>
    bool gather_successful_try_puts( const X &t, task_list &tasks ) {
#else
    bool gather_successful_try_puts( const T &t, task_list &tasks ) {
#endif // __TBB_PREVIEW_ASYNC_MSG
        // NOTE(review): same writer-from-the-start locking pattern as try_put_task.
        bool upgraded = true;
        bool is_at_least_one_put_successful = false;
        typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
        typename successors_type::iterator i = this->my_successors.begin();
        while ( i != this->my_successors.end() ) {
            task * new_task = (*i)->try_put_task(t);
            if(new_task) {
                ++i;
                if(new_task != SUCCESSFULLY_ENQUEUED) {
                    tasks.push_back(*new_task);
                }
                is_at_least_one_put_successful = true;
            }
            else {  // failed
                if ( (*i)->register_predecessor(*this->my_owner) ) {
                    if (!upgraded) {
                        l.upgrade_to_writer();
                        upgraded = true;
                    }
                    i = this->my_successors.erase(i);
                } else {
                    ++i;
                }
            }
        }
        return is_at_least_one_put_successful;
    }
};
0542 
0543 //! A cache of successors that are put in a round-robin fashion
0544 // TODO: make round_robin_cache type T-independent when async_msg becomes regular feature
0545 template<typename T, typename M=spin_rw_mutex >
0546 class round_robin_cache : public successor_cache<T, M> {
0547     typedef size_t size_type;
0548     typedef M mutex_type;
0549     typedef typename successor_cache<T,M>::successors_type successors_type;
0550 
0551 public:
0552 
0553     round_robin_cache( ) {}
0554 
0555     size_type size() {
0556         typename mutex_type::scoped_lock l(this->my_mutex, false);
0557         return this->my_successors.size();
0558     }
0559 
0560 #if __TBB_PREVIEW_ASYNC_MSG
0561     template<typename X>
0562     task * try_put_task( const X &t ) {
0563 #else
0564     task *try_put_task( const T &t ) __TBB_override {
0565 #endif // __TBB_PREVIEW_ASYNC_MSG
0566         bool upgraded = true;
0567         typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
0568         typename successors_type::iterator i = this->my_successors.begin();
0569         while ( i != this->my_successors.end() ) {
0570             task *new_task = (*i)->try_put_task(t);
0571             if ( new_task ) {
0572                 return new_task;
0573             } else {
0574                if ( (*i)->register_predecessor(*this->my_owner) ) {
0575                    if (!upgraded) {
0576                        l.upgrade_to_writer();
0577                        upgraded = true;
0578                    }
0579                    i = this->my_successors.erase(i);
0580                }
0581                else {
0582                    ++i;
0583                }
0584             }
0585         }
0586         return NULL;
0587     }
0588 };
0589 
0590 } // namespace internal
0591 
0592 #endif // __TBB__flow_graph_cache_impl_H