Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-16 10:36:45

0001 
0002 // Copyright 2024, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 // Author: Nathan Brei
0005 
0006 #pragma once
0007 #include <JANA/Utils/JCpuInfo.h>
0008 #include <JANA/JEvent.h>
0009 
0010 #include <memory>
0011 #include <vector>
0012 #include <cassert>
0013 
0014 // JEventQueue is a queue designed for communication between JArrows, with the following features:
0015 //
0016 // - Fixed-capacity during processing, since the max number of events in flight is predetermined and small
0017 // - Ringbuffer-based, to avoid allocations during processing
0018 // - Adjustable-capacity outside processing, since the max number of events in flight may change when the threadcount does
0019 // - Locality-aware, so that events that live in one location (e.g. NUMA domain) can stay within that location
0020 // - NOT thread-safe, because all queue accesses are protected by the JExecutionEngine mutex.
0021 //
0022 // - Previous versions of JEventQueue have allowed work stealing (taking events out of a different 
0023 //   NUMA domain when none are otherwise available). The current version does not, though we may
0024 //   bring this feature back later.
0025 //
0026 /// To handle memory locality at different granularities, we introduce the concept of a location.
0027 /// Each thread belongs to exactly one location, represented by contiguous unsigned
0028 /// ints starting at 0. While JArrows are wired to one logical JEventQueue, threads interact with
0029 /// the physical LocalQueue corresponding to their location. Locations prevent events from crossing 
0030 /// NUMA domains as they get picked up by different worker threads.
0031 
0032 class JEventQueue {
0033 
0034 protected:
0035     struct alignas(JANA2_CACHE_LINE_BYTES) LocalQueue {
0036         std::vector<JEvent*> ringbuffer;
0037         size_t size=0;
0038         size_t capacity=0;
0039         size_t front=0;
0040         size_t back=0;
0041     };
0042 
0043     std::vector<std::unique_ptr<LocalQueue>> m_local_queues;
0044     size_t m_capacity = 0;
0045 
0046     // Order-establishing state
0047     bool m_establishes_ordering = false;
0048     size_t m_next_event_index = 0;
0049 
0050     // Order-enforcing state
0051     bool m_enforces_ordering = false;
0052     size_t m_next_slot = 0;
0053     int m_min_index = 0;
0054     int m_max_index = 0;
0055 
0056 public:
0057     inline JEventQueue(size_t initial_capacity, size_t locations_count) {
0058 
0059         assert(locations_count >= 1);
0060         for (size_t location=0; location<locations_count; ++location) {
0061             m_local_queues.push_back(std::make_unique<LocalQueue>());
0062         }
0063         Scale(initial_capacity);
0064     }
0065 
0066     virtual ~JEventQueue() = default;
0067 
0068 
0069     void SetEstablishesOrdering(bool establishes_ordering=true) {
0070         m_establishes_ordering = establishes_ordering;
0071     }
0072 
0073     void SetEnforcesOrdering(bool enforces_ordering=true) {
0074         m_enforces_ordering = enforces_ordering;
0075     }
0076 
0077     bool GetEstablishesOrdering() const { 
0078         return m_establishes_ordering;
0079     }
0080 
0081     bool GetEnforcesOrdering() const {
0082         return m_enforces_ordering;
0083     }
0084 
0085     virtual void Scale(size_t capacity) {
0086         if (capacity < m_capacity) {
0087             for (auto& local_queue : m_local_queues) {
0088                 if (local_queue->size != 0) {
0089                     throw JException("Attempted to shrink a non-empty JEventQueue. Please drain the topology before attempting to downscale.");
0090                 }
0091             }
0092         }
0093         m_capacity = capacity;
0094         m_max_index = capacity - 1; // zero-indexed
0095         for (auto& local_queue: m_local_queues) {
0096             local_queue->ringbuffer.resize(capacity, nullptr);
0097             local_queue->capacity = capacity;
0098         }
0099     }
0100 
0101     inline size_t GetLocationCount() {
0102         return m_local_queues.size();
0103     }
0104 
0105     inline size_t GetSize(size_t location) {
0106         auto& local_queue = m_local_queues[location];
0107         return local_queue->size;
0108     }
0109 
0110     inline size_t GetCapacity() {
0111         return m_capacity;
0112     }
0113 
0114     inline void Push(JEvent* event, size_t location) {
0115 
0116         if (m_enforces_ordering) {
0117             // Repurpose m_local_queues[0] as an associative array instead of a queue
0118             auto index = event->GetEventIndex();
0119             if (m_max_index < index) {
0120                 throw JException("Event index=%lu is above max=%lu", index, m_max_index);
0121             }
0122             if (m_min_index > index) {
0123                 throw JException("Event index=%lu is below min=%lu", index, m_min_index);
0124             }
0125             size_t slot = index % m_capacity;
0126             auto& buffer = m_local_queues[0]->ringbuffer;
0127 
0128             if (buffer[slot] != nullptr) {
0129                 throw JException("Collision when pushing to ordered queue. slot=%lu", slot);
0130             }
0131             buffer[slot] = event;
0132         }
0133         else {
0134             // Use local_queue as intended
0135             auto& local_queue = *m_local_queues[location];
0136             if (local_queue.size == local_queue.capacity) {
0137                 throw JException("Attempted to push to a full JEventQueue. This probably means there is an error in your topology wiring");
0138             }
0139 
0140             local_queue.ringbuffer[local_queue.front] = event;
0141             local_queue.front = (local_queue.front + 1) % local_queue.capacity;
0142             local_queue.size += 1;
0143         }
0144     }
0145 
0146     inline JEvent* Pop(size_t location) {
0147         if (m_enforces_ordering) {
0148             auto& buffer = m_local_queues[0]->ringbuffer;
0149             auto event = buffer[m_next_slot];
0150             if (event == nullptr) {
0151                 return nullptr;
0152             }
0153             buffer[m_next_slot] = nullptr;
0154             m_min_index += 1;
0155             m_max_index += 1;
0156             m_next_slot += 1;
0157             m_next_slot %= m_capacity;
0158             return event;
0159         }
0160         else {
0161             auto& local_queue= *m_local_queues[location];
0162             if (local_queue.size == 0) {
0163                 return nullptr;
0164             }
0165             JEvent* result = local_queue.ringbuffer[local_queue.back];
0166             local_queue.ringbuffer[local_queue.back] = nullptr;
0167             local_queue.back = (local_queue.back + 1) % local_queue.capacity;
0168             local_queue.size -= 1;
0169 
0170             if (m_establishes_ordering) {
0171                 result->SetEventIndex(m_next_event_index);
0172                 m_next_event_index += 1;
0173             }
0174             return result;
0175         }
0176     };
0177 
0178 };
0179 
0180