Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-07-01 08:08:58

0001 //
0002 // experimental/impl/parallel_group.hpp
0003 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0004 //
0005 // Copyright (c) 2003-2024 Christopher M. Kohlhoff (chris at kohlhoff dot com)
0006 //
0007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0009 //
0010 
0011 #ifndef BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP
0012 #define BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP
0013 
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
0017 
0018 #include <boost/asio/detail/config.hpp>
0019 #include <atomic>
0020 #include <deque>
0021 #include <memory>
0022 #include <new>
0023 #include <tuple>
0024 #include <boost/asio/associated_cancellation_slot.hpp>
0025 #include <boost/asio/detail/recycling_allocator.hpp>
0026 #include <boost/asio/detail/type_traits.hpp>
0027 #include <boost/asio/dispatch.hpp>
0028 
0029 #include <boost/asio/detail/push_options.hpp>
0030 
0031 namespace boost {
0032 namespace asio {
0033 namespace experimental {
0034 namespace detail {
0035 
0036 // Stores the result from an individual asynchronous operation.
0037 template <typename T, typename = void>
0038 struct parallel_group_op_result
0039 {
0040 public:
0041   parallel_group_op_result()
0042     : has_value_(false)
0043   {
0044   }
0045 
0046   parallel_group_op_result(parallel_group_op_result&& other)
0047     : has_value_(other.has_value_)
0048   {
0049     if (has_value_)
0050       new (&u_.value_) T(std::move(other.get()));
0051   }
0052 
0053   ~parallel_group_op_result()
0054   {
0055     if (has_value_)
0056       u_.value_.~T();
0057   }
0058 
0059   T& get() noexcept
0060   {
0061     return u_.value_;
0062   }
0063 
0064   template <typename... Args>
0065   void emplace(Args&&... args)
0066   {
0067     new (&u_.value_) T(std::forward<Args>(args)...);
0068     has_value_ = true;
0069   }
0070 
0071 private:
0072   union u
0073   {
0074     u() {}
0075     ~u() {}
0076     char c_;
0077     T value_;
0078   } u_;
0079   bool has_value_;
0080 };
0081 
0082 // Proxy completion handler for the group of parallel operatations. Unpacks and
0083 // concatenates the individual operations' results, and invokes the user's
0084 // completion handler.
0085 template <typename Handler, typename... Ops>
0086 struct parallel_group_completion_handler
0087 {
0088   typedef decay_t<
0089       prefer_result_t<
0090         associated_executor_t<Handler>,
0091         execution::outstanding_work_t::tracked_t
0092       >
0093     > executor_type;
0094 
0095   parallel_group_completion_handler(Handler&& h)
0096     : handler_(std::move(h)),
0097       executor_(
0098           boost::asio::prefer(
0099             boost::asio::get_associated_executor(handler_),
0100             execution::outstanding_work.tracked))
0101   {
0102   }
0103 
0104   executor_type get_executor() const noexcept
0105   {
0106     return executor_;
0107   }
0108 
0109   void operator()()
0110   {
0111     this->invoke(boost::asio::detail::make_index_sequence<sizeof...(Ops)>());
0112   }
0113 
0114   template <std::size_t... I>
0115   void invoke(boost::asio::detail::index_sequence<I...>)
0116   {
0117     this->invoke(std::tuple_cat(std::move(std::get<I>(args_).get())...));
0118   }
0119 
0120   template <typename... Args>
0121   void invoke(std::tuple<Args...>&& args)
0122   {
0123     this->invoke(std::move(args),
0124         boost::asio::detail::index_sequence_for<Args...>());
0125   }
0126 
0127   template <typename... Args, std::size_t... I>
0128   void invoke(std::tuple<Args...>&& args,
0129       boost::asio::detail::index_sequence<I...>)
0130   {
0131     std::move(handler_)(completion_order_, std::move(std::get<I>(args))...);
0132   }
0133 
0134   Handler handler_;
0135   executor_type executor_;
0136   std::array<std::size_t, sizeof...(Ops)> completion_order_{};
0137   std::tuple<
0138       parallel_group_op_result<
0139         typename parallel_op_signature_as_tuple<
0140           completion_signature_of_t<Ops>
0141         >::type
0142       >...
0143     > args_{};
0144 };
0145 
0146 // Shared state for the parallel group.
0147 template <typename Condition, typename Handler, typename... Ops>
0148 struct parallel_group_state
0149 {
0150   parallel_group_state(Condition&& c, Handler&& h)
0151     : cancellation_condition_(std::move(c)),
0152       handler_(std::move(h))
0153   {
0154   }
0155 
0156   // The number of operations that have completed so far. Used to determine the
0157   // order of completion.
0158   std::atomic<unsigned int> completed_{0};
0159 
0160   // The non-none cancellation type that resulted from a cancellation condition.
0161   // Stored here for use by the group's initiating function.
0162   std::atomic<cancellation_type_t> cancel_type_{cancellation_type::none};
0163 
0164   // The number of cancellations that have been requested, either on completion
0165   // of the operations within the group, or via the cancellation slot for the
0166   // group operation. Initially set to the number of operations to prevent
0167   // cancellation signals from being emitted until after all of the group's
0168   // operations' initiating functions have completed.
0169   std::atomic<unsigned int> cancellations_requested_{sizeof...(Ops)};
0170 
0171   // The number of operations that are yet to complete. Used to determine when
0172   // it is safe to invoke the user's completion handler.
0173   std::atomic<unsigned int> outstanding_{sizeof...(Ops)};
0174 
0175   // The cancellation signals for each operation in the group.
0176   boost::asio::cancellation_signal cancellation_signals_[sizeof...(Ops)];
0177 
0178   // The cancellation condition is used to determine whether the results from an
0179   // individual operation warrant a cancellation request for the whole group.
0180   Condition cancellation_condition_;
0181 
0182   // The proxy handler to be invoked once all operations in the group complete.
0183   parallel_group_completion_handler<Handler, Ops...> handler_;
0184 };
0185 
0186 // Handler for an individual operation within the parallel group.
0187 template <std::size_t I, typename Condition, typename Handler, typename... Ops>
0188 struct parallel_group_op_handler
0189 {
0190   typedef boost::asio::cancellation_slot cancellation_slot_type;
0191 
0192   parallel_group_op_handler(
0193     std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state)
0194     : state_(std::move(state))
0195   {
0196   }
0197 
0198   cancellation_slot_type get_cancellation_slot() const noexcept
0199   {
0200     return state_->cancellation_signals_[I].slot();
0201   }
0202 
0203   template <typename... Args>
0204   void operator()(Args... args)
0205   {
0206     // Capture this operation into the completion order.
0207     state_->handler_.completion_order_[state_->completed_++] = I;
0208 
0209     // Determine whether the results of this operation require cancellation of
0210     // the whole group.
0211     cancellation_type_t cancel_type = state_->cancellation_condition_(args...);
0212 
0213     // Capture the result of the operation into the proxy completion handler.
0214     std::get<I>(state_->handler_.args_).emplace(std::move(args)...);
0215 
0216     if (cancel_type != cancellation_type::none)
0217     {
0218       // Save the type for potential use by the group's initiating function.
0219       state_->cancel_type_ = cancel_type;
0220 
0221       // If we are the first operation to request cancellation, emit a signal
0222       // for each operation in the group.
0223       if (state_->cancellations_requested_++ == 0)
0224         for (std::size_t i = 0; i < sizeof...(Ops); ++i)
0225           if (i != I)
0226             state_->cancellation_signals_[i].emit(cancel_type);
0227     }
0228 
0229     // If this is the last outstanding operation, invoke the user's handler.
0230     if (--state_->outstanding_ == 0)
0231       boost::asio::dispatch(std::move(state_->handler_));
0232   }
0233 
0234   std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
0235 };
0236 
0237 // Handler for an individual operation within the parallel group that has an
0238 // explicitly specified executor.
0239 template <typename Executor, std::size_t I,
0240     typename Condition, typename Handler, typename... Ops>
0241 struct parallel_group_op_handler_with_executor :
0242   parallel_group_op_handler<I, Condition, Handler, Ops...>
0243 {
0244   typedef parallel_group_op_handler<I, Condition, Handler, Ops...> base_type;
0245   typedef boost::asio::cancellation_slot cancellation_slot_type;
0246   typedef Executor executor_type;
0247 
0248   parallel_group_op_handler_with_executor(
0249       std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state,
0250       executor_type ex)
0251     : parallel_group_op_handler<I, Condition, Handler, Ops...>(std::move(state))
0252   {
0253     cancel_proxy_ =
0254       &this->state_->cancellation_signals_[I].slot().template
0255         emplace<cancel_proxy>(this->state_, std::move(ex));
0256   }
0257 
0258   cancellation_slot_type get_cancellation_slot() const noexcept
0259   {
0260     return cancel_proxy_->signal_.slot();
0261   }
0262 
0263   executor_type get_executor() const noexcept
0264   {
0265     return cancel_proxy_->executor_;
0266   }
0267 
0268   // Proxy handler that forwards the emitted signal to the correct executor.
0269   struct cancel_proxy
0270   {
0271     cancel_proxy(
0272         std::shared_ptr<parallel_group_state<
0273           Condition, Handler, Ops...>> state,
0274         executor_type ex)
0275       : state_(std::move(state)),
0276         executor_(std::move(ex))
0277     {
0278     }
0279 
0280     void operator()(cancellation_type_t type)
0281     {
0282       if (auto state = state_.lock())
0283       {
0284         boost::asio::cancellation_signal* sig = &signal_;
0285         boost::asio::dispatch(executor_,
0286             [state, sig, type]{ sig->emit(type); });
0287       }
0288     }
0289 
0290     std::weak_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
0291     boost::asio::cancellation_signal signal_;
0292     executor_type executor_;
0293   };
0294 
0295   cancel_proxy* cancel_proxy_;
0296 };
0297 
0298 // Helper to launch an operation using the correct executor, if any.
0299 template <std::size_t I, typename Op, typename = void>
0300 struct parallel_group_op_launcher
0301 {
0302   template <typename Condition, typename Handler, typename... Ops>
0303   static void launch(Op& op,
0304     const std::shared_ptr<parallel_group_state<
0305       Condition, Handler, Ops...>>& state)
0306   {
0307     typedef associated_executor_t<Op> ex_type;
0308     ex_type ex = boost::asio::get_associated_executor(op);
0309     std::move(op)(
0310         parallel_group_op_handler_with_executor<ex_type, I,
0311           Condition, Handler, Ops...>(state, std::move(ex)));
0312   }
0313 };
0314 
0315 // Specialised launcher for operations that specify no executor.
0316 template <std::size_t I, typename Op>
0317 struct parallel_group_op_launcher<I, Op,
0318     enable_if_t<
0319       is_same<
0320         typename associated_executor<
0321           Op>::asio_associated_executor_is_unspecialised,
0322         void
0323       >::value
0324     >>
0325 {
0326   template <typename Condition, typename Handler, typename... Ops>
0327   static void launch(Op& op,
0328     const std::shared_ptr<parallel_group_state<
0329       Condition, Handler, Ops...>>& state)
0330   {
0331     std::move(op)(
0332         parallel_group_op_handler<I, Condition, Handler, Ops...>(state));
0333   }
0334 };
0335 
0336 template <typename Condition, typename Handler, typename... Ops>
0337 struct parallel_group_cancellation_handler
0338 {
0339   parallel_group_cancellation_handler(
0340     std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state)
0341     : state_(std::move(state))
0342   {
0343   }
0344 
0345   void operator()(cancellation_type_t cancel_type)
0346   {
0347     // If we are the first place to request cancellation, i.e. no operation has
0348     // yet completed and requested cancellation, emit a signal for each
0349     // operation in the group.
0350     if (cancel_type != cancellation_type::none)
0351       if (auto state = state_.lock())
0352         if (state->cancellations_requested_++ == 0)
0353           for (std::size_t i = 0; i < sizeof...(Ops); ++i)
0354             state->cancellation_signals_[i].emit(cancel_type);
0355   }
0356 
0357   std::weak_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
0358 };
0359 
0360 template <typename Condition, typename Handler,
0361     typename... Ops, std::size_t... I>
0362 void parallel_group_launch(Condition cancellation_condition, Handler handler,
0363     std::tuple<Ops...>& ops, boost::asio::detail::index_sequence<I...>)
0364 {
0365   // Get the user's completion handler's cancellation slot, so that we can allow
0366   // cancellation of the entire group.
0367   associated_cancellation_slot_t<Handler> slot
0368     = boost::asio::get_associated_cancellation_slot(handler);
0369 
0370   // Create the shared state for the operation.
0371   typedef parallel_group_state<Condition, Handler, Ops...> state_type;
0372   std::shared_ptr<state_type> state = std::allocate_shared<state_type>(
0373       boost::asio::detail::recycling_allocator<state_type,
0374         boost::asio::detail::thread_info_base::parallel_group_tag>(),
0375       std::move(cancellation_condition), std::move(handler));
0376 
0377   // Initiate each individual operation in the group.
0378   int fold[] = { 0,
0379     ( parallel_group_op_launcher<I, Ops>::launch(std::get<I>(ops), state),
0380       0 )...
0381   };
0382   (void)fold;
0383 
0384   // Check if any of the operations has already requested cancellation, and if
0385   // so, emit a signal for each operation in the group.
0386   if ((state->cancellations_requested_ -= sizeof...(Ops)) > 0)
0387     for (auto& signal : state->cancellation_signals_)
0388       signal.emit(state->cancel_type_);
0389 
0390   // Register a handler with the user's completion handler's cancellation slot.
0391   if (slot.is_connected())
0392     slot.template emplace<
0393       parallel_group_cancellation_handler<
0394         Condition, Handler, Ops...>>(state);
0395 }
0396 
0397 // Proxy completion handler for the ranged group of parallel operatations.
0398 // Unpacks and recombines the individual operations' results, and invokes the
0399 // user's completion handler.
0400 template <typename Handler, typename Op, typename Allocator>
0401 struct ranged_parallel_group_completion_handler
0402 {
0403   typedef decay_t<
0404       prefer_result_t<
0405         associated_executor_t<Handler>,
0406         execution::outstanding_work_t::tracked_t
0407       >
0408     > executor_type;
0409 
0410   typedef typename parallel_op_signature_as_tuple<
0411       completion_signature_of_t<Op>
0412     >::type op_tuple_type;
0413 
0414   typedef parallel_group_op_result<op_tuple_type> op_result_type;
0415 
0416   ranged_parallel_group_completion_handler(Handler&& h,
0417       std::size_t size, const Allocator& allocator)
0418     : handler_(std::move(h)),
0419       executor_(
0420           boost::asio::prefer(
0421             boost::asio::get_associated_executor(handler_),
0422             execution::outstanding_work.tracked)),
0423       allocator_(allocator),
0424       completion_order_(size, 0,
0425           BOOST_ASIO_REBIND_ALLOC(Allocator, std::size_t)(allocator)),
0426       args_(BOOST_ASIO_REBIND_ALLOC(Allocator, op_result_type)(allocator))
0427   {
0428     for (std::size_t i = 0; i < size; ++i)
0429       args_.emplace_back();
0430   }
0431 
0432   executor_type get_executor() const noexcept
0433   {
0434     return executor_;
0435   }
0436 
0437   void operator()()
0438   {
0439     this->invoke(
0440         boost::asio::detail::make_index_sequence<
0441           std::tuple_size<op_tuple_type>::value>());
0442   }
0443 
0444   template <std::size_t... I>
0445   void invoke(boost::asio::detail::index_sequence<I...>)
0446   {
0447     typedef typename parallel_op_signature_as_tuple<
0448         typename ranged_parallel_group_signature<
0449           completion_signature_of_t<Op>,
0450           Allocator
0451         >::raw_type
0452       >::type vectors_type;
0453 
0454     // Construct all result vectors using the supplied allocator.
0455     vectors_type vectors{
0456         typename std::tuple_element<I, vectors_type>::type(
0457           BOOST_ASIO_REBIND_ALLOC(Allocator, int)(allocator_))...};
0458 
0459     // Reserve sufficient space in each of the result vectors.
0460     int reserve_fold[] = { 0,
0461       ( std::get<I>(vectors).reserve(completion_order_.size()),
0462         0 )...
0463     };
0464     (void)reserve_fold;
0465 
0466     // Copy the results from all operations into the result vectors.
0467     for (std::size_t idx = 0; idx < completion_order_.size(); ++idx)
0468     {
0469       int pushback_fold[] = { 0,
0470         ( std::get<I>(vectors).push_back(
0471             std::move(std::get<I>(args_[idx].get()))),
0472           0 )...
0473       };
0474       (void)pushback_fold;
0475     }
0476 
0477     std::move(handler_)(std::move(completion_order_),
0478         std::move(std::get<I>(vectors))...);
0479   }
0480 
0481   Handler handler_;
0482   executor_type executor_;
0483   Allocator allocator_;
0484   std::vector<std::size_t,
0485     BOOST_ASIO_REBIND_ALLOC(Allocator, std::size_t)> completion_order_;
0486   std::deque<op_result_type,
0487     BOOST_ASIO_REBIND_ALLOC(Allocator, op_result_type)> args_;
0488 };
0489 
0490 // Shared state for the parallel group.
0491 template <typename Condition, typename Handler, typename Op, typename Allocator>
0492 struct ranged_parallel_group_state
0493 {
0494   ranged_parallel_group_state(Condition&& c, Handler&& h,
0495       std::size_t size, const Allocator& allocator)
0496     : cancellations_requested_(size),
0497       outstanding_(size),
0498       cancellation_signals_(
0499           BOOST_ASIO_REBIND_ALLOC(Allocator,
0500             boost::asio::cancellation_signal)(allocator)),
0501       cancellation_condition_(std::move(c)),
0502       handler_(std::move(h), size, allocator)
0503   {
0504     for (std::size_t i = 0; i < size; ++i)
0505       cancellation_signals_.emplace_back();
0506   }
0507 
0508   // The number of operations that have completed so far. Used to determine the
0509   // order of completion.
0510   std::atomic<unsigned int> completed_{0};
0511 
0512   // The non-none cancellation type that resulted from a cancellation condition.
0513   // Stored here for use by the group's initiating function.
0514   std::atomic<cancellation_type_t> cancel_type_{cancellation_type::none};
0515 
0516   // The number of cancellations that have been requested, either on completion
0517   // of the operations within the group, or via the cancellation slot for the
0518   // group operation. Initially set to the number of operations to prevent
0519   // cancellation signals from being emitted until after all of the group's
0520   // operations' initiating functions have completed.
0521   std::atomic<unsigned int> cancellations_requested_;
0522 
0523   // The number of operations that are yet to complete. Used to determine when
0524   // it is safe to invoke the user's completion handler.
0525   std::atomic<unsigned int> outstanding_;
0526 
0527   // The cancellation signals for each operation in the group.
0528   std::deque<boost::asio::cancellation_signal,
0529     BOOST_ASIO_REBIND_ALLOC(Allocator, boost::asio::cancellation_signal)>
0530       cancellation_signals_;
0531 
0532   // The cancellation condition is used to determine whether the results from an
0533   // individual operation warrant a cancellation request for the whole group.
0534   Condition cancellation_condition_;
0535 
0536   // The proxy handler to be invoked once all operations in the group complete.
0537   ranged_parallel_group_completion_handler<Handler, Op, Allocator> handler_;
0538 };
0539 
0540 // Handler for an individual operation within the parallel group.
0541 template <typename Condition, typename Handler, typename Op, typename Allocator>
0542 struct ranged_parallel_group_op_handler
0543 {
0544   typedef boost::asio::cancellation_slot cancellation_slot_type;
0545 
0546   ranged_parallel_group_op_handler(
0547       std::shared_ptr<ranged_parallel_group_state<
0548         Condition, Handler, Op, Allocator>> state,
0549       std::size_t idx)
0550     : state_(std::move(state)),
0551       idx_(idx)
0552   {
0553   }
0554 
0555   cancellation_slot_type get_cancellation_slot() const noexcept
0556   {
0557     return state_->cancellation_signals_[idx_].slot();
0558   }
0559 
0560   template <typename... Args>
0561   void operator()(Args... args)
0562   {
0563     // Capture this operation into the completion order.
0564     state_->handler_.completion_order_[state_->completed_++] = idx_;
0565 
0566     // Determine whether the results of this operation require cancellation of
0567     // the whole group.
0568     cancellation_type_t cancel_type = state_->cancellation_condition_(args...);
0569 
0570     // Capture the result of the operation into the proxy completion handler.
0571     state_->handler_.args_[idx_].emplace(std::move(args)...);
0572 
0573     if (cancel_type != cancellation_type::none)
0574     {
0575       // Save the type for potential use by the group's initiating function.
0576       state_->cancel_type_ = cancel_type;
0577 
0578       // If we are the first operation to request cancellation, emit a signal
0579       // for each operation in the group.
0580       if (state_->cancellations_requested_++ == 0)
0581         for (std::size_t i = 0; i < state_->cancellation_signals_.size(); ++i)
0582           if (i != idx_)
0583             state_->cancellation_signals_[i].emit(cancel_type);
0584     }
0585 
0586     // If this is the last outstanding operation, invoke the user's handler.
0587     if (--state_->outstanding_ == 0)
0588       boost::asio::dispatch(std::move(state_->handler_));
0589   }
0590 
0591   std::shared_ptr<ranged_parallel_group_state<
0592     Condition, Handler, Op, Allocator>> state_;
0593   std::size_t idx_;
0594 };
0595 
0596 // Handler for an individual operation within the parallel group that has an
0597 // explicitly specified executor.
0598 template <typename Executor, typename Condition,
0599     typename Handler, typename Op, typename Allocator>
0600 struct ranged_parallel_group_op_handler_with_executor :
0601   ranged_parallel_group_op_handler<Condition, Handler, Op, Allocator>
0602 {
0603   typedef ranged_parallel_group_op_handler<
0604     Condition, Handler, Op, Allocator> base_type;
0605   typedef boost::asio::cancellation_slot cancellation_slot_type;
0606   typedef Executor executor_type;
0607 
0608   ranged_parallel_group_op_handler_with_executor(
0609       std::shared_ptr<ranged_parallel_group_state<
0610         Condition, Handler, Op, Allocator>> state,
0611       executor_type ex, std::size_t idx)
0612     : ranged_parallel_group_op_handler<Condition, Handler, Op, Allocator>(
0613         std::move(state), idx)
0614   {
0615     cancel_proxy_ =
0616       &this->state_->cancellation_signals_[idx].slot().template
0617         emplace<cancel_proxy>(this->state_, std::move(ex));
0618   }
0619 
0620   cancellation_slot_type get_cancellation_slot() const noexcept
0621   {
0622     return cancel_proxy_->signal_.slot();
0623   }
0624 
0625   executor_type get_executor() const noexcept
0626   {
0627     return cancel_proxy_->executor_;
0628   }
0629 
0630   // Proxy handler that forwards the emitted signal to the correct executor.
0631   struct cancel_proxy
0632   {
0633     cancel_proxy(
0634         std::shared_ptr<ranged_parallel_group_state<
0635           Condition, Handler, Op, Allocator>> state,
0636         executor_type ex)
0637       : state_(std::move(state)),
0638         executor_(std::move(ex))
0639     {
0640     }
0641 
0642     void operator()(cancellation_type_t type)
0643     {
0644       if (auto state = state_.lock())
0645       {
0646         boost::asio::cancellation_signal* sig = &signal_;
0647         boost::asio::dispatch(executor_,
0648             [state, sig, type]{ sig->emit(type); });
0649       }
0650     }
0651 
0652     std::weak_ptr<ranged_parallel_group_state<
0653       Condition, Handler, Op, Allocator>> state_;
0654     boost::asio::cancellation_signal signal_;
0655     executor_type executor_;
0656   };
0657 
0658   cancel_proxy* cancel_proxy_;
0659 };
0660 
0661 template <typename Condition, typename Handler, typename Op, typename Allocator>
0662 struct ranged_parallel_group_cancellation_handler
0663 {
0664   ranged_parallel_group_cancellation_handler(
0665       std::shared_ptr<ranged_parallel_group_state<
0666         Condition, Handler, Op, Allocator>> state)
0667     : state_(std::move(state))
0668   {
0669   }
0670 
0671   void operator()(cancellation_type_t cancel_type)
0672   {
0673     // If we are the first place to request cancellation, i.e. no operation has
0674     // yet completed and requested cancellation, emit a signal for each
0675     // operation in the group.
0676     if (cancel_type != cancellation_type::none)
0677       if (auto state = state_.lock())
0678         if (state->cancellations_requested_++ == 0)
0679           for (std::size_t i = 0; i < state->cancellation_signals_.size(); ++i)
0680             state->cancellation_signals_[i].emit(cancel_type);
0681   }
0682 
0683   std::weak_ptr<ranged_parallel_group_state<
0684     Condition, Handler, Op, Allocator>> state_;
0685 };
0686 
0687 template <typename Condition, typename Handler,
0688     typename Range, typename Allocator>
0689 void ranged_parallel_group_launch(Condition cancellation_condition,
0690     Handler handler, Range&& range, const Allocator& allocator)
0691 {
0692   // Get the user's completion handler's cancellation slot, so that we can allow
0693   // cancellation of the entire group.
0694   associated_cancellation_slot_t<Handler> slot
0695     = boost::asio::get_associated_cancellation_slot(handler);
0696 
0697   // The type of the asynchronous operation.
0698   typedef decay_t<decltype(*declval<typename Range::iterator>())> op_type;
0699 
0700   // Create the shared state for the operation.
0701   typedef ranged_parallel_group_state<Condition,
0702     Handler, op_type, Allocator> state_type;
0703   std::shared_ptr<state_type> state = std::allocate_shared<state_type>(
0704       boost::asio::detail::recycling_allocator<state_type,
0705         boost::asio::detail::thread_info_base::parallel_group_tag>(),
0706       std::move(cancellation_condition),
0707       std::move(handler), range.size(), allocator);
0708 
0709   std::size_t idx = 0;
0710   for (auto&& op : std::forward<Range>(range))
0711   {
0712     typedef associated_executor_t<op_type> ex_type;
0713     ex_type ex = boost::asio::get_associated_executor(op);
0714     std::move(op)(
0715         ranged_parallel_group_op_handler_with_executor<
0716           ex_type, Condition, Handler, op_type, Allocator>(
0717             state, std::move(ex), idx++));
0718   }
0719 
0720   // Check if any of the operations has already requested cancellation, and if
0721   // so, emit a signal for each operation in the group.
0722   if ((state->cancellations_requested_ -= range.size()) > 0)
0723     for (auto& signal : state->cancellation_signals_)
0724       signal.emit(state->cancel_type_);
0725 
0726   // Register a handler with the user's completion handler's cancellation slot.
0727   if (slot.is_connected())
0728     slot.template emplace<
0729       ranged_parallel_group_cancellation_handler<
0730         Condition, Handler, op_type, Allocator>>(state);
0731 }
0732 
0733 } // namespace detail
0734 } // namespace experimental
0735 
0736 template <template <typename, typename> class Associator,
0737     typename Handler, typename... Ops, typename DefaultCandidate>
0738 struct associator<Associator,
0739     experimental::detail::parallel_group_completion_handler<Handler, Ops...>,
0740     DefaultCandidate>
0741   : Associator<Handler, DefaultCandidate>
0742 {
0743   static typename Associator<Handler, DefaultCandidate>::type get(
0744       const experimental::detail::parallel_group_completion_handler<
0745         Handler, Ops...>& h) noexcept
0746   {
0747     return Associator<Handler, DefaultCandidate>::get(h.handler_);
0748   }
0749 
0750   static auto get(
0751       const experimental::detail::parallel_group_completion_handler<
0752         Handler, Ops...>& h,
0753       const DefaultCandidate& c) noexcept
0754     -> decltype(Associator<Handler, DefaultCandidate>::get(h.handler_, c))
0755   {
0756     return Associator<Handler, DefaultCandidate>::get(h.handler_, c);
0757   }
0758 };
0759 
0760 template <template <typename, typename> class Associator, typename Handler,
0761     typename Op, typename Allocator, typename DefaultCandidate>
0762 struct associator<Associator,
0763     experimental::detail::ranged_parallel_group_completion_handler<
0764       Handler, Op, Allocator>,
0765     DefaultCandidate>
0766   : Associator<Handler, DefaultCandidate>
0767 {
0768   static typename Associator<Handler, DefaultCandidate>::type get(
0769       const experimental::detail::ranged_parallel_group_completion_handler<
0770         Handler, Op, Allocator>& h) noexcept
0771   {
0772     return Associator<Handler, DefaultCandidate>::get(h.handler_);
0773   }
0774 
0775   static auto get(
0776       const experimental::detail::ranged_parallel_group_completion_handler<
0777         Handler, Op, Allocator>& h,
0778       const DefaultCandidate& c) noexcept
0779     -> decltype(Associator<Handler, DefaultCandidate>::get(h.handler_, c))
0780   {
0781     return Associator<Handler, DefaultCandidate>::get(h.handler_, c);
0782   }
0783 };
0784 
0785 } // namespace asio
0786 } // namespace boost
0787 
0788 #include <boost/asio/detail/pop_options.hpp>
0789 
0790 #endif // BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP