File indexing completed on 2025-01-18 10:12:17
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 #ifndef ROOT_TMPWorkerTree
0013 #define ROOT_TMPWorkerTree
0014
0015 #include "ROOT/TypeTraits.hxx" // InvokeResult_t
0016 #include "TMPWorker.h"
0017 #include "TFile.h"
0018 #include "TEntryList.h"
0019 #include "TEventList.h"
0020 #include "TH1.h"
0021 #include "TKey.h"
0022 #include "TSelector.h"
0023 #include "TTree.h"
0024 #include "TTreeCache.h"
0025 #include "TTreeReader.h"
0026
0027 #include <memory> //unique_ptr
0028 #include <string>
0029 #include <sstream>
0030 #include <type_traits> //std::enable_if_t
0031 #include <unistd.h> //pid_t
0032 #include <vector>
0033
0034 class TMPWorkerTree : public TMPWorker {
0035
0036 public:
0037 TMPWorkerTree();
0038 TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries, const std::string &treeName,
0039 UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
0040 TMPWorkerTree(TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
0041 ~TMPWorkerTree() override;
0042
0043
0044 TMPWorkerTree(const TMPWorkerTree &) = delete;
0045 TMPWorkerTree &operator=(const TMPWorkerTree &) = delete;
0046
0047 protected:
0048
0049 void CloseFile();
0050 ULong64_t EvalMaxEntries(ULong64_t maxEntries);
0051 void HandleInput(MPCodeBufPair& msg) override;
0052 void Init(int fd, UInt_t workerN) override;
0053 Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl,
0054 std::string &errmsg);
0055 TFile *OpenFile(const std::string& fileName);
0056 virtual void Process(UInt_t, MPCodeBufPair &) {}
0057 TTree *RetrieveTree(TFile *fp);
0058 virtual void SendResult() { }
0059 void Setup();
0060 void SetupTreeCache(TTree *tree);
0061
0062 std::vector<std::string> fFileNames;
0063 std::string fTreeName;
0064 TTree *fTree;
0065 TFile *fFile;
0066 TEntryList *fEntryList;
0067 ULong64_t fFirstEntry;
0068
0069 private:
0070
0071
0072 TTreeCache *fTreeCache;
0073 bool fTreeCacheIsLearning;
0074 bool fUseTreeCache;
0075 Long64_t fCacheSize;
0076 };
0077
0078 template<class F>
0079 class TMPWorkerTreeFunc : public TMPWorkerTree {
0080 public:
0081 TMPWorkerTreeFunc(F procFunc, const std::vector<std::string> &fileNames, TEntryList *entries,
0082 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
0083 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc),
0084 fReducedResult(), fCanReduce(false)
0085 {
0086 }
0087 TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
0088 ULong64_t firstEntry)
0089 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc), fReducedResult(),
0090 fCanReduce(false)
0091 {
0092 }
0093 ~TMPWorkerTreeFunc() override {}
0094
0095 private:
0096 void Process(UInt_t code, MPCodeBufPair &msg) override;
0097 void SendResult() override;
0098
0099 F fProcFunc;
0100
0101 ROOT::TypeTraits::InvokeResult_t<F, std::reference_wrapper<TTreeReader>> fReducedResult;
0102
0103 bool fCanReduce;
0104 };
0105
0106 class TMPWorkerTreeSel : public TMPWorkerTree {
0107 public:
0108 TMPWorkerTreeSel(TSelector &selector, const std::vector<std::string> &fileNames, TEntryList *entries,
0109 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
0110 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fSelector(selector),
0111 fCallBegin(true)
0112 {
0113 }
0114 TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
0115 ULong64_t firstEntry)
0116 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fSelector(selector), fCallBegin(true)
0117 {
0118 }
0119 ~TMPWorkerTreeSel() override {}
0120
0121 private:
0122 void Process(UInt_t code, MPCodeBufPair &msg) override;
0123 void SendResult() override;
0124
0125 TSelector &fSelector;
0126 bool fCallBegin = true;
0127 };
0128
0129
0130
0131
0132
0133
0134
0135
0136
0137 template <class T, std::enable_if_t<std::is_pointer<T>::value && std::is_constructible<TObject *, T>::value &&
0138 !std::is_constructible<TCollection *, T>::value> * = nullptr>
0139 void DetachRes(T res)
0140 {
0141 auto th1p = dynamic_cast<TH1*>(res);
0142 if(th1p != nullptr) {
0143 th1p->SetDirectory(nullptr);
0144 return;
0145 }
0146 auto ttreep = dynamic_cast<TTree*>(res);
0147 if(ttreep != nullptr) {
0148 ttreep->SetDirectory(nullptr);
0149 return;
0150 }
0151 auto tentrylist = dynamic_cast<TEntryList*>(res);
0152 if(tentrylist != nullptr) {
0153 tentrylist->SetDirectory(nullptr);
0154 return;
0155 }
0156 auto teventlist = dynamic_cast<TEventList*>(res);
0157 if(teventlist != nullptr) {
0158 teventlist->SetDirectory(nullptr);
0159 return;
0160 }
0161 return;
0162 }
0163
0164
0165 template <class T,
0166 std::enable_if_t<std::is_pointer<T>::value && std::is_constructible<TCollection *, T>::value> * = nullptr>
0167 void DetachRes(T res)
0168 {
0169 if (res) {
0170 TIter nxo(res);
0171 TObject *obj = nullptr;
0172 while ((obj = nxo())) {
0173 DetachRes(obj);
0174 }
0175 }
0176 }
0177
0178
0179
0180
0181 template<class F>
0182 void TMPWorkerTreeFunc<F>::SendResult()
0183 {
0184
0185 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
0186 }
0187
0188 template <class F>
0189 void TMPWorkerTreeFunc<F>::Process(UInt_t code, MPCodeBufPair &msg)
0190 {
0191
0192 Long64_t start = 0;
0193 Long64_t finish = 0;
0194 TEntryList *enl = nullptr;
0195 std::string reply, errmsg, sn = "[S" + std::to_string(GetNWorker()) + "]: ";
0196 if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
0197 reply = sn + errmsg;
0198 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
0199 return;
0200 }
0201
0202
0203
0204 if (start >= 0 && start < fTree->GetEntries()) {
0205 TTreeReader reader(fTree, enl);
0206
0207 TTreeReader::EEntryStatus status = reader.SetEntriesRange(start, finish);
0208 if (status != TTreeReader::kEntryValid) {
0209 reply = sn + "could not set TTreeReader to range " + std::to_string(start) + " " + std::to_string(finish - 1);
0210 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
0211 return;
0212 }
0213
0214
0215 auto res = fProcFunc(reader);
0216
0217
0218 DetachRes(res);
0219
0220 if (fCanReduce) {
0221 PoolUtils::ReduceObjects<TObject *> redfunc;
0222 fReducedResult = static_cast<decltype(fReducedResult)>(redfunc(
0223 {res, fReducedResult}));
0224 } else {
0225 fCanReduce = true;
0226 fReducedResult = res;
0227 }
0228 }
0229
0230
0231 fProcessedEntries += finish - start;
0232
0233 if(fMaxNEntries == fProcessedEntries)
0234
0235 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
0236 else
0237
0238 MPSend(GetSocket(), MPCode::kIdling);
0239 }
0240
0241 #endif