|
||||
File indexing completed on 2025-01-18 10:04:37
0001 // Created by: Kirill Gavrilov 0002 // Copyright (c) 2017-2019 OPEN CASCADE SAS 0003 // 0004 // This file is part of Open CASCADE Technology software library. 0005 // 0006 // This library is free software; you can redistribute it and/or modify it under 0007 // the terms of the GNU Lesser General Public License version 2.1 as published 0008 // by the Free Software Foundation, with special exception defined in the file 0009 // OCCT_LGPL_EXCEPTION.txt. Consult the file LICENSE_LGPL_21.txt included in OCCT 0010 // distribution for complete text of the license and disclaimer of any warranty. 0011 // 0012 // Alternatively, this file may be used under the terms of Open CASCADE 0013 // commercial license or contractual agreement. 0014 0015 #ifndef _OSD_ThreadPool_HeaderFile 0016 #define _OSD_ThreadPool_HeaderFile 0017 0018 #include <NCollection_Array1.hxx> 0019 #include <OSD_Thread.hxx> 0020 #include <Standard_Atomic.hxx> 0021 #include <Standard_Condition.hxx> 0022 #include <Standard_Mutex.hxx> 0023 0024 //! Class defining a thread pool for executing algorithms in multi-threaded mode. 0025 //! Thread pool allocates requested amount of threads and keep them alive 0026 //! (in sleep mode when unused) during thread pool lifetime. 0027 //! The same pool can be used by multiple consumers, 0028 //! including nested multi-threading algorithms and concurrent threads: 0029 //! - Thread pool can be used either by multi-threaded algorithm by creating OSD_ThreadPool::Launcher. 0030 //! The functor performing a job takes two parameters - Thread Index and Data Index: 0031 //! void operator(int theThreadIndex, int theDataIndex){} 0032 //! Multi-threaded algorithm may rely on Thread Index for allocating thread-local variables in array form, 0033 //! since the Thread Index is guaranteed to be within range OSD_ThreadPool::Lower() and OSD_ThreadPool::Upper(). 0034 //! - Default thread pool (OSD_ThreadPool::DefaultPool()) can be used in general case, 0035 //! but application may prefer creating a dedicated pool for better control. 0036 //! - Default thread pool allocates the amount of threads considering concurrency 0037 //! level of the system (amount of logical processors). 0038 //! This can be overridden during OSD_ThreadPool construction or by calling OSD_ThreadPool::Init() 0039 //! (the pool should not be used!). 0040 //! - OSD_ThreadPool::Launcher reserves specific amount of threads from the pool for executing multi-threaded Job. 0041 //! Normally, single Launcher instance will occupy all threads available in thread pool, 0042 //! so that nested multi-threaded algorithms (within the same thread) 0043 //! and concurrent threads trying to use the same thread pool will run sequentially. 0044 //! This behavior is affected by OSD_ThreadPool::NbDefaultThreadsToLaunch() parameter 0045 //! and Launcher constructor, so that single Launcher instance will occupy not all threads 0046 //! in the pool allowing other threads to be used concurrently. 0047 //! - OSD_ThreadPool::Launcher locks thread one-by-one from thread pool in a thread-safe way. 0048 //! - Each working thread catches exceptions occurred during job execution, and Launcher will 0049 //! throw Standard_Failure in a caller thread on completed execution. 0050 class OSD_ThreadPool : public Standard_Transient 0051 { 0052 DEFINE_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient) 0053 public: 0054 0055 //! Return (or create) a default thread pool. 0056 //! Number of threads argument will be considered only when called first time. 0057 Standard_EXPORT static const Handle(OSD_ThreadPool)& DefaultPool (int theNbThreads = -1); 0058 0059 public: 0060 0061 //! Main constructor. 0062 //! Application may consider specifying more threads than actually 0063 //! available (OSD_Parallel::NbLogicalProcessors()) and set up NbDefaultThreadsToLaunch() to a smaller value 0064 //! so that concurrent threads will be able using single Thread Pool instance more efficiently. 0065 //! @param theNbThreads threads number to be created by pool 0066 //! (if -1 is specified then OSD_Parallel::NbLogicalProcessors() will be used) 0067 Standard_EXPORT OSD_ThreadPool (int theNbThreads = -1); 0068 0069 //! Destructor. 0070 Standard_EXPORT virtual ~OSD_ThreadPool(); 0071 0072 //! Return TRUE if at least 2 threads are available (including self-thread). 0073 bool HasThreads() const { return NbThreads() >= 2; } 0074 0075 //! Return the lower thread index. 0076 int LowerThreadIndex() const { return 0; } 0077 0078 //! Return the upper thread index (last index is reserved for self-thread). 0079 int UpperThreadIndex() const { return LowerThreadIndex() + myThreads.Size(); } 0080 0081 //! Return the number of threads; >= 1. 0082 int NbThreads() const { return myThreads.Size() + 1; } 0083 0084 //! Return maximum number of threads to be locked by a single Launcher object by default; 0085 //! the entire thread pool size is returned by default. 0086 int NbDefaultThreadsToLaunch() const { return myNbDefThreads; } 0087 0088 //! Set maximum number of threads to be locked by a single Launcher object by default. 0089 //! Should be set BEFORE first usage. 0090 void SetNbDefaultThreadsToLaunch (int theNbThreads) { myNbDefThreads = theNbThreads; } 0091 0092 //! Checks if thread pools has active consumers. 0093 Standard_EXPORT bool IsInUse(); 0094 0095 //! Reinitialize the thread pool with a different number of threads. 0096 //! Should be called only with no active jobs, or exception Standard_ProgramError will be thrown! 0097 Standard_EXPORT void Init (int theNbThreads); 0098 0099 protected: 0100 0101 //! Thread function interface. 0102 class JobInterface 0103 { 0104 public: 0105 virtual void Perform (int theThreadIndex) = 0; 0106 }; 0107 0108 //! Thread with back reference to thread pool and thread index in it. 0109 class EnumeratedThread : public OSD_Thread 0110 { 0111 friend class OSD_ThreadPool; 0112 public: 0113 //! Main constructor. 0114 EnumeratedThread (bool theIsSelfThread = false) 0115 : myPool (NULL), myJob (NULL), myWakeEvent (false), 0116 myIdleEvent (false), myThreadIndex (0), myUsageCounter(0), 0117 myIsStarted (false), myToCatchFpe (false), 0118 myIsSelfThread (theIsSelfThread) {} 0119 0120 //! Occupy this thread for thread pool launcher. 0121 //! @return TRUE on success, or FALSE if thread has been already occupied 0122 Standard_EXPORT bool Lock(); 0123 0124 //! Release this thread for thread pool launcher; should be called only after successful OccupyThread(). 0125 Standard_EXPORT void Free(); 0126 0127 //! Wake up the thread. 0128 Standard_EXPORT void WakeUp (JobInterface* theJob, bool theToCatchFpe); 0129 0130 //! Wait the thread going into Idle state (finished jobs). 0131 Standard_EXPORT void WaitIdle(); 0132 0133 public: 0134 0135 //! Copy constructor. 0136 EnumeratedThread (const EnumeratedThread& theCopy) 0137 : OSD_Thread(), 0138 myPool (NULL), myJob (NULL), myWakeEvent (false), 0139 myIdleEvent (false), myThreadIndex (0), myUsageCounter(0), 0140 myIsStarted (false), myToCatchFpe (false), 0141 myIsSelfThread (false) { Assign (theCopy); } 0142 0143 //! Assignment operator. 0144 EnumeratedThread& operator= (const EnumeratedThread& theCopy) 0145 { 0146 Assign (theCopy); 0147 return *this; 0148 } 0149 0150 //! Assignment operator. 0151 void Assign (const EnumeratedThread& theCopy) 0152 { 0153 OSD_Thread::Assign (theCopy); 0154 myPool = theCopy.myPool; 0155 myJob = theCopy.myJob; 0156 myThreadIndex = theCopy.myThreadIndex; 0157 myToCatchFpe = theCopy.myToCatchFpe; 0158 myIsSelfThread = theCopy.myIsSelfThread; 0159 } 0160 0161 private: 0162 0163 //! Method is executed in the context of thread. 0164 void performThread(); 0165 0166 //! Method is executed in the context of thread. 0167 static Standard_Address runThread (Standard_Address theTask); 0168 0169 private: 0170 OSD_ThreadPool* myPool; 0171 JobInterface* myJob; 0172 Handle(Standard_Failure) myFailure; 0173 Standard_Condition myWakeEvent; 0174 Standard_Condition myIdleEvent; 0175 int myThreadIndex; 0176 volatile int myUsageCounter; 0177 bool myIsStarted; 0178 bool myToCatchFpe; 0179 bool myIsSelfThread; 0180 }; 0181 0182 public: 0183 0184 //! Launcher object locking a subset of threads (or all threads) 0185 //! in a thread pool to perform parallel execution of the job. 0186 class Launcher 0187 { 0188 public: 0189 //! Lock specified number of threads from the thread pool. 0190 //! If thread pool is already locked by another user, 0191 //! Launcher will lock as many threads as possible 0192 //! (if none will be locked, then single threaded execution will be done). 0193 //! @param thePool thread pool to lock the threads 0194 //! @param theMaxThreads number of threads to lock; 0195 //! -1 specifies that default number of threads 0196 //! to be used OSD_ThreadPool::NbDefaultThreadsToLaunch() 0197 Standard_EXPORT Launcher (OSD_ThreadPool& thePool, int theMaxThreads = -1); 0198 0199 //! Release threads. 0200 ~Launcher() { Release(); } 0201 0202 //! Return TRUE if at least 2 threads have been locked for parallel execution (including self-thread); 0203 //! otherwise, the functor will be executed within the caller thread. 0204 bool HasThreads() const { return myNbThreads >= 2; } 0205 0206 //! Return amount of locked threads; >= 1. 0207 int NbThreads() const { return myNbThreads; } 0208 0209 //! Return the lower thread index. 0210 int LowerThreadIndex() const { return 0; } 0211 0212 //! Return the upper thread index (last index is reserved for the self-thread). 0213 int UpperThreadIndex() const { return LowerThreadIndex() + myNbThreads - 1; } 0214 0215 //! Simple primitive for parallelization of "for" loops, e.g.: 0216 //! @code 0217 //! for (int anIter = theBegin; anIter < theEnd; ++anIter) {} 0218 //! @endcode 0219 //! @param theBegin the first data index (inclusive) 0220 //! @param theEnd the last data index (exclusive) 0221 //! @param theFunctor functor providing an interface 0222 //! "void operator(int theThreadIndex, int theDataIndex){}" performing task for specified index 0223 template<typename Functor> 0224 void Perform (int theBegin, int theEnd, const Functor& theFunctor) 0225 { 0226 JobRange aData (theBegin, theEnd); 0227 Job<Functor> aJob (theFunctor, aData); 0228 perform (aJob); 0229 } 0230 0231 //! Release threads before Launcher destruction. 0232 Standard_EXPORT void Release(); 0233 0234 protected: 0235 0236 //! Execute job. 0237 Standard_EXPORT void perform (JobInterface& theJob); 0238 0239 //! Initialize job and start threads. 0240 Standard_EXPORT void run (JobInterface& theJob); 0241 0242 //! Wait threads execution. 0243 Standard_EXPORT void wait(); 0244 0245 private: 0246 Launcher (const Launcher& theCopy); 0247 Launcher& operator=(const Launcher& theCopy); 0248 0249 private: 0250 NCollection_Array1<EnumeratedThread*> myThreads; //!< array of locked threads (including self-thread) 0251 EnumeratedThread mySelfThread; 0252 int myNbThreads; //!< amount of locked threads 0253 }; 0254 0255 protected: 0256 0257 //! Auxiliary class which ensures exclusive access to iterators of processed data pool. 0258 class JobRange 0259 { 0260 public: 0261 0262 //! Constructor 0263 JobRange (const int& theBegin, const int& theEnd) : myBegin(theBegin), myEnd (theEnd), myIt (theBegin) {} 0264 0265 //! Returns const link on the first element. 0266 const int& Begin() const { return myBegin; } 0267 0268 //! Returns const link on the last element. 0269 const int& End() const { return myEnd; } 0270 0271 //! Returns first non processed element or end. 0272 //! Thread-safe method. 0273 int It() const { return Standard_Atomic_Increment (reinterpret_cast<volatile int*>(&myIt)) - 1; } 0274 0275 private: 0276 JobRange (const JobRange& theCopy); 0277 JobRange& operator=(const JobRange& theCopy); 0278 0279 private: 0280 const int& myBegin; //!< First element of range 0281 const int& myEnd; //!< Last element of range 0282 mutable int myIt; //!< First non processed element of range 0283 }; 0284 0285 //! Auxiliary wrapper class for thread function. 0286 template<typename FunctorT> class Job : public JobInterface 0287 { 0288 public: 0289 0290 //! Constructor. 0291 Job (const FunctorT& thePerformer, JobRange& theRange) 0292 : myPerformer (thePerformer), myRange (theRange) {} 0293 0294 //! Method is executed in the context of thread. 0295 virtual void Perform (int theThreadIndex) Standard_OVERRIDE 0296 { 0297 for (Standard_Integer anIter = myRange.It(); anIter < myRange.End(); anIter = myRange.It()) 0298 { 0299 myPerformer (theThreadIndex, anIter); 0300 } 0301 } 0302 0303 private: 0304 Job (const Job& theCopy); 0305 Job& operator=(const Job& theCopy); 0306 0307 private: //! @name private fields 0308 const FunctorT& myPerformer; //!< Link on functor 0309 const JobRange& myRange; //!< Link on processed data block 0310 }; 0311 0312 //! Release threads. 0313 void release(); 0314 0315 //! Perform the job and catch exceptions. 0316 static void performJob (Handle(Standard_Failure)& theFailure, 0317 OSD_ThreadPool::JobInterface* theJob, 0318 int theThreadIndex); 0319 0320 private: 0321 //! This method should not be called (prohibited). 0322 OSD_ThreadPool (const OSD_ThreadPool& theCopy); 0323 //! This method should not be called (prohibited). 0324 OSD_ThreadPool& operator= (const OSD_ThreadPool& theCopy); 0325 0326 private: 0327 0328 NCollection_Array1<EnumeratedThread> myThreads; //!< array of defined threads (excluding self-thread) 0329 int myNbDefThreads; //!< maximum number of threads to be locked by a single Launcher by default 0330 bool myShutDown; //!< flag to shut down (destroy) the thread pool 0331 0332 }; 0333 0334 #endif // _OSD_ThreadPool_HeaderFile
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |