Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:27:02

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <atomic>
0021 #include <cassert>
0022 #include <cstring>
0023 #include <deque>
0024 #include <limits>
0025 #include <optional>
0026 #include <queue>
0027 
0028 #include "arrow/util/async_generator_fwd.h"
0029 #include "arrow/util/async_util.h"
0030 #include "arrow/util/functional.h"
0031 #include "arrow/util/future.h"
0032 #include "arrow/util/io_util.h"
0033 #include "arrow/util/iterator.h"
0034 #include "arrow/util/mutex.h"
0035 #include "arrow/util/queue.h"
0036 #include "arrow/util/thread_pool.h"
0037 
0038 namespace arrow {
0039 
0040 // The methods in this file create, modify, and utilize AsyncGenerator which is an
0041 // iterator of futures.  This allows an asynchronous source (like file input) to be run
0042 // through a pipeline in the same way that iterators can be used to create pipelined
0043 // workflows.
0044 //
0045 // In order to support pipeline parallelism we introduce the concept of asynchronous
0046 // reentrancy. This is different than synchronous reentrancy.  With synchronous code a
0047 // function is reentrant if the function can be called again while a previous call to that
0048 // function is still running.  Unless otherwise specified none of these generators are
0049 // synchronously reentrant.  Care should be taken to avoid calling them in such a way (and
0050 // the utilities Visit/Collect/Await take care to do this).
0051 //
0052 // Asynchronous reentrancy on the other hand means the function is called again before the
0053 // future returned by the function is marked finished (but after the call to get the
0054 // future returns).  Some of these generators are async-reentrant while others (e.g.
0055 // those that depend on ordered processing like decompression) are not.  Read the MakeXYZ
0056 // function comments to determine which generators support async reentrancy.
0057 //
0058 // Note: Generators that are not asynchronously reentrant can still support readahead
0059 // (\see MakeSerialReadaheadGenerator).
0060 //
0061 // Readahead operators, and some other operators, may introduce queueing.  Any operators
0062 // that introduce buffering should detail the amount of buffering they introduce in their
0063 // MakeXYZ function comments.
0064 //
0065 // A generator should always be fully consumed before it is destroyed.
0066 // A generator should not mark a future complete with an error status or a terminal value
0067 //   until all outstanding futures have completed.  Generators that spawn multiple
0068 //   concurrent futures may need to hold onto an error while other concurrent futures wrap
0069 //   up.
0070 template <typename T>
0071 struct IterationTraits<AsyncGenerator<T>> {
0072   /// \brief by default when iterating through a sequence of AsyncGenerator<T>,
0073   /// an empty function indicates the end of iteration.
0074   static AsyncGenerator<T> End() { return AsyncGenerator<T>(); }
0075 
0076   static bool IsEnd(const AsyncGenerator<T>& val) { return !val; }
0077 };
0078 
0079 template <typename T>
0080 Future<T> AsyncGeneratorEnd() {
0081   return Future<T>::MakeFinished(IterationTraits<T>::End());
0082 }
0083 
0084 /// returning a future that completes when all have been visited
0085 template <typename T, typename Visitor>
0086 Future<> VisitAsyncGenerator(AsyncGenerator<T> generator, Visitor visitor) {
0087   struct LoopBody {
0088     struct Callback {
0089       Result<ControlFlow<>> operator()(const T& next) {
0090         if (IsIterationEnd(next)) {
0091           return Break();
0092         } else {
0093           auto visited = visitor(next);
0094           if (visited.ok()) {
0095             return Continue();
0096           } else {
0097             return visited;
0098           }
0099         }
0100       }
0101 
0102       Visitor visitor;
0103     };
0104 
0105     Future<ControlFlow<>> operator()() {
0106       Callback callback{visitor};
0107       auto next = generator();
0108       return next.Then(std::move(callback));
0109     }
0110 
0111     AsyncGenerator<T> generator;
0112     Visitor visitor;
0113   };
0114 
0115   return Loop(LoopBody{std::move(generator), std::move(visitor)});
0116 }
0117 
0118 /// \brief Wait for an async generator to complete, discarding results.
0119 template <typename T>
0120 Future<> DiscardAllFromAsyncGenerator(AsyncGenerator<T> generator) {
0121   std::function<Status(T)> visitor = [](const T&) { return Status::OK(); };
0122   return VisitAsyncGenerator(generator, visitor);
0123 }
0124 
0125 /// \brief Collect the results of an async generator into a vector
0126 template <typename T>
0127 Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
0128   auto vec = std::make_shared<std::vector<T>>();
0129   auto loop_body = [generator = std::move(generator),
0130                     vec = std::move(vec)]() -> Future<ControlFlow<std::vector<T>>> {
0131     auto next = generator();
0132     return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
0133       if (IsIterationEnd(result)) {
0134         return Break(*vec);
0135       } else {
0136         vec->push_back(result);
0137         return Continue();
0138       }
0139     });
0140   };
0141   return Loop(std::move(loop_body));
0142 }
0143 
0144 /// \see MakeMappedGenerator
0145 template <typename T, typename V>
0146 class MappingGenerator {
0147  public:
0148   MappingGenerator(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
0149       : state_(std::make_shared<State>(std::move(source), std::move(map))) {}
0150 
0151   Future<V> operator()() {
0152     auto future = Future<V>::Make();
0153     bool should_trigger;
0154     {
0155       auto guard = state_->mutex.Lock();
0156       if (state_->finished) {
0157         return AsyncGeneratorEnd<V>();
0158       }
0159       should_trigger = state_->waiting_jobs.empty();
0160       state_->waiting_jobs.push_back(future);
0161     }
0162     if (should_trigger) {
0163       state_->source().AddCallback(Callback{state_});
0164     }
0165     return future;
0166   }
0167 
0168  private:
0169   struct State {
0170     State(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
0171         : source(std::move(source)),
0172           map(std::move(map)),
0173           waiting_jobs(),
0174           mutex(),
0175           finished(false) {}
0176 
0177     void Purge() {
0178       // This might be called by an original callback (if the source iterator fails or
0179       // ends) or by a mapped callback (if the map function fails or ends prematurely).
0180       // Either way it should only be called once and after finished is set so there is no
0181       // need to guard access to `waiting_jobs`.
0182       while (!waiting_jobs.empty()) {
0183         waiting_jobs.front().MarkFinished(IterationTraits<V>::End());
0184         waiting_jobs.pop_front();
0185       }
0186     }
0187 
0188     AsyncGenerator<T> source;
0189     std::function<Future<V>(const T&)> map;
0190     std::deque<Future<V>> waiting_jobs;
0191     util::Mutex mutex;
0192     bool finished;
0193   };
0194 
0195   struct Callback;
0196 
0197   struct MappedCallback {
0198     void operator()(const Result<V>& maybe_next) {
0199       bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
0200       bool should_purge = false;
0201       if (end) {
0202         {
0203           auto guard = state->mutex.Lock();
0204           should_purge = !state->finished;
0205           state->finished = true;
0206         }
0207       }
0208       sink.MarkFinished(maybe_next);
0209       if (should_purge) {
0210         state->Purge();
0211       }
0212     }
0213     std::shared_ptr<State> state;
0214     Future<V> sink;
0215   };
0216 
0217   struct Callback {
0218     void operator()(const Result<T>& maybe_next) {
0219       Future<V> sink;
0220       bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
0221       bool should_purge = false;
0222       bool should_trigger;
0223       {
0224         auto guard = state->mutex.Lock();
0225         // A MappedCallback may have purged or be purging the queue;
0226         // we shouldn't do anything here.
0227         if (state->finished) return;
0228         if (end) {
0229           should_purge = !state->finished;
0230           state->finished = true;
0231         }
0232         sink = state->waiting_jobs.front();
0233         state->waiting_jobs.pop_front();
0234         should_trigger = !end && !state->waiting_jobs.empty();
0235       }
0236       if (should_purge) {
0237         state->Purge();
0238       }
0239       if (should_trigger) {
0240         state->source().AddCallback(Callback{state});
0241       }
0242       if (maybe_next.ok()) {
0243         const T& val = maybe_next.ValueUnsafe();
0244         if (IsIterationEnd(val)) {
0245           sink.MarkFinished(IterationTraits<V>::End());
0246         } else {
0247           Future<V> mapped_fut = state->map(val);
0248           mapped_fut.AddCallback(MappedCallback{std::move(state), std::move(sink)});
0249         }
0250       } else {
0251         sink.MarkFinished(maybe_next.status());
0252       }
0253     }
0254 
0255     std::shared_ptr<State> state;
0256   };
0257 
0258   std::shared_ptr<State> state_;
0259 };
0260 
0261 /// \brief Create a generator that will apply the map function to each element of
0262 /// source.  The map function is not called on the end token.
0263 ///
0264 /// Note: This function makes a copy of `map` for each item
0265 /// Note: Errors returned from the `map` function will be propagated
0266 ///
0267 /// If the source generator is async-reentrant then this generator will be also
0268 template <typename T, typename MapFn,
0269           typename Mapped = detail::result_of_t<MapFn(const T&)>,
0270           typename V = typename EnsureFuture<Mapped>::type::ValueType>
0271 AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator, MapFn map) {
0272   auto map_callback = [map = std::move(map)](const T& val) mutable -> Future<V> {
0273     return ToFuture(map(val));
0274   };
0275   return MappingGenerator<T, V>(std::move(source_generator), std::move(map_callback));
0276 }
0277 
0278 /// \brief Create a generator that will apply the map function to
0279 /// each element of source.  The map function is not called on the end
0280 /// token.  The result of the map function should be another
0281 /// generator; all these generators will then be flattened to produce
0282 /// a single stream of items.
0283 ///
0284 /// Note: This function makes a copy of `map` for each item
0285 /// Note: Errors returned from the `map` function will be propagated
0286 ///
0287 /// If the source generator is async-reentrant then this generator will be also
0288 template <typename T, typename MapFn,
0289           typename Mapped = detail::result_of_t<MapFn(const T&)>,
0290           typename V = typename EnsureFuture<Mapped>::type::ValueType>
0291 AsyncGenerator<T> MakeFlatMappedGenerator(AsyncGenerator<T> source_generator, MapFn map) {
0292   return MakeConcatenatedGenerator(
0293       MakeMappedGenerator(std::move(source_generator), std::move(map)));
0294 }
0295 
0296 /// \see MakeSequencingGenerator
0297 template <typename T, typename ComesAfter, typename IsNext>
0298 class SequencingGenerator {
0299  public:
0300   SequencingGenerator(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next,
0301                       T initial_value)
0302       : state_(std::make_shared<State>(std::move(source), std::move(compare),
0303                                        std::move(is_next), std::move(initial_value))) {}
0304 
0305   Future<T> operator()() {
0306     {
0307       auto guard = state_->mutex.Lock();
0308       // We can send a result immediately if the top of the queue is either an
0309       // error or the next item
0310       if (!state_->queue.empty() &&
0311           (!state_->queue.top().ok() ||
0312            state_->is_next(state_->previous_value, *state_->queue.top()))) {
0313         auto result = std::move(state_->queue.top());
0314         if (result.ok()) {
0315           state_->previous_value = *result;
0316         }
0317         state_->queue.pop();
0318         return Future<T>::MakeFinished(result);
0319       }
0320       if (state_->finished) {
0321         return AsyncGeneratorEnd<T>();
0322       }
0323       // The next item is not in the queue so we will need to wait
0324       auto new_waiting_fut = Future<T>::Make();
0325       state_->waiting_future = new_waiting_fut;
0326       guard.Unlock();
0327       state_->source().AddCallback(Callback{state_});
0328       return new_waiting_fut;
0329     }
0330   }
0331 
0332  private:
0333   struct WrappedComesAfter {
0334     bool operator()(const Result<T>& left, const Result<T>& right) {
0335       if (!left.ok() || !right.ok()) {
0336         // Should never happen
0337         return false;
0338       }
0339       return compare(*left, *right);
0340     }
0341     ComesAfter compare;
0342   };
0343 
0344   struct State {
0345     State(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next, T initial_value)
0346         : source(std::move(source)),
0347           is_next(std::move(is_next)),
0348           previous_value(std::move(initial_value)),
0349           waiting_future(),
0350           queue(WrappedComesAfter{compare}),
0351           finished(false),
0352           mutex() {}
0353 
0354     AsyncGenerator<T> source;
0355     IsNext is_next;
0356     T previous_value;
0357     Future<T> waiting_future;
0358     std::priority_queue<Result<T>, std::vector<Result<T>>, WrappedComesAfter> queue;
0359     bool finished;
0360     util::Mutex mutex;
0361   };
0362 
0363   class Callback {
0364    public:
0365     explicit Callback(std::shared_ptr<State> state) : state_(std::move(state)) {}
0366 
0367     void operator()(const Result<T> result) {
0368       Future<T> to_deliver;
0369       bool finished;
0370       {
0371         auto guard = state_->mutex.Lock();
0372         bool ready_to_deliver = false;
0373         if (!result.ok()) {
0374           // Clear any cached results
0375           while (!state_->queue.empty()) {
0376             state_->queue.pop();
0377           }
0378           ready_to_deliver = true;
0379           state_->finished = true;
0380         } else if (IsIterationEnd<T>(result.ValueUnsafe())) {
0381           ready_to_deliver = state_->queue.empty();
0382           state_->finished = true;
0383         } else {
0384           ready_to_deliver = state_->is_next(state_->previous_value, *result);
0385         }
0386 
0387         if (ready_to_deliver && state_->waiting_future.is_valid()) {
0388           to_deliver = state_->waiting_future;
0389           if (result.ok()) {
0390             state_->previous_value = *result;
0391           }
0392         } else {
0393           state_->queue.push(result);
0394         }
0395         // Capture state_->finished so we can access it outside the mutex
0396         finished = state_->finished;
0397       }
0398       // Must deliver result outside of the mutex
0399       if (to_deliver.is_valid()) {
0400         to_deliver.MarkFinished(result);
0401       } else {
0402         // Otherwise, if we didn't get the next item (or a terminal item), we
0403         // need to keep looking
0404         if (!finished) {
0405           state_->source().AddCallback(Callback{state_});
0406         }
0407       }
0408     }
0409 
0410    private:
0411     const std::shared_ptr<State> state_;
0412   };
0413 
0414   const std::shared_ptr<State> state_;
0415 };
0416 
0417 /// \brief Buffer an AsyncGenerator to return values in sequence order  ComesAfter
0418 /// and IsNext determine the sequence order.
0419 ///
0420 /// ComesAfter should be a BinaryPredicate that only returns true if a comes after b
0421 ///
0422 /// IsNext should be a BinaryPredicate that returns true, given `a` and `b`, only if
0423 /// `b` follows immediately after `a`.  It should return true given `initial_value` and
0424 /// `b` if `b` is the first item in the sequence.
0425 ///
0426 /// This operator will queue unboundedly while waiting for the next item.  It is intended
0427 /// for jittery sources that might scatter an ordered sequence.  It is NOT intended to
0428 /// sort.  Using it to try and sort could result in excessive RAM usage.  This generator
0429 /// will queue up to N blocks where N is the max "out of order"ness of the source.
0430 ///
0431 /// For example, if the source is 1,6,2,5,4,3 it will queue 3 blocks because 3 is 3
0432 /// blocks beyond where it belongs.
0433 ///
0434 /// This generator is not async-reentrant but it consists only of a simple log(n)
0435 /// insertion into a priority queue.
0436 template <typename T, typename ComesAfter, typename IsNext>
0437 AsyncGenerator<T> MakeSequencingGenerator(AsyncGenerator<T> source_generator,
0438                                           ComesAfter compare, IsNext is_next,
0439                                           T initial_value) {
0440   return SequencingGenerator<T, ComesAfter, IsNext>(
0441       std::move(source_generator), std::move(compare), std::move(is_next),
0442       std::move(initial_value));
0443 }
0444 
0445 /// \see MakeTransformedGenerator
0446 template <typename T, typename V>
0447 class TransformingGenerator {
0448   // The transforming generator state will be referenced as an async generator but will
0449   // also be referenced via callback to various futures.  If the async generator owner
0450   // moves it around we need the state to be consistent for future callbacks.
0451   struct TransformingGeneratorState
0452       : std::enable_shared_from_this<TransformingGeneratorState> {
0453     TransformingGeneratorState(AsyncGenerator<T> generator, Transformer<T, V> transformer)
0454         : generator_(std::move(generator)),
0455           transformer_(std::move(transformer)),
0456           last_value_(),
0457           finished_() {}
0458 
0459     Future<V> operator()() {
0460       while (true) {
0461         auto maybe_next_result = Pump();
0462         if (!maybe_next_result.ok()) {
0463           return Future<V>::MakeFinished(maybe_next_result.status());
0464         }
0465         auto maybe_next = std::move(maybe_next_result).ValueUnsafe();
0466         if (maybe_next.has_value()) {
0467           return Future<V>::MakeFinished(*std::move(maybe_next));
0468         }
0469 
0470         auto next_fut = generator_();
0471         // If finished already, process results immediately inside the loop to avoid
0472         // stack overflow
0473         if (next_fut.is_finished()) {
0474           auto next_result = next_fut.result();
0475           if (next_result.ok()) {
0476             last_value_ = *next_result;
0477           } else {
0478             return Future<V>::MakeFinished(next_result.status());
0479           }
0480           // Otherwise, if not finished immediately, add callback to process results
0481         } else {
0482           auto self = this->shared_from_this();
0483           return next_fut.Then([self](const T& next_result) {
0484             self->last_value_ = next_result;
0485             return (*self)();
0486           });
0487         }
0488       }
0489     }
0490 
0491     // See comment on TransformingIterator::Pump
0492     Result<std::optional<V>> Pump() {
0493       if (!finished_ && last_value_.has_value()) {
0494         ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
0495         if (next.ReadyForNext()) {
0496           if (IsIterationEnd(*last_value_)) {
0497             finished_ = true;
0498           }
0499           last_value_.reset();
0500         }
0501         if (next.Finished()) {
0502           finished_ = true;
0503         }
0504         if (next.HasValue()) {
0505           return next.Value();
0506         }
0507       }
0508       if (finished_) {
0509         return IterationTraits<V>::End();
0510       }
0511       return std::nullopt;
0512     }
0513 
0514     AsyncGenerator<T> generator_;
0515     Transformer<T, V> transformer_;
0516     std::optional<T> last_value_;
0517     bool finished_;
0518   };
0519 
0520  public:
0521   explicit TransformingGenerator(AsyncGenerator<T> generator,
0522                                  Transformer<T, V> transformer)
0523       : state_(std::make_shared<TransformingGeneratorState>(std::move(generator),
0524                                                             std::move(transformer))) {}
0525 
0526   Future<V> operator()() { return (*state_)(); }
0527 
0528  protected:
0529   std::shared_ptr<TransformingGeneratorState> state_;
0530 };
0531 
0532 /// \brief Transform an async generator using a transformer function returning a new
0533 /// AsyncGenerator
0534 ///
0535 /// The transform function here behaves exactly the same as the transform function in
0536 /// MakeTransformedIterator and you can safely use the same transform function to
0537 /// transform both synchronous and asynchronous streams.
0538 ///
0539 /// This generator is not async-reentrant
0540 ///
0541 /// This generator may queue up to 1 instance of T but will not delay
0542 template <typename T, typename V>
0543 AsyncGenerator<V> MakeTransformedGenerator(AsyncGenerator<T> generator,
0544                                            Transformer<T, V> transformer) {
0545   return TransformingGenerator<T, V>(generator, transformer);
0546 }
0547 
0548 /// \see MakeSerialReadaheadGenerator
0549 template <typename T>
0550 class SerialReadaheadGenerator {
0551  public:
0552   SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
0553       : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}
0554 
0555   Future<T> operator()() {
0556     if (state_->first_) {
0557       // Lazy generator, need to wait for the first ask to prime the pump
0558       state_->first_ = false;
0559       auto next = state_->source_();
0560       return next.Then(Callback{state_}, ErrCallback{state_});
0561     }
0562 
0563     // This generator is not async-reentrant.  We won't be called until the last
0564     // future finished so we know there is something in the queue
0565     auto finished = state_->finished_.load();
0566     if (finished && state_->readahead_queue_.IsEmpty()) {
0567       return AsyncGeneratorEnd<T>();
0568     }
0569 
0570     std::shared_ptr<Future<T>> next;
0571     if (!state_->readahead_queue_.Read(next)) {
0572       return Status::UnknownError("Could not read from readahead_queue");
0573     }
0574 
0575     auto last_available = state_->spaces_available_.fetch_add(1);
0576     if (last_available == 0 && !finished) {
0577       // Reader idled out, we need to restart it
0578       ARROW_RETURN_NOT_OK(state_->Pump(state_));
0579     }
0580     return *next;
0581   }
0582 
0583  private:
0584   struct State {
0585     State(AsyncGenerator<T> source, int max_readahead)
0586         : first_(true),
0587           source_(std::move(source)),
0588           finished_(false),
0589           // There is one extra "space" for the in-flight request
0590           spaces_available_(max_readahead + 1),
0591           // The SPSC queue has size-1 "usable" slots so we need to overallocate 1
0592           readahead_queue_(max_readahead + 1) {}
0593 
0594     Status Pump(const std::shared_ptr<State>& self) {
0595       // Can't do readahead_queue.write(source().Then(...)) because then the
0596       // callback might run immediately and add itself to the queue before this gets added
0597       // to the queue messing up the order.
0598       auto next_slot = std::make_shared<Future<T>>();
0599       auto written = readahead_queue_.Write(next_slot);
0600       if (!written) {
0601         return Status::UnknownError("Could not write to readahead_queue");
0602       }
0603       // If this Pump is being called from a callback it is possible for the source to
0604       // poll and read from the queue between the Write and this spot where we fill the
0605       // value in. However, it is not possible for the future to read this value we are
0606       // writing.  That is because this callback (the callback for future X) must be
0607       // finished before future X is marked complete and this source is not pulled
0608       // reentrantly so it will not poll for future X+1 until this callback has completed.
0609       *next_slot = source_().Then(Callback{self}, ErrCallback{self});
0610       return Status::OK();
0611     }
0612 
0613     // Only accessed by the consumer end
0614     bool first_;
0615     // Accessed by both threads
0616     AsyncGenerator<T> source_;
0617     std::atomic<bool> finished_;
0618     // The queue has a size but it is not atomic.  We keep track of how many spaces are
0619     // left in the queue here so we know if we've just written the last value and we need
0620     // to stop reading ahead or if we've just read from a full queue and we need to
0621     // restart reading ahead
0622     std::atomic<uint32_t> spaces_available_;
0623     // Needs to be a queue of shared_ptr and not Future because we set the value of the
0624     // future after we add it to the queue
0625     util::SpscQueue<std::shared_ptr<Future<T>>> readahead_queue_;
0626   };
0627 
0628   struct Callback {
0629     Result<T> operator()(const T& next) {
0630       if (IsIterationEnd(next)) {
0631         state_->finished_.store(true);
0632         return next;
0633       }
0634       auto last_available = state_->spaces_available_.fetch_sub(1);
0635       if (last_available > 1) {
0636         ARROW_RETURN_NOT_OK(state_->Pump(state_));
0637       }
0638       return next;
0639     }
0640 
0641     std::shared_ptr<State> state_;
0642   };
0643 
0644   struct ErrCallback {
0645     Result<T> operator()(const Status& st) {
0646       state_->finished_.store(true);
0647       return st;
0648     }
0649 
0650     std::shared_ptr<State> state_;
0651   };
0652 
0653   std::shared_ptr<State> state_;
0654 };
0655 
0656 /// \see MakeFromFuture
0657 template <typename T>
0658 class FutureFirstGenerator {
0659  public:
0660   explicit FutureFirstGenerator(Future<AsyncGenerator<T>> future)
0661       : state_(std::make_shared<State>(std::move(future))) {}
0662 
0663   Future<T> operator()() {
0664     if (state_->source_) {
0665       return state_->source_();
0666     } else {
0667       auto state = state_;
0668       return state_->future_.Then([state](const AsyncGenerator<T>& source) {
0669         state->source_ = source;
0670         return state->source_();
0671       });
0672     }
0673   }
0674 
0675  private:
0676   struct State {
0677     explicit State(Future<AsyncGenerator<T>> future) : future_(future), source_() {}
0678 
0679     Future<AsyncGenerator<T>> future_;
0680     AsyncGenerator<T> source_;
0681   };
0682 
0683   std::shared_ptr<State> state_;
0684 };
0685 
0686 /// \brief Transform a Future<AsyncGenerator<T>> into an AsyncGenerator<T>
0687 /// that waits for the future to complete as part of the first item.
0688 ///
0689 /// This generator is not async-reentrant (even if the generator yielded by future is)
0690 ///
0691 /// This generator does not queue
0692 template <typename T>
0693 AsyncGenerator<T> MakeFromFuture(Future<AsyncGenerator<T>> future) {
0694   return FutureFirstGenerator<T>(std::move(future));
0695 }
0696 
0697 /// \brief Create a generator that will pull from the source into a queue.  Unlike
0698 /// MakeReadaheadGenerator this will not pull reentrantly from the source.
0699 ///
0700 /// The source generator does not need to be async-reentrant
0701 ///
0702 /// This generator is not async-reentrant (even if the source is)
0703 ///
0704 /// This generator may queue up to max_readahead additional instances of T
0705 template <typename T>
0706 AsyncGenerator<T> MakeSerialReadaheadGenerator(AsyncGenerator<T> source_generator,
0707                                                int max_readahead) {
0708   return SerialReadaheadGenerator<T>(std::move(source_generator), max_readahead);
0709 }
0710 
0711 /// \brief Create a generator that immediately pulls from the source
0712 ///
0713 /// Typical generators do not pull from their source until they themselves
0714 /// are pulled.  This generator does not follow that convention and will call
0715 /// generator() once before it returns.  The returned generator will otherwise
0716 /// mirror the source.
0717 ///
0718 /// This generator forwards async-reentrant pressure to the source
0719 /// This generator buffers one item (the first result) until it is delivered.
0720 template <typename T>
0721 AsyncGenerator<T> MakeAutoStartingGenerator(AsyncGenerator<T> generator) {
0722   struct AutostartGenerator {
0723     Future<T> operator()() {
0724       if (first_future->is_valid()) {
0725         Future<T> result = *first_future;
0726         *first_future = Future<T>();
0727         return result;
0728       }
0729       return source();
0730     }
0731 
0732     std::shared_ptr<Future<T>> first_future;
0733     AsyncGenerator<T> source;
0734   };
0735 
0736   std::shared_ptr<Future<T>> first_future = std::make_shared<Future<T>>(generator());
0737   return AutostartGenerator{std::move(first_future), std::move(generator)};
0738 }
0739 
0740 /// \see MakeReadaheadGenerator
0741 template <typename T>
0742 class ReadaheadGenerator {
0743  public:
0744   ReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
0745       : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}
0746 
0747   Future<T> AddMarkFinishedContinuation(Future<T> fut) {
0748     auto state = state_;
0749     return fut.Then(
0750         [state](const T& result) -> Future<T> {
0751           state->MarkFinishedIfDone(result);
0752           if (state->finished.load()) {
0753             if (state->num_running.fetch_sub(1) == 1) {
0754               state->final_future.MarkFinished();
0755             }
0756           } else {
0757             state->num_running.fetch_sub(1);
0758           }
0759           return result;
0760         },
0761         [state](const Status& err) -> Future<T> {
0762           // If there is an error we need to make sure all running
0763           // tasks finish before we return the error.
0764           state->finished.store(true);
0765           if (state->num_running.fetch_sub(1) == 1) {
0766             state->final_future.MarkFinished();
0767           }
0768           return state->final_future.Then([err]() -> Result<T> { return err; });
0769         });
0770   }
0771 
0772   Future<T> operator()() {
0773     if (state_->readahead_queue.empty()) {
0774       // This is the first request, let's pump the underlying queue
0775       state_->num_running.store(state_->max_readahead);
0776       for (int i = 0; i < state_->max_readahead; i++) {
0777         auto next = state_->source_generator();
0778         auto next_after_check = AddMarkFinishedContinuation(std::move(next));
0779         state_->readahead_queue.push(std::move(next_after_check));
0780       }
0781     }
0782     // Pop one and add one
0783     auto result = state_->readahead_queue.front();
0784     state_->readahead_queue.pop();
0785     if (state_->finished.load()) {
0786       state_->readahead_queue.push(AsyncGeneratorEnd<T>());
0787     } else {
0788       state_->num_running.fetch_add(1);
0789       auto back_of_queue = state_->source_generator();
0790       auto back_of_queue_after_check =
0791           AddMarkFinishedContinuation(std::move(back_of_queue));
0792       state_->readahead_queue.push(std::move(back_of_queue_after_check));
0793     }
0794     return result;
0795   }
0796 
0797  private:
0798   struct State {
0799     State(AsyncGenerator<T> source_generator, int max_readahead)
0800         : source_generator(std::move(source_generator)), max_readahead(max_readahead) {}
0801 
0802     void MarkFinishedIfDone(const T& next_result) {
0803       if (IsIterationEnd(next_result)) {
0804         finished.store(true);
0805       }
0806     }
0807 
0808     AsyncGenerator<T> source_generator;
0809     int max_readahead;
0810     Future<> final_future = Future<>::Make();
0811     std::atomic<int> num_running{0};
0812     std::atomic<bool> finished{false};
0813     std::queue<Future<T>> readahead_queue;
0814   };
0815 
0816   std::shared_ptr<State> state_;
0817 };
0818 
0819 /// \brief A generator where the producer pushes items on a queue.
0820 ///
0821 /// No back-pressure is applied, so this generator is mostly useful when
0822 /// producing the values is neither CPU- nor memory-expensive (e.g. fetching
0823 /// filesystem metadata).
0824 ///
0825 /// This generator is not async-reentrant.
0826 template <typename T>
0827 class PushGenerator {
0828   struct State {
0829     State() {}
0830 
0831     util::Mutex mutex;
0832     std::deque<Result<T>> result_q;
0833     std::optional<Future<T>> consumer_fut;
0834     bool finished = false;
0835   };
0836 
0837  public:
0838   /// Producer API for PushGenerator
0839   class Producer {
0840    public:
0841     explicit Producer(const std::shared_ptr<State>& state) : weak_state_(state) {}
0842 
0843     /// \brief Push a value on the queue
0844     ///
0845     /// True is returned if the value was pushed, false if the generator is
0846     /// already closed or destroyed.  If the latter, it is recommended to stop
0847     /// producing any further values.
0848     bool Push(Result<T> result) {
0849       auto state = weak_state_.lock();
0850       if (!state) {
0851         // Generator was destroyed
0852         return false;
0853       }
0854       auto lock = state->mutex.Lock();
0855       if (state->finished) {
0856         // Closed early
0857         return false;
0858       }
0859       if (state->consumer_fut.has_value()) {
0860         auto fut = std::move(state->consumer_fut.value());
0861         state->consumer_fut.reset();
0862         lock.Unlock();  // unlock before potentially invoking a callback
0863         fut.MarkFinished(std::move(result));
0864       } else {
0865         state->result_q.push_back(std::move(result));
0866       }
0867       return true;
0868     }
0869 
0870     /// \brief Tell the consumer we have finished producing
0871     ///
0872     /// It is allowed to call this and later call Push() again ("early close").
0873     /// In this case, calls to Push() after the queue is closed are silently
0874     /// ignored.  This can help implementing non-trivial cancellation cases.
0875     ///
0876     /// True is returned on success, false if the generator is already closed
0877     /// or destroyed.
0878     bool Close() {
0879       auto state = weak_state_.lock();
0880       if (!state) {
0881         // Generator was destroyed
0882         return false;
0883       }
0884       auto lock = state->mutex.Lock();
0885       if (state->finished) {
0886         // Already closed
0887         return false;
0888       }
0889       state->finished = true;
0890       if (state->consumer_fut.has_value()) {
0891         auto fut = std::move(state->consumer_fut.value());
0892         state->consumer_fut.reset();
0893         lock.Unlock();  // unlock before potentially invoking a callback
0894         fut.MarkFinished(IterationTraits<T>::End());
0895       }
0896       return true;
0897     }
0898 
0899     /// Return whether the generator was closed or destroyed.
0900     bool is_closed() const {
0901       auto state = weak_state_.lock();
0902       if (!state) {
0903         // Generator was destroyed
0904         return true;
0905       }
0906       auto lock = state->mutex.Lock();
0907       return state->finished;
0908     }
0909 
0910    private:
0911     const std::weak_ptr<State> weak_state_;
0912   };
0913 
0914   PushGenerator() : state_(std::make_shared<State>()) {}
0915 
0916   /// Read an item from the queue
0917   Future<T> operator()() const {
0918     auto lock = state_->mutex.Lock();
0919     assert(!state_->consumer_fut.has_value());  // Non-reentrant
0920     if (!state_->result_q.empty()) {
0921       auto fut = Future<T>::MakeFinished(std::move(state_->result_q.front()));
0922       state_->result_q.pop_front();
0923       return fut;
0924     }
0925     if (state_->finished) {
0926       return AsyncGeneratorEnd<T>();
0927     }
0928     auto fut = Future<T>::Make();
0929     state_->consumer_fut = fut;
0930     return fut;
0931   }
0932 
0933   /// \brief Return producer-side interface
0934   ///
0935   /// The returned object must be used by the producer to push values on the queue.
0936   /// Only a single Producer object should be instantiated.
0937   Producer producer() { return Producer{state_}; }
0938 
0939  private:
0940   const std::shared_ptr<State> state_;
0941 };
0942 
0943 /// \brief Create a generator that pulls reentrantly from a source
0944 /// This generator will pull reentrantly from a source, ensuring that max_readahead
0945 /// requests are active at any given time.
0946 ///
0947 /// The source generator must be async-reentrant
0948 ///
0949 /// This generator itself is async-reentrant.
0950 ///
0951 /// This generator may queue up to max_readahead instances of T
0952 template <typename T>
0953 AsyncGenerator<T> MakeReadaheadGenerator(AsyncGenerator<T> source_generator,
0954                                          int max_readahead) {
0955   return ReadaheadGenerator<T>(std::move(source_generator), max_readahead);
0956 }
0957 
0958 /// \brief Creates a generator that will yield finished futures from a vector
0959 ///
0960 /// This generator is async-reentrant
0961 template <typename T>
0962 AsyncGenerator<T> MakeVectorGenerator(std::vector<T> vec) {
0963   struct State {
0964     explicit State(std::vector<T> vec_) : vec(std::move(vec_)), vec_idx(0) {}
0965 
0966     std::vector<T> vec;
0967     std::atomic<std::size_t> vec_idx;
0968   };
0969 
0970   auto state = std::make_shared<State>(std::move(vec));
0971   return [state]() {
0972     auto idx = state->vec_idx.fetch_add(1);
0973     if (idx >= state->vec.size()) {
0974       // Eagerly return memory
0975       state->vec.clear();
0976       return AsyncGeneratorEnd<T>();
0977     }
0978     return Future<T>::MakeFinished(state->vec[idx]);
0979   };
0980 }
0981 
0982 /// \see MakeMergedGenerator
0983 template <typename T>
0984 class MergedGenerator {
0985   // Note, the implementation of this class is quite complex at the moment (PRs to
0986   // simplify are always welcome)
0987   //
0988   // Terminology is borrowed from rxjs.  This is a pull based implementation of the
0989   // mergeAll operator.  The "outer subscription" refers to the async
0990   // generator that the caller provided when creating this.  The outer subscription
0991   // yields generators.
0992   //
0993   // Each of these generators is then subscribed to (up to max_subscriptions) and these
0994   // are referred to as "inner subscriptions".
0995   //
0996   // As soon as we start we try and establish `max_subscriptions` inner subscriptions. For
0997   // each inner subscription we will cache up to 1 value.  This means we may have more
0998   // values than we have been asked for.  In our example, if a caller asks for one record
0999   // batch we will start scanning `max_subscriptions` different files.  For each file we
1000   // will only queue up to 1 batch (so a separate readahead is needed on the file if batch
1001   // readahead is desired).
1002   //
1003   // If the caller is slow we may accumulate ready-to-deliver items.  These are stored
1004   // in `delivered_jobs`.
1005   //
1006   // If the caller is very quick we may accumulate requests.  These are stored in
1007   // `waiting_jobs`.
1008   //
1009   // It may be helpful to consider an example, in the scanner the outer subscription
1010   // is some kind of asynchronous directory listing.  The inner subscription is
1011   // then a scan on a file yielded by the directory listing.
1012   //
1013   // An "outstanding" request is when we have polled either the inner or outer
1014   // subscription but that future hasn't completed yet.
1015   //
1016   // There are three possible "events" that can happen.
1017   // * A caller could request the next future
1018   // * An outer callback occurs when the next subscription is ready (e.g. the directory
1019   //     listing has produced a new file)
1020   // * An inner callback occurs when one of the inner subscriptions emits a value (e.g.
1021   //     a file scan emits a record batch)
1022   //
1023   // Any time an event happens the logic is broken into two phases.  First, we grab the
1024   // lock and modify the shared state.  While doing this we figure out what callbacks we
1025   // will need to execute.  Then, we give up the lock and execute these callbacks.  It is
1026   // important to execute these callbacks without the lock to avoid deadlock.
1027  public:
1028   explicit MergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
1029                            int max_subscriptions)
1030       : state_(std::make_shared<State>(std::move(source), max_subscriptions)) {}
1031 
1032   Future<T> operator()() {
1033     // A caller has requested a future
1034     Future<T> waiting_future;
1035     std::shared_ptr<DeliveredJob> delivered_job;
1036     bool mark_generator_complete = false;
1037     {
1038       auto guard = state_->mutex.Lock();
1039       if (!state_->delivered_jobs.empty()) {
1040         // If we have a job sitting around we can deliver it
1041         delivered_job = std::move(state_->delivered_jobs.front());
1042         state_->delivered_jobs.pop_front();
1043         if (state_->IsCompleteUnlocked(guard)) {
1044           // It's possible this waiting job was the only thing left to handle and
1045           // we have now completed the generator.
1046           mark_generator_complete = true;
1047         } else {
1048           // Since we had a job sitting around we also had an inner subscription
1049           // that had paused.  We are going to restart this inner subscription and
1050           // so there will be a new outstanding request.
1051           state_->outstanding_requests++;
1052         }
1053       } else if (state_->broken ||
1054                  (!state_->first && state_->num_running_subscriptions == 0)) {
1055         // If we are broken or exhausted then prepare a terminal item but
1056         // we won't complete it until we've finished.
1057         Result<T> end_res = IterationEnd<T>();
1058         if (!state_->final_error.ok()) {
1059           end_res = state_->final_error;
1060           state_->final_error = Status::OK();
1061         }
1062         return state_->all_finished.Then([end_res]() -> Result<T> { return end_res; });
1063       } else {
1064         // Otherwise we just queue the request and it will be completed when one of the
1065         // ongoing inner subscriptions delivers a result
1066         waiting_future = Future<T>::Make();
1067         state_->waiting_jobs.push_back(std::make_shared<Future<T>>(waiting_future));
1068       }
1069       if (state_->first) {
1070         // On the first request we are going to try and immediately fill our queue
1071         // of subscriptions.  We assume we are going to be able to start them all.
1072         state_->outstanding_requests +=
1073             static_cast<int>(state_->active_subscriptions.size());
1074         state_->num_running_subscriptions +=
1075             static_cast<int>(state_->active_subscriptions.size());
1076       }
1077     }
1078     // If we grabbed a finished item from the delivered_jobs queue then we may need
1079     // to mark the generator finished or issue a request for a new item to fill in
1080     // the spot we just vacated.  Notice that we issue that request to the same
1081     // subscription that delivered it (deliverer).
1082     if (delivered_job) {
1083       if (mark_generator_complete) {
1084         state_->all_finished.MarkFinished();
1085       } else {
1086         delivered_job->deliverer().AddCallback(
1087             InnerCallback(state_, delivered_job->index));
1088       }
1089       return std::move(delivered_job->value);
1090     }
1091     // On the first call we try and fill up our subscriptions.  It's possible the outer
1092     // generator only has a few items and we can't fill up to what we were hoping.  In
1093     // that case we have to bail early.
1094     if (state_->first) {
1095       state_->first = false;
1096       mark_generator_complete = false;
1097       for (int i = 0; i < static_cast<int>(state_->active_subscriptions.size()); i++) {
1098         state_->PullSource().AddCallback(
1099             OuterCallback{state_, static_cast<std::size_t>(i)});
1100         // If we have to bail early then we need to update the shared state again so
1101         // we need to reacquire the lock.
1102         auto guard = state_->mutex.Lock();
1103         if (state_->source_exhausted) {
1104           int excess_requests =
1105               static_cast<int>(state_->active_subscriptions.size()) - i - 1;
1106           state_->outstanding_requests -= excess_requests;
1107           state_->num_running_subscriptions -= excess_requests;
1108           if (excess_requests > 0) {
1109             // It's possible that we are completing the generator by reducing the number
1110             // of outstanding requests (e.g. this happens when the outer subscription and
1111             // all inner subscriptions are synchronous)
1112             mark_generator_complete = state_->IsCompleteUnlocked(guard);
1113           }
1114           break;
1115         }
1116       }
1117       if (mark_generator_complete) {
1118         state_->MarkFinishedAndPurge();
1119       }
1120     }
1121     return waiting_future;
1122   }
1123 
1124  private:
1125   struct DeliveredJob {
1126     explicit DeliveredJob(AsyncGenerator<T> deliverer_, Result<T> value_,
1127                           std::size_t index_)
1128         : deliverer(deliverer_), value(std::move(value_)), index(index_) {}
1129 
1130     // The generator that delivered this result, we will request another item
1131     // from this generator once the result is delivered
1132     AsyncGenerator<T> deliverer;
1133     // The result we received from the generator
1134     Result<T> value;
1135     // The index of the generator (in active_subscriptions) that delivered this
1136     // result.  This is used if we need to replace a finished generator.
1137     std::size_t index;
1138   };
1139 
1140   struct State {
1141     State(AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions)
1142         : source(std::move(source)),
1143           active_subscriptions(max_subscriptions),
1144           delivered_jobs(),
1145           waiting_jobs(),
1146           mutex(),
1147           first(true),
1148           broken(false),
1149           source_exhausted(false),
1150           outstanding_requests(0),
1151           num_running_subscriptions(0),
1152           final_error(Status::OK()) {}
1153 
1154     Future<AsyncGenerator<T>> PullSource() {
1155       // Need to guard access to source() so we don't pull sync-reentrantly which
1156       // is never valid.
1157       auto lock = mutex.Lock();
1158       return source();
1159     }
1160 
1161     void SignalErrorUnlocked(const util::Mutex::Guard& guard) {
1162       broken = true;
1163       // Empty any results that have arrived but not asked for.
1164       while (!delivered_jobs.empty()) {
1165         delivered_jobs.pop_front();
1166       }
1167     }
1168 
1169     // This function is called outside the mutex but it will only ever be
1170     // called once
1171     void MarkFinishedAndPurge() {
1172       all_finished.MarkFinished();
1173       while (!waiting_jobs.empty()) {
1174         waiting_jobs.front()->MarkFinished(IterationEnd<T>());
1175         waiting_jobs.pop_front();
1176       }
1177     }
1178 
1179     // This is called outside the mutex but it is only ever called
1180     // once and Future<>::AddCallback is thread-safe
1181     void MarkFinalError(const Status& err, Future<T> maybe_sink) {
1182       if (maybe_sink.is_valid()) {
1183         // Someone is waiting for this error so lets mark it complete when
1184         // all the work is done
1185         all_finished.AddCallback([maybe_sink, err](const Status& status) mutable {
1186           maybe_sink.MarkFinished(err);
1187         });
1188       } else {
1189         // No one is waiting for this error right now so it will be delivered
1190         // next.
1191         final_error = err;
1192       }
1193     }
1194 
1195     bool IsCompleteUnlocked(const util::Mutex::Guard& guard) {
1196       return outstanding_requests == 0 &&
1197              (broken || (source_exhausted && num_running_subscriptions == 0 &&
1198                          delivered_jobs.empty()));
1199     }
1200 
1201     bool MarkTaskFinishedUnlocked(const util::Mutex::Guard& guard) {
1202       --outstanding_requests;
1203       return IsCompleteUnlocked(guard);
1204     }
1205 
1206     // The outer generator.  Each item we pull from this will be its own generator
1207     // and become an inner subscription
1208     AsyncGenerator<AsyncGenerator<T>> source;
1209     // active_subscriptions and delivered_jobs will be bounded by max_subscriptions
1210     std::vector<AsyncGenerator<T>> active_subscriptions;
1211     // Results delivered by the inner subscriptions that weren't yet asked for by the
1212     // caller
1213     std::deque<std::shared_ptr<DeliveredJob>> delivered_jobs;
1214     // waiting_jobs is unbounded, reentrant pulls (e.g. AddReadahead) will provide the
1215     // backpressure
1216     std::deque<std::shared_ptr<Future<T>>> waiting_jobs;
1217     // A future that will be marked complete when the terminal item has arrived and all
1218     // outstanding futures have completed.  It is used to hold off emission of an error
1219     // until all outstanding work is done.
1220     Future<> all_finished = Future<>::Make();
1221     util::Mutex mutex;
1222     // A flag cleared when the caller firsts asks for a future.  Used to start polling.
1223     bool first;
1224     // A flag set when an error arrives, prevents us from issuing new requests.
1225     bool broken;
1226     // A flag set when the outer subscription has been exhausted.  Prevents us from
1227     // pulling it further (even though it would be generally harmless) and lets us know we
1228     // are finishing up.
1229     bool source_exhausted;
1230     // The number of futures that we have requested from either the outer or inner
1231     // subscriptions that have not yet completed.  We cannot mark all_finished until this
1232     // reaches 0.  This will never be greater than max_subscriptions
1233     int outstanding_requests;
1234     // The number of running subscriptions.  We ramp this up to `max_subscriptions` as
1235     // soon as the first item is requested and then it stays at that level (each exhausted
1236     // inner subscription is replaced by a new inner subscription) until the outer
1237     // subscription is exhausted at which point this descends to 0 (and source_exhausted)
1238     // is then set to true.
1239     int num_running_subscriptions;
1240     // If an error arrives, and the caller hasn't asked for that item, we store the error
1241     // here.  It is analagous to delivered_jobs but for errors instead of finished
1242     // results.
1243     Status final_error;
1244   };
1245 
1246   struct InnerCallback {
1247     InnerCallback(std::shared_ptr<State> state, std::size_t index, bool recursive = false)
1248         : state(std::move(state)), index(index), recursive(recursive) {}
1249 
1250     void operator()(const Result<T>& maybe_next_ref) {
1251       // An item has been delivered by one of the inner subscriptions
1252       Future<T> next_fut;
1253       const Result<T>* maybe_next = &maybe_next_ref;
1254 
1255       // When an item is delivered (and the caller has asked for it) we grab the
1256       // next item from the inner subscription.  To avoid this behavior leading to an
1257       // infinite loop (this can happen if the caller's callback asks for the next item)
1258       // we use a while loop.
1259       while (true) {
1260         Future<T> sink;
1261         bool sub_finished = maybe_next->ok() && IsIterationEnd(**maybe_next);
1262         bool pull_next_sub = false;
1263         bool was_broken = false;
1264         bool should_mark_gen_complete = false;
1265         bool should_mark_final_error = false;
1266         {
1267           auto guard = state->mutex.Lock();
1268           if (state->broken) {
1269             // We've errored out previously so ignore the result.  If anyone was waiting
1270             // for this they will get IterationEnd when we purge
1271             was_broken = true;
1272           } else {
1273             if (!sub_finished) {
1274               // There is a result to deliver.  Either we can deliver it now or we will
1275               // queue it up
1276               if (state->waiting_jobs.empty()) {
1277                 state->delivered_jobs.push_back(std::make_shared<DeliveredJob>(
1278                     state->active_subscriptions[index], *maybe_next, index));
1279               } else {
1280                 sink = std::move(*state->waiting_jobs.front());
1281                 state->waiting_jobs.pop_front();
1282               }
1283             }
1284 
1285             // If this is the first error then we transition the state to a broken state
1286             if (!maybe_next->ok()) {
1287               should_mark_final_error = true;
1288               state->SignalErrorUnlocked(guard);
1289             }
1290           }
1291 
1292           // If we finished this inner subscription then we need to grab a new inner
1293           // subscription to take its spot.  If we can't (because we're broken or
1294           // exhausted) then we aren't going to be starting any new futures and so
1295           // the number of running subscriptions drops.
1296           pull_next_sub = sub_finished && !state->source_exhausted && !was_broken;
1297           if (sub_finished && !pull_next_sub) {
1298             state->num_running_subscriptions--;
1299           }
1300           // There are three situations we won't pull again.  If an error occurred or we
1301           // are already finished or if no one was waiting for our result and so we queued
1302           // it up.  We will decrement outstanding_requests and possibly mark the
1303           // generator completed.
1304           if (state->broken || (!sink.is_valid() && !sub_finished) ||
1305               (sub_finished && state->source_exhausted)) {
1306             if (state->MarkTaskFinishedUnlocked(guard)) {
1307               should_mark_gen_complete = true;
1308             }
1309           }
1310         }
1311 
1312         // Now we have given up the lock and we can take all the actions we decided we
1313         // need to take.
1314         if (should_mark_final_error) {
1315           state->MarkFinalError(maybe_next->status(), std::move(sink));
1316         }
1317 
1318         if (should_mark_gen_complete) {
1319           state->MarkFinishedAndPurge();
1320         }
1321 
1322         // An error occurred elsewhere so there is no need to mark any future
1323         // finished (will happen during the purge) or pull from anything
1324         if (was_broken) {
1325           return;
1326         }
1327 
1328         if (pull_next_sub) {
1329           if (recursive) {
1330             was_empty = true;
1331             return;
1332           }
1333           // We pulled an end token so we need to start a new subscription
1334           // in our spot
1335           state->PullSource().AddCallback(OuterCallback{state, index});
1336         } else if (sink.is_valid()) {
1337           // We pulled a valid result and there was someone waiting for it
1338           // so lets fetch the next result from our subscription
1339           sink.MarkFinished(*maybe_next);
1340           next_fut = state->active_subscriptions[index]();
1341           if (next_fut.TryAddCallback([this]() { return InnerCallback(state, index); })) {
1342             return;
1343           }
1344           // Already completed. Avoid very deep recursion by looping
1345           // here instead of relying on the callback.
1346           maybe_next = &next_fut.result();
1347           continue;
1348         }
1349         // else: We pulled a valid result but no one was waiting for it so
1350         // we can just stop.
1351         return;
1352       }
1353     }
1354     std::shared_ptr<State> state;
1355     std::size_t index;
1356     bool recursive;
1357     bool was_empty = false;
1358   };
1359 
1360   struct OuterCallback {
1361     void operator()(const Result<AsyncGenerator<T>>& initial_maybe_next) {
1362       Result<AsyncGenerator<T>> maybe_next = initial_maybe_next;
1363       while (true) {
1364         // We have been given a new inner subscription
1365         bool should_continue = false;
1366         bool should_mark_gen_complete = false;
1367         bool should_deliver_error = false;
1368         bool source_exhausted = maybe_next.ok() && IsIterationEnd(*maybe_next);
1369         Future<T> error_sink;
1370         {
1371           auto guard = state->mutex.Lock();
1372           if (!maybe_next.ok() || source_exhausted || state->broken) {
1373             // If here then we will not pull any more from the outer source
1374             if (!state->broken && !maybe_next.ok()) {
1375               state->SignalErrorUnlocked(guard);
1376               // If here then we are the first error so we need to deliver it
1377               should_deliver_error = true;
1378               if (!state->waiting_jobs.empty()) {
1379                 error_sink = std::move(*state->waiting_jobs.front());
1380                 state->waiting_jobs.pop_front();
1381               }
1382             }
1383             if (source_exhausted) {
1384               state->source_exhausted = true;
1385               state->num_running_subscriptions--;
1386             }
1387             if (state->MarkTaskFinishedUnlocked(guard)) {
1388               should_mark_gen_complete = true;
1389             }
1390           } else {
1391             state->active_subscriptions[index] = *maybe_next;
1392             should_continue = true;
1393           }
1394         }
1395         if (should_deliver_error) {
1396           state->MarkFinalError(maybe_next.status(), std::move(error_sink));
1397         }
1398         if (should_mark_gen_complete) {
1399           state->MarkFinishedAndPurge();
1400         }
1401         if (should_continue) {
1402           // There is a possibility that a large sequence of immediately available inner
1403           // callbacks could lead to a stack overflow.  To avoid this we need to
1404           // synchronously loop through inner/outer callbacks until we either find an
1405           // unfinished future or we find an actual item to deliver.
1406           Future<T> next_item = (*maybe_next)();
1407           if (!next_item.TryAddCallback([this] { return InnerCallback(state, index); })) {
1408             // By setting recursive to true we signal to the inner callback that, if it is
1409             // empty, instead of adding a new outer callback, it should just immediately
1410             // return, flagging was_empty so that we know we need to check the next
1411             // subscription.
1412             InnerCallback immediate_inner(state, index, /*recursive=*/true);
1413             immediate_inner(next_item.result());
1414             if (immediate_inner.was_empty) {
1415               Future<AsyncGenerator<T>> next_source = state->PullSource();
1416               if (next_source.TryAddCallback([this] {
1417                     return OuterCallback{state, index};
1418                   })) {
1419                 // We hit an unfinished future so we can stop looping
1420                 return;
1421               }
1422               // The current subscription was immediately and synchronously empty
1423               // and we were able to synchronously pull the next subscription so we
1424               // can keep looping.
1425               maybe_next = next_source.result();
1426               continue;
1427             }
1428           }
1429         }
1430         return;
1431       }
1432     }
1433     std::shared_ptr<State> state;
1434     std::size_t index;
1435   };
1436 
1437   std::shared_ptr<State> state_;
1438 };
1439 
1440 /// \brief Create a generator that takes in a stream of generators and pulls from up to
1441 /// max_subscriptions at a time
1442 ///
1443 /// Note: This may deliver items out of sequence. For example, items from the third
1444 /// AsyncGenerator generated by the source may be emitted before some items from the first
1445 /// AsyncGenerator generated by the source.
1446 ///
1447 /// This generator will pull from source async-reentrantly unless max_subscriptions is 1
1448 /// This generator will not pull from the individual subscriptions reentrantly.  Add
1449 /// readahead to the individual subscriptions if that is desired.
1450 /// This generator is async-reentrant
1451 ///
1452 /// This generator may queue up to max_subscriptions instances of T
1453 template <typename T>
1454 AsyncGenerator<T> MakeMergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
1455                                       int max_subscriptions) {
1456   return MergedGenerator<T>(std::move(source), max_subscriptions);
1457 }
1458 
1459 template <typename T>
1460 Result<AsyncGenerator<T>> MakeSequencedMergedGenerator(
1461     AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions) {
1462   if (max_subscriptions < 0) {
1463     return Status::Invalid("max_subscriptions must be a positive integer");
1464   }
1465   if (max_subscriptions == 1) {
1466     return Status::Invalid("Use MakeConcatenatedGenerator if max_subscriptions is 1");
1467   }
1468   AsyncGenerator<AsyncGenerator<T>> autostarting_source = MakeMappedGenerator(
1469       std::move(source),
1470       [](const AsyncGenerator<T>& sub) { return MakeAutoStartingGenerator(sub); });
1471   AsyncGenerator<AsyncGenerator<T>> sub_readahead =
1472       MakeSerialReadaheadGenerator(std::move(autostarting_source), max_subscriptions - 1);
1473   return MakeConcatenatedGenerator(std::move(sub_readahead));
1474 }
1475 
1476 /// \brief Create a generator that takes in a stream of generators and pulls from each
1477 /// one in sequence.
1478 ///
1479 /// This generator is async-reentrant but will never pull from source reentrantly and
1480 /// will never pull from any subscription reentrantly.
1481 ///
1482 /// This generator may queue 1 instance of T
1483 ///
1484 /// TODO: Could potentially make a bespoke implementation instead of MergedGenerator that
1485 /// forwards async-reentrant requests instead of buffering them (which is what
1486 /// MergedGenerator does)
1487 template <typename T>
1488 AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> source) {
1489   return MergedGenerator<T>(std::move(source), 1);
1490 }
1491 
1492 template <typename T>
1493 struct Enumerated {
1494   T value;
1495   int index;
1496   bool last;
1497 };
1498 
1499 template <typename T>
1500 struct IterationTraits<Enumerated<T>> {
1501   static Enumerated<T> End() { return Enumerated<T>{IterationEnd<T>(), -1, false}; }
1502   static bool IsEnd(const Enumerated<T>& val) { return val.index < 0; }
1503 };
1504 
1505 /// \see MakeEnumeratedGenerator
1506 template <typename T>
1507 class EnumeratingGenerator {
1508  public:
1509   EnumeratingGenerator(AsyncGenerator<T> source, T initial_value)
1510       : state_(std::make_shared<State>(std::move(source), std::move(initial_value))) {}
1511 
1512   Future<Enumerated<T>> operator()() {
1513     if (state_->finished) {
1514       return AsyncGeneratorEnd<Enumerated<T>>();
1515     } else {
1516       auto state = state_;
1517       return state->source().Then([state](const T& next) {
1518         auto finished = IsIterationEnd<T>(next);
1519         auto prev = Enumerated<T>{state->prev_value, state->prev_index, finished};
1520         state->prev_value = next;
1521         state->prev_index++;
1522         state->finished = finished;
1523         return prev;
1524       });
1525     }
1526   }
1527 
1528  private:
1529   struct State {
1530     State(AsyncGenerator<T> source, T initial_value)
1531         : source(std::move(source)), prev_value(std::move(initial_value)), prev_index(0) {
1532       finished = IsIterationEnd<T>(prev_value);
1533     }
1534 
1535     AsyncGenerator<T> source;
1536     T prev_value;
1537     int prev_index;
1538     bool finished;
1539   };
1540 
1541   std::shared_ptr<State> state_;
1542 };
1543 
1544 /// Wrap items from a source generator with positional information
1545 ///
1546 /// When used with MakeMergedGenerator and MakeSequencingGenerator this allows items to be
1547 /// processed in a "first-available" fashion and later resequenced which can reduce the
1548 /// impact of sources with erratic performance (e.g. a filesystem where some items may
1549 /// take longer to read than others).
1550 ///
1551 /// TODO(ARROW-12371) Would require this generator be async-reentrant
1552 ///
1553 /// \see MakeSequencingGenerator for an example of putting items back in order
1554 ///
1555 /// This generator is not async-reentrant
1556 ///
1557 /// This generator buffers one item (so it knows which item is the last item)
1558 template <typename T>
1559 AsyncGenerator<Enumerated<T>> MakeEnumeratedGenerator(AsyncGenerator<T> source) {
1560   return FutureFirstGenerator<Enumerated<T>>(
1561       source().Then([source](const T& initial_value) -> AsyncGenerator<Enumerated<T>> {
1562         return EnumeratingGenerator<T>(std::move(source), initial_value);
1563       }));
1564 }
1565 
1566 /// \see MakeTransferredGenerator
1567 template <typename T>
1568 class TransferringGenerator {
1569  public:
1570   explicit TransferringGenerator(AsyncGenerator<T> source, internal::Executor* executor)
1571       : source_(std::move(source)), executor_(executor) {}
1572 
1573   Future<T> operator()() { return executor_->Transfer(source_()); }
1574 
1575  private:
1576   AsyncGenerator<T> source_;
1577   internal::Executor* executor_;
1578 };
1579 
1580 /// \brief Transfer a future to an underlying executor.
1581 ///
1582 /// Continuations run on the returned future will be run on the given executor
1583 /// if they cannot be run synchronously.
1584 ///
1585 /// This is often needed to move computation off I/O threads or other external
1586 /// completion sources and back on to the CPU executor so the I/O thread can
1587 /// stay busy and focused on I/O
1588 ///
1589 /// Keep in mind that continuations called on an already completed future will
1590 /// always be run synchronously and so no transfer will happen in that case.
1591 ///
1592 /// This generator is async reentrant if the source is
1593 ///
1594 /// This generator will not queue
1595 template <typename T>
1596 AsyncGenerator<T> MakeTransferredGenerator(AsyncGenerator<T> source,
1597                                            internal::Executor* executor) {
1598   return TransferringGenerator<T>(std::move(source), executor);
1599 }
1600 
1601 /// \see MakeBackgroundGenerator
1602 template <typename T>
1603 class BackgroundGenerator {
1604  public:
1605   explicit BackgroundGenerator(Iterator<T> it, internal::Executor* io_executor, int max_q,
1606                                int q_restart)
1607       : state_(std::make_shared<State>(io_executor, std::move(it), max_q, q_restart)),
1608         cleanup_(std::make_shared<Cleanup>(state_.get())) {}
1609 
1610   Future<T> operator()() {
1611     auto guard = state_->mutex.Lock();
1612     Future<T> waiting_future;
1613     if (state_->queue.empty()) {
1614       if (state_->finished) {
1615         return AsyncGeneratorEnd<T>();
1616       } else {
1617         waiting_future = Future<T>::Make();
1618         state_->waiting_future = waiting_future;
1619       }
1620     } else {
1621       auto next = Future<T>::MakeFinished(std::move(state_->queue.front()));
1622       state_->queue.pop();
1623       if (state_->NeedsRestart()) {
1624         return state_->RestartTask(state_, std::move(guard), std::move(next));
1625       }
1626       return next;
1627     }
1628     // This should only trigger the very first time this method is called
1629     if (state_->NeedsRestart()) {
1630       return state_->RestartTask(state_, std::move(guard), std::move(waiting_future));
1631     }
1632     return waiting_future;
1633   }
1634 
1635  protected:
1636   static constexpr uint64_t kUnlikelyThreadId{std::numeric_limits<uint64_t>::max()};
1637 
1638   struct State {
1639     State(internal::Executor* io_executor, Iterator<T> it, int max_q, int q_restart)
1640         : io_executor(io_executor),
1641           max_q(max_q),
1642           q_restart(q_restart),
1643           it(std::move(it)),
1644           reading(false),
1645           finished(false),
1646           should_shutdown(false) {}
1647 
1648     void ClearQueue() {
1649       while (!queue.empty()) {
1650         queue.pop();
1651       }
1652     }
1653 
1654     bool TaskIsRunning() const { return task_finished.is_valid(); }
1655 
1656     bool NeedsRestart() const {
1657       return !finished && !reading && static_cast<int>(queue.size()) <= q_restart;
1658     }
1659 
1660     void DoRestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard) {
1661       // If we get here we are actually going to start a new task so let's create a
1662       // task_finished future for it
1663       state->task_finished = Future<>::Make();
1664       state->reading = true;
1665       auto spawn_status = io_executor->Spawn(
1666           [state]() { BackgroundGenerator::WorkerTask(std::move(state)); });
1667       if (!spawn_status.ok()) {
1668         // If we can't spawn a new task then send an error to the consumer (either via a
1669         // waiting future or the queue) and mark ourselves finished
1670         state->finished = true;
1671         state->task_finished = Future<>();
1672         if (waiting_future.has_value()) {
1673           auto to_deliver = std::move(waiting_future.value());
1674           waiting_future.reset();
1675           guard.Unlock();
1676           to_deliver.MarkFinished(spawn_status);
1677         } else {
1678           ClearQueue();
1679           queue.push(spawn_status);
1680         }
1681       }
1682     }
1683 
1684     Future<T> RestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard,
1685                           Future<T> next) {
1686       if (TaskIsRunning()) {
1687         // If the task is still cleaning up we need to wait for it to finish before
1688         // restarting.  We also want to block the consumer until we've restarted the
1689         // reader to avoid multiple restarts
1690         return task_finished.Then([state, next]() {
1691           // This may appear dangerous (recursive mutex) but we should be guaranteed the
1692           // outer guard has been released by this point.  We know...
1693           // * task_finished is not already finished (it would be invalid in that case)
1694           // * task_finished will not be marked complete until we've given up the mutex
1695           auto guard_ = state->mutex.Lock();
1696           state->DoRestartTask(state, std::move(guard_));
1697           return next;
1698         });
1699       }
1700       // Otherwise we can restart immediately
1701       DoRestartTask(std::move(state), std::move(guard));
1702       return next;
1703     }
1704 
1705     internal::Executor* io_executor;
1706     const int max_q;
1707     const int q_restart;
1708     Iterator<T> it;
1709     std::atomic<uint64_t> worker_thread_id{kUnlikelyThreadId};
1710 
1711     // If true, the task is actively pumping items from the queue and does not need a
1712     // restart
1713     bool reading;
1714     // Set to true when a terminal item arrives
1715     bool finished;
1716     // Signal to the background task to end early because consumers have given up on it
1717     bool should_shutdown;
1718     // If the queue is empty, the consumer will create a waiting future and wait for it
1719     std::queue<Result<T>> queue;
1720     std::optional<Future<T>> waiting_future;
1721     // Every background task is given a future to complete when it is entirely finished
1722     // processing and ready for the next task to start or for State to be destroyed
1723     Future<> task_finished;
1724     util::Mutex mutex;
1725   };
1726 
1727   // Cleanup task that will be run when all consumer references to the generator are lost
1728   struct Cleanup {
1729     explicit Cleanup(State* state) : state(state) {}
1730     ~Cleanup() {
1731       /// TODO: Once ARROW-13109 is available then we can be force consumers to spawn and
1732       /// there is no need to perform this check.
1733       ///
1734       /// It's a deadlock if we enter cleanup from
1735       /// the worker thread but it can happen if the consumer doesn't transfer away
1736       assert(state->worker_thread_id.load() != ::arrow::internal::GetThreadId());
1737       Future<> finish_fut;
1738       {
1739         auto lock = state->mutex.Lock();
1740         if (!state->TaskIsRunning()) {
1741           return;
1742         }
1743         // Signal the current task to stop and wait for it to finish
1744         state->should_shutdown = true;
1745         finish_fut = state->task_finished;
1746       }
1747       // Using future as a condition variable here
1748       Status st = finish_fut.status();
1749       ARROW_UNUSED(st);
1750     }
1751     State* state;
1752   };
1753 
1754   static void WorkerTask(std::shared_ptr<State> state) {
1755     state->worker_thread_id.store(::arrow::internal::GetThreadId());
1756     // We need to capture the state to read while outside the mutex
1757     bool reading = true;
1758     while (reading) {
1759       auto next = state->it.Next();
1760       // Need to capture state->waiting_future inside the mutex to mark finished outside
1761       Future<T> waiting_future;
1762       {
1763         auto guard = state->mutex.Lock();
1764 
1765         if (state->should_shutdown) {
1766           state->finished = true;
1767           break;
1768         }
1769 
1770         if (!next.ok() || IsIterationEnd<T>(*next)) {
1771           // Terminal item.  Mark finished to true, send this last item, and quit
1772           state->finished = true;
1773           if (!next.ok()) {
1774             state->ClearQueue();
1775           }
1776         }
1777         // At this point we are going to send an item.  Either we will add it to the
1778         // queue or deliver it to a waiting future.
1779         if (state->waiting_future.has_value()) {
1780           waiting_future = std::move(state->waiting_future.value());
1781           state->waiting_future.reset();
1782         } else {
1783           state->queue.push(std::move(next));
1784           // We just filled up the queue so it is time to quit.  We may need to notify
1785           // a cleanup task so we transition to Quitting
1786           if (static_cast<int>(state->queue.size()) >= state->max_q) {
1787             state->reading = false;
1788           }
1789         }
1790         reading = state->reading && !state->finished;
1791       }
1792       // This should happen outside the mutex.  Presumably there is a
1793       // transferring generator on the other end that will quickly transfer any
1794       // callbacks off of this thread so we can continue looping.  Still, best not to
1795       // rely on that
1796       if (waiting_future.is_valid()) {
1797         waiting_future.MarkFinished(next);
1798       }
1799     }
1800     // Once we've sent our last item we can notify any waiters that we are done and so
1801     // either state can be cleaned up or a new background task can be started
1802     Future<> task_finished;
1803     {
1804       auto guard = state->mutex.Lock();
1805       // After we give up the mutex state can be safely deleted.  We will no longer
1806       // reference it.  We can safely transition to idle now.
1807       task_finished = state->task_finished;
1808       state->task_finished = Future<>();
1809       state->worker_thread_id.store(kUnlikelyThreadId);
1810     }
1811     task_finished.MarkFinished();
1812   }
1813 
1814   std::shared_ptr<State> state_;
1815   // state_ is held by both the generator and the background thread so it won't be cleaned
1816   // up when all consumer references are relinquished.  cleanup_ is only held by the
1817   // generator so it will be destructed when the last consumer reference is gone.  We use
1818   // this to cleanup / stop the background generator in case the consuming end stops
1819   // listening (e.g. due to a downstream error)
1820   std::shared_ptr<Cleanup> cleanup_;
1821 };
1822 
1823 constexpr int kDefaultBackgroundMaxQ = 32;
1824 constexpr int kDefaultBackgroundQRestart = 16;
1825 
1826 /// \brief Create an AsyncGenerator<T> by iterating over an Iterator<T> on a background
1827 /// thread
1828 ///
1829 /// The parameter max_q and q_restart control queue size and background thread task
1830 /// management. If the background task is fast you typically don't want it creating a
1831 /// thread task for every item.  Instead the background thread will run until it fills
1832 /// up a readahead queue.
1833 ///
1834 /// Once the queue has filled up the background thread task will terminate (allowing other
1835 /// I/O tasks to use the thread).  Once the queue has been drained enough (specified by
1836 /// q_restart) then the background thread task will be restarted.  If q_restart is too low
1837 /// then you may exhaust the queue waiting for the background thread task to start running
1838 /// again.  If it is too high then it will be constantly stopping and restarting the
1839 /// background queue task
1840 ///
1841 /// The "background thread" is a logical thread and will run as tasks on the io_executor.
1842 /// This thread may stop and start when the queue fills up but there will only be one
1843 /// active background thread task at any given time.  You MUST transfer away from this
1844 /// background generator.  Otherwise there could be a race condition if a callback on the
1845 /// background thread deletes the last consumer reference to the background generator. You
1846 /// can transfer onto the same executor as the background thread, it is only necessary to
1847 /// create a new thread task, not to switch executors.
1848 ///
1849 /// This generator is not async-reentrant
1850 ///
1851 /// This generator will queue up to max_q blocks
1852 template <typename T>
1853 static Result<AsyncGenerator<T>> MakeBackgroundGenerator(
1854     Iterator<T> iterator, internal::Executor* io_executor,
1855     int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart) {
1856   if (max_q < q_restart) {
1857     return Status::Invalid("max_q must be >= q_restart");
1858   }
1859   return BackgroundGenerator<T>(std::move(iterator), io_executor, max_q, q_restart);
1860 }
1861 
1862 /// \brief Create an AsyncGenerator<T> by iterating over an Iterator<T> synchronously
1863 ///
1864 /// This should only be used if you know the source iterator does not involve any
1865 /// I/O (or other blocking calls).  Otherwise a CPU thread will be blocked and, depending
1866 /// on the complexity of the iterator, it may lead to deadlock.
1867 ///
1868 /// If you are not certain if there will be I/O then it is better to use
1869 /// MakeBackgroundGenerator.  If helpful you can think of this as the AsyncGenerator
1870 /// equivalent of Future::MakeFinished
1871 ///
1872 /// It is impossible to call this in an async-reentrant manner since the returned
1873 /// future will be completed by the time it is polled.
1874 ///
1875 /// This generator does not queue
1876 template <typename T>
1877 static Result<AsyncGenerator<T>> MakeBlockingGenerator(
1878     std::shared_ptr<Iterator<T>> iterator) {
1879   return [it = std::move(iterator)]() mutable -> Future<T> {
1880     return Future<T>::MakeFinished(it->Next());
1881   };
1882 }
1883 
1884 template <typename T>
1885 static Result<AsyncGenerator<T>> MakeBlockingGenerator(Iterator<T> iterator) {
1886   return MakeBlockingGenerator(std::make_shared<Iterator<T>>(std::move(iterator)));
1887 }
1888 
1889 /// \see MakeGeneratorIterator
1890 template <typename T>
1891 class GeneratorIterator {
1892  public:
1893   explicit GeneratorIterator(AsyncGenerator<T> source) : source_(std::move(source)) {}
1894 
1895   Result<T> Next() { return source_().result(); }
1896 
1897  private:
1898   AsyncGenerator<T> source_;
1899 };
1900 
1901 /// \brief Convert an AsyncGenerator<T> to an Iterator<T> which blocks until each future
1902 /// is finished
1903 template <typename T>
1904 Iterator<T> MakeGeneratorIterator(AsyncGenerator<T> source) {
1905   return Iterator<T>(GeneratorIterator<T>(std::move(source)));
1906 }
1907 
1908 /// \brief Add readahead to an iterator using a background thread.
1909 ///
1910 /// Under the hood this is converting the iterator to a generator using
1911 /// MakeBackgroundGenerator, adding readahead to the converted generator with
1912 /// MakeReadaheadGenerator, and then converting back to an iterator using
1913 /// MakeGeneratorIterator.
1914 template <typename T>
1915 Result<Iterator<T>> MakeReadaheadIterator(Iterator<T> it, int readahead_queue_size) {
1916   ARROW_ASSIGN_OR_RAISE(auto io_executor, internal::ThreadPool::Make(1));
1917   auto max_q = readahead_queue_size;
1918   auto q_restart = std::max(1, max_q / 2);
1919   ARROW_ASSIGN_OR_RAISE(
1920       auto background_generator,
1921       MakeBackgroundGenerator(std::move(it), io_executor.get(), max_q, q_restart));
1922   // Capture io_executor to keep it alive as long as owned_bg_generator is still
1923   // referenced
1924   AsyncGenerator<T> owned_bg_generator = [io_executor, background_generator]() {
1925     return background_generator();
1926   };
1927   return MakeGeneratorIterator(std::move(owned_bg_generator));
1928 }
1929 
1930 /// \brief Make a generator that returns a single pre-generated future
1931 ///
1932 /// This generator is async-reentrant.
1933 template <typename T>
1934 std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
1935   assert(future.is_valid());
1936   auto state = std::make_shared<Future<T>>(std::move(future));
1937   return [state]() -> Future<T> {
1938     auto fut = std::move(*state);
1939     if (fut.is_valid()) {
1940       return fut;
1941     } else {
1942       return AsyncGeneratorEnd<T>();
1943     }
1944   };
1945 }
1946 
1947 /// \brief Make a generator that immediately ends.
1948 ///
1949 /// This generator is async-reentrant.
1950 template <typename T>
1951 std::function<Future<T>()> MakeEmptyGenerator() {
1952   return []() -> Future<T> { return AsyncGeneratorEnd<T>(); };
1953 }
1954 
1955 /// \brief Make a generator that always fails with a given error
1956 ///
1957 /// This generator is async-reentrant.
1958 template <typename T>
1959 AsyncGenerator<T> MakeFailingGenerator(Status st) {
1960   assert(!st.ok());
1961   auto state = std::make_shared<Status>(std::move(st));
1962   return [state]() -> Future<T> {
1963     auto st = std::move(*state);
1964     if (!st.ok()) {
1965       return st;
1966     } else {
1967       return AsyncGeneratorEnd<T>();
1968     }
1969   };
1970 }
1971 
1972 /// \brief Make a generator that always fails with a given error
1973 ///
1974 /// This overload allows inferring the return type from the argument.
1975 template <typename T>
1976 AsyncGenerator<T> MakeFailingGenerator(const Result<T>& result) {
1977   return MakeFailingGenerator<T>(result.status());
1978 }
1979 
1980 /// \brief Prepend initial_values onto a generator
1981 ///
1982 /// This generator is async-reentrant but will buffer requests and will not
1983 /// pull from following_values async-reentrantly.
1984 template <typename T>
1985 AsyncGenerator<T> MakeGeneratorStartsWith(std::vector<T> initial_values,
1986                                           AsyncGenerator<T> following_values) {
1987   auto initial_values_vec_gen = MakeVectorGenerator(std::move(initial_values));
1988   auto gen_gen = MakeVectorGenerator<AsyncGenerator<T>>(
1989       {std::move(initial_values_vec_gen), std::move(following_values)});
1990   return MakeConcatenatedGenerator(std::move(gen_gen));
1991 }
1992 
1993 template <typename T>
1994 struct CancellableGenerator {
1995   Future<T> operator()() {
1996     if (stop_token.IsStopRequested()) {
1997       return stop_token.Poll();
1998     }
1999     return source();
2000   }
2001 
2002   AsyncGenerator<T> source;
2003   StopToken stop_token;
2004 };
2005 
2006 /// \brief Allow an async generator to be cancelled
2007 ///
2008 /// This generator is async-reentrant
2009 template <typename T>
2010 AsyncGenerator<T> MakeCancellable(AsyncGenerator<T> source, StopToken stop_token) {
2011   return CancellableGenerator<T>{std::move(source), std::move(stop_token)};
2012 }
2013 
2014 template <typename T>
2015 class DefaultIfEmptyGenerator {
2016  public:
2017   DefaultIfEmptyGenerator(AsyncGenerator<T> source, T or_value)
2018       : state_(std::make_shared<State>(std::move(source), std::move(or_value))) {}
2019 
2020   Future<T> operator()() {
2021     if (state_->first) {
2022       state_->first = false;
2023       struct {
2024         T or_value;
2025 
2026         Result<T> operator()(const T& value) {
2027           if (IterationTraits<T>::IsEnd(value)) {
2028             return std::move(or_value);
2029           }
2030           return value;
2031         }
2032       } Continuation;
2033       Continuation.or_value = std::move(state_->or_value);
2034       return state_->source().Then(std::move(Continuation));
2035     }
2036     return state_->source();
2037   }
2038 
2039  private:
2040   struct State {
2041     AsyncGenerator<T> source;
2042     T or_value;
2043     bool first;
2044     State(AsyncGenerator<T> source_, T or_value_)
2045         : source(std::move(source_)), or_value(std::move(or_value_)), first(true) {}
2046   };
2047   std::shared_ptr<State> state_;
2048 };
2049 
2050 /// \brief If the generator is empty, return the given value, else
2051 /// forward the values from the generator.
2052 ///
2053 /// This generator is async-reentrant.
2054 template <typename T>
2055 AsyncGenerator<T> MakeDefaultIfEmptyGenerator(AsyncGenerator<T> source, T or_value) {
2056   return DefaultIfEmptyGenerator<T>(std::move(source), std::move(or_value));
2057 }
2058 }  // namespace arrow