//
// detail/impl/epoll_reactor.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP
#define BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#if defined(BOOST_ASIO_HAS_EPOLL)

#include <cstddef>
#include <sys/epoll.h>
#include <boost/asio/detail/epoll_reactor.hpp>
#include <boost/asio/detail/scheduler.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>

#if defined(BOOST_ASIO_HAS_TIMERFD)
# include <sys/timerfd.h>
#endif // defined(BOOST_ASIO_HAS_TIMERFD)

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace detail {

epoll_reactor::epoll_reactor(boost::asio::execution_context& ctx)
  : execution_context_service_base<epoll_reactor>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          REACTOR_REGISTRATION, scheduler_.concurrency_hint())),
    interrupter_(),
    epoll_fd_(do_epoll_create()),
    timer_fd_(do_timerfd_create()),
    shutdown_(false),
    registered_descriptors_mutex_(mutex_.enabled())
{
  // Add the interrupter's descriptor to epoll.
  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  ev.data.ptr = &interrupter_;
  epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
  interrupter_.interrupt();
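  // Note: the interrupter is deliberately left in a ready-to-read state. With
  // an edge-triggered (EPOLLET) registration, epoll reports it again only
  // when the registration changes, which is what interrupt() below relies on.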

  // Add the timer descriptor to epoll.
  if (timer_fd_ != -1)
  {
    ev.events = EPOLLIN | EPOLLERR;
    ev.data.ptr = &timer_fd_;
    epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
  }
}

epoll_reactor::~epoll_reactor()
{
  if (epoll_fd_ != -1)
    close(epoll_fd_);
  if (timer_fd_ != -1)
    close(timer_fd_);
}

void epoll_reactor::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  while (descriptor_state* state = registered_descriptors_.first())
  {
    for (int i = 0; i < max_ops; ++i)
      ops.push(state->op_queue_[i]);
    state->shutdown_ = true;
    registered_descriptors_.free(state);
  }

  timer_queues_.get_all_timers(ops);

  scheduler_.abandon_operations(ops);
}

void epoll_reactor::notify_fork(
    boost::asio::execution_context::fork_event fork_ev)
{
  if (fork_ev == boost::asio::execution_context::fork_child)
  {
    if (epoll_fd_ != -1)
      ::close(epoll_fd_);
    epoll_fd_ = -1;
    epoll_fd_ = do_epoll_create();

    if (timer_fd_ != -1)
      ::close(timer_fd_);
    timer_fd_ = -1;
    timer_fd_ = do_timerfd_create();

    interrupter_.recreate();

    // Add the interrupter's descriptor to epoll.
    epoll_event ev = { 0, { 0 } };
    ev.events = EPOLLIN | EPOLLERR | EPOLLET;
    ev.data.ptr = &interrupter_;
    epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
    interrupter_.interrupt();

    // Add the timer descriptor to epoll.
    if (timer_fd_ != -1)
    {
      ev.events = EPOLLIN | EPOLLERR;
      ev.data.ptr = &timer_fd_;
      epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
    }

    update_timeout();

    // Re-register all descriptors with epoll.
    mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
    for (descriptor_state* state = registered_descriptors_.first();
        state != 0; state = state->next_)
    {
      ev.events = state->registered_events_;
      ev.data.ptr = state;
      int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, state->descriptor_, &ev);
      if (result != 0)
      {
        boost::system::error_code ec(errno,
            boost::asio::error::get_system_category());
        boost::asio::detail::throw_error(ec, "epoll re-registration");
      }
    }
  }
}

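// Register the reactor as the scheduler's task, so that the scheduler drives
// run() from its event-processing loop. The exact hookup is expected to live
// in scheduler::init_task().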
void epoll_reactor::init_task()
{
  scheduler_.init_task();
}

int epoll_reactor::register_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  descriptor_data = allocate_descriptor_state();

  BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
        context(), static_cast<uintmax_t>(descriptor),
        reinterpret_cast<uintmax_t>(descriptor_data)));

  {
    mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

    descriptor_data->reactor_ = this;
    descriptor_data->descriptor_ = descriptor;
    descriptor_data->shutdown_ = false;
    for (int i = 0; i < max_ops; ++i)
      descriptor_data->try_speculative_[i] = true;
  }

  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
  descriptor_data->registered_events_ = ev.events;
  ev.data.ptr = descriptor_data;
  int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  if (result != 0)
  {
    if (errno == EPERM)
    {
      // This file descriptor type is not supported by epoll. However, if it is
      // a regular file then operations on it will not block. We will allow
      // this descriptor to be used and fail later if an operation on it would
      // otherwise require a trip through the reactor.
      descriptor_data->registered_events_ = 0;
      return 0;
    }
    return errno;
  }

  return 0;
}

int epoll_reactor::register_internal_descriptor(
    int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
{
  descriptor_data = allocate_descriptor_state();

  BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
        context(), static_cast<uintmax_t>(descriptor),
        reinterpret_cast<uintmax_t>(descriptor_data)));

  {
    mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

    descriptor_data->reactor_ = this;
    descriptor_data->descriptor_ = descriptor;
    descriptor_data->shutdown_ = false;
    descriptor_data->op_queue_[op_type].push(op);
    for (int i = 0; i < max_ops; ++i)
      descriptor_data->try_speculative_[i] = true;
  }

  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
  descriptor_data->registered_events_ = ev.events;
  ev.data.ptr = descriptor_data;
  int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  if (result != 0)
    return errno;

  return 0;
}

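// Transfer ownership of a per-descriptor state object from one socket wrapper
// to another (used, for example, when a socket object is moved). The source
// pointer is nulled so that only the target releases the state later.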
void epoll_reactor::move_descriptor(socket_type,
    epoll_reactor::per_descriptor_data& target_descriptor_data,
    epoll_reactor::per_descriptor_data& source_descriptor_data)
{
  target_descriptor_data = source_descriptor_data;
  source_descriptor_data = 0;
}

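// Trampoline used as an on_immediate callback for start_op(): forwards the
// completed operation to post_immediate_completion() on the reactor instance
// passed through the opaque 'self' pointer.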
void epoll_reactor::call_post_immediate_completion(
    operation* op, bool is_continuation, const void* self)
{
  static_cast<const epoll_reactor*>(self)->post_immediate_completion(
      op, is_continuation);
}

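// Start an asynchronous operation on a descriptor. If no operation of the
// same type is already queued, the operation may first be attempted
// speculatively (i.e. performed immediately, without waiting for an epoll
// event). Only if the speculative attempt cannot complete is the operation
// queued and the epoll registration updated to include the relevant events.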
void epoll_reactor::start_op(int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative,
    void (*on_immediate)(operation*, bool, const void*),
    const void* immediate_arg)
{
  if (!descriptor_data)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    on_immediate(op, is_continuation, immediate_arg);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    on_immediate(op, is_continuation, immediate_arg);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty())
  {
    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
      if (descriptor_data->try_speculative_[op_type])
      {
        if (reactor_op::status status = op->perform())
        {
          if (status == reactor_op::done_and_exhausted)
            if (descriptor_data->registered_events_ != 0)
              descriptor_data->try_speculative_[op_type] = false;
          descriptor_lock.unlock();
          on_immediate(op, is_continuation, immediate_arg);
          return;
        }
      }

      if (descriptor_data->registered_events_ == 0)
      {
        op->ec_ = boost::asio::error::operation_not_supported;
        on_immediate(op, is_continuation, immediate_arg);
        return;
      }

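      // Writes are registered lazily: register_descriptor() does not include
      // EPOLLOUT up front, so add it to the registration the first time a
      // write operation has to go through the reactor.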
      if (op_type == write_op)
      {
        if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
        {
          epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }
          else
          {
            op->ec_ = boost::system::error_code(errno,
                boost::asio::error::get_system_category());
            on_immediate(op, is_continuation, immediate_arg);
            return;
          }
        }
      }
    }
    else if (descriptor_data->registered_events_ == 0)
    {
      op->ec_ = boost::asio::error::operation_not_supported;
      on_immediate(op, is_continuation, immediate_arg);
      return;
    }
    else
    {
      if (op_type == write_op)
      {
        descriptor_data->registered_events_ |= EPOLLOUT;
      }

      epoll_event ev = { 0, { 0 } };
      ev.events = descriptor_data->registered_events_;
      ev.data.ptr = descriptor_data;
      epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  scheduler_.work_started();
}

void epoll_reactor::cancel_ops(socket_type,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  op_queue<operation> ops;
  for (int i = 0; i < max_ops; ++i)
  {
    while (reactor_op* op = descriptor_data->op_queue_[i].front())
    {
      op->ec_ = boost::asio::error::operation_aborted;
      descriptor_data->op_queue_[i].pop();
      ops.push(op);
    }
  }

  descriptor_lock.unlock();

  scheduler_.post_deferred_completions(ops);
}

void epoll_reactor::cancel_ops_by_key(socket_type,
    epoll_reactor::per_descriptor_data& descriptor_data,
    int op_type, void* cancellation_key)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  op_queue<operation> ops;
  op_queue<reactor_op> other_ops;
  while (reactor_op* op = descriptor_data->op_queue_[op_type].front())
  {
    descriptor_data->op_queue_[op_type].pop();
    if (op->cancellation_key_ == cancellation_key)
    {
      op->ec_ = boost::asio::error::operation_aborted;
      ops.push(op);
    }
    else
      other_ops.push(op);
  }
  descriptor_data->op_queue_[op_type].push(other_ops);

  descriptor_lock.unlock();

  scheduler_.post_deferred_completions(ops);
}

void epoll_reactor::deregister_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, bool closing)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    if (closing)
    {
      // The descriptor will be automatically removed from the epoll set when
      // it is closed.
    }
    else if (descriptor_data->registered_events_ != 0)
    {
      epoll_event ev = { 0, { 0 } };
      epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
    }

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
    {
      while (reactor_op* op = descriptor_data->op_queue_[i].front())
      {
        op->ec_ = boost::asio::error::operation_aborted;
        descriptor_data->op_queue_[i].pop();
        ops.push(op);
      }
    }

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
          context(), static_cast<uintmax_t>(descriptor),
          reinterpret_cast<uintmax_t>(descriptor_data)));

    scheduler_.post_deferred_completions(ops);

    // Leave descriptor_data set so that it will be freed by the subsequent
    // call to cleanup_descriptor_data.
  }
  else
  {
    // We are shutting down, so prevent cleanup_descriptor_data from freeing
    // the descriptor_data object and let the destructor free it instead.
    descriptor_data = 0;
  }
}

void epoll_reactor::deregister_internal_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    epoll_event ev = { 0, { 0 } };
    epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
      ops.push(descriptor_data->op_queue_[i]);

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
          context(), static_cast<uintmax_t>(descriptor),
          reinterpret_cast<uintmax_t>(descriptor_data)));

    // Leave descriptor_data set so that it will be freed by the subsequent
    // call to cleanup_descriptor_data.
  }
  else
  {
    // We are shutting down, so prevent cleanup_descriptor_data from freeing
    // the descriptor_data object and let the destructor free it instead.
    descriptor_data = 0;
  }
}

void epoll_reactor::cleanup_descriptor_data(
    per_descriptor_data& descriptor_data)
{
  if (descriptor_data)
  {
    free_descriptor_state(descriptor_data);
    descriptor_data = 0;
  }
}

void epoll_reactor::run(long usec, op_queue<operation>& ops)
{
  // This code relies on the fact that the scheduler queues the reactor task
  // behind all descriptor operations generated by this function. This means
  // that, by the time we reach this point, any previously returned descriptor
  // operations have already been dequeued. Therefore it is now safe for us to
  // reuse and return them for the scheduler to queue again.

  // Calculate timeout. Check the timer queues only if timerfd is not in use.
  int timeout;
  if (usec == 0)
    timeout = 0;
  else
  {
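    // A negative usec means block indefinitely. Otherwise convert from
    // microseconds to milliseconds, rounding up so that epoll_wait() never
    // wakes before the requested deadline.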
    timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
    if (timer_fd_ == -1)
    {
      mutex::scoped_lock lock(mutex_);
      timeout = get_timeout(timeout);
    }
  }

  // Block on the epoll descriptor.
  epoll_event events[128];
  int num_events = epoll_wait(epoll_fd_, events, 128, timeout);

#if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  // Trace the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = events[i].data.ptr;
    if (ptr == &interrupter_)
    {
      // Ignore.
    }
# if defined(BOOST_ASIO_HAS_TIMERFD)
    else if (ptr == &timer_fd_)
    {
      // Ignore.
    }
# endif // defined(BOOST_ASIO_HAS_TIMERFD)
    else
    {
      unsigned event_mask = 0;
      if ((events[i].events & EPOLLIN) != 0)
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_READ_EVENT;
      if ((events[i].events & EPOLLOUT) != 0)
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_WRITE_EVENT;
      if ((events[i].events & (EPOLLERR | EPOLLHUP)) != 0)
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_ERROR_EVENT;
      BOOST_ASIO_HANDLER_REACTOR_EVENTS((context(),
            reinterpret_cast<uintmax_t>(ptr), event_mask));
    }
  }
#endif // defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)

#if defined(BOOST_ASIO_HAS_TIMERFD)
  bool check_timers = (timer_fd_ == -1);
#else // defined(BOOST_ASIO_HAS_TIMERFD)
  bool check_timers = true;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = events[i].data.ptr;
    if (ptr == &interrupter_)
    {
      // No need to reset the interrupter since we're leaving the descriptor
      // in a ready-to-read state and relying on edge-triggered notifications
      // to make it so that we only get woken up when the descriptor's epoll
      // registration is updated.

#if defined(BOOST_ASIO_HAS_TIMERFD)
      if (timer_fd_ == -1)
        check_timers = true;
#else // defined(BOOST_ASIO_HAS_TIMERFD)
      check_timers = true;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
    }
#if defined(BOOST_ASIO_HAS_TIMERFD)
    else if (ptr == &timer_fd_)
    {
      check_timers = true;
    }
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
    else
    {
      // The descriptor operation doesn't count as work in and of itself, so we
      // don't call work_started() here. This still allows the scheduler to
      // stop if the only remaining operations are descriptor operations.
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      if (!ops.is_enqueued(descriptor_data))
      {
        descriptor_data->set_ready_events(events[i].events);
        ops.push(descriptor_data);
      }
      else
      {
        descriptor_data->add_ready_events(events[i].events);
      }
    }
  }

  if (check_timers)
  {
    mutex::scoped_lock common_lock(mutex_);
    timer_queues_.get_ready_timers(ops);

#if defined(BOOST_ASIO_HAS_TIMERFD)
    if (timer_fd_ != -1)
    {
      itimerspec new_timeout;
      itimerspec old_timeout;
      int flags = get_timeout(new_timeout);
      timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
    }
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
  }
}

void epoll_reactor::interrupt()
{
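  // The interrupter's descriptor is always left readable (see the
  // constructor), so with an edge-triggered registration this EPOLL_CTL_MOD
  // re-arms the notification and causes a concurrent epoll_wait() to return.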
  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  ev.data.ptr = &interrupter_;
  epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev);
}

int epoll_reactor::do_epoll_create()
{
#if defined(EPOLL_CLOEXEC)
  int fd = epoll_create1(EPOLL_CLOEXEC);
#else // defined(EPOLL_CLOEXEC)
  int fd = -1;
  errno = EINVAL;
#endif // defined(EPOLL_CLOEXEC)

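  // Fall back to the older epoll_create() interface on kernels that lack
  // epoll_create1() (ENOSYS) or reject the EPOLL_CLOEXEC flag (EINVAL), then
  // set the close-on-exec flag separately.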
  if (fd == -1 && (errno == EINVAL || errno == ENOSYS))
  {
    fd = epoll_create(epoll_size);
    if (fd != -1)
      ::fcntl(fd, F_SETFD, FD_CLOEXEC);
  }

  if (fd == -1)
  {
    boost::system::error_code ec(errno,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "epoll");
  }

  return fd;
}

int epoll_reactor::do_timerfd_create()
{
#if defined(BOOST_ASIO_HAS_TIMERFD)
# if defined(TFD_CLOEXEC)
  int fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
# else // defined(TFD_CLOEXEC)
  int fd = -1;
  errno = EINVAL;
# endif // defined(TFD_CLOEXEC)

  if (fd == -1 && errno == EINVAL)
  {
    fd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (fd != -1)
      ::fcntl(fd, F_SETFD, FD_CLOEXEC);
  }

  return fd;
#else // defined(BOOST_ASIO_HAS_TIMERFD)
  return -1;
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
}

epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
        REACTOR_IO, scheduler_.concurrency_hint()));
}

void epoll_reactor::free_descriptor_state(epoll_reactor::descriptor_state* s)
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  registered_descriptors_.free(s);
}

void epoll_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void epoll_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

void epoll_reactor::update_timeout()
{
#if defined(BOOST_ASIO_HAS_TIMERFD)
  if (timer_fd_ != -1)
  {
    itimerspec new_timeout;
    itimerspec old_timeout;
    int flags = get_timeout(new_timeout);
    timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
    return;
  }
#endif // defined(BOOST_ASIO_HAS_TIMERFD)
  interrupt();
}

int epoll_reactor::get_timeout(int msec)
{
  // By default we will wait no longer than 5 minutes. This will ensure that
  // any changes to the system clock are detected after no longer than this.
  const int max_msec = 5 * 60 * 1000;
  return timer_queues_.wait_duration_msec(
      (msec < 0 || max_msec < msec) ? max_msec : msec);
}

#if defined(BOOST_ASIO_HAS_TIMERFD)
int epoll_reactor::get_timeout(itimerspec& ts)
{
  ts.it_interval.tv_sec = 0;
  ts.it_interval.tv_nsec = 0;

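  // A returned duration of zero indicates a timer that is already due. A
  // relative it_value of all zeroes would disarm the timerfd, so in that case
  // we instead arm it with an absolute expiry of 1ns (TFD_TIMER_ABSTIME), a
  // time that has already passed, making the timerfd fire immediately.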
  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  ts.it_value.tv_sec = usec / 1000000;
  ts.it_value.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;

  return usec ? 0 : TFD_TIMER_ABSTIME;
}
#endif // defined(BOOST_ASIO_HAS_TIMERFD)

struct epoll_reactor::perform_io_cleanup_on_block_exit
{
  explicit perform_io_cleanup_on_block_exit(epoll_reactor* r)
    : reactor_(r), first_op_(0)
  {
  }

  ~perform_io_cleanup_on_block_exit()
  {
    if (first_op_)
    {
      // Post the remaining completed operations for invocation.
      if (!ops_.empty())
        reactor_->scheduler_.post_deferred_completions(ops_);

      // A user-initiated operation has completed, but there's no need to
      // explicitly call work_finished() here. Instead, we'll take advantage of
      // the fact that the scheduler will call work_finished() once we return.
    }
    else
    {
      // No user-initiated operations have completed, so we need to compensate
      // for the work_finished() call that the scheduler will make once this
      // operation returns.
      reactor_->scheduler_.compensating_work_started();
    }
  }

  epoll_reactor* reactor_;
  op_queue<operation> ops_;
  operation* first_op_;
};

epoll_reactor::descriptor_state::descriptor_state(bool locking)
  : operation(&epoll_reactor::descriptor_state::do_complete),
    mutex_(locking)
{
}

operation* epoll_reactor::descriptor_state::perform_io(uint32_t events)
{
  mutex_.lock();
  perform_io_cleanup_on_block_exit io_cleanup(reactor_);
  mutex::scoped_lock descriptor_lock(mutex_, mutex::scoped_lock::adopt_lock);

  // Exception operations must be processed first to ensure that any
  // out-of-band data is read before normal data.
  static const int flag[max_ops] = { EPOLLIN, EPOLLOUT, EPOLLPRI };
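  // Iterating from max_ops - 1 down to 0 visits except_op (EPOLLPRI) before
  // write_op and read_op, which is what gives out-of-band data its priority.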
  for (int j = max_ops - 1; j >= 0; --j)
  {
    if (events & (flag[j] | EPOLLERR | EPOLLHUP))
    {
      try_speculative_[j] = true;
      while (reactor_op* op = op_queue_[j].front())
      {
        if (reactor_op::status status = op->perform())
        {
          op_queue_[j].pop();
          io_cleanup.ops_.push(op);
          if (status == reactor_op::done_and_exhausted)
          {
            try_speculative_[j] = false;
            break;
          }
        }
        else
          break;
      }
    }
  }

  // The first operation will be returned for completion now. The others will
  // be posted for later by the io_cleanup object's destructor.
  io_cleanup.first_op_ = io_cleanup.ops_.front();
  io_cleanup.ops_.pop();
  return io_cleanup.first_op_;
}

void epoll_reactor::descriptor_state::do_complete(
    void* owner, operation* base,
    const boost::system::error_code& ec, std::size_t bytes_transferred)
{
  if (owner)
  {
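    // The ready event mask arrives through the bytes_transferred argument;
    // run() records it via set_ready_events()/add_ready_events() before
    // queueing this object, and the cast below unpacks it.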
    descriptor_state* descriptor_data = static_cast<descriptor_state*>(base);
    uint32_t events = static_cast<uint32_t>(bytes_transferred);
    if (operation* op = descriptor_data->perform_io(events))
    {
      op->complete(owner, ec, 0);
    }
  }
}

} // namespace detail
} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#endif // defined(BOOST_ASIO_HAS_EPOLL)

#endif // BOOST_ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP