#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H

namespace Eigen {

template <typename Environment>
class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
 public:
  typedef typename Environment::Task Task;
  typedef RunQueue<Task, 1024> Queue;

  ThreadPoolTempl(int num_threads, Environment env = Environment())
      : ThreadPoolTempl(num_threads, true, env) {}

  ThreadPoolTempl(int num_threads, bool allow_spinning,
                  Environment env = Environment())
      : env_(env),
        num_threads_(num_threads),
        allow_spinning_(allow_spinning),
        thread_data_(num_threads),
        all_coprimes_(num_threads),
        waiters_(num_threads),
        global_steal_partition_(EncodePartition(0, num_threads_)),
        blocked_(0),
        spinning_(0),
        done_(false),
        cancelled_(false),
        ec_(waiters_) {
    waiters_.resize(num_threads_);
    // Compute the coprimes of every thread count in [1, num_threads]. They are
    // used in Steal() and NonEmptyQueueIndex() to walk over all queues in a
    // pseudo-random order: starting from a random index and repeatedly adding
    // a number coprime with the queue count visits every queue exactly once.
    eigen_plain_assert(num_threads_ < kMaxThreads);
    for (int i = 1; i <= num_threads_; ++i) {
      all_coprimes_.emplace_back(i);
      ComputeCoprimes(i, &all_coprimes_.back());
    }
#ifndef EIGEN_THREAD_LOCAL
    init_barrier_.reset(new Barrier(num_threads_));
#endif
    thread_data_.resize(num_threads_);
    for (int i = 0; i < num_threads_; i++) {
      SetStealPartition(i, EncodePartition(0, num_threads_));
      thread_data_[i].thread.reset(
          env_.CreateThread([this, i]() { WorkerLoop(i); }));
    }
#ifndef EIGEN_THREAD_LOCAL
    // Wait until all workers have registered themselves in per_thread_map_;
    // otherwise GetPerThread() could race with the worker initialization.
    init_barrier_->Wait();
#endif
  }

  ~ThreadPoolTempl() {
    done_ = true;

    // If the pool was not cancelled, wake all blocked workers so they notice
    // done_ and exit once their queues drain. If it was cancelled, the queues
    // may still hold unexecuted tasks, so flush them before joining.
    if (!cancelled_) {
      ec_.Notify(true);
    } else {
      for (size_t i = 0; i < thread_data_.size(); i++) {
        thread_data_[i].queue.Flush();
      }
    }

    // Join the worker threads explicitly (by destroying the thread objects)
    // so the destruction order within this class is well defined.
    for (size_t i = 0; i < thread_data_.size(); ++i)
      thread_data_[i].thread.reset();
  }

  void SetStealPartitions(const std::vector<std::pair<unsigned, unsigned>>& partitions) {
    eigen_plain_assert(partitions.size() == static_cast<std::size_t>(num_threads_));

    // Store the encoded [start, end) steal range for each thread.
    for (int i = 0; i < num_threads_; i++) {
      const auto& pair = partitions[i];
      unsigned start = pair.first, end = pair.second;
      AssertBounds(start, end);
      unsigned val = EncodePartition(start, end);
      SetStealPartition(i, val);
    }
  }

  void Schedule(std::function<void()> fn) EIGEN_OVERRIDE {
    ScheduleWithHint(std::move(fn), 0, num_threads_);
  }

  void ScheduleWithHint(std::function<void()> fn, int start,
                        int limit) override {
    Task t = env_.CreateTask(std::move(fn));
    PerThread* pt = GetPerThread();
    if (pt->pool == this) {
      // Worker thread of this pool: push onto its own queue.
      Queue& q = thread_data_[pt->thread_id].queue;
      t = q.PushFront(std::move(t));
    } else {
      // A thread that is not part of this pool: push onto a random queue in
      // the [start, limit) hint range.
      eigen_plain_assert(start < limit);
      eigen_plain_assert(limit <= num_threads_);
      int num_queues = limit - start;
      int rnd = Rand(&pt->rand) % num_queues;
      eigen_plain_assert(start + rnd < limit);
      Queue& q = thread_data_[start + rnd].queue;
      t = q.PushBack(std::move(t));
    }
    // Push*() returns an empty task on success and hands the task back when
    // the queue is full. On success, wake up one waiting worker; on failure,
    // run the task directly on the calling thread.
    if (!t.f) {
      ec_.Notify(false);
    } else {
      env_.ExecuteTask(t);
    }
  }

  void Cancel() EIGEN_OVERRIDE {
    cancelled_ = true;
    done_ = true;

    // Let each thread know it has been cancelled, if the environment supports
    // per-thread cancellation.
#ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION
    for (size_t i = 0; i < thread_data_.size(); i++) {
      thread_data_[i].thread->OnCancel();
    }
#endif

    // Wake up all idle workers so they can observe cancelled_ and exit.
    ec_.Notify(true);
  }

  int NumThreads() const EIGEN_FINAL { return num_threads_; }

  int CurrentThreadId() const EIGEN_FINAL {
    const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
    if (pt->pool == this) {
      return pt->thread_id;
    } else {
      return -1;
    }
  }

 private:
  // A steal partition [start, limit) is packed into a single unsigned value:
  // the start index in the upper kMaxPartitionBits bits and the limit in the
  // lower bits. For example, EncodePartition(2, 5) yields (2 << 16) | 5.
  // This caps the supported thread count at kMaxThreads.
  static const int kMaxPartitionBits = 16;
  static const int kMaxThreads = 1 << kMaxPartitionBits;

  inline unsigned EncodePartition(unsigned start, unsigned limit) {
    return (start << kMaxPartitionBits) | limit;
  }

  inline void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
    *limit = val & (kMaxThreads - 1);
    val >>= kMaxPartitionBits;
    *start = val;
  }

  void AssertBounds(int start, int end) {
    eigen_plain_assert(start >= 0);
    eigen_plain_assert(start < end);
    eigen_plain_assert(end <= num_threads_);
  }

  inline void SetStealPartition(size_t i, unsigned val) {
    thread_data_[i].steal_partition.store(val, std::memory_order_relaxed);
  }

  inline unsigned GetStealPartition(int i) {
    return thread_data_[i].steal_partition.load(std::memory_order_relaxed);
  }

  void ComputeCoprimes(int N, MaxSizeVector<unsigned>* coprimes) {
    for (int i = 1; i <= N; i++) {
      unsigned a = i;
      unsigned b = N;
      // Euclid's algorithm: i is coprime with N iff gcd(i, N) == 1.
      while (b != 0) {
        unsigned tmp = a;
        a = b;
        b = tmp % b;
      }
      if (a == 1) {
        coprimes->push_back(i);
      }
    }
  }

  typedef typename Environment::EnvThread Thread;

  struct PerThread {
    constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
    ThreadPoolTempl* pool;  // Parent pool, or null for non-worker threads.
    uint64_t rand;          // Random generator state.
    int thread_id;          // Worker thread index in the pool.
#ifndef EIGEN_THREAD_LOCAL
    // Prevent false sharing between adjacent PerThread records.
    char pad_[128];
#endif
  };

  struct ThreadData {
    constexpr ThreadData() : thread(), steal_partition(0), queue() {}
    std::unique_ptr<Thread> thread;
    std::atomic<unsigned> steal_partition;
    Queue queue;
  };

  Environment env_;
  const int num_threads_;
  const bool allow_spinning_;
  MaxSizeVector<ThreadData> thread_data_;
  MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
  MaxSizeVector<EventCount::Waiter> waiters_;
  unsigned global_steal_partition_;
  std::atomic<unsigned> blocked_;
  std::atomic<bool> spinning_;
  std::atomic<bool> done_;
  std::atomic<bool> cancelled_;
  EventCount ec_;
#ifndef EIGEN_THREAD_LOCAL
  std::unique_ptr<Barrier> init_barrier_;
  std::mutex per_thread_map_mutex_;  // Protects per_thread_map_.
  std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
#endif

  // Main worker thread loop.
  void WorkerLoop(int thread_id) {
#ifndef EIGEN_THREAD_LOCAL
    std::unique_ptr<PerThread> new_pt(new PerThread());
    per_thread_map_mutex_.lock();
    bool insertOK = per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second;
    eigen_plain_assert(insertOK);
    EIGEN_UNUSED_VARIABLE(insertOK);
    per_thread_map_mutex_.unlock();
    init_barrier_->Notify();
    init_barrier_->Wait();
#endif
    PerThread* pt = GetPerThread();
    pt->pool = this;
    pt->rand = GlobalThreadIdHash();
    pt->thread_id = thread_id;
    Queue& q = thread_data_[thread_id].queue;
    EventCount::Waiter* waiter = &waiters_[thread_id];
    // Each thread is allowed to spin for a bounded number of iterations before
    // blocking; the budget shrinks with the thread count so that the total
    // amount of spinning in the pool stays roughly constant.
    const int spin_count =
        allow_spinning_ && num_threads_ > 0 ? 5000 / num_threads_ : 0;
    if (num_threads_ == 1) {
      // With a single thread there is nobody to steal from, so the worker only
      // spins on, and pops from, its own queue. A relaxed load of cancelled_
      // in the spin loop keeps the fast path cheap.
      while (!cancelled_) {
        Task t = q.PopFront();
        for (int i = 0; i < spin_count && !t.f; i++) {
          if (!cancelled_.load(std::memory_order_relaxed)) {
            t = q.PopFront();
          }
        }
        if (!t.f) {
          if (!WaitForWork(waiter, &t)) {
            return;
          }
        }
        if (t.f) {
          env_.ExecuteTask(t);
        }
      }
    } else {
      while (!cancelled_) {
        Task t = q.PopFront();
        if (!t.f) {
          t = LocalSteal();
          if (!t.f) {
            t = GlobalSteal();
            if (!t.f) {
              // No work found anywhere. Allow at most one thread at a time to
              // keep spinning on GlobalSteal(); the others go to sleep.
              if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) {
                for (int i = 0; i < spin_count && !t.f; i++) {
                  if (!cancelled_.load(std::memory_order_relaxed)) {
                    t = GlobalSteal();
                  } else {
                    return;
                  }
                }
                spinning_ = false;
              }
              if (!t.f) {
                if (!WaitForWork(waiter, &t)) {
                  return;
                }
              }
            }
          }
        }
        if (t.f) {
          env_.ExecuteTask(t);
        }
      }
    }
  }

  // Steal tries to steal work from other worker threads in the range
  // [start, limit) in a best-effort manner.
  Task Steal(unsigned start, unsigned limit) {
    PerThread* pt = GetPerThread();
    const size_t size = limit - start;
    unsigned r = Rand(&pt->rand);
    // Map the random value into [0, size) without a modulo (multiply-shift
    // reduction) and pick an increment coprime with size, so the loop below
    // visits every victim queue exactly once in pseudo-random order.
    eigen_plain_assert(all_coprimes_[size - 1].size() < (1 << 30));
    unsigned victim = ((uint64_t)r * (uint64_t)size) >> 32;
    unsigned index = ((uint64_t)all_coprimes_[size - 1].size() * (uint64_t)r) >> 32;
    unsigned inc = all_coprimes_[size - 1][index];

    for (unsigned i = 0; i < size; i++) {
      eigen_plain_assert(start + victim < limit);
      Task t = thread_data_[start + victim].queue.PopBack();
      if (t.f) {
        return t;
      }
      victim += inc;
      if (victim >= size) {
        victim -= size;
      }
    }
    return Task();
  }

  // Steals work within the calling thread's steal partition.
  Task LocalSteal() {
    PerThread* pt = GetPerThread();
    unsigned partition = GetStealPartition(pt->thread_id);
    // If the thread's partition is the whole pool, GlobalSteal() will cover
    // it, so there is no point in scanning the same queues twice.
    if (global_steal_partition_ == partition) return Task();
    unsigned start, limit;
    DecodePartition(partition, &start, &limit);
    AssertBounds(start, limit);

    return Steal(start, limit);
  }

  // Steals work from any worker thread in the pool.
  Task GlobalSteal() {
    return Steal(0, num_threads_);
  }

  // WaitForWork blocks until new work is available (returns true) or it is
  // time to exit (returns false). It may also return a task to execute in *t
  // (in which case t->f is non-empty on return).
  bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
    eigen_plain_assert(!t->f);
    // The caller has already done a best-effort emptiness check in Steal, so
    // prepare to block.
    ec_.Prewait();
    // Now do a reliable emptiness check.
    int victim = NonEmptyQueueIndex();
    if (victim != -1) {
      ec_.CancelWait();
      if (cancelled_) {
        return false;
      } else {
        *t = thread_data_[victim].queue.PopBack();
        return true;
      }
    }
    // The number of blocked threads is used as the termination condition:
    // once the pool is done and every worker is blocked without work, we can
    // shut down.
    blocked_++;

    if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
      ec_.CancelWait();
      // Almost done, but the queues must be re-checked: work could have been
      // submitted right before done_ was set and after the check above. If
      // anything is left, resume instead of exiting.
      if (NonEmptyQueueIndex() != -1) {
        // Do not pop here; only report that work exists. The caller loops
        // back, steals it, and executes it, which keeps all workers exiting
        // together once the pool really is drained.
        blocked_--;
        return true;
      }
      // Reached a stable termination state: notify the remaining blocked
      // workers so they exit as well.
      ec_.Notify(true);
      return false;
    }
    ec_.CommitWait(waiter);
    blocked_--;
    return true;
  }

  int NonEmptyQueueIndex() {
    PerThread* pt = GetPerThread();
    // Scan all queues in a pseudo-random order (random start, coprime step)
    // and return the index of the first non-empty one, or -1 if all are empty.
    const size_t size = thread_data_.size();
    unsigned r = Rand(&pt->rand);
    unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
    unsigned victim = r % size;
    for (unsigned i = 0; i < size; i++) {
      if (!thread_data_[victim].queue.Empty()) {
        return victim;
      }
      victim += inc;
      if (victim >= size) {
        victim -= size;
      }
    }
    return -1;
  }

  static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
    return std::hash<std::thread::id>()(std::this_thread::get_id());
  }

  EIGEN_STRONG_INLINE PerThread* GetPerThread() {
#ifndef EIGEN_THREAD_LOCAL
    static PerThread dummy;
    auto it = per_thread_map_.find(GlobalThreadIdHash());
    if (it == per_thread_map_.end()) {
      return &dummy;
    } else {
      return it->second.get();
    }
#else
    EIGEN_THREAD_LOCAL PerThread per_thread_;
    PerThread* pt = &per_thread_;
    return pt;
#endif
  }

  static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
    uint64_t current = *state;
    // Update the internal state (64-bit LCG step).
    *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
    // Generate the random output (PCG-XSH-RS style output function).
    return static_cast<unsigned>((current ^ (current >> 22)) >>
                                 (22 + (current >> 61)));
  }
};

typedef ThreadPoolTempl<StlThreadEnvironment> ThreadPool;
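
// Illustrative usage sketch (not part of the Eigen sources): constructing the
// pool and scheduling closures via the Schedule() API defined above. It
// assumes the eigen3 include directory is on the include path; the
// std::atomic / std::condition_variable completion wait is part of this
// example only, not something the pool provides.
//
//   #include <unsupported/Eigen/CXX11/ThreadPool>
//   #include <atomic>
//   #include <condition_variable>
//   #include <mutex>
//
//   int main() {
//     Eigen::ThreadPool pool(4);  // 4 worker threads
//     constexpr int kTasks = 16;
//     std::atomic<int> pending(kTasks);
//     std::mutex m;
//     std::condition_variable cv;
//     for (int i = 0; i < kTasks; ++i) {
//       pool.Schedule([&]() {
//         // ... do some work ...
//         if (pending.fetch_sub(1) == 1) {  // last task finished
//           std::lock_guard<std::mutex> lock(m);
//           cv.notify_one();
//         }
//       });
//     }
//     std::unique_lock<std::mutex> lock(m);
//     cv.wait(lock, [&] { return pending.load() == 0; });
//     return 0;  // ~ThreadPool joins the workers
//   }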

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H