File indexing completed on 2025-01-18 09:39:25
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016 #ifndef BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0017 #define BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0018
0019 #include <boost/log/detail/config.hpp>
0020
0021 #ifdef BOOST_HAS_PRAGMA_ONCE
0022 #pragma once
0023 #endif
0024
0025 #if defined(BOOST_LOG_NO_THREADS)
0026 #error Boost.Log: This header content is only supported in multithreaded environment
0027 #endif
0028
0029 #include <cstddef>
0030 #include <queue>
0031 #include <vector>
0032 #include <boost/cstdint.hpp>
0033 #include <boost/thread/locks.hpp>
0034 #include <boost/thread/mutex.hpp>
0035 #include <boost/thread/condition_variable.hpp>
0036 #include <boost/thread/thread_time.hpp>
0037 #include <boost/date_time/posix_time/posix_time_types.hpp>
0038 #include <boost/log/detail/timestamp.hpp>
0039 #include <boost/log/detail/enqueued_record.hpp>
0040 #include <boost/log/keywords/order.hpp>
0041 #include <boost/log/keywords/ordering_window.hpp>
0042 #include <boost/log/core/record_view.hpp>
0043 #include <boost/log/detail/header.hpp>
0044
0045 namespace boost {
0046
0047 BOOST_LOG_OPEN_NAMESPACE
0048
0049 namespace sinks {
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070 template< typename OrderT, std::size_t MaxQueueSizeV, typename OverflowStrategyT >
0071 class bounded_ordering_queue :
0072 private OverflowStrategyT
0073 {
0074 private:
0075 typedef OverflowStrategyT overflow_strategy;
0076 typedef boost::mutex mutex_type;
0077 typedef sinks::aux::enqueued_record enqueued_record;
0078
0079 typedef std::priority_queue<
0080 enqueued_record,
0081 std::vector< enqueued_record >,
0082 enqueued_record::order< OrderT >
0083 > queue_type;
0084
0085 private:
0086
0087 const uint64_t m_ordering_window;
0088
0089 mutex_type m_mutex;
0090
0091 condition_variable m_cond;
0092
0093 queue_type m_queue;
0094
0095 bool m_interruption_requested;
0096
0097 public:
0098
0099
0100
0101 posix_time::time_duration get_ordering_window() const
0102 {
0103 return posix_time::milliseconds(m_ordering_window);
0104 }
0105
0106
0107
0108
0109
0110 static posix_time::time_duration get_default_ordering_window()
0111 {
0112
0113
0114
0115
0116
0117
0118
0119 return posix_time::milliseconds(30);
0120 }
0121
0122 protected:
0123
0124 template< typename ArgsT >
0125 explicit bounded_ordering_queue(ArgsT const& args) :
0126 m_ordering_window(args[keywords::ordering_window || &bounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
0127 m_queue(args[keywords::order]),
0128 m_interruption_requested(false)
0129 {
0130 }
0131
0132
0133 void enqueue(record_view const& rec)
0134 {
0135 unique_lock< mutex_type > lock(m_mutex);
0136 std::size_t size = m_queue.size();
0137 for (; size >= MaxQueueSizeV; size = m_queue.size())
0138 {
0139 if (!overflow_strategy::on_overflow(rec, lock))
0140 return;
0141 }
0142
0143 m_queue.push(enqueued_record(rec));
0144 if (size == 0)
0145 m_cond.notify_one();
0146 }
0147
0148
0149 bool try_enqueue(record_view const& rec)
0150 {
0151 unique_lock< mutex_type > lock(m_mutex, try_to_lock);
0152 if (lock.owns_lock())
0153 {
0154 const std::size_t size = m_queue.size();
0155
0156
0157 if (size < MaxQueueSizeV)
0158 {
0159 m_queue.push(enqueued_record(rec));
0160 if (size == 0)
0161 m_cond.notify_one();
0162 return true;
0163 }
0164 }
0165
0166 return false;
0167 }
0168
0169
0170 bool try_dequeue_ready(record_view& rec)
0171 {
0172 lock_guard< mutex_type > lock(m_mutex);
0173 const std::size_t size = m_queue.size();
0174 if (size > 0)
0175 {
0176 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
0177 enqueued_record const& elem = m_queue.top();
0178 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
0179 {
0180
0181 rec = elem.m_record;
0182 m_queue.pop();
0183 overflow_strategy::on_queue_space_available();
0184 return true;
0185 }
0186 }
0187
0188 return false;
0189 }
0190
0191
0192 bool try_dequeue(record_view& rec)
0193 {
0194 lock_guard< mutex_type > lock(m_mutex);
0195 const std::size_t size = m_queue.size();
0196 if (size > 0)
0197 {
0198 enqueued_record const& elem = m_queue.top();
0199 rec = elem.m_record;
0200 m_queue.pop();
0201 overflow_strategy::on_queue_space_available();
0202 return true;
0203 }
0204
0205 return false;
0206 }
0207
0208
0209 bool dequeue_ready(record_view& rec)
0210 {
0211 unique_lock< mutex_type > lock(m_mutex);
0212
0213 while (!m_interruption_requested)
0214 {
0215 const std::size_t size = m_queue.size();
0216 if (size > 0)
0217 {
0218 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
0219 enqueued_record const& elem = m_queue.top();
0220 const uint64_t difference = (now - elem.m_timestamp).milliseconds();
0221 if (difference >= m_ordering_window)
0222 {
0223 rec = elem.m_record;
0224 m_queue.pop();
0225 overflow_strategy::on_queue_space_available();
0226 return true;
0227 }
0228 else
0229 {
0230
0231 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
0232 }
0233 }
0234 else
0235 {
0236 m_cond.wait(lock);
0237 }
0238 }
0239 m_interruption_requested = false;
0240
0241 return false;
0242 }
0243
0244
0245 void interrupt_dequeue()
0246 {
0247 lock_guard< mutex_type > lock(m_mutex);
0248 m_interruption_requested = true;
0249 overflow_strategy::interrupt();
0250 m_cond.notify_one();
0251 }
0252 };
0253
0254 }
0255
0256 BOOST_LOG_CLOSE_NAMESPACE
0257
0258 }
0259
0260 #include <boost/log/detail/footer.hpp>
0261
0262 #endif