![]() |
|
|||
File indexing completed on 2025-08-28 08:27:02
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 0018 #pragma once 0019 0020 #include <atomic> 0021 #include <functional> 0022 #include <list> 0023 #include <memory> 0024 0025 #include "arrow/result.h" 0026 #include "arrow/status.h" 0027 #include "arrow/util/cancel.h" 0028 #include "arrow/util/functional.h" 0029 #include "arrow/util/future.h" 0030 #include "arrow/util/iterator.h" 0031 #include "arrow/util/mutex.h" 0032 #include "arrow/util/thread_pool.h" 0033 #include "arrow/util/tracing.h" 0034 0035 namespace arrow { 0036 0037 using internal::FnOnce; 0038 0039 namespace util { 0040 0041 /// A utility which keeps tracks of, and schedules, asynchronous tasks 0042 /// 0043 /// An asynchronous task has a synchronous component and an asynchronous component. 0044 /// The synchronous component typically schedules some kind of work on an external 0045 /// resource (e.g. the I/O thread pool or some kind of kernel-based asynchronous 0046 /// resource like io_uring). The asynchronous part represents the work 0047 /// done on that external resource. Executing the synchronous part will be referred 0048 /// to as "submitting the task" since this usually includes submitting the asynchronous 0049 /// portion to the external thread pool. 0050 /// 0051 /// By default the scheduler will submit the task (execute the synchronous part) as 0052 /// soon as it is added, assuming the underlying thread pool hasn't terminated or the 0053 /// scheduler hasn't aborted. In this mode, the scheduler is simply acting as 0054 /// a simple task group. 0055 /// 0056 /// A task scheduler starts with an initial task. That task, and all subsequent tasks 0057 /// are free to add subtasks. Once all submitted tasks finish the scheduler will 0058 /// finish. Note, it is not an error to add additional tasks after a scheduler has 0059 /// aborted. These tasks will be ignored and never submitted. The scheduler returns a 0060 /// future which will complete when all submitted tasks have finished executing. Once all 0061 /// tasks have been finished the scheduler is invalid and should no longer be used. 0062 /// 0063 /// Task failure (either the synchronous portion or the asynchronous portion) will cause 0064 /// the scheduler to enter an aborted state. The first such failure will be reported in 0065 /// the final task future. 0066 class ARROW_EXPORT AsyncTaskScheduler { 0067 public: 0068 /// Destructor for AsyncTaskScheduler 0069 /// 0070 /// The lifetime of the task scheduled is managed automatically. The scheduler 0071 /// will remain valid while any tasks are running (and can always be safely accessed) 0072 /// within tasks) and will be destroyed as soon as all tasks have finished. 0073 virtual ~AsyncTaskScheduler() = default; 0074 /// An interface for a task 0075 /// 0076 /// Users may want to override this, for example, to add priority 0077 /// information for use by a queue. 0078 class Task { 0079 public: 0080 virtual ~Task() = default; 0081 /// Submit the task 0082 /// 0083 /// This will be called by the scheduler at most once when there 0084 /// is space to run the task. This is expected to be a fairly quick 0085 /// function that simply submits the actual task work to an external 0086 /// resource (e.g. I/O thread pool). 0087 /// 0088 /// If this call fails then the scheduler will enter an aborted state. 0089 virtual Result<Future<>> operator()() = 0; 0090 /// The cost of the task 0091 /// 0092 /// A ThrottledAsyncTaskScheduler can be used to limit the number of concurrent tasks. 0093 /// A custom cost may be used, for example, if you would like to limit the number of 0094 /// tasks based on the total expected RAM usage of the tasks (this is done in the 0095 /// scanner) 0096 virtual int cost() const { return 1; } 0097 /// The name of the task 0098 /// 0099 /// This is used for debugging and traceability. The returned view must remain 0100 /// valid for the lifetime of the task. 0101 virtual std::string_view name() const = 0; 0102 0103 /// a span tied to the lifetime of the task, for internal use only 0104 tracing::Span span; 0105 }; 0106 0107 /// Add a task to the scheduler 0108 /// 0109 /// If the scheduler is in an aborted state this call will return false and the task 0110 /// will never be run. This is harmless and does not need to be guarded against. 0111 /// 0112 /// The return value for this call can usually be ignored. There is little harm in 0113 /// attempting to add tasks to an aborted scheduler. It is only included for callers 0114 /// that want to avoid future task generation to save effort. 0115 /// 0116 /// \param task the task to submit 0117 /// 0118 /// A task's name must remain valid for the duration of the task. It is used for 0119 /// debugging (e.g. when debugging a deadlock to see which tasks still remain) and for 0120 /// traceability (the name will be used for spans assigned to the task) 0121 /// 0122 /// \return true if the task was submitted or queued, false if the task was ignored 0123 virtual bool AddTask(std::unique_ptr<Task> task) = 0; 0124 0125 /// Adds an async generator to the scheduler 0126 /// 0127 /// The async generator will be visited, one item at a time. Submitting a task 0128 /// will consist of polling the generator for the next future. The generator's future 0129 /// will then represent the task itself. 0130 /// 0131 /// This visits the task serially without readahead. If readahead or parallelism 0132 /// is desired then it should be added in the generator itself. 0133 /// 0134 /// The generator itself will be kept alive until all tasks have been completed. 0135 /// However, if the scheduler is aborted, the generator will be destroyed as soon as the 0136 /// next item would be requested. 0137 /// 0138 /// \param generator the generator to submit to the scheduler 0139 /// \param visitor a function which visits each generator future as it completes 0140 /// \param name a name which will be used for each submitted task 0141 template <typename T> 0142 bool AddAsyncGenerator(std::function<Future<T>()> generator, 0143 std::function<Status(const T&)> visitor, std::string_view name); 0144 0145 template <typename Callable> 0146 struct SimpleTask : public Task { 0147 SimpleTask(Callable callable, std::string_view name) 0148 : callable(std::move(callable)), name_(name) {} 0149 SimpleTask(Callable callable, std::string name) 0150 : callable(std::move(callable)), owned_name_(std::move(name)) { 0151 name_ = *owned_name_; 0152 } 0153 Result<Future<>> operator()() override { return callable(); } 0154 std::string_view name() const override { return name_; } 0155 Callable callable; 0156 std::string_view name_; 0157 std::optional<std::string> owned_name_; 0158 }; 0159 0160 /// Add a task with cost 1 to the scheduler 0161 /// 0162 /// \param callable a "submit" function that should return a future 0163 /// \param name a name for the task 0164 /// 0165 /// `name` must remain valid until the task has been submitted AND the returned 0166 /// future completes. It is used for debugging and tracing. 0167 /// 0168 /// \see AddTask for more details 0169 template <typename Callable> 0170 bool AddSimpleTask(Callable callable, std::string_view name) { 0171 return AddTask(std::make_unique<SimpleTask<Callable>>(std::move(callable), name)); 0172 } 0173 0174 /// Add a task with cost 1 to the scheduler 0175 /// 0176 /// This is an overload of \see AddSimpleTask that keeps `name` alive 0177 /// in the task. 0178 template <typename Callable> 0179 bool AddSimpleTask(Callable callable, std::string name) { 0180 return AddTask( 0181 std::make_unique<SimpleTask<Callable>>(std::move(callable), std::move(name))); 0182 } 0183 0184 /// Construct a scheduler 0185 /// 0186 /// \param initial_task The initial task which is responsible for adding 0187 /// the first subtasks to the scheduler. 0188 /// \param abort_callback A callback that will be triggered immediately after a task 0189 /// fails while other tasks may still be running. Nothing needs to be done here, 0190 /// when a task fails the scheduler will stop accepting new tasks and eventually 0191 /// return the error. However, this callback can be used to more quickly end 0192 /// long running tasks that have already been submitted. Defaults to doing 0193 /// nothing. 0194 /// \param stop_token An optional stop token that will allow cancellation of the 0195 /// scheduler. This will be checked before each task is submitted and, in the 0196 /// event of a cancellation, the scheduler will enter an aborted state. This is 0197 /// a graceful cancellation and submitted tasks will still complete. 0198 /// \return A future that will be completed when the initial task and all subtasks have 0199 /// finished. 0200 static Future<> Make( 0201 FnOnce<Status(AsyncTaskScheduler*)> initial_task, 0202 FnOnce<void(const Status&)> abort_callback = [](const Status&) {}, 0203 StopToken stop_token = StopToken::Unstoppable()); 0204 0205 /// A span tracking execution of the scheduler's tasks, for internal use only 0206 virtual const tracing::Span& span() const = 0; 0207 }; 0208 0209 class ARROW_EXPORT ThrottledAsyncTaskScheduler : public AsyncTaskScheduler { 0210 public: 0211 /// An interface for a task queue 0212 /// 0213 /// A queue's methods will not be called concurrently 0214 class Queue { 0215 public: 0216 virtual ~Queue() = default; 0217 /// Push a task to the queue 0218 /// 0219 /// \param task the task to enqueue 0220 virtual void Push(std::unique_ptr<Task> task) = 0; 0221 /// Pop the next task from the queue 0222 virtual std::unique_ptr<Task> Pop() = 0; 0223 /// Peek the next task in the queue 0224 virtual const Task& Peek() = 0; 0225 /// Check if the queue is empty 0226 virtual bool Empty() = 0; 0227 /// Purge the queue of all items 0228 virtual void Purge() = 0; 0229 virtual std::size_t Size() const = 0; 0230 }; 0231 0232 class Throttle { 0233 public: 0234 virtual ~Throttle() = default; 0235 /// Acquire amt permits 0236 /// 0237 /// If nullopt is returned then the permits were immediately 0238 /// acquired and the caller can proceed. If a future is returned then the caller 0239 /// should wait for the future to complete first. When the returned future completes 0240 /// the permits have NOT been acquired and the caller must call Acquire again 0241 /// 0242 /// \param amt the number of permits to acquire 0243 virtual std::optional<Future<>> TryAcquire(int amt) = 0; 0244 /// Release amt permits 0245 /// 0246 /// This will possibly complete waiting futures and should probably not be 0247 /// called while holding locks. 0248 /// 0249 /// \param amt the number of permits to release 0250 virtual void Release(int amt) = 0; 0251 0252 /// The size of the largest task that can run 0253 /// 0254 /// Incoming tasks will have their cost latched to this value to ensure 0255 /// they can still run (although they will be the only thing allowed to 0256 /// run at that time). 0257 virtual int Capacity() = 0; 0258 0259 /// Pause the throttle 0260 /// 0261 /// Any tasks that have been submitted already will continue. However, no new tasks 0262 /// will be run until the throttle is resumed. 0263 virtual void Pause() = 0; 0264 /// Resume the throttle 0265 /// 0266 /// Allows task to be submitted again. If there is a max_concurrent_cost limit then 0267 /// it will still apply. 0268 virtual void Resume() = 0; 0269 }; 0270 0271 /// Pause the throttle 0272 /// 0273 /// Any tasks that have been submitted already will continue. However, no new tasks 0274 /// will be run until the throttle is resumed. 0275 virtual void Pause() = 0; 0276 /// Resume the throttle 0277 /// 0278 /// Allows task to be submitted again. If there is a max_concurrent_cost limit then 0279 /// it will still apply. 0280 virtual void Resume() = 0; 0281 /// Return the number of tasks queued but not yet submitted 0282 virtual std::size_t QueueSize() = 0; 0283 0284 /// Create a throttled view of a scheduler 0285 /// 0286 /// Tasks added via this view will be subjected to the throttle and, if the tasks cannot 0287 /// run immediately, will be placed into a queue. 0288 /// 0289 /// Although a shared_ptr is returned it should generally be assumed that the caller 0290 /// is being given exclusive ownership. The shared_ptr is used to share the view with 0291 /// queued and submitted tasks and the lifetime of those is unpredictable. It is 0292 /// important the caller keep the returned pointer alive for as long as they plan to add 0293 /// tasks to the view. 0294 /// 0295 /// \param scheduler a scheduler to submit tasks to after throttling 0296 /// 0297 /// This can be the root scheduler, another throttled scheduler, or a task group. These 0298 /// are all composable. 0299 /// 0300 /// \param max_concurrent_cost the maximum amount of cost allowed to run at any one time 0301 /// 0302 /// If a task is added that has a cost greater than max_concurrent_cost then its cost 0303 /// will be reduced to max_concurrent_cost so that it is still possible for the task to 0304 /// run. 0305 /// 0306 /// \param queue the queue to use when tasks cannot be submitted 0307 /// 0308 /// By default a FIFO queue will be used. However, a custom queue can be provided if 0309 /// some tasks have higher priority than other tasks. 0310 static std::shared_ptr<ThrottledAsyncTaskScheduler> Make( 0311 AsyncTaskScheduler* scheduler, int max_concurrent_cost, 0312 std::unique_ptr<Queue> queue = NULLPTR); 0313 0314 /// @brief Create a ThrottledAsyncTaskScheduler using a custom throttle 0315 /// 0316 /// \see Make 0317 static std::shared_ptr<ThrottledAsyncTaskScheduler> MakeWithCustomThrottle( 0318 AsyncTaskScheduler* scheduler, std::unique_ptr<Throttle> throttle, 0319 std::unique_ptr<Queue> queue = NULLPTR); 0320 }; 0321 0322 /// A utility to keep track of a collection of tasks 0323 /// 0324 /// Often it is useful to keep track of some state that only needs to stay alive 0325 /// for some small collection of tasks, or to perform some kind of final cleanup 0326 /// when a collection of tasks is finished. 0327 /// 0328 /// For example, when scanning, we need to keep the file reader alive while all scan 0329 /// tasks run for a given file, and then we can gracefully close it when we finish the 0330 /// file. 0331 class ARROW_EXPORT AsyncTaskGroup : public AsyncTaskScheduler { 0332 public: 0333 /// Destructor for the task group 0334 /// 0335 /// The destructor might trigger the finish callback. If the finish callback fails 0336 /// then the error will be reported as a task on the scheduler. 0337 /// 0338 /// Failure to destroy the async task group will not prevent the scheduler from 0339 /// finishing. If the scheduler finishes before the async task group is done then 0340 /// the finish callback will be run immediately when the async task group finishes. 0341 /// 0342 /// If the scheduler has aborted then the finish callback will not run. 0343 ~AsyncTaskGroup() = default; 0344 /// Create an async task group 0345 /// 0346 /// The finish callback will not run until the task group is destroyed and all 0347 /// tasks are finished so you will generally want to reset / destroy the returned 0348 /// unique_ptr at some point. 0349 /// 0350 /// \param scheduler The underlying scheduler to submit tasks to 0351 /// \param finish_callback A callback that will be run only after the task group has 0352 /// been destroyed and all tasks added by the group have 0353 /// finished. 0354 /// 0355 /// Note: in error scenarios the finish callback may not run. However, it will still, 0356 /// of course, be destroyed. 0357 static std::unique_ptr<AsyncTaskGroup> Make(AsyncTaskScheduler* scheduler, 0358 FnOnce<Status()> finish_callback); 0359 }; 0360 0361 /// Create a task group that is also throttled 0362 /// 0363 /// This is a utility factory that creates a throttled view of a scheduler and then 0364 /// wraps that throttled view with a task group that destroys the throttle when finished. 0365 /// 0366 /// \see ThrottledAsyncTaskScheduler 0367 /// \see AsyncTaskGroup 0368 /// \param target the underlying scheduler to submit tasks to 0369 /// \param max_concurrent_cost the maximum amount of cost allowed to run at any one time 0370 /// \param queue the queue to use when tasks cannot be submitted 0371 /// \param finish_callback A callback that will be run only after the task group has 0372 /// been destroyed and all tasks added by the group have finished 0373 ARROW_EXPORT std::unique_ptr<ThrottledAsyncTaskScheduler> MakeThrottledAsyncTaskGroup( 0374 AsyncTaskScheduler* target, int max_concurrent_cost, 0375 std::unique_ptr<ThrottledAsyncTaskScheduler::Queue> queue, 0376 FnOnce<Status()> finish_callback); 0377 0378 // Defined down here to avoid circular dependency between AsyncTaskScheduler and 0379 // AsyncTaskGroup 0380 template <typename T> 0381 bool AsyncTaskScheduler::AddAsyncGenerator(std::function<Future<T>()> generator, 0382 std::function<Status(const T&)> visitor, 0383 std::string_view name) { 0384 struct State { 0385 State(std::function<Future<T>()> generator, std::function<Status(const T&)> visitor, 0386 std::unique_ptr<AsyncTaskGroup> task_group, std::string_view name) 0387 : generator(std::move(generator)), 0388 visitor(std::move(visitor)), 0389 task_group(std::move(task_group)), 0390 name(name) {} 0391 std::function<Future<T>()> generator; 0392 std::function<Status(const T&)> visitor; 0393 std::unique_ptr<AsyncTaskGroup> task_group; 0394 std::string_view name; 0395 }; 0396 struct SubmitTask : public Task { 0397 explicit SubmitTask(std::unique_ptr<State> state_holder) 0398 : state_holder(std::move(state_holder)) {} 0399 0400 struct SubmitTaskCallback { 0401 SubmitTaskCallback(std::unique_ptr<State> state_holder, Future<> task_completion) 0402 : state_holder(std::move(state_holder)), 0403 task_completion(std::move(task_completion)) {} 0404 void operator()(const Result<T>& maybe_item) { 0405 if (!maybe_item.ok()) { 0406 task_completion.MarkFinished(maybe_item.status()); 0407 return; 0408 } 0409 const auto& item = *maybe_item; 0410 if (IsIterationEnd(item)) { 0411 task_completion.MarkFinished(); 0412 return; 0413 } 0414 Status visit_st = state_holder->visitor(item); 0415 if (!visit_st.ok()) { 0416 task_completion.MarkFinished(std::move(visit_st)); 0417 return; 0418 } 0419 state_holder->task_group->AddTask( 0420 std::make_unique<SubmitTask>(std::move(state_holder))); 0421 task_completion.MarkFinished(); 0422 } 0423 std::unique_ptr<State> state_holder; 0424 Future<> task_completion; 0425 }; 0426 0427 Result<Future<>> operator()() { 0428 Future<> task = Future<>::Make(); 0429 // Consume as many items as we can (those that are already finished) 0430 // synchronously to avoid recursion / stack overflow. 0431 while (true) { 0432 Future<T> next = state_holder->generator(); 0433 if (next.TryAddCallback( 0434 [&] { return SubmitTaskCallback(std::move(state_holder), task); })) { 0435 return task; 0436 } 0437 ARROW_ASSIGN_OR_RAISE(T item, next.result()); 0438 if (IsIterationEnd(item)) { 0439 task.MarkFinished(); 0440 return task; 0441 } 0442 ARROW_RETURN_NOT_OK(state_holder->visitor(item)); 0443 } 0444 } 0445 0446 std::string_view name() const { return state_holder->name; } 0447 0448 std::unique_ptr<State> state_holder; 0449 }; 0450 std::unique_ptr<AsyncTaskGroup> task_group = 0451 AsyncTaskGroup::Make(this, [] { return Status::OK(); }); 0452 AsyncTaskGroup* task_group_view = task_group.get(); 0453 std::unique_ptr<State> state_holder = std::make_unique<State>( 0454 std::move(generator), std::move(visitor), std::move(task_group), name); 0455 task_group_view->AddTask(std::make_unique<SubmitTask>(std::move(state_holder))); 0456 return true; 0457 } 0458 0459 } // namespace util 0460 } // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |