Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:12:46

0001 /*
0002     Copyright (c) 2005-2020 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB__flow_graph_async_msg_impl_H
0018 #define __TBB__flow_graph_async_msg_impl_H
0019 
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023 
0024 namespace internal {
0025 
0026 template <typename T>
0027 class async_storage {
0028 public:
0029     typedef receiver<T> async_storage_client;
0030 
0031     async_storage() : my_graph(nullptr) {
0032         my_data_ready.store<tbb::relaxed>(false);
0033     }
0034 
0035     ~async_storage() {
0036         // Release reference to the graph if async_storage
0037         // was destructed before set() call
0038         if (my_graph) {
0039             my_graph->release_wait();
0040             my_graph = nullptr;
0041         }
0042     }
0043 
0044     template<typename C>
0045     async_storage(C&& data) : my_graph(nullptr), my_data( std::forward<C>(data) ) {
0046         using namespace tbb::internal;
0047         __TBB_STATIC_ASSERT( (is_same_type<typename strip<C>::type, typename strip<T>::type>::value), "incoming type must be T" );
0048 
0049         my_data_ready.store<tbb::relaxed>(true);
0050     }
0051 
0052     template<typename C>
0053     bool set(C&& data) {
0054         using namespace tbb::internal;
0055         __TBB_STATIC_ASSERT( (is_same_type<typename strip<C>::type, typename strip<T>::type>::value), "incoming type must be T" );
0056 
0057         {
0058             tbb::spin_mutex::scoped_lock locker(my_mutex);
0059 
0060             if (my_data_ready.load<tbb::relaxed>()) {
0061                 __TBB_ASSERT(false, "double set() call");
0062                 return false;
0063             }
0064 
0065             my_data = std::forward<C>(data);
0066             my_data_ready.store<tbb::release>(true);
0067         }
0068 
0069         // Thread sync is on my_data_ready flag
0070         for (typename subscriber_list_type::iterator it = my_clients.begin(); it != my_clients.end(); ++it) {
0071             (*it)->try_put(my_data);
0072         }
0073 
0074         // Data was sent, release reference to the graph
0075         if (my_graph) {
0076             my_graph->release_wait();
0077             my_graph = nullptr;
0078         }
0079 
0080         return true;
0081     }
0082 
0083     task* subscribe(async_storage_client& client, graph& g) {
0084         if (! my_data_ready.load<tbb::acquire>())
0085         {
0086             tbb::spin_mutex::scoped_lock locker(my_mutex);
0087 
0088             if (! my_data_ready.load<tbb::relaxed>()) {
0089 #if TBB_USE_ASSERT
0090                 for (typename subscriber_list_type::iterator it = my_clients.begin(); it != my_clients.end(); ++it) {
0091                     __TBB_ASSERT(*it != &client, "unexpected double subscription");
0092                 }
0093 #endif // TBB_USE_ASSERT
0094 
0095                 // Increase graph lifetime
0096                 my_graph = &g;
0097                 my_graph->reserve_wait();
0098 
0099                 // Subscribe
0100                 my_clients.push_back(&client);
0101                 return SUCCESSFULLY_ENQUEUED;
0102             }
0103         }
0104 
0105         __TBB_ASSERT(my_data_ready.load<tbb::relaxed>(), "data is NOT ready");
0106         return client.try_put_task(my_data);
0107     }
0108 
0109 private:
0110     graph* my_graph;
0111     tbb::spin_mutex my_mutex;
0112     tbb::atomic<bool> my_data_ready;
0113     T my_data;
0114     typedef std::vector<async_storage_client*> subscriber_list_type;
0115     subscriber_list_type my_clients;
0116 };
0117 
0118 } // namespace internal
0119 
0120 template <typename T>
0121 class __TBB_DEPRECATED async_msg {
0122     template< typename > friend class receiver;
0123     template< typename, typename > friend struct internal::async_helpers;
0124 public:
0125     typedef T async_msg_data_type;
0126 
0127     async_msg() : my_storage(std::make_shared< internal::async_storage<T> >()) {}
0128 
0129     async_msg(const T& t) : my_storage(std::make_shared< internal::async_storage<T> >(t)) {}
0130 
0131     async_msg(T&& t) : my_storage(std::make_shared< internal::async_storage<T> >( std::move(t) )) {}
0132 
0133     virtual ~async_msg() {}
0134 
0135     void set(const T& t) {
0136         my_storage->set(t);
0137     }
0138 
0139     void set(T&& t) {
0140         my_storage->set( std::move(t) );
0141     }
0142 
0143 protected:
0144     // Can be overridden in derived class to inform that 
0145     // async calculation chain is over
0146     virtual void finalize() const {}
0147 
0148 private:
0149     typedef std::shared_ptr< internal::async_storage<T> > async_storage_ptr;
0150     async_storage_ptr my_storage;
0151 };
0152 
0153 #endif  // __TBB__flow_graph_async_msg_impl_H