File indexing completed on 2025-07-02 08:06:38
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
0012 #define BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
0013
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif
0017
0018 #include <boost/asio/detail/push_options.hpp>
0019
0020 namespace boost {
0021 namespace asio {
0022 namespace experimental {
0023 namespace detail {
0024
0025 template <typename Mutex>
0026 inline channel_service<Mutex>::channel_service(
0027 boost::asio::execution_context& ctx)
0028 : boost::asio::detail::execution_context_service_base<channel_service>(ctx),
0029 mutex_(),
0030 impl_list_(0)
0031 {
0032 }
0033
0034 template <typename Mutex>
0035 inline void channel_service<Mutex>::shutdown()
0036 {
0037
0038 boost::asio::detail::op_queue<channel_operation> ops;
0039 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0040 base_implementation_type* impl = impl_list_;
0041 while (impl)
0042 {
0043 ops.push(impl->waiters_);
0044 impl = impl->next_;
0045 }
0046 }
0047
0048 template <typename Mutex>
0049 inline void channel_service<Mutex>::construct(
0050 channel_service<Mutex>::base_implementation_type& impl,
0051 std::size_t max_buffer_size)
0052 {
0053 impl.max_buffer_size_ = max_buffer_size;
0054 impl.receive_state_ = block;
0055 impl.send_state_ = max_buffer_size ? buffer : block;
0056
0057
0058 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0059 impl.next_ = impl_list_;
0060 impl.prev_ = 0;
0061 if (impl_list_)
0062 impl_list_->prev_ = &impl;
0063 impl_list_ = &impl;
0064 }
0065
0066 template <typename Mutex>
0067 template <typename Traits, typename... Signatures>
0068 void channel_service<Mutex>::destroy(
0069 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0070 {
0071 cancel(impl);
0072 base_destroy(impl);
0073 }
0074
0075 template <typename Mutex>
0076 template <typename Traits, typename... Signatures>
0077 void channel_service<Mutex>::move_construct(
0078 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0079 channel_service<Mutex>::implementation_type<
0080 Traits, Signatures...>& other_impl)
0081 {
0082 impl.max_buffer_size_ = other_impl.max_buffer_size_;
0083 impl.receive_state_ = other_impl.receive_state_;
0084 other_impl.receive_state_ = block;
0085 impl.send_state_ = other_impl.send_state_;
0086 other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
0087 impl.buffer_move_from(other_impl);
0088
0089
0090 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0091 impl.next_ = impl_list_;
0092 impl.prev_ = 0;
0093 if (impl_list_)
0094 impl_list_->prev_ = &impl;
0095 impl_list_ = &impl;
0096 }
0097
0098 template <typename Mutex>
0099 template <typename Traits, typename... Signatures>
0100 void channel_service<Mutex>::move_assign(
0101 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0102 channel_service& other_service,
0103 channel_service<Mutex>::implementation_type<
0104 Traits, Signatures...>& other_impl)
0105 {
0106 cancel(impl);
0107
0108 if (this != &other_service)
0109 {
0110
0111 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0112 if (impl_list_ == &impl)
0113 impl_list_ = impl.next_;
0114 if (impl.prev_)
0115 impl.prev_->next_ = impl.next_;
0116 if (impl.next_)
0117 impl.next_->prev_= impl.prev_;
0118 impl.next_ = 0;
0119 impl.prev_ = 0;
0120 }
0121
0122 impl.max_buffer_size_ = other_impl.max_buffer_size_;
0123 impl.receive_state_ = other_impl.receive_state_;
0124 other_impl.receive_state_ = block;
0125 impl.send_state_ = other_impl.send_state_;
0126 other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
0127 impl.buffer_move_from(other_impl);
0128
0129 if (this != &other_service)
0130 {
0131
0132 boost::asio::detail::mutex::scoped_lock lock(other_service.mutex_);
0133 impl.next_ = other_service.impl_list_;
0134 impl.prev_ = 0;
0135 if (other_service.impl_list_)
0136 other_service.impl_list_->prev_ = &impl;
0137 other_service.impl_list_ = &impl;
0138 }
0139 }
0140
0141 template <typename Mutex>
0142 inline void channel_service<Mutex>::base_destroy(
0143 channel_service<Mutex>::base_implementation_type& impl)
0144 {
0145
0146 boost::asio::detail::mutex::scoped_lock lock(mutex_);
0147 if (impl_list_ == &impl)
0148 impl_list_ = impl.next_;
0149 if (impl.prev_)
0150 impl.prev_->next_ = impl.next_;
0151 if (impl.next_)
0152 impl.next_->prev_= impl.prev_;
0153 impl.next_ = 0;
0154 impl.prev_ = 0;
0155 }
0156
0157 template <typename Mutex>
0158 inline std::size_t channel_service<Mutex>::capacity(
0159 const channel_service<Mutex>::base_implementation_type& impl)
0160 const noexcept
0161 {
0162 typename Mutex::scoped_lock lock(impl.mutex_);
0163
0164 return impl.max_buffer_size_;
0165 }
0166
0167 template <typename Mutex>
0168 inline bool channel_service<Mutex>::is_open(
0169 const channel_service<Mutex>::base_implementation_type& impl)
0170 const noexcept
0171 {
0172 typename Mutex::scoped_lock lock(impl.mutex_);
0173
0174 return impl.send_state_ != closed;
0175 }
0176
0177 template <typename Mutex>
0178 template <typename Traits, typename... Signatures>
0179 void channel_service<Mutex>::reset(
0180 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0181 {
0182 cancel(impl);
0183
0184 typename Mutex::scoped_lock lock(impl.mutex_);
0185
0186 impl.receive_state_ = block;
0187 impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0188 impl.buffer_clear();
0189 }
0190
0191 template <typename Mutex>
0192 template <typename Traits, typename... Signatures>
0193 void channel_service<Mutex>::close(
0194 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0195 {
0196 typedef typename implementation_type<Traits,
0197 Signatures...>::traits_type traits_type;
0198 typedef typename implementation_type<Traits,
0199 Signatures...>::payload_type payload_type;
0200
0201 typename Mutex::scoped_lock lock(impl.mutex_);
0202
0203 if (impl.receive_state_ == block)
0204 {
0205 while (channel_operation* op = impl.waiters_.front())
0206 {
0207 impl.waiters_.pop();
0208 traits_type::invoke_receive_closed(
0209 post_receive<payload_type,
0210 typename traits_type::receive_closed_signature>(
0211 static_cast<channel_receive<payload_type>*>(op)));
0212 }
0213 }
0214
0215 impl.send_state_ = closed;
0216 if (impl.receive_state_ != buffer)
0217 impl.receive_state_ = closed;
0218 }
0219
0220 template <typename Mutex>
0221 template <typename Traits, typename... Signatures>
0222 void channel_service<Mutex>::cancel(
0223 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0224 {
0225 typedef typename implementation_type<Traits,
0226 Signatures...>::traits_type traits_type;
0227 typedef typename implementation_type<Traits,
0228 Signatures...>::payload_type payload_type;
0229
0230 typename Mutex::scoped_lock lock(impl.mutex_);
0231
0232 while (channel_operation* op = impl.waiters_.front())
0233 {
0234 if (impl.send_state_ == block)
0235 {
0236 impl.waiters_.pop();
0237 static_cast<channel_send<payload_type>*>(op)->cancel();
0238 }
0239 else
0240 {
0241 impl.waiters_.pop();
0242 traits_type::invoke_receive_cancelled(
0243 post_receive<payload_type,
0244 typename traits_type::receive_cancelled_signature>(
0245 static_cast<channel_receive<payload_type>*>(op)));
0246 }
0247 }
0248
0249 if (impl.receive_state_ == waiter)
0250 impl.receive_state_ = block;
0251 if (impl.send_state_ == waiter)
0252 impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0253 }
0254
0255 template <typename Mutex>
0256 template <typename Traits, typename... Signatures>
0257 void channel_service<Mutex>::cancel_by_key(
0258 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0259 void* cancellation_key)
0260 {
0261 typedef typename implementation_type<Traits,
0262 Signatures...>::traits_type traits_type;
0263 typedef typename implementation_type<Traits,
0264 Signatures...>::payload_type payload_type;
0265
0266 typename Mutex::scoped_lock lock(impl.mutex_);
0267
0268 boost::asio::detail::op_queue<channel_operation> other_ops;
0269 while (channel_operation* op = impl.waiters_.front())
0270 {
0271 if (op->cancellation_key_ == cancellation_key)
0272 {
0273 if (impl.send_state_ == block)
0274 {
0275 impl.waiters_.pop();
0276 static_cast<channel_send<payload_type>*>(op)->cancel();
0277 }
0278 else
0279 {
0280 impl.waiters_.pop();
0281 traits_type::invoke_receive_cancelled(
0282 post_receive<payload_type,
0283 typename traits_type::receive_cancelled_signature>(
0284 static_cast<channel_receive<payload_type>*>(op)));
0285 }
0286 }
0287 else
0288 {
0289 impl.waiters_.pop();
0290 other_ops.push(op);
0291 }
0292 }
0293 impl.waiters_.push(other_ops);
0294
0295 if (impl.waiters_.empty())
0296 {
0297 if (impl.receive_state_ == waiter)
0298 impl.receive_state_ = block;
0299 if (impl.send_state_ == waiter)
0300 impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0301 }
0302 }
0303
0304 template <typename Mutex>
0305 inline bool channel_service<Mutex>::ready(
0306 const channel_service<Mutex>::base_implementation_type& impl)
0307 const noexcept
0308 {
0309 typename Mutex::scoped_lock lock(impl.mutex_);
0310
0311 return impl.receive_state_ != block;
0312 }
0313
0314 template <typename Mutex>
0315 template <typename Message, typename Traits,
0316 typename... Signatures, typename... Args>
0317 bool channel_service<Mutex>::try_send(
0318 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0319 bool via_dispatch, Args&&... args)
0320 {
0321 typedef typename implementation_type<Traits,
0322 Signatures...>::payload_type payload_type;
0323
0324 typename Mutex::scoped_lock lock(impl.mutex_);
0325
0326 switch (impl.send_state_)
0327 {
0328 case block:
0329 {
0330 return false;
0331 }
0332 case buffer:
0333 {
0334 impl.buffer_push(Message(0, static_cast<Args&&>(args)...));
0335 impl.receive_state_ = buffer;
0336 if (impl.buffer_size() == impl.max_buffer_size_)
0337 impl.send_state_ = block;
0338 return true;
0339 }
0340 case waiter:
0341 {
0342 payload_type payload(Message(0, static_cast<Args&&>(args)...));
0343 channel_receive<payload_type>* receive_op =
0344 static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
0345 impl.waiters_.pop();
0346 if (impl.waiters_.empty())
0347 impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0348 lock.unlock();
0349 if (via_dispatch)
0350 receive_op->dispatch(static_cast<payload_type&&>(payload));
0351 else
0352 receive_op->post(static_cast<payload_type&&>(payload));
0353 return true;
0354 }
0355 case closed:
0356 default:
0357 {
0358 return false;
0359 }
0360 }
0361 }
0362
0363 template <typename Mutex>
0364 template <typename Message, typename Traits,
0365 typename... Signatures, typename... Args>
0366 std::size_t channel_service<Mutex>::try_send_n(
0367 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0368 std::size_t count, bool via_dispatch, Args&&... args)
0369 {
0370 typedef typename implementation_type<Traits,
0371 Signatures...>::payload_type payload_type;
0372
0373 typename Mutex::scoped_lock lock(impl.mutex_);
0374
0375 if (count == 0)
0376 return 0;
0377
0378 switch (impl.send_state_)
0379 {
0380 case block:
0381 return 0;
0382 case buffer:
0383 case waiter:
0384 break;
0385 case closed:
0386 default:
0387 return 0;
0388 }
0389
0390 payload_type payload(Message(0, static_cast<Args&&>(args)...));
0391
0392 for (std::size_t i = 0; i < count; ++i)
0393 {
0394 switch (impl.send_state_)
0395 {
0396 case block:
0397 {
0398 return i;
0399 }
0400 case buffer:
0401 {
0402 i += impl.buffer_push_n(count - i,
0403 static_cast<payload_type&&>(payload));
0404 impl.receive_state_ = buffer;
0405 if (impl.buffer_size() == impl.max_buffer_size_)
0406 impl.send_state_ = block;
0407 return i;
0408 }
0409 case waiter:
0410 {
0411 channel_receive<payload_type>* receive_op =
0412 static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
0413 impl.waiters_.pop();
0414 if (impl.waiters_.empty())
0415 impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0416 lock.unlock();
0417 if (via_dispatch)
0418 receive_op->dispatch(payload);
0419 else
0420 receive_op->post(payload);
0421 break;
0422 }
0423 case closed:
0424 default:
0425 {
0426 return i;
0427 }
0428 }
0429 }
0430
0431 return count;
0432 }
0433
0434 template <typename Mutex>
0435 template <typename Traits, typename... Signatures>
0436 void channel_service<Mutex>::start_send_op(
0437 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0438 channel_send<typename implementation_type<
0439 Traits, Signatures...>::payload_type>* send_op)
0440 {
0441 typedef typename implementation_type<Traits,
0442 Signatures...>::payload_type payload_type;
0443
0444 typename Mutex::scoped_lock lock(impl.mutex_);
0445
0446 switch (impl.send_state_)
0447 {
0448 case block:
0449 {
0450 impl.waiters_.push(send_op);
0451 if (impl.receive_state_ == block)
0452 impl.receive_state_ = waiter;
0453 return;
0454 }
0455 case buffer:
0456 {
0457 impl.buffer_push(send_op->get_payload());
0458 impl.receive_state_ = buffer;
0459 if (impl.buffer_size() == impl.max_buffer_size_)
0460 impl.send_state_ = block;
0461 send_op->immediate();
0462 break;
0463 }
0464 case waiter:
0465 {
0466 channel_receive<payload_type>* receive_op =
0467 static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
0468 impl.waiters_.pop();
0469 if (impl.waiters_.empty())
0470 impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0471 receive_op->post(send_op->get_payload());
0472 send_op->immediate();
0473 break;
0474 }
0475 case closed:
0476 default:
0477 {
0478 send_op->close();
0479 break;
0480 }
0481 }
0482 }
0483
0484 template <typename Mutex>
0485 template <typename Traits, typename... Signatures, typename Handler>
0486 bool channel_service<Mutex>::try_receive(
0487 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0488 Handler&& handler)
0489 {
0490 typedef typename implementation_type<Traits,
0491 Signatures...>::payload_type payload_type;
0492
0493 typename Mutex::scoped_lock lock(impl.mutex_);
0494
0495 switch (impl.receive_state_)
0496 {
0497 case block:
0498 {
0499 return false;
0500 }
0501 case buffer:
0502 {
0503 payload_type payload(impl.buffer_front());
0504 if (channel_send<payload_type>* send_op =
0505 static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
0506 {
0507 impl.buffer_pop();
0508 impl.buffer_push(send_op->get_payload());
0509 impl.waiters_.pop();
0510 send_op->post();
0511 }
0512 else
0513 {
0514 impl.buffer_pop();
0515 if (impl.buffer_size() == 0)
0516 impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0517 impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
0518 }
0519 lock.unlock();
0520 boost::asio::detail::non_const_lvalue<Handler> handler2(handler);
0521 boost::asio::detail::completion_payload_handler<
0522 payload_type, decay_t<Handler>>(
0523 static_cast<payload_type&&>(payload), handler2.value)();
0524 return true;
0525 }
0526 case waiter:
0527 {
0528 channel_send<payload_type>* send_op =
0529 static_cast<channel_send<payload_type>*>(impl.waiters_.front());
0530 payload_type payload = send_op->get_payload();
0531 impl.waiters_.pop();
0532 if (impl.waiters_.front() == 0)
0533 impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0534 send_op->post();
0535 lock.unlock();
0536 boost::asio::detail::non_const_lvalue<Handler> handler2(handler);
0537 boost::asio::detail::completion_payload_handler<
0538 payload_type, decay_t<Handler>>(
0539 static_cast<payload_type&&>(payload), handler2.value)();
0540 return true;
0541 }
0542 case closed:
0543 default:
0544 {
0545 return false;
0546 }
0547 }
0548 }
0549
0550 template <typename Mutex>
0551 template <typename Traits, typename... Signatures>
0552 void channel_service<Mutex>::start_receive_op(
0553 channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0554 channel_receive<typename implementation_type<
0555 Traits, Signatures...>::payload_type>* receive_op)
0556 {
0557 typedef typename implementation_type<Traits,
0558 Signatures...>::traits_type traits_type;
0559 typedef typename implementation_type<Traits,
0560 Signatures...>::payload_type payload_type;
0561
0562 typename Mutex::scoped_lock lock(impl.mutex_);
0563
0564 switch (impl.receive_state_)
0565 {
0566 case block:
0567 {
0568 impl.waiters_.push(receive_op);
0569 if (impl.send_state_ != closed)
0570 impl.send_state_ = waiter;
0571 return;
0572 }
0573 case buffer:
0574 {
0575 payload_type payload(
0576 static_cast<payload_type&&>(impl.buffer_front()));
0577 if (channel_send<payload_type>* send_op =
0578 static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
0579 {
0580 impl.buffer_pop();
0581 impl.buffer_push(send_op->get_payload());
0582 impl.waiters_.pop();
0583 send_op->post();
0584 }
0585 else
0586 {
0587 impl.buffer_pop();
0588 if (impl.buffer_size() == 0)
0589 impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0590 impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
0591 }
0592 receive_op->immediate(static_cast<payload_type&&>(payload));
0593 break;
0594 }
0595 case waiter:
0596 {
0597 channel_send<payload_type>* send_op =
0598 static_cast<channel_send<payload_type>*>(impl.waiters_.front());
0599 payload_type payload = send_op->get_payload();
0600 impl.waiters_.pop();
0601 if (impl.waiters_.front() == 0)
0602 impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0603 send_op->post();
0604 receive_op->immediate(static_cast<payload_type&&>(payload));
0605 break;
0606 }
0607 case closed:
0608 default:
0609 {
0610 traits_type::invoke_receive_closed(
0611 post_receive<payload_type,
0612 typename traits_type::receive_closed_signature>(receive_op));
0613 break;
0614 }
0615 }
0616 }
0617
0618 }
0619 }
0620 }
0621 }
0622
0623 #include <boost/asio/detail/pop_options.hpp>
0624
0625 #endif