Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:01:39

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 #pragma once
0006 #include <mutex>
0007 #include <chrono>
0008 
0009 class JArrowMetrics {
0010 
0011 public:
0012     enum class Status {KeepGoing, ComeBackLater, Finished, NotRunYet, Error};
0013     using duration_t = std::chrono::steady_clock::duration;
0014 
0015 private:
0016     mutable std::mutex m_mutex;
0017     // Mutex is mutable so that we can lock before reading from a const ref
0018 
0019     Status m_last_status;
0020     size_t m_total_message_count;
0021     size_t m_last_message_count;
0022     size_t m_total_queue_visits;
0023     size_t m_last_queue_visits;
0024     duration_t m_total_latency;
0025     duration_t m_last_latency;
0026     duration_t m_total_queue_latency;
0027     duration_t m_last_queue_latency;
0028 
0029 
0030     // TODO: We might want to add a timestamp, so that
0031     // the 'last_*' measurements can reflect the most recent value,
0032     // rather than the last-to-be-accumulated value.
0033 
0034 public:
0035     void clear() {
0036 
0037         m_mutex.lock();
0038         m_last_status = Status::NotRunYet;
0039         m_total_message_count = 0;
0040         m_last_message_count = 0;
0041         m_total_queue_visits = 0;
0042         m_last_queue_visits = 0;
0043         m_total_latency = duration_t::zero();
0044         m_last_latency = duration_t::zero();
0045         m_total_queue_latency = duration_t::zero();
0046         m_last_queue_latency = duration_t::zero();
0047         m_mutex.unlock();
0048     }
0049 
0050     void take(JArrowMetrics& other) {
0051 
0052         m_mutex.lock();
0053         other.m_mutex.lock();
0054 
0055         if (other.m_last_message_count != 0) {
0056             m_last_message_count = other.m_last_message_count;
0057             m_last_latency = other.m_last_latency;
0058         }
0059 
0060         m_last_status = other.m_last_status;
0061         m_total_message_count += other.m_total_message_count;
0062         m_total_queue_visits += other.m_total_queue_visits;
0063         m_last_queue_visits = other.m_last_queue_visits;
0064         m_total_latency += other.m_total_latency;
0065         m_total_queue_latency += other.m_total_queue_latency;
0066         m_last_queue_latency = other.m_last_queue_latency;
0067 
0068         other.m_last_status = Status::NotRunYet;
0069         other.m_total_message_count = 0;
0070         other.m_last_message_count = 0;
0071         other.m_total_queue_visits = 0;
0072         other.m_last_queue_visits = 0;
0073         other.m_total_latency = duration_t::zero();
0074         other.m_last_latency = duration_t::zero();
0075         other.m_total_queue_latency = duration_t::zero();
0076         other.m_last_queue_latency = duration_t::zero();
0077         other.m_mutex.unlock();
0078         m_mutex.unlock();
0079     };
0080 
0081     void update(const JArrowMetrics &other) {
0082 
0083         m_mutex.lock();
0084         other.m_mutex.lock();
0085 
0086         if (other.m_last_message_count != 0) {
0087             m_last_message_count = other.m_last_message_count;
0088             m_last_latency = other.m_last_latency;
0089         }
0090         m_total_latency += other.m_total_latency;
0091         m_last_status = other.m_last_status;
0092         m_total_message_count += other.m_total_message_count;
0093         m_total_queue_visits += other.m_total_queue_visits;
0094         m_last_queue_visits = other.m_last_queue_visits;
0095         m_total_queue_latency += other.m_total_queue_latency;
0096         m_last_queue_latency = other.m_last_queue_latency;
0097         other.m_mutex.unlock();
0098         m_mutex.unlock();
0099     };
0100 
0101     void update_finished() {
0102         m_mutex.lock();
0103         m_last_status = Status::Finished;
0104         m_mutex.unlock();
0105     }
0106 
0107     void update(const Status& last_status,
0108                 const size_t& message_count_delta,
0109                 const size_t& queue_visit_delta,
0110                 const duration_t& latency_delta,
0111                 const duration_t& queue_latency_delta) {
0112 
0113         m_mutex.lock();
0114         m_last_status = last_status;
0115 
0116         if (message_count_delta > 0) {
0117             // We don't want to lose our most recent latency numbers
0118             // when the most recent execute() encounters an empty
0119             // queue and consequently processes zero items.
0120             m_last_message_count = message_count_delta;
0121             m_last_latency = latency_delta;
0122         }
0123         m_total_message_count += message_count_delta;
0124         m_total_queue_visits += queue_visit_delta;
0125         m_last_queue_visits = queue_visit_delta;
0126         m_total_latency += latency_delta;
0127         m_total_queue_latency += queue_latency_delta;
0128         m_last_queue_latency = queue_latency_delta;
0129         m_mutex.unlock();
0130 
0131     };
0132 
0133     void get(Status& last_status,
0134              size_t& total_message_count,
0135              size_t& last_message_count,
0136              size_t& total_queue_visits,
0137              size_t& last_queue_visits,
0138              duration_t& total_latency,
0139              duration_t& last_latency,
0140              duration_t& total_queue_latency,
0141              duration_t& last_queue_latency) {
0142 
0143         m_mutex.lock();
0144         last_status = m_last_status;
0145         total_message_count = m_total_message_count;
0146         last_message_count = m_last_message_count;
0147         total_queue_visits = m_total_queue_visits;
0148         last_queue_visits = m_last_queue_visits;
0149         total_latency = m_total_latency;
0150         last_latency = m_last_latency;
0151         total_queue_latency = m_total_queue_latency;
0152         last_queue_latency = m_last_queue_latency;
0153         m_mutex.unlock();
0154     }
0155 
0156     size_t get_total_message_count() {
0157         std::lock_guard<std::mutex> lock(m_mutex);
0158         return m_total_message_count;
0159     }
0160 
0161     Status get_last_status() {
0162         std::lock_guard<std::mutex> lock(m_mutex);
0163         return m_last_status;
0164     }
0165 
0166     void summarize() {
0167 
0168     }
0169 };
0170 
0171 inline std::string to_string(JArrowMetrics::Status h) {
0172     switch (h) {
0173         case JArrowMetrics::Status::KeepGoing:     return "KeepGoing";
0174         case JArrowMetrics::Status::ComeBackLater: return "ComeBackLater";
0175         case JArrowMetrics::Status::Finished:      return "Finished";
0176         case JArrowMetrics::Status::NotRunYet:     return "NotRunYet";
0177         case JArrowMetrics::Status::Error:
0178         default:                          return "Error";
0179     }
0180 }
0181