Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-02 08:06:38

0001 //
0002 // experimental/detail/impl/channel_service.hpp
0003 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0004 //
0005 // Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com)
0006 //
0007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0009 //
0010 
0011 #ifndef BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
0012 #define BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP
0013 
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
0017 
0018 #include <boost/asio/detail/push_options.hpp>
0019 
0020 namespace boost {
0021 namespace asio {
0022 namespace experimental {
0023 namespace detail {
0024 
0025 template <typename Mutex>
0026 inline channel_service<Mutex>::channel_service(
0027     boost::asio::execution_context& ctx)
0028   : boost::asio::detail::execution_context_service_base<channel_service>(ctx),
0029     mutex_(),
0030     impl_list_(0)
0031 {
0032 }
0033 
0034 template <typename Mutex>
0035 inline void channel_service<Mutex>::shutdown()
0036 {
0037   // Abandon all pending operations.
0038   boost::asio::detail::op_queue<channel_operation> ops;
0039   boost::asio::detail::mutex::scoped_lock lock(mutex_);
0040   base_implementation_type* impl = impl_list_;
0041   while (impl)
0042   {
0043     ops.push(impl->waiters_);
0044     impl = impl->next_;
0045   }
0046 }
0047 
0048 template <typename Mutex>
0049 inline void channel_service<Mutex>::construct(
0050     channel_service<Mutex>::base_implementation_type& impl,
0051     std::size_t max_buffer_size)
0052 {
0053   impl.max_buffer_size_ = max_buffer_size;
0054   impl.receive_state_ = block;
0055   impl.send_state_ = max_buffer_size ? buffer : block;
0056 
0057   // Insert implementation into linked list of all implementations.
0058   boost::asio::detail::mutex::scoped_lock lock(mutex_);
0059   impl.next_ = impl_list_;
0060   impl.prev_ = 0;
0061   if (impl_list_)
0062     impl_list_->prev_ = &impl;
0063   impl_list_ = &impl;
0064 }
0065 
0066 template <typename Mutex>
0067 template <typename Traits, typename... Signatures>
0068 void channel_service<Mutex>::destroy(
0069     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0070 {
0071   cancel(impl);
0072   base_destroy(impl);
0073 }
0074 
0075 template <typename Mutex>
0076 template <typename Traits, typename... Signatures>
0077 void channel_service<Mutex>::move_construct(
0078     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0079     channel_service<Mutex>::implementation_type<
0080       Traits, Signatures...>& other_impl)
0081 {
0082   impl.max_buffer_size_ = other_impl.max_buffer_size_;
0083   impl.receive_state_ = other_impl.receive_state_;
0084   other_impl.receive_state_ = block;
0085   impl.send_state_ = other_impl.send_state_;
0086   other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
0087   impl.buffer_move_from(other_impl);
0088 
0089   // Insert implementation into linked list of all implementations.
0090   boost::asio::detail::mutex::scoped_lock lock(mutex_);
0091   impl.next_ = impl_list_;
0092   impl.prev_ = 0;
0093   if (impl_list_)
0094     impl_list_->prev_ = &impl;
0095   impl_list_ = &impl;
0096 }
0097 
0098 template <typename Mutex>
0099 template <typename Traits, typename... Signatures>
0100 void channel_service<Mutex>::move_assign(
0101     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0102     channel_service& other_service,
0103     channel_service<Mutex>::implementation_type<
0104       Traits, Signatures...>& other_impl)
0105 {
0106   cancel(impl);
0107 
0108   if (this != &other_service)
0109   {
0110     // Remove implementation from linked list of all implementations.
0111     boost::asio::detail::mutex::scoped_lock lock(mutex_);
0112     if (impl_list_ == &impl)
0113       impl_list_ = impl.next_;
0114     if (impl.prev_)
0115       impl.prev_->next_ = impl.next_;
0116     if (impl.next_)
0117       impl.next_->prev_= impl.prev_;
0118     impl.next_ = 0;
0119     impl.prev_ = 0;
0120   }
0121 
0122   impl.max_buffer_size_ = other_impl.max_buffer_size_;
0123   impl.receive_state_ = other_impl.receive_state_;
0124   other_impl.receive_state_ = block;
0125   impl.send_state_ = other_impl.send_state_;
0126   other_impl.send_state_ = other_impl.max_buffer_size_ ? buffer : block;
0127   impl.buffer_move_from(other_impl);
0128 
0129   if (this != &other_service)
0130   {
0131     // Insert implementation into linked list of all implementations.
0132     boost::asio::detail::mutex::scoped_lock lock(other_service.mutex_);
0133     impl.next_ = other_service.impl_list_;
0134     impl.prev_ = 0;
0135     if (other_service.impl_list_)
0136       other_service.impl_list_->prev_ = &impl;
0137     other_service.impl_list_ = &impl;
0138   }
0139 }
0140 
0141 template <typename Mutex>
0142 inline void channel_service<Mutex>::base_destroy(
0143     channel_service<Mutex>::base_implementation_type& impl)
0144 {
0145   // Remove implementation from linked list of all implementations.
0146   boost::asio::detail::mutex::scoped_lock lock(mutex_);
0147   if (impl_list_ == &impl)
0148     impl_list_ = impl.next_;
0149   if (impl.prev_)
0150     impl.prev_->next_ = impl.next_;
0151   if (impl.next_)
0152     impl.next_->prev_= impl.prev_;
0153   impl.next_ = 0;
0154   impl.prev_ = 0;
0155 }
0156 
0157 template <typename Mutex>
0158 inline std::size_t channel_service<Mutex>::capacity(
0159     const channel_service<Mutex>::base_implementation_type& impl)
0160   const noexcept
0161 {
0162   typename Mutex::scoped_lock lock(impl.mutex_);
0163 
0164   return impl.max_buffer_size_;
0165 }
0166 
0167 template <typename Mutex>
0168 inline bool channel_service<Mutex>::is_open(
0169     const channel_service<Mutex>::base_implementation_type& impl)
0170   const noexcept
0171 {
0172   typename Mutex::scoped_lock lock(impl.mutex_);
0173 
0174   return impl.send_state_ != closed;
0175 }
0176 
0177 template <typename Mutex>
0178 template <typename Traits, typename... Signatures>
0179 void channel_service<Mutex>::reset(
0180     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0181 {
0182   cancel(impl);
0183 
0184   typename Mutex::scoped_lock lock(impl.mutex_);
0185 
0186   impl.receive_state_ = block;
0187   impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0188   impl.buffer_clear();
0189 }
0190 
0191 template <typename Mutex>
0192 template <typename Traits, typename... Signatures>
0193 void channel_service<Mutex>::close(
0194     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0195 {
0196   typedef typename implementation_type<Traits,
0197       Signatures...>::traits_type traits_type;
0198   typedef typename implementation_type<Traits,
0199       Signatures...>::payload_type payload_type;
0200 
0201   typename Mutex::scoped_lock lock(impl.mutex_);
0202 
0203   if (impl.receive_state_ == block)
0204   {
0205     while (channel_operation* op = impl.waiters_.front())
0206     {
0207       impl.waiters_.pop();
0208       traits_type::invoke_receive_closed(
0209           post_receive<payload_type,
0210             typename traits_type::receive_closed_signature>(
0211               static_cast<channel_receive<payload_type>*>(op)));
0212     }
0213   }
0214 
0215   impl.send_state_ = closed;
0216   if (impl.receive_state_ != buffer)
0217     impl.receive_state_ = closed;
0218 }
0219 
0220 template <typename Mutex>
0221 template <typename Traits, typename... Signatures>
0222 void channel_service<Mutex>::cancel(
0223     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl)
0224 {
0225   typedef typename implementation_type<Traits,
0226       Signatures...>::traits_type traits_type;
0227   typedef typename implementation_type<Traits,
0228       Signatures...>::payload_type payload_type;
0229 
0230   typename Mutex::scoped_lock lock(impl.mutex_);
0231 
0232   while (channel_operation* op = impl.waiters_.front())
0233   {
0234     if (impl.send_state_ == block)
0235     {
0236       impl.waiters_.pop();
0237       static_cast<channel_send<payload_type>*>(op)->cancel();
0238     }
0239     else
0240     {
0241       impl.waiters_.pop();
0242       traits_type::invoke_receive_cancelled(
0243           post_receive<payload_type,
0244             typename traits_type::receive_cancelled_signature>(
0245               static_cast<channel_receive<payload_type>*>(op)));
0246     }
0247   }
0248 
0249   if (impl.receive_state_ == waiter)
0250     impl.receive_state_ = block;
0251   if (impl.send_state_ == waiter)
0252     impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0253 }
0254 
0255 template <typename Mutex>
0256 template <typename Traits, typename... Signatures>
0257 void channel_service<Mutex>::cancel_by_key(
0258     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0259     void* cancellation_key)
0260 {
0261   typedef typename implementation_type<Traits,
0262       Signatures...>::traits_type traits_type;
0263   typedef typename implementation_type<Traits,
0264       Signatures...>::payload_type payload_type;
0265 
0266   typename Mutex::scoped_lock lock(impl.mutex_);
0267 
0268   boost::asio::detail::op_queue<channel_operation> other_ops;
0269   while (channel_operation* op = impl.waiters_.front())
0270   {
0271     if (op->cancellation_key_ == cancellation_key)
0272     {
0273       if (impl.send_state_ == block)
0274       {
0275         impl.waiters_.pop();
0276         static_cast<channel_send<payload_type>*>(op)->cancel();
0277       }
0278       else
0279       {
0280         impl.waiters_.pop();
0281         traits_type::invoke_receive_cancelled(
0282             post_receive<payload_type,
0283               typename traits_type::receive_cancelled_signature>(
0284                 static_cast<channel_receive<payload_type>*>(op)));
0285       }
0286     }
0287     else
0288     {
0289       impl.waiters_.pop();
0290       other_ops.push(op);
0291     }
0292   }
0293   impl.waiters_.push(other_ops);
0294 
0295   if (impl.waiters_.empty())
0296   {
0297     if (impl.receive_state_ == waiter)
0298       impl.receive_state_ = block;
0299     if (impl.send_state_ == waiter)
0300       impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0301   }
0302 }
0303 
0304 template <typename Mutex>
0305 inline bool channel_service<Mutex>::ready(
0306     const channel_service<Mutex>::base_implementation_type& impl)
0307   const noexcept
0308 {
0309   typename Mutex::scoped_lock lock(impl.mutex_);
0310 
0311   return impl.receive_state_ != block;
0312 }
0313 
0314 template <typename Mutex>
0315 template <typename Message, typename Traits,
0316     typename... Signatures, typename... Args>
0317 bool channel_service<Mutex>::try_send(
0318     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0319     bool via_dispatch, Args&&... args)
0320 {
0321   typedef typename implementation_type<Traits,
0322       Signatures...>::payload_type payload_type;
0323 
0324   typename Mutex::scoped_lock lock(impl.mutex_);
0325 
0326   switch (impl.send_state_)
0327   {
0328   case block:
0329     {
0330       return false;
0331     }
0332   case buffer:
0333     {
0334       impl.buffer_push(Message(0, static_cast<Args&&>(args)...));
0335       impl.receive_state_ = buffer;
0336       if (impl.buffer_size() == impl.max_buffer_size_)
0337         impl.send_state_ = block;
0338       return true;
0339     }
0340   case waiter:
0341     {
0342       payload_type payload(Message(0, static_cast<Args&&>(args)...));
0343       channel_receive<payload_type>* receive_op =
0344         static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
0345       impl.waiters_.pop();
0346       if (impl.waiters_.empty())
0347         impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0348       lock.unlock();
0349       if (via_dispatch)
0350         receive_op->dispatch(static_cast<payload_type&&>(payload));
0351       else
0352         receive_op->post(static_cast<payload_type&&>(payload));
0353       return true;
0354     }
0355   case closed:
0356   default:
0357     {
0358       return false;
0359     }
0360   }
0361 }
0362 
0363 template <typename Mutex>
0364 template <typename Message, typename Traits,
0365     typename... Signatures, typename... Args>
0366 std::size_t channel_service<Mutex>::try_send_n(
0367     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0368         std::size_t count, bool via_dispatch, Args&&... args)
0369 {
0370   typedef typename implementation_type<Traits,
0371       Signatures...>::payload_type payload_type;
0372 
0373   typename Mutex::scoped_lock lock(impl.mutex_);
0374 
0375   if (count == 0)
0376     return 0;
0377 
0378   switch (impl.send_state_)
0379   {
0380   case block:
0381     return 0;
0382   case buffer:
0383   case waiter:
0384     break;
0385   case closed:
0386   default:
0387     return 0;
0388   }
0389 
0390   payload_type payload(Message(0, static_cast<Args&&>(args)...));
0391 
0392   for (std::size_t i = 0; i < count; ++i)
0393   {
0394     switch (impl.send_state_)
0395     {
0396     case block:
0397       {
0398         return i;
0399       }
0400     case buffer:
0401       {
0402         i += impl.buffer_push_n(count - i,
0403             static_cast<payload_type&&>(payload));
0404         impl.receive_state_ = buffer;
0405         if (impl.buffer_size() == impl.max_buffer_size_)
0406           impl.send_state_ = block;
0407         return i;
0408       }
0409     case waiter:
0410       {
0411         channel_receive<payload_type>* receive_op =
0412           static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
0413         impl.waiters_.pop();
0414         if (impl.waiters_.empty())
0415           impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0416         lock.unlock();
0417         if (via_dispatch)
0418           receive_op->dispatch(payload);
0419         else
0420           receive_op->post(payload);
0421         break;
0422       }
0423     case closed:
0424     default:
0425       {
0426         return i;
0427       }
0428     }
0429   }
0430 
0431   return count;
0432 }
0433 
0434 template <typename Mutex>
0435 template <typename Traits, typename... Signatures>
0436 void channel_service<Mutex>::start_send_op(
0437     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0438         channel_send<typename implementation_type<
0439             Traits, Signatures...>::payload_type>* send_op)
0440 {
0441   typedef typename implementation_type<Traits,
0442       Signatures...>::payload_type payload_type;
0443 
0444   typename Mutex::scoped_lock lock(impl.mutex_);
0445 
0446   switch (impl.send_state_)
0447   {
0448   case block:
0449     {
0450       impl.waiters_.push(send_op);
0451       if (impl.receive_state_ == block)
0452         impl.receive_state_ = waiter;
0453       return;
0454     }
0455   case buffer:
0456     {
0457       impl.buffer_push(send_op->get_payload());
0458       impl.receive_state_ = buffer;
0459       if (impl.buffer_size() == impl.max_buffer_size_)
0460         impl.send_state_ = block;
0461       send_op->immediate();
0462       break;
0463     }
0464   case waiter:
0465     {
0466       channel_receive<payload_type>* receive_op =
0467         static_cast<channel_receive<payload_type>*>(impl.waiters_.front());
0468       impl.waiters_.pop();
0469       if (impl.waiters_.empty())
0470         impl.send_state_ = impl.max_buffer_size_ ? buffer : block;
0471       receive_op->post(send_op->get_payload());
0472       send_op->immediate();
0473       break;
0474     }
0475   case closed:
0476   default:
0477     {
0478       send_op->close();
0479       break;
0480     }
0481   }
0482 }
0483 
0484 template <typename Mutex>
0485 template <typename Traits, typename... Signatures, typename Handler>
0486 bool channel_service<Mutex>::try_receive(
0487     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0488         Handler&& handler)
0489 {
0490   typedef typename implementation_type<Traits,
0491       Signatures...>::payload_type payload_type;
0492 
0493   typename Mutex::scoped_lock lock(impl.mutex_);
0494 
0495   switch (impl.receive_state_)
0496   {
0497   case block:
0498     {
0499       return false;
0500     }
0501   case buffer:
0502     {
0503       payload_type payload(impl.buffer_front());
0504       if (channel_send<payload_type>* send_op =
0505           static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
0506       {
0507         impl.buffer_pop();
0508         impl.buffer_push(send_op->get_payload());
0509         impl.waiters_.pop();
0510         send_op->post();
0511       }
0512       else
0513       {
0514         impl.buffer_pop();
0515         if (impl.buffer_size() == 0)
0516           impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0517         impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
0518       }
0519       lock.unlock();
0520       boost::asio::detail::non_const_lvalue<Handler> handler2(handler);
0521       boost::asio::detail::completion_payload_handler<
0522         payload_type, decay_t<Handler>>(
0523           static_cast<payload_type&&>(payload), handler2.value)();
0524       return true;
0525     }
0526   case waiter:
0527     {
0528       channel_send<payload_type>* send_op =
0529         static_cast<channel_send<payload_type>*>(impl.waiters_.front());
0530       payload_type payload = send_op->get_payload();
0531       impl.waiters_.pop();
0532       if (impl.waiters_.front() == 0)
0533         impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0534       send_op->post();
0535       lock.unlock();
0536       boost::asio::detail::non_const_lvalue<Handler> handler2(handler);
0537       boost::asio::detail::completion_payload_handler<
0538         payload_type, decay_t<Handler>>(
0539           static_cast<payload_type&&>(payload), handler2.value)();
0540       return true;
0541     }
0542   case closed:
0543   default:
0544     {
0545       return false;
0546     }
0547   }
0548 }
0549 
0550 template <typename Mutex>
0551 template <typename Traits, typename... Signatures>
0552 void channel_service<Mutex>::start_receive_op(
0553     channel_service<Mutex>::implementation_type<Traits, Signatures...>& impl,
0554         channel_receive<typename implementation_type<
0555             Traits, Signatures...>::payload_type>* receive_op)
0556 {
0557   typedef typename implementation_type<Traits,
0558       Signatures...>::traits_type traits_type;
0559   typedef typename implementation_type<Traits,
0560       Signatures...>::payload_type payload_type;
0561 
0562   typename Mutex::scoped_lock lock(impl.mutex_);
0563 
0564   switch (impl.receive_state_)
0565   {
0566   case block:
0567     {
0568       impl.waiters_.push(receive_op);
0569       if (impl.send_state_ != closed)
0570         impl.send_state_ = waiter;
0571       return;
0572     }
0573   case buffer:
0574     {
0575       payload_type payload(
0576           static_cast<payload_type&&>(impl.buffer_front()));
0577       if (channel_send<payload_type>* send_op =
0578           static_cast<channel_send<payload_type>*>(impl.waiters_.front()))
0579       {
0580         impl.buffer_pop();
0581         impl.buffer_push(send_op->get_payload());
0582         impl.waiters_.pop();
0583         send_op->post();
0584       }
0585       else
0586       {
0587         impl.buffer_pop();
0588         if (impl.buffer_size() == 0)
0589           impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0590         impl.send_state_ = (impl.send_state_ == closed) ? closed : buffer;
0591       }
0592       receive_op->immediate(static_cast<payload_type&&>(payload));
0593       break;
0594     }
0595   case waiter:
0596     {
0597       channel_send<payload_type>* send_op =
0598         static_cast<channel_send<payload_type>*>(impl.waiters_.front());
0599       payload_type payload = send_op->get_payload();
0600       impl.waiters_.pop();
0601       if (impl.waiters_.front() == 0)
0602         impl.receive_state_ = (impl.send_state_ == closed) ? closed : block;
0603       send_op->post();
0604       receive_op->immediate(static_cast<payload_type&&>(payload));
0605       break;
0606     }
0607   case closed:
0608   default:
0609     {
0610       traits_type::invoke_receive_closed(
0611           post_receive<payload_type,
0612             typename traits_type::receive_closed_signature>(receive_op));
0613       break;
0614     }
0615   }
0616 }
0617 
0618 } // namespace detail
0619 } // namespace experimental
0620 } // namespace asio
0621 } // namespace boost
0622 
0623 #include <boost/asio/detail/pop_options.hpp>
0624 
0625 #endif // BOOST_ASIO_EXPERIMENTAL_DETAIL_IMPL_CHANNEL_SERVICE_HPP