File indexing completed on 2025-02-21 10:09:51
0001
0002
0003
0004 #ifndef QTCONCURRENT_ITERATEKERNEL_H
0005 #define QTCONCURRENT_ITERATEKERNEL_H
0006
0007 #include <QtConcurrent/qtconcurrent_global.h>
0008
0009 #if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
0010
0011 #include <QtCore/qatomic.h>
0012 #include <QtConcurrent/qtconcurrentmedian.h>
0013 #include <QtConcurrent/qtconcurrentthreadengine.h>
0014
0015 #include <iterator>
0016
0017 QT_BEGIN_NAMESPACE
0018
0019
0020
0021 namespace QtConcurrent {
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031 class Q_CONCURRENT_EXPORT BlockSizeManager
0032 {
0033 public:
0034 explicit BlockSizeManager(QThreadPool *pool, int iterationCount);
0035
0036 void timeBeforeUser();
0037 void timeAfterUser();
0038 int blockSize();
0039
0040 private:
0041 inline bool blockSizeMaxed()
0042 {
0043 return (m_blockSize >= maxBlockSize);
0044 }
0045
0046 const int maxBlockSize;
0047 qint64 beforeUser;
0048 qint64 afterUser;
0049 Median controlPartElapsed;
0050 Median userPartElapsed;
0051 int m_blockSize;
0052
0053 Q_DISABLE_COPY(BlockSizeManager)
0054 };
0055
0056 template <typename T>
0057 class ResultReporter
0058 {
0059 public:
0060 ResultReporter(ThreadEngine<T> *_threadEngine, T &_defaultValue)
0061 : threadEngine(_threadEngine), defaultValue(_defaultValue)
0062 {
0063 }
0064
0065 void reserveSpace(int resultCount)
0066 {
0067 currentResultCount = resultCount;
0068 resizeList(qMax(resultCount, vector.size()));
0069 }
0070
0071 void reportResults(int begin)
0072 {
0073 const int useVectorThreshold = 4;
0074 if (currentResultCount > useVectorThreshold) {
0075 resizeList(currentResultCount);
0076 threadEngine->reportResults(vector, begin);
0077 } else {
0078 for (int i = 0; i < currentResultCount; ++i)
0079 threadEngine->reportResult(&vector.at(i), begin + i);
0080 }
0081 }
0082
0083 inline T * getPointer()
0084 {
0085 return vector.data();
0086 }
0087
0088 int currentResultCount = 0;
0089 ThreadEngine<T> *threadEngine;
0090 QList<T> vector;
0091
0092 private:
0093 void resizeList(qsizetype size)
0094 {
0095 if constexpr (std::is_default_constructible_v<T>)
0096 vector.resize(size);
0097 else
0098 vector.resize(size, defaultValue);
0099 }
0100
0101 T &defaultValue;
0102 };
0103
0104 template <>
0105 class ResultReporter<void>
0106 {
0107 public:
0108 inline ResultReporter(ThreadEngine<void> *) { }
0109 inline void reserveSpace(int) { }
0110 inline void reportResults(int) { }
0111 inline void * getPointer() { return nullptr; }
0112 };
0113
// Wraps the value that IterateKernel passes to ResultReporter as the fill
// value for result slots. Wrapping it lets IterateKernel declare the member
// for every result type, including void (see the empty specialization below).
template<typename T>
struct DefaultValueContainer
{
    // Forwards _value into the stored T.
    //
    // The constructor is disabled when the argument is itself a
    // DefaultValueContainer<T>. Without that constraint, copying from a
    // non-const lvalue container would select this template instead of the
    // copy constructor and then try to build T from the container.
    template<typename U = T,
             std::enable_if_t<!std::is_same_v<std::decay_t<U>, DefaultValueContainer>, bool> = true>
    DefaultValueContainer(U &&_value) : value(std::forward<U>(_value))
    {
    }

    T value;
};

// void results have no value to store.
template<>
struct DefaultValueContainer<void>
{
};
0129
// Bidirectional iterators: IterateKernel does not use its index-based
// ("for") thread function for this category.
inline bool selectIteration(std::bidirectional_iterator_tag /*category*/)
{
    constexpr bool useIndexedIteration = false;
    return useIndexedIteration;
}
0134
// Forward iterators: IterateKernel does not use its index-based ("for")
// thread function for this category.
inline bool selectIteration(std::forward_iterator_tag /*category*/)
{
    constexpr bool useIndexedIteration = false;
    return useIndexedIteration;
}
0139
// Random-access iterators: IterateKernel uses its index-based ("for") thread
// function and computes the iteration count up front.
inline bool selectIteration(std::random_access_iterator_tag /*category*/)
{
    constexpr bool useIndexedIteration = true;
    return useIndexedIteration;
}
0144
0145 template <typename Iterator, typename T>
0146 class IterateKernel : public ThreadEngine<T>
0147 {
0148 using IteratorCategory = typename std::iterator_traits<Iterator>::iterator_category;
0149
0150 public:
0151 typedef T ResultType;
0152
0153 template<typename U = T, std::enable_if_t<std::is_same_v<U, void>, bool> = true>
0154 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
0155 : ThreadEngine<U>(pool),
0156 begin(_begin),
0157 end(_end),
0158 current(_begin),
0159 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
0160 forIteration(selectIteration(IteratorCategory())),
0161 progressReportingEnabled(true)
0162 {
0163 }
0164
0165 template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
0166 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
0167 : ThreadEngine<U>(pool),
0168 begin(_begin),
0169 end(_end),
0170 current(_begin),
0171 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
0172 forIteration(selectIteration(IteratorCategory())),
0173 progressReportingEnabled(true),
0174 defaultValue(U())
0175 {
0176 }
0177
0178 template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
0179 IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end, U &&_defaultValue)
0180 : ThreadEngine<U>(pool),
0181 begin(_begin),
0182 end(_end),
0183 current(_begin),
0184 iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
0185 forIteration(selectIteration(IteratorCategory())),
0186 progressReportingEnabled(true),
0187 defaultValue(std::forward<U>(_defaultValue))
0188 {
0189 }
0190
0191 virtual ~IterateKernel() { }
0192
0193 virtual bool runIteration(Iterator, int , T *) { return false; }
0194 virtual bool runIterations(Iterator, int, int, T *) { return false; }
0195
0196 void start() override
0197 {
0198 progressReportingEnabled = this->isProgressReportingEnabled();
0199 if (progressReportingEnabled && iterationCount > 0)
0200 this->setProgressRange(0, iterationCount);
0201 }
0202
0203 bool shouldStartThread() override
0204 {
0205 if (forIteration)
0206 return (currentIndex.loadRelaxed() < iterationCount) && !this->shouldThrottleThread();
0207 else
0208 return (iteratorThreads.loadRelaxed() == 0);
0209 }
0210
0211 ThreadFunctionResult threadFunction() override
0212 {
0213 if (forIteration)
0214 return this->forThreadFunction();
0215 else
0216 return this->whileThreadFunction();
0217 }
0218
0219 ThreadFunctionResult forThreadFunction()
0220 {
0221 BlockSizeManager blockSizeManager(ThreadEngineBase::threadPool, iterationCount);
0222 ResultReporter<T> resultReporter = createResultsReporter();
0223
0224 for(;;) {
0225 if (this->isCanceled())
0226 break;
0227
0228 const int currentBlockSize = blockSizeManager.blockSize();
0229
0230 if (currentIndex.loadRelaxed() >= iterationCount)
0231 break;
0232
0233
0234 const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
0235 const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);
0236
0237 if (beginIndex >= endIndex) {
0238
0239 break;
0240 }
0241
0242 this->waitForResume();
0243
0244 if (shouldStartThread())
0245 this->startThread();
0246
0247 const int finalBlockSize = endIndex - beginIndex;
0248 resultReporter.reserveSpace(finalBlockSize);
0249
0250
0251 blockSizeManager.timeBeforeUser();
0252 const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
0253 blockSizeManager.timeAfterUser();
0254
0255 if (resultsAvailable)
0256 resultReporter.reportResults(beginIndex);
0257
0258
0259 if (progressReportingEnabled) {
0260 completed.fetchAndAddAcquire(finalBlockSize);
0261 this->setProgressValue(this->completed.loadRelaxed());
0262 }
0263
0264 if (this->shouldThrottleThread())
0265 return ThrottleThread;
0266 }
0267 return ThreadFinished;
0268 }
0269
0270 ThreadFunctionResult whileThreadFunction()
0271 {
0272 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
0273 return ThreadFinished;
0274
0275 ResultReporter<T> resultReporter = createResultsReporter();
0276 resultReporter.reserveSpace(1);
0277
0278 while (current != end) {
0279
0280
0281
0282 Iterator prev = current;
0283 ++current;
0284 int index = currentIndex.fetchAndAddRelaxed(1);
0285 iteratorThreads.testAndSetRelease(1, 0);
0286
0287 this->waitForResume();
0288
0289 if (shouldStartThread())
0290 this->startThread();
0291
0292 const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
0293 if (resultAavailable)
0294 resultReporter.reportResults(index);
0295
0296 if (this->shouldThrottleThread())
0297 return ThrottleThread;
0298
0299 if (iteratorThreads.testAndSetAcquire(0, 1) == false)
0300 return ThreadFinished;
0301 }
0302
0303 return ThreadFinished;
0304 }
0305
0306 private:
0307 ResultReporter<T> createResultsReporter()
0308 {
0309 if constexpr (!std::is_same_v<T, void>)
0310 return ResultReporter<T>(this, defaultValue.value);
0311 else
0312 return ResultReporter<T>(this);
0313 }
0314
0315 public:
0316 const Iterator begin;
0317 const Iterator end;
0318 Iterator current;
0319 QAtomicInt currentIndex;
0320 QAtomicInt iteratorThreads;
0321 QAtomicInt completed;
0322 const int iterationCount;
0323 const bool forIteration;
0324 bool progressReportingEnabled;
0325 DefaultValueContainer<ResultType> defaultValue;
0326 };
0327
0328 }
0329
0330
0331 QT_END_NAMESPACE
0332
0333 #endif
0334
0335 #endif