File indexing completed on 2025-08-28 08:27:00
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
0020 #include <atomic>
0021 #include <memory>
0022
0023 #include "arrow/testing/gtest_util.h"
0024 #include "arrow/util/async_generator.h"
0025 #include "arrow/util/future.h"
0026
0027 namespace arrow {
0028 namespace util {
0029
0030 template <typename T>
0031 AsyncGenerator<T> AsyncVectorIt(std::vector<T> v) {
0032 return MakeVectorGenerator(std::move(v));
0033 }
0034
0035 template <typename T>
0036 AsyncGenerator<T> FailAt(AsyncGenerator<T> src, int failing_index) {
0037 auto index = std::make_shared<std::atomic<int>>(0);
0038 return [src, index, failing_index]() {
0039 auto idx = index->fetch_add(1);
0040 if (idx >= failing_index) {
0041 return Future<T>::MakeFinished(Status::Invalid("XYZ"));
0042 }
0043 return src();
0044 };
0045 }
0046
0047 template <typename T>
0048 AsyncGenerator<T> SlowdownABit(AsyncGenerator<T> source) {
0049 return MakeMappedGenerator(std::move(source), [](const T& res) {
0050 return SleepABitAsync().Then([res]() { return res; });
0051 });
0052 }
0053
0054 template <typename T>
0055 class TrackingGenerator {
0056 public:
0057 explicit TrackingGenerator(AsyncGenerator<T> source)
0058 : state_(std::make_shared<State>(std::move(source))) {}
0059
0060 Future<T> operator()() {
0061 state_->num_read++;
0062 return state_->source();
0063 }
0064
0065 int num_read() { return state_->num_read.load(); }
0066
0067 private:
0068 struct State {
0069 explicit State(AsyncGenerator<T> source) : source(std::move(source)), num_read(0) {}
0070
0071 AsyncGenerator<T> source;
0072 std::atomic<int> num_read;
0073 };
0074
0075 std::shared_ptr<State> state_;
0076 };
0077
0078 }
0079 }