Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-02-21 10:09:51

0001 // Copyright (C) 2016 The Qt Company Ltd.
0002 // SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only
0003 
0004 #ifndef QTCONCURRENT_REDUCEKERNEL_H
0005 #define QTCONCURRENT_REDUCEKERNEL_H
0006 
0007 #include <QtConcurrent/qtconcurrent_global.h>
0008 
0009 #if !defined(QT_NO_CONCURRENT) || defined(Q_QDOC)
0010 
0011 #include <QtCore/qatomic.h>
0012 #include <QtCore/qlist.h>
0013 #include <QtCore/qmap.h>
0014 #include <QtCore/qmutex.h>
0015 #include <QtCore/qthread.h>
0016 #include <QtCore/qthreadpool.h>
0017 
0018 #include <mutex>
0019 
0020 QT_BEGIN_NAMESPACE
0021 
0022 namespace QtPrivate {
0023 
// Stores a Sequence by value. The sequence is either copied from an lvalue
// or moved from an rvalue, and is exposed through the public `sequence`
// member.
template<typename Sequence>
struct SequenceHolder
{
    Sequence sequence;

    // Copies the caller's sequence into the holder.
    SequenceHolder(const Sequence &source) : sequence(source) { }

    // Takes over the contents of a temporary (or explicitly moved) sequence.
    SequenceHolder(Sequence &&source) : sequence(std::move(source)) { }
};
0031 
0032 }
0033 
0034 namespace QtConcurrent {
0035 
/*
    ReduceQueueStartLimit and ReduceQueueThrottleLimit bound the size of
    the reduce queue used by MapReduce. ReduceKernel multiplies each limit
    by its thread count before comparing it with the number of queued
    reduce blocks:

    - once the queue grows beyond the scaled ReduceQueueStartLimit,
      MapReduce won't start any new threads;
    - once it grows beyond the scaled ReduceQueueThrottleLimit, running
      threads will be stopped.

    The enum is only given a name (ReduceQueueLimits) when the header is
    processed with Q_QDOC defined; regular builds see an unnamed enum with
    the same enumerators.
*/
enum
#ifdef Q_QDOC
    ReduceQueueLimits
#endif
{
    ReduceQueueStartLimit = 20,
    ReduceQueueThrottleLimit = 30
};
0054 
0055 // IntermediateResults holds a block of intermediate results from a
0056 // map or filter functor. The begin/end offsets indicates the origin
0057 // and range of the block.
0058 template <typename T>
0059 class IntermediateResults
0060 {
0061 public:
0062     int begin, end;
0063     QList<T> vector;
0064 };
0065 
0066 enum ReduceOption {
0067     UnorderedReduce = 0x1,
0068     OrderedReduce = 0x2,
0069     SequentialReduce = 0x4
0070     // ParallelReduce = 0x8
0071 };
0072 Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
0073 #ifndef Q_QDOC
0074 Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
0075 #endif
0076 // supports both ordered and out-of-order reduction
0077 template <typename ReduceFunctor, typename ReduceResultType, typename T>
0078 class ReduceKernel
0079 {
0080     typedef QMap<int, IntermediateResults<T> > ResultsMap;
0081 
0082     const ReduceOptions reduceOptions;
0083 
0084     QMutex mutex;
0085     int progress, resultsMapSize;
0086     const int threadCount;
0087     ResultsMap resultsMap;
0088 
0089     bool canReduce(int begin) const
0090     {
0091         return (((reduceOptions & UnorderedReduce)
0092                  && progress == 0)
0093                 || ((reduceOptions & OrderedReduce)
0094                     && progress == begin));
0095     }
0096 
0097     void reduceResult(ReduceFunctor &reduce,
0098                       ReduceResultType &r,
0099                       const IntermediateResults<T> &result)
0100     {
0101         for (int i = 0; i < result.vector.size(); ++i) {
0102             std::invoke(reduce, r, result.vector.at(i));
0103         }
0104     }
0105 
0106     void reduceResults(ReduceFunctor &reduce,
0107                        ReduceResultType &r,
0108                        ResultsMap &map)
0109     {
0110         typename ResultsMap::iterator it = map.begin();
0111         while (it != map.end()) {
0112             reduceResult(reduce, r, it.value());
0113             ++it;
0114         }
0115     }
0116 
0117 public:
0118     ReduceKernel(QThreadPool *pool, ReduceOptions _reduceOptions)
0119         : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
0120           threadCount(std::max(pool->maxThreadCount(), 1))
0121     { }
0122 
0123     void runReduce(ReduceFunctor &reduce,
0124                    ReduceResultType &r,
0125                    const IntermediateResults<T> &result)
0126     {
0127         std::unique_lock<QMutex> locker(mutex);
0128         if (!canReduce(result.begin)) {
0129             ++resultsMapSize;
0130             resultsMap.insert(result.begin, result);
0131             return;
0132         }
0133 
0134         if (reduceOptions & UnorderedReduce) {
0135             // UnorderedReduce
0136             progress = -1;
0137 
0138             // reduce this result
0139             locker.unlock();
0140             reduceResult(reduce, r, result);
0141             locker.lock();
0142 
0143             // reduce all stored results as well
0144             while (!resultsMap.isEmpty()) {
0145                 ResultsMap resultsMapCopy = resultsMap;
0146                 resultsMap.clear();
0147 
0148                 locker.unlock();
0149                 reduceResults(reduce, r, resultsMapCopy);
0150                 locker.lock();
0151 
0152                 resultsMapSize -= resultsMapCopy.size();
0153             }
0154 
0155             progress = 0;
0156         } else {
0157             // reduce this result
0158             locker.unlock();
0159             reduceResult(reduce, r, result);
0160             locker.lock();
0161 
0162             // OrderedReduce
0163             progress += result.end - result.begin;
0164 
0165             // reduce as many other results as possible
0166             typename ResultsMap::iterator it = resultsMap.begin();
0167             while (it != resultsMap.end()) {
0168                 if (it.value().begin != progress)
0169                     break;
0170 
0171                 locker.unlock();
0172                 reduceResult(reduce, r, it.value());
0173                 locker.lock();
0174 
0175                 --resultsMapSize;
0176                 progress += it.value().end - it.value().begin;
0177                 it = resultsMap.erase(it);
0178             }
0179         }
0180     }
0181 
0182     // final reduction
0183     void finish(ReduceFunctor &reduce, ReduceResultType &r)
0184     {
0185         reduceResults(reduce, r, resultsMap);
0186     }
0187 
0188     inline bool shouldThrottle()
0189     {
0190         std::lock_guard<QMutex> locker(mutex);
0191         return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
0192     }
0193 
0194     inline bool shouldStartThread()
0195     {
0196         std::lock_guard<QMutex> locker(mutex);
0197         return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
0198     }
0199 };
0200 
0201 template <typename Sequence, typename Base, typename Functor1, typename Functor2>
0202 struct SequenceHolder2 : private QtPrivate::SequenceHolder<Sequence>, public Base
0203 {
0204     template<typename S = Sequence, typename F1 = Functor1, typename F2 = Functor2>
0205     SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2,
0206                     ReduceOptions reduceOptions)
0207         : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(_sequence)),
0208           Base(pool, this->sequence.cbegin(), this->sequence.cend(),
0209                std::forward<F1>(functor1), std::forward<F2>(functor2), reduceOptions)
0210     { }
0211 
0212     template<typename InitialValueType, typename S = Sequence,
0213              typename F1 = Functor1, typename F2 = Functor2>
0214     SequenceHolder2(QThreadPool *pool, S &&_sequence, F1 &&functor1, F2 &&functor2,
0215                     InitialValueType &&initialValue, ReduceOptions reduceOptions)
0216         : QtPrivate::SequenceHolder<Sequence>(std::forward<S>(_sequence)),
0217           Base(pool, this->sequence.cbegin(), this->sequence.cend(),
0218                std::forward<F1>(functor1), std::forward<F2>(functor2),
0219                std::forward<InitialValueType>(initialValue), reduceOptions)
0220     { }
0221 
0222     void finish() override
0223     {
0224         Base::finish();
0225         // Clear the sequence to make sure all temporaries are destroyed
0226         // before finished is signaled.
0227         this->sequence = Sequence();
0228     }
0229 };
0230 
0231 } // namespace QtConcurrent
0232 
0233 QT_END_NAMESPACE
0234 
0235 #endif // QT_NO_CONCURRENT
0236 
0237 #endif