File indexing completed on 2025-01-30 10:01:10
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #ifndef BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP
0011 #define BOOST_THREAD_EXECUTORS_BASIC_THREAD_POOL_HPP
0012
0013 #include <boost/thread/detail/config.hpp>
0014 #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION && defined BOOST_THREAD_PROVIDES_EXECUTORS && defined BOOST_THREAD_USES_MOVE
0015
0016 #include <boost/thread/detail/delete.hpp>
0017 #include <boost/thread/detail/move.hpp>
0018 #include <boost/thread/thread.hpp>
0019 #include <boost/thread/concurrent_queues/sync_queue.hpp>
0020 #include <boost/thread/executors/work.hpp>
0021 #include <boost/thread/csbl/vector.hpp>
0022
0023 #include <boost/config/abi_prefix.hpp>
0024
0025 namespace boost
0026 {
0027 namespace executors
0028 {
0029 class basic_thread_pool
0030 {
0031 public:
0032
0033 typedef executors::work work;
0034 private:
0035 typedef thread thread_t;
0036
0037 typedef csbl::vector<thread_t> thread_vector;
0038
0039
0040 thread_vector threads;
0041
0042 concurrent::sync_queue<work > work_queue;
0043
0044 public:
0045
0046
0047
0048
0049
0050 bool try_executing_one()
0051 {
0052 try
0053 {
0054 work task;
0055 if (work_queue.try_pull(task) == queue_op_status::success)
0056 {
0057 task();
0058 return true;
0059 }
0060 return false;
0061 }
0062 catch (...)
0063 {
0064 std::terminate();
0065
0066 }
0067 }
0068
0069
0070
0071
0072 void schedule_one_or_yield()
0073 {
0074 if ( ! try_executing_one())
0075 {
0076 this_thread::yield();
0077 }
0078 }
0079 private:
0080
0081
0082
0083
0084 void worker_thread()
0085 {
0086 try
0087 {
0088 for(;;)
0089 {
0090 work task;
0091 try
0092 {
0093 queue_op_status st = work_queue.wait_pull(task);
0094 if (st == queue_op_status::closed) {
0095 return;
0096 }
0097 task();
0098 }
0099 catch (boost::thread_interrupted&)
0100 {
0101 return;
0102 }
0103 }
0104 }
0105 catch (...)
0106 {
0107 std::terminate();
0108 return;
0109 }
0110 }
0111 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
0112 template <class AtThreadEntry>
0113 void worker_thread1(AtThreadEntry& at_thread_entry)
0114 {
0115 at_thread_entry(*this);
0116 worker_thread();
0117 }
0118 #endif
0119 void worker_thread2(void(*at_thread_entry)(basic_thread_pool&))
0120 {
0121 at_thread_entry(*this);
0122 worker_thread();
0123 }
0124 template <class AtThreadEntry>
0125 void worker_thread3(BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
0126 {
0127 at_thread_entry(*this);
0128 worker_thread();
0129 }
0130 static void do_nothing_at_thread_entry(basic_thread_pool&) {}
0131
0132 public:
0133
0134 BOOST_THREAD_NO_COPYABLE(basic_thread_pool)
0135
0136
0137
0138
0139
0140
0141 basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1)
0142 {
0143 try
0144 {
0145 threads.reserve(thread_count);
0146 for (unsigned i = 0; i < thread_count; ++i)
0147 {
0148 #if 1
0149 thread th (&basic_thread_pool::worker_thread, this);
0150 threads.push_back(thread_t(boost::move(th)));
0151 #else
0152 threads.push_back(thread_t(&basic_thread_pool::worker_thread, this));
0153 #endif
0154 }
0155 }
0156 catch (...)
0157 {
0158 close();
0159 throw;
0160 }
0161 }
0162
0163
0164
0165
0166
0167
0168 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
0169 template <class AtThreadEntry>
0170 basic_thread_pool( unsigned const thread_count, AtThreadEntry& at_thread_entry)
0171 {
0172 try
0173 {
0174 threads.reserve(thread_count);
0175 for (unsigned i = 0; i < thread_count; ++i)
0176 {
0177 thread th (&basic_thread_pool::worker_thread1<AtThreadEntry>, this, at_thread_entry);
0178 threads.push_back(thread_t(boost::move(th)));
0179
0180 }
0181 }
0182 catch (...)
0183 {
0184 close();
0185 throw;
0186 }
0187 }
0188 #endif
0189 basic_thread_pool( unsigned const thread_count, void(*at_thread_entry)(basic_thread_pool&))
0190 {
0191 try
0192 {
0193 threads.reserve(thread_count);
0194 for (unsigned i = 0; i < thread_count; ++i)
0195 {
0196 thread th (&basic_thread_pool::worker_thread2, this, at_thread_entry);
0197 threads.push_back(thread_t(boost::move(th)));
0198
0199 }
0200 }
0201 catch (...)
0202 {
0203 close();
0204 throw;
0205 }
0206 }
0207 template <class AtThreadEntry>
0208 basic_thread_pool( unsigned const thread_count, BOOST_THREAD_FWD_REF(AtThreadEntry) at_thread_entry)
0209 {
0210 try
0211 {
0212 threads.reserve(thread_count);
0213 for (unsigned i = 0; i < thread_count; ++i)
0214 {
0215 thread th (&basic_thread_pool::worker_thread3<AtThreadEntry>, this, boost::forward<AtThreadEntry>(at_thread_entry));
0216 threads.push_back(thread_t(boost::move(th)));
0217
0218 }
0219 }
0220 catch (...)
0221 {
0222 close();
0223 throw;
0224 }
0225 }
0226
0227
0228
0229
0230
0231 ~basic_thread_pool()
0232 {
0233
0234 close();
0235
0236 interrupt_and_join();
0237 }
0238
0239
0240
0241
0242 void join()
0243 {
0244 for (unsigned i = 0; i < threads.size(); ++i)
0245 {
0246
0247 threads[i].join();
0248 }
0249 }
0250
0251
0252
0253
0254 void interrupt()
0255 {
0256 for (unsigned i = 0; i < threads.size(); ++i)
0257 {
0258 threads[i].interrupt();
0259 }
0260 }
0261
0262
0263
0264
0265 void interrupt_and_join()
0266 {
0267 for (unsigned i = 0; i < threads.size(); ++i)
0268 {
0269 threads[i].interrupt();
0270 threads[i].join();
0271 }
0272 }
0273
0274
0275
0276
0277
0278 void close()
0279 {
0280 work_queue.close();
0281 }
0282
0283
0284
0285
0286 bool closed()
0287 {
0288 return work_queue.closed();
0289 }
0290
0291
0292
0293
0294
0295
0296
0297
0298
0299
0300
0301
0302 void submit(BOOST_THREAD_RV_REF(work) closure) {
0303 work_queue.push(boost::move(closure));
0304 }
0305
0306 #if defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
0307 template <typename Closure>
0308 void submit(Closure & closure)
0309 {
0310 submit(work(closure));
0311 }
0312 #endif
0313 void submit(void (*closure)())
0314 {
0315 submit(work(closure));
0316 }
0317
0318 template <typename Closure>
0319 void submit(BOOST_THREAD_FWD_REF(Closure) closure)
0320 {
0321
0322 work w((boost::forward<Closure>(closure)));
0323 submit(boost::move(w));
0324 }
0325
0326
0327
0328
0329
0330
0331 template <typename Pred>
0332 bool reschedule_until(Pred const& pred)
0333 {
0334 do {
0335 if ( ! try_executing_one())
0336 {
0337 return false;
0338 }
0339 } while (! pred());
0340 return true;
0341 }
0342
0343 };
0344 }
0345 using executors::basic_thread_pool;
0346
0347 }
0348
0349 #include <boost/config/abi_suffix.hpp>
0350
0351 #endif
0352 #endif