File indexing completed on 2025-01-18 10:17:36
0001
0002
0003
0004
0005
0006 #pragma once
0007 #include <JANA/Utils/JCpuInfo.h>
0008 #include <JANA/JEvent.h>
0009
0010 #include <memory>
0011 #include <vector>
0012 #include <cassert>
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032 class JEventQueue {
0033
0034 protected:
0035 struct alignas(JANA2_CACHE_LINE_BYTES) LocalQueue {
0036 std::vector<JEvent*> ringbuffer;
0037 size_t size=0;
0038 size_t capacity=0;
0039 size_t front=0;
0040 size_t back=0;
0041 };
0042
0043 std::vector<std::unique_ptr<LocalQueue>> m_local_queues;
0044 size_t m_capacity;
0045
0046 public:
0047 inline JEventQueue(size_t initial_capacity, size_t locations_count) {
0048
0049 assert(locations_count >= 1);
0050 for (size_t location=0; location<locations_count; ++location) {
0051 m_local_queues.push_back(std::make_unique<LocalQueue>());
0052 }
0053 m_capacity = initial_capacity;
0054 Scale(initial_capacity);
0055 }
0056
0057 virtual ~JEventQueue() = default;
0058
0059 virtual void Scale(size_t capacity) {
0060 if (capacity < m_capacity) {
0061 for (auto& local_queue : m_local_queues) {
0062 if (local_queue->size != 0) {
0063 throw JException("Attempted to shrink a non-empty JEventQueue. Please drain the topology before attempting to downscale.");
0064 }
0065 }
0066 }
0067 m_capacity = capacity;
0068 for (auto& local_queue: m_local_queues) {
0069 local_queue->ringbuffer.resize(capacity, nullptr);
0070 local_queue->capacity = capacity;
0071 }
0072 }
0073
0074 inline size_t GetLocationCount() {
0075 return m_local_queues.size();
0076 }
0077
0078 inline size_t GetSize(size_t location) {
0079 auto& local_queue = m_local_queues[location];
0080 return local_queue->size;
0081 }
0082
0083 inline size_t GetCapacity() {
0084 return m_capacity;
0085 }
0086
0087 inline void Push(JEvent* event, size_t location) {
0088 auto& local_queue = *m_local_queues[location];
0089 if (local_queue.size == local_queue.capacity) {
0090 throw JException("Attempted to push to a full JEventQueue. This probably means there is an error in your topology wiring");
0091 }
0092
0093 local_queue.ringbuffer[local_queue.front] = event;
0094 local_queue.front = (local_queue.front + 1) % local_queue.capacity;
0095 local_queue.size += 1;
0096 }
0097
0098 inline JEvent* Pop(size_t location) {
0099 auto& local_queue= *m_local_queues[location];
0100 if (local_queue.size == 0) {
0101 return nullptr;
0102 }
0103 JEvent* result = local_queue.ringbuffer[local_queue.back];
0104 local_queue.ringbuffer[local_queue.back] = nullptr;
0105 local_queue.back = (local_queue.back + 1) % local_queue.capacity;
0106 local_queue.size -= 1;
0107 return result;
0108 };
0109
0110 };
0111
0112