File indexing completed on 2025-01-30 10:12:06
0001
0002
0003
0004
0005 #pragma once
#include <JANA/Utils/JCpuInfo.h>
#include <JANA/JEvent.h>

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <ostream>
#include <utility>
#include <vector>
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034 class JQueue {
0035 protected:
0036 size_t m_capacity;
0037 size_t m_locations_count;
0038 bool m_enable_work_stealing = false;
0039 int m_id = 0;
0040 JLogger m_logger;
0041
0042 public:
0043 inline size_t get_threshold() { return m_capacity; }
0044 inline size_t get_locations_count() { return m_locations_count; }
0045 inline bool is_work_stealing_enabled() { return m_enable_work_stealing; }
0046 void set_logger(JLogger logger) { m_logger = logger; }
0047 void set_id(int id) { m_id = id; }
0048
0049
0050 inline JQueue(size_t threshold, size_t locations_count, bool enable_work_stealing)
0051 : m_capacity(threshold), m_locations_count(locations_count), m_enable_work_stealing(enable_work_stealing) {}
0052 virtual ~JQueue() = default;
0053 };
0054
0055 template <typename T>
0056 class JMailbox : public JQueue {
0057
0058 struct LocalQueue {
0059 std::mutex mutex;
0060 std::deque<T> queue;
0061 size_t reserved_count = 0;
0062 };
0063
0064
0065 std::unique_ptr<LocalQueue[]> m_queues;
0066
0067 public:
0068
0069 enum class Status {Ready, Congested, Empty, Full};
0070
0071 friend std::ostream& operator<<(std::ostream& os, const Status& s) {
0072 switch (s) {
0073 case Status::Ready: os << "Ready"; break;
0074 case Status::Congested: os << "Congested"; break;
0075 case Status::Empty: os << "Empty"; break;
0076 case Status::Full: os << "Full"; break;
0077 default: os << "Unknown"; break;
0078 }
0079 return os;
0080 }
0081
0082
0083
0084
0085
0086 JMailbox(size_t threshold=100, size_t locations_count=1, bool enable_work_stealing=false)
0087 : JQueue(threshold, locations_count, enable_work_stealing) {
0088
0089 m_queues = std::unique_ptr<LocalQueue[]>(new LocalQueue[locations_count]);
0090 }
0091
0092 virtual ~JMailbox() {
0093
0094 }
0095
0096
0097 inline void set_threshold(size_t threshold) { m_capacity = threshold; }
0098
0099
0100
0101
0102 size_t size() {
0103 size_t result = 0;
0104 for (size_t i = 0; i<m_locations_count; ++i) {
0105 std::lock_guard<std::mutex> lock(m_queues[i].mutex);
0106 result += m_queues[i].queue.size();
0107 }
0108 return result;
0109 };
0110
0111
0112
0113 size_t size(size_t location_id) {
0114 return m_queues[location_id].queue.size();
0115 }
0116
0117
0118
0119
0120
0121
0122
0123
0124 size_t reserve(size_t requested_count, size_t location_id = 0) {
0125
0126 LocalQueue& mb = m_queues[location_id];
0127 std::lock_guard<std::mutex> lock(mb.mutex);
0128 size_t doable_count = m_capacity - mb.queue.size() - mb.reserved_count;
0129 if (doable_count > 0) {
0130 size_t reservation = std::min(doable_count, requested_count);
0131 mb.reserved_count += reservation;
0132 return reservation;
0133 }
0134 return 0;
0135 };
0136
0137
0138
0139
0140
0141 Status push(std::vector<T>& buffer, size_t reserved_count = 0, size_t location_id = 0) {
0142
0143 auto& mb = m_queues[location_id];
0144 std::lock_guard<std::mutex> lock(mb.mutex);
0145 mb.reserved_count -= reserved_count;
0146 for (const T& t : buffer) {
0147 mb.queue.push_back(std::move(t));
0148 }
0149 buffer.clear();
0150 if (mb.queue.size() > m_capacity) {
0151 return Status::Full;
0152 }
0153 return Status::Ready;
0154 }
0155
0156
0157
0158
0159
0160 Status pop(std::vector<T>& buffer, size_t requested_count, size_t location_id = 0) {
0161
0162 auto& mb = m_queues[location_id];
0163 if (!mb.mutex.try_lock()) {
0164 return Status::Congested;
0165 }
0166 auto nitems = std::min(requested_count, mb.queue.size());
0167 buffer.reserve(nitems);
0168 for (size_t i=0; i<nitems; ++i) {
0169 buffer.push_back(std::move(mb.queue.front()));
0170 mb.queue.pop_front();
0171 }
0172 auto size = mb.queue.size();
0173 mb.mutex.unlock();
0174 if (size >= m_capacity) {
0175 return Status::Full;
0176 }
0177 else if (size != 0) {
0178 return Status::Ready;
0179 }
0180 return Status::Empty;
0181 }
0182
0183
0184 Status pop(T& item, bool& success, size_t location_id = 0) {
0185
0186 success = false;
0187 auto& mb = m_queues[location_id];
0188 if (!mb.mutex.try_lock()) {
0189 return Status::Congested;
0190 }
0191 size_t nitems = mb.queue.size();
0192 if (nitems > 1) {
0193 item = std::move(mb.queue.front());
0194 mb.queue.pop_front();
0195 success = true;
0196 mb.mutex.unlock();
0197 return Status::Ready;
0198 }
0199 else if (nitems == 1) {
0200 item = std::move(mb.queue.front());
0201 mb.queue.pop_front();
0202 success = true;
0203 mb.mutex.unlock();
0204 return Status::Empty;
0205 }
0206 mb.mutex.unlock();
0207 return Status::Empty;
0208 }
0209
0210
0211
0212
0213 bool try_push(T* buffer, size_t count, size_t location_id = 0) {
0214 auto& mb = m_queues[location_id];
0215 std::lock_guard<std::mutex> lock(mb.mutex);
0216 if (mb.queue.size() + count > m_capacity) return false;
0217 for (size_t i=0; i<count; ++i) {
0218 mb.queue.push_back(buffer[i]);
0219 buffer[i] = nullptr;
0220 }
0221 return true;
0222 }
0223
0224 void push_and_unreserve(T* buffer, size_t count, size_t reserved_count = 0, size_t location_id = 0) {
0225
0226 auto& mb = m_queues[location_id];
0227 std::lock_guard<std::mutex> lock(mb.mutex);
0228 assert(reserved_count <= mb.reserved_count);
0229 assert(mb.queue.size() + count <= m_capacity);
0230 mb.reserved_count -= reserved_count;
0231 for (size_t i=0; i<count; ++i) {
0232 mb.queue.push_back(buffer[i]);
0233 buffer[i] = nullptr;
0234 }
0235 }
0236
0237 size_t pop(T* buffer, size_t min_requested_count, size_t max_requested_count, size_t location_id = 0) {
0238
0239 auto& mb = m_queues[location_id];
0240 std::lock_guard<std::mutex> lock(mb.mutex);
0241
0242 if (mb.queue.size() < min_requested_count) return 0;
0243
0244 auto nitems = std::min(max_requested_count, mb.queue.size());
0245
0246 for (size_t i=0; i<nitems; ++i) {
0247 buffer[i] = mb.queue.front();
0248 mb.queue.pop_front();
0249 }
0250 return nitems;
0251 }
0252
0253 size_t pop_and_reserve(T* buffer, size_t min_requested_count, size_t max_requested_count, size_t location_id = 0) {
0254
0255 auto& mb = m_queues[location_id];
0256 std::lock_guard<std::mutex> lock(mb.mutex);
0257
0258 if (mb.queue.size() < min_requested_count) return 0;
0259
0260 auto nitems = std::min(max_requested_count, mb.queue.size());
0261 mb.reserved_count += nitems;
0262
0263 for (size_t i=0; i<nitems; ++i) {
0264 buffer[i] = mb.queue.front();
0265 mb.queue.pop_front();
0266 }
0267 return nitems;
0268 }
0269
0270 size_t reserve(size_t min_requested_count, size_t max_requested_count, size_t location_id) {
0271
0272 LocalQueue& mb = m_queues[location_id];
0273 std::lock_guard<std::mutex> lock(mb.mutex);
0274 size_t available_count = m_capacity - mb.queue.size() - mb.reserved_count;
0275 size_t count = std::min(available_count, max_requested_count);
0276 if (count < min_requested_count) {
0277 return 0;
0278 }
0279 mb.reserved_count += count;
0280 return count;
0281 };
0282
0283 void unreserve(size_t reserved_count, size_t location_id) {
0284
0285 LocalQueue& mb = m_queues[location_id];
0286 std::lock_guard<std::mutex> lock(mb.mutex);
0287 assert(reserved_count <= mb.reserved_count);
0288 mb.reserved_count -= reserved_count;
0289 };
0290
0291 };
0292
0293 template <>
0294 inline void JMailbox<std::shared_ptr<JEvent>*>::push_and_unreserve(std::shared_ptr<JEvent>** buffer, size_t count, size_t reserved_count, size_t location_id) {
0295
0296 auto& mb = m_queues[location_id];
0297 std::lock_guard<std::mutex> lock(mb.mutex);
0298 assert(reserved_count <= mb.reserved_count);
0299 assert(mb.queue.size() + count <= m_capacity);
0300 mb.reserved_count -= reserved_count;
0301 for (size_t i=0; i<count; ++i) {
0302 LOG_TRACE(m_logger) << "JMailbox: push_and_unreserve(): queue #" << m_id << ", event #" << buffer[i]->get()->GetEventNumber() << LOG_END;
0303 mb.queue.push_back(buffer[i]);
0304 buffer[i] = nullptr;
0305 }
0306 }
0307
0308 template <>
0309 inline size_t JMailbox<std::shared_ptr<JEvent>*>::pop_and_reserve(std::shared_ptr<JEvent>** buffer, size_t min_requested_count, size_t max_requested_count, size_t location_id) {
0310
0311 auto& mb = m_queues[location_id];
0312 std::lock_guard<std::mutex> lock(mb.mutex);
0313
0314 if (mb.queue.size() < min_requested_count) return 0;
0315
0316 auto nitems = std::min(max_requested_count, mb.queue.size());
0317 mb.reserved_count += nitems;
0318
0319 for (size_t i=0; i<nitems; ++i) {
0320 buffer[i] = mb.queue.front();
0321 LOG_TRACE(m_logger) << "JMailbox: pop_and_reserve(): queue #" << m_id << ", event #" << buffer[i]->get()->GetEventNumber() << LOG_END;
0322 mb.queue.pop_front();
0323 }
0324 return nitems;
0325 }
0326
0327