Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:12:17

0001 /* @(#)root/multiproc:$Id$ */
0002 // Author: G Ganis Jan 2017
0003 
0004 /*************************************************************************
0005  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
0006  * All rights reserved.                                                  *
0007  *                                                                       *
0008  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0009  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0010  *************************************************************************/
0011 
0012 #ifndef ROOT_TMPWorkerTree
0013 #define ROOT_TMPWorkerTree
0014 
0015 #include "ROOT/TypeTraits.hxx" // InvokeResult_t
0016 #include "TMPWorker.h"
0017 #include "TFile.h"
0018 #include "TEntryList.h"
0019 #include "TEventList.h"
0020 #include "TH1.h"
0021 #include "TKey.h"
0022 #include "TSelector.h"
0023 #include "TTree.h"
0024 #include "TTreeCache.h"
0025 #include "TTreeReader.h"
0026 
0027 #include <memory> //unique_ptr
0028 #include <string>
0029 #include <sstream>
0030 #include <type_traits> //std::enable_if_t
0031 #include <unistd.h> //pid_t
0032 #include <vector>
0033 
0034 class TMPWorkerTree : public TMPWorker {
0035 
0036 public:
0037    TMPWorkerTree();
0038    TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries, const std::string &treeName,
0039                  UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
0040    TMPWorkerTree(TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
0041    ~TMPWorkerTree() override;
0042 
0043    // It doesn't make sense to copy a TMPWorker (each one has a uniq_ptr to its socket)
0044    TMPWorkerTree(const TMPWorkerTree &) = delete;
0045    TMPWorkerTree &operator=(const TMPWorkerTree &) = delete;
0046 
0047 protected:
0048 
0049    void         CloseFile();
0050    ULong64_t    EvalMaxEntries(ULong64_t maxEntries);
0051    void         HandleInput(MPCodeBufPair& msg) override; ///< Execute instructions received from a MP client
0052    void Init(int fd, UInt_t workerN) override;
0053    Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl,
0054                   std::string &errmsg);
0055    TFile       *OpenFile(const std::string& fileName);
0056    virtual void Process(UInt_t, MPCodeBufPair &) {}
0057    TTree       *RetrieveTree(TFile *fp);
0058    virtual void SendResult() { }
0059    void         Setup();
0060    void         SetupTreeCache(TTree *tree);
0061 
0062    std::vector<std::string> fFileNames; ///< the files to be processed by all workers
0063    std::string fTreeName;               ///< the name of the tree to be processed
0064    TTree *fTree;                        ///< pointer to the tree to be processed. It is only used if the tree is directly passed to TProcessExecutor::Process as argument
0065    TFile *fFile;                        ///< last open file
0066    TEntryList *fEntryList;              ///< entrylist
0067    ULong64_t fFirstEntry;               ///< first entry to br processed
0068 
0069 private:
0070 
0071    // TTree cache handling
0072    TTreeCache *fTreeCache;              ///< instance of the tree cache for the tree
0073    bool        fTreeCacheIsLearning;    ///< Whether cache is in learning phase
0074    bool        fUseTreeCache;           ///< Control usage of the tree cache
0075    Long64_t    fCacheSize;              ///< Cache size
0076 };
0077 
0078 template<class F>
0079 class TMPWorkerTreeFunc : public TMPWorkerTree {
0080 public:
0081    TMPWorkerTreeFunc(F procFunc, const std::vector<std::string> &fileNames, TEntryList *entries,
0082                      const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
0083       : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc),
0084         fReducedResult(), fCanReduce(false)
0085    {
0086    }
0087    TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
0088                      ULong64_t firstEntry)
0089       : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc), fReducedResult(),
0090         fCanReduce(false)
0091    {
0092    }
0093    ~TMPWorkerTreeFunc() override {}
0094 
0095 private:
0096    void Process(UInt_t code, MPCodeBufPair &msg) override;
0097    void SendResult() override;
0098 
0099    F  fProcFunc; ///< copy the function to be executed
0100    /// the results of the executions of fProcFunc merged together
0101    ROOT::TypeTraits::InvokeResult_t<F, std::reference_wrapper<TTreeReader>> fReducedResult;
0102    /// true if fReducedResult can be reduced with a new result, false until we have produced one result
0103    bool fCanReduce;
0104 };
0105 
0106 class TMPWorkerTreeSel : public TMPWorkerTree {
0107 public:
0108    TMPWorkerTreeSel(TSelector &selector, const std::vector<std::string> &fileNames, TEntryList *entries,
0109                     const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
0110       : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fSelector(selector),
0111         fCallBegin(true)
0112    {
0113    }
0114    TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
0115                     ULong64_t firstEntry)
0116       : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fSelector(selector), fCallBegin(true)
0117    {
0118    }
0119    ~TMPWorkerTreeSel() override {}
0120 
0121 private:
0122    void Process(UInt_t code, MPCodeBufPair &msg) override;
0123    void SendResult() override;
0124 
0125    TSelector &fSelector; ///< pointer to the selector to be used to process the tree. It is null if we are not using a TSelector.
0126    bool fCallBegin = true;
0127 };
0128 
0129 //////////////////////////////////////////////////////////////////////////
0130 /// Auxiliary templated functions
0131 /// If the user lambda returns a TH1F*, TTree*, TEventList*, we incur in the
0132 /// problem of that object being automatically owned by the current open file.
0133 /// For these three types, we call SetDirectory(nullptr) to detach the returned
0134 /// object from the file we are reading the TTree from.
0135 /// Note: the only sane case in which this should happen is when a TH1F* is
0136 /// returned.
0137 template <class T, std::enable_if_t<std::is_pointer<T>::value && std::is_constructible<TObject *, T>::value &&
0138                                     !std::is_constructible<TCollection *, T>::value> * = nullptr>
0139 void DetachRes(T res)
0140 {
0141    auto th1p = dynamic_cast<TH1*>(res);
0142    if(th1p != nullptr) {
0143       th1p->SetDirectory(nullptr);
0144       return;
0145    }
0146    auto ttreep = dynamic_cast<TTree*>(res);
0147    if(ttreep != nullptr) {
0148       ttreep->SetDirectory(nullptr);
0149       return;
0150    }
0151    auto tentrylist = dynamic_cast<TEntryList*>(res);
0152    if(tentrylist != nullptr) {
0153       tentrylist->SetDirectory(nullptr);
0154       return;
0155    }
0156    auto teventlist = dynamic_cast<TEventList*>(res);
0157    if(teventlist != nullptr) {
0158       teventlist->SetDirectory(nullptr);
0159       return;
0160    }
0161    return;
0162 }
0163 
0164 // Specialization for TCollections
0165 template <class T,
0166           std::enable_if_t<std::is_pointer<T>::value && std::is_constructible<TCollection *, T>::value> * = nullptr>
0167 void DetachRes(T res)
0168 {
0169    if (res) {
0170       TIter nxo(res);
0171       TObject *obj = nullptr;
0172       while ((obj = nxo())) {
0173          DetachRes(obj);
0174       }
0175    }
0176 }
0177 
0178 //////////////////////////////////////////////////////////////////////////
0179 /// Generic function processing SendResult and Process overload
0180 
0181 template<class F>
0182 void TMPWorkerTreeFunc<F>::SendResult()
0183 {
0184    //send back result
0185    MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
0186 }
0187 
0188 template <class F>
0189 void TMPWorkerTreeFunc<F>::Process(UInt_t code, MPCodeBufPair &msg)
0190 {
0191 
0192    Long64_t start = 0;
0193    Long64_t finish = 0;
0194    TEntryList *enl = nullptr;
0195    std::string reply, errmsg, sn = "[S" + std::to_string(GetNWorker()) + "]: ";
0196    if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
0197       reply = sn + errmsg;
0198       MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
0199       return;
0200    }
0201 
0202    // If we are not done processing entries in the tree, 
0203    // create a TTreeReader that reads this range of entries
0204    if (start >= 0 && start < fTree->GetEntries()) {
0205       TTreeReader reader(fTree, enl);
0206 
0207       TTreeReader::EEntryStatus status = reader.SetEntriesRange(start, finish);
0208       if (status != TTreeReader::kEntryValid) {
0209          reply = sn + "could not set TTreeReader to range " + std::to_string(start) + " " + std::to_string(finish - 1);
0210          MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
0211          return;
0212       }
0213 
0214       // execute function
0215       auto res = fProcFunc(reader);
0216 
0217       // detach result from file if needed (currently needed for TH1, TTree, TEventList)
0218       DetachRes(res);
0219 
0220       if (fCanReduce) {
0221          PoolUtils::ReduceObjects<TObject *> redfunc;
0222          fReducedResult = static_cast<decltype(fReducedResult)>(redfunc(
0223             {res, fReducedResult})); // TODO try not to copy these into a vector, do everything by ref. std::vector<T&>?
0224       } else {
0225          fCanReduce = true;
0226          fReducedResult = res;
0227       }
0228    }
0229 
0230    //update the number of processed entries
0231    fProcessedEntries += finish - start;
0232 
0233    if(fMaxNEntries == fProcessedEntries)
0234       //we are done forever
0235       MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
0236    else
0237       //we are done for now
0238       MPSend(GetSocket(), MPCode::kIdling);
0239 }
0240 
0241 #endif