File indexing completed on 2025-12-16 10:36:45
0001
0002
0003
0004
0005
0006 #pragma once
0007 #include <JANA/Utils/JCpuInfo.h>
0008 #include <JANA/JEvent.h>
0009
0010 #include <memory>
0011 #include <vector>
0012 #include <cassert>
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032 class JEventQueue {
0033
0034 protected:
0035 struct alignas(JANA2_CACHE_LINE_BYTES) LocalQueue {
0036 std::vector<JEvent*> ringbuffer;
0037 size_t size=0;
0038 size_t capacity=0;
0039 size_t front=0;
0040 size_t back=0;
0041 };
0042
0043 std::vector<std::unique_ptr<LocalQueue>> m_local_queues;
0044 size_t m_capacity = 0;
0045
0046
0047 bool m_establishes_ordering = false;
0048 size_t m_next_event_index = 0;
0049
0050
0051 bool m_enforces_ordering = false;
0052 size_t m_next_slot = 0;
0053 int m_min_index = 0;
0054 int m_max_index = 0;
0055
0056 public:
0057 inline JEventQueue(size_t initial_capacity, size_t locations_count) {
0058
0059 assert(locations_count >= 1);
0060 for (size_t location=0; location<locations_count; ++location) {
0061 m_local_queues.push_back(std::make_unique<LocalQueue>());
0062 }
0063 Scale(initial_capacity);
0064 }
0065
0066 virtual ~JEventQueue() = default;
0067
0068
0069 void SetEstablishesOrdering(bool establishes_ordering=true) {
0070 m_establishes_ordering = establishes_ordering;
0071 }
0072
0073 void SetEnforcesOrdering(bool enforces_ordering=true) {
0074 m_enforces_ordering = enforces_ordering;
0075 }
0076
0077 bool GetEstablishesOrdering() const {
0078 return m_establishes_ordering;
0079 }
0080
0081 bool GetEnforcesOrdering() const {
0082 return m_enforces_ordering;
0083 }
0084
0085 virtual void Scale(size_t capacity) {
0086 if (capacity < m_capacity) {
0087 for (auto& local_queue : m_local_queues) {
0088 if (local_queue->size != 0) {
0089 throw JException("Attempted to shrink a non-empty JEventQueue. Please drain the topology before attempting to downscale.");
0090 }
0091 }
0092 }
0093 m_capacity = capacity;
0094 m_max_index = capacity - 1;
0095 for (auto& local_queue: m_local_queues) {
0096 local_queue->ringbuffer.resize(capacity, nullptr);
0097 local_queue->capacity = capacity;
0098 }
0099 }
0100
0101 inline size_t GetLocationCount() {
0102 return m_local_queues.size();
0103 }
0104
0105 inline size_t GetSize(size_t location) {
0106 auto& local_queue = m_local_queues[location];
0107 return local_queue->size;
0108 }
0109
0110 inline size_t GetCapacity() {
0111 return m_capacity;
0112 }
0113
0114 inline void Push(JEvent* event, size_t location) {
0115
0116 if (m_enforces_ordering) {
0117
0118 auto index = event->GetEventIndex();
0119 if (m_max_index < index) {
0120 throw JException("Event index=%lu is above max=%lu", index, m_max_index);
0121 }
0122 if (m_min_index > index) {
0123 throw JException("Event index=%lu is below min=%lu", index, m_min_index);
0124 }
0125 size_t slot = index % m_capacity;
0126 auto& buffer = m_local_queues[0]->ringbuffer;
0127
0128 if (buffer[slot] != nullptr) {
0129 throw JException("Collision when pushing to ordered queue. slot=%lu", slot);
0130 }
0131 buffer[slot] = event;
0132 }
0133 else {
0134
0135 auto& local_queue = *m_local_queues[location];
0136 if (local_queue.size == local_queue.capacity) {
0137 throw JException("Attempted to push to a full JEventQueue. This probably means there is an error in your topology wiring");
0138 }
0139
0140 local_queue.ringbuffer[local_queue.front] = event;
0141 local_queue.front = (local_queue.front + 1) % local_queue.capacity;
0142 local_queue.size += 1;
0143 }
0144 }
0145
0146 inline JEvent* Pop(size_t location) {
0147 if (m_enforces_ordering) {
0148 auto& buffer = m_local_queues[0]->ringbuffer;
0149 auto event = buffer[m_next_slot];
0150 if (event == nullptr) {
0151 return nullptr;
0152 }
0153 buffer[m_next_slot] = nullptr;
0154 m_min_index += 1;
0155 m_max_index += 1;
0156 m_next_slot += 1;
0157 m_next_slot %= m_capacity;
0158 return event;
0159 }
0160 else {
0161 auto& local_queue= *m_local_queues[location];
0162 if (local_queue.size == 0) {
0163 return nullptr;
0164 }
0165 JEvent* result = local_queue.ringbuffer[local_queue.back];
0166 local_queue.ringbuffer[local_queue.back] = nullptr;
0167 local_queue.back = (local_queue.back + 1) % local_queue.capacity;
0168 local_queue.size -= 1;
0169
0170 if (m_establishes_ordering) {
0171 result->SetEventIndex(m_next_event_index);
0172 m_next_event_index += 1;
0173 }
0174 return result;
0175 }
0176 };
0177
0178 };
0179
0180