// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <atomic>
#include <functional>
#include <list>
#include <memory>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/cancel.h"
#include "arrow/util/functional.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
#include "arrow/util/mutex.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing.h"

namespace arrow {

using internal::FnOnce;

namespace util {

/// A utility which keeps track of, and schedules, asynchronous tasks
///
/// An asynchronous task has a synchronous component and an asynchronous component.
/// The synchronous component typically schedules some kind of work on an external
/// resource (e.g. the I/O thread pool or some kind of kernel-based asynchronous
/// resource like io_uring).  The asynchronous part represents the work
/// done on that external resource.  Executing the synchronous part will be referred
/// to as "submitting the task" since this usually includes submitting the asynchronous
/// portion to the external thread pool.
///
/// By default the scheduler will submit the task (execute the synchronous part) as
/// soon as it is added, assuming the underlying thread pool hasn't terminated or the
/// scheduler hasn't aborted.  In this mode, the scheduler is acting as a simple
/// task group.
///
/// A task scheduler starts with an initial task.  That task, and all subsequent tasks,
/// are free to add subtasks.  Once all submitted tasks finish, the scheduler will
/// finish.  Note: it is not an error to add additional tasks after a scheduler has
/// aborted.  These tasks will be ignored and never submitted.  The scheduler returns a
/// future which will complete when all submitted tasks have finished executing.  Once
/// all tasks have finished, the scheduler is invalid and should no longer be used.
///
/// Task failure (either the synchronous portion or the asynchronous portion) will cause
/// the scheduler to enter an aborted state.  The first such failure will be reported in
/// the final task future.
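///
/// A minimal usage sketch (illustrative only, not part of this header): the initial
/// task adds one simple task whose asynchronous work is a hypothetical
/// `ReadBlockAsync()` returning a Future<>.
///
///   Future<> all_done = AsyncTaskScheduler::Make(
///       [](AsyncTaskScheduler* scheduler) {
///         scheduler->AddSimpleTask([] { return ReadBlockAsync(); },
///                                  std::string_view("example::ReadBlock"));
///         return Status::OK();
///       });
///   // `all_done` completes once the initial task and every task it added finish.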
class ARROW_EXPORT AsyncTaskScheduler {
 public:
  /// Destructor for AsyncTaskScheduler
  ///
  /// The lifetime of the task scheduler is managed automatically.  The scheduler
  /// will remain valid while any tasks are running (and can always be safely accessed
  /// within tasks) and will be destroyed as soon as all tasks have finished.
  virtual ~AsyncTaskScheduler() = default;
  /// An interface for a task
  ///
  /// Users may want to override this, for example, to add priority
  /// information for use by a queue.
  class Task {
   public:
    virtual ~Task() = default;
    /// Submit the task
    ///
    /// This will be called by the scheduler at most once when there
    /// is space to run the task.  This is expected to be a fairly quick
    /// function that simply submits the actual task work to an external
    /// resource (e.g. I/O thread pool).
    ///
    /// If this call fails then the scheduler will enter an aborted state.
    virtual Result<Future<>> operator()() = 0;
    /// The cost of the task
    ///
    /// A ThrottledAsyncTaskScheduler can be used to limit the number of concurrent
    /// tasks.  A custom cost may be used, for example, if you would like to limit the
    /// number of tasks based on the total expected RAM usage of the tasks (this is done
    /// in the scanner).
    virtual int cost() const { return 1; }
    /// The name of the task
    ///
    /// This is used for debugging and traceability.  The returned view must remain
    /// valid for the lifetime of the task.
    virtual std::string_view name() const = 0;

    /// A span tied to the lifetime of the task, for internal use only
    tracing::Span span;
  };
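
  // Example (illustrative sketch only): a custom Task whose cost reflects an expected
  // RAM footprint, for use with a ThrottledAsyncTaskScheduler that throttles on bytes
  // rather than task count.  `ReadRangeAsync` (returning a Future<>) and
  // `kExpectedBytes` are hypothetical.
  //
  //   class ReadRangeTask : public AsyncTaskScheduler::Task {
  //    public:
  //     Result<Future<>> operator()() override { return ReadRangeAsync(); }
  //     int cost() const override { return kExpectedBytes; }
  //     std::string_view name() const override { return "example::ReadRangeTask"; }
  //   };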

  /// Add a task to the scheduler
  ///
  /// If the scheduler is in an aborted state this call will return false and the task
  /// will never be run.  This is harmless and does not need to be guarded against.
  ///
  /// The return value for this call can usually be ignored.  There is little harm in
  /// attempting to add tasks to an aborted scheduler.  It is only included for callers
  /// that want to avoid future task generation to save effort.
  ///
  /// \param task the task to submit
  ///
  /// A task's name must remain valid for the duration of the task.  It is used for
  /// debugging (e.g. when debugging a deadlock to see which tasks still remain) and for
  /// traceability (the name will be used for spans assigned to the task)
  ///
  /// \return true if the task was submitted or queued, false if the task was ignored
  virtual bool AddTask(std::unique_ptr<Task> task) = 0;

  /// Adds an async generator to the scheduler
  ///
  /// The async generator will be visited, one item at a time.  Submitting a task
  /// will consist of polling the generator for the next future.  The generator's future
  /// will then represent the task itself.
  ///
  /// This visits the tasks serially without readahead.  If readahead or parallelism
  /// is desired then it should be added in the generator itself.
  ///
  /// The generator itself will be kept alive until all tasks have been completed.
  /// However, if the scheduler is aborted, the generator will be destroyed as soon as
  /// the next item would be requested.
  ///
  /// \param generator the generator to submit to the scheduler
  /// \param visitor a function which visits each generator future as it completes
  /// \param name a name which will be used for each submitted task
  template <typename T>
  bool AddAsyncGenerator(std::function<Future<T>()> generator,
                         std::function<Status(const T&)> visitor, std::string_view name);
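
  // Example (illustrative sketch only): draining a generator of record batches through
  // the scheduler.  `batch_gen` is a hypothetical
  // std::function<Future<std::shared_ptr<RecordBatch>>()> that yields an iteration-end
  // sentinel when exhausted; the visitor runs once per produced batch.
  //
  //   scheduler->AddAsyncGenerator<std::shared_ptr<RecordBatch>>(
  //       batch_gen,
  //       [](const std::shared_ptr<RecordBatch>& batch) {
  //         // ... consume the batch ...
  //         return Status::OK();
  //       },
  //       "example::VisitBatches");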

  template <typename Callable>
  struct SimpleTask : public Task {
    SimpleTask(Callable callable, std::string_view name)
        : callable(std::move(callable)), name_(name) {}
    SimpleTask(Callable callable, std::string name)
        : callable(std::move(callable)), owned_name_(std::move(name)) {
      name_ = *owned_name_;
    }
    Result<Future<>> operator()() override { return callable(); }
    std::string_view name() const override { return name_; }
    Callable callable;
    std::string_view name_;
    std::optional<std::string> owned_name_;
  };

  /// Add a task with cost 1 to the scheduler
  ///
  /// \param callable a "submit" function that should return a future
  /// \param name a name for the task
  ///
  /// `name` must remain valid until the task has been submitted AND the returned
  /// future completes.  It is used for debugging and tracing.
  ///
  /// \see AddTask for more details
  template <typename Callable>
  bool AddSimpleTask(Callable callable, std::string_view name) {
    return AddTask(std::make_unique<SimpleTask<Callable>>(std::move(callable), name));
  }

  /// Add a task with cost 1 to the scheduler
  ///
  /// This is an overload of \see AddSimpleTask that keeps `name` alive
  /// in the task.
  template <typename Callable>
  bool AddSimpleTask(Callable callable, std::string name) {
    return AddTask(
        std::make_unique<SimpleTask<Callable>>(std::move(callable), std::move(name)));
  }

  /// Construct a scheduler
  ///
  /// \param initial_task The initial task which is responsible for adding
  ///        the first subtasks to the scheduler.
  /// \param abort_callback A callback that will be triggered immediately after a task
  ///        fails while other tasks may still be running.  Nothing needs to be done
  ///        here; when a task fails the scheduler will stop accepting new tasks and
  ///        eventually return the error.  However, this callback can be used to more
  ///        quickly end long running tasks that have already been submitted.  Defaults
  ///        to doing nothing.
  /// \param stop_token An optional stop token that will allow cancellation of the
  ///        scheduler.  This will be checked before each task is submitted and, in the
  ///        event of a cancellation, the scheduler will enter an aborted state.  This is
  ///        a graceful cancellation and submitted tasks will still complete.
  /// \return A future that will be completed when the initial task and all subtasks have
  ///         finished.
  static Future<> Make(
      FnOnce<Status(AsyncTaskScheduler*)> initial_task,
      FnOnce<void(const Status&)> abort_callback = [](const Status&) {},
      StopToken stop_token = StopToken::Unstoppable());

  /// A span tracking execution of the scheduler's tasks, for internal use only
  virtual const tracing::Span& span() const = 0;
};
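
// Example (illustrative sketch only): wiring a stop token and abort callback into
// AsyncTaskScheduler::Make.  `source` is a StopSource owned by the caller; because
// cancellation is graceful, tasks that were already submitted still complete before
// the returned future resolves.
//
//   StopSource source;
//   Future<> all_done = AsyncTaskScheduler::Make(
//       [](AsyncTaskScheduler* scheduler) {
//         // ... add tasks ...
//         return Status::OK();
//       },
//       /*abort_callback=*/[](const Status& error) {
//         // Optionally nudge long-running work to wrap up early.
//       },
//       source.token());
//   // Later, from any thread:
//   source.RequestStop();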

class ARROW_EXPORT ThrottledAsyncTaskScheduler : public AsyncTaskScheduler {
 public:
  /// An interface for a task queue
  ///
  /// A queue's methods will not be called concurrently
  class Queue {
   public:
    virtual ~Queue() = default;
    /// Push a task to the queue
    ///
    /// \param task the task to enqueue
    virtual void Push(std::unique_ptr<Task> task) = 0;
    /// Pop the next task from the queue
    virtual std::unique_ptr<Task> Pop() = 0;
    /// Peek the next task in the queue
    virtual const Task& Peek() = 0;
    /// Check if the queue is empty
    virtual bool Empty() = 0;
    /// Purge the queue of all items
    virtual void Purge() = 0;
    /// The number of tasks currently in the queue
    virtual std::size_t Size() const = 0;
  };
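
  // Example (illustrative sketch only): a priority queue usable with
  // ThrottledAsyncTaskScheduler::Make.  It assumes every task pushed through the
  // throttled view is a hypothetical PriorityTask subclass exposing `int priority()`.
  // (Requires <map>.)
  //
  //   class PriorityQueue : public ThrottledAsyncTaskScheduler::Queue {
  //    public:
  //     using Task = AsyncTaskScheduler::Task;
  //     void Push(std::unique_ptr<Task> task) override {
  //       int priority = static_cast<const PriorityTask&>(*task).priority();
  //       tasks_.emplace(priority, std::move(task));
  //     }
  //     std::unique_ptr<Task> Pop() override {
  //       auto highest = std::prev(tasks_.end());
  //       std::unique_ptr<Task> task = std::move(highest->second);
  //       tasks_.erase(highest);
  //       return task;
  //     }
  //     const Task& Peek() override { return *std::prev(tasks_.end())->second; }
  //     bool Empty() override { return tasks_.empty(); }
  //     void Purge() override { tasks_.clear(); }
  //     std::size_t Size() const override { return tasks_.size(); }
  //
  //    private:
  //     // Sorted by ascending priority; Pop() takes the highest-priority task.
  //     std::multimap<int, std::unique_ptr<Task>> tasks_;
  //   };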

  class Throttle {
   public:
    virtual ~Throttle() = default;
    /// Acquire amt permits
    ///
    /// If nullopt is returned then the permits were immediately
    /// acquired and the caller can proceed.  If a future is returned then the caller
    /// should wait for the future to complete first.  When the returned future completes
    /// the permits have NOT been acquired and the caller must call TryAcquire again.
    ///
    /// \param amt the number of permits to acquire
    virtual std::optional<Future<>> TryAcquire(int amt) = 0;
    /// Release amt permits
    ///
    /// This will possibly complete waiting futures and should probably not be
    /// called while holding locks.
    ///
    /// \param amt the number of permits to release
    virtual void Release(int amt) = 0;

    /// The size of the largest task that can run
    ///
    /// Incoming tasks will have their cost clamped to this value to ensure
    /// they can still run (although they will be the only thing allowed to
    /// run at that time).
    virtual int Capacity() = 0;

    /// Pause the throttle
    ///
    /// Any tasks that have been submitted already will continue.  However, no new tasks
    /// will be run until the throttle is resumed.
    virtual void Pause() = 0;
    /// Resume the throttle
    ///
    /// Allows tasks to be submitted again.  If there is a max_concurrent_cost limit then
    /// it will still apply.
    virtual void Resume() = 0;
  };
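
  // Example (illustrative sketch only): a minimal custom Throttle illustrating the
  // TryAcquire/Release contract above.  For brevity it is not thread-safe; a real
  // throttle (such as the one created by Make) must synchronize these calls.
  //
  //   class SimpleThrottle : public ThrottledAsyncTaskScheduler::Throttle {
  //    public:
  //     explicit SimpleThrottle(int capacity)
  //         : available_(capacity), capacity_(capacity) {}
  //     std::optional<Future<>> TryAcquire(int amt) override {
  //       if (!paused_ && amt <= available_) {
  //         available_ -= amt;
  //         return std::nullopt;  // Acquired immediately.
  //       }
  //       // Not acquired: the caller waits on this future and then retries.
  //       backpressure_ = Future<>::Make();
  //       return backpressure_;
  //     }
  //     void Release(int amt) override {
  //       available_ += amt;
  //       NotifyWaiter();
  //     }
  //     int Capacity() override { return capacity_; }
  //     void Pause() override { paused_ = true; }
  //     void Resume() override {
  //       paused_ = false;
  //       NotifyWaiter();
  //     }
  //
  //    private:
  //     void NotifyWaiter() {
  //       if (backpressure_.is_valid() && !backpressure_.is_finished()) {
  //         backpressure_.MarkFinished();
  //       }
  //     }
  //     bool paused_ = false;
  //     int available_;
  //     int capacity_;
  //     Future<> backpressure_;
  //   };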

  /// Pause the throttle
  ///
  /// Any tasks that have been submitted already will continue.  However, no new tasks
  /// will be run until the throttle is resumed.
  virtual void Pause() = 0;
  /// Resume the throttle
  ///
  /// Allows tasks to be submitted again.  If there is a max_concurrent_cost limit then
  /// it will still apply.
  virtual void Resume() = 0;
  /// Return the number of tasks queued but not yet submitted
  virtual std::size_t QueueSize() = 0;

  /// Create a throttled view of a scheduler
  ///
  /// Tasks added via this view will be subjected to the throttle and, if the tasks cannot
  /// run immediately, will be placed into a queue.
  ///
  /// Although a shared_ptr is returned it should generally be assumed that the caller
  /// is being given exclusive ownership.  The shared_ptr is used to share the view with
  /// queued and submitted tasks and the lifetime of those is unpredictable.  It is
  /// important that the caller keeps the returned pointer alive for as long as they
  /// plan to add tasks to the view.
  ///
  /// \param scheduler a scheduler to submit tasks to after throttling
  ///
  /// This can be the root scheduler, another throttled scheduler, or a task group.  These
  /// are all composable.
  ///
  /// \param max_concurrent_cost the maximum amount of cost allowed to run at any one time
  ///
  /// If a task is added that has a cost greater than max_concurrent_cost then its cost
  /// will be reduced to max_concurrent_cost so that it is still possible for the task to
  /// run.
  ///
  /// \param queue the queue to use when tasks cannot be submitted
  ///
  /// By default a FIFO queue will be used.  However, a custom queue can be provided if
  /// some tasks have higher priority than other tasks.
  static std::shared_ptr<ThrottledAsyncTaskScheduler> Make(
      AsyncTaskScheduler* scheduler, int max_concurrent_cost,
      std::unique_ptr<Queue> queue = NULLPTR);
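
  // Example (illustrative sketch only): allow at most four unit-cost tasks to run at
  // once.  `scheduler` is the AsyncTaskScheduler* available inside a task and
  // `SpawnWorkAsync` is a hypothetical call returning Future<>.
  //
  //   std::shared_ptr<ThrottledAsyncTaskScheduler> throttled =
  //       ThrottledAsyncTaskScheduler::Make(scheduler, /*max_concurrent_cost=*/4);
  //   for (int i = 0; i < 100; i++) {
  //     throttled->AddSimpleTask([] { return SpawnWorkAsync(); },
  //                              std::string_view("example::SpawnWork"));
  //   }
  //   // Keep `throttled` alive until no more tasks will be added through it.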

  /// \brief Create a ThrottledAsyncTaskScheduler using a custom throttle
  ///
  /// \see Make
  static std::shared_ptr<ThrottledAsyncTaskScheduler> MakeWithCustomThrottle(
      AsyncTaskScheduler* scheduler, std::unique_ptr<Throttle> throttle,
      std::unique_ptr<Queue> queue = NULLPTR);
};

/// A utility to keep track of a collection of tasks
///
/// Often it is useful to keep track of some state that only needs to stay alive
/// for some small collection of tasks, or to perform some kind of final cleanup
/// when a collection of tasks is finished.
///
/// For example, when scanning, we need to keep the file reader alive while all scan
/// tasks run for a given file, and then we can gracefully close it when we finish the
/// file.
class ARROW_EXPORT AsyncTaskGroup : public AsyncTaskScheduler {
 public:
  /// Destructor for the task group
  ///
  /// The destructor might trigger the finish callback.  If the finish callback fails
  /// then the error will be reported as a task on the scheduler.
  ///
  /// Failure to destroy the async task group will not prevent the scheduler from
  /// finishing.  If the scheduler finishes before the async task group is done then
  /// the finish callback will be run immediately when the async task group finishes.
  ///
  /// If the scheduler has aborted then the finish callback will not run.
  ~AsyncTaskGroup() = default;
  /// Create an async task group
  ///
  /// The finish callback will not run until the task group is destroyed and all
  /// tasks are finished, so you will generally want to reset / destroy the returned
  /// unique_ptr at some point.
  ///
  /// \param scheduler The underlying scheduler to submit tasks to
  /// \param finish_callback A callback that will be run only after the task group has
  ///                        been destroyed and all tasks added by the group have
  ///                        finished.
  ///
  /// Note: in error scenarios the finish callback may not run.  However, it will still,
  /// of course, be destroyed.
  static std::unique_ptr<AsyncTaskGroup> Make(AsyncTaskScheduler* scheduler,
                                              FnOnce<Status()> finish_callback);
};
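
// Example (illustrative sketch only): keeping a hypothetical `reader` alive while its
// scan tasks run, and closing it once they have all finished, as in the scanner use
// case described above.  `OpenFileReader`, `ranges`, and `ReadRangeAsync` are
// hypothetical.
//
//   std::shared_ptr<FileReader> reader = OpenFileReader();
//   std::unique_ptr<AsyncTaskGroup> group = AsyncTaskGroup::Make(
//       scheduler, /*finish_callback=*/[reader] { return reader->Close(); });
//   for (const auto& range : ranges) {
//     group->AddSimpleTask([reader, range] { return reader->ReadRangeAsync(range); },
//                          std::string_view("example::ReadRange"));
//   }
//   group.reset();  // No more tasks; the finish callback runs when they all complete.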

/// Create a task group that is also throttled
///
/// This is a utility factory that creates a throttled view of a scheduler and then
/// wraps that throttled view with a task group that destroys the throttle when finished.
///
/// \see ThrottledAsyncTaskScheduler
/// \see AsyncTaskGroup
/// \param target the underlying scheduler to submit tasks to
/// \param max_concurrent_cost the maximum amount of cost allowed to run at any one time
/// \param queue the queue to use when tasks cannot be submitted
/// \param finish_callback A callback that will be run only after the task group has
///                        been destroyed and all tasks added by the group have finished
ARROW_EXPORT std::unique_ptr<ThrottledAsyncTaskScheduler> MakeThrottledAsyncTaskGroup(
    AsyncTaskScheduler* target, int max_concurrent_cost,
    std::unique_ptr<ThrottledAsyncTaskScheduler::Queue> queue,
    FnOnce<Status()> finish_callback);

// Defined down here to avoid circular dependency between AsyncTaskScheduler and
// AsyncTaskGroup
template <typename T>
bool AsyncTaskScheduler::AddAsyncGenerator(std::function<Future<T>()> generator,
                                           std::function<Status(const T&)> visitor,
                                           std::string_view name) {
  struct State {
    State(std::function<Future<T>()> generator, std::function<Status(const T&)> visitor,
          std::unique_ptr<AsyncTaskGroup> task_group, std::string_view name)
        : generator(std::move(generator)),
          visitor(std::move(visitor)),
          task_group(std::move(task_group)),
          name(name) {}
    std::function<Future<T>()> generator;
    std::function<Status(const T&)> visitor;
    std::unique_ptr<AsyncTaskGroup> task_group;
    std::string_view name;
  };
  struct SubmitTask : public Task {
    explicit SubmitTask(std::unique_ptr<State> state_holder)
        : state_holder(std::move(state_holder)) {}

    struct SubmitTaskCallback {
      SubmitTaskCallback(std::unique_ptr<State> state_holder, Future<> task_completion)
          : state_holder(std::move(state_holder)),
            task_completion(std::move(task_completion)) {}
      void operator()(const Result<T>& maybe_item) {
        if (!maybe_item.ok()) {
          task_completion.MarkFinished(maybe_item.status());
          return;
        }
        const auto& item = *maybe_item;
        if (IsIterationEnd(item)) {
          task_completion.MarkFinished();
          return;
        }
        Status visit_st = state_holder->visitor(item);
        if (!visit_st.ok()) {
          task_completion.MarkFinished(std::move(visit_st));
          return;
        }
        state_holder->task_group->AddTask(
            std::make_unique<SubmitTask>(std::move(state_holder)));
        task_completion.MarkFinished();
      }
      std::unique_ptr<State> state_holder;
      Future<> task_completion;
    };

    Result<Future<>> operator()() override {
      Future<> task = Future<>::Make();
      // Consume as many items as we can (those that are already finished)
      // synchronously to avoid recursion / stack overflow.
      while (true) {
        Future<T> next = state_holder->generator();
        if (next.TryAddCallback(
                [&] { return SubmitTaskCallback(std::move(state_holder), task); })) {
          return task;
        }
        ARROW_ASSIGN_OR_RAISE(T item, next.result());
        if (IsIterationEnd(item)) {
          task.MarkFinished();
          return task;
        }
        ARROW_RETURN_NOT_OK(state_holder->visitor(item));
      }
    }

    std::string_view name() const override { return state_holder->name; }

    std::unique_ptr<State> state_holder;
  };
  std::unique_ptr<AsyncTaskGroup> task_group =
      AsyncTaskGroup::Make(this, [] { return Status::OK(); });
  AsyncTaskGroup* task_group_view = task_group.get();
  std::unique_ptr<State> state_holder = std::make_unique<State>(
      std::move(generator), std::move(visitor), std::move(task_group), name);
  task_group_view->AddTask(std::make_unique<SubmitTask>(std::move(state_holder)));
  return true;
}

}  // namespace util
}  // namespace arrow