Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-16 10:29:50

0001 // Author: Enrico Guiraud, Danilo Piparo CERN  03/2017
0002 
0003 /*************************************************************************
0004  * Copyright (C) 1995-2022, Rene Brun and Fons Rademakers.               *
0005  * All rights reserved.                                                  *
0006  *                                                                       *
0007  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0008  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0009  *************************************************************************/
0010 
0011 #ifndef ROOT_RLOOPMANAGER
0012 #define ROOT_RLOOPMANAGER
0013 
0014 #include "ROOT/RDataSource.hxx"
0015 #include "ROOT/RDF/RColumnReaderBase.hxx"
0016 #include "ROOT/RDF/RDatasetSpec.hxx"
0017 #include "ROOT/RDF/RNodeBase.hxx"
0018 #include "ROOT/RDF/RNewSampleNotifier.hxx"
0019 #include "ROOT/RDF/RSampleInfo.hxx"
0020 #include "ROOT/RDF/Utils.hxx"
0021 
0022 #include <functional>
0023 #include <limits>
0024 #include <map>
0025 #include <memory>
0026 #include <set>
0027 #include <string>
0028 #include <string_view>
0029 #include <unordered_map>
0030 #include <unordered_set>
0031 #include <vector>
0032 #include <any>
0033 
0034 // forward declarations
0035 class TTree;
0036 class TTreeReader;
0037 class TDirectory;
0038 
0039 namespace ROOT {
0040 namespace RDF {
0041 class RCutFlowReport;
0042 class RDataSource;
0043 } // ns RDF
0044 
0045 namespace Internal {
0046 class RSlotStack;
0047 namespace RDF {
0048 class GraphNode;
0049 class RActionBase;
0050 class RVariationBase;
0051 class RDefinesWithReaders;
0052 class RVariationsWithReaders;
0053 
0054 namespace GraphDrawing {
0055 class GraphCreatorHelper;
0056 } // ns GraphDrawing
0057 
0058 using Callback_t = std::function<void(unsigned int)>;
0059 
0060 class RCallback {
0061    const Callback_t fFun;
0062    const ULong64_t fEveryN;
0063    std::vector<ULong64_t> fCounters;
0064 
0065 public:
0066    RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
0067       : fFun(std::move(f)), fEveryN(everyN), fCounters(nSlots, 0ull)
0068    {
0069    }
0070 
0071    void operator()(unsigned int slot)
0072    {
0073       auto &c = fCounters[slot];
0074       ++c;
0075       if (c == fEveryN) {
0076          c = 0ull;
0077          fFun(slot);
0078       }
0079    }
0080 };
0081 
0082 class ROneTimeCallback {
0083    const Callback_t fFun;
0084    std::vector<int> fHasBeenCalled; // std::vector<bool> is thread-unsafe for our purposes (and generally evil)
0085 
0086 public:
0087    ROneTimeCallback(Callback_t &&f, unsigned int nSlots) : fFun(std::move(f)), fHasBeenCalled(nSlots, 0) {}
0088 
0089    void operator()(unsigned int slot)
0090    {
0091       if (fHasBeenCalled[slot] == 1)
0092          return;
0093       fFun(slot);
0094       fHasBeenCalled[slot] = 1;
0095    }
0096 };
0097 
0098 struct RDSRangeRAII;
0099 
0100 } // namespace RDF
0101 } // namespace Internal
0102 } // namespace ROOT
0103 
0104 namespace ROOT {
0105 namespace Detail {
0106 namespace RDF {
0107 namespace RDFInternal = ROOT::Internal::RDF;
0108 
0109 class RFilterBase;
0110 class RRangeBase;
0111 class RDefineBase;
0112 using ROOT::RDF::RDataSource;
0113 
/// The head node of a RDF computation graph.
/// This class is responsible for running the event loop.
///
/// Instances are neither copyable nor movable (all four operations are deleted below).
/// The order in which data members are declared matters: members are destroyed in reverse declaration order and
/// some of the comments below rely on that, so do not reorder them without checking those constraints.
class RLoopManager : public RNodeBase {
   using ColumnNames_t = std::vector<std::string>;
   /// The flavours of event loop this class can run; see fLoopType.
   /// NOTE(review): the "MT" suffix presumably stands for the multi-thread variant -- confirm in the implementation.
   enum class ELoopType {
      kInvalid,
      kNoFiles,
      kNoFilesMT,
      kDataSource,
      kDataSourceMT
   };

   // These helper types are granted access to the private members of the loop manager.
   friend struct RCallCleanUpTask;
   friend struct ROOT::Internal::RDF::RDSRangeRAII;

   /*
   A wrapped reference to a TTree dataset that can be shared by many computation graphs. Ensures lifetime management.
   It needs to be destroyed *after* the fDataSource data member, in case it is holding the only Python reference to the
   input dataset and the data source needs to access it before it is destroyed.
   (Being declared first, it is the last data member of this class to be destroyed.)
   */
   std::any fTTreeLifeline{};

   std::vector<RDFInternal::RActionBase *> fBookedActions; ///< Non-owning pointers to actions to be run
   std::vector<RDFInternal::RActionBase *> fRunActions;    ///< Non-owning pointers to actions already run
   std::vector<RFilterBase *> fBookedFilters;
   std::vector<RFilterBase *> fBookedNamedFilters; ///< Contains a subset of fBookedFilters, i.e. only the named filters
   std::vector<RRangeBase *> fBookedRanges;
   std::vector<RDefineBase *> fBookedDefines;
   std::vector<RDFInternal::RVariationBase *> fBookedVariations;

   // Entry bounds; the defaults are 0 and the largest Long64_t value.
   // NOTE(review): presumably updated through ChangeBeginAndEndEntries -- confirm in the implementation.
   Long64_t fBeginEntry{0};
   Long64_t fEndEntry{std::numeric_limits<Long64_t>::max()};

   /// Keys are `fname + "/" + treename` as RSampleInfo::fID; Values are pointers to the corresponding sample
   std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> fSampleMap;
   /// Samples need to survive throughout the whole event loop, hence stored as an attribute
   std::vector<ROOT::RDF::Experimental::RSample> fSamples;

   ColumnNames_t fDefaultColumns;
   /// Range of entries created when no data source is specified.
   std::pair<ULong64_t, ULong64_t> fEmptyEntryRange{};
   unsigned int fNSlots{1}; ///< Value returned by GetNSlots(); defaults to 1
   bool fMustRunNamedFilters{true};
   /// The kind of event loop that is going to be run (e.g. on ROOT files, on no files)
   ELoopType fLoopType{ELoopType::kInvalid};
   std::unique_ptr<RDataSource> fDataSource{}; ///< Owning pointer to a data-source object. Null if no data-source
   /// Registered callbacks to be executed every N events.
   /// The registration happens via the RegisterCallback method.
   std::vector<RDFInternal::RCallback> fCallbacksEveryNEvents;
   /// Registered callbacks to invoke just once before running the loop.
   /// The registration happens via the RegisterCallback method.
   std::vector<RDFInternal::ROneTimeCallback> fCallbacksOnce;
   /// Registered callbacks to call at the beginning of each "data block".
   /// The key is the pointer of the corresponding node in the computation graph (a RDefinePerSample or a RAction).
   std::unordered_map<void *, ROOT::RDF::SampleCallback_t> fSampleCallbacks;
   RDFInternal::RNewSampleNotifier fNewSampleNotifier;
   std::vector<ROOT::RDF::RSampleInfo> fSampleInfos;
   unsigned int fNRuns{0}; ///< Number of event loops run

   /// Readers for TTree/RDataSource columns (one per slot), shared by all nodes in the computation graph.
   std::vector<std::unordered_map<std::string, std::unique_ptr<RColumnReaderBase>>> fDatasetColumnReaders;

   /// Pointer to a shared slot stack in case this instance runs concurrently with others:
   std::weak_ptr<ROOT::Internal::RSlotStack> fSlotStack;

   // Private helpers of the event loop. The four Run* methods mirror the valid ELoopType values.
   // NOTE(review): they are presumably dispatched from Run() according to fLoopType -- confirm in the implementation.
   void RunEmptySourceMT();
   void RunEmptySource();
   void RunDataSourceMT();
   void RunDataSource();
   void RunAndCheckFilters(unsigned int slot, Long64_t entry);
   void InitNodeSlots(TTreeReader *r, unsigned int slot);
   void InitNodes();
   void CleanUpNodes();
   void CleanUpTask(TTreeReader *r, unsigned int slot);
   void EvalChildrenCounts();
   void SetupSampleCallbacks(TTreeReader *r, unsigned int slot);
   void UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range);
   void UpdateSampleInfo(unsigned int slot, TTreeReader &r);
   std::shared_ptr<ROOT::Internal::RSlotStack> SlotStack() const;

   // List of branches for which we want to suppress the printed error about
   // missing branch when switching to a new tree. This is modified by readers,
   // so must be declared before them in this class.
   // NOTE(review): fDatasetColumnReaders is declared *earlier* in this class, so "them" can only refer to the
   // fUnique*WithReaders members declared right below -- confirm that these are the readers meant here.
   std::set<std::string> fSuppressErrorsForMissingBranches{};
   ROOT::Internal::RDF::RStringCache fCachedColNames;
   std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RDefinesWithReaders>>>
      fUniqueDefinesWithReaders;
   std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RVariationsWithReaders>>>
      fUniqueVariationsWithReaders;

public:
   // One constructor per kind of input: nothing but default columns, a TTree, a number of empty entries,
   // a generic data source (ownership is taken through the unique_ptr) or a dataset specification.
   RLoopManager(const ColumnNames_t &defaultColumns = {});
   RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches);
   RLoopManager(ULong64_t nEmptyEntries);
   RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches);
   RLoopManager(ROOT::RDF::Experimental::RDatasetSpec &&spec);

   // Rule of five

   RLoopManager(const RLoopManager &) = delete;
   RLoopManager &operator=(const RLoopManager &) = delete;
   RLoopManager(RLoopManager &&) = delete;
   RLoopManager &operator=(RLoopManager &&) = delete;
   ~RLoopManager() override;

   void Jit();
   /// The loop manager is its own loop manager: returns `this`.
   RLoopManager *GetLoopManagerUnchecked() final { return this; }
   void Run(bool jit = true);
   const ColumnNames_t &GetDefaultColumnNames() const;
   /// Number of entries in fEmptyEntryRange, computed as its upper bound minus its lower bound.
   ULong64_t GetNEmptyEntries() const { return fEmptyEntryRange.second - fEmptyEntryRange.first; }
   /// Non-owning pointer to the data source held by fDataSource; null if there is no data source.
   RDataSource *GetDataSource() const { return fDataSource.get(); }
   // Register/Deregister overloads, one pair per kind of node pointer stored in the fBooked* vectors above.
   void Register(RDFInternal::RActionBase *actionPtr);
   void Deregister(RDFInternal::RActionBase *actionPtr);
   void Register(RFilterBase *filterPtr);
   void Deregister(RFilterBase *filterPtr);
   void Register(RRangeBase *rangePtr);
   void Deregister(RRangeBase *rangePtr);
   void Register(RDefineBase *definePtr);
   void Deregister(RDefineBase *definePtr);
   void Register(RDFInternal::RVariationBase *varPtr);
   void Deregister(RDFInternal::RVariationBase *varPtr);
   bool CheckFilters(unsigned int, Long64_t) final;
   /// Returns the current value of fNSlots.
   unsigned int GetNSlots() const { return fNSlots; }
   void Report(ROOT::RDF::RCutFlowReport &rep) const final;
   /// End of recursive chain of calls, does nothing
   void PartialReport(ROOT::RDF::RCutFlowReport &) const final {}
   /// Increments fNChildren (not declared in this class; presumably inherited from RNodeBase).
   void IncrChildrenCount() final { ++fNChildren; }
   /// Increments fNStopsReceived (not declared in this class; presumably inherited from RNodeBase).
   void StopProcessing() final { ++fNStopsReceived; }
   void ToJitExec(const std::string &) const;
   void RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f);
   /// Returns the current value of fNRuns, the number of event loops run.
   unsigned int GetNRuns() const { return fNRuns; }
   bool HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const;
   void AddDataSourceColumnReaders(std::string_view col, std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
                                   const std::type_info &ti);
   RColumnReaderBase *GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const;
   RColumnReaderBase *AddDataSourceColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti,
                                                TTreeReader *treeReader);

   /// End of recursive chain of calls, does nothing
   void AddFilterName(std::vector<std::string> &) final {}
   /// For each booked filter, returns either the name or "Unnamed Filter"
   std::vector<std::string> GetFiltersNames();

   /// Return all graph edges known to RLoopManager
   /// This includes Filters and Ranges but not Defines.
   std::vector<RNodeBase *> GetGraphEdges() const;

   /// Return all actions, either booked or already run
   std::vector<RDFInternal::RActionBase *> GetAllActions() const;

   std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>
   GetGraph(std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap) final;

   void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback);

   void SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange);
   void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end);
   void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec);

   /// Mutable access to the column-name cache held by this loop manager.
   ROOT::Internal::RDF::RStringCache &GetColumnNamesCache() { return fCachedColNames; }
   /// Mutable access to fUniqueDefinesWithReaders.
   std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RDefinesWithReaders>>> &
   GetUniqueDefinesWithReaders()
   {
      return fUniqueDefinesWithReaders;
   }
   /// Mutable access to fUniqueVariationsWithReaders.
   std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RVariationsWithReaders>>> &
   GetUniqueVariationsWithReaders()
   {
      return fUniqueVariationsWithReaders;
   }

   void SetTTreeLifeline(std::any lifeline);
   /// Register a slot stack to be used by this RLoopManager. This allows for sharing RDataFrame helpers safely in the
   /// context of RunGraphs(). Note that the loop manager only stores a weak_ptr, in between runs.
   void SetSlotStack(const std::shared_ptr<ROOT::Internal::RSlotStack> &slotStack) { fSlotStack = slotStack; }

   void SetDataSource(std::unique_ptr<ROOT::RDF::RDataSource> dataSource);

   /// Add `branchName` to the set of branches for which the missing-branch error is suppressed.
   void InsertSuppressErrorsForMissingBranch(const std::string &branchName)
   {
      fSuppressErrorsForMissingBranches.insert(branchName);
   }
   /// Remove `branchName` from the set of branches for which the missing-branch error is suppressed.
   void EraseSuppressErrorsForMissingBranch(const std::string &branchName)
   {
      fSuppressErrorsForMissingBranches.erase(branchName);
   }
   /// Read-only access to the set of branches for which the missing-branch error is suppressed.
   const std::set<std::string> &GetSuppressErrorsForMissingBranches() const
   {
      return fSuppressErrorsForMissingBranches;
   }

   // NOTE(review): the two declarations below use std::atomic, but <atomic> is not among the headers this file
   // includes directly; presumably it is pulled in transitively -- consider including it explicitly.
   /// The task run by every thread on the input entry range, for the generic RDataSource.
   void DataSourceThreadTask(const std::pair<ULong64_t, ULong64_t> &entryRange, ROOT::Internal::RSlotStack &slotStack,
                             std::atomic<ULong64_t> &entryCount);
   /// The task run by every thread on an entry range (known by the input TTreeReader), for the TTree data source.
   void
   TTreeThreadTask(TTreeReader &treeReader, ROOT::Internal::RSlotStack &slotStack, std::atomic<ULong64_t> &entryCount);
};
0312 
/// \brief Create an RLoopManager that reads a TChain.
/// \param[in] datasetName Name of the TChain
/// \param[in] fileNameGlob File name (or glob) in which the TChain is stored.
/// \param[in] defaultColumns List of default columns, see
/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
/// \param[in] checkFile File validator boolean; defaults to true.
/// \return the RLoopManager instance.
std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob,
                  const std::vector<std::string> &defaultColumns, bool checkFile = true);
0323 
/// \brief Create an RLoopManager that reads a TChain.
/// \param[in] datasetName Name of the TChain
/// \param[in] fileNameGlobs List of file names (potentially globs).
/// \param[in] defaultColumns List of default columns, see
/// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
/// \param[in] checkFile File validator boolean; defaults to true.
/// \return the RLoopManager instance.
std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
                  const std::vector<std::string> &defaultColumns, bool checkFile = true);
0334 
0335 /// \brief Create an RLoopManager that reads an RNTuple.
0336 /// \param[in] datasetName Name of the RNTuple
0337 /// \param[in] fileNameGlob File name (or glob) in which the RNTuple is stored.
0338 /// \param[in] defaultColumns List of default columns, see
0339 /// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
0340 /// \return the RLoopManager instance.
0341 std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromRNTuple(std::string_view datasetName,
0342                                                                      std::string_view fileNameGlob,
0343                                                                      const std::vector<std::string> &defaultColumns);
0344 
0345 /// \brief Create an RLoopManager that reads multiple RNTuples chained vertically.
0346 /// \param[in] datasetName Name of the RNTuple
0347 /// \param[in] fileNameGlobs List of file names (potentially globs).
0348 /// \param[in] defaultColumns List of default columns, see
0349 /// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
0350 /// \return the RLoopManager instance.
0351 std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromRNTuple(std::string_view datasetName,
0352                                                                      const std::vector<std::string> &fileNameGlobs,
0353                                                                      const std::vector<std::string> &defaultColumns);
0354 
0355 /// \brief Create an RLoopManager opening a file and checking the data format of the dataset.
0356 /// \param[in] datasetName Name of the dataset in the file.
0357 /// \param[in] fileNameGlob File name (or glob) in which the dataset is stored.
0358 /// \param[in] defaultColumns List of default columns, see
0359 /// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
0360 /// \throws std::invalid_argument if the file could not be opened.
0361 /// \return an RLoopManager of the appropriate data source.
0362 std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromFile(std::string_view datasetName,
0363                                                                   std::string_view fileNameGlob,
0364                                                                   const std::vector<std::string> &defaultColumns);
0365 
0366 /// \brief Create an RLoopManager that reads many files. The first is opened to infer the data source type.
0367 /// \param[in] datasetName Name of the dataset.
0368 /// \param[in] fileNameGlobs List of file names (potentially globs).
0369 /// \param[in] defaultColumns List of default columns, see
0370 /// \ref https://root.cern/doc/master/classROOT_1_1RDataFrame.html#default-branches "Default column lists"
0371 /// \throws std::invalid_argument if the file could not be opened.
0372 /// \return an RLoopManager of the appropriate data source.
0373 std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromFile(std::string_view datasetName,
0374                                                                   const std::vector<std::string> &fileNameGlobs,
0375                                                                   const std::vector<std::string> &defaultColumns);
0376 
0377 } // namespace RDF
0378 } // namespace Detail
0379 } // namespace ROOT
0380 
0381 #endif