Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:35

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
0006 #include <JANA/Streaming/JMessage.h>
0007 #include <JANA/JEvent.h>
0008 
0009 #include <queue>
0010 
0011 /// JWindow is an abstract data structure for aggregating individual JMessages into a
0012 /// single JEvent.  We generally assume that messages from any particular source arrive in-order, and
0013 /// make no assumptions about ordering between different sources. We provide different implementations to fit
0014 /// different use cases so that the user should rarely need to implement one of these by themselves.
0015 /// The choice of JWindow determines how the time interval associated with each
0016 /// JEvent is calculated, and furthermore is responsible for placing the correct JMessages into
0017 /// the JEvent. As long as the time intervals do not overlap, this amounts to simple transferring
0018 /// of ownership of a raw pointer. When JEvents intervals do overlap, which happens in the case of JSlidingWindow
0019 /// and JMergeWindow, we have to decide whether to use shared ownership or to clone the offending data.
0020 template <typename T>
0021 struct JWindow {
0022 
0023     virtual ~JWindow() = default;
0024     virtual void pushMessage(T* message) = 0;
0025     virtual bool pullEvent(JEvent& event) = 0;
0026 
0027 };
0028 
0029 
0030 
0031 /// JFixedWindow partitions time into fixed, contiguous buckets, and emits a JEvent containing
0032 /// all JMessages for all sources which fall into that bucket.
0033 template <typename T>
0034 class JFixedWindow : public JWindow<T> {
0035 public:
0036     void pushMessage(T* message) final;
0037     bool pullEvent(JEvent& event) final;
0038 private:
0039 };
0040 
0041 
0042 /// JSessionWindow aggregates JMessages adaptively, i.e. a JEvent's time interval starts with the
0043 /// first JMessage and ends once there are no more JMessages timestamped before a configurable
0044 /// max interval width. This is usually what is meant by 'event-building'.
0045 template <typename T>
0046 class JSessionWindow : public JWindow<T> {
0047 private:
0048     template <typename S>
0049     struct maybe {
0050         bool is_none = true;
0051         S value;
0052     };
0053 
0054 public:
0055 
0056     JSessionWindow(Timestamp event_interval, const std::vector<DetectorId> &detectors)
0057             : m_event_interval(event_interval) {
0058         for (auto id : detectors) {
0059             m_inbox.insert({id, {}});
0060         }
0061     }
0062 
0063     void pushMessage(T* message) final {
0064         // TODO: Why was messages a vector before?
0065         auto iter = m_inbox.find(message->source_id);
0066         if (iter == m_inbox.end()) {
0067             throw std::runtime_error("Unexpected detector!");
0068         } else {
0069             iter->second.push_back(message);
0070         }
0071     };
0072 
0073     bool pullEvent(JEvent&) final {
0074         return false;
0075     };
0076 
0077 /*
0078     maybe<JMessage::Timestamp> find_next_event_start() {
0079         maybe<JMessage::Timestamp> result;
0080         result.is_none = true;
0081         if (!m_inbox.empty()) {
0082             result.is_none = false;
0083             auto iter = m_inbox.begin();
0084             result.value = iter->second.timestamp;
0085 
0086             while (++iter != m_inbox.end()) {
0087                 if (iter->second.is_none) {
0088                     result.is_none = true;
0089                     return result;
0090                 }
0091                 result.value = std::min(result.value, iter->second.value);
0092             }
0093         }
0094         return result;
0095     }
0096 
0097     bool stage_next_event() {
0098         if (m_next_event_start.is_none) {
0099             m_next_event_start = find_next_event_start();
0100             if (m_next_event_start.is_none) {
0101                 return false;
0102             }
0103         }
0104         Timestamp next_event_finish = m_next_event_start.value + m_event_interval;
0105         for (auto iter : m_inbox) { // for each detector
0106             auto &q = iter.second;
0107             while (q.front().timestamp <= next_event_finish) {
0108                 m_outbox.push_back(std::move(q.front()));
0109                 q.pop_front();
0110             }
0111             if (q.empty()) {
0112                 // we've run out of samples, so we can't say whether we have everything or not
0113                 return false;
0114             }
0115         }
0116         return true;
0117     }
0118 
0119 */
0120 private:
0121     std::map<DetectorId, std::deque<JMessage*>> m_inbox;
0122     std::vector<T *> m_outbox;
0123     maybe<Timestamp> m_next_event_start;
0124     Timestamp m_event_interval; // TODO: This should be a duration
0125 };
0126