File indexing completed on 2025-07-30 08:46:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB__flow_graph_node_impl_H
0018 #define __TBB__flow_graph_node_impl_H
0019
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023
0024 #include "_flow_graph_item_buffer_impl.h"
0025
0026 template< typename T, typename A >
0027 class function_input_queue : public item_buffer<T,A> {
0028 public:
0029 bool empty() const {
0030 return this->buffer_empty();
0031 }
0032
0033 const T& front() const {
0034 return this->item_buffer<T, A>::front();
0035 }
0036
0037 void pop() {
0038 this->destroy_front();
0039 }
0040
0041 bool push( T& t ) {
0042 return this->push_back( t );
0043 }
0044 };
0045
0046
0047
0048
0049 template< typename Input, typename Policy, typename A, typename ImplType >
0050 class function_input_base : public receiver<Input>, no_assign {
0051 enum op_type {reg_pred, rem_pred, try_fwd, tryput_bypass, app_body_bypass, occupy_concurrency
0052 };
0053 typedef function_input_base<Input, Policy, A, ImplType> class_type;
0054
0055 public:
0056
0057
0058 typedef Input input_type;
0059 typedef typename receiver<input_type>::predecessor_type predecessor_type;
0060 typedef predecessor_cache<input_type, null_mutex > predecessor_cache_type;
0061 typedef function_input_queue<input_type, A> input_queue_type;
0062 typedef typename allocator_traits<A>::template rebind_alloc<input_queue_type> allocator_type;
0063 static_assert(!has_policy<queueing, Policy>::value || !has_policy<rejecting, Policy>::value, "");
0064
0065
0066 function_input_base( graph &g, size_t max_concurrency, node_priority_t a_priority, bool is_no_throw )
0067 : my_graph_ref(g), my_max_concurrency(max_concurrency)
0068 , my_concurrency(0), my_priority(a_priority), my_is_no_throw(is_no_throw)
0069 , my_queue(!has_policy<rejecting, Policy>::value ? new input_queue_type() : nullptr)
0070 , my_predecessors(this)
0071 , forwarder_busy(false)
0072 {
0073 my_aggregator.initialize_handler(handler_type(this));
0074 }
0075
0076
0077 function_input_base( const function_input_base& src )
0078 : function_input_base(src.my_graph_ref, src.my_max_concurrency, src.my_priority, src.my_is_no_throw) {}
0079
0080
0081
0082
0083
0084 virtual ~function_input_base() {
0085 delete my_queue;
0086 my_queue = nullptr;
0087 }
0088
0089 graph_task* try_put_task( const input_type& t) override {
0090 if ( my_is_no_throw )
0091 return try_put_task_impl(t, has_policy<lightweight, Policy>());
0092 else
0093 return try_put_task_impl(t, std::false_type());
0094 }
0095
0096
0097 bool register_predecessor( predecessor_type &src ) override {
0098 operation_type op_data(reg_pred);
0099 op_data.r = &src;
0100 my_aggregator.execute(&op_data);
0101 return true;
0102 }
0103
0104
0105 bool remove_predecessor( predecessor_type &src ) override {
0106 operation_type op_data(rem_pred);
0107 op_data.r = &src;
0108 my_aggregator.execute(&op_data);
0109 return true;
0110 }
0111
0112 protected:
0113
0114 void reset_function_input_base( reset_flags f) {
0115 my_concurrency = 0;
0116 if(my_queue) {
0117 my_queue->reset();
0118 }
0119 reset_receiver(f);
0120 forwarder_busy = false;
0121 }
0122
0123 graph& my_graph_ref;
0124 const size_t my_max_concurrency;
0125 size_t my_concurrency;
0126 node_priority_t my_priority;
0127 const bool my_is_no_throw;
0128 input_queue_type *my_queue;
0129 predecessor_cache<input_type, null_mutex > my_predecessors;
0130
0131 void reset_receiver( reset_flags f) {
0132 if( f & rf_clear_edges) my_predecessors.clear();
0133 else
0134 my_predecessors.reset();
0135 __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.empty(), "function_input_base reset failed");
0136 }
0137
0138 graph& graph_reference() const override {
0139 return my_graph_ref;
0140 }
0141
0142 graph_task* try_get_postponed_task(const input_type& i) {
0143 operation_type op_data(i, app_body_bypass);
0144 my_aggregator.execute(&op_data);
0145 return op_data.bypass_t;
0146 }
0147
0148 private:
0149
0150 friend class apply_body_task_bypass< class_type, input_type >;
0151 friend class forward_task_bypass< class_type >;
0152
0153 class operation_type : public aggregated_operation< operation_type > {
0154 public:
0155 char type;
0156 union {
0157 input_type *elem;
0158 predecessor_type *r;
0159 };
0160 graph_task* bypass_t;
0161 operation_type(const input_type& e, op_type t) :
0162 type(char(t)), elem(const_cast<input_type*>(&e)), bypass_t(nullptr) {}
0163 operation_type(op_type t) : type(char(t)), r(nullptr), bypass_t(nullptr) {}
0164 };
0165
0166 bool forwarder_busy;
0167 typedef aggregating_functor<class_type, operation_type> handler_type;
0168 friend class aggregating_functor<class_type, operation_type>;
0169 aggregator< handler_type, operation_type > my_aggregator;
0170
0171 graph_task* perform_queued_requests() {
0172 graph_task* new_task = nullptr;
0173 if(my_queue) {
0174 if(!my_queue->empty()) {
0175 ++my_concurrency;
0176 new_task = create_body_task(my_queue->front());
0177
0178 my_queue->pop();
0179 }
0180 }
0181 else {
0182 input_type i;
0183 if(my_predecessors.get_item(i)) {
0184 ++my_concurrency;
0185 new_task = create_body_task(i);
0186 }
0187 }
0188 return new_task;
0189 }
0190 void handle_operations(operation_type *op_list) {
0191 operation_type* tmp;
0192 while (op_list) {
0193 tmp = op_list;
0194 op_list = op_list->next;
0195 switch (tmp->type) {
0196 case reg_pred:
0197 my_predecessors.add(*(tmp->r));
0198 tmp->status.store(SUCCEEDED, std::memory_order_release);
0199 if (!forwarder_busy) {
0200 forwarder_busy = true;
0201 spawn_forward_task();
0202 }
0203 break;
0204 case rem_pred:
0205 my_predecessors.remove(*(tmp->r));
0206 tmp->status.store(SUCCEEDED, std::memory_order_release);
0207 break;
0208 case app_body_bypass: {
0209 tmp->bypass_t = nullptr;
0210 __TBB_ASSERT(my_max_concurrency != 0, nullptr);
0211 --my_concurrency;
0212 if(my_concurrency<my_max_concurrency)
0213 tmp->bypass_t = perform_queued_requests();
0214 tmp->status.store(SUCCEEDED, std::memory_order_release);
0215 }
0216 break;
0217 case tryput_bypass: internal_try_put_task(tmp); break;
0218 case try_fwd: internal_forward(tmp); break;
0219 case occupy_concurrency:
0220 if (my_concurrency < my_max_concurrency) {
0221 ++my_concurrency;
0222 tmp->status.store(SUCCEEDED, std::memory_order_release);
0223 } else {
0224 tmp->status.store(FAILED, std::memory_order_release);
0225 }
0226 break;
0227 }
0228 }
0229 }
0230
0231
0232 void internal_try_put_task(operation_type *op) {
0233 __TBB_ASSERT(my_max_concurrency != 0, nullptr);
0234 if (my_concurrency < my_max_concurrency) {
0235 ++my_concurrency;
0236 graph_task * new_task = create_body_task(*(op->elem));
0237 op->bypass_t = new_task;
0238 op->status.store(SUCCEEDED, std::memory_order_release);
0239 } else if ( my_queue && my_queue->push(*(op->elem)) ) {
0240 op->bypass_t = SUCCESSFULLY_ENQUEUED;
0241 op->status.store(SUCCEEDED, std::memory_order_release);
0242 } else {
0243 op->bypass_t = nullptr;
0244 op->status.store(FAILED, std::memory_order_release);
0245 }
0246 }
0247
0248
0249 void internal_forward(operation_type *op) {
0250 op->bypass_t = nullptr;
0251 if (my_concurrency < my_max_concurrency)
0252 op->bypass_t = perform_queued_requests();
0253 if(op->bypass_t)
0254 op->status.store(SUCCEEDED, std::memory_order_release);
0255 else {
0256 forwarder_busy = false;
0257 op->status.store(FAILED, std::memory_order_release);
0258 }
0259 }
0260
0261 graph_task* internal_try_put_bypass( const input_type& t ) {
0262 operation_type op_data(t, tryput_bypass);
0263 my_aggregator.execute(&op_data);
0264 if( op_data.status == SUCCEEDED ) {
0265 return op_data.bypass_t;
0266 }
0267 return nullptr;
0268 }
0269
0270 graph_task* try_put_task_impl( const input_type& t, std::true_type ) {
0271 if( my_max_concurrency == 0 ) {
0272 return apply_body_bypass(t);
0273 } else {
0274 operation_type check_op(t, occupy_concurrency);
0275 my_aggregator.execute(&check_op);
0276 if( check_op.status == SUCCEEDED ) {
0277 return apply_body_bypass(t);
0278 }
0279 return internal_try_put_bypass(t);
0280 }
0281 }
0282
0283 graph_task* try_put_task_impl( const input_type& t, std::false_type ) {
0284 if( my_max_concurrency == 0 ) {
0285 return create_body_task(t);
0286 } else {
0287 return internal_try_put_bypass(t);
0288 }
0289 }
0290
0291
0292
0293 graph_task* apply_body_bypass( const input_type &i ) {
0294 return static_cast<ImplType *>(this)->apply_body_impl_bypass(i);
0295 }
0296
0297
0298 graph_task* create_body_task( const input_type &input ) {
0299 if (!is_graph_active(my_graph_ref)) {
0300 return nullptr;
0301 }
0302
0303 small_object_allocator allocator{};
0304 typedef apply_body_task_bypass<class_type, input_type> task_type;
0305 graph_task* t = allocator.new_object<task_type>( my_graph_ref, allocator, *this, input, my_priority );
0306 graph_reference().reserve_wait();
0307 return t;
0308 }
0309
0310
0311 graph_task* forward_task() {
0312 operation_type op_data(try_fwd);
0313 graph_task* rval = nullptr;
0314 do {
0315 op_data.status = WAIT;
0316 my_aggregator.execute(&op_data);
0317 if(op_data.status == SUCCEEDED) {
0318 graph_task* ttask = op_data.bypass_t;
0319 __TBB_ASSERT( ttask && ttask != SUCCESSFULLY_ENQUEUED, nullptr);
0320 rval = combine_tasks(my_graph_ref, rval, ttask);
0321 }
0322 } while (op_data.status == SUCCEEDED);
0323 return rval;
0324 }
0325
0326 inline graph_task* create_forward_task() {
0327 if (!is_graph_active(my_graph_ref)) {
0328 return nullptr;
0329 }
0330 small_object_allocator allocator{};
0331 typedef forward_task_bypass<class_type> task_type;
0332 graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, my_priority );
0333 graph_reference().reserve_wait();
0334 return t;
0335 }
0336
0337
0338 inline void spawn_forward_task() {
0339 graph_task* tp = create_forward_task();
0340 if(tp) {
0341 spawn_in_graph_arena(graph_reference(), *tp);
0342 }
0343 }
0344
0345 node_priority_t priority() const override { return my_priority; }
0346 };
0347
0348
0349
0350 template< typename Input, typename Output, typename Policy, typename A>
0351 class function_input : public function_input_base<Input, Policy, A, function_input<Input,Output,Policy,A> > {
0352 public:
0353 typedef Input input_type;
0354 typedef Output output_type;
0355 typedef function_body<input_type, output_type> function_body_type;
0356 typedef function_input<Input, Output, Policy,A> my_class;
0357 typedef function_input_base<Input, Policy, A, my_class> base_type;
0358 typedef function_input_queue<input_type, A> input_queue_type;
0359
0360
0361 template<typename Body>
0362 function_input(
0363 graph &g, size_t max_concurrency, Body& body, node_priority_t a_priority )
0364 : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type())))
0365 , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
0366 , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) ) {
0367 }
0368
0369
0370 function_input( const function_input& src ) :
0371 base_type(src),
0372 my_body( src.my_init_body->clone() ),
0373 my_init_body(src.my_init_body->clone() ) {
0374 }
0375 #if __INTEL_COMPILER <= 2021
0376
0377
0378 virtual
0379 #endif
0380 ~function_input() {
0381 delete my_body;
0382 delete my_init_body;
0383 }
0384
0385 template< typename Body >
0386 Body copy_function_object() {
0387 function_body_type &body_ref = *this->my_body;
0388 return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
0389 }
0390
0391 output_type apply_body_impl( const input_type& i) {
0392
0393
0394 fgt_begin_body( my_body );
0395 output_type v = tbb::detail::invoke(*my_body, i);
0396 fgt_end_body( my_body );
0397 return v;
0398 }
0399
0400
0401 graph_task* apply_body_impl_bypass( const input_type &i) {
0402 output_type v = apply_body_impl(i);
0403 graph_task* postponed_task = nullptr;
0404 if( base_type::my_max_concurrency != 0 ) {
0405 postponed_task = base_type::try_get_postponed_task(i);
0406 __TBB_ASSERT( !postponed_task || postponed_task != SUCCESSFULLY_ENQUEUED, nullptr);
0407 }
0408 if( postponed_task ) {
0409
0410
0411 spawn_in_graph_arena(base_type::graph_reference(), *postponed_task);
0412 }
0413 graph_task* successor_task = successors().try_put_task(v);
0414 #if _MSC_VER && !__INTEL_COMPILER
0415 #pragma warning (push)
0416 #pragma warning (disable: 4127)
0417 #endif
0418 if(has_policy<lightweight, Policy>::value) {
0419 #if _MSC_VER && !__INTEL_COMPILER
0420 #pragma warning (pop)
0421 #endif
0422 if(!successor_task) {
0423
0424
0425 successor_task = SUCCESSFULLY_ENQUEUED;
0426 }
0427 }
0428 return successor_task;
0429 }
0430
0431 protected:
0432
0433 void reset_function_input(reset_flags f) {
0434 base_type::reset_function_input_base(f);
0435 if(f & rf_reset_bodies) {
0436 function_body_type *tmp = my_init_body->clone();
0437 delete my_body;
0438 my_body = tmp;
0439 }
0440 }
0441
0442 function_body_type *my_body;
0443 function_body_type *my_init_body;
0444 virtual broadcast_cache<output_type > &successors() = 0;
0445
0446 };
0447
0448
0449
// Compile-time recursion over the first N elements of a tuple-like object p.
// Each element must expose successors() returning an object with clear() and
// (under TBB_USE_ASSERT) empty().
template <int N>
struct clear_element {
    // Clears the successors of element N-1, then of the elements below it.
    template <typename P>
    static void clear_this(P& p) {
        static_cast<void>(std::get<N - 1>(p).successors().clear());
        clear_element<N - 1>::clear_this(p);
    }
#if TBB_USE_ASSERT
    // True only if elements N-1 down to 0 all report empty successors; stops
    // at the first non-empty one.
    template <typename P>
    static bool this_empty(P& p) {
        return std::get<N - 1>(p).successors().empty()
            && clear_element<N - 1>::this_empty(p);
    }
#endif
};

// Recursion terminator: handles element 0 only.
template <>
struct clear_element<1> {
    template <typename P>
    static void clear_this(P& p) {
        static_cast<void>(std::get<0>(p).successors().clear());
    }
#if TBB_USE_ASSERT
    template <typename P>
    static bool this_empty(P& p) {
        return std::get<0>(p).successors().empty();
    }
#endif
};
0474
0475 template <typename OutputTuple>
0476 struct init_output_ports {
0477 template <typename... Args>
0478 static OutputTuple call(graph& g, const std::tuple<Args...>&) {
0479 return OutputTuple(Args(g)...);
0480 }
0481 };
0482
0483
0484
0485 template< typename Input, typename OutputPortSet, typename Policy, typename A>
0486 class multifunction_input : public function_input_base<Input, Policy, A, multifunction_input<Input,OutputPortSet,Policy,A> > {
0487 public:
0488 static const int N = std::tuple_size<OutputPortSet>::value;
0489 typedef Input input_type;
0490 typedef OutputPortSet output_ports_type;
0491 typedef multifunction_body<input_type, output_ports_type> multifunction_body_type;
0492 typedef multifunction_input<Input, OutputPortSet, Policy, A> my_class;
0493 typedef function_input_base<Input, Policy, A, my_class> base_type;
0494 typedef function_input_queue<input_type, A> input_queue_type;
0495
0496
0497 template<typename Body>
0498 multifunction_input(graph &g, size_t max_concurrency,Body& body, node_priority_t a_priority )
0499 : base_type(g, max_concurrency, a_priority, noexcept(tbb::detail::invoke(body, input_type(), my_output_ports)))
0500 , my_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
0501 , my_init_body( new multifunction_body_leaf<input_type, output_ports_type, Body>(body) )
0502 , my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports)){
0503 }
0504
0505
0506 multifunction_input( const multifunction_input& src ) :
0507 base_type(src),
0508 my_body( src.my_init_body->clone() ),
0509 my_init_body(src.my_init_body->clone() ),
0510 my_output_ports( init_output_ports<output_ports_type>::call(src.my_graph_ref, my_output_ports) ) {
0511 }
0512
0513 ~multifunction_input() {
0514 delete my_body;
0515 delete my_init_body;
0516 }
0517
0518 template< typename Body >
0519 Body copy_function_object() {
0520 multifunction_body_type &body_ref = *this->my_body;
0521 return *static_cast<Body*>(dynamic_cast< multifunction_body_leaf<input_type, output_ports_type, Body> & >(body_ref).get_body_ptr());
0522 }
0523
0524
0525
0526
0527 graph_task* apply_body_impl_bypass( const input_type &i ) {
0528 fgt_begin_body( my_body );
0529 (*my_body)(i, my_output_ports);
0530 fgt_end_body( my_body );
0531 graph_task* ttask = nullptr;
0532 if(base_type::my_max_concurrency != 0) {
0533 ttask = base_type::try_get_postponed_task(i);
0534 }
0535 return ttask ? ttask : SUCCESSFULLY_ENQUEUED;
0536 }
0537
0538 output_ports_type &output_ports(){ return my_output_ports; }
0539
0540 protected:
0541
0542 void reset(reset_flags f) {
0543 base_type::reset_function_input_base(f);
0544 if(f & rf_clear_edges)clear_element<N>::clear_this(my_output_ports);
0545 if(f & rf_reset_bodies) {
0546 multifunction_body_type* tmp = my_init_body->clone();
0547 delete my_body;
0548 my_body = tmp;
0549 }
0550 __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "multifunction_node reset failed");
0551 }
0552
0553 multifunction_body_type *my_body;
0554 multifunction_body_type *my_init_body;
0555 output_ports_type my_output_ports;
0556
0557 };
0558
0559
// Returns a reference to the N-th output port of op, i.e. element N of the
// tuple returned by op.output_ports().
template <size_t N, typename MOP>
typename std::tuple_element<N, typename MOP::output_ports_type>::type& output_port(MOP& op) {
    typedef typename std::tuple_element<N, typename MOP::output_ports_type>::type port_type;
    port_type& selected_port = std::get<N>(op.output_ports());
    return selected_port;
}
0564
0565 inline void check_task_and_spawn(graph& g, graph_task* t) {
0566 if (t && t != SUCCESSFULLY_ENQUEUED) {
0567 spawn_in_graph_arena(g, *t);
0568 }
0569 }
0570
0571
0572 template<int N>
0573 struct emit_element {
0574 template<typename T, typename P>
0575 static graph_task* emit_this(graph& g, const T &t, P &p) {
0576
0577 graph_task* last_task = std::get<N-1>(p).try_put_task(std::get<N-1>(t));
0578 check_task_and_spawn(g, last_task);
0579 return emit_element<N-1>::emit_this(g,t,p);
0580 }
0581 };
0582
0583 template<>
0584 struct emit_element<1> {
0585 template<typename T, typename P>
0586 static graph_task* emit_this(graph& g, const T &t, P &p) {
0587 graph_task* last_task = std::get<0>(p).try_put_task(std::get<0>(t));
0588 check_task_and_spawn(g, last_task);
0589 return SUCCESSFULLY_ENQUEUED;
0590 }
0591 };
0592
0593
0594 template< typename Output, typename Policy>
0595 class continue_input : public continue_receiver {
0596 public:
0597
0598
0599 typedef continue_msg input_type;
0600
0601
0602 typedef Output output_type;
0603 typedef function_body<input_type, output_type> function_body_type;
0604 typedef continue_input<output_type, Policy> class_type;
0605
0606 template< typename Body >
0607 continue_input( graph &g, Body& body, node_priority_t a_priority )
0608 : continue_receiver(0, a_priority)
0609 , my_graph_ref(g)
0610 , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
0611 , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
0612 { }
0613
0614 template< typename Body >
0615 continue_input( graph &g, int number_of_predecessors,
0616 Body& body, node_priority_t a_priority )
0617 : continue_receiver( number_of_predecessors, a_priority )
0618 , my_graph_ref(g)
0619 , my_body( new function_body_leaf< input_type, output_type, Body>(body) )
0620 , my_init_body( new function_body_leaf< input_type, output_type, Body>(body) )
0621 { }
0622
0623 continue_input( const continue_input& src ) : continue_receiver(src),
0624 my_graph_ref(src.my_graph_ref),
0625 my_body( src.my_init_body->clone() ),
0626 my_init_body( src.my_init_body->clone() ) {}
0627
0628 ~continue_input() {
0629 delete my_body;
0630 delete my_init_body;
0631 }
0632
0633 template< typename Body >
0634 Body copy_function_object() {
0635 function_body_type &body_ref = *my_body;
0636 return dynamic_cast< function_body_leaf<input_type, output_type, Body> & >(body_ref).get_body();
0637 }
0638
0639 void reset_receiver( reset_flags f) override {
0640 continue_receiver::reset_receiver(f);
0641 if(f & rf_reset_bodies) {
0642 function_body_type *tmp = my_init_body->clone();
0643 delete my_body;
0644 my_body = tmp;
0645 }
0646 }
0647
0648 protected:
0649
0650 graph& my_graph_ref;
0651 function_body_type *my_body;
0652 function_body_type *my_init_body;
0653
0654 virtual broadcast_cache<output_type > &successors() = 0;
0655
0656 friend class apply_body_task_bypass< class_type, continue_msg >;
0657
0658
0659 graph_task* apply_body_bypass( input_type ) {
0660
0661
0662 fgt_begin_body( my_body );
0663 output_type v = (*my_body)( continue_msg() );
0664 fgt_end_body( my_body );
0665 return successors().try_put_task( v );
0666 }
0667
0668 graph_task* execute() override {
0669 if(!is_graph_active(my_graph_ref)) {
0670 return nullptr;
0671 }
0672 #if _MSC_VER && !__INTEL_COMPILER
0673 #pragma warning (push)
0674 #pragma warning (disable: 4127)
0675 #endif
0676 if(has_policy<lightweight, Policy>::value) {
0677 #if _MSC_VER && !__INTEL_COMPILER
0678 #pragma warning (pop)
0679 #endif
0680 return apply_body_bypass( continue_msg() );
0681 }
0682 else {
0683 small_object_allocator allocator{};
0684 typedef apply_body_task_bypass<class_type, continue_msg> task_type;
0685 graph_task* t = allocator.new_object<task_type>( graph_reference(), allocator, *this, continue_msg(), my_priority );
0686 graph_reference().reserve_wait();
0687 return t;
0688 }
0689 }
0690
0691 graph& graph_reference() const override {
0692 return my_graph_ref;
0693 }
0694 };
0695
0696
0697 template< typename Output >
0698 class function_output : public sender<Output> {
0699 public:
0700
0701 template<int N> friend struct clear_element;
0702 typedef Output output_type;
0703 typedef typename sender<output_type>::successor_type successor_type;
0704 typedef broadcast_cache<output_type> broadcast_cache_type;
0705
0706 function_output(graph& g) : my_successors(this), my_graph_ref(g) {}
0707 function_output(const function_output& other) = delete;
0708
0709
0710 bool register_successor( successor_type &r ) override {
0711 successors().register_successor( r );
0712 return true;
0713 }
0714
0715
0716 bool remove_successor( successor_type &r ) override {
0717 successors().remove_successor( r );
0718 return true;
0719 }
0720
0721 broadcast_cache_type &successors() { return my_successors; }
0722
0723 graph& graph_reference() const { return my_graph_ref; }
0724 protected:
0725 broadcast_cache_type my_successors;
0726 graph& my_graph_ref;
0727 };
0728
0729 template< typename Output >
0730 class multifunction_output : public function_output<Output> {
0731 public:
0732 typedef Output output_type;
0733 typedef function_output<output_type> base_type;
0734 using base_type::my_successors;
0735
0736 multifunction_output(graph& g) : base_type(g) {}
0737 multifunction_output(const multifunction_output& other) : base_type(other.my_graph_ref) {}
0738
0739 bool try_put(const output_type &i) {
0740 graph_task *res = try_put_task(i);
0741 if( !res ) return false;
0742 if( res != SUCCESSFULLY_ENQUEUED ) {
0743
0744
0745 spawn_in_graph_arena(graph_reference(), *res);
0746 }
0747 return true;
0748 }
0749
0750 using base_type::graph_reference;
0751
0752 protected:
0753
0754 graph_task* try_put_task(const output_type &i) {
0755 return my_successors.try_put_task(i);
0756 }
0757
0758 template <int N> friend struct emit_element;
0759
0760 };
0761
0762
// End of the recursion: no nodes left to add.
template <typename CompositeType>
void add_nodes_impl(CompositeType*, bool) {}

// Passes the address of n1 to fgt_alias_port together with c_node and the
// visibility flag, then handles the remaining nodes n... the same way.
template <typename CompositeType, typename NodeType1, typename... NodeTypes>
void add_nodes_impl(CompositeType* c_node, bool visible, const NodeType1& n1, const NodeTypes&... n) {
    // fgt_alias_port receives an untyped pointer, so constness is dropped first.
    NodeType1* const mutable_node = const_cast<NodeType1*>(&n1);
    void* addr = mutable_node;

    fgt_alias_port(c_node, addr, visible);
    add_nodes_impl(c_node, visible, n...);
}
0773
0774 #endif