File indexing completed on 2025-01-18 10:12:46
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB__flow_graph_async_msg_impl_H
0018 #define __TBB__flow_graph_async_msg_impl_H
0019
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023
0024 namespace internal {
0025
0026 template <typename T>
0027 class async_storage {
0028 public:
0029 typedef receiver<T> async_storage_client;
0030
0031 async_storage() : my_graph(nullptr) {
0032 my_data_ready.store<tbb::relaxed>(false);
0033 }
0034
0035 ~async_storage() {
0036
0037
0038 if (my_graph) {
0039 my_graph->release_wait();
0040 my_graph = nullptr;
0041 }
0042 }
0043
0044 template<typename C>
0045 async_storage(C&& data) : my_graph(nullptr), my_data( std::forward<C>(data) ) {
0046 using namespace tbb::internal;
0047 __TBB_STATIC_ASSERT( (is_same_type<typename strip<C>::type, typename strip<T>::type>::value), "incoming type must be T" );
0048
0049 my_data_ready.store<tbb::relaxed>(true);
0050 }
0051
0052 template<typename C>
0053 bool set(C&& data) {
0054 using namespace tbb::internal;
0055 __TBB_STATIC_ASSERT( (is_same_type<typename strip<C>::type, typename strip<T>::type>::value), "incoming type must be T" );
0056
0057 {
0058 tbb::spin_mutex::scoped_lock locker(my_mutex);
0059
0060 if (my_data_ready.load<tbb::relaxed>()) {
0061 __TBB_ASSERT(false, "double set() call");
0062 return false;
0063 }
0064
0065 my_data = std::forward<C>(data);
0066 my_data_ready.store<tbb::release>(true);
0067 }
0068
0069
0070 for (typename subscriber_list_type::iterator it = my_clients.begin(); it != my_clients.end(); ++it) {
0071 (*it)->try_put(my_data);
0072 }
0073
0074
0075 if (my_graph) {
0076 my_graph->release_wait();
0077 my_graph = nullptr;
0078 }
0079
0080 return true;
0081 }
0082
0083 task* subscribe(async_storage_client& client, graph& g) {
0084 if (! my_data_ready.load<tbb::acquire>())
0085 {
0086 tbb::spin_mutex::scoped_lock locker(my_mutex);
0087
0088 if (! my_data_ready.load<tbb::relaxed>()) {
0089 #if TBB_USE_ASSERT
0090 for (typename subscriber_list_type::iterator it = my_clients.begin(); it != my_clients.end(); ++it) {
0091 __TBB_ASSERT(*it != &client, "unexpected double subscription");
0092 }
0093 #endif
0094
0095
0096 my_graph = &g;
0097 my_graph->reserve_wait();
0098
0099
0100 my_clients.push_back(&client);
0101 return SUCCESSFULLY_ENQUEUED;
0102 }
0103 }
0104
0105 __TBB_ASSERT(my_data_ready.load<tbb::relaxed>(), "data is NOT ready");
0106 return client.try_put_task(my_data);
0107 }
0108
0109 private:
0110 graph* my_graph;
0111 tbb::spin_mutex my_mutex;
0112 tbb::atomic<bool> my_data_ready;
0113 T my_data;
0114 typedef std::vector<async_storage_client*> subscriber_list_type;
0115 subscriber_list_type my_clients;
0116 };
0117
0118 }
0119
0120 template <typename T>
0121 class __TBB_DEPRECATED async_msg {
0122 template< typename > friend class receiver;
0123 template< typename, typename > friend struct internal::async_helpers;
0124 public:
0125 typedef T async_msg_data_type;
0126
0127 async_msg() : my_storage(std::make_shared< internal::async_storage<T> >()) {}
0128
0129 async_msg(const T& t) : my_storage(std::make_shared< internal::async_storage<T> >(t)) {}
0130
0131 async_msg(T&& t) : my_storage(std::make_shared< internal::async_storage<T> >( std::move(t) )) {}
0132
0133 virtual ~async_msg() {}
0134
0135 void set(const T& t) {
0136 my_storage->set(t);
0137 }
0138
0139 void set(T&& t) {
0140 my_storage->set( std::move(t) );
0141 }
0142
0143 protected:
0144
0145
0146 virtual void finalize() const {}
0147
0148 private:
0149 typedef std::shared_ptr< internal::async_storage<T> > async_storage_ptr;
0150 async_storage_ptr my_storage;
0151 };
0152
0153 #endif