File indexing completed on 2025-10-30 08:21:43
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016 #ifndef BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0017 #define BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0018
0019 #include <boost/log/detail/config.hpp>
0020
0021 #ifdef BOOST_HAS_PRAGMA_ONCE
0022 #pragma once
0023 #endif
0024
0025 #if defined(BOOST_LOG_NO_THREADS)
0026 #error Boost.Log: This header content is only supported in multithreaded environment
0027 #endif
0028
0029 #include <cstddef>
0030 #include <queue>
0031 #include <vector>
0032 #include <chrono>
0033 #include <mutex>
0034 #include <condition_variable>
0035 #include <boost/log/detail/enqueued_record.hpp>
0036 #include <boost/log/keywords/order.hpp>
0037 #include <boost/log/keywords/ordering_window.hpp>
0038 #include <boost/log/core/record_view.hpp>
0039 #include <boost/log/detail/header.hpp>
0040
0041 namespace boost {
0042
0043 BOOST_LOG_OPEN_NAMESPACE
0044
0045 namespace sinks {
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066 template< typename OrderT, std::size_t MaxQueueSizeV, typename OverflowStrategyT >
0067 class bounded_ordering_queue :
0068 private OverflowStrategyT
0069 {
0070 private:
0071 typedef OverflowStrategyT overflow_strategy;
0072 typedef std::mutex mutex_type;
0073 typedef sinks::aux::enqueued_record enqueued_record;
0074
0075 typedef std::priority_queue<
0076 enqueued_record,
0077 std::vector< enqueued_record >,
0078 enqueued_record::order< OrderT >
0079 > queue_type;
0080
0081 private:
0082
0083 const std::chrono::steady_clock::duration m_ordering_window;
0084
0085 mutex_type m_mutex;
0086
0087 std::condition_variable m_cond;
0088
0089 queue_type m_queue;
0090
0091 bool m_interruption_requested;
0092
0093 public:
0094
0095
0096
0097 std::chrono::steady_clock::duration get_ordering_window() const
0098 {
0099 return m_ordering_window;
0100 }
0101
0102
0103
0104
0105
0106 static BOOST_CONSTEXPR std::chrono::steady_clock::duration get_default_ordering_window() BOOST_NOEXCEPT
0107 {
0108
0109
0110
0111
0112
0113
0114
0115 return std::chrono::milliseconds(30);
0116 }
0117
0118 protected:
0119
0120 template< typename ArgsT >
0121 explicit bounded_ordering_queue(ArgsT const& args) :
0122 m_ordering_window(std::chrono::duration_cast< std::chrono::steady_clock::duration >(args[keywords::ordering_window || &bounded_ordering_queue::get_default_ordering_window])),
0123 m_queue(args[keywords::order]),
0124 m_interruption_requested(false)
0125 {
0126 }
0127
0128
0129 void enqueue(record_view const& rec)
0130 {
0131 std::unique_lock< mutex_type > lock(m_mutex);
0132 std::size_t size = m_queue.size();
0133 for (; size >= MaxQueueSizeV; size = m_queue.size())
0134 {
0135 if (!overflow_strategy::on_overflow(rec, lock))
0136 return;
0137 }
0138
0139 m_queue.push(enqueued_record(rec));
0140 if (size == 0)
0141 m_cond.notify_one();
0142 }
0143
0144
0145 bool try_enqueue(record_view const& rec)
0146 {
0147 std::unique_lock< mutex_type > lock(m_mutex, std::try_to_lock);
0148 if (lock.owns_lock())
0149 {
0150 const std::size_t size = m_queue.size();
0151
0152
0153 if (size < MaxQueueSizeV)
0154 {
0155 m_queue.push(enqueued_record(rec));
0156 if (size == 0)
0157 m_cond.notify_one();
0158 return true;
0159 }
0160 }
0161
0162 return false;
0163 }
0164
0165
0166 bool try_dequeue_ready(record_view& rec)
0167 {
0168 std::lock_guard< mutex_type > lock(m_mutex);
0169 const std::size_t size = m_queue.size();
0170 if (size > 0)
0171 {
0172 const auto now = std::chrono::steady_clock::now();
0173 enqueued_record const& elem = m_queue.top();
0174 if ((now - elem.m_timestamp) >= m_ordering_window)
0175 {
0176
0177 rec = elem.m_record;
0178 m_queue.pop();
0179 overflow_strategy::on_queue_space_available();
0180 return true;
0181 }
0182 }
0183
0184 return false;
0185 }
0186
0187
0188 bool try_dequeue(record_view& rec)
0189 {
0190 std::lock_guard< mutex_type > lock(m_mutex);
0191 const std::size_t size = m_queue.size();
0192 if (size > 0)
0193 {
0194 enqueued_record const& elem = m_queue.top();
0195 rec = elem.m_record;
0196 m_queue.pop();
0197 overflow_strategy::on_queue_space_available();
0198 return true;
0199 }
0200
0201 return false;
0202 }
0203
0204
0205 bool dequeue_ready(record_view& rec)
0206 {
0207 std::unique_lock< mutex_type > lock(m_mutex);
0208
0209 while (!m_interruption_requested)
0210 {
0211 const std::size_t size = m_queue.size();
0212 if (size > 0)
0213 {
0214 const auto now = std::chrono::steady_clock::now();
0215 enqueued_record const& elem = m_queue.top();
0216 const auto difference = now - elem.m_timestamp;
0217 if (difference >= m_ordering_window)
0218 {
0219 rec = elem.m_record;
0220 m_queue.pop();
0221 overflow_strategy::on_queue_space_available();
0222 return true;
0223 }
0224 else
0225 {
0226
0227 m_cond.wait_for(lock, m_ordering_window - difference);
0228 }
0229 }
0230 else
0231 {
0232 m_cond.wait(lock);
0233 }
0234 }
0235 m_interruption_requested = false;
0236
0237 return false;
0238 }
0239
0240
0241 void interrupt_dequeue()
0242 {
0243 std::lock_guard< mutex_type > lock(m_mutex);
0244 m_interruption_requested = true;
0245 overflow_strategy::interrupt();
0246 m_cond.notify_one();
0247 }
0248 };
0249
0250 }
0251
0252 BOOST_LOG_CLOSE_NAMESPACE
0253
0254 }
0255
0256 #include <boost/log/detail/footer.hpp>
0257
0258 #endif