Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:10:51

0001 /* @(#)root/multiproc:$Id$ */
0002 // Author: Enrico Guiraud July 2015
0003 // Modified: G Ganis Jan 2017
0004 
0005 /*************************************************************************
0006  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers.               *
0007  * All rights reserved.                                                  *
0008  *                                                                       *
0009  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0010  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0011  *************************************************************************/
0012 
0013 #ifndef ROOT_TTreeProcessorMP
0014 #define ROOT_TTreeProcessorMP
0015 
0016 #include "MPCode.h"
0017 #include "MPSendRecv.h"
0018 #include "PoolUtils.h"
0019 #include "ROOT/TypeTraits.hxx" // InvokeResult_t
0020 #include "TChain.h"
0021 #include "TChainElement.h"
0022 #include "TError.h"
0023 #include "TFileCollection.h"
0024 #include "TFileInfo.h"
0025 #include "THashList.h"
0026 #include "TMPClient.h"
0027 #include "TMPWorkerTree.h"
0028 #include "TSelector.h"
0029 #include "TTreeReader.h"
0030 #include <algorithm> //std::generate
0031 #include <numeric> //std::iota
0032 #include <string>
0033 #include <functional> //std::reference_wrapper
0034 #include <vector>
0035 
0036 namespace ROOT {
0037 
0038 class TTreeProcessorMP : private TMPClient {
0039    template <typename F, typename... Args>
0040    using InvokeResult_t = ROOT::TypeTraits::InvokeResult_t<F, Args...>;
0041 
0042 public:
0043    explicit TTreeProcessorMP(UInt_t nWorkers = 0); //default number of workers is the number of processors
0044    ~TTreeProcessorMP() = default;
0045    //it doesn't make sense for a TTreeProcessorMP to be copied
0046    TTreeProcessorMP(const TTreeProcessorMP &) = delete;
0047    TTreeProcessorMP &operator=(const TTreeProcessorMP &) = delete;
0048 
0049    /// \brief Process a TTree dataset with a functor
0050    /// \tparam F functor returning a pointer to TObject or inheriting classes and
0051    ///          taking a TTreeReader& (both enforced at compile-time)
0052    ///
0053    /// Dataset definition:
0054    /// \param[in] fileNames  vector of strings with the paths of the files with the TTree to process
0055    /// \param[in] fileName   string with the path of the files with the TTree to process
0056    /// \param[in] collection TFileCollection with the files with the TTree to process
0057    /// \param[in] chain      TChain with the files with the TTree to process
0058    /// \param[in] tree       TTree to process
0059    ///
0060    /// \param[in] entries    TEntryList to filter the dataset
0061    /// \param[in] treeName   Name of the TTree to process
0062    /// \param[in] nToProcess Number of entries to process (0 means all)
0063    /// \param[in] jFirst     First entry to process (0 means the first of the first file)
0064    ///
0065    template<class F> auto Process(const std::vector<std::string>& fileNames, F procFunc, TEntryList &entries,
0066                                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0067                                   -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0068    template<class F> auto Process(const std::string& fileName, F procFunc, TEntryList &entries,
0069                                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0070                                   -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0071    template<class F> auto Process(TFileCollection& collection, F procFunc, TEntryList &entries,
0072                                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0073                                   -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0074    template<class F> auto Process(TChain& chain, F procFunc, TEntryList &entries,
0075                                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0076                                   -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0077    template<class F> auto Process(TTree& tree, F procFunc, TEntryList &entries,
0078                                   ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0079                                   -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0080 
0081    /// \brief Process a TTree dataset with a functor: version without entry list
0082    /// \tparam F functor returning a pointer to TObject or inheriting classes and
0083    ///          taking a TTreeReader& (both enforced at compile-time)
0084    ///
0085    /// Dataset definition:
0086    /// \param[in] fileNames  vector of strings with the paths of the files with the TTree to process
0087    /// \param[in] fileName   string with the path of the files with the TTree to process
0088    /// \param[in] collection TFileCollection with the files with the TTree to process
0089    /// \param[in] chain      TChain with the files with the TTree to process
0090    /// \param[in] tree       TTree to process
0091    ///
0092    /// \param[in] treeName   Name of the TTree to process
0093    /// \param[in] nToProcess Number of entries to process (0 means all)
0094    /// \param[in] jFirst     First entry to process (0 means the first of the first file)
0095    ///
0096    template<class F> auto Process(const std::vector<std::string>& fileNames, F procFunc,
0097                                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0098                                   -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0099    template<class F> auto Process(const std::string& fileName, F procFunc,
0100                                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0101                                   -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0102    template<class F> auto Process(TFileCollection& files, F procFunc,
0103                                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0104                                   -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0105    template<class F> auto Process(TChain& files, F procFunc,
0106                                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0107                                   -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0108    template<class F> auto Process(TTree& tree, F procFunc, ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
0109                                   -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0110 
0111 
0112    /// \brief Process a TTree dataset with a selector
0113    ///
0114    /// Dataset definition:
0115    /// \param[in] fileNames  vector of strings with the paths of the files with the TTree to process
0116    /// \param[in] fileName   string with the path of the files with the TTree to process
0117    /// \param[in] collection TFileCollection with the files with the TTree to process
0118    /// \param[in] chain      TChain with the files with the TTree to process
0119    /// \param[in] tree       TTree to process
0120    ///
0121    /// \param[in] selector   Instance of TSelector to be applied to the dataset
0122    /// \param[in] entries    TEntryList to filter the dataset
0123    /// \param[in] treeName   Name of the TTree to process
0124    /// \param[in] nToProcess Number of entries to process (0 means all)
0125    /// \param[in] jFirst     First entry to process (0 means the first of the first file)
0126    ///
0127    // these versions require a TSelector
0128    TList* Process(const std::vector<std::string>& fileNames, TSelector& selector, TEntryList &entries,
0129                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0130    TList* Process(const std::string &fileName, TSelector& selector, TEntryList &entries,
0131                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0132    TList* Process(TFileCollection& files, TSelector& selector, TEntryList &entries,
0133                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0134    TList* Process(TChain& files, TSelector& selector, TEntryList &entries,
0135                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0136    TList* Process(TTree& tree, TSelector& selector, TEntryList &entries,
0137                   ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0138 
0139 
0140    /// \brief Process a TTree dataset with a selector: version without entry list
0141    ///
0142    /// Dataset definition:
0143    /// \param[in] fileNames  vector of strings with the paths of the files with the TTree to process
0144    /// \param[in] fileName   string with the path of the files with the TTree to process
0145    /// \param[in] collection TFileCollection with the files with the TTree to process
0146    /// \param[in] chain      TChain with the files with the TTree to process
0147    /// \param[in] tree       TTree to process
0148    ///
0149    /// \param[in] selector   Instance of TSelector to be applied to the dataset
0150    /// \param[in] treeName   Name of the TTree to process
0151    /// \param[in] nToProcess Number of entries to process (0 means all)
0152    /// \param[in] jFirst     First entry to process (0 means the first of the first file)
0153    ///
0154    // these versions require a TSelector
0155    TList* Process(const std::vector<std::string>& fileNames, TSelector& selector,
0156                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0157    TList* Process(const std::string &fileName, TSelector& selector,
0158                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0159    TList* Process(TFileCollection& files, TSelector& selector,
0160                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0161    TList* Process(TChain& files, TSelector& selector,
0162                   const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0163    TList* Process(TTree& tree, TSelector& selector, ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
0164 
0165    void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
0166    unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
0167 
0168 private:
0169    template<class T> void Collect(std::vector<T> &reslist);
0170    template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
0171 
0172    void FixLists(std::vector<TObject*> &lists);
0173    void Reset();
0174    void ReplyToIdle(TSocket *s);
0175 
0176    unsigned fNProcessed; ///< number of arguments already passed to the workers
0177    unsigned fNToProcess; ///< total number of arguments to pass to the workers
0178 
0179    /// A collection of the types of tasks that TTreeProcessorMP can execute.
0180    /// It is used to interpret in the right way and properly reply to the
0181    /// messages received (see, for example, TTreeProcessorMP::HandleInput)
0182    enum class ETask : unsigned char {
0183       kNoTask,        ///< no task is being executed
0184       kProcByRange,   ///< a Process method is being executed and each worker will process a certain range of each file
0185       kProcByFile     ///< a Process method is being executed and each worker will process a different file
0186    };
0187 
0188    ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
0189 };
0190 
0191 template<class F>
0192 auto TTreeProcessorMP::Process(const std::vector<std::string>& fileNames, F procFunc,  TEntryList &entries,
0193                                const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0194                                -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0195 {
0196    using retType = InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0197    static_assert(std::is_constructible<TObject*, retType>::value,
0198                  "procFunc must return a pointer to a class inheriting from TObject,"
0199                  " and must take a reference to TTreeReader as the only argument");
0200 
0201    // Warn for yet unimplemented functionality
0202    if (jFirst > 0) {
0203       Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
0204       jFirst = 0;
0205    }
0206 
0207    //prepare environment
0208    Reset();
0209    unsigned nWorkers = GetNWorkers();
0210 
0211    // Check th entry list
0212    TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
0213    //fork
0214    TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
0215    bool ok = Fork(worker);
0216    if(!ok) {
0217       Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation.");
0218       return nullptr;
0219    }
0220 
0221 
0222    if(fileNames.size() < nWorkers) {
0223       //TTree entry granularity. For each file, we divide entries equally between workers
0224       fTaskType = ETask::kProcByRange;
0225       //Tell workers to start processing entries
0226       fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
0227       std::vector<unsigned> args(nWorkers);
0228       std::iota(args.begin(), args.end(), 0);
0229       fNProcessed = Broadcast(MPCode::kProcRange, args);
0230       if(fNProcessed < nWorkers)
0231          Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
0232    } else {
0233       //file granularity. each worker processes one whole file as a single task
0234       fTaskType = ETask::kProcByFile;
0235       fNToProcess = fileNames.size();
0236       std::vector<unsigned> args(nWorkers);
0237       std::iota(args.begin(), args.end(), 0);
0238       fNProcessed = Broadcast(MPCode::kProcFile, args);
0239       if(fNProcessed < nWorkers)
0240          Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
0241    }
0242 
0243    //collect results, distribute new tasks
0244    std::vector<TObject*> reslist;
0245    Collect(reslist);
0246 
0247    //merge
0248    PoolUtils::ReduceObjects<TObject *> redfunc;
0249    auto res = redfunc(reslist);
0250 
0251    //clean-up and return
0252    ReapWorkers();
0253    fTaskType = ETask::kNoTask;
0254    return static_cast<retType>(res);
0255 }
0256 
0257 
0258 template<class F>
0259 auto TTreeProcessorMP::Process(const std::string& fileName, F procFunc,  TEntryList &entries,
0260                                const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0261                                -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0262 {
0263    std::vector<std::string> singleFileName(1, fileName);
0264    return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
0265 }
0266 
0267 
0268 template<class F>
0269 auto TTreeProcessorMP::Process(TFileCollection& files, F procFunc, TEntryList &entries,
0270                                const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0271                                -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0272 {
0273    std::vector<std::string> fileNames(files.GetNFiles());
0274    unsigned count = 0;
0275    for(auto f : *static_cast<THashList*>(files.GetList()))
0276       fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();
0277 
0278    return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
0279 }
0280 
0281 
0282 template<class F>
0283 auto TTreeProcessorMP::Process(TChain& files, F procFunc, TEntryList &entries,
0284                                const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0285                                -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0286 {
0287    TObjArray* filelist = files.GetListOfFiles();
0288    std::vector<std::string> fileNames(filelist->GetEntries());
0289    unsigned count = 0;
0290    for(auto f : *filelist)
0291       fileNames[count++] = f->GetTitle();
0292 
0293    return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
0294 }
0295 
0296 
0297 template<class F>
0298 auto TTreeProcessorMP::Process(TTree& tree, F procFunc, TEntryList &entries,
0299                                ULong64_t nToProcess, ULong64_t jFirst)
0300                                -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0301 {
0302    using retType = InvokeResult_t<F, std::reference_wrapper<TTreeReader>>;
0303    static_assert(std::is_constructible<TObject*, retType>::value, "procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
0304 
0305    // Warn for yet unimplemented functionality
0306    if (jFirst > 0) {
0307       Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
0308       jFirst = 0;
0309    }
0310 
0311    //prepare environment
0312    Reset();
0313    unsigned nWorkers = GetNWorkers();
0314 
0315    // Check th entry list
0316    TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
0317    //fork
0318    TMPWorkerTreeFunc<F> worker(procFunc, &tree, elist, nWorkers, nToProcess, jFirst);
0319    bool ok = Fork(worker);
0320    if(!ok) {
0321       Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation.");
0322       return nullptr;
0323    }
0324 
0325    //divide entries equally between workers
0326    fTaskType = ETask::kProcByRange;
0327 
0328    //tell workers to start processing entries
0329    fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
0330    std::vector<unsigned> args(nWorkers);
0331    std::iota(args.begin(), args.end(), 0);
0332    fNProcessed = Broadcast(MPCode::kProcTree, args);
0333    if(fNProcessed < nWorkers)
0334       Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
0335 
0336    //collect results, distribute new tasks
0337    std::vector<TObject*> reslist;
0338    Collect(reslist);
0339 
0340    //merge
0341    PoolUtils::ReduceObjects<TObject *> redfunc;
0342    auto res = redfunc(reslist);
0343 
0344    //clean-up and return
0345    ReapWorkers();
0346    fTaskType = ETask::kNoTask;
0347    return static_cast<retType>(res);
0348 }
0349 
0350 
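///
/// Versions of the functor-based Process methods without a TEntryList: each one
/// forwards to the corresponding TEntryList overload with a default-constructed TEntryList.
///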
0354 
0355 template<class F>
0356 auto TTreeProcessorMP::Process(const std::vector<std::string>& fileNames, F procFunc,
0357                                const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0358                                -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0359 {
0360    TEntryList noelist;
0361    return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
0362 }
0363 
0364 
0365 template<class F>
0366 auto TTreeProcessorMP::Process(const std::string& fileName, F procFunc,
0367                                const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0368                                -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0369 {
0370    TEntryList noelist;
0371    return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
0372 }
0373 
0374 
0375 template<class F>
0376 auto TTreeProcessorMP::Process(TFileCollection& files, F procFunc,
0377                                const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0378                                -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0379 {
0380    TEntryList noelist;
0381    return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
0382 }
0383 
0384 
0385 template<class F>
0386 auto TTreeProcessorMP::Process(TChain& files, F procFunc,
0387                                const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
0388                                -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0389 {
0390    TEntryList noelist;
0391    return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
0392 }
0393 
0394 
0395 template<class F>
0396 auto TTreeProcessorMP::Process(TTree& tree, F procFunc,
0397                                ULong64_t nToProcess, ULong64_t jFirst)
0398                                -> InvokeResult_t<F, std::reference_wrapper<TTreeReader>>
0399 {
0400    TEntryList noelist;
0401    return Process(tree, procFunc, noelist, nToProcess, jFirst);
0402 }
0403 
0404 //////////////////////////////////////////////////////////////////////////
0405 /// Handle message and reply to the worker
0406 template<class T>
0407 void TTreeProcessorMP::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
0408 {
0409    unsigned code = msg.first;
0410    if (code == MPCode::kIdling) {
0411       ReplyToIdle(s);
0412    } else if(code == MPCode::kProcResult) {
0413       if(msg.second != nullptr)
0414          reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
0415       MPSend(s, MPCode::kShutdownOrder);
0416    } else if(code == MPCode::kProcError) {
0417       const char *str = ReadBuffer<const char*>(msg.second.get());
0418       Error("TTreeProcessorMP::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
0419                                          "Continuing execution ignoring these entries.", str);
0420       ReplyToIdle(s);
0421       delete [] str;
0422    } else {
0423       // UNKNOWN CODE
0424       Error("TTreeProcessorMP::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
0425    }
0426 }
0427 
0428 //////////////////////////////////////////////////////////////////////////
0429 /// Listen for messages sent by the workers and call the appropriate handler function.
0430 /// TTreeProcessorMP::HandlePoolCode is called on messages with a code < 1000 and
0431 /// TMPClient::HandleMPCode is called on messages with a code >= 1000.
0432 template<class T>
0433 void TTreeProcessorMP::Collect(std::vector<T> &reslist)
0434 {
0435    TMonitor &mon = GetMonitor();
0436    mon.ActivateAll();
0437    while (mon.GetActive() > 0) {
0438       TSocket *s = mon.Select();
0439       MPCodeBufPair msg = MPRecv(s);
0440       if (msg.first == MPCode::kRecvError) {
0441          Error("TTreeProcessorMP::Collect", "[E][C] Lost connection to a worker");
0442          Remove(s);
0443       } else if (msg.first < 1000)
0444          HandlePoolCode(msg, s, reslist);
0445       else
0446          HandleMPCode(msg, s);
0447    }
0448 }
0449 
0450 } // ROOT namespace
0451 
0452 #endif