File indexing completed on 2025-01-18 10:10:51
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 #ifndef ROOT_TThreadExecutor
0013 #define ROOT_TThreadExecutor
0014
0015 #include "RConfigure.h"
0016
0017
0018 #ifndef R__USE_IMT
0019
0020 # if !defined(__ROOTCLING__) && !defined(G__DICTIONARY)
0021 # error "Cannot use ROOT::TThreadExecutor without defining R__USE_IMT."
0022 # endif
0023 #else
0024
0025 #include "ROOT/TExecutorCRTP.hxx"
0026 #include "ROOT/TSeq.hxx"
0027 #include "ROOT/TypeTraits.hxx" // InvokeResult
0028 #include "RTaskArena.hxx"
0029 #include "TError.h"
0030
0031 #include <functional> //std::function
0032 #include <initializer_list>
0033 #include <memory>
0034 #include <numeric> //std::accumulate
0035 #include <type_traits> //std::enable_if
0036 #include <utility> //std::move
0037 #include <vector>
0038
0039 namespace ROOT {
0040
0041 class TThreadExecutor: public TExecutorCRTP<TThreadExecutor> {
0042 friend TExecutorCRTP;
0043
0044 public:
0045
0046 explicit TThreadExecutor(UInt_t nThreads = 0u);
0047
0048 TThreadExecutor(const TThreadExecutor &) = delete;
0049 TThreadExecutor &operator=(const TThreadExecutor &) = delete;
0050
0051
0052
0053 template<class F>
0054 void Foreach(F func, unsigned nTimes, unsigned nChunks = 0);
0055 template<class F, class INTEGER>
0056 void Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks = 0);
0057 template<class F, class T>
0058 void Foreach(F func, std::initializer_list<T> args, unsigned nChunks = 0);
0059 template<class F, class T>
0060 void Foreach(F func, std::vector<T> &args, unsigned nChunks = 0);
0061 template<class F, class T>
0062 void Foreach(F func, const std::vector<T> &args, unsigned nChunks = 0);
0063
0064
0065
0066 using TExecutorCRTP<TThreadExecutor>::Map;
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076 using TExecutorCRTP<TThreadExecutor>::MapReduce;
0077 template <class F, class R, class Cond = validMapReturnCond<F>>
0078 auto MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t<F>;
0079 template <class F, class R, class Cond = validMapReturnCond<F>>
0080 auto MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> InvokeResult_t<F>;
0081 template <class F, class INTEGER, class R, class Cond = validMapReturnCond<F, INTEGER>>
0082 auto MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, INTEGER>;
0083 template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0084 auto MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>;
0085 template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0086 auto MapReduce(F func, std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>;
0087 template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0088 auto MapReduce(F func, const std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>;
0089 template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0090 auto MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>;
0091 template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0092 auto MapReduce(F func, const std::vector<T> &args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>;
0093
0094 using TExecutorCRTP<TThreadExecutor>::Reduce;
0095 template<class T, class R> auto Reduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs));
0096 template<class T, class BINARYOP> auto Reduce(const std::vector<T> &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()));
0097
0098 unsigned GetPoolSize() const;
0099
0100 private:
0101
0102
0103 template <class F, class Cond = validMapReturnCond<F>>
0104 auto MapImpl(F func, unsigned nTimes) -> std::vector<InvokeResult_t<F>>;
0105 template <class F, class INTEGER, class Cond = validMapReturnCond<F, INTEGER>>
0106 auto MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<InvokeResult_t<F, INTEGER>>;
0107 template <class F, class T, class Cond = validMapReturnCond<F, T>>
0108 auto MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
0109 template <class F, class T, class Cond = validMapReturnCond<F, T>>
0110 auto MapImpl(F func, const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
0111
0112
0113
0114 template <class F, class R, class Cond = validMapReturnCond<F>>
0115 auto Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector<InvokeResult_t<F>>;
0116 template <class F, class INTEGER, class R, class Cond = validMapReturnCond<F, INTEGER>>
0117 auto Map(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks)
0118 -> std::vector<InvokeResult_t<F, INTEGER>>;
0119 template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0120 auto Map(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> std::vector<InvokeResult_t<F, T>>;
0121 template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0122 auto Map(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> std::vector<InvokeResult_t<F, T>>;
0123 template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0124 auto Map(F func, const std::vector<T> &args, R redfunc, unsigned nChunks) -> std::vector<InvokeResult_t<F, T>>;
0125
0126
0127 void ParallelFor(unsigned start, unsigned end, unsigned step, const std::function<void(unsigned int i)> &f);
0128 double ParallelReduce(const std::vector<double> &objs, const std::function<double(double a, double b)> &redfunc);
0129 float ParallelReduce(const std::vector<float> &objs, const std::function<float(float a, float b)> &redfunc);
0130 template<class T, class R>
0131 auto SeqReduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs));
0132
0133
0134 std::shared_ptr<ROOT::Internal::RTaskArenaWrapper> fTaskArenaW = nullptr;
0135 };
0136
0137
0138
0139
0140
0141
0142
0143
0144
0145 template<class F>
0146 void TThreadExecutor::Foreach(F func, unsigned nTimes, unsigned nChunks) {
0147 if (nChunks == 0) {
0148 ParallelFor(0U, nTimes, 1, [&](unsigned int){func();});
0149 return;
0150 }
0151
0152 unsigned step = (nTimes + nChunks - 1) / nChunks;
0153 auto lambda = [&](unsigned int i)
0154 {
0155 for (unsigned j = 0; j < step && (i + j) < nTimes; j++) {
0156 func();
0157 }
0158 };
0159 ParallelFor(0U, nTimes, step, lambda);
0160 }
0161
0162
0163
0164
0165
0166
0167
0168 template<class F, class INTEGER>
0169 void TThreadExecutor::Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks) {
0170 if (nChunks == 0) {
0171 ParallelFor(*args.begin(), *args.end(), args.step(), [&](unsigned int i){func(i);});
0172 return;
0173 }
0174 unsigned start = *args.begin();
0175 unsigned end = *args.end();
0176 unsigned seqStep = args.step();
0177 unsigned step = (end - start + nChunks - 1) / nChunks;
0178
0179 auto lambda = [&](unsigned int i)
0180 {
0181 for (unsigned j = 0; j < step && (i + j) < end; j+=seqStep) {
0182 func(i + j);
0183 }
0184 };
0185 ParallelFor(start, end, step, lambda);
0186 }
0187
0188
0189
0190
0191
0192
0193
0194 template<class F, class T>
0195 void TThreadExecutor::Foreach(F func, std::initializer_list<T> args, unsigned nChunks) {
0196 std::vector<T> vargs(std::move(args));
0197 Foreach(func, vargs, nChunks);
0198 }
0199
0200
0201
0202
0203
0204
0205
0206 template<class F, class T>
0207 void TThreadExecutor::Foreach(F func, std::vector<T> &args, unsigned nChunks) {
0208 unsigned int nToProcess = args.size();
0209 if (nChunks == 0) {
0210 ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
0211 return;
0212 }
0213
0214 unsigned step = (nToProcess + nChunks - 1) / nChunks;
0215 auto lambda = [&](unsigned int i)
0216 {
0217 for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
0218 func(args[i + j]);
0219 }
0220 };
0221 ParallelFor(0U, nToProcess, step, lambda);
0222 }
0223
0224
0225
0226
0227
0228
0229
0230 template<class F, class T>
0231 void TThreadExecutor::Foreach(F func, const std::vector<T> &args, unsigned nChunks) {
0232 unsigned int nToProcess = args.size();
0233 if (nChunks == 0) {
0234 ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
0235 return;
0236 }
0237
0238 unsigned step = (nToProcess + nChunks - 1) / nChunks;
0239 auto lambda = [&](unsigned int i)
0240 {
0241 for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
0242 func(args[i + j]);
0243 }
0244 };
0245 ParallelFor(0U, nToProcess, step, lambda);
0246 }
0247
0248
0249
0250
0251
0252
0253 template <class F, class Cond>
0254 auto TThreadExecutor::MapImpl(F func, unsigned nTimes) -> std::vector<InvokeResult_t<F>>
0255 {
0256 using retType = decltype(func());
0257 std::vector<retType> reslist(nTimes);
0258 auto lambda = [&](unsigned int i)
0259 {
0260 reslist[i] = func();
0261 };
0262 ParallelFor(0U, nTimes, 1, lambda);
0263
0264 return reslist;
0265 }
0266
0267
0268
0269
0270
0271
0272 template <class F, class INTEGER, class Cond>
0273 auto TThreadExecutor::MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<InvokeResult_t<F, INTEGER>>
0274 {
0275 using retType = decltype(func(*args.begin()));
0276 std::vector<retType> reslist(args.size());
0277 auto lambda = [&](unsigned int i) { reslist[i] = func(args[i]); };
0278 ParallelFor(0U, args.size(), 1, lambda);
0279
0280 return reslist;
0281 }
0282
0283
0284
0285
0286
0287
0288 template <class F, class R, class Cond>
0289 auto TThreadExecutor::Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector<InvokeResult_t<F>>
0290 {
0291 if (nChunks == 0)
0292 {
0293 return Map(func, nTimes);
0294 }
0295
0296 unsigned step = (nTimes + nChunks - 1) / nChunks;
0297
0298 unsigned actualChunks = (nTimes + step - 1) / step;
0299 using retType = decltype(func());
0300 std::vector<retType> reslist(actualChunks);
0301 auto lambda = [&](unsigned int i)
0302 {
0303 std::vector<retType> partialResults(std::min(nTimes-i, step));
0304 for (unsigned j = 0; j < step && (i + j) < nTimes; j++) {
0305 partialResults[j] = func();
0306 }
0307 reslist[i / step] = Reduce(partialResults, redfunc);
0308 };
0309 ParallelFor(0U, nTimes, step, lambda);
0310
0311 return reslist;
0312 }
0313
0314
0315
0316
0317
0318
0319 template <class F, class T, class Cond>
0320 auto TThreadExecutor::MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>
0321 {
0322
0323 using retType = decltype(func(args.front()));
0324
0325 unsigned int nToProcess = args.size();
0326 std::vector<retType> reslist(nToProcess);
0327
0328 auto lambda = [&](unsigned int i)
0329 {
0330 reslist[i] = func(args[i]);
0331 };
0332
0333 ParallelFor(0U, nToProcess, 1, lambda);
0334
0335 return reslist;
0336 }
0337
0338
0339
0340
0341
0342
0343 template <class F, class T, class Cond>
0344 auto TThreadExecutor::MapImpl(F func, const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>
0345 {
0346
0347 using retType = decltype(func(args.front()));
0348
0349 unsigned int nToProcess = args.size();
0350 std::vector<retType> reslist(nToProcess);
0351
0352 auto lambda = [&](unsigned int i)
0353 {
0354 reslist[i] = func(args[i]);
0355 };
0356
0357 ParallelFor(0U, nToProcess, 1, lambda);
0358
0359 return reslist;
0360 }
0361
0362
0363
0364
0365
0366
0367 template <class F, class INTEGER, class R, class Cond>
0368 auto TThreadExecutor::Map(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks)
0369 -> std::vector<InvokeResult_t<F, INTEGER>>
0370 {
0371 if (nChunks == 0)
0372 {
0373 return Map(func, args);
0374 }
0375
0376 unsigned nToProcess = args.size();
0377 unsigned step = (nToProcess + nChunks - 1) / nChunks;
0378
0379 unsigned actualChunks = (nToProcess + step - 1) / step;
0380
0381 using retType = decltype(func(*args.begin()));
0382 std::vector<retType> reslist(actualChunks);
0383 auto lambda = [&](unsigned int i) {
0384 std::vector<retType> partialResults(std::min(step, nToProcess - i));
0385 for (unsigned j = 0; j < partialResults.size(); j++) {
0386 partialResults[j] = func(args[i + j]);
0387 }
0388 reslist[i / step] = Reduce(partialResults, redfunc);
0389 };
0390
0391 ParallelFor(0U, nToProcess, step, lambda);
0392
0393 return reslist;
0394 }
0395
0396
0397
0398
0399
0400
0401 template <class F, class T, class R, class Cond>
0402 auto TThreadExecutor::Map(F func, std::vector<T> &args, R redfunc, unsigned nChunks)
0403 -> std::vector<InvokeResult_t<F, T>>
0404 {
0405 if (nChunks == 0)
0406 {
0407 return Map(func, args);
0408 }
0409
0410 unsigned int nToProcess = args.size();
0411 unsigned step = (nToProcess + nChunks - 1) / nChunks;
0412
0413 unsigned actualChunks = (nToProcess + step - 1) / step;
0414
0415 using retType = decltype(func(args.front()));
0416 std::vector<retType> reslist(actualChunks);
0417 auto lambda = [&](unsigned int i) {
0418 std::vector<retType> partialResults(std::min(step, nToProcess - i));
0419 for (unsigned j = 0; j < partialResults.size(); j++) {
0420 partialResults[j] = func(args[i + j]);
0421 }
0422 reslist[i / step] = Reduce(partialResults, redfunc);
0423 };
0424
0425 ParallelFor(0U, nToProcess, step, lambda);
0426
0427 return reslist;
0428 }
0429
0430
0431
0432
0433
0434
0435 template <class F, class T, class R, class Cond>
0436 auto TThreadExecutor::Map(F func, const std::vector<T> &args, R redfunc, unsigned nChunks)
0437 -> std::vector<InvokeResult_t<F, T>>
0438 {
0439 if (nChunks == 0)
0440 {
0441 return Map(func, args);
0442 }
0443
0444 unsigned int nToProcess = args.size();
0445 unsigned step = (nToProcess + nChunks - 1) / nChunks;
0446
0447 unsigned actualChunks = (nToProcess + step - 1) / step;
0448
0449 using retType = decltype(func(args.front()));
0450 std::vector<retType> reslist(actualChunks);
0451 auto lambda = [&](unsigned int i) {
0452 std::vector<retType> partialResults(std::min(step, nToProcess - i));
0453 for (unsigned j = 0; j < partialResults.size(); j++) {
0454 partialResults[j] = func(args[i + j]);
0455 }
0456 reslist[i / step] = Reduce(partialResults, redfunc);
0457 };
0458
0459 ParallelFor(0U, nToProcess, step, lambda);
0460
0461 return reslist;
0462 }
0463
0464
0465
0466
0467
0468
0469 template <class F, class T, class R, class Cond>
0470 auto TThreadExecutor::Map(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks)
0471 -> std::vector<InvokeResult_t<F, T>>
0472 {
0473 std::vector<T> vargs(std::move(args));
0474 const auto &reslist = Map(func, vargs, redfunc, nChunks);
0475 return reslist;
0476 }
0477
0478
0479
0480
0481 template <class F, class R, class Cond>
0482 auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t<F>
0483 {
0484 return Reduce(Map(func, nTimes), redfunc);
0485 }
0486
0487
0488
0489
0490
0491
0492 template <class F, class R, class Cond>
0493 auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> InvokeResult_t<F>
0494 {
0495 return Reduce(Map(func, nTimes, redfunc, nChunks), redfunc);
0496 }
0497
0498
0499
0500
0501
0502
0503 template <class F, class INTEGER, class R, class Cond>
0504 auto TThreadExecutor::MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks)
0505 -> InvokeResult_t<F, INTEGER>
0506 {
0507 return Reduce(Map(func, args, redfunc, nChunks), redfunc);
0508 }
0509
0510
0511
0512
0513
0514
0515 template <class F, class T, class R, class Cond>
0516 auto TThreadExecutor::MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks)
0517 -> InvokeResult_t<F, T>
0518 {
0519 return Reduce(Map(func, args, redfunc, nChunks), redfunc);
0520 }
0521
0522
0523
0524
0525 template <class F, class T, class R, class Cond>
0526 auto TThreadExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>
0527 {
0528 return Reduce(Map(func, args), redfunc);
0529 }
0530
0531
0532
0533
0534 template <class F, class T, class R, class Cond>
0535 auto TThreadExecutor::MapReduce(F func, const std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>
0536 {
0537 return Reduce(Map(func, args), redfunc);
0538 }
0539
0540
0541
0542
0543
0544
0545 template <class F, class T, class R, class Cond>
0546 auto TThreadExecutor::MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>
0547 {
0548 return Reduce(Map(func, args, redfunc, nChunks), redfunc);
0549 }
0550
0551
0552
0553
0554
0555
0556 template <class F, class T, class R, class Cond>
0557 auto TThreadExecutor::MapReduce(F func, const std::vector<T> &args, R redfunc, unsigned nChunks)
0558 -> InvokeResult_t<F, T>
0559 {
0560 return Reduce(Map(func, args, redfunc, nChunks), redfunc);
0561 }
0562
0563
0564
0565 template<class T, class R>
0566 auto TThreadExecutor::Reduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs))
0567 {
0568
0569 static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
0570 return SeqReduce(objs, redfunc);
0571 }
0572
0573
0574
0575
0576
0577
0578
0579
0580 template<class T, class BINARYOP>
0581 auto TThreadExecutor::Reduce(const std::vector<T> &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()))
0582 {
0583
0584 static_assert(std::is_same<decltype(redfunc(objs.front(), objs.front())), T>::value, "redfunc does not have the correct signature");
0585 return ParallelReduce(objs, redfunc);
0586 }
0587
0588
0589
0590
0591
0592
0593
0594 template<class T, class R>
0595 auto TThreadExecutor::SeqReduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs))
0596 {
0597 return redfunc(objs);
0598 }
0599
0600 }
0601
0602 #endif
0603 #endif