File indexing completed on 2025-01-18 09:28:33
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP
0012 #define BOOST_ASIO_DETAIL_IMPL_SELECT_REACTOR_IPP
0013
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif
0017
0018 #include <boost/asio/detail/config.hpp>
0019
0020 #if defined(BOOST_ASIO_HAS_IOCP) \
0021 || (!defined(BOOST_ASIO_HAS_DEV_POLL) \
0022 && !defined(BOOST_ASIO_HAS_EPOLL) \
0023 && !defined(BOOST_ASIO_HAS_KQUEUE) \
0024 && !defined(BOOST_ASIO_WINDOWS_RUNTIME))
0025
0026 #include <boost/asio/detail/fd_set_adapter.hpp>
0027 #include <boost/asio/detail/select_reactor.hpp>
0028 #include <boost/asio/detail/signal_blocker.hpp>
0029 #include <boost/asio/detail/socket_ops.hpp>
0030
0031 #if defined(BOOST_ASIO_HAS_IOCP)
0032 # include <boost/asio/detail/win_iocp_io_context.hpp>
0033 #else
0034 # include <boost/asio/detail/scheduler.hpp>
0035 #endif
0036
0037 #include <boost/asio/detail/push_options.hpp>
0038
0039 namespace boost {
0040 namespace asio {
0041 namespace detail {
0042
0043 #if defined(BOOST_ASIO_HAS_IOCP)
0044 class select_reactor::thread_function
0045 {
0046 public:
0047 explicit thread_function(select_reactor* r)
0048 : this_(r)
0049 {
0050 }
0051
0052 void operator()()
0053 {
0054 this_->run_thread();
0055 }
0056
0057 private:
0058 select_reactor* this_;
0059 };
0060 #endif
0061
0062 select_reactor::select_reactor(boost::asio::execution_context& ctx)
0063 : execution_context_service_base<select_reactor>(ctx),
0064 scheduler_(use_service<scheduler_type>(ctx)),
0065 mutex_(),
0066 interrupter_(),
0067 #if defined(BOOST_ASIO_HAS_IOCP)
0068 stop_thread_(false),
0069 thread_(0),
0070 restart_reactor_(this),
0071 #endif
0072 shutdown_(false)
0073 {
0074 #if defined(BOOST_ASIO_HAS_IOCP)
0075 boost::asio::detail::signal_blocker sb;
0076 thread_ = new boost::asio::detail::thread(thread_function(this));
0077 #endif
0078 }
0079
0080 select_reactor::~select_reactor()
0081 {
0082 shutdown();
0083 }
0084
0085 void select_reactor::shutdown()
0086 {
0087 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0088 shutdown_ = true;
0089 #if defined(BOOST_ASIO_HAS_IOCP)
0090 stop_thread_ = true;
0091 if (thread_)
0092 interrupter_.interrupt();
0093 #endif
0094 lock.unlock();
0095
0096 #if defined(BOOST_ASIO_HAS_IOCP)
0097 if (thread_)
0098 {
0099 thread_->join();
0100 delete thread_;
0101 thread_ = 0;
0102 }
0103 #endif
0104
0105 op_queue<operation> ops;
0106
0107 for (int i = 0; i < max_ops; ++i)
0108 op_queue_[i].get_all_operations(ops);
0109
0110 timer_queues_.get_all_timers(ops);
0111
0112 scheduler_.abandon_operations(ops);
0113 }
0114
0115 void select_reactor::notify_fork(
0116 boost::asio::execution_context::fork_event fork_ev)
0117 {
0118 #if defined(BOOST_ASIO_HAS_IOCP)
0119 (void)fork_ev;
0120 #else
0121 if (fork_ev == boost::asio::execution_context::fork_child)
0122 interrupter_.recreate();
0123 #endif
0124 }
0125
0126 void select_reactor::init_task()
0127 {
0128 scheduler_.init_task();
0129 }
0130
0131 int select_reactor::register_descriptor(socket_type,
0132 select_reactor::per_descriptor_data&)
0133 {
0134 return 0;
0135 }
0136
0137 int select_reactor::register_internal_descriptor(
0138 int op_type, socket_type descriptor,
0139 select_reactor::per_descriptor_data&, reactor_op* op)
0140 {
0141 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0142
0143 op_queue_[op_type].enqueue_operation(descriptor, op);
0144 interrupter_.interrupt();
0145
0146 return 0;
0147 }
0148
0149 void select_reactor::move_descriptor(socket_type,
0150 select_reactor::per_descriptor_data&,
0151 select_reactor::per_descriptor_data&)
0152 {
0153 }
0154
0155 void select_reactor::call_post_immediate_completion(
0156 operation* op, bool is_continuation, const void* self)
0157 {
0158 static_cast<const select_reactor*>(self)->post_immediate_completion(
0159 op, is_continuation);
0160 }
0161
0162 void select_reactor::start_op(int op_type, socket_type descriptor,
0163 select_reactor::per_descriptor_data&, reactor_op* op, bool is_continuation,
0164 bool, void (*on_immediate)(operation*, bool, const void*),
0165 const void* immediate_arg)
0166 {
0167 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0168
0169 if (shutdown_)
0170 {
0171 on_immediate(op, is_continuation, immediate_arg);
0172 return;
0173 }
0174
0175 bool first = op_queue_[op_type].enqueue_operation(descriptor, op);
0176 scheduler_.work_started();
0177 if (first)
0178 interrupter_.interrupt();
0179 }
0180
0181 void select_reactor::cancel_ops(socket_type descriptor,
0182 select_reactor::per_descriptor_data&)
0183 {
0184 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0185 cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted);
0186 }
0187
0188 void select_reactor::cancel_ops_by_key(socket_type descriptor,
0189 select_reactor::per_descriptor_data&,
0190 int op_type, void* cancellation_key)
0191 {
0192 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0193 op_queue<operation> ops;
0194 bool need_interrupt = op_queue_[op_type].cancel_operations_by_key(
0195 descriptor, ops, cancellation_key, boost::asio::error::operation_aborted);
0196 scheduler_.post_deferred_completions(ops);
0197 if (need_interrupt)
0198 interrupter_.interrupt();
0199 }
0200
0201 void select_reactor::deregister_descriptor(socket_type descriptor,
0202 select_reactor::per_descriptor_data&, bool)
0203 {
0204 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0205 cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted);
0206 }
0207
0208 void select_reactor::deregister_internal_descriptor(
0209 socket_type descriptor, select_reactor::per_descriptor_data&)
0210 {
0211 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0212 op_queue<operation> ops;
0213 for (int i = 0; i < max_ops; ++i)
0214 op_queue_[i].cancel_operations(descriptor, ops);
0215 }
0216
0217 void select_reactor::cleanup_descriptor_data(
0218 select_reactor::per_descriptor_data&)
0219 {
0220 }
0221
0222 void select_reactor::run(long usec, op_queue<operation>& ops)
0223 {
0224 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0225
0226 #if defined(BOOST_ASIO_HAS_IOCP)
0227
0228 if (stop_thread_)
0229 return;
0230 #endif
0231
0232
0233 for (int i = 0; i < max_select_ops; ++i)
0234 fd_sets_[i].reset();
0235 fd_sets_[read_op].set(interrupter_.read_descriptor());
0236 socket_type max_fd = 0;
0237 bool have_work_to_do = !timer_queues_.all_empty();
0238 for (int i = 0; i < max_select_ops; ++i)
0239 {
0240 have_work_to_do = have_work_to_do || !op_queue_[i].empty();
0241 fd_sets_[i].set(op_queue_[i], ops);
0242 if (fd_sets_[i].max_descriptor() > max_fd)
0243 max_fd = fd_sets_[i].max_descriptor();
0244 }
0245
0246 #if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
0247
0248 have_work_to_do = have_work_to_do || !op_queue_[connect_op].empty();
0249 fd_sets_[write_op].set(op_queue_[connect_op], ops);
0250 if (fd_sets_[write_op].max_descriptor() > max_fd)
0251 max_fd = fd_sets_[write_op].max_descriptor();
0252 fd_sets_[except_op].set(op_queue_[connect_op], ops);
0253 if (fd_sets_[except_op].max_descriptor() > max_fd)
0254 max_fd = fd_sets_[except_op].max_descriptor();
0255 #endif
0256
0257
0258
0259 if (!usec && !have_work_to_do)
0260 return;
0261
0262
0263 timeval tv_buf = { 0, 0 };
0264 timeval* tv = usec ? get_timeout(usec, tv_buf) : &tv_buf;
0265
0266 lock.unlock();
0267
0268
0269 boost::system::error_code ec;
0270 int retval = socket_ops::select(static_cast<int>(max_fd + 1),
0271 fd_sets_[read_op], fd_sets_[write_op], fd_sets_[except_op], tv, ec);
0272
0273
0274 if (retval > 0 && fd_sets_[read_op].is_set(interrupter_.read_descriptor()))
0275 {
0276 if (!interrupter_.reset())
0277 {
0278 lock.lock();
0279 #if defined(BOOST_ASIO_HAS_IOCP)
0280 stop_thread_ = true;
0281 scheduler_.post_immediate_completion(&restart_reactor_, false);
0282 #else
0283 interrupter_.recreate();
0284 #endif
0285 }
0286 --retval;
0287 }
0288
0289 lock.lock();
0290
0291
0292 if (retval > 0)
0293 {
0294 #if defined(BOOST_ASIO_WINDOWS) || defined(__CYGWIN__)
0295
0296 fd_sets_[except_op].perform(op_queue_[connect_op], ops);
0297 fd_sets_[write_op].perform(op_queue_[connect_op], ops);
0298 #endif
0299
0300
0301
0302 for (int i = max_select_ops - 1; i >= 0; --i)
0303 fd_sets_[i].perform(op_queue_[i], ops);
0304 }
0305 timer_queues_.get_ready_timers(ops);
0306 }
0307
0308 void select_reactor::interrupt()
0309 {
0310 interrupter_.interrupt();
0311 }
0312
0313 #if defined(BOOST_ASIO_HAS_IOCP)
0314 void select_reactor::run_thread()
0315 {
0316 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0317 while (!stop_thread_)
0318 {
0319 lock.unlock();
0320 op_queue<operation> ops;
0321 run(-1, ops);
0322 scheduler_.post_deferred_completions(ops);
0323 lock.lock();
0324 }
0325 }
0326
0327 void select_reactor::restart_reactor::do_complete(void* owner, operation* base,
0328 const boost::system::error_code& , std::size_t )
0329 {
0330 if (owner)
0331 {
0332 select_reactor* reactor = static_cast<restart_reactor*>(base)->reactor_;
0333
0334 if (reactor->thread_)
0335 {
0336 reactor->thread_->join();
0337 delete reactor->thread_;
0338 reactor->thread_ = 0;
0339 }
0340
0341 boost::asio::detail::mutex::scoped_lock lock(reactor->mutex_);
0342 reactor->interrupter_.recreate();
0343 reactor->stop_thread_ = false;
0344 lock.unlock();
0345
0346 boost::asio::detail::signal_blocker sb;
0347 reactor->thread_ =
0348 new boost::asio::detail::thread(thread_function(reactor));
0349 }
0350 }
0351 #endif
0352
0353 void select_reactor::do_add_timer_queue(timer_queue_base& queue)
0354 {
0355 mutex::scoped_lock lock(mutex_);
0356 timer_queues_.insert(&queue);
0357 }
0358
0359 void select_reactor::do_remove_timer_queue(timer_queue_base& queue)
0360 {
0361 mutex::scoped_lock lock(mutex_);
0362 timer_queues_.erase(&queue);
0363 }
0364
0365 timeval* select_reactor::get_timeout(long usec, timeval& tv)
0366 {
0367
0368
0369 const long max_usec = 5 * 60 * 1000 * 1000;
0370 usec = timer_queues_.wait_duration_usec(
0371 (usec < 0 || max_usec < usec) ? max_usec : usec);
0372 tv.tv_sec = usec / 1000000;
0373 tv.tv_usec = usec % 1000000;
0374 return &tv;
0375 }
0376
0377 void select_reactor::cancel_ops_unlocked(socket_type descriptor,
0378 const boost::system::error_code& ec)
0379 {
0380 bool need_interrupt = false;
0381 op_queue<operation> ops;
0382 for (int i = 0; i < max_ops; ++i)
0383 need_interrupt = op_queue_[i].cancel_operations(
0384 descriptor, ops, ec) || need_interrupt;
0385 scheduler_.post_deferred_completions(ops);
0386 if (need_interrupt)
0387 interrupter_.interrupt();
0388 }
0389
0390 }
0391 }
0392 }
0393
0394 #include <boost/asio/detail/pop_options.hpp>
0395
0396 #endif
0397
0398
0399
0400
0401
0402 #endif