#ifndef BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
#define BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif

#include <boost/asio/detail/config.hpp>

#if defined(BOOST_ASIO_HAS_KQUEUE)

#include <boost/asio/config.hpp>
#include <boost/asio/detail/kqueue_reactor.hpp>
#include <boost/asio/detail/scheduler.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>

#if defined(__NetBSD__)
# include <sys/param.h>
#endif

#include <boost/asio/detail/push_options.hpp>

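// On older NetBSD the udata member of struct kevent is declared as intptr_t
// rather than void*, hence the extra cast in the first form of this macro.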
#if defined(__NetBSD__) && __NetBSD_Version__ < 999001500
# define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
    EV_SET(ev, ident, filt, flags, fflags, data, \
      reinterpret_cast<intptr_t>(static_cast<void*>(udata)))
#else
# define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
    EV_SET(ev, ident, filt, flags, fflags, data, udata)
#endif

namespace boost {
namespace asio {
namespace detail {

kqueue_reactor::kqueue_reactor(boost::asio::execution_context& ctx)
  : execution_context_service_base<kqueue_reactor>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    mutex_(config(ctx).get("reactor", "registration_locking", true),
        config(ctx).get("reactor", "registration_locking_spin_count", 0)),
    kqueue_fd_(do_kqueue_create()),
    interrupter_(),
    shutdown_(false),
    io_locking_(config(ctx).get("reactor", "io_locking", true)),
    io_locking_spin_count_(
        config(ctx).get("reactor", "io_locking_spin_count", 0)),
    registered_descriptors_mutex_(mutex_.enabled()),
    registered_descriptors_(
        config(ctx).get("reactor", "preallocated_io_objects", 0U),
        io_locking_, io_locking_spin_count_)
{
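  // Register the interrupter's read descriptor so that a blocked kevent()
  // call in run() can be woken up via interrupt().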
  struct kevent events[1];
  BOOST_ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
      EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
  if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
  {
    boost::system::error_code error(errno,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(error);
  }
}

kqueue_reactor::~kqueue_reactor()
{
  close(kqueue_fd_);
}

void kqueue_reactor::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  while (descriptor_state* state = registered_descriptors_.first())
  {
    for (int i = 0; i < max_ops; ++i)
      ops.push(state->op_queue_[i]);
    state->shutdown_ = true;
    registered_descriptors_.free(state);
  }

  timer_queues_.get_all_timers(ops);

  scheduler_.abandon_operations(ops);
}

void kqueue_reactor::notify_fork(
    boost::asio::execution_context::fork_event fork_ev)
{
  if (fork_ev == boost::asio::execution_context::fork_child)
  {
    // The kqueue descriptor is automatically closed in the child.
    kqueue_fd_ = -1;
    kqueue_fd_ = do_kqueue_create();

    interrupter_.recreate();

    struct kevent events[2];
    BOOST_ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
        EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
    if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
    {
      boost::system::error_code ec(errno,
          boost::asio::error::get_system_category());
      boost::asio::detail::throw_error(ec, "kqueue interrupter registration");
    }

    // Re-register all descriptors with kqueue.
    mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
    for (descriptor_state* state = registered_descriptors_.first();
        state != 0; state = state->next_)
    {
      if (state->num_kevents_ > 0)
      {
        BOOST_ASIO_KQUEUE_EV_SET(&events[0], state->descriptor_,
            EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state);
        BOOST_ASIO_KQUEUE_EV_SET(&events[1], state->descriptor_,
            EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state);
        if (::kevent(kqueue_fd_, events, state->num_kevents_, 0, 0, 0) == -1)
        {
          boost::system::error_code ec(errno,
              boost::asio::error::get_system_category());
          boost::asio::detail::throw_error(ec, "kqueue re-registration");
        }
      }
    }
  }
}

void kqueue_reactor::init_task()
{
  scheduler_.init_task();
}

int kqueue_reactor::register_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  descriptor_data = allocate_descriptor_state();

  BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
        context(), static_cast<uintmax_t>(descriptor),
        reinterpret_cast<uintmax_t>(descriptor_data)));

  mutex::scoped_lock lock(descriptor_data->mutex_);

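  // The descriptor is not added to the kqueue yet; with num_kevents_ set to
  // zero, registration is deferred until the first operation is started.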
  descriptor_data->descriptor_ = descriptor;
  descriptor_data->num_kevents_ = 0;
  descriptor_data->shutdown_ = false;

  return 0;
}

int kqueue_reactor::register_internal_descriptor(
    int op_type, socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
{
  descriptor_data = allocate_descriptor_state();

  BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
        context(), static_cast<uintmax_t>(descriptor),
        reinterpret_cast<uintmax_t>(descriptor_data)));

  mutex::scoped_lock lock(descriptor_data->mutex_);

  descriptor_data->descriptor_ = descriptor;
  descriptor_data->num_kevents_ = 1;
  descriptor_data->shutdown_ = false;
  descriptor_data->op_queue_[op_type].push(op);

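  // Unlike ordinary descriptors, internal descriptors are registered for
  // read readiness immediately rather than lazily.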
  struct kevent events[1];
  BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
      EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
  if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
    return errno;

  return 0;
}

void kqueue_reactor::move_descriptor(socket_type,
    kqueue_reactor::per_descriptor_data& target_descriptor_data,
    kqueue_reactor::per_descriptor_data& source_descriptor_data)
{
  target_descriptor_data = source_descriptor_data;
  source_descriptor_data = 0;
}

void kqueue_reactor::call_post_immediate_completion(
    operation* op, bool is_continuation, const void* self)
{
  static_cast<const kqueue_reactor*>(self)->post_immediate_completion(
      op, is_continuation);
}

void kqueue_reactor::start_op(int op_type, socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative,
    void (*on_immediate)(operation*, bool, const void*),
    const void* immediate_arg)
{
  if (!descriptor_data)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    on_immediate(op, is_continuation, immediate_arg);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    on_immediate(op, is_continuation, immediate_arg);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty())
  {
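    // Number of filters that must be registered for each operation type:
    // EVFILT_READ alone covers read and except operations, while write
    // operations also require EVFILT_WRITE.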
    static const int num_kevents[max_ops] = { 1, 2, 1 };

    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
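      // Speculatively perform the operation first. If it completes we can
      // deliver the result immediately without touching the kqueue.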
      if (op->perform())
      {
        descriptor_lock.unlock();
        on_immediate(op, is_continuation, immediate_arg);
        return;
      }

      if (descriptor_data->num_kevents_ < num_kevents[op_type])
      {
        struct kevent events[2];
        BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
            EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
        BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
            EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
        if (::kevent(kqueue_fd_, events, num_kevents[op_type], 0, 0, 0) != -1)
        {
          descriptor_data->num_kevents_ = num_kevents[op_type];
        }
        else
        {
          op->ec_ = boost::system::error_code(errno,
              boost::asio::error::get_system_category());
          on_immediate(op, is_continuation, immediate_arg);
          return;
        }
      }
    }
    else
    {
      if (descriptor_data->num_kevents_ < num_kevents[op_type])
        descriptor_data->num_kevents_ = num_kevents[op_type];

      struct kevent events[2];
      BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
          EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
      BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
          EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
      ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  scheduler_.work_started();
}

void kqueue_reactor::cancel_ops(socket_type,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

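  // Collect every pending operation on the descriptor and post it for
  // completion with the operation_aborted error.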
  op_queue<operation> ops;
  for (int i = 0; i < max_ops; ++i)
  {
    while (reactor_op* op = descriptor_data->op_queue_[i].front())
    {
      op->ec_ = boost::asio::error::operation_aborted;
      descriptor_data->op_queue_[i].pop();
      ops.push(op);
    }
  }

  descriptor_lock.unlock();

  scheduler_.post_deferred_completions(ops);
}

void kqueue_reactor::cancel_ops_by_key(socket_type,
    kqueue_reactor::per_descriptor_data& descriptor_data,
    int op_type, void* cancellation_key)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  op_queue<operation> ops;
  op_queue<reactor_op> other_ops;
  while (reactor_op* op = descriptor_data->op_queue_[op_type].front())
  {
    descriptor_data->op_queue_[op_type].pop();
    if (op->cancellation_key_ == cancellation_key)
    {
      op->ec_ = boost::asio::error::operation_aborted;
      ops.push(op);
    }
    else
      other_ops.push(op);
  }
  descriptor_data->op_queue_[op_type].push(other_ops);

  descriptor_lock.unlock();

  scheduler_.post_deferred_completions(ops);
}

void kqueue_reactor::deregister_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, bool closing)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    if (closing)
    {
      // The descriptor will be automatically removed from the kqueue when it
      // is closed.
    }
    else
    {
      struct kevent events[2];
      BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
          EVFILT_READ, EV_DELETE, 0, 0, 0);
      BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
          EVFILT_WRITE, EV_DELETE, 0, 0, 0);
      ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
    }

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
    {
      while (reactor_op* op = descriptor_data->op_queue_[i].front())
      {
        op->ec_ = boost::asio::error::operation_aborted;
        descriptor_data->op_queue_[i].pop();
        ops.push(op);
      }
    }

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
        context(), static_cast<uintmax_t>(descriptor),
        reinterpret_cast<uintmax_t>(descriptor_data)));

    scheduler_.post_deferred_completions(ops);

    // Leave descriptor_data set so that it will be freed by the subsequent
    // call to cleanup_descriptor_data.
  }
  else
  {
    // We are shutting down, so prevent cleanup_descriptor_data from freeing
    // the descriptor_data object and let the destructor free it instead.
    descriptor_data = 0;
  }
}

void kqueue_reactor::deregister_internal_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    struct kevent events[2];
    BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
        EVFILT_READ, EV_DELETE, 0, 0, 0);
    BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
        EVFILT_WRITE, EV_DELETE, 0, 0, 0);
    ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
      ops.push(descriptor_data->op_queue_[i]);

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
        context(), static_cast<uintmax_t>(descriptor),
        reinterpret_cast<uintmax_t>(descriptor_data)));

    // Leave descriptor_data set so that it will be freed by the subsequent
    // call to cleanup_descriptor_data.
  }
  else
  {
    // We are shutting down, so prevent cleanup_descriptor_data from freeing
    // the descriptor_data object and let the destructor free it instead.
    descriptor_data = 0;
  }
}

void kqueue_reactor::cleanup_descriptor_data(
    per_descriptor_data& descriptor_data)
{
  if (descriptor_data)
  {
    free_descriptor_state(descriptor_data);
    descriptor_data = 0;
  }
}

void kqueue_reactor::run(long usec, op_queue<operation>& ops)
{
  mutex::scoped_lock lock(mutex_);

  // Determine how long to block while waiting for events.
  timespec timeout_buf = { 0, 0 };
  timespec* timeout = usec ? get_timeout(usec, timeout_buf) : &timeout_buf;

  lock.unlock();

  // Block on the kqueue descriptor.
  struct kevent events[128];
  int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);

#if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  // Trace the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = reinterpret_cast<void*>(events[i].udata);
    if (ptr != &interrupter_)
    {
      unsigned event_mask = 0;
      switch (events[i].filter)
      {
      case EVFILT_READ:
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_READ_EVENT;
        break;
      case EVFILT_WRITE:
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_WRITE_EVENT;
        break;
      }
      if ((events[i].flags & (EV_ERROR | EV_OOBAND)) != 0)
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_ERROR_EVENT;
      BOOST_ASIO_HANDLER_REACTOR_EVENTS((context(),
          reinterpret_cast<uintmax_t>(ptr), event_mask));
    }
  }
#endif

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = reinterpret_cast<void*>(events[i].udata);
    if (ptr == &interrupter_)
    {
      interrupter_.reset();
    }
    else
    {
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

      if (events[i].filter == EVFILT_WRITE
          && descriptor_data->num_kevents_ == 2
          && descriptor_data->op_queue_[write_op].empty())
      {
        // Some descriptor types, like serial ports, don't seem to support
        // EV_CLEAR with EVFILT_WRITE. Since we have no pending write
        // operations we'll remove the EVFILT_WRITE registration here so
        // that we don't end up in a tight spin.
        struct kevent delete_events[1];
        BOOST_ASIO_KQUEUE_EV_SET(&delete_events[0],
            descriptor_data->descriptor_, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
        ::kevent(kqueue_fd_, delete_events, 1, 0, 0, 0);
        descriptor_data->num_kevents_ = 1;
      }

      // Exception operations must be processed first to ensure that any
      // out-of-band data is read before normal data.
#if defined(__NetBSD__)
      static const unsigned int filter[max_ops] =
#else
      static const int filter[max_ops] =
#endif
        { EVFILT_READ, EVFILT_WRITE, EVFILT_READ };
      for (int j = max_ops - 1; j >= 0; --j)
      {
        if (events[i].filter == filter[j])
        {
          if (j != except_op || events[i].flags & EV_OOBAND)
          {
            while (reactor_op* op = descriptor_data->op_queue_[j].front())
            {
              if (events[i].flags & EV_ERROR)
              {
                op->ec_ = boost::system::error_code(
                    static_cast<int>(events[i].data),
                    boost::asio::error::get_system_category());
                descriptor_data->op_queue_[j].pop();
                ops.push(op);
              }
              if (op->perform())
              {
                descriptor_data->op_queue_[j].pop();
                ops.push(op);
              }
              else
                break;
            }
          }
        }
      }
    }
  }

  lock.lock();
  timer_queues_.get_ready_timers(ops);
}

void kqueue_reactor::interrupt()
{
  interrupter_.interrupt();
}

int kqueue_reactor::do_kqueue_create()
{
  int fd = ::kqueue();
  if (fd == -1)
  {
    boost::system::error_code ec(errno,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "kqueue");
  }
  return fd;
}

kqueue_reactor::descriptor_state* kqueue_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc(io_locking_, io_locking_spin_count_);
}

void kqueue_reactor::free_descriptor_state(kqueue_reactor::descriptor_state* s)
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  registered_descriptors_.free(s);
}

void kqueue_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void kqueue_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

timespec* kqueue_reactor::get_timeout(long usec, timespec& ts)
{
  // By default we will wait no longer than 5 minutes. This will ensure that
  // any changes to the system clock are detected after no longer than this.
  const long max_usec = 5 * 60 * 1000 * 1000;
  usec = timer_queues_.wait_duration_usec(
      (usec < 0 || max_usec < usec) ? max_usec : usec);
  ts.tv_sec = usec / 1000000;
  ts.tv_nsec = (usec % 1000000) * 1000;
  return &ts;
}

} // namespace detail
} // namespace asio
} // namespace boost

#undef BOOST_ASIO_KQUEUE_EV_SET

#include <boost/asio/detail/pop_options.hpp>

#endif // defined(BOOST_ASIO_HAS_KQUEUE)

#endif // BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP