Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-08-28 08:26:53

0001 // Licensed to the Apache Software Foundation (ASF) under one
0002 // or more contributor license agreements.  See the NOTICE file
0003 // distributed with this work for additional information
0004 // regarding copyright ownership.  The ASF licenses this file
0005 // to you under the Apache License, Version 2.0 (the
0006 // "License"); you may not use this file except in compliance
0007 // with the License.  You may obtain a copy of the License at
0008 //
0009 //   http://www.apache.org/licenses/LICENSE-2.0
0010 //
0011 // Unless required by applicable law or agreed to in writing,
0012 // software distributed under the License is distributed on an
0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
0014 // KIND, either express or implied.  See the License for the
0015 // specific language governing permissions and limitations
0016 // under the License.
0017 
0018 #pragma once
0019 
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <vector>
0024 
0025 #include "arrow/acero/visibility.h"
0026 #include "arrow/status.h"
0027 #include "arrow/util/config.h"
0028 #include "arrow/util/logging.h"
0029 
0030 namespace arrow {
0031 namespace acero {
0032 
// Atomic value surrounded by padding bytes to avoid cache line invalidation
// whenever it is modified by a concurrent thread on a different CPU core.
//
// kCacheLineSize (64) bytes of padding are placed both before and after the
// atomic member. The padding bytes are never read or written.
//
// `value` is value-initialized, so a default-constructed instance starts out
// holding T{} (zero for arithmetic types) instead of the indeterminate value a
// default-initialized std::atomic<T> has before C++20.
//
template <typename T>
class AtomicWithPadding {
 private:
  static constexpr int kCacheLineSize = 64;
  // Left uninitialized: only its size matters.
  uint8_t padding_before[kCacheLineSize];

 public:
  // The payload; callers access it directly (load/store/fetch_add/...).
  std::atomic<T> value{};

 private:
  // Left uninitialized: only its size matters.
  uint8_t padding_after[kCacheLineSize];
};
0048 
0049 // Used for asynchronous execution of operations that can be broken into
0050 // a fixed number of symmetric tasks that can be executed concurrently.
0051 //
0052 // Implements priorities between multiple such operations, called task groups.
0053 //
0054 // Allows to specify the maximum number of in-flight tasks at any moment.
0055 //
0056 // Also allows for executing next pending tasks immediately using a caller thread.
0057 //
0058 class ARROW_ACERO_EXPORT TaskScheduler {
0059  public:
0060   using TaskImpl = std::function<Status(size_t, int64_t)>;
0061   using TaskGroupContinuationImpl = std::function<Status(size_t)>;
0062   using ScheduleImpl = std::function<Status(TaskGroupContinuationImpl)>;
0063   using AbortContinuationImpl = std::function<void()>;
0064 
0065   virtual ~TaskScheduler() = default;
0066 
0067   // Order in which task groups are registered represents priorities of their tasks
0068   // (the first group has the highest priority).
0069   //
0070   // Returns task group identifier that is used to request operations on the task group.
0071   virtual int RegisterTaskGroup(TaskImpl task_impl,
0072                                 TaskGroupContinuationImpl cont_impl) = 0;
0073 
0074   virtual void RegisterEnd() = 0;
0075 
0076   // total_num_tasks may be zero, in which case task group continuation will be executed
0077   // immediately
0078   virtual Status StartTaskGroup(size_t thread_id, int group_id,
0079                                 int64_t total_num_tasks) = 0;
0080 
0081   // Execute given number of tasks immediately using caller thread
0082   virtual Status ExecuteMore(size_t thread_id, int num_tasks_to_execute,
0083                              bool execute_all) = 0;
0084 
0085   // Begin scheduling tasks using provided callback and
0086   // the limit on the number of in-flight tasks at any moment.
0087   //
0088   // Scheduling will continue as long as there are waiting tasks.
0089   //
0090   // It will automatically resume whenever new task group gets started.
0091   virtual Status StartScheduling(size_t thread_id, ScheduleImpl schedule_impl,
0092                                  int num_concurrent_tasks, bool use_sync_execution) = 0;
0093 
0094   // Abort scheduling and execution.
0095   // Used in case of being notified about unrecoverable error for the entire query.
0096   virtual void Abort(AbortContinuationImpl impl) = 0;
0097 
0098   static std::unique_ptr<TaskScheduler> Make();
0099 };
0100 
0101 }  // namespace acero
0102 }  // namespace arrow