Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-30 10:01:10

0001 // Copyright (C) 2013-2014 Vicente J. Botet Escriba
0002 //
0003 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
0004 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0005 //
0006 // 2013/09 Vicente J. Botet Escriba
0007 //    Adapt to boost from CCIA C++11 implementation
0008 //    first implementation of a simple thread pool using a vector of threads and a sync_queue.
0009 
0010 #ifndef BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP
0011 #define BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP
0012 
0013 #include <boost/thread/detail/config.hpp>
0014 #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION && defined BOOST_THREAD_PROVIDES_EXECUTORS && defined BOOST_THREAD_USES_MOVE
0015 
0016 #include <boost/thread/detail/delete.hpp>
0017 #include <boost/thread/detail/move.hpp>
0018 #include <boost/thread/thread.hpp>
0019 #include <boost/thread/concurrent_queues/sync_queue.hpp>
0020 #include <boost/thread/executors/work.hpp>
0021 #include <boost/thread/csbl/vector.hpp>
0022 
0023 #include <boost/config/abi_prefix.hpp>
0024 
0025 namespace boost
0026 {
0027 namespace executors
0028 {
0029   class basic_thread_pool
0030   {
0031   public:
0032     /// type-erasure to store the works to do
0033     typedef  executors::work work;
0034   private:
0035     typedef thread thread_t;
0036     /// A move aware vector type
0037     typedef csbl::vector<thread_t> thread_vector;
0038 
0039     /// A move aware vector
0040     thread_vector threads;
0041     /// the thread safe work queue
0042     concurrent::sync_queue<work > work_queue;
0043 
0044   public:
0045     /**
0046      * Effects: try to execute one task.
0047      * Returns: whether a task has been executed.
0048      * Throws: whatever the current task constructor throws or the task() throws.
0049      */
0050     bool try_executing_one()
0051     {
0052       try
0053       {
0054         work task;
0055         if (work_queue.try_pull(task) == queue_op_status::success)
0056         {
0057           task();
0058           return true;
0059         }
0060         return false;
0061       }
0062       catch (...)
0063       {
0064         std::terminate();
0065         //return false;
0066       }
0067     }
0068     /**
0069      * Effects: schedule one task or yields
0070      * Throws: whatever the current task constructor throws or the task() throws.
0071      */
0072     void schedule_one_or_yield()
0073     {
0074         if ( ! try_executing_one())
0075         {
0076           this_thread::yield();
0077         }
0078     }
0079   private:
0080 
0081     /**
0082      * The main loop of the worker threads
0083      */
0084     void worker_thread()
0085     {
0086       try
0087       {
0088         for(;;)
0089         {
0090           work task;
0091           try
0092           {
0093             queue_op_status st = work_queue.wait_pull(task);
0094             if (st == queue_op_status::closed) {
0095               return;
0096             }
0097             task();
0098           }
0099           catch (boost::thread_interrupted&)
0100           {
0101             return;
0102           }
0103         }
0104       }
0105       catch (...)
0106       {
0107         std::terminate();
0108         return;
0109       }
0110     }
0111 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
0112     template <class AtThreadEntry>
0113     void worker_thread1(AtThreadEntry& at_thread_entry)
0114     {
0115       at_thread_entry(*this);
0116       worker_thread();
0117     }
0118 #endif
0119     void worker_thread2(void(*at_thread_entry)(basic_thread_pool&))
0120     {
0121       at_thread_entry(*this);
0122       worker_thread();
0123     }
0124     template <class AtThreadEntry>
0125     void worker_thread3(BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
0126     {
0127       at_thread_entry(*this);
0128       worker_thread();
0129     }
0130     static void do_nothing_at_thread_entry(basic_thread_pool&) {}
0131 
0132   public:
0133     /// basic_thread_pool is not copyable.
0134     BOOST_THREAD_NO_COPYABLE(basic_thread_pool)
0135 
0136     /**
0137      * \b Effects: creates a thread pool that runs closures on \c thread_count threads.
0138      *
0139      * \b Throws: Whatever exception is thrown while initializing the needed resources.
0140      */
0141     basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1)
0142     {
0143       try
0144       {
0145         threads.reserve(thread_count);
0146         for (unsigned i = 0; i < thread_count; ++i)
0147         {
0148 #if 1
0149           thread th (&basic_thread_pool::worker_thread, this);
0150           threads.push_back(thread_t(boost::move(th)));
0151 #else
0152           threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
0153 #endif
0154         }
0155       }
0156       catch (...)
0157       {
0158         close();
0159         throw;
0160       }
0161     }
0162     /**
0163      * \b Effects: creates a thread pool that runs closures on \c thread_count threads
0164      * and executes the at_thread_entry function at the entry of each created thread. .
0165      *
0166      * \b Throws: Whatever exception is thrown while initializing the needed resources.
0167      */
0168 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
0169     template <class AtThreadEntry>
0170     basic_thread_pool( unsigned const thread_count, AtThreadEntry& at_thread_entry)
0171     {
0172       try
0173       {
0174         threads.reserve(thread_count);
0175         for (unsigned i = 0; i < thread_count; ++i)
0176         {
0177           thread th (&basic_thread_pool::worker_thread1<AtThreadEntry>, this, at_thread_entry);
0178           threads.push_back(thread_t(boost::move(th)));
0179           //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
0180         }
0181       }
0182       catch (...)
0183       {
0184         close();
0185         throw;
0186       }
0187     }
0188 #endif
0189     basic_thread_pool( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool&))
0190     {
0191       try
0192       {
0193         threads.reserve(thread_count);
0194         for (unsigned i = 0; i < thread_count; ++i)
0195         {
0196           thread th (&basic_thread_pool::worker_thread2, this, at_thread_entry);
0197           threads.push_back(thread_t(boost::move(th)));
0198           //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
0199         }
0200       }
0201       catch (...)
0202       {
0203         close();
0204         throw;
0205       }
0206     }
0207     template <class AtThreadEntry>
0208     basic_thread_pool( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
0209     {
0210       try
0211       {
0212         threads.reserve(thread_count);
0213         for (unsigned i = 0; i < thread_count; ++i)
0214         {
0215           thread th (&basic_thread_pool::worker_thread3<AtThreadEntry>, this, boost::forward<AtThreadEntry>(at_thread_entry));
0216           threads.push_back(thread_t(boost::move(th)));
0217           //threads.push_back(thread_t(&basic_thread_pool::worker_thread, this)); // do not compile
0218         }
0219       }
0220       catch (...)
0221       {
0222         close();
0223         throw;
0224       }
0225     }
0226     /**
0227      * \b Effects: Destroys the thread pool.
0228      *
0229      * \b Synchronization: The completion of all the closures happen before the completion of the \c basic_thread_pool destructor.
0230      */
0231     ~basic_thread_pool()
0232     {
0233       // signal to all the worker threads that there will be no more submissions.
0234       close();
0235       // joins all the threads before destroying the thread pool resources (e.g. the queue).
0236       interrupt_and_join();
0237     }
0238 
0239     /**
0240      * \b Effects: join all the threads.
0241      */
0242     void join()
0243     {
0244       for (unsigned i = 0; i < threads.size(); ++i)
0245       {
0246         //threads[i].interrupt();
0247         threads[i].join();
0248       }
0249     }
0250 
0251     /**
0252      * \b Effects: interrupt all the threads.
0253      */
0254     void interrupt()
0255     {
0256       for (unsigned i = 0; i < threads.size(); ++i)
0257       {
0258         threads[i].interrupt();
0259       }
0260     }
0261 
0262     /**
0263      * \b Effects: interrupt and join all the threads.
0264      */
0265     void interrupt_and_join()
0266     {
0267       for (unsigned i = 0; i < threads.size(); ++i)
0268       {
0269         threads[i].interrupt();
0270         threads[i].join();
0271       }
0272     }
0273 
0274     /**
0275      * \b Effects: close the \c basic_thread_pool for submissions.
0276      * The worker threads will work until there is no more closures to run.
0277      */
0278     void close()
0279     {
0280       work_queue.close();
0281     }
0282 
0283     /**
0284      * \b Returns: whether the pool is closed for submissions.
0285      */
0286     bool closed()
0287     {
0288       return work_queue.closed();
0289     }
0290 
0291     /**
0292      * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
0293      *
0294      * \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
0295      * If invoked closure throws an exception the \c basic_thread_pool will call \c std::terminate, as is the case with threads.
0296      *
0297      * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
0298      *
0299      * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
0300      * Whatever exception that can be throw while storing the closure.
0301      */
0302     void submit(BOOST_THREAD_RV_REF(work) closure)  {
0303       work_queue.push(boost::move(closure));
0304     }
0305 
0306 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
0307     template <typename Closure>
0308     void submit(Closure & closure)
0309     {
0310       submit(work(closure));
0311     }
0312 #endif
0313     void submit(void (*closure)())
0314     {
0315       submit(work(closure));
0316     }
0317 
0318     template <typename Closure>
0319     void submit(BOOST_THREAD_FWD_REF(Closure) closure)
0320     {
0321       //submit(work(boost::forward<Closure>(closure)));
0322       work w((boost::forward<Closure>(closure)));
0323       submit(boost::move(w));
0324     }
0325 
0326     /**
0327      * \b Requires: This must be called from an scheduled task.
0328      *
0329      * \b Effects: reschedule functions until pred()
0330      */
0331     template <typename Pred>
0332     bool reschedule_until(Pred const& pred)
0333     {
0334       do {
0335         if ( ! try_executing_one())
0336         {
0337           return false;
0338         }
0339       } while (! pred());
0340       return true;
0341     }
0342 
0343   };
0344 }
0345 using executors::basic_thread_pool;
0346 
0347 }
0348 
0349 #include <boost/config/abi_suffix.hpp>
0350 
0351 #endif
0352 #endif