File indexing completed on 2025-01-18 09:39:25
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016 #ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0017 #define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0018
0019 #include <boost/log/detail/config.hpp>
0020
0021 #ifdef BOOST_HAS_PRAGMA_ONCE
0022 #pragma once
0023 #endif
0024
0025 #if defined(BOOST_LOG_NO_THREADS)
0026 #error Boost.Log: This header content is only supported in multithreaded environment
0027 #endif
0028
0029 #include <queue>
0030 #include <vector>
0031 #include <boost/cstdint.hpp>
0032 #include <boost/thread/locks.hpp>
0033 #include <boost/thread/mutex.hpp>
0034 #include <boost/thread/condition_variable.hpp>
0035 #include <boost/thread/thread_time.hpp>
0036 #include <boost/date_time/posix_time/posix_time_types.hpp>
0037 #include <boost/log/detail/timestamp.hpp>
0038 #include <boost/log/detail/enqueued_record.hpp>
0039 #include <boost/log/keywords/order.hpp>
0040 #include <boost/log/keywords/ordering_window.hpp>
0041 #include <boost/log/core/record_view.hpp>
0042 #include <boost/log/detail/header.hpp>
0043
0044 namespace boost {
0045
0046 BOOST_LOG_OPEN_NAMESPACE
0047
0048 namespace sinks {
0049
0050 //! \brief Log record queue without a size limit that holds records back for an ordering window.
0051 //!
0052 //! Records are kept in a \c std::priority_queue of \c enqueued_record elements that uses
0053 //! \c enqueued_record::order< OrderT > as its comparison type, so a dequeue operation always
0054 //! takes whatever element that comparison places on top of the queue.
0055 //! NOTE(review): \c enqueued_record and its \c order adapter come from
0056 //! <boost/log/detail/enqueued_record.hpp>; the resulting ordering direction is not visible here.
0057 //!
0058 //! Nothing in this class limits the number of queued records: every successful enqueue pushes.
0059 //! The \c dequeue_ready and \c try_dequeue_ready operations hand out the top element only once
0060 //! at least \c m_ordering_window milliseconds have elapsed since that element's \c m_timestamp
0061 //! (presumably the moment the record was enqueued - confirm in \c enqueued_record).
0062 //!
0063 //! Every operation that touches the queue or the interruption flag holds \c m_mutex.
0064 //! The constructor and the queue operations are protected members (presumably for use by
0065 //! a sink frontend deriving from this class - TODO confirm against the users of this header).
0066 //!
0067 //! \tparam OrderT Ordering predicate type; the value of \c args[keywords::order] is passed to the queue.
0068 template< typename OrderT >
0069 class unbounded_ordering_queue
0070 {
0071 private:
0072 typedef boost::mutex mutex_type;
0073 typedef sinks::aux::enqueued_record enqueued_record;
0074 //! Underlying container: a priority queue over \c std::vector compared with \c enqueued_record::order< OrderT >
0075 typedef std::priority_queue<
0076 enqueued_record,
0077 std::vector< enqueued_record >,
0078 enqueued_record::order< OrderT >
0079 > queue_type;
0080
0081 private:
0082 //! Ordering window duration, in milliseconds
0083 const uint64_t m_ordering_window;
0084 //! Mutex held while accessing \c m_queue and \c m_interruption_requested
0085 mutex_type m_mutex;
0086 //! Condition variable the blocking \c dequeue_ready waits on
0087 condition_variable m_cond;
0088 //! Pending records; \c top() is the next candidate for dequeueing
0089 queue_type m_queue;
0090 //! Set by \c interrupt_dequeue to make \c dequeue_ready return \c false; cleared again by \c dequeue_ready
0091 bool m_interruption_requested;
0092
0093 public:
0094
0095 //! Returns the ordering window stored by the constructor.
0096 //! \return The stored millisecond count converted to \c posix_time::time_duration.
0097 posix_time::time_duration get_ordering_window() const
0098 {
0099 return posix_time::milliseconds(m_ordering_window);
0100 }
0101
0102 //! Returns the default ordering window.
0103 //! The constructor names this function as the fallback for \c keywords::ordering_window.
0104 //! \return A duration of 30 milliseconds.
0105
0106 static posix_time::time_duration get_default_ordering_window()
0107 {
0108 // NOTE(review): the reasoning behind the 30 ms value is not recorded in this file;
0109 // confirm against the library documentation before changing it.
0110
0111
0112
0113
0114
0115 return posix_time::milliseconds(30);
0116 }
0117
0118 protected:
0119 //! Initializing constructor; reads \c keywords::ordering_window and \c keywords::order from \c args (assumes \c ArgsT is a Boost.Parameter argument pack - TODO confirm).
0120 template< typename ArgsT >
0121 explicit unbounded_ordering_queue(ArgsT const& args) :
0122 m_ordering_window(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window].total_milliseconds()), // window kept in ms; presumably the function after || supplies the value when the keyword is absent (Boost.Parameter lazy default - confirm)
0123 m_queue(args[keywords::order]), // the ordering predicate is handed to the priority queue
0124 m_interruption_requested(false)
0125 {
0126 }
0127
0128 //! Pushes \c rec onto the queue; blocks only for as long as it takes to acquire the mutex.
0129 void enqueue(record_view const& rec)
0130 {
0131 lock_guard< mutex_type > lock(m_mutex);
0132 enqueue_unlocked(rec);
0133 }
0134 //! Pushes \c rec onto the queue only if the mutex can be acquired without blocking.
0135 //! \return \c true if the record was queued, \c false if the lock was not acquired.
0136 bool try_enqueue(record_view const& rec)
0137 {
0138 unique_lock< mutex_type > lock(m_mutex, try_to_lock);
0139 if (lock.owns_lock())
0140 {
0141 enqueue_unlocked(rec);
0142 return true;
0143 }
0144 else
0145 return false;
0146 }
0147 //! Takes the top record if at least the ordering window has elapsed since its \c m_timestamp; does not wait on the condition variable.
0148 //! \return \c true and assigns \c rec on success; \c false if the queue is empty or the top record is not ready yet.
0149 bool try_dequeue_ready(record_view& rec)
0150 {
0151 lock_guard< mutex_type > lock(m_mutex);
0152 if (!m_queue.empty())
0153 {
0154 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
0155 enqueued_record const& elem = m_queue.top();
0156 if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
0157 {
0158 // The top record is old enough: copy it out before pop() destroys the referenced element
0159 rec = elem.m_record;
0160 m_queue.pop();
0161 return true;
0162 }
0163 }
0164 // Either the queue is empty or the top record is still inside the ordering window
0165 return false;
0166 }
0167 //! Takes the top record regardless of the ordering window; does not wait on the condition variable.
0168 //! \return \c true and assigns \c rec if the queue was not empty, \c false otherwise.
0169 bool try_dequeue(record_view& rec)
0170 {
0171 lock_guard< mutex_type > lock(m_mutex);
0172 if (!m_queue.empty())
0173 {
0174 enqueued_record const& elem = m_queue.top();
0175 rec = elem.m_record;
0176 m_queue.pop();
0177 return true;
0178 }
0179
0180 return false;
0181 }
0182 //! Blocks until the top record has aged past the ordering window or until \c interrupt_dequeue is called.
0183 //! \return \c true and assigns \c rec when a record was taken; \c false when interrupted (the interruption flag is then cleared).
0184 bool dequeue_ready(record_view& rec)
0185 {
0186 unique_lock< mutex_type > lock(m_mutex);
0187 while (!m_interruption_requested)
0188 {
0189 if (!m_queue.empty())
0190 {
0191 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
0192 enqueued_record const& elem = m_queue.top();
0193 const uint64_t difference = (now - elem.m_timestamp).milliseconds();
0194 if (difference >= m_ordering_window)
0195 {
0196 // The top record is ready: copy it out before pop() destroys the referenced element
0197 rec = elem.m_record;
0198 m_queue.pop();
0199 return true;
0200 }
0201 else
0202 {
0203 // Sleep for the remainder of the window (or until notified), then re-evaluate the queue top
0204 m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
0205 }
0206 }
0207 else
0208 {
0209 // Nothing is queued: sleep until enqueue_unlocked() or interrupt_dequeue() notifies
0210 m_cond.wait(lock);
0211 }
0212 }
0213 m_interruption_requested = false; // consume the request so that the next call enters the loop again
0214
0215 return false;
0216 }
0217 //! Makes the current or the next \c dequeue_ready call return \c false.
0218 //! Sets the interruption flag under the mutex and notifies one thread waiting on the condition variable.
0219 void interrupt_dequeue()
0220 {
0221 lock_guard< mutex_type > lock(m_mutex);
0222 m_interruption_requested = true;
0223 m_cond.notify_one();
0224 }
0225
0226 private:
0227 //! Pushes \c rec without taking the mutex; both callers in this class already hold \c m_mutex.
0228 void enqueue_unlocked(record_view const& rec)
0229 {
0230 const bool was_empty = m_queue.empty();
0231 m_queue.push(enqueued_record(rec));
0232 if (was_empty) // the untimed wait() in dequeue_ready() is only entered while the queue is empty
0233 m_cond.notify_one();
0234 }
0235 };
0236
0237 }
0238
0239 BOOST_LOG_CLOSE_NAMESPACE
0240
0241 }
0242
0243 #include <boost/log/detail/footer.hpp>
0244
0245 #endif