Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:27:08

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <atomic>
0021 #include <cmath>
0022 #include <functional>
0023 #include <memory>
0024 #include <optional>
0025 #include <type_traits>
0026 #include <utility>
0027 #include <vector>
0028 
0029 #include "arrow/result.h"
0030 #include "arrow/status.h"
0031 #include "arrow/type_fwd.h"
0032 #include "arrow/type_traits.h"
0033 #include "arrow/util/config.h"
0034 #include "arrow/util/functional.h"
0035 #include "arrow/util/macros.h"
0036 #include "arrow/util/tracing.h"
0037 #include "arrow/util/type_fwd.h"
0038 #include "arrow/util/visibility.h"
0039 
0040 namespace arrow {
0041 
0042 template <typename>
0043 struct EnsureFuture;
0044 
0045 namespace detail {
0046 
/// Type trait whose `value` is true only when its argument is a specialization
/// of Future, and false for every other type.
template <typename>
struct is_future : std::false_type {};

/// Matches Future<T> for any T.
template <typename T>
struct is_future<Future<T>> : std::true_type {};
0052 
/// Deduce the type produced by invoking `Fn` with arguments `A...`, given the
/// function-style signature `Fn(A...)`.
///
/// The primary template is intentionally left undefined.  Only the partial
/// specialization below provides `type`, and it is selected only when the call
/// expression is well-formed, so `result_of_t` can be used in SFINAE contexts.
template <typename Signature, typename Enable = void>
struct result_of;

template <typename Fn, typename... A>
struct result_of<Fn(A...),
                 std::void_t<decltype(std::declval<Fn>()(std::declval<A>()...))>> {
  using type = decltype(std::declval<Fn>()(std::declval<A>()...));
};

/// Convenience alias for `result_of<Signature>::type`.
template <typename Signature>
using result_of_t = typename result_of<Signature>::type;
0064 
// Helper to find the synchronous counterpart for a Future<T>.  In the general
// case this is Result<T>.
template <typename T>
struct SyncType {
  using type = Result<T>;
};

// For T = internal::Empty the synchronous counterpart is a plain Status
// rather than a Result.
template <>
struct SyncType<internal::Empty> {
  using type = Status;
};
0075 
0076 template <typename Fn>
0077 using first_arg_is_status =
0078     std::is_same<typename std::decay<internal::call_traits::argument_type<0, Fn>>::type,
0079                  Status>;
0080 
0081 template <typename Fn, typename Then, typename Else,
0082           typename Count = internal::call_traits::argument_count<Fn>>
0083 using if_has_no_args = typename std::conditional<Count::value == 0, Then, Else>::type;
0084 
0085 /// Creates a callback that can be added to a future to mark a `dest` future finished
0086 template <typename Source, typename Dest, bool SourceEmpty = Source::is_empty,
0087           bool DestEmpty = Dest::is_empty>
0088 struct MarkNextFinished {};
0089 
0090 /// If the source and dest are both empty we can pass on the status
0091 template <typename Source, typename Dest>
0092 struct MarkNextFinished<Source, Dest, true, true> {
0093   void operator()(const Status& status) && { next.MarkFinished(status); }
0094   Dest next;
0095 };
0096 
0097 /// If the source is not empty but the dest is then we can take the
0098 /// status out of the result
0099 template <typename Source, typename Dest>
0100 struct MarkNextFinished<Source, Dest, false, true> {
0101   void operator()(const Result<typename Source::ValueType>& res) && {
0102     next.MarkFinished(internal::Empty::ToResult(res.status()));
0103   }
0104   Dest next;
0105 };
0106 
0107 /// If neither are empty we pass on the result
0108 template <typename Source, typename Dest>
0109 struct MarkNextFinished<Source, Dest, false, false> {
0110   void operator()(const Result<typename Source::ValueType>& res) && {
0111     next.MarkFinished(res);
0112   }
0113   Dest next;
0114 };
0115 
/// Helper that contains information about how to apply a continuation
///
/// The four `operator()` overloads are mutually exclusive (selected through
/// enable_if on the callback's return type); each one invokes the callback `f`
/// with `a...` and completes `next` accordingly.
struct ContinueFuture {
  // Maps a callback return type to the Future type of the continuation.
  // Specialized outside of this struct.
  template <typename Return>
  struct ForReturnImpl;

  template <typename Return>
  using ForReturn = typename ForReturnImpl<Return>::type;

  // Continuation Future type for a callable described by a call signature.
  template <typename Signature>
  using ForSignature = ForReturn<result_of_t<Signature>>;

  // If the callback returns void then we return Future<> that always finishes OK.
  template <typename ContinueFunc, typename... Args,
            typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
            typename NextFuture = ForReturn<ContinueResult>>
  typename std::enable_if<std::is_void<ContinueResult>::value>::type operator()(
      NextFuture next, ContinueFunc&& f, Args&&... a) const {
    std::forward<ContinueFunc>(f)(std::forward<Args>(a)...);
    next.MarkFinished();
  }

  /// If the callback returns a non-future then we return Future<T>
  /// and mark the future finished with the callback result.  It will get promoted
  /// to Result<T> as part of MarkFinished if it isn't already.
  ///
  /// If the callback returns Status and we return Future<> then also send the callback
  /// result as-is to the destination future.
  template <typename ContinueFunc, typename... Args,
            typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
            typename NextFuture = ForReturn<ContinueResult>>
  typename std::enable_if<
      !std::is_void<ContinueResult>::value && !is_future<ContinueResult>::value &&
      (!NextFuture::is_empty || std::is_same<ContinueResult, Status>::value)>::type
  operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const {
    next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...));
  }

  /// If the callback returns a Result and the next future is Future<> then we mark
  /// the future finished with the callback result.
  ///
  /// It may seem odd that the next future is Future<> when the callback returns a
  /// result but this can occur if the OnFailure callback returns a result while the
  /// OnSuccess callback is void/Status (e.g. you would get this calling the one-arg
  /// version of Then with an OnSuccess callback that returns void)
  template <typename ContinueFunc, typename... Args,
            typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
            typename NextFuture = ForReturn<ContinueResult>>
  typename std::enable_if<!std::is_void<ContinueResult>::value &&
                          !is_future<ContinueResult>::value && NextFuture::is_empty &&
                          !std::is_same<ContinueResult, Status>::value>::type
  operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const {
    // Only the status of the returned value is kept; any payload is dropped.
    next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...).status());
  }

  /// If the callback returns a Future<T> then we return Future<T>.  We create a new
  /// future and add a callback to the future given to us by the user that forwards the
  /// result to the future we just created
  template <typename ContinueFunc, typename... Args,
            typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
            typename NextFuture = ForReturn<ContinueResult>>
  typename std::enable_if<is_future<ContinueResult>::value>::type operator()(
      NextFuture next, ContinueFunc&& f, Args&&... a) const {
    ContinueResult signal_to_complete_next =
        std::forward<ContinueFunc>(f)(std::forward<Args>(a)...);
    // `next` is moved into the callback, which completes it once the future
    // returned by `f` completes.
    MarkNextFinished<ContinueResult, NextFuture> callback{std::move(next)};
    signal_to_complete_next.AddCallback(std::move(callback));
  }

  /// Helpers to conditionally ignore arguments to ContinueFunc
  ///
  /// The std::true_type overload discards `Args` and calls `f` with no arguments;
  /// the std::false_type overload forwards them.
  template <typename ContinueFunc, typename NextFuture, typename... Args>
  void IgnoringArgsIf(std::true_type, NextFuture&& next, ContinueFunc&& f,
                      Args&&...) const {
    operator()(std::forward<NextFuture>(next), std::forward<ContinueFunc>(f));
  }
  template <typename ContinueFunc, typename NextFuture, typename... Args>
  void IgnoringArgsIf(std::false_type, NextFuture&& next, ContinueFunc&& f,
                      Args&&... a) const {
    operator()(std::forward<NextFuture>(next), std::forward<ContinueFunc>(f),
               std::forward<Args>(a)...);
  }
};
0197 
/// Helper struct which tells us what kind of Future gets returned from `Then` based on
/// the return type of the OnSuccess callback
///
/// Mapping implemented by the specializations below:
///   void, Status         -> Future<>
///   Result<T>, Future<T> -> Future<T>
///   any other type R     -> Future<R>
template <>
struct ContinueFuture::ForReturnImpl<void> {
  using type = Future<>;
};

template <>
struct ContinueFuture::ForReturnImpl<Status> {
  using type = Future<>;
};

// Generic case: a plain value type R continues as Future<R>.
template <typename R>
struct ContinueFuture::ForReturnImpl {
  using type = Future<R>;
};

// Result<T> is unwrapped so the continuation is Future<T>, not Future<Result<T>>.
template <typename T>
struct ContinueFuture::ForReturnImpl<Result<T>> {
  using type = Future<T>;
};

// Future<T> is not nested: the continuation is again Future<T>.
template <typename T>
struct ContinueFuture::ForReturnImpl<Future<T>> {
  using type = Future<T>;
};
0224 
0225 }  // namespace detail
0226 
/// The lifecycle stage of a Future: not completed yet, completed successfully,
/// or completed with a failure.
enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE };

/// Whether `state` denotes a completed Future, i.e. anything other than PENDING.
inline bool IsFutureFinished(FutureState state) {
  const bool still_pending = (state == FutureState::PENDING);
  return !still_pending;
}
0231 
/// \brief Describe whether the callback should be scheduled or run synchronously
///
/// The enumerators carry explicit numeric values (0 through 3).
enum class ShouldSchedule {
  /// Always run the callback synchronously (the default)
  Never = 0,
  /// Schedule a new task only if the future is not finished when the
  /// callback is added
  IfUnfinished = 1,
  /// Always schedule the callback as a new task
  Always = 2,
  /// Schedule a new task only if it would run on an executor other than
  /// the specified executor.
  IfDifferentExecutor = 3,
};
0245 
0246 /// \brief Options that control how a continuation is run
0247 struct CallbackOptions {
0248   /// Describe whether the callback should be run synchronously or scheduled
0249   ShouldSchedule should_schedule = ShouldSchedule::Never;
0250   /// If the callback is scheduled then this is the executor it should be scheduled
0251   /// on.  If this is NULL then should_schedule must be Never
0252   internal::Executor* executor = NULLPTR;
0253 
0254   static CallbackOptions Defaults() { return {}; }
0255 };
0256 
// Untyped private implementation
//
// Holds the state shared by all copies of a Future: the completion state, the
// type-erased result, and the callbacks registered so far.  The member functions
// declared without a body here are defined outside of this header.
class ARROW_EXPORT FutureImpl : public std::enable_shared_from_this<FutureImpl> {
 public:
  FutureImpl();
  virtual ~FutureImpl() = default;

  // Atomically read the current completion state.
  FutureState state() { return state_.load(); }

  // Factories; see Future::Make / Future::MakeFinished for how they are used.
  static std::unique_ptr<FutureImpl> Make();
  static std::unique_ptr<FutureImpl> MakeFinished(FutureState state);

#ifdef ARROW_WITH_OPENTELEMETRY
  // Stores the raw pointer only.
  // NOTE(review): presumably the caller keeps the span alive -- confirm.
  void SetSpan(util::tracing::Span* span) { span_ = span; }
#endif

  // Future API
  void MarkFinished();
  void MarkFailed();
  void Wait();
  bool Wait(double seconds);
  // Reinterpret the type-erased storage as a Result<ValueType>.  The cast is
  // unchecked: ValueType must match the type the result was stored with.
  template <typename ValueType>
  Result<ValueType>* CastResult() const {
    return static_cast<Result<ValueType>*>(result_.get());
  }

  using Callback = internal::FnOnce<void(const FutureImpl& impl)>;
  void AddCallback(Callback callback, CallbackOptions opts);
  bool TryAddCallback(const std::function<Callback()>& callback_factory,
                      CallbackOptions opts);

  // Starts out as PENDING.
  std::atomic<FutureState> state_{FutureState::PENDING};

  // Type erased storage for arbitrary results
  // XXX small objects could be stored inline instead of boxed in a pointer
  using Storage = std::unique_ptr<void, void (*)(void*)>;
  Storage result_{NULLPTR, NULLPTR};

  // A registered callback together with the options it was added with.
  struct CallbackRecord {
    Callback callback;
    CallbackOptions options;
  };
  std::vector<CallbackRecord> callbacks_;
#ifdef ARROW_WITH_OPENTELEMETRY
  util::tracing::Span* span_ = NULLPTR;
#endif
};
0303 
0304 // ---------------------------------------------------------------------
0305 // Public API
0306 
0307 /// \brief EXPERIMENTAL A std::future-like class with more functionality.
0308 ///
0309 /// A Future represents the results of a past or future computation.
0310 /// The Future API has two sides: a producer side and a consumer side.
0311 ///
0312 /// The producer API allows creating a Future and setting its result or
0313 /// status, possibly after running a computation function.
0314 ///
0315 /// The consumer API allows querying a Future's current state, wait for it
0316 /// to complete, and composing futures with callbacks.
0317 template <typename T>
0318 class [[nodiscard]] Future {
0319  public:
0320   using ValueType = T;
0321   using SyncType = typename detail::SyncType<T>::type;
0322   static constexpr bool is_empty = std::is_same<T, internal::Empty>::value;
0323   // The default constructor creates an invalid Future.  Use Future::Make()
0324   // for a valid Future.  This constructor is mostly for the convenience
0325   // of being able to presize a vector of Futures.
0326   Future() = default;
0327 
#ifdef ARROW_WITH_OPENTELEMETRY
  // Forward `span` to the shared FutureImpl.  Unlike the consumer accessors this
  // does not call CheckValid(), so it must only be used on a valid Future.
  void SetSpan(util::tracing::Span* span) { impl_->SetSpan(span); }
#endif
0331 
0332   // Consumer API
0333 
0334   bool is_valid() const { return impl_ != NULLPTR; }
0335 
0336   /// \brief Return the Future's current state
0337   ///
0338   /// A return value of PENDING is only indicative, as the Future can complete
0339   /// concurrently.  A return value of FAILURE or SUCCESS is definitive, though.
0340   FutureState state() const {
0341     CheckValid();
0342     return impl_->state();
0343   }
0344 
0345   /// \brief Whether the Future is finished
0346   ///
0347   /// A false return value is only indicative, as the Future can complete
0348   /// concurrently.  A true return value is definitive, though.
0349   bool is_finished() const {
0350     CheckValid();
0351     return IsFutureFinished(impl_->state());
0352   }
0353 
0354   /// \brief Wait for the Future to complete and return its Result
0355   const Result<ValueType>& result() const& {
0356     Wait();
0357     return *GetResult();
0358   }
0359 
0360   /// \brief Returns an rvalue to the result.  This method is potentially unsafe
0361   ///
0362   /// The future is not the unique owner of the result, copies of a future will
0363   /// also point to the same result.  You must make sure that no other copies
0364   /// of the future exist.  Attempts to add callbacks after you move the result
0365   /// will result in undefined behavior.
0366   Result<ValueType>&& MoveResult() {
0367     Wait();
0368     return std::move(*GetResult());
0369   }
0370 
0371   /// \brief Wait for the Future to complete and return its Status
0372   const Status& status() const { return result().status(); }
0373 
0374   /// \brief Future<T> is convertible to Future<>, which views only the
0375   /// Status of the original. Marking the returned Future Finished is not supported.
0376   explicit operator Future<>() const {
0377     Future<> status_future;
0378     status_future.impl_ = impl_;
0379     return status_future;
0380   }
0381 
0382   /// \brief Wait for the Future to complete
0383   void Wait() const {
0384     CheckValid();
0385     impl_->Wait();
0386   }
0387 
0388   /// \brief Wait for the Future to complete, or for the timeout to expire
0389   ///
0390   /// `true` is returned if the Future completed, `false` if the timeout expired.
0391   /// Note a `false` value is only indicative, as the Future can complete
0392   /// concurrently.
0393   bool Wait(double seconds) const {
0394     CheckValid();
0395     return impl_->Wait(seconds);
0396   }
0397 
0398   // Producer API
0399 
0400   /// \brief Producer API: mark Future finished
0401   ///
0402   /// The Future's result is set to `res`.
0403   void MarkFinished(Result<ValueType> res) { DoMarkFinished(std::move(res)); }
0404 
0405   /// \brief Mark a Future<> completed with the provided Status.
0406   template <typename E = ValueType, typename = typename std::enable_if<
0407                                         std::is_same<E, internal::Empty>::value>::type>
0408   void MarkFinished(Status s = Status::OK()) {
0409     return DoMarkFinished(E::ToResult(std::move(s)));
0410   }
0411 
0412   /// \brief Producer API: instantiate a valid Future
0413   ///
0414   /// The Future's state is initialized with PENDING.  If you are creating a future with
0415   /// this method you must ensure that future is eventually completed (with success or
0416   /// failure).  Creating a future, returning it, and never completing the future can lead
0417   /// to memory leaks (for example, see Loop).
0418   static Future Make() {
0419     Future fut;
0420     fut.impl_ = FutureImpl::Make();
0421     return fut;
0422   }
0423 
0424   /// \brief Producer API: instantiate a finished Future
0425   static Future<ValueType> MakeFinished(Result<ValueType> res) {
0426     Future<ValueType> fut;
0427     fut.InitializeFromResult(std::move(res));
0428     return fut;
0429   }
0430 
0431   /// \brief Make a finished Future<> with the provided Status.
0432   template <typename E = ValueType, typename = typename std::enable_if<
0433                                         std::is_same<E, internal::Empty>::value>::type>
0434   static Future<> MakeFinished(Status s = Status::OK()) {
0435     return MakeFinished(E::ToResult(std::move(s)));
0436   }
0437 
  // Adapter for completion callbacks that take the full result: invokes
  // `on_complete` with the Result<ValueType> stored in the FutureImpl.
  struct WrapResultOnComplete {
    template <typename OnComplete>
    struct Callback {
      void operator()(const FutureImpl& impl) && {
        std::move(on_complete)(*impl.CastResult<ValueType>());
      }
      OnComplete on_complete;
    };
  };

  // Adapter for completion callbacks that take only a Status: invokes
  // `on_complete` with the status of the stored result.  Restricted to Future<>
  // by the static_assert.
  struct WrapStatusyOnComplete {
    template <typename OnComplete>
    struct Callback {
      static_assert(std::is_same<internal::Empty, ValueType>::value,
                    "Only callbacks for Future<> should accept Status and not Result");

      void operator()(const FutureImpl& impl) && {
        std::move(on_complete)(impl.CastResult<ValueType>()->status());
      }
      OnComplete on_complete;
    };
  };

  // Picks the adapter matching OnComplete's first parameter: the Status adapter
  // when it is a Status, the Result adapter otherwise.
  template <typename OnComplete>
  using WrapOnComplete = typename std::conditional<
      detail::first_arg_is_status<OnComplete>::value, WrapStatusyOnComplete,
      WrapResultOnComplete>::type::template Callback<OnComplete>;
0465 
  /// \brief Consumer API: Register a callback to run when this future completes
  ///
  /// The callback should receive the result of the future (const Result<T>&)
  /// For a void or statusy future this should be (const Status&)
  ///
  /// There is no guarantee to the order in which callbacks will run.  In
  /// particular, callbacks added while the future is being marked complete
  /// may be executed immediately, ahead of, or even the same time as, other
  /// callbacks that have been previously added.
  ///
  /// WARNING: callbacks may hold arbitrary references, including cyclic references.
  /// Since callbacks will only be destroyed after they are invoked, this can lead to
  /// memory leaks if a Future is never marked finished (abandoned):
  ///
  /// {
  ///     auto fut = Future<>::Make();
  ///     fut.AddCallback([fut](const Status&) {});
  /// }
  ///
  /// In this example `fut` falls out of scope but is not destroyed because it holds a
  /// cyclic reference to itself through the callback.
  template <typename OnComplete, typename Callback = WrapOnComplete<OnComplete>>
  void AddCallback(OnComplete on_complete,
                   CallbackOptions opts = CallbackOptions::Defaults()) const {
    // The wrapped callback does not capture impl_; it receives the FutureImpl by
    // reference when it is invoked.
    // NOTE(review): the original comment here argued that impl_ cannot dangle
    // while callbacks run because at least one thread is waiting for MarkFinished
    // to return -- confirm against the FutureImpl implementation.
    impl_->AddCallback(Callback{std::move(on_complete)}, opts);
  }
0495 
0496   /// \brief Overload of AddCallback that will return false instead of running
0497   /// synchronously
0498   ///
0499   /// This overload will guarantee the callback is never run synchronously.  If the future
0500   /// is already finished then it will simply return false.  This can be useful to avoid
0501   /// stack overflow in a situation where you have recursive Futures.  For an example
0502   /// see the Loop function
0503   ///
0504   /// Takes in a callback factory function to allow moving callbacks (the factory function
0505   /// will only be called if the callback can successfully be added)
0506   ///
0507   /// Returns true if a callback was actually added and false if the callback failed
0508   /// to add because the future was marked complete.
0509   template <typename CallbackFactory,
0510             typename OnComplete = detail::result_of_t<CallbackFactory()>,
0511             typename Callback = WrapOnComplete<OnComplete>>
0512   bool TryAddCallback(CallbackFactory callback_factory,
0513                       CallbackOptions opts = CallbackOptions::Defaults()) const {
0514     return impl_->TryAddCallback([&]() { return Callback{callback_factory()}; }, opts);
0515   }
0516 
  // Completion callback installed by Then(): dispatches to `on_success` or
  // `on_failure` depending on the outcome and completes `next` from whatever the
  // chosen callback returns.
  template <typename OnSuccess, typename OnFailure>
  struct ThenOnComplete {
    static constexpr bool has_no_args =
        internal::call_traits::argument_count<OnSuccess>::value == 0;

    // Future type of the continuation, derived from OnSuccess' return type when
    // called either with no arguments or with (const T&).
    using ContinuedFuture = detail::ContinueFuture::ForSignature<
        detail::if_has_no_args<OnSuccess, OnSuccess && (), OnSuccess && (const T&)>>;

    static_assert(
        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
                     ContinuedFuture>::value,
        "OnSuccess and OnFailure must continue with the same future type");

    // Stand-in used to compute OnSuccessArg when OnSuccess takes no arguments.
    struct DummyOnSuccess {
      void operator()(const T&);
    };
    using OnSuccessArg = typename std::decay<internal::call_traits::argument_type<
        0, detail::if_has_no_args<OnSuccess, DummyOnSuccess, OnSuccess>>>::type;

    static_assert(
        !std::is_same<OnSuccessArg, typename EnsureResult<OnSuccessArg>::type>::value,
        "OnSuccess' argument should not be a Result");

    void operator()(const Result<T>& result) && {
      detail::ContinueFuture continue_future;
      if (ARROW_PREDICT_TRUE(result.ok())) {
        // move on_failure to a(n immediately destroyed) temporary to free its resources
        ARROW_UNUSED(OnFailure(std::move(on_failure)));
        // The value is dropped when OnSuccess takes no arguments.
        continue_future.IgnoringArgsIf(
            detail::if_has_no_args<OnSuccess, std::true_type, std::false_type>{},
            std::move(next), std::move(on_success), result.ValueOrDie());
      } else {
        // Likewise release on_success before running the failure path.
        ARROW_UNUSED(OnSuccess(std::move(on_success)));
        continue_future(std::move(next), std::move(on_failure), result.status());
      }
    }

    OnSuccess on_success;
    OnFailure on_failure;
    ContinuedFuture next;
  };
0558 
  // Default OnFailure handler for Then(): returns the error Status unchanged,
  // wrapped in a Result of the continued future's value type.
  template <typename OnSuccess>
  struct PassthruOnFailure {
    using ContinuedFuture = detail::ContinueFuture::ForSignature<
        detail::if_has_no_args<OnSuccess, OnSuccess && (), OnSuccess && (const T&)>>;

    Result<typename ContinuedFuture::ValueType> operator()(const Status& s) { return s; }
  };
0566 
  /// \brief Consumer API: Register a continuation to run when this future completes
  ///
  /// The continuation will run in the same thread that called MarkFinished (whatever
  /// callback is registered with this function will run before MarkFinished returns).
  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
  /// returning the future.
  ///
  /// Two callbacks are supported:
  /// - OnSuccess, called with the result (const ValueType&) on successful completion.
  ///              for an empty future this will be called with nothing ()
  /// - OnFailure, called with the error (const Status&) on failed completion.
  ///              This callback is optional and defaults to a passthru of any errors.
  ///
  /// Then() returns a Future whose ValueType is derived from the return type of the
  /// callbacks. If a callback returns:
  /// - void, a Future<> will be returned which will complete successfully as soon
  ///   as the callback runs.
  /// - Status, a Future<> will be returned which will complete with the returned Status
  ///   as soon as the callback runs.
  /// - V or Result<V>, a Future<V> will be returned which will complete with the result
  ///   of invoking the callback as soon as the callback runs.
  /// - Future<V>, a Future<V> will be returned which will be marked complete when the
  ///   future returned by the callback completes (and will complete with the same
  ///   result).
  ///
  /// The continued Future type must be the same for both callbacks.
  ///
  /// Note that OnFailure can swallow errors, allowing continued Futures to successfully
  /// complete even if this Future fails.
  ///
  /// If this future is already completed then the callback will be run immediately
  /// and the returned future may already be marked complete.
  ///
  /// See AddCallback for general considerations when writing callbacks.
  template <typename OnSuccess, typename OnFailure = PassthruOnFailure<OnSuccess>,
            typename OnComplete = ThenOnComplete<OnSuccess, OnFailure>,
            typename ContinuedFuture = typename OnComplete::ContinuedFuture>
  ContinuedFuture Then(OnSuccess on_success, OnFailure on_failure = {},
                       CallbackOptions options = CallbackOptions::Defaults()) const {
    auto next = ContinuedFuture::Make();
    // `next` is copied into the callback and also returned to the caller.
    AddCallback(OnComplete{std::forward<OnSuccess>(on_success),
                           std::forward<OnFailure>(on_failure), next},
                options);
    return next;
  }
0612 
0613   /// \brief Implicit constructor to create a finished future from a value
0614   Future(ValueType val) : Future() {  // NOLINT runtime/explicit
0615     impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
0616     SetResult(std::move(val));
0617   }
0618 
0619   /// \brief Implicit constructor to create a future from a Result, enabling use
0620   ///     of macros like ARROW_ASSIGN_OR_RAISE.
0621   Future(Result<ValueType> res) : Future() {  // NOLINT runtime/explicit
0622     if (ARROW_PREDICT_TRUE(res.ok())) {
0623       impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
0624     } else {
0625       impl_ = FutureImpl::MakeFinished(FutureState::FAILURE);
0626     }
0627     SetResult(std::move(res));
0628   }
0629 
0630   /// \brief Implicit constructor to create a future from a Status, enabling use
0631   ///     of macros like ARROW_RETURN_NOT_OK.
0632   Future(Status s)  // NOLINT runtime/explicit
0633       : Future(Result<ValueType>(std::move(s))) {}
0634 
0635  protected:
0636   void InitializeFromResult(Result<ValueType> res) {
0637     if (ARROW_PREDICT_TRUE(res.ok())) {
0638       impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
0639     } else {
0640       impl_ = FutureImpl::MakeFinished(FutureState::FAILURE);
0641     }
0642     SetResult(std::move(res));
0643   }
0644 
  // Attach a fresh implementation object obtained from FutureImpl::Make().
  void Initialize() { impl_ = FutureImpl::Make(); }

  // Typed view of the result stored in the implementation object.
  Result<ValueType>* GetResult() const { return impl_->CastResult<ValueType>(); }
0648 
0649   void SetResult(Result<ValueType> res) {
0650     impl_->result_ = {new Result<ValueType>(std::move(res)),
0651                       [](void* p) { delete static_cast<Result<ValueType>*>(p); }};
0652   }
0653 
0654   void DoMarkFinished(Result<ValueType> res) {
0655     SetResult(std::move(res));
0656 
0657     if (ARROW_PREDICT_TRUE(GetResult()->ok())) {
0658       impl_->MarkFinished();
0659     } else {
0660       impl_->MarkFailed();
0661     }
0662   }
0663 
0664   void CheckValid() const {
0665 #ifndef NDEBUG
0666     if (!is_valid()) {
0667       Status::Invalid("Invalid Future (default-initialized?)").Abort();
0668     }
0669 #endif
0670   }
0671 
  // Wrap an existing implementation object (used by WeakFuture::get()).
  explicit Future(std::shared_ptr<FutureImpl> impl) : impl_(std::move(impl)) {}
0673 
0674   std::shared_ptr<FutureImpl> impl_;
0675 
0676   friend struct detail::ContinueFuture;
0677 
0678   template <typename U>
0679   friend class Future;
0680   friend class WeakFuture<T>;
0681 
0682   FRIEND_TEST(FutureRefTest, ChainRemoved);
0683   FRIEND_TEST(FutureRefTest, TailRemoved);
0684   FRIEND_TEST(FutureRefTest, HeadRemoved);
0685 };
0686 
/// Convert a Future to its synchronous counterpart (Future<T>::SyncType) by
/// waiting for it and returning its result.
template <typename T>
typename Future<T>::SyncType FutureToSync(const Future<T>& fut) {
  return fut.result();
}

/// Specialization for Future<>: returns the Status rather than a Result.
template <>
inline typename Future<internal::Empty>::SyncType FutureToSync<internal::Empty>(
    const Future<internal::Empty>& fut) {
  return fut.status();
}
0697 
/// Specialization of the Status constructor for Future<>: the Status is turned
/// into a result through internal::Empty::ToResult.
template <>
inline Future<>::Future(Status s) : Future(internal::Empty::ToResult(std::move(s))) {}
0700 
0701 template <typename T>
0702 class WeakFuture {
0703  public:
0704   explicit WeakFuture(const Future<T>& future) : impl_(future.impl_) {}
0705 
0706   Future<T> get() { return Future<T>{impl_.lock()}; }
0707 
0708  private:
0709   std::weak_ptr<FutureImpl> impl_;
0710 };
0711 
0712 /// \defgroup future-utilities Functions for working with Futures
0713 /// @{
0714 
0715 /// If a Result<Future> holds an error instead of a Future, construct a finished Future
0716 /// holding that error.
0717 template <typename T>
0718 static Future<T> DeferNotOk(Result<Future<T>> maybe_future) {
0719   if (ARROW_PREDICT_FALSE(!maybe_future.ok())) {
0720     return Future<T>::MakeFinished(std::move(maybe_future).status());
0721   }
0722   return std::move(maybe_future).MoveValueUnsafe();
0723 }
0724 
0725 /// \brief Create a Future which completes when all of `futures` complete.
0726 ///
0727 /// The future's result is a vector of the results of `futures`.
0728 /// Note that this future will never be marked "failed"; failed results
0729 /// will be stored in the result vector alongside successful results.
0730 template <typename T>
0731 Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) {
0732   struct State {
0733     explicit State(std::vector<Future<T>> f)
0734         : futures(std::move(f)), n_remaining(futures.size()) {}
0735 
0736     std::vector<Future<T>> futures;
0737     std::atomic<size_t> n_remaining;
0738   };
0739 
0740   if (futures.size() == 0) {
0741     return {std::vector<Result<T>>{}};
0742   }
0743 
0744   auto state = std::make_shared<State>(std::move(futures));
0745 
0746   auto out = Future<std::vector<Result<T>>>::Make();
0747   for (const Future<T>& future : state->futures) {
0748     future.AddCallback([state, out](const Result<T>&) mutable {
0749       if (state->n_remaining.fetch_sub(1) != 1) return;
0750 
0751       std::vector<Result<T>> results(state->futures.size());
0752       for (size_t i = 0; i < results.size(); ++i) {
0753         results[i] = state->futures[i].result();
0754       }
0755       out.MarkFinished(std::move(results));
0756     });
0757   }
0758   return out;
0759 }
0760 
/// \brief Create a Future which completes once all of `futures` have completed
/// successfully, or as soon as one of them fails.
///
/// The future will be marked complete if all `futures` complete
/// successfully. Otherwise, it will be marked failed with the status of
/// the first failing future, without waiting for the remaining futures.
0766 ARROW_EXPORT
0767 Future<> AllComplete(const std::vector<Future<>>& futures);
0768 
0769 /// \brief Create a Future which completes when all of `futures` complete.
0770 ///
0771 /// The future will finish with an ok status if all `futures` finish with
0772 /// an ok status. Otherwise, it will be marked failed with the status of
0773 /// one of the failing futures.
0774 ///
0775 /// Unlike AllComplete this Future will not complete immediately when a
0776 /// failure occurs.  It will wait until all futures have finished.
0777 ARROW_EXPORT
0778 Future<> AllFinished(const std::vector<Future<>>& futures);
0779 
0780 /// @}
0781 
/// Marker value meaning "keep going": an rvalue Continue converts implicitly to
/// an empty std::optional<T> for any T.
struct Continue {
  template <typename T>
  operator std::optional<T>() && {  // NOLINT explicit
    return std::nullopt;
  }
};
0788 
0789 template <typename T = internal::Empty>
0790 std::optional<T> Break(T break_value = {}) {
0791   return std::optional<T>{std::move(break_value)};
0792 }
0793 
0794 template <typename T = internal::Empty>
0795 using ControlFlow = std::optional<T>;
0796 
0797 /// \brief Loop through an asynchronous sequence
0798 ///
0799 /// \param[in] iterate A generator of Future<ControlFlow<BreakValue>>. On completion
0800 /// of each yielded future the resulting ControlFlow will be examined. A Break will
0801 /// terminate the loop, while a Continue will re-invoke `iterate`.
0802 ///
0803 /// \return A future which will complete when a Future returned by iterate completes with
0804 /// a Break
0805 template <typename Iterate,
0806           typename Control = typename detail::result_of_t<Iterate()>::ValueType,
0807           typename BreakValueType = typename Control::value_type>
0808 Future<BreakValueType> Loop(Iterate iterate) {
0809   struct Callback {
0810     bool CheckForTermination(const Result<Control>& control_res) {
0811       if (!control_res.ok()) {
0812         break_fut.MarkFinished(control_res.status());
0813         return true;
0814       }
0815       if (control_res->has_value()) {
0816         break_fut.MarkFinished(**control_res);
0817         return true;
0818       }
0819       return false;
0820     }
0821 
0822     void operator()(const Result<Control>& maybe_control) && {
0823       if (CheckForTermination(maybe_control)) return;
0824 
0825       auto control_fut = iterate();
0826       while (true) {
0827         if (control_fut.TryAddCallback([this]() { return *this; })) {
0828           // Adding a callback succeeded; control_fut was not finished
0829           // and we must wait to CheckForTermination.
0830           return;
0831         }
0832         // Adding a callback failed; control_fut was finished and we
0833         // can CheckForTermination immediately. This also avoids recursion and potential
0834         // stack overflow.
0835         if (CheckForTermination(control_fut.result())) return;
0836 
0837         control_fut = iterate();
0838       }
0839     }
0840 
0841     Iterate iterate;
0842 
0843     // If the future returned by control_fut is never completed then we will be hanging on
0844     // to break_fut forever even if the listener has given up listening on it.  Instead we
0845     // rely on the fact that a producer (the caller of Future<>::Make) is always
0846     // responsible for completing the futures they create.
0847     // TODO: Could avoid this kind of situation with "future abandonment" similar to mesos
0848     Future<BreakValueType> break_fut;
0849   };
0850 
0851   auto break_fut = Future<BreakValueType>::Make();
0852   auto control_fut = iterate();
0853   control_fut.AddCallback(Callback{std::move(iterate), break_fut});
0854 
0855   return break_fut;
0856 }
0857 
0858 inline Future<> ToFuture(Status status) {
0859   return Future<>::MakeFinished(std::move(status));
0860 }
0861 
0862 template <typename T>
0863 Future<T> ToFuture(T value) {
0864   return Future<T>::MakeFinished(std::move(value));
0865 }
0866 
0867 template <typename T>
0868 Future<T> ToFuture(Result<T> maybe_value) {
0869   return Future<T>::MakeFinished(std::move(maybe_value));
0870 }
0871 
0872 template <typename T>
0873 Future<T> ToFuture(Future<T> fut) {
0874   return fut;
0875 }
0876 
0877 template <typename T>
0878 struct EnsureFuture {
0879   using type = decltype(ToFuture(std::declval<T>()));
0880 };
0881 
0882 }  // namespace arrow