Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-15 08:39:21

0001 /*
0002  *          Copyright Andrey Semashev 2007 - 2015.
0003  * Distributed under the Boost Software License, Version 1.0.
0004  *    (See accompanying file LICENSE_1_0.txt or copy at
0005  *          http://www.boost.org/LICENSE_1_0.txt)
0006  */
0007 /*!
0008  * \file   unbounded_ordering_queue.hpp
0009  * \author Andrey Semashev
0010  * \date   24.07.2011
0011  *
0012  * The header contains implementation of unbounded ordering record queueing strategy for
0013  * the asynchronous sink frontend.
0014  */
0015 
0016 #ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0017 #define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0018 
0019 #include <boost/log/detail/config.hpp>
0020 
0021 #ifdef BOOST_HAS_PRAGMA_ONCE
0022 #pragma once
0023 #endif
0024 
0025 #if defined(BOOST_LOG_NO_THREADS)
0026 #error Boost.Log: This header content is only supported in multithreaded environment
0027 #endif
0028 
0029 #include <queue>
0030 #include <vector>
0031 #include <chrono>
0032 #include <mutex>
0033 #include <condition_variable>
0034 #include <boost/log/detail/enqueued_record.hpp>
0035 #include <boost/log/keywords/order.hpp>
0036 #include <boost/log/keywords/ordering_window.hpp>
0037 #include <boost/log/core/record_view.hpp>
0038 #include <boost/log/detail/header.hpp>
0039 
0040 namespace boost {
0041 
0042 BOOST_LOG_OPEN_NAMESPACE
0043 
0044 namespace sinks {
0045 
0046 /*!
0047  * \brief Unbounded ordering log record queueing strategy
0048  *
0049  * The \c unbounded_ordering_queue class is intended to be used with
0050  * the \c asynchronous_sink frontend as a log record queueing strategy.
0051  *
0052  * This strategy provides the following properties to the record queueing mechanism:
0053  *
0054  * \li The queue has no size limits.
0055  * \li The queue has a fixed latency window. This means that each log record put
0056  *     into the queue will normally not be dequeued for a certain period of time.
0057  * \li The queue performs stable record ordering within the latency window.
0058  *     The ordering predicate can be specified in the \c OrderT template parameter.
0059  *
0060  * Since this queue has no size limits, it may grow uncontrollably if sink backends
0061  * dequeue log records not fast enough. When this is an issue, it is recommended to
0062  * use one of the bounded strategies.
0063  */
0064 template< typename OrderT >
0065 class unbounded_ordering_queue
0066 {
0067 private:
0068     typedef std::mutex mutex_type;
0069     typedef sinks::aux::enqueued_record enqueued_record;
0070 
0071     typedef std::priority_queue<
0072         enqueued_record,
0073         std::vector< enqueued_record >,
0074         enqueued_record::order< OrderT >
0075     > queue_type;
0076 
0077 private:
0078     //! Ordering window duration
0079     const std::chrono::steady_clock::duration m_ordering_window;
0080     //! Synchronization mutex
0081     mutex_type m_mutex;
0082     //! Condition for blocking
0083     std::condition_variable m_cond;
0084     //! Thread-safe queue
0085     queue_type m_queue;
0086     //! Interruption flag
0087     bool m_interruption_requested;
0088 
0089 public:
0090     /*!
0091      * Returns ordering window size specified during initialization
0092      */
0093     std::chrono::steady_clock::duration get_ordering_window() const
0094     {
0095         return m_ordering_window;
0096     }
0097 
0098     /*!
0099      * Returns default ordering window size.
0100      * The default window size is specific to the operating system thread scheduling mechanism.
0101      */
0102     static BOOST_CONSTEXPR std::chrono::steady_clock::duration get_default_ordering_window() BOOST_NOEXCEPT
0103     {
0104         // The main idea behind this parameter is that the ordering window should be large enough
0105         // to allow the frontend to order records from different threads on an attribute
0106         // that contains system time. Thus this value should be:
0107         // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
0108         //   For instance, on Windows it defaults to around 15-16 ms.
0109         // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
0110         //   switch threads on any known OS. It can be tuned for other platforms as needed.
0111         return std::chrono::milliseconds(30);
0112     }
0113 
0114 protected:
0115     //! Initializing constructor
0116     template< typename ArgsT >
0117     explicit unbounded_ordering_queue(ArgsT const& args) :
0118         m_ordering_window(std::chrono::duration_cast< std::chrono::steady_clock::duration >(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window])),
0119         m_queue(args[keywords::order]),
0120         m_interruption_requested(false)
0121     {
0122     }
0123 
0124     //! Enqueues log record to the queue
0125     void enqueue(record_view const& rec)
0126     {
0127         std::lock_guard< mutex_type > lock(m_mutex);
0128         enqueue_unlocked(rec);
0129     }
0130 
0131     //! Attempts to enqueue log record to the queue
0132     bool try_enqueue(record_view const& rec)
0133     {
0134         std::unique_lock< mutex_type > lock(m_mutex, std::try_to_lock);
0135         if (lock.owns_lock())
0136         {
0137             enqueue_unlocked(rec);
0138             return true;
0139         }
0140         else
0141             return false;
0142     }
0143 
0144     //! Attempts to dequeue a log record ready for processing from the queue, does not block if no log records are ready to be processed
0145     bool try_dequeue_ready(record_view& rec)
0146     {
0147         std::lock_guard< mutex_type > lock(m_mutex);
0148         if (!m_queue.empty())
0149         {
0150             const auto now = std::chrono::steady_clock::now();
0151             enqueued_record const& elem = m_queue.top();
0152             if ((now - elem.m_timestamp) >= m_ordering_window)
0153             {
0154                 // We got a new element
0155                 rec = elem.m_record;
0156                 m_queue.pop();
0157                 return true;
0158             }
0159         }
0160 
0161         return false;
0162     }
0163 
0164     //! Attempts to dequeue log record from the queue, does not block.
0165     bool try_dequeue(record_view& rec)
0166     {
0167         std::lock_guard< mutex_type > lock(m_mutex);
0168         if (!m_queue.empty())
0169         {
0170             enqueued_record const& elem = m_queue.top();
0171             rec = elem.m_record;
0172             m_queue.pop();
0173             return true;
0174         }
0175 
0176         return false;
0177     }
0178 
0179     //! Dequeues log record from the queue, blocks if no log records are ready to be processed
0180     bool dequeue_ready(record_view& rec)
0181     {
0182         std::unique_lock< mutex_type > lock(m_mutex);
0183         while (!m_interruption_requested)
0184         {
0185             if (!m_queue.empty())
0186             {
0187                 const auto now = std::chrono::steady_clock::now();
0188                 enqueued_record const& elem = m_queue.top();
0189                 const auto difference = now - elem.m_timestamp;
0190                 if (difference >= m_ordering_window)
0191                 {
0192                     // We got a new element
0193                     rec = elem.m_record;
0194                     m_queue.pop();
0195                     return true;
0196                 }
0197                 else
0198                 {
0199                     // Wait until the element becomes ready to be processed
0200                     m_cond.wait_for(lock, m_ordering_window - difference);
0201                 }
0202             }
0203             else
0204             {
0205                 // Wait for an element to come
0206                 m_cond.wait(lock);
0207             }
0208         }
0209         m_interruption_requested = false;
0210 
0211         return false;
0212     }
0213 
0214     //! Wakes a thread possibly blocked in the \c dequeue method
0215     void interrupt_dequeue()
0216     {
0217         std::lock_guard< mutex_type > lock(m_mutex);
0218         m_interruption_requested = true;
0219         m_cond.notify_one();
0220     }
0221 
0222 private:
0223     //! Enqueues a log record
0224     void enqueue_unlocked(record_view const& rec)
0225     {
0226         const bool was_empty = m_queue.empty();
0227         m_queue.push(enqueued_record(rec));
0228         if (was_empty)
0229             m_cond.notify_one();
0230     }
0231 };
0232 
0233 } // namespace sinks
0234 
0235 BOOST_LOG_CLOSE_NAMESPACE // namespace log
0236 
0237 } // namespace boost
0238 
0239 #include <boost/log/detail/footer.hpp>
0240 
0241 #endif // BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_