Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:36

0001 
0002 // Copyright 2024, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 // Author: Nathan Brei
0005 
0006 #pragma once
0007 #include <JANA/Utils/JCpuInfo.h>
0008 #include <JANA/JEvent.h>
0009 
0010 #include <memory>
0011 #include <vector>
0012 #include <cassert>
0013 
0014 // JEventQueue is a queue designed for communication between JArrows, with the following features:
0015 //
0016 // - Fixed-capacity during processing, since the max number of events in flight is predetermined and small
0017 // - Ringbuffer-based, to avoid allocations during processing
0018 // - Adjustable-capacity outside processing, since the max number of events in flight may change when the threadcount does
0019 // - Locality-aware, so that events that live in one location (e.g. NUMA domain) can stay within that location
0020 // - NOT thread-safe, because all queue accesses are protected by the JExecutionEngine mutex.
0021 //
0022 // - Previous versions of JEventQueue have allowed work stealing (taking events out of a different 
0023 //   NUMA domain when none are otherwise available). The current version does not, though we may
0024 //   bring this feature back later.
0025 //
0026 /// To handle memory locality at different granularities, we introduce the concept of a location.
0027 /// Each thread belongs to exactly one location, represented by contiguous unsigned
0028 /// ints starting at 0. While JArrows are wired to one logical JEventQueue, threads interact with
0029 /// the physical LocalQueue corresponding to their location. Locations prevent events from crossing 
0030 /// NUMA domains as they get picked up by different worker threads.
0031 
0032 class JEventQueue {
0033 
0034 protected:
0035     struct alignas(JANA2_CACHE_LINE_BYTES) LocalQueue {
0036         std::vector<JEvent*> ringbuffer;
0037         size_t size=0;
0038         size_t capacity=0;
0039         size_t front=0;
0040         size_t back=0;
0041     };
0042 
0043     std::vector<std::unique_ptr<LocalQueue>> m_local_queues;
0044     size_t m_capacity;
0045 
0046 public:
0047     inline JEventQueue(size_t initial_capacity, size_t locations_count) {
0048 
0049         assert(locations_count >= 1);
0050         for (size_t location=0; location<locations_count; ++location) {
0051             m_local_queues.push_back(std::make_unique<LocalQueue>());
0052         }
0053         m_capacity = initial_capacity;
0054         Scale(initial_capacity);
0055     }
0056 
0057     virtual ~JEventQueue() = default;
0058 
0059     virtual void Scale(size_t capacity) {
0060         if (capacity < m_capacity) {
0061             for (auto& local_queue : m_local_queues) {
0062                 if (local_queue->size != 0) {
0063                     throw JException("Attempted to shrink a non-empty JEventQueue. Please drain the topology before attempting to downscale.");
0064                 }
0065             }
0066         }
0067         m_capacity = capacity;
0068         for (auto& local_queue: m_local_queues) {
0069             local_queue->ringbuffer.resize(capacity, nullptr);
0070             local_queue->capacity = capacity;
0071         }
0072     }
0073 
0074     inline size_t GetLocationCount() {
0075         return m_local_queues.size();
0076     }
0077 
0078     inline size_t GetSize(size_t location) {
0079         auto& local_queue = m_local_queues[location];
0080         return local_queue->size;
0081     }
0082 
0083     inline size_t GetCapacity() {
0084         return m_capacity;
0085     }
0086 
0087     inline void Push(JEvent* event, size_t location) {
0088         auto& local_queue = *m_local_queues[location];
0089         if (local_queue.size == local_queue.capacity) {
0090             throw JException("Attempted to push to a full JEventQueue. This probably means there is an error in your topology wiring");
0091         }
0092 
0093         local_queue.ringbuffer[local_queue.front] = event;
0094         local_queue.front = (local_queue.front + 1) % local_queue.capacity;
0095         local_queue.size += 1;
0096     }
0097 
0098     inline JEvent* Pop(size_t location) {
0099         auto& local_queue= *m_local_queues[location];
0100         if (local_queue.size == 0) {
0101             return nullptr;
0102         }
0103         JEvent* result = local_queue.ringbuffer[local_queue.back];
0104         local_queue.ringbuffer[local_queue.back] = nullptr;
0105         local_queue.back = (local_queue.back + 1) % local_queue.capacity;
0106         local_queue.size -= 1;
0107         return result;
0108     };
0109 
0110 };
0111 
0112