//
// detail/impl/kqueue_reactor.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP
#define BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif

#include <boost/asio/detail/config.hpp>

#if defined(BOOST_ASIO_HAS_KQUEUE)

#include <boost/asio/detail/kqueue_reactor.hpp>
#include <boost/asio/detail/scheduler.hpp>
#include <boost/asio/detail/throw_error.hpp>
#include <boost/asio/error.hpp>

#if defined(__NetBSD__)
# include <sys/param.h>
#endif

#include <boost/asio/detail/push_options.hpp>

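// Older NetBSD versions declare the udata member of struct kevent as an
// intptr_t rather than a void*, so pointer values must be explicitly cast.
// The wrapper macro below hides this difference from the rest of the file.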
#if defined(__NetBSD__) && __NetBSD_Version__ < 999001500
# define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
    EV_SET(ev, ident, filt, flags, fflags, data, \
      reinterpret_cast<intptr_t>(static_cast<void*>(udata)))
#else
# define BOOST_ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \
    EV_SET(ev, ident, filt, flags, fflags, data, udata)
#endif

namespace boost {
namespace asio {
namespace detail {

kqueue_reactor::kqueue_reactor(boost::asio::execution_context& ctx)
  : execution_context_service_base<kqueue_reactor>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          REACTOR_REGISTRATION, scheduler_.concurrency_hint())),
    kqueue_fd_(do_kqueue_create()),
    interrupter_(),
    shutdown_(false),
    registered_descriptors_mutex_(mutex_.enabled())
{
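  // Register the interrupter's read descriptor so that other threads can
  // wake a thread that is blocked in run() (see interrupt() below).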
  struct kevent events[1];
  BOOST_ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
      EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
  if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
  {
    boost::system::error_code error(errno,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(error);
  }
}

kqueue_reactor::~kqueue_reactor()
{
  close(kqueue_fd_);
}

void kqueue_reactor::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  while (descriptor_state* state = registered_descriptors_.first())
  {
    for (int i = 0; i < max_ops; ++i)
      ops.push(state->op_queue_[i]);
    state->shutdown_ = true;
    registered_descriptors_.free(state);
  }

  timer_queues_.get_all_timers(ops);

  scheduler_.abandon_operations(ops);
}

void kqueue_reactor::notify_fork(
    boost::asio::execution_context::fork_event fork_ev)
{
  if (fork_ev == boost::asio::execution_context::fork_child)
  {
    // The kqueue descriptor is automatically closed in the child.
    kqueue_fd_ = -1;
    kqueue_fd_ = do_kqueue_create();

    interrupter_.recreate();

    // Re-register the interrupter's descriptor with the new kqueue.
    struct kevent events[2];
    BOOST_ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(),
        EVFILT_READ, EV_ADD, 0, 0, &interrupter_);
    if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
    {
      boost::system::error_code ec(errno,
          boost::asio::error::get_system_category());
      boost::asio::detail::throw_error(ec, "kqueue interrupter registration");
    }

    // Re-register all descriptors with the new kqueue.
    mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
    for (descriptor_state* state = registered_descriptors_.first();
        state != 0; state = state->next_)
    {
      if (state->num_kevents_ > 0)
      {
        BOOST_ASIO_KQUEUE_EV_SET(&events[0], state->descriptor_,
            EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state);
        BOOST_ASIO_KQUEUE_EV_SET(&events[1], state->descriptor_,
            EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state);
        if (::kevent(kqueue_fd_, events, state->num_kevents_, 0, 0, 0) == -1)
        {
          boost::system::error_code ec(errno,
              boost::asio::error::get_system_category());
          boost::asio::detail::throw_error(ec, "kqueue re-registration");
        }
      }
    }
  }
}

void kqueue_reactor::init_task()
{
  scheduler_.init_task();
}

int kqueue_reactor::register_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  descriptor_data = allocate_descriptor_state();

  BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
      context(), static_cast<uintmax_t>(descriptor),
      reinterpret_cast<uintmax_t>(descriptor_data)));

  mutex::scoped_lock lock(descriptor_data->mutex_);

  descriptor_data->descriptor_ = descriptor;
  descriptor_data->num_kevents_ = 0;
  descriptor_data->shutdown_ = false;

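  // No registration with the kqueue happens here; interest in the descriptor
  // is added lazily by the first call to start_op().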
  return 0;
}

int kqueue_reactor::register_internal_descriptor(
    int op_type, socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
{
  descriptor_data = allocate_descriptor_state();

  BOOST_ASIO_HANDLER_REACTOR_REGISTRATION((
      context(), static_cast<uintmax_t>(descriptor),
      reinterpret_cast<uintmax_t>(descriptor_data)));

  mutex::scoped_lock lock(descriptor_data->mutex_);

  descriptor_data->descriptor_ = descriptor;
  descriptor_data->num_kevents_ = 1;
  descriptor_data->shutdown_ = false;
  descriptor_data->op_queue_[op_type].push(op);

  struct kevent events[1];
  BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
      EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
  if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1)
    return errno;

  return 0;
}

void kqueue_reactor::move_descriptor(socket_type,
    kqueue_reactor::per_descriptor_data& target_descriptor_data,
    kqueue_reactor::per_descriptor_data& source_descriptor_data)
{
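  // Ownership of the descriptor state is transferred from source to target;
  // the source is cleared so that a later cleanup_descriptor_data() call on
  // it will not free the state twice.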
  target_descriptor_data = source_descriptor_data;
  source_descriptor_data = 0;
}

void kqueue_reactor::call_post_immediate_completion(
    operation* op, bool is_continuation, const void* self)
{
  static_cast<const kqueue_reactor*>(self)->post_immediate_completion(
      op, is_continuation);
}

void kqueue_reactor::start_op(int op_type, socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative,
    void (*on_immediate)(operation*, bool, const void*),
    const void* immediate_arg)
{
  if (!descriptor_data)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    on_immediate(op, is_continuation, immediate_arg);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    on_immediate(op, is_continuation, immediate_arg);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty())
  {
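    // Number of kevent registrations required for each operation type:
    // read and exception operations need only EVFILT_READ, while write
    // operations register both EVFILT_READ and EVFILT_WRITE.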
    static const int num_kevents[max_ops] = { 1, 2, 1 };

    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
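      // Try to perform the operation immediately, before registering any
      // interest with the kqueue. If it completes now, no system call for
      // registration is needed.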
      if (op->perform())
      {
        descriptor_lock.unlock();
        on_immediate(op, is_continuation, immediate_arg);
        return;
      }

      if (descriptor_data->num_kevents_ < num_kevents[op_type])
      {
        struct kevent events[2];
        BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
            EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
        BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
            EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
        if (::kevent(kqueue_fd_, events, num_kevents[op_type], 0, 0, 0) != -1)
        {
          descriptor_data->num_kevents_ = num_kevents[op_type];
        }
        else
        {
          op->ec_ = boost::system::error_code(errno,
              boost::asio::error::get_system_category());
          on_immediate(op, is_continuation, immediate_arg);
          return;
        }
      }
    }
    else
    {
      if (descriptor_data->num_kevents_ < num_kevents[op_type])
        descriptor_data->num_kevents_ = num_kevents[op_type];

      struct kevent events[2];
      BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ,
          EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
      BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE,
          EV_ADD | EV_CLEAR, 0, 0, descriptor_data);
      ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  scheduler_.work_started();
}

void kqueue_reactor::cancel_ops(socket_type,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  op_queue<operation> ops;
  for (int i = 0; i < max_ops; ++i)
  {
    while (reactor_op* op = descriptor_data->op_queue_[i].front())
    {
      op->ec_ = boost::asio::error::operation_aborted;
      descriptor_data->op_queue_[i].pop();
      ops.push(op);
    }
  }

  descriptor_lock.unlock();

  scheduler_.post_deferred_completions(ops);
}

void kqueue_reactor::cancel_ops_by_key(socket_type,
    kqueue_reactor::per_descriptor_data& descriptor_data,
    int op_type, void* cancellation_key)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  op_queue<operation> ops;
  op_queue<reactor_op> other_ops;
  while (reactor_op* op = descriptor_data->op_queue_[op_type].front())
  {
    descriptor_data->op_queue_[op_type].pop();
    if (op->cancellation_key_ == cancellation_key)
    {
      op->ec_ = boost::asio::error::operation_aborted;
      ops.push(op);
    }
    else
      other_ops.push(op);
  }
  descriptor_data->op_queue_[op_type].push(other_ops);

  descriptor_lock.unlock();

  scheduler_.post_deferred_completions(ops);
}

void kqueue_reactor::deregister_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data, bool closing)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    if (closing)
    {
      // The descriptor will be automatically removed from the kqueue when it
      // is closed.
    }
    else
    {
      struct kevent events[2];
      BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
          EVFILT_READ, EV_DELETE, 0, 0, 0);
      BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
          EVFILT_WRITE, EV_DELETE, 0, 0, 0);
      ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);
    }

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
    {
      while (reactor_op* op = descriptor_data->op_queue_[i].front())
      {
        op->ec_ = boost::asio::error::operation_aborted;
        descriptor_data->op_queue_[i].pop();
        ops.push(op);
      }
    }

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
        context(), static_cast<uintmax_t>(descriptor),
        reinterpret_cast<uintmax_t>(descriptor_data)));

    scheduler_.post_deferred_completions(ops);

    // Leave descriptor_data set so that it will be freed by the subsequent
    // call to cleanup_descriptor_data.
  }
  else
  {
    // We are shutting down, so prevent cleanup_descriptor_data from freeing
    // the descriptor_data object and let the destructor free it instead.
    descriptor_data = 0;
  }
}

void kqueue_reactor::deregister_internal_descriptor(socket_type descriptor,
    kqueue_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    struct kevent events[2];
    BOOST_ASIO_KQUEUE_EV_SET(&events[0], descriptor,
        EVFILT_READ, EV_DELETE, 0, 0, 0);
    BOOST_ASIO_KQUEUE_EV_SET(&events[1], descriptor,
        EVFILT_WRITE, EV_DELETE, 0, 0, 0);
    ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0);

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
      ops.push(descriptor_data->op_queue_[i]);

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    BOOST_ASIO_HANDLER_REACTOR_DEREGISTRATION((
        context(), static_cast<uintmax_t>(descriptor),
        reinterpret_cast<uintmax_t>(descriptor_data)));

    // Leave descriptor_data set so that it will be freed by the subsequent
    // call to cleanup_descriptor_data.
  }
  else
  {
    // We are shutting down, so prevent cleanup_descriptor_data from freeing
    // the descriptor_data object and let the destructor free it instead.
    descriptor_data = 0;
  }
}

void kqueue_reactor::cleanup_descriptor_data(
    per_descriptor_data& descriptor_data)
{
  if (descriptor_data)
  {
    free_descriptor_state(descriptor_data);
    descriptor_data = 0;
  }
}

void kqueue_reactor::run(long usec, op_queue<operation>& ops)
{
  mutex::scoped_lock lock(mutex_);

  // Determine how long to block while waiting for events.
  timespec timeout_buf = { 0, 0 };
  timespec* timeout = usec ? get_timeout(usec, timeout_buf) : &timeout_buf;
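  // A usec value of zero leaves timeout_buf zero-initialised, so the kevent
  // call below simply polls and returns immediately if no events are ready.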

  lock.unlock();

  // Block on the kqueue descriptor.
  struct kevent events[128];
  int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout);

#if defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)
  // Trace the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = reinterpret_cast<void*>(events[i].udata);
    if (ptr != &interrupter_)
    {
      unsigned event_mask = 0;
      switch (events[i].filter)
      {
      case EVFILT_READ:
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_READ_EVENT;
        break;
      case EVFILT_WRITE:
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_WRITE_EVENT;
        break;
      }
      if ((events[i].flags & (EV_ERROR | EV_OOBAND)) != 0)
        event_mask |= BOOST_ASIO_HANDLER_REACTOR_ERROR_EVENT;
      BOOST_ASIO_HANDLER_REACTOR_EVENTS((context(),
          reinterpret_cast<uintmax_t>(ptr), event_mask));
    }
  }
#endif // defined(BOOST_ASIO_ENABLE_HANDLER_TRACKING)

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = reinterpret_cast<void*>(events[i].udata);
    if (ptr == &interrupter_)
    {
      interrupter_.reset();
    }
    else
    {
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

      if (events[i].filter == EVFILT_WRITE
          && descriptor_data->num_kevents_ == 2
          && descriptor_data->op_queue_[write_op].empty())
      {
        // Some descriptor types, like serial ports, don't seem to support
        // EV_CLEAR with EVFILT_WRITE. Since we have no pending write
        // operations we'll remove the EVFILT_WRITE registration here so that
        // we don't end up in a tight spin.
        struct kevent delete_events[1];
        BOOST_ASIO_KQUEUE_EV_SET(&delete_events[0],
            descriptor_data->descriptor_, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
        ::kevent(kqueue_fd_, delete_events, 1, 0, 0, 0);
        descriptor_data->num_kevents_ = 1;
      }

      // Exception operations must be processed first to ensure that any
      // out-of-band data is read before normal data.
#if defined(__NetBSD__)
      static const unsigned int filter[max_ops] =
#else
      static const int filter[max_ops] =
#endif
        { EVFILT_READ, EVFILT_WRITE, EVFILT_READ };
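      // except_op shares EVFILT_READ with read_op in the table above;
      // out-of-band readiness is distinguished below by testing EV_OOBAND.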
      for (int j = max_ops - 1; j >= 0; --j)
      {
        if (events[i].filter == filter[j])
        {
          if (j != except_op || events[i].flags & EV_OOBAND)
          {
            while (reactor_op* op = descriptor_data->op_queue_[j].front())
            {
              if (events[i].flags & EV_ERROR)
              {
                op->ec_ = boost::system::error_code(
                    static_cast<int>(events[i].data),
                    boost::asio::error::get_system_category());
                descriptor_data->op_queue_[j].pop();
                ops.push(op);
              }
              else if (op->perform())
              {
                descriptor_data->op_queue_[j].pop();
                ops.push(op);
              }
              else
                break;
            }
          }
        }
      }
    }
  }

  lock.lock();
  timer_queues_.get_ready_timers(ops);
}

void kqueue_reactor::interrupt()
{
  interrupter_.interrupt();
}

int kqueue_reactor::do_kqueue_create()
{
  int fd = ::kqueue();
  if (fd == -1)
  {
    boost::system::error_code ec(errno,
        boost::asio::error::get_system_category());
    boost::asio::detail::throw_error(ec, "kqueue");
  }
  return fd;
}

kqueue_reactor::descriptor_state* kqueue_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
        REACTOR_IO, scheduler_.concurrency_hint()));
}

void kqueue_reactor::free_descriptor_state(kqueue_reactor::descriptor_state* s)
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  registered_descriptors_.free(s);
}

void kqueue_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void kqueue_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

timespec* kqueue_reactor::get_timeout(long usec, timespec& ts)
{
  // By default we will wait no longer than 5 minutes. This will ensure that
  // any changes to the system clock are detected after no longer than this.
  const long max_usec = 5 * 60 * 1000 * 1000;
  usec = timer_queues_.wait_duration_usec(
      (usec < 0 || max_usec < usec) ? max_usec : usec);
  ts.tv_sec = usec / 1000000;
  ts.tv_nsec = (usec % 1000000) * 1000;
  return &ts;
}

} // namespace detail
} // namespace asio
} // namespace boost

#undef BOOST_ASIO_KQUEUE_EV_SET

#include <boost/asio/detail/pop_options.hpp>

#endif // defined(BOOST_ASIO_HAS_KQUEUE)

#endif // BOOST_ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP