Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-18 10:24:17

0001 /*
0002     Copyright (c) 2005-2024 Intel Corporation
0003 
0004     Licensed under the Apache License, Version 2.0 (the "License");
0005     you may not use this file except in compliance with the License.
0006     You may obtain a copy of the License at
0007 
0008         http://www.apache.org/licenses/LICENSE-2.0
0009 
0010     Unless required by applicable law or agreed to in writing, software
0011     distributed under the License is distributed on an "AS IS" BASIS,
0012     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0013     See the License for the specific language governing permissions and
0014     limitations under the License.
0015 */
0016 
0017 #ifndef __TBB__flow_graph_item_buffer_impl_H
0018 #define __TBB__flow_graph_item_buffer_impl_H
0019 
0020 #ifndef __TBB_flow_graph_H
0021 #error Do not #include this internal file directly; use public TBB headers instead.
0022 #endif
0023 
0024 #include "_aligned_space.h"
0025 
0026 // in namespace tbb::flow::interfaceX (included in _flow_graph_node_impl.h)
0027 
0028 //! Expandable buffer of items.  The possible operations are push, pop,
0029 //* tests for empty and so forth.  No mutual exclusion is built in.
0030 //* objects are constructed into and explicitly-destroyed.  get_my_item gives
0031 // a read-only reference to the item in the buffer.  set_my_item may be called
0032 // with either an empty or occupied slot.
0033 
0034 template <typename T, typename A=cache_aligned_allocator<T> >
0035 class item_buffer {
0036 public:
0037     typedef T item_type;
0038     enum buffer_item_state { no_item=0, has_item=1, reserved_item=2 };
0039 protected:
0040     struct aligned_space_item {
0041         item_type item;
0042         buffer_item_state state;
0043 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0044         message_metainfo metainfo;
0045 #endif
0046     };
0047     typedef size_t size_type;
0048     typedef aligned_space<aligned_space_item> buffer_item_type;
0049     typedef typename allocator_traits<A>::template rebind_alloc<buffer_item_type> allocator_type;
0050     buffer_item_type *my_array;
0051     size_type my_array_size;
0052     static const size_type initial_buffer_size = 4;
0053     size_type my_head;
0054     size_type my_tail;
0055 
0056     bool buffer_empty() const { return my_head == my_tail; }
0057 
0058     aligned_space_item &element(size_type i) {
0059         __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->state))%alignment_of<buffer_item_state>::value), nullptr);
0060         __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->item))%alignment_of<item_type>::value), nullptr);
0061 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0062         __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->metainfo))%alignment_of<message_metainfo>::value), nullptr);
0063 #endif
0064         return *my_array[i & (my_array_size - 1) ].begin();
0065     }
0066 
0067     const aligned_space_item &element(size_type i) const {
0068         __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->state))%alignment_of<buffer_item_state>::value), nullptr);
0069         __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->item))%alignment_of<item_type>::value), nullptr);
0070 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0071         __TBB_ASSERT(!(size_type(&(my_array[i&(my_array_size-1)].begin()->metainfo))%alignment_of<message_metainfo>::value), nullptr);
0072 #endif
0073         return *my_array[i & (my_array_size-1)].begin();
0074     }
0075 
0076     bool my_item_valid(size_type i) const { return (i < my_tail) && (i >= my_head) && (element(i).state != no_item); }
0077 #if TBB_USE_ASSERT
0078     bool my_item_reserved(size_type i) const { return element(i).state == reserved_item; }
0079 #endif
0080 
0081     // object management in buffer
0082     const item_type &get_my_item(size_t i) const {
0083         __TBB_ASSERT(my_item_valid(i),"attempt to get invalid item");
0084         return element(i).item;
0085     }
0086 
0087 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0088     message_metainfo& get_my_metainfo(size_t i) {
0089         __TBB_ASSERT(my_item_valid(i), "attempt to get invalid item");
0090         return element(i).metainfo;
0091     }
0092 #endif
0093 
0094     // may be called with an empty slot or a slot that has already been constructed into.
0095     void set_my_item(size_t i, const item_type &o
0096                      __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo))
0097     {
0098         if(element(i).state != no_item) {
0099             destroy_item(i);
0100         }
0101         new(&(element(i).item)) item_type(o);
0102         element(i).state = has_item;
0103 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0104         new(&element(i).metainfo) message_metainfo(metainfo);
0105 
0106         for (auto& waiter : metainfo.waiters()) {
0107             waiter->reserve(1);
0108         }
0109 #endif
0110     }
0111 
0112 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0113     void set_my_item(size_t i, const item_type& o, message_metainfo&& metainfo) {
0114         if(element(i).state != no_item) {
0115             destroy_item(i);
0116         }
0117 
0118         new(&(element(i).item)) item_type(o);
0119         new(&element(i).metainfo) message_metainfo(std::move(metainfo));
0120         // Skipping the reservation on metainfo.waiters since the ownership
0121         // is moving from metainfo to the cache
0122         element(i).state = has_item;
0123     }
0124 #endif
0125 
0126     // destructively-fetch an object from the buffer
0127 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0128     void fetch_item(size_t i, item_type& o, message_metainfo& metainfo) {
0129         __TBB_ASSERT(my_item_valid(i), "Trying to fetch an empty slot");
0130         o = get_my_item(i);  // could have std::move assign semantics
0131         metainfo = std::move(get_my_metainfo(i));
0132         destroy_item(i);
0133     }
0134 #else
0135     void fetch_item(size_t i, item_type &o) {
0136         __TBB_ASSERT(my_item_valid(i), "Trying to fetch an empty slot");
0137         o = get_my_item(i);  // could have std::move assign semantics
0138         destroy_item(i);
0139     }
0140 #endif
0141 
0142     // move an existing item from one slot to another.  The moved-to slot must be unoccupied,
0143     // the moved-from slot must exist and not be reserved.  The after, from will be empty,
0144     // to will be occupied but not reserved
0145     void move_item(size_t to, size_t from) {
0146         __TBB_ASSERT(!my_item_valid(to), "Trying to move to a non-empty slot");
0147         __TBB_ASSERT(my_item_valid(from), "Trying to move from an empty slot");
0148         // could have std::move semantics
0149         set_my_item(to, get_my_item(from) __TBB_FLOW_GRAPH_METAINFO_ARG(get_my_metainfo(from)));
0150         destroy_item(from);
0151     }
0152 
0153     // put an item in an empty slot.  Return true if successful, else false
0154 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0155     template <typename Metainfo>
0156     bool place_item(size_t here, const item_type &me, Metainfo&& metainfo) {
0157 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
0158         if(my_item_valid(here)) return false;
0159 #endif
0160         set_my_item(here, me, std::forward<Metainfo>(metainfo));
0161         return true;
0162     }
0163 #else
0164     bool place_item(size_t here, const item_type &me) {
0165 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
0166         if(my_item_valid(here)) return false;
0167 #endif
0168         set_my_item(here, me);
0169         return true;
0170     }
0171 #endif
0172 
0173     // could be implemented with std::move semantics
0174     void swap_items(size_t i, size_t j) {
0175         __TBB_ASSERT(my_item_valid(i) && my_item_valid(j), "attempt to swap invalid item(s)");
0176         item_type temp = get_my_item(i);
0177 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0178         message_metainfo temp_metainfo = get_my_metainfo(i);
0179         set_my_item(i, get_my_item(j), get_my_metainfo(j));
0180         set_my_item(j, temp, temp_metainfo);
0181 #else
0182         set_my_item(i, get_my_item(j));
0183         set_my_item(j, temp);
0184 #endif
0185     }
0186 
0187     void destroy_item(size_type i) {
0188         __TBB_ASSERT(my_item_valid(i), "destruction of invalid item");
0189 
0190         auto& e = element(i);
0191         e.item.~item_type();
0192         e.state = no_item;
0193 
0194 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0195         for (auto& msg_waiter : e.metainfo.waiters()) {
0196             msg_waiter->release(1);
0197         }
0198 
0199         e.metainfo.~message_metainfo();
0200 #endif
0201     }
0202 
0203     // returns the front element
0204     const item_type& front() const
0205     {
0206         __TBB_ASSERT(my_item_valid(my_head), "attempt to fetch head non-item");
0207         return get_my_item(my_head);
0208     }
0209 
0210 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0211     const message_metainfo& front_metainfo() const
0212     {
0213         __TBB_ASSERT(my_item_valid(my_head), "attempt to fetch head non-item");
0214         return element(my_head).metainfo;
0215     }
0216 #endif
0217 
0218     // returns  the back element
0219     const item_type& back() const
0220     {
0221         __TBB_ASSERT(my_item_valid(my_tail - 1), "attempt to fetch head non-item");
0222         return get_my_item(my_tail - 1);
0223     }
0224 
0225 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0226     const message_metainfo& back_metainfo() const {
0227         __TBB_ASSERT(my_item_valid(my_tail - 1), "attempt to fetch head non-item");
0228         return element(my_tail - 1).metainfo;
0229     }
0230 #endif
0231 
0232     // following methods are for reservation of the front of a buffer.
0233     void reserve_item(size_type i) {
0234         __TBB_ASSERT(my_item_valid(i) && !my_item_reserved(i), "item cannot be reserved");
0235         element(i).state = reserved_item;
0236     }
0237 
0238     void release_item(size_type i) {
0239         __TBB_ASSERT(my_item_reserved(i), "item is not reserved");
0240         element(i).state = has_item;
0241     }
0242 
0243     void destroy_front() { destroy_item(my_head); ++my_head; }
0244     void destroy_back() { destroy_item(my_tail-1); --my_tail; }
0245 
0246     // we have to be able to test against a new tail value without changing my_tail
0247     // grow_array doesn't work if we change my_tail when the old array is too small
0248     size_type size(size_t new_tail = 0) { return (new_tail ? new_tail : my_tail) - my_head; }
0249     size_type capacity() { return my_array_size; }
0250     // sequencer_node does not use this method, so we don't
0251     // need a version that passes in the new_tail value.
0252     bool buffer_full() { return size() >= capacity(); }
0253 
0254     //! Grows the internal array.
0255     void grow_my_array( size_t minimum_size ) {
0256         // test that we haven't made the structure inconsistent.
0257         __TBB_ASSERT(capacity() >= my_tail - my_head, "total items exceed capacity");
0258         size_type new_size = my_array_size ? 2*my_array_size : initial_buffer_size;
0259         while( new_size<minimum_size )
0260             new_size*=2;
0261 
0262         buffer_item_type* new_array = allocator_type().allocate(new_size);
0263 
0264         // initialize validity to "no"
0265         for( size_type i=0; i<new_size; ++i ) { new_array[i].begin()->state = no_item; }
0266 
0267         for( size_type i=my_head; i<my_tail; ++i) {
0268             if(my_item_valid(i)) {  // sequencer_node may have empty slots
0269                 // placement-new copy-construct; could be std::move
0270                 char *new_space = (char *)&(new_array[i&(new_size-1)].begin()->item);
0271                 (void)new(new_space) item_type(get_my_item(i));
0272                 new_array[i&(new_size-1)].begin()->state = element(i).state;
0273 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0274                 char* meta_space = (char *)&(new_array[i&(new_size-1)].begin()->metainfo);
0275                 ::new(meta_space) message_metainfo(std::move(element(i).metainfo));
0276 #endif
0277             }
0278         }
0279 
0280         clean_up_buffer(/*reset_pointers*/false);
0281 
0282         my_array = new_array;
0283         my_array_size = new_size;
0284     }
0285 
0286     bool push_back(item_type& v
0287                    __TBB_FLOW_GRAPH_METAINFO_ARG(const message_metainfo& metainfo))
0288     {
0289         if (buffer_full()) {
0290             grow_my_array(size() + 1);
0291         }
0292         set_my_item(my_tail, v __TBB_FLOW_GRAPH_METAINFO_ARG(metainfo));
0293         ++my_tail;
0294         return true;
0295     }
0296 
0297     bool pop_back(item_type& v
0298                   __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& metainfo))
0299     {
0300         if (!my_item_valid(my_tail - 1)) {
0301             return false;
0302         }
0303         auto& e = element(my_tail - 1);
0304         v = e.item;
0305 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0306         metainfo = std::move(e.metainfo);
0307 #endif
0308 
0309         destroy_back();
0310         return true;
0311     }
0312 
0313     bool pop_front(item_type& v
0314                    __TBB_FLOW_GRAPH_METAINFO_ARG(message_metainfo& metainfo))
0315     {
0316         if (!my_item_valid(my_head)) {
0317             return false;
0318         }
0319         auto& e = element(my_head);
0320         v = e.item;
0321 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0322         metainfo = std::move(e.metainfo);
0323 #endif
0324 
0325         destroy_front();
0326         return true;
0327     }
0328 
0329 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0330     bool pop_back(item_type& v) {
0331         message_metainfo metainfo;
0332         return pop_back(v, metainfo);
0333     }
0334 
0335     bool pop_front(item_type& v) {
0336         message_metainfo metainfo;
0337         return pop_front(v, metainfo);
0338     }
0339 #endif
0340 
0341     // This is used both for reset and for grow_my_array.  In the case of grow_my_array
0342     // we want to retain the values of the head and tail.
0343     void clean_up_buffer(bool reset_pointers) {
0344         if (my_array) {
0345             for( size_type i=my_head; i<my_tail; ++i ) {
0346                 if(my_item_valid(i))
0347                     destroy_item(i);
0348             }
0349             allocator_type().deallocate(my_array,my_array_size);
0350         }
0351         my_array = nullptr;
0352         if(reset_pointers) {
0353             my_head = my_tail = my_array_size = 0;
0354         }
0355     }
0356 
0357 public:
0358     //! Constructor
0359     item_buffer( ) : my_array(nullptr), my_array_size(0),
0360                      my_head(0), my_tail(0) {
0361         grow_my_array(initial_buffer_size);
0362     }
0363 
0364     ~item_buffer() {
0365         clean_up_buffer(/*reset_pointers*/true);
0366     }
0367 
0368     void reset() { clean_up_buffer(/*reset_pointers*/true); grow_my_array(initial_buffer_size); }
0369 
0370 };
0371 
0372 //! item_buffer with reservable front-end.  NOTE: if reserving, do not
0373 //* complete operation with pop_front(); use consume_front().
0374 //* No synchronization built-in.
0375 template<typename T, typename A=cache_aligned_allocator<T> >
0376 class reservable_item_buffer : public item_buffer<T, A> {
0377 protected:
0378     using item_buffer<T, A>::my_item_valid;
0379     using item_buffer<T, A>::my_head;
0380 
0381 public:
0382     reservable_item_buffer() : item_buffer<T, A>(), my_reserved(false) {}
0383     void reset() {my_reserved = false; item_buffer<T,A>::reset(); }
0384 protected:
0385 
0386     bool reserve_front(T &v) {
0387         if(my_reserved || !my_item_valid(this->my_head)) return false;
0388         my_reserved = true;
0389         // reserving the head
0390         v = this->front();
0391         this->reserve_item(this->my_head);
0392         return true;
0393     }
0394 
0395 #if __TBB_PREVIEW_FLOW_GRAPH_TRY_PUT_AND_WAIT
0396     bool reserve_front(T& v, message_metainfo& metainfo) {
0397         if (my_reserved || !my_item_valid(this->my_head)) return false;
0398         my_reserved = true;
0399         // reserving the head
0400         v = this->front();
0401         metainfo = this->front_metainfo();
0402         this->reserve_item(this->my_head);
0403         return true;
0404     }
0405 #endif
0406 
0407     void consume_front() {
0408         __TBB_ASSERT(my_reserved, "Attempt to consume a non-reserved item");
0409         this->destroy_front();
0410         my_reserved = false;
0411     }
0412 
0413     void release_front() {
0414         __TBB_ASSERT(my_reserved, "Attempt to release a non-reserved item");
0415         this->release_item(this->my_head);
0416         my_reserved = false;
0417     }
0418 
0419     bool my_reserved;
0420 };
0421 
0422 #endif // __TBB__flow_graph_item_buffer_impl_H