File indexing completed on 2025-01-18 10:10:51
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013 #ifndef ROOT_TTreeProcessorMP
0014 #define ROOT_TTreeProcessorMP
0015
0016 #include "MPCode.h"
0017 #include "MPSendRecv.h"
0018 #include "PoolUtils.h"
0019 #include "ROOT/TypeTraits.hxx" // InvokeResult_t
0020 #include "TChain.h"
0021 #include "TChainElement.h"
0022 #include "TError.h"
0023 #include "TFileCollection.h"
0024 #include "TFileInfo.h"
0025 #include "THashList.h"
0026 #include "TMPClient.h"
0027 #include "TMPWorkerTree.h"
0028 #include "TSelector.h"
0029 #include "TTreeReader.h"
0030 #include <algorithm> //std::generate
0031 #include <numeric> //std::iota
0032 #include <string>
0033 #include <functional> //std::reference_wrapper
0034 #include <vector>
0035
0036 namespace ROOT {
0037
0038 class TTreeProcessorMP : private TMPClient {
0039 template <typename F, typename... Args>
0040 using InvokeResult_t = ROOT::TypeTraits::InvokeResult_t<F, Args...>;
0041
0042 public:
0043 explicit TTreeProcessorMP(UInt_t nWorkers = 0);
0044 ~TTreeProcessorMP() = default;
0045
0046 TTreeProcessorMP(const TTreeProcessorMP &) = delete;
0047 TTreeProcessorMP &operator=(const TTreeProcessorMP &) = delete;
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065 template<class F> auto Process(const std::vector<std::string>& fileNames, F procFunc, TEntryList &entries,
0066 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0067 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0068 template<class F> auto Process(const std::string& fileName, F procFunc, TEntryList &entries,
0069 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0070 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0071 template<class F> auto Process(TFileCollection& collection, F procFunc, TEntryList &entries,
0072 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0073 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0074 template<class F> auto Process(TChain& chain, F procFunc, TEntryList &entries,
0075 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0076 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0077 template<class F> auto Process(TTree& tree, F procFunc, TEntryList &entries,
0078 ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0079 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0080
0081
0082
0083
0084
0085
0086
0087
0088
0089
0090
0091
0092
0093
0094
0095
0096 template<class F> auto Process(const std::vector<std::string>& fileNames, F procFunc,
0097 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0098 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0099 template<class F> auto Process(const std::string& fileName, F procFunc,
0100 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0101 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0102 template<class F> auto Process(TFileCollection& files, F procFunc,
0103 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0104 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0105 template<class F> auto Process(TChain& files, F procFunc,
0106 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0107 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0108 template<class F> auto Process(TTree& tree, F procFunc, ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0109 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0110
0111
0112
0113
0114
0115
0116
0117
0118
0119
0120
0121
0122
0123
0124
0125
0126
0127
0128 TList* Process(const std::vector<std::string>& fileNames, TSelector& selector, TEntryList &entries,
0129 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0130 TList* Process(const std::string &fileName, TSelector& selector, TEntryList &entries,
0131 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0132 TList* Process(TFileCollection& files, TSelector& selector, TEntryList &entries,
0133 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0134 TList* Process(TChain& files, TSelector& selector, TEntryList &entries,
0135 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0136 TList* Process(TTree& tree, TSelector& selector, TEntryList &entries,
0137 ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0138
0139
0140
0141
0142
0143
0144
0145
0146
0147
0148
0149
0150
0151
0152
0153
0154
0155 TList* Process(const std::vector<std::string>& fileNames, TSelector& selector,
0156 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0157 TList* Process(const std::string &fileName, TSelector& selector,
0158 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0159 TList* Process(TFileCollection& files, TSelector& selector,
0160 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0161 TList* Process(TChain& files, TSelector& selector,
0162 const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0163 TList* Process(TTree& tree, TSelector& selector, ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0164
0165 void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
0166 unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
0167
0168 private:
0169 template<class T> void Collect(std::vector<T> &reslist);
0170 template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
0171
0172 void FixLists(std::vector<TObject*> &lists);
0173 void Reset();
0174 void ReplyToIdle(TSocket *s);
0175
0176 unsigned fNProcessed;
0177 unsigned fNToProcess;
0178
0179
0180
0181
0182 enum class ETask : unsigned char {
0183 kNoTask,
0184 kProcByRange,
0185 kProcByFile
0186 };
0187
0188 ETask fTaskType = ETask::kNoTask;
0189 };
0190
0191 template<class F>
0192 auto TTreeProcessorMP::Process(const std::vector<std::string>& fileNames, F procFunc, TEntryList &entries,
0193 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0194 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0195 {
0196 using retType = InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0197 static_assert(std::is_constructible<TObject*, retType>::value,
0198 "procFunc must return a pointer to a class inheriting from TObject,"
0199 " and must take a reference to TTreeReader as the only argument");
0200
0201
0202 if (jFirst > 0) {
0203 Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
0204 jFirst = 0;
0205 }
0206
0207
0208 Reset();
0209 unsigned nWorkers = GetNWorkers();
0210
0211
0212 TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
0213
0214 TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
0215 bool ok = Fork(worker);
0216 if(!ok) {
0217 Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation.");
0218 return nullptr;
0219 }
0220
0221
0222 if(fileNames.size() < nWorkers) {
0223
0224 fTaskType = ETask::kProcByRange;
0225
0226 fNToProcess = nWorkers*fileNames.size();
0227 std::vector<unsigned> args(nWorkers);
0228 std::iota(args.begin(), args.end(), 0);
0229 fNProcessed = Broadcast(MPCode::kProcRange, args);
0230 if(fNProcessed < nWorkers)
0231 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
0232 } else {
0233
0234 fTaskType = ETask::kProcByFile;
0235 fNToProcess = fileNames.size();
0236 std::vector<unsigned> args(nWorkers);
0237 std::iota(args.begin(), args.end(), 0);
0238 fNProcessed = Broadcast(MPCode::kProcFile, args);
0239 if(fNProcessed < nWorkers)
0240 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
0241 }
0242
0243
0244 std::vector<TObject*> reslist;
0245 Collect(reslist);
0246
0247
0248 PoolUtils::ReduceObjects<TObject *> redfunc;
0249 auto res = redfunc(reslist);
0250
0251
0252 ReapWorkers();
0253 fTaskType = ETask::kNoTask;
0254 return static_cast<retType>(res);
0255 }
0256
0257
0258 template<class F>
0259 auto TTreeProcessorMP::Process(const std::string& fileName, F procFunc, TEntryList &entries,
0260 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0261 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0262 {
0263 std::vector<std::string> singleFileName(1, fileName);
0264 return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
0265 }
0266
0267
0268 template<class F>
0269 auto TTreeProcessorMP::Process(TFileCollection& files, F procFunc, TEntryList &entries,
0270 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0271 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0272 {
0273 std::vector<std::string> fileNames(files.GetNFiles());
0274 unsigned count = 0;
0275 for(auto f : *static_cast<THashList*>(files.GetList()))
0276 fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();
0277
0278 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
0279 }
0280
0281
0282 template<class F>
0283 auto TTreeProcessorMP::Process(TChain& files, F procFunc, TEntryList &entries,
0284 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0285 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0286 {
0287 TObjArray* filelist = files.GetListOfFiles();
0288 std::vector<std::string> fileNames(filelist->GetEntries());
0289 unsigned count = 0;
0290 for(auto f : *filelist)
0291 fileNames[count++] = f->GetTitle();
0292
0293 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
0294 }
0295
0296
0297 template<class F>
0298 auto TTreeProcessorMP::Process(TTree& tree, F procFunc, TEntryList &entries,
0299 ULong64_t nToProcess, ULong64_t jFirst)
0300 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0301 {
0302 using retType = InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0303 static_assert(std::is_constructible<TObject*, retType>::value, "procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
0304
0305
0306 if (jFirst > 0) {
0307 Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
0308 jFirst = 0;
0309 }
0310
0311
0312 Reset();
0313 unsigned nWorkers = GetNWorkers();
0314
0315
0316 TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
0317
0318 TMPWorkerTreeFunc<F> worker(procFunc, &tree, elist, nWorkers, nToProcess, jFirst);
0319 bool ok = Fork(worker);
0320 if(!ok) {
0321 Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation.");
0322 return nullptr;
0323 }
0324
0325
0326 fTaskType = ETask::kProcByRange;
0327
0328
0329 fNToProcess = nWorkers;
0330 std::vector<unsigned> args(nWorkers);
0331 std::iota(args.begin(), args.end(), 0);
0332 fNProcessed = Broadcast(MPCode::kProcTree, args);
0333 if(fNProcessed < nWorkers)
0334 Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
0335
0336
0337 std::vector<TObject*> reslist;
0338 Collect(reslist);
0339
0340
0341 PoolUtils::ReduceObjects<TObject *> redfunc;
0342 auto res = redfunc(reslist);
0343
0344
0345 ReapWorkers();
0346 fTaskType = ETask::kNoTask;
0347 return static_cast<retType>(res);
0348 }
0349
0350
0351
0352
0353
0354
0355 template<class F>
0356 auto TTreeProcessorMP::Process(const std::vector<std::string>& fileNames, F procFunc,
0357 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0358 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0359 {
0360 TEntryList noelist;
0361 return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
0362 }
0363
0364
0365 template<class F>
0366 auto TTreeProcessorMP::Process(const std::string& fileName, F procFunc,
0367 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0368 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0369 {
0370 TEntryList noelist;
0371 return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
0372 }
0373
0374
0375 template<class F>
0376 auto TTreeProcessorMP::Process(TFileCollection& files, F procFunc,
0377 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0378 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0379 {
0380 TEntryList noelist;
0381 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
0382 }
0383
0384
0385 template<class F>
0386 auto TTreeProcessorMP::Process(TChain& files, F procFunc,
0387 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0388 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0389 {
0390 TEntryList noelist;
0391 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
0392 }
0393
0394
0395 template<class F>
0396 auto TTreeProcessorMP::Process(TTree& tree, F procFunc,
0397 ULong64_t nToProcess, ULong64_t jFirst)
0398 -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0399 {
0400 TEntryList noelist;
0401 return Process(tree, procFunc, noelist, nToProcess, jFirst);
0402 }
0403
0404
0405
0406 template<class T>
0407 void TTreeProcessorMP::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
0408 {
0409 unsigned code = msg.first;
0410 if (code == MPCode::kIdling) {
0411 ReplyToIdle(s);
0412 } else if(code == MPCode::kProcResult) {
0413 if(msg.second != nullptr)
0414 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
0415 MPSend(s, MPCode::kShutdownOrder);
0416 } else if(code == MPCode::kProcError) {
0417 const char *str = ReadBuffer<const char*>(msg.second.get());
0418 Error("TTreeProcessorMP::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
0419 "Continuing execution ignoring these entries.", str);
0420 ReplyToIdle(s);
0421 delete [] str;
0422 } else {
0423
0424 Error("TTreeProcessorMP::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
0425 }
0426 }
0427
0428
0429
0430
0431
0432 template<class T>
0433 void TTreeProcessorMP::Collect(std::vector<T> &reslist)
0434 {
0435 TMonitor &mon = GetMonitor();
0436 mon.ActivateAll();
0437 while (mon.GetActive() > 0) {
0438 TSocket *s = mon.Select();
0439 MPCodeBufPair msg = MPRecv(s);
0440 if (msg.first == MPCode::kRecvError) {
0441 Error("TTreeProcessorMP::Collect", "[E][C] Lost connection to a worker");
0442 Remove(s);
0443 } else if (msg.first < 1000)
0444 HandlePoolCode(msg, s, reslist);
0445 else
0446 HandleMPCode(msg, s);
0447 }
0448 }
0449
0450 }
0451
0452 #endif