File indexing completed on 2025-01-30 10:12:06
0001
0002
0003
0004 #pragma once
0005 #include <JANA/Utils/JCpuInfo.h>
0006 #include <JANA/JLogger.h>
0007 #include <mutex>
0008
0009
0010 class JPoolBase {
0011 protected:
0012 size_t m_pool_size;
0013 size_t m_location_count;
0014 bool m_limit_total_events_in_flight;
0015 public:
0016 JPoolBase(
0017 size_t pool_size,
0018 size_t location_count,
0019 bool limit_total_events_in_flight)
0020 : m_pool_size(pool_size)
0021 , m_location_count(location_count)
0022 , m_limit_total_events_in_flight(limit_total_events_in_flight) {}
0023
0024 virtual ~JPoolBase() = default;
0025 };
0026
0027 template <typename T>
0028 class JPool : public JPoolBase {
0029 private:
0030 struct alignas(JANA2_CACHE_LINE_BYTES) LocalPool {
0031 std::mutex mutex;
0032 std::vector<T*> available_items;
0033 std::vector<T> items;
0034 };
0035
0036 std::unique_ptr<LocalPool[]> m_pools;
0037
0038 public:
0039 JPool(size_t pool_size,
0040 size_t location_count,
0041 bool limit_total_events_in_flight) : JPoolBase(pool_size, location_count, limit_total_events_in_flight)
0042 {
0043 assert(m_location_count >= 1);
0044 assert(m_pool_size > 0 || !m_limit_total_events_in_flight);
0045 }
0046
0047 virtual ~JPool() = default;
0048
0049 void init() {
0050 m_pools = std::unique_ptr<LocalPool[]>(new LocalPool[m_location_count]());
0051
0052 for (size_t j=0; j<m_location_count; ++j) {
0053
0054 m_pools[j].items = std::vector<T>(m_pool_size);
0055
0056 for (T& item : m_pools[j].items) {
0057 configure_item(&item);
0058 m_pools[j].available_items.push_back(&item);
0059 }
0060 }
0061 }
0062
0063 virtual void configure_item(T*) {
0064 }
0065
0066 virtual void release_item(T*) {
0067 }
0068
0069
0070 T* get(size_t location=0) {
0071
0072 assert(m_pools != nullptr);
0073 LocalPool& pool = m_pools[location % m_location_count];
0074 std::lock_guard<std::mutex> lock(pool.mutex);
0075
0076 if (pool.available_items.empty()) {
0077 if (m_limit_total_events_in_flight) {
0078 return nullptr;
0079 }
0080 else {
0081 auto t = new T;
0082 configure_item(t);
0083 return t;
0084 }
0085 }
0086 else {
0087 T* item = pool.available_items.back();
0088 pool.available_items.pop_back();
0089 return item;
0090 }
0091 }
0092
0093
0094 void put(T* item, size_t location=0) {
0095
0096 assert(m_pools != nullptr);
0097
0098
0099 release_item(item);
0100
0101
0102 for (size_t l = location; l<location+m_location_count; ++l) {
0103 LocalPool& pool = m_pools[l % m_location_count];
0104
0105
0106 if ((item >= &(pool.items[0])) && (item <= &(pool.items[m_pool_size-1]))) {
0107 std::lock_guard<std::mutex> lock(pool.mutex);
0108 pool.available_items.push_back(item);
0109 return;
0110 }
0111
0112 }
0113
0114 delete item;
0115 }
0116
0117
0118 size_t size() { return m_pool_size; }
0119
0120
0121 bool get_many(std::vector<T*>& dest, size_t count, size_t location=0) {
0122
0123 assert(m_pools != nullptr);
0124
0125 LocalPool& pool = m_pools[location % m_location_count];
0126 std::lock_guard<std::mutex> lock(pool.mutex);
0127
0128 if (m_limit_total_events_in_flight && pool.available_items.size() < count) {
0129 return false;
0130 }
0131 else {
0132 while (count > 0 && !pool.available_items.empty()) {
0133 T* t = pool.available_items.back();
0134 pool.available_items.pop_back();
0135 dest.push_back(t);
0136 count -= 1;
0137 }
0138 while (count > 0) {
0139 auto t = new T;
0140 configure_item(t);
0141 dest.push_back(t);
0142 count -= 1;
0143 }
0144 return true;
0145 }
0146 }
0147
0148
0149 void put_many(std::vector<T*>& finished_events, size_t location=0) {
0150 for (T* item : finished_events) {
0151 put(item, location);
0152 }
0153 }
0154
0155
0156 size_t pop(T** dest, size_t min_count, size_t max_count, size_t location=0) {
0157
0158 assert(m_pools != nullptr);
0159
0160 LocalPool& pool = m_pools[location % m_location_count];
0161 std::lock_guard<std::mutex> lock(pool.mutex);
0162
0163 size_t available_count = pool.available_items.size();
0164
0165 if (m_limit_total_events_in_flight && available_count < min_count) {
0166
0167 return 0;
0168 }
0169 if (m_limit_total_events_in_flight) {
0170
0171 size_t count = std::min(available_count, max_count);
0172 for (size_t i=0; i<count; ++i) {
0173 T* t = pool.available_items.back();
0174 pool.available_items.pop_back();
0175 dest[i] = t;
0176 }
0177 return count;
0178 }
0179 else {
0180
0181 size_t count = std::min(available_count, max_count);
0182 size_t i=0;
0183 for (i=0; i<count; ++i) {
0184
0185 T* t = pool.available_items.back();
0186 pool.available_items.pop_back();
0187 dest[i] = t;
0188 }
0189 for (; i<min_count; ++i) {
0190
0191 auto t = new T;
0192 configure_item(t);
0193 dest[i] = t;
0194 }
0195 return i;
0196 }
0197 }
0198
0199 void push(T** source, size_t count, size_t location=0) {
0200 for (size_t i=0; i<count; ++i) {
0201 put(source[i], location);
0202 source[i] = nullptr;
0203 }
0204 }
0205 };
0206
0207
0208
0209