Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:35

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
#include <JANA/Streaming/JWindow.h>

#include <deque>
#include <map>
#include <queue>
#include <stdexcept>
#include <vector>
0010 
0011 /// JSessionWindow aggregates JMessages adaptively, i.e. a JEvent's time interval starts with the
0012 /// first JMessage and ends once there are no more JMessages timestamped before a configurable
0013 /// max interval width. This is usually what is meant by 'event-building'.
0014 template <typename T>
0015 class JSessionWindow : public JWindow<T> {
0016 private:
0017     template <typename S>
0018     struct maybe {
0019         bool is_none = true;
0020         S value;
0021     };
0022 
0023 public:
0024 
0025     JSessionWindow(Timestamp event_interval, const std::vector<DetectorId> &detectors)
0026     : m_event_interval(event_interval) {
0027         for (auto id : detectors) {
0028             m_inbox.insert({id, {}});
0029         }
0030     }
0031 
0032     void pushMessage(T* message) final {
0033         // TODO: Why was messages a vector before?
0034         auto iter = m_inbox.find(message->source_id);
0035         if (iter == m_inbox.end()) {
0036             throw std::runtime_error("Unexpected detector!");
0037         } else {
0038             iter->second.push_back(message);
0039         }
0040     };
0041 
0042     bool pullEvent(JEvent& event) final {
0043         return false;
0044     };
0045 
0046 /*
0047     maybe<JMessage::Timestamp> find_next_event_start() {
0048         maybe<JMessage::Timestamp> result;
0049         result.is_none = true;
0050         if (!m_inbox.empty()) {
0051             result.is_none = false;
0052             auto iter = m_inbox.begin();
0053             result.value = iter->second.timestamp;
0054 
0055             while (++iter != m_inbox.end()) {
0056                 if (iter->second.is_none) {
0057                     result.is_none = true;
0058                     return result;
0059                 }
0060                 result.value = std::min(result.value, iter->second.value);
0061             }
0062         }
0063         return result;
0064     }
0065 
0066     bool stage_next_event() {
0067         if (m_next_event_start.is_none) {
0068             m_next_event_start = find_next_event_start();
0069             if (m_next_event_start.is_none) {
0070                 return false;
0071             }
0072         }
0073         Timestamp next_event_finish = m_next_event_start.value + m_event_interval;
0074         for (auto iter : m_inbox) { // for each detector
0075             auto &q = iter.second;
0076             while (q.front().timestamp <= next_event_finish) {
0077                 m_outbox.push_back(std::move(q.front()));
0078                 q.pop_front();
0079             }
0080             if (q.empty()) {
0081                 // we've run out of samples, so we can't say whether we have everything or not
0082                 return false;
0083             }
0084         }
0085         return true;
0086     }
0087 
0088 */
0089 private:
0090     std::map<DetectorId, std::deque<JMessage*>> m_inbox;
0091     std::vector<T *> m_outbox;
0092     maybe<Timestamp> m_next_event_start;
0093     Timestamp m_event_interval; // TODO: This should be a duration
0094 };
0095 
0096