Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-02-21 10:09:51

0001 // Copyright (C) 2016 The Qt Company Ltd.
0002 // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
0003 
0004 #ifndef QTCONCURRENT_ITERATEKERNEL_H
0005 #define QTCONCURRENT_ITERATEKERNEL_H
0006 
0007 #include <QtConcurrent/qtconcurrent_global.h>
0008 
0009 #if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
0010 
0011 #include <QtCore/qatomic.h>
0012 #include <QtConcurrent/qtconcurrentmedian.h>
0013 #include <QtConcurrent/qtconcurrentthreadengine.h>
0014 
0015 #include <iterator>
0016 
0017 QT_BEGIN_NAMESPACE
0018 
0019 
0020 
0021 namespace QtConcurrent {
0022 
0023 /*
0024     The BlockSizeManager class manages how many iterations a thread should
0025     reserve and process at a time. This is done by measuring the time spent
0026     in the user code versus the control part code, and then increasing
0027     the block size if the ratio between them is too small. The block size
0028     management is done on the basis of the median of several timing measurements,
0029     and it is done individually for each thread.
0030 */
0031 class Q_CONCURRENT_EXPORT BlockSizeManager
0032 {
0033 public:
0034     explicit BlockSizeManager(QThreadPool *pool, int iterationCount);
0035 
0036     void timeBeforeUser();
0037     void timeAfterUser();
0038     int blockSize();
0039 
0040 private:
0041     inline bool blockSizeMaxed()
0042     {
0043         return (m_blockSize >= maxBlockSize);
0044     }
0045 
0046     const int maxBlockSize;
0047     qint64 beforeUser;
0048     qint64 afterUser;
0049     Median controlPartElapsed;
0050     Median userPartElapsed;
0051     int m_blockSize;
0052 
0053     Q_DISABLE_COPY(BlockSizeManager)
0054 };
0055 
0056 template <typename T>
0057 class ResultReporter
0058 {
0059 public:
0060     ResultReporter(ThreadEngine<T> *_threadEngine, T &_defaultValue)
0061         : threadEngine(_threadEngine), defaultValue(_defaultValue)
0062     {
0063     }
0064 
0065     void reserveSpace(int resultCount)
0066     {
0067         currentResultCount = resultCount;
0068         resizeList(qMax(resultCount, vector.size()));
0069     }
0070 
0071     void reportResults(int begin)
0072     {
0073         const int useVectorThreshold = 4; // Tunable parameter.
0074         if (currentResultCount > useVectorThreshold) {
0075             resizeList(currentResultCount);
0076             threadEngine->reportResults(vector, begin);
0077         } else {
0078             for (int i = 0; i < currentResultCount; ++i)
0079                 threadEngine->reportResult(&vector.at(i), begin + i);
0080         }
0081     }
0082 
0083     inline T * getPointer()
0084     {
0085         return vector.data();
0086     }
0087 
0088     int currentResultCount = 0;
0089     ThreadEngine<T> *threadEngine;
0090     QList<T> vector;
0091 
0092 private:
0093     void resizeList(qsizetype size)
0094     {
0095         if constexpr (std::is_default_constructible_v<T>)
0096             vector.resize(size);
0097         else
0098             vector.resize(size, defaultValue);
0099     }
0100 
0101     T &defaultValue;
0102 };
0103 
0104 template <>
0105 class ResultReporter<void>
0106 {
0107 public:
0108     inline ResultReporter(ThreadEngine<void> *) { }
0109     inline void reserveSpace(int) { }
0110     inline void reportResults(int) { }
0111     inline void * getPointer() { return nullptr; }
0112 };
0113 
// Owns a single value of type T. IterateKernel keeps its default result value
// in one of these and hands `value` to the ResultReporter it creates.
template<typename T>
struct DefaultValueContainer
{
    T value;

    // Perfect-forwards the argument into `value`: lvalues are copied,
    // rvalues are moved.
    template<typename U = T>
    DefaultValueContainer(U &&initial) : value(std::forward<U>(initial)) { }
};
0124 
0125 template<>
0126 struct DefaultValueContainer<void>
0127 {
0128 };
0129 
// Bidirectional iterators: IterateKernel does not use the index-based "for"
// loop for them and falls back to its "while" loop.
inline bool selectIteration(std::bidirectional_iterator_tag)
{
    const bool useForLoop = false; // while
    return useForLoop;
}
0134 
// Forward iterators: IterateKernel does not use the index-based "for" loop
// for them and falls back to its "while" loop.
inline bool selectIteration(std::forward_iterator_tag)
{
    const bool useForLoop = false; // while
    return useForLoop;
}
0139 
// Random access iterators: IterateKernel uses its index-based "for" loop,
// which hands out whole blocks of iterations to each thread.
inline bool selectIteration(std::random_access_iterator_tag)
{
    const bool useForLoop = true; // for
    return useForLoop;
}
0144 
0145 template <typename Iterator, typename T>
0146 class IterateKernel : public ThreadEngine<T>
0147 {
0148     using IteratorCategory = typename std::iterator_traits<Iterator>::iterator_category;
0149 
0150 public:
0151     typedef T ResultType;
0152 
0153     template<typename U = T, std::enable_if_t<std::is_same_v<U, void>, bool> = true>
0154     IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
0155         : ThreadEngine<U>(pool),
0156           begin(_begin),
0157           end(_end),
0158           current(_begin),
0159           iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
0160           forIteration(selectIteration(IteratorCategory())),
0161           progressReportingEnabled(true)
0162     {
0163     }
0164 
0165     template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
0166     IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end)
0167         : ThreadEngine<U>(pool),
0168           begin(_begin),
0169           end(_end),
0170           current(_begin),
0171           iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
0172           forIteration(selectIteration(IteratorCategory())),
0173           progressReportingEnabled(true),
0174           defaultValue(U())
0175     {
0176     }
0177 
0178     template<typename U = T, std::enable_if_t<!std::is_same_v<U, void>, bool> = true>
0179     IterateKernel(QThreadPool *pool, Iterator _begin, Iterator _end, U &&_defaultValue)
0180         : ThreadEngine<U>(pool),
0181           begin(_begin),
0182           end(_end),
0183           current(_begin),
0184           iterationCount(selectIteration(IteratorCategory()) ? static_cast<int>(std::distance(_begin, _end)) : 0),
0185           forIteration(selectIteration(IteratorCategory())),
0186           progressReportingEnabled(true),
0187           defaultValue(std::forward<U>(_defaultValue))
0188     {
0189     }
0190 
0191     virtual ~IterateKernel() { }
0192 
0193     virtual bool runIteration(Iterator, int , T *) { return false; }
0194     virtual bool runIterations(Iterator, int, int, T *) { return false; }
0195 
0196     void start() override
0197     {
0198         progressReportingEnabled = this->isProgressReportingEnabled();
0199         if (progressReportingEnabled && iterationCount > 0)
0200             this->setProgressRange(0, iterationCount);
0201     }
0202 
0203     bool shouldStartThread() override
0204     {
0205         if (forIteration)
0206             return (currentIndex.loadRelaxed() < iterationCount) && !this->shouldThrottleThread();
0207         else // whileIteration
0208             return (iteratorThreads.loadRelaxed() == 0);
0209     }
0210 
0211     ThreadFunctionResult threadFunction() override
0212     {
0213         if (forIteration)
0214             return this->forThreadFunction();
0215         else // whileIteration
0216             return this->whileThreadFunction();
0217     }
0218 
0219     ThreadFunctionResult forThreadFunction()
0220     {
0221         BlockSizeManager blockSizeManager(ThreadEngineBase::threadPool, iterationCount);
0222         ResultReporter<T> resultReporter = createResultsReporter();
0223 
0224         for(;;) {
0225             if (this->isCanceled())
0226                 break;
0227 
0228             const int currentBlockSize = blockSizeManager.blockSize();
0229 
0230             if (currentIndex.loadRelaxed() >= iterationCount)
0231                 break;
0232 
0233             // Atomically reserve a block of iterationCount for this thread.
0234             const int beginIndex = currentIndex.fetchAndAddRelease(currentBlockSize);
0235             const int endIndex = qMin(beginIndex + currentBlockSize, iterationCount);
0236 
0237             if (beginIndex >= endIndex) {
0238                 // No more work
0239                 break;
0240             }
0241 
0242             this->waitForResume(); // (only waits if the qfuture is paused.)
0243 
0244             if (shouldStartThread())
0245                 this->startThread();
0246 
0247             const int finalBlockSize = endIndex - beginIndex; // block size adjusted for possible end-of-range
0248             resultReporter.reserveSpace(finalBlockSize);
0249 
0250             // Call user code with the current iteration range.
0251             blockSizeManager.timeBeforeUser();
0252             const bool resultsAvailable = this->runIterations(begin, beginIndex, endIndex, resultReporter.getPointer());
0253             blockSizeManager.timeAfterUser();
0254 
0255             if (resultsAvailable)
0256                 resultReporter.reportResults(beginIndex);
0257 
0258             // Report progress if progress reporting enabled.
0259             if (progressReportingEnabled) {
0260                 completed.fetchAndAddAcquire(finalBlockSize);
0261                 this->setProgressValue(this->completed.loadRelaxed());
0262             }
0263 
0264             if (this->shouldThrottleThread())
0265                 return ThrottleThread;
0266         }
0267         return ThreadFinished;
0268     }
0269 
0270     ThreadFunctionResult whileThreadFunction()
0271     {
0272         if (iteratorThreads.testAndSetAcquire(0, 1) == false)
0273             return ThreadFinished;
0274 
0275         ResultReporter<T> resultReporter = createResultsReporter();
0276         resultReporter.reserveSpace(1);
0277 
0278         while (current != end) {
0279             // The following two lines breaks support for input iterators according to
0280             // the sgi docs: dereferencing prev after calling ++current is not allowed
0281             // on input iterators. (prev is dereferenced inside user.runIteration())
0282             Iterator prev = current;
0283             ++current;
0284             int index = currentIndex.fetchAndAddRelaxed(1);
0285             iteratorThreads.testAndSetRelease(1, 0);
0286 
0287             this->waitForResume(); // (only waits if the qfuture is paused.)
0288 
0289             if (shouldStartThread())
0290                 this->startThread();
0291 
0292             const bool resultAavailable = this->runIteration(prev, index, resultReporter.getPointer());
0293             if (resultAavailable)
0294                 resultReporter.reportResults(index);
0295 
0296             if (this->shouldThrottleThread())
0297                 return ThrottleThread;
0298 
0299             if (iteratorThreads.testAndSetAcquire(0, 1) == false)
0300                 return ThreadFinished;
0301         }
0302 
0303         return ThreadFinished;
0304     }
0305 
0306 private:
0307     ResultReporter<T> createResultsReporter()
0308     {
0309         if constexpr (!std::is_same_v<T, void>)
0310             return ResultReporter<T>(this, defaultValue.value);
0311         else
0312             return ResultReporter<T>(this);
0313     }
0314 
0315 public:
0316     const Iterator begin;
0317     const Iterator end;
0318     Iterator current;
0319     QAtomicInt currentIndex;
0320     QAtomicInt iteratorThreads;
0321     QAtomicInt completed;
0322     const int iterationCount;
0323     const bool forIteration;
0324     bool progressReportingEnabled;
0325     DefaultValueContainer<ResultType> defaultValue;
0326 };
0327 
0328 } // namespace QtConcurrent
0329 
0330 
0331 QT_END_NAMESPACE
0332 
0333 #endif // QT_NO_CONCURRENT
0334 
0335 #endif