File indexing completed on 2025-08-28 08:27:09
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
0020 #include <utility>
0021 #include <vector>
0022
0023 #include "arrow/status.h"
0024 #include "arrow/util/functional.h"
0025 #include "arrow/util/thread_pool.h"
0026 #include "arrow/util/vector.h"
0027
0028 namespace arrow {
0029 namespace internal {
0030
0031
0032
0033
0034 template <class FUNCTION>
0035 Status ParallelFor(int num_tasks, FUNCTION&& func,
0036 Executor* executor = internal::GetCpuThreadPool()) {
0037 std::vector<Future<>> futures(num_tasks);
0038
0039 for (int i = 0; i < num_tasks; ++i) {
0040 ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i));
0041 }
0042 auto st = Status::OK();
0043 for (auto& fut : futures) {
0044 st &= fut.status();
0045 }
0046 return st;
0047 }
0048
0049 template <class FUNCTION, typename T,
0050 typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
0051 Future<std::vector<R>> ParallelForAsync(std::vector<T> inputs, FUNCTION&& func,
0052 Executor* executor = internal::GetCpuThreadPool(),
0053 TaskHints hints = TaskHints{}) {
0054 std::vector<Future<R>> futures(inputs.size());
0055 for (size_t i = 0; i < inputs.size(); ++i) {
0056 ARROW_ASSIGN_OR_RAISE(futures[i],
0057 executor->Submit(hints, func, i, std::move(inputs[i])));
0058 }
0059 return All(std::move(futures))
0060 .Then([](const std::vector<Result<R>>& results) -> Result<std::vector<R>> {
0061 return UnwrapOrRaise(results);
0062 });
0063 }
0064
0065
0066
0067
0068
0069 template <class FUNCTION>
0070 Status OptionalParallelFor(bool use_threads, int num_tasks, FUNCTION&& func,
0071 Executor* executor = internal::GetCpuThreadPool()) {
0072 if (use_threads) {
0073 return ParallelFor(num_tasks, std::forward<FUNCTION>(func), executor);
0074 } else {
0075 for (int i = 0; i < num_tasks; ++i) {
0076 RETURN_NOT_OK(func(i));
0077 }
0078 return Status::OK();
0079 }
0080 }
0081
0082
0083
0084
0085
0086 template <class FUNCTION, typename T,
0087 typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
0088 Future<std::vector<R>> OptionalParallelForAsync(
0089 bool use_threads, std::vector<T> inputs, FUNCTION&& func,
0090 Executor* executor = internal::GetCpuThreadPool(), TaskHints hints = TaskHints{}) {
0091 if (use_threads) {
0092 return ParallelForAsync(std::move(inputs), std::forward<FUNCTION>(func), executor,
0093 hints);
0094 } else {
0095 std::vector<R> result(inputs.size());
0096 for (size_t i = 0; i < inputs.size(); ++i) {
0097 ARROW_ASSIGN_OR_RAISE(result[i], func(i, inputs[i]));
0098 }
0099 return result;
0100 }
0101 }
0102
0103 }
0104 }