Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:04:37

0001 // Created by: Kirill Gavrilov
0002 // Copyright (c) 2017-2019 OPEN CASCADE SAS
0003 //
0004 // This file is part of Open CASCADE Technology software library.
0005 //
0006 // This library is free software; you can redistribute it and/or modify it under
0007 // the terms of the GNU Lesser General Public License version 2.1 as published
0008 // by the Free Software Foundation, with special exception defined in the file
0009 // OCCT_LGPL_EXCEPTION.txt. Consult the file LICENSE_LGPL_21.txt included in OCCT
0010 // distribution for complete text of the license and disclaimer of any warranty.
0011 //
0012 // Alternatively, this file may be used under the terms of Open CASCADE
0013 // commercial license or contractual agreement.
0014 
0015 #ifndef _OSD_ThreadPool_HeaderFile
0016 #define _OSD_ThreadPool_HeaderFile
0017 
0018 #include <NCollection_Array1.hxx>
0019 #include <OSD_Thread.hxx>
0020 #include <Standard_Atomic.hxx>
0021 #include <Standard_Condition.hxx>
0022 #include <Standard_Mutex.hxx>
0023 
0024 //! Class defining a thread pool for executing algorithms in multi-threaded mode.
0025 //! Thread pool allocates requested amount of threads and keep them alive
0026 //! (in sleep mode when unused) during thread pool lifetime.
0027 //! The same pool can be used by multiple consumers,
0028 //! including nested multi-threading algorithms and concurrent threads:
0029 //! - Thread pool can be used either by multi-threaded algorithm by creating OSD_ThreadPool::Launcher.
0030 //!   The functor performing a job takes two parameters - Thread Index and Data Index:
0031 //!     void operator(int theThreadIndex, int theDataIndex){}
0032 //!   Multi-threaded algorithm may rely on Thread Index for allocating thread-local variables in array form,
0033 //!   since the Thread Index is guaranteed to be within range OSD_ThreadPool::Lower() and OSD_ThreadPool::Upper().
0034 //! - Default thread pool (OSD_ThreadPool::DefaultPool()) can be used in general case,
0035 //!   but application may prefer creating a dedicated pool for better control.
0036 //! - Default thread pool allocates the amount of threads considering concurrency
0037 //!   level of the system (amount of logical processors).
0038 //!   This can be overridden during OSD_ThreadPool construction or by calling OSD_ThreadPool::Init()
0039 //!   (the pool should not be used!).
0040 //! - OSD_ThreadPool::Launcher reserves specific amount of threads from the pool for executing multi-threaded Job.
0041 //!   Normally, single Launcher instance will occupy all threads available in thread pool,
0042 //!   so that nested multi-threaded algorithms (within the same thread)
0043 //!   and concurrent threads trying to use the same thread pool will run sequentially.
0044 //!   This behavior is affected by OSD_ThreadPool::NbDefaultThreadsToLaunch() parameter
0045 //!   and Launcher constructor, so that single Launcher instance will occupy not all threads
0046 //!   in the pool allowing other threads to be used concurrently.
0047 //! - OSD_ThreadPool::Launcher locks thread one-by-one from thread pool in a thread-safe way.
0048 //! - Each working thread catches exceptions occurred during job execution, and Launcher will
0049 //!   throw Standard_Failure in a caller thread on completed execution.
0050 class OSD_ThreadPool : public Standard_Transient
0051 {
0052   DEFINE_STANDARD_RTTIEXT(OSD_ThreadPool, Standard_Transient)
0053 public:
0054 
0055   //! Return (or create) a default thread pool.
0056   //! Number of threads argument will be considered only when called first time.
0057   Standard_EXPORT static const Handle(OSD_ThreadPool)& DefaultPool (int theNbThreads = -1);
0058 
0059 public:
0060 
0061   //! Main constructor.
0062   //! Application may consider specifying more threads than actually
0063   //! available (OSD_Parallel::NbLogicalProcessors()) and set up NbDefaultThreadsToLaunch() to a smaller value
0064   //! so that concurrent threads will be able using single Thread Pool instance more efficiently.
0065   //! @param theNbThreads threads number to be created by pool
0066   //!                     (if -1 is specified then OSD_Parallel::NbLogicalProcessors() will be used)
0067   Standard_EXPORT OSD_ThreadPool (int theNbThreads = -1);
0068 
0069   //! Destructor.
0070   Standard_EXPORT virtual ~OSD_ThreadPool();
0071 
0072   //! Return TRUE if at least 2 threads are available (including self-thread).
0073   bool HasThreads() const { return NbThreads() >= 2; }
0074 
0075   //! Return the lower thread index.
0076   int LowerThreadIndex() const { return 0; }
0077 
0078   //! Return the upper thread index (last index is reserved for self-thread).
0079   int UpperThreadIndex() const { return LowerThreadIndex() + myThreads.Size(); }
0080 
0081   //! Return the number of threads; >= 1.
0082   int NbThreads() const { return myThreads.Size() + 1; }
0083 
0084   //! Return maximum number of threads to be locked by a single Launcher object by default;
0085   //! the entire thread pool size is returned by default.
0086   int NbDefaultThreadsToLaunch() const { return myNbDefThreads; }
0087 
0088   //! Set maximum number of threads to be locked by a single Launcher object by default.
0089   //! Should be set BEFORE first usage.
0090   void SetNbDefaultThreadsToLaunch (int theNbThreads) { myNbDefThreads = theNbThreads; }
0091 
0092   //! Checks if thread pools has active consumers.
0093   Standard_EXPORT bool IsInUse();
0094 
0095   //! Reinitialize the thread pool with a different number of threads.
0096   //! Should be called only with no active jobs, or exception Standard_ProgramError will be thrown!
0097   Standard_EXPORT void Init (int theNbThreads);
0098 
0099 protected:
0100 
0101   //! Thread function interface.
0102   class JobInterface
0103   {
0104   public:
0105     virtual void Perform (int theThreadIndex) = 0;
0106   };
0107 
0108   //! Thread with back reference to thread pool and thread index in it.
0109   class EnumeratedThread : public OSD_Thread
0110   {
0111     friend class OSD_ThreadPool;
0112   public:
0113     //! Main constructor.
0114     EnumeratedThread (bool theIsSelfThread = false)
0115     : myPool (NULL), myJob (NULL), myWakeEvent (false),
0116       myIdleEvent (false), myThreadIndex (0), myUsageCounter(0),
0117       myIsStarted (false), myToCatchFpe (false),
0118       myIsSelfThread (theIsSelfThread) {}
0119 
0120     //! Occupy this thread for thread pool launcher.
0121     //! @return TRUE on success, or FALSE if thread has been already occupied
0122     Standard_EXPORT bool Lock();
0123 
0124     //! Release this thread for thread pool launcher; should be called only after successful OccupyThread().
0125     Standard_EXPORT void Free();
0126 
0127     //! Wake up the thread.
0128     Standard_EXPORT void WakeUp (JobInterface* theJob, bool theToCatchFpe);
0129 
0130     //! Wait the thread going into Idle state (finished jobs).
0131     Standard_EXPORT void WaitIdle();
0132 
0133   public:
0134 
0135     //! Copy constructor.
0136     EnumeratedThread (const EnumeratedThread& theCopy)
0137     : OSD_Thread(),
0138       myPool (NULL), myJob (NULL), myWakeEvent (false),
0139       myIdleEvent (false), myThreadIndex (0), myUsageCounter(0),
0140       myIsStarted (false), myToCatchFpe (false),
0141       myIsSelfThread (false) { Assign (theCopy); }
0142 
0143     //! Assignment operator.
0144     EnumeratedThread& operator= (const EnumeratedThread& theCopy)
0145     {
0146       Assign (theCopy);
0147       return *this;
0148     }
0149 
0150     //! Assignment operator.
0151     void Assign (const EnumeratedThread& theCopy)
0152     {
0153       OSD_Thread::Assign (theCopy);
0154       myPool         = theCopy.myPool;
0155       myJob          = theCopy.myJob;
0156       myThreadIndex  = theCopy.myThreadIndex;
0157       myToCatchFpe   = theCopy.myToCatchFpe;
0158       myIsSelfThread = theCopy.myIsSelfThread;
0159     }
0160 
0161   private:
0162 
0163     //! Method is executed in the context of thread.
0164     void performThread();
0165 
0166     //! Method is executed in the context of thread.
0167     static Standard_Address runThread (Standard_Address theTask);
0168 
0169   private:
0170     OSD_ThreadPool* myPool;
0171     JobInterface* myJob;
0172     Handle(Standard_Failure) myFailure;
0173     Standard_Condition myWakeEvent;
0174     Standard_Condition myIdleEvent;
0175     int myThreadIndex;
0176     volatile int myUsageCounter;
0177     bool myIsStarted;
0178     bool myToCatchFpe;
0179     bool myIsSelfThread;
0180   };
0181 
0182 public:
0183 
0184   //! Launcher object locking a subset of threads (or all threads)
0185   //! in a thread pool to perform parallel execution of the job.
0186   class Launcher
0187   {
0188   public:
0189     //! Lock specified number of threads from the thread pool.
0190     //! If thread pool is already locked by another user,
0191     //! Launcher will lock as many threads as possible
0192     //! (if none will be locked, then single threaded execution will be done).
0193     //! @param thePool       thread pool to lock the threads
0194     //! @param theMaxThreads number of threads to lock;
0195     //!                      -1 specifies that default number of threads
0196     //!                      to be used OSD_ThreadPool::NbDefaultThreadsToLaunch()
0197     Standard_EXPORT Launcher (OSD_ThreadPool& thePool, int theMaxThreads = -1);
0198 
0199     //! Release threads.
0200     ~Launcher() { Release(); }
0201 
0202     //! Return TRUE if at least 2 threads have been locked for parallel execution (including self-thread);
0203     //! otherwise, the functor will be executed within the caller thread.
0204     bool HasThreads() const { return myNbThreads >= 2; }
0205 
0206     //! Return amount of locked threads; >= 1.
0207     int NbThreads() const { return myNbThreads; }
0208 
0209     //! Return the lower thread index.
0210     int LowerThreadIndex() const { return 0; }
0211 
0212     //! Return the upper thread index (last index is reserved for the self-thread).
0213     int UpperThreadIndex() const { return LowerThreadIndex() + myNbThreads - 1; }
0214 
0215     //! Simple primitive for parallelization of "for" loops, e.g.:
0216     //! @code
0217     //!   for (int anIter = theBegin; anIter < theEnd; ++anIter) {}
0218     //! @endcode
0219     //! @param theBegin   the first data index (inclusive)
0220     //! @param theEnd     the last  data index (exclusive)
0221     //! @param theFunctor functor providing an interface
0222     //!                   "void operator(int theThreadIndex, int theDataIndex){}" performing task for specified index
0223     template<typename Functor>
0224     void Perform (int theBegin, int theEnd, const Functor& theFunctor)
0225     {
0226       JobRange aData (theBegin, theEnd);
0227       Job<Functor> aJob (theFunctor, aData);
0228       perform (aJob);
0229     }
0230 
0231     //! Release threads before Launcher destruction.
0232     Standard_EXPORT void Release();
0233 
0234   protected:
0235 
0236     //! Execute job.
0237     Standard_EXPORT void perform (JobInterface& theJob);
0238 
0239     //! Initialize job and start threads.
0240     Standard_EXPORT void run (JobInterface& theJob);
0241 
0242     //! Wait threads execution.
0243     Standard_EXPORT void wait();
0244 
0245   private:
0246     Launcher           (const Launcher& theCopy);
0247     Launcher& operator=(const Launcher& theCopy);
0248 
0249   private:
0250     NCollection_Array1<EnumeratedThread*> myThreads; //!< array of locked threads (including self-thread)
0251     EnumeratedThread mySelfThread;
0252     int myNbThreads; //!< amount of locked threads
0253   };
0254 
0255 protected:
0256 
0257   //! Auxiliary class which ensures exclusive access to iterators of processed data pool.
0258   class JobRange
0259   {
0260   public:
0261 
0262     //! Constructor
0263     JobRange (const int& theBegin, const int& theEnd) : myBegin(theBegin), myEnd (theEnd), myIt (theBegin) {}
0264 
0265     //! Returns const link on the first element.
0266     const int& Begin() const { return myBegin; }
0267 
0268     //! Returns const link on the last element.
0269     const int& End() const { return myEnd; }
0270 
0271     //! Returns first non processed element or end.
0272     //! Thread-safe method.
0273     int It() const { return Standard_Atomic_Increment (reinterpret_cast<volatile int*>(&myIt)) - 1; }
0274 
0275   private:
0276     JobRange           (const JobRange& theCopy);
0277     JobRange& operator=(const JobRange& theCopy);
0278 
0279   private:
0280     const   int& myBegin; //!< First element of range
0281     const   int& myEnd;   //!< Last  element of range
0282     mutable int  myIt;    //!< First non processed element of range
0283   };
0284 
0285   //! Auxiliary wrapper class for thread function.
0286   template<typename FunctorT> class Job : public JobInterface
0287   {
0288   public:
0289 
0290     //! Constructor.
0291     Job (const FunctorT& thePerformer, JobRange& theRange)
0292     : myPerformer (thePerformer), myRange (theRange) {}
0293 
0294     //! Method is executed in the context of thread.
0295     virtual void Perform (int theThreadIndex) Standard_OVERRIDE
0296     {
0297       for (Standard_Integer anIter = myRange.It(); anIter < myRange.End(); anIter = myRange.It())
0298       {
0299         myPerformer (theThreadIndex, anIter);
0300       }
0301     }
0302 
0303   private:
0304     Job           (const Job& theCopy);
0305     Job& operator=(const Job& theCopy);
0306 
0307   private: //! @name private fields
0308     const FunctorT& myPerformer; //!< Link on functor
0309     const JobRange& myRange;     //!< Link on processed data block
0310   };
0311 
0312   //! Release threads.
0313   void release();
0314 
0315   //! Perform the job and catch exceptions.
0316   static void performJob (Handle(Standard_Failure)& theFailure,
0317                           OSD_ThreadPool::JobInterface* theJob,
0318                           int theThreadIndex);
0319 
0320 private:
0321   //! This method should not be called (prohibited).
0322   OSD_ThreadPool (const OSD_ThreadPool& theCopy);
0323   //! This method should not be called (prohibited).
0324   OSD_ThreadPool& operator= (const OSD_ThreadPool& theCopy);
0325 
0326 private:
0327 
0328   NCollection_Array1<EnumeratedThread> myThreads; //!< array of defined threads (excluding self-thread)
0329   int  myNbDefThreads; //!< maximum number of threads to be locked by a single Launcher by default
0330   bool myShutDown;     //!< flag to shut down (destroy) the thread pool
0331 
0332 };
0333 
0334 #endif // _OSD_ThreadPool_HeaderFile