File indexing completed on 2025-01-18 09:28:50
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
0012 #define BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
0013
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif
0017
0018 #include <boost/asio/detail/push_options.hpp>
0019
0020 namespace boost {
0021 namespace asio {
0022 namespace experimental {
0023 namespace detail {
0024
0025 template <typename Mutex>
0026 inline channel_service<Mutex>::channel_service(execution_context& ctx)
0027 : boost::asio::detail::execution_context_service_base<channel_service>(ctx),
0028 mutex_(),
0029 impl_list_(0)
0030 {
0031 }
0032
0033 template <typename Mutex>
0034 inline void channel_service<Mutex>::shutdown()
0035 {
0036
0037 boost::asio::detail::op_queue<channel_operation> ops;
0038 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0039 base_implementation_type* impl = impl_list_;
0040 while (impl)
0041 {
0042 ops.push(impl->waiters_);
0043 impl = impl->next_;
0044 }
0045 }
0046
0047 template <typename Mutex>
0048 inline void channel_service<Mutex>::construct(
0049 channel_service<Mutex>::base_implementation_type& impl,
0050 std::size_t max_buffer_size)
0051 {
0052 impl.max_buffer_size_ = max_buffer_size;
0053 impl.receive_state_ = block;
0054 impl.send_state_ = max_buffer_size ? buffer : block;
0055
0056
0057 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0058 impl.next_ = impl_list_;
0059 impl.prev_ = 0;
0060 if (impl_list_)
0061 impl_list_->prev_ = &impl;
0062 impl_list_ = &impl;
0063 }
0064
0065 template <typename Mutex>
0066 template <typename Traits, typename... Signatures>
0067 void channel_service<Mutex>::destroy(
0068 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0069 {
0070 cancel(impl);
0071 base_destroy(impl);
0072 }
0073
0074 template <typename Mutex>
0075 template <typename Traits, typename... Signatures>
0076 void channel_service<Mutex>::move_construct(
0077 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0078 channel_service<Mutex>::implementation_type<
0079 Traits, Signatures...>& other_impl)
0080 {
0081 impl.max_buffer_size_ = other_impl.max_buffer_size_;
0082 impl.receive_state_ = other_impl.receive_state_;
0083 other_impl.receive_state_ = block;
0084 impl.send_state_ = other_impl.send_state_;
0085 other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
0086 impl.buffer_move_from(other_impl);
0087
0088
0089 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0090 impl.next_ = impl_list_;
0091 impl.prev_ = 0;
0092 if (impl_list_)
0093 impl_list_->prev_ = &impl;
0094 impl_list_ = &impl;
0095 }
0096
0097 template <typename Mutex>
0098 template <typename Traits, typename... Signatures>
0099 void channel_service<Mutex>::move_assign(
0100 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0101 channel_service& other_service,
0102 channel_service<Mutex>::implementation_type<
0103 Traits, Signatures...>& other_impl)
0104 {
0105 cancel(impl);
0106
0107 if (this != &other_service)
0108 {
0109
0110 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0111 if (impl_list_ == &impl)
0112 impl_list_ = impl.next_;
0113 if (impl.prev_)
0114 impl.prev_->next_ = impl.next_;
0115 if (impl.next_)
0116 impl.next_->prev_= impl.prev_;
0117 impl.next_ = 0;
0118 impl.prev_ = 0;
0119 }
0120
0121 impl.max_buffer_size_ = other_impl.max_buffer_size_;
0122 impl.receive_state_ = other_impl.receive_state_;
0123 other_impl.receive_state_ = block;
0124 impl.send_state_ = other_impl.send_state_;
0125 other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
0126 impl.buffer_move_from(other_impl);
0127
0128 if (this != &other_service)
0129 {
0130
0131 boost::asio::detail::mutex::scoped_lock lock(other_service.mutex_);
0132 impl.next_ = other_service.impl_list_;
0133 impl.prev_ = 0;
0134 if (other_service.impl_list_)
0135 other_service.impl_list_->prev_ = &impl;
0136 other_service.impl_list_ = &impl;
0137 }
0138 }
0139
0140 template <typename Mutex>
0141 inline void channel_service<Mutex>::base_destroy(
0142 channel_service<Mutex>::base_implementation_type& impl)
0143 {
0144
0145 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0146 if (impl_list_ == &impl)
0147 impl_list_ = impl.next_;
0148 if (impl.prev_)
0149 impl.prev_->next_ = impl.next_;
0150 if (impl.next_)
0151 impl.next_->prev_= impl.prev_;
0152 impl.next_ = 0;
0153 impl.prev_ = 0;
0154 }
0155
0156 template <typename Mutex>
0157 inline std::size_t channel_service<Mutex>::capacity(
0158 const channel_service<Mutex>::base_implementation_type& impl)
0159 const noexcept
0160 {
0161 typename Mutex::scoped_lock lock(impl.mutex_);
0162
0163 return impl.max_buffer_size_;
0164 }
0165
0166 template <typename Mutex>
0167 inline bool channel_service<Mutex>::is_open(
0168 const channel_service<Mutex>::base_implementation_type& impl)
0169 const noexcept
0170 {
0171 typename Mutex::scoped_lock lock(impl.mutex_);
0172
0173 return impl.send_state_ != closed;
0174 }
0175
0176 template <typename Mutex>
0177 template <typename Traits, typename... Signatures>
0178 void channel_service<Mutex>::reset(
0179 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0180 {
0181 cancel(impl);
0182
0183 typename Mutex::scoped_lock lock(impl.mutex_);
0184
0185 impl.receive_state_ = block;
0186 impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0187 impl.buffer_clear();
0188 }
0189
0190 template <typename Mutex>
0191 template <typename Traits, typename... Signatures>
0192 void channel_service<Mutex>::close(
0193 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0194 {
0195 typedef typename implementation_type<Traits,
0196 Signatures...>::traits_type traits_type;
0197 typedef typename implementation_type<Traits,
0198 Signatures...>::payload_type payload_type;
0199
0200 typename Mutex::scoped_lock lock(impl.mutex_);
0201
0202 if (impl.receive_state_ == block)
0203 {
0204 while (channel_operation* op = impl.waiters_.front())
0205 {
0206 impl.waiters_.pop();
0207 traits_type::invoke_receive_closed(
0208 post_receive<payload_type,
0209 typename traits_type::receive_closed_signature>(
0210 static_cast<channel_receive<payload_type>*>(op)));
0211 }
0212 }
0213
0214 impl.send_state_ = closed;
0215 if (impl.receive_state_ != buffer)
0216 impl.receive_state_ = closed;
0217 }
0218
0219 template <typename Mutex>
0220 template <typename Traits, typename... Signatures>
0221 void channel_service<Mutex>::cancel(
0222 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0223 {
0224 typedef typename implementation_type<Traits,
0225 Signatures...>::traits_type traits_type;
0226 typedef typename implementation_type<Traits,
0227 Signatures...>::payload_type payload_type;
0228
0229 typename Mutex::scoped_lock lock(impl.mutex_);
0230
0231 while (channel_operation* op = impl.waiters_.front())
0232 {
0233 if (impl.send_state_ == block)
0234 {
0235 impl.waiters_.pop();
0236 static_cast<channel_send<payload_type>*>(op)->cancel();
0237 }
0238 else
0239 {
0240 impl.waiters_.pop();
0241 traits_type::invoke_receive_cancelled(
0242 post_receive<payload_type,
0243 typename traits_type::receive_cancelled_signature>(
0244 static_cast<channel_receive<payload_type>*>(op)));
0245 }
0246 }
0247
0248 if (impl.receive_state_ == waiter)
0249 impl.receive_state_ = block;
0250 if (impl.send_state_ == waiter)
0251 impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0252 }
0253
0254 template <typename Mutex>
0255 template <typename Traits, typename... Signatures>
0256 void channel_service<Mutex>::cancel_by_key(
0257 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0258 void* cancellation_key)
0259 {
0260 typedef typename implementation_type<Traits,
0261 Signatures...>::traits_type traits_type;
0262 typedef typename implementation_type<Traits,
0263 Signatures...>::payload_type payload_type;
0264
0265 typename Mutex::scoped_lock lock(impl.mutex_);
0266
0267 boost::asio::detail::op_queue<channel_operation> other_ops;
0268 while (channel_operation* op = impl.waiters_.front())
0269 {
0270 if (op->cancellation_key_ == cancellation_key)
0271 {
0272 if (impl.send_state_ == block)
0273 {
0274 impl.waiters_.pop();
0275 static_cast<channel_send<payload_type>*>(op)->cancel();
0276 }
0277 else
0278 {
0279 impl.waiters_.pop();
0280 traits_type::invoke_receive_cancelled(
0281 post_receive<payload_type,
0282 typename traits_type::receive_cancelled_signature>(
0283 static_cast<channel_receive<payload_type>*>(op)));
0284 }
0285 }
0286 else
0287 {
0288 impl.waiters_.pop();
0289 other_ops.push(op);
0290 }
0291 }
0292 impl.waiters_.push(other_ops);
0293
0294 if (impl.waiters_.empty())
0295 {
0296 if (impl.receive_state_ == waiter)
0297 impl.receive_state_ = block;
0298 if (impl.send_state_ == waiter)
0299 impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0300 }
0301 }
0302
0303 template <typename Mutex>
0304 inline bool channel_service<Mutex>::ready(
0305 const channel_service<Mutex>::base_implementation_type& impl)
0306 const noexcept
0307 {
0308 typename Mutex::scoped_lock lock(impl.mutex_);
0309
0310 return impl.receive_state_ != block;
0311 }
0312
0313 template <typename Mutex>
0314 template <typename Message, typename Traits,
0315 typename... Signatures, typename... Args>
0316 bool channel_service<Mutex>::try_send(
0317 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0318 bool via_dispatch, Args&&... args)
0319 {
0320 typedef typename implementation_type<Traits,
0321 Signatures...>::payload_type payload_type;
0322
0323 typename Mutex::scoped_lock lock(impl.mutex_);
0324
0325 switch (impl.send_state_)
0326 {
0327 case block:
0328 {
0329 return false;
0330 }
0331 case buffer:
0332 {
0333 impl.buffer_push(Message(0, static_cast<Args&&>(args)...));
0334 impl.receive_state_ = buffer;
0335 if (impl.buffer_size() == impl.max_buffer_size_)
0336 impl.send_state_ = block;
0337 return true;
0338 }
0339 case waiter:
0340 {
0341 payload_type payload(Message(0, static_cast<Args&&>(args)...));
0342 channel_receive<payload_type>* receive_op =
0343 static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
0344 impl.waiters_.pop();
0345 if (impl.waiters_.empty())
0346 impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0347 lock.unlock();
0348 if (via_dispatch)
0349 receive_op->dispatch(static_cast<payload_type&&>(payload));
0350 else
0351 receive_op->post(static_cast<payload_type&&>(payload));
0352 return true;
0353 }
0354 case closed:
0355 default:
0356 {
0357 return false;
0358 }
0359 }
0360 }
0361
0362 template <typename Mutex>
0363 template <typename Message, typename Traits,
0364 typename... Signatures, typename... Args>
0365 std::size_t channel_service<Mutex>::try_send_n(
0366 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0367 std::size_t count, bool via_dispatch, Args&&... args)
0368 {
0369 typedef typename implementation_type<Traits,
0370 Signatures...>::payload_type payload_type;
0371
0372 typename Mutex::scoped_lock lock(impl.mutex_);
0373
0374 if (count == 0)
0375 return 0;
0376
0377 switch (impl.send_state_)
0378 {
0379 case block:
0380 return 0;
0381 case buffer:
0382 case waiter:
0383 break;
0384 case closed:
0385 default:
0386 return 0;
0387 }
0388
0389 payload_type payload(Message(0, static_cast<Args&&>(args)...));
0390
0391 for (std::size_t i = 0; i < count; ++i)
0392 {
0393 switch (impl.send_state_)
0394 {
0395 case block:
0396 {
0397 return i;
0398 }
0399 case buffer:
0400 {
0401 i += impl.buffer_push_n(count - i,
0402 static_cast<payload_type&&>(payload));
0403 impl.receive_state_ = buffer;
0404 if (impl.buffer_size() == impl.max_buffer_size_)
0405 impl.send_state_ = block;
0406 return i;
0407 }
0408 case waiter:
0409 {
0410 channel_receive<payload_type>* receive_op =
0411 static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
0412 impl.waiters_.pop();
0413 if (impl.waiters_.empty())
0414 impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0415 lock.unlock();
0416 if (via_dispatch)
0417 receive_op->dispatch(payload);
0418 else
0419 receive_op->post(payload);
0420 break;
0421 }
0422 case closed:
0423 default:
0424 {
0425 return i;
0426 }
0427 }
0428 }
0429
0430 return count;
0431 }
0432
0433 template <typename Mutex>
0434 template <typename Traits, typename... Signatures>
0435 void channel_service<Mutex>::start_send_op(
0436 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0437 channel_send<typename implementation_type<
0438 Traits, Signatures...>::payload_type>* send_op)
0439 {
0440 typedef typename implementation_type<Traits,
0441 Signatures...>::payload_type payload_type;
0442
0443 typename Mutex::scoped_lock lock(impl.mutex_);
0444
0445 switch (impl.send_state_)
0446 {
0447 case block:
0448 {
0449 impl.waiters_.push(send_op);
0450 if (impl.receive_state_ == block)
0451 impl.receive_state_ = waiter;
0452 return;
0453 }
0454 case buffer:
0455 {
0456 impl.buffer_push(send_op->get_payload());
0457 impl.receive_state_ = buffer;
0458 if (impl.buffer_size() == impl.max_buffer_size_)
0459 impl.send_state_ = block;
0460 send_op->immediate();
0461 break;
0462 }
0463 case waiter:
0464 {
0465 channel_receive<payload_type>* receive_op =
0466 static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
0467 impl.waiters_.pop();
0468 if (impl.waiters_.empty())
0469 impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0470 receive_op->post(send_op->get_payload());
0471 send_op->immediate();
0472 break;
0473 }
0474 case closed:
0475 default:
0476 {
0477 send_op->close();
0478 break;
0479 }
0480 }
0481 }
0482
0483 template <typename Mutex>
0484 template <typename Traits, typename... Signatures, typename Handler>
0485 bool channel_service<Mutex>::try_receive(
0486 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0487 Handler&& handler)
0488 {
0489 typedef typename implementation_type<Traits,
0490 Signatures...>::payload_type payload_type;
0491
0492 typename Mutex::scoped_lock lock(impl.mutex_);
0493
0494 switch (impl.receive_state_)
0495 {
0496 case block:
0497 {
0498 return false;
0499 }
0500 case buffer:
0501 {
0502 payload_type payload(impl.buffer_front());
0503 if (channel_send<payload_type>* send_op =
0504 static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
0505 {
0506 impl.buffer_pop();
0507 impl.buffer_push(send_op->get_payload());
0508 impl.waiters_.pop();
0509 send_op->post();
0510 }
0511 else
0512 {
0513 impl.buffer_pop();
0514 if (impl.buffer_size() == 0)
0515 impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0516 impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
0517 }
0518 lock.unlock();
0519 boost::asio::detail::non_const_lvalue<Handler> handler2(handler);
0520 channel_handler<payload_type, decay_t<Handler>>(
0521 static_cast<payload_type&&>(payload), handler2.value)();
0522 return true;
0523 }
0524 case waiter:
0525 {
0526 channel_send<payload_type>* send_op =
0527 static_cast<channel_send<payload_type>*>(impl.waiters_.front());
0528 payload_type payload = send_op->get_payload();
0529 impl.waiters_.pop();
0530 if (impl.waiters_.front() == 0)
0531 impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0532 send_op->post();
0533 lock.unlock();
0534 boost::asio::detail::non_const_lvalue<Handler> handler2(handler);
0535 channel_handler<payload_type, decay_t<Handler>>(
0536 static_cast<payload_type&&>(payload), handler2.value)();
0537 return true;
0538 }
0539 case closed:
0540 default:
0541 {
0542 return false;
0543 }
0544 }
0545 }
0546
0547 template <typename Mutex>
0548 template <typename Traits, typename... Signatures>
0549 void channel_service<Mutex>::start_receive_op(
0550 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0551 channel_receive<typename implementation_type<
0552 Traits, Signatures...>::payload_type>* receive_op)
0553 {
0554 typedef typename implementation_type<Traits,
0555 Signatures...>::traits_type traits_type;
0556 typedef typename implementation_type<Traits,
0557 Signatures...>::payload_type payload_type;
0558
0559 typename Mutex::scoped_lock lock(impl.mutex_);
0560
0561 switch (impl.receive_state_)
0562 {
0563 case block:
0564 {
0565 impl.waiters_.push(receive_op);
0566 if (impl.send_state_ != closed)
0567 impl.send_state_ = waiter;
0568 return;
0569 }
0570 case buffer:
0571 {
0572 payload_type payload(
0573 static_cast<payload_type&&>(impl.buffer_front()));
0574 if (channel_send<payload_type>* send_op =
0575 static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
0576 {
0577 impl.buffer_pop();
0578 impl.buffer_push(send_op->get_payload());
0579 impl.waiters_.pop();
0580 send_op->post();
0581 }
0582 else
0583 {
0584 impl.buffer_pop();
0585 if (impl.buffer_size() == 0)
0586 impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0587 impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
0588 }
0589 receive_op->immediate(static_cast<payload_type&&>(payload));
0590 break;
0591 }
0592 case waiter:
0593 {
0594 channel_send<payload_type>* send_op =
0595 static_cast<channel_send<payload_type>*>(impl.waiters_.front());
0596 payload_type payload = send_op->get_payload();
0597 impl.waiters_.pop();
0598 if (impl.waiters_.front() == 0)
0599 impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0600 send_op->post();
0601 receive_op->immediate(static_cast<payload_type&&>(payload));
0602 break;
0603 }
0604 case closed:
0605 default:
0606 {
0607 traits_type::invoke_receive_closed(
0608 post_receive<payload_type,
0609 typename traits_type::receive_closed_signature>(receive_op));
0610 break;
0611 }
0612 }
0613 }
0614
0615 }
0616 }
0617 }
0618 }
0619
0620 #include <boost/asio/detail/pop_options.hpp>
0621
0622 #endif