File indexing completed on 2025-01-18 09:28:31
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP
0012 #define BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP
0013
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif
0017
0018 #include <boost/asio/detail/config.hpp>
0019
0020 #if defined(BOOST_ASIO_HAS_EPOLL)
0021
0022 #include <cstddef>
0023 #include <sys/epoll.h>
0024 #include <boost/asio/detail/epoll_reactor.hpp>
0025 #include <boost/asio/detail/scheduler.hpp>
0026 #include <boost/asio/detail/throw_error.hpp>
0027 #include <boost/asio/error.hpp>
0028
0029 #if defined(BOOST_ASIO_HAS_TIMERFD)
0030 # include <sys/timerfd.h>
0031 #endif
0032
0033 #include <boost/asio/detail/push_options.hpp>
0034
0035 namespace boost {
0036 namespace asio {
0037 namespace detail {
0038
0039 epoll_reactor::epoll_reactor(boost::asio::execution_context& ctx)
0040 : execution_context_service_base<epoll_reactor>(ctx),
0041 scheduler_(use_service<scheduler>(ctx)),
0042 mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
0043 REACTOR_REGISTRATION, scheduler_.concurrency_hint())),
0044 interrupter_(),
0045 epoll_fd_(do_epoll_create()),
0046 timer_fd_(do_timerfd_create()),
0047 shutdown_(false),
0048 registered_descriptors_mutex_(mutex_.enabled())
0049 {
0050
0051 epoll_event ev = { 0, { 0 } };
0052 ev.events = EPOLLIN | EPOLLERR | EPOLLET;
0053 ev.data.ptr = &interrupter_;
0054 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
0055 interrupter_.interrupt();
0056
0057
0058 if (timer_fd_ != -1)
0059 {
0060 ev.events = EPOLLIN | EPOLLERR;
0061 ev.data.ptr = &timer_fd_;
0062 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
0063 }
0064 }
0065
0066 epoll_reactor::~epoll_reactor()
0067 {
0068 if (epoll_fd_ != -1)
0069 close(epoll_fd_);
0070 if (timer_fd_ != -1)
0071 close(timer_fd_);
0072 }
0073
0074 void epoll_reactor::shutdown()
0075 {
0076 mutex::scoped_lock lock(mutex_);
0077 shutdown_ = true;
0078 lock.unlock();
0079
0080 op_queue<operation> ops;
0081
0082 while (descriptor_state* state = registered_descriptors_.first())
0083 {
0084 for (int i = 0; i < max_ops; ++i)
0085 ops.push(state->op_queue_[i]);
0086 state->shutdown_ = true;
0087 registered_descriptors_.free(state);
0088 }
0089
0090 timer_queues_.get_all_timers(ops);
0091
0092 scheduler_.abandon_operations(ops);
0093 }
0094
0095 void epoll_reactor::notify_fork(
0096 boost::asio::execution_context::fork_event fork_ev)
0097 {
0098 if (fork_ev == boost::asio::execution_context::fork_child)
0099 {
0100 if (epoll_fd_ != -1)
0101 ::close(epoll_fd_);
0102 epoll_fd_ = -1;
0103 epoll_fd_ = do_epoll_create();
0104
0105 if (timer_fd_ != -1)
0106 ::close(timer_fd_);
0107 timer_fd_ = -1;
0108 timer_fd_ = do_timerfd_create();
0109
0110 interrupter_.recreate();
0111
0112
0113 epoll_event ev = { 0, { 0 } };
0114 ev.events = EPOLLIN | EPOLLERR | EPOLLET;
0115 ev.data.ptr = &interrupter_;
0116 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
0117 interrupter_.interrupt();
0118
0119
0120 if (timer_fd_ != -1)
0121 {
0122 ev.events = EPOLLIN | EPOLLERR;
0123 ev.data.ptr = &timer_fd_;
0124 epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
0125 }
0126
0127 update_timeout();
0128
0129
0130 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
0131 for (descriptor_state* state = registered_descriptors_.first();
0132 state != 0; state = state->next_)
0133 {
0134 ev.events = state->registered_events_;
0135 ev.data.ptr = state;
0136 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, state->descriptor_, &ev);
0137 if (result != 0)
0138 {
0139 boost::system::error_code ec(errno,
0140 boost::asio::error::get_system_category());
0141 boost::asio::detail::throw_error(ec, "epoll re-registration");
0142 }
0143 }
0144 }
0145 }
0146
0147 void epoll_reactor::init_task()
0148 {
0149 scheduler_.init_task();
0150 }
0151
0152 int epoll_reactor::register_descriptor(socket_type descriptor,
0153 epoll_reactor::per_descriptor_data& descriptor_data)
0154 {
0155 descriptor_data = allocate_descriptor_state();
0156
0157 BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
0158 context(), static_cast<uintmax_t>(descriptor),
0159 reinterpret_cast<uintmax_t>(descriptor_data)));
0160
0161 {
0162 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0163
0164 descriptor_data->reactor_ = this;
0165 descriptor_data->descriptor_ = descriptor;
0166 descriptor_data->shutdown_ = false;
0167 for (int i = 0; i < max_ops; ++i)
0168 descriptor_data->try_speculative_[i] = true;
0169 }
0170
0171 epoll_event ev = { 0, { 0 } };
0172 ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
0173 descriptor_data->registered_events_ = ev.events;
0174 ev.data.ptr = descriptor_data;
0175 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
0176 if (result != 0)
0177 {
0178 if (errno == EPERM)
0179 {
0180
0181
0182
0183
0184 descriptor_data->registered_events_ = 0;
0185 return 0;
0186 }
0187 return errno;
0188 }
0189
0190 return 0;
0191 }
0192
0193 int epoll_reactor::register_internal_descriptor(
0194 int op_type, socket_type descriptor,
0195 epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
0196 {
0197 descriptor_data = allocate_descriptor_state();
0198
0199 BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
0200 context(), static_cast<uintmax_t>(descriptor),
0201 reinterpret_cast<uintmax_t>(descriptor_data)));
0202
0203 {
0204 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0205
0206 descriptor_data->reactor_ = this;
0207 descriptor_data->descriptor_ = descriptor;
0208 descriptor_data->shutdown_ = false;
0209 descriptor_data->op_queue_[op_type].push(op);
0210 for (int i = 0; i < max_ops; ++i)
0211 descriptor_data->try_speculative_[i] = true;
0212 }
0213
0214 epoll_event ev = { 0, { 0 } };
0215 ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
0216 descriptor_data->registered_events_ = ev.events;
0217 ev.data.ptr = descriptor_data;
0218 int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
0219 if (result != 0)
0220 return errno;
0221
0222 return 0;
0223 }
0224
0225 void epoll_reactor::move_descriptor(socket_type,
0226 epoll_reactor::per_descriptor_data& target_descriptor_data,
0227 epoll_reactor::per_descriptor_data& source_descriptor_data)
0228 {
0229 target_descriptor_data = source_descriptor_data;
0230 source_descriptor_data = 0;
0231 }
0232
0233 void epoll_reactor::call_post_immediate_completion(
0234 operation* op, bool is_continuation, const void* self)
0235 {
0236 static_cast<const epoll_reactor*>(self)->post_immediate_completion(
0237 op, is_continuation);
0238 }
0239
0240 void epoll_reactor::start_op(int op_type, socket_type descriptor,
0241 epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
0242 bool is_continuation, bool allow_speculative,
0243 void (*on_immediate)(operation*, bool, const void*),
0244 const void* immediate_arg)
0245 {
0246 if (!descriptor_data)
0247 {
0248 op->ec_ = boost::asio::error::bad_descriptor;
0249 on_immediate(op, is_continuation, immediate_arg);
0250 return;
0251 }
0252
0253 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0254
0255 if (descriptor_data->shutdown_)
0256 {
0257 on_immediate(op, is_continuation, immediate_arg);
0258 return;
0259 }
0260
0261 if (descriptor_data->op_queue_[op_type].empty())
0262 {
0263 if (allow_speculative
0264 && (op_type != read_op
0265 || descriptor_data->op_queue_[except_op].empty()))
0266 {
0267 if (descriptor_data->try_speculative_[op_type])
0268 {
0269 if (reactor_op::status status = op->perform())
0270 {
0271 if (status == reactor_op::done_and_exhausted)
0272 if (descriptor_data->registered_events_ != 0)
0273 descriptor_data->try_speculative_[op_type] = false;
0274 descriptor_lock.unlock();
0275 on_immediate(op, is_continuation, immediate_arg);
0276 return;
0277 }
0278 }
0279
0280 if (descriptor_data->registered_events_ == 0)
0281 {
0282 op->ec_ = boost::asio::error::operation_not_supported;
0283 on_immediate(op, is_continuation, immediate_arg);
0284 return;
0285 }
0286
0287 if (op_type == write_op)
0288 {
0289 if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
0290 {
0291 epoll_event ev = { 0, { 0 } };
0292 ev.events = descriptor_data->registered_events_ | EPOLLOUT;
0293 ev.data.ptr = descriptor_data;
0294 if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
0295 {
0296 descriptor_data->registered_events_ |= ev.events;
0297 }
0298 else
0299 {
0300 op->ec_ = boost::system::error_code(errno,
0301 boost::asio::error::get_system_category());
0302 on_immediate(op, is_continuation, immediate_arg);
0303 return;
0304 }
0305 }
0306 }
0307 }
0308 else if (descriptor_data->registered_events_ == 0)
0309 {
0310 op->ec_ = boost::asio::error::operation_not_supported;
0311 on_immediate(op, is_continuation, immediate_arg);
0312 return;
0313 }
0314 else
0315 {
0316 if (op_type == write_op)
0317 {
0318 descriptor_data->registered_events_ |= EPOLLOUT;
0319 }
0320
0321 epoll_event ev = { 0, { 0 } };
0322 ev.events = descriptor_data->registered_events_;
0323 ev.data.ptr = descriptor_data;
0324 epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
0325 }
0326 }
0327
0328 descriptor_data->op_queue_[op_type].push(op);
0329 scheduler_.work_started();
0330 }
0331
0332 void epoll_reactor::cancel_ops(socket_type,
0333 epoll_reactor::per_descriptor_data& descriptor_data)
0334 {
0335 if (!descriptor_data)
0336 return;
0337
0338 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0339
0340 op_queue<operation> ops;
0341 for (int i = 0; i < max_ops; ++i)
0342 {
0343 while (reactor_op* op = descriptor_data->op_queue_[i].front())
0344 {
0345 op->ec_ = boost::asio::error::operation_aborted;
0346 descriptor_data->op_queue_[i].pop();
0347 ops.push(op);
0348 }
0349 }
0350
0351 descriptor_lock.unlock();
0352
0353 scheduler_.post_deferred_completions(ops);
0354 }
0355
0356 void epoll_reactor::cancel_ops_by_key(socket_type,
0357 epoll_reactor::per_descriptor_data& descriptor_data,
0358 int op_type, void* cancellation_key)
0359 {
0360 if (!descriptor_data)
0361 return;
0362
0363 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0364
0365 op_queue<operation> ops;
0366 op_queue<reactor_op> other_ops;
0367 while (reactor_op* op = descriptor_data->op_queue_[op_type].front())
0368 {
0369 descriptor_data->op_queue_[op_type].pop();
0370 if (op->cancellation_key_ == cancellation_key)
0371 {
0372 op->ec_ = boost::asio::error::operation_aborted;
0373 ops.push(op);
0374 }
0375 else
0376 other_ops.push(op);
0377 }
0378 descriptor_data->op_queue_[op_type].push(other_ops);
0379
0380 descriptor_lock.unlock();
0381
0382 scheduler_.post_deferred_completions(ops);
0383 }
0384
0385 void epoll_reactor::deregister_descriptor(socket_type descriptor,
0386 epoll_reactor::per_descriptor_data& descriptor_data, bool closing)
0387 {
0388 if (!descriptor_data)
0389 return;
0390
0391 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0392
0393 if (!descriptor_data->shutdown_)
0394 {
0395 if (closing)
0396 {
0397
0398
0399 }
0400 else if (descriptor_data->registered_events_ != 0)
0401 {
0402 epoll_event ev = { 0, { 0 } };
0403 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
0404 }
0405
0406 op_queue<operation> ops;
0407 for (int i = 0; i < max_ops; ++i)
0408 {
0409 while (reactor_op* op = descriptor_data->op_queue_[i].front())
0410 {
0411 op->ec_ = boost::asio::error::operation_aborted;
0412 descriptor_data->op_queue_[i].pop();
0413 ops.push(op);
0414 }
0415 }
0416
0417 descriptor_data->descriptor_ = -1;
0418 descriptor_data->shutdown_ = true;
0419
0420 descriptor_lock.unlock();
0421
0422 BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
0423 context(), static_cast<uintmax_t>(descriptor),
0424 reinterpret_cast<uintmax_t>(descriptor_data)));
0425
0426 scheduler_.post_deferred_completions(ops);
0427
0428
0429
0430 }
0431 else
0432 {
0433
0434
0435 descriptor_data = 0;
0436 }
0437 }
0438
0439 void epoll_reactor::deregister_internal_descriptor(socket_type descriptor,
0440 epoll_reactor::per_descriptor_data& descriptor_data)
0441 {
0442 if (!descriptor_data)
0443 return;
0444
0445 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0446
0447 if (!descriptor_data->shutdown_)
0448 {
0449 epoll_event ev = { 0, { 0 } };
0450 epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
0451
0452 op_queue<operation> ops;
0453 for (int i = 0; i < max_ops; ++i)
0454 ops.push(descriptor_data->op_queue_[i]);
0455
0456 descriptor_data->descriptor_ = -1;
0457 descriptor_data->shutdown_ = true;
0458
0459 descriptor_lock.unlock();
0460
0461 BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
0462 context(), static_cast<uintmax_t>(descriptor),
0463 reinterpret_cast<uintmax_t>(descriptor_data)));
0464
0465
0466
0467 }
0468 else
0469 {
0470
0471
0472 descriptor_data = 0;
0473 }
0474 }
0475
0476 void epoll_reactor::cleanup_descriptor_data(
0477 per_descriptor_data& descriptor_data)
0478 {
0479 if (descriptor_data)
0480 {
0481 free_descriptor_state(descriptor_data);
0482 descriptor_data = 0;
0483 }
0484 }
0485
0486 void epoll_reactor::run(long usec, op_queue<operation>& ops)
0487 {
0488
0489
0490
0491
0492
0493
0494
0495 int timeout;
0496 if (usec == 0)
0497 timeout = 0;
0498 else
0499 {
0500 timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
0501 if (timer_fd_ == -1)
0502 {
0503 mutex::scoped_lock lock(mutex_);
0504 timeout = get_timeout(timeout);
0505 }
0506 }
0507
0508
0509 epoll_event events[128];
0510 int num_events = epoll_wait(epoll_fd_, events, 128, timeout);
0511
0512 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0513
0514 for (int i = 0; i < num_events; ++i)
0515 {
0516 void* ptr = events[i].data.ptr;
0517 if (ptr == &interrupter_)
0518 {
0519
0520 }
0521 # if defined(BOOST_ASIO_HAS_TIMERFD)
0522 else if (ptr == &timer_fd_)
0523 {
0524
0525 }
0526 # endif
0527 else
0528 {
0529 unsigned event_mask = 0;
0530 if ((events[i].events & EPOLLIN) != 0)
0531 event_mask |= BOOST_ASIO_HANDLER_REACTOR_READ_EVENT;
0532 if ((events[i].events & EPOLLOUT))
0533 event_mask |= BOOST_ASIO_HANDLER_REACTOR_WRITE_EVENT;
0534 if ((events[i].events & (EPOLLERR | EPOLLHUP)) != 0)
0535 event_mask |= BOOST_ASIO_HANDLER_REACTOR_ERROR_EVENT;
0536 BOOST_ASIO_HANDLER_REACTOR_EVENTS((context(),
0537 reinterpret_cast<uintmax_t>(ptr), event_mask));
0538 }
0539 }
0540 #endif
0541
0542 #if defined(BOOST_ASIO_HAS_TIMERFD)
0543 bool check_timers = (timer_fd_ == -1);
0544 #else
0545 bool check_timers = true;
0546 #endif
0547
0548
0549 for (int i = 0; i < num_events; ++i)
0550 {
0551 void* ptr = events[i].data.ptr;
0552 if (ptr == &interrupter_)
0553 {
0554
0555
0556
0557
0558
0559 #if defined(BOOST_ASIO_HAS_TIMERFD)
0560 if (timer_fd_ == -1)
0561 check_timers = true;
0562 #else
0563 check_timers = true;
0564 #endif
0565 }
0566 #if defined(BOOST_ASIO_HAS_TIMERFD)
0567 else if (ptr == &timer_fd_)
0568 {
0569 check_timers = true;
0570 }
0571 #endif
0572 else
0573 {
0574
0575
0576
0577 descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
0578 if (!ops.is_enqueued(descriptor_data))
0579 {
0580 descriptor_data->set_ready_events(events[i].events);
0581 ops.push(descriptor_data);
0582 }
0583 else
0584 {
0585 descriptor_data->add_ready_events(events[i].events);
0586 }
0587 }
0588 }
0589
0590 if (check_timers)
0591 {
0592 mutex::scoped_lock common_lock(mutex_);
0593 timer_queues_.get_ready_timers(ops);
0594
0595 #if defined(BOOST_ASIO_HAS_TIMERFD)
0596 if (timer_fd_ != -1)
0597 {
0598 itimerspec new_timeout;
0599 itimerspec old_timeout;
0600 int flags = get_timeout(new_timeout);
0601 timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
0602 }
0603 #endif
0604 }
0605 }
0606
0607 void epoll_reactor::interrupt()
0608 {
0609 epoll_event ev = { 0, { 0 } };
0610 ev.events = EPOLLIN | EPOLLERR | EPOLLET;
0611 ev.data.ptr = &interrupter_;
0612 epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev);
0613 }
0614
0615 int epoll_reactor::do_epoll_create()
0616 {
0617 #if defined(EPOLL_CLOEXEC)
0618 int fd = epoll_create1(EPOLL_CLOEXEC);
0619 #else
0620 int fd = -1;
0621 errno = EINVAL;
0622 #endif
0623
0624 if (fd == -1 && (errno == EINVAL || errno == ENOSYS))
0625 {
0626 fd = epoll_create(epoll_size);
0627 if (fd != -1)
0628 ::fcntl(fd, F_SETFD, FD_CLOEXEC);
0629 }
0630
0631 if (fd == -1)
0632 {
0633 boost::system::error_code ec(errno,
0634 boost::asio::error::get_system_category());
0635 boost::asio::detail::throw_error(ec, "epoll");
0636 }
0637
0638 return fd;
0639 }
0640
0641 int epoll_reactor::do_timerfd_create()
0642 {
0643 #if defined(BOOST_ASIO_HAS_TIMERFD)
0644 # if defined(TFD_CLOEXEC)
0645 int fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
0646 # else
0647 int fd = -1;
0648 errno = EINVAL;
0649 # endif
0650
0651 if (fd == -1 && errno == EINVAL)
0652 {
0653 fd = timerfd_create(CLOCK_MONOTONIC, 0);
0654 if (fd != -1)
0655 ::fcntl(fd, F_SETFD, FD_CLOEXEC);
0656 }
0657
0658 return fd;
0659 #else
0660 return -1;
0661 #endif
0662 }
0663
0664 epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()
0665 {
0666 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
0667 return registered_descriptors_.alloc(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
0668 REACTOR_IO, scheduler_.concurrency_hint()));
0669 }
0670
0671 void epoll_reactor::free_descriptor_state(epoll_reactor::descriptor_state* s)
0672 {
0673 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
0674 registered_descriptors_.free(s);
0675 }
0676
0677 void epoll_reactor::do_add_timer_queue(timer_queue_base& queue)
0678 {
0679 mutex::scoped_lock lock(mutex_);
0680 timer_queues_.insert(&queue);
0681 }
0682
0683 void epoll_reactor::do_remove_timer_queue(timer_queue_base& queue)
0684 {
0685 mutex::scoped_lock lock(mutex_);
0686 timer_queues_.erase(&queue);
0687 }
0688
0689 void epoll_reactor::update_timeout()
0690 {
0691 #if defined(BOOST_ASIO_HAS_TIMERFD)
0692 if (timer_fd_ != -1)
0693 {
0694 itimerspec new_timeout;
0695 itimerspec old_timeout;
0696 int flags = get_timeout(new_timeout);
0697 timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
0698 return;
0699 }
0700 #endif
0701 interrupt();
0702 }
0703
0704 int epoll_reactor::get_timeout(int msec)
0705 {
0706
0707
0708 const int max_msec = 5 * 60 * 1000;
0709 return timer_queues_.wait_duration_msec(
0710 (msec < 0 || max_msec < msec) ? max_msec : msec);
0711 }
0712
0713 #if defined(BOOST_ASIO_HAS_TIMERFD)
0714 int epoll_reactor::get_timeout(itimerspec& ts)
0715 {
0716 ts.it_interval.tv_sec = 0;
0717 ts.it_interval.tv_nsec = 0;
0718
0719 long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
0720 ts.it_value.tv_sec = usec / 1000000;
0721 ts.it_value.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;
0722
0723 return usec ? 0 : TFD_TIMER_ABSTIME;
0724 }
0725 #endif
0726
0727 struct epoll_reactor::perform_io_cleanup_on_block_exit
0728 {
0729 explicit perform_io_cleanup_on_block_exit(epoll_reactor* r)
0730 : reactor_(r), first_op_(0)
0731 {
0732 }
0733
0734 ~perform_io_cleanup_on_block_exit()
0735 {
0736 if (first_op_)
0737 {
0738
0739 if (!ops_.empty())
0740 reactor_->scheduler_.post_deferred_completions(ops_);
0741
0742
0743
0744
0745 }
0746 else
0747 {
0748
0749
0750
0751 reactor_->scheduler_.compensating_work_started();
0752 }
0753 }
0754
0755 epoll_reactor* reactor_;
0756 op_queue<operation> ops_;
0757 operation* first_op_;
0758 };
0759
0760 epoll_reactor::descriptor_state::descriptor_state(bool locking)
0761 : operation(&epoll_reactor::descriptor_state::do_complete),
0762 mutex_(locking)
0763 {
0764 }
0765
0766 operation* epoll_reactor::descriptor_state::perform_io(uint32_t events)
0767 {
0768 mutex_.lock();
0769 perform_io_cleanup_on_block_exit io_cleanup(reactor_);
0770 mutex::scoped_lock descriptor_lock(mutex_, mutex::scoped_lock::adopt_lock);
0771
0772
0773
0774 static const int flag[max_ops] = { EPOLLIN, EPOLLOUT, EPOLLPRI };
0775 for (int j = max_ops - 1; j >= 0; --j)
0776 {
0777 if (events & (flag[j] | EPOLLERR | EPOLLHUP))
0778 {
0779 try_speculative_[j] = true;
0780 while (reactor_op* op = op_queue_[j].front())
0781 {
0782 if (reactor_op::status status = op->perform())
0783 {
0784 op_queue_[j].pop();
0785 io_cleanup.ops_.push(op);
0786 if (status == reactor_op::done_and_exhausted)
0787 {
0788 try_speculative_[j] = false;
0789 break;
0790 }
0791 }
0792 else
0793 break;
0794 }
0795 }
0796 }
0797
0798
0799
0800 io_cleanup.first_op_ = io_cleanup.ops_.front();
0801 io_cleanup.ops_.pop();
0802 return io_cleanup.first_op_;
0803 }
0804
0805 void epoll_reactor::descriptor_state::do_complete(
0806 void* owner, operation* base,
0807 const boost::system::error_code& ec, std::size_t bytes_transferred)
0808 {
0809 if (owner)
0810 {
0811 descriptor_state* descriptor_data = static_cast<descriptor_state*>(base);
0812 uint32_t events = static_cast<uint32_t>(bytes_transferred);
0813 if (operation* op = descriptor_data->perform_io(events))
0814 {
0815 op->complete(owner, ec, 0);
0816 }
0817 }
0818 }
0819
0820 }
0821 }
0822 }
0823
0824 #include <boost/asio/detail/pop_options.hpp>
0825
0826 #endif
0827
0828 #endif