Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:39:25

0001 /*
0002  *          Copyright Andrey Semashev 2007 - 2015.
0003  * Distributed under the Boost Software License, Version 1.0.
0004  *    (See accompanying file LICENSE_1_0.txt or copy at
0005  *          http://www.boost.org/LICENSE_1_0.txt)
0006  */
0007 /*!
0008  * \file   bounded_ordering_queue.hpp
0009  * \author Andrey Semashev
0010  * \date   06.01.2012
0011  *
0012  * The header contains implementation of bounded ordering queueing strategy for
0013  * the asynchronous sink frontend.
0014  */
0015 
0016 #ifndef BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0017 #define BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0018 
0019 #include <boost/log/detail/config.hpp>
0020 
0021 #ifdef BOOST_HAS_PRAGMA_ONCE
0022 #pragma once
0023 #endif
0024 
0025 #if defined(BOOST_LOG_NO_THREADS)
0026 #error Boost.Log: This header content is only supported in multithreaded environment
0027 #endif
0028 
0029 #include <cstddef>
0030 #include <queue>
0031 #include <vector>
0032 #include <boost/cstdint.hpp>
0033 #include <boost/thread/locks.hpp>
0034 #include <boost/thread/mutex.hpp>
0035 #include <boost/thread/condition_variable.hpp>
0036 #include <boost/thread/thread_time.hpp>
0037 #include <boost/date_time/posix_time/posix_time_types.hpp>
0038 #include <boost/log/detail/timestamp.hpp>
0039 #include <boost/log/detail/enqueued_record.hpp>
0040 #include <boost/log/keywords/order.hpp>
0041 #include <boost/log/keywords/ordering_window.hpp>
0042 #include <boost/log/core/record_view.hpp>
0043 #include <boost/log/detail/header.hpp>
0044 
0045 namespace boost {
0046 
0047 BOOST_LOG_OPEN_NAMESPACE
0048 
0049 namespace sinks {
0050 
0051 /*!
0052  * \brief Bounded ordering log record queueing strategy
0053  *
0054  * The \c bounded_ordering_queue class is intended to be used with
0055  * the \c asynchronous_sink frontend as a log record queueing strategy.
0056  *
0057  * This strategy provides the following properties to the record queueing mechanism:
0058  *
0059  * \li The queue has limited capacity specified by the \c MaxQueueSizeV template parameter.
0060  * \li Upon reaching the size limit, the queue invokes the overflow handling strategy
0061  *     specified in the \c OverflowStrategyT template parameter to handle the situation.
0062  *     The library provides overflow handling strategies for most common cases:
0063  *     \c drop_on_overflow will silently discard the log record, and \c block_on_overflow
0064  *     will put the enqueueing thread to wait until there is space in the queue.
0065  * \li The queue has a fixed latency window. This means that each log record put
0066  *     into the queue will normally not be dequeued for a certain period of time.
0067  * \li The queue performs stable record ordering within the latency window.
0068  *     The ordering predicate can be specified in the \c OrderT template parameter.
0069  */
0070 template< typename OrderT, std::size_t MaxQueueSizeV, typename OverflowStrategyT >
0071 class bounded_ordering_queue :
0072     private OverflowStrategyT
0073 {
0074 private:
0075     typedef OverflowStrategyT overflow_strategy;
0076     typedef boost::mutex mutex_type;
0077     typedef sinks::aux::enqueued_record enqueued_record;
0078 
0079     typedef std::priority_queue<
0080         enqueued_record,
0081         std::vector< enqueued_record >,
0082         enqueued_record::order< OrderT >
0083     > queue_type;
0084 
0085 private:
0086     //! Ordering window duration, in milliseconds
0087     const uint64_t m_ordering_window;
0088     //! Synchronization primitive
0089     mutex_type m_mutex;
0090     //! Condition to block the consuming thread on
0091     condition_variable m_cond;
0092     //! Log record queue
0093     queue_type m_queue;
0094     //! Interruption flag
0095     bool m_interruption_requested;
0096 
0097 public:
0098     /*!
0099      * Returns ordering window size specified during initialization
0100      */
0101     posix_time::time_duration get_ordering_window() const
0102     {
0103         return posix_time::milliseconds(m_ordering_window);
0104     }
0105 
0106     /*!
0107      * Returns default ordering window size.
0108      * The default window size is specific to the operating system thread scheduling mechanism.
0109      */
0110     static posix_time::time_duration get_default_ordering_window()
0111     {
0112         // The main idea behind this parameter is that the ordering window should be large enough
0113         // to allow the frontend to order records from different threads on an attribute
0114         // that contains system time. Thus this value should be:
0115         // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
0116         //   For instance, on Windows it defaults to around 15-16 ms.
0117         // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
0118         //   switch threads on any known OS. It can be tuned for other platforms as needed.
0119         return posix_time::milliseconds(30);
0120     }
0121 
0122 protected:
0123     //! Initializing constructor
0124     template< typename ArgsT >
0125     explicit bounded_ordering_queue(ArgsT const& args) :
0126         m_ordering_window(args[keywords::ordering_window || &bounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
0127         m_queue(args[keywords::order]),
0128         m_interruption_requested(false)
0129     {
0130     }
0131 
0132     //! Enqueues log record to the queue
0133     void enqueue(record_view const& rec)
0134     {
0135         unique_lock< mutex_type > lock(m_mutex);
0136         std::size_t size = m_queue.size();
0137         for (; size >= MaxQueueSizeV; size = m_queue.size())
0138         {
0139             if (!overflow_strategy::on_overflow(rec, lock))
0140                 return;
0141         }
0142 
0143         m_queue.push(enqueued_record(rec));
0144         if (size == 0)
0145             m_cond.notify_one();
0146     }
0147 
0148     //! Attempts to enqueue log record to the queue
0149     bool try_enqueue(record_view const& rec)
0150     {
0151         unique_lock< mutex_type > lock(m_mutex, try_to_lock);
0152         if (lock.owns_lock())
0153         {
0154             const std::size_t size = m_queue.size();
0155 
0156             // Do not invoke the bounding strategy in case of overflow as it may block
0157             if (size < MaxQueueSizeV)
0158             {
0159                 m_queue.push(enqueued_record(rec));
0160                 if (size == 0)
0161                     m_cond.notify_one();
0162                 return true;
0163             }
0164         }
0165 
0166         return false;
0167     }
0168 
0169     //! Attempts to dequeue a log record ready for processing from the queue, does not block if the queue is empty
0170     bool try_dequeue_ready(record_view& rec)
0171     {
0172         lock_guard< mutex_type > lock(m_mutex);
0173         const std::size_t size = m_queue.size();
0174         if (size > 0)
0175         {
0176             const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
0177             enqueued_record const& elem = m_queue.top();
0178             if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
0179             {
0180                 // We got a new element
0181                 rec = elem.m_record;
0182                 m_queue.pop();
0183                 overflow_strategy::on_queue_space_available();
0184                 return true;
0185             }
0186         }
0187 
0188         return false;
0189     }
0190 
0191     //! Attempts to dequeue log record from the queue, does not block if the queue is empty
0192     bool try_dequeue(record_view& rec)
0193     {
0194         lock_guard< mutex_type > lock(m_mutex);
0195         const std::size_t size = m_queue.size();
0196         if (size > 0)
0197         {
0198             enqueued_record const& elem = m_queue.top();
0199             rec = elem.m_record;
0200             m_queue.pop();
0201             overflow_strategy::on_queue_space_available();
0202             return true;
0203         }
0204 
0205         return false;
0206     }
0207 
0208     //! Dequeues log record from the queue, blocks if the queue is empty
0209     bool dequeue_ready(record_view& rec)
0210     {
0211         unique_lock< mutex_type > lock(m_mutex);
0212 
0213         while (!m_interruption_requested)
0214         {
0215             const std::size_t size = m_queue.size();
0216             if (size > 0)
0217             {
0218                 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
0219                 enqueued_record const& elem = m_queue.top();
0220                 const uint64_t difference = (now - elem.m_timestamp).milliseconds();
0221                 if (difference >= m_ordering_window)
0222                 {
0223                     rec = elem.m_record;
0224                     m_queue.pop();
0225                     overflow_strategy::on_queue_space_available();
0226                     return true;
0227                 }
0228                 else
0229                 {
0230                     // Wait until the element becomes ready to be processed
0231                     m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
0232                 }
0233             }
0234             else
0235             {
0236                 m_cond.wait(lock);
0237             }
0238         }
0239         m_interruption_requested = false;
0240 
0241         return false;
0242     }
0243 
0244     //! Wakes a thread possibly blocked in the \c dequeue method
0245     void interrupt_dequeue()
0246     {
0247         lock_guard< mutex_type > lock(m_mutex);
0248         m_interruption_requested = true;
0249         overflow_strategy::interrupt();
0250         m_cond.notify_one();
0251     }
0252 };
0253 
0254 } // namespace sinks
0255 
0256 BOOST_LOG_CLOSE_NAMESPACE // namespace log
0257 
0258 } // namespace boost
0259 
0260 #include <boost/log/detail/footer.hpp>
0261 
0262 #endif // BOOST_LOG_SINKS_BOUNDED_ORDERING_QUEUE_HPP_INCLUDED_