File indexing completed on 2025-09-15 08:39:21
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016 #ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0017 #define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0018
0019 #include <boost/log/detail/config.hpp>
0020
0021 #ifdef BOOST_HAS_PRAGMA_ONCE
0022 #pragma once
0023 #endif
0024
0025 #if defined(BOOST_LOG_NO_THREADS)
0026 #error Boost.Log: This header content is only supported in multithreaded environment
0027 #endif
0028
0029 #include <queue>
0030 #include <vector>
0031 #include <chrono>
0032 #include <mutex>
0033 #include <condition_variable>
0034 #include <boost/log/detail/enqueued_record.hpp>
0035 #include <boost/log/keywords/order.hpp>
0036 #include <boost/log/keywords/ordering_window.hpp>
0037 #include <boost/log/core/record_view.hpp>
0038 #include <boost/log/detail/header.hpp>
0039
0040 namespace boost {
0041
0042 BOOST_LOG_OPEN_NAMESPACE
0043
0044 namespace sinks {
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064 template< typename OrderT >
0065 class unbounded_ordering_queue
0066 {
0067 private:
0068 typedef std::mutex mutex_type;
0069 typedef sinks::aux::enqueued_record enqueued_record;
0070
0071 typedef std::priority_queue<
0072 enqueued_record,
0073 std::vector< enqueued_record >,
0074 enqueued_record::order< OrderT >
0075 > queue_type;
0076
0077 private:
0078
0079 const std::chrono::steady_clock::duration m_ordering_window;
0080
0081 mutex_type m_mutex;
0082
0083 std::condition_variable m_cond;
0084
0085 queue_type m_queue;
0086
0087 bool m_interruption_requested;
0088
0089 public:
0090
0091
0092
0093 std::chrono::steady_clock::duration get_ordering_window() const
0094 {
0095 return m_ordering_window;
0096 }
0097
0098
0099
0100
0101
0102 static BOOST_CONSTEXPR std::chrono::steady_clock::duration get_default_ordering_window() BOOST_NOEXCEPT
0103 {
0104
0105
0106
0107
0108
0109
0110
0111 return std::chrono::milliseconds(30);
0112 }
0113
0114 protected:
0115
0116 template< typename ArgsT >
0117 explicit unbounded_ordering_queue(ArgsT const& args) :
0118 m_ordering_window(std::chrono::duration_cast< std::chrono::steady_clock::duration >(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window])),
0119 m_queue(args[keywords::order]),
0120 m_interruption_requested(false)
0121 {
0122 }
0123
0124
0125 void enqueue(record_view const& rec)
0126 {
0127 std::lock_guard< mutex_type > lock(m_mutex);
0128 enqueue_unlocked(rec);
0129 }
0130
0131
0132 bool try_enqueue(record_view const& rec)
0133 {
0134 std::unique_lock< mutex_type > lock(m_mutex, std::try_to_lock);
0135 if (lock.owns_lock())
0136 {
0137 enqueue_unlocked(rec);
0138 return true;
0139 }
0140 else
0141 return false;
0142 }
0143
0144
0145 bool try_dequeue_ready(record_view& rec)
0146 {
0147 std::lock_guard< mutex_type > lock(m_mutex);
0148 if (!m_queue.empty())
0149 {
0150 const auto now = std::chrono::steady_clock::now();
0151 enqueued_record const& elem = m_queue.top();
0152 if ((now - elem.m_timestamp) >= m_ordering_window)
0153 {
0154
0155 rec = elem.m_record;
0156 m_queue.pop();
0157 return true;
0158 }
0159 }
0160
0161 return false;
0162 }
0163
0164
0165 bool try_dequeue(record_view& rec)
0166 {
0167 std::lock_guard< mutex_type > lock(m_mutex);
0168 if (!m_queue.empty())
0169 {
0170 enqueued_record const& elem = m_queue.top();
0171 rec = elem.m_record;
0172 m_queue.pop();
0173 return true;
0174 }
0175
0176 return false;
0177 }
0178
0179
0180 bool dequeue_ready(record_view& rec)
0181 {
0182 std::unique_lock< mutex_type > lock(m_mutex);
0183 while (!m_interruption_requested)
0184 {
0185 if (!m_queue.empty())
0186 {
0187 const auto now = std::chrono::steady_clock::now();
0188 enqueued_record const& elem = m_queue.top();
0189 const auto difference = now - elem.m_timestamp;
0190 if (difference >= m_ordering_window)
0191 {
0192
0193 rec = elem.m_record;
0194 m_queue.pop();
0195 return true;
0196 }
0197 else
0198 {
0199
0200 m_cond.wait_for(lock, m_ordering_window - difference);
0201 }
0202 }
0203 else
0204 {
0205
0206 m_cond.wait(lock);
0207 }
0208 }
0209 m_interruption_requested = false;
0210
0211 return false;
0212 }
0213
0214
0215 void interrupt_dequeue()
0216 {
0217 std::lock_guard< mutex_type > lock(m_mutex);
0218 m_interruption_requested = true;
0219 m_cond.notify_one();
0220 }
0221
0222 private:
0223
0224 void enqueue_unlocked(record_view const& rec)
0225 {
0226 const bool was_empty = m_queue.empty();
0227 m_queue.push(enqueued_record(rec));
0228 if (was_empty)
0229 m_cond.notify_one();
0230 }
0231 };
0232
0233 }
0234
0235 BOOST_LOG_CLOSE_NAMESPACE
0236
0237 }
0238
0239 #include <boost/log/detail/footer.hpp>
0240
0241 #endif