File indexing completed on 2025-08-28 08:27:10
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #pragma once
0019
0020 #include <cstdint>
0021 #include <memory>
0022 #include <queue>
0023 #include <type_traits>
0024 #include <unordered_set>
0025 #include <utility>
0026
0027 #include "arrow/result.h"
0028 #include "arrow/status.h"
0029 #include "arrow/util/cancel.h"
0030 #include "arrow/util/config.h"
0031 #include "arrow/util/functional.h"
0032 #include "arrow/util/future.h"
0033 #include "arrow/util/iterator.h"
0034 #include "arrow/util/macros.h"
0035 #include "arrow/util/visibility.h"
0036
0037 #if defined(_MSC_VER)
0038
0039 # pragma warning(disable : 4503)
0040 #endif
0041
0042 namespace arrow {
0043
0044
0045
0046
0047
0048
0049
0050
/// \brief Get the capacity of the global CPU thread pool.
///
/// NOTE(review): defined out of line. This presumably reports the configured worker
/// count of the pool returned by GetCpuThreadPool(); confirm against the implementation.
ARROW_EXPORT int GetCpuThreadPoolCapacity();

/// \brief Set the capacity of the global CPU thread pool.
///
/// \param[in] threads requested number of worker threads
/// \return a Status describing whether the change was applied.
///         NOTE(review): which values are rejected is not visible here; TODO confirm.
ARROW_EXPORT Status SetCpuThreadPoolCapacity(int threads);
0060
0061 namespace internal {
0062
0063
0064
/// \brief Optional scheduling hints that travel with a task handed to an Executor.
///
/// Every field has a default, so `TaskHints{}` means "no hints". The -1 defaults
/// presumably stand for "unknown / not provided".
/// NOTE(review): how each hint is interpreted is up to the executor implementation.
struct TaskHints {
  /// Task priority; 0 unless the caller sets it.
  int32_t priority{0};
  /// Expected amount of I/O performed by the task; -1 unless the caller sets it.
  int64_t io_size{-1};
  /// Expected CPU cost of the task; -1 unless the caller sets it.
  int64_t cpu_cost{-1};
  /// Caller-chosen identifier for the task; -1 unless the caller sets it.
  int64_t external_id{-1};
};
0075
0076 class ARROW_EXPORT Executor {
0077 public:
0078 using StopCallback = internal::FnOnce<void(const Status&)>;
0079
0080 virtual ~Executor();
0081
0082
0083 template <typename Function>
0084 Status Spawn(Function&& func) {
0085 return SpawnReal(TaskHints{}, std::forward<Function>(func), StopToken::Unstoppable(),
0086 StopCallback{});
0087 }
0088 template <typename Function>
0089 Status Spawn(Function&& func, StopToken stop_token) {
0090 return SpawnReal(TaskHints{}, std::forward<Function>(func), std::move(stop_token),
0091 StopCallback{});
0092 }
0093 template <typename Function>
0094 Status Spawn(TaskHints hints, Function&& func) {
0095 return SpawnReal(hints, std::forward<Function>(func), StopToken::Unstoppable(),
0096 StopCallback{});
0097 }
0098 template <typename Function>
0099 Status Spawn(TaskHints hints, Function&& func, StopToken stop_token) {
0100 return SpawnReal(hints, std::forward<Function>(func), std::move(stop_token),
0101 StopCallback{});
0102 }
0103 template <typename Function>
0104 Status Spawn(TaskHints hints, Function&& func, StopToken stop_token,
0105 StopCallback stop_callback) {
0106 return SpawnReal(hints, std::forward<Function>(func), std::move(stop_token),
0107 std::move(stop_callback));
0108 }
0109
0110
0111
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121
0122
0123
0124 template <typename T>
0125 Future<T> Transfer(Future<T> future) {
0126 return DoTransfer(std::move(future), false);
0127 }
0128
0129
0130
0131
0132
0133 template <typename T>
0134 Future<T> TransferAlways(Future<T> future) {
0135 return DoTransfer(std::move(future), true);
0136 }
0137
0138
0139
0140
0141 template <typename Function, typename... Args,
0142 typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
0143 Function && (Args && ...)>>
0144 Result<FutureType> Submit(TaskHints hints, StopToken stop_token, Function&& func,
0145 Args&&... args) {
0146 using ValueType = typename FutureType::ValueType;
0147
0148 auto future = FutureType::Make();
0149 auto task = std::bind(::arrow::detail::ContinueFuture{}, future,
0150 std::forward<Function>(func), std::forward<Args>(args)...);
0151 struct {
0152 WeakFuture<ValueType> weak_fut;
0153
0154 void operator()(const Status& st) {
0155 auto fut = weak_fut.get();
0156 if (fut.is_valid()) {
0157 fut.MarkFinished(st);
0158 }
0159 }
0160 } stop_callback{WeakFuture<ValueType>(future)};
0161 ARROW_RETURN_NOT_OK(SpawnReal(hints, std::move(task), std::move(stop_token),
0162 std::move(stop_callback)));
0163
0164 return future;
0165 }
0166
0167 template <typename Function, typename... Args,
0168 typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
0169 Function && (Args && ...)>>
0170 Result<FutureType> Submit(StopToken stop_token, Function&& func, Args&&... args) {
0171 return Submit(TaskHints{}, stop_token, std::forward<Function>(func),
0172 std::forward<Args>(args)...);
0173 }
0174
0175 template <typename Function, typename... Args,
0176 typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
0177 Function && (Args && ...)>>
0178 Result<FutureType> Submit(TaskHints hints, Function&& func, Args&&... args) {
0179 return Submit(std::move(hints), StopToken::Unstoppable(),
0180 std::forward<Function>(func), std::forward<Args>(args)...);
0181 }
0182
0183 template <typename Function, typename... Args,
0184 typename FutureType = typename ::arrow::detail::ContinueFuture::ForSignature<
0185 Function && (Args && ...)>>
0186 Result<FutureType> Submit(Function&& func, Args&&... args) {
0187 return Submit(TaskHints{}, StopToken::Unstoppable(), std::forward<Function>(func),
0188 std::forward<Args>(args)...);
0189 }
0190
0191
0192
0193 virtual int GetCapacity() = 0;
0194
0195
0196
0197 virtual bool OwnsThisThread() { return false; }
0198
0199
0200
0201
0202 virtual bool IsCurrentExecutor() { return OwnsThisThread(); }
0203
0204
0205
0206
0207 class ARROW_EXPORT Resource {
0208 public:
0209 virtual ~Resource() = default;
0210 };
0211
0212
0213
0214
0215
0216
0217
0218
0219
0220
0221
0222
0223 virtual void KeepAlive(std::shared_ptr<Resource> resource);
0224
0225 protected:
0226 ARROW_DISALLOW_COPY_AND_ASSIGN(Executor);
0227
0228 Executor() = default;
0229
0230 template <typename T, typename FT = Future<T>, typename FTSync = typename FT::SyncType>
0231 Future<T> DoTransfer(Future<T> future, bool always_transfer = false) {
0232 auto transferred = Future<T>::Make();
0233 if (always_transfer) {
0234 CallbackOptions callback_options = CallbackOptions::Defaults();
0235 callback_options.should_schedule = ShouldSchedule::Always;
0236 callback_options.executor = this;
0237 auto sync_callback = [transferred](const FTSync& result) mutable {
0238 transferred.MarkFinished(result);
0239 };
0240 future.AddCallback(sync_callback, callback_options);
0241 return transferred;
0242 }
0243
0244
0245
0246 auto callback = [this, transferred](const FTSync& result) mutable {
0247 auto spawn_status =
0248 Spawn([transferred, result]() mutable { transferred.MarkFinished(result); });
0249 if (!spawn_status.ok()) {
0250 transferred.MarkFinished(spawn_status);
0251 }
0252 };
0253 auto callback_factory = [&callback]() { return callback; };
0254 if (future.TryAddCallback(callback_factory)) {
0255 return transferred;
0256 }
0257
0258
0259
0260 return future;
0261 }
0262
0263
0264 virtual Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
0265 StopCallback&&) = 0;
0266 };
0267
0268
0269
0270
0271
0272
0273
0274 class ARROW_EXPORT SerialExecutor : public Executor {
0275 public:
0276 template <typename T = ::arrow::internal::Empty>
0277 using TopLevelTask = internal::FnOnce<Future<T>(Executor*)>;
0278
0279 ~SerialExecutor() override;
0280
0281 int GetCapacity() override { return 1; };
0282 bool OwnsThisThread() override;
0283 Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
0284 StopCallback&&) override;
0285
0286
0287 int GetNumTasks();
0288
0289
0290
0291
0292
0293
0294
0295
0296
0297 template <typename T = internal::Empty, typename FT = Future<T>,
0298 typename FTSync = typename FT::SyncType>
0299 static FTSync RunInSerialExecutor(TopLevelTask<T> initial_task) {
0300 Future<T> fut = SerialExecutor().Run<T>(std::move(initial_task));
0301 return FutureToSync(fut);
0302 }
0303
0304
0305
0306
0307
0308
0309
0310
0311
0312 template <typename T>
0313 static Iterator<T> IterateGenerator(
0314 internal::FnOnce<Result<std::function<Future<T>()>>(Executor*)> initial_task) {
0315 auto serial_executor = std::unique_ptr<SerialExecutor>(new SerialExecutor());
0316 auto maybe_generator = std::move(initial_task)(serial_executor.get());
0317 if (!maybe_generator.ok()) {
0318 return MakeErrorIterator<T>(maybe_generator.status());
0319 }
0320 auto generator = maybe_generator.MoveValueUnsafe();
0321 struct SerialIterator {
0322 SerialIterator(std::unique_ptr<SerialExecutor> executor,
0323 std::function<Future<T>()> generator)
0324 : executor(std::move(executor)), generator(std::move(generator)) {}
0325 ARROW_DISALLOW_COPY_AND_ASSIGN(SerialIterator);
0326 ARROW_DEFAULT_MOVE_AND_ASSIGN(SerialIterator);
0327 ~SerialIterator() {
0328
0329
0330
0331
0332
0333
0334
0335
0336 if (executor && !executor->IsFinished()) {
0337 while (true) {
0338 Result<T> maybe_next = Next();
0339 if (!maybe_next.ok() || IsIterationEnd(*maybe_next)) {
0340 break;
0341 }
0342 }
0343 }
0344 }
0345
0346 Result<T> Next() {
0347 executor->Unpause();
0348
0349 Future<T> next_fut = generator();
0350 next_fut.AddCallback([this](const Result<T>& res) {
0351
0352 if (!res.ok() || IsIterationEnd(*res)) {
0353 executor->Finish();
0354 return;
0355 }
0356
0357
0358 executor->Pause();
0359 });
0360 #ifdef ARROW_ENABLE_THREADING
0361
0362
0363 executor->RunLoop();
0364 #else
0365 next_fut.Wait();
0366 #endif
0367 if (!next_fut.is_finished()) {
0368
0369
0370
0371 return Status::Invalid(
0372 "Serial executor terminated before next result computed");
0373 }
0374
0375
0376 return next_fut.result();
0377 }
0378
0379 std::unique_ptr<SerialExecutor> executor;
0380 std::function<Future<T>()> generator;
0381 };
0382 return Iterator<T>(SerialIterator{std::move(serial_executor), std::move(generator)});
0383 }
0384
0385 #ifndef ARROW_ENABLE_THREADING
0386
0387
0388
0389 static bool RunTasksOnAllExecutors();
0390 static SerialExecutor* GetCurrentExecutor();
0391
0392 bool IsCurrentExecutor() override;
0393
0394 #endif
0395
0396 protected:
0397 virtual void RunLoop();
0398
0399
0400 struct State;
0401 std::shared_ptr<State> state_;
0402
0403 SerialExecutor();
0404
0405
0406
0407
0408
0409 void Finish();
0410 bool IsFinished();
0411
0412
0413 void Pause();
0414 void Unpause();
0415
0416 template <typename T, typename FTSync = typename Future<T>::SyncType>
0417 Future<T> Run(TopLevelTask<T> initial_task) {
0418 auto final_fut = std::move(initial_task)(this);
0419 final_fut.AddCallback([this](const FTSync&) { Finish(); });
0420 RunLoop();
0421 return final_fut;
0422 }
0423
0424 #ifndef ARROW_ENABLE_THREADING
0425
0426
0427 static std::unordered_set<SerialExecutor*> all_executors;
0428
0429
0430
0431 static SerialExecutor* last_called_executor;
0432
0433
0434 static SerialExecutor* current_executor;
0435 #endif
0436 };
0437
#ifdef ARROW_ENABLE_THREADING

/// \brief An Executor backed by worker threads (threading-enabled build).
///
/// The constructor is protected. Instances come from Make(), MakeEternal() or
/// MakeCpuThreadPool().
class ARROW_EXPORT ThreadPool : public Executor {
 public:
  /// Create a pool with `threads` workers.
  /// NOTE(review): defined out of line; invalid-argument handling is not visible here.
  static Result<std::shared_ptr<ThreadPool>> Make(int threads);

  /// Like Make(). NOTE(review): presumably creates a pool that is not shut down on
  /// destruction (cf. shutdown_on_destroy_); TODO confirm.
  static Result<std::shared_ptr<ThreadPool>> MakeEternal(int threads);

  ~ThreadPool() override;

  /// Configured capacity. NOTE(review): may differ from GetActualCapacity().
  int GetCapacity() override;

  /// Number of tasks held by the pool. NOTE(review): queued vs running not visible here.
  int GetNumTasks();

  bool OwnsThisThread() override;

  /// Change the number of worker threads.
  Status SetCapacity(int threads);

  /// Default capacity for new pools (computed out of line).
  static int DefaultCapacity();

  /// Shut the pool down. NOTE(review): `wait` presumably selects whether pending
  /// tasks are waited for.
  Status Shutdown(bool wait = true);

  /// NOTE(review): presumably blocks until the pool has no pending work; confirm.
  void WaitForIdle();

  void KeepAlive(std::shared_ptr<Executor::Resource> resource) override;

  /// Pool state; defined in the implementation file.
  struct State;

 protected:
  FRIEND_TEST(TestThreadPool, SetCapacity);
  FRIEND_TEST(TestGlobalThreadPool, Capacity);
  ARROW_FRIEND_EXPORT friend ThreadPool* GetCpuThreadPool();

  ThreadPool();

  Status SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken,
                   StopCallback&&) override;

  // NOTE(review): the "Unlocked" suffix presumably means the caller already holds
  // the pool lock; TODO confirm.
  void CollectFinishedWorkersUnlocked();

  void LaunchWorkersUnlocked(int threads);

  int GetActualCapacity();

  static std::shared_ptr<ThreadPool> MakeCpuThreadPool();

  std::shared_ptr<State> sp_state_;
  // NOTE(review): presumably a cached raw pointer to *sp_state_; confirm.
  State* state_;
  bool shutdown_on_destroy_;
};
#else

/// \brief ThreadPool stand-in for builds without ARROW_ENABLE_THREADING.
///
/// It derives from SerialExecutor. OwnsThisThread() always returns true, and
/// DefaultCapacity() returns 8.
class ARROW_EXPORT ThreadPool : public SerialExecutor {
 public:
  ARROW_FRIEND_EXPORT friend ThreadPool* GetCpuThreadPool();

  /// Create a pool. NOTE(review): `threads` semantics without threading not visible here.
  static Result<std::shared_ptr<ThreadPool>> Make(int threads);

  /// Like Make(). NOTE(review): presumably a pool that is never shut down; TODO confirm.
  static Result<std::shared_ptr<ThreadPool>> MakeEternal(int threads);

  ~ThreadPool() override;

  int GetCapacity() override;

  virtual int GetActualCapacity();

  bool OwnsThisThread() override { return true; }

  Status SetCapacity(int threads);

  static int DefaultCapacity() { return 8; }

  Status Shutdown(bool wait = true);

  void WaitForIdle();

 protected:
  static std::shared_ptr<ThreadPool> MakeCpuThreadPool();
  ThreadPool();
};

#endif

/// \brief Return the process-wide thread pool used for CPU-bound work.
///
/// It is declared a friend of ThreadPool in both variants above, presumably so it can
/// use the protected construction path. The pool's lifetime is owned by the
/// implementation.
ARROW_EXPORT ThreadPool* GetCpuThreadPool();
0574
0575
0576
0577
0578
0579
0580
0581
0582
0583 template <typename Fut, typename ValueType = typename Fut::ValueType>
0584 typename Fut::SyncType RunSynchronously(FnOnce<Fut(Executor*)> get_future,
0585 bool use_threads) {
0586 if (use_threads) {
0587 auto fut = std::move(get_future)(GetCpuThreadPool());
0588 return FutureToSync(fut);
0589 } else {
0590 return SerialExecutor::RunInSerialExecutor<ValueType>(std::move(get_future));
0591 }
0592 }
0593
0594
0595
0596
0597
0598
0599
0600
0601
0602
0603
0604
0605 template <typename T>
0606 Iterator<T> IterateSynchronously(
0607 FnOnce<Result<std::function<Future<T>()>>(Executor*)> get_gen, bool use_threads) {
0608 if (use_threads) {
0609 auto maybe_gen = std::move(get_gen)(GetCpuThreadPool());
0610 if (!maybe_gen.ok()) {
0611 return MakeErrorIterator<T>(maybe_gen.status());
0612 }
0613 return MakeGeneratorIterator(*maybe_gen);
0614 } else {
0615 return SerialExecutor::IterateGenerator(std::move(get_gen));
0616 }
0617 }
0618
0619 }
0620 }