Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-30 08:46:16

0001 /*
0002     Copyright (c) 2005-2022 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB__flow_graph_cache_impl_H
0018 #define __TBB__flow_graph_cache_impl_H
0019 
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023 
0024 // included in namespace tbb::detail::d1 (in flow_graph.h)
0025 
0026 //! A node_cache maintains a std::queue of elements of type T.  Each operation is protected by a lock.
0027 template< typename T, typename M=spin_mutex >
0028 class node_cache {
0029     public:
0030 
0031     typedef size_t size_type;
0032 
0033     bool empty() {
0034         typename mutex_type::scoped_lock lock( my_mutex );
0035         return internal_empty();
0036     }
0037 
0038     void add( T &n ) {
0039         typename mutex_type::scoped_lock lock( my_mutex );
0040         internal_push(n);
0041     }
0042 
0043     void remove( T &n ) {
0044         typename mutex_type::scoped_lock lock( my_mutex );
0045         for ( size_t i = internal_size(); i != 0; --i ) {
0046             T &s = internal_pop();
0047             if ( &s == &n )
0048                 break;  // only remove one predecessor per request
0049             internal_push(s);
0050         }
0051     }
0052 
0053     void clear() {
0054         while( !my_q.empty()) (void)my_q.pop();
0055     }
0056 
0057 protected:
0058 
0059     typedef M mutex_type;
0060     mutex_type my_mutex;
0061     std::queue< T * > my_q;
0062 
0063     // Assumes lock is held
0064     inline bool internal_empty( )  {
0065         return my_q.empty();
0066     }
0067 
0068     // Assumes lock is held
0069     inline size_type internal_size( )  {
0070         return my_q.size();
0071     }
0072 
0073     // Assumes lock is held
0074     inline void internal_push( T &n )  {
0075         my_q.push(&n);
0076     }
0077 
0078     // Assumes lock is held
0079     inline T &internal_pop() {
0080         T *v = my_q.front();
0081         my_q.pop();
0082         return *v;
0083     }
0084 
0085 };
0086 
0087 //! A cache of predecessors that only supports try_get
0088 template< typename T, typename M=spin_mutex >
0089 class predecessor_cache : public node_cache< sender<T>, M > {
0090 public:
0091     typedef M mutex_type;
0092     typedef T output_type;
0093     typedef sender<output_type> predecessor_type;
0094     typedef receiver<output_type> successor_type;
0095 
0096     predecessor_cache( successor_type* owner ) : my_owner( owner ) {
0097         __TBB_ASSERT( my_owner, "predecessor_cache should have an owner." );
0098         // Do not work with the passed pointer here as it may not be fully initialized yet
0099     }
0100 
0101     bool get_item( output_type& v ) {
0102 
0103         bool msg = false;
0104 
0105         do {
0106             predecessor_type *src;
0107             {
0108                 typename mutex_type::scoped_lock lock(this->my_mutex);
0109                 if ( this->internal_empty() ) {
0110                     break;
0111                 }
0112                 src = &this->internal_pop();
0113             }
0114 
0115             // Try to get from this sender
0116             msg = src->try_get( v );
0117 
0118             if (msg == false) {
0119                 // Relinquish ownership of the edge
0120                 register_successor(*src, *my_owner);
0121             } else {
0122                 // Retain ownership of the edge
0123                 this->add(*src);
0124             }
0125         } while ( msg == false );
0126         return msg;
0127     }
0128 
0129     // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
0130     void reset() {
0131         for(;;) {
0132             predecessor_type *src;
0133             {
0134                 if (this->internal_empty()) break;
0135                 src = &this->internal_pop();
0136             }
0137             register_successor(*src, *my_owner);
0138         }
0139     }
0140 
0141 protected:
0142     successor_type* my_owner;
0143 };
0144 
0145 //! An cache of predecessors that supports requests and reservations
0146 template< typename T, typename M=spin_mutex >
0147 class reservable_predecessor_cache : public predecessor_cache< T, M > {
0148 public:
0149     typedef M mutex_type;
0150     typedef T output_type;
0151     typedef sender<T> predecessor_type;
0152     typedef receiver<T> successor_type;
0153 
0154     reservable_predecessor_cache( successor_type* owner )
0155         : predecessor_cache<T,M>(owner), reserved_src(nullptr)
0156     {
0157         // Do not work with the passed pointer here as it may not be fully initialized yet
0158     }
0159 
0160     bool try_reserve( output_type &v ) {
0161         bool msg = false;
0162 
0163         do {
0164             predecessor_type* pred = nullptr;
0165             {
0166                 typename mutex_type::scoped_lock lock(this->my_mutex);
0167                 if ( reserved_src.load(std::memory_order_relaxed) || this->internal_empty() )
0168                     return false;
0169 
0170                 pred = &this->internal_pop();
0171                 reserved_src.store(pred, std::memory_order_relaxed);
0172             }
0173 
0174             // Try to get from this sender
0175             msg = pred->try_reserve( v );
0176 
0177             if (msg == false) {
0178                 typename mutex_type::scoped_lock lock(this->my_mutex);
0179                 // Relinquish ownership of the edge
0180                 register_successor( *pred, *this->my_owner );
0181                 reserved_src.store(nullptr, std::memory_order_relaxed);
0182             } else {
0183                 // Retain ownership of the edge
0184                 this->add( *pred);
0185             }
0186         } while ( msg == false );
0187 
0188         return msg;
0189     }
0190 
0191     bool try_release() {
0192         reserved_src.load(std::memory_order_relaxed)->try_release();
0193         reserved_src.store(nullptr, std::memory_order_relaxed);
0194         return true;
0195     }
0196 
0197     bool try_consume() {
0198         reserved_src.load(std::memory_order_relaxed)->try_consume();
0199         reserved_src.store(nullptr, std::memory_order_relaxed);
0200         return true;
0201     }
0202 
0203     void reset() {
0204         reserved_src.store(nullptr, std::memory_order_relaxed);
0205         predecessor_cache<T, M>::reset();
0206     }
0207 
0208     void clear() {
0209         reserved_src.store(nullptr, std::memory_order_relaxed);
0210         predecessor_cache<T, M>::clear();
0211     }
0212 
0213 private:
0214     std::atomic<predecessor_type*> reserved_src;
0215 };
0216 
0217 
0218 //! An abstract cache of successors
0219 template<typename T, typename M=spin_rw_mutex >
0220 class successor_cache : no_copy {
0221 protected:
0222 
0223     typedef M mutex_type;
0224     mutex_type my_mutex;
0225 
0226     typedef receiver<T> successor_type;
0227     typedef receiver<T>* pointer_type;
0228     typedef sender<T> owner_type;
0229     // TODO revamp: introduce heapified collection of successors for strict priorities
0230     typedef std::list< pointer_type > successors_type;
0231     successors_type my_successors;
0232 
0233     owner_type* my_owner;
0234 
0235 public:
0236     successor_cache( owner_type* owner ) : my_owner(owner) {
0237         // Do not work with the passed pointer here as it may not be fully initialized yet
0238     }
0239 
0240     virtual ~successor_cache() {}
0241 
0242     void register_successor( successor_type& r ) {
0243         typename mutex_type::scoped_lock l(my_mutex, true);
0244         if( r.priority() != no_priority )
0245             my_successors.push_front( &r );
0246         else
0247             my_successors.push_back( &r );
0248     }
0249 
0250     void remove_successor( successor_type& r ) {
0251         typename mutex_type::scoped_lock l(my_mutex, true);
0252         for ( typename successors_type::iterator i = my_successors.begin();
0253               i != my_successors.end(); ++i ) {
0254             if ( *i == & r ) {
0255                 my_successors.erase(i);
0256                 break;
0257             }
0258         }
0259     }
0260 
0261     bool empty() {
0262         typename mutex_type::scoped_lock l(my_mutex, false);
0263         return my_successors.empty();
0264     }
0265 
0266     void clear() {
0267         my_successors.clear();
0268     }
0269 
0270     virtual graph_task* try_put_task( const T& t ) = 0;
0271 };  // successor_cache<T>
0272 
0273 //! An abstract cache of successors, specialized to continue_msg
0274 template<typename M>
0275 class successor_cache< continue_msg, M > : no_copy {
0276 protected:
0277 
0278     typedef M mutex_type;
0279     mutex_type my_mutex;
0280 
0281     typedef receiver<continue_msg> successor_type;
0282     typedef receiver<continue_msg>* pointer_type;
0283     typedef sender<continue_msg> owner_type;
0284     typedef std::list< pointer_type > successors_type;
0285     successors_type my_successors;
0286     owner_type* my_owner;
0287 
0288 public:
0289     successor_cache( sender<continue_msg>* owner ) : my_owner(owner) {
0290         // Do not work with the passed pointer here as it may not be fully initialized yet
0291     }
0292 
0293     virtual ~successor_cache() {}
0294 
0295     void register_successor( successor_type& r ) {
0296         typename mutex_type::scoped_lock l(my_mutex, true);
0297         if( r.priority() != no_priority )
0298             my_successors.push_front( &r );
0299         else
0300             my_successors.push_back( &r );
0301         __TBB_ASSERT( my_owner, "Cache of successors must have an owner." );
0302         if ( r.is_continue_receiver() ) {
0303             r.register_predecessor( *my_owner );
0304         }
0305     }
0306 
0307     void remove_successor( successor_type& r ) {
0308         typename mutex_type::scoped_lock l(my_mutex, true);
0309         for ( successors_type::iterator i = my_successors.begin(); i != my_successors.end(); ++i ) {
0310             if ( *i == &r ) {
0311                 __TBB_ASSERT(my_owner, "Cache of successors must have an owner.");
0312                 // TODO: check if we need to test for continue_receiver before removing from r.
0313                 r.remove_predecessor( *my_owner );
0314                 my_successors.erase(i);
0315                 break;
0316             }
0317         }
0318     }
0319 
0320     bool empty() {
0321         typename mutex_type::scoped_lock l(my_mutex, false);
0322         return my_successors.empty();
0323     }
0324 
0325     void clear() {
0326         my_successors.clear();
0327     }
0328 
0329     virtual graph_task* try_put_task( const continue_msg& t ) = 0;
0330 };  // successor_cache< continue_msg >
0331 
0332 //! A cache of successors that are broadcast to
0333 template<typename T, typename M=spin_rw_mutex>
0334 class broadcast_cache : public successor_cache<T, M> {
0335     typedef successor_cache<T, M> base_type;
0336     typedef M mutex_type;
0337     typedef typename successor_cache<T,M>::successors_type successors_type;
0338 
0339 public:
0340 
0341     broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
0342         // Do not work with the passed pointer here as it may not be fully initialized yet
0343     }
0344 
0345     // as above, but call try_put_task instead, and return the last task we received (if any)
0346     graph_task* try_put_task( const T &t ) override {
0347         graph_task * last_task = nullptr;
0348         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
0349         typename successors_type::iterator i = this->my_successors.begin();
0350         while ( i != this->my_successors.end() ) {
0351             graph_task *new_task = (*i)->try_put_task(t);
0352             // workaround for icc bug
0353             graph& graph_ref = (*i)->graph_reference();
0354             last_task = combine_tasks(graph_ref, last_task, new_task);  // enqueue if necessary
0355             if(new_task) {
0356                 ++i;
0357             }
0358             else {  // failed
0359                 if ( (*i)->register_predecessor(*this->my_owner) ) {
0360                     i = this->my_successors.erase(i);
0361                 } else {
0362                     ++i;
0363                 }
0364             }
0365         }
0366         return last_task;
0367     }
0368 
0369     // call try_put_task and return list of received tasks
0370     bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
0371         bool is_at_least_one_put_successful = false;
0372         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
0373         typename successors_type::iterator i = this->my_successors.begin();
0374         while ( i != this->my_successors.end() ) {
0375             graph_task * new_task = (*i)->try_put_task(t);
0376             if(new_task) {
0377                 ++i;
0378                 if(new_task != SUCCESSFULLY_ENQUEUED) {
0379                     tasks.push_back(*new_task);
0380                 }
0381                 is_at_least_one_put_successful = true;
0382             }
0383             else {  // failed
0384                 if ( (*i)->register_predecessor(*this->my_owner) ) {
0385                     i = this->my_successors.erase(i);
0386                 } else {
0387                     ++i;
0388                 }
0389             }
0390         }
0391         return is_at_least_one_put_successful;
0392     }
0393 };
0394 
0395 //! A cache of successors that are put in a round-robin fashion
0396 template<typename T, typename M=spin_rw_mutex >
0397 class round_robin_cache : public successor_cache<T, M> {
0398     typedef successor_cache<T, M> base_type;
0399     typedef size_t size_type;
0400     typedef M mutex_type;
0401     typedef typename successor_cache<T,M>::successors_type successors_type;
0402 
0403 public:
0404 
0405     round_robin_cache( typename base_type::owner_type* owner ): base_type(owner) {
0406         // Do not work with the passed pointer here as it may not be fully initialized yet
0407     }
0408 
0409     size_type size() {
0410         typename mutex_type::scoped_lock l(this->my_mutex, false);
0411         return this->my_successors.size();
0412     }
0413 
0414     graph_task* try_put_task( const T &t ) override {
0415         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
0416         typename successors_type::iterator i = this->my_successors.begin();
0417         while ( i != this->my_successors.end() ) {
0418             graph_task* new_task = (*i)->try_put_task(t);
0419             if ( new_task ) {
0420                 return new_task;
0421             } else {
0422                if ( (*i)->register_predecessor(*this->my_owner) ) {
0423                    i = this->my_successors.erase(i);
0424                }
0425                else {
0426                    ++i;
0427                }
0428             }
0429         }
0430         return nullptr;
0431     }
0432 };
0433 
0434 #endif // __TBB__flow_graph_cache_impl_H