Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-10-30 08:21:43

0001 /*
0002  *          Copyright Andrey Semashev 2007 - 2015.
0003  * Distributed under the Boost Software License, Version 1.0.
0004  *    (See accompanying file LICENSE_1_0.txt or copy at
0005  *          http://www.boost.org/LICENSE_1_0.txt)
0006  */
0007 /*!
0008  * \file   bounded_ordering_queue.hpp
0009  * \author Andrey Semashev
0010  * \date   06.01.2012
0011  *
0012  * The header contains implementation of bounded ordering queueing strategy for
0013  * the asynchronous sink frontend.
0014  */
0015 
0016 #ifndef BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0017 #define BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0018 
0019 #include <boost/log/detail/config.hpp>
0020 
0021 #ifdef BOOST_HAS_PRAGMA_ONCE
0022 #pragma once
0023 #endif
0024 
0025 #if defined(BOOST_LOG_NO_THREADS)
0026 #error Boost.Log: This header content is only supported in multithreaded environment
0027 #endif
0028 
0029 #include <cstddef>
0030 #include <queue>
0031 #include <vector>
0032 #include <chrono>
0033 #include <mutex>
0034 #include <condition_variable>
0035 #include <boost/log/detail/enqueued_record.hpp>
0036 #include <boost/log/keywords/order.hpp>
0037 #include <boost/log/keywords/ordering_window.hpp>
0038 #include <boost/log/core/record_view.hpp>
0039 #include <boost/log/detail/header.hpp>
0040 
0041 namespace boost {
0042 
0043 BOOST_LOG_OPEN_NAMESPACE
0044 
0045 namespace sinks {
0046 
0047 /*!
0048  * \brief Bounded ordering log record queueing strategy
0049  *
0050  * The \c bounded_ordering_queue class is intended to be used with
0051  * the \c asynchronous_sink frontend as a log record queueing strategy.
0052  *
0053  * This strategy provides the following properties to the record queueing mechanism:
0054  *
0055  * \li The queue has limited capacity specified by the \c MaxQueueSizeV template parameter.
0056  * \li Upon reaching the size limit, the queue invokes the overflow handling strategy
0057  *     specified in the \c OverflowStrategyT template parameter to handle the situation.
0058  *     The library provides overflow handling strategies for most common cases:
0059  *     \c drop_on_overflow will silently discard the log record, and \c block_on_overflow
0060  *     will put the enqueueing thread to wait until there is space in the queue.
0061  * \li The queue has a fixed latency window. This means that each log record put
0062  *     into the queue will normally not be dequeued for a certain period of time.
0063  * \li The queue performs stable record ordering within the latency window.
0064  *     The ordering predicate can be specified in the \c OrderT template parameter.
0065  */
0066 template< typename OrderT, std::size_t MaxQueueSizeV, typename OverflowStrategyT >
0067 class bounded_ordering_queue :
0068     private OverflowStrategyT
0069 {
0070 private:
0071     typedef OverflowStrategyT overflow_strategy;
0072     typedef std::mutex mutex_type;
0073     typedef sinks::aux::enqueued_record enqueued_record;
0074 
0075     typedef std::priority_queue<
0076         enqueued_record,
0077         std::vector< enqueued_record >,
0078         enqueued_record::order< OrderT >
0079     > queue_type;
0080 
0081 private:
0082     //! Ordering window duration
0083     const std::chrono::steady_clock::duration m_ordering_window;
0084     //! Synchronization primitive
0085     mutex_type m_mutex;
0086     //! Condition to block the consuming thread on
0087     std::condition_variable m_cond;
0088     //! Log record queue
0089     queue_type m_queue;
0090     //! Interruption flag
0091     bool m_interruption_requested;
0092 
0093 public:
0094     /*!
0095      * Returns ordering window size specified during initialization
0096      */
0097     std::chrono::steady_clock::duration get_ordering_window() const
0098     {
0099         return m_ordering_window;
0100     }
0101 
0102     /*!
0103      * Returns default ordering window size.
0104      * The default window size is specific to the operating system thread scheduling mechanism.
0105      */
0106     static BOOST_CONSTEXPR std::chrono::steady_clock::duration get_default_ordering_window() BOOST_NOEXCEPT
0107     {
0108         // The main idea behind this parameter is that the ordering window should be large enough
0109         // to allow the frontend to order records from different threads on an attribute
0110         // that contains system time. Thus this value should be:
0111         // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
0112         //   For instance, on Windows it defaults to around 15-16 ms.
0113         // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
0114         //   switch threads on any known OS. It can be tuned for other platforms as needed.
0115         return std::chrono::milliseconds(30);
0116     }
0117 
0118 protected:
0119     //! Initializing constructor
0120     template< typename ArgsT >
0121     explicit bounded_ordering_queue(ArgsT const& args) :
0122         m_ordering_window(std::chrono::duration_cast< std::chrono::steady_clock::duration >(args[keywords::ordering_window || &bounded_ordering_queue::get_default_ordering_window])),
0123         m_queue(args[keywords::order]),
0124         m_interruption_requested(false)
0125     {
0126     }
0127 
0128     //! Enqueues log record to the queue
0129     void enqueue(record_view const& rec)
0130     {
0131         std::unique_lock< mutex_type > lock(m_mutex);
0132         std::size_t size = m_queue.size();
0133         for (; size >= MaxQueueSizeV; size = m_queue.size())
0134         {
0135             if (!overflow_strategy::on_overflow(rec, lock))
0136                 return;
0137         }
0138 
0139         m_queue.push(enqueued_record(rec));
0140         if (size == 0)
0141             m_cond.notify_one();
0142     }
0143 
0144     //! Attempts to enqueue log record to the queue
0145     bool try_enqueue(record_view const& rec)
0146     {
0147         std::unique_lock< mutex_type > lock(m_mutex, std::try_to_lock);
0148         if (lock.owns_lock())
0149         {
0150             const std::size_t size = m_queue.size();
0151 
0152             // Do not invoke the bounding strategy in case of overflow as it may block
0153             if (size < MaxQueueSizeV)
0154             {
0155                 m_queue.push(enqueued_record(rec));
0156                 if (size == 0)
0157                     m_cond.notify_one();
0158                 return true;
0159             }
0160         }
0161 
0162         return false;
0163     }
0164 
0165     //! Attempts to dequeue a log record ready for processing from the queue, does not block if the queue is empty
0166     bool try_dequeue_ready(record_view& rec)
0167     {
0168         std::lock_guard< mutex_type > lock(m_mutex);
0169         const std::size_t size = m_queue.size();
0170         if (size > 0)
0171         {
0172             const auto now = std::chrono::steady_clock::now();
0173             enqueued_record const& elem = m_queue.top();
0174             if ((now - elem.m_timestamp) >= m_ordering_window)
0175             {
0176                 // We got a new element
0177                 rec = elem.m_record;
0178                 m_queue.pop();
0179                 overflow_strategy::on_queue_space_available();
0180                 return true;
0181             }
0182         }
0183 
0184         return false;
0185     }
0186 
0187     //! Attempts to dequeue log record from the queue, does not block if the queue is empty
0188     bool try_dequeue(record_view& rec)
0189     {
0190         std::lock_guard< mutex_type > lock(m_mutex);
0191         const std::size_t size = m_queue.size();
0192         if (size > 0)
0193         {
0194             enqueued_record const& elem = m_queue.top();
0195             rec = elem.m_record;
0196             m_queue.pop();
0197             overflow_strategy::on_queue_space_available();
0198             return true;
0199         }
0200 
0201         return false;
0202     }
0203 
0204     //! Dequeues log record from the queue, blocks if the queue is empty
0205     bool dequeue_ready(record_view& rec)
0206     {
0207         std::unique_lock< mutex_type > lock(m_mutex);
0208 
0209         while (!m_interruption_requested)
0210         {
0211             const std::size_t size = m_queue.size();
0212             if (size > 0)
0213             {
0214                 const auto now = std::chrono::steady_clock::now();
0215                 enqueued_record const& elem = m_queue.top();
0216                 const auto difference = now - elem.m_timestamp;
0217                 if (difference >= m_ordering_window)
0218                 {
0219                     rec = elem.m_record;
0220                     m_queue.pop();
0221                     overflow_strategy::on_queue_space_available();
0222                     return true;
0223                 }
0224                 else
0225                 {
0226                     // Wait until the element becomes ready to be processed
0227                     m_cond.wait_for(lock, m_ordering_window - difference);
0228                 }
0229             }
0230             else
0231             {
0232                 m_cond.wait(lock);
0233             }
0234         }
0235         m_interruption_requested = false;
0236 
0237         return false;
0238     }
0239 
0240     //! Wakes a thread possibly blocked in the \c dequeue method
0241     void interrupt_dequeue()
0242     {
0243         std::lock_guard< mutex_type > lock(m_mutex);
0244         m_interruption_requested = true;
0245         overflow_strategy::interrupt();
0246         m_cond.notify_one();
0247     }
0248 };
0249 
0250 } // namespace sinks
0251 
0252 BOOST_LOG_CLOSE_NAMESPACE // namespace log
0253 
0254 } // namespace boost
0255 
0256 #include <boost/log/detail/footer.hpp>
0257 
0258 #endif // BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_