File indexing completed on 2025-01-18 09:28:35
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_ASIO_DETAIL_IMPL_STRAND_SERVICE_IPP
0012 #define BOOST_ASIO_DETAIL_IMPL_STRAND_SERVICE_IPP
0013
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif
0017
0018 #include <boost/asio/detail/config.hpp>
0019 #include <boost/asio/detail/call_stack.hpp>
0020 #include <boost/asio/detail/strand_service.hpp>
0021
0022 #include <boost/asio/detail/push_options.hpp>
0023
0024 namespace boost {
0025 namespace asio {
0026 namespace detail {
0027
0028 struct strand_service::on_do_complete_exit
0029 {
0030 io_context_impl* owner_;
0031 strand_impl* impl_;
0032
0033 ~on_do_complete_exit()
0034 {
0035 impl_->mutex_.lock();
0036 impl_->ready_queue_.push(impl_->waiting_queue_);
0037 bool more_handlers = impl_->locked_ = !impl_->ready_queue_.empty();
0038 impl_->mutex_.unlock();
0039
0040 if (more_handlers)
0041 owner_->post_immediate_completion(impl_, true);
0042 }
0043 };
0044
0045 strand_service::strand_service(boost::asio::io_context& io_context)
0046 : boost::asio::detail::service_base<strand_service>(io_context),
0047 io_context_(io_context),
0048 io_context_impl_(boost::asio::use_service<io_context_impl>(io_context)),
0049 mutex_(),
0050 salt_(0)
0051 {
0052 }
0053
0054 void strand_service::shutdown()
0055 {
0056 op_queue<operation> ops;
0057
0058 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0059
0060 for (std::size_t i = 0; i < num_implementations; ++i)
0061 {
0062 if (strand_impl* impl = implementations_[i].get())
0063 {
0064 ops.push(impl->waiting_queue_);
0065 ops.push(impl->ready_queue_);
0066 }
0067 }
0068 }
0069
0070 void strand_service::construct(strand_service::implementation_type& impl)
0071 {
0072 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0073
0074 std::size_t salt = salt_++;
0075 #if defined(BOOST_ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION)
0076 std::size_t index = salt;
0077 #else
0078 std::size_t index = reinterpret_cast<std::size_t>(&impl);
0079 index += (reinterpret_cast<std::size_t>(&impl) >> 3);
0080 index ^= salt + 0x9e3779b9 + (index << 6) + (index >> 2);
0081 #endif
0082 index = index % num_implementations;
0083
0084 if (!implementations_[index].get())
0085 implementations_[index].reset(new strand_impl);
0086 impl = implementations_[index].get();
0087 }
0088
0089 bool strand_service::running_in_this_thread(
0090 const implementation_type& impl) const
0091 {
0092 return call_stack<strand_impl>::contains(impl) != 0;
0093 }
0094
0095 struct strand_service::on_dispatch_exit
0096 {
0097 io_context_impl* io_context_impl_;
0098 strand_impl* impl_;
0099
0100 ~on_dispatch_exit()
0101 {
0102 impl_->mutex_.lock();
0103 impl_->ready_queue_.push(impl_->waiting_queue_);
0104 bool more_handlers = impl_->locked_ = !impl_->ready_queue_.empty();
0105 impl_->mutex_.unlock();
0106
0107 if (more_handlers)
0108 io_context_impl_->post_immediate_completion(impl_, false);
0109 }
0110 };
0111
0112 void strand_service::do_dispatch(implementation_type& impl, operation* op)
0113 {
0114
0115
0116 bool can_dispatch = io_context_impl_.can_dispatch();
0117 impl->mutex_.lock();
0118 if (can_dispatch && !impl->locked_)
0119 {
0120
0121 impl->locked_ = true;
0122 impl->mutex_.unlock();
0123
0124
0125 call_stack<strand_impl>::context ctx(impl);
0126
0127
0128 on_dispatch_exit on_exit = { &io_context_impl_, impl };
0129 (void)on_exit;
0130
0131 op->complete(&io_context_impl_, boost::system::error_code(), 0);
0132 return;
0133 }
0134
0135 if (impl->locked_)
0136 {
0137
0138 impl->waiting_queue_.push(op);
0139 impl->mutex_.unlock();
0140 }
0141 else
0142 {
0143
0144
0145 impl->locked_ = true;
0146 impl->mutex_.unlock();
0147 impl->ready_queue_.push(op);
0148 io_context_impl_.post_immediate_completion(impl, false);
0149 }
0150 }
0151
0152 void strand_service::do_post(implementation_type& impl,
0153 operation* op, bool is_continuation)
0154 {
0155 impl->mutex_.lock();
0156 if (impl->locked_)
0157 {
0158
0159 impl->waiting_queue_.push(op);
0160 impl->mutex_.unlock();
0161 }
0162 else
0163 {
0164
0165
0166 impl->locked_ = true;
0167 impl->mutex_.unlock();
0168 impl->ready_queue_.push(op);
0169 io_context_impl_.post_immediate_completion(impl, is_continuation);
0170 }
0171 }
0172
0173 void strand_service::do_complete(void* owner, operation* base,
0174 const boost::system::error_code& ec, std::size_t )
0175 {
0176 if (owner)
0177 {
0178 strand_impl* impl = static_cast<strand_impl*>(base);
0179
0180
0181 call_stack<strand_impl>::context ctx(impl);
0182
0183
0184 on_do_complete_exit on_exit;
0185 on_exit.owner_ = static_cast<io_context_impl*>(owner);
0186 on_exit.impl_ = impl;
0187
0188
0189
0190 while (operation* o = impl->ready_queue_.front())
0191 {
0192 impl->ready_queue_.pop();
0193 o->complete(owner, ec, 0);
0194 }
0195 }
0196 }
0197
0198 }
0199 }
0200 }
0201
0202 #include <boost/asio/detail/pop_options.hpp>
0203
0204 #endif