Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-30 10:12:06

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
0006 #include <JANA/Utils/JCpuInfo.h>
0007 #include <JANA/JEvent.h>
0008 
0009 #include <mutex>
0010 #include <deque>
0011 
0012 /// JMailbox is a threadsafe event queue designed for communication between Arrows.
0013 /// It is different from the standard data structure in the following ways:
0014 ///   - pushes and pops return a Status enum, handling the problem of .size() always being stale
0015 ///   - pops may fail but pushes may not
0016 ///   - pushes and pops are chunked, reducing contention and handling the failure case cleanly
0017 ///   - when the .reserve() method is used, the queue size is bounded
0018 ///   - the underlying queue may be shared by all threads, NUMA-domain-local, or thread-local
0019 ///   - the Arrow doesn't have to know anything about locality.
0020 ///
0021 /// To handle memory locality at different granularities, we introduce the concept of a location.
0022 /// Each thread belongs to exactly one location, represented by contiguous unsigned
0023 /// ints starting at 0. While JArrows are wired to one logical JMailbox, JWorkers interact with
0024 /// the physical LocalQueue corresponding to their location. Locations prevent events from crossing 
0025 /// NUMA domains as they get picked up by different JWorker threads.
0026 ///
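0027 /// \tparam T must be movable. Usually this is unique_ptr<JEvent>. The pointer-style methods (try_push, push_and_unreserve, pop(T*, ...), pop_and_reserve) copy their items and assign nullptr to the caller's buffer slots, so they additionally require a copyable T that is assignable from nullptr (e.g. the std::shared_ptr<JEvent>* used by the specializations at the end of this file).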
0028 ///
0029 /// Improvements:
0030 ///   1. Pad LocalQueue
0031 ///   2. Enable work stealing
0032 
0033 
0034 class JQueue {
0035 protected:
0036     size_t m_capacity;
0037     size_t m_locations_count;
0038     bool m_enable_work_stealing = false;
0039     int m_id = 0;
0040     JLogger m_logger;
0041 
0042 public:
0043     inline size_t get_threshold() { return m_capacity; }
0044     inline size_t get_locations_count() { return m_locations_count; }
0045     inline bool is_work_stealing_enabled() { return m_enable_work_stealing; }
0046     void set_logger(JLogger logger) { m_logger = logger; }
0047     void set_id(int id) { m_id = id; }
0048 
0049 
0050     inline JQueue(size_t threshold, size_t locations_count, bool enable_work_stealing)
0051         : m_capacity(threshold), m_locations_count(locations_count), m_enable_work_stealing(enable_work_stealing) {}
0052     virtual ~JQueue() = default;
0053 };
0054 
0055 template <typename T>
0056 class JMailbox : public JQueue {
0057 
0058     struct LocalQueue {
0059         std::mutex mutex;
0060         std::deque<T> queue;
0061         size_t reserved_count = 0;
0062     };
0063 
0064     // TODO: Copy these params into DLMB for better locality
0065     std::unique_ptr<LocalQueue[]> m_queues;
0066 
0067 public:
0068 
0069     enum class Status {Ready, Congested, Empty, Full};
0070 
0071     friend std::ostream& operator<<(std::ostream& os, const Status& s) {
0072         switch (s) {
0073             case Status::Ready:     os << "Ready"; break;
0074             case Status::Congested: os << "Congested"; break;
0075             case Status::Empty:     os << "Empty"; break;
0076             case Status::Full:      os << "Full"; break;
0077             default:                os << "Unknown"; break;
0078         }
0079         return os;
0080     }
0081 
0082 
0083     /// threshold: the (soft) maximum number of items in the queue at any time
0084     /// locations_count: the number of locations. More locations = better NUMA performance, worse load balancing
0085     /// enable_work_stealing: allow events to cross locations only when no other work is available. Improves aforementioned load balancing.
0086     JMailbox(size_t threshold=100, size_t locations_count=1, bool enable_work_stealing=false)
0087         : JQueue(threshold, locations_count, enable_work_stealing) {
0088 
0089         m_queues = std::unique_ptr<LocalQueue[]>(new LocalQueue[locations_count]);
0090     }
0091 
0092     virtual ~JMailbox() {
0093         //delete [] m_queues;
0094     }
0095 
0096     // We can do this (for now) because we use a deque underneath, so threshold is 'soft'
0097     inline void set_threshold(size_t threshold) { m_capacity = threshold; }
0098 
0099     /// size() counts the number of items in the queue across all locations
0100     /// This should be used sparingly because it will mess up a bunch of caches.
0101     /// Meant to be used by measure_perf()
0102     size_t size() {
0103         size_t result = 0;
0104         for (size_t i = 0; i<m_locations_count; ++i) {
0105             std::lock_guard<std::mutex> lock(m_queues[i].mutex);
0106             result += m_queues[i].queue.size();
0107         }
0108         return result;
0109     };
0110 
0111     /// size(location_id) counts the number of items in the queue for a particular location
0112     /// Meant to be used by Scheduler::next_assignment() and measure_perf(), eventually
0113     size_t size(size_t location_id) {
0114         return m_queues[location_id].queue.size();
0115     }
0116 
0117     /// reserve(requested_count) keeps our queues bounded in size. The caller should
0118     /// reserve their desired chunk size on the output queue first. The output
0119     /// queue will return a reservation which is less than or equal to requested_count.
0120     /// The caller may then request as many items from the input queue as have been
0121     /// reserved on the output queue. Note that because the input queue may return
0122     /// fewer items than requested, the caller must push their original reserved_count
0123     /// alongside the items, to avoid a "reservation leak".
0124     size_t reserve(size_t requested_count, size_t location_id = 0) {
0125 
0126         LocalQueue& mb = m_queues[location_id];
0127         std::lock_guard<std::mutex> lock(mb.mutex);
0128         size_t doable_count = m_capacity - mb.queue.size() - mb.reserved_count;
0129         if (doable_count > 0) {
0130             size_t reservation = std::min(doable_count, requested_count);
0131             mb.reserved_count += reservation;
0132             return reservation;
0133         }
0134         return 0;
0135     };
0136 
0137     /// push(items, reserved_count, location_id) This function will always
0138     /// succeed, although it may exceed the threshold if the caller didn't reserve
0139     /// space, and it may take a long time because it will wait on a mutex.
0140     /// Note that if the caller had called reserve(), they must pass in the reserved_count here.
0141     Status push(std::vector<T>& buffer, size_t reserved_count = 0, size_t location_id = 0) {
0142 
0143         auto& mb = m_queues[location_id];
0144         std::lock_guard<std::mutex> lock(mb.mutex);
0145         mb.reserved_count -= reserved_count;
0146         for (const T& t : buffer) {
0147              mb.queue.push_back(std::move(t));
0148         }
0149         buffer.clear();
0150         if (mb.queue.size() > m_capacity) {
0151             return Status::Full;
0152         }
0153         return Status::Ready;
0154     }
0155 
0156 
0157     /// pop() will pop up to requested_count items for the desired location_id.
0158     /// If many threads are contending for the queue, this will fail with Status::Contention,
0159     /// in which case the caller should probably consult the Scheduler.
0160     Status pop(std::vector<T>& buffer, size_t requested_count, size_t location_id = 0) {
0161 
0162         auto& mb = m_queues[location_id];
0163         if (!mb.mutex.try_lock()) {
0164             return Status::Congested;
0165         }
0166         auto nitems = std::min(requested_count, mb.queue.size());
0167         buffer.reserve(nitems);
0168         for (size_t i=0; i<nitems; ++i) {
0169             buffer.push_back(std::move(mb.queue.front()));
0170             mb.queue.pop_front();
0171         }
0172         auto size = mb.queue.size();
0173         mb.mutex.unlock();
0174         if (size >= m_capacity) {
0175             return Status::Full;
0176         }
0177         else if (size != 0) {
0178             return Status::Ready;
0179         }
0180         return Status::Empty;
0181     }
0182 
0183 
0184     Status pop(T& item, bool& success, size_t location_id = 0) {
0185 
0186         success = false;
0187         auto& mb = m_queues[location_id];
0188         if (!mb.mutex.try_lock()) {
0189             return Status::Congested;
0190         }
0191         size_t nitems = mb.queue.size();
0192         if (nitems > 1) {
0193             item = std::move(mb.queue.front());
0194             mb.queue.pop_front();
0195             success = true;
0196             mb.mutex.unlock();
0197             return Status::Ready;
0198         }
0199         else if (nitems == 1) {
0200             item = std::move(mb.queue.front());
0201             mb.queue.pop_front();
0202             success = true;
0203             mb.mutex.unlock();
0204             return Status::Empty;
0205         }
0206         mb.mutex.unlock();
0207         return Status::Empty;
0208     }
0209 
0210 
0211 
0212 
0213     bool try_push(T* buffer, size_t count, size_t location_id = 0) {
0214         auto& mb = m_queues[location_id];
0215         std::lock_guard<std::mutex> lock(mb.mutex);
0216         if (mb.queue.size() + count > m_capacity) return false;
0217         for (size_t i=0; i<count; ++i) {
0218              mb.queue.push_back(buffer[i]);
0219              buffer[i] = nullptr;
0220         }
0221         return true;
0222     }
0223 
0224     void push_and_unreserve(T* buffer, size_t count, size_t reserved_count = 0, size_t location_id = 0) {
0225 
0226         auto& mb = m_queues[location_id];
0227         std::lock_guard<std::mutex> lock(mb.mutex);
0228         assert(reserved_count <= mb.reserved_count);
0229         assert(mb.queue.size() + count <= m_capacity);
0230         mb.reserved_count -= reserved_count;
0231         for (size_t i=0; i<count; ++i) {
0232              mb.queue.push_back(buffer[i]);
0233              buffer[i] = nullptr;
0234         }
0235     }
0236 
0237     size_t pop(T* buffer, size_t min_requested_count, size_t max_requested_count, size_t location_id = 0) {
0238 
0239         auto& mb = m_queues[location_id];
0240         std::lock_guard<std::mutex> lock(mb.mutex);
0241 
0242         if (mb.queue.size() < min_requested_count) return 0;
0243 
0244         auto nitems = std::min(max_requested_count, mb.queue.size());
0245 
0246         for (size_t i=0; i<nitems; ++i) {
0247             buffer[i] = mb.queue.front();
0248             mb.queue.pop_front();
0249         }
0250         return nitems;
0251     }
0252 
0253     size_t pop_and_reserve(T* buffer, size_t min_requested_count, size_t max_requested_count, size_t location_id = 0) {
0254 
0255         auto& mb = m_queues[location_id];
0256         std::lock_guard<std::mutex> lock(mb.mutex);
0257 
0258         if (mb.queue.size() < min_requested_count) return 0;
0259 
0260         auto nitems = std::min(max_requested_count, mb.queue.size());
0261         mb.reserved_count += nitems;
0262 
0263         for (size_t i=0; i<nitems; ++i) {
0264             buffer[i] = mb.queue.front();
0265             mb.queue.pop_front();
0266         }
0267         return nitems;
0268     }
0269 
0270     size_t reserve(size_t min_requested_count, size_t max_requested_count, size_t location_id) {
0271 
0272         LocalQueue& mb = m_queues[location_id];
0273         std::lock_guard<std::mutex> lock(mb.mutex);
0274         size_t available_count = m_capacity - mb.queue.size() - mb.reserved_count;
0275         size_t count = std::min(available_count, max_requested_count);
0276         if (count < min_requested_count) {
0277             return 0;
0278         }
0279         mb.reserved_count += count;
0280         return count;
0281     };
0282 
0283     void unreserve(size_t reserved_count, size_t location_id) {
0284 
0285         LocalQueue& mb = m_queues[location_id];
0286         std::lock_guard<std::mutex> lock(mb.mutex);
0287         assert(reserved_count <= mb.reserved_count);
0288         mb.reserved_count -= reserved_count;
0289     };
0290 
0291 };
0292 
0293 template <>
0294 inline void JMailbox<std::shared_ptr<JEvent>*>::push_and_unreserve(std::shared_ptr<JEvent>** buffer, size_t count, size_t reserved_count, size_t location_id) {
0295 
0296     auto& mb = m_queues[location_id];
0297     std::lock_guard<std::mutex> lock(mb.mutex);
0298     assert(reserved_count <= mb.reserved_count);
0299     assert(mb.queue.size() + count <= m_capacity);
0300     mb.reserved_count -= reserved_count;
0301     for (size_t i=0; i<count; ++i) {
0302         LOG_TRACE(m_logger) << "JMailbox: push_and_unreserve(): queue #" << m_id << ", event #" << buffer[i]->get()->GetEventNumber() << LOG_END;
0303         mb.queue.push_back(buffer[i]);
0304         buffer[i] = nullptr;
0305     }
0306 } 
0307 
0308 template <>
0309 inline size_t JMailbox<std::shared_ptr<JEvent>*>::pop_and_reserve(std::shared_ptr<JEvent>** buffer, size_t min_requested_count, size_t max_requested_count, size_t location_id) {
0310 
0311     auto& mb = m_queues[location_id];
0312     std::lock_guard<std::mutex> lock(mb.mutex);
0313 
0314     if (mb.queue.size() < min_requested_count) return 0;
0315 
0316     auto nitems = std::min(max_requested_count, mb.queue.size());
0317     mb.reserved_count += nitems;
0318 
0319     for (size_t i=0; i<nitems; ++i) {
0320         buffer[i] = mb.queue.front();
0321         LOG_TRACE(m_logger) << "JMailbox: pop_and_reserve(): queue #" << m_id << ", event #" << buffer[i]->get()->GetEventNumber() << LOG_END;
0322         mb.queue.pop_front();
0323     }
0324     return nitems;
0325 }
0326 
0327