Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:27:00

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <atomic>
0021 #include <memory>
0022 
0023 #include "arrow/testing/gtest_util.h"
0024 #include "arrow/util/async_generator.h"
0025 #include "arrow/util/future.h"
0026 
0027 namespace arrow {
0028 namespace util {
0029 
0030 template <typename T>
0031 AsyncGenerator<T> AsyncVectorIt(std::vector<T> v) {
0032   return MakeVectorGenerator(std::move(v));
0033 }
0034 
0035 template <typename T>
0036 AsyncGenerator<T> FailAt(AsyncGenerator<T> src, int failing_index) {
0037   auto index = std::make_shared<std::atomic<int>>(0);
0038   return [src, index, failing_index]() {
0039     auto idx = index->fetch_add(1);
0040     if (idx >= failing_index) {
0041       return Future<T>::MakeFinished(Status::Invalid("XYZ"));
0042     }
0043     return src();
0044   };
0045 }
0046 
0047 template <typename T>
0048 AsyncGenerator<T> SlowdownABit(AsyncGenerator<T> source) {
0049   return MakeMappedGenerator(std::move(source), [](const T& res) {
0050     return SleepABitAsync().Then([res]() { return res; });
0051   });
0052 }
0053 
0054 template <typename T>
0055 class TrackingGenerator {
0056  public:
0057   explicit TrackingGenerator(AsyncGenerator<T> source)
0058       : state_(std::make_shared<State>(std::move(source))) {}
0059 
0060   Future<T> operator()() {
0061     state_->num_read++;
0062     return state_->source();
0063   }
0064 
0065   int num_read() { return state_->num_read.load(); }
0066 
0067  private:
0068   struct State {
0069     explicit State(AsyncGenerator<T> source) : source(std::move(source)), num_read(0) {}
0070 
0071     AsyncGenerator<T> source;
0072     std::atomic<int> num_read;
0073   };
0074 
0075   std::shared_ptr<State> state_;
0076 };
0077 
0078 }  // namespace util
0079 }  // namespace arrow