Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-18 10:24:16

0001 /*
0002     Copyright (c) 2005-2024 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB__flow_graph_cache_impl_H
0018 #define __TBB__flow_graph_cache_impl_H
0019 
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023 
0024 // included in namespace tbb::detail::d2 (in flow_graph.h)
0025 
0026 //! A node_cache maintains a std::queue of elements of type T.  Each operation is protected by a lock.
0027 template< typename T, typename M=spin_mutex >
0028 class node_cache {
0029     public:
0030 
0031     typedef size_t size_type;
0032 
0033     bool empty() {
0034         typename mutex_type::scoped_lock lock( my_mutex );
0035         return internal_empty();
0036     }
0037 
0038     void add( T &n ) {
0039         typename mutex_type::scoped_lock lock( my_mutex );
0040         internal_push(n);
0041     }
0042 
0043     void remove( T &n ) {
0044         typename mutex_type::scoped_lock lock( my_mutex );
0045         for ( size_t i = internal_size(); i != 0; --i ) {
0046             T &s = internal_pop();
0047             if ( &s == &n )
0048                 break;  // only remove one predecessor per request
0049             internal_push(s);
0050         }
0051     }
0052 
0053     void clear() {
0054         while( !my_q.empty()) (void)my_q.pop();
0055     }
0056 
0057 protected:
0058 
0059     typedef M mutex_type;
0060     mutex_type my_mutex;
0061     std::queue< T * > my_q;
0062 
0063     // Assumes lock is held
0064     inline bool internal_empty( )  {
0065         return my_q.empty();
0066     }
0067 
0068     // Assumes lock is held
0069     inline size_type internal_size( )  {
0070         return my_q.size();
0071     }
0072 
0073     // Assumes lock is held
0074     inline void internal_push( T &n )  {
0075         my_q.push(&n);
0076     }
0077 
0078     // Assumes lock is held
0079     inline T &internal_pop() {
0080         T *v = my_q.front();
0081         my_q.pop();
0082         return *v;
0083     }
0084 
0085 };
0086 
0087 //! A cache of predecessors that only supports try_get
0088 template< typename T, typename M=spin_mutex >
0089 class predecessor_cache : public node_cache< sender<T>, M > {
0090 public:
0091     typedef M mutex_type;
0092     typedef T output_type;
0093     typedef sender<output_type> predecessor_type;
0094     typedef receiver<output_type> successor_type;
0095 
0096     predecessor_cache( successor_type* owner ) : my_owner( owner ) {
0097         __TBB_ASSERT( my_owner, "predecessor_cache should have an owner." );
0098         // Do not work with the passed pointer here as it may not be fully initialized yet
0099     }
0100 
0101 private:
0102     bool get_item_impl( output_type& v
0103                         __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo_ptr = nullptr) )
0104     {
0105 
0106         bool successful_get = false;
0107 
0108         do {
0109             predecessor_type *src;
0110             {
0111                 typename mutex_type::scoped_lock lock(this->my_mutex);
0112                 if ( this->internal_empty() ) {
0113                     break;
0114                 }
0115                 src = &this->internal_pop();
0116             }
0117 
0118             // Try to get from this sender
0119 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0120             if (metainfo_ptr) {
0121                 successful_get = src->try_get( v, *metainfo_ptr );
0122             } else
0123 #endif
0124             {
0125                 successful_get = src->try_get( v );
0126             }
0127 
0128             if (successful_get == false) {
0129                 // Relinquish ownership of the edge
0130                 register_successor(*src, *my_owner);
0131             } else {
0132                 // Retain ownership of the edge
0133                 this->add(*src);
0134             }
0135         } while ( successful_get == false );
0136         return successful_get;
0137     }
0138 public:
0139     bool get_item( output_type& v ) {
0140         return get_item_impl(v);
0141     }
0142 
0143 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0144     bool get_item( output_type& v, message_metainfo& metainfo ) {
0145         return get_item_impl(v, &metainfo);
0146     }
0147 #endif
0148 
0149     // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
0150     void reset() {
0151         for(;;) {
0152             predecessor_type *src;
0153             {
0154                 if (this->internal_empty()) break;
0155                 src = &this->internal_pop();
0156             }
0157             register_successor(*src, *my_owner);
0158         }
0159     }
0160 
0161 protected:
0162     successor_type* my_owner;
0163 };
0164 
0165 //! An cache of predecessors that supports requests and reservations
0166 template< typename T, typename M=spin_mutex >
0167 class reservable_predecessor_cache : public predecessor_cache< T, M > {
0168 public:
0169     typedef M mutex_type;
0170     typedef T output_type;
0171     typedef sender<T> predecessor_type;
0172     typedef receiver<T> successor_type;
0173 
0174     reservable_predecessor_cache( successor_type* owner )
0175         : predecessor_cache<T,M>(owner), reserved_src(nullptr)
0176     {
0177         // Do not work with the passed pointer here as it may not be fully initialized yet
0178     }
0179 
0180 private:
0181     bool try_reserve_impl( output_type &v __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo* metainfo) ) {
0182         bool successful_reserve = false;
0183 
0184         do {
0185             predecessor_type* pred = nullptr;
0186             {
0187                 typename mutex_type::scoped_lock lock(this->my_mutex);
0188                 if ( reserved_src.load(std::memory_order_relaxed) || this->internal_empty() )
0189                     return false;
0190 
0191                 pred = &this->internal_pop();
0192                 reserved_src.store(pred, std::memory_order_relaxed);
0193             }
0194 
0195             // Try to get from this sender
0196 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0197             if (metainfo) {
0198                 successful_reserve = pred->try_reserve( v, *metainfo );
0199             } else
0200 #endif
0201             {
0202                 successful_reserve = pred->try_reserve( v );
0203             }
0204 
0205             if (successful_reserve == false) {
0206                 typename mutex_type::scoped_lock lock(this->my_mutex);
0207                 // Relinquish ownership of the edge
0208                 register_successor( *pred, *this->my_owner );
0209                 reserved_src.store(nullptr, std::memory_order_relaxed);
0210             } else {
0211                 // Retain ownership of the edge
0212                 this->add( *pred);
0213             }
0214         } while ( successful_reserve == false );
0215 
0216         return successful_reserve;
0217     }
0218 public:
0219     bool try_reserve( output_type& v ) {
0220         return try_reserve_impl(v __TBB_FLOW_GRAPH_METAINFO_ARG(nullptr));
0221     }
0222 
0223 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0224     bool try_reserve( output_type& v, message_metainfo& metainfo ) {
0225         return try_reserve_impl(v, &metainfo);
0226     }
0227 #endif
0228 
0229     bool try_release() {
0230         reserved_src.load(std::memory_order_relaxed)->try_release();
0231         reserved_src.store(nullptr, std::memory_order_relaxed);
0232         return true;
0233     }
0234 
0235     bool try_consume() {
0236         reserved_src.load(std::memory_order_relaxed)->try_consume();
0237         reserved_src.store(nullptr, std::memory_order_relaxed);
0238         return true;
0239     }
0240 
0241     void reset() {
0242         reserved_src.store(nullptr, std::memory_order_relaxed);
0243         predecessor_cache<T, M>::reset();
0244     }
0245 
0246     void clear() {
0247         reserved_src.store(nullptr, std::memory_order_relaxed);
0248         predecessor_cache<T, M>::clear();
0249     }
0250 
0251 private:
0252     std::atomic<predecessor_type*> reserved_src;
0253 };
0254 
0255 
0256 //! An abstract cache of successors
0257 template<typename T, typename M=spin_rw_mutex >
0258 class successor_cache : no_copy {
0259 protected:
0260 
0261     typedef M mutex_type;
0262     mutex_type my_mutex;
0263 
0264     typedef receiver<T> successor_type;
0265     typedef receiver<T>* pointer_type;
0266     typedef sender<T> owner_type;
0267     // TODO revamp: introduce heapified collection of successors for strict priorities
0268     typedef std::list< pointer_type > successors_type;
0269     successors_type my_successors;
0270 
0271     owner_type* my_owner;
0272 
0273 public:
0274     successor_cache( owner_type* owner ) : my_owner(owner) {
0275         // Do not work with the passed pointer here as it may not be fully initialized yet
0276     }
0277 
0278     virtual ~successor_cache() {}
0279 
0280     void register_successor( successor_type& r ) {
0281         typename mutex_type::scoped_lock l(my_mutex, true);
0282         if( r.priority() != no_priority )
0283             my_successors.push_front( &r );
0284         else
0285             my_successors.push_back( &r );
0286     }
0287 
0288     void remove_successor( successor_type& r ) {
0289         typename mutex_type::scoped_lock l(my_mutex, true);
0290         for ( typename successors_type::iterator i = my_successors.begin();
0291               i != my_successors.end(); ++i ) {
0292             if ( *i == & r ) {
0293                 my_successors.erase(i);
0294                 break;
0295             }
0296         }
0297     }
0298 
0299     bool empty() {
0300         typename mutex_type::scoped_lock l(my_mutex, false);
0301         return my_successors.empty();
0302     }
0303 
0304     void clear() {
0305         my_successors.clear();
0306     }
0307 
0308     virtual graph_task* try_put_task( const T& t ) = 0;
0309 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0310     virtual graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) = 0;
0311 #endif
0312 };  // successor_cache<T>
0313 
0314 //! An abstract cache of successors, specialized to continue_msg
0315 template<typename M>
0316 class successor_cache< continue_msg, M > : no_copy {
0317 protected:
0318 
0319     typedef M mutex_type;
0320     mutex_type my_mutex;
0321 
0322     typedef receiver<continue_msg> successor_type;
0323     typedef receiver<continue_msg>* pointer_type;
0324     typedef sender<continue_msg> owner_type;
0325     typedef std::list< pointer_type > successors_type;
0326     successors_type my_successors;
0327     owner_type* my_owner;
0328 
0329 public:
0330     successor_cache( sender<continue_msg>* owner ) : my_owner(owner) {
0331         // Do not work with the passed pointer here as it may not be fully initialized yet
0332     }
0333 
0334     virtual ~successor_cache() {}
0335 
0336     void register_successor( successor_type& r ) {
0337         typename mutex_type::scoped_lock l(my_mutex, true);
0338         if( r.priority() != no_priority )
0339             my_successors.push_front( &r );
0340         else
0341             my_successors.push_back( &r );
0342         __TBB_ASSERT( my_owner, "Cache of successors must have an owner." );
0343         if ( r.is_continue_receiver() ) {
0344             r.register_predecessor( *my_owner );
0345         }
0346     }
0347 
0348     void remove_successor( successor_type& r ) {
0349         typename mutex_type::scoped_lock l(my_mutex, true);
0350         for ( successors_type::iterator i = my_successors.begin(); i != my_successors.end(); ++i ) {
0351             if ( *i == &r ) {
0352                 __TBB_ASSERT(my_owner, "Cache of successors must have an owner.");
0353                 // TODO: check if we need to test for continue_receiver before removing from r.
0354                 r.remove_predecessor( *my_owner );
0355                 my_successors.erase(i);
0356                 break;
0357             }
0358         }
0359     }
0360 
0361     bool empty() {
0362         typename mutex_type::scoped_lock l(my_mutex, false);
0363         return my_successors.empty();
0364     }
0365 
0366     void clear() {
0367         my_successors.clear();
0368     }
0369 
0370     virtual graph_task* try_put_task( const continue_msg& t ) = 0;
0371 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0372     virtual graph_task* try_put_task( const continue_msg& t, const message_metainfo& metainfo ) = 0;
0373 #endif
0374 };  // successor_cache< continue_msg >
0375 
0376 //! A cache of successors that are broadcast to
0377 template<typename T, typename M=spin_rw_mutex>
0378 class broadcast_cache : public successor_cache<T, M> {
0379     typedef successor_cache<T, M> base_type;
0380     typedef M mutex_type;
0381     typedef typename successor_cache<T,M>::successors_type successors_type;
0382 
0383     graph_task* try_put_task_impl( const T& t __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) ) {
0384         graph_task * last_task = nullptr;
0385         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
0386         typename successors_type::iterator i = this->my_successors.begin();
0387         while ( i != this->my_successors.end() ) {
0388             graph_task *new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0389             // workaround for icc bug
0390             graph& graph_ref = (*i)->graph_reference();
0391             last_task = combine_tasks(graph_ref, last_task, new_task);  // enqueue if necessary
0392             if(new_task) {
0393                 ++i;
0394             }
0395             else {  // failed
0396                 if ( (*i)->register_predecessor(*this->my_owner) ) {
0397                     i = this->my_successors.erase(i);
0398                 } else {
0399                     ++i;
0400                 }
0401             }
0402         }
0403         return last_task;
0404     }
0405 public:
0406 
0407     broadcast_cache( typename base_type::owner_type* owner ): base_type(owner) {
0408         // Do not work with the passed pointer here as it may not be fully initialized yet
0409     }
0410 
0411     graph_task* try_put_task( const T &t ) override {
0412         return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
0413     }
0414 
0415 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0416     graph_task* try_put_task( const T &t, const message_metainfo& metainfo ) override {
0417         return try_put_task_impl(t, metainfo);
0418     }
0419 #endif
0420 
0421     // call try_put_task and return list of received tasks
0422     bool gather_successful_try_puts( const T &t, graph_task_list& tasks ) {
0423         bool is_at_least_one_put_successful = false;
0424         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
0425         typename successors_type::iterator i = this->my_successors.begin();
0426         while ( i != this->my_successors.end() ) {
0427             graph_task * new_task = (*i)->try_put_task(t);
0428             if(new_task) {
0429                 ++i;
0430                 if(new_task != SUCCESSFULLY_ENQUEUED) {
0431                     tasks.push_back(*new_task);
0432                 }
0433                 is_at_least_one_put_successful = true;
0434             }
0435             else {  // failed
0436                 if ( (*i)->register_predecessor(*this->my_owner) ) {
0437                     i = this->my_successors.erase(i);
0438                 } else {
0439                     ++i;
0440                 }
0441             }
0442         }
0443         return is_at_least_one_put_successful;
0444     }
0445 };
0446 
0447 //! A cache of successors that are put in a round-robin fashion
0448 template<typename T, typename M=spin_rw_mutex >
0449 class round_robin_cache : public successor_cache<T, M> {
0450     typedef successor_cache<T, M> base_type;
0451     typedef size_t size_type;
0452     typedef M mutex_type;
0453     typedef typename successor_cache<T,M>::successors_type successors_type;
0454 
0455 public:
0456 
0457     round_robin_cache( typename base_type::owner_type* owner ): base_type(owner) {
0458         // Do not work with the passed pointer here as it may not be fully initialized yet
0459     }
0460 
0461     size_type size() {
0462         typename mutex_type::scoped_lock l(this->my_mutex, false);
0463         return this->my_successors.size();
0464     }
0465 
0466 private:
0467 
0468     graph_task* try_put_task_impl( const T &t
0469                                    __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo) )
0470     {
0471         typename mutex_type::scoped_lock l(this->my_mutex, /*write=*/true);
0472         typename successors_type::iterator i = this->my_successors.begin();
0473         while ( i != this->my_successors.end() ) {
0474             graph_task* new_task = (*i)->try_put_task(t __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0475             if ( new_task ) {
0476                 return new_task;
0477             } else {
0478                if ( (*i)->register_predecessor(*this->my_owner) ) {
0479                    i = this->my_successors.erase(i);
0480                }
0481                else {
0482                    ++i;
0483                }
0484             }
0485         }
0486         return nullptr;
0487     }
0488 
0489 public:
0490     graph_task* try_put_task(const T& t) override {
0491         return try_put_task_impl(t __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo{}));
0492     }
0493 
0494 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0495     graph_task* try_put_task( const T& t, const message_metainfo& metainfo ) override {
0496         return try_put_task_impl(t, metainfo);
0497     }
0498 #endif
0499 };
0500 
0501 #endif // __TBB__flow_graph_cache_impl_H