File indexing completed on 2025-01-18 10:01:39
0001
0002
0003
0004
0005 #pragma once
#include <chrono>
#include <cstddef>
#include <mutex>
#include <string>
0008
/// Mutex-protected accumulator of execution statistics.
///
/// Each quantity is tracked twice: a running "total" that is summed across
/// updates, and a "last" value holding the most recently recorded sample.
/// Every public member function acquires the internal mutex, so a single
/// instance may be read and updated from several threads.
class JArrowMetrics {

public:
    /// Outcome reported with the most recent update.
    enum class Status {KeepGoing, ComeBackLater, Finished, NotRunYet, Error};
    using duration_t = std::chrono::steady_clock::duration;

private:
    // Mutable so that const member functions (and const references passed to
    // update()) can still lock before reading.
    mutable std::mutex m_mutex;

    // Default initializers mirror the state produced by clear(), so a freshly
    // constructed object can be read safely before the first update.
    Status m_last_status = Status::NotRunYet;
    size_t m_total_message_count = 0;
    size_t m_last_message_count = 0;
    size_t m_total_queue_visits = 0;
    size_t m_last_queue_visits = 0;
    duration_t m_total_latency = duration_t::zero();
    duration_t m_last_latency = duration_t::zero();
    duration_t m_total_queue_latency = duration_t::zero();
    duration_t m_last_queue_latency = duration_t::zero();

    /// Resets every counter to its initial value. Caller must hold m_mutex.
    void reset_unlocked() {
        m_last_status = Status::NotRunYet;
        m_total_message_count = 0;
        m_last_message_count = 0;
        m_total_queue_visits = 0;
        m_last_queue_visits = 0;
        m_total_latency = duration_t::zero();
        m_last_latency = duration_t::zero();
        m_total_queue_latency = duration_t::zero();
        m_last_queue_latency = duration_t::zero();
    }

    /// Folds `other` into this object. Caller must hold both mutexes.
    void merge_unlocked(const JArrowMetrics& other) {
        // Only overwrite the "last message" sample when `other` actually
        // recorded messages; otherwise keep our previous sample.
        if (other.m_last_message_count != 0) {
            m_last_message_count = other.m_last_message_count;
            m_last_latency = other.m_last_latency;
        }
        m_last_status = other.m_last_status;
        m_total_message_count += other.m_total_message_count;
        m_total_queue_visits += other.m_total_queue_visits;
        m_last_queue_visits = other.m_last_queue_visits;
        m_total_latency += other.m_total_latency;
        m_total_queue_latency += other.m_total_queue_latency;
        m_last_queue_latency = other.m_last_queue_latency;
    }

public:
    /// Resets all counters to zero and the status to Status::NotRunYet.
    void clear() {
        std::lock_guard<std::mutex> guard(m_mutex);
        reset_unlocked();
    }

    /// Merges `other` into this object and then resets `other`.
    ///
    /// Both mutexes are acquired with std::lock, so concurrent calls such as
    /// `a.take(b)` and `b.take(a)` cannot deadlock on lock ordering.
    /// Taking from oneself is a no-op.
    ///
    /// @param other Metrics to drain; left in the cleared state afterwards.
    void take(JArrowMetrics& other) {
        if (this == &other) {
            return;  // locking the same non-recursive mutex twice is undefined
        }
        std::lock(m_mutex, other.m_mutex);
        std::lock_guard<std::mutex> self_guard(m_mutex, std::adopt_lock);
        std::lock_guard<std::mutex> other_guard(other.m_mutex, std::adopt_lock);

        merge_unlocked(other);
        other.reset_unlocked();
    }

    /// Merges `other` into this object, leaving `other` unchanged.
    ///
    /// Uses the same deadlock-free locking as take(). Updating from oneself
    /// is a no-op.
    ///
    /// @param other Metrics to add to this object's totals.
    void update(const JArrowMetrics &other) {
        if (this == &other) {
            return;  // locking the same non-recursive mutex twice is undefined
        }
        std::lock(m_mutex, other.m_mutex);
        std::lock_guard<std::mutex> self_guard(m_mutex, std::adopt_lock);
        std::lock_guard<std::mutex> other_guard(other.m_mutex, std::adopt_lock);

        merge_unlocked(other);
    }

    /// Sets the last status to Status::Finished without touching the counters.
    void update_finished() {
        std::lock_guard<std::mutex> guard(m_mutex);
        m_last_status = Status::Finished;
    }

    /// Records one sample.
    ///
    /// @param last_status         Status to store as the last status.
    /// @param message_count_delta Added to the total message count; when
    ///                            non-zero it also becomes the last message
    ///                            count and `latency_delta` the last latency.
    /// @param queue_visit_delta   Added to the total queue visits and stored
    ///                            as the last queue visits.
    /// @param latency_delta       Added to the total latency.
    /// @param queue_latency_delta Added to the total queue latency and stored
    ///                            as the last queue latency.
    void update(const Status& last_status,
                const size_t& message_count_delta,
                const size_t& queue_visit_delta,
                const duration_t& latency_delta,
                const duration_t& queue_latency_delta) {

        std::lock_guard<std::mutex> guard(m_mutex);
        m_last_status = last_status;

        // A sample that processed no messages must not overwrite the
        // previously recorded "last message" measurements.
        if (message_count_delta > 0) {
            m_last_message_count = message_count_delta;
            m_last_latency = latency_delta;
        }
        m_total_message_count += message_count_delta;
        m_total_queue_visits += queue_visit_delta;
        m_last_queue_visits = queue_visit_delta;
        m_total_latency += latency_delta;
        m_total_queue_latency += queue_latency_delta;
        m_last_queue_latency = queue_latency_delta;
    }

    /// Copies every field into the supplied output references under a single
    /// lock acquisition, so the values form one consistent snapshot.
    void get(Status& last_status,
             size_t& total_message_count,
             size_t& last_message_count,
             size_t& total_queue_visits,
             size_t& last_queue_visits,
             duration_t& total_latency,
             duration_t& last_latency,
             duration_t& total_queue_latency,
             duration_t& last_queue_latency) const {

        std::lock_guard<std::mutex> guard(m_mutex);
        last_status = m_last_status;
        total_message_count = m_total_message_count;
        last_message_count = m_last_message_count;
        total_queue_visits = m_total_queue_visits;
        last_queue_visits = m_last_queue_visits;
        total_latency = m_total_latency;
        last_latency = m_last_latency;
        total_queue_latency = m_total_queue_latency;
        last_queue_latency = m_last_queue_latency;
    }

    /// @return The accumulated message count.
    size_t get_total_message_count() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_total_message_count;
    }

    /// @return The most recently recorded status.
    Status get_last_status() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_last_status;
    }

    /// Currently does nothing; the body is empty.
    void summarize() {

    }
};
0170
0171 inline std::string to_string(JArrowMetrics::Status h) {
0172 switch (h) {
0173 case JArrowMetrics::Status::KeepGoing: return "KeepGoing";
0174 case JArrowMetrics::Status::ComeBackLater: return "ComeBackLater";
0175 case JArrowMetrics::Status::Finished: return "Finished";
0176 case JArrowMetrics::Status::NotRunYet: return "NotRunYet";
0177 case JArrowMetrics::Status::Error:
0178 default: return "Error";
0179 }
0180 }
0181