Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 09:28:52

0001 //
0002 // experimental/impl/parallel_group.hpp
0003 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
0004 //
0005 // Copyright (c) 2003-2023 Christopher M. Kohlhoff (chris at kohlhoff dot com)
0006 //
0007 // Distributed under the Boost Software License, Version 1.0. (See accompanying
0008 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0009 //
0010 
0011 #ifndef BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP
0012 #define BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP
0013 
0014 #if defined(_MSC_VER) && (_MSC_VER >= 1200)
0015 # pragma once
0016 #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
0017 
0018 #include <boost/asio/detail/config.hpp>
0019 #include <atomic>
0020 #include <deque>
0021 #include <memory>
0022 #include <new>
0023 #include <tuple>
0024 #include <boost/asio/associated_cancellation_slot.hpp>
0025 #include <boost/asio/detail/recycling_allocator.hpp>
0026 #include <boost/asio/detail/type_traits.hpp>
0027 #include <boost/asio/dispatch.hpp>
0028 
0029 #include <boost/asio/detail/push_options.hpp>
0030 
0031 namespace boost {
0032 namespace asio {
0033 namespace experimental {
0034 namespace detail {
0035 
// Stores the result from an individual asynchronous operation.
//
// This is a minimal optional-like holder. Storage for a T lives inside a
// union so that no T is constructed until emplace() is called; has_value_
// records whether the union currently contains a live T.
template <typename T, typename = void>
struct parallel_group_op_result
{
public:
  // Constructs an empty result. No T object is created.
  parallel_group_op_result()
    : has_value_(false)
  {
  }

  // Move-constructs from another result. If the source holds a value, a new T
  // is move-constructed from it; the source keeps its (moved-from) T and is
  // still responsible for destroying it.
  parallel_group_op_result(parallel_group_op_result&& other)
    : has_value_(other.has_value_)
  {
    if (has_value_)
      new (&u_.value_) T(std::move(other.get()));
  }

  // Destroys the stored T, if there is one.
  ~parallel_group_op_result()
  {
    if (has_value_)
      u_.value_.~T();
  }

  // Returns a reference to the stored value. This does not check has_value_,
  // so it must only be used after a value has been emplaced.
  T& get() noexcept
  {
    return u_.value_;
  }

  // Constructs the stored value in place from the supplied arguments.
  //
  // If a value is already present it is destroyed first, so that repeated
  // calls do not skip the destructor of the previous T. The flag is cleared
  // before the new object is constructed, which means that a throwing T
  // constructor leaves this object empty rather than claiming to hold a value
  // that has already been destroyed.
  template <typename... Args>
  void emplace(Args&&... args)
  {
    if (has_value_)
    {
      has_value_ = false;
      u_.value_.~T();
    }

    new (&u_.value_) T(std::forward<Args>(args)...);
    has_value_ = true;
  }

private:
  // Raw storage for the value. The user-provided constructor and destructor
  // are deliberately empty: the lifetime of value_ is managed manually by the
  // enclosing class.
  union u
  {
    u() {}
    ~u() {}
    char c_;
    T value_;
  } u_;

  // True when u_.value_ holds a live T.
  bool has_value_;
};
0081 
0082 // Proxy completion handler for the group of parallel operatations. Unpacks and
0083 // concatenates the individual operations' results, and invokes the user's
0084 // completion handler.
0085 template <typename Handler, typename... Ops>
0086 struct parallel_group_completion_handler
0087 {
0088   typedef decay_t<
0089       prefer_result_t<
0090         associated_executor_t<Handler>,
0091         execution::outstanding_work_t::tracked_t
0092       >
0093     > executor_type;
0094 
0095   parallel_group_completion_handler(Handler&& h)
0096     : handler_(std::move(h)),
0097       executor_(
0098           boost::asio::prefer(
0099             boost::asio::get_associated_executor(handler_),
0100             execution::outstanding_work.tracked))
0101   {
0102   }
0103 
0104   executor_type get_executor() const noexcept
0105   {
0106     return executor_;
0107   }
0108 
0109   void operator()()
0110   {
0111     this->invoke(boost::asio::detail::make_index_sequence<sizeof...(Ops)>());
0112   }
0113 
0114   template <std::size_t... I>
0115   void invoke(boost::asio::detail::index_sequence<I...>)
0116   {
0117     this->invoke(std::tuple_cat(std::move(std::get<I>(args_).get())...));
0118   }
0119 
0120   template <typename... Args>
0121   void invoke(std::tuple<Args...>&& args)
0122   {
0123     this->invoke(std::move(args),
0124         boost::asio::detail::index_sequence_for<Args...>());
0125   }
0126 
0127   template <typename... Args, std::size_t... I>
0128   void invoke(std::tuple<Args...>&& args,
0129       boost::asio::detail::index_sequence<I...>)
0130   {
0131     std::move(handler_)(completion_order_, std::move(std::get<I>(args))...);
0132   }
0133 
0134   Handler handler_;
0135   executor_type executor_;
0136   std::array<std::size_t, sizeof...(Ops)> completion_order_{};
0137   std::tuple<
0138       parallel_group_op_result<
0139         typename parallel_op_signature_as_tuple<
0140           completion_signature_of_t<Ops>
0141         >::type
0142       >...
0143     > args_{};
0144 };
0145 
0146 // Shared state for the parallel group.
0147 template <typename Condition, typename Handler, typename... Ops>
0148 struct parallel_group_state
0149 {
0150   parallel_group_state(Condition&& c, Handler&& h)
0151     : cancellation_condition_(std::move(c)),
0152       handler_(std::move(h))
0153   {
0154   }
0155 
0156   // The number of operations that have completed so far. Used to determine the
0157   // order of completion.
0158   std::atomic<unsigned int> completed_{0};
0159 
0160   // The non-none cancellation type that resulted from a cancellation condition.
0161   // Stored here for use by the group's initiating function.
0162   std::atomic<cancellation_type_t> cancel_type_{cancellation_type::none};
0163 
0164   // The number of cancellations that have been requested, either on completion
0165   // of the operations within the group, or via the cancellation slot for the
0166   // group operation. Initially set to the number of operations to prevent
0167   // cancellation signals from being emitted until after all of the group's
0168   // operations' initiating functions have completed.
0169   std::atomic<unsigned int> cancellations_requested_{sizeof...(Ops)};
0170 
0171   // The number of operations that are yet to complete. Used to determine when
0172   // it is safe to invoke the user's completion handler.
0173   std::atomic<unsigned int> outstanding_{sizeof...(Ops)};
0174 
0175   // The cancellation signals for each operation in the group.
0176   boost::asio::cancellation_signal cancellation_signals_[sizeof...(Ops)];
0177 
0178   // The cancellation condition is used to determine whether the results from an
0179   // individual operation warrant a cancellation request for the whole group.
0180   Condition cancellation_condition_;
0181 
0182   // The proxy handler to be invoked once all operations in the group complete.
0183   parallel_group_completion_handler<Handler, Ops...> handler_;
0184 };
0185 
0186 // Handler for an individual operation within the parallel group.
0187 template <std::size_t I, typename Condition, typename Handler, typename... Ops>
0188 struct parallel_group_op_handler
0189 {
0190   typedef boost::asio::cancellation_slot cancellation_slot_type;
0191 
0192   parallel_group_op_handler(
0193     std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state)
0194     : state_(std::move(state))
0195   {
0196   }
0197 
0198   cancellation_slot_type get_cancellation_slot() const noexcept
0199   {
0200     return state_->cancellation_signals_[I].slot();
0201   }
0202 
0203   template <typename... Args>
0204   void operator()(Args... args)
0205   {
0206     // Capture this operation into the completion order.
0207     state_->handler_.completion_order_[state_->completed_++] = I;
0208 
0209     // Determine whether the results of this operation require cancellation of
0210     // the whole group.
0211     cancellation_type_t cancel_type = state_->cancellation_condition_(args...);
0212 
0213     // Capture the result of the operation into the proxy completion handler.
0214     std::get<I>(state_->handler_.args_).emplace(std::move(args)...);
0215 
0216     if (cancel_type != cancellation_type::none)
0217     {
0218       // Save the type for potential use by the group's initiating function.
0219       state_->cancel_type_ = cancel_type;
0220 
0221       // If we are the first operation to request cancellation, emit a signal
0222       // for each operation in the group.
0223       if (state_->cancellations_requested_++ == 0)
0224         for (std::size_t i = 0; i < sizeof...(Ops); ++i)
0225           if (i != I)
0226             state_->cancellation_signals_[i].emit(cancel_type);
0227     }
0228 
0229     // If this is the last outstanding operation, invoke the user's handler.
0230     if (--state_->outstanding_ == 0)
0231       boost::asio::dispatch(std::move(state_->handler_));
0232   }
0233 
0234   std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
0235 };
0236 
0237 // Handler for an individual operation within the parallel group that has an
0238 // explicitly specified executor.
0239 template <typename Executor, std::size_t I,
0240     typename Condition, typename Handler, typename... Ops>
0241 struct parallel_group_op_handler_with_executor :
0242   parallel_group_op_handler<I, Condition, Handler, Ops...>
0243 {
0244   typedef parallel_group_op_handler<I, Condition, Handler, Ops...> base_type;
0245   typedef boost::asio::cancellation_slot cancellation_slot_type;
0246   typedef Executor executor_type;
0247 
0248   parallel_group_op_handler_with_executor(
0249       std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state,
0250       executor_type ex)
0251     : parallel_group_op_handler<I, Condition, Handler, Ops...>(std::move(state))
0252   {
0253     cancel_proxy_ =
0254       &this->state_->cancellation_signals_[I].slot().template
0255         emplace<cancel_proxy>(this->state_, std::move(ex));
0256   }
0257 
0258   cancellation_slot_type get_cancellation_slot() const noexcept
0259   {
0260     return cancel_proxy_->signal_.slot();
0261   }
0262 
0263   executor_type get_executor() const noexcept
0264   {
0265     return cancel_proxy_->executor_;
0266   }
0267 
0268   // Proxy handler that forwards the emitted signal to the correct executor.
0269   struct cancel_proxy
0270   {
0271     cancel_proxy(
0272         std::shared_ptr<parallel_group_state<
0273           Condition, Handler, Ops...>> state,
0274         executor_type ex)
0275       : state_(std::move(state)),
0276         executor_(std::move(ex))
0277     {
0278     }
0279 
0280     void operator()(cancellation_type_t type)
0281     {
0282       if (auto state = state_.lock())
0283       {
0284         boost::asio::cancellation_signal* sig = &signal_;
0285         boost::asio::dispatch(executor_,
0286             [state, sig, type]{ sig->emit(type); });
0287       }
0288     }
0289 
0290     std::weak_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
0291     boost::asio::cancellation_signal signal_;
0292     executor_type executor_;
0293   };
0294 
0295   cancel_proxy* cancel_proxy_;
0296 };
0297 
0298 // Helper to launch an operation using the correct executor, if any.
0299 template <std::size_t I, typename Op, typename = void>
0300 struct parallel_group_op_launcher
0301 {
0302   template <typename Condition, typename Handler, typename... Ops>
0303   static void launch(Op& op,
0304     const std::shared_ptr<parallel_group_state<
0305       Condition, Handler, Ops...>>& state)
0306   {
0307     typedef associated_executor_t<Op> ex_type;
0308     ex_type ex = boost::asio::get_associated_executor(op);
0309     std::move(op)(
0310         parallel_group_op_handler_with_executor<ex_type, I,
0311           Condition, Handler, Ops...>(state, std::move(ex)));
0312   }
0313 };
0314 
0315 // Specialised launcher for operations that specify no executor.
0316 template <std::size_t I, typename Op>
0317 struct parallel_group_op_launcher<I, Op,
0318     enable_if_t<
0319       is_same<
0320         typename associated_executor<
0321           Op>::asio_associated_executor_is_unspecialised,
0322         void
0323       >::value
0324     >>
0325 {
0326   template <typename Condition, typename Handler, typename... Ops>
0327   static void launch(Op& op,
0328     const std::shared_ptr<parallel_group_state<
0329       Condition, Handler, Ops...>>& state)
0330   {
0331     std::move(op)(
0332         parallel_group_op_handler<I, Condition, Handler, Ops...>(state));
0333   }
0334 };
0335 
0336 template <typename Condition, typename Handler, typename... Ops>
0337 struct parallel_group_cancellation_handler
0338 {
0339   parallel_group_cancellation_handler(
0340     std::shared_ptr<parallel_group_state<Condition, Handler, Ops...>> state)
0341     : state_(std::move(state))
0342   {
0343   }
0344 
0345   void operator()(cancellation_type_t cancel_type)
0346   {
0347     // If we are the first place to request cancellation, i.e. no operation has
0348     // yet completed and requested cancellation, emit a signal for each
0349     // operation in the group.
0350     if (cancel_type != cancellation_type::none)
0351       if (auto state = state_.lock())
0352         if (state->cancellations_requested_++ == 0)
0353           for (std::size_t i = 0; i < sizeof...(Ops); ++i)
0354             state->cancellation_signals_[i].emit(cancel_type);
0355   }
0356 
0357   std::weak_ptr<parallel_group_state<Condition, Handler, Ops...>> state_;
0358 };
0359 
0360 template <typename Condition, typename Handler,
0361     typename... Ops, std::size_t... I>
0362 void parallel_group_launch(Condition cancellation_condition, Handler handler,
0363     std::tuple<Ops...>& ops, boost::asio::detail::index_sequence<I...>)
0364 {
0365   // Get the user's completion handler's cancellation slot, so that we can allow
0366   // cancellation of the entire group.
0367   associated_cancellation_slot_t<Handler> slot
0368     = boost::asio::get_associated_cancellation_slot(handler);
0369 
0370   // Create the shared state for the operation.
0371   typedef parallel_group_state<Condition, Handler, Ops...> state_type;
0372   std::shared_ptr<state_type> state = std::allocate_shared<state_type>(
0373       boost::asio::detail::recycling_allocator<state_type,
0374         boost::asio::detail::thread_info_base::parallel_group_tag>(),
0375       std::move(cancellation_condition), std::move(handler));
0376 
0377   // Initiate each individual operation in the group.
0378   int fold[] = { 0,
0379     ( parallel_group_op_launcher<I, Ops>::launch(std::get<I>(ops), state),
0380       0 )...
0381   };
0382   (void)fold;
0383 
0384   // Check if any of the operations has already requested cancellation, and if
0385   // so, emit a signal for each operation in the group.
0386   if ((state->cancellations_requested_ -= sizeof...(Ops)) > 0)
0387     for (auto& signal : state->cancellation_signals_)
0388       signal.emit(state->cancel_type_);
0389 
0390   // Register a handler with the user's completion handler's cancellation slot.
0391   if (slot.is_connected())
0392     slot.template emplace<
0393       parallel_group_cancellation_handler<
0394         Condition, Handler, Ops...>>(state);
0395 }
0396 
0397 // Proxy completion handler for the ranged group of parallel operatations.
0398 // Unpacks and recombines the individual operations' results, and invokes the
0399 // user's completion handler.
0400 template <typename Handler, typename Op, typename Allocator>
0401 struct ranged_parallel_group_completion_handler
0402 {
0403   typedef decay_t<
0404       prefer_result_t<
0405         associated_executor_t<Handler>,
0406         execution::outstanding_work_t::tracked_t
0407       >
0408     > executor_type;
0409 
0410   typedef typename parallel_op_signature_as_tuple<
0411       completion_signature_of_t<Op>
0412     >::type op_tuple_type;
0413 
0414   typedef parallel_group_op_result<op_tuple_type> op_result_type;
0415 
0416   ranged_parallel_group_completion_handler(Handler&& h,
0417       std::size_t size, const Allocator& allocator)
0418     : handler_(std::move(h)),
0419       executor_(
0420           boost::asio::prefer(
0421             boost::asio::get_associated_executor(handler_),
0422             execution::outstanding_work.tracked)),
0423       allocator_(allocator),
0424       completion_order_(size, 0,
0425           BOOST_ASIO_REBIND_ALLOC(Allocator, std::size_t)(allocator)),
0426       args_(BOOST_ASIO_REBIND_ALLOC(Allocator, op_result_type)(allocator))
0427   {
0428     for (std::size_t i = 0; i < size; ++i)
0429       args_.emplace_back();
0430   }
0431 
0432   executor_type get_executor() const noexcept
0433   {
0434     return executor_;
0435   }
0436 
0437   void operator()()
0438   {
0439     this->invoke(
0440         boost::asio::detail::make_index_sequence<
0441           std::tuple_size<op_tuple_type>::value>());
0442   }
0443 
0444   template <std::size_t... I>
0445   void invoke(boost::asio::detail::index_sequence<I...>)
0446   {
0447     typedef typename parallel_op_signature_as_tuple<
0448         typename ranged_parallel_group_signature<
0449           completion_signature_of_t<Op>,
0450           Allocator
0451         >::raw_type
0452       >::type vectors_type;
0453 
0454     // Construct all result vectors using the supplied allocator.
0455     vectors_type vectors{
0456         typename std::tuple_element<I, vectors_type>::type(
0457           BOOST_ASIO_REBIND_ALLOC(Allocator, int)(allocator_))...};
0458 
0459     // Reserve sufficient space in each of the result vectors.
0460     int reserve_fold[] = { 0,
0461       ( std::get<I>(vectors).reserve(completion_order_.size()),
0462         0 )...
0463     };
0464     (void)reserve_fold;
0465 
0466     // Copy the results from all operations into the result vectors.
0467     for (std::size_t idx = 0; idx < completion_order_.size(); ++idx)
0468     {
0469       int pushback_fold[] = { 0,
0470         ( std::get<I>(vectors).push_back(
0471             std::move(std::get<I>(args_[idx].get()))),
0472           0 )...
0473       };
0474       (void)pushback_fold;
0475     }
0476 
0477     std::move(handler_)(completion_order_, std::move(std::get<I>(vectors))...);
0478   }
0479 
0480   Handler handler_;
0481   executor_type executor_;
0482   Allocator allocator_;
0483   std::vector<std::size_t,
0484     BOOST_ASIO_REBIND_ALLOC(Allocator, std::size_t)> completion_order_;
0485   std::deque<op_result_type,
0486     BOOST_ASIO_REBIND_ALLOC(Allocator, op_result_type)> args_;
0487 };
0488 
0489 // Shared state for the parallel group.
0490 template <typename Condition, typename Handler, typename Op, typename Allocator>
0491 struct ranged_parallel_group_state
0492 {
0493   ranged_parallel_group_state(Condition&& c, Handler&& h,
0494       std::size_t size, const Allocator& allocator)
0495     : cancellations_requested_(size),
0496       outstanding_(size),
0497       cancellation_signals_(
0498           BOOST_ASIO_REBIND_ALLOC(Allocator,
0499             boost::asio::cancellation_signal)(allocator)),
0500       cancellation_condition_(std::move(c)),
0501       handler_(std::move(h), size, allocator)
0502   {
0503     for (std::size_t i = 0; i < size; ++i)
0504       cancellation_signals_.emplace_back();
0505   }
0506 
0507   // The number of operations that have completed so far. Used to determine the
0508   // order of completion.
0509   std::atomic<unsigned int> completed_{0};
0510 
0511   // The non-none cancellation type that resulted from a cancellation condition.
0512   // Stored here for use by the group's initiating function.
0513   std::atomic<cancellation_type_t> cancel_type_{cancellation_type::none};
0514 
0515   // The number of cancellations that have been requested, either on completion
0516   // of the operations within the group, or via the cancellation slot for the
0517   // group operation. Initially set to the number of operations to prevent
0518   // cancellation signals from being emitted until after all of the group's
0519   // operations' initiating functions have completed.
0520   std::atomic<unsigned int> cancellations_requested_;
0521 
0522   // The number of operations that are yet to complete. Used to determine when
0523   // it is safe to invoke the user's completion handler.
0524   std::atomic<unsigned int> outstanding_;
0525 
0526   // The cancellation signals for each operation in the group.
0527   std::deque<boost::asio::cancellation_signal,
0528     BOOST_ASIO_REBIND_ALLOC(Allocator, boost::asio::cancellation_signal)>
0529       cancellation_signals_;
0530 
0531   // The cancellation condition is used to determine whether the results from an
0532   // individual operation warrant a cancellation request for the whole group.
0533   Condition cancellation_condition_;
0534 
0535   // The proxy handler to be invoked once all operations in the group complete.
0536   ranged_parallel_group_completion_handler<Handler, Op, Allocator> handler_;
0537 };
0538 
0539 // Handler for an individual operation within the parallel group.
0540 template <typename Condition, typename Handler, typename Op, typename Allocator>
0541 struct ranged_parallel_group_op_handler
0542 {
0543   typedef boost::asio::cancellation_slot cancellation_slot_type;
0544 
0545   ranged_parallel_group_op_handler(
0546       std::shared_ptr<ranged_parallel_group_state<
0547         Condition, Handler, Op, Allocator>> state,
0548       std::size_t idx)
0549     : state_(std::move(state)),
0550       idx_(idx)
0551   {
0552   }
0553 
0554   cancellation_slot_type get_cancellation_slot() const noexcept
0555   {
0556     return state_->cancellation_signals_[idx_].slot();
0557   }
0558 
0559   template <typename... Args>
0560   void operator()(Args... args)
0561   {
0562     // Capture this operation into the completion order.
0563     state_->handler_.completion_order_[state_->completed_++] = idx_;
0564 
0565     // Determine whether the results of this operation require cancellation of
0566     // the whole group.
0567     cancellation_type_t cancel_type = state_->cancellation_condition_(args...);
0568 
0569     // Capture the result of the operation into the proxy completion handler.
0570     state_->handler_.args_[idx_].emplace(std::move(args)...);
0571 
0572     if (cancel_type != cancellation_type::none)
0573     {
0574       // Save the type for potential use by the group's initiating function.
0575       state_->cancel_type_ = cancel_type;
0576 
0577       // If we are the first operation to request cancellation, emit a signal
0578       // for each operation in the group.
0579       if (state_->cancellations_requested_++ == 0)
0580         for (std::size_t i = 0; i < state_->cancellation_signals_.size(); ++i)
0581           if (i != idx_)
0582             state_->cancellation_signals_[i].emit(cancel_type);
0583     }
0584 
0585     // If this is the last outstanding operation, invoke the user's handler.
0586     if (--state_->outstanding_ == 0)
0587       boost::asio::dispatch(std::move(state_->handler_));
0588   }
0589 
0590   std::shared_ptr<ranged_parallel_group_state<
0591     Condition, Handler, Op, Allocator>> state_;
0592   std::size_t idx_;
0593 };
0594 
0595 // Handler for an individual operation within the parallel group that has an
0596 // explicitly specified executor.
0597 template <typename Executor, typename Condition,
0598     typename Handler, typename Op, typename Allocator>
0599 struct ranged_parallel_group_op_handler_with_executor :
0600   ranged_parallel_group_op_handler<Condition, Handler, Op, Allocator>
0601 {
0602   typedef ranged_parallel_group_op_handler<
0603     Condition, Handler, Op, Allocator> base_type;
0604   typedef boost::asio::cancellation_slot cancellation_slot_type;
0605   typedef Executor executor_type;
0606 
0607   ranged_parallel_group_op_handler_with_executor(
0608       std::shared_ptr<ranged_parallel_group_state<
0609         Condition, Handler, Op, Allocator>> state,
0610       executor_type ex, std::size_t idx)
0611     : ranged_parallel_group_op_handler<Condition, Handler, Op, Allocator>(
0612         std::move(state), idx)
0613   {
0614     cancel_proxy_ =
0615       &this->state_->cancellation_signals_[idx].slot().template
0616         emplace<cancel_proxy>(this->state_, std::move(ex));
0617   }
0618 
0619   cancellation_slot_type get_cancellation_slot() const noexcept
0620   {
0621     return cancel_proxy_->signal_.slot();
0622   }
0623 
0624   executor_type get_executor() const noexcept
0625   {
0626     return cancel_proxy_->executor_;
0627   }
0628 
0629   // Proxy handler that forwards the emitted signal to the correct executor.
0630   struct cancel_proxy
0631   {
0632     cancel_proxy(
0633         std::shared_ptr<ranged_parallel_group_state<
0634           Condition, Handler, Op, Allocator>> state,
0635         executor_type ex)
0636       : state_(std::move(state)),
0637         executor_(std::move(ex))
0638     {
0639     }
0640 
0641     void operator()(cancellation_type_t type)
0642     {
0643       if (auto state = state_.lock())
0644       {
0645         boost::asio::cancellation_signal* sig = &signal_;
0646         boost::asio::dispatch(executor_,
0647             [state, sig, type]{ sig->emit(type); });
0648       }
0649     }
0650 
0651     std::weak_ptr<ranged_parallel_group_state<
0652       Condition, Handler, Op, Allocator>> state_;
0653     boost::asio::cancellation_signal signal_;
0654     executor_type executor_;
0655   };
0656 
0657   cancel_proxy* cancel_proxy_;
0658 };
0659 
0660 template <typename Condition, typename Handler, typename Op, typename Allocator>
0661 struct ranged_parallel_group_cancellation_handler
0662 {
0663   ranged_parallel_group_cancellation_handler(
0664       std::shared_ptr<ranged_parallel_group_state<
0665         Condition, Handler, Op, Allocator>> state)
0666     : state_(std::move(state))
0667   {
0668   }
0669 
0670   void operator()(cancellation_type_t cancel_type)
0671   {
0672     // If we are the first place to request cancellation, i.e. no operation has
0673     // yet completed and requested cancellation, emit a signal for each
0674     // operation in the group.
0675     if (cancel_type != cancellation_type::none)
0676       if (auto state = state_.lock())
0677         if (state->cancellations_requested_++ == 0)
0678           for (std::size_t i = 0; i < state->cancellation_signals_.size(); ++i)
0679             state->cancellation_signals_[i].emit(cancel_type);
0680   }
0681 
0682   std::weak_ptr<ranged_parallel_group_state<
0683     Condition, Handler, Op, Allocator>> state_;
0684 };
0685 
0686 template <typename Condition, typename Handler,
0687     typename Range, typename Allocator>
0688 void ranged_parallel_group_launch(Condition cancellation_condition,
0689     Handler handler, Range&& range, const Allocator& allocator)
0690 {
0691   // Get the user's completion handler's cancellation slot, so that we can allow
0692   // cancellation of the entire group.
0693   associated_cancellation_slot_t<Handler> slot
0694     = boost::asio::get_associated_cancellation_slot(handler);
0695 
0696   // The type of the asynchronous operation.
0697   typedef decay_t<decltype(*declval<typename Range::iterator>())> op_type;
0698 
0699   // Create the shared state for the operation.
0700   typedef ranged_parallel_group_state<Condition,
0701     Handler, op_type, Allocator> state_type;
0702   std::shared_ptr<state_type> state = std::allocate_shared<state_type>(
0703       boost::asio::detail::recycling_allocator<state_type,
0704         boost::asio::detail::thread_info_base::parallel_group_tag>(),
0705       std::move(cancellation_condition),
0706       std::move(handler), range.size(), allocator);
0707 
0708   std::size_t idx = 0;
0709   for (auto&& op : std::forward<Range>(range))
0710   {
0711     typedef associated_executor_t<op_type> ex_type;
0712     ex_type ex = boost::asio::get_associated_executor(op);
0713     std::move(op)(
0714         ranged_parallel_group_op_handler_with_executor<
0715           ex_type, Condition, Handler, op_type, Allocator>(
0716             state, std::move(ex), idx++));
0717   }
0718 
0719   // Check if any of the operations has already requested cancellation, and if
0720   // so, emit a signal for each operation in the group.
0721   if ((state->cancellations_requested_ -= range.size()) > 0)
0722     for (auto& signal : state->cancellation_signals_)
0723       signal.emit(state->cancel_type_);
0724 
0725   // Register a handler with the user's completion handler's cancellation slot.
0726   if (slot.is_connected())
0727     slot.template emplace<
0728       ranged_parallel_group_cancellation_handler<
0729         Condition, Handler, op_type, Allocator>>(state);
0730 }
0731 
0732 } // namespace detail
0733 } // namespace experimental
0734 
0735 template <template <typename, typename> class Associator,
0736     typename Handler, typename... Ops, typename DefaultCandidate>
0737 struct associator<Associator,
0738     experimental::detail::parallel_group_completion_handler<Handler, Ops...>,
0739     DefaultCandidate>
0740   : Associator<Handler, DefaultCandidate>
0741 {
0742   static typename Associator<Handler, DefaultCandidate>::type get(
0743       const experimental::detail::parallel_group_completion_handler<
0744         Handler, Ops...>& h) noexcept
0745   {
0746     return Associator<Handler, DefaultCandidate>::get(h.handler_);
0747   }
0748 
0749   static auto get(
0750       const experimental::detail::parallel_group_completion_handler<
0751         Handler, Ops...>& h,
0752       const DefaultCandidate& c) noexcept
0753     -> decltype(Associator<Handler, DefaultCandidate>::get(h.handler_, c))
0754   {
0755     return Associator<Handler, DefaultCandidate>::get(h.handler_, c);
0756   }
0757 };
0758 
0759 template <template <typename, typename> class Associator, typename Handler,
0760     typename Op, typename Allocator, typename DefaultCandidate>
0761 struct associator<Associator,
0762     experimental::detail::ranged_parallel_group_completion_handler<
0763       Handler, Op, Allocator>,
0764     DefaultCandidate>
0765   : Associator<Handler, DefaultCandidate>
0766 {
0767   static typename Associator<Handler, DefaultCandidate>::type get(
0768       const experimental::detail::ranged_parallel_group_completion_handler<
0769         Handler, Op, Allocator>& h) noexcept
0770   {
0771     return Associator<Handler, DefaultCandidate>::get(h.handler_);
0772   }
0773 
0774   static auto get(
0775       const experimental::detail::ranged_parallel_group_completion_handler<
0776         Handler, Op, Allocator>& h,
0777       const DefaultCandidate& c) noexcept
0778     -> decltype(Associator<Handler, DefaultCandidate>::get(h.handler_, c))
0779   {
0780     return Associator<Handler, DefaultCandidate>::get(h.handler_, c);
0781   }
0782 };
0783 
0784 } // namespace asio
0785 } // namespace boost
0786 
0787 #include <boost/asio/detail/pop_options.hpp>
0788 
0789 #endif // BOOST_ASIO_IMPL_EXPERIMENTAL_PARALLEL_GROUP_HPP