|
||||
File indexing completed on 2025-01-18 10:17:35
0001 0002 // Copyright 2020, Jefferson Science Associates, LLC. 0003 // Subject to the terms in the LICENSE file found in the top-level directory. 0004 0005 #pragma once 0006 #include <JANA/Streaming/JMessage.h> 0007 #include <JANA/JEvent.h> 0008 0009 #include <queue> 0010 0011 /// JWindow is an abstract data structure for aggregating individual JMessages into a 0012 /// single JEvent. We generally assume that messages from any particular source arrive in-order, and 0013 /// make no assumptions about ordering between different sources. We provide different implementations to fit 0014 /// different use cases so that the user should rarely need to implement one of these by themselves. 0015 /// The choice of JWindow determines how the time interval associated with each 0016 /// JEvent is calculated, and furthermore is responsible for placing the correct JMessages into 0017 /// the JEvent. As long as the time intervals do not overlap, this amounts to simple transferring 0018 /// of ownership of a raw pointer. When JEvents intervals do overlap, which happens in the case of JSlidingWindow 0019 /// and JMergeWindow, we have to decide whether to use shared ownership or to clone the offending data. 0020 template <typename T> 0021 struct JWindow { 0022 0023 virtual ~JWindow() = default; 0024 virtual void pushMessage(T* message) = 0; 0025 virtual bool pullEvent(JEvent& event) = 0; 0026 0027 }; 0028 0029 0030 0031 /// JFixedWindow partitions time into fixed, contiguous buckets, and emits a JEvent containing 0032 /// all JMessages for all sources which fall into that bucket. 0033 template <typename T> 0034 class JFixedWindow : public JWindow<T> { 0035 public: 0036 void pushMessage(T* message) final; 0037 bool pullEvent(JEvent& event) final; 0038 private: 0039 }; 0040 0041 0042 /// JSessionWindow aggregates JMessages adaptively, i.e. a JEvent's time interval starts with the 0043 /// first JMessage and ends once there are no more JMessages timestamped before a configurable 0044 /// max interval width. This is usually what is meant by 'event-building'. 0045 template <typename T> 0046 class JSessionWindow : public JWindow<T> { 0047 private: 0048 template <typename S> 0049 struct maybe { 0050 bool is_none = true; 0051 S value; 0052 }; 0053 0054 public: 0055 0056 JSessionWindow(Timestamp event_interval, const std::vector<DetectorId> &detectors) 0057 : m_event_interval(event_interval) { 0058 for (auto id : detectors) { 0059 m_inbox.insert({id, {}}); 0060 } 0061 } 0062 0063 void pushMessage(T* message) final { 0064 // TODO: Why was messages a vector before? 0065 auto iter = m_inbox.find(message->source_id); 0066 if (iter == m_inbox.end()) { 0067 throw std::runtime_error("Unexpected detector!"); 0068 } else { 0069 iter->second.push_back(message); 0070 } 0071 }; 0072 0073 bool pullEvent(JEvent&) final { 0074 return false; 0075 }; 0076 0077 /* 0078 maybe<JMessage::Timestamp> find_next_event_start() { 0079 maybe<JMessage::Timestamp> result; 0080 result.is_none = true; 0081 if (!m_inbox.empty()) { 0082 result.is_none = false; 0083 auto iter = m_inbox.begin(); 0084 result.value = iter->second.timestamp; 0085 0086 while (++iter != m_inbox.end()) { 0087 if (iter->second.is_none) { 0088 result.is_none = true; 0089 return result; 0090 } 0091 result.value = std::min(result.value, iter->second.value); 0092 } 0093 } 0094 return result; 0095 } 0096 0097 bool stage_next_event() { 0098 if (m_next_event_start.is_none) { 0099 m_next_event_start = find_next_event_start(); 0100 if (m_next_event_start.is_none) { 0101 return false; 0102 } 0103 } 0104 Timestamp next_event_finish = m_next_event_start.value + m_event_interval; 0105 for (auto iter : m_inbox) { // for each detector 0106 auto &q = iter.second; 0107 while (q.front().timestamp <= next_event_finish) { 0108 m_outbox.push_back(std::move(q.front())); 0109 q.pop_front(); 0110 } 0111 if (q.empty()) { 0112 // we've run out of samples, so we can't say whether we have everything or not 0113 return false; 0114 } 0115 } 0116 return true; 0117 } 0118 0119 */ 0120 private: 0121 std::map<DetectorId, std::deque<JMessage*>> m_inbox; 0122 std::vector<T *> m_outbox; 0123 maybe<Timestamp> m_next_event_start; 0124 Timestamp m_event_interval; // TODO: This should be a duration 0125 }; 0126
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |