File indexing completed on 2025-02-21 10:09:51
0001
0002
0003
0004 #ifndef QTCONCURRENT_REDUCEKERNEL_H
0005 #define QTCONCURRENT_REDUCEKERNEL_H
0006
0007 #include <QtConcurrent/qtconcurrent_global.h>
0008
0009 #if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
0010
0011 #include <QtCore/qatomic.h>
0012 #include <QtCore/qlist.h>
0013 #include <QtCore/qmap.h>
0014 #include <QtCore/qmutex.h>
0015 #include <QtCore/qthread.h>
0016 #include <QtCore/qthreadpool.h>
0017
0018 #include <mutex>
0019
0020 QT_BEGIN_NAMESPACE
0021
0022 namespace QtPrivate {
0023
// Stores a sequence by value so that code holding this object can keep
// referring to `sequence` for as long as the holder is alive.
template<typename Sequence>
struct SequenceHolder
{
    // The owned sequence; filled by one of the constructors below.
    Sequence sequence;

    // Copies the caller's sequence into the holder.
    SequenceHolder(const Sequence &s) : sequence(s) { }

    // Moves the caller's sequence into the holder.
    SequenceHolder(Sequence &&s) : sequence(std::move(s)) { }
};
0031
0032 }
0033
0034 namespace QtConcurrent {
0035
0036
0037
0038
0039
0040
0041
0042
/*
    Limits on the number of intermediate results waiting to be reduced.
    ReduceKernel scales both values by its thread count:
    shouldStartThread() returns true while the queue size is at most
    ReduceQueueStartLimit * threadCount, and shouldThrottle() returns true
    once the queue size exceeds ReduceQueueThrottleLimit * threadCount.

    The enum is only given the name ReduceQueueLimits when Q_QDOC is
    defined; in every other build it is anonymous.
*/
enum
#ifdef Q_QDOC
    ReduceQueueLimits
#endif
{
    ReduceQueueStartLimit = 20,
    ReduceQueueThrottleLimit = 30
};
0054
0055
0056
0057
0058 template <typename T>
0059 class IntermediateResults
0060 {
0061 public:
0062 int begin, end;
0063 QList<T> vector;
0064 };
0065
/*
    Flags that select how ReduceKernel schedules calls to the reduce functor.
*/
enum ReduceOption {
    // A batch may be reduced whenever no other reduction is in progress
    // (ReduceKernel::canReduce() checks progress == 0).
    UnorderedReduce = 1 << 0,   // 0x1
    // A batch may be reduced only when its begin index equals the kernel's
    // current progress, so batches are consumed in index order.
    OrderedReduce = 1 << 1,     // 0x2
    // NOTE(review): not referenced by ReduceKernel in this header; its
    // meaning is defined by the code that consumes ReduceOptions.
    SequentialReduce = 1 << 2   // 0x4
};
0072 Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
0073 #ifndef Q_QDOC
0074 Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
0075 #endif
0076
0077 template <typename ReduceFunctor, typename ReduceResultType, typename T>
0078 class ReduceKernel
0079 {
0080 typedef QMap<int, IntermediateResults<T> > ResultsMap;
0081
0082 const ReduceOptions reduceOptions;
0083
0084 QMutex mutex;
0085 int progress, resultsMapSize;
0086 const int threadCount;
0087 ResultsMap resultsMap;
0088
0089 bool canReduce(int begin) const
0090 {
0091 return (((reduceOptions & UnorderedReduce)
0092 && progress == 0)
0093 || ((reduceOptions & OrderedReduce)
0094 && progress == begin));
0095 }
0096
0097 void reduceResult(ReduceFunctor &reduce,
0098 ReduceResultType &r,
0099 const IntermediateResults<T> &result)
0100 {
0101 for (int i = 0; i < result.vector.size(); ++i) {
0102 std::invoke(reduce, r, result.vector.at(i));
0103 }
0104 }
0105
0106 void reduceResults(ReduceFunctor &reduce,
0107 ReduceResultType &r,
0108 ResultsMap &map)
0109 {
0110 typename ResultsMap::iterator it = map.begin();
0111 while (it != map.end()) {
0112 reduceResult(reduce, r, it.value());
0113 ++it;
0114 }
0115 }
0116
0117 public:
0118 ReduceKernel(QThreadPool *pool, ReduceOptions _reduceOptions)
0119 : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
0120 threadCount(std::max(pool->maxThreadCount(), 1))
0121 { }
0122
0123 void runReduce(ReduceFunctor &reduce,
0124 ReduceResultType &r,
0125 const IntermediateResults<T> &result)
0126 {
0127 std::unique_lock<QMutex> locker(mutex);
0128 if (!canReduce(result.begin)) {
0129 ++resultsMapSize;
0130 resultsMap.insert(result.begin, result);
0131 return;
0132 }
0133
0134 if (reduceOptions & UnorderedReduce) {
0135
0136 progress = -1;
0137
0138
0139 locker.unlock();
0140 reduceResult(reduce, r, result);
0141 locker.lock();
0142
0143
0144 while (!resultsMap.isEmpty()) {
0145 ResultsMap resultsMapCopy = resultsMap;
0146 resultsMap.clear();
0147
0148 locker.unlock();
0149 reduceResults(reduce, r, resultsMapCopy);
0150 locker.lock();
0151
0152 resultsMapSize -= resultsMapCopy.size();
0153 }
0154
0155 progress = 0;
0156 } else {
0157
0158 locker.unlock();
0159 reduceResult(reduce, r, result);
0160 locker.lock();
0161
0162
0163 progress += result.end - result.begin;
0164
0165
0166 typename ResultsMap::iterator it = resultsMap.begin();
0167 while (it != resultsMap.end()) {
0168 if (it.value().begin != progress)
0169 break;
0170
0171 locker.unlock();
0172 reduceResult(reduce, r, it.value());
0173 locker.lock();
0174
0175 --resultsMapSize;
0176 progress += it.value().end - it.value().begin;
0177 it = resultsMap.erase(it);
0178 }
0179 }
0180 }
0181
0182
0183 void finish(ReduceFunctor &reduce, ReduceResultType &r)
0184 {
0185 reduceResults(reduce, r, resultsMap);
0186 }
0187
0188 inline bool shouldThrottle()
0189 {
0190 std::lock_guard<QMutex> locker(mutex);
0191 return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
0192 }
0193
0194 inline bool shouldStartThread()
0195 {
0196 std::lock_guard<QMutex> locker(mutex);
0197 return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
0198 }
0199 };
0200
0201 template <typename Sequence, typename Base, typename Functor1, typename Functor2>
0202 struct SequenceHolder2 : private QtPrivate::SequenceHolder<Sequence>, public Base
0203 {
0204 template<typename S = Sequence, typename F1 = Functor1, typename F2 = Functor2>
0205 SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2,
0206 ReduceOptions reduceOptions)
0207 : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(_sequence)),
0208 Base(pool, this->sequence.cbegin(), this->sequence.cend(),
0209 std::forward<F1>(functor1), std::forward<F2>(functor2), reduceOptions)
0210 { }
0211
0212 template<typename InitialValueType, typename S = Sequence,
0213 typename F1 = Functor1, typename F2 = Functor2>
0214 SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2,
0215 InitialValueType &&initialValue, ReduceOptions reduceOptions)
0216 : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(_sequence)),
0217 Base(pool, this->sequence.cbegin(), this->sequence.cend(),
0218 std::forward<F1>(functor1), std::forward<F2>(functor2),
0219 std::forward<InitialValueType>(initialValue), reduceOptions)
0220 { }
0221
0222 void finish() override
0223 {
0224 Base::finish();
0225
0226
0227 this->sequence = Sequence();
0228 }
0229 };
0230
0231 }
0232
0233 QT_END_NAMESPACE
0234
0235 #endif
0236
0237 #endif