File indexing completed on 2025-01-18 09:57:47
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026 #pragma once
0027
0028 #include "PTL/Globals.hh"
0029 #include "PTL/TaskSubQueue.hh"
0030 #include "PTL/Threading.hh"
0031 #include "PTL/VTask.hh"
0032 #include "PTL/VUserTaskQueue.hh"
0033
0034 #include <atomic>
0035 #include <cstdint>
0036 #include <memory>
0037 #include <random>
0038 #include <vector>
0039
0040 namespace PTL
0041 {
0042 class UserTaskQueue : public VUserTaskQueue
0043 {
0044 public:
0045 using task_pointer = std::shared_ptr<VTask>;
0046 using TaskSubQueueContainer = std::vector<TaskSubQueue*>;
0047 using random_engine_t = std::default_random_engine;
0048 using int_dist_t = std::uniform_int_distribution<int>;
0049
0050 public:
0051
0052 UserTaskQueue(intmax_t nworkers = -1, UserTaskQueue* = nullptr);
0053
0054
0055 ~UserTaskQueue() override;
0056
0057 public:
0058
0059 task_pointer GetTask(intmax_t subq = -1, intmax_t nitr = -1) override;
0060
0061 intmax_t InsertTask(task_pointer&&, ThreadData* = nullptr,
0062 intmax_t subq = -1) override PTL_NO_SANITIZE_THREAD;
0063
0064
0065 task_pointer GetThreadBinTask();
0066
0067
0068 void Wait() override {}
0069 void resize(intmax_t) override;
0070
0071 bool empty() const override;
0072 size_type size() const override;
0073
0074 size_type bin_size(size_type bin) const override;
0075 bool bin_empty(size_type bin) const override;
0076
0077 bool true_empty() const override;
0078 size_type true_size() const override;
0079
0080 void ExecuteOnAllThreads(ThreadPool* tp, function_type f) override;
0081
0082 void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool* tp,
0083 function_type f) override;
0084
0085 VUserTaskQueue* clone() override;
0086
0087 intmax_t GetThreadBin() const override;
0088
0089 protected:
0090 intmax_t GetInsertBin() const;
0091
0092 private:
0093 void AcquireHold();
0094 void ReleaseHold();
0095
0096 private:
0097 bool m_is_clone;
0098 intmax_t m_thread_bin;
0099 mutable intmax_t m_insert_bin;
0100 std::atomic_bool* m_hold = nullptr;
0101 std::atomic_uintmax_t* m_ntasks = nullptr;
0102 Mutex* m_mutex = nullptr;
0103 TaskSubQueueContainer* m_subqueues = nullptr;
0104 std::vector<int> m_rand_list = {};
0105 std::vector<int>::iterator m_rand_itr = {};
0106 };
0107
0108
0109
0110 inline bool
0111 UserTaskQueue::empty() const
0112 {
0113 return (m_ntasks->load(std::memory_order_relaxed) == 0);
0114 }
0115
0116
0117
0118 inline UserTaskQueue::size_type
0119 UserTaskQueue::size() const
0120 {
0121 return m_ntasks->load(std::memory_order_relaxed);
0122 }
0123
0124
0125
0126 inline UserTaskQueue::size_type
0127 UserTaskQueue::bin_size(size_type bin) const
0128 {
0129 return (*m_subqueues)[bin]->size();
0130 }
0131
0132
0133
0134 inline bool
0135 UserTaskQueue::bin_empty(size_type bin) const
0136 {
0137 return (*m_subqueues)[bin]->empty();
0138 }
0139
0140
0141
0142 inline bool
0143 UserTaskQueue::true_empty() const
0144 {
0145 for(const auto& itr : *m_subqueues)
0146 if(!itr->empty())
0147 return false;
0148 return true;
0149 }
0150
0151
0152
0153 inline UserTaskQueue::size_type
0154 UserTaskQueue::true_size() const
0155 {
0156 size_type _n = 0;
0157 for(const auto& itr : *m_subqueues)
0158 _n += itr->size();
0159 return _n;
0160 }
0161
0162
0163 }