File indexing completed on 2025-08-28 08:27:02
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
0020 #include <atomic>
0021 #include <cassert>
0022 #include <cstring>
0023 #include <deque>
0024 #include <limits>
0025 #include <optional>
0026 #include <queue>
0027
0028 #include "arrow/util/async_generator_fwd.h"
0029 #include "arrow/util/async_util.h"
0030 #include "arrow/util/functional.h"
0031 #include "arrow/util/future.h"
0032 #include "arrow/util/io_util.h"
0033 #include "arrow/util/iterator.h"
0034 #include "arrow/util/mutex.h"
0035 #include "arrow/util/queue.h"
0036 #include "arrow/util/thread_pool.h"
0037
0038 namespace arrow {
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070 template <typename T>
0071 struct IterationTraits<AsyncGenerator<T>> {
0072
0073
0074 static AsyncGenerator<T> End() { return AsyncGenerator<T>(); }
0075
0076 static bool IsEnd(const AsyncGenerator<T>& val) { return !val; }
0077 };
0078
0079 template <typename T>
0080 Future<T> AsyncGeneratorEnd() {
0081 return Future<T>::MakeFinished(IterationTraits<T>::End());
0082 }
0083
0084
0085 template <typename T, typename Visitor>
0086 Future<> VisitAsyncGenerator(AsyncGenerator<T> generator, Visitor visitor) {
0087 struct LoopBody {
0088 struct Callback {
0089 Result<ControlFlow<>> operator()(const T& next) {
0090 if (IsIterationEnd(next)) {
0091 return Break();
0092 } else {
0093 auto visited = visitor(next);
0094 if (visited.ok()) {
0095 return Continue();
0096 } else {
0097 return visited;
0098 }
0099 }
0100 }
0101
0102 Visitor visitor;
0103 };
0104
0105 Future<ControlFlow<>> operator()() {
0106 Callback callback{visitor};
0107 auto next = generator();
0108 return next.Then(std::move(callback));
0109 }
0110
0111 AsyncGenerator<T> generator;
0112 Visitor visitor;
0113 };
0114
0115 return Loop(LoopBody{std::move(generator), std::move(visitor)});
0116 }
0117
0118
0119 template <typename T>
0120 Future<> DiscardAllFromAsyncGenerator(AsyncGenerator<T> generator) {
0121 std::function<Status(T)> visitor = [](const T&) { return Status::OK(); };
0122 return VisitAsyncGenerator(generator, visitor);
0123 }
0124
0125
0126 template <typename T>
0127 Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
0128 auto vec = std::make_shared<std::vector<T>>();
0129 auto loop_body = [generator = std::move(generator),
0130 vec = std::move(vec)]() -> Future<ControlFlow<std::vector<T>>> {
0131 auto next = generator();
0132 return next.Then([vec](const T& result) -> Result<ControlFlow<std::vector<T>>> {
0133 if (IsIterationEnd(result)) {
0134 return Break(*vec);
0135 } else {
0136 vec->push_back(result);
0137 return Continue();
0138 }
0139 });
0140 };
0141 return Loop(std::move(loop_body));
0142 }
0143
0144
// Generator adaptor: pulls items from `source` and passes each one to the
// asynchronous `map` function, completing the caller's futures with the mapped
// results. All bookkeeping lives in a shared State (guarded by State::mutex) that
// the callbacks keep alive through their own shared_ptr.
0145 template <typename T, typename V>
0146 class MappingGenerator {
0147 public:
0148 MappingGenerator(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
0149 : state_(std::make_shared<State>(std::move(source), std::move(map))) {}
0150
// Queue a fresh output future and return it. If the stream has already finished,
// a completed end-of-stream future is returned instead. The source is only pulled
// here when no other request was already waiting; otherwise Callback chains the
// next pull itself.
0151 Future<V> operator()() {
0152 auto future = Future<V>::Make();
0153 bool should_trigger;
0154 {
0155 auto guard = state_->mutex.Lock();
0156 if (state_->finished) {
0157 return AsyncGeneratorEnd<V>();
0158 }
0159 should_trigger = state_->waiting_jobs.empty();
0160 state_->waiting_jobs.push_back(future);
0161 }
// The source is called after the guard above has been released.
0162 if (should_trigger) {
0163 state_->source().AddCallback(Callback{state_});
0164 }
0165 return future;
0166 }
0167
0168 private:
// Shared state between the generator object and its callbacks.
0169 struct State {
0170 State(AsyncGenerator<T> source, std::function<Future<V>(const T&)> map)
0171 : source(std::move(source)),
0172 map(std::move(map)),
0173 waiting_jobs(),
0174 mutex(),
0175 finished(false) {}
0176
// Complete every still-queued output future with the end token.
// Note: this does not take `mutex` itself; both callers invoke it after their
// guard has gone out of scope and only after `finished` has been set.
0177 void Purge() {
0178
0179
0180
0181
0182 while (!waiting_jobs.empty()) {
0183 waiting_jobs.front().MarkFinished(IterationTraits<V>::End());
0184 waiting_jobs.pop_front();
0185 }
0186 }
0187
// Upstream generator and the mapping function applied to each of its items.
0188 AsyncGenerator<T> source;
0189 std::function<Future<V>(const T&)> map;
// Output futures handed to callers that have not been matched to a source item yet.
0190 std::deque<Future<V>> waiting_jobs;
0191 util::Mutex mutex;
// Set once the source or a mapped future reports an error or the end token.
0192 bool finished;
0193 };
0194
0195 struct Callback;
0196
// Continuation attached to the future returned by `map`. Forwards the mapped
// result to `sink`; if that result is an error or the end token it also marks the
// stream finished and, when it was the first to do so, purges the waiting futures.
0197 struct MappedCallback {
0198 void operator()(const Result<V>& maybe_next) {
0199 bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
0200 bool should_purge = false;
0201 if (end) {
0202 {
0203 auto guard = state->mutex.Lock();
// Only the call that flips `finished` performs the purge.
0204 should_purge = !state->finished;
0205 state->finished = true;
0206 }
0207 }
// The sink is completed before the purge, outside the lock.
0208 sink.MarkFinished(maybe_next);
0209 if (should_purge) {
0210 state->Purge();
0211 }
0212 }
0213 std::shared_ptr<State> state;
0214 Future<V> sink;
0215 };
0216
// Continuation attached to each future returned by `source`. Takes the oldest
// waiting output future as the sink, chains the next source pull when more
// requests are waiting, and then either forwards the end/error to the sink or
// routes the item through `map`.
0217 struct Callback {
0218 void operator()(const Result<T>& maybe_next) {
0219 Future<V> sink;
0220 bool end = !maybe_next.ok() || IsIterationEnd(*maybe_next);
0221 bool should_purge = false;
0222 bool should_trigger;
0223 {
0224 auto guard = state->mutex.Lock();
0225
0226
// The stream was already finished elsewhere: drop this result. Every place that
// sets `finished` also purges the waiting futures.
0227 if (state->finished) return;
0228 if (end) {
0229 should_purge = !state->finished;
0230 state->finished = true;
0231 }
0232 sink = state->waiting_jobs.front();
0233 state->waiting_jobs.pop_front();
// Keep pulling only while the stream is alive and callers are still waiting.
0234 should_trigger = !end && !state->waiting_jobs.empty();
0235 }
0236 if (should_purge) {
0237 state->Purge();
0238 }
0239 if (should_trigger) {
0240 state->source().AddCallback(Callback{state});
0241 }
0242 if (maybe_next.ok()) {
0243 const T& val = maybe_next.ValueUnsafe();
0244 if (IsIterationEnd(val)) {
0245 sink.MarkFinished(IterationTraits<V>::End());
0246 } else {
// `state` is moved into the mapped callback; it is not used again in this call.
0247 Future<V> mapped_fut = state->map(val);
0248 mapped_fut.AddCallback(MappedCallback{std::move(state), std::move(sink)});
0249 }
0250 } else {
0251 sink.MarkFinished(maybe_next.status());
0252 }
0253 }
0254
0255 std::shared_ptr<State> state;
0256 };
0257
0258 std::shared_ptr<State> state_;
0259 };
0260
0261
0262
0263
0264
0265
0266
0267
0268 template <typename T, typename MapFn,
0269 typename Mapped = detail::result_of_t<MapFn(const T&)>,
0270 typename V = typename EnsureFuture<Mapped>::type::ValueType>
0271 AsyncGenerator<V> MakeMappedGenerator(AsyncGenerator<T> source_generator, MapFn map) {
0272 auto map_callback = [map = std::move(map)](const T& val) mutable -> Future<V> {
0273 return ToFuture(map(val));
0274 };
0275 return MappingGenerator<T, V>(std::move(source_generator), std::move(map_callback));
0276 }
0277
0278
0279
0280
0281
0282
0283
0284
0285
0286
0287
0288 template <typename T, typename MapFn,
0289 typename Mapped = detail::result_of_t<MapFn(const T&)>,
0290 typename V = typename EnsureFuture<Mapped>::type::ValueType>
0291 AsyncGenerator<T> MakeFlatMappedGenerator(AsyncGenerator<T> source_generator, MapFn map) {
0292 return MakeConcatenatedGenerator(
0293 MakeMappedGenerator(std::move(source_generator), std::move(map)));
0294 }
0295
0296
// Generator adaptor that re-orders the items of `source`.
// `is_next(previous, candidate)` decides whether `candidate` may be emitted right
// after `previous` (starting from `initial_value`); results that arrive too early
// are parked in a priority queue ordered by `compare`.
0297 template <typename T, typename ComesAfter, typename IsNext>
0298 class SequencingGenerator {
0299 public:
0300 SequencingGenerator(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next,
0301 T initial_value)
0302 : state_(std::make_shared<State>(std::move(source), std::move(compare),
0303 std::move(is_next), std::move(initial_value))) {}
0304
// Return the next item in sequence. The queue is served first when its top is an
// error or is accepted by `is_next`; otherwise, unless the stream has finished, a
// waiting future is registered and the source is pulled.
0305 Future<T> operator()() {
0306 {
0307 auto guard = state_->mutex.Lock();
0308
0309
0310 if (!state_->queue.empty() &&
0311 (!state_->queue.top().ok() ||
0312 state_->is_next(state_->previous_value, *state_->queue.top()))) {
0313 auto result = std::move(state_->queue.top());
// Only successful values advance the sequence position.
0314 if (result.ok()) {
0315 state_->previous_value = *result;
0316 }
0317 state_->queue.pop();
0318 return Future<T>::MakeFinished(result);
0319 }
0320 if (state_->finished) {
0321 return AsyncGeneratorEnd<T>();
0322 }
0323
0324 auto new_waiting_fut = Future<T>::Make();
0325 state_->waiting_future = new_waiting_fut;
// Release the lock before calling the source: Callback locks the same mutex.
0326 guard.Unlock();
0327 state_->source().AddCallback(Callback{state_});
0328 return new_waiting_fut;
0329 }
0330 }
0331
0332 private:
// Adapts the user's ComesAfter to Result<T>: any comparison involving an error
// yields false, otherwise the wrapped values are compared.
0333 struct WrappedComesAfter {
0334 bool operator()(const Result<T>& left, const Result<T>& right) {
0335 if (!left.ok() || !right.ok()) {
0336
0337 return false;
0338 }
0339 return compare(*left, *right);
0340 }
0341 ComesAfter compare;
0342 };
0343
// State shared between the generator and its callbacks; guarded by `mutex`.
0344 struct State {
0345 State(AsyncGenerator<T> source, ComesAfter compare, IsNext is_next, T initial_value)
0346 : source(std::move(source)),
0347 is_next(std::move(is_next)),
0348 previous_value(std::move(initial_value)),
0349 waiting_future(),
0350 queue(WrappedComesAfter{compare}),
0351 finished(false),
0352 mutex() {}
0353
0354 AsyncGenerator<T> source;
0355 IsNext is_next;
// Last value emitted (initially `initial_value`); first argument to `is_next`.
0356 T previous_value;
// Future most recently handed to the consumer by operator().
0357 Future<T> waiting_future;
// Results received from the source that could not be delivered yet.
0358 std::priority_queue<Result<T>, std::vector<Result<T>>, WrappedComesAfter> queue;
// Set once the source has reported an error or the end token.
0359 bool finished;
0360 util::Mutex mutex;
0361 };
0362
// Continuation attached to each source future. Either completes the waiting
// future with the result or buffers it, and keeps pulling from the source until
// something is delivered or the stream has finished.
0363 class Callback {
0364 public:
0365 explicit Callback(std::shared_ptr<State> state) : state_(std::move(state)) {}
0366
0367 void operator()(const Result<T> result) {
0368 Future<T> to_deliver;
0369 bool finished;
0370 {
0371 auto guard = state_->mutex.Lock();
0372 bool ready_to_deliver = false;
0373 if (!result.ok()) {
0374
// An error discards everything buffered and ends the stream.
0375 while (!state_->queue.empty()) {
0376 state_->queue.pop();
0377 }
0378 ready_to_deliver = true;
0379 state_->finished = true;
0380 } else if (IsIterationEnd<T>(result.ValueUnsafe())) {
// End of source: deliverable right away only if nothing is still buffered.
0381 ready_to_deliver = state_->queue.empty();
0382 state_->finished = true;
0383 } else {
0384 ready_to_deliver = state_->is_next(state_->previous_value, *result);
0385 }
0386
0387 if (ready_to_deliver && state_->waiting_future.is_valid()) {
0388 to_deliver = state_->waiting_future;
0389 if (result.ok()) {
0390 state_->previous_value = *result;
0391 }
0392 } else {
// Not deliverable now: park the result for a later operator() call.
0393 state_->queue.push(result);
0394 }
0395
0396 finished = state_->finished;
0397 }
0398
// The consumer's future is completed after the guard has been released.
0399 if (to_deliver.is_valid()) {
0400 to_deliver.MarkFinished(result);
0401 } else {
0402
0403
// Nothing was delivered: keep pulling unless the source is done.
0404 if (!finished) {
0405 state_->source().AddCallback(Callback{state_});
0406 }
0407 }
0408 }
0409
0410 private:
0411 const std::shared_ptr<State> state_;
0412 };
0413
0414 const std::shared_ptr<State> state_;
0415 };
0416
0417
0418
0419
0420
0421
0422
0423
0424
0425
0426
0427
0428
0429
0430
0431
0432
0433
0434
0435
0436 template <typename T, typename ComesAfter, typename IsNext>
0437 AsyncGenerator<T> MakeSequencingGenerator(AsyncGenerator<T> source_generator,
0438 ComesAfter compare, IsNext is_next,
0439 T initial_value) {
0440 return SequencingGenerator<T, ComesAfter, IsNext>(
0441 std::move(source_generator), std::move(compare), std::move(is_next),
0442 std::move(initial_value));
0443 }
0444
0445
0446 template <typename T, typename V>
0447 class TransformingGenerator {
0448
0449
0450
0451 struct TransformingGeneratorState
0452 : std::enable_shared_from_this<TransformingGeneratorState> {
0453 TransformingGeneratorState(AsyncGenerator<T> generator, Transformer<T, V> transformer)
0454 : generator_(std::move(generator)),
0455 transformer_(std::move(transformer)),
0456 last_value_(),
0457 finished_() {}
0458
0459 Future<V> operator()() {
0460 while (true) {
0461 auto maybe_next_result = Pump();
0462 if (!maybe_next_result.ok()) {
0463 return Future<V>::MakeFinished(maybe_next_result.status());
0464 }
0465 auto maybe_next = std::move(maybe_next_result).ValueUnsafe();
0466 if (maybe_next.has_value()) {
0467 return Future<V>::MakeFinished(*std::move(maybe_next));
0468 }
0469
0470 auto next_fut = generator_();
0471
0472
0473 if (next_fut.is_finished()) {
0474 auto next_result = next_fut.result();
0475 if (next_result.ok()) {
0476 last_value_ = *next_result;
0477 } else {
0478 return Future<V>::MakeFinished(next_result.status());
0479 }
0480
0481 } else {
0482 auto self = this->shared_from_this();
0483 return next_fut.Then([self](const T& next_result) {
0484 self->last_value_ = next_result;
0485 return (*self)();
0486 });
0487 }
0488 }
0489 }
0490
0491
0492 Result<std::optional<V>> Pump() {
0493 if (!finished_ && last_value_.has_value()) {
0494 ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
0495 if (next.ReadyForNext()) {
0496 if (IsIterationEnd(*last_value_)) {
0497 finished_ = true;
0498 }
0499 last_value_.reset();
0500 }
0501 if (next.Finished()) {
0502 finished_ = true;
0503 }
0504 if (next.HasValue()) {
0505 return next.Value();
0506 }
0507 }
0508 if (finished_) {
0509 return IterationTraits<V>::End();
0510 }
0511 return std::nullopt;
0512 }
0513
0514 AsyncGenerator<T> generator_;
0515 Transformer<T, V> transformer_;
0516 std::optional<T> last_value_;
0517 bool finished_;
0518 };
0519
0520 public:
0521 explicit TransformingGenerator(AsyncGenerator<T> generator,
0522 Transformer<T, V> transformer)
0523 : state_(std::make_shared<TransformingGeneratorState>(std::move(generator),
0524 std::move(transformer))) {}
0525
0526 Future<V> operator()() { return (*state_)(); }
0527
0528 protected:
0529 std::shared_ptr<TransformingGeneratorState> state_;
0530 };
0531
0532
0533
0534
0535
0536
0537
0538
0539
0540
0541
0542 template <typename T, typename V>
0543 AsyncGenerator<V> MakeTransformedGenerator(AsyncGenerator<T> generator,
0544 Transformer<T, V> transformer) {
0545 return TransformingGenerator<T, V>(generator, transformer);
0546 }
0547
0548
// Readahead generator that buffers source futures in an SPSC queue.
// A new source call is only issued from operator() or from the continuation
// (Callback) of the previous source call, so the source is pulled one call at a
// time. `spaces_available_` tracks how many more results may be buffered.
0549 template <typename T>
0550 class SerialReadaheadGenerator {
0551 public:
0552 SerialReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
0553 : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}
0554
// Return the next buffered future, or the end token once the source has finished
// and the queue is empty.
0555 Future<T> operator()() {
// First call: nothing is queued yet, so pull the source directly. Callback then
// continues the readahead from the continuation.
0556 if (state_->first_) {
0557
0558 state_->first_ = false;
0559 auto next = state_->source_();
0560 return next.Then(Callback{state_}, ErrCallback{state_});
0561 }
0562
0563
0564
// `finished_` is read once, before the queue is inspected, and that snapshot is
// reused for the pump decision below.
0565 auto finished = state_->finished_.load();
0566 if (finished && state_->readahead_queue_.IsEmpty()) {
0567 return AsyncGeneratorEnd<T>();
0568 }
0569
0570 std::shared_ptr<Future<T>> next;
0571 if (!state_->readahead_queue_.Read(next)) {
0572 return Status::UnknownError("Could not read from readahead_queue");
0573 }
0574
// Reading freed a slot. Callback stops pumping when the counter reaches 0, so if
// it was 0 before this increment (and the source is not done) resume pumping here.
0575 auto last_available = state_->spaces_available_.fetch_add(1);
0576 if (last_available == 0 && !finished) {
0577
0578 ARROW_RETURN_NOT_OK(state_->Pump(state_));
0579 }
0580 return *next;
0581 }
0582
0583 private:
0584 struct State {
0585 State(AsyncGenerator<T> source, int max_readahead)
0586 : first_(true),
0587 source_(std::move(source)),
0588 finished_(false),
0589
0590 spaces_available_(max_readahead + 1),
0591
0592 readahead_queue_(max_readahead + 1) {}
0593
// Issue one source call. The slot is written to the queue before the source is
// called and is filled in afterwards with the continuation future.
// NOTE(review): presumably the slot is enqueued first so that a continuation which
// runs synchronously and pumps again queues its own slot after this one — confirm.
0594 Status Pump(const std::shared_ptr<State>& self) {
0595
0596
0597
0598 auto next_slot = std::make_shared<Future<T>>();
0599 auto written = readahead_queue_.Write(next_slot);
0600 if (!written) {
0601 return Status::UnknownError("Could not write to readahead_queue");
0602 }
0603
0604
0605
0606
0607
0608
0609 *next_slot = source_().Then(Callback{self}, ErrCallback{self});
0610 return Status::OK();
0611 }
0612
0613
// True until operator() has been called once (plain bool, not atomic).
0614 bool first_;
0615
0616 AsyncGenerator<T> source_;
// Set when the source yields the end token or an error.
0617 std::atomic<bool> finished_;
0618
0619
0620
0621
// Starts at max_readahead + 1; decremented by Callback for each non-end item and
// incremented by operator() for each future read from the queue.
0622 std::atomic<uint32_t> spaces_available_;
0623
0624
// Buffered source futures; constructed with max_readahead + 1.
0625 util::SpscQueue<std::shared_ptr<Future<T>>> readahead_queue_;
0626 };
0627
// Success continuation for a source call: records the end of the stream, or
// consumes one space and keeps pumping while more than one space was available
// before this item.
0628 struct Callback {
0629 Result<T> operator()(const T& next) {
0630 if (IsIterationEnd(next)) {
0631 state_->finished_.store(true);
0632 return next;
0633 }
0634 auto last_available = state_->spaces_available_.fetch_sub(1);
0635 if (last_available > 1) {
0636 ARROW_RETURN_NOT_OK(state_->Pump(state_));
0637 }
0638 return next;
0639 }
0640
0641 std::shared_ptr<State> state_;
0642 };
0643
// Error continuation for a source call: marks the stream finished and forwards
// the error status unchanged.
0644 struct ErrCallback {
0645 Result<T> operator()(const Status& st) {
0646 state_->finished_.store(true);
0647 return st;
0648 }
0649
0650 std::shared_ptr<State> state_;
0651 };
0652
0653 std::shared_ptr<State> state_;
0654 };
0655
0656
0657 template <typename T>
0658 class FutureFirstGenerator {
0659 public:
0660 explicit FutureFirstGenerator(Future<AsyncGenerator<T>> future)
0661 : state_(std::make_shared<State>(std::move(future))) {}
0662
0663 Future<T> operator()() {
0664 if (state_->source_) {
0665 return state_->source_();
0666 } else {
0667 auto state = state_;
0668 return state_->future_.Then([state](const AsyncGenerator<T>& source) {
0669 state->source_ = source;
0670 return state->source_();
0671 });
0672 }
0673 }
0674
0675 private:
0676 struct State {
0677 explicit State(Future<AsyncGenerator<T>> future) : future_(future), source_() {}
0678
0679 Future<AsyncGenerator<T>> future_;
0680 AsyncGenerator<T> source_;
0681 };
0682
0683 std::shared_ptr<State> state_;
0684 };
0685
0686
0687
0688
0689
0690
0691
0692 template <typename T>
0693 AsyncGenerator<T> MakeFromFuture(Future<AsyncGenerator<T>> future) {
0694 return FutureFirstGenerator<T>(std::move(future));
0695 }
0696
0697
0698
0699
0700
0701
0702
0703
0704
0705 template <typename T>
0706 AsyncGenerator<T> MakeSerialReadaheadGenerator(AsyncGenerator<T> source_generator,
0707 int max_readahead) {
0708 return SerialReadaheadGenerator<T>(std::move(source_generator), max_readahead);
0709 }
0710
0711
0712
0713
0714
0715
0716
0717
0718
0719
0720 template <typename T>
0721 AsyncGenerator<T> MakeAutoStartingGenerator(AsyncGenerator<T> generator) {
0722 struct AutostartGenerator {
0723 Future<T> operator()() {
0724 if (first_future->is_valid()) {
0725 Future<T> result = *first_future;
0726 *first_future = Future<T>();
0727 return result;
0728 }
0729 return source();
0730 }
0731
0732 std::shared_ptr<Future<T>> first_future;
0733 AsyncGenerator<T> source;
0734 };
0735
0736 std::shared_ptr<Future<T>> first_future = std::make_shared<Future<T>>(generator());
0737 return AutostartGenerator{std::move(first_future), std::move(generator)};
0738 }
0739
0740
// Readahead generator that keeps a queue of in-flight source futures.
// The first call fills the queue with `max_readahead` pulls; every call returns
// the oldest queued future and enqueues one more pull (or an end token once the
// source has finished).
0741 template <typename T>
0742 class ReadaheadGenerator {
0743 public:
0744 ReadaheadGenerator(AsyncGenerator<T> source_generator, int max_readahead)
0745 : state_(std::make_shared<State>(std::move(source_generator), max_readahead)) {}
0746
// Wrap a source future so that its completion updates the bookkeeping:
// `finished` is set on the end token or on an error, `num_running` is decremented,
// and `final_future` is completed by the continuation that brings `num_running`
// to 0 after `finished` has been set.
0747 Future<T> AddMarkFinishedContinuation(Future<T> fut) {
0748 auto state = state_;
0749 return fut.Then(
0750 [state](const T& result) -> Future<T> {
0751 state->MarkFinishedIfDone(result);
0752 if (state->finished.load()) {
0753 if (state->num_running.fetch_sub(1) == 1) {
0754 state->final_future.MarkFinished();
0755 }
0756 } else {
0757 state->num_running.fetch_sub(1);
0758 }
0759 return result;
0760 },
0761 [state](const Status& err) -> Future<T> {
0762
0763
0764 state->finished.store(true);
0765 if (state->num_running.fetch_sub(1) == 1) {
0766 state->final_future.MarkFinished();
0767 }
// The error is surfaced only once final_future has completed.
0768 return state->final_future.Then([err]() -> Result<T> { return err; });
0769 });
0770 }
0771
0772 Future<T> operator()() {
// First call: prime the queue with max_readahead source pulls.
0773 if (state_->readahead_queue.empty()) {
0774
0775 state_->num_running.store(state_->max_readahead);
0776 for (int i = 0; i < state_->max_readahead; i++) {
0777 auto next = state_->source_generator();
0778 auto next_after_check = AddMarkFinishedContinuation(std::move(next));
0779 state_->readahead_queue.push(std::move(next_after_check));
0780 }
0781 }
0782
// NOTE(review): if max_readahead is 0 the queue is still empty at front() below;
// presumably callers always pass a positive value — confirm.
0783 auto result = state_->readahead_queue.front();
0784 state_->readahead_queue.pop();
// Replace the future just handed out: an end token once finished, otherwise a
// fresh source pull.
0785 if (state_->finished.load()) {
0786 state_->readahead_queue.push(AsyncGeneratorEnd<T>());
0787 } else {
0788 state_->num_running.fetch_add(1);
0789 auto back_of_queue = state_->source_generator();
0790 auto back_of_queue_after_check =
0791 AddMarkFinishedContinuation(std::move(back_of_queue));
0792 state_->readahead_queue.push(std::move(back_of_queue_after_check));
0793 }
0794 return result;
0795 }
0796
0797 private:
0798 struct State {
0799 State(AsyncGenerator<T> source_generator, int max_readahead)
0800 : source_generator(std::move(source_generator)), max_readahead(max_readahead) {}
0801
// Record the end of the stream when `next_result` is the end token.
0802 void MarkFinishedIfDone(const T& next_result) {
0803 if (IsIterationEnd(next_result)) {
0804 finished.store(true);
0805 }
0806 }
0807
0808 AsyncGenerator<T> source_generator;
0809 int max_readahead;
// Completed when the last running source future finishes after `finished` is set.
0810 Future<> final_future = Future<>::Make();
// Number of source futures whose continuation has not run yet.
0811 std::atomic<int> num_running{0};
0812 std::atomic<bool> finished{false};
// Futures already requested from the source, oldest first. Not guarded by a lock.
0813 std::queue<Future<T>> readahead_queue;
0814 };
0815
0816 std::shared_ptr<State> state_;
0817 };
0818
0819
0820
0821
0822
0823
0824
0825
0826 template <typename T>
0827 class PushGenerator {
0828 struct State {
0829 State() {}
0830
0831 util::Mutex mutex;
0832 std::deque<Result<T>> result_q;
0833 std::optional<Future<T>> consumer_fut;
0834 bool finished = false;
0835 };
0836
0837 public:
0838
0839 class Producer {
0840 public:
0841 explicit Producer(const std::shared_ptr<State>& state) : weak_state_(state) {}
0842
0843
0844
0845
0846
0847
0848 bool Push(Result<T> result) {
0849 auto state = weak_state_.lock();
0850 if (!state) {
0851
0852 return false;
0853 }
0854 auto lock = state->mutex.Lock();
0855 if (state->finished) {
0856
0857 return false;
0858 }
0859 if (state->consumer_fut.has_value()) {
0860 auto fut = std::move(state->consumer_fut.value());
0861 state->consumer_fut.reset();
0862 lock.Unlock();
0863 fut.MarkFinished(std::move(result));
0864 } else {
0865 state->result_q.push_back(std::move(result));
0866 }
0867 return true;
0868 }
0869
0870
0871
0872
0873
0874
0875
0876
0877
0878 bool Close() {
0879 auto state = weak_state_.lock();
0880 if (!state) {
0881
0882 return false;
0883 }
0884 auto lock = state->mutex.Lock();
0885 if (state->finished) {
0886
0887 return false;
0888 }
0889 state->finished = true;
0890 if (state->consumer_fut.has_value()) {
0891 auto fut = std::move(state->consumer_fut.value());
0892 state->consumer_fut.reset();
0893 lock.Unlock();
0894 fut.MarkFinished(IterationTraits<T>::End());
0895 }
0896 return true;
0897 }
0898
0899
0900 bool is_closed() const {
0901 auto state = weak_state_.lock();
0902 if (!state) {
0903
0904 return true;
0905 }
0906 auto lock = state->mutex.Lock();
0907 return state->finished;
0908 }
0909
0910 private:
0911 const std::weak_ptr<State> weak_state_;
0912 };
0913
0914 PushGenerator() : state_(std::make_shared<State>()) {}
0915
0916
0917 Future<T> operator()() const {
0918 auto lock = state_->mutex.Lock();
0919 assert(!state_->consumer_fut.has_value());
0920 if (!state_->result_q.empty()) {
0921 auto fut = Future<T>::MakeFinished(std::move(state_->result_q.front()));
0922 state_->result_q.pop_front();
0923 return fut;
0924 }
0925 if (state_->finished) {
0926 return AsyncGeneratorEnd<T>();
0927 }
0928 auto fut = Future<T>::Make();
0929 state_->consumer_fut = fut;
0930 return fut;
0931 }
0932
0933
0934
0935
0936
0937 Producer producer() { return Producer{state_}; }
0938
0939 private:
0940 const std::shared_ptr<State> state_;
0941 };
0942
0943
0944
0945
0946
0947
0948
0949
0950
0951
0952 template <typename T>
0953 AsyncGenerator<T> MakeReadaheadGenerator(AsyncGenerator<T> source_generator,
0954 int max_readahead) {
0955 return ReadaheadGenerator<T>(std::move(source_generator), max_readahead);
0956 }
0957
0958
0959
0960
0961 template <typename T>
0962 AsyncGenerator<T> MakeVectorGenerator(std::vector<T> vec) {
0963 struct State {
0964 explicit State(std::vector<T> vec_) : vec(std::move(vec_)), vec_idx(0) {}
0965
0966 std::vector<T> vec;
0967 std::atomic<std::size_t> vec_idx;
0968 };
0969
0970 auto state = std::make_shared<State>(std::move(vec));
0971 return [state]() {
0972 auto idx = state->vec_idx.fetch_add(1);
0973 if (idx >= state->vec.size()) {
0974
0975 state->vec.clear();
0976 return AsyncGeneratorEnd<T>();
0977 }
0978 return Future<T>::MakeFinished(state->vec[idx]);
0979 };
0980 }
0981
0982
// Merges the items of several inner generators, themselves pulled from `source`,
// into a single stream. `active_subscriptions` has `max_subscriptions` slots, one
// per inner generator being consumed at a time. Items are delivered in the order
// their callbacks run. All bookkeeping lives in a shared State guarded by
// State::mutex; futures are completed after the guard has been released.
0983 template <typename T>
0984 class MergedGenerator {
0985
0986
0987
0988
0989
0990
0991
0992
0993
0994
0995
0996
0997
0998
0999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027 public:
1028 explicit MergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
1029 int max_subscriptions)
1030 : state_(std::make_shared<State>(std::move(source), max_subscriptions)) {}
1031
// Return the next merged item: a buffered delivered job if one exists, the
// end/error result if the generator is broken or has no running subscriptions,
// or otherwise a waiting future that a callback completes later. The first call
// also starts one pull from `source` per subscription slot.
1032 Future<T> operator()() {
1033
1034 Future<T> waiting_future;
1035 std::shared_ptr<DeliveredJob> delivered_job;
1036 bool mark_generator_complete = false;
1037 {
1038 auto guard = state_->mutex.Lock();
1039 if (!state_->delivered_jobs.empty()) {
1040
// A result is already buffered: take it out of the queue.
1041 delivered_job = std::move(state_->delivered_jobs.front());
1042 state_->delivered_jobs.pop_front();
1043 if (state_->IsCompleteUnlocked(guard)) {
1044
1045
1046 mark_generator_complete = true;
1047 } else {
1048
1049
1050
// The subscription that delivered this job is pulled again below.
1051 state_->outstanding_requests++;
1052 }
1053 } else if (state_->broken ||
1054 (!state_->first && state_->num_running_subscriptions == 0)) {
1055
1056
// Broken, or nothing is running any more: report the stored error (consumed
// here, so it is reported once) or the end token, after all_finished completes.
1057 Result<T> end_res = IterationEnd<T>();
1058 if (!state_->final_error.ok()) {
1059 end_res = state_->final_error;
1060 state_->final_error = Status::OK();
1061 }
1062 return state_->all_finished.Then([end_res]() -> Result<T> { return end_res; });
1063 } else {
1064
1065
// Nothing buffered yet: park a future for a callback to complete.
1066 waiting_future = Future<T>::Make();
1067 state_->waiting_jobs.push_back(std::make_shared<Future<T>>(waiting_future));
1068 }
// First call: account for the one request per slot that is issued further down.
1069 if (state_->first) {
1070
1071
1072 state_->outstanding_requests +=
1073 static_cast<int>(state_->active_subscriptions.size());
1074 state_->num_running_subscriptions +=
1075 static_cast<int>(state_->active_subscriptions.size());
1076 }
1077 }
1078
1079
1080
1081
// From here on the guard has been released.
1082 if (delivered_job) {
1083 if (mark_generator_complete) {
1084 state_->all_finished.MarkFinished();
1085 } else {
// Ask the generator that produced the job for its next item.
1086 delivered_job->deliverer().AddCallback(
1087 InnerCallback(state_, delivered_job->index));
1088 }
1089 return std::move(delivered_job->value);
1090 }
1091
1092
1093
1094 if (state_->first) {
1095 state_->first = false;
1096 mark_generator_complete = false;
1097 for (int i = 0; i < static_cast<int>(state_->active_subscriptions.size()); i++) {
1098 state_->PullSource().AddCallback(
1099 OuterCallback{state_, static_cast<std::size_t>(i)});
1100
1101
// The source ran out before every slot was used: undo the accounting for the
// slots that will never be pulled and stop.
1102 auto guard = state_->mutex.Lock();
1103 if (state_->source_exhausted) {
1104 int excess_requests =
1105 static_cast<int>(state_->active_subscriptions.size()) - i - 1;
1106 state_->outstanding_requests -= excess_requests;
1107 state_->num_running_subscriptions -= excess_requests;
1108 if (excess_requests > 0) {
1109
1110
1111
1112 mark_generator_complete = state_->IsCompleteUnlocked(guard);
1113 }
1114 break;
1115 }
1116 }
1117 if (mark_generator_complete) {
1118 state_->MarkFinishedAndPurge();
1119 }
1120 }
1121 return waiting_future;
1122 }
1123
1124 private:
// A result produced while no consumer was waiting, kept until operator() is called.
1125 struct DeliveredJob {
1126 explicit DeliveredJob(AsyncGenerator<T> deliverer_, Result<T> value_,
1127 std::size_t index_)
1128 : deliverer(deliverer_), value(std::move(value_)), index(index_) {}
1129
1130
1131
// The inner generator that produced `value`; pulled again once the job is consumed.
1132 AsyncGenerator<T> deliverer;
1133
1134 Result<T> value;
1135
1136
// Slot of `deliverer` in State::active_subscriptions.
1137 std::size_t index;
1138 };
1139
1140 struct State {
1141 State(AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions)
1142 : source(std::move(source)),
1143 active_subscriptions(max_subscriptions),
1144 delivered_jobs(),
1145 waiting_jobs(),
1146 mutex(),
1147 first(true),
1148 broken(false),
1149 source_exhausted(false),
1150 outstanding_requests(0),
1151 num_running_subscriptions(0),
1152 final_error(Status::OK()) {}
1153
// Call the outer source while holding the mutex.
1154 Future<AsyncGenerator<T>> PullSource() {
1155
1156
1157 auto lock = mutex.Lock();
1158 return source();
1159 }
1160
// Mark the generator as broken and drop all buffered results. The guard
// parameter indicates that the caller already holds the mutex.
1161 void SignalErrorUnlocked(const util::Mutex::Guard& guard) {
1162 broken = true;
1163
1164 while (!delivered_jobs.empty()) {
1165 delivered_jobs.pop_front();
1166 }
1167 }
1168
1169
1170
// Complete all_finished and finish every still-waiting consumer future with the
// end token. Does not take the mutex itself.
1171 void MarkFinishedAndPurge() {
1172 all_finished.MarkFinished();
1173 while (!waiting_jobs.empty()) {
1174 waiting_jobs.front()->MarkFinished(IterationEnd<T>());
1175 waiting_jobs.pop_front();
1176 }
1177 }
1178
1179
1180
// Route an error: if a consumer future is available, it is failed once
// all_finished completes; otherwise the error is stored in final_error for a
// later operator() call.
1181 void MarkFinalError(const Status& err, Future<T> maybe_sink) {
1182 if (maybe_sink.is_valid()) {
1183
1184
1185 all_finished.AddCallback([maybe_sink, err](const Status& status) mutable {
1186 maybe_sink.MarkFinished(err);
1187 });
1188 } else {
1189
1190
1191 final_error = err;
1192 }
1193 }
1194
// Complete when no request is outstanding and either the generator is broken or
// the source is exhausted with nothing running and nothing buffered.
1195 bool IsCompleteUnlocked(const util::Mutex::Guard& guard) {
1196 return outstanding_requests == 0 &&
1197 (broken || (source_exhausted && num_running_subscriptions == 0 &&
1198 delivered_jobs.empty()));
1199 }
1200
// Retire one outstanding request and report whether the generator is now complete.
1201 bool MarkTaskFinishedUnlocked(const util::Mutex::Guard& guard) {
1202 --outstanding_requests;
1203 return IsCompleteUnlocked(guard);
1204 }
1205
1206
1207
// The outer generator that yields the inner generators.
1208 AsyncGenerator<AsyncGenerator<T>> source;
1209
// One slot per concurrently consumed inner generator.
1210 std::vector<AsyncGenerator<T>> active_subscriptions;
1211
1212
// Results produced while no consumer was waiting.
1213 std::deque<std::shared_ptr<DeliveredJob>> delivered_jobs;
1214
1215
// Consumer futures waiting for a result.
1216 std::deque<std::shared_ptr<Future<T>>> waiting_jobs;
1217
1218
1219
// Completed once the generator is complete (see IsCompleteUnlocked).
1220 Future<> all_finished = Future<>::Make();
1221 util::Mutex mutex;
1222
// True until the first call to operator() has started the subscriptions.
1223 bool first;
1224
// Set when the source or an inner generator reports an error.
1225 bool broken;
1226
1227
1228
// Set when the outer source yields its end token.
1229 bool source_exhausted;
1230
1231
1232
// Number of requests issued whose callbacks have not retired them yet.
1233 int outstanding_requests;
1234
1235
1236
1237
1238
// Number of subscription slots that are still being consumed.
1239 int num_running_subscriptions;
1240
1241
1242
// Error kept for delivery by a later operator() call when no consumer was waiting.
1243 Status final_error;
1244 };
1245
// Continuation attached to a future returned by an inner generator. Delivers the
// item to a waiting consumer (or buffers it), handles errors and the end of the
// inner generator, and then pulls the next item or the next inner generator.
// With `recursive` set, the need for a new inner generator is only reported
// through `was_empty` so that the caller (OuterCallback) performs the pull.
1246 struct InnerCallback {
1247 InnerCallback(std::shared_ptr<State> state, std::size_t index, bool recursive = false)
1248 : state(std::move(state)), index(index), recursive(recursive) {}
1249
1250 void operator()(const Result<T>& maybe_next_ref) {
1251
1252 Future<T> next_fut;
1253 const Result<T>* maybe_next = &maybe_next_ref;
1254
1255
1256
1257
1258
// Loop rather than recurse: when TryAddCallback below declines to attach a
// callback, the next result is read from next_fut directly and handled in the
// next iteration.
1259 while (true) {
1260 Future<T> sink;
1261 bool sub_finished = maybe_next->ok() && IsIterationEnd(**maybe_next);
1262 bool pull_next_sub = false;
1263 bool was_broken = false;
1264 bool should_mark_gen_complete = false;
1265 bool should_mark_final_error = false;
1266 {
1267 auto guard = state->mutex.Lock();
1268 if (state->broken) {
1269
1270
// Another callback already failed the generator: this result is dropped.
1271 was_broken = true;
1272 } else {
1273 if (!sub_finished) {
1274
1275
// Buffer the result when nobody is waiting; otherwise claim the oldest waiter.
1276 if (state->waiting_jobs.empty()) {
1277 state->delivered_jobs.push_back(std::make_shared<DeliveredJob>(
1278 state->active_subscriptions[index], *maybe_next, index));
1279 } else {
1280 sink = std::move(*state->waiting_jobs.front());
1281 state->waiting_jobs.pop_front();
1282 }
1283 }
1284
1285
1286 if (!maybe_next->ok()) {
1287 should_mark_final_error = true;
1288 state->SignalErrorUnlocked(guard);
1289 }
1290 }
1291
1292
1293
1294
1295
// A finished inner generator is replaced from the source when the source still
// has more; otherwise this slot stops running.
1296 pull_next_sub = sub_finished && !state->source_exhausted && !was_broken;
1297 if (sub_finished && !pull_next_sub) {
1298 state->num_running_subscriptions--;
1299 }
1300
1301
1302
1303
// Retire this request unless the chain continues (a sink was served and the
// subscription is pulled again, or a replacement subscription is pulled).
1304 if (state->broken || (!sink.is_valid() && !sub_finished) ||
1305 (sub_finished && state->source_exhausted)) {
1306 if (state->MarkTaskFinishedUnlocked(guard)) {
1307 should_mark_gen_complete = true;
1308 }
1309 }
1310 }
1311
1312
1313
// The guard is released; futures are completed from here on.
1314 if (should_mark_final_error) {
1315 state->MarkFinalError(maybe_next->status(), std::move(sink));
1316 }
1317
1318 if (should_mark_gen_complete) {
1319 state->MarkFinishedAndPurge();
1320 }
1321
1322
1323
1324 if (was_broken) {
1325 return;
1326 }
1327
1328 if (pull_next_sub) {
1329 if (recursive) {
1330 was_empty = true;
1331 return;
1332 }
1333
1334
1335 state->PullSource().AddCallback(OuterCallback{state, index});
1336 } else if (sink.is_valid()) {
1337
1338
// Deliver to the consumer, then immediately request the next item from the
// same inner generator.
1339 sink.MarkFinished(*maybe_next);
1340 next_fut = state->active_subscriptions[index]();
1341 if (next_fut.TryAddCallback([this]() { return InnerCallback(state, index); })) {
1342 return;
1343 }
1344
1345
1346 maybe_next = &next_fut.result();
1347 continue;
1348 }
1349
1350
1351 return;
1352 }
1353 }
1354 std::shared_ptr<State> state;
1355 std::size_t index;
1356 bool recursive;
// Set (only in recursive mode) when the inner generator ended and a replacement
// should be pulled by the caller.
1357 bool was_empty = false;
1358 };
1359
// Continuation attached to a future returned by the outer source. Installs the
// new inner generator in slot `index` and starts consuming it, or handles the
// source's error / end of stream.
1360 struct OuterCallback {
1361 void operator()(const Result<AsyncGenerator<T>>& initial_maybe_next) {
1362 Result<AsyncGenerator<T>> maybe_next = initial_maybe_next;
1363 while (true) {
1364
1365 bool should_continue = false;
1366 bool should_mark_gen_complete = false;
1367 bool should_deliver_error = false;
1368 bool source_exhausted = maybe_next.ok() && IsIterationEnd(*maybe_next);
1369 Future<T> error_sink;
1370 {
1371 auto guard = state->mutex.Lock();
1372 if (!maybe_next.ok() || source_exhausted || state->broken) {
1373
// Only the first error breaks the generator and claims a waiting consumer.
1374 if (!state->broken && !maybe_next.ok()) {
1375 state->SignalErrorUnlocked(guard);
1376
1377 should_deliver_error = true;
1378 if (!state->waiting_jobs.empty()) {
1379 error_sink = std::move(*state->waiting_jobs.front());
1380 state->waiting_jobs.pop_front();
1381 }
1382 }
1383 if (source_exhausted) {
1384 state->source_exhausted = true;
1385 state->num_running_subscriptions--;
1386 }
1387 if (state->MarkTaskFinishedUnlocked(guard)) {
1388 should_mark_gen_complete = true;
1389 }
1390 } else {
1391 state->active_subscriptions[index] = *maybe_next;
1392 should_continue = true;
1393 }
1394 }
1395 if (should_deliver_error) {
1396 state->MarkFinalError(maybe_next.status(), std::move(error_sink));
1397 }
1398 if (should_mark_gen_complete) {
1399 state->MarkFinishedAndPurge();
1400 }
1401 if (should_continue) {
1402
1403
1404
1405
// Pull the first item of the new inner generator. If no callback could be
// attached, its result is handled inline through a recursive-mode InnerCallback.
1406 Future<T> next_item = (*maybe_next)();
1407 if (!next_item.TryAddCallback([this] { return InnerCallback(state, index); })) {
1408
1409
1410
1411
1412 InnerCallback immediate_inner(state, index, true);
1413 immediate_inner(next_item.result());
// The inner generator ended right away: fetch a replacement, looping instead of
// recursing when that result is also available immediately.
1414 if (immediate_inner.was_empty) {
1415 Future<AsyncGenerator<T>> next_source = state->PullSource();
1416 if (next_source.TryAddCallback([this] {
1417 return OuterCallback{state, index};
1418 })) {
1419
1420 return;
1421 }
1422
1423
1424
1425 maybe_next = next_source.result();
1426 continue;
1427 }
1428 }
1429 }
1430 return;
1431 }
1432 }
1433 std::shared_ptr<State> state;
1434 std::size_t index;
1435 };
1436
1437 std::shared_ptr<State> state_;
1438 };
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453 template <typename T>
1454 AsyncGenerator<T> MakeMergedGenerator(AsyncGenerator<AsyncGenerator<T>> source,
1455 int max_subscriptions) {
1456 return MergedGenerator<T>(std::move(source), max_subscriptions);
1457 }
1458
1459 template <typename T>
1460 Result<AsyncGenerator<T>> MakeSequencedMergedGenerator(
1461 AsyncGenerator<AsyncGenerator<T>> source, int max_subscriptions) {
1462 if (max_subscriptions < 0) {
1463 return Status::Invalid("max_subscriptions must be a positive integer");
1464 }
1465 if (max_subscriptions == 1) {
1466 return Status::Invalid("Use MakeConcatenatedGenerator if max_subscriptions is 1");
1467 }
1468 AsyncGenerator<AsyncGenerator<T>> autostarting_source = MakeMappedGenerator(
1469 std::move(source),
1470 [](const AsyncGenerator<T>& sub) { return MakeAutoStartingGenerator(sub); });
1471 AsyncGenerator<AsyncGenerator<T>> sub_readahead =
1472 MakeSerialReadaheadGenerator(std::move(autostarting_source), max_subscriptions - 1);
1473 return MakeConcatenatedGenerator(std::move(sub_readahead));
1474 }
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487 template <typename T>
1488 AsyncGenerator<T> MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> source) {
1489 return MergedGenerator<T>(std::move(source), 1);
1490 }
1491
/// \brief A value paired with its position in a sequence.
///
/// This is a plain aggregate; the member order (value, index, last) is the order
/// used for brace initialization.
template <typename T>
struct Enumerated {
  /// The wrapped item.
  T value;
  /// Position of the item; IterationTraits<Enumerated<T>> treats a negative
  /// index as the end marker.
  int index;
  /// Flag set by the producer for the final item.
  bool last;
};
1498
1499 template <typename T>
1500 struct IterationTraits<Enumerated<T>> {
1501 static Enumerated<T> End() { return Enumerated<T>{IterationEnd<T>(), -1, false}; }
1502 static bool IsEnd(const Enumerated<T>& val) { return val.index < 0; }
1503 };
1504
1505
1506 template <typename T>
1507 class EnumeratingGenerator {
1508 public:
1509 EnumeratingGenerator(AsyncGenerator<T> source, T initial_value)
1510 : state_(std::make_shared<State>(std::move(source), std::move(initial_value))) {}
1511
1512 Future<Enumerated<T>> operator()() {
1513 if (state_->finished) {
1514 return AsyncGeneratorEnd<Enumerated<T>>();
1515 } else {
1516 auto state = state_;
1517 return state->source().Then([state](const T& next) {
1518 auto finished = IsIterationEnd<T>(next);
1519 auto prev = Enumerated<T>{state->prev_value, state->prev_index, finished};
1520 state->prev_value = next;
1521 state->prev_index++;
1522 state->finished = finished;
1523 return prev;
1524 });
1525 }
1526 }
1527
1528 private:
1529 struct State {
1530 State(AsyncGenerator<T> source, T initial_value)
1531 : source(std::move(source)), prev_value(std::move(initial_value)), prev_index(0) {
1532 finished = IsIterationEnd<T>(prev_value);
1533 }
1534
1535 AsyncGenerator<T> source;
1536 T prev_value;
1537 int prev_index;
1538 bool finished;
1539 };
1540
1541 std::shared_ptr<State> state_;
1542 };
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558 template <typename T>
1559 AsyncGenerator<Enumerated<T>> MakeEnumeratedGenerator(AsyncGenerator<T> source) {
1560 return FutureFirstGenerator<Enumerated<T>>(
1561 source().Then([source](const T& initial_value) -> AsyncGenerator<Enumerated<T>> {
1562 return EnumeratingGenerator<T>(std::move(source), initial_value);
1563 }));
1564 }
1565
1566
1567 template <typename T>
1568 class TransferringGenerator {
1569 public:
1570 explicit TransferringGenerator(AsyncGenerator<T> source, internal::Executor* executor)
1571 : source_(std::move(source)), executor_(executor) {}
1572
1573 Future<T> operator()() { return executor_->Transfer(source_()); }
1574
1575 private:
1576 AsyncGenerator<T> source_;
1577 internal::Executor* executor_;
1578 };
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595 template <typename T>
1596 AsyncGenerator<T> MakeTransferredGenerator(AsyncGenerator<T> source,
1597 internal::Executor* executor) {
1598 return TransferringGenerator<T>(std::move(source), executor);
1599 }
1600
1601
1602 template <typename T>
1603 class BackgroundGenerator {  // Generator fed by an iterator that a task spawned on io_executor drains into a queue.
1604 public:
1605 explicit BackgroundGenerator(Iterator<T> it, internal::Executor* io_executor, int max_q,
1606 int q_restart)
1607 : state_(std::make_shared<State>(io_executor, std::move(it), max_q, q_restart)),
1608 cleanup_(std::make_shared<Cleanup>(state_.get())) {}
1609 // Return the next queued item, or a future the worker completes later; (re)start the worker when NeedsRestart().
1610 Future<T> operator()() {
1611 auto guard = state_->mutex.Lock();
1612 Future<T> waiting_future;
1613 if (state_->queue.empty()) {
1614 if (state_->finished) {
1615 return AsyncGeneratorEnd<T>();
1616 } else {
1617 waiting_future = Future<T>::Make();
1618 state_->waiting_future = waiting_future;
1619 }
1620 } else {
1621 auto next = Future<T>::MakeFinished(std::move(state_->queue.front()));
1622 state_->queue.pop();
1623 if (state_->NeedsRestart()) {
1624 return state_->RestartTask(state_, std::move(guard), std::move(next));
1625 }
1626 return next;
1627 }
1628 // Only reached when the queue was empty and the generator is not finished.
1629 if (state_->NeedsRestart()) {
1630 return state_->RestartTask(state_, std::move(guard), std::move(waiting_future));
1631 }
1632 return waiting_future;
1633 }
1634
1635 protected:
1636 static constexpr uint64_t kUnlikelyThreadId{std::numeric_limits<uint64_t>::max()};
1637 // Sentinel above is stored in worker_thread_id while no worker task is running.
1638 struct State {
1639 State(internal::Executor* io_executor, Iterator<T> it, int max_q, int q_restart)
1640 : io_executor(io_executor),
1641 max_q(max_q),
1642 q_restart(q_restart),
1643 it(std::move(it)),
1644 reading(false),
1645 finished(false),
1646 should_shutdown(false) {}
1647
1648 void ClearQueue() {
1649 while (!queue.empty()) {
1650 queue.pop();
1651 }
1652 }
1653
1654 bool TaskIsRunning() const { return task_finished.is_valid(); }
1655
1656 bool NeedsRestart() const {
1657 return !finished && !reading && static_cast<int>(queue.size()) <= q_restart;
1658 }
1659
1660 void DoRestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard) {
1661 // Mark the task as running and reading before it is spawned; the caller
1662 // passes in the guard for the state mutex.
1663 state->task_finished = Future<>::Make();
1664 state->reading = true;
1665 auto spawn_status = io_executor->Spawn(
1666 [state]() { BackgroundGenerator::WorkerTask(std::move(state)); });
1667 if (!spawn_status.ok()) {
1668 // No worker will run: mark the generator finished and surface the spawn error,
1669 // either to the waiting consumer (after unlocking) or as the only queued entry.
1670 state->finished = true;
1671 state->task_finished = Future<>();
1672 if (waiting_future.has_value()) {
1673 auto to_deliver = std::move(waiting_future.value());
1674 waiting_future.reset();
1675 guard.Unlock();
1676 to_deliver.MarkFinished(spawn_status);
1677 } else {
1678 ClearQueue();
1679 queue.push(spawn_status);
1680 }
1681 }
1682 }
1683
1684 Future<T> RestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard,
1685 Future<T> next) {
1686 if (TaskIsRunning()) {
1687 // A previous worker has stopped reading (NeedsRestart() requires !reading) but
1688 // has not yet cleared task_finished, so the restart is chained onto that
1689 // future instead of being performed here.
1690 return task_finished.Then([state, next]() {
1691 // This continuation runs once task_finished completes, so it takes the
1692 // mutex itself before restarting.
1693 // NOTE(review): `next` is only released to the consumer after this
1694 // continuation has run - confirm that is the intended ordering.
1695 auto guard_ = state->mutex.Lock();
1696 state->DoRestartTask(state, std::move(guard_));
1697 return next;
1698 });
1699 }
1700 // No worker is running: restart immediately, handing over the caller's guard.
1701 DoRestartTask(std::move(state), std::move(guard));
1702 return next;
1703 }
1704
1705 internal::Executor* io_executor;
1706 const int max_q;
1707 const int q_restart;
1708 Iterator<T> it;
1709 std::atomic<uint64_t> worker_thread_id{kUnlikelyThreadId};
1710
1711 // Set by DoRestartTask before spawning; cleared by the worker once the queue
1712 // holds at least max_q entries.
1713 bool reading;
1714 // Set when the iterator ended or failed, on shutdown, or when spawning failed.
1715 bool finished;
1716 // Set by Cleanup to ask a running worker to stop.
1717 bool should_shutdown;
1718 // Results read by the worker and not yet handed to a consumer.
1719 std::queue<Result<T>> queue;
1720 std::optional<Future<T>> waiting_future;
1721 // Valid only while a worker task is running (see TaskIsRunning); the worker
1722 // marks it finished as its last step.
1723 Future<> task_finished;
1724 util::Mutex mutex;
1725 };
1726
1727 // Destructor asks a still-running worker to stop and waits on its task_finished future.
1728 struct Cleanup {
1729 explicit Cleanup(State* state) : state(state) {}
1730 ~Cleanup() {
1731 // The assert below checks (in debug builds) that this destructor is not
1732 // running on the worker thread itself.
1733 // NOTE(review): presumably because the status() call at the end blocks until
1734 // the worker completes task_finished, which would deadlock on that thread -
1735 // confirm against Future::status().
1736 assert(state->worker_thread_id.load() != ::arrow::internal::GetThreadId());
1737 Future<> finish_fut;
1738 {
1739 auto lock = state->mutex.Lock();
1740 if (!state->TaskIsRunning()) {
1741 return;
1742 }
1743 // Request shutdown and keep a handle on the future the worker will finish.
1744 state->should_shutdown = true;
1745 finish_fut = state->task_finished;
1746 }
1747 // The lock is released before querying the future; the status itself is ignored.
1748 Status st = finish_fut.status();
1749 ARROW_UNUSED(st);
1750 }
1751 State* state;
1752 };
1753
1754 static void WorkerTask(std::shared_ptr<State> state) {
1755 state->worker_thread_id.store(::arrow::internal::GetThreadId());
1756 // Loop until reading is paused (queue full), the iterator finishes, or shutdown is requested.
1757 bool reading = true;
1758 while (reading) {
1759 auto next = state->it.Next();
1760 // The iterator is advanced above without holding the mutex.
1761 Future<T> waiting_future;
1762 {
1763 auto guard = state->mutex.Lock();
1764 // On shutdown the item just read is dropped and the loop ends.
1765 if (state->should_shutdown) {
1766 state->finished = true;
1767 break;
1768 }
1769
1770 if (!next.ok() || IsIterationEnd<T>(*next)) {
1771 // Error or end marker: nothing more will be read. On error, queued items are discarded.
1772 state->finished = true;
1773 if (!next.ok()) {
1774 state->ClearQueue();
1775 }
1776 }
1777 // Hand the result (item, end marker or error) to a waiting consumer if there
1778 // is one; otherwise enqueue it.
1779 if (state->waiting_future.has_value()) {
1780 waiting_future = std::move(state->waiting_future.value());
1781 state->waiting_future.reset();
1782 } else {
1783 state->queue.push(std::move(next));
1784 // Pause reading once the queue holds max_q entries; NeedsRestart() lets a
1785 // consumer restart the task once the queue is down to q_restart.
1786 if (static_cast<int>(state->queue.size()) >= state->max_q) {
1787 state->reading = false;
1788 }
1789 }
1790 reading = state->reading && !state->finished;
1791 }
1792 // The mutex was released at the closing brace above, so the consumer's future
1793 // is completed without holding the lock.
1794 // `next` was not moved from on this path, since the move into the queue only
1795 // happens in the else-branch.
1796 if (waiting_future.is_valid()) {
1797 waiting_future.MarkFinished(next);
1798 }
1799 }
1800 // Loop ended: detach task_finished under the lock, then complete it after the
1801 // lock is released.
1802 Future<> task_finished;
1803 {
1804 auto guard = state->mutex.Lock();
1805 // Resetting state->task_finished makes TaskIsRunning() return false; the
1806 // thread id is reset to the sentinel.
1807 task_finished = state->task_finished;
1808 state->task_finished = Future<>();
1809 state->worker_thread_id.store(kUnlikelyThreadId);
1810 }
1811 task_finished.MarkFinished();
1812 }
1813
1814 std::shared_ptr<State> state_;
1815 // cleanup_ is held through a shared_ptr, so copies of this generator share one
1816 // Cleanup object and its destructor runs when the last copy releases it.
1817 // It is declared after state_, so within one generator it is destroyed before
1818 // state_ is released.
1819 // NOTE(review): Cleanup keeps a raw State*; validity relies on that ordering - confirm.
1820 std::shared_ptr<Cleanup> cleanup_;
1821 };
1822
1823 constexpr int kDefaultBackgroundMaxQ = 32;  // default for the max_q argument of MakeBackgroundGenerator
1824 constexpr int kDefaultBackgroundQRestart = 16;  // default for the q_restart argument of MakeBackgroundGenerator
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852 template <typename T>
1853 static Result<AsyncGenerator<T>> MakeBackgroundGenerator(
1854 Iterator<T> iterator, internal::Executor* io_executor,
1855 int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart) {
1856 if (max_q < q_restart) {
1857 return Status::Invalid("max_q must be >= q_restart");
1858 }
1859 return BackgroundGenerator<T>(std::move(iterator), io_executor, max_q, q_restart);
1860 }
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876 template <typename T>
1877 static Result<AsyncGenerator<T>> MakeBlockingGenerator(
1878 std::shared_ptr<Iterator<T>> iterator) {
1879 return [it = std::move(iterator)]() mutable -> Future<T> {
1880 return Future<T>::MakeFinished(it->Next());
1881 };
1882 }
1883
1884 template <typename T>
1885 static Result<AsyncGenerator<T>> MakeBlockingGenerator(Iterator<T> iterator) {
1886 return MakeBlockingGenerator(std::make_shared<Iterator<T>>(std::move(iterator)));
1887 }
1888
1889
1890 template <typename T>
1891 class GeneratorIterator {
1892 public:
1893 explicit GeneratorIterator(AsyncGenerator<T> source) : source_(std::move(source)) {}
1894
1895 Result<T> Next() { return source_().result(); }
1896
1897 private:
1898 AsyncGenerator<T> source_;
1899 };
1900
1901
1902
1903 template <typename T>
1904 Iterator<T> MakeGeneratorIterator(AsyncGenerator<T> source) {
1905 return Iterator<T>(GeneratorIterator<T>(std::move(source)));
1906 }
1907
1908
1909
1910
1911
1912
1913
1914 template <typename T>
1915 Result<Iterator<T>> MakeReadaheadIterator(Iterator<T> it, int readahead_queue_size) {
1916 ARROW_ASSIGN_OR_RAISE(auto io_executor, internal::ThreadPool::Make(1));
1917 auto max_q = readahead_queue_size;
1918 auto q_restart = std::max(1, max_q / 2);
1919 ARROW_ASSIGN_OR_RAISE(
1920 auto background_generator,
1921 MakeBackgroundGenerator(std::move(it), io_executor.get(), max_q, q_restart));
1922
1923
1924 AsyncGenerator<T> owned_bg_generator = [io_executor, background_generator]() {
1925 return background_generator();
1926 };
1927 return MakeGeneratorIterator(std::move(owned_bg_generator));
1928 }
1929
1930
1931
1932
1933 template <typename T>
1934 std::function<Future<T>()> MakeSingleFutureGenerator(Future<T> future) {
1935 assert(future.is_valid());
1936 auto state = std::make_shared<Future<T>>(std::move(future));
1937 return [state]() -> Future<T> {
1938 auto fut = std::move(*state);
1939 if (fut.is_valid()) {
1940 return fut;
1941 } else {
1942 return AsyncGeneratorEnd<T>();
1943 }
1944 };
1945 }
1946
1947
1948
1949
1950 template <typename T>
1951 std::function<Future<T>()> MakeEmptyGenerator() {
1952 return []() -> Future<T> { return AsyncGeneratorEnd<T>(); };
1953 }
1954
1955
1956
1957
1958 template <typename T>
1959 AsyncGenerator<T> MakeFailingGenerator(Status st) {
1960 assert(!st.ok());
1961 auto state = std::make_shared<Status>(std::move(st));
1962 return [state]() -> Future<T> {
1963 auto st = std::move(*state);
1964 if (!st.ok()) {
1965 return st;
1966 } else {
1967 return AsyncGeneratorEnd<T>();
1968 }
1969 };
1970 }
1971
1972
1973
1974
1975 template <typename T>
1976 AsyncGenerator<T> MakeFailingGenerator(const Result<T>& result) {
1977 return MakeFailingGenerator<T>(result.status());
1978 }
1979
1980
1981
1982
1983
1984 template <typename T>
1985 AsyncGenerator<T> MakeGeneratorStartsWith(std::vector<T> initial_values,
1986 AsyncGenerator<T> following_values) {
1987 auto initial_values_vec_gen = MakeVectorGenerator(std::move(initial_values));
1988 auto gen_gen = MakeVectorGenerator<AsyncGenerator<T>>(
1989 {std::move(initial_values_vec_gen), std::move(following_values)});
1990 return MakeConcatenatedGenerator(std::move(gen_gen));
1991 }
1992
1993 template <typename T>
1994 struct CancellableGenerator {
1995 Future<T> operator()() {
1996 if (stop_token.IsStopRequested()) {
1997 return stop_token.Poll();
1998 }
1999 return source();
2000 }
2001
2002 AsyncGenerator<T> source;
2003 StopToken stop_token;
2004 };
2005
2006
2007
2008
2009 template <typename T>
2010 AsyncGenerator<T> MakeCancellable(AsyncGenerator<T> source, StopToken stop_token) {
2011 return CancellableGenerator<T>{std::move(source), std::move(stop_token)};
2012 }
2013
2014 template <typename T>
2015 class DefaultIfEmptyGenerator {
2016 public:
2017 DefaultIfEmptyGenerator(AsyncGenerator<T> source, T or_value)
2018 : state_(std::make_shared<State>(std::move(source), std::move(or_value))) {}
2019
2020 Future<T> operator()() {
2021 if (state_->first) {
2022 state_->first = false;
2023 struct {
2024 T or_value;
2025
2026 Result<T> operator()(const T& value) {
2027 if (IterationTraits<T>::IsEnd(value)) {
2028 return std::move(or_value);
2029 }
2030 return value;
2031 }
2032 } Continuation;
2033 Continuation.or_value = std::move(state_->or_value);
2034 return state_->source().Then(std::move(Continuation));
2035 }
2036 return state_->source();
2037 }
2038
2039 private:
2040 struct State {
2041 AsyncGenerator<T> source;
2042 T or_value;
2043 bool first;
2044 State(AsyncGenerator<T> source_, T or_value_)
2045 : source(std::move(source_)), or_value(std::move(or_value_)), first(true) {}
2046 };
2047 std::shared_ptr<State> state_;
2048 };
2049
2050
2051
2052
2053
2054 template <typename T>
2055 AsyncGenerator<T> MakeDefaultIfEmptyGenerator(AsyncGenerator<T> source, T or_value) {
2056 return DefaultIfEmptyGenerator<T>(std::move(source), std::move(or_value));
2057 }
2058 }