Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:28:32

0001 //
0002 // detail/impl/kqueue_reactor.ipp
0003 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0004 //
0005 // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
0006 // Copyright (c) 2005 Stefan Arentz (stefan at soze dot com)
0007 //
0008 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0009 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0010 //
0011 
0012 #ifndef BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
0013 #define BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
0014 
0015 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0016 # pragma once
0017 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
0018 
0019 #include <boost/asio/detail/config.hpp>
0020 
0021 #if defined(BOOST_ASIO_HAS_KQUEUE)
0022 
0023 #include <boost/asio/detail/kqueue_reactor.hpp>
0024 #include <boost/asio/detail/scheduler.hpp>
0025 #include <boost/asio/detail/throw_error.hpp>
0026 #include <boost/asio/error.hpp>
0027 
0028 #if defined(__NetBSD__)
0029 # include <sys/param.h>
0030 #endif
0031 
0032 #include <boost/asio/detail/push_options.hpp>
0033 
// BOOST_ASIO_KQUEUE_EV_SET forwards its arguments to EV_SET. When building
// for NetBSD with __NetBSD_Version__ below 999001500 the udata argument is
// first converted to intptr_t (by way of void*); in every other configuration
// it is passed through unchanged.
// NOTE(review): presumably those NetBSD versions declare kevent::udata as an
// intptr_t rather than a pointer -- confirm against the NetBSD headers.
0034 #if defined(__NetBSD__) && __NetBSD_Version__ < 999001500
0035 # define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
0036     EV_SET(ev, ident, filt, flags, fflags, data, \
0037       reinterpret_cast<intptr_t>(static_cast<void*>(udata)))
0038 #else
0039 # define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
0040     EV_SET(ev, ident, filt, flags, fflags, data, udata)
0041 #endif
0042 
0043 namespace boost {
0044 namespace asio {
0045 namespace detail {
0046 
0047 kqueue_reactor::kqueue_reactor(boost::asio::execution_context& ctx)
0048   : execution_context_service_base<kqueue_reactor>(ctx),
0049     scheduler_(use_service<scheduler>(ctx)),
0050     mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
0051           REACTOR_REGISTRATION, scheduler_.concurrency_hint())),
0052     kqueue_fd_(do_kqueue_create()),
0053     interrupter_(),
0054     shutdown_(false),
0055     registered_descriptors_mutex_(mutex_.enabled())
0056 {
0057   struct kevent events[1];
0058   BOOST_ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
0059       EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
0060   if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
0061   {
0062     boost::system::error_code error(errno,
0063         boost::asio::error::get_system_category());
0064     boost::asio::detail::throw_error(error);
0065   }
0066 }
0067 
0068 kqueue_reactor::~kqueue_reactor()
0069 {
0070   close(kqueue_fd_);
0071 }
0072 
0073 void kqueue_reactor::shutdown()
0074 {
0075   mutex::scoped_lock lock(mutex_);
0076   shutdown_ = true;
0077   lock.unlock();
0078 
0079   op_queue<operation> ops;
0080 
0081   while (descriptor_state* state = registered_descriptors_.first())
0082   {
0083     for (int i = 0; i < max_ops; ++i)
0084       ops.push(state->op_queue_[i]);
0085     state->shutdown_ = true;
0086     registered_descriptors_.free(state);
0087   }
0088 
0089   timer_queues_.get_all_timers(ops);
0090 
0091   scheduler_.abandon_operations(ops);
0092 }
0093 
0094 void kqueue_reactor::notify_fork(
0095     boost::asio::execution_context::fork_event fork_ev)
0096 {
0097   if (fork_ev == boost::asio::execution_context::fork_child)
0098   {
0099     // The kqueue descriptor is automatically closed in the child.
0100     kqueue_fd_ = -1;
0101     kqueue_fd_ = do_kqueue_create();
0102 
0103     interrupter_.recreate();
0104 
0105     struct kevent events[2];
0106     BOOST_ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
0107         EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
0108     if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
0109     {
0110       boost::system::error_code ec(errno,
0111           boost::asio::error::get_system_category());
0112       boost::asio::detail::throw_error(ec, "kqueue interrupter registration");
0113     }
0114 
0115     // Re-register all descriptors with kqueue.
0116     mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
0117     for (descriptor_state* state = registered_descriptors_.first();
0118         state != 0; state = state->next_)
0119     {
0120       if (state->num_kevents_ > 0)
0121       {
0122         BOOST_ASIO_KQUEUE_EV_SET(&events[0], state->descriptor_,
0123             EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state);
0124         BOOST_ASIO_KQUEUE_EV_SET(&events[1], state->descriptor_,
0125             EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state);
0126         if (::kevent(kqueue_fd_, events, state->num_kevents_, 0, 0, 0) == -1)
0127         {
0128           boost::system::error_code ec(errno,
0129               boost::asio::error::get_system_category());
0130           boost::asio::detail::throw_error(ec, "kqueue re-registration");
0131         }
0132       }
0133     }
0134   }
0135 }
0136 
0137 void kqueue_reactor::init_task()
0138 {
0139   scheduler_.init_task();
0140 }
0141 
0142 int kqueue_reactor::register_descriptor(socket_type descriptor,
0143     kqueue_reactor::per_descriptor_data& descriptor_data)
0144 {
0145   descriptor_data = allocate_descriptor_state();
0146 
0147   BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
0148         context(), static_cast<uintmax_t>(descriptor),
0149         reinterpret_cast<uintmax_t>(descriptor_data)));
0150 
0151   mutex::scoped_lock lock(descriptor_data->mutex_);
0152 
0153   descriptor_data->descriptor_ = descriptor;
0154   descriptor_data->num_kevents_ = 0;
0155   descriptor_data->shutdown_ = false;
0156 
0157   return 0;
0158 }
0159 
0160 int kqueue_reactor::register_internal_descriptor(
0161     int op_type, socket_type descriptor,
0162     kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
0163 {
0164   descriptor_data = allocate_descriptor_state();
0165 
0166   BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
0167         context(), static_cast<uintmax_t>(descriptor),
0168         reinterpret_cast<uintmax_t>(descriptor_data)));
0169 
0170   mutex::scoped_lock lock(descriptor_data->mutex_);
0171 
0172   descriptor_data->descriptor_ = descriptor;
0173   descriptor_data->num_kevents_ = 1;
0174   descriptor_data->shutdown_ = false;
0175   descriptor_data->op_queue_[op_type].push(op);
0176 
0177   struct kevent events[1];
0178   BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
0179       EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
0180   if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
0181     return errno;
0182 
0183   return 0;
0184 }
0185 
0186 void kqueue_reactor::move_descriptor(socket_type,
0187     kqueue_reactor::per_descriptor_data& target_descriptor_data,
0188     kqueue_reactor::per_descriptor_data& source_descriptor_data)
0189 {
0190   target_descriptor_data = source_descriptor_data;
0191   source_descriptor_data = 0;
0192 }
0193 
0194 void kqueue_reactor::call_post_immediate_completion(
0195     operation* op, bool is_continuation, const void* self)
0196 {
0197   static_cast<const kqueue_reactor*>(self)->post_immediate_completion(
0198       op, is_continuation);
0199 }
0200 
0201 void kqueue_reactor::start_op(int op_type, socket_type descriptor,
0202     kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
0203     bool is_continuation, bool allow_speculative,
0204     void (*on_immediate)(operation*, bool, const void*),
0205     const void* immediate_arg)
0206 {
0207   if (!descriptor_data)
0208   {
0209     op->ec_ = boost::asio::error::bad_descriptor;
0210     on_immediate(op, is_continuation, immediate_arg);
0211     return;
0212   }
0213 
0214   mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0215 
0216   if (descriptor_data->shutdown_)
0217   {
0218     on_immediate(op, is_continuation, immediate_arg);
0219     return;
0220   }
0221 
0222   if (descriptor_data->op_queue_[op_type].empty())
0223   {
0224     static const int num_kevents[max_ops] = { 1, 2, 1 };
0225 
0226     if (allow_speculative
0227         && (op_type != read_op
0228           || descriptor_data->op_queue_[except_op].empty()))
0229     {
0230       if (op->perform())
0231       {
0232         descriptor_lock.unlock();
0233         on_immediate(op, is_continuation, immediate_arg);
0234         return;
0235       }
0236 
0237       if (descriptor_data->num_kevents_ < num_kevents[op_type])
0238       {
0239         struct kevent events[2];
0240         BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
0241             EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
0242         BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
0243             EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
0244         if (::kevent(kqueue_fd_, events, num_kevents[op_type], 0, 0, 0) != -1)
0245         {
0246           descriptor_data->num_kevents_ = num_kevents[op_type];
0247         }
0248         else
0249         {
0250           op->ec_ = boost::system::error_code(errno,
0251               boost::asio::error::get_system_category());
0252           on_immediate(op, is_continuation, immediate_arg);
0253           return;
0254         }
0255       }
0256     }
0257     else
0258     {
0259       if (descriptor_data->num_kevents_ < num_kevents[op_type])
0260         descriptor_data->num_kevents_ = num_kevents[op_type];
0261 
0262       struct kevent events[2];
0263       BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
0264           EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
0265       BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
0266           EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
0267       ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
0268     }
0269   }
0270 
0271   descriptor_data->op_queue_[op_type].push(op);
0272   scheduler_.work_started();
0273 }
0274 
0275 void kqueue_reactor::cancel_ops(socket_type,
0276     kqueue_reactor::per_descriptor_data& descriptor_data)
0277 {
0278   if (!descriptor_data)
0279     return;
0280 
0281   mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0282 
0283   op_queue<operation> ops;
0284   for (int i = 0; i < max_ops; ++i)
0285   {
0286     while (reactor_op* op = descriptor_data->op_queue_[i].front())
0287     {
0288       op->ec_ = boost::asio::error::operation_aborted;
0289       descriptor_data->op_queue_[i].pop();
0290       ops.push(op);
0291     }
0292   }
0293 
0294   descriptor_lock.unlock();
0295 
0296   scheduler_.post_deferred_completions(ops);
0297 }
0298 
0299 void kqueue_reactor::cancel_ops_by_key(socket_type,
0300     kqueue_reactor::per_descriptor_data& descriptor_data,
0301     int op_type, void* cancellation_key)
0302 {
0303   if (!descriptor_data)
0304     return;
0305 
0306   mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0307 
0308   op_queue<operation> ops;
0309   op_queue<reactor_op> other_ops;
0310   while (reactor_op* op = descriptor_data->op_queue_[op_type].front())
0311   {
0312     descriptor_data->op_queue_[op_type].pop();
0313     if (op->cancellation_key_ == cancellation_key)
0314     {
0315       op->ec_ = boost::asio::error::operation_aborted;
0316       ops.push(op);
0317     }
0318     else
0319       other_ops.push(op);
0320   }
0321   descriptor_data->op_queue_[op_type].push(other_ops);
0322 
0323   descriptor_lock.unlock();
0324 
0325   scheduler_.post_deferred_completions(ops);
0326 }
0327 
0328 void kqueue_reactor::deregister_descriptor(socket_type descriptor,
0329     kqueue_reactor::per_descriptor_data& descriptor_data, bool closing)
0330 {
0331   if (!descriptor_data)
0332     return;
0333 
0334   mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0335 
0336   if (!descriptor_data->shutdown_)
0337   {
0338     if (closing)
0339     {
0340       // The descriptor will be automatically removed from the kqueue when it
0341       // is closed.
0342     }
0343     else
0344     {
0345       struct kevent events[2];
0346       BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
0347           EVFILT_READ, EV_DELETE, 0, 0, 0);
0348       BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
0349           EVFILT_WRITE, EV_DELETE, 0, 0, 0);
0350       ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
0351     }
0352 
0353     op_queue<operation> ops;
0354     for (int i = 0; i < max_ops; ++i)
0355     {
0356       while (reactor_op* op = descriptor_data->op_queue_[i].front())
0357       {
0358         op->ec_ = boost::asio::error::operation_aborted;
0359         descriptor_data->op_queue_[i].pop();
0360         ops.push(op);
0361       }
0362     }
0363 
0364     descriptor_data->descriptor_ = -1;
0365     descriptor_data->shutdown_ = true;
0366 
0367     descriptor_lock.unlock();
0368 
0369     BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
0370           context(), static_cast<uintmax_t>(descriptor),
0371           reinterpret_cast<uintmax_t>(descriptor_data)));
0372 
0373     scheduler_.post_deferred_completions(ops);
0374 
0375     // Leave descriptor_data set so that it will be freed by the subsequent
0376     // call to cleanup_descriptor_data.
0377   }
0378   else
0379   {
0380     // We are shutting down, so prevent cleanup_descriptor_data from freeing
0381     // the descriptor_data object and let the destructor free it instead.
0382     descriptor_data = 0;
0383   }
0384 }
0385 
0386 void kqueue_reactor::deregister_internal_descriptor(socket_type descriptor,
0387     kqueue_reactor::per_descriptor_data& descriptor_data)
0388 {
0389   if (!descriptor_data)
0390     return;
0391 
0392   mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0393 
0394   if (!descriptor_data->shutdown_)
0395   {
0396     struct kevent events[2];
0397     BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
0398         EVFILT_READ, EV_DELETE, 0, 0, 0);
0399     BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
0400         EVFILT_WRITE, EV_DELETE, 0, 0, 0);
0401     ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
0402 
0403     op_queue<operation> ops;
0404     for (int i = 0; i < max_ops; ++i)
0405       ops.push(descriptor_data->op_queue_[i]);
0406 
0407     descriptor_data->descriptor_ = -1;
0408     descriptor_data->shutdown_ = true;
0409 
0410     descriptor_lock.unlock();
0411 
0412     BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
0413           context(), static_cast<uintmax_t>(descriptor),
0414           reinterpret_cast<uintmax_t>(descriptor_data)));
0415 
0416     // Leave descriptor_data set so that it will be freed by the subsequent
0417     // call to cleanup_descriptor_data.
0418   }
0419   else
0420   {
0421     // We are shutting down, so prevent cleanup_descriptor_data from freeing
0422     // the descriptor_data object and let the destructor free it instead.
0423     descriptor_data = 0;
0424   }
0425 }
0426 
0427 void kqueue_reactor::cleanup_descriptor_data(
0428     per_descriptor_data& descriptor_data)
0429 {
0430   if (descriptor_data)
0431   {
0432     free_descriptor_state(descriptor_data);
0433     descriptor_data = 0;
0434   }
0435 }
0436 
0437 void kqueue_reactor::run(long usec, op_queue<operation>& ops)
0438 {
0439   mutex::scoped_lock lock(mutex_);
0440 
0441   // Determine how long to block while waiting for events.
0442   timespec timeout_buf = { 0, 0 };
0443   timespec* timeout = usec ? get_timeout(usec, timeout_buf) : &timeout_buf;
0444 
0445   lock.unlock();
0446 
0447   // Block on the kqueue descriptor.
0448   struct kevent events[128];
0449   int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);
0450 
0451 #if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0452   // Trace the waiting events.
0453   for (int i = 0; i < num_events; ++i)
0454   {
0455     void* ptr = reinterpret_cast<void*>(events[i].udata);
0456     if (ptr != &interrupter_)
0457     {
0458       unsigned event_mask = 0;
0459       switch (events[i].filter)
0460       {
0461       case EVFILT_READ:
0462         event_mask |= BOOST_ASIO_HANDLER_REACTOR_READ_EVENT;
0463         break;
0464       case EVFILT_WRITE:
0465         event_mask |= BOOST_ASIO_HANDLER_REACTOR_WRITE_EVENT;
0466         break;
0467       }
0468       if ((events[i].flags & (EV_ERROR | EV_OOBAND)) != 0)
0469         event_mask |= BOOST_ASIO_HANDLER_REACTOR_ERROR_EVENT;
0470       BOOST_ASIO_HANDLER_REACTOR_EVENTS((context(),
0471             reinterpret_cast<uintmax_t>(ptr), event_mask));
0472     }
0473   }
0474 #endif // defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
0475 
0476   // Dispatch the waiting events.
0477   for (int i = 0; i < num_events; ++i)
0478   {
0479     void* ptr = reinterpret_cast<void*>(events[i].udata);
0480     if (ptr == &interrupter_)
0481     {
0482       interrupter_.reset();
0483     }
0484     else
0485     {
0486       descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
0487       mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
0488 
0489       if (events[i].filter == EVFILT_WRITE
0490           && descriptor_data->num_kevents_ == 2
0491           && descriptor_data->op_queue_[write_op].empty())
0492       {
0493         // Some descriptor types, like serial ports, don't seem to support
0494         // EV_CLEAR with EVFILT_WRITE. Since we have no pending write
0495         // operations we'll remove the EVFILT_WRITE registration here so that
0496         // we don't end up in a tight spin.
0497         struct kevent delete_events[1];
0498         BOOST_ASIO_KQUEUE_EV_SET(&delete_events[0],
0499             descriptor_data->descriptor_, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
0500         ::kevent(kqueue_fd_, delete_events, 1, 0, 0, 0);
0501         descriptor_data->num_kevents_ = 1;
0502       }
0503 
0504       // Exception operations must be processed first to ensure that any
0505       // out-of-band data is read before normal data.
0506 #if defined(__NetBSD__)
0507       static const unsigned int filter[max_ops] =
0508 #else
0509       static const int filter[max_ops] =
0510 #endif
0511         { EVFILT_READ, EVFILT_WRITE, EVFILT_READ };
0512       for (int j = max_ops - 1; j >= 0; --j)
0513       {
0514         if (events[i].filter == filter[j])
0515         {
0516           if (j != except_op || events[i].flags & EV_OOBAND)
0517           {
0518             while (reactor_op* op = descriptor_data->op_queue_[j].front())
0519             {
0520               if (events[i].flags & EV_ERROR)
0521               {
0522                 op->ec_ = boost::system::error_code(
0523                     static_cast<int>(events[i].data),
0524                     boost::asio::error::get_system_category());
0525                 descriptor_data->op_queue_[j].pop();
0526                 ops.push(op);
0527               }
0528               if (op->perform())
0529               {
0530                 descriptor_data->op_queue_[j].pop();
0531                 ops.push(op);
0532               }
0533               else
0534                 break;
0535             }
0536           }
0537         }
0538       }
0539     }
0540   }
0541 
0542   lock.lock();
0543   timer_queues_.get_ready_timers(ops);
0544 }
0545 
0546 void kqueue_reactor::interrupt()
0547 {
0548   interrupter_.interrupt();
0549 }
0550 
0551 int kqueue_reactor::do_kqueue_create()
0552 {
0553   int fd = ::kqueue();
0554   if (fd == -1)
0555   {
0556     boost::system::error_code ec(errno,
0557         boost::asio::error::get_system_category());
0558     boost::asio::detail::throw_error(ec, "kqueue");
0559   }
0560   return fd;
0561 }
0562 
0563 kqueue_reactor::descriptor_state* kqueue_reactor::allocate_descriptor_state()
0564 {
0565   mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
0566   return registered_descriptors_.alloc(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
0567         REACTOR_IO, scheduler_.concurrency_hint()));
0568 }
0569 
0570 void kqueue_reactor::free_descriptor_state(kqueue_reactor::descriptor_state* s)
0571 {
0572   mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
0573   registered_descriptors_.free(s);
0574 }
0575 
0576 void kqueue_reactor::do_add_timer_queue(timer_queue_base& queue)
0577 {
0578   mutex::scoped_lock lock(mutex_);
0579   timer_queues_.insert(&queue);
0580 }
0581 
0582 void kqueue_reactor::do_remove_timer_queue(timer_queue_base& queue)
0583 {
0584   mutex::scoped_lock lock(mutex_);
0585   timer_queues_.erase(&queue);
0586 }
0587 
0588 timespec* kqueue_reactor::get_timeout(long usec, timespec& ts)
0589 {
0590   // By default we will wait no longer than 5 minutes. This will ensure that
0591   // any changes to the system clock are detected after no longer than this.
0592   const long max_usec = 5 * 60 * 1000 * 1000;
0593   usec = timer_queues_.wait_duration_usec(
0594       (usec < 0 || max_usec < usec) ? max_usec : usec);
0595   ts.tv_sec = usec / 1000000;
0596   ts.tv_nsec = (usec % 1000000) * 1000;
0597   return &ts;
0598 }
0599 
0600 } // namespace detail
0601 } // namespace asio
0602 } // namespace boost
0603 
0604 #undef BOOST_ASIO_KQUEUE_EV_SET
0605 
0606 #include <boost/asio/detail/pop_options.hpp>
0607 
0608 #endif // defined(BOOST_ASIO_HAS_KQUEUE)
0609 
0610 #endif // BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP