File indexing completed on 2025-02-21 10:15:52
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 #ifndef __TBB_parallel_while
0018 #define __TBB_parallel_while
0019
0020 #define __TBB_parallel_while_H_include_area
0021 #include "internal/_warning_suppress_enable_notice.h"
0022
0023 #include "task.h"
0024 #include <new>
0025
0026 namespace tbb {
0027
0028 template<typename Body>
0029 class parallel_while;
0030
0031
0032 namespace internal {
0033
0034 template<typename Stream, typename Body> class while_task;
0035
0036
0037
0038
0039 template<typename Body>
0040 class while_iteration_task: public task {
0041 const Body& my_body;
0042 typename Body::argument_type my_value;
0043 task* execute() __TBB_override {
0044 my_body(my_value);
0045 return NULL;
0046 }
0047 while_iteration_task( const typename Body::argument_type& value, const Body& body ) :
0048 my_body(body), my_value(value)
0049 {}
0050 template<typename Body_> friend class while_group_task;
0051 friend class tbb::parallel_while<Body>;
0052 };
0053
0054
0055
0056
0057 template<typename Body>
0058 class while_group_task: public task {
0059 static const size_t max_arg_size = 4;
0060 const Body& my_body;
0061 size_t size;
0062 typename Body::argument_type my_arg[max_arg_size];
0063 while_group_task( const Body& body ) : my_body(body), size(0) {}
0064 task* execute() __TBB_override {
0065 typedef while_iteration_task<Body> iteration_type;
0066 __TBB_ASSERT( size>0, NULL );
0067 task_list list;
0068 task* t;
0069 size_t k=0;
0070 for(;;) {
0071 t = new( allocate_child() ) iteration_type(my_arg[k],my_body);
0072 if( ++k==size ) break;
0073 list.push_back(*t);
0074 }
0075 set_ref_count(int(k+1));
0076 spawn(list);
0077 spawn_and_wait_for_all(*t);
0078 return NULL;
0079 }
0080 template<typename Stream, typename Body_> friend class while_task;
0081 };
0082
0083
0084
0085
0086 template<typename Stream, typename Body>
0087 class while_task: public task {
0088 Stream& my_stream;
0089 const Body& my_body;
0090 empty_task& my_barrier;
0091 task* execute() __TBB_override {
0092 typedef while_group_task<Body> block_type;
0093 block_type& t = *new( allocate_additional_child_of(my_barrier) ) block_type(my_body);
0094 size_t k=0;
0095 while( my_stream.pop_if_present(t.my_arg[k]) ) {
0096 if( ++k==block_type::max_arg_size ) {
0097
0098 recycle_to_reexecute();
0099 break;
0100 }
0101 }
0102 if( k==0 ) {
0103 destroy(t);
0104 return NULL;
0105 } else {
0106 t.size = k;
0107 return &t;
0108 }
0109 }
0110 while_task( Stream& stream, const Body& body, empty_task& barrier ) :
0111 my_stream(stream),
0112 my_body(body),
0113 my_barrier(barrier)
0114 {}
0115 friend class tbb::parallel_while<Body>;
0116 };
0117
0118 }
0119
0120
0121
0122
0123
0124
0125
0126
0127 template<typename Body>
0128 class parallel_while: internal::no_copy {
0129 public:
0130
0131 parallel_while() : my_body(NULL), my_barrier(NULL) {}
0132
0133
0134 ~parallel_while() {
0135 if( my_barrier ) {
0136 my_barrier->destroy(*my_barrier);
0137 my_barrier = NULL;
0138 }
0139 }
0140
0141
0142 typedef typename Body::argument_type value_type;
0143
0144
0145
0146
0147
0148 template<typename Stream>
0149 void run( Stream& stream, const Body& body );
0150
0151
0152
0153 void add( const value_type& item );
0154
0155 private:
0156 const Body* my_body;
0157 empty_task* my_barrier;
0158 };
0159
0160 template<typename Body>
0161 template<typename Stream>
0162 void parallel_while<Body>::run( Stream& stream, const Body& body ) {
0163 using namespace internal;
0164 empty_task& barrier = *new( task::allocate_root() ) empty_task();
0165 my_body = &body;
0166 my_barrier = &barrier;
0167 my_barrier->set_ref_count(2);
0168 while_task<Stream,Body>& w = *new( my_barrier->allocate_child() ) while_task<Stream,Body>( stream, body, barrier );
0169 my_barrier->spawn_and_wait_for_all(w);
0170 my_barrier->destroy(*my_barrier);
0171 my_barrier = NULL;
0172 my_body = NULL;
0173 }
0174
0175 template<typename Body>
0176 void parallel_while<Body>::add( const value_type& item ) {
0177 __TBB_ASSERT(my_barrier,"attempt to add to parallel_while that is not running");
0178 typedef internal::while_iteration_task<Body> iteration_type;
0179 iteration_type& i = *new( task::allocate_additional_child_of(*my_barrier) ) iteration_type(item,*my_body);
0180 task::self().spawn( i );
0181 }
0182
0183 }
0184
0185 #include "internal/_warning_suppress_disable_notice.h"
0186 #undef __TBB_parallel_while_H_include_area
0187
0188 #endif