Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:39:24

0001 /*
0002  *          Copyright Andrey Semashev 2007 - 2015.
0003  * Distributed under the Boost Software License, Version 1.0.
0004  *    (See accompanying file LICENSE_1_0.txt or copy at
0005  *          http://www.boost.org/LICENSE_1_0.txt)
0006  */
0007 /*!
0008  * \file   async_frontend.hpp
0009  * \author Andrey Semashev
0010  * \date   14.07.2009
0011  *
0012  * The header contains implementation of asynchronous sink frontend.
0013  */
0014 
0015 #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
0016 #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
0017 
0018 #include <exception> // std::terminate
0019 #include <boost/log/detail/config.hpp>
0020 
0021 #ifdef BOOST_HAS_PRAGMA_ONCE
0022 #pragma once
0023 #endif
0024 
0025 #if defined(BOOST_LOG_NO_THREADS)
0026 #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
0027 #endif
0028 
0029 #include <boost/memory_order.hpp>
0030 #include <boost/atomic/atomic.hpp>
0031 #include <boost/smart_ptr/shared_ptr.hpp>
0032 #include <boost/smart_ptr/make_shared_object.hpp>
0033 #include <boost/preprocessor/control/if.hpp>
0034 #include <boost/preprocessor/comparison/equal.hpp>
0035 #include <boost/thread/locks.hpp>
0036 #include <boost/thread/recursive_mutex.hpp>
0037 #include <boost/thread/thread.hpp>
0038 #include <boost/thread/condition_variable.hpp>
0039 #include <boost/log/exceptions.hpp>
0040 #include <boost/log/detail/locking_ptr.hpp>
0041 #include <boost/log/detail/parameter_tools.hpp>
0042 #include <boost/log/core/record_view.hpp>
0043 #include <boost/log/sinks/basic_sink_frontend.hpp>
0044 #include <boost/log/sinks/frontend_requirements.hpp>
0045 #include <boost/log/sinks/unbounded_fifo_queue.hpp>
0046 #include <boost/log/keywords/start_thread.hpp>
0047 #include <boost/log/detail/header.hpp>
0048 
0049 namespace boost {
0050 
0051 BOOST_LOG_OPEN_NAMESPACE
0052 
0053 namespace sinks {
0054 
0055 #ifndef BOOST_LOG_DOXYGEN_PASS
0056 
0057 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(z, n, data)\
0058     template< typename T0 >\
0059     explicit asynchronous_sink(T0 const& arg0, typename boost::log::aux::enable_if_named_parameters< T0, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\
0060         base_type(true),\
0061         queue_base_type(arg0),\
0062         m_pBackend(boost::make_shared< sink_backend_type >(arg0)),\
0063         m_ActiveOperation(idle),\
0064         m_StopRequested(false),\
0065         m_FlushRequested(false)\
0066     {\
0067         if (arg0[keywords::start_thread | true])\
0068             start_feeding_thread();\
0069     }\
0070     template< typename T0 >\
0071     explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, T0 const& arg0) :\
0072         base_type(true),\
0073         queue_base_type(arg0),\
0074         m_pBackend(backend),\
0075         m_ActiveOperation(idle),\
0076         m_StopRequested(false),\
0077         m_FlushRequested(false)\
0078     {\
0079         if (arg0[keywords::start_thread | true])\
0080             start_feeding_thread();\
0081     }
0082 
0083 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(z, n, data)\
0084     template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
0085     explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
0086         base_type(true),\
0087         queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
0088         m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
0089         m_ActiveOperation(idle),\
0090         m_StopRequested(false),\
0091         m_FlushRequested(false)\
0092     {\
0093         if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
0094             start_feeding_thread();\
0095     }\
0096     template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
0097     explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
0098         base_type(true),\
0099         queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
0100         m_pBackend(backend),\
0101         m_ActiveOperation(idle),\
0102         m_StopRequested(false),\
0103         m_FlushRequested(false)\
0104     {\
0105         if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
0106             start_feeding_thread();\
0107     }
0108 
0109 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data)\
0110     BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(z, n, data)
0111 
0112 #endif // BOOST_LOG_DOXYGEN_PASS
0113 
0114 /*!
0115  * \brief Asynchronous logging sink frontend
0116  *
0117  * The frontend starts a separate thread on construction. All logging records are passed
0118  * to the backend in this dedicated thread.
0119  *
0120  * The user can prevent spawning the internal thread by specifying \c start_thread parameter
0121  * with the value of \c false on construction. In this case log records will be buffered
0122  * in the internal queue until the user calls \c run, \c feed_records or \c flush in his own
0123  * thread. Log record queueing strategy is specified in the \c QueueingStrategyT template
0124  * parameter.
0125  */
0126 template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
0127 class asynchronous_sink :
0128     public aux::make_sink_frontend_base< SinkBackendT >::type,
0129     public QueueingStrategyT
0130 {
0131     typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type;
0132     typedef QueueingStrategyT queue_base_type;
0133 
0134 private:
0135     //! Backend synchronization mutex type
0136     typedef boost::recursive_mutex backend_mutex_type;
0137     //! Frontend synchronization mutex type
0138     typedef typename base_type::mutex_type frontend_mutex_type;
0139 
0140     //! Operation bit mask
0141     enum operation
0142     {
0143         idle = 0u,
0144         feeding_records = 1u,
0145         flushing = 3u
0146     };
0147 
0148     //! Function object to run the log record feeding thread
0149     class run_func
0150     {
0151     public:
0152         typedef void result_type;
0153 
0154     private:
0155         asynchronous_sink* m_self;
0156 
0157     public:
0158         explicit run_func(asynchronous_sink* self) BOOST_NOEXCEPT : m_self(self)
0159         {
0160         }
0161 
0162         result_type operator()() const
0163         {
0164             m_self->run();
0165         }
0166     };
0167 
0168     //! A scope guard that implements active operation management
0169     class scoped_feeding_operation
0170     {
0171     private:
0172         asynchronous_sink& m_self;
0173 
0174     public:
0175         //! Initializing constructor
0176         explicit scoped_feeding_operation(asynchronous_sink& self) : m_self(self)
0177         {
0178         }
0179         //! Destructor
0180         ~scoped_feeding_operation()
0181         {
0182             m_self.complete_feeding_operation();
0183         }
0184 
0185         BOOST_DELETED_FUNCTION(scoped_feeding_operation(scoped_feeding_operation const&))
0186         BOOST_DELETED_FUNCTION(scoped_feeding_operation& operator= (scoped_feeding_operation const&))
0187     };
0188 
0189     //! A scope guard that resets a flag on destructor
0190     class scoped_flag
0191     {
0192     private:
0193         frontend_mutex_type& m_Mutex;
0194         condition_variable_any& m_Cond;
0195         boost::atomic< bool >& m_Flag;
0196 
0197     public:
0198         explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, boost::atomic< bool >& f) :
0199             m_Mutex(mut), m_Cond(cond), m_Flag(f)
0200         {
0201         }
0202         ~scoped_flag()
0203         {
0204             try
0205             {
0206                 lock_guard< frontend_mutex_type > lock(m_Mutex);
0207                 m_Flag.store(false, boost::memory_order_relaxed);
0208                 m_Cond.notify_all();
0209             }
0210             catch (...)
0211             {
0212             }
0213         }
0214 
0215         BOOST_DELETED_FUNCTION(scoped_flag(scoped_flag const&))
0216         BOOST_DELETED_FUNCTION(scoped_flag& operator= (scoped_flag const&))
0217     };
0218 
0219 public:
0220     //! Sink implementation type
0221     typedef SinkBackendT sink_backend_type;
0222     //! \cond
0223     static_assert(has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value, "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met");
0224     //! \endcond
0225 
0226 #ifndef BOOST_LOG_DOXYGEN_PASS
0227 
0228     //! A pointer type that locks the backend until it's destroyed
0229     typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr;
0230 
0231 #else // BOOST_LOG_DOXYGEN_PASS
0232 
0233     //! A pointer type that locks the backend until it's destroyed
0234     typedef implementation_defined locked_backend_ptr;
0235 
0236 #endif // BOOST_LOG_DOXYGEN_PASS
0237 
0238 private:
0239     //! Synchronization mutex
0240     backend_mutex_type m_BackendMutex;
0241     //! Pointer to the backend
0242     const shared_ptr< sink_backend_type > m_pBackend;
0243 
0244     //! Dedicated record feeding thread
0245     thread m_DedicatedFeedingThread;
0246     //! Condition variable to implement blocking operations
0247     condition_variable_any m_BlockCond;
0248 
0249     //! Currently active operation
0250     operation m_ActiveOperation;
0251     //! The flag indicates that the feeding loop has to be stopped
0252     boost::atomic< bool > m_StopRequested;
0253     //! The flag indicates that queue flush has been requested
0254     boost::atomic< bool > m_FlushRequested;
0255 
0256 public:
0257     /*!
0258      * Default constructor. Constructs the sink backend instance.
0259      * Requires the backend to be default-constructible.
0260      *
0261      * \param start_thread If \c true, the frontend creates a thread to feed
0262      *                     log records to the backend. Otherwise no thread is
0263      *                     started and it is assumed that the user will call
0264      *                     \c run, \c feed_records or \c flush himself.
0265      */
0266     explicit asynchronous_sink(bool start_thread = true) :
0267         base_type(true),
0268         m_pBackend(boost::make_shared< sink_backend_type >()),
0269         m_ActiveOperation(idle),
0270         m_StopRequested(false),
0271         m_FlushRequested(false)
0272     {
0273         if (start_thread)
0274             start_feeding_thread();
0275     }
0276     /*!
0277      * Constructor attaches user-constructed backend instance
0278      *
0279      * \param backend Pointer to the backend instance.
0280      * \param start_thread If \c true, the frontend creates a thread to feed
0281      *                     log records to the backend. Otherwise no thread is
0282      *                     started and it is assumed that the user will call
0283      *                     \c run, \c feed_records or \c flush himself.
0284      *
0285      * \pre \a backend is not \c NULL.
0286      */
0287     explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
0288         base_type(true),
0289         m_pBackend(backend),
0290         m_ActiveOperation(idle),
0291         m_StopRequested(false),
0292         m_FlushRequested(false)
0293     {
0294         if (start_thread)
0295             start_feeding_thread();
0296     }
0297 
0298     /*!
0299      * Constructor that passes arbitrary named parameters to the interprocess sink backend constructor.
0300      * Refer to the backend documentation for the list of supported parameters.
0301      *
0302      * The frontend uses the following named parameters:
0303      *
0304      *   \li start_thread - If \c true, the frontend creates a thread to feed
0305      *                      log records to the backend. Otherwise no thread is
0306      *                      started and it is assumed that the user will call
0307      *                      \c run, \c feed_records or \c flush himself.
0308      */
0309 #ifndef BOOST_LOG_DOXYGEN_PASS
0310     BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
0311 #else
0312     template< typename... Args >
0313     explicit asynchronous_sink(Args&&... args);
0314 #endif
0315 
0316     /*!
0317      * Destructor. Implicitly stops the dedicated feeding thread, if one is running.
0318      */
0319     ~asynchronous_sink() BOOST_NOEXCEPT BOOST_OVERRIDE
0320     {
0321         try
0322         {
0323             boost::this_thread::disable_interruption no_interrupts;
0324             stop();
0325         }
0326         catch (...)
0327         {
0328             std::terminate();
0329         }
0330     }
0331 
0332     /*!
0333      * Locking accessor to the attached backend
0334      */
0335     locked_backend_ptr locked_backend()
0336     {
0337         return locked_backend_ptr(m_pBackend, m_BackendMutex);
0338     }
0339 
0340     /*!
0341      * Enqueues the log record to the backend
0342      */
0343     void consume(record_view const& rec) BOOST_OVERRIDE
0344     {
0345         if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
0346         {
0347             unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0348             // Wait until flush is done
0349             while (m_FlushRequested.load(boost::memory_order_acquire))
0350                 m_BlockCond.wait(lock);
0351         }
0352         queue_base_type::enqueue(rec);
0353     }
0354 
0355     /*!
0356      * The method attempts to pass logging record to the backend
0357      */
0358     bool try_consume(record_view const& rec) BOOST_OVERRIDE
0359     {
0360         if (!m_FlushRequested.load(boost::memory_order_acquire))
0361         {
0362             return queue_base_type::try_enqueue(rec);
0363         }
0364         else
0365             return false;
0366     }
0367 
0368     /*!
0369      * The method starts record feeding loop and effectively blocks until either of this happens:
0370      *
0371      * \li the thread is interrupted due to either standard thread interruption or a call to \c stop
0372      * \li an exception is thrown while processing a log record in the backend, and the exception is
0373      *     not terminated by the exception handler, if one is installed
0374      *
0375      * \pre The sink frontend must be constructed without spawning a dedicated thread
0376      */
0377     void run()
0378     {
0379         // First check that no other thread is running
0380         {
0381             unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0382             if (start_feeding_operation(lock, feeding_records))
0383                 return;
0384         }
0385 
0386         scoped_feeding_operation guard(*this);
0387 
0388         // Now start the feeding loop
0389         while (true)
0390         {
0391             do_feed_records();
0392             if (!m_StopRequested.load(boost::memory_order_acquire))
0393             {
0394                 // Block until new record is available
0395                 record_view rec;
0396                 if (queue_base_type::dequeue_ready(rec))
0397                     base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
0398             }
0399             else
0400                 break;
0401         }
0402     }
0403 
0404     /*!
0405      * The method softly interrupts record feeding loop. This method must be called when \c run,
0406      * \c feed_records or \c flush method execution has to be interrupted. Unlike regular thread
0407      * interruption, calling \c stop will not interrupt the record processing in the middle.
0408      * Instead, the sink frontend will attempt to finish its business with the record in progress
0409      * and return afterwards. This method can be called either if the sink was created with
0410      * an internal dedicated thread, or if the feeding loop was initiated by user.
0411      *
0412      * If no record feeding operation is in progress, calling \c stop marks the sink frontend
0413      * so that the next feeding operation stops immediately.
0414      *
0415      * \note Returning from this method does not guarantee that there are no records left buffered
0416      *       in the sink frontend. It is possible that log records keep coming during and after this
0417      *       method is called. At some point of execution of this method log records stop being processed,
0418      *       and all records that come after this point are put into the queue. These records will be
0419      *       processed upon further calls to \c run or \c feed_records.
0420      *
0421      * \note If the record feeding loop is being run in a user's thread (i.e. \c start_thread was specified
0422      *       as \c false on frontend construction), this method does not guarantee that upon return the thread
0423      *       has returned from the record feeding loop or that it won't enter it in the future. The method
0424      *       only ensures that the record feeding thread will eventually return from the feeding loop. It is
0425      *       user's responsibility to synchronize with the user's record feeding thread.
0426      */
0427     void stop()
0428     {
0429         boost::thread feeding_thread;
0430         {
0431             lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
0432 
0433             m_StopRequested.store(true, boost::memory_order_release);
0434             queue_base_type::interrupt_dequeue();
0435 
0436             m_DedicatedFeedingThread.swap(feeding_thread);
0437         }
0438 
0439         if (feeding_thread.joinable())
0440             feeding_thread.join();
0441     }
0442 
0443     /*!
0444      * The method feeds log records that may have been buffered to the backend and returns
0445      *
0446      * \pre The sink frontend must be constructed without spawning a dedicated thread
0447      */
0448     void feed_records()
0449     {
0450         // First check that no other thread is running
0451         {
0452             unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0453             if (start_feeding_operation(lock, feeding_records))
0454                 return;
0455         }
0456 
0457         scoped_feeding_operation guard(*this);
0458 
0459         // Now start the feeding loop
0460         do_feed_records();
0461     }
0462 
0463     /*!
0464      * The method feeds all log records that may have been buffered to the backend and returns.
0465      * Unlike \c feed_records, in case of ordering queueing the method also feeds records
0466      * that were enqueued during the ordering window, attempting to drain the queue completely.
0467      */
0468     void flush() BOOST_OVERRIDE
0469     {
0470         {
0471             unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0472             if (static_cast< unsigned int >(m_ActiveOperation & feeding_records) != 0u)
0473             {
0474                 // There is already a thread feeding records, let it do the job
0475                 m_FlushRequested.store(true, boost::memory_order_release);
0476                 queue_base_type::interrupt_dequeue();
0477                 while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire))
0478                     m_BlockCond.wait(lock);
0479 
0480                 // The condition may have been signalled when the feeding operation was finishing.
0481                 // In that case records may not have been flushed, and we do the flush ourselves.
0482                 if (m_ActiveOperation != idle)
0483                     return;
0484             }
0485 
0486             m_ActiveOperation = flushing;
0487             m_FlushRequested.store(true, boost::memory_order_relaxed);
0488         }
0489 
0490         scoped_feeding_operation guard(*this);
0491 
0492         do_feed_records();
0493     }
0494 
0495 private:
0496 #ifndef BOOST_LOG_DOXYGEN_PASS
0497     //! The method spawns record feeding thread
0498     void start_feeding_thread()
0499     {
0500         boost::thread(run_func(this)).swap(m_DedicatedFeedingThread);
0501     }
0502 
0503     //! Starts record feeding operation. The method blocks or throws if another feeding operation is in progress.
0504     bool start_feeding_operation(unique_lock< frontend_mutex_type >& lock, operation op)
0505     {
0506         while (m_ActiveOperation != idle)
0507         {
0508             if (BOOST_UNLIKELY(op == feeding_records && m_ActiveOperation == feeding_records))
0509                 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
0510 
0511             if (BOOST_UNLIKELY(m_StopRequested.load(boost::memory_order_relaxed)))
0512             {
0513                 m_StopRequested.store(false, boost::memory_order_relaxed);
0514                 return true;
0515             }
0516 
0517             m_BlockCond.wait(lock);
0518         }
0519 
0520         m_ActiveOperation = op;
0521 
0522         return false;
0523     }
0524 
0525     //! Completes record feeding operation
0526     void complete_feeding_operation() BOOST_NOEXCEPT
0527     {
0528         try
0529         {
0530             lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
0531             m_ActiveOperation = idle;
0532             m_StopRequested.store(false, boost::memory_order_relaxed);
0533             m_BlockCond.notify_all();
0534         }
0535         catch (...)
0536         {
0537         }
0538     }
0539 
0540     //! The record feeding loop
0541     void do_feed_records()
0542     {
0543         while (!m_StopRequested.load(boost::memory_order_acquire))
0544         {
0545             record_view rec;
0546             bool dequeued = false;
0547             if (BOOST_LIKELY(!m_FlushRequested.load(boost::memory_order_acquire)))
0548                 dequeued = queue_base_type::try_dequeue_ready(rec);
0549             else
0550                 dequeued = queue_base_type::try_dequeue(rec);
0551 
0552             if (dequeued)
0553                 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
0554             else
0555                 break;
0556         }
0557 
0558         if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
0559         {
0560             scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested);
0561             base_type::flush_backend(m_BackendMutex, *m_pBackend);
0562         }
0563     }
0564 #endif // BOOST_LOG_DOXYGEN_PASS
0565 };
0566 
0567 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1
0568 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N
0569 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
0570 
0571 } // namespace sinks
0572 
0573 BOOST_LOG_CLOSE_NAMESPACE // namespace log
0574 
0575 } // namespace boost
0576 
0577 #include <boost/log/detail/footer.hpp>
0578 
0579 #endif // BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_