|
||||
File indexing completed on 2025-01-18 10:17:35
0001 0002 // Copyright 2020, Jefferson Science Associates, LLC. 0003 // Subject to the terms in the LICENSE file found in the top-level directory. 0004 0005 #pragma once 0006 #include <JANA/Streaming/JWindow.h> 0007 0008 #include <map> 0009 #include <queue> 0010 0011 /// JSessionWindow aggregates JMessages adaptively, i.e. a JEvent's time interval starts with the 0012 /// first JMessage and ends once there are no more JMessages timestamped before a configurable 0013 /// max interval width. This is usually what is meant by 'event-building'. 0014 template <typename T> 0015 class JSessionWindow : public JWindow<T> { 0016 private: 0017 template <typename S> 0018 struct maybe { 0019 bool is_none = true; 0020 S value; 0021 }; 0022 0023 public: 0024 0025 JSessionWindow(Timestamp event_interval, const std::vector<DetectorId> &detectors) 0026 : m_event_interval(event_interval) { 0027 for (auto id : detectors) { 0028 m_inbox.insert({id, {}}); 0029 } 0030 } 0031 0032 void pushMessage(T* message) final { 0033 // TODO: Why was messages a vector before? 0034 auto iter = m_inbox.find(message->source_id); 0035 if (iter == m_inbox.end()) { 0036 throw std::runtime_error("Unexpected detector!"); 0037 } else { 0038 iter->second.push_back(message); 0039 } 0040 }; 0041 0042 bool pullEvent(JEvent& event) final { 0043 return false; 0044 }; 0045 0046 /* 0047 maybe<JMessage::Timestamp> find_next_event_start() { 0048 maybe<JMessage::Timestamp> result; 0049 result.is_none = true; 0050 if (!m_inbox.empty()) { 0051 result.is_none = false; 0052 auto iter = m_inbox.begin(); 0053 result.value = iter->second.timestamp; 0054 0055 while (++iter != m_inbox.end()) { 0056 if (iter->second.is_none) { 0057 result.is_none = true; 0058 return result; 0059 } 0060 result.value = std::min(result.value, iter->second.value); 0061 } 0062 } 0063 return result; 0064 } 0065 0066 bool stage_next_event() { 0067 if (m_next_event_start.is_none) { 0068 m_next_event_start = find_next_event_start(); 0069 if (m_next_event_start.is_none) { 0070 return false; 0071 } 0072 } 0073 Timestamp next_event_finish = m_next_event_start.value + m_event_interval; 0074 for (auto iter : m_inbox) { // for each detector 0075 auto &q = iter.second; 0076 while (q.front().timestamp <= next_event_finish) { 0077 m_outbox.push_back(std::move(q.front())); 0078 q.pop_front(); 0079 } 0080 if (q.empty()) { 0081 // we've run out of samples, so we can't say whether we have everything or not 0082 return false; 0083 } 0084 } 0085 return true; 0086 } 0087 0088 */ 0089 private: 0090 std::map<DetectorId, std::deque<JMessage*>> m_inbox; 0091 std::vector<T *> m_outbox; 0092 maybe<Timestamp> m_next_event_start; 0093 Timestamp m_event_interval; // TODO: This should be a duration 0094 }; 0095 0096
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |