File indexing completed on 2025-09-13 08:38:41
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015 #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
0016 #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
0017
0018 #include <thread>
0019 #include <mutex>
0020 #include <condition_variable>
0021 #include <exception> // std::terminate
0022 #include <boost/log/detail/config.hpp>
0023
0024 #ifdef BOOST_HAS_PRAGMA_ONCE
0025 #pragma once
0026 #endif
0027
0028 #if defined(BOOST_LOG_NO_THREADS)
0029 #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
0030 #endif
0031
0032 #include <boost/memory_order.hpp>
0033 #include <boost/atomic/atomic.hpp>
0034 #include <boost/smart_ptr/shared_ptr.hpp>
0035 #include <boost/smart_ptr/make_shared_object.hpp>
0036 #include <boost/preprocessor/control/if.hpp>
0037 #include <boost/preprocessor/comparison/equal.hpp>
0038 #include <boost/log/exceptions.hpp>
0039 #include <boost/log/detail/locking_ptr.hpp>
0040 #include <boost/log/detail/parameter_tools.hpp>
0041 #include <boost/log/core/record_view.hpp>
0042 #include <boost/log/sinks/basic_sink_frontend.hpp>
0043 #include <boost/log/sinks/frontend_requirements.hpp>
0044 #include <boost/log/sinks/unbounded_fifo_queue.hpp>
0045 #include <boost/log/keywords/start_thread.hpp>
0046 #include <boost/log/detail/header.hpp>
0047
0048 namespace boost {
0049
0050 BOOST_LOG_OPEN_NAMESPACE
0051
0052 namespace sinks {
0053
0054 #ifndef BOOST_LOG_DOXYGEN_PASS
0055 // Emits the two one-argument named-parameter constructors: arg0 initializes the queueing strategy base and (first overload only) is passed to make_shared to build the backend; the feeding thread is started unless arg0[keywords::start_thread] yields false (default true). The first overload takes part in overload resolution only when enable_if_named_parameters< T0, ... >::type exists.
0056 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(z, n, data)\
0057 template< typename T0 >\
0058 explicit asynchronous_sink(T0 const& arg0, typename boost::log::aux::enable_if_named_parameters< T0, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\
0059 base_type(true),\
0060 queue_base_type(arg0),\
0061 m_pBackend(boost::make_shared< sink_backend_type >(arg0)),\
0062 m_ActiveOperation(idle),\
0063 m_StopRequested(false),\
0064 m_FlushRequested(false)\
0065 {\
0066 if (arg0[keywords::start_thread | true])\
0067 start_feeding_thread();\
0068 }\
0069 template< typename T0 >\
0070 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, T0 const& arg0) :\
0071 base_type(true),\
0072 queue_base_type(arg0),\
0073 m_pBackend(backend),\
0074 m_ActiveOperation(idle),\
0075 m_StopRequested(false),\
0076 m_FlushRequested(false)\
0077 {\
0078 if (arg0[keywords::start_thread | true])\
0079 start_feeding_thread();\
0080 }
0081 // Emits the n-argument counterparts of the constructors above: the queueing strategy base and the start_thread lookup receive the single expression (arg0, arg1, ...) formed with the comma operator (presumably combining the named parameters into one pack -- confirm against the parameter library), while make_shared receives the arguments as a normal argument list.
0082 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(z, n, data)\
0083 template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
0084 explicit asynchronous_sink(BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
0085 base_type(true),\
0086 queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
0087 m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
0088 m_ActiveOperation(idle),\
0089 m_StopRequested(false),\
0090 m_FlushRequested(false)\
0091 {\
0092 if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
0093 start_feeding_thread();\
0094 }\
0095 template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
0096 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
0097 base_type(true),\
0098 queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
0099 m_pBackend(backend),\
0100 m_ActiveOperation(idle),\
0101 m_StopRequested(false),\
0102 m_FlushRequested(false)\
0103 {\
0104 if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
0105 start_feeding_thread();\
0106 }
0107 // Per-arity entry point handed to BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN: selects the _1 variant when n == 1 and the _N variant for any other n.
0108 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data)\
0109 BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(z, n, data)
0110
0111 #endif
0112 //! \brief Asynchronous logging sink frontend.
0113 //!
0114 //! Records passed to \c consume / \c try_consume are put into the queue implemented by the
0115 //! \c QueueingStrategyT base class. They are handed to the backend later, by whichever thread
0116 //! executes \c run, \c feed_records or \c flush. Unless disabled at construction, the
0117 //! constructor starts a dedicated thread that executes \c run; \c stop (also called by the
0118 //! destructor) ends that loop and joins the thread.
0119 //!
0120 //! \tparam SinkBackendT Backend type. Its \c frontend_requirements must include
0121 //!         \c synchronized_feeding; this is enforced by a \c static_assert in the class body.
0122 //! \tparam QueueingStrategyT Public base class providing the queue operations used here:
0123 //!         \c enqueue, \c try_enqueue, \c dequeue_ready, \c try_dequeue_ready, \c try_dequeue
0124 //!         and \c interrupt_dequeue.
0125 template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
0126 class asynchronous_sink :
0127 public aux::make_sink_frontend_base< SinkBackendT >::type,
0128 public QueueingStrategyT
0129 {
0130 typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type;
0131 typedef QueueingStrategyT queue_base_type;
0132
0133 private:
0134 //! Type of the mutex that is passed to the base class along with the backend on feed/flush
0135 typedef std::recursive_mutex backend_mutex_type;
0136 //! Type of the mutex returned by base_type::frontend_mutex(); guards the frontend state
0137 typedef typename base_type::mutex_type frontend_mutex_type;
0138
0139 //! Operation that currently owns record feeding. flushing (3) contains the feeding_records bit (1), so (op & feeding_records) is non-zero for both.
0140 enum operation
0141 {
0142 idle = 0u,
0143 feeding_records = 1u,
0144 flushing = 3u
0145 };
0146
0147 //! Function object used as the entry point of the dedicated thread; invokes run() on the stored frontend
0148 class run_func
0149 {
0150 public:
0151 typedef void result_type;
0152
0153 private:
0154 asynchronous_sink* m_self;
0155
0156 public:
0157 explicit run_func(asynchronous_sink* self) BOOST_NOEXCEPT : m_self(self)
0158 {
0159 }
0160
0161 result_type operator()() const
0162 {
0163 m_self->run();
0164 }
0165 };
0166
0167 //! Non-copyable scope guard: its destructor calls complete_feeding_operation() on the frontend
0168 class scoped_feeding_operation
0169 {
0170 private:
0171 asynchronous_sink& m_self;
0172
0173 public:
0174
0175 explicit scoped_feeding_operation(asynchronous_sink& self) : m_self(self)
0176 {
0177 }
0178
0179 ~scoped_feeding_operation()
0180 {
0181 m_self.complete_feeding_operation();
0182 }
0183
0184 BOOST_DELETED_FUNCTION(scoped_feeding_operation(scoped_feeding_operation const&))
0185 BOOST_DELETED_FUNCTION(scoped_feeding_operation& operator= (scoped_feeding_operation const&))
0186 };
0187
0188 //! Non-copyable scope guard: its destructor locks the mutex, stores false into the flag and notifies all waiters on the condition variable; any exception raised while doing so is swallowed
0189 class scoped_flag
0190 {
0191 private:
0192 frontend_mutex_type& m_Mutex;
0193 std::condition_variable_any& m_Cond;
0194 boost::atomic< bool >& m_Flag;
0195
0196 public:
0197 explicit scoped_flag(frontend_mutex_type& mut, std::condition_variable_any& cond, boost::atomic< bool >& f) :
0198 m_Mutex(mut), m_Cond(cond), m_Flag(f)
0199 {
0200 }
0201 ~scoped_flag()
0202 {
0203 try
0204 {
0205 std::lock_guard< frontend_mutex_type > lock(m_Mutex);
0206 m_Flag.store(false, boost::memory_order_relaxed);
0207 m_Cond.notify_all();
0208 }
0209 catch (...)
0210 {
0211 }
0212 }
0213
0214 BOOST_DELETED_FUNCTION(scoped_flag(scoped_flag const&))
0215 BOOST_DELETED_FUNCTION(scoped_flag& operator= (scoped_flag const&))
0216 };
0217
0218 public:
0219 //! Type of the backend this frontend feeds records to
0220 typedef SinkBackendT sink_backend_type;
0221 //! Compile-time check that the backend declares the synchronized_feeding requirement
0222 static_assert(has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value, "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met");
0223
0224
0225 #ifndef BOOST_LOG_DOXYGEN_PASS
0226
0227 //! Type returned by locked_backend(): a locking_ptr over the backend type and the backend mutex type
0228 typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr;
0229
0230 #else
0231
0232 //! Placeholder for the type above, visible only to the documentation generator
0233 typedef implementation_defined locked_backend_ptr;
0234
0235 #endif
0236
0237 private:
0238 //! Mutex handed to the base class together with the backend whenever a record is fed or the backend is flushed
0239 backend_mutex_type m_BackendMutex;
0240 //! The backend; the pointer is fixed at construction (const member)
0241 const shared_ptr< sink_backend_type > m_pBackend;
0242
0243 //! Dedicated feeding thread created by start_feeding_thread(); moved out and joined by stop()
0244 std::thread m_DedicatedFeedingThread;
0245 //! Condition variable waited on (with the frontend mutex) by consume(), flush() and start_feeding_operation()
0246 std::condition_variable_any m_BlockCond;
0247
0248 //! Operation currently in progress; every access in this class happens under the frontend mutex
0249 operation m_ActiveOperation;
0250 //! Set by stop() to make the feeding loops exit; cleared when a feeding operation completes or is refused
0251 boost::atomic< bool > m_StopRequested;
0252 //! Set by flush() while a flush is pending; while set, consume() blocks and try_consume() returns false
0253 boost::atomic< bool > m_FlushRequested;
0254
0255 public:
0256 //! Default constructor. Creates the backend with make_shared< sink_backend_type >() and
0257 //! leaves the queueing strategy base default-initialized.
0258 //!
0259 //! \param start_thread If \c true (the default), a dedicated thread executing run() is
0260 //!        started from the constructor body. If \c false, no thread is created and records
0261 //!        are only processed when run(), feed_records() or flush() is called.
0262 //!
0263 //! NOTE(review): the meaning of the \c true passed to the base class constructor is
0264 //! defined by base_type and is not visible in this file.
0265 explicit asynchronous_sink(bool start_thread = true) :
0266 base_type(true),
0267 m_pBackend(boost::make_shared< sink_backend_type >()),
0268 m_ActiveOperation(idle),
0269 m_StopRequested(false),
0270 m_FlushRequested(false)
0271 {
0272 if (start_thread)
0273 start_feeding_thread();
0274 }
0275
0276 //! Constructor that attaches a backend constructed by the caller.
0277 //!
0278 //! \param backend Pointer to the backend. It is stored as is; no null check is made here,
0279 //!        and the pointer is dereferenced whenever records are fed or the backend is
0280 //!        flushed, so it is expected to be non-null.
0281 //! \param start_thread If \c true (the default), a dedicated thread executing run() is
0282 //!        started from the constructor body.
0283 //!
0284 //!
0285 //!
0286 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
0287 base_type(true),
0288 m_pBackend(backend),
0289 m_ActiveOperation(idle),
0290 m_StopRequested(false),
0291 m_FlushRequested(false)
0292 {
0293 if (start_thread)
0294 start_feeding_thread();
0295 }
0296
0297 //! Constructors taking named parameters. The real overloads are generated by
0298 //! BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN from the BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
0299 //! macro defined before this class; the variadic declaration below exists only for the
0300 //! documentation generator.
0301 //!
0302 //! Each generated overload passes its arguments to the queueing strategy base and either
0303 //! constructs the backend from them or takes a ready backend pointer as the first argument.
0304 //! The dedicated thread is started unless the \c keywords::start_thread parameter is
0305 //! supplied and evaluates to \c false.
0306 //!
0307 //!
0308 #ifndef BOOST_LOG_DOXYGEN_PASS
0309 BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
0310 #else
0311 template< typename... Args >
0312 explicit asynchronous_sink(Args&&... args);
0313 #endif
0314
0315 //! Destructor. Calls stop(), which requests the feeding loop to end and joins the
0316 //! dedicated thread if it is joinable.
0317 //!
0318 ~asynchronous_sink() BOOST_NOEXCEPT BOOST_OVERRIDE
0319 {
0320 stop();
0321 }
0322
0323 //! Returns a locked_backend_ptr built from the backend pointer and the backend mutex.
0324 //! NOTE(review): presumably the returned object keeps the backend mutex locked for its
0325 //! lifetime -- confirm against locking_ptr.
0326 locked_backend_ptr locked_backend()
0327 {
0328 return locked_backend_ptr(m_pBackend, m_BackendMutex);
0329 }
0330
0331 //! Enqueues the record for later processing. If a flush is pending, the call first
0332 //! blocks on m_BlockCond until m_FlushRequested is cleared.
0333 //! \param rec The record to enqueue.
0334 void consume(record_view const& rec) BOOST_OVERRIDE
0335 {
0336 if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
0337 {
0338 std::unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0339 // Re-check under the mutex and wait until the flushing thread clears the flag
0340 while (m_FlushRequested.load(boost::memory_order_acquire))
0341 m_BlockCond.wait(lock);
0342 }
0343 queue_base_type::enqueue(rec);
0344 }
0345
0346 //! Non-blocking counterpart of consume().
0347 //! \param rec The record to enqueue.
0348 //! \return The result of try_enqueue(), or \c false without enqueueing if a flush is pending.
0349 bool try_consume(record_view const& rec) BOOST_OVERRIDE
0350 {
0351 if (!m_FlushRequested.load(boost::memory_order_acquire))
0352 return queue_base_type::try_enqueue(rec);
0353 else
0354 return false;
0355 }
0356
0357 //! Record feeding loop; this is what the dedicated thread executes. The loop alternates
0358 //! between do_feed_records() and a dequeue_ready() call and ends once m_StopRequested is
0359 //! observed set, i.e. after stop() has been called.
0360 //!
0361 //! Returns without feeding anything if start_feeding_operation() reports that a stop was
0362 //! requested while waiting for another operation to finish.
0363 //!
0364 //! Raises \c unexpected_call (through BOOST_LOG_THROW_DESCR in start_feeding_operation())
0365 //! if another thread is already performing a feeding_records operation.
0366 void run()
0367 {
0368 // Claim the feeding_records operation; a true result means "stop requested, do not start"
0369 {
0370 std::unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0371 if (start_feeding_operation(lock, feeding_records))
0372 return;
0373 }
0374
0375 scoped_feeding_operation guard(*this);
0376
0377 // Feeding loop: process what is available, then wait for the next record
0378 while (true)
0379 {
0380 do_feed_records();
0381 if (!m_StopRequested.load(boost::memory_order_acquire))
0382 {
0383 // Presumably blocks until a record is ready or interrupt_dequeue() is called by stop()/flush() -- confirm against the queueing strategy
0384 record_view rec;
0385 if (queue_base_type::dequeue_ready(rec))
0386 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
0387 }
0388 else
0389 break;
0390 }
0391 }
0392
0393 //! Requests the feeding loop to stop and waits for the dedicated thread to finish.
0394 //!
0395 //! Under the frontend mutex the method sets m_StopRequested, calls interrupt_dequeue()
0396 //! on the queueing strategy and moves the dedicated thread object into a local variable.
0397 //! After the mutex is released, the thread is joined if it is joinable.
0398 //!
0399 //! Only the dedicated thread is joined. If run() is being executed by some other thread,
0400 //! this method requests the stop but does not wait for that thread to leave run().
0401 //!
0402 //! Because the thread object is swapped out, a subsequent call finds a non-joinable
0403 //! thread and performs no join.
0404 //!
0405 //!
0406 //!
0407 //!
0408 //!
0409 //!
0410 //!
0411 //!
0412 //!
0413 //!
0414 //!
0415 //!
0416 void stop()
0417 {
0418 std::thread feeding_thread;
0419 {
0420 std::lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
0421
0422 m_StopRequested.store(true, boost::memory_order_release);
0423 queue_base_type::interrupt_dequeue();
0424 // Take the thread object out so that the join below happens without holding the mutex
0425 m_DedicatedFeedingThread.swap(feeding_thread);
0426 }
0427
0428 if (feeding_thread.joinable())
0429 feeding_thread.join();
0430 }
0431
0432 //! Performs a single do_feed_records() pass in the calling thread and returns.
0433 //! The call may wait in start_feeding_operation() for an active flush to finish. It
0434 //! returns without feeding if a stop request is observed there, and raises
0435 //! \c unexpected_call if another thread is already performing a feeding_records operation.
0436 //!
0437 void feed_records()
0438 {
0439 // Claim the feeding_records operation; a true result means "stop requested, do not start"
0440 {
0441 std::unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0442 if (start_feeding_operation(lock, feeding_records))
0443 return;
0444 }
0445
0446 scoped_feeding_operation guard(*this);
0447
0448
0449 do_feed_records();
0450 }
0451
0452 //! Processes the queued records and flushes the backend.
0453 //! If an operation with the feeding_records bit is active, that operation is asked to
0454 //! flush and this call waits for it; otherwise the flush is done in the calling thread.
0455 //! While m_FlushRequested is set, consume() blocks and try_consume() returns \c false.
0456 //!
0457 void flush() BOOST_OVERRIDE
0458 {
0459 {
0460 std::unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0461 if (static_cast< unsigned int >(m_ActiveOperation & feeding_records) != 0u)
0462 {
0463 // An operation is already active: raise the flush flag, interrupt its dequeue and wait until the flag is cleared or a stop is requested
0464 m_FlushRequested.store(true, boost::memory_order_release);
0465 queue_base_type::interrupt_dequeue();
0466 while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire))
0467 m_BlockCond.wait(lock);
0468
0469 // The wake-up may have come from the other operation finishing rather than from a
0470 // completed flush. If the frontend is idle now, fall through and flush in this
0471 // thread; otherwise leave the work to the operation that is still active.
0472 if (m_ActiveOperation != idle)
0473 return;
0474 }
0475
0476 m_ActiveOperation = flushing;
0477 m_FlushRequested.store(true, boost::memory_order_relaxed);
0478 }
0479
0480 scoped_feeding_operation guard(*this);
0481
0482 do_feed_records();
0483 }
0484
0485 private:
0486 #ifndef BOOST_LOG_DOXYGEN_PASS
0487 //! Creates a thread that runs run_func(this), i.e. run(), and stores it in m_DedicatedFeedingThread
0488 void start_feeding_thread()
0489 {
0490 std::thread(run_func(this)).swap(m_DedicatedFeedingThread);
0491 }
0492 //! Waits (on m_BlockCond, using the caller's lock on the frontend mutex) until no operation is active, then records \a op as the active operation and returns \c false.
0493 //! Returns \c true, after clearing m_StopRequested, if a stop request is seen while another operation is active; raises unexpected_call if \a op is feeding_records and feeding_records is already active.
0494 bool start_feeding_operation(std::unique_lock< frontend_mutex_type >& lock, operation op)
0495 {
0496 while (m_ActiveOperation != idle)
0497 {
0498 if (BOOST_UNLIKELY(op == feeding_records && m_ActiveOperation == feeding_records))
0499 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
0500
0501 if (BOOST_UNLIKELY(m_StopRequested.load(boost::memory_order_relaxed)))
0502 {
0503 m_StopRequested.store(false, boost::memory_order_relaxed);
0504 return true;
0505 }
0506
0507 m_BlockCond.wait(lock);
0508 }
0509
0510 m_ActiveOperation = op;
0511
0512 return false;
0513 }
0514 //! Ends the current operation: under the frontend mutex sets m_ActiveOperation to idle, clears m_StopRequested and notifies all waiters on m_BlockCond.
0515 //! Any exception raised while doing so is swallowed, in line with the BOOST_NOEXCEPT specification.
0516 void complete_feeding_operation() BOOST_NOEXCEPT
0517 {
0518 try
0519 {
0520 std::lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
0521 m_ActiveOperation = idle;
0522 m_StopRequested.store(false, boost::memory_order_relaxed);
0523 m_BlockCond.notify_all();
0524 }
0525 catch (...)
0526 {
0527 }
0528 }
0529 //! Feeds records to the backend until the queue yields none or a stop is requested: try_dequeue_ready() is used normally, try_dequeue() while a flush is pending.
0530 //! Afterwards, if a flush is pending, flushes the backend; a scoped_flag then clears m_FlushRequested and wakes the waiters, even if flush_backend() throws.
0531 void do_feed_records()
0532 {
0533 while (!m_StopRequested.load(boost::memory_order_acquire))
0534 {
0535 record_view rec;
0536 bool dequeued = false;
0537 if (BOOST_LIKELY(!m_FlushRequested.load(boost::memory_order_acquire)))
0538 dequeued = queue_base_type::try_dequeue_ready(rec);
0539 else
0540 dequeued = queue_base_type::try_dequeue(rec);
0541
0542 if (dequeued)
0543 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
0544 else
0545 break;
0546 }
0547 // Complete a pending flush; the guard's destructor resets the flag and notifies threads blocked in consume()/flush()
0548 if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
0549 {
0550 scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested);
0551 base_type::flush_backend(m_BackendMutex, *m_pBackend);
0552 }
0553 }
0554 #endif
0555 };
0555
0556 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1
0557 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N
0558 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
0559
0560 }
0561
0562 BOOST_LOG_CLOSE_NAMESPACE
0563
0564 }
0565
0566 #include <boost/log/detail/footer.hpp>
0567
0568 #endif