Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:28:50

0001 //
0002 // experimental/detail/impl/channel_service.hpp
0003 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0004 //
0005 // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
0006 //
0007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0009 //
0010 
0011 #ifndef BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
0012 #define BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
0013 
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
0017 
0018 #include <boost/asio/detail/push_options.hpp>
0019 
0020 namespace boost {
0021 namespace asio {
0022 namespace experimental {
0023 namespace detail {
0024 
0025 template <typename Mutex>
0026 inline channel_service<Mutex>::channel_service(execution_context& ctx)
0027   : boost::asio::detail::execution_context_service_base<channel_service>(ctx),
0028     mutex_(),
0029     impl_list_(0)
0030 {
0031 }
0032 
0033 template <typename Mutex>
0034 inline void channel_service<Mutex>::shutdown()
0035 {
0036   // Abandon all pending operations.
0037   boost::asio::detail::op_queue<channel_operation> ops;
0038   boost::asio::detail::mutex::scoped_lock lock(mutex_);
0039   base_implementation_type* impl = impl_list_;
0040   while (impl)
0041   {
0042     ops.push(impl->waiters_);
0043     impl = impl->next_;
0044   }
0045 }
0046 
0047 template <typename Mutex>
0048 inline void channel_service<Mutex>::construct(
0049     channel_service<Mutex>::base_implementation_type& impl,
0050     std::size_t max_buffer_size)
0051 {
0052   impl.max_buffer_size_ = max_buffer_size;
0053   impl.receive_state_ = block;
0054   impl.send_state_ = max_buffer_size ? buffer : block;
0055 
0056   // Insert implementation into linked list of all implementations.
0057   boost::asio::detail::mutex::scoped_lock lock(mutex_);
0058   impl.next_ = impl_list_;
0059   impl.prev_ = 0;
0060   if (impl_list_)
0061     impl_list_->prev_ = &impl;
0062   impl_list_ = &impl;
0063 }
0064 
0065 template <typename Mutex>
0066 template <typename Traits, typename... Signatures>
0067 void channel_service<Mutex>::destroy(
0068     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0069 {
0070   cancel(impl);
0071   base_destroy(impl);
0072 }
0073 
0074 template <typename Mutex>
0075 template <typename Traits, typename... Signatures>
0076 void channel_service<Mutex>::move_construct(
0077     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0078     channel_service<Mutex>::implementation_type<
0079       Traits, Signatures...>& other_impl)
0080 {
0081   impl.max_buffer_size_ = other_impl.max_buffer_size_;
0082   impl.receive_state_ = other_impl.receive_state_;
0083   other_impl.receive_state_ = block;
0084   impl.send_state_ = other_impl.send_state_;
0085   other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
0086   impl.buffer_move_from(other_impl);
0087 
0088   // Insert implementation into linked list of all implementations.
0089   boost::asio::detail::mutex::scoped_lock lock(mutex_);
0090   impl.next_ = impl_list_;
0091   impl.prev_ = 0;
0092   if (impl_list_)
0093     impl_list_->prev_ = &impl;
0094   impl_list_ = &impl;
0095 }
0096 
0097 template <typename Mutex>
0098 template <typename Traits, typename... Signatures>
0099 void channel_service<Mutex>::move_assign(
0100     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0101     channel_service& other_service,
0102     channel_service<Mutex>::implementation_type<
0103       Traits, Signatures...>& other_impl)
0104 {
0105   cancel(impl);
0106 
0107   if (this != &other_service)
0108   {
0109     // Remove implementation from linked list of all implementations.
0110     boost::asio::detail::mutex::scoped_lock lock(mutex_);
0111     if (impl_list_ == &impl)
0112       impl_list_ = impl.next_;
0113     if (impl.prev_)
0114       impl.prev_->next_ = impl.next_;
0115     if (impl.next_)
0116       impl.next_->prev_= impl.prev_;
0117     impl.next_ = 0;
0118     impl.prev_ = 0;
0119   }
0120 
0121   impl.max_buffer_size_ = other_impl.max_buffer_size_;
0122   impl.receive_state_ = other_impl.receive_state_;
0123   other_impl.receive_state_ = block;
0124   impl.send_state_ = other_impl.send_state_;
0125   other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
0126   impl.buffer_move_from(other_impl);
0127 
0128   if (this != &other_service)
0129   {
0130     // Insert implementation into linked list of all implementations.
0131     boost::asio::detail::mutex::scoped_lock lock(other_service.mutex_);
0132     impl.next_ = other_service.impl_list_;
0133     impl.prev_ = 0;
0134     if (other_service.impl_list_)
0135       other_service.impl_list_->prev_ = &impl;
0136     other_service.impl_list_ = &impl;
0137   }
0138 }
0139 
0140 template <typename Mutex>
0141 inline void channel_service<Mutex>::base_destroy(
0142     channel_service<Mutex>::base_implementation_type& impl)
0143 {
0144   // Remove implementation from linked list of all implementations.
0145   boost::asio::detail::mutex::scoped_lock lock(mutex_);
0146   if (impl_list_ == &impl)
0147     impl_list_ = impl.next_;
0148   if (impl.prev_)
0149     impl.prev_->next_ = impl.next_;
0150   if (impl.next_)
0151     impl.next_->prev_= impl.prev_;
0152   impl.next_ = 0;
0153   impl.prev_ = 0;
0154 }
0155 
0156 template <typename Mutex>
0157 inline std::size_t channel_service<Mutex>::capacity(
0158     const channel_service<Mutex>::base_implementation_type& impl)
0159   const noexcept
0160 {
0161   typename Mutex::scoped_lock lock(impl.mutex_);
0162 
0163   return impl.max_buffer_size_;
0164 }
0165 
0166 template <typename Mutex>
0167 inline bool channel_service<Mutex>::is_open(
0168     const channel_service<Mutex>::base_implementation_type& impl)
0169   const noexcept
0170 {
0171   typename Mutex::scoped_lock lock(impl.mutex_);
0172 
0173   return impl.send_state_ != closed;
0174 }
0175 
0176 template <typename Mutex>
0177 template <typename Traits, typename... Signatures>
0178 void channel_service<Mutex>::reset(
0179     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0180 {
0181   cancel(impl);
0182 
0183   typename Mutex::scoped_lock lock(impl.mutex_);
0184 
0185   impl.receive_state_ = block;
0186   impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0187   impl.buffer_clear();
0188 }
0189 
0190 template <typename Mutex>
0191 template <typename Traits, typename... Signatures>
0192 void channel_service<Mutex>::close(
0193     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0194 {
0195   typedef typename implementation_type<Traits,
0196       Signatures...>::traits_type traits_type;
0197   typedef typename implementation_type<Traits,
0198       Signatures...>::payload_type payload_type;
0199 
0200   typename Mutex::scoped_lock lock(impl.mutex_);
0201 
0202   if (impl.receive_state_ == block)
0203   {
0204     while (channel_operation* op = impl.waiters_.front())
0205     {
0206       impl.waiters_.pop();
0207       traits_type::invoke_receive_closed(
0208           post_receive<payload_type,
0209             typename traits_type::receive_closed_signature>(
0210               static_cast<channel_receive<payload_type>*>(op)));
0211     }
0212   }
0213 
0214   impl.send_state_ = closed;
0215   if (impl.receive_state_ != buffer)
0216     impl.receive_state_ = closed;
0217 }
0218 
0219 template <typename Mutex>
0220 template <typename Traits, typename... Signatures>
0221 void channel_service<Mutex>::cancel(
0222     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0223 {
0224   typedef typename implementation_type<Traits,
0225       Signatures...>::traits_type traits_type;
0226   typedef typename implementation_type<Traits,
0227       Signatures...>::payload_type payload_type;
0228 
0229   typename Mutex::scoped_lock lock(impl.mutex_);
0230 
0231   while (channel_operation* op = impl.waiters_.front())
0232   {
0233     if (impl.send_state_ == block)
0234     {
0235       impl.waiters_.pop();
0236       static_cast<channel_send<payload_type>*>(op)->cancel();
0237     }
0238     else
0239     {
0240       impl.waiters_.pop();
0241       traits_type::invoke_receive_cancelled(
0242           post_receive<payload_type,
0243             typename traits_type::receive_cancelled_signature>(
0244               static_cast<channel_receive<payload_type>*>(op)));
0245     }
0246   }
0247 
0248   if (impl.receive_state_ == waiter)
0249     impl.receive_state_ = block;
0250   if (impl.send_state_ == waiter)
0251     impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0252 }
0253 
0254 template <typename Mutex>
0255 template <typename Traits, typename... Signatures>
0256 void channel_service<Mutex>::cancel_by_key(
0257     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0258     void* cancellation_key)
0259 {
0260   typedef typename implementation_type<Traits,
0261       Signatures...>::traits_type traits_type;
0262   typedef typename implementation_type<Traits,
0263       Signatures...>::payload_type payload_type;
0264 
0265   typename Mutex::scoped_lock lock(impl.mutex_);
0266 
0267   boost::asio::detail::op_queue<channel_operation> other_ops;
0268   while (channel_operation* op = impl.waiters_.front())
0269   {
0270     if (op->cancellation_key_ == cancellation_key)
0271     {
0272       if (impl.send_state_ == block)
0273       {
0274         impl.waiters_.pop();
0275         static_cast<channel_send<payload_type>*>(op)->cancel();
0276       }
0277       else
0278       {
0279         impl.waiters_.pop();
0280         traits_type::invoke_receive_cancelled(
0281             post_receive<payload_type,
0282               typename traits_type::receive_cancelled_signature>(
0283                 static_cast<channel_receive<payload_type>*>(op)));
0284       }
0285     }
0286     else
0287     {
0288       impl.waiters_.pop();
0289       other_ops.push(op);
0290     }
0291   }
0292   impl.waiters_.push(other_ops);
0293 
0294   if (impl.waiters_.empty())
0295   {
0296     if (impl.receive_state_ == waiter)
0297       impl.receive_state_ = block;
0298     if (impl.send_state_ == waiter)
0299       impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0300   }
0301 }
0302 
0303 template <typename Mutex>
0304 inline bool channel_service<Mutex>::ready(
0305     const channel_service<Mutex>::base_implementation_type& impl)
0306   const noexcept
0307 {
0308   typename Mutex::scoped_lock lock(impl.mutex_);
0309 
0310   return impl.receive_state_ != block;
0311 }
0312 
0313 template <typename Mutex>
0314 template <typename Message, typename Traits,
0315     typename... Signatures, typename... Args>
0316 bool channel_service<Mutex>::try_send(
0317     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0318     bool via_dispatch, Args&&... args)
0319 {
0320   typedef typename implementation_type<Traits,
0321       Signatures...>::payload_type payload_type;
0322 
0323   typename Mutex::scoped_lock lock(impl.mutex_);
0324 
0325   switch (impl.send_state_)
0326   {
0327   case block:
0328     {
0329       return false;
0330     }
0331   case buffer:
0332     {
0333       impl.buffer_push(Message(0, static_cast<Args&&>(args)...));
0334       impl.receive_state_ = buffer;
0335       if (impl.buffer_size() == impl.max_buffer_size_)
0336         impl.send_state_ = block;
0337       return true;
0338     }
0339   case waiter:
0340     {
0341       payload_type payload(Message(0, static_cast<Args&&>(args)...));
0342       channel_receive<payload_type>* receive_op =
0343         static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
0344       impl.waiters_.pop();
0345       if (impl.waiters_.empty())
0346         impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0347       lock.unlock();
0348       if (via_dispatch)
0349         receive_op->dispatch(static_cast<payload_type&&>(payload));
0350       else
0351         receive_op->post(static_cast<payload_type&&>(payload));
0352       return true;
0353     }
0354   case closed:
0355   default:
0356     {
0357       return false;
0358     }
0359   }
0360 }
0361 
0362 template <typename Mutex>
0363 template <typename Message, typename Traits,
0364     typename... Signatures, typename... Args>
0365 std::size_t channel_service<Mutex>::try_send_n(
0366     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0367         std::size_t count, bool via_dispatch, Args&&... args)
0368 {
0369   typedef typename implementation_type<Traits,
0370       Signatures...>::payload_type payload_type;
0371 
0372   typename Mutex::scoped_lock lock(impl.mutex_);
0373 
0374   if (count == 0)
0375     return 0;
0376 
0377   switch (impl.send_state_)
0378   {
0379   case block:
0380     return 0;
0381   case buffer:
0382   case waiter:
0383     break;
0384   case closed:
0385   default:
0386     return 0;
0387   }
0388 
0389   payload_type payload(Message(0, static_cast<Args&&>(args)...));
0390 
0391   for (std::size_t i = 0; i < count; ++i)
0392   {
0393     switch (impl.send_state_)
0394     {
0395     case block:
0396       {
0397         return i;
0398       }
0399     case buffer:
0400       {
0401         i += impl.buffer_push_n(count - i,
0402             static_cast<payload_type&&>(payload));
0403         impl.receive_state_ = buffer;
0404         if (impl.buffer_size() == impl.max_buffer_size_)
0405           impl.send_state_ = block;
0406         return i;
0407       }
0408     case waiter:
0409       {
0410         channel_receive<payload_type>* receive_op =
0411           static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
0412         impl.waiters_.pop();
0413         if (impl.waiters_.empty())
0414           impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0415         lock.unlock();
0416         if (via_dispatch)
0417           receive_op->dispatch(payload);
0418         else
0419           receive_op->post(payload);
0420         break;
0421       }
0422     case closed:
0423     default:
0424       {
0425         return i;
0426       }
0427     }
0428   }
0429 
0430   return count;
0431 }
0432 
0433 template <typename Mutex>
0434 template <typename Traits, typename... Signatures>
0435 void channel_service<Mutex>::start_send_op(
0436     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0437         channel_send<typename implementation_type<
0438             Traits, Signatures...>::payload_type>* send_op)
0439 {
0440   typedef typename implementation_type<Traits,
0441       Signatures...>::payload_type payload_type;
0442 
0443   typename Mutex::scoped_lock lock(impl.mutex_);
0444 
0445   switch (impl.send_state_)
0446   {
0447   case block:
0448     {
0449       impl.waiters_.push(send_op);
0450       if (impl.receive_state_ == block)
0451         impl.receive_state_ = waiter;
0452       return;
0453     }
0454   case buffer:
0455     {
0456       impl.buffer_push(send_op->get_payload());
0457       impl.receive_state_ = buffer;
0458       if (impl.buffer_size() == impl.max_buffer_size_)
0459         impl.send_state_ = block;
0460       send_op->immediate();
0461       break;
0462     }
0463   case waiter:
0464     {
0465       channel_receive<payload_type>* receive_op =
0466         static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
0467       impl.waiters_.pop();
0468       if (impl.waiters_.empty())
0469         impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0470       receive_op->post(send_op->get_payload());
0471       send_op->immediate();
0472       break;
0473     }
0474   case closed:
0475   default:
0476     {
0477       send_op->close();
0478       break;
0479     }
0480   }
0481 }
0482 
0483 template <typename Mutex>
0484 template <typename Traits, typename... Signatures, typename Handler>
0485 bool channel_service<Mutex>::try_receive(
0486     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0487         Handler&& handler)
0488 {
0489   typedef typename implementation_type<Traits,
0490       Signatures...>::payload_type payload_type;
0491 
0492   typename Mutex::scoped_lock lock(impl.mutex_);
0493 
0494   switch (impl.receive_state_)
0495   {
0496   case block:
0497     {
0498       return false;
0499     }
0500   case buffer:
0501     {
0502       payload_type payload(impl.buffer_front());
0503       if (channel_send<payload_type>* send_op =
0504           static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
0505       {
0506         impl.buffer_pop();
0507         impl.buffer_push(send_op->get_payload());
0508         impl.waiters_.pop();
0509         send_op->post();
0510       }
0511       else
0512       {
0513         impl.buffer_pop();
0514         if (impl.buffer_size() == 0)
0515           impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0516         impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
0517       }
0518       lock.unlock();
0519       boost::asio::detail::non_const_lvalue<Handler> handler2(handler);
0520       channel_handler<payload_type, decay_t<Handler>>(
0521           static_cast<payload_type&&>(payload), handler2.value)();
0522       return true;
0523     }
0524   case waiter:
0525     {
0526       channel_send<payload_type>* send_op =
0527         static_cast<channel_send<payload_type>*>(impl.waiters_.front());
0528       payload_type payload = send_op->get_payload();
0529       impl.waiters_.pop();
0530       if (impl.waiters_.front() == 0)
0531         impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0532       send_op->post();
0533       lock.unlock();
0534       boost::asio::detail::non_const_lvalue<Handler> handler2(handler);
0535       channel_handler<payload_type, decay_t<Handler>>(
0536           static_cast<payload_type&&>(payload), handler2.value)();
0537       return true;
0538     }
0539   case closed:
0540   default:
0541     {
0542       return false;
0543     }
0544   }
0545 }
0546 
0547 template <typename Mutex>
0548 template <typename Traits, typename... Signatures>
0549 void channel_service<Mutex>::start_receive_op(
0550     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0551         channel_receive<typename implementation_type<
0552             Traits, Signatures...>::payload_type>* receive_op)
0553 {
0554   typedef typename implementation_type<Traits,
0555       Signatures...>::traits_type traits_type;
0556   typedef typename implementation_type<Traits,
0557       Signatures...>::payload_type payload_type;
0558 
0559   typename Mutex::scoped_lock lock(impl.mutex_);
0560 
0561   switch (impl.receive_state_)
0562   {
0563   case block:
0564     {
0565       impl.waiters_.push(receive_op);
0566       if (impl.send_state_ != closed)
0567         impl.send_state_ = waiter;
0568       return;
0569     }
0570   case buffer:
0571     {
0572       payload_type payload(
0573           static_cast<payload_type&&>(impl.buffer_front()));
0574       if (channel_send<payload_type>* send_op =
0575           static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
0576       {
0577         impl.buffer_pop();
0578         impl.buffer_push(send_op->get_payload());
0579         impl.waiters_.pop();
0580         send_op->post();
0581       }
0582       else
0583       {
0584         impl.buffer_pop();
0585         if (impl.buffer_size() == 0)
0586           impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0587         impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
0588       }
0589       receive_op->immediate(static_cast<payload_type&&>(payload));
0590       break;
0591     }
0592   case waiter:
0593     {
0594       channel_send<payload_type>* send_op =
0595         static_cast<channel_send<payload_type>*>(impl.waiters_.front());
0596       payload_type payload = send_op->get_payload();
0597       impl.waiters_.pop();
0598       if (impl.waiters_.front() == 0)
0599         impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0600       send_op->post();
0601       receive_op->immediate(static_cast<payload_type&&>(payload));
0602       break;
0603     }
0604   case closed:
0605   default:
0606     {
0607       traits_type::invoke_receive_closed(
0608           post_receive<payload_type,
0609             typename traits_type::receive_closed_signature>(receive_op));
0610       break;
0611     }
0612   }
0613 }
0614 
0615 } // namespace detail
0616 } // namespace experimental
0617 } // namespace asio
0618 } // namespace boost
0619 
0620 #include <boost/asio/detail/pop_options.hpp>
0621 
0622 #endif // BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP