|
|
|||
File indexing completed on 2026-05-10 08:44:34
0001 //===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===// 0002 // 0003 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 0004 // See https://llvm.org/LICENSE.txt for license information. 0005 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 0006 // 0007 //===----------------------------------------------------------------------===// 0008 // 0009 // This file defines a crude C++11 based thread pool. 0010 // 0011 //===----------------------------------------------------------------------===// 0012 0013 #ifndef LLVM_SUPPORT_THREADPOOL_H 0014 #define LLVM_SUPPORT_THREADPOOL_H 0015 0016 #include "llvm/ADT/DenseMap.h" 0017 #include "llvm/Config/llvm-config.h" 0018 #include "llvm/Support/RWMutex.h" 0019 #include "llvm/Support/Threading.h" 0020 #include "llvm/Support/thread.h" 0021 0022 #include <future> 0023 0024 #include <condition_variable> 0025 #include <deque> 0026 #include <functional> 0027 #include <memory> 0028 #include <mutex> 0029 #include <utility> 0030 0031 namespace llvm { 0032 0033 class ThreadPoolTaskGroup; 0034 0035 /// This defines the abstract base interface for a ThreadPool allowing 0036 /// asynchronous parallel execution on a defined number of threads. 0037 /// 0038 /// It is possible to reuse one thread pool for different groups of tasks 0039 /// by grouping tasks using ThreadPoolTaskGroup. All tasks are processed using 0040 /// the same queue, but it is possible to wait only for a specific group of 0041 /// tasks to finish. 0042 /// 0043 /// It is also possible for worker threads to submit new tasks and wait for 0044 /// them. Note that this may result in a deadlock in cases such as when a task 0045 /// (directly or indirectly) tries to wait for its own completion, or when all 0046 /// available threads are used up by tasks waiting for a task that has no thread 0047 /// left to run on (this includes waiting on the returned future). It should be 0048 /// generally safe to wait() for a group as long as groups do not form a cycle. 0049 class ThreadPoolInterface { 0050 /// The actual method to enqueue a task to be defined by the concrete 0051 /// implementation. 0052 virtual void asyncEnqueue(std::function<void()> Task, 0053 ThreadPoolTaskGroup *Group) = 0; 0054 0055 public: 0056 /// Destroying the pool will drain the pending tasks and wait. The current 0057 /// thread may participate in the execution of the pending tasks. 0058 virtual ~ThreadPoolInterface(); 0059 0060 /// Blocking wait for all the threads to complete and the queue to be empty. 0061 /// It is an error to try to add new tasks while blocking on this call. 0062 /// Calling wait() from a task would deadlock waiting for itself. 0063 virtual void wait() = 0; 0064 0065 /// Blocking wait for only all the threads in the given group to complete. 0066 /// It is possible to wait even inside a task, but waiting (directly or 0067 /// indirectly) on itself will deadlock. If called from a task running on a 0068 /// worker thread, the call may process pending tasks while waiting in order 0069 /// not to waste the thread. 0070 virtual void wait(ThreadPoolTaskGroup &Group) = 0; 0071 0072 /// Returns the maximum number of worker this pool can eventually grow to. 0073 virtual unsigned getMaxConcurrency() const = 0; 0074 0075 /// Asynchronous submission of a task to the pool. The returned future can be 0076 /// used to wait for the task to finish and is *non-blocking* on destruction. 0077 template <typename Function, typename... Args> 0078 auto async(Function &&F, Args &&...ArgList) { 0079 auto Task = 0080 std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...); 0081 return async(std::move(Task)); 0082 } 0083 0084 /// Overload, task will be in the given task group. 0085 template <typename Function, typename... Args> 0086 auto async(ThreadPoolTaskGroup &Group, Function &&F, Args &&...ArgList) { 0087 auto Task = 0088 std::bind(std::forward<Function>(F), std::forward<Args>(ArgList)...); 0089 return async(Group, std::move(Task)); 0090 } 0091 0092 /// Asynchronous submission of a task to the pool. The returned future can be 0093 /// used to wait for the task to finish and is *non-blocking* on destruction. 0094 template <typename Func> 0095 auto async(Func &&F) -> std::shared_future<decltype(F())> { 0096 return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)), 0097 nullptr); 0098 } 0099 0100 template <typename Func> 0101 auto async(ThreadPoolTaskGroup &Group, Func &&F) 0102 -> std::shared_future<decltype(F())> { 0103 return asyncImpl(std::function<decltype(F())()>(std::forward<Func>(F)), 0104 &Group); 0105 } 0106 0107 private: 0108 /// Asynchronous submission of a task to the pool. The returned future can be 0109 /// used to wait for the task to finish and is *non-blocking* on destruction. 0110 template <typename ResTy> 0111 std::shared_future<ResTy> asyncImpl(std::function<ResTy()> Task, 0112 ThreadPoolTaskGroup *Group) { 0113 auto Future = std::async(std::launch::deferred, std::move(Task)).share(); 0114 asyncEnqueue([Future]() { Future.wait(); }, Group); 0115 return Future; 0116 } 0117 }; 0118 0119 #if LLVM_ENABLE_THREADS 0120 /// A ThreadPool implementation using std::threads. 0121 /// 0122 /// The pool keeps a vector of threads alive, waiting on a condition variable 0123 /// for some work to become available. 0124 class StdThreadPool : public ThreadPoolInterface { 0125 public: 0126 /// Construct a pool using the hardware strategy \p S for mapping hardware 0127 /// execution resources (threads, cores, CPUs) 0128 /// Defaults to using the maximum execution resources in the system, but 0129 /// accounting for the affinity mask. 0130 StdThreadPool(ThreadPoolStrategy S = hardware_concurrency()); 0131 0132 /// Blocking destructor: the pool will wait for all the threads to complete. 0133 ~StdThreadPool() override; 0134 0135 /// Blocking wait for all the threads to complete and the queue to be empty. 0136 /// It is an error to try to add new tasks while blocking on this call. 0137 /// Calling wait() from a task would deadlock waiting for itself. 0138 void wait() override; 0139 0140 /// Blocking wait for only all the threads in the given group to complete. 0141 /// It is possible to wait even inside a task, but waiting (directly or 0142 /// indirectly) on itself will deadlock. If called from a task running on a 0143 /// worker thread, the call may process pending tasks while waiting in order 0144 /// not to waste the thread. 0145 void wait(ThreadPoolTaskGroup &Group) override; 0146 0147 /// Returns the maximum number of worker threads in the pool, not the current 0148 /// number of threads! 0149 unsigned getMaxConcurrency() const override { return MaxThreadCount; } 0150 0151 // TODO: Remove, misleading legacy name warning! 0152 LLVM_DEPRECATED("Use getMaxConcurrency instead", "getMaxConcurrency") 0153 unsigned getThreadCount() const { return MaxThreadCount; } 0154 0155 /// Returns true if the current thread is a worker thread of this thread pool. 0156 bool isWorkerThread() const; 0157 0158 private: 0159 /// Returns true if all tasks in the given group have finished (nullptr means 0160 /// all tasks regardless of their group). QueueLock must be locked. 0161 bool workCompletedUnlocked(ThreadPoolTaskGroup *Group) const; 0162 0163 /// Asynchronous submission of a task to the pool. The returned future can be 0164 /// used to wait for the task to finish and is *non-blocking* on destruction. 0165 void asyncEnqueue(std::function<void()> Task, 0166 ThreadPoolTaskGroup *Group) override { 0167 int requestedThreads; 0168 { 0169 // Lock the queue and push the new task 0170 std::unique_lock<std::mutex> LockGuard(QueueLock); 0171 0172 // Don't allow enqueueing after disabling the pool 0173 assert(EnableFlag && "Queuing a thread during ThreadPool destruction"); 0174 Tasks.emplace_back(std::make_pair(std::move(Task), Group)); 0175 requestedThreads = ActiveThreads + Tasks.size(); 0176 } 0177 QueueCondition.notify_one(); 0178 grow(requestedThreads); 0179 } 0180 0181 /// Grow to ensure that we have at least `requested` Threads, but do not go 0182 /// over MaxThreadCount. 0183 void grow(int requested); 0184 0185 void processTasks(ThreadPoolTaskGroup *WaitingForGroup); 0186 0187 /// Threads in flight 0188 std::vector<llvm::thread> Threads; 0189 /// Lock protecting access to the Threads vector. 0190 mutable llvm::sys::RWMutex ThreadsLock; 0191 0192 /// Tasks waiting for execution in the pool. 0193 std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks; 0194 0195 /// Locking and signaling for accessing the Tasks queue. 0196 std::mutex QueueLock; 0197 std::condition_variable QueueCondition; 0198 0199 /// Signaling for job completion (all tasks or all tasks in a group). 0200 std::condition_variable CompletionCondition; 0201 0202 /// Keep track of the number of thread actually busy 0203 unsigned ActiveThreads = 0; 0204 /// Number of threads active for tasks in the given group (only non-zero). 0205 DenseMap<ThreadPoolTaskGroup *, unsigned> ActiveGroups; 0206 0207 /// Signal for the destruction of the pool, asking thread to exit. 0208 bool EnableFlag = true; 0209 0210 const ThreadPoolStrategy Strategy; 0211 0212 /// Maximum number of threads to potentially grow this pool to. 0213 const unsigned MaxThreadCount; 0214 }; 0215 #endif // LLVM_ENABLE_THREADS 0216 0217 /// A non-threaded implementation. 0218 class SingleThreadExecutor : public ThreadPoolInterface { 0219 public: 0220 /// Construct a non-threaded pool, ignoring using the hardware strategy. 0221 SingleThreadExecutor(ThreadPoolStrategy ignored = {}); 0222 0223 /// Blocking destructor: the pool will first execute the pending tasks. 0224 ~SingleThreadExecutor() override; 0225 0226 /// Blocking wait for all the tasks to execute first 0227 void wait() override; 0228 0229 /// Blocking wait for only all the tasks in the given group to complete. 0230 void wait(ThreadPoolTaskGroup &Group) override; 0231 0232 /// Returns always 1: there is no concurrency. 0233 unsigned getMaxConcurrency() const override { return 1; } 0234 0235 // TODO: Remove, misleading legacy name warning! 0236 LLVM_DEPRECATED("Use getMaxConcurrency instead", "getMaxConcurrency") 0237 unsigned getThreadCount() const { return 1; } 0238 0239 /// Returns true if the current thread is a worker thread of this thread pool. 0240 bool isWorkerThread() const; 0241 0242 private: 0243 /// Asynchronous submission of a task to the pool. The returned future can be 0244 /// used to wait for the task to finish and is *non-blocking* on destruction. 0245 void asyncEnqueue(std::function<void()> Task, 0246 ThreadPoolTaskGroup *Group) override { 0247 Tasks.emplace_back(std::make_pair(std::move(Task), Group)); 0248 } 0249 0250 /// Tasks waiting for execution in the pool. 0251 std::deque<std::pair<std::function<void()>, ThreadPoolTaskGroup *>> Tasks; 0252 }; 0253 0254 #if LLVM_ENABLE_THREADS 0255 using DefaultThreadPool = StdThreadPool; 0256 #else 0257 using DefaultThreadPool = SingleThreadExecutor; 0258 #endif 0259 0260 /// A group of tasks to be run on a thread pool. Thread pool tasks in different 0261 /// groups can run on the same threadpool but can be waited for separately. 0262 /// It is even possible for tasks of one group to submit and wait for tasks 0263 /// of another group, as long as this does not form a loop. 0264 class ThreadPoolTaskGroup { 0265 public: 0266 /// The ThreadPool argument is the thread pool to forward calls to. 0267 ThreadPoolTaskGroup(ThreadPoolInterface &Pool) : Pool(Pool) {} 0268 0269 /// Blocking destructor: will wait for all the tasks in the group to complete 0270 /// by calling ThreadPool::wait(). 0271 ~ThreadPoolTaskGroup() { wait(); } 0272 0273 /// Calls ThreadPool::async() for this group. 0274 template <typename Function, typename... Args> 0275 inline auto async(Function &&F, Args &&...ArgList) { 0276 return Pool.async(*this, std::forward<Function>(F), 0277 std::forward<Args>(ArgList)...); 0278 } 0279 0280 /// Calls ThreadPool::wait() for this group. 0281 void wait() { Pool.wait(*this); } 0282 0283 private: 0284 ThreadPoolInterface &Pool; 0285 }; 0286 0287 } // namespace llvm 0288 0289 #endif // LLVM_SUPPORT_THREADPOOL_H
| [ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
|
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
|