![]() |
|
|||
File indexing completed on 2025-08-28 08:26:53
0001 // Licensed to the Apache Software Foundation (ASF) under one 0002 // or more contributor license agreements. See the NOTICE file 0003 // distributed with this work for additional information 0004 // regarding copyright ownership. The ASF licenses this file 0005 // to you under the Apache License, Version 2.0 (the 0006 // "License"); you may not use this file except in compliance 0007 // with the License. You may obtain a copy of the License at 0008 // 0009 // http://www.apache.org/licenses/LICENSE-2.0 0010 // 0011 // Unless required by applicable law or agreed to in writing, 0012 // software distributed under the License is distributed on an 0013 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 0014 // KIND, either express or implied. See the License for the 0015 // specific language governing permissions and limitations 0016 // under the License. 0017 0018 #pragma once 0019 0020 #include <atomic> 0021 #include <cstdint> 0022 #include <functional> 0023 #include <vector> 0024 0025 #include "arrow/acero/visibility.h" 0026 #include "arrow/status.h" 0027 #include "arrow/util/config.h" 0028 #include "arrow/util/logging.h" 0029 0030 namespace arrow { 0031 namespace acero { 0032 0033 // Atomic value surrounded by padding bytes to avoid cache line invalidation 0034 // whenever it is modified by a concurrent thread on a different CPU core. 0035 // 0036 template <typename T> 0037 class AtomicWithPadding { 0038 private: 0039 static constexpr int kCacheLineSize = 64; 0040 uint8_t padding_before[kCacheLineSize]; 0041 0042 public: 0043 std::atomic<T> value; 0044 0045 private: 0046 uint8_t padding_after[kCacheLineSize]; 0047 }; 0048 0049 // Used for asynchronous execution of operations that can be broken into 0050 // a fixed number of symmetric tasks that can be executed concurrently. 0051 // 0052 // Implements priorities between multiple such operations, called task groups. 0053 // 0054 // Allows to specify the maximum number of in-flight tasks at any moment. 0055 // 0056 // Also allows for executing next pending tasks immediately using a caller thread. 0057 // 0058 class ARROW_ACERO_EXPORT TaskScheduler { 0059 public: 0060 using TaskImpl = std::function<Status(size_t, int64_t)>; 0061 using TaskGroupContinuationImpl = std::function<Status(size_t)>; 0062 using ScheduleImpl = std::function<Status(TaskGroupContinuationImpl)>; 0063 using AbortContinuationImpl = std::function<void()>; 0064 0065 virtual ~TaskScheduler() = default; 0066 0067 // Order in which task groups are registered represents priorities of their tasks 0068 // (the first group has the highest priority). 0069 // 0070 // Returns task group identifier that is used to request operations on the task group. 0071 virtual int RegisterTaskGroup(TaskImpl task_impl, 0072 TaskGroupContinuationImpl cont_impl) = 0; 0073 0074 virtual void RegisterEnd() = 0; 0075 0076 // total_num_tasks may be zero, in which case task group continuation will be executed 0077 // immediately 0078 virtual Status StartTaskGroup(size_t thread_id, int group_id, 0079 int64_t total_num_tasks) = 0; 0080 0081 // Execute given number of tasks immediately using caller thread 0082 virtual Status ExecuteMore(size_t thread_id, int num_tasks_to_execute, 0083 bool execute_all) = 0; 0084 0085 // Begin scheduling tasks using provided callback and 0086 // the limit on the number of in-flight tasks at any moment. 0087 // 0088 // Scheduling will continue as long as there are waiting tasks. 0089 // 0090 // It will automatically resume whenever new task group gets started. 0091 virtual Status StartScheduling(size_t thread_id, ScheduleImpl schedule_impl, 0092 int num_concurrent_tasks, bool use_sync_execution) = 0; 0093 0094 // Abort scheduling and execution. 0095 // Used in case of being notified about unrecoverable error for the entire query. 0096 virtual void Abort(AbortContinuationImpl impl) = 0; 0097 0098 static std::unique_ptr<TaskScheduler> Make(); 0099 }; 0100 0101 } // namespace acero 0102 } // namespace arrow
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
![]() ![]() |