Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-02-21 10:15:52

0001 /*
0002     Copyright (c) 2005-2020 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB_parallel_while
0018 #define __TBB_parallel_while
0019 
0020 #define __TBB_parallel_while_H_include_area
0021 #include "internal/_warning_suppress_enable_notice.h"
0022 
0023 #include "task.h"
0024 #include <new>
0025 
0026 namespace tbb {
0027 
0028 template<typename Body>
0029 class parallel_while;
0030 
0031 //! @cond INTERNAL
0032 namespace internal {
0033 
0034     template<typename Stream, typename Body> class while_task;
0035 
0036     //! For internal use only.
0037     /** Executes one iteration of a while.
0038         @ingroup algorithms */
0039     template<typename Body>
0040     class while_iteration_task: public task {
0041         const Body& my_body;
0042         typename Body::argument_type my_value;
0043         task* execute() __TBB_override {
0044             my_body(my_value);
0045             return NULL;
0046         }
0047         while_iteration_task( const typename Body::argument_type& value, const Body& body ) :
0048             my_body(body), my_value(value)
0049         {}
0050         template<typename Body_> friend class while_group_task;
0051         friend class tbb::parallel_while<Body>;
0052     };
0053 
0054     //! For internal use only
0055     /** Unpacks a block of iterations.
0056         @ingroup algorithms */
0057     template<typename Body>
0058     class while_group_task: public task {
0059         static const size_t max_arg_size = 4;
0060         const Body& my_body;
0061         size_t size;
0062         typename Body::argument_type my_arg[max_arg_size];
0063         while_group_task( const Body& body ) : my_body(body), size(0) {}
0064         task* execute() __TBB_override {
0065             typedef while_iteration_task<Body> iteration_type;
0066             __TBB_ASSERT( size>0, NULL );
0067             task_list list;
0068             task* t;
0069             size_t k=0;
0070             for(;;) {
0071                 t = new( allocate_child() ) iteration_type(my_arg[k],my_body);
0072                 if( ++k==size ) break;
0073                 list.push_back(*t);
0074             }
0075             set_ref_count(int(k+1));
0076             spawn(list);
0077             spawn_and_wait_for_all(*t);
0078             return NULL;
0079         }
0080         template<typename Stream, typename Body_> friend class while_task;
0081     };
0082 
0083     //! For internal use only.
0084     /** Gets block of iterations from a stream and packages them into a while_group_task.
0085         @ingroup algorithms */
0086     template<typename Stream, typename Body>
0087     class while_task: public task {
0088         Stream& my_stream;
0089         const Body& my_body;
0090         empty_task& my_barrier;
0091         task* execute() __TBB_override {
0092             typedef while_group_task<Body> block_type;
0093             block_type& t = *new( allocate_additional_child_of(my_barrier) ) block_type(my_body);
0094             size_t k=0;
0095             while( my_stream.pop_if_present(t.my_arg[k]) ) {
0096                 if( ++k==block_type::max_arg_size ) {
0097                     // There might be more iterations.
0098                     recycle_to_reexecute();
0099                     break;
0100                 }
0101             }
0102             if( k==0 ) {
0103                 destroy(t);
0104                 return NULL;
0105             } else {
0106                 t.size = k;
0107                 return &t;
0108             }
0109         }
0110         while_task( Stream& stream, const Body& body, empty_task& barrier ) :
0111             my_stream(stream),
0112             my_body(body),
0113             my_barrier(barrier)
0114         {}
0115         friend class tbb::parallel_while<Body>;
0116     };
0117 
0118 } // namespace internal
0119 //! @endcond
0120 
0121 //! Parallel iteration over a stream, with optional addition of more work.
0122 /** The Body b has the requirement: \n
0123         "b(v)"                      \n
0124         "b.argument_type"           \n
0125     where v is an argument_type
0126     @ingroup algorithms */
0127 template<typename Body>
0128 class parallel_while: internal::no_copy {
0129 public:
0130     //! Construct empty non-running parallel while.
0131     parallel_while() : my_body(NULL), my_barrier(NULL) {}
0132 
0133     //! Destructor cleans up data members before returning.
0134     ~parallel_while() {
0135         if( my_barrier ) {
0136             my_barrier->destroy(*my_barrier);
0137             my_barrier = NULL;
0138         }
0139     }
0140 
0141     //! Type of items
0142     typedef typename Body::argument_type value_type;
0143 
0144     //! Apply body.apply to each item in the stream.
0145     /** A Stream s has the requirements \n
0146          "S::value_type"                \n
0147          "s.pop_if_present(value) is convertible to bool */
0148     template<typename Stream>
0149     void run( Stream& stream, const Body& body );
0150 
0151     //! Add a work item while running.
0152     /** Should be executed only by body.apply or a thread spawned therefrom. */
0153     void add( const value_type& item );
0154 
0155 private:
0156     const Body* my_body;
0157     empty_task* my_barrier;
0158 };
0159 
0160 template<typename Body>
0161 template<typename Stream>
0162 void parallel_while<Body>::run( Stream& stream, const Body& body ) {
0163     using namespace internal;
0164     empty_task& barrier = *new( task::allocate_root() ) empty_task();
0165     my_body = &body;
0166     my_barrier = &barrier;
0167     my_barrier->set_ref_count(2);
0168     while_task<Stream,Body>& w = *new( my_barrier->allocate_child() ) while_task<Stream,Body>( stream, body, barrier );
0169     my_barrier->spawn_and_wait_for_all(w);
0170     my_barrier->destroy(*my_barrier);
0171     my_barrier = NULL;
0172     my_body = NULL;
0173 }
0174 
0175 template<typename Body>
0176 void parallel_while<Body>::add( const value_type& item ) {
0177     __TBB_ASSERT(my_barrier,"attempt to add to parallel_while that is not running");
0178     typedef internal::while_iteration_task<Body> iteration_type;
0179     iteration_type& i = *new( task::allocate_additional_child_of(*my_barrier) ) iteration_type(item,*my_body);
0180     task::self().spawn( i );
0181 }
0182 
0183 } // namespace
0184 
0185 #include "internal/_warning_suppress_disable_notice.h"
0186 #undef __TBB_parallel_while_H_include_area
0187 
0188 #endif /* __TBB_parallel_while */