File indexing completed on 2025-09-18 09:13:52
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026 #pragma once
0027
0028 #include "PTL/Macros.hh"
0029 #include "PTL/TaskSubQueue.hh"
0030 #include "PTL/Types.hh"
0031 #include "PTL/VUserTaskQueue.hh"
0032
0033 #include <atomic>
0034 #include <cstdint>
0035 #include <memory>
0036 #include <random>
0037 #include <vector>
0038
0039 namespace PTL
0040 {
0041 class ThreadData;
0042 class ThreadPool;
0043 class VTask;
0044
0045 class UserTaskQueue : public VUserTaskQueue
0046 {
0047 public:
0048 using task_pointer = std::shared_ptr<VTask>;
0049 using TaskSubQueueContainer = std::vector<TaskSubQueue*>;
0050 using random_engine_t = std::default_random_engine;
0051 using int_dist_t = std::uniform_int_distribution<int>;
0052
0053 public:
0054
0055 UserTaskQueue(intmax_t nworkers = -1, UserTaskQueue* = nullptr);
0056
0057
0058 ~UserTaskQueue() override;
0059
0060 public:
0061
0062 task_pointer GetTask(intmax_t subq = -1, intmax_t nitr = -1) override;
0063
0064 intmax_t InsertTask(task_pointer&&, ThreadData* = nullptr,
0065 intmax_t subq = -1) override PTL_NO_SANITIZE_THREAD;
0066
0067
0068 task_pointer GetThreadBinTask();
0069
0070
0071 void Wait() override {}
0072 void resize(intmax_t) override;
0073
0074 bool empty() const override;
0075 size_type size() const override;
0076
0077 size_type bin_size(size_type bin) const override;
0078 bool bin_empty(size_type bin) const override;
0079
0080 bool true_empty() const override;
0081 size_type true_size() const override;
0082
0083 void ExecuteOnAllThreads(ThreadPool* tp, function_type f) override;
0084
0085 void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool* tp,
0086 function_type f) override;
0087
0088 VUserTaskQueue* clone() override;
0089
0090 intmax_t GetThreadBin() const override;
0091
0092 protected:
0093 intmax_t GetInsertBin() const;
0094
0095 private:
0096 void AcquireHold();
0097 void ReleaseHold();
0098
0099 private:
0100 bool m_is_clone;
0101 intmax_t m_thread_bin;
0102 mutable intmax_t m_insert_bin;
0103 std::atomic_bool* m_hold = nullptr;
0104 std::atomic_uintmax_t* m_ntasks = nullptr;
0105 Mutex* m_mutex = nullptr;
0106 TaskSubQueueContainer* m_subqueues = nullptr;
0107 std::vector<int> m_rand_list = {};
0108 std::vector<int>::iterator m_rand_itr = {};
0109 };
0110
0111
0112
0113 inline bool
0114 UserTaskQueue::empty() const
0115 {
0116 return (m_ntasks->load(std::memory_order_relaxed) == 0);
0117 }
0118
0119
0120
0121 inline UserTaskQueue::size_type
0122 UserTaskQueue::size() const
0123 {
0124 return m_ntasks->load(std::memory_order_relaxed);
0125 }
0126
0127
0128
0129 inline UserTaskQueue::size_type
0130 UserTaskQueue::bin_size(size_type bin) const
0131 {
0132 return (*m_subqueues)[bin]->size();
0133 }
0134
0135
0136
0137 inline bool
0138 UserTaskQueue::bin_empty(size_type bin) const
0139 {
0140 return (*m_subqueues)[bin]->empty();
0141 }
0142
0143
0144
0145 inline bool
0146 UserTaskQueue::true_empty() const
0147 {
0148 for(const auto& itr : *m_subqueues)
0149 if(!itr->empty())
0150 return false;
0151 return true;
0152 }
0153
0154
0155
0156 inline UserTaskQueue::size_type
0157 UserTaskQueue::true_size() const
0158 {
0159 size_type _n = 0;
0160 for(const auto& itr : *m_subqueues)
0161 _n += itr->size();
0162 return _n;
0163 }
0164
0165
0166 }