Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-30 10:12:06

0001 // Copyright 2023, Jefferson Science Associates, LLC.
0002 // Subject to the terms in the LICENSE file found in the top-level directory.
0003 
0004 #pragma once
#include <JANA/Utils/JCpuInfo.h>
#include <JANA/JLogger.h>

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>
0008 
0009 
// Type-erased base of JPool<T>. It only stores the sizing and limiting
// settings that JPool<T> reads; it has no behavior of its own.
class JPoolBase {
public:
    // pool_size:                    number of items JPool<T>::init() creates per location
    // location_count:               number of independent sub-pools JPool<T> maintains
    // limit_total_events_in_flight: when true, JPool<T> refuses to allocate beyond pool_size
    JPoolBase(size_t pool_size, size_t location_count, bool limit_total_events_in_flight)
        : m_pool_size{pool_size},
          m_location_count{location_count},
          m_limit_total_events_in_flight{limit_total_events_in_flight}
    {
    }

    // Virtual so that a pool can be destroyed through a JPoolBase pointer.
    virtual ~JPoolBase() = default;

protected:
    size_t m_pool_size;
    size_t m_location_count;
    bool m_limit_total_events_in_flight;
};
0026 
0027 template <typename T>
0028 class JPool : public JPoolBase {
0029 private:
0030     struct alignas(JANA2_CACHE_LINE_BYTES) LocalPool {
0031         std::mutex mutex;
0032         std::vector<T*> available_items;
0033         std::vector<T> items;
0034     };
0035 
0036     std::unique_ptr<LocalPool[]> m_pools;
0037 
0038 public:
0039     JPool(size_t pool_size,
0040           size_t location_count,
0041           bool limit_total_events_in_flight) : JPoolBase(pool_size, location_count, limit_total_events_in_flight)
0042     {
0043         assert(m_location_count >= 1);
0044         assert(m_pool_size > 0 || !m_limit_total_events_in_flight);
0045     }
0046 
0047     virtual ~JPool() = default;
0048 
0049     void init() {
0050         m_pools = std::unique_ptr<LocalPool[]>(new LocalPool[m_location_count]());
0051 
0052         for (size_t j=0; j<m_location_count; ++j) {
0053 
0054             m_pools[j].items = std::vector<T>(m_pool_size); // Default-construct everything in place
0055 
0056             for (T& item : m_pools[j].items) {
0057                 configure_item(&item);
0058                 m_pools[j].available_items.push_back(&item);
0059             }
0060         }
0061     }
0062 
0063     virtual void configure_item(T*) {
0064     }
0065 
0066     virtual void release_item(T*) {
0067     }
0068 
0069 
0070     T* get(size_t location=0) {
0071 
0072         assert(m_pools != nullptr); // If you hit this, you forgot to call init().
0073         LocalPool& pool = m_pools[location % m_location_count];
0074         std::lock_guard<std::mutex> lock(pool.mutex);
0075 
0076         if (pool.available_items.empty()) {
0077             if (m_limit_total_events_in_flight) {
0078                 return nullptr;
0079             }
0080             else {
0081                 auto t = new T;
0082                 configure_item(t);
0083                 return t;
0084             }
0085         }
0086         else {
0087             T* item = pool.available_items.back();
0088             pool.available_items.pop_back();
0089             return item;
0090         }
0091     }
0092 
0093 
0094     void put(T* item, size_t location=0) {
0095 
0096         assert(m_pools != nullptr); // If you hit this, you forgot to call init().
0097         
0098         // Do any necessary teardown within the item itself
0099         release_item(item);
0100 
0101         // Consider each location starting with current one
0102         for (size_t l = location; l<location+m_location_count; ++l) {
0103             LocalPool& pool = m_pools[l % m_location_count];
0104 
0105             // Check if item came from this location
0106             if ((item >= &(pool.items[0])) && (item <= &(pool.items[m_pool_size-1]))) {
0107                 std::lock_guard<std::mutex> lock(pool.mutex);
0108                 pool.available_items.push_back(item);
0109                 return;
0110             }
0111 
0112         }
0113         // Otherwise it was allocated on the heap
0114         delete item;
0115     }
0116 
0117     // TODO: This is wrong. Do we use this anywhere?
0118     size_t size() { return m_pool_size; }
0119 
0120     // TODO: Remove me
0121     bool get_many(std::vector<T*>& dest, size_t count, size_t location=0) {
0122 
0123         assert(m_pools != nullptr); // If you hit this, you forgot to call init().
0124 
0125         LocalPool& pool = m_pools[location % m_location_count];
0126         std::lock_guard<std::mutex> lock(pool.mutex);
0127 
0128         if (m_limit_total_events_in_flight && pool.available_items.size() < count) {
0129             return false;
0130         }
0131         else {
0132             while (count > 0 && !pool.available_items.empty()) {
0133                 T* t = pool.available_items.back();
0134                 pool.available_items.pop_back();
0135                 dest.push_back(t);
0136                 count -= 1;
0137             }
0138             while (count > 0) {
0139                 auto t = new T;
0140                 configure_item(t);
0141                 dest.push_back(t);
0142                 count -= 1;
0143             }
0144             return true;
0145         }
0146     }
0147 
0148     // TODO: Remove me
0149     void put_many(std::vector<T*>& finished_events, size_t location=0) {
0150         for (T* item : finished_events) {
0151             put(item, location);
0152         }
0153     }
0154 
0155 
0156     size_t pop(T** dest, size_t min_count, size_t max_count, size_t location=0) {
0157 
0158         assert(m_pools != nullptr); // If you hit this, you forgot to call init().
0159 
0160         LocalPool& pool = m_pools[location % m_location_count];
0161         std::lock_guard<std::mutex> lock(pool.mutex);
0162 
0163         size_t available_count = pool.available_items.size();
0164 
0165         if (m_limit_total_events_in_flight && available_count < min_count) {
0166             // Exit immmediately if we can't reach the minimum
0167             return 0;
0168         }
0169         if (m_limit_total_events_in_flight) {
0170             // Return as many as we can. We aren't allowed to create any more
0171             size_t count = std::min(available_count, max_count);
0172             for (size_t i=0; i<count; ++i) {
0173                 T* t = pool.available_items.back();
0174                 pool.available_items.pop_back();
0175                 dest[i] = t;
0176             }
0177             return count;
0178         }
0179         else {
0180             // Try to minimize number of allocations, as long as we meet min_count
0181             size_t count = std::min(available_count, max_count);
0182             size_t i=0;
0183             for (i=0; i<count; ++i) {
0184                 // Pop the items already in the pool
0185                 T* t = pool.available_items.back();
0186                 pool.available_items.pop_back();
0187                 dest[i] = t;
0188             }
0189             for (; i<min_count; ++i) {
0190                 // If we haven't reached our min count yet, allocate just enough to reach it
0191                 auto t = new T;
0192                 configure_item(t);
0193                 dest[i] = t;
0194             }
0195             return i;
0196         }
0197     }
0198 
0199     void push(T** source, size_t count, size_t location=0) {
0200         for (size_t i=0; i<count; ++i) {
0201             put(source[i], location);
0202             source[i] = nullptr;
0203         }
0204     }
0205 };
0206 
0207 
0208 
0209