//
// detail/impl/epoll_reactor.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
0011 #ifndef BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP
0012 #define BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP
0013
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif
0017
0018 #include <boost/asio/detail/config.hpp>
0019
0020 #if defined(BOOST_ASIO_HAS_EPOLL)
0021
0022 #include <cstddef>
0023 #include <sys/epoll.h>
0024 #include <boost/asio/config.hpp>
0025 #include <boost/asio/detail/epoll_reactor.hpp>
0026 #include <boost/asio/detail/scheduler.hpp>
0027 #include <boost/asio/detail/throw_error.hpp>
0028 #include <boost/asio/error.hpp>
0029
0030 #if defined(BOOST_ASIO_HAS_TIMERFD)
0031 # include <sys/timerfd.h>
0032 #endif
0033
0034 #include <boost/asio/detail/push_options.hpp>
0035
0036 namespace boost {
0037 namespace asio {
0038 namespace detail {
0039
// Constructor. Reads locking and preallocation settings from the execution
// context's configuration, creates the epoll descriptor and (where
// available) a timerfd for timer dispatch, then registers the interrupter
// with epoll.
epoll_reactor::epoll_reactor(boost::asio::execution_context& ctx)
  : execution_context_service_base<epoll_reactor>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    mutex_(config(ctx).get("reactor", "registration_locking", true),
        config(ctx).get("reactor", "registration_locking_spin_count", 0)),
    interrupter_(),
    epoll_fd_(do_epoll_create()),
    timer_fd_(do_timerfd_create()),
    shutdown_(false),
    io_locking_(config(ctx).get("reactor", "io_locking", true)),
    io_locking_spin_count_(
        config(ctx).get("reactor", "io_locking_spin_count", 0)),
    registered_descriptors_mutex_(mutex_.enabled(), mutex_.spin_count()),
    registered_descriptors_(
        config(ctx).get("reactor", "preallocated_io_objects", 0U),
        io_locking_, io_locking_spin_count_)
{
  // Add the interrupter's descriptor to epoll. The registration is
  // edge-triggered (EPOLLET) and the interrupter is immediately put into a
  // ready-to-read state, so interrupt() can later wake epoll_wait() with a
  // simple EPOLL_CTL_MOD instead of writing to the pipe/eventfd.
  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  ev.data.ptr = &interrupter_;
  epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
  interrupter_.interrupt();

  // Add the timer descriptor to epoll, if timerfd support is available.
  if (timer_fd_ != -1)
  {
    ev.events = EPOLLIN | EPOLLERR;
    ev.data.ptr = &timer_fd_;
    epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
  }
}
0072
0073 epoll_reactor::~epoll_reactor()
0074 {
0075 if (epoll_fd_ != -1)
0076 close(epoll_fd_);
0077 if (timer_fd_ != -1)
0078 close(timer_fd_);
0079 }
0080
// Destroys all user-defined handler objects owned by the service. Marks the
// reactor as shut down, strips the pending operations from every registered
// descriptor and from the timer queues, then hands them to the scheduler to
// be destroyed without being invoked.
void epoll_reactor::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  // Drain each descriptor state and return it to the free list. Setting
  // shutdown_ on the state tells any concurrent deregistration that the
  // reactor, not cleanup_descriptor_data, now owns the state.
  while (descriptor_state* state = registered_descriptors_.first())
  {
    for (int i = 0; i < max_ops; ++i)
      ops.push(state->op_queue_[i]);
    state->shutdown_ = true;
    registered_descriptors_.free(state);
  }

  // Collect all outstanding timer operations as well.
  timer_queues_.get_all_timers(ops);

  scheduler_.abandon_operations(ops);
}
0101
// Handles fork notifications. In the child process the epoll and timer
// descriptors refer to kernel objects shared with the parent, so they are
// recreated and every registered descriptor is re-added to the new epoll
// set. Throws if a descriptor cannot be re-registered.
void epoll_reactor::notify_fork(
    boost::asio::execution_context::fork_event fork_ev)
{
  if (fork_ev == boost::asio::execution_context::fork_child)
  {
    // Recreate the epoll descriptor.
    if (epoll_fd_ != -1)
      ::close(epoll_fd_);
    epoll_fd_ = -1;
    epoll_fd_ = do_epoll_create();

    // Recreate the timer descriptor, where timerfd is in use.
    if (timer_fd_ != -1)
      ::close(timer_fd_);
    timer_fd_ = -1;
    timer_fd_ = do_timerfd_create();

    interrupter_.recreate();

    // Add the interrupter's descriptor to epoll (edge-triggered and left in
    // a ready-to-read state, as in the constructor).
    epoll_event ev = { 0, { 0 } };
    ev.events = EPOLLIN | EPOLLERR | EPOLLET;
    ev.data.ptr = &interrupter_;
    epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
    interrupter_.interrupt();

    // Add the timer descriptor to epoll.
    if (timer_fd_ != -1)
    {
      ev.events = EPOLLIN | EPOLLERR;
      ev.data.ptr = &timer_fd_;
      epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
    }

    update_timeout();

    // Re-register all descriptors with epoll.
    mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
    for (descriptor_state* state = registered_descriptors_.first();
        state != 0; state = state->next_)
    {
      // registered_events_ == 0 marks descriptors (e.g. regular files) that
      // were never in the epoll set; they are skipped.
      if (state->registered_events_ != 0)
      {
        ev.events = state->registered_events_;
        ev.data.ptr = state;
        int result = epoll_ctl(epoll_fd_,
            EPOLL_CTL_ADD, state->descriptor_, &ev);
        if (result != 0)
        {
          boost::system::error_code ec(errno,
              boost::asio::error::get_system_category());
          boost::asio::detail::throw_error(ec, "epoll re-registration");
        }
      }
    }
  }
}
0157
// Initialises the reactor's task with the scheduler, so the scheduler will
// run this reactor as its background task.
void epoll_reactor::init_task()
{
  scheduler_.init_task();
}
0162
0163 int epoll_reactor::register_descriptor(socket_type descriptor,
0164 epoll_reactor::per_descriptor_data& descriptor_data)
0165 {
0166 descriptor_data = allocate_descriptor_state();
0167
0168 BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
0169 context(), static_cast<uintmax_t>(descriptor),
0170 reinterpret_cast<uintmax_t>(descriptor_data)));
0171
0172 {
0173 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0174
0175 descriptor_data->reactor_ = this;
0176 descriptor_data->descriptor_ = descriptor;
0177 descriptor_data->shutdown_ = false;
0178 for (int i = 0; i < max_ops; ++i)
0179 descriptor_data->try_speculative_[i] = true;
0180 }
0181
0182 epoll_event ev = { 0, { 0 } };
0183 ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
0184 descriptor_data->registered_events_ = ev.events;
0185 ev.data.ptr = descriptor_data;
0186 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
0187 if (result != 0)
0188 {
0189 if (errno == EPERM)
0190 {
0191
0192
0193
0194
0195 descriptor_data->registered_events_ = 0;
0196 return 0;
0197 }
0198 return errno;
0199 }
0200
0201 return 0;
0202 }
0203
// Registers a descriptor used internally by the implementation, queuing the
// supplied persistent operation for op_type at the same time. Returns 0 on
// success, or the errno value from epoll_ctl on failure.
int epoll_reactor::register_internal_descriptor(
    int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
{
  descriptor_data = allocate_descriptor_state();

  BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
        context(), static_cast<uintmax_t>(descriptor),
        reinterpret_cast<uintmax_t>(descriptor_data)));

  {
    mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

    descriptor_data->reactor_ = this;
    descriptor_data->descriptor_ = descriptor;
    descriptor_data->shutdown_ = false;
    descriptor_data->op_queue_[op_type].push(op);
    // Speculative (reactor-bypassing) operation attempts are initially
    // permitted for every operation type.
    for (int i = 0; i < max_ops; ++i)
      descriptor_data->try_speculative_[i] = true;
  }

  // Register for all event types using edge-triggered notification.
  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
  descriptor_data->registered_events_ = ev.events;
  ev.data.ptr = descriptor_data;
  int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  if (result != 0)
  {
    // Registration failed, so no events are associated with the descriptor.
    descriptor_data->registered_events_ = 0;
    return errno;
  }

  return 0;
}
0239
0240 void epoll_reactor::move_descriptor(socket_type,
0241 epoll_reactor::per_descriptor_data& target_descriptor_data,
0242 epoll_reactor::per_descriptor_data& source_descriptor_data)
0243 {
0244 target_descriptor_data = source_descriptor_data;
0245 source_descriptor_data = 0;
0246 }
0247
// Adapter matching the on_immediate callback signature used by start_op():
// forwards the completed operation to the reactor's
// post_immediate_completion(). self is the epoll_reactor instance.
void epoll_reactor::call_post_immediate_completion(
    operation* op, bool is_continuation, const void* self)
{
  static_cast<const epoll_reactor*>(self)->post_immediate_completion(
      op, is_continuation);
}
0254
// Starts a new asynchronous operation on the descriptor. The operation may
// complete immediately (reported through the on_immediate callback), succeed
// speculatively without touching epoll, or be queued until an epoll event
// indicates readiness.
void epoll_reactor::start_op(int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative,
    void (*on_immediate)(operation*, bool, const void*),
    const void* immediate_arg)
{
  if (!descriptor_data)
  {
    // No per-descriptor state: the descriptor was never registered or has
    // already been cleaned up.
    op->ec_ = boost::asio::error::bad_descriptor;
    on_immediate(op, is_continuation, immediate_arg);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    // The descriptor has been deregistered; complete immediately with the
    // operation's existing error state.
    on_immediate(op, is_continuation, immediate_arg);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty())
  {
    // A speculative attempt is only made when no same-type operation is
    // queued ahead of us, and reads must not jump ahead of queued
    // out-of-band (except_op) operations.
    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
      if (descriptor_data->try_speculative_[op_type])
      {
        if (reactor_op::status status = op->perform())
        {
          // done_and_exhausted: the descriptor is no longer ready, so
          // disable further speculative attempts until epoll reports it
          // ready again (only meaningful if it is in the epoll set).
          if (status == reactor_op::done_and_exhausted)
            if (descriptor_data->registered_events_ != 0)
              descriptor_data->try_speculative_[op_type] = false;
          descriptor_lock.unlock();
          on_immediate(op, is_continuation, immediate_arg);
          return;
        }
      }

      // registered_events_ == 0 means the descriptor is not in the epoll
      // set (e.g. a regular file accepted with EPERM), so it is impossible
      // to wait for readiness.
      if (descriptor_data->registered_events_ == 0)
      {
        op->ec_ = boost::asio::error::operation_not_supported;
        on_immediate(op, is_continuation, immediate_arg);
        return;
      }

      if (op_type == write_op)
      {
        // EPOLLOUT is added lazily, the first time a write actually needs
        // to wait, to avoid spurious wakeups on always-writable sockets.
        if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
        {
          epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }
          else
          {
            op->ec_ = boost::system::error_code(errno,
                boost::asio::error::get_system_category());
            on_immediate(op, is_continuation, immediate_arg);
            return;
          }
        }
      }
    }
    else if (descriptor_data->registered_events_ == 0)
    {
      op->ec_ = boost::asio::error::operation_not_supported;
      on_immediate(op, is_continuation, immediate_arg);
      return;
    }
    else
    {
      if (op_type == write_op)
      {
        descriptor_data->registered_events_ |= EPOLLOUT;
      }

      // Re-issue the registration; with EPOLLET, EPOLL_CTL_MOD also re-arms
      // notification so the newly queued operation cannot miss an edge that
      // occurred before it was queued.
      epoll_event ev = { 0, { 0 } };
      ev.events = descriptor_data->registered_events_;
      ev.data.ptr = descriptor_data;
      epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  scheduler_.work_started();
}
0346
0347 void epoll_reactor::cancel_ops(socket_type,
0348 epoll_reactor::per_descriptor_data& descriptor_data)
0349 {
0350 if (!descriptor_data)
0351 return;
0352
0353 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0354
0355 op_queue<operation> ops;
0356 for (int i = 0; i < max_ops; ++i)
0357 {
0358 while (reactor_op* op = descriptor_data->op_queue_[i].front())
0359 {
0360 op->ec_ = boost::asio::error::operation_aborted;
0361 descriptor_data->op_queue_[i].pop();
0362 ops.push(op);
0363 }
0364 }
0365
0366 descriptor_lock.unlock();
0367
0368 scheduler_.post_deferred_completions(ops);
0369 }
0370
// Cancels only the operations of the given type that carry the given
// cancellation key, completing them with operation_aborted. Operations with
// other keys are retained in their original relative order.
void epoll_reactor::cancel_ops_by_key(socket_type,
    epoll_reactor::per_descriptor_data& descriptor_data,
    int op_type, void* cancellation_key)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  // Partition the queue into cancelled operations and survivors.
  op_queue<operation> ops;
  op_queue<reactor_op> other_ops;
  while (reactor_op* op = descriptor_data->op_queue_[op_type].front())
  {
    descriptor_data->op_queue_[op_type].pop();
    if (op->cancellation_key_ == cancellation_key)
    {
      op->ec_ = boost::asio::error::operation_aborted;
      ops.push(op);
    }
    else
      other_ops.push(op);
  }
  // Re-queue the surviving operations, preserving their order.
  descriptor_data->op_queue_[op_type].push(other_ops);

  descriptor_lock.unlock();

  scheduler_.post_deferred_completions(ops);
}
0399
// Deregisters a descriptor, cancelling any outstanding operations with
// operation_aborted. When closing is true no explicit EPOLL_CTL_DEL is
// issued, because closing the descriptor removes it from the epoll set.
void epoll_reactor::deregister_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, bool closing)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    if (closing)
    {
      // The descriptor will be automatically removed from the epoll set when
      // it is closed.
    }
    else if (descriptor_data->registered_events_ != 0)
    {
      epoll_event ev = { 0, { 0 } };
      epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
    }

    // Abort all pending operations on the descriptor.
    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
    {
      while (reactor_op* op = descriptor_data->op_queue_[i].front())
      {
        op->ec_ = boost::asio::error::operation_aborted;
        descriptor_data->op_queue_[i].pop();
        ops.push(op);
      }
    }

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
          context(), static_cast<uintmax_t>(descriptor),
          reinterpret_cast<uintmax_t>(descriptor_data)));

    scheduler_.post_deferred_completions(ops);

    // Leave descriptor_data set so that it will be freed by the subsequent
    // call to cleanup_descriptor_data.
  }
  else
  {
    // We are shutting down, so prevent cleanup_descriptor_data from freeing
    // the descriptor_data object and let the destructor free it instead.
    descriptor_data = 0;
  }
}
0453
// Deregisters an internal descriptor. Unlike deregister_descriptor(), the
// pending operations are not posted for completion: they are collected into
// a local queue and destroyed when it goes out of scope.
void epoll_reactor::deregister_internal_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    epoll_event ev = { 0, { 0 } };
    epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);

    // Move the pending operations into a local queue; they are destroyed,
    // not invoked, when ops is destructed at the end of this scope.
    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
      ops.push(descriptor_data->op_queue_[i]);

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
          context(), static_cast<uintmax_t>(descriptor),
          reinterpret_cast<uintmax_t>(descriptor_data)));

    // Leave descriptor_data set so that it will be freed by the subsequent
    // call to cleanup_descriptor_data.
  }
  else
  {
    // We are shutting down, so prevent cleanup_descriptor_data from freeing
    // the descriptor_data object and let the destructor free it instead.
    descriptor_data = 0;
  }
}
0490
0491 void epoll_reactor::cleanup_descriptor_data(
0492 per_descriptor_data& descriptor_data)
0493 {
0494 if (descriptor_data)
0495 {
0496 free_descriptor_state(descriptor_data);
0497 descriptor_data = 0;
0498 }
0499 }
0500
// Runs epoll once, demultiplexing events into the supplied operation queue.
// usec < 0 blocks indefinitely, 0 polls, otherwise blocks for at most usec
// microseconds (rounded up to whole milliseconds).
void epoll_reactor::run(long usec, op_queue<operation>& ops)
{
  // This code relies on the fact that the scheduler queues the reactor task
  // behind all descriptor operations generated by this function. This means,
  // that by the time we reach this point, any previously returned descriptor
  // operations have already been dequeued. Therefore it is now safe for us
  // to reuse and return them for the scheduler to queue again.

  // Calculate the timeout. The timer queues are consulted only when no
  // timerfd is in use (otherwise the timerfd wakes epoll itself).
  int timeout;
  if (usec == 0)
    timeout = 0;
  else
  {
    timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
    if (timer_fd_ == -1)
    {
      mutex::scoped_lock lock(mutex_);
      timeout = get_timeout(timeout);
    }
  }

  // Block on the epoll descriptor.
  epoll_event events[128];
  int num_events = epoll_wait(epoll_fd_, events, 128, timeout);

#if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  // Trace the events with the handler-tracking mechanism.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = events[i].data.ptr;
    if (ptr == &interrupter_)
    {
      // Interrupter events are internal and not traced.
    }
# if defined(BOOST_ASIO_HAS_TIMERFD)
    else if (ptr == &timer_fd_)
    {
      // Timerfd events are internal and not traced.
    }
# endif
    else
    {
      unsigned event_mask = 0;
      if ((events[i].events & EPOLLIN) != 0)
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_READ_EVENT;
      if ((events[i].events & EPOLLOUT))
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_WRITE_EVENT;
      if ((events[i].events & (EPOLLERR | EPOLLHUP)) != 0)
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_ERROR_EVENT;
      BOOST_ASIO_HANDLER_REACTOR_EVENTS((context(),
            reinterpret_cast<uintmax_t>(ptr), event_mask));
    }
  }
#endif

  // Without a timerfd the timer queues must be checked on every pass.
#if defined(BOOST_ASIO_HAS_TIMERFD)
  bool check_timers = (timer_fd_ == -1);
#else
  bool check_timers = true;
#endif

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = events[i].data.ptr;
    if (ptr == &interrupter_)
    {
      // No need to reset the interrupter since we're leaving the descriptor
      // in a ready-to-read state and relying on edge-triggered notifications
      // to make it so that we only get woken up when the descriptor's epoll
      // registration is updated.

#if defined(BOOST_ASIO_HAS_TIMERFD)
      if (timer_fd_ == -1)
        check_timers = true;
#else
      check_timers = true;
#endif
    }
#if defined(BOOST_ASIO_HAS_TIMERFD)
    else if (ptr == &timer_fd_)
    {
      check_timers = true;
    }
#endif
    else
    {
      // The descriptor operation doesn't count as work in and of itself, so
      // we don't call work_started() here. This still allows the scheduler
      // to stop if the only remaining operations are descriptor operations.
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      if (!ops.is_enqueued(descriptor_data))
      {
        descriptor_data->set_ready_events(events[i].events);
        ops.push(descriptor_data);
      }
      else
      {
        // Already queued this pass: merge the new events into the pending
        // ready set instead of enqueuing the state twice.
        descriptor_data->add_ready_events(events[i].events);
      }
    }
  }

  if (check_timers)
  {
    mutex::scoped_lock common_lock(mutex_);
    timer_queues_.get_ready_timers(ops);

#if defined(BOOST_ASIO_HAS_TIMERFD)
    if (timer_fd_ != -1)
    {
      // Re-arm the timerfd for the next timer expiry.
      itimerspec new_timeout;
      itimerspec old_timeout;
      int flags = get_timeout(new_timeout);
      timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
    }
#endif
  }
}
0621
// Wakes up a thread blocked in epoll_wait(). The interrupter was registered
// edge-triggered and left permanently ready-to-read (see the constructor),
// so re-issuing EPOLL_CTL_MOD on its registration re-arms the edge and
// delivers a fresh EPOLLIN notification without any write to the pipe.
void epoll_reactor::interrupt()
{
  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  ev.data.ptr = &interrupter_;
  epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev);
}
0629
// Creates the epoll file descriptor, preferring epoll_create1(EPOLL_CLOEXEC)
// and falling back to epoll_create() plus fcntl(FD_CLOEXEC) on systems where
// epoll_create1 is unavailable or rejected. Throws a system_error with
// context "epoll" if no descriptor can be created.
int epoll_reactor::do_epoll_create()
{
#if defined(EPOLL_CLOEXEC)
  int fd = epoll_create1(EPOLL_CLOEXEC);
#else // defined(EPOLL_CLOEXEC)
  int fd = -1;
  errno = EINVAL;
#endif // defined(EPOLL_CLOEXEC)

  if (fd == -1 && (errno == EINVAL || errno == ENOSYS))
  {
    // Fall back to the older interface. The size hint is ignored by modern
    // kernels but must be positive.
    fd = epoll_create(epoll_size);
    if (fd != -1)
      ::fcntl(fd, F_SETFD, FD_CLOEXEC);
  }

  if (fd == -1)
  {
    boost::system::error_code ec(errno,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "epoll");
  }

  return fd;
}
0655
// Creates the timerfd used for timer dispatch, preferring TFD_CLOEXEC and
// falling back to fcntl(FD_CLOEXEC). Returns -1 (without throwing) when
// timerfd support is unavailable; the reactor then computes epoll_wait
// timeouts from the timer queues instead (see run()).
int epoll_reactor::do_timerfd_create()
{
#if defined(BOOST_ASIO_HAS_TIMERFD)
# if defined(TFD_CLOEXEC)
  int fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
# else // defined(TFD_CLOEXEC)
  int fd = -1;
  errno = EINVAL;
# endif // defined(TFD_CLOEXEC)

  if (fd == -1 && errno == EINVAL)
  {
    fd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (fd != -1)
      ::fcntl(fd, F_SETFD, FD_CLOEXEC);
  }

  return fd;
#else // defined(BOOST_ASIO_HAS_TIMERFD)
  return -1;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
}
0678
// Allocates (or recycles from the pool of) a descriptor_state object,
// guarded by the registration mutex.
epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc(io_locking_, io_locking_spin_count_);
}
0684
// Returns a descriptor_state object to the pool, guarded by the
// registration mutex.
void epoll_reactor::free_descriptor_state(epoll_reactor::descriptor_state* s)
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  registered_descriptors_.free(s);
}
0690
// Adds a timer queue to the set serviced by this reactor.
void epoll_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}
0696
// Removes a timer queue from the set serviced by this reactor.
void epoll_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}
0702
// Called when the earliest timer may have changed. If a timerfd is in use it
// is re-armed with the new expiry; otherwise the event loop is interrupted
// so it recalculates its epoll_wait timeout from the timer queues.
void epoll_reactor::update_timeout()
{
#if defined(BOOST_ASIO_HAS_TIMERFD)
  if (timer_fd_ != -1)
  {
    itimerspec new_timeout;
    itimerspec old_timeout;
    int flags = get_timeout(new_timeout);
    timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
    return;
  }
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
  interrupt();
}
0717
0718 int epoll_reactor::get_timeout(int msec)
0719 {
0720
0721
0722 const int max_msec = 5 * 60 * 1000;
0723 return timer_queues_.wait_duration_msec(
0724 (msec < 0 || max_msec < msec) ? max_msec : msec);
0725 }
0726
#if defined(BOOST_ASIO_HAS_TIMERFD)
// Computes the timerfd expiry for the nearest timer, capped at 5 minutes.
// Returns the flags for timerfd_settime(): 0 for a relative timeout, or
// TFD_TIMER_ABSTIME with it_value of 1ns (an absolute time in the past, so
// the timerfd fires immediately) when a timer is already due.
int epoll_reactor::get_timeout(itimerspec& ts)
{
  ts.it_interval.tv_sec = 0;
  ts.it_interval.tv_nsec = 0;

  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  ts.it_value.tv_sec = usec / 1000000;
  // A zero it_value would disarm the timerfd, so use 1ns when usec == 0.
  ts.it_value.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;

  return usec ? 0 : TFD_TIMER_ABSTIME;
}
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
0740
// RAII helper used by descriptor_state::perform_io(). On scope exit it
// either posts the completed operations (beyond the first one, which the
// caller returns for direct invocation) or, when nothing completed,
// compensates for the work_finished() call the scheduler will make.
struct epoll_reactor::perform_io_cleanup_on_block_exit
{
  explicit perform_io_cleanup_on_block_exit(epoll_reactor* r)
    : reactor_(r), first_op_(0)
  {
  }

  ~perform_io_cleanup_on_block_exit()
  {
    if (first_op_)
    {
      // Post the remaining completed operations for invocation.
      if (!ops_.empty())
        reactor_->scheduler_.post_deferred_completions(ops_);

      // A user-initiated operation has completed, but there's no need to
      // explicitly call work_finished() here. Instead, we'll take advantage
      // of the fact that the scheduler will call work_finished() once we
      // return.
    }
    else
    {
      // No user-initiated operations have completed, so we need to
      // compensate for the work_finished() call that the scheduler will
      // make once this operation returns.
      reactor_->scheduler_.compensating_work_started();
    }
  }

  epoll_reactor* reactor_;   // The owning reactor (not owned here).
  op_queue<operation> ops_;  // Completed operations awaiting posting.
  operation* first_op_;      // First completed op, returned by the caller.
};
0773
// Constructs a descriptor_state whose completion function is do_complete(),
// with the per-descriptor mutex configured from the reactor's I/O locking
// settings.
epoll_reactor::descriptor_state::descriptor_state(bool locking, int spin_count)
  : operation(&epoll_reactor::descriptor_state::do_complete),
    mutex_(locking, spin_count)
{
}
0779
// Performs the pending operations that match the reported epoll events.
// Returns the first completed operation for the caller to invoke directly;
// any further completions are posted by io_cleanup's destructor. io_cleanup
// is constructed before the adopting scoped_lock, so the lock is released
// before the cleanup destructor interacts with the scheduler.
operation* epoll_reactor::descriptor_state::perform_io(uint32_t events)
{
  mutex_.lock();
  perform_io_cleanup_on_block_exit io_cleanup(reactor_);
  mutex::scoped_lock descriptor_lock(mutex_, mutex::scoped_lock::adopt_lock);

  // Exception operations must be processed first to ensure that any
  // out-of-band data is read before normal data. EPOLLERR and EPOLLHUP are
  // folded into every operation type so each can observe the condition.
  static const int flag[max_ops] = { EPOLLIN, EPOLLOUT, EPOLLPRI };
  for (int j = max_ops - 1; j >= 0; --j)
  {
    if (events & (flag[j] | EPOLLERR | EPOLLHUP))
    {
      try_speculative_[j] = true;
      while (reactor_op* op = op_queue_[j].front())
      {
        if (reactor_op::status status = op->perform())
        {
          op_queue_[j].pop();
          io_cleanup.ops_.push(op);
          if (status == reactor_op::done_and_exhausted)
          {
            // The descriptor is no longer ready: stop draining this queue
            // and disable speculative attempts until the next event.
            try_speculative_[j] = false;
            break;
          }
        }
        else
          break;
      }
    }
  }

  // The first operation will be returned for completion now. The others
  // will be posted for later by the io_cleanup object's destructor.
  io_cleanup.first_op_ = io_cleanup.ops_.front();
  io_cleanup.ops_.pop();
  return io_cleanup.first_op_;
}
0818
// Completion function for descriptor_state operations, invoked by the
// scheduler when the state is dequeued. The ready epoll event mask is
// passed through the bytes_transferred argument. A null owner indicates the
// operation is being abandoned rather than run, so nothing is done.
void epoll_reactor::descriptor_state::do_complete(
    void* owner, operation* base,
    const boost::system::error_code& ec, std::size_t bytes_transferred)
{
  if (owner)
  {
    descriptor_state* descriptor_data = static_cast<descriptor_state*>(base);
    uint32_t events = static_cast<uint32_t>(bytes_transferred);
    // Run the matching I/O operations; invoke the first completion directly.
    if (operation* op = descriptor_data->perform_io(events))
    {
      op->complete(owner, ec, 0);
    }
  }
}
0833
0834 }
0835 }
0836 }
0837
0838 #include <boost/asio/detail/pop_options.hpp>
0839
0840 #endif
0841
0842 #endif