Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-30 10:01:11

0001 // Copyright (C) 2013 Vicente J. Botet Escriba
0002 //
0003 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
0004 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
0005 //
0006 // 2013/11 Vicente J. Botet Escriba
0007 //    first implementation of a simple serial scheduler.
0008 
0009 #ifndef BOOST_THREAD_SERIAL_EXECUTOR_HPP
0010 #define BOOST_THREAD_SERIAL_EXECUTOR_HPP
0011 
0012 #include <boost/thread/detail/config.hpp>
0013 #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION && defined BOOST_THREAD_PROVIDES_EXECUTORS && defined BOOST_THREAD_USES_MOVE
0014 
0015 #include <exception>
0016 #include <boost/thread/detail/delete.hpp>
0017 #include <boost/thread/detail/move.hpp>
0018 #include <boost/thread/concurrent_queues/sync_queue.hpp>
0019 #include <boost/thread/executors/work.hpp>
0020 #include <boost/thread/executors/generic_executor_ref.hpp>
0021 #include <boost/thread/future.hpp>
0022 #include <boost/thread/scoped_thread.hpp>
0023 
0024 #include <boost/config/abi_prefix.hpp>
0025 
0026 #if defined(BOOST_MSVC)
0027 # pragma warning(push)
0028 # pragma warning(disable: 4355) // 'this' : used in base member initializer list
0029 #endif
0030 
0031 namespace boost
0032 {
0033 namespace executors
0034 {
0035   class serial_executor
0036   {
0037   public:
0038     /// type-erasure to store the works to do
0039     typedef  executors::work work;
0040   private:
0041     typedef  scoped_thread<> thread_t;
0042 
0043     /// the thread safe work queue
0044     concurrent::sync_queue<work > work_queue;
0045     generic_executor_ref ex;
0046     thread_t thr;
0047 
0048     struct try_executing_one_task {
0049       work& task;
0050       boost::promise<void> &p;
0051       try_executing_one_task(work& task, boost::promise<void> &p)
0052       : task(task), p(p) {}
0053       void operator()() {
0054         try {
0055           task();
0056           p.set_value();
0057         } catch (...)
0058         {
0059           p.set_exception(current_exception());
0060         }
0061       }
0062     };
0063   public:
0064     /**
0065      * \par Returns
0066      * The underlying executor wrapped on a generic executor reference.
0067      */
0068     generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex; }
0069 
0070     /**
0071      * Effects: try to execute one task.
0072      * Returns: whether a task has been executed.
0073      * Throws: whatever the current task constructor throws or the task() throws.
0074      */
0075     bool try_executing_one()
0076     {
0077       work task;
0078       try
0079       {
0080         if (work_queue.try_pull(task) == queue_op_status::success)
0081         {
0082           boost::promise<void> p;
0083           try_executing_one_task tmp(task,p);
0084           ex.submit(tmp);
0085           p.get_future().wait();
0086           return true;
0087         }
0088         return false;
0089       }
0090       catch (...)
0091       {
0092         std::terminate();
0093         //return false;
0094       }
0095     }
0096   private:
0097     /**
0098      * Effects: schedule one task or yields
0099      * Throws: whatever the current task constructor throws or the task() throws.
0100      */
0101     void schedule_one_or_yield()
0102     {
0103         if ( ! try_executing_one())
0104         {
0105           this_thread::yield();
0106         }
0107     }
0108 
0109     /**
0110      * The main loop of the worker thread
0111      */
0112     void worker_thread()
0113     {
0114       while (!closed())
0115       {
0116         schedule_one_or_yield();
0117       }
0118       while (try_executing_one())
0119       {
0120       }
0121     }
0122 
0123   public:
0124     /// serial_executor is not copyable.
0125     BOOST_THREAD_NO_COPYABLE(serial_executor)
0126 
0127     /**
0128      * \b Effects: creates a thread pool that runs closures using one of its closure-executing methods.
0129      *
0130      * \b Throws: Whatever exception is thrown while initializing the needed resources.
0131      */
0132     template <class Executor>
0133     serial_executor(Executor& ex)
0134     : ex(ex), thr(&serial_executor::worker_thread, this)
0135     {
0136     }
0137     /**
0138      * \b Effects: Destroys the thread pool.
0139      *
0140      * \b Synchronization: The completion of all the closures happen before the completion of the \c serial_executor destructor.
0141      */
0142     ~serial_executor()
0143     {
0144       // signal to the worker thread that there will be no more submissions.
0145       close();
0146     }
0147 
0148     /**
0149      * \b Effects: close the \c serial_executor for submissions.
0150      * The loop will work until there is no more closures to run.
0151      */
0152     void close()
0153     {
0154       work_queue.close();
0155     }
0156 
0157     /**
0158      * \b Returns: whether the pool is closed for submissions.
0159      */
0160     bool closed()
0161     {
0162       return work_queue.closed();
0163     }
0164 
0165     /**
0166      * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
0167      *
0168      * \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
0169      * If invoked closure throws an exception the \c serial_executor will call \c std::terminate, as is the case with threads.
0170      *
0171      * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
0172      *
0173      * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
0174      * Whatever exception that can be throw while storing the closure.
0175      */
0176     void submit(BOOST_THREAD_RV_REF(work) closure)
0177     {
0178       work_queue.push(boost::move(closure));
0179     }
0180 
0181 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
0182     template <typename Closure>
0183     void submit(Closure & closure)
0184     {
0185       submit(work(closure));
0186     }
0187 #endif
0188     void submit(void (*closure)())
0189     {
0190       submit(work(closure));
0191     }
0192 
0193     template <typename Closure>
0194     void submit(BOOST_THREAD_FWD_REF(Closure) closure)
0195     {
0196       work w((boost::forward<Closure>(closure)));
0197       submit(boost::move(w));
0198     }
0199 
0200     /**
0201      * \b Requires: This must be called from an scheduled task.
0202      *
0203      * \b Effects: reschedule functions until pred()
0204      */
0205     template <typename Pred>
0206     bool reschedule_until(Pred const& pred)
0207     {
0208       do {
0209         if ( ! try_executing_one())
0210         {
0211           return false;
0212         }
0213       } while (! pred());
0214       return true;
0215     }
0216 
0217   };
0218 }
0219 using executors::serial_executor;
0220 }
0221 
0222 #if defined(BOOST_MSVC)
0223 # pragma warning(pop)
0224 #endif
0225 
0226 #include <boost/config/abi_suffix.hpp>
0227 
0228 #endif
0229 #endif