Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:27:09

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
0020 #include <utility>
0021 #include <vector>
0022 
0023 #include "arrow/status.h"
0024 #include "arrow/util/functional.h"
0025 #include "arrow/util/thread_pool.h"
0026 #include "arrow/util/vector.h"
0027 
0028 namespace arrow {
0029 namespace internal {
0030 
0031 // A parallelizer that takes a `Status(int)` function and calls it with
0032 // arguments between 0 and `num_tasks - 1`, on an arbitrary number of threads.
0033 
0034 template <class FUNCTION>
0035 Status ParallelFor(int num_tasks, FUNCTION&& func,
0036                    Executor* executor = internal::GetCpuThreadPool()) {
0037   std::vector<Future<>> futures(num_tasks);
0038 
0039   for (int i = 0; i < num_tasks; ++i) {
0040     ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i));
0041   }
0042   auto st = Status::OK();
0043   for (auto& fut : futures) {
0044     st &= fut.status();
0045   }
0046   return st;
0047 }
0048 
0049 template <class FUNCTION, typename T,
0050           typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
0051 Future<std::vector<R>> ParallelForAsync(std::vector<T> inputs, FUNCTION&& func,
0052                                         Executor* executor = internal::GetCpuThreadPool(),
0053                                         TaskHints hints = TaskHints{}) {
0054   std::vector<Future<R>> futures(inputs.size());
0055   for (size_t i = 0; i < inputs.size(); ++i) {
0056     ARROW_ASSIGN_OR_RAISE(futures[i],
0057                           executor->Submit(hints, func, i, std::move(inputs[i])));
0058   }
0059   return All(std::move(futures))
0060       .Then([](const std::vector<Result<R>>& results) -> Result<std::vector<R>> {
0061         return UnwrapOrRaise(results);
0062       });
0063 }
0064 
0065 // A parallelizer that takes a `Status(int)` function and calls it with
0066 // arguments between 0 and `num_tasks - 1`, in sequence or in parallel,
0067 // depending on the input boolean.
0068 
0069 template <class FUNCTION>
0070 Status OptionalParallelFor(bool use_threads, int num_tasks, FUNCTION&& func,
0071                            Executor* executor = internal::GetCpuThreadPool()) {
0072   if (use_threads) {
0073     return ParallelFor(num_tasks, std::forward<FUNCTION>(func), executor);
0074   } else {
0075     for (int i = 0; i < num_tasks; ++i) {
0076       RETURN_NOT_OK(func(i));
0077     }
0078     return Status::OK();
0079   }
0080 }
0081 
0082 // A parallelizer that takes a `Result<R>(int index, T item)` function and
0083 // calls it with each item from the input array, in sequence or in parallel,
0084 // depending on the input boolean.
0085 
0086 template <class FUNCTION, typename T,
0087           typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
0088 Future<std::vector<R>> OptionalParallelForAsync(
0089     bool use_threads, std::vector<T> inputs, FUNCTION&& func,
0090     Executor* executor = internal::GetCpuThreadPool(), TaskHints hints = TaskHints{}) {
0091   if (use_threads) {
0092     return ParallelForAsync(std::move(inputs), std::forward<FUNCTION>(func), executor,
0093                             hints);
0094   } else {
0095     std::vector<R> result(inputs.size());
0096     for (size_t i = 0; i < inputs.size(); ++i) {
0097       ARROW_ASSIGN_OR_RAISE(result[i], func(i, inputs[i]));
0098     }
0099     return result;
0100   }
0101 }
0102 
0103 }  // namespace internal
0104 }  // namespace arrow