Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:39:25

0001 /*
0002  *          Copyright Andrey Semashev 2007 - 2015.
0003  * Distributed under the Boost Software License, Version 1.0.
0004  *    (See accompanying file LICENSE_1_0.txt or copy at
0005  *          http://www.boost.org/LICENSE_1_0.txt)
0006  */
0007 /*!
0008  * \file   unbounded_ordering_queue.hpp
0009  * \author Andrey Semashev
0010  * \date   24.07.2011
0011  *
0012  * The header contains implementation of unbounded ordering record queueing strategy for
0013  * the asynchronous sink frontend.
0014  */
0015 
0016 #ifndef BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0017 #define BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_
0018 
0019 #include <boost/log/detail/config.hpp>
0020 
0021 #ifdef BOOST_HAS_PRAGMA_ONCE
0022 #pragma once
0023 #endif
0024 
0025 #if defined(BOOST_LOG_NO_THREADS)
0026 #error Boost.Log: This header content is only supported in multithreaded environment
0027 #endif
0028 
0029 #include <queue>
0030 #include <vector>
0031 #include <boost/cstdint.hpp>
0032 #include <boost/thread/locks.hpp>
0033 #include <boost/thread/mutex.hpp>
0034 #include <boost/thread/condition_variable.hpp>
0035 #include <boost/thread/thread_time.hpp>
0036 #include <boost/date_time/posix_time/posix_time_types.hpp>
0037 #include <boost/log/detail/timestamp.hpp>
0038 #include <boost/log/detail/enqueued_record.hpp>
0039 #include <boost/log/keywords/order.hpp>
0040 #include <boost/log/keywords/ordering_window.hpp>
0041 #include <boost/log/core/record_view.hpp>
0042 #include <boost/log/detail/header.hpp>
0043 
0044 namespace boost {
0045 
0046 BOOST_LOG_OPEN_NAMESPACE
0047 
0048 namespace sinks {
0049 
0050 /*!
0051  * \brief Unbounded ordering log record queueing strategy
0052  *
0053  * The \c unbounded_ordering_queue class is intended to be used with
0054  * the \c asynchronous_sink frontend as a log record queueing strategy.
0055  *
0056  * This strategy provides the following properties to the record queueing mechanism:
0057  *
0058  * \li The queue has no size limits.
0059  * \li The queue has a fixed latency window. This means that each log record put
0060  *     into the queue will normally not be dequeued for a certain period of time.
0061  * \li The queue performs stable record ordering within the latency window.
0062  *     The ordering predicate can be specified in the \c OrderT template parameter.
0063  *
0064  * Since this queue has no size limits, it may grow uncontrollably if sink backends
0065  * dequeue log records not fast enough. When this is an issue, it is recommended to
0066  * use one of the bounded strategies.
0067  */
0068 template< typename OrderT >
0069 class unbounded_ordering_queue
0070 {
0071 private:
0072     typedef boost::mutex mutex_type;
0073     typedef sinks::aux::enqueued_record enqueued_record;
0074 
0075     typedef std::priority_queue<
0076         enqueued_record,
0077         std::vector< enqueued_record >,
0078         enqueued_record::order< OrderT >
0079     > queue_type;
0080 
0081 private:
0082     //! Ordering window duration, in milliseconds
0083     const uint64_t m_ordering_window;
0084     //! Synchronization mutex
0085     mutex_type m_mutex;
0086     //! Condition for blocking
0087     condition_variable m_cond;
0088     //! Thread-safe queue
0089     queue_type m_queue;
0090     //! Interruption flag
0091     bool m_interruption_requested;
0092 
0093 public:
0094     /*!
0095      * Returns ordering window size specified during initialization
0096      */
0097     posix_time::time_duration get_ordering_window() const
0098     {
0099         return posix_time::milliseconds(m_ordering_window);
0100     }
0101 
0102     /*!
0103      * Returns default ordering window size.
0104      * The default window size is specific to the operating system thread scheduling mechanism.
0105      */
0106     static posix_time::time_duration get_default_ordering_window()
0107     {
0108         // The main idea behind this parameter is that the ordering window should be large enough
0109         // to allow the frontend to order records from different threads on an attribute
0110         // that contains system time. Thus this value should be:
0111         // * No less than the minimum time resolution quant that Boost.DateTime provides on the current OS.
0112         //   For instance, on Windows it defaults to around 15-16 ms.
0113         // * No less than thread switching quant on the current OS. For now 30 ms is large enough window size to
0114         //   switch threads on any known OS. It can be tuned for other platforms as needed.
0115         return posix_time::milliseconds(30);
0116     }
0117 
0118 protected:
0119     //! Initializing constructor
0120     template< typename ArgsT >
0121     explicit unbounded_ordering_queue(ArgsT const& args) :
0122         m_ordering_window(args[keywords::ordering_window || &unbounded_ordering_queue::get_default_ordering_window].total_milliseconds()),
0123         m_queue(args[keywords::order]),
0124         m_interruption_requested(false)
0125     {
0126     }
0127 
0128     //! Enqueues log record to the queue
0129     void enqueue(record_view const& rec)
0130     {
0131         lock_guard< mutex_type > lock(m_mutex);
0132         enqueue_unlocked(rec);
0133     }
0134 
0135     //! Attempts to enqueue log record to the queue
0136     bool try_enqueue(record_view const& rec)
0137     {
0138         unique_lock< mutex_type > lock(m_mutex, try_to_lock);
0139         if (lock.owns_lock())
0140         {
0141             enqueue_unlocked(rec);
0142             return true;
0143         }
0144         else
0145             return false;
0146     }
0147 
0148     //! Attempts to dequeue a log record ready for processing from the queue, does not block if no log records are ready to be processed
0149     bool try_dequeue_ready(record_view& rec)
0150     {
0151         lock_guard< mutex_type > lock(m_mutex);
0152         if (!m_queue.empty())
0153         {
0154             const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
0155             enqueued_record const& elem = m_queue.top();
0156             if (static_cast< uint64_t >((now - elem.m_timestamp).milliseconds()) >= m_ordering_window)
0157             {
0158                 // We got a new element
0159                 rec = elem.m_record;
0160                 m_queue.pop();
0161                 return true;
0162             }
0163         }
0164 
0165         return false;
0166     }
0167 
0168     //! Attempts to dequeue log record from the queue, does not block.
0169     bool try_dequeue(record_view& rec)
0170     {
0171         lock_guard< mutex_type > lock(m_mutex);
0172         if (!m_queue.empty())
0173         {
0174             enqueued_record const& elem = m_queue.top();
0175             rec = elem.m_record;
0176             m_queue.pop();
0177             return true;
0178         }
0179 
0180         return false;
0181     }
0182 
0183     //! Dequeues log record from the queue, blocks if no log records are ready to be processed
0184     bool dequeue_ready(record_view& rec)
0185     {
0186         unique_lock< mutex_type > lock(m_mutex);
0187         while (!m_interruption_requested)
0188         {
0189             if (!m_queue.empty())
0190             {
0191                 const boost::log::aux::timestamp now = boost::log::aux::get_timestamp();
0192                 enqueued_record const& elem = m_queue.top();
0193                 const uint64_t difference = (now - elem.m_timestamp).milliseconds();
0194                 if (difference >= m_ordering_window)
0195                 {
0196                     // We got a new element
0197                     rec = elem.m_record;
0198                     m_queue.pop();
0199                     return true;
0200                 }
0201                 else
0202                 {
0203                     // Wait until the element becomes ready to be processed
0204                     m_cond.timed_wait(lock, posix_time::milliseconds(m_ordering_window - difference));
0205                 }
0206             }
0207             else
0208             {
0209                 // Wait for an element to come
0210                 m_cond.wait(lock);
0211             }
0212         }
0213         m_interruption_requested = false;
0214 
0215         return false;
0216     }
0217 
0218     //! Wakes a thread possibly blocked in the \c dequeue method
0219     void interrupt_dequeue()
0220     {
0221         lock_guard< mutex_type > lock(m_mutex);
0222         m_interruption_requested = true;
0223         m_cond.notify_one();
0224     }
0225 
0226 private:
0227     //! Enqueues a log record
0228     void enqueue_unlocked(record_view const& rec)
0229     {
0230         const bool was_empty = m_queue.empty();
0231         m_queue.push(enqueued_record(rec));
0232         if (was_empty)
0233             m_cond.notify_one();
0234     }
0235 };
0236 
0237 } // namespace sinks
0238 
0239 BOOST_LOG_CLOSE_NAMESPACE // namespace log
0240 
0241 } // namespace boost
0242 
0243 #include <boost/log/detail/footer.hpp>
0244 
0245 #endif // BOOST_LOG_SINKS_UNBOUNDED_ORDERING_QUEUE_HPP_INCLUDED_