/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#ifndef __TBB_pipeline_H
#define __TBB_pipeline_H

#define __TBB_pipeline_H_include_area
#include "internal/_warning_suppress_enable_notice.h"

#include "atomic.h"
#include "task.h"
#include "tbb_allocator.h"
#include <cstddef>

#if __TBB_CPP11_TYPE_PROPERTIES_PRESENT
#include <type_traits>
#endif

namespace tbb {

class pipeline;
class filter;

//! @cond INTERNAL
namespace internal {

// The argument for __TBB_PIPELINE_VERSION should be an integer between 2 and 9
#define __TBB_PIPELINE_VERSION(x) ((unsigned char)(x-2)<<1)

typedef unsigned long Token;
typedef long tokendiff_t;
class stage_task;
class input_buffer;
class pipeline_root_task;
class pipeline_cleaner;

} // namespace internal

namespace interface6 {
    template<typename T, typename U> class filter_t;

    namespace internal {
        class pipeline_proxy;
    }
}

//! @endcond

//! A stage in a pipeline.
/** @ingroup algorithms */
class filter: internal::no_copy {
private:
    //! Value used to mark "not in pipeline"
    static filter* not_in_pipeline() { return reinterpret_cast<filter*>(intptr_t(-1)); }
protected:
    //! The lowest bit (bit 0) distinguishes parallel vs. serial filters.
    static const unsigned char filter_is_serial = 0x1;

    //! 4th bit distinguishes ordered vs unordered filters.
    /** The bit was not set for parallel filters in TBB 2.1 and earlier,
        but the is_ordered() function always treats parallel filters as out of order. */
    static const unsigned char filter_is_out_of_order = 0x1<<4;

    //! 5th bit distinguishes thread-bound and regular filters.
    static const unsigned char filter_is_bound = 0x1<<5;

    //! 6th bit marks input filters that may emit NULL (e.g. filters emitting small objects by value).
    static const unsigned char filter_may_emit_null = 0x1<<6;

    //! 7th bit defines exception propagation mode expected by the application.
    static const unsigned char exact_exception_propagation =
#if TBB_USE_CAPTURED_EXCEPTION
            0x0;
#else
            0x1<<7;
#endif /* TBB_USE_CAPTURED_EXCEPTION */

    static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
    static const unsigned char version_mask = 0x7<<1; // bits 1-3 are for version
public:
    enum mode {
        //! processes multiple items in parallel and in no particular order
        parallel = current_version | filter_is_out_of_order,
        //! processes items one at a time; all such filters process items in the same order
        serial_in_order = current_version | filter_is_serial,
        //! processes items one at a time and in no particular order
        serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
        //! @deprecated use serial_in_order instead
        serial = serial_in_order
    };
protected:
    explicit filter( bool is_serial_ ) :
        next_filter_in_pipeline(not_in_pipeline()),
        my_input_buffer(NULL),
        my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
        prev_filter_in_pipeline(not_in_pipeline()),
        my_pipeline(NULL),
        next_segment(NULL)
    {}

    explicit filter( mode filter_mode ) :
        next_filter_in_pipeline(not_in_pipeline()),
        my_input_buffer(NULL),
        my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
        prev_filter_in_pipeline(not_in_pipeline()),
        my_pipeline(NULL),
        next_segment(NULL)
    {}

    // signal end-of-input for concrete_filters
    void __TBB_EXPORTED_METHOD set_end_of_input();

public:
    //! True if filter is serial.
    bool is_serial() const {
        return bool( my_filter_mode & filter_is_serial );
    }

    //! True if filter must receive stream in order.
    bool is_ordered() const {
        return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
    }

    //! True if filter is thread-bound.
    bool is_bound() const {
        return ( my_filter_mode & filter_is_bound )==filter_is_bound;
    }

    //! True if an input filter can emit NULL.
    bool object_may_be_null() {
        return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
    }

    //! Operate on an item from the input stream, and return item for output stream.
    /** Returns NULL if filter is a sink. */
    virtual void* operator()( void* item ) = 0;

    //! Destroy filter.
    /** If the filter was added to a pipeline, the pipeline must be destroyed first. */
    virtual __TBB_EXPORTED_METHOD ~filter();

#if __TBB_TASK_GROUP_CONTEXT
    //! Destroys item if pipeline was cancelled.
    /** Required to prevent memory leaks.
        Note it can be called concurrently even for serial filters.*/
    virtual void finalize( void* /*item*/ ) {}
#endif

private:
    //! Pointer to next filter in the pipeline.
    filter* next_filter_in_pipeline;

    //! has the filter not yet processed all the tokens it will ever see?
    //  (pipeline has not yet reached end_of_input or this filter has not yet
    //  seen the last token produced by input_filter)
    bool has_more_work();

    //! Buffer for incoming tokens, or NULL if not required.
    /** The buffer is required if the filter is serial or follows a thread-bound one. */
    internal::input_buffer* my_input_buffer;

    friend class internal::stage_task;
    friend class internal::pipeline_root_task;
    friend class pipeline;
    friend class thread_bound_filter;

    //! Storage for filter mode and dynamically checked implementation version.
    const unsigned char my_filter_mode;

    //! Pointer to previous filter in the pipeline.
    filter* prev_filter_in_pipeline;

    //! Pointer to the pipeline.
    pipeline* my_pipeline;

    //! Pointer to the next "segment" of filters, or NULL if not required.
    /** In each segment, the first filter is not thread-bound but follows a thread-bound one. */
    filter* next_segment;
};
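
/* Illustrative sketch (not part of the original header): with this classic
   interface a user-defined stage derives from tbb::filter, selects a mode, and
   overrides operator().  The names MultiplyFilter and factor are invented for
   the example.

       class MultiplyFilter : public tbb::filter {
           const int factor;
       public:
           MultiplyFilter( int f ) : tbb::filter(tbb::filter::parallel), factor(f) {}
           void* operator()( void* item ) {
               int* value = static_cast<int*>(item); // token handed over by the previous stage
               *value *= factor;                     // transform the item in place
               return value;                         // pass the same token downstream
           }
       };

   A sink stage would instead return NULL from operator(), as documented above. */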

//! A stage in a pipeline served by a user thread.
/** @ingroup algorithms */
class thread_bound_filter: public filter {
public:
    enum result_type {
        // item was processed
        success,
        // item is currently not available
        item_not_available,
        // there are no more items to process
        end_of_stream
    };
protected:
    explicit thread_bound_filter(mode filter_mode):
         filter(static_cast<mode>(filter_mode | filter::filter_is_bound))
    {
        __TBB_ASSERT(filter_mode & filter::filter_is_serial, "thread-bound filters must be serial");
    }
public:
    //! If a data item is available, invoke operator() on that item.
    /** This interface is non-blocking.
        Returns 'success' if an item was processed.
        Returns 'item_not_available' if no item can be processed now
        but more may arrive in the future, or if the token limit is reached.
        Returns 'end_of_stream' if there are no more items to process. */
    result_type __TBB_EXPORTED_METHOD try_process_item();

    //! Wait until a data item becomes available, and invoke operator() on that item.
    /** This interface is blocking.
        Returns 'success' if an item was processed.
        Returns 'end_of_stream' if there are no more items to process.
        Never returns 'item_not_available', as it blocks until another return condition applies. */
    result_type __TBB_EXPORTED_METHOD process_item();

private:
    //! Internal routine for item processing
    result_type internal_process_item(bool is_blocking);
};
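
/* Illustrative sketch (not part of the original header): a thread-bound stage is
   serviced explicitly by a user thread instead of by TBB worker threads, and the
   pipeline itself must be run on a different thread.  The names ConsoleWriter
   and write_loop are invented for the example.

       class ConsoleWriter : public tbb::thread_bound_filter {
       public:
           ConsoleWriter() : tbb::thread_bound_filter(tbb::filter::serial_in_order) {}
           void* operator()( void* item ) {
               std::printf( "%d\n", *static_cast<int*>(item) );
               return NULL;                          // sink: nothing is passed downstream
           }
       };

       void write_loop( ConsoleWriter& w ) {
           // Blocks on each item; stops once the pipeline reports end_of_stream.
           while( w.process_item() != tbb::thread_bound_filter::end_of_stream )
               continue;
       }
*/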

//! A processing pipeline that applies filters to items.
/** @ingroup algorithms */
class __TBB_DEPRECATED_MSG("tbb::pipeline is deprecated, use tbb::parallel_pipeline") pipeline {
public:
    //! Construct empty pipeline.
    __TBB_EXPORTED_METHOD pipeline();

    /** Though the current implementation declares the destructor virtual, do not rely on this
        detail.  The virtualness is deprecated and may disappear in future versions of TBB. */
    virtual __TBB_EXPORTED_METHOD ~pipeline();

    //! Add filter to end of pipeline.
    void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );

    //! Run the pipeline to completion.
    void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );

#if __TBB_TASK_GROUP_CONTEXT
    //! Run the pipeline to completion with user-supplied context.
    void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
#endif

    //! Remove all filters from the pipeline.
    void __TBB_EXPORTED_METHOD clear();

private:
    friend class internal::stage_task;
    friend class internal::pipeline_root_task;
    friend class filter;
    friend class thread_bound_filter;
    friend class internal::pipeline_cleaner;
    friend class tbb::interface6::internal::pipeline_proxy;

    //! Pointer to first filter in the pipeline.
    filter* filter_list;

    //! Pointer to location where address of next filter to be added should be stored.
    filter* filter_end;

    //! Task whose reference count is used to determine when all stages are done.
    task* end_counter;

    //! Number of idle tokens waiting for input stage.
    atomic<internal::Token> input_tokens;

    //! Global counter of tokens
    atomic<internal::Token> token_counter;

    //! False until fetch_input returns NULL.
    bool end_of_input;

    //! True if the pipeline contains a thread-bound filter; false otherwise.
    bool has_thread_bound_filters;

    //! Remove filter from pipeline.
    void remove_filter( filter& filter_ );

    //! Not used, but retained to satisfy old export files.
    void __TBB_EXPORTED_METHOD inject_token( task& self );

#if __TBB_TASK_GROUP_CONTEXT
    //! Performs clean-up if the pipeline is cancelled or an exception occurred.
    void clear_filters();
#endif
};
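
/* Illustrative sketch (not part of the original header): the classic (now
   deprecated) usage pattern builds a pipeline from filter objects that outlive
   the run, then executes it with a bound on the number of tokens in flight.
   MyInputFilter, MyTransformFilter and MyOutputFilter stand for user-defined
   subclasses of tbb::filter such as the one sketched earlier.

       MyInputFilter     input;
       MyTransformFilter transform;
       MyOutputFilter    output;

       tbb::pipeline p;
       p.add_filter( input );     // stages run in the order they are added
       p.add_filter( transform );
       p.add_filter( output );
       p.run( 8 );                // at most 8 tokens live at once
       p.clear();                 // detach the filters before they are destroyed
*/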

//------------------------------------------------------------------------
// Support for lambda-friendly parallel_pipeline interface
//------------------------------------------------------------------------

namespace flow {
namespace interface11 {
    template<typename Output> class input_node;
}
}

namespace interface6 {

namespace internal {
    template<typename T, typename U, typename Body> class concrete_filter;
}

//! input_filter control to signal end-of-input for parallel_pipeline
class flow_control {
    bool is_pipeline_stopped;
    flow_control() { is_pipeline_stopped = false; }
    template<typename T, typename U, typename Body> friend class internal::concrete_filter;
    template<typename Output> friend class flow::interface11::input_node;
public:
    void stop() { is_pipeline_stopped = true; }
};
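
/* Illustrative sketch (not part of the original header): the body of an input
   stage created with make_filter<void,T>() receives a flow_control reference and
   calls stop() when the input is exhausted; the value returned from that final
   invocation is discarded.  The lambda assumes C++11 and a stream named in that
   yields int records.

       tbb::make_filter<void,int>( tbb::filter::serial_in_order,
           [&in]( tbb::flow_control& fc ) -> int {
               int x;
               if( !(in >> x) ) {
                   fc.stop();     // signal end-of-input; this token is ignored
                   return 0;
               }
               return x;
           } );
*/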

//! @cond INTERNAL
namespace internal {

// Emulate std::is_trivially_copyable (false positives not allowed, false negatives suboptimal but safe).
#if   __TBB_CPP11_TYPE_PROPERTIES_PRESENT
template<typename T> struct tbb_trivially_copyable { enum { value = std::is_trivially_copyable<T>::value }; };
#else
template<typename T> struct tbb_trivially_copyable                      { enum { value = false }; };
template<typename T> struct tbb_trivially_copyable <         T*       > { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <         bool     > { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <         char     > { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <  signed char     > { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <unsigned char     > { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <         short    > { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <unsigned short    > { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <         int      > { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <unsigned int      > { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <         long     > { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <unsigned long     > { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <         long long> { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <unsigned long long> { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <         float    > { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <         double   > { enum { value = true  }; };
template<>           struct tbb_trivially_copyable <    long double   > { enum { value = true  }; };
#if !_MSC_VER || defined(_NATIVE_WCHAR_T_DEFINED)
template<>           struct tbb_trivially_copyable <         wchar_t  > { enum { value = true  }; };
#endif /* !_MSC_VER || defined(_NATIVE_WCHAR_T_DEFINED) */
#endif // tbb_trivially_copyable

template<typename T>
struct use_allocator {
    enum { value = sizeof(T) > sizeof(void *) || !tbb_trivially_copyable<T>::value };
};
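
/* For example (illustrative, assuming a 64-bit target where sizeof(void*) == 8):
   use_allocator<int>::value and use_allocator<char*>::value are 0, so such
   tokens travel through the pipeline's void* slot directly, whereas
   use_allocator<std::string>::value is 1, so string tokens are heap-allocated
   via tbb_allocator and passed by pointer (see token_helper below). */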

// A helper class to customize how a type is passed between filters.
// Usage: token_helper<T, use_allocator<T>::value>
template<typename T, bool Allocate> class token_helper;

// using tbb_allocator
template<typename T>
class token_helper<T, true> {
public:
    typedef typename tbb::tbb_allocator<T> allocator;
    typedef T* pointer;
    typedef T value_type;
#if __TBB_CPP11_RVALUE_REF_PRESENT
    static pointer create_token(value_type && source)
#else
    static pointer create_token(const value_type & source)
#endif
    {
        pointer output_t = allocator().allocate(1);
        return new (output_t) T(tbb::internal::move(source));
    }
    static value_type & token(pointer & t) { return *t; }
    static void * cast_to_void_ptr(pointer ref) { return (void *) ref; }
    static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
    static void destroy_token(pointer token) {
        allocator().destroy(token);
        allocator().deallocate(token,1);
    }
};

// pointer specialization
template<typename T>
class token_helper<T*, false> {
public:
    typedef T* pointer;
    typedef T* value_type;
    static pointer create_token(const value_type & source) { return source; }
    static value_type & token(pointer & t) { return t; }
    static void * cast_to_void_ptr(pointer ref) { return (void *)ref; }
    static pointer cast_from_void_ptr(void * ref) { return (pointer)ref; }
    static void destroy_token( pointer /*token*/) {}
};

// converting type to and from void*, passing objects directly
template<typename T>
class token_helper<T, false> {
    typedef union {
        T actual_value;
        void * void_overlay;
    } type_to_void_ptr_map;
public:
    typedef T pointer;  // not really a pointer in this case.
    typedef T value_type;
    static pointer create_token(const value_type & source) { return source; }
    static value_type & token(pointer & t) { return t; }
    static void * cast_to_void_ptr(pointer ref) {
        type_to_void_ptr_map mymap;
        mymap.void_overlay = NULL;
        mymap.actual_value = ref;
        return mymap.void_overlay;
    }
    static pointer cast_from_void_ptr(void * ref) {
        type_to_void_ptr_map mymap;
        mymap.void_overlay = ref;
        return mymap.actual_value;
    }
    static void destroy_token( pointer /*token*/) {}
};

// intermediate
template<typename T, typename U, typename Body>
class concrete_filter: public tbb::filter {
    const Body& my_body;
    typedef token_helper<T,use_allocator<T>::value> t_helper;
    typedef typename t_helper::pointer t_pointer;
    typedef token_helper<U,use_allocator<U>::value> u_helper;
    typedef typename u_helper::pointer u_pointer;

    void* operator()(void* input) __TBB_override {
        t_pointer temp_input = t_helper::cast_from_void_ptr(input);
        u_pointer output_u = u_helper::create_token(my_body(tbb::internal::move(t_helper::token(temp_input))));
        t_helper::destroy_token(temp_input);
        return u_helper::cast_to_void_ptr(output_u);
    }

    void finalize(void * input) __TBB_override {
        t_pointer temp_input = t_helper::cast_from_void_ptr(input);
        t_helper::destroy_token(temp_input);
    }

public:
    concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
};

// input
template<typename U, typename Body>
class concrete_filter<void,U,Body>: public filter {
    const Body& my_body;
    typedef token_helper<U, use_allocator<U>::value> u_helper;
    typedef typename u_helper::pointer u_pointer;

    void* operator()(void*) __TBB_override {
        flow_control control;
        u_pointer output_u = u_helper::create_token(my_body(control));
        if(control.is_pipeline_stopped) {
            u_helper::destroy_token(output_u);
            set_end_of_input();
            return NULL;
        }
        return u_helper::cast_to_void_ptr(output_u);
    }

public:
    concrete_filter(tbb::filter::mode filter_mode, const Body& body) :
        filter(static_cast<tbb::filter::mode>(filter_mode | filter_may_emit_null)),
        my_body(body)
    {}
};

// output
template<typename T, typename Body>
class concrete_filter<T,void,Body>: public filter {
    const Body& my_body;
    typedef token_helper<T, use_allocator<T>::value> t_helper;
    typedef typename t_helper::pointer t_pointer;

    void* operator()(void* input) __TBB_override {
        t_pointer temp_input = t_helper::cast_from_void_ptr(input);
        my_body(tbb::internal::move(t_helper::token(temp_input)));
        t_helper::destroy_token(temp_input);
        return NULL;
    }
    void finalize(void* input) __TBB_override {
        t_pointer temp_input = t_helper::cast_from_void_ptr(input);
        t_helper::destroy_token(temp_input);
    }

public:
    concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
};

template<typename Body>
class concrete_filter<void,void,Body>: public filter {
    const Body& my_body;

    void* operator()(void*) __TBB_override {
        flow_control control;
        my_body(control);
        void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1;
        return output;
    }
public:
    concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
};

//! The class that represents an object of the pipeline for parallel_pipeline().
/** It primarily serves as an RAII class that deletes heap-allocated filter instances. */
class pipeline_proxy {
    tbb::pipeline my_pipe;
public:
    pipeline_proxy( const filter_t<void,void>& filter_chain );
    ~pipeline_proxy() {
        while( filter* f = my_pipe.filter_list )
            delete f; // filter destructor removes it from the pipeline
    }
    tbb::pipeline* operator->() { return &my_pipe; }
};

//! Abstract base class that represents a node in a parse tree underlying a filter_t.
/** These nodes are always heap-allocated and can be shared by filter_t objects. */
class filter_node: tbb::internal::no_copy {
    /** Count must be atomic because it is hidden state for the user, but might be shared by threads. */
    tbb::atomic<intptr_t> ref_count;
protected:
    filter_node() {
        ref_count = 0;
#ifdef __TBB_TEST_FILTER_NODE_COUNT
        ++(__TBB_TEST_FILTER_NODE_COUNT);
#endif
    }
public:
    //! Add concrete_filter to pipeline
    virtual void add_to( pipeline& ) = 0;
    //! Increment reference count
    void add_ref() { ++ref_count; }
    //! Decrement reference count and delete if it becomes zero.
    void remove_ref() {
        __TBB_ASSERT(ref_count>0,"ref_count underflow");
        if( --ref_count==0 )
            delete this;
    }
    virtual ~filter_node() {
#ifdef __TBB_TEST_FILTER_NODE_COUNT
        --(__TBB_TEST_FILTER_NODE_COUNT);
#endif
    }
};

//! Node in parse tree representing result of make_filter.
template<typename T, typename U, typename Body>
class filter_node_leaf: public filter_node {
    const tbb::filter::mode mode;
    const Body body;
    void add_to( pipeline& p ) __TBB_override {
        concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
        p.add_filter( *f );
    }
public:
    filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
};

//! Node in parse tree representing join of two filters.
class filter_node_join: public filter_node {
    friend class filter_node; // to suppress GCC 3.2 warnings
    filter_node& left;
    filter_node& right;
    ~filter_node_join() {
       left.remove_ref();
       right.remove_ref();
    }
    void add_to( pipeline& p ) __TBB_override {
        left.add_to(p);
        right.add_to(p);
    }
public:
    filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
       left.add_ref();
       right.add_ref();
    }
};

} // namespace internal
//! @endcond

//! Create a filter to participate in parallel_pipeline
template<typename T, typename U, typename Body>
filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
    return new internal::filter_node_leaf<T,U,Body>(mode, body);
}

template<typename T, typename V, typename U>
filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
    __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
    __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
    return new internal::filter_node_join(*left.root,*right.root);
}

//! Class representing a chain of type-safe pipeline filters
template<typename T, typename U>
class filter_t {
    typedef internal::filter_node filter_node;
    filter_node* root;
    filter_t( filter_node* root_ ) : root(root_) {
        root->add_ref();
    }
    friend class internal::pipeline_proxy;
    template<typename T_, typename U_, typename Body>
    friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
    template<typename T_, typename V_, typename U_>
    friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
public:
    // TODO: add move-constructors, move-assignment, etc. where C++11 is available.
    filter_t() : root(NULL) {}
    filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
        if( root ) root->add_ref();
    }
    template<typename Body>
    filter_t( tbb::filter::mode mode, const Body& body ) :
        root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
        root->add_ref();
    }

    void operator=( const filter_t<T,U>& rhs ) {
        // Order of operations below carefully chosen so that reference counts remain correct
        // in the unlikely event that remove_ref throws an exception.
        filter_node* old = root;
        root = rhs.root;
        if( root ) root->add_ref();
        if( old ) old->remove_ref();
    }
    ~filter_t() {
        if( root ) root->remove_ref();
    }
    void clear() {
        // Like operator= with filter_t() on right side.
        if( root ) {
            filter_node* old = root;
            root = NULL;
            old->remove_ref();
        }
    }
};

inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
    __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t"  );
    filter_chain.root->add_to(my_pipe);
}

inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
#if __TBB_TASK_GROUP_CONTEXT
    , tbb::task_group_context& context
#endif
    ) {
    internal::pipeline_proxy pipe(filter_chain);
    // tbb::pipeline::run() is called via the proxy
    pipe->run(max_number_of_live_tokens
#if __TBB_TASK_GROUP_CONTEXT
              , context
#endif
    );
}

#if __TBB_TASK_GROUP_CONTEXT
inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
    tbb::task_group_context context;
    parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
}
#endif // __TBB_TASK_GROUP_CONTEXT
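
/* Illustrative sketch (not part of the original header): a complete
   lambda-friendly pipeline chains stages with operator& and runs them with
   parallel_pipeline.  It assumes C++11 lambdas and a std::ifstream named in;
   the token limit of 8 is arbitrary.

       tbb::parallel_pipeline( 8,
           tbb::make_filter<void,int>( tbb::filter::serial_in_order,
               [&in]( tbb::flow_control& fc ) -> int {
                   int x;
                   if( !(in >> x) ) { fc.stop(); return 0; }
                   return x;
               } )
         & tbb::make_filter<int,int>( tbb::filter::parallel,
               []( int x ) { return x * x; } )
         & tbb::make_filter<int,void>( tbb::filter::serial_in_order,
               []( int x ) { std::cout << x << '\n'; } ) );
*/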

} // interface6

using interface6::flow_control;
using interface6::filter_t;
using interface6::make_filter;
using interface6::parallel_pipeline;

} // tbb

#include "internal/_warning_suppress_disable_notice.h"
#undef __TBB_pipeline_H_include_area

#endif /* __TBB_pipeline_H */