File indexing completed on 2025-01-18 10:10:50
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef ROOT_TExecutor
0012 #define ROOT_TExecutor
0013
0014 #include "ROOT/RConfig.hxx"
0015 #include "ROOT/TExecutorCRTP.hxx"
0016 #include "ROOT/TSeq.hxx"
0017 #include "ROOT/TSequentialExecutor.hxx"
0018 #ifdef R__USE_IMT
0019 #include "ROOT/TThreadExecutor.hxx"
0020 #endif
0021 #ifndef R__WIN32
0022 #include "ROOT/TProcessExecutor.hxx"
0023 #endif
0024 #include "TROOT.h"
0025 #include "ROOT/EExecutionPolicy.hxx"
0026
0027 #include <initializer_list>
0028 #include <memory>
0029 #include <thread>
0030 #include <type_traits> //std::enable_if
0031 #include <stdexcept> //std::invalid_argument
0032 #include <utility> //std::move
0033
0034 namespace ROOT{
0035
0036 namespace Internal{
0037 class TExecutor: public TExecutorCRTP<TExecutor> {
0038 friend TExecutorCRTP;
0039
0040 public:
0041
0042
0043
0044
0045 explicit TExecutor(unsigned nWorkers = 0) :
0046 TExecutor(ROOT::IsImplicitMTEnabled() ? ROOT::EExecutionPolicy::kMultiThread : ROOT::EExecutionPolicy::kSequential, nWorkers) {}
0047
0048
0049
0050
0051 explicit TExecutor(ROOT::EExecutionPolicy execPolicy, unsigned nWorkers = 0);
0052
0053 TExecutor(const TExecutor &) = delete;
0054 TExecutor &operator=(const TExecutor &) = delete;
0055
0056
0057 ROOT::EExecutionPolicy Policy() const { return fExecPolicy; }
0058
0059
0060
0061 using TExecutorCRTP<TExecutor>::Map;
0062
0063
0064
0065
0066
0067 using TExecutorCRTP<TExecutor>::MapReduce;
0068 template <class F, class R, class Cond = validMapReturnCond<F>>
0069 auto MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> InvokeResult_t<F>;
0070 template <class F, class INTEGER, class R, class Cond = validMapReturnCond<F, INTEGER>>
0071 auto MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, INTEGER>;
0072 template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0073 auto MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>;
0074 template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0075 auto MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>;
0076 template <class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0077 auto MapReduce(F func, const std::vector<T> &args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>;
0078
0079
0080
0081 using TExecutorCRTP<TExecutor>::Reduce;
0082
0083 unsigned GetPoolSize() const;
0084
0085 private:
0086
0087
0088 template <class F, class Cond = validMapReturnCond<F>>
0089 auto MapImpl(F func, unsigned nTimes) -> std::vector<InvokeResult_t<F>>;
0090 template <class F, class INTEGER, class Cond = validMapReturnCond<F, INTEGER>>
0091 auto MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<InvokeResult_t<F, INTEGER>>;
0092 template <class F, class T, class Cond = validMapReturnCond<F, T>>
0093 auto MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
0094 template <class F, class T, class Cond = validMapReturnCond<F, T>>
0095 auto MapImpl(F func, const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
0096
0097 ROOT::EExecutionPolicy fExecPolicy;
0098
0099
0100
0101 using Unused_t = ROOT::TSequentialExecutor;
0102 #ifdef R__USE_IMT
0103 # define R__EXECUTOR_THREAD ROOT::TThreadExecutor
0104 #else
0105 # define R__EXECUTOR_THREAD Unused_t
0106 #endif
0107 #ifndef R__WIN32
0108 # define R__EXECUTOR_PROCESS ROOT::TProcessExecutor
0109 #else
0110 # define R__EXECUTOR_PROCESS Unused_t
0111 #endif
0112
0113 std::unique_ptr<R__EXECUTOR_THREAD> fThreadExecutor;
0114 std::unique_ptr<R__EXECUTOR_PROCESS> fProcessExecutor;
0115 std::unique_ptr<ROOT::TSequentialExecutor> fSequentialExecutor;
0116
0117 #undef R__EXECUTOR_THREAD
0118 #undef R__EXECUTOR_PROCESS
0119
0120
0121
0122 template<class F, class CONTAINER>
0123 struct MapRetType {
0124 using type = InvokeResult_t<F, typename CONTAINER::value_type>;
0125 };
0126
0127 template<class F>
0128 struct MapRetType<F, unsigned> {
0129 using type = InvokeResult_t<F>;
0130 };
0131
0132
0133
0134
0135 template<class F, class T>
0136 auto ResolveExecutorAndMap(F func, T&& args) -> std::vector<typename MapRetType<F, typename std::decay<T>::type>::type> {
0137 std::vector<typename MapRetType<F, typename std::decay<T>::type>::type> res;
0138 switch(fExecPolicy) {
0139 case ROOT::EExecutionPolicy::kSequential:
0140 res = fSequentialExecutor->Map(func, std::forward<T>(args));
0141 break;
0142 case ROOT::EExecutionPolicy::kMultiThread:
0143 res = fThreadExecutor->Map(func, std::forward<T>(args));
0144 break;
0145 case ROOT::EExecutionPolicy::kMultiProcess:
0146 res = fProcessExecutor->Map(func, std::forward<T>(args));
0147 break;
0148 default:
0149 break;
0150 }
0151 return res;
0152 }
0153 };
0154
0155
0156
0157
0158
0159
0160
0161 template <class F, class Cond>
0162 auto TExecutor::MapImpl(F func, unsigned nTimes) -> std::vector<InvokeResult_t<F>>
0163 {
0164 return ResolveExecutorAndMap(func, nTimes);
0165 }
0166
0167
0168
0169
0170
0171
0172 template <class F, class INTEGER, class Cond>
0173 auto TExecutor::MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<InvokeResult_t<F, INTEGER>>
0174 {
0175 return ResolveExecutorAndMap(func, args);
0176 }
0177
0178
0179
0180
0181
0182
0183 template <class F, class T, class Cond>
0184 auto TExecutor::MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>
0185 {
0186 return ResolveExecutorAndMap(func, args);
0187 }
0188
0189
0190
0191
0192
0193
0194 template <class F, class T, class Cond>
0195 auto TExecutor::MapImpl(F func, const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>
0196 {
0197 return ResolveExecutorAndMap(func, args);
0198 }
0199
0200
0201
0202
0203
0204
0205
0206
0207
0208
0209
0210
0211
0212 template <class F, class R, class Cond>
0213 auto TExecutor::MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> InvokeResult_t<F>
0214 {
0215
0216 static_assert(std::is_invocable_v<R, std::vector<InvokeResult_t<F>>>, "redfunc does not have the correct signature");
0217 if (fExecPolicy == ROOT::EExecutionPolicy::kMultiThread) {
0218 return fThreadExecutor->MapReduce(func, nTimes, redfunc, nChunks);
0219 }
0220 return Reduce(Map(func, nTimes), redfunc);
0221 }
0222
0223
0224
0225
0226
0227
0228
0229
0230
0231
0232
0233
0234
0235 template <class F, class INTEGER, class R, class Cond>
0236 auto TExecutor::MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, INTEGER>
0237 {
0238 static_assert(std::is_invocable_v<R, std::vector<InvokeResult_t<F, INTEGER>>>,
0239 "redfunc does not have the correct signature");
0240 if (fExecPolicy == ROOT::EExecutionPolicy::kMultiThread) {
0241 return fThreadExecutor->MapReduce(func, args, redfunc, nChunks);
0242 }
0243 return Reduce(Map(func, args), redfunc);
0244 }
0245
0246
0247
0248
0249
0250
0251
0252
0253
0254
0255
0256
0257
0258 template <class F, class T, class R, class Cond>
0259 auto TExecutor::MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>
0260 {
0261 static_assert(std::is_invocable_v<R, std::vector<InvokeResult_t<F, T>>>,
0262 "redfunc does not have the correct signature");
0263 if (fExecPolicy == ROOT::EExecutionPolicy::kMultiThread) {
0264 return fThreadExecutor->MapReduce(func, args, redfunc, nChunks);
0265 }
0266 return Reduce(Map(func, args), redfunc);
0267 }
0268
0269
0270
0271
0272
0273
0274
0275
0276
0277
0278
0279
0280
0281 template <class F, class T, class R, class Cond>
0282 auto TExecutor::MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>
0283 {
0284 static_assert(std::is_invocable_v<R, std::vector<InvokeResult_t<F, T>>>,
0285 "redfunc does not have the correct signature");
0286 if (fExecPolicy == ROOT::EExecutionPolicy::kMultiThread) {
0287 return fThreadExecutor->MapReduce(func, args, redfunc, nChunks);
0288 }
0289 return Reduce(Map(func, args), redfunc);
0290 }
0291
0292
0293
0294
0295
0296
0297
0298
0299
0300
0301
0302
0303
0304 template <class F, class T, class R, class Cond>
0305 auto TExecutor::MapReduce(F func, const std::vector<T> &args, R redfunc, unsigned nChunks) -> InvokeResult_t<F, T>
0306 {
0307 static_assert(std::is_invocable_v<R, std::vector<InvokeResult_t<F, T>>>,
0308 "redfunc does not have the correct signature");
0309 if (fExecPolicy == ROOT::EExecutionPolicy::kMultiThread) {
0310 return fThreadExecutor->MapReduce(func, args, redfunc, nChunks);
0311 }
0312 return Reduce(Map(func, args), redfunc);
0313 }
0314
0315
0316
0317
0318
0319
0320 inline unsigned TExecutor::GetPoolSize() const
0321 {
0322 unsigned poolSize{0u};
0323 switch(fExecPolicy){
0324 case ROOT::EExecutionPolicy::kSequential:
0325 poolSize = fSequentialExecutor->GetPoolSize();
0326 break;
0327 case ROOT::EExecutionPolicy::kMultiThread:
0328 poolSize = fThreadExecutor->GetPoolSize();
0329 break;
0330 case ROOT::EExecutionPolicy::kMultiProcess:
0331 poolSize = fProcessExecutor->GetPoolSize();
0332 break;
0333 default:
0334 break;
0335 }
0336 return poolSize;
0337 }
0338
0339 }
0340 }
0341
0342 #endif