Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-13 08:38:41

0001 /*
0002  *          Copyright Andrey Semashev 2007 - 2015.
0003  * Distributed under the Boost Software License, Version 1.0.
0004  *    (See accompanying file LICENSE_1_0.txt or copy at
0005  *          http://www.boost.org/LICENSE_1_0.txt)
0006  */
0007 /*!
0008  * \file   async_frontend.hpp
0009  * \author Andrey Semashev
0010  * \date   14.07.2009
0011  *
0012  * The header contains implementation of asynchronous sink frontend.
0013  */
0014 
0015 #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
0016 #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
0017 
0018 #include <thread>
0019 #include <mutex>
0020 #include <condition_variable>
0021 #include <exception> // std::terminate
0022 #include <boost/log/detail/config.hpp>
0023 
0024 #ifdef BOOST_HAS_PRAGMA_ONCE
0025 #pragma once
0026 #endif
0027 
0028 #if defined(BOOST_LOG_NO_THREADS)
0029 #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
0030 #endif
0031 
0032 #include <boost/memory_order.hpp>
0033 #include <boost/atomic/atomic.hpp>
0034 #include <boost/smart_ptr/shared_ptr.hpp>
0035 #include <boost/smart_ptr/make_shared_object.hpp>
0036 #include <boost/preprocessor/control/if.hpp>
0037 #include <boost/preprocessor/comparison/equal.hpp>
0038 #include <boost/log/exceptions.hpp>
0039 #include <boost/log/detail/locking_ptr.hpp>
0040 #include <boost/log/detail/parameter_tools.hpp>
0041 #include <boost/log/core/record_view.hpp>
0042 #include <boost/log/sinks/basic_sink_frontend.hpp>
0043 #include <boost/log/sinks/frontend_requirements.hpp>
0044 #include <boost/log/sinks/unbounded_fifo_queue.hpp>
0045 #include <boost/log/keywords/start_thread.hpp>
0046 #include <boost/log/detail/header.hpp>
0047 
0048 namespace boost {
0049 
0050 BOOST_LOG_OPEN_NAMESPACE
0051 
0052 namespace sinks {
0053 
0054 #ifndef BOOST_LOG_DOXYGEN_PASS
0055 
0056 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(z, n, data) /* Generates the two single-argument constructors: one creates the backend from arg0, the other attaches an existing backend. Both pass arg0 to the queueing strategy base and start the feeding thread if arg0[keywords::start_thread | true] is true. */\
0057     template< typename T0 >\
0058     explicit asynchronous_sink(T0 const& arg0, typename boost::log::aux::enable_if_named_parameters< T0, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\
0059         base_type(true),\
0060         queue_base_type(arg0),\
0061         m_pBackend(boost::make_shared< sink_backend_type >(arg0)),\
0062         m_ActiveOperation(idle),\
0063         m_StopRequested(false),\
0064         m_FlushRequested(false)\
0065     {\
0066         if (arg0[keywords::start_thread | true])\
0067             start_feeding_thread();\
0068     }\
0069     template< typename T0 >\
0070     explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, T0 const& arg0) :\
0071         base_type(true),\
0072         queue_base_type(arg0),\
0073         m_pBackend(backend),\
0074         m_ActiveOperation(idle),\
0075         m_StopRequested(false),\
0076         m_FlushRequested(false)\
0077     {\
0078         if (arg0[keywords::start_thread | true])\
0079             start_feeding_thread();\
0080     }
0081 
0082 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(z, n, data) /* Generates the n-argument constructor pair (with and without an existing backend). The arguments are wrapped in an extra pair of parentheses, forming a single comma expression, both for the queueing strategy base and for the start_thread lookup; the backend-creating overload passes them to make_shared as separate arguments. */\
0083     template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
0084     explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
0085         base_type(true),\
0086         queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
0087         m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
0088         m_ActiveOperation(idle),\
0089         m_StopRequested(false),\
0090         m_FlushRequested(false)\
0091     {\
0092         if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
0093             start_feeding_thread();\
0094     }\
0095     template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
0096     explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
0097         base_type(true),\
0098         queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
0099         m_pBackend(backend),\
0100         m_ActiveOperation(idle),\
0101         m_StopRequested(false),\
0102         m_FlushRequested(false)\
0103     {\
0104         if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
0105             start_feeding_thread();\
0106     }
0107 
0108 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data) /* Dispatches to the _1 generator when n equals 1 and to the _N generator otherwise. */\
0109     BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(z, n, data)
0110 
0111 #endif // BOOST_LOG_DOXYGEN_PASS
0112 
0113 /*!
0114  * \brief Asynchronous logging sink frontend
0115  *
0116  * The frontend starts a separate thread on construction. All logging records are passed
0117  * to the backend in this dedicated thread.
0118  *
0119  * The user can prevent spawning the internal thread by specifying \c start_thread parameter
0120  * with the value of \c false on construction. In this case log records will be buffered
0121  * in the internal queue until the user calls \c run, \c feed_records or \c flush in their own
0122  * thread. Log record queueing strategy is specified in the \c QueueingStrategyT template
0123  * parameter.
0124  */
0125 template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
0126 class asynchronous_sink :
0127     public aux::make_sink_frontend_base< SinkBackendT >::type,
0128     public QueueingStrategyT
0129 {
0130     typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type;
0131     typedef QueueingStrategyT queue_base_type;
0132 
0133 private:
0134     //! Backend synchronization mutex type
0135     typedef std::recursive_mutex backend_mutex_type;
0136     //! Frontend synchronization mutex type
0137     typedef typename base_type::mutex_type frontend_mutex_type;
0138 
0139     //! Operation bit mask. The value of \c flushing (3u) includes the \c feeding_records bit (1u), so testing that bit is true for both operations
0140     enum operation
0141     {
0142         idle = 0u, //!< No feeding operation is in progress
0143         feeding_records = 1u, //!< Records are being fed; requested by \c run and \c feed_records
0144         flushing = 3u //!< A flush is being performed; set by \c flush
0145     };
0146 
0147     //! Function object to run the log record feeding thread. An instance is passed to \c std::thread by \c start_feeding_thread
0148     class run_func
0149     {
0150     public:
0151         typedef void result_type;
0152 
0153     private:
0154         asynchronous_sink* m_self; //!< The frontend whose \c run method is invoked
0155 
0156     public:
0157         explicit run_func(asynchronous_sink* self) BOOST_NOEXCEPT : m_self(self) // Only stores the pointer
0158         {
0159         }
0160 
0161         result_type operator()() const // Executes the frontend's feeding loop in the calling thread
0162         {
0163             m_self->run();
0164         }
0165     };
0166 
0167     //! A scope guard that implements active operation management: its destructor calls \c complete_feeding_operation on the frontend
0168     class scoped_feeding_operation
0169     {
0170     private:
0171         asynchronous_sink& m_self; //!< The frontend whose operation is completed on destruction
0172 
0173     public:
0174         //! Initializing constructor. Only binds the frontend; it does not start the operation itself
0175         explicit scoped_feeding_operation(asynchronous_sink& self) : m_self(self)
0176         {
0177         }
0178         //! Destructor. Marks the feeding operation as complete
0179         ~scoped_feeding_operation()
0180         {
0181             m_self.complete_feeding_operation();
0182         }
0183 
0184         BOOST_DELETED_FUNCTION(scoped_feeding_operation(scoped_feeding_operation const&)) // Not copy-constructible
0185         BOOST_DELETED_FUNCTION(scoped_feeding_operation& operator= (scoped_feeding_operation const&)) // Not copy-assignable
0186     };
0187 
0188     //! A scope guard that resets a flag on destructor and then notifies all threads waiting on the associated condition variable
0189     class scoped_flag
0190     {
0191     private:
0192         frontend_mutex_type& m_Mutex; //!< Mutex that is locked while the flag is being reset
0193         std::condition_variable_any& m_Cond; //!< Condition variable notified after the flag is reset
0194         boost::atomic< bool >& m_Flag; //!< The flag that is set to \c false on destruction
0195 
0196     public:
0197         explicit scoped_flag(frontend_mutex_type& mut, std::condition_variable_any& cond, boost::atomic< bool >& f) : // Only stores the references; the flag is not modified here
0198             m_Mutex(mut), m_Cond(cond), m_Flag(f)
0199         {
0200         }
0201         ~scoped_flag() // Resets the flag and notifies waiters while holding the mutex
0202         {
0203             try
0204             {
0205                 std::lock_guard< frontend_mutex_type > lock(m_Mutex);
0206                 m_Flag.store(false, boost::memory_order_relaxed);
0207                 m_Cond.notify_all();
0208             }
0209             catch (...) // Any exception is deliberately swallowed so that it does not leave the destructor
0210             {
0211             }
0212         }
0213 
0214         BOOST_DELETED_FUNCTION(scoped_flag(scoped_flag const&)) // Not copy-constructible
0215         BOOST_DELETED_FUNCTION(scoped_flag& operator= (scoped_flag const&)) // Not copy-assignable
0216     };
0217 
0218 public:
0219     //! Sink implementation type
0220     typedef SinkBackendT sink_backend_type;
0221     //! \cond
0222     static_assert(has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value, "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met");
0223     //! \endcond
0224 
0225 #ifndef BOOST_LOG_DOXYGEN_PASS
0226 
0227     //! A pointer type that locks the backend until it's destroyed
0228     typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr;
0229 
0230 #else // BOOST_LOG_DOXYGEN_PASS
0231 
0232     //! A pointer type that locks the backend until it's destroyed
0233     typedef implementation_defined locked_backend_ptr;
0234 
0235 #endif // BOOST_LOG_DOXYGEN_PASS
0236 
0237 private:
0238     //! Backend synchronization mutex. Passed to \c feed_record and \c flush_backend and used to construct \c locked_backend_ptr
0239     backend_mutex_type m_BackendMutex;
0240     //! Pointer to the backend. Fixed at construction
0241     const shared_ptr< sink_backend_type > m_pBackend;
0242 
0243     //! Dedicated record feeding thread. Assigned by \c start_feeding_thread and joined by \c stop
0244     std::thread m_DedicatedFeedingThread;
0245     //! Condition variable to implement blocking operations. Notified by \c complete_feeding_operation and by the \c scoped_flag destructor
0246     std::condition_variable_any m_BlockCond;
0247 
0248     //! Currently active operation. Member functions read and modify it with the frontend mutex locked
0249     operation m_ActiveOperation;
0250     //! The flag indicates that the feeding loop has to be stopped. Set by \c stop, reset by \c complete_feeding_operation and \c start_feeding_operation
0251     boost::atomic< bool > m_StopRequested;
0252     //! The flag indicates that queue flush has been requested. Set by \c flush, reset by \c do_feed_records once the backend has been flushed
0253     boost::atomic< bool > m_FlushRequested;
0254 
0255 public:
0256     /*!
0257      * Default constructor. Constructs the sink backend instance.
0258      * Requires the backend to be default-constructible. The queueing strategy base is default-constructed as well.
0259      *
0260      * \param start_thread If \c true, the frontend creates a thread to feed
0261      *                     log records to the backend. Otherwise no thread is
0262      *                     started and it is assumed that the user will call
0263      *                     \c run, \c feed_records or \c flush themselves.
0264      */
0265     explicit asynchronous_sink(bool start_thread = true) :
0266         base_type(true),
0267         m_pBackend(boost::make_shared< sink_backend_type >()),
0268         m_ActiveOperation(idle),
0269         m_StopRequested(false),
0270         m_FlushRequested(false)
0271     {
0272         if (start_thread)
0273             start_feeding_thread();
0274     }
0275     /*!
0276      * Constructor attaches user-constructed backend instance. The frontend stores a copy of the shared pointer.
0277      *
0278      * \param backend Pointer to the backend instance.
0279      * \param start_thread If \c true, the frontend creates a thread to feed
0280      *                     log records to the backend. Otherwise no thread is
0281      *                     started and it is assumed that the user will call
0282      *                     \c run, \c feed_records or \c flush themselves.
0283      *
0284      * \pre \a backend is not \c NULL.
0285      */
0286     explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
0287         base_type(true),
0288         m_pBackend(backend),
0289         m_ActiveOperation(idle),
0290         m_StopRequested(false),
0291         m_FlushRequested(false)
0292     {
0293         if (start_thread)
0294             start_feeding_thread();
0295     }
0296 
0297     /*!
0298      * Constructor that passes arbitrary named parameters to the sink backend constructor and to the queueing strategy constructor.
0299      * Refer to the backend documentation for the list of supported parameters. Generated overloads also accept an existing backend pointer as the first argument.
0300      *
0301      * The frontend uses the following named parameters:
0302      *
0303      *   \li start_thread - If \c true, the frontend creates a thread to feed
0304      *                      log records to the backend. Otherwise no thread is
0305      *                      started and it is assumed that the user will call
0306      *                      \c run, \c feed_records or \c flush themselves. Defaults to \c true.
0307      */
0308 #ifndef BOOST_LOG_DOXYGEN_PASS
0309     BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
0310 #else
0311     template< typename... Args >
0312     explicit asynchronous_sink(Args&&... args);
0313 #endif
0314 
0315     /*!
0316      * Destructor. Implicitly stops the dedicated feeding thread, if one is running, by calling \c stop, which joins that thread.
0317      */
0318     ~asynchronous_sink() BOOST_NOEXCEPT BOOST_OVERRIDE
0319     {
0320         stop();
0321     }
0322 
0323     /*!
0324      * Locking accessor to the attached backend. The returned pointer is constructed from the backend pointer and the backend mutex.
0325      */
0326     locked_backend_ptr locked_backend()
0327     {
0328         return locked_backend_ptr(m_pBackend, m_BackendMutex);
0329     }
0330 
0331     /*!
0332      * Enqueues the log record into the frontend queue. If a flush has been requested, the call first blocks until the flush request is cleared.
0333      */
0334     void consume(record_view const& rec) BOOST_OVERRIDE
0335     {
0336         if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire))) // Check without locking first; the mutex is only taken when a flush is pending
0337         {
0338             std::unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0339             // Wait until flush is done; the flag is re-checked in a loop after every wakeup
0340             while (m_FlushRequested.load(boost::memory_order_acquire))
0341                 m_BlockCond.wait(lock);
0342         }
0343         queue_base_type::enqueue(rec);
0344     }
0345 
0346     /*!
0347      * The method attempts to enqueue the logging record without waiting for a pending flush. Returns \c false if a flush has been requested, otherwise the result of \c try_enqueue of the queueing strategy.
0348      */
0349     bool try_consume(record_view const& rec) BOOST_OVERRIDE
0350     {
0351         if (!m_FlushRequested.load(boost::memory_order_acquire))
0352             return queue_base_type::try_enqueue(rec);
0353         else
0354             return false; // A flush is pending; unlike consume, do not block
0355     }
0356 
0357     /*!
0358      * The method starts record feeding loop and effectively blocks until either of the following happens:
0359      *
0360      * \li the thread is interrupted due to a call to \c stop
0361      * \li an exception is thrown while processing a log record in the backend, and the exception is
0362      *     not terminated by the exception handler, if one is installed
0363      *
0364      * \pre The sink frontend must be constructed without spawning a dedicated thread. If another record feeding operation is already running, \c unexpected_call is raised via \c BOOST_LOG_THROW_DESCR.
0365      */
0366     void run()
0367     {
0368         // First check that no other thread is running
0369         {
0370             std::unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0371             if (start_feeding_operation(lock, feeding_records)) // true means a stop request was observed while waiting for another operation
0372                 return;
0373         }
0374 
0375         scoped_feeding_operation guard(*this); // Marks the operation as complete on every exit path below
0376 
0377         // Now start the feeding loop
0378         while (true)
0379         {
0380             do_feed_records(); // Feeds records using the non-blocking dequeue calls
0381             if (!m_StopRequested.load(boost::memory_order_acquire))
0382             {
0383                 // Block until new record is available
0384                 record_view rec;
0385                 if (queue_base_type::dequeue_ready(rec)) // A false result feeds nothing; the stop flag is re-checked on the next iteration
0386                     base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
0387             }
0388             else
0389                 break; // Stop was requested; the guard resets the stop flag on exit
0390         }
0391     }
0392 
0393     /*!
0394      * The method softly interrupts record feeding loop. This method must be called when \c run,
0395      * \c feed_records or \c flush method execution has to be interrupted. Unlike regular thread
0396      * interruption, calling \c stop will not interrupt the record processing in the middle.
0397      * Instead, the sink frontend will attempt to finish its business with the record in progress
0398      * and return afterwards. This method can be called either if the sink was created with
0399      * an internal dedicated thread, or if the feeding loop was initiated by user.
0400      *
0401      * If no record feeding operation is in progress, calling \c stop marks the sink frontend
0402      * so that the next feeding operation stops immediately.
0403      *
0404      * \note Returning from this method does not guarantee that there are no records left buffered
0405      *       in the sink frontend. It is possible that log records keep coming during and after this
0406      *       method is called. At some point of execution of this method log records stop being processed,
0407      *       and all records that come after this point are put into the queue. These records will be
0408      *       processed upon further calls to \c run or \c feed_records.
0409      *
0410      * \note If the record feeding loop is being run in a user's thread (i.e. \c start_thread was specified
0411      *       as \c false on frontend construction), this method does not guarantee that upon return the thread
0412      *       has returned from the record feeding loop or that it won't enter it in the future. The method
0413      *       only ensures that the record feeding thread will eventually return from the feeding loop. It is
0414      *       user's responsibility to synchronize with the user's record feeding thread.
0415      */
0416     void stop()
0417     {
0418         std::thread feeding_thread; // Receives the dedicated thread handle so that it can be joined after the lock is released
0419         {
0420             std::lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
0421 
0422             m_StopRequested.store(true, boost::memory_order_release);
0423             queue_base_type::interrupt_dequeue(); // NOTE(review): presumably wakes a thread blocked in dequeue_ready - confirm against the queueing strategy
0424 
0425             m_DedicatedFeedingThread.swap(feeding_thread);
0426         }
0427 
0428         if (feeding_thread.joinable()) // Not joinable if no dedicated thread was started or it has already been taken by an earlier stop
0429             feeding_thread.join();
0430     }
0431 
0432     /*!
0433      * The method feeds log records that may have been buffered to the backend and returns without waiting for new records.
0434      *
0435      * \pre The sink frontend must be constructed without spawning a dedicated thread. If another record feeding operation is already running, \c unexpected_call is raised via \c BOOST_LOG_THROW_DESCR.
0436      */
0437     void feed_records()
0438     {
0439         // First check that no other thread is running
0440         {
0441             std::unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0442             if (start_feeding_operation(lock, feeding_records)) // true means a stop request was observed while waiting for another operation
0443                 return;
0444         }
0445 
0446         scoped_feeding_operation guard(*this); // Marks the operation as complete when the method returns or throws
0447 
0448         // Now start the feeding loop
0449         do_feed_records();
0450     }
0451 
0452     /*!
0453      * The method feeds all log records that may have been buffered to the backend and returns.
0454      * Unlike \c feed_records, in case of ordering queueing the method also feeds records
0455      * that were enqueued during the ordering window, attempting to drain the queue completely. If another thread is already feeding records, the flush is delegated to that thread and this call blocks until the request is cleared or a stop is requested.
0456      */
0457     void flush() BOOST_OVERRIDE
0458     {
0459         {
0460             std::unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0461             if (static_cast< unsigned int >(m_ActiveOperation & feeding_records) != 0u)
0462             {
0463                 // There is already a thread feeding records, let it do the job (this test also matches a flush in progress, since flushing includes the feeding_records bit)
0464                 m_FlushRequested.store(true, boost::memory_order_release);
0465                 queue_base_type::interrupt_dequeue();
0466                 while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire))
0467                     m_BlockCond.wait(lock);
0468 
0469                 // The condition may have been signalled when the feeding operation was finishing.
0470                 // In that case records may not have been flushed, and we do the flush ourselves.
0471                 if (m_ActiveOperation != idle)
0472                     return; // An operation is still in progress, so this call does not start its own flush
0473             }
0474 
0475             m_ActiveOperation = flushing; // No operation is active at this point; this thread performs the flush
0476             m_FlushRequested.store(true, boost::memory_order_relaxed);
0477         }
0478 
0479         scoped_feeding_operation guard(*this); // Resets the active operation to idle when the flush completes or throws
0480 
0481         do_feed_records();
0482     }
0483 
0484 private:
0485 #ifndef BOOST_LOG_DOXYGEN_PASS
0486     //! The method spawns record feeding thread: a new \c std::thread executing \c run_func for this frontend is swapped into \c m_DedicatedFeedingThread
0487     void start_feeding_thread()
0488     {
0489         std::thread(run_func(this)).swap(m_DedicatedFeedingThread);
0490     }
0491 
0492     //! Starts record feeding operation. The method blocks or throws if another feeding operation is in progress. \a lock must hold the frontend mutex; it is used to wait on the condition variable. Returns \c false if \a op has been made the active operation, or \c true if a stop request was observed while waiting (the request flag is reset in that case).
0493     bool start_feeding_operation(std::unique_lock< frontend_mutex_type >& lock, operation op)
0494     {
0495         while (m_ActiveOperation != idle)
0496         {
0497             if (BOOST_UNLIKELY(op == feeding_records && m_ActiveOperation == feeding_records)) // Two concurrent record feeding loops are reported as an error rather than waited for
0498                 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
0499 
0500             if (BOOST_UNLIKELY(m_StopRequested.load(boost::memory_order_relaxed)))
0501             {
0502                 m_StopRequested.store(false, boost::memory_order_relaxed); // Consume the stop request
0503                 return true;
0504             }
0505 
0506             m_BlockCond.wait(lock); // Woken by complete_feeding_operation or by the scoped_flag destructor
0507         }
0508 
0509         m_ActiveOperation = op;
0510 
0511         return false;
0512     }
0513 
0514     //! Completes record feeding operation: under the frontend mutex, resets the active operation to \c idle, clears the stop request and notifies all waiting threads
0515     void complete_feeding_operation() BOOST_NOEXCEPT
0516     {
0517         try
0518         {
0519             std::lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
0520             m_ActiveOperation = idle;
0521             m_StopRequested.store(false, boost::memory_order_relaxed);
0522             m_BlockCond.notify_all();
0523         }
0524         catch (...) // Any exception is deliberately swallowed; the method is declared BOOST_NOEXCEPT and is called from a destructor
0525         {
0526         }
0527     }
0528 
0529     //! The record feeding loop. Feeds records to the backend until a stop is requested or a non-blocking dequeue attempt fails, then flushes the backend if a flush has been requested
0530     void do_feed_records()
0531     {
0532         while (!m_StopRequested.load(boost::memory_order_acquire))
0533         {
0534             record_view rec;
0535             bool dequeued = false;
0536             if (BOOST_LIKELY(!m_FlushRequested.load(boost::memory_order_acquire)))
0537                 dequeued = queue_base_type::try_dequeue_ready(rec); // Normal mode: use the "ready" variant of the non-blocking dequeue
0538             else
0539                 dequeued = queue_base_type::try_dequeue(rec); // Flush mode: use the plain non-blocking dequeue instead
0540 
0541             if (dequeued)
0542                 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
0543             else
0544                 break; // Nothing was dequeued; leave the loop
0545         }
0546 
0547         if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
0548         {
0549             scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested); // Clears the flush request and notifies waiters when this scope is left
0550             base_type::flush_backend(m_BackendMutex, *m_pBackend);
0551         }
0552     }
0553 #endif // BOOST_LOG_DOXYGEN_PASS
0554 };
0555 
0556 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1
0557 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N
0558 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
0559 
0560 } // namespace sinks
0561 
0562 BOOST_LOG_CLOSE_NAMESPACE // namespace log
0563 
0564 } // namespace boost
0565 
0566 #include <boost/log/detail/footer.hpp>
0567 
0568 #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_