File indexing completed on 2025-01-18 09:39:24
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015 #ifndef BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
0016 #define BOOST_LOG_SINKS_ASYNC_FRONTEND_HPP_INCLUDED_
0017
0018 #include <exception> // std::terminate
0019 #include <boost/log/detail/config.hpp>
0020
0021 #ifdef BOOST_HAS_PRAGMA_ONCE
0022 #pragma once
0023 #endif
0024
0025 #if defined(BOOST_LOG_NO_THREADS)
0026 #error Boost.Log: Asynchronous sink frontend is only supported in multithreaded environment
0027 #endif
0028
0029 #include <boost/memory_order.hpp>
0030 #include <boost/atomic/atomic.hpp>
0031 #include <boost/smart_ptr/shared_ptr.hpp>
0032 #include <boost/smart_ptr/make_shared_object.hpp>
0033 #include <boost/preprocessor/control/if.hpp>
0034 #include <boost/preprocessor/comparison/equal.hpp>
0035 #include <boost/thread/locks.hpp>
0036 #include <boost/thread/recursive_mutex.hpp>
0037 #include <boost/thread/thread.hpp>
0038 #include <boost/thread/condition_variable.hpp>
0039 #include <boost/log/exceptions.hpp>
0040 #include <boost/log/detail/locking_ptr.hpp>
0041 #include <boost/log/detail/parameter_tools.hpp>
0042 #include <boost/log/core/record_view.hpp>
0043 #include <boost/log/sinks/basic_sink_frontend.hpp>
0044 #include <boost/log/sinks/frontend_requirements.hpp>
0045 #include <boost/log/sinks/unbounded_fifo_queue.hpp>
0046 #include <boost/log/keywords/start_thread.hpp>
0047 #include <boost/log/detail/header.hpp>
0048
0049 namespace boost {
0050
0051 BOOST_LOG_OPEN_NAMESPACE
0052
0053 namespace sinks {
0054
0055 #ifndef BOOST_LOG_DOXYGEN_PASS
0056
// Expands to the two single-argument forwarding constructors of asynchronous_sink
// (the z, n and data macro arguments are not referenced by this expansion):
//  - asynchronous_sink(arg0): participates only when enable_if_named_parameters< T0, ... >
//    provides a nested type; the backend is created with boost::make_shared from arg0.
//  - asynchronous_sink(backend, arg0): adopts an already constructed backend.
// Both forward arg0 to the queueing strategy base and start the feeding thread when
// arg0[keywords::start_thread | true] evaluates to true.
#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1(z, n, data)\
    template< typename T0 >\
    explicit asynchronous_sink(\
        T0 const& arg0,\
        typename boost::log::aux::enable_if_named_parameters< T0, boost::log::aux::sfinae_dummy >::type = boost::log::aux::sfinae_dummy()) :\
        base_type(true),\
        queue_base_type(arg0),\
        m_pBackend(boost::make_shared< sink_backend_type >(arg0)),\
        m_ActiveOperation(idle),\
        m_StopRequested(false),\
        m_FlushRequested(false)\
    {\
        if (arg0[keywords::start_thread | true])\
        {\
            start_feeding_thread();\
        }\
    }\
    template< typename T0 >\
    explicit asynchronous_sink(\
        shared_ptr< sink_backend_type > const& backend,\
        T0 const& arg0) :\
        base_type(true),\
        queue_base_type(arg0),\
        m_pBackend(backend),\
        m_ActiveOperation(idle),\
        m_StopRequested(false),\
        m_FlushRequested(false)\
    {\
        if (arg0[keywords::start_thread | true])\
        {\
            start_feeding_thread();\
        }\
    }
0082
// Expands to the two n-argument forwarding constructors of asynchronous_sink, where
// z and n are supplied by the Boost.Preprocessor repetition and data is not used:
//  - asynchronous_sink(arg0, ..., argN-1): the backend is created with boost::make_shared
//    from all arguments.
//  - asynchronous_sink(backend, arg0, ..., argN-1): adopts an already constructed backend.
// The arguments are joined with the comma operator into one pack, which is passed to the
// queueing strategy base and queried for keywords::start_thread (true is the fallback
// value given to the lookup) to decide whether the feeding thread is started.
#define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N(z, n, data)\
    template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
    explicit asynchronous_sink(\
        BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
        base_type(true),\
        queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
        m_pBackend(boost::make_shared< sink_backend_type >(BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
        m_ActiveOperation(idle),\
        m_StopRequested(false),\
        m_FlushRequested(false)\
    {\
        if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
        {\
            start_feeding_thread();\
        }\
    }\
    template< BOOST_PP_ENUM_PARAMS_Z(z, n, typename T) >\
    explicit asynchronous_sink(\
        shared_ptr< sink_backend_type > const& backend,\
        BOOST_PP_ENUM_BINARY_PARAMS_Z(z, n, T, const& arg)) :\
        base_type(true),\
        queue_base_type((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))),\
        m_pBackend(backend),\
        m_ActiveOperation(idle),\
        m_StopRequested(false),\
        m_FlushRequested(false)\
    {\
        if ((BOOST_PP_ENUM_PARAMS_Z(z, n, arg))[keywords::start_thread | true])\
        {\
            start_feeding_thread();\
        }\
    }
0108
// Entry point used for constructor generation: selects the single-argument expansion
// (BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1) when n equals 1 and the generic n-argument
// expansion (BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N) otherwise, then invokes the chosen
// macro with the same (z, n, data) arguments.
0109 #define BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL(z, n, data)\
0110 BOOST_PP_IF(BOOST_PP_EQUAL(n, 1), BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1, BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N)(z, n, data)
0111
0112 #endif
0113
0114
0115
0116
0117
0118
0119
0120
0121
0122
0123
0124
0125
/*!
 * Asynchronous sink frontend.
 *
 * consume() and try_consume() place log records into the queue provided by the
 * QueueingStrategyT base; run() and feed_records() take records back out of that
 * queue and pass them to the backend.
 *
 * \tparam SinkBackendT      Backend type that receives the records.
 * \tparam QueueingStrategyT Queue policy the frontend derives from; this class calls its
 *                           enqueue, try_enqueue, dequeue_ready, try_dequeue_ready,
 *                           try_dequeue and interrupt_dequeue members.
 */
0126 template< typename SinkBackendT, typename QueueingStrategyT = unbounded_fifo_queue >
0127 class asynchronous_sink :
0128 public aux::make_sink_frontend_base< SinkBackendT >::type,
0129 public QueueingStrategyT
0130 {
// Short names for the two base classes
0131 typedef typename aux::make_sink_frontend_base< SinkBackendT >::type base_type;
0132 typedef QueueingStrategyT queue_base_type;
0133
0134 private:
0135
//! Type of the mutex passed to the base class together with the backend
0136 typedef boost::recursive_mutex backend_mutex_type;
0137
//! Type of the mutex returned by base_type::frontend_mutex()
0138 typedef typename base_type::mutex_type frontend_mutex_type;
0139
0140
//! Bit mask identifying the operation that currently owns the frontend.
//! The flushing value contains the feeding_records bit, so a test of the form
//! (value & feeding_records) != 0 is true for both non-idle states.
enum operation
{
    idle = 0x0u,                            //!< No operation is in progress
    feeding_records = 0x1u,                 //!< Records are being fed to the backend
    flushing = feeding_records | 0x2u       //!< Records are being fed and the backend flushed (== 3u)
};
0147
0148
0149 class run_func
0150 {
0151 public:
0152 typedef void result_type;
0153
0154 private:
0155 asynchronous_sink* m_self;
0156
0157 public:
0158 explicit run_func(asynchronous_sink* self) BOOST_NOEXCEPT : m_self(self)
0159 {
0160 }
0161
0162 result_type operator()() const
0163 {
0164 m_self->run();
0165 }
0166 };
0167
0168
0169 class scoped_feeding_operation
0170 {
0171 private:
0172 asynchronous_sink& m_self;
0173
0174 public:
0175
0176 explicit scoped_feeding_operation(asynchronous_sink& self) : m_self(self)
0177 {
0178 }
0179
0180 ~scoped_feeding_operation()
0181 {
0182 m_self.complete_feeding_operation();
0183 }
0184
0185 BOOST_DELETED_FUNCTION(scoped_feeding_operation(scoped_feeding_operation const&))
0186 BOOST_DELETED_FUNCTION(scoped_feeding_operation& operator= (scoped_feeding_operation const&))
0187 };
0188
0189
0190 class scoped_flag
0191 {
0192 private:
0193 frontend_mutex_type& m_Mutex;
0194 condition_variable_any& m_Cond;
0195 boost::atomic< bool >& m_Flag;
0196
0197 public:
0198 explicit scoped_flag(frontend_mutex_type& mut, condition_variable_any& cond, boost::atomic< bool >& f) :
0199 m_Mutex(mut), m_Cond(cond), m_Flag(f)
0200 {
0201 }
0202 ~scoped_flag()
0203 {
0204 try
0205 {
0206 lock_guard< frontend_mutex_type > lock(m_Mutex);
0207 m_Flag.store(false, boost::memory_order_relaxed);
0208 m_Cond.notify_all();
0209 }
0210 catch (...)
0211 {
0212 }
0213 }
0214
0215 BOOST_DELETED_FUNCTION(scoped_flag(scoped_flag const&))
0216 BOOST_DELETED_FUNCTION(scoped_flag& operator= (scoped_flag const&))
0217 };
0218
0219 public:
0220
//! The backend type this frontend feeds
0221 typedef SinkBackendT sink_backend_type;
0222
// Compile-time check: the backend's frontend_requirements must include synchronized_feeding
0223 static_assert(has_requirement< typename sink_backend_type::frontend_requirements, synchronized_feeding >::value, "Asynchronous sink frontend is incompatible with the specified backend: thread synchronization requirements are not met");
0224
0225
0226 #ifndef BOOST_LOG_DOXYGEN_PASS
0227
0228
//! Pointer type returned by locked_backend(); built from the backend pointer and the backend mutex
0229 typedef boost::log::aux::locking_ptr< sink_backend_type, backend_mutex_type > locked_backend_ptr;
0230
0231 #else
0232
0233
// Documentation build only: the concrete pointer type is not exposed
0234 typedef implementation_defined locked_backend_ptr;
0235
0236 #endif
0237
0238 private:
0239
//! Mutex handed to feed_record(), flush_backend() and locked_backend_ptr together with the backend
0240 backend_mutex_type m_BackendMutex;
0241
//! The backend; the pointer itself is const and therefore never reseated after construction
0242 const shared_ptr< sink_backend_type > m_pBackend;
0243
0244
//! Thread created by start_feeding_thread(); stop() swaps it out and joins it
0245 thread m_DedicatedFeedingThread;
0246
//! Condition variable waited on by consume(), flush() and start_feeding_operation();
//! notified by complete_feeding_operation() and by scoped_flag's destructor
0247 condition_variable_any m_BlockCond;
0248
0249
//! Operation currently in progress; read and written while the frontend mutex is held
0250 operation m_ActiveOperation;
0251
//! Set by stop(); cleared by complete_feeding_operation() and start_feeding_operation()
0252 boost::atomic< bool > m_StopRequested;
0253
//! Set by flush(); cleared by the scoped_flag created in do_feed_records()
0254 boost::atomic< bool > m_FlushRequested;
0255
0256 public:
0257
0258
0259
0260
0261
0262
0263
0264
0265
0266 explicit asynchronous_sink(bool start_thread = true) :
0267 base_type(true),
0268 m_pBackend(boost::make_shared< sink_backend_type >()),
0269 m_ActiveOperation(idle),
0270 m_StopRequested(false),
0271 m_FlushRequested(false)
0272 {
0273 if (start_thread)
0274 start_feeding_thread();
0275 }
0276
0277
0278
0279
0280
0281
0282
0283
0284
0285
0286
0287 explicit asynchronous_sink(shared_ptr< sink_backend_type > const& backend, bool start_thread = true) :
0288 base_type(true),
0289 m_pBackend(backend),
0290 m_ActiveOperation(idle),
0291 m_StopRequested(false),
0292 m_FlushRequested(false)
0293 {
0294 if (start_thread)
0295 start_feeding_thread();
0296 }
0297
0298
0299
0300
0301
0302
0303
0304
0305
0306
0307
0308
// Real build: the forwarding constructors are produced by passing
// BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL to BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN.
0309 #ifndef BOOST_LOG_DOXYGEN_PASS
0310 BOOST_LOG_PARAMETRIZED_CONSTRUCTORS_GEN(BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL, ~)
0311 #else
// Documentation build: a single variadic constructor is declared in place of the generated ones.
0312 template< typename... Args >
0313 explicit asynchronous_sink(Args&&... args);
0314 #endif
0315
0316
0317
0318
0319 ~asynchronous_sink() BOOST_NOEXCEPT BOOST_OVERRIDE
0320 {
0321 try
0322 {
0323 boost::this_thread::disable_interruption no_interrupts;
0324 stop();
0325 }
0326 catch (...)
0327 {
0328 std::terminate();
0329 }
0330 }
0331
0332
0333
0334
/*!
 * Returns a locked_backend_ptr constructed from the backend pointer and the backend mutex.
 */
0335 locked_backend_ptr locked_backend()
0336 {
0337 return locked_backend_ptr(m_pBackend, m_BackendMutex);
0338 }
0339
0340
0341
0342
0343 void consume(record_view const& rec) BOOST_OVERRIDE
0344 {
0345 if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
0346 {
0347 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0348
0349 while (m_FlushRequested.load(boost::memory_order_acquire))
0350 m_BlockCond.wait(lock);
0351 }
0352 queue_base_type::enqueue(rec);
0353 }
0354
0355
0356
0357
0358 bool try_consume(record_view const& rec) BOOST_OVERRIDE
0359 {
0360 if (!m_FlushRequested.load(boost::memory_order_acquire))
0361 {
0362 return queue_base_type::try_enqueue(rec);
0363 }
0364 else
0365 return false;
0366 }
0367
0368
0369
0370
0371
0372
0373
0374
0375
0376
0377 void run()
0378 {
0379
0380 {
0381 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0382 if (start_feeding_operation(lock, feeding_records))
0383 return;
0384 }
0385
0386 scoped_feeding_operation guard(*this);
0387
0388
0389 while (true)
0390 {
0391 do_feed_records();
0392 if (!m_StopRequested.load(boost::memory_order_acquire))
0393 {
0394
0395 record_view rec;
0396 if (queue_base_type::dequeue_ready(rec))
0397 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
0398 }
0399 else
0400 break;
0401 }
0402 }
0403
0404
0405
0406
0407
0408
0409
0410
0411
0412
0413
0414
0415
0416
0417
0418
0419
0420
0421
0422
0423
0424
0425
0426
0427 void stop()
0428 {
0429 boost::thread feeding_thread;
0430 {
0431 lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
0432
0433 m_StopRequested.store(true, boost::memory_order_release);
0434 queue_base_type::interrupt_dequeue();
0435
0436 m_DedicatedFeedingThread.swap(feeding_thread);
0437 }
0438
0439 if (feeding_thread.joinable())
0440 feeding_thread.join();
0441 }
0442
0443
0444
0445
0446
0447
0448 void feed_records()
0449 {
0450
0451 {
0452 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0453 if (start_feeding_operation(lock, feeding_records))
0454 return;
0455 }
0456
0457 scoped_feeding_operation guard(*this);
0458
0459
0460 do_feed_records();
0461 }
0462
0463
0464
0465
0466
0467
0468 void flush() BOOST_OVERRIDE
0469 {
0470 {
0471 unique_lock< frontend_mutex_type > lock(base_type::frontend_mutex());
0472 if (static_cast< unsigned int >(m_ActiveOperation & feeding_records) != 0u)
0473 {
0474
0475 m_FlushRequested.store(true, boost::memory_order_release);
0476 queue_base_type::interrupt_dequeue();
0477 while (!m_StopRequested.load(boost::memory_order_acquire) && m_FlushRequested.load(boost::memory_order_acquire))
0478 m_BlockCond.wait(lock);
0479
0480
0481
0482 if (m_ActiveOperation != idle)
0483 return;
0484 }
0485
0486 m_ActiveOperation = flushing;
0487 m_FlushRequested.store(true, boost::memory_order_relaxed);
0488 }
0489
0490 scoped_feeding_operation guard(*this);
0491
0492 do_feed_records();
0493 }
0494
0495 private:
0496 #ifndef BOOST_LOG_DOXYGEN_PASS
0497
0498 void start_feeding_thread()
0499 {
0500 boost::thread(run_func(this)).swap(m_DedicatedFeedingThread);
0501 }
0502
0503
0504 bool start_feeding_operation(unique_lock< frontend_mutex_type >& lock, operation op)
0505 {
0506 while (m_ActiveOperation != idle)
0507 {
0508 if (BOOST_UNLIKELY(op == feeding_records && m_ActiveOperation == feeding_records))
0509 BOOST_LOG_THROW_DESCR(unexpected_call, "Asynchronous sink frontend already runs a record feeding thread");
0510
0511 if (BOOST_UNLIKELY(m_StopRequested.load(boost::memory_order_relaxed)))
0512 {
0513 m_StopRequested.store(false, boost::memory_order_relaxed);
0514 return true;
0515 }
0516
0517 m_BlockCond.wait(lock);
0518 }
0519
0520 m_ActiveOperation = op;
0521
0522 return false;
0523 }
0524
0525
0526 void complete_feeding_operation() BOOST_NOEXCEPT
0527 {
0528 try
0529 {
0530 lock_guard< frontend_mutex_type > lock(base_type::frontend_mutex());
0531 m_ActiveOperation = idle;
0532 m_StopRequested.store(false, boost::memory_order_relaxed);
0533 m_BlockCond.notify_all();
0534 }
0535 catch (...)
0536 {
0537 }
0538 }
0539
0540
0541 void do_feed_records()
0542 {
0543 while (!m_StopRequested.load(boost::memory_order_acquire))
0544 {
0545 record_view rec;
0546 bool dequeued = false;
0547 if (BOOST_LIKELY(!m_FlushRequested.load(boost::memory_order_acquire)))
0548 dequeued = queue_base_type::try_dequeue_ready(rec);
0549 else
0550 dequeued = queue_base_type::try_dequeue(rec);
0551
0552 if (dequeued)
0553 base_type::feed_record(rec, m_BackendMutex, *m_pBackend);
0554 else
0555 break;
0556 }
0557
0558 if (BOOST_UNLIKELY(m_FlushRequested.load(boost::memory_order_acquire)))
0559 {
0560 scoped_flag guard(base_type::frontend_mutex(), m_BlockCond, m_FlushRequested);
0561 base_type::flush_backend(m_BackendMutex, *m_pBackend);
0562 }
0563 }
0564 #endif
0565 };
0566
0567 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_1
0568 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL_N
0569 #undef BOOST_LOG_SINK_CTOR_FORWARD_INTERNAL
0570
0571 }
0572
0573 BOOST_LOG_CLOSE_NAMESPACE
0574
0575 }
0576
0577 #include <boost/log/detail/footer.hpp>
0578
0579 #endif