File indexing completed on 2025-08-28 08:27:08
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
0020 #include <atomic>
0021 #include <cmath>
0022 #include <functional>
0023 #include <memory>
0024 #include <optional>
0025 #include <type_traits>
0026 #include <utility>
0027 #include <vector>
0028
0029 #include "arrow/result.h"
0030 #include "arrow/status.h"
0031 #include "arrow/type_fwd.h"
0032 #include "arrow/type_traits.h"
0033 #include "arrow/util/config.h"
0034 #include "arrow/util/functional.h"
0035 #include "arrow/util/macros.h"
0036 #include "arrow/util/tracing.h"
0037 #include "arrow/util/type_fwd.h"
0038 #include "arrow/util/visibility.h"
0039
0040 namespace arrow {
0041
0042 template <typename>
0043 struct EnsureFuture;
0044
0045 namespace detail {
0046
0047 template <typename>
0048 struct is_future : std::false_type {};
0049
0050 template <typename T>
0051 struct is_future<Future<T>> : std::true_type {};
0052
0053 template <typename Signature, typename Enable = void>
0054 struct result_of;
0055
0056 template <typename Fn, typename... A>
0057 struct result_of<Fn(A...),
0058 internal::void_t<decltype(std::declval<Fn>()(std::declval<A>()...))>> {
0059 using type = decltype(std::declval<Fn>()(std::declval<A>()...));
0060 };
0061
0062 template <typename Signature>
0063 using result_of_t = typename result_of<Signature>::type;
0064
0065
0066 template <typename T>
0067 struct SyncType {
0068 using type = Result<T>;
0069 };
0070
0071 template <>
0072 struct SyncType<internal::Empty> {
0073 using type = Status;
0074 };
0075
0076 template <typename Fn>
0077 using first_arg_is_status =
0078 std::is_same<typename std::decay<internal::call_traits::argument_type<0, Fn>>::type,
0079 Status>;
0080
0081 template <typename Fn, typename Then, typename Else,
0082 typename Count = internal::call_traits::argument_count<Fn>>
0083 using if_has_no_args = typename std::conditional<Count::value == 0, Then, Else>::type;
0084
0085
0086 template <typename Source, typename Dest, bool SourceEmpty = Source::is_empty,
0087 bool DestEmpty = Dest::is_empty>
0088 struct MarkNextFinished {};
0089
0090
0091 template <typename Source, typename Dest>
0092 struct MarkNextFinished<Source, Dest, true, true> {
0093 void operator()(const Status& status) && { next.MarkFinished(status); }
0094 Dest next;
0095 };
0096
0097
0098
0099 template <typename Source, typename Dest>
0100 struct MarkNextFinished<Source, Dest, false, true> {
0101 void operator()(const Result<typename Source::ValueType>& res) && {
0102 next.MarkFinished(internal::Empty::ToResult(res.status()));
0103 }
0104 Dest next;
0105 };
0106
0107
0108 template <typename Source, typename Dest>
0109 struct MarkNextFinished<Source, Dest, false, false> {
0110 void operator()(const Result<typename Source::ValueType>& res) && {
0111 next.MarkFinished(res);
0112 }
0113 Dest next;
0114 };
0115
0116
0117 struct ContinueFuture {
0118 template <typename Return>
0119 struct ForReturnImpl;
0120
0121 template <typename Return>
0122 using ForReturn = typename ForReturnImpl<Return>::type;
0123
0124 template <typename Signature>
0125 using ForSignature = ForReturn<result_of_t<Signature>>;
0126
0127
0128 template <typename ContinueFunc, typename... Args,
0129 typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
0130 typename NextFuture = ForReturn<ContinueResult>>
0131 typename std::enable_if<std::is_void<ContinueResult>::value>::type operator()(
0132 NextFuture next, ContinueFunc&& f, Args&&... a) const {
0133 std::forward<ContinueFunc>(f)(std::forward<Args>(a)...);
0134 next.MarkFinished();
0135 }
0136
0137
0138
0139
0140
0141
0142
0143 template <typename ContinueFunc, typename... Args,
0144 typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
0145 typename NextFuture = ForReturn<ContinueResult>>
0146 typename std::enable_if<
0147 !std::is_void<ContinueResult>::value && !is_future<ContinueResult>::value &&
0148 (!NextFuture::is_empty || std::is_same<ContinueResult, Status>::value)>::type
0149 operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const {
0150 next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...));
0151 }
0152
0153
0154
0155
0156
0157
0158
0159
0160 template <typename ContinueFunc, typename... Args,
0161 typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
0162 typename NextFuture = ForReturn<ContinueResult>>
0163 typename std::enable_if<!std::is_void<ContinueResult>::value &&
0164 !is_future<ContinueResult>::value && NextFuture::is_empty &&
0165 !std::is_same<ContinueResult, Status>::value>::type
0166 operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const {
0167 next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...).status());
0168 }
0169
0170
0171
0172
0173 template <typename ContinueFunc, typename... Args,
0174 typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
0175 typename NextFuture = ForReturn<ContinueResult>>
0176 typename std::enable_if<is_future<ContinueResult>::value>::type operator()(
0177 NextFuture next, ContinueFunc&& f, Args&&... a) const {
0178 ContinueResult signal_to_complete_next =
0179 std::forward<ContinueFunc>(f)(std::forward<Args>(a)...);
0180 MarkNextFinished<ContinueResult, NextFuture> callback{std::move(next)};
0181 signal_to_complete_next.AddCallback(std::move(callback));
0182 }
0183
0184
0185 template <typename ContinueFunc, typename NextFuture, typename... Args>
0186 void IgnoringArgsIf(std::true_type, NextFuture&& next, ContinueFunc&& f,
0187 Args&&...) const {
0188 operator()(std::forward<NextFuture>(next), std::forward<ContinueFunc>(f));
0189 }
0190 template <typename ContinueFunc, typename NextFuture, typename... Args>
0191 void IgnoringArgsIf(std::false_type, NextFuture&& next, ContinueFunc&& f,
0192 Args&&... a) const {
0193 operator()(std::forward<NextFuture>(next), std::forward<ContinueFunc>(f),
0194 std::forward<Args>(a)...);
0195 }
0196 };
0197
0198
0199
0200 template <>
0201 struct ContinueFuture::ForReturnImpl<void> {
0202 using type = Future<>;
0203 };
0204
0205 template <>
0206 struct ContinueFuture::ForReturnImpl<Status> {
0207 using type = Future<>;
0208 };
0209
0210 template <typename R>
0211 struct ContinueFuture::ForReturnImpl {
0212 using type = Future<R>;
0213 };
0214
0215 template <typename T>
0216 struct ContinueFuture::ForReturnImpl<Result<T>> {
0217 using type = Future<T>;
0218 };
0219
0220 template <typename T>
0221 struct ContinueFuture::ForReturnImpl<Future<T>> {
0222 using type = Future<T>;
0223 };
0224
0225 }
0226
0227
0228 enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE };
0229
0230 inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; }
0231
0232
0233 enum class ShouldSchedule {
0234
0235 Never = 0,
0236
0237
0238 IfUnfinished = 1,
0239
0240 Always = 2,
0241
0242
0243 IfDifferentExecutor = 3,
0244 };
0245
0246
0247 struct CallbackOptions {
0248
0249 ShouldSchedule should_schedule = ShouldSchedule::Never;
0250
0251
0252 internal::Executor* executor = NULLPTR;
0253
0254 static CallbackOptions Defaults() { return {}; }
0255 };
0256
0257
0258 class ARROW_EXPORT FutureImpl : public std::enable_shared_from_this<FutureImpl> {
0259 public:
0260 FutureImpl();
0261 virtual ~FutureImpl() = default;
0262
0263 FutureState state() { return state_.load(); }
0264
0265 static std::unique_ptr<FutureImpl> Make();
0266 static std::unique_ptr<FutureImpl> MakeFinished(FutureState state);
0267
0268 #ifdef ARROW_WITH_OPENTELEMETRY
0269 void SetSpan(util::tracing::Span* span) { span_ = span; }
0270 #endif
0271
0272
0273 void MarkFinished();
0274 void MarkFailed();
0275 void Wait();
0276 bool Wait(double seconds);
0277 template <typename ValueType>
0278 Result<ValueType>* CastResult() const {
0279 return static_cast<Result<ValueType>*>(result_.get());
0280 }
0281
0282 using Callback = internal::FnOnce<void(const FutureImpl& impl)>;
0283 void AddCallback(Callback callback, CallbackOptions opts);
0284 bool TryAddCallback(const std::function<Callback()>& callback_factory,
0285 CallbackOptions opts);
0286
0287 std::atomic<FutureState> state_{FutureState::PENDING};
0288
0289
0290
0291 using Storage = std::unique_ptr<void, void (*)(void*)>;
0292 Storage result_{NULLPTR, NULLPTR};
0293
0294 struct CallbackRecord {
0295 Callback callback;
0296 CallbackOptions options;
0297 };
0298 std::vector<CallbackRecord> callbacks_;
0299 #ifdef ARROW_WITH_OPENTELEMETRY
0300 util::tracing::Span* span_ = NULLPTR;
0301 #endif
0302 };
0303
0304
0305
0306
0307
0308
0309
0310
0311
0312
0313
0314
0315
0316
0317 template <typename T>
0318 class [[nodiscard]] Future {
0319 public:
0320 using ValueType = T;
0321 using SyncType = typename detail::SyncType<T>::type;
0322 static constexpr bool is_empty = std::is_same<T, internal::Empty>::value;
0323
0324
0325
0326 Future() = default;
0327
0328 #ifdef ARROW_WITH_OPENTELEMETRY
0329 void SetSpan(util::tracing::Span* span) { impl_->SetSpan(span); }
0330 #endif
0331
0332
0333
0334 bool is_valid() const { return impl_ != NULLPTR; }
0335
0336
0337
0338
0339
0340 FutureState state() const {
0341 CheckValid();
0342 return impl_->state();
0343 }
0344
0345
0346
0347
0348
0349 bool is_finished() const {
0350 CheckValid();
0351 return IsFutureFinished(impl_->state());
0352 }
0353
0354
0355 const Result<ValueType>& result() const& {
0356 Wait();
0357 return *GetResult();
0358 }
0359
0360
0361
0362
0363
0364
0365
0366 Result<ValueType>&& MoveResult() {
0367 Wait();
0368 return std::move(*GetResult());
0369 }
0370
0371
0372 const Status& status() const { return result().status(); }
0373
0374
0375
0376 explicit operator Future<>() const {
0377 Future<> status_future;
0378 status_future.impl_ = impl_;
0379 return status_future;
0380 }
0381
0382
0383 void Wait() const {
0384 CheckValid();
0385 impl_->Wait();
0386 }
0387
0388
0389
0390
0391
0392
0393 bool Wait(double seconds) const {
0394 CheckValid();
0395 return impl_->Wait(seconds);
0396 }
0397
0398
0399
0400
0401
0402
0403 void MarkFinished(Result<ValueType> res) { DoMarkFinished(std::move(res)); }
0404
0405
0406 template <typename E = ValueType, typename = typename std::enable_if<
0407 std::is_same<E, internal::Empty>::value>::type>
0408 void MarkFinished(Status s = Status::OK()) {
0409 return DoMarkFinished(E::ToResult(std::move(s)));
0410 }
0411
0412
0413
0414
0415
0416
0417
0418 static Future Make() {
0419 Future fut;
0420 fut.impl_ = FutureImpl::Make();
0421 return fut;
0422 }
0423
0424
0425 static Future<ValueType> MakeFinished(Result<ValueType> res) {
0426 Future<ValueType> fut;
0427 fut.InitializeFromResult(std::move(res));
0428 return fut;
0429 }
0430
0431
0432 template <typename E = ValueType, typename = typename std::enable_if<
0433 std::is_same<E, internal::Empty>::value>::type>
0434 static Future<> MakeFinished(Status s = Status::OK()) {
0435 return MakeFinished(E::ToResult(std::move(s)));
0436 }
0437
0438 struct WrapResultOnComplete {
0439 template <typename OnComplete>
0440 struct Callback {
0441 void operator()(const FutureImpl& impl) && {
0442 std::move(on_complete)(*impl.CastResult<ValueType>());
0443 }
0444 OnComplete on_complete;
0445 };
0446 };
0447
0448 struct WrapStatusyOnComplete {
0449 template <typename OnComplete>
0450 struct Callback {
0451 static_assert(std::is_same<internal::Empty, ValueType>::value,
0452 "Only callbacks for Future<> should accept Status and not Result");
0453
0454 void operator()(const FutureImpl& impl) && {
0455 std::move(on_complete)(impl.CastResult<ValueType>()->status());
0456 }
0457 OnComplete on_complete;
0458 };
0459 };
0460
0461 template <typename OnComplete>
0462 using WrapOnComplete = typename std::conditional<
0463 detail::first_arg_is_status<OnComplete>::value, WrapStatusyOnComplete,
0464 WrapResultOnComplete>::type::template Callback<OnComplete>;
0465
0466
0467
0468
0469
0470
0471
0472
0473
0474
0475
0476
0477
0478
0479
0480
0481
0482
0483
0484
0485
0486
0487 template <typename OnComplete, typename Callback = WrapOnComplete<OnComplete>>
0488 void AddCallback(OnComplete on_complete,
0489 CallbackOptions opts = CallbackOptions::Defaults()) const {
0490
0491
0492
0493 impl_->AddCallback(Callback{std::move(on_complete)}, opts);
0494 }
0495
0496
0497
0498
0499
0500
0501
0502
0503
0504
0505
0506
0507
0508
0509 template <typename CallbackFactory,
0510 typename OnComplete = detail::result_of_t<CallbackFactory()>,
0511 typename Callback = WrapOnComplete<OnComplete>>
0512 bool TryAddCallback(CallbackFactory callback_factory,
0513 CallbackOptions opts = CallbackOptions::Defaults()) const {
0514 return impl_->TryAddCallback([&]() { return Callback{callback_factory()}; }, opts);
0515 }
0516
0517 template <typename OnSuccess, typename OnFailure>
0518 struct ThenOnComplete {
0519 static constexpr bool has_no_args =
0520 internal::call_traits::argument_count<OnSuccess>::value == 0;
0521
0522 using ContinuedFuture = detail::ContinueFuture::ForSignature<
0523 detail::if_has_no_args<OnSuccess, OnSuccess && (), OnSuccess && (const T&)>>;
0524
0525 static_assert(
0526 std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
0527 ContinuedFuture>::value,
0528 "OnSuccess and OnFailure must continue with the same future type");
0529
0530 struct DummyOnSuccess {
0531 void operator()(const T&);
0532 };
0533 using OnSuccessArg = typename std::decay<internal::call_traits::argument_type<
0534 0, detail::if_has_no_args<OnSuccess, DummyOnSuccess, OnSuccess>>>::type;
0535
0536 static_assert(
0537 !std::is_same<OnSuccessArg, typename EnsureResult<OnSuccessArg>::type>::value,
0538 "OnSuccess' argument should not be a Result");
0539
0540 void operator()(const Result<T>& result) && {
0541 detail::ContinueFuture continue_future;
0542 if (ARROW_PREDICT_TRUE(result.ok())) {
0543
0544 ARROW_UNUSED(OnFailure(std::move(on_failure)));
0545 continue_future.IgnoringArgsIf(
0546 detail::if_has_no_args<OnSuccess, std::true_type, std::false_type>{},
0547 std::move(next), std::move(on_success), result.ValueOrDie());
0548 } else {
0549 ARROW_UNUSED(OnSuccess(std::move(on_success)));
0550 continue_future(std::move(next), std::move(on_failure), result.status());
0551 }
0552 }
0553
0554 OnSuccess on_success;
0555 OnFailure on_failure;
0556 ContinuedFuture next;
0557 };
0558
0559 template <typename OnSuccess>
0560 struct PassthruOnFailure {
0561 using ContinuedFuture = detail::ContinueFuture::ForSignature<
0562 detail::if_has_no_args<OnSuccess, OnSuccess && (), OnSuccess && (const T&)>>;
0563
0564 Result<typename ContinuedFuture::ValueType> operator()(const Status& s) { return s; }
0565 };
0566
0567
0568
0569
0570
0571
0572
0573
0574
0575
0576
0577
0578
0579
0580
0581
0582
0583
0584
0585
0586
0587
0588
0589
0590
0591
0592
0593
0594
0595
0596
0597
0598
0599
0600
0601 template <typename OnSuccess, typename OnFailure = PassthruOnFailure<OnSuccess>,
0602 typename OnComplete = ThenOnComplete<OnSuccess, OnFailure>,
0603 typename ContinuedFuture = typename OnComplete::ContinuedFuture>
0604 ContinuedFuture Then(OnSuccess on_success, OnFailure on_failure = {},
0605 CallbackOptions options = CallbackOptions::Defaults()) const {
0606 auto next = ContinuedFuture::Make();
0607 AddCallback(OnComplete{std::forward<OnSuccess>(on_success),
0608 std::forward<OnFailure>(on_failure), next},
0609 options);
0610 return next;
0611 }
0612
0613
0614 Future(ValueType val) : Future() {
0615 impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
0616 SetResult(std::move(val));
0617 }
0618
0619
0620
0621 Future(Result<ValueType> res) : Future() {
0622 if (ARROW_PREDICT_TRUE(res.ok())) {
0623 impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
0624 } else {
0625 impl_ = FutureImpl::MakeFinished(FutureState::FAILURE);
0626 }
0627 SetResult(std::move(res));
0628 }
0629
0630
0631
0632 Future(Status s)
0633 : Future(Result<ValueType>(std::move(s))) {}
0634
0635 protected:
0636 void InitializeFromResult(Result<ValueType> res) {
0637 if (ARROW_PREDICT_TRUE(res.ok())) {
0638 impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
0639 } else {
0640 impl_ = FutureImpl::MakeFinished(FutureState::FAILURE);
0641 }
0642 SetResult(std::move(res));
0643 }
0644
0645 void Initialize() { impl_ = FutureImpl::Make(); }
0646
0647 Result<ValueType>* GetResult() const { return impl_->CastResult<ValueType>(); }
0648
0649 void SetResult(Result<ValueType> res) {
0650 impl_->result_ = {new Result<ValueType>(std::move(res)),
0651 [](void* p) { delete static_cast<Result<ValueType>*>(p); }};
0652 }
0653
0654 void DoMarkFinished(Result<ValueType> res) {
0655 SetResult(std::move(res));
0656
0657 if (ARROW_PREDICT_TRUE(GetResult()->ok())) {
0658 impl_->MarkFinished();
0659 } else {
0660 impl_->MarkFailed();
0661 }
0662 }
0663
0664 void CheckValid() const {
0665 #ifndef NDEBUG
0666 if (!is_valid()) {
0667 Status::Invalid("Invalid Future (default-initialized?)").Abort();
0668 }
0669 #endif
0670 }
0671
0672 explicit Future(std::shared_ptr<FutureImpl> impl) : impl_(std::move(impl)) {}
0673
0674 std::shared_ptr<FutureImpl> impl_;
0675
0676 friend struct detail::ContinueFuture;
0677
0678 template <typename U>
0679 friend class Future;
0680 friend class WeakFuture<T>;
0681
0682 FRIEND_TEST(FutureRefTest, ChainRemoved);
0683 FRIEND_TEST(FutureRefTest, TailRemoved);
0684 FRIEND_TEST(FutureRefTest, HeadRemoved);
0685 };
0686
0687 template <typename T>
0688 typename Future<T>::SyncType FutureToSync(const Future<T>& fut) {
0689 return fut.result();
0690 }
0691
0692 template <>
0693 inline typename Future<internal::Empty>::SyncType FutureToSync<internal::Empty>(
0694 const Future<internal::Empty>& fut) {
0695 return fut.status();
0696 }
0697
0698 template <>
0699 inline Future<>::Future(Status s) : Future(internal::Empty::ToResult(std::move(s))) {}
0700
0701 template <typename T>
0702 class WeakFuture {
0703 public:
0704 explicit WeakFuture(const Future<T>& future) : impl_(future.impl_) {}
0705
0706 Future<T> get() { return Future<T>{impl_.lock()}; }
0707
0708 private:
0709 std::weak_ptr<FutureImpl> impl_;
0710 };
0711
0712
0713
0714
0715
0716
0717 template <typename T>
0718 static Future<T> DeferNotOk(Result<Future<T>> maybe_future) {
0719 if (ARROW_PREDICT_FALSE(!maybe_future.ok())) {
0720 return Future<T>::MakeFinished(std::move(maybe_future).status());
0721 }
0722 return std::move(maybe_future).MoveValueUnsafe();
0723 }
0724
0725
0726
0727
0728
0729
0730 template <typename T>
0731 Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) {
0732 struct State {
0733 explicit State(std::vector<Future<T>> f)
0734 : futures(std::move(f)), n_remaining(futures.size()) {}
0735
0736 std::vector<Future<T>> futures;
0737 std::atomic<size_t> n_remaining;
0738 };
0739
0740 if (futures.size() == 0) {
0741 return {std::vector<Result<T>>{}};
0742 }
0743
0744 auto state = std::make_shared<State>(std::move(futures));
0745
0746 auto out = Future<std::vector<Result<T>>>::Make();
0747 for (const Future<T>& future : state->futures) {
0748 future.AddCallback([state, out](const Result<T>&) mutable {
0749 if (state->n_remaining.fetch_sub(1) != 1) return;
0750
0751 std::vector<Result<T>> results(state->futures.size());
0752 for (size_t i = 0; i < results.size(); ++i) {
0753 results[i] = state->futures[i].result();
0754 }
0755 out.MarkFinished(std::move(results));
0756 });
0757 }
0758 return out;
0759 }
0760
0761
0762
0763
0764
0765
0766 ARROW_EXPORT
0767 Future<> AllComplete(const std::vector<Future<>>& futures);
0768
0769
0770
0771
0772
0773
0774
0775
0776
0777 ARROW_EXPORT
0778 Future<> AllFinished(const std::vector<Future<>>& futures);
0779
0780
0781
0782 struct Continue {
0783 template <typename T>
0784 operator std::optional<T>() && {
0785 return {};
0786 }
0787 };
0788
0789 template <typename T = internal::Empty>
0790 std::optional<T> Break(T break_value = {}) {
0791 return std::optional<T>{std::move(break_value)};
0792 }
0793
0794 template <typename T = internal::Empty>
0795 using ControlFlow = std::optional<T>;
0796
0797
0798
0799
0800
0801
0802
0803
0804
0805 template <typename Iterate,
0806 typename Control = typename detail::result_of_t<Iterate()>::ValueType,
0807 typename BreakValueType = typename Control::value_type>
0808 Future<BreakValueType> Loop(Iterate iterate) {
0809 struct Callback {
0810 bool CheckForTermination(const Result<Control>& control_res) {
0811 if (!control_res.ok()) {
0812 break_fut.MarkFinished(control_res.status());
0813 return true;
0814 }
0815 if (control_res->has_value()) {
0816 break_fut.MarkFinished(**control_res);
0817 return true;
0818 }
0819 return false;
0820 }
0821
0822 void operator()(const Result<Control>& maybe_control) && {
0823 if (CheckForTermination(maybe_control)) return;
0824
0825 auto control_fut = iterate();
0826 while (true) {
0827 if (control_fut.TryAddCallback([this]() { return *this; })) {
0828
0829
0830 return;
0831 }
0832
0833
0834
0835 if (CheckForTermination(control_fut.result())) return;
0836
0837 control_fut = iterate();
0838 }
0839 }
0840
0841 Iterate iterate;
0842
0843
0844
0845
0846
0847
0848 Future<BreakValueType> break_fut;
0849 };
0850
0851 auto break_fut = Future<BreakValueType>::Make();
0852 auto control_fut = iterate();
0853 control_fut.AddCallback(Callback{std::move(iterate), break_fut});
0854
0855 return break_fut;
0856 }
0857
0858 inline Future<> ToFuture(Status status) {
0859 return Future<>::MakeFinished(std::move(status));
0860 }
0861
0862 template <typename T>
0863 Future<T> ToFuture(T value) {
0864 return Future<T>::MakeFinished(std::move(value));
0865 }
0866
0867 template <typename T>
0868 Future<T> ToFuture(Result<T> maybe_value) {
0869 return Future<T>::MakeFinished(std::move(maybe_value));
0870 }
0871
0872 template <typename T>
0873 Future<T> ToFuture(Future<T> fut) {
0874 return fut;
0875 }
0876
0877 template <typename T>
0878 struct EnsureFuture {
0879 using type = decltype(ToFuture(std::declval<T>()));
0880 };
0881
0882 }