Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:12:49

0001 /*
0002     Copyright (c) 2005-2020 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB_flow_graph_streaming_H
0018 #define __TBB_flow_graph_streaming_H
0019 
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023 
0024 #if __TBB_PREVIEW_STREAMING_NODE
0025 
0026 // Included in namespace tbb::flow::interfaceX (in flow_graph.h)
0027 
0028 namespace internal {
0029 
// Compile-time tag naming the closed interval [N1, N2] of streaming_node input ports.
template <int N1, int N2>
struct port_ref_impl {
    // Port indices address input ports, so they cannot be negative.
    static_assert( N1 >= 0, "port_ref: port indices must be non-negative" );
    // A reversed interval would make the kernel-argument expansion (which advances N1
    // until it reaches a <N, N> terminator) recurse without ever terminating.
    static_assert( N1 <= N2, "port_ref: the first port index must not exceed the last one" );
    // "+1" since the port_ref range is a closed interval (includes its endpoints).
    static const int size = N2 - N1 + 1;
};
0035 
0036 } // internal
0037 
0038 // The purpose of the port_ref_impl is the pretty syntax: the deduction of a compile-time constant is processed from the return type.
0039 // So it is possible to use this helper without parentheses, e.g. "port_ref<0>".
0040 template <int N1, int N2 = N1>
0041 __TBB_DEPRECATED internal::port_ref_impl<N1,N2> port_ref() {
0042     return internal::port_ref_impl<N1,N2>();
0043 };
0044 
0045 namespace internal {
0046 
0047 template <typename T>
0048 struct num_arguments {
0049     static const int value = 1;
0050 };
0051 
0052 template <int N1, int N2>
0053 struct num_arguments<port_ref_impl<N1,N2>(*)()> {
0054     static const int value = port_ref_impl<N1,N2>::size;
0055 };
0056 
0057 template <int N1, int N2>
0058 struct num_arguments<port_ref_impl<N1,N2>> {
0059     static const int value = port_ref_impl<N1,N2>::size;
0060 };
0061 
// Accepts any number of arguments of any value category and discards them.
// Used as a sink that forces evaluation of a pack expansion, e.g.
// ignore_return_values( f<S>()... ).
template <typename... Ignored>
auto ignore_return_values( Ignored&&... ) -> void {}
0064 
// Returns the bitwise OR of all arguments (for bool arguments: true if any of them is true).
// All arguments are already evaluated by the caller before this is entered.
// The return type is the decayed type of the first argument, so lvalue arguments are
// accepted as well as rvalues (with a plain 'T' return type an lvalue would make T a
// reference, and the OR result could not be returned through it).
template <typename T>
typename std::decay<T>::type or_return_values( T&& t ) { return t; }
template <typename T, typename... Rest>
typename std::decay<T>::type or_return_values( T&& t, Rest&&... rest ) {
    return t | or_return_values( std::forward<Rest>(rest)... );
}
0071 
0072 template<typename JP>
0073 struct key_from_policy {
0074     typedef size_t type;
0075     typedef std::false_type is_key_matching;
0076 };
0077 
0078 template<typename Key>
0079 struct key_from_policy< key_matching<Key> > {
0080     typedef Key type;
0081     typedef std::true_type is_key_matching;
0082 };
0083 
0084 template<typename Key>
0085 struct key_from_policy< key_matching<Key&> > {
0086     typedef const Key &type;
0087     typedef std::true_type is_key_matching;
0088 };
0089 
// A device paired with the key of the message group it was selected for.
// Key may be a reference type (key_from_policy maps key_matching<K&> to const K&);
// the key is therefore stored as a decayed copy so the object is self-contained.
template<typename Device, typename Key>
class streaming_device_with_key {
    Device my_device;
    typename std::decay<Key>::type my_key;
public:
    // TODO: investigate why default constructor is required
    // Members are value-initialized so a default-constructed object never holds an
    // indeterminate key or device (e.g. when the key type is size_t).
    streaming_device_with_key() : my_device(), my_key() {}
    streaming_device_with_key( const Device& d, Key k ) : my_device( d ), my_key( k ) {}
    Key key() const { return my_key; }
    const Device& device() const { return my_device; }
};
0101 
0102 // --------- Kernel argument helpers --------- //
0103 template <typename T>
0104 struct is_port_ref_impl {
0105     typedef std::false_type type;
0106 };
0107 
0108 template <int N1, int N2>
0109 struct is_port_ref_impl< port_ref_impl<N1, N2> > {
0110     typedef std::true_type type;
0111 };
0112 
0113 template <int N1, int N2>
0114 struct is_port_ref_impl< port_ref_impl<N1, N2>( * )()  > {
0115     typedef std::true_type type;
0116 };
0117 
0118 template <typename T>
0119 struct is_port_ref {
0120     typedef typename is_port_ref_impl< typename tbb::internal::strip<T>::type >::type type;
0121 };
0122 
0123 template <typename ...Args1>
0124 struct convert_and_call_impl;
0125 
0126 template <typename A1, typename ...Args1>
0127 struct convert_and_call_impl<A1, Args1...> {
0128     static const size_t my_delta = 1; // Index 0 contains device
0129 
0130     template <typename F, typename Tuple, typename ...Args2>
0131     static void doit(F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
0132         convert_and_call_impl<A1, Args1...>::doit_impl(typename is_port_ref<A1>::type(), f, t, a1, args1..., args2...);
0133     }
0134     template <typename F, typename Tuple, typename ...Args2>
0135     static void doit_impl(std::false_type, F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
0136         convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., a1);
0137     }
0138     template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
0139     static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>, Args1&... args1, Args2&... args2) {
0140         convert_and_call_impl<port_ref_impl<N1 + 1,N2>, Args1...>::doit_impl(x, f, t, port_ref<N1 + 1, N2>(), args1...,
0141             args2..., std::get<N1 + my_delta>(t));
0142     }
0143     template <typename F, typename Tuple, int N, typename ...Args2>
0144     static void doit_impl(std::true_type, F& f, Tuple& t, port_ref_impl<N, N>, Args1&... args1, Args2&... args2) {
0145         convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., std::get<N + my_delta>(t));
0146     }
0147 
0148     template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
0149     static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>(* fn)(), Args1&... args1, Args2&... args2) {
0150         doit_impl(x, f, t, fn(), args1..., args2...);
0151     }
0152     template <typename F, typename Tuple, int N, typename ...Args2>
0153     static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N, N>(* fn)(), Args1&... args1, Args2&... args2) {
0154         doit_impl(x, f, t, fn(), args1..., args2...);
0155     }
0156 };
0157 
0158 template <>
0159 struct convert_and_call_impl<> {
0160     template <typename F, typename Tuple, typename ...Args2>
0161     static void doit(F& f, Tuple&, Args2&... args2) {
0162         f(args2...);
0163     }
0164 };
0165 // ------------------------------------------- //
0166 
// Types shared by streaming_node< tuple<Ports...>, JP, StreamFactory >, computed from the
// join policy JP, the StreamFactory and the port payload types Ports...
template<typename JP, typename StreamFactory, typename... Ports>
struct streaming_node_traits {
    // Do not use 'using' instead of 'struct' because Microsoft Visual C++ 12.0 fails to compile.
    // StreamFactory's async message wrapper for a payload of type T.
    template <typename T>
    struct async_msg_type {
        typedef typename StreamFactory::template async_msg_type<T> type;
    };

    // One async message per port; inputs and outputs use the same tuple type.
    typedef tuple< typename async_msg_type<Ports>::type... > input_tuple;
    typedef input_tuple output_tuple;
    // Tuple handed to the kernel: element 0 is the selected device together with its key
    // (key type taken from the join policy), followed by one async message per port.
    typedef tuple< streaming_device_with_key< typename StreamFactory::device_type, typename key_from_policy<JP>::type >,
        typename async_msg_type<Ports>::type... > kernel_input_tuple;

    // indexer_node parameters pack expansion workaround for VS2013 for streaming_node
    typedef indexer_node< typename async_msg_type<Ports>::type... > indexer_node_type;
};
0183 
// Fallback executor, selected when StreamFactory has no nested 'range_type':
// kernels are sent to the factory without any range argument.
template<typename StreamFactory, typename KernelInputTuple, typename = void>
class kernel_executor_helper {
    typedef typename StreamFactory::kernel_type kernel_type;
    typedef typename StreamFactory::device_type device_type;
protected:
    // Forwards the kernel and its already-converted arguments to stream_factory.send_kernel().
    // The kernel input tuple is not needed by this variant.
    template <typename ...KernelArgs>
    void enqueue_kernel_impl( KernelInputTuple& /*ip*/, StreamFactory& stream_factory, device_type target,
                              const kernel_type& k, KernelArgs&... kernel_args ) const {
        stream_factory.send_kernel( target, k, kernel_args... );
    }
};
0196 
0197 // Implementation for StreamFactory supporting range
0198 template<typename StreamFactory, typename KernelInputTuple>
0199 class kernel_executor_helper<StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type > {
0200     typedef typename StreamFactory::device_type device_type;
0201     typedef typename StreamFactory::kernel_type kernel_type;
0202     typedef KernelInputTuple kernel_input_tuple;
0203 
0204     typedef typename StreamFactory::range_type range_type;
0205 
0206     // Container for randge. It can contain either port references or real range.
0207     struct range_wrapper {
0208         virtual range_type get_range( const kernel_input_tuple &ip ) const = 0;
0209         virtual range_wrapper *clone() const = 0;
0210         virtual ~range_wrapper() {}
0211     };
0212 
0213     struct range_value : public range_wrapper {
0214         range_value( const range_type& value ) : my_value(value) {}
0215 
0216         range_value( range_type&& value ) : my_value(std::move(value)) {}
0217 
0218         range_type get_range( const kernel_input_tuple & ) const __TBB_override {
0219             return my_value;
0220         }
0221 
0222         range_wrapper *clone() const __TBB_override {
0223             return new range_value(my_value);
0224         }
0225     private:
0226         range_type my_value;
0227     };
0228 
0229     template <int N>
0230     struct range_mapper : public range_wrapper {
0231         range_mapper() {}
0232 
0233         range_type get_range( const kernel_input_tuple &ip ) const __TBB_override {
0234             // "+1" since get<0>(ip) is StreamFactory::device.
0235             return get<N + 1>(ip).data(false);
0236         }
0237 
0238         range_wrapper *clone() const __TBB_override {
0239             return new range_mapper<N>;
0240         }
0241     };
0242 
0243 protected:
0244     template <typename ...Args>
0245     void enqueue_kernel_impl( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
0246         __TBB_ASSERT(my_range_wrapper, "Range is not set. Call set_range() before running streaming_node.");
0247         factory.send_kernel( device, kernel, my_range_wrapper->get_range(ip), args... );
0248     }
0249 
0250 public:
0251     kernel_executor_helper() : my_range_wrapper(NULL) {}
0252 
0253     kernel_executor_helper(const kernel_executor_helper& executor) : my_range_wrapper(executor.my_range_wrapper ? executor.my_range_wrapper->clone() : NULL) {}
0254 
0255     kernel_executor_helper(kernel_executor_helper&& executor) : my_range_wrapper(executor.my_range_wrapper) {
0256         // Set moving holder mappers to NULL to prevent double deallocation
0257         executor.my_range_wrapper = NULL;
0258     }
0259 
0260     ~kernel_executor_helper() {
0261         if (my_range_wrapper) delete my_range_wrapper;
0262     }
0263 
0264     void set_range(const range_type& work_size) {
0265         my_range_wrapper = new range_value(work_size);
0266     }
0267 
0268     void set_range(range_type&& work_size) {
0269         my_range_wrapper = new range_value(std::move(work_size));
0270     }
0271 
0272     template <int N>
0273     void set_range(port_ref_impl<N, N>) {
0274         my_range_wrapper = new range_mapper<N>;
0275     }
0276 
0277     template <int N>
0278     void set_range(port_ref_impl<N, N>(*)()) {
0279         my_range_wrapper = new range_mapper<N>;
0280     }
0281 
0282 private:
0283     range_wrapper* my_range_wrapper;
0284 };
0285 
0286 } // internal
0287 
/*
/---------------------------------------- streaming_node ------------------------------------\
|                                                                                            |
|   /--------------\   /----------------------\   /-----------\   /----------------------\   |
|   |              |   |    (device_with_key) O---O           |   |                      |   |
|   |              |   |                      |   |           |   |                      |   |
O---O indexer_node O---O device_selector_node O---O join_node O---O      kernel_node     O---O
|   |              |   | (multifunction_node) |   |           |   | (multifunction_node) |   |
O---O              |   |                      O---O           |   |                      O---O
|   \--------------/   \----------------------/   \-----------/   \----------------------/   |
|                                                                                            |
\--------------------------------------------------------------------------------------------/
*/
// Primary template: declared only. The usable form is the partial specialization
// streaming_node< tuple<Ports...>, JP, StreamFactory >.
template<typename... Args>
class __TBB_DEPRECATED streaming_node;
0303 
0304 template<typename... Ports, typename JP, typename StreamFactory>
0305 class __TBB_DEPRECATED
0306 streaming_node< tuple<Ports...>, JP, StreamFactory >
0307     : public composite_node < typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple,
0308                               typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple >
0309     , public internal::kernel_executor_helper< StreamFactory, typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple >
0310 {
0311     typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple input_tuple;
0312     typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple output_tuple;
0313     typedef typename internal::key_from_policy<JP>::type key_type;
0314 protected:
0315     typedef typename StreamFactory::device_type device_type;
0316     typedef typename StreamFactory::kernel_type kernel_type;
0317 private:
0318     typedef internal::streaming_device_with_key<device_type, key_type> device_with_key_type;
0319     typedef composite_node<input_tuple, output_tuple> base_type;
0320     static const size_t NUM_INPUTS = tuple_size<input_tuple>::value;
0321     static const size_t NUM_OUTPUTS = tuple_size<output_tuple>::value;
0322 
0323     typedef typename internal::make_sequence<NUM_INPUTS>::type input_sequence;
0324     typedef typename internal::make_sequence<NUM_OUTPUTS>::type output_sequence;
0325 
0326     typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::indexer_node_type indexer_node_type;
0327     typedef typename indexer_node_type::output_type indexer_node_output_type;
0328     typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple kernel_input_tuple;
0329     typedef multifunction_node<indexer_node_output_type, kernel_input_tuple> device_selector_node;
0330     typedef multifunction_node<kernel_input_tuple, output_tuple> kernel_multifunction_node;
0331 
0332     template <int... S>
0333     typename base_type::input_ports_type get_input_ports( internal::sequence<S...> ) {
0334         return std::tie( internal::input_port<S>( my_indexer_node )... );
0335     }
0336 
0337     template <int... S>
0338     typename base_type::output_ports_type get_output_ports( internal::sequence<S...> ) {
0339         return std::tie( internal::output_port<S>( my_kernel_node )... );
0340     }
0341 
0342     typename base_type::input_ports_type get_input_ports() {
0343         return get_input_ports( input_sequence() );
0344     }
0345 
0346     typename base_type::output_ports_type get_output_ports() {
0347         return get_output_ports( output_sequence() );
0348     }
0349 
0350     template <int N>
0351     int make_Nth_edge() {
0352         make_edge( internal::output_port<N>( my_device_selector_node ), internal::input_port<N>( my_join_node ) );
0353         return 0;
0354     }
0355 
0356     template <int... S>
0357     void make_edges( internal::sequence<S...> ) {
0358         make_edge( my_indexer_node, my_device_selector_node );
0359         make_edge( my_device_selector_node, my_join_node );
0360         internal::ignore_return_values( make_Nth_edge<S + 1>()... );
0361         make_edge( my_join_node, my_kernel_node );
0362     }
0363 
0364     void make_edges() {
0365         make_edges( input_sequence() );
0366     }
0367 
0368     class device_selector_base {
0369     public:
0370         virtual void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) = 0;
0371         virtual device_selector_base *clone( streaming_node &n ) const = 0;
0372         virtual ~device_selector_base() {}
0373     };
0374 
0375     template <typename UserFunctor>
0376     class device_selector : public device_selector_base, tbb::internal::no_assign {
0377     public:
0378         device_selector( UserFunctor uf, streaming_node &n, StreamFactory &f )
0379             : my_dispatch_funcs( create_dispatch_funcs( input_sequence() ) )
0380             , my_user_functor( uf ), my_node(n), my_factory( f )
0381         {
0382             my_port_epoches.fill( 0 );
0383         }
0384 
0385         void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) __TBB_override {
0386             (this->*my_dispatch_funcs[ v.tag() ])( my_port_epoches[ v.tag() ], v, op );
0387             __TBB_ASSERT( (tbb::internal::is_same_type<typename internal::key_from_policy<JP>::is_key_matching, std::false_type>::value)
0388                 || my_port_epoches[v.tag()] == 0, "Epoch is changed when key matching is requested" );
0389         }
0390 
0391         device_selector_base *clone( streaming_node &n ) const __TBB_override {
0392             return new device_selector( my_user_functor, n, my_factory );
0393         }
0394     private:
0395         typedef void(device_selector<UserFunctor>::*send_and_put_fn_type)(size_t &, const indexer_node_output_type &, typename device_selector_node::output_ports_type &);
0396         typedef std::array < send_and_put_fn_type, NUM_INPUTS > dispatch_funcs_type;
0397 
0398         template <int... S>
0399         static dispatch_funcs_type create_dispatch_funcs( internal::sequence<S...> ) {
0400             dispatch_funcs_type dispatch = { { &device_selector<UserFunctor>::send_and_put_impl<S>... } };
0401             return dispatch;
0402         }
0403 
0404         template <typename T>
0405         key_type get_key( std::false_type, const T &, size_t &epoch ) {
0406             __TBB_STATIC_ASSERT( (tbb::internal::is_same_type<key_type, size_t>::value), "" );
0407             return epoch++;
0408         }
0409 
0410         template <typename T>
0411         key_type get_key( std::true_type, const T &t, size_t &/*epoch*/ ) {
0412             using tbb::flow::key_from_message;
0413             return key_from_message<key_type>( t );
0414         }
0415 
0416         template <int N>
0417         void send_and_put_impl( size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
0418             typedef typename tuple_element<N + 1, typename device_selector_node::output_ports_type>::type::output_type elem_type;
0419             elem_type e = internal::cast_to<elem_type>( v );
0420             device_type device = get_device( get_key( typename internal::key_from_policy<JP>::is_key_matching(), e, epoch ), get<0>( op ) );
0421             my_factory.send_data( device, e );
0422             get<N + 1>( op ).try_put( e );
0423         }
0424 
0425         template< typename DevicePort >
0426         device_type get_device( key_type key, DevicePort& dp ) {
0427             typename std::unordered_map<typename std::decay<key_type>::type, epoch_desc>::iterator it = my_devices.find( key );
0428             if ( it == my_devices.end() ) {
0429                 device_type d = my_user_functor( my_factory );
0430                 std::tie( it, std::ignore ) = my_devices.insert( std::make_pair( key, d ) );
0431                 bool res = dp.try_put( device_with_key_type( d, key ) );
0432                 __TBB_ASSERT_EX( res, NULL );
0433                 my_node.notify_new_device( d );
0434             }
0435             epoch_desc &e = it->second;
0436             device_type d = e.my_device;
0437             if ( ++e.my_request_number == NUM_INPUTS ) my_devices.erase( it );
0438             return d;
0439         }
0440 
0441         struct epoch_desc {
0442             epoch_desc(device_type d ) : my_device( d ), my_request_number( 0 ) {}
0443             device_type my_device;
0444             size_t my_request_number;
0445         };
0446 
0447         std::unordered_map<typename std::decay<key_type>::type, epoch_desc> my_devices;
0448         std::array<size_t, NUM_INPUTS> my_port_epoches;
0449         dispatch_funcs_type my_dispatch_funcs;
0450         UserFunctor my_user_functor;
0451         streaming_node &my_node;
0452         StreamFactory &my_factory;
0453     };
0454 
0455     class device_selector_body {
0456     public:
0457         device_selector_body( device_selector_base *d ) : my_device_selector( d ) {}
0458 
0459         void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
0460             (*my_device_selector)(v, op);
0461         }
0462     private:
0463         device_selector_base *my_device_selector;
0464     };
0465 
0466     // TODO: investigate why copy-construction is disallowed
0467     class args_storage_base : tbb::internal::no_copy {
0468     public:
0469         typedef typename kernel_multifunction_node::output_ports_type output_ports_type;
0470 
0471         virtual void enqueue( kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n ) = 0;
0472         virtual void send( device_type d ) = 0;
0473         virtual args_storage_base *clone() const = 0;
0474         virtual ~args_storage_base () {}
0475 
0476     protected:
0477         args_storage_base( const kernel_type& kernel, StreamFactory &f )
0478             : my_kernel( kernel ), my_factory( f )
0479         {}
0480 
0481         args_storage_base( const args_storage_base &k )
0482             : tbb::internal::no_copy(), my_kernel( k.my_kernel ), my_factory( k.my_factory )
0483         {}
0484 
0485         const kernel_type my_kernel;
0486         StreamFactory &my_factory;
0487     };
0488 
0489     template <typename... Args>
0490     class args_storage : public args_storage_base {
0491         typedef typename args_storage_base::output_ports_type output_ports_type;
0492 
0493         // ---------- Update events helpers ---------- //
0494         template <int N>
0495         bool do_try_put( const kernel_input_tuple& ip, output_ports_type &op ) const {
0496             const auto& t = get<N + 1>( ip );
0497             auto &port = get<N>( op );
0498             return port.try_put( t );
0499         }
0500 
0501         template <int... S>
0502         bool do_try_put( const kernel_input_tuple& ip, output_ports_type &op, internal::sequence<S...> ) const {
0503             return internal::or_return_values( do_try_put<S>( ip, op )... );
0504         }
0505 
0506         // ------------------------------------------- //
0507         class run_kernel_func : tbb::internal::no_assign {
0508         public:
0509             run_kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage )
0510                 : my_kernel_func( ip, node, storage, get<0>(ip).device() ) {}
0511 
0512             // It is immpossible to use Args... because a function pointer cannot be casted to a function reference implicitly.
0513             // Allow the compiler to deduce types for function pointers automatically.
0514             template <typename... FnArgs>
0515             void operator()( FnArgs&... args ) {
0516                 internal::convert_and_call_impl<FnArgs...>::doit( my_kernel_func, my_kernel_func.my_ip, args... );
0517             }
0518         private:
0519             struct kernel_func : tbb::internal::no_copy {
0520                 kernel_input_tuple &my_ip;
0521                 const streaming_node &my_node;
0522                 const args_storage& my_storage;
0523                 device_type my_device;
0524 
0525                 kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage, device_type device )
0526                     : my_ip( ip ), my_node( node ), my_storage( storage ), my_device( device )
0527                 {}
0528 
0529                 template <typename... FnArgs>
0530                 void operator()( FnArgs&... args ) {
0531                     my_node.enqueue_kernel( my_ip, my_storage.my_factory, my_device, my_storage.my_kernel, args... );
0532                 }
0533             } my_kernel_func;
0534         };
0535 
0536         template<typename FinalizeFn>
0537         class run_finalize_func : tbb::internal::no_assign {
0538         public:
0539             run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn )
0540                 : my_ip( ip ), my_finalize_func( factory, get<0>(ip).device(), fn ) {}
0541 
0542             // It is immpossible to use Args... because a function pointer cannot be casted to a function reference implicitly.
0543             // Allow the compiler to deduce types for function pointers automatically.
0544             template <typename... FnArgs>
0545             void operator()( FnArgs&... args ) {
0546                 internal::convert_and_call_impl<FnArgs...>::doit( my_finalize_func, my_ip, args... );
0547             }
0548         private:
0549             kernel_input_tuple &my_ip;
0550 
0551             struct finalize_func : tbb::internal::no_assign {
0552                 StreamFactory &my_factory;
0553                 device_type my_device;
0554                 FinalizeFn my_fn;
0555 
0556                 finalize_func( StreamFactory &factory, device_type device, FinalizeFn fn )
0557                     : my_factory(factory), my_device(device), my_fn(fn) {}
0558 
0559                 template <typename... FnArgs>
0560                 void operator()( FnArgs&... args ) {
0561                     my_factory.finalize( my_device, my_fn, args... );
0562                 }
0563             } my_finalize_func;
0564         };
0565 
0566         template<typename FinalizeFn>
0567         static run_finalize_func<FinalizeFn> make_run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn ) {
0568             return run_finalize_func<FinalizeFn>( ip, factory, fn );
0569         }
0570 
0571         class send_func : tbb::internal::no_assign {
0572         public:
0573             send_func( StreamFactory &factory, device_type d )
0574                 : my_factory(factory), my_device( d ) {}
0575 
0576             template <typename... FnArgs>
0577             void operator()( FnArgs&... args ) {
0578                 my_factory.send_data( my_device, args... );
0579             }
0580         private:
0581             StreamFactory &my_factory;
0582             device_type my_device;
0583         };
0584 
0585     public:
0586         args_storage( const kernel_type& kernel, StreamFactory &f, Args&&... args )
0587             : args_storage_base( kernel, f )
0588             , my_args_pack( std::forward<Args>(args)... )
0589         {}
0590 
0591         args_storage( const args_storage &k ) : args_storage_base( k ), my_args_pack( k.my_args_pack ) {}
0592 
0593         args_storage( const args_storage_base &k, Args&&... args ) : args_storage_base( k ), my_args_pack( std::forward<Args>(args)... ) {}
0594 
0595         void enqueue( kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n ) __TBB_override {
0596             // Make const qualified args_pack (from non-const)
0597             const args_pack_type& const_args_pack = my_args_pack;
0598             // factory.enqure_kernel() gets
0599             //  - 'ip' tuple elements by reference and updates it (and 'ip') with dependencies
0600             //  - arguments (from my_args_pack) by const-reference via const_args_pack
0601             tbb::internal::call( run_kernel_func( ip, n, *this ), const_args_pack );
0602 
0603             if (! do_try_put( ip, op, input_sequence() ) ) {
0604                 graph& g = n.my_graph;
0605                 // No one message was passed to successors so set a callback to extend the graph lifetime until the kernel completion.
0606                 g.increment_wait_count();
0607 
0608                 // factory.finalize() gets
0609                 //  - 'ip' tuple elements by reference, so 'ip' might be changed
0610                 //  - arguments (from my_args_pack) by const-reference via const_args_pack
0611                 tbb::internal::call( make_run_finalize_func(ip, this->my_factory, [&g] {
0612                     g.decrement_wait_count();
0613                 }), const_args_pack );
0614             }
0615         }
0616 
0617         void send( device_type d ) __TBB_override {
0618             // factory.send() gets arguments by reference and updates these arguments with dependencies
0619             // (it gets but usually ignores port_ref-s)
0620             tbb::internal::call( send_func( this->my_factory, d ), my_args_pack );
0621         }
0622 
0623         args_storage_base *clone() const __TBB_override {
0624             // Create new args_storage with copying constructor.
0625             return new args_storage<Args...>( *this );
0626         }
0627 
0628     private:
0629         typedef tbb::internal::stored_pack<Args...> args_pack_type;
0630         args_pack_type my_args_pack;
0631     };
0632 
0633     // Body for kernel_multifunction_node.
0634     class kernel_body : tbb::internal::no_assign {
0635     public:
0636         kernel_body( const streaming_node &node ) : my_node( node ) {}
0637 
0638         void operator()( kernel_input_tuple ip, typename args_storage_base::output_ports_type &op ) {
0639             __TBB_ASSERT( (my_node.my_args_storage != NULL), "No arguments storage" );
0640             // 'ip' is passed by value to create local copy for updating inside enqueue_kernel()
0641             my_node.my_args_storage->enqueue( ip, op, my_node );
0642         }
0643     private:
0644         const streaming_node &my_node;
0645     };
0646 
0647     template <typename T, typename U = typename internal::is_port_ref<T>::type >
0648     struct wrap_to_async {
0649         typedef T type; // Keep port_ref as it is
0650     };
0651 
0652     template <typename T>
0653     struct wrap_to_async<T, std::false_type> {
0654         typedef typename StreamFactory::template async_msg_type< typename tbb::internal::strip<T>::type > type;
0655     };
0656 
0657     template <typename... Args>
0658     args_storage_base *make_args_storage(const args_storage_base& storage, Args&&... args) const {
0659         // In this variadic template convert all simple types 'T' into 'async_msg_type<T>'
0660         return new args_storage<Args...>(storage, std::forward<Args>(args)...);
0661     }
0662 
0663     void notify_new_device( device_type d ) {
0664         my_args_storage->send( d );
0665     }
0666 
0667     template <typename ...Args>
0668     void enqueue_kernel( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
0669         this->enqueue_kernel_impl( ip, factory, device, kernel, args... );
0670     }
0671 
0672 public:
0673     template <typename DeviceSelector>
0674     streaming_node( graph &g, const kernel_type& kernel, DeviceSelector d, StreamFactory &f )
0675         : base_type( g )
0676         , my_indexer_node( g )
0677         , my_device_selector( new device_selector<DeviceSelector>( d, *this, f ) )
0678         , my_device_selector_node( g, serial, device_selector_body( my_device_selector ) )
0679         , my_join_node( g )
0680         , my_kernel_node( g, serial, kernel_body( *this ) )
0681         // By default, streaming_node maps all its ports to the kernel arguments on a one-to-one basis.
0682         , my_args_storage( make_args_storage( args_storage<>(kernel, f), port_ref<0, NUM_INPUTS - 1>() ) )
0683     {
0684         base_type::set_external_ports( get_input_ports(), get_output_ports() );
0685         make_edges();
0686     }
0687 
0688     streaming_node( const streaming_node &node )
0689         : base_type( node.my_graph )
0690         , my_indexer_node( node.my_indexer_node )
0691         , my_device_selector( node.my_device_selector->clone( *this ) )
0692         , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
0693         , my_join_node( node.my_join_node )
0694         , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
0695         , my_args_storage( node.my_args_storage->clone() )
0696     {
0697         base_type::set_external_ports( get_input_ports(), get_output_ports() );
0698         make_edges();
0699     }
0700 
0701     streaming_node( streaming_node &&node )
0702         : base_type( node.my_graph )
0703         , my_indexer_node( std::move( node.my_indexer_node ) )
0704         , my_device_selector( node.my_device_selector->clone(*this) )
0705         , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
0706         , my_join_node( std::move( node.my_join_node ) )
0707         , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
0708         , my_args_storage( node.my_args_storage )
0709     {
0710         base_type::set_external_ports( get_input_ports(), get_output_ports() );
0711         make_edges();
0712         // Set moving node mappers to NULL to prevent double deallocation.
0713         node.my_args_storage = NULL;
0714     }
0715 
0716     ~streaming_node() {
0717         if ( my_args_storage ) delete my_args_storage;
0718         if ( my_device_selector ) delete my_device_selector;
0719     }
0720 
0721     template <typename... Args>
0722     void set_args( Args&&... args ) {
0723         // Copy the base class of args_storage and create new storage for "Args...".
0724         args_storage_base * const new_args_storage = make_args_storage( *my_args_storage, typename wrap_to_async<Args>::type(std::forward<Args>(args))...);
0725         delete my_args_storage;
0726         my_args_storage = new_args_storage;
0727     }
0728 
0729 protected:
0730     void reset_node( reset_flags = rf_reset_protocol ) __TBB_override { __TBB_ASSERT( false, "Not implemented yet" ); }
0731 
0732 private:
0733     indexer_node_type my_indexer_node;
0734     device_selector_base *my_device_selector;
0735     device_selector_node my_device_selector_node;
0736     join_node<kernel_input_tuple, JP> my_join_node;
0737     kernel_multifunction_node my_kernel_node;
0738 
0739     args_storage_base *my_args_storage;
0740 };
0741 
0742 #endif // __TBB_PREVIEW_STREAMING_NODE
0743 #endif // __TBB_flow_graph_streaming_H