// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <memory>
#include <queue>
#include <type_traits>
#include <unordered_set>
#include <utility>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/cancel.h"
#include "arrow/util/config.h"
#include "arrow/util/functional.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"

#if defined(_MSC_VER)
// Disable harmless warning for decorated name length limit
#  pragma warning(disable : 4503)
#endif

namespace arrow {

/// \brief Get the capacity of the global thread pool
///
/// Return the number of worker threads in the thread pool to which
/// Arrow dispatches various CPU-bound tasks.  This is an ideal number,
/// not necessarily the exact number of threads at a given point in time.
///
/// You can change this number using SetCpuThreadPoolCapacity().
ARROW_EXPORT int GetCpuThreadPoolCapacity();

/// \brief Set the capacity of the global thread pool
///
/// Set the number of worker threads in the thread pool to which
/// Arrow dispatches various CPU-bound tasks.
///
/// The current number is returned by GetCpuThreadPoolCapacity().
ARROW_EXPORT Status SetCpuThreadPoolCapacity(int threads);
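//
// Example (illustrative sketch only, not part of the Arrow API): querying and
// adjusting the global CPU thread pool capacity.  The factor of 2 is arbitrary.
//
//   int capacity = arrow::GetCpuThreadPoolCapacity();
//   // May return an error Status for an invalid capacity (e.g. a non-positive value).
//   ARROW_RETURN_NOT_OK(arrow::SetCpuThreadPoolCapacity(capacity * 2));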

namespace internal {

// Hints about a task that may be used by an Executor.
// They are ignored by the provided ThreadPool implementation.
struct TaskHints {
  // The lower, the more urgent
  int32_t priority = 0;
  // The IO transfer size in bytes
  int64_t io_size = -1;
  // The approximate CPU cost in number of instructions
  int64_t cpu_cost = -1;
  // An application-specific ID
  int64_t external_id = -1;
};

class ARROW_EXPORT Executor {
 public:
  using StopCallback = internal::FnOnce<void(const Status&)>;

  virtual ~Executor();

  // Spawn a fire-and-forget task.
  template <typename Function>
  Status Spawn(Function&& func) {
    return SpawnReal(TaskHints{}, std::forward<Function>(func), StopToken::Unstoppable(),
                     StopCallback{});
  }
  template <typename Function>
  Status Spawn(Function&& func, StopToken stop_token) {
    return SpawnReal(TaskHints{}, std::forward<Function>(func), std::move(stop_token),
                     StopCallback{});
  }
  template <typename Function>
  Status Spawn(TaskHints hints, Function&& func) {
    return SpawnReal(hints, std::forward<Function>(func), StopToken::Unstoppable(),
                     StopCallback{});
  }
  template <typename Function>
  Status Spawn(TaskHints hints, Function&& func, StopToken stop_token) {
    return SpawnReal(hints, std::forward<Function>(func), std::move(stop_token),
                     StopCallback{});
  }
  template <typename Function>
  Status Spawn(TaskHints hints, Function&& func, StopToken stop_token,
               StopCallback stop_callback) {
    return SpawnReal(hints, std::forward<Function>(func), std::move(stop_token),
                     std::move(stop_callback));
  }
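
  // Example (illustrative sketch): spawning a fire-and-forget task with a stop
  // token so that it can be cancelled from another thread.  `executor` is an
  // assumed Executor*, e.g. GetCpuThreadPool().
  //
  //   arrow::StopSource stop_source;
  //   ARROW_RETURN_NOT_OK(executor->Spawn(
  //       []() { /* CPU-bound work */ }, stop_source.token()));
  //   // Later, from any thread, request cancellation of tasks using this token:
  //   stop_source.RequestStop();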

  // Transfers a future to this executor.  Any continuations added to the
  // returned future will run in this executor.  Otherwise they would run
  // on the same thread that called MarkFinished.
  //
  // This is necessary when (for example) an I/O task is completing a future.
  // The continuations of that future should run on the CPU thread pool, keeping
  // CPU-heavy work off the I/O thread pool.  So the I/O task should transfer
  // the future to the CPU executor before returning.
  //
  // By default this method will only transfer if the future is not already completed.  If
  // the future is already completed then any callback would be run synchronously and so
  // no transfer is typically necessary.  However, in cases where you want to force a
  // transfer (e.g. to help the scheduler break up units of work across multiple cores)
  // then you can override this behavior with `always_transfer`.
  template <typename T>
  Future<T> Transfer(Future<T> future) {
    return DoTransfer(std::move(future), false);
  }

  // Overload of Transfer which will always schedule callbacks on new threads even if the
  // future is finished when the callback is added.
  //
  // This can be useful in cases where you want to ensure parallelism.
  template <typename T>
  Future<T> TransferAlways(Future<T> future) {
    return DoTransfer(std::move(future), true);
  }
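
  // Example (illustrative sketch): an I/O task completes a future on an I/O
  // thread; transferring it lets later continuations run on the CPU executor
  // instead.  `io_executor`, `cpu_executor` and `ReadSomeBytes` are assumed
  // names, not part of this header.
  //
  //   ARROW_ASSIGN_OR_RAISE(Future<int64_t> raw_fut,
  //                         io_executor->Submit(ReadSomeBytes));
  //   Future<int64_t> cpu_fut = cpu_executor->Transfer(raw_fut);
  //   // This callback will typically run on `cpu_executor`, not on the I/O
  //   // thread that marks `raw_fut` finished.
  //   cpu_fut.AddCallback([](const Result<int64_t>& nbytes) { /* CPU-heavy decode */ });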

  // Submit a callable and arguments for execution.  Return a future that
  // will yield the callable's result value once the callable has run.
  // The callable's arguments are copied before execution.
  template <typename Function, typename... Args,
            typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
                Function && (Args && ...)>>
  Result<FutureType> Submit(TaskHints hints, StopToken stop_token, Function&& func,
                            Args&&... args) {
    using ValueType = typename FutureType::ValueType;

    auto future = FutureType::Make();
    auto task = std::bind(::arrow::detail::ContinueFuture{}, future,
                          std::forward<Function>(func), std::forward<Args>(args)...);
    struct {
      WeakFuture<ValueType> weak_fut;

      void operator()(const Status& st) {
        auto fut = weak_fut.get();
        if (fut.is_valid()) {
          fut.MarkFinished(st);
        }
      }
    } stop_callback{WeakFuture<ValueType>(future)};
    ARROW_RETURN_NOT_OK(SpawnReal(hints, std::move(task), std::move(stop_token),
                                  std::move(stop_callback)));

    return future;
  }

  template <typename Function, typename... Args,
            typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
                Function && (Args && ...)>>
  Result<FutureType> Submit(StopToken stop_token, Function&& func, Args&&... args) {
    return Submit(TaskHints{}, stop_token, std::forward<Function>(func),
                  std::forward<Args>(args)...);
  }

  template <typename Function, typename... Args,
            typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
                Function && (Args && ...)>>
  Result<FutureType> Submit(TaskHints hints, Function&& func, Args&&... args) {
    return Submit(std::move(hints), StopToken::Unstoppable(),
                  std::forward<Function>(func), std::forward<Args>(args)...);
  }

  template <typename Function, typename... Args,
            typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
                Function && (Args && ...)>>
  Result<FutureType> Submit(Function&& func, Args&&... args) {
    return Submit(TaskHints{}, StopToken::Unstoppable(), std::forward<Function>(func),
                  std::forward<Args>(args)...);
  }
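
  // Example (illustrative sketch): submitting a callable and blocking on the
  // returned future.  `pool` is an assumed Executor*, e.g. GetCpuThreadPool().
  //
  //   Result<Future<int>> maybe_fut = pool->Submit([](int x, int y) { return x + y; }, 2, 3);
  //   if (maybe_fut.ok()) {
  //     Future<int> fut = *maybe_fut;
  //     Result<int> sum = fut.result();  // blocks until the task has run; yields 5
  //   }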

  // Return the level of parallelism (the number of tasks that may be executed
  // concurrently).  This may be an approximate number.
  virtual int GetCapacity() = 0;

  // Return true if the thread from which this function is called is owned by this
  // Executor.  Return false if this Executor does not support this property.
  virtual bool OwnsThisThread() { return false; }

  // Return true if this is the current executor being called.
  // N.B. this defaults to just calling OwnsThisThread unless threading is disabled.
  virtual bool IsCurrentExecutor() { return OwnsThisThread(); }

  /// \brief An interface to represent something with a custom destructor
  ///
  /// \see KeepAlive
  class ARROW_EXPORT Resource {
   public:
    virtual ~Resource() = default;
  };

  /// \brief Keep a resource alive until all executor threads have terminated
  ///
  /// Executors may have static storage duration.  In particular, the CPU and I/O
  /// executors are currently implemented this way.  These threads may access other
  /// objects with static storage duration such as the OpenTelemetry runtime context,
  /// the default memory pool, or other static executors.
  ///
  /// The order in which these objects are destroyed is difficult to control.  To
  /// ensure those objects remain alive until all threads have finished, they should
  /// be wrapped in a Resource object and passed into this method.  The given
  /// shared_ptr will be kept alive until all threads have finished their worker loops.
  virtual void KeepAlive(std::shared_ptr<Resource> resource);
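
  // Example (illustrative sketch): keeping a hypothetical statically-scoped
  // registry alive until all worker threads have exited.  `MyRegistry` is an
  // assumed application type, not part of Arrow.
  //
  //   struct RegistryResource : Executor::Resource {
  //     explicit RegistryResource(std::shared_ptr<MyRegistry> r) : registry(std::move(r)) {}
  //     std::shared_ptr<MyRegistry> registry;
  //   };
  //
  //   executor->KeepAlive(std::make_shared<RegistryResource>(registry));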

 protected:
  ARROW_DISALLOW_COPY_AND_ASSIGN(Executor);

  Executor() = default;

  template <typename T, typename FT = Future<T>, typename FTSync = typename FT::SyncType>
  Future<T> DoTransfer(Future<T> future, bool always_transfer = false) {
    auto transferred = Future<T>::Make();
    if (always_transfer) {
      CallbackOptions callback_options = CallbackOptions::Defaults();
      callback_options.should_schedule = ShouldSchedule::Always;
      callback_options.executor = this;
      auto sync_callback = [transferred](const FTSync& result) mutable {
        transferred.MarkFinished(result);
      };
      future.AddCallback(sync_callback, callback_options);
      return transferred;
    }

    // We could use AddCallback's ShouldSchedule::IfUnfinished but we can save a bit of
    // work by doing the test here.
    auto callback = [this, transferred](const FTSync& result) mutable {
      auto spawn_status =
          Spawn([transferred, result]() mutable { transferred.MarkFinished(result); });
      if (!spawn_status.ok()) {
        transferred.MarkFinished(spawn_status);
      }
    };
    auto callback_factory = [&callback]() { return callback; };
    if (future.TryAddCallback(callback_factory)) {
      return transferred;
    }
    // If the future is already finished and we aren't going to force spawn a thread
    // then we don't need to add another layer of callback and can return the original
    // future.
    return future;
  }

  // Subclassing API
  virtual Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
                           StopCallback&&) = 0;
};

/// \brief An executor implementation that runs all tasks on a single thread using an
/// event loop.
///
/// Note: Any sort of nested parallelism will deadlock this executor.  Blocking waits are
/// fine, but if one task needs to wait for another task it must be expressed as an
/// asynchronous continuation.
class ARROW_EXPORT SerialExecutor : public Executor {
 public:
  template <typename T = ::arrow::internal::Empty>
  using TopLevelTask = internal::FnOnce<Future<T>(Executor*)>;

  ~SerialExecutor() override;

  int GetCapacity() override { return 1; }
  bool OwnsThisThread() override;
  Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
                   StopCallback&&) override;

  // Return the number of tasks either running or in the queue.
  int GetNumTasks();

  /// \brief Runs the TopLevelTask and any scheduled tasks
  ///
  /// The TopLevelTask (or one of the tasks it schedules) must either return an invalid
  /// status or call the finish signal.  Failure to do this will result in a deadlock.
  /// For this reason it is preferable (if possible) to use the helper methods (below),
  /// RunSynchronously/RunSerially, which delegate this to the Future producer's
  /// existing responsibility to always mark a future finished (which can someday be
  /// aided by ARROW-12207).
  template <typename T = internal::Empty, typename FT = Future<T>,
            typename FTSync = typename FT::SyncType>
  static FTSync RunInSerialExecutor(TopLevelTask<T> initial_task) {
    Future<T> fut = SerialExecutor().Run<T>(std::move(initial_task));
    return FutureToSync(fut);
  }
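
  // Example (illustrative sketch): running an asynchronous computation to
  // completion using only the calling thread.  The top-level task receives the
  // serial executor and must return a future that eventually finishes.
  //
  //   Result<int> answer = SerialExecutor::RunInSerialExecutor<int>(
  //       [](Executor* executor) -> Future<int> {
  //         return DeferNotOk(executor->Submit([] { return 42; }));
  //       });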

  /// \brief Transform an AsyncGenerator into an Iterator
  ///
  /// An event loop will be created and each call to Next will power the event loop with
  /// the calling thread until the next item is ready to be delivered.
  ///
  /// Note: The iterator's destructor will run the event loop until the given generator
  /// is fully exhausted.  If you wish to abandon iteration before completion then the
  /// correct approach is to use a stop token to cause the generator to exhaust early.
  template <typename T>
  static Iterator<T> IterateGenerator(
      internal::FnOnce<Result<std::function<Future<T>()>>(Executor*)> initial_task) {
    auto serial_executor = std::unique_ptr<SerialExecutor>(new SerialExecutor());
    auto maybe_generator = std::move(initial_task)(serial_executor.get());
    if (!maybe_generator.ok()) {
      return MakeErrorIterator<T>(maybe_generator.status());
    }
    auto generator = maybe_generator.MoveValueUnsafe();
    struct SerialIterator {
      SerialIterator(std::unique_ptr<SerialExecutor> executor,
                     std::function<Future<T>()> generator)
          : executor(std::move(executor)), generator(std::move(generator)) {}
      ARROW_DISALLOW_COPY_AND_ASSIGN(SerialIterator);
      ARROW_DEFAULT_MOVE_AND_ASSIGN(SerialIterator);
      ~SerialIterator() {
        // A serial iterator must be consumed before it can be destroyed; destroying it
        // earlier would lead to resource leakage.  There will likely be deadlocks at
        // this spot in the future but these will be the result of other bugs and not the
        // fact that we are forcing consumption here.

        // If a streaming API needs to support early abandonment then it should do so
        // with a cancellation token and not by simply discarding the iterator and
        // expecting the underlying work to clean up correctly.
        if (executor && !executor->IsFinished()) {
          while (true) {
            Result<T> maybe_next = Next();
            if (!maybe_next.ok() || IsIterationEnd(*maybe_next)) {
              break;
            }
          }
        }
      }

      Result<T> Next() {
        executor->Unpause();
        // This call may lead to tasks being scheduled in the serial executor
        Future<T> next_fut = generator();
        next_fut.AddCallback([this](const Result<T>& res) {
          // If we're done iterating we should drain the rest of the tasks in the executor
          if (!res.ok() || IsIterationEnd(*res)) {
            executor->Finish();
            return;
          }
          // Otherwise we will break out immediately, leaving the remaining tasks for
          // the next call.
          executor->Pause();
        });
#ifdef ARROW_ENABLE_THREADING
        // The future must run on this thread.
        // Borrow this thread and run tasks until the future is finished.
        executor->RunLoop();
#else
        next_fut.Wait();
#endif
        if (!next_fut.is_finished()) {
          // Not clear this is possible since RunLoop wouldn't generally exit
          // unless we paused/finished, which would imply next_fut has been
          // finished.
          return Status::Invalid(
              "Serial executor terminated before next result computed");
        }
        // At this point we may still have tasks in the executor; that is ok.
        // We will run those tasks the next time through.
        return next_fut.result();
      }

      std::unique_ptr<SerialExecutor> executor;
      std::function<Future<T>()> generator;
    };
    return Iterator<T>(SerialIterator{std::move(serial_executor), std::move(generator)});
  }

#ifndef ARROW_ENABLE_THREADING
  // Run a pending task from the loop.
  // Returns true if any tasks were run in the last pass through the loop (i.e. if it
  // returns false, all executors are waiting).
  static bool RunTasksOnAllExecutors();
  static SerialExecutor* GetCurrentExecutor();

  bool IsCurrentExecutor() override;

#endif

 protected:
  virtual void RunLoop();

  // State uses a mutex
  struct State;
  std::shared_ptr<State> state_;

  SerialExecutor();

  // We mark the serial executor "finished" when there should be
  // no more tasks scheduled on it.  It's not strictly needed but
  // can help catch bugs where we are trying to use the executor
  // after we are done with it.
  void Finish();
  bool IsFinished();
  // We pause the executor when we are running an async generator
  // and we have received an item that we can deliver.
  void Pause();
  void Unpause();

  template <typename T, typename FTSync = typename Future<T>::SyncType>
  Future<T> Run(TopLevelTask<T> initial_task) {
    auto final_fut = std::move(initial_task)(this);
    final_fut.AddCallback([this](const FTSync&) { Finish(); });
    RunLoop();
    return final_fut;
  }

#ifndef ARROW_ENABLE_THREADING
  // Without threading we have to run tasks from all live executors
  // during RunLoop.
  static std::unordered_set<SerialExecutor*> all_executors;
  // A pointer to the last executor called by the loop, so that tasks get
  // spawned evenly across multiple calls to RunTasksOnAllExecutors.
  static SerialExecutor* last_called_executor;
  // Without threading we can't tell which executor the currently running task
  // belongs to, so we set this when spawning the task.
  static SerialExecutor* current_executor;
#endif  // ARROW_ENABLE_THREADING
};

#ifdef ARROW_ENABLE_THREADING

/// An Executor implementation spawning tasks in FIFO manner on a fixed-size
/// pool of worker threads.
///
/// Note: Any sort of nested parallelism will deadlock this executor.  Blocking waits are
/// fine, but if one task needs to wait for another task it must be expressed as an
/// asynchronous continuation.
class ARROW_EXPORT ThreadPool : public Executor {
 public:
  // Construct a thread pool with the given number of worker threads
  static Result<std::shared_ptr<ThreadPool>> Make(int threads);

  // Like Make(), but takes care that the returned ThreadPool is compatible
  // with destruction late at process exit.
  static Result<std::shared_ptr<ThreadPool>> MakeEternal(int threads);

  // Destroy the thread pool; the pool will first be shut down.
  ~ThreadPool() override;

  // Return the desired number of worker threads.
  // The actual number of workers may lag a bit before being adjusted to
  // match this value.
  int GetCapacity() override;

  // Return the number of tasks either running or in the queue.
  int GetNumTasks();

  bool OwnsThisThread() override;
  // Dynamically change the number of worker threads.
  //
  // This function always returns immediately.
  // If fewer threads are running than this number, new threads are spawned
  // on-demand when needed for task execution.
  // If more threads are running than this number, excess threads are reaped
  // as soon as possible.
  Status SetCapacity(int threads);

  // Heuristic for the default capacity of a thread pool for CPU-bound tasks.
  // This is exposed as a static method to help with testing.
  static int DefaultCapacity();

  // Shutdown the pool.  Once the pool starts shutting down, new tasks
  // cannot be submitted anymore.
  // If "wait" is true, shutdown waits for all pending tasks to be finished.
  // If "wait" is false, workers are stopped as soon as currently executing
  // tasks are finished.
  Status Shutdown(bool wait = true);
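
  // Example (illustrative sketch): creating a dedicated pool, submitting a task
  // and shutting the pool down.  Error handling is abbreviated.
  //
  //   ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ThreadPool> pool, ThreadPool::Make(4));
  //   ARROW_ASSIGN_OR_RAISE(Future<int> fut, pool->Submit([] { return 1 + 1; }));
  //   Result<int> two = fut.result();  // wait for the task to finish
  //   ARROW_RETURN_NOT_OK(pool->Shutdown(/*wait=*/true));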

  // Wait for the thread pool to become idle
  //
  // This is useful for sequencing tests.
  void WaitForIdle();

  void KeepAlive(std::shared_ptr<Executor::Resource> resource) override;

  struct State;

 protected:
  FRIEND_TEST(TestThreadPool, SetCapacity);
  FRIEND_TEST(TestGlobalThreadPool, Capacity);
  ARROW_FRIEND_EXPORT friend ThreadPool* GetCpuThreadPool();

  ThreadPool();

  Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
                   StopCallback&&) override;

  // Collect finished worker threads, making sure the OS threads have exited
  void CollectFinishedWorkersUnlocked();
  // Launch a given number of additional workers
  void LaunchWorkersUnlocked(int threads);
  // Get the current actual capacity
  int GetActualCapacity();

  static std::shared_ptr<ThreadPool> MakeCpuThreadPool();

  std::shared_ptr<State> sp_state_;
  State* state_;
  bool shutdown_on_destroy_;
};
#else  // ARROW_ENABLE_THREADING
// An executor implementation which pretends to be a thread pool but runs everything
// on the main thread using a static queue (shared between all thread pools; otherwise
// cross-threadpool dependencies would break everything).
class ARROW_EXPORT ThreadPool : public SerialExecutor {
 public:
  ARROW_FRIEND_EXPORT friend ThreadPool* GetCpuThreadPool();

  static Result<std::shared_ptr<ThreadPool>> Make(int threads);

  // Like Make(), but takes care that the returned ThreadPool is compatible
  // with destruction late at process exit.
  static Result<std::shared_ptr<ThreadPool>> MakeEternal(int threads);

  // Destroy the thread pool; the pool will first be shut down.
  ~ThreadPool() override;

  // Return the desired number of worker threads.
  // The actual number of workers may lag a bit before being adjusted to
  // match this value.
  int GetCapacity() override;

  virtual int GetActualCapacity();

  bool OwnsThisThread() override { return true; }

  // Dynamically change the number of worker threads.
  // Without threading this is equal to the number of tasks that can be
  // running at once (nested inside each other).
  Status SetCapacity(int threads);

  static int DefaultCapacity() { return 8; }

  // Shutdown the pool.  Once the pool starts shutting down, new tasks
  // cannot be submitted anymore.
  // If "wait" is true, shutdown waits for all pending tasks to be finished.
  // If "wait" is false, workers are stopped as soon as currently executing
  // tasks are finished.
  Status Shutdown(bool wait = true);

  // Wait for the thread pool to become idle
  //
  // This is useful for sequencing tests.
  void WaitForIdle();

 protected:
  static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
  ThreadPool();
};

#endif  // ARROW_ENABLE_THREADING

// Return the process-global thread pool for CPU-bound tasks.
ARROW_EXPORT ThreadPool* GetCpuThreadPool();

/// \brief Potentially run an async operation serially (if use_threads is false)
/// \see RunSerially
///
/// If `use_threads` is true, the global CPU executor is used.
/// If `use_threads` is false, a temporary SerialExecutor is used.
/// `get_future` is called (from this thread) with the chosen executor and must
/// return a future that will eventually finish. This function returns once the
/// future has finished.
template <typename Fut, typename ValueType = typename Fut::ValueType>
typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
                                        bool use_threads) {
  if (use_threads) {
    auto fut = std::move(get_future)(GetCpuThreadPool());
    return FutureToSync(fut);
  } else {
    return SerialExecutor::RunInSerialExecutor<ValueType>(std::move(get_future));
  }
}
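
// Example (illustrative sketch): exposing a synchronous API on top of an async
// implementation.  `DoWorkAsync` is an assumed helper returning Future<int>.
//
//   Result<int> DoWork(bool use_threads) {
//     return RunSynchronously<Future<int>>(
//         [](Executor* executor) { return DoWorkAsync(executor); }, use_threads);
//   }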

/// \brief Potentially iterate an async generator serially (if use_threads is false)
/// \see IterateGenerator
///
/// If `use_threads` is true, the global CPU executor will be used.  Each call to
/// the iterator will simply wait until the next item is available.  Tasks may run in
/// the background between calls.
///
/// If `use_threads` is false, only the calling thread will be used.  Each call to
/// the iterator will use the calling thread to do enough work to generate one item.
/// Tasks will be left in a queue until the next call and no work will be done between
/// calls.
template <typename T>
Iterator<T> IterateSynchronously(
    FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads) {
  if (use_threads) {
    auto maybe_gen = std::move(get_gen)(GetCpuThreadPool());
    if (!maybe_gen.ok()) {
      return MakeErrorIterator<T>(maybe_gen.status());
    }
    return MakeGeneratorIterator(*maybe_gen);
  } else {
    return SerialExecutor::IterateGenerator(std::move(get_gen));
  }
}
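
// Example (illustrative sketch): draining such an iterator.  `MakeBufferGenerator`
// is an assumed factory that schedules work on the given executor and returns a
// std::function<Future<std::shared_ptr<Buffer>>()>.
//
//   Iterator<std::shared_ptr<Buffer>> it = IterateSynchronously<std::shared_ptr<Buffer>>(
//       [](Executor* executor) { return MakeBufferGenerator(executor); },
//       /*use_threads=*/false);
//   while (true) {
//     ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buf, it.Next());
//     // With the default IterationTraits a null pointer marks the end of iteration.
//     if (IsIterationEnd(buf)) break;
//     // ... process `buf` ...
//   }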

}  // namespace internal
}  // namespace arrow