File indexing completed on 2025-01-18 10:12:49
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB_flow_graph_streaming_H
0018 #define __TBB_flow_graph_streaming_H
0019
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023
0024 #if __TBB_PREVIEW_STREAMING_NODE
0025
0026
0027
namespace internal {

//! Compile-time tag that names the inclusive range of port indices [First, Last].
/** The type carries no run-time state; the range is encoded entirely in the template arguments. */
template <int First, int Last>
struct port_ref_impl {
    //! Number of indices in the range; both ends are included.
    static const int size = (Last - First) + 1;
};

} // namespace internal
0037
0038
0039
0040 template <int N1, int N2 = N1>
0041 __TBB_DEPRECATED internal::port_ref_impl<N1,N2> port_ref() {
0042 return internal::port_ref_impl<N1,N2>();
0043 };
0044
0045 namespace internal {
0046
0047 template <typename T>
0048 struct num_arguments {
0049 static const int value = 1;
0050 };
0051
0052 template <int N1, int N2>
0053 struct num_arguments<port_ref_impl<N1,N2>(*)()> {
0054 static const int value = port_ref_impl<N1,N2>::size;
0055 };
0056
0057 template <int N1, int N2>
0058 struct num_arguments<port_ref_impl<N1,N2>> {
0059 static const int value = port_ref_impl<N1,N2>::size;
0060 };
0061
//! Accepts any number of arguments and discards them.
/** Lets a parameter pack of calls be expanded purely for its side effects. */
template <typename... Ignored>
void ignore_return_values( Ignored&&... ) {}

//! Terminal case of the OR-reduction: a single value is returned unchanged.
template <typename T>
T or_return_values( T&& only ) {
    return only;
}

//! Combines all arguments with bitwise OR.
/** Bitwise (not logical) OR is used, so there is no short-circuiting: every argument
    expression has already been evaluated by the time the values are combined. */
template <typename T, typename... Rest>
T or_return_values( T&& head, Rest&&... tail ) {
    return head | or_return_values( std::forward<Rest>(tail)... );
}
0071
0072 template<typename JP>
0073 struct key_from_policy {
0074 typedef size_t type;
0075 typedef std::false_type is_key_matching;
0076 };
0077
0078 template<typename Key>
0079 struct key_from_policy< key_matching<Key> > {
0080 typedef Key type;
0081 typedef std::true_type is_key_matching;
0082 };
0083
0084 template<typename Key>
0085 struct key_from_policy< key_matching<Key&> > {
0086 typedef const Key &type;
0087 typedef std::true_type is_key_matching;
0088 };
0089
//! Couples a device with a key.
/** The key is stored by value (std::decay<Key>) even when Key is a reference type,
    and is handed back converted to Key. */
template<typename Device, typename Key>
class streaming_device_with_key {
    Device my_device;
    typename std::decay<Key>::type my_key;
public:
    //! Value-initializes both members.
    /** Without the explicit initializers a scalar Device or key would be left with an
        indeterminate value, and reading it through device()/key() would be undefined. */
    streaming_device_with_key() : my_device(), my_key() {}

    //! Stores copies of the given device and key.
    streaming_device_with_key( const Device& d, Key k ) : my_device( d ), my_key( k ) {}

    //! Returns the stored key as Key.
    Key key() const { return my_key; }

    //! Returns the stored device.
    const Device& device() const { return my_device; }
};
0101
0102
0103 template <typename T>
0104 struct is_port_ref_impl {
0105 typedef std::false_type type;
0106 };
0107
0108 template <int N1, int N2>
0109 struct is_port_ref_impl< port_ref_impl<N1, N2> > {
0110 typedef std::true_type type;
0111 };
0112
0113 template <int N1, int N2>
0114 struct is_port_ref_impl< port_ref_impl<N1, N2>( * )() > {
0115 typedef std::true_type type;
0116 };
0117
0118 template <typename T>
0119 struct is_port_ref {
0120 typedef typename is_port_ref_impl< typename tbb::internal::strip<T>::type >::type type;
0121 };
0122
0123 template <typename ...Args1>
0124 struct convert_and_call_impl;
0125
0126 template <typename A1, typename ...Args1>
0127 struct convert_and_call_impl<A1, Args1...> {
0128 static const size_t my_delta = 1;
0129
0130 template <typename F, typename Tuple, typename ...Args2>
0131 static void doit(F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
0132 convert_and_call_impl<A1, Args1...>::doit_impl(typename is_port_ref<A1>::type(), f, t, a1, args1..., args2...);
0133 }
0134 template <typename F, typename Tuple, typename ...Args2>
0135 static void doit_impl(std::false_type, F& f, Tuple& t, A1& a1, Args1&... args1, Args2&... args2) {
0136 convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., a1);
0137 }
0138 template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
0139 static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>, Args1&... args1, Args2&... args2) {
0140 convert_and_call_impl<port_ref_impl<N1 + 1,N2>, Args1...>::doit_impl(x, f, t, port_ref<N1 + 1, N2>(), args1...,
0141 args2..., std::get<N1 + my_delta>(t));
0142 }
0143 template <typename F, typename Tuple, int N, typename ...Args2>
0144 static void doit_impl(std::true_type, F& f, Tuple& t, port_ref_impl<N, N>, Args1&... args1, Args2&... args2) {
0145 convert_and_call_impl<Args1...>::doit(f, t, args1..., args2..., std::get<N + my_delta>(t));
0146 }
0147
0148 template <typename F, typename Tuple, int N1, int N2, typename ...Args2>
0149 static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N1, N2>(* fn)(), Args1&... args1, Args2&... args2) {
0150 doit_impl(x, f, t, fn(), args1..., args2...);
0151 }
0152 template <typename F, typename Tuple, int N, typename ...Args2>
0153 static void doit_impl(std::true_type x, F& f, Tuple& t, port_ref_impl<N, N>(* fn)(), Args1&... args1, Args2&... args2) {
0154 doit_impl(x, f, t, fn(), args1..., args2...);
0155 }
0156 };
0157
0158 template <>
0159 struct convert_and_call_impl<> {
0160 template <typename F, typename Tuple, typename ...Args2>
0161 static void doit(F& f, Tuple&, Args2&... args2) {
0162 f(args2...);
0163 }
0164 };
0165
0166
0167 template<typename JP, typename StreamFactory, typename... Ports>
0168 struct streaming_node_traits {
0169
0170 template <typename T>
0171 struct async_msg_type {
0172 typedef typename StreamFactory::template async_msg_type<T> type;
0173 };
0174
0175 typedef tuple< typename async_msg_type<Ports>::type... > input_tuple;
0176 typedef input_tuple output_tuple;
0177 typedef tuple< streaming_device_with_key< typename StreamFactory::device_type, typename key_from_policy<JP>::type >,
0178 typename async_msg_type<Ports>::type... > kernel_input_tuple;
0179
0180
0181 typedef indexer_node< typename async_msg_type<Ports>::type... > indexer_node_type;
0182 };
0183
0184
//! Kernel launcher, primary template: the kernel is sent to the factory without a range.
/** The third template parameter exists only so that a partial specialization can be selected
    for factories with additional requirements. */
template<typename StreamFactory, typename KernelInputTuple, typename = void>
class kernel_executor_helper {
    typedef typename StreamFactory::device_type device_type;
    typedef typename StreamFactory::kernel_type kernel_type;
    typedef KernelInputTuple                    kernel_input_tuple;
protected:
    //! Hands the device, the kernel and the kernel arguments to the factory.
    /** The input tuple is part of the signature but is not used by this variant. */
    template <typename ...Args>
    void enqueue_kernel_impl( kernel_input_tuple&, StreamFactory& factory, device_type device,
                              const kernel_type& kernel, Args&... args ) const {
        factory.send_kernel( device, kernel, args... );
    }
};
0196
0197
0198 template<typename StreamFactory, typename KernelInputTuple>
0199 class kernel_executor_helper<StreamFactory, KernelInputTuple, typename tbb::internal::void_t< typename StreamFactory::range_type >::type > {
0200 typedef typename StreamFactory::device_type device_type;
0201 typedef typename StreamFactory::kernel_type kernel_type;
0202 typedef KernelInputTuple kernel_input_tuple;
0203
0204 typedef typename StreamFactory::range_type range_type;
0205
0206
0207 struct range_wrapper {
0208 virtual range_type get_range( const kernel_input_tuple &ip ) const = 0;
0209 virtual range_wrapper *clone() const = 0;
0210 virtual ~range_wrapper() {}
0211 };
0212
0213 struct range_value : public range_wrapper {
0214 range_value( const range_type& value ) : my_value(value) {}
0215
0216 range_value( range_type&& value ) : my_value(std::move(value)) {}
0217
0218 range_type get_range( const kernel_input_tuple & ) const __TBB_override {
0219 return my_value;
0220 }
0221
0222 range_wrapper *clone() const __TBB_override {
0223 return new range_value(my_value);
0224 }
0225 private:
0226 range_type my_value;
0227 };
0228
0229 template <int N>
0230 struct range_mapper : public range_wrapper {
0231 range_mapper() {}
0232
0233 range_type get_range( const kernel_input_tuple &ip ) const __TBB_override {
0234
0235 return get<N + 1>(ip).data(false);
0236 }
0237
0238 range_wrapper *clone() const __TBB_override {
0239 return new range_mapper<N>;
0240 }
0241 };
0242
0243 protected:
0244 template <typename ...Args>
0245 void enqueue_kernel_impl( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
0246 __TBB_ASSERT(my_range_wrapper, "Range is not set. Call set_range() before running streaming_node.");
0247 factory.send_kernel( device, kernel, my_range_wrapper->get_range(ip), args... );
0248 }
0249
0250 public:
0251 kernel_executor_helper() : my_range_wrapper(NULL) {}
0252
0253 kernel_executor_helper(const kernel_executor_helper& executor) : my_range_wrapper(executor.my_range_wrapper ? executor.my_range_wrapper->clone() : NULL) {}
0254
0255 kernel_executor_helper(kernel_executor_helper&& executor) : my_range_wrapper(executor.my_range_wrapper) {
0256
0257 executor.my_range_wrapper = NULL;
0258 }
0259
0260 ~kernel_executor_helper() {
0261 if (my_range_wrapper) delete my_range_wrapper;
0262 }
0263
0264 void set_range(const range_type& work_size) {
0265 my_range_wrapper = new range_value(work_size);
0266 }
0267
0268 void set_range(range_type&& work_size) {
0269 my_range_wrapper = new range_value(std::move(work_size));
0270 }
0271
0272 template <int N>
0273 void set_range(port_ref_impl<N, N>) {
0274 my_range_wrapper = new range_mapper<N>;
0275 }
0276
0277 template <int N>
0278 void set_range(port_ref_impl<N, N>(*)()) {
0279 my_range_wrapper = new range_mapper<N>;
0280 }
0281
0282 private:
0283 range_wrapper* my_range_wrapper;
0284 };
0285
0286 }
0287
0288
0289
0290
0291
0292
0293
0294
0295
0296
0297
0298
0299
0300
0301 template<typename... Args>
0302 class __TBB_DEPRECATED streaming_node;
0303
0304 template<typename... Ports, typename JP, typename StreamFactory>
0305 class __TBB_DEPRECATED
0306 streaming_node< tuple<Ports...>, JP, StreamFactory >
0307 : public composite_node < typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple,
0308 typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple >
0309 , public internal::kernel_executor_helper< StreamFactory, typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple >
0310 {
0311 typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::input_tuple input_tuple;
0312 typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::output_tuple output_tuple;
0313 typedef typename internal::key_from_policy<JP>::type key_type;
0314 protected:
0315 typedef typename StreamFactory::device_type device_type;
0316 typedef typename StreamFactory::kernel_type kernel_type;
0317 private:
0318 typedef internal::streaming_device_with_key<device_type, key_type> device_with_key_type;
0319 typedef composite_node<input_tuple, output_tuple> base_type;
0320 static const size_t NUM_INPUTS = tuple_size<input_tuple>::value;
0321 static const size_t NUM_OUTPUTS = tuple_size<output_tuple>::value;
0322
0323 typedef typename internal::make_sequence<NUM_INPUTS>::type input_sequence;
0324 typedef typename internal::make_sequence<NUM_OUTPUTS>::type output_sequence;
0325
0326 typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::indexer_node_type indexer_node_type;
0327 typedef typename indexer_node_type::output_type indexer_node_output_type;
0328 typedef typename internal::streaming_node_traits<JP, StreamFactory, Ports...>::kernel_input_tuple kernel_input_tuple;
0329 typedef multifunction_node<indexer_node_output_type, kernel_input_tuple> device_selector_node;
0330 typedef multifunction_node<kernel_input_tuple, output_tuple> kernel_multifunction_node;
0331
0332 template <int... S>
0333 typename base_type::input_ports_type get_input_ports( internal::sequence<S...> ) {
0334 return std::tie( internal::input_port<S>( my_indexer_node )... );
0335 }
0336
0337 template <int... S>
0338 typename base_type::output_ports_type get_output_ports( internal::sequence<S...> ) {
0339 return std::tie( internal::output_port<S>( my_kernel_node )... );
0340 }
0341
0342 typename base_type::input_ports_type get_input_ports() {
0343 return get_input_ports( input_sequence() );
0344 }
0345
0346 typename base_type::output_ports_type get_output_ports() {
0347 return get_output_ports( output_sequence() );
0348 }
0349
0350 template <int N>
0351 int make_Nth_edge() {
0352 make_edge( internal::output_port<N>( my_device_selector_node ), internal::input_port<N>( my_join_node ) );
0353 return 0;
0354 }
0355
0356 template <int... S>
0357 void make_edges( internal::sequence<S...> ) {
0358 make_edge( my_indexer_node, my_device_selector_node );
0359 make_edge( my_device_selector_node, my_join_node );
0360 internal::ignore_return_values( make_Nth_edge<S + 1>()... );
0361 make_edge( my_join_node, my_kernel_node );
0362 }
0363
0364 void make_edges() {
0365 make_edges( input_sequence() );
0366 }
0367
0368 class device_selector_base {
0369 public:
0370 virtual void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) = 0;
0371 virtual device_selector_base *clone( streaming_node &n ) const = 0;
0372 virtual ~device_selector_base() {}
0373 };
0374
0375 template <typename UserFunctor>
0376 class device_selector : public device_selector_base, tbb::internal::no_assign {
0377 public:
0378 device_selector( UserFunctor uf, streaming_node &n, StreamFactory &f )
0379 : my_dispatch_funcs( create_dispatch_funcs( input_sequence() ) )
0380 , my_user_functor( uf ), my_node(n), my_factory( f )
0381 {
0382 my_port_epoches.fill( 0 );
0383 }
0384
0385 void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) __TBB_override {
0386 (this->*my_dispatch_funcs[ v.tag() ])( my_port_epoches[ v.tag() ], v, op );
0387 __TBB_ASSERT( (tbb::internal::is_same_type<typename internal::key_from_policy<JP>::is_key_matching, std::false_type>::value)
0388 || my_port_epoches[v.tag()] == 0, "Epoch is changed when key matching is requested" );
0389 }
0390
0391 device_selector_base *clone( streaming_node &n ) const __TBB_override {
0392 return new device_selector( my_user_functor, n, my_factory );
0393 }
0394 private:
0395 typedef void(device_selector<UserFunctor>::*send_and_put_fn_type)(size_t &, const indexer_node_output_type &, typename device_selector_node::output_ports_type &);
0396 typedef std::array < send_and_put_fn_type, NUM_INPUTS > dispatch_funcs_type;
0397
0398 template <int... S>
0399 static dispatch_funcs_type create_dispatch_funcs( internal::sequence<S...> ) {
0400 dispatch_funcs_type dispatch = { { &device_selector<UserFunctor>::send_and_put_impl<S>... } };
0401 return dispatch;
0402 }
0403
0404 template <typename T>
0405 key_type get_key( std::false_type, const T &, size_t &epoch ) {
0406 __TBB_STATIC_ASSERT( (tbb::internal::is_same_type<key_type, size_t>::value), "" );
0407 return epoch++;
0408 }
0409
0410 template <typename T>
0411 key_type get_key( std::true_type, const T &t, size_t & ) {
0412 using tbb::flow::key_from_message;
0413 return key_from_message<key_type>( t );
0414 }
0415
0416 template <int N>
0417 void send_and_put_impl( size_t &epoch, const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
0418 typedef typename tuple_element<N + 1, typename device_selector_node::output_ports_type>::type::output_type elem_type;
0419 elem_type e = internal::cast_to<elem_type>( v );
0420 device_type device = get_device( get_key( typename internal::key_from_policy<JP>::is_key_matching(), e, epoch ), get<0>( op ) );
0421 my_factory.send_data( device, e );
0422 get<N + 1>( op ).try_put( e );
0423 }
0424
0425 template< typename DevicePort >
0426 device_type get_device( key_type key, DevicePort& dp ) {
0427 typename std::unordered_map<typename std::decay<key_type>::type, epoch_desc>::iterator it = my_devices.find( key );
0428 if ( it == my_devices.end() ) {
0429 device_type d = my_user_functor( my_factory );
0430 std::tie( it, std::ignore ) = my_devices.insert( std::make_pair( key, d ) );
0431 bool res = dp.try_put( device_with_key_type( d, key ) );
0432 __TBB_ASSERT_EX( res, NULL );
0433 my_node.notify_new_device( d );
0434 }
0435 epoch_desc &e = it->second;
0436 device_type d = e.my_device;
0437 if ( ++e.my_request_number == NUM_INPUTS ) my_devices.erase( it );
0438 return d;
0439 }
0440
0441 struct epoch_desc {
0442 epoch_desc(device_type d ) : my_device( d ), my_request_number( 0 ) {}
0443 device_type my_device;
0444 size_t my_request_number;
0445 };
0446
0447 std::unordered_map<typename std::decay<key_type>::type, epoch_desc> my_devices;
0448 std::array<size_t, NUM_INPUTS> my_port_epoches;
0449 dispatch_funcs_type my_dispatch_funcs;
0450 UserFunctor my_user_functor;
0451 streaming_node &my_node;
0452 StreamFactory &my_factory;
0453 };
0454
0455 class device_selector_body {
0456 public:
0457 device_selector_body( device_selector_base *d ) : my_device_selector( d ) {}
0458
0459 void operator()( const indexer_node_output_type &v, typename device_selector_node::output_ports_type &op ) {
0460 (*my_device_selector)(v, op);
0461 }
0462 private:
0463 device_selector_base *my_device_selector;
0464 };
0465
0466
0467 class args_storage_base : tbb::internal::no_copy {
0468 public:
0469 typedef typename kernel_multifunction_node::output_ports_type output_ports_type;
0470
0471 virtual void enqueue( kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n ) = 0;
0472 virtual void send( device_type d ) = 0;
0473 virtual args_storage_base *clone() const = 0;
0474 virtual ~args_storage_base () {}
0475
0476 protected:
0477 args_storage_base( const kernel_type& kernel, StreamFactory &f )
0478 : my_kernel( kernel ), my_factory( f )
0479 {}
0480
0481 args_storage_base( const args_storage_base &k )
0482 : tbb::internal::no_copy(), my_kernel( k.my_kernel ), my_factory( k.my_factory )
0483 {}
0484
0485 const kernel_type my_kernel;
0486 StreamFactory &my_factory;
0487 };
0488
0489 template <typename... Args>
0490 class args_storage : public args_storage_base {
0491 typedef typename args_storage_base::output_ports_type output_ports_type;
0492
0493
0494 template <int N>
0495 bool do_try_put( const kernel_input_tuple& ip, output_ports_type &op ) const {
0496 const auto& t = get<N + 1>( ip );
0497 auto &port = get<N>( op );
0498 return port.try_put( t );
0499 }
0500
0501 template <int... S>
0502 bool do_try_put( const kernel_input_tuple& ip, output_ports_type &op, internal::sequence<S...> ) const {
0503 return internal::or_return_values( do_try_put<S>( ip, op )... );
0504 }
0505
0506
0507 class run_kernel_func : tbb::internal::no_assign {
0508 public:
0509 run_kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage )
0510 : my_kernel_func( ip, node, storage, get<0>(ip).device() ) {}
0511
0512
0513
0514 template <typename... FnArgs>
0515 void operator()( FnArgs&... args ) {
0516 internal::convert_and_call_impl<FnArgs...>::doit( my_kernel_func, my_kernel_func.my_ip, args... );
0517 }
0518 private:
0519 struct kernel_func : tbb::internal::no_copy {
0520 kernel_input_tuple &my_ip;
0521 const streaming_node &my_node;
0522 const args_storage& my_storage;
0523 device_type my_device;
0524
0525 kernel_func( kernel_input_tuple &ip, const streaming_node &node, const args_storage& storage, device_type device )
0526 : my_ip( ip ), my_node( node ), my_storage( storage ), my_device( device )
0527 {}
0528
0529 template <typename... FnArgs>
0530 void operator()( FnArgs&... args ) {
0531 my_node.enqueue_kernel( my_ip, my_storage.my_factory, my_device, my_storage.my_kernel, args... );
0532 }
0533 } my_kernel_func;
0534 };
0535
0536 template<typename FinalizeFn>
0537 class run_finalize_func : tbb::internal::no_assign {
0538 public:
0539 run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn )
0540 : my_ip( ip ), my_finalize_func( factory, get<0>(ip).device(), fn ) {}
0541
0542
0543
0544 template <typename... FnArgs>
0545 void operator()( FnArgs&... args ) {
0546 internal::convert_and_call_impl<FnArgs...>::doit( my_finalize_func, my_ip, args... );
0547 }
0548 private:
0549 kernel_input_tuple &my_ip;
0550
0551 struct finalize_func : tbb::internal::no_assign {
0552 StreamFactory &my_factory;
0553 device_type my_device;
0554 FinalizeFn my_fn;
0555
0556 finalize_func( StreamFactory &factory, device_type device, FinalizeFn fn )
0557 : my_factory(factory), my_device(device), my_fn(fn) {}
0558
0559 template <typename... FnArgs>
0560 void operator()( FnArgs&... args ) {
0561 my_factory.finalize( my_device, my_fn, args... );
0562 }
0563 } my_finalize_func;
0564 };
0565
0566 template<typename FinalizeFn>
0567 static run_finalize_func<FinalizeFn> make_run_finalize_func( kernel_input_tuple &ip, StreamFactory &factory, FinalizeFn fn ) {
0568 return run_finalize_func<FinalizeFn>( ip, factory, fn );
0569 }
0570
0571 class send_func : tbb::internal::no_assign {
0572 public:
0573 send_func( StreamFactory &factory, device_type d )
0574 : my_factory(factory), my_device( d ) {}
0575
0576 template <typename... FnArgs>
0577 void operator()( FnArgs&... args ) {
0578 my_factory.send_data( my_device, args... );
0579 }
0580 private:
0581 StreamFactory &my_factory;
0582 device_type my_device;
0583 };
0584
0585 public:
0586 args_storage( const kernel_type& kernel, StreamFactory &f, Args&&... args )
0587 : args_storage_base( kernel, f )
0588 , my_args_pack( std::forward<Args>(args)... )
0589 {}
0590
0591 args_storage( const args_storage &k ) : args_storage_base( k ), my_args_pack( k.my_args_pack ) {}
0592
0593 args_storage( const args_storage_base &k, Args&&... args ) : args_storage_base( k ), my_args_pack( std::forward<Args>(args)... ) {}
0594
0595 void enqueue( kernel_input_tuple &ip, output_ports_type &op, const streaming_node &n ) __TBB_override {
0596
0597 const args_pack_type& const_args_pack = my_args_pack;
0598
0599
0600
0601 tbb::internal::call( run_kernel_func( ip, n, *this ), const_args_pack );
0602
0603 if (! do_try_put( ip, op, input_sequence() ) ) {
0604 graph& g = n.my_graph;
0605
0606 g.increment_wait_count();
0607
0608
0609
0610
0611 tbb::internal::call( make_run_finalize_func(ip, this->my_factory, [&g] {
0612 g.decrement_wait_count();
0613 }), const_args_pack );
0614 }
0615 }
0616
0617 void send( device_type d ) __TBB_override {
0618
0619
0620 tbb::internal::call( send_func( this->my_factory, d ), my_args_pack );
0621 }
0622
0623 args_storage_base *clone() const __TBB_override {
0624
0625 return new args_storage<Args...>( *this );
0626 }
0627
0628 private:
0629 typedef tbb::internal::stored_pack<Args...> args_pack_type;
0630 args_pack_type my_args_pack;
0631 };
0632
0633
0634 class kernel_body : tbb::internal::no_assign {
0635 public:
0636 kernel_body( const streaming_node &node ) : my_node( node ) {}
0637
0638 void operator()( kernel_input_tuple ip, typename args_storage_base::output_ports_type &op ) {
0639 __TBB_ASSERT( (my_node.my_args_storage != NULL), "No arguments storage" );
0640
0641 my_node.my_args_storage->enqueue( ip, op, my_node );
0642 }
0643 private:
0644 const streaming_node &my_node;
0645 };
0646
0647 template <typename T, typename U = typename internal::is_port_ref<T>::type >
0648 struct wrap_to_async {
0649 typedef T type;
0650 };
0651
0652 template <typename T>
0653 struct wrap_to_async<T, std::false_type> {
0654 typedef typename StreamFactory::template async_msg_type< typename tbb::internal::strip<T>::type > type;
0655 };
0656
0657 template <typename... Args>
0658 args_storage_base *make_args_storage(const args_storage_base& storage, Args&&... args) const {
0659
0660 return new args_storage<Args...>(storage, std::forward<Args>(args)...);
0661 }
0662
0663 void notify_new_device( device_type d ) {
0664 my_args_storage->send( d );
0665 }
0666
0667 template <typename ...Args>
0668 void enqueue_kernel( kernel_input_tuple& ip, StreamFactory& factory, device_type device, const kernel_type& kernel, Args&... args ) const {
0669 this->enqueue_kernel_impl( ip, factory, device, kernel, args... );
0670 }
0671
0672 public:
0673 template <typename DeviceSelector>
0674 streaming_node( graph &g, const kernel_type& kernel, DeviceSelector d, StreamFactory &f )
0675 : base_type( g )
0676 , my_indexer_node( g )
0677 , my_device_selector( new device_selector<DeviceSelector>( d, *this, f ) )
0678 , my_device_selector_node( g, serial, device_selector_body( my_device_selector ) )
0679 , my_join_node( g )
0680 , my_kernel_node( g, serial, kernel_body( *this ) )
0681
0682 , my_args_storage( make_args_storage( args_storage<>(kernel, f), port_ref<0, NUM_INPUTS - 1>() ) )
0683 {
0684 base_type::set_external_ports( get_input_ports(), get_output_ports() );
0685 make_edges();
0686 }
0687
0688 streaming_node( const streaming_node &node )
0689 : base_type( node.my_graph )
0690 , my_indexer_node( node.my_indexer_node )
0691 , my_device_selector( node.my_device_selector->clone( *this ) )
0692 , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
0693 , my_join_node( node.my_join_node )
0694 , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
0695 , my_args_storage( node.my_args_storage->clone() )
0696 {
0697 base_type::set_external_ports( get_input_ports(), get_output_ports() );
0698 make_edges();
0699 }
0700
0701 streaming_node( streaming_node &&node )
0702 : base_type( node.my_graph )
0703 , my_indexer_node( std::move( node.my_indexer_node ) )
0704 , my_device_selector( node.my_device_selector->clone(*this) )
0705 , my_device_selector_node( node.my_graph, serial, device_selector_body( my_device_selector ) )
0706 , my_join_node( std::move( node.my_join_node ) )
0707 , my_kernel_node( node.my_graph, serial, kernel_body( *this ) )
0708 , my_args_storage( node.my_args_storage )
0709 {
0710 base_type::set_external_ports( get_input_ports(), get_output_ports() );
0711 make_edges();
0712
0713 node.my_args_storage = NULL;
0714 }
0715
0716 ~streaming_node() {
0717 if ( my_args_storage ) delete my_args_storage;
0718 if ( my_device_selector ) delete my_device_selector;
0719 }
0720
0721 template <typename... Args>
0722 void set_args( Args&&... args ) {
0723
0724 args_storage_base * const new_args_storage = make_args_storage( *my_args_storage, typename wrap_to_async<Args>::type(std::forward<Args>(args))...);
0725 delete my_args_storage;
0726 my_args_storage = new_args_storage;
0727 }
0728
0729 protected:
0730 void reset_node( reset_flags = rf_reset_protocol ) __TBB_override { __TBB_ASSERT( false, "Not implemented yet" ); }
0731
0732 private:
0733 indexer_node_type my_indexer_node;
0734 device_selector_base *my_device_selector;
0735 device_selector_node my_device_selector_node;
0736 join_node<kernel_input_tuple, JP> my_join_node;
0737 kernel_multifunction_node my_kernel_node;
0738
0739 args_storage_base *my_args_storage;
0740 };
0741
0742 #endif
0743 #endif