EIC code displayed by LXR

File: /include/eigen3/unsupported/Eigen/CXX11/src/ThreadPool/NonBlockingThreadPool.h

// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H
#define EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H

namespace Eigen {

template <typename Environment>
class ThreadPoolTempl : public Eigen::ThreadPoolInterface {
 public:
  typedef typename Environment::Task Task;
  typedef RunQueue<Task, 1024> Queue;

  ThreadPoolTempl(int num_threads, Environment env = Environment())
      : ThreadPoolTempl(num_threads, true, env) {}

  ThreadPoolTempl(int num_threads, bool allow_spinning,
                  Environment env = Environment())
      : env_(env),
        num_threads_(num_threads),
        allow_spinning_(allow_spinning),
        thread_data_(num_threads),
        all_coprimes_(num_threads),
        waiters_(num_threads),
        global_steal_partition_(EncodePartition(0, num_threads_)),
        blocked_(0),
        spinning_(0),
        done_(false),
        cancelled_(false),
        ec_(waiters_) {
    waiters_.resize(num_threads_);
    // Calculate coprimes of all numbers in [1, num_threads].
    // Coprimes are used for random walks over all threads in Steal
    // and NonEmptyQueueIndex. Iteration is based on the fact that if we take
    // a random starting thread index t and calculate num_threads - 1 subsequent
    // indices as (t + coprime) % num_threads, we will cover all threads without
    // repetitions (effectively getting a pseudo-random permutation of thread
    // indices).
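    // For example, with num_threads = 4 the coprimes of 4 are {1, 3}; picking
    // the coprime 3 and a random start t = 2 visits 2, 1, 0, 3 -- every thread
    // exactly once. (Illustrative numbers, not taken from this file.)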
    eigen_plain_assert(num_threads_ < kMaxThreads);
    for (int i = 1; i <= num_threads_; ++i) {
      all_coprimes_.emplace_back(i);
      ComputeCoprimes(i, &all_coprimes_.back());
    }
#ifndef EIGEN_THREAD_LOCAL
    init_barrier_.reset(new Barrier(num_threads_));
#endif
    thread_data_.resize(num_threads_);
    for (int i = 0; i < num_threads_; i++) {
      SetStealPartition(i, EncodePartition(0, num_threads_));
      thread_data_[i].thread.reset(
          env_.CreateThread([this, i]() { WorkerLoop(i); }));
    }
#ifndef EIGEN_THREAD_LOCAL
    // Wait for workers to initialize per_thread_map_. Otherwise we might race
    // with them in Schedule or CurrentThreadId.
    init_barrier_->Wait();
#endif
  }

  ~ThreadPoolTempl() {
    done_ = true;

    // Now if all threads block without work, they will start exiting.
    // But note that threads can continue to work arbitrarily long,
    // block, submit new work, unblock and otherwise live a full life.
    if (!cancelled_) {
      ec_.Notify(true);
    } else {
      // Since we were cancelled, there might be entries in the queues.
      // Empty them to prevent their destructor from asserting.
      for (size_t i = 0; i < thread_data_.size(); i++) {
        thread_data_[i].queue.Flush();
      }
    }
    // Join threads explicitly (by destroying them) to avoid destruction order
    // issues within this class.
    for (size_t i = 0; i < thread_data_.size(); ++i)
      thread_data_[i].thread.reset();
  }

  void SetStealPartitions(const std::vector<std::pair<unsigned, unsigned>>& partitions) {
    eigen_plain_assert(partitions.size() == static_cast<std::size_t>(num_threads_));

    // Pass this information to each thread queue.
    for (int i = 0; i < num_threads_; i++) {
      const auto& pair = partitions[i];
      unsigned start = pair.first, end = pair.second;
      AssertBounds(start, end);
      unsigned val = EncodePartition(start, end);
      SetStealPartition(i, val);
    }
  }

  void Schedule(std::function<void()> fn) EIGEN_OVERRIDE {
    ScheduleWithHint(std::move(fn), 0, num_threads_);
  }

  void ScheduleWithHint(std::function<void()> fn, int start,
                        int limit) override {
    Task t = env_.CreateTask(std::move(fn));
    PerThread* pt = GetPerThread();
    if (pt->pool == this) {
      // Worker thread of this pool, push onto the thread's queue.
      Queue& q = thread_data_[pt->thread_id].queue;
      t = q.PushFront(std::move(t));
    } else {
      // A free-standing thread (or worker of another pool), push onto a random
      // queue.
      eigen_plain_assert(start < limit);
      eigen_plain_assert(limit <= num_threads_);
      int num_queues = limit - start;
      int rnd = Rand(&pt->rand) % num_queues;
      eigen_plain_assert(start + rnd < limit);
      Queue& q = thread_data_[start + rnd].queue;
      t = q.PushBack(std::move(t));
    }
    // Note: below we touch `this` after making the task available to worker
    // threads. Strictly speaking, this can lead to a racy use-after-free.
    // Consider that Schedule is called from a thread that is neither the main
    // thread nor a worker thread of this pool. Then, execution of the task
    // directly or indirectly completes the overall computation, which in turn
    // leads to destruction of `this`. We expect such a scenario to be prevented
    // by the program, that is, `this` is kept alive while any thread can
    // potentially be inside Schedule.
    if (!t.f) {
      ec_.Notify(false);
    } else {
      env_.ExecuteTask(t);  // Push failed, execute directly.
    }
  }
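  // Illustrative usage sketch (not part of the original header): with the
  // ThreadPool typedef defined at the bottom of this file, a client might
  // write:
  //
  //   Eigen::ThreadPool pool(4);                       // 4 worker threads
  //   pool.Schedule([] { /* do some work */ });        // runs on some worker
  //   pool.ScheduleWithHint([] { /* work */ }, 0, 2);  // prefer queues [0, 2)
  //
  // The thread count and closures above are made-up example values.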

  void Cancel() EIGEN_OVERRIDE {
    cancelled_ = true;
    done_ = true;

    // Let each thread know it's been cancelled.
#ifdef EIGEN_THREAD_ENV_SUPPORTS_CANCELLATION
    for (size_t i = 0; i < thread_data_.size(); i++) {
      thread_data_[i].thread->OnCancel();
    }
#endif

    // Wake up the threads without work to let them exit on their own.
    ec_.Notify(true);
  }

  int NumThreads() const EIGEN_FINAL { return num_threads_; }

  int CurrentThreadId() const EIGEN_FINAL {
    const PerThread* pt = const_cast<ThreadPoolTempl*>(this)->GetPerThread();
    if (pt->pool == this) {
      return pt->thread_id;
    } else {
      return -1;
    }
  }

 private:
  // Each thread's steal partition (a start/limit pair) is encoded in a single
  // std::atomic<unsigned>. We expect num_threads_ < 65536, so start and limit
  // each fit in 16 bits.
  // The encode/decode logic can also be reused by external callers for
  // maintaining their own thread-safe copies of scheduling and steal
  // domain(s).
  static const int kMaxPartitionBits = 16;
  static const int kMaxThreads = 1 << kMaxPartitionBits;

  inline unsigned EncodePartition(unsigned start, unsigned limit) {
    return (start << kMaxPartitionBits) | limit;
  }

  inline void DecodePartition(unsigned val, unsigned* start, unsigned* limit) {
    *limit = val & (kMaxThreads - 1);
    val >>= kMaxPartitionBits;
    *start = val;
  }
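  // Worked example (added for illustration; the values are arbitrary):
  // EncodePartition(2, 5) == (2 << 16) | 5 == 0x00020005, and
  // DecodePartition(0x00020005, &s, &l) recovers s == 2, l == 5.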

  void AssertBounds(int start, int end) {
    eigen_plain_assert(start >= 0);
    eigen_plain_assert(start < end);  // non-zero sized partition
    eigen_plain_assert(end <= num_threads_);
  }

  inline void SetStealPartition(size_t i, unsigned val) {
    thread_data_[i].steal_partition.store(val, std::memory_order_relaxed);
  }

  inline unsigned GetStealPartition(int i) {
    return thread_data_[i].steal_partition.load(std::memory_order_relaxed);
  }

  void ComputeCoprimes(int N, MaxSizeVector<unsigned>* coprimes) {
    for (int i = 1; i <= N; i++) {
      unsigned a = i;
      unsigned b = N;
      // If GCD(a, b) == 1, then a and b are coprime.
      while (b != 0) {
        unsigned tmp = a;
        a = b;
        b = tmp % b;
      }
      if (a == 1) {
        coprimes->push_back(i);
      }
    }
  }
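  // For instance (illustrative value), ComputeCoprimes(10, &v) fills v with
  // {1, 3, 7, 9}: the numbers in [1, 10] whose GCD with 10 is 1.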

  typedef typename Environment::EnvThread Thread;

  struct PerThread {
    constexpr PerThread() : pool(NULL), rand(0), thread_id(-1) {}
    ThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
    uint64_t rand;          // Random generator state.
    int thread_id;          // Worker thread index in pool.
#ifndef EIGEN_THREAD_LOCAL
    // Prevent false sharing.
    char pad_[128];
#endif
  };

  struct ThreadData {
    constexpr ThreadData() : thread(), steal_partition(0), queue() {}
    std::unique_ptr<Thread> thread;
    std::atomic<unsigned> steal_partition;
    Queue queue;
  };

  Environment env_;
  const int num_threads_;
  const bool allow_spinning_;
  MaxSizeVector<ThreadData> thread_data_;
  MaxSizeVector<MaxSizeVector<unsigned>> all_coprimes_;
  MaxSizeVector<EventCount::Waiter> waiters_;
  unsigned global_steal_partition_;
  std::atomic<unsigned> blocked_;
  std::atomic<bool> spinning_;
  std::atomic<bool> done_;
  std::atomic<bool> cancelled_;
  EventCount ec_;
#ifndef EIGEN_THREAD_LOCAL
  std::unique_ptr<Barrier> init_barrier_;
  std::mutex per_thread_map_mutex_;  // Protects per_thread_map_.
  std::unordered_map<uint64_t, std::unique_ptr<PerThread>> per_thread_map_;
#endif

  // Main worker thread loop.
  void WorkerLoop(int thread_id) {
#ifndef EIGEN_THREAD_LOCAL
    std::unique_ptr<PerThread> new_pt(new PerThread());
    per_thread_map_mutex_.lock();
    bool insertOK = per_thread_map_.emplace(GlobalThreadIdHash(), std::move(new_pt)).second;
    eigen_plain_assert(insertOK);
    EIGEN_UNUSED_VARIABLE(insertOK);
    per_thread_map_mutex_.unlock();
    init_barrier_->Notify();
    init_barrier_->Wait();
#endif
    PerThread* pt = GetPerThread();
    pt->pool = this;
    pt->rand = GlobalThreadIdHash();
    pt->thread_id = thread_id;
    Queue& q = thread_data_[thread_id].queue;
    EventCount::Waiter* waiter = &waiters_[thread_id];
    // TODO(dvyukov,rmlarsen): The time spent in NonEmptyQueueIndex() is
    // proportional to num_threads_ and we assume that new work is scheduled at
    // a constant rate, so we set spin_count to 5000 / num_threads_. The
    // constant was picked based on a fair dice roll, tune it.
    const int spin_count =
        allow_spinning_ && num_threads_ > 0 ? 5000 / num_threads_ : 0;
    if (num_threads_ == 1) {
      // For num_threads_ == 1 there is no point in going through the expensive
      // steal loop. Moreover, since NonEmptyQueueIndex() calls PopBack() on the
      // victim queues it might reverse the order in which ops are executed,
      // compared to the order in which they are scheduled, which tends to be
      // counter-productive for the types of I/O workloads that single-threaded
      // pools tend to be used for.
      while (!cancelled_) {
        Task t = q.PopFront();
        for (int i = 0; i < spin_count && !t.f; i++) {
          if (!cancelled_.load(std::memory_order_relaxed)) {
            t = q.PopFront();
          }
        }
        if (!t.f) {
          if (!WaitForWork(waiter, &t)) {
            return;
          }
        }
        if (t.f) {
          env_.ExecuteTask(t);
        }
      }
    } else {
      while (!cancelled_) {
        Task t = q.PopFront();
        if (!t.f) {
          t = LocalSteal();
          if (!t.f) {
            t = GlobalSteal();
            if (!t.f) {
              // Leave one thread spinning. This reduces latency.
              if (allow_spinning_ && !spinning_ && !spinning_.exchange(true)) {
                for (int i = 0; i < spin_count && !t.f; i++) {
                  if (!cancelled_.load(std::memory_order_relaxed)) {
                    t = GlobalSteal();
                  } else {
                    return;
                  }
                }
                spinning_ = false;
              }
              if (!t.f) {
                if (!WaitForWork(waiter, &t)) {
                  return;
                }
              }
            }
          }
        }
        if (t.f) {
          env_.ExecuteTask(t);
        }
      }
    }
  }

  // Steal tries to steal work from other worker threads in the range [start,
  // limit) in a best-effort manner.
  Task Steal(unsigned start, unsigned limit) {
    PerThread* pt = GetPerThread();
    const size_t size = limit - start;
    unsigned r = Rand(&pt->rand);
    // Reduce r into the [0, size) range; this uses the trick from
    // https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
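    // For example (illustrative values), with size == 8 and r == 0x80000000,
    // ((uint64_t)r * 8) >> 32 == 4, i.e. r is mapped into [0, 8) without a
    // modulo operation.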
    eigen_plain_assert(all_coprimes_[size - 1].size() < (1<<30));
    unsigned victim = ((uint64_t)r * (uint64_t)size) >> 32;
    unsigned index = ((uint64_t) all_coprimes_[size - 1].size() * (uint64_t)r) >> 32;
    unsigned inc = all_coprimes_[size - 1][index];

    for (unsigned i = 0; i < size; i++) {
      eigen_plain_assert(start + victim < limit);
      Task t = thread_data_[start + victim].queue.PopBack();
      if (t.f) {
        return t;
      }
      victim += inc;
      if (victim >= size) {
        victim -= size;
      }
    }
    return Task();
  }

  // Steals work within threads belonging to the partition.
  Task LocalSteal() {
    PerThread* pt = GetPerThread();
    unsigned partition = GetStealPartition(pt->thread_id);
    // If the thread's steal partition is the same as the global partition,
    // there is no need to go through the steal loop twice.
    if (global_steal_partition_ == partition) return Task();
    unsigned start, limit;
    DecodePartition(partition, &start, &limit);
    AssertBounds(start, limit);

    return Steal(start, limit);
  }

  // Steals work from any other thread in the pool.
  Task GlobalSteal() {
    return Steal(0, num_threads_);
  }

  // WaitForWork blocks until new work is available (returns true), or if it is
  // time to exit (returns false). It can optionally return a task to execute
  // in t (in which case t.f != nullptr on return).
  bool WaitForWork(EventCount::Waiter* waiter, Task* t) {
    eigen_plain_assert(!t->f);
    // We already did a best-effort emptiness check in Steal, so prepare for
    // blocking.
    ec_.Prewait();
    // Now do a reliable emptiness check.
    int victim = NonEmptyQueueIndex();
    if (victim != -1) {
      ec_.CancelWait();
      if (cancelled_) {
        return false;
      } else {
        *t = thread_data_[victim].queue.PopBack();
        return true;
      }
    }
    // The number of blocked threads is used as the termination condition.
    // If we are shutting down and all worker threads are blocked without work,
    // we are done.
    blocked_++;
    // TODO is blocked_ required to be unsigned?
    if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
      ec_.CancelWait();
      // Almost done, but we need to re-check the queues.
      // Consider that all queues are empty and all worker threads are preempted
      // right after incrementing blocked_ above. Now a free-standing thread
      // submits work and calls the destructor (which sets done_). If we don't
      // re-check the queues, we will exit leaving the work unexecuted.
      if (NonEmptyQueueIndex() != -1) {
        // Note: we must not pop from the queues before we decrement blocked_,
        // otherwise the following scenario is possible. Consider that instead
        // of checking for emptiness we popped the only element from the queues.
        // Now other worker threads can start exiting, which is bad if the
        // work item submits other work. So we just check emptiness here,
        // which ensures that all worker threads exit at the same time.
        blocked_--;
        return true;
      }
      // Reached stable termination state.
      ec_.Notify(true);
      return false;
    }
    ec_.CommitWait(waiter);
    blocked_--;
    return true;
  }

  int NonEmptyQueueIndex() {
    PerThread* pt = GetPerThread();
    // We intentionally design NonEmptyQueueIndex to look for work in any queue
    // in the pool, so threads don't block in WaitForWork() forever when all
    // threads in their partition go to sleep. Steal is still local.
    const size_t size = thread_data_.size();
    unsigned r = Rand(&pt->rand);
    unsigned inc = all_coprimes_[size - 1][r % all_coprimes_[size - 1].size()];
    unsigned victim = r % size;
    for (unsigned i = 0; i < size; i++) {
      if (!thread_data_[victim].queue.Empty()) {
        return victim;
      }
      victim += inc;
      if (victim >= size) {
        victim -= size;
      }
    }
    return -1;
  }

  static EIGEN_STRONG_INLINE uint64_t GlobalThreadIdHash() {
    return std::hash<std::thread::id>()(std::this_thread::get_id());
  }

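  // Explanatory note (added comment): without EIGEN_THREAD_LOCAL, per-thread
  // state is looked up in per_thread_map_ keyed by GlobalThreadIdHash(); a
  // thread that never registered (i.e. not a worker of this pool) gets the
  // shared `dummy` whose pool field stays null, so Schedule and
  // CurrentThreadId treat it as an external thread.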
  EIGEN_STRONG_INLINE PerThread* GetPerThread() {
#ifndef EIGEN_THREAD_LOCAL
    static PerThread dummy;
    auto it = per_thread_map_.find(GlobalThreadIdHash());
    if (it == per_thread_map_.end()) {
      return &dummy;
    } else {
      return it->second.get();
    }
#else
    EIGEN_THREAD_LOCAL PerThread per_thread_;
    PerThread* pt = &per_thread_;
    return pt;
#endif
  }

  static EIGEN_STRONG_INLINE unsigned Rand(uint64_t* state) {
    uint64_t current = *state;
    // Update the internal state
    *state = current * 6364136223846793005ULL + 0xda3e39cb94b95bdbULL;
    // Generate the random output (using the PCG-XSH-RS scheme)
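    // (Explanatory comment: the top 3 bits of the old state select a shift in
    // [22, 29], the xorshift mixes high bits into the low bits, and the cast
    // keeps the low 32 bits of the result.)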
    return static_cast<unsigned>((current ^ (current >> 22)) >>
                                 (22 + (current >> 61)));
  }
};

typedef ThreadPoolTempl<StlThreadEnvironment> ThreadPool;

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_NONBLOCKING_THREAD_POOL_H