File indexing completed on 2025-01-18 10:12:59
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB_pipeline_H
0018 #define __TBB_pipeline_H
0019
0020 #define __TBB_pipeline_H_include_area
0021 #include "internal/_warning_suppress_enable_notice.h"
0022
0023 #include "atomic.h"
0024 #include "task.h"
0025 #include "tbb_allocator.h"
0026 #include <cstddef>
0027
0028 #if __TBB_CPP11_TYPE_PROPERTIES_PRESENT
0029 #include <type_traits>
0030 #endif
0031
0032 namespace tbb {
0033
0034 class pipeline;
0035 class filter;
0036
0037
0038 namespace internal {
0039 //! Packs pipeline interface version x into the version field (bits 1..3) of a filter mode byte.
0040 // The argument is parenthesized so that an expression argument is evaluated as a unit before the subtraction.
0041 #define __TBB_PIPELINE_VERSION(x) ((unsigned char)((x)-2)<<1)
0042 //! Integer types for tokens: Token is unsigned, tokendiff_t is its signed counterpart.
0043 typedef unsigned long Token;
0044 typedef long tokendiff_t;
0045 class stage_task;
0046 class input_buffer;
0047 class pipeline_root_task;
0048 class pipeline_cleaner;
0049 // The classes declared above are defined outside this header; filter and pipeline refer to them as friends or through pointers.
0050 } // namespace internal
0051 //! Forward declarations for the interface6 layer that is defined later in this header.
0052 namespace interface6 {
0053 template<typename T, typename U> class filter_t;
0054 // pipeline_proxy is declared early so that tbb::pipeline can name it as a friend.
0055 namespace internal {
0056 class pipeline_proxy;
0057 }
0058 }
0059
0060
0061
0062 //! A stage in a pipeline.
0063 // Carries the stage's mode flags and its links into the owning pipeline; the per-item work is the pure virtual operator().
0064 class filter: internal::no_copy {
0065 private:
0066 //! Sentinel (all bits set) that the constructors store in the prev/next links of a new filter.
0067 static filter* not_in_pipeline() { return reinterpret_cast<filter*>(intptr_t(-1)); }
0068 protected:
0069 //! Bit 0 of the mode byte: set for serial filters (tested by is_serial()).
0070 static const unsigned char filter_is_serial = 0x1;
0071
0072 //! Bit 4: set for filters that do not keep items in order.
0073 // Bits 1..3 hold the interface version (see version_mask); is_ordered() requires this bit
0074 // to be clear and filter_is_serial to be set.
0075 static const unsigned char filter_is_out_of_order = 0x1<<4;
0076
0077 //! Bit 5: set for thread-bound filters (tested by is_bound(), added by thread_bound_filter).
0078 static const unsigned char filter_is_bound = 0x1<<5;
0079
0080 //! Bit 6: tested by object_may_be_null().
0081 static const unsigned char filter_may_emit_null = 0x1<<6;
0082
0083 //! Bit 7, or zero when TBB_USE_CAPTURED_EXCEPTION is nonzero; both constructors OR it into the mode.
0084 static const unsigned char exact_exception_propagation =
0085 #if TBB_USE_CAPTURED_EXCEPTION
0086 0x0;
0087 #else
0088 0x1<<7;
0089 #endif
0090 //! Version field written by this header (interface version 5) and the mask that covers that field.
0091 static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
0092 static const unsigned char version_mask = 0x7<<1;
0093 public:
0094 enum mode {
0095 //! Not serial and not order-preserving.
0096 parallel = current_version | filter_is_out_of_order,
0097 //! Serial and order-preserving (is_ordered() is true).
0098 serial_in_order = current_version | filter_is_serial,
0099 //! Serial but not order-preserving.
0100 serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
0101 //! Alias of serial_in_order.
0102 serial = serial_in_order
0103 };
0104 protected:
0105 explicit filter( bool is_serial_ ) : // true selects mode serial, false selects mode parallel
0106 next_filter_in_pipeline(not_in_pipeline()),
0107 my_input_buffer(NULL),
0108 my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
0109 prev_filter_in_pipeline(not_in_pipeline()),
0110 my_pipeline(NULL),
0111 next_segment(NULL)
0112 {}
0113 //! Creates a filter with the given mode that is not attached to any pipeline.
0114 explicit filter( mode filter_mode ) :
0115 next_filter_in_pipeline(not_in_pipeline()),
0116 my_input_buffer(NULL),
0117 my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
0118 prev_filter_in_pipeline(not_in_pipeline()),
0119 my_pipeline(NULL),
0120 next_segment(NULL)
0121 {}
0122 //! Exported; defined outside this header. concrete_filter<void,U,Body> calls it when its body stops the flow.
0123 // NOTE(review): presumably marks the owning pipeline's input as exhausted — confirm in the implementation.
0124 void __TBB_EXPORTED_METHOD set_end_of_input();
0125
0126 public:
0127 //! True if the filter_is_serial bit is set.
0128 bool is_serial() const {
0129 return bool( my_filter_mode & filter_is_serial );
0130 }
0131
0132 //! True if the filter is serial and its out-of-order bit is clear.
0133 bool is_ordered() const {
0134 return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
0135 }
0136
0137 //! True if the filter_is_bound bit is set.
0138 bool is_bound() const {
0139 return ( my_filter_mode & filter_is_bound )==filter_is_bound;
0140 }
0141 //! True if the filter_may_emit_null bit is set.
0142 // Const-qualified like the other mode queries: it only reads my_filter_mode.
0143 bool object_may_be_null() const {
0144 return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
0145 }
0146
0147 //! Processes one item; every concrete filter must override this.
0148 // Takes and returns an untyped item pointer.
0149 virtual void* operator()( void* item ) = 0;
0150
0151 //! Virtual destructor; exported and defined outside this header.
0152
0153 virtual __TBB_EXPORTED_METHOD ~filter();
0154
0155 #if __TBB_TASK_GROUP_CONTEXT
0156 //! Hook that receives an item pointer; the default does nothing.
0157 // The concrete_filter overrides in this header use it to destroy a token without running the body.
0158 // When the library invokes it is decided outside this header.
0159 virtual void finalize( void* ) {}
0160 #endif
0161
0162 private:
0163 //! Link to the following filter; set to not_in_pipeline() at construction.
0164 filter* next_filter_in_pipeline;
0165
0166 //! Declared here, defined outside this header.
0167 // NOTE(review): presumably reports whether this filter still has buffered input — confirm.
0168
0169 bool has_more_work();
0170
0171 //! Input buffer of this filter; NULL at construction (it is assigned outside this header).
0172
0173 internal::input_buffer* my_input_buffer;
0174
0175 friend class internal::stage_task;
0176 friend class internal::pipeline_root_task;
0177 friend class pipeline;
0178 friend class thread_bound_filter;
0179
0180 //! Mode byte fixed at construction: flag bits plus the version field.
0181 const unsigned char my_filter_mode;
0182
0183 //! Link to the preceding filter; set to not_in_pipeline() at construction.
0184 filter* prev_filter_in_pipeline;
0185
0186 //! Owning pipeline; NULL at construction.
0187 pipeline* my_pipeline;
0188
0189 //! NULL at construction.
0190 // NOTE(review): appears to chain groups ("segments") of filters; the exact meaning is defined outside this header — confirm.
0191 filter* next_segment;
0192 };
0193
0194 //! A serial filter that carries the filter_is_bound flag and is driven through try_process_item()/process_item().
0195 // NOTE(review): presumably those calls are meant to be made by a thread chosen by the user — confirm against the TBB reference.
0196 class thread_bound_filter: public filter {
0197 public:
0198 enum result_type {
0199 // NOTE(review): the meanings below are inferred from the enumerator names — confirm. success: an item was processed.
0200 success,
0201 // item_not_available: presumably no item was ready at the time of the call.
0202 item_not_available,
0203 // end_of_stream: presumably no further items will arrive.
0204 end_of_stream
0205 };
0206 protected:
0207 explicit thread_bound_filter(mode filter_mode): // adds filter_is_bound to the requested mode
0208 filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
0209 {
0210 __TBB_ASSERT(filter_mode & filter::filter_is_serial, "thread-bound filters must be serial");
0211 }
0212 public:
0213 //! Exported; defined outside this header. Returns a result_type.
0214 // NOTE(review): presumably the non-blocking variant that reports item_not_available
0215 // instead of waiting — confirm in the implementation.
0216
0217
0218
0219 result_type __TBB_EXPORTED_METHOD try_process_item();
0220
0221 //! Exported; defined outside this header. Returns a result_type.
0222 // NOTE(review): presumably the blocking variant that waits until an item
0223 // is available or the stream ends — confirm in the implementation.
0224
0225
0226 result_type __TBB_EXPORTED_METHOD process_item();
0227
0228 private:
0229 //! Declared here, defined outside this header; presumably the shared implementation selected by is_blocking — confirm.
0230 result_type internal_process_item(bool is_blocking);
0231 };
0232
0233 //! A pipeline that filters are added to and that is then run.
0234 // Marked deprecated through __TBB_DEPRECATED_MSG in favor of tbb::parallel_pipeline.
0235 class __TBB_DEPRECATED_MSG("tbb::pipeline is deprecated, use tbb::parallel_pipeline") pipeline {
0236 public:
0237 //! Constructor; exported and defined outside this header.
0238 __TBB_EXPORTED_METHOD pipeline();
0239
0240 //! Virtual destructor; exported and defined outside this header.
0241
0242 virtual __TBB_EXPORTED_METHOD ~pipeline();
0243
0244 //! Adds filter_ to this pipeline (exported; defined outside this header).
0245 void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
0246
0247 //! Runs the pipeline. NOTE(review): max_number_of_live_tokens presumably bounds the number of items in flight — confirm.
0248 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
0249
0250 #if __TBB_TASK_GROUP_CONTEXT
0251 //! Overload of run() that additionally takes a caller-supplied task_group_context.
0252 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
0253 #endif
0254
0255 //! Exported; NOTE(review): presumably removes all filters from the pipeline — confirm.
0256 void __TBB_EXPORTED_METHOD clear();
0257
0258 private:
0259 friend class internal::stage_task;
0260 friend class internal::pipeline_root_task;
0261 friend class filter;
0262 friend class thread_bound_filter;
0263 friend class internal::pipeline_cleaner;
0264 friend class tbb::interface6::internal::pipeline_proxy;
0265
0266 //! Head of the filter list; pipeline_proxy's destructor deletes filters starting from this pointer.
0267 filter* filter_list;
0268
0269 //! NOTE(review): presumably the last filter in the list — confirm.
0270 filter* filter_end;
0271
0272 //! Task pointer; its role is defined outside this header.
0273 task* end_counter;
0274
0275 //! Atomic token counter; its exact role is defined outside this header.
0276 atomic<internal::Token> input_tokens;
0277
0278 //! Atomic token counter; its exact role is defined outside this header.
0279 atomic<internal::Token> token_counter;
0280
0281 //! NOTE(review): presumably set once the input is exhausted (cf. filter::set_end_of_input) — confirm.
0282 bool end_of_input;
0283
0284 //! NOTE(review): presumably true when a thread_bound_filter has been added — confirm.
0285 bool has_thread_bound_filters;
0286
0287 //! Not exported; NOTE(review): presumably unlinks filter_ from the list — confirm.
0288 void remove_filter( filter& filter_ );
0289
0290 //! Exported private helper that takes the calling task; its semantics are defined outside this header.
0291 void __TBB_EXPORTED_METHOD inject_token( task& self );
0292
0293 #if __TBB_TASK_GROUP_CONTEXT
0294 //! Declared only when task group contexts are enabled; defined outside this header.
0295 void clear_filters();
0296 #endif
0297 };
0298
0299
0300
0301
0302 //! Forward declaration of flow::interface11::input_node so that flow_control can name it as a friend.
0303 namespace flow {
0304 namespace interface11 {
0305 template<typename Output> class input_node;
0306 }
0307 }
0308
0309 namespace interface6 {
0310 //! Forward declaration of concrete_filter so that flow_control can name it as a friend.
0311 namespace internal {
0312 template<typename T, typename U, typename Body> class concrete_filter;
0313 }
0314 //! Object passed to the body of an input stage; the body calls stop() to signal that it has no more input.
0315 // The constructor is private, so only the befriended concrete_filter and flow::interface11::input_node can create one.
0316 class flow_control {
0317 bool is_pipeline_stopped;
0318 flow_control() { is_pipeline_stopped = false; } // starts in the "not stopped" state
0319 template<typename T, typename U, typename Body> friend class internal::concrete_filter;
0320 template<typename Output> friend class flow::interface11::input_node;
0321 public:
0322 void stop() { is_pipeline_stopped = true; } // the flag is read by the befriended filter after the body returns
0323 };
0324
0325
0326 namespace internal {
0327 //! Trait whose value is nonzero when T is treated as trivially copyable.
0328 // With C++11 type properties it forwards to std::is_trivially_copyable; otherwise only pointers and the built-in types listed below qualify.
0329 #if __TBB_CPP11_TYPE_PROPERTIES_PRESENT
0330 template<typename T> struct tbb_trivially_copyable { enum { value = std::is_trivially_copyable<T>::value }; };
0331 #else
0332 template<typename T> struct tbb_trivially_copyable { enum { value = false }; };
0333 template<typename T> struct tbb_trivially_copyable < T* > { enum { value = true }; };
0334 template<> struct tbb_trivially_copyable < bool > { enum { value = true }; };
0335 template<> struct tbb_trivially_copyable < char > { enum { value = true }; };
0336 template<> struct tbb_trivially_copyable < signed char > { enum { value = true }; };
0337 template<> struct tbb_trivially_copyable <unsigned char > { enum { value = true }; };
0338 template<> struct tbb_trivially_copyable < short > { enum { value = true }; };
0339 template<> struct tbb_trivially_copyable <unsigned short > { enum { value = true }; };
0340 template<> struct tbb_trivially_copyable < int > { enum { value = true }; };
0341 template<> struct tbb_trivially_copyable <unsigned int > { enum { value = true }; };
0342 template<> struct tbb_trivially_copyable < long > { enum { value = true }; };
0343 template<> struct tbb_trivially_copyable <unsigned long > { enum { value = true }; };
0344 template<> struct tbb_trivially_copyable < long long> { enum { value = true }; };
0345 template<> struct tbb_trivially_copyable <unsigned long long> { enum { value = true }; };
0346 template<> struct tbb_trivially_copyable < float > { enum { value = true }; };
0347 template<> struct tbb_trivially_copyable < double > { enum { value = true }; };
0348 template<> struct tbb_trivially_copyable < long double > { enum { value = true }; };
0349 #if !_MSC_VER || defined(_NATIVE_WCHAR_T_DEFINED)
0350 template<> struct tbb_trivially_copyable < wchar_t > { enum { value = true }; };
0351 #endif // wchar_t specialization; NOTE(review): presumably guarded because MSVC without native wchar_t makes it a typedef — confirm
0352 #endif // __TBB_CPP11_TYPE_PROPERTIES_PRESENT
0353 //! value is nonzero when T is larger than a void* or is not trivially copyable; it selects the allocating token_helper.
0354 template<typename T>
0355 struct use_allocator {
0356 enum { value = sizeof(T) > sizeof(void *) || !tbb_trivially_copyable<T>::value };
0357 };
0358
0359 //! Converts between values of type T and the void* tokens that filters exchange.
0360 // Allocate picks the strategy: true stores a separately allocated T, false stores the value in the pointer itself.
0361 template<typename T, bool Allocate> class token_helper;
0362 //! Allocating variant: the token is a pointer to a T constructed in storage obtained from tbb_allocator<T>.
0363 // create_token() builds the object, destroy_token() destroys it and returns the storage.
0364 template<typename T>
0365 class token_helper<T, true> {
0366 public:
0367 typedef typename tbb::tbb_allocator<T> allocator;
0368 typedef T* pointer;
0369 typedef T value_type;
0370 #if __TBB_CPP11_RVALUE_REF_PRESENT
0371 static pointer create_token(value_type && source)
0372 #else
0373 static pointer create_token(const value_type & source)
0374 #endif
0375 {
0376 pointer output_t = allocator().allocate(1);
0377 return new (output_t) T(tbb::internal::move(source)); // NOTE(review): the storage is not released if this constructor throws
0378 }
0379 static value_type & token(pointer & t) { return *t; }
0380 static void * cast_to_void_ptr(pointer ref) { return (void *) ref; }
0381 static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
0382 static void destroy_token(pointer token) {
0383 allocator().destroy(token);
0384 allocator().deallocate(token,1);
0385 }
0386 };
0387 //! Pointer variant: a T* is itself the token.
0388 // Nothing is allocated by create_token() and destroy_token() does nothing; the pointee is never touched here.
0389 template<typename T>
0390 class token_helper<T*, false> {
0391 public:
0392 typedef T* pointer;
0393 typedef T* value_type;
0394 static pointer create_token(const value_type & source) { return source; }
0395 static value_type & token(pointer & t) { return t; }
0396 static void * cast_to_void_ptr(pointer ref) { return (void *)ref; }
0397 static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
0398 static void destroy_token( pointer ) {}
0399 };
0400 //! Small-value variant: the value of T is carried inside the void* itself through a union overlay.
0401 // Selected when use_allocator<T>::value is zero, i.e. T fits in a void* and is trivially copyable.
0402 template<typename T>
0403 class token_helper<T, false> {
0404 typedef union {
0405 T actual_value;
0406 void * void_overlay;
0407 } type_to_void_ptr_map;
0408 public:
0409 typedef T pointer;
0410 typedef T value_type;
0411 static pointer create_token(const value_type & source) { return source; }
0412 static value_type & token(pointer & t) { return t; }
0413 static void * cast_to_void_ptr(pointer ref) {
0414 type_to_void_ptr_map mymap;
0415 mymap.void_overlay = NULL; // clear the whole pointer first so bytes not covered by T are zero
0416 mymap.actual_value = ref;
0417 return mymap.void_overlay;
0418 }
0419 static pointer cast_from_void_ptr(void * ref) {
0420 type_to_void_ptr_map mymap;
0421 mymap.void_overlay = ref;
0422 return mymap.actual_value;
0423 }
0424 static void destroy_token( pointer ) {}
0425 };
0426 //! Adapts a body that takes a T and returns a U to the untyped tbb::filter interface.
0427 // The body is held by reference (my_body), so the referenced Body object must outlive this filter.
0428 template<typename T, typename U, typename Body>
0429 class concrete_filter: public tbb::filter {
0430 const Body& my_body;
0431 typedef token_helper<T,use_allocator<T>::value> t_helper;
0432 typedef typename t_helper::pointer t_pointer;
0433 typedef token_helper<U,use_allocator<U>::value> u_helper;
0434 typedef typename u_helper::pointer u_pointer;
0435 //! Unpacks the input token, runs the body on it, destroys the input token, and returns the packed result.
0436 void* operator()(void* input) __TBB_override {
0437 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
0438 u_pointer output_u = u_helper::create_token(my_body(tbb::internal::move(t_helper::token(temp_input))));
0439 t_helper::destroy_token(temp_input);
0440 return u_helper::cast_to_void_ptr(output_u);
0441 }
0442 //! Destroys an input token without running the body.
0443 void finalize(void * input) __TBB_override {
0444 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
0445 t_helper::destroy_token(temp_input);
0446 }
0447
0448 public:
0449 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
0450 };
0451 //! Input-stage specialization: the body takes a flow_control& and returns a U.
0452 // The body is held by reference (my_body), so the referenced Body object must outlive this filter.
0453 template<typename U, typename Body>
0454 class concrete_filter<void,U,Body>: public filter {
0455 const Body& my_body;
0456 typedef token_helper<U, use_allocator<U>::value> u_helper;
0457 typedef typename u_helper::pointer u_pointer;
0458 //! Ignores its argument. If the body called stop(), destroys the produced token, calls set_end_of_input() and returns NULL.
0459 void* operator()(void*) __TBB_override {
0460 flow_control control;
0461 u_pointer output_u = u_helper::create_token(my_body(control));
0462 if(control.is_pipeline_stopped) {
0463 u_helper::destroy_token(output_u);
0464 set_end_of_input();
0465 return NULL;
0466 }
0467 return u_helper::cast_to_void_ptr(output_u);
0468 }
0469 // The constructor adds filter_may_emit_null to the requested mode; operator() above can return NULL.
0470 public:
0471 concrete_filter(tbb::filter::mode filter_mode, const Body& body) :
0472 filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)),
0473 my_body(body)
0474 {}
0475 };
0476 //! Output-stage specialization: the body consumes a T and returns nothing.
0477 // The body is held by reference (my_body), so the referenced Body object must outlive this filter.
0478 template<typename T, typename Body>
0479 class concrete_filter<T,void,Body>: public filter {
0480 const Body& my_body;
0481 typedef token_helper<T, use_allocator<T>::value> t_helper;
0482 typedef typename t_helper::pointer t_pointer;
0483 //! Runs the body on the unpacked input token, destroys the token, and always returns NULL.
0484 void* operator()(void* input) __TBB_override {
0485 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
0486 my_body(tbb::internal::move(t_helper::token(temp_input)));
0487 t_helper::destroy_token(temp_input);
0488 return NULL;
0489 }
0490 void finalize(void* input) __TBB_override { // destroys an input token without running the body
0491 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
0492 t_helper::destroy_token(temp_input);
0493 }
0494
0495 public:
0496 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
0497 };
0498 //! Specialization for a stage that neither consumes nor produces a value; the body only receives a flow_control&.
0499 template<typename Body>
0500 class concrete_filter<void,void,Body>: public filter {
0501 const Body& my_body;
0502 //! Returns NULL once the body has called stop(); otherwise returns a non-null placeholder pointer ((void*)-1).
0503 void* operator()(void*) __TBB_override {
0504 flow_control control;
0505 my_body(control);
0506 void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1;
0507 return output;
0508 }
0509 public:
0510 concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
0511 };
0512 //! Owns a tbb::pipeline populated from a filter_t chain and deletes the pipeline's filters on destruction.
0513 // NOTE(review): the destructor loop ends only if deleting a filter removes it from my_pipe.filter_list;
0514 // presumably filter::~filter (defined outside this header) does that — confirm.
0515 class pipeline_proxy {
0516 tbb::pipeline my_pipe;
0517 public:
0518 pipeline_proxy( const filter_t<void,void>& filter_chain );
0519 ~pipeline_proxy() {
0520 while( filter* f = my_pipe.filter_list )
0521 delete f;
0522 }
0523 tbb::pipeline* operator->() { return &my_pipe; } // gives callers access to the wrapped pipeline
0524 };
0525 //! Abstract node of the tree that a filter_t refers to.
0526 // Nodes are reference counted: add_ref()/remove_ref() adjust the count and the node deletes itself
0527 // when remove_ref() brings the count to zero.
0528 class filter_node: tbb::internal::no_copy {
0529 //! Reference count; set to zero by the constructor.
0530 tbb::atomic<intptr_t> ref_count;
0531 protected:
0532 filter_node() {
0533 ref_count = 0;
0534 #ifdef __TBB_TEST_FILTER_NODE_COUNT
0535 ++(__TBB_TEST_FILTER_NODE_COUNT);
0536 #endif
0537 }
0538 public:
0539 //! Adds the filter(s) represented by this node to the given pipeline.
0540 virtual void add_to( pipeline& ) = 0;
0541 //! Increments the reference count.
0542 void add_ref() { ++ref_count; }
0543 //! Decrements the reference count and deletes this node when the count reaches zero.
0544 void remove_ref() {
0545 __TBB_ASSERT(ref_count>0,"ref_count underflow");
0546 if( --ref_count==0 )
0547 delete this;
0548 }
0549 virtual ~filter_node() {
0550 #ifdef __TBB_TEST_FILTER_NODE_COUNT
0551 --(__TBB_TEST_FILTER_NODE_COUNT);
0552 #endif
0553 }
0554 };
0555 //! Leaf node: stores a mode and its own copy of the body, and creates one concrete_filter per add_to() call.
0556 // The created filter refers to this node's body member and is handed to the pipeline; it is not deleted here.
0557 template<typename T, typename U, typename Body>
0558 class filter_node_leaf: public filter_node {
0559 const tbb::filter::mode mode;
0560 const Body body;
0561 void add_to( pipeline& p ) __TBB_override {
0562 concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
0563 p.add_filter( *f );
0564 }
0565 public:
0566 filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
0567 };
0568 //! Interior node that joins two subtrees.
0569 // It takes a reference on both children in its constructor and releases them in its destructor.
0570 class filter_node_join: public filter_node {
0571 friend class filter_node;
0572 filter_node& left;
0573 filter_node& right;
0574 ~filter_node_join() {
0575 left.remove_ref();
0576 right.remove_ref();
0577 }
0578 void add_to( pipeline& p ) __TBB_override { // left subtree first, then right
0579 left.add_to(p);
0580 right.add_to(p);
0581 }
0582 public:
0583 filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
0584 left.add_ref();
0585 right.add_ref();
0586 }
0587 };
0588
0589 }
0590
0591 //! Creates a filter_t<T,U> from a mode and a body; the body is copied into a new filter_node_leaf.
0592 // The returned node pointer is converted through filter_t's private filter_node* constructor, which takes a reference on it.
0593 template<typename T, typename U, typename Body>
0594 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
0595 return new internal::filter_node_leaf<T,U,Body>(mode, body);
0596 }
0597 //! Composes two filter chains into one; the output type V of left must equal the input type of right. Both operands must be non-empty.
0598 template<typename T, typename V, typename U>
0599 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
0600 __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
0601 __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
0602 return new internal::filter_node_join(*left.root,*right.root);
0603 }
0604 //! Handle to a chain of filters that takes a T and produces a U.
0605 // Copies of a handle share one reference-counted filter_node tree; a default-constructed handle is empty (root is NULL).
0606 template<typename T, typename U>
0607 class filter_t {
0608 typedef internal::filter_node filter_node;
0609 filter_node* root;
0610 filter_t( filter_node* root_ ) : root(root_) { // private: used by make_filter and operator&; root_ must be non-null
0611 root->add_ref();
0612 }
0613 friend class internal::pipeline_proxy;
0614 template<typename T_, typename U_, typename Body>
0615 friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
0616 template<typename T_, typename V_, typename U_>
0617 friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
0618 public:
0619 //! Constructs an empty handle.
0620 filter_t() : root(NULL) {}
0621 filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
0622 if( root ) root->add_ref();
0623 }
0624 template<typename Body>
0625 filter_t( tbb::filter::mode mode, const Body& body ) :
0626 root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
0627 root->add_ref();
0628 }
0629 //! Makes this handle share rhs's node and returns *this so that assignments can be chained.
0630 filter_t& operator=( const filter_t<T,U>& rhs ) {
0631 // The new node is referenced before the old one is released, so self-assignment cannot free the node in use.
0632 filter_node* old = root;
0633 root = rhs.root;
0634 if( root ) root->add_ref();
0635 if( old ) old->remove_ref();
0636 return *this;
0637 }
0638 ~filter_t() {
0639 if( root ) root->remove_ref();
0640 }
0641 void clear() {
0642 // Detach the member first, then release the reference, so root never points at a released node.
0643 if( root ) {
0644 filter_node* old = root;
0645 root = NULL;
0646 old->remove_ref();
0647 }
0648 }
0649 };
0650 //! Populates the owned pipeline by asking the chain's root node to add its filters; the chain must not be empty.
0651 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
0652 __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t" );
0653 filter_chain.root->add_to(my_pipe);
0654 }
0655 //! Builds a temporary pipeline from filter_chain and runs it; a context parameter is added when __TBB_TASK_GROUP_CONTEXT is enabled.
0656 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
0657 #if __TBB_TASK_GROUP_CONTEXT
0658 , tbb::task_group_context& context
0659 #endif
0660 ) {
0661 internal::pipeline_proxy pipe(filter_chain);
0662 // pipe's destructor deletes the filters it created once run() has returned.
0663 pipe->run(max_number_of_live_tokens
0664 #if __TBB_TASK_GROUP_CONTEXT
0665 , context
0666 #endif
0667 );
0668 }
0669 //! Convenience overload: runs the chain with a default-constructed, function-local task_group_context.
0670 #if __TBB_TASK_GROUP_CONTEXT
0671 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
0672 tbb::task_group_context context;
0673 parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
0674 }
0675 #endif
0676
0677 }
0678 //! Make the interface6 names available directly in namespace tbb.
0679 using interface6::flow_control;
0680 using interface6::filter_t;
0681 using interface6::make_filter;
0682 using interface6::parallel_pipeline;
0683
0684 }
0685
0686 #include "internal/_warning_suppress_disable_notice.h"
0687 #undef __TBB_pipeline_H_include_area
0688
0689 #endif