File indexing completed on 2025-01-18 10:10:51
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013 #ifndef ROOT_TProcessExecutor
0014 #define ROOT_TProcessExecutor
0015
0016 #include "MPCode.h"
0017 #include "MPSendRecv.h"
0018 #include "PoolUtils.h"
0019 #include "ROOT/TExecutorCRTP.hxx"
0020 #include "ROOT/TSeq.hxx"
0021 #include "ROOT/TypeTraits.hxx"
0022 #include "TError.h"
0023 #include "TFileCollection.h"
0024 #include "TFileInfo.h"
0025 #include "THashList.h"
0026 #include "TMPClient.h"
0027 #include "TMPWorkerExecutor.h"
0028
0029 #include <algorithm> //std::generate
0030 #include <numeric> //std::iota
0031 #include <string>
0032 #include <functional> //std::reference_wrapper
0033 #include <vector>
0034
0035 namespace ROOT {
0036
0037 class TProcessExecutor : public TExecutorCRTP<TProcessExecutor>, private TMPClient {
0038 friend TExecutorCRTP;
0039
0040 public:
0041 explicit TProcessExecutor(unsigned nWorkers = 0);
0042 ~TProcessExecutor() = default;
0043
0044 TProcessExecutor(const TProcessExecutor &) = delete;
0045 TProcessExecutor &operator=(const TProcessExecutor &) = delete;
0046
0047
0048
0049 using TExecutorCRTP<TProcessExecutor>::Map;
0050
0051
0052
0053
0054 using TExecutorCRTP<TProcessExecutor>::MapReduce;
0055 template<class F, class R, class Cond = validMapReturnCond<F>>
0056 auto MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t<F>;
0057 template<class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0058 auto MapReduce(F func, std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>;
0059 template<class F, class T, class R, class Cond = validMapReturnCond<F, T>>
0060 auto MapReduce(F func, const std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>;
0061
0062
0063
0064 using TExecutorCRTP<TProcessExecutor>::Reduce;
0065
0066 void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
0067
0068
0069
0070
0071
0072 unsigned GetPoolSize() const { return TMPClient::GetNWorkers(); }
0073
0074 private:
0075
0076
0077 template<class F, class Cond = validMapReturnCond<F>>
0078 auto MapImpl(F func, unsigned nTimes) -> std::vector<InvokeResult_t<F>>;
0079 template<class F, class INTEGER, class Cond = validMapReturnCond<F, INTEGER>>
0080 auto MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<InvokeResult_t<F, INTEGER>>;
0081 template<class F, class T, class Cond = validMapReturnCond<F, T>>
0082 auto MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
0083 template<class F, class T, class Cond = validMapReturnCond<F, T>>
0084 auto MapImpl(F func, const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>;
0085
0086 template<class T> void Collect(std::vector<T> &reslist);
0087 template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
0088
0089 void Reset();
0090 void ReplyToFuncResult(TSocket *s);
0091 void ReplyToIdle(TSocket *s);
0092
0093 unsigned fNProcessed;
0094 unsigned fNToProcess;
0095
0096
0097
0098
0099 enum class ETask : unsigned char {
0100 kNoTask,
0101 kMap,
0102 kMapWithArg,
0103 kMapRed,
0104 kMapRedWithArg
0105 };
0106
0107 ETask fTaskType = ETask::kNoTask;
0108 };
0109
0110
0111
0112
0113
0114
0115
0116
0117
0118 template<class F, class Cond>
0119 auto TProcessExecutor::MapImpl(F func, unsigned nTimes) -> std::vector<InvokeResult_t<F>>
0120 {
0121 using retType = decltype(func());
0122
0123 Reset();
0124 fTaskType = ETask::kMap;
0125
0126
0127 unsigned oldNWorkers = GetPoolSize();
0128 if (nTimes < oldNWorkers)
0129 SetNWorkers(nTimes);
0130 TMPWorkerExecutor<F> worker(func);
0131 bool ok = Fork(worker);
0132 SetNWorkers(oldNWorkers);
0133 if (!ok)
0134 {
0135 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
0136 return std::vector<retType>();
0137 }
0138
0139
0140 fNToProcess = nTimes;
0141 std::vector<retType> reslist;
0142 reslist.reserve(fNToProcess);
0143 fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
0144
0145
0146 Collect(reslist);
0147
0148
0149 ReapWorkers();
0150 fTaskType = ETask::kNoTask;
0151 return reslist;
0152 }
0153
0154
0155
0156
0157
0158
0159 template<class F, class T, class Cond>
0160 auto TProcessExecutor::MapImpl(F func, std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>
0161 {
0162
0163 using retType = decltype(func(args.front()));
0164
0165 Reset();
0166 fTaskType = ETask::kMapWithArg;
0167
0168
0169
0170 unsigned oldNWorkers = GetPoolSize();
0171 if (args.size() < oldNWorkers)
0172 SetNWorkers(args.size());
0173 TMPWorkerExecutor<F, T> worker(func, args);
0174 bool ok = Fork(worker);
0175 SetNWorkers(oldNWorkers);
0176 if (!ok)
0177 {
0178 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
0179 return std::vector<retType>();
0180 }
0181
0182
0183 fNToProcess = args.size();
0184 std::vector<retType> reslist;
0185 reslist.reserve(fNToProcess);
0186 std::vector<unsigned> range(fNToProcess);
0187 std::iota(range.begin(), range.end(), 0);
0188 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
0189
0190
0191 Collect(reslist);
0192
0193
0194 ReapWorkers();
0195 fTaskType = ETask::kNoTask;
0196 return reslist;
0197 }
0198
0199
0200
0201
0202
0203
0204 template<class F, class T, class Cond>
0205 auto TProcessExecutor::MapImpl(F func, const std::vector<T> &args) -> std::vector<InvokeResult_t<F, T>>
0206 {
0207
0208 using retType = decltype(func(args.front()));
0209
0210 Reset();
0211 fTaskType = ETask::kMapWithArg;
0212
0213
0214
0215 unsigned oldNWorkers = GetPoolSize();
0216 if (args.size() < oldNWorkers)
0217 SetNWorkers(args.size());
0218 TMPWorkerExecutor<F, T> worker(func, args);
0219 bool ok = Fork(worker);
0220 SetNWorkers(oldNWorkers);
0221 if (!ok)
0222 {
0223 Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
0224 return std::vector<retType>();
0225 }
0226
0227
0228 fNToProcess = args.size();
0229 std::vector<retType> reslist;
0230 reslist.reserve(fNToProcess);
0231 std::vector<unsigned> range(fNToProcess);
0232 std::iota(range.begin(), range.end(), 0);
0233 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
0234
0235
0236 Collect(reslist);
0237
0238
0239 ReapWorkers();
0240 fTaskType = ETask::kNoTask;
0241 return reslist;
0242 }
0243
0244
0245
0246
0247
0248
0249 template<class F, class INTEGER, class Cond>
0250 auto TProcessExecutor::MapImpl(F func, ROOT::TSeq<INTEGER> args) -> std::vector<InvokeResult_t<F, INTEGER>>
0251 {
0252 std::vector<INTEGER> vargs(args.size());
0253 std::copy(args.begin(), args.end(), vargs.begin());
0254 const auto &reslist = Map(func, vargs);
0255 return reslist;
0256 }
0257
0258
0259
0260
0261 template<class F, class R, class Cond>
0262 auto TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> InvokeResult_t<F>
0263 {
0264 using retType = decltype(func());
0265
0266 Reset();
0267 fTaskType= ETask::kMapRed;
0268
0269
0270 unsigned oldNWorkers = GetPoolSize();
0271 if (nTimes < oldNWorkers)
0272 SetNWorkers(nTimes);
0273 TMPWorkerExecutor<F, void, R> worker(func, redfunc);
0274 bool ok = Fork(worker);
0275 SetNWorkers(oldNWorkers);
0276 if (!ok) {
0277 std::cerr << "[E][C] Could not fork. Aborting operation\n";
0278 return retType();
0279 }
0280
0281
0282 fNToProcess = nTimes;
0283 std::vector<retType> reslist;
0284 reslist.reserve(fNToProcess);
0285 fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
0286
0287
0288 Collect(reslist);
0289
0290
0291 ReapWorkers();
0292 fTaskType= ETask::kNoTask;
0293 return redfunc(reslist);
0294 }
0295
0296
0297
0298
0299
0300
0301 template<class F, class T, class R, class Cond>
0302 auto TProcessExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>
0303 {
0304
0305 using retType = decltype(func(args.front()));
0306
0307 Reset();
0308 fTaskType= ETask::kMapRedWithArg;
0309
0310
0311 unsigned oldNWorkers = GetPoolSize();
0312 if (args.size() < oldNWorkers)
0313 SetNWorkers(args.size());
0314 TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
0315 bool ok = Fork(worker);
0316 SetNWorkers(oldNWorkers);
0317 if (!ok) {
0318 std::cerr << "[E][C] Could not fork. Aborting operation\n";
0319 return decltype(func(args.front()))();
0320 }
0321
0322
0323 fNToProcess = args.size();
0324 std::vector<retType> reslist;
0325 reslist.reserve(fNToProcess);
0326 std::vector<unsigned> range(fNToProcess);
0327 std::iota(range.begin(), range.end(), 0);
0328 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
0329
0330
0331 Collect(reslist);
0332
0333 ReapWorkers();
0334 fTaskType= ETask::kNoTask;
0335 return Reduce(reslist, redfunc);
0336 }
0337
0338
0339
0340
0341
0342
0343 template<class F, class T, class R, class Cond>
0344 auto TProcessExecutor::MapReduce(F func, const std::vector<T> &args, R redfunc) -> InvokeResult_t<F, T>
0345 {
0346
0347 using retType = decltype(func(args.front()));
0348
0349 Reset();
0350 fTaskType= ETask::kMapRedWithArg;
0351
0352
0353 unsigned oldNWorkers = GetPoolSize();
0354 if (args.size() < oldNWorkers)
0355 SetNWorkers(args.size());
0356 TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
0357 bool ok = Fork(worker);
0358 SetNWorkers(oldNWorkers);
0359 if (!ok) {
0360 std::cerr << "[E][C] Could not fork. Aborting operation\n";
0361 return decltype(func(args.front()))();
0362 }
0363
0364
0365 fNToProcess = args.size();
0366 std::vector<retType> reslist;
0367 reslist.reserve(fNToProcess);
0368 std::vector<unsigned> range(fNToProcess);
0369 std::iota(range.begin(), range.end(), 0);
0370 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
0371
0372
0373 Collect(reslist);
0374
0375 ReapWorkers();
0376 fTaskType= ETask::kNoTask;
0377 return Reduce(reslist, redfunc);
0378 }
0379
0380
0381
0382 template<class T>
0383 void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
0384 {
0385 unsigned code = msg.first;
0386 if (code == MPCode::kFuncResult) {
0387 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
0388 ReplyToFuncResult(s);
0389 } else if (code == MPCode::kIdling) {
0390 ReplyToIdle(s);
0391 } else if(code == MPCode::kProcResult) {
0392 if(msg.second != nullptr)
0393 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
0394 MPSend(s, MPCode::kShutdownOrder);
0395 } else if(code == MPCode::kProcError) {
0396 const char *str = ReadBuffer<const char*>(msg.second.get());
0397 Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
0398 "Continuing execution ignoring these entries.", str);
0399 ReplyToIdle(s);
0400 delete [] str;
0401 } else {
0402
0403 Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
0404 }
0405 }
0406
0407
0408
0409
0410
0411 template<class T>
0412 void TProcessExecutor::Collect(std::vector<T> &reslist)
0413 {
0414 TMonitor &mon = GetMonitor();
0415 mon.ActivateAll();
0416 while (mon.GetActive() > 0) {
0417 TSocket *s = mon.Select();
0418 MPCodeBufPair msg = MPRecv(s);
0419 if (msg.first == MPCode::kRecvError) {
0420 Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
0421 Remove(s);
0422 } else if (msg.first < 1000)
0423 HandlePoolCode(msg, s, reslist);
0424 else
0425 HandleMPCode(msg, s);
0426 }
0427 }
0428
0429 }
0430
0431 #endif