File indexing completed on 2025-12-16 10:29:50
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011 #ifndef ROOT_RLOOPMANAGER
0012 #define ROOT_RLOOPMANAGER
0013
0014 #include "ROOT/RDataSource.hxx"
0015 #include "ROOT/RDF/RColumnReaderBase.hxx"
0016 #include "ROOT/RDF/RDatasetSpec.hxx"
0017 #include "ROOT/RDF/RNodeBase.hxx"
0018 #include "ROOT/RDF/RNewSampleNotifier.hxx"
0019 #include "ROOT/RDF/RSampleInfo.hxx"
0020 #include "ROOT/RDF/Utils.hxx"
0021
0022 #include <functional>
0023 #include <limits>
0024 #include <map>
0025 #include <memory>
0026 #include <set>
0027 #include <string>
0028 #include <string_view>
0029 #include <unordered_map>
0030 #include <unordered_set>
0031 #include <vector>
0032 #include <any>
0033
0034
0035 class TTree;
0036 class TTreeReader;
0037 class TDirectory;
0038
0039 namespace ROOT {
0040 namespace RDF {
0041 class RCutFlowReport;
0042 class RDataSource;
0043 }
0044
0045 namespace Internal {
0046 class RSlotStack;
0047 namespace RDF {
0048 class GraphNode;
0049 class RActionBase;
0050 class RVariationBase;
0051 class RDefinesWithReaders;
0052 class RVariationsWithReaders;
0053
0054 namespace GraphDrawing {
0055 class GraphCreatorHelper;
0056 }
0057 /// Callable signature stored by RCallback and ROneTimeCallback; both invoke it with the slot number.
0058 using Callback_t = std::function<void(unsigned int)>;
0059
0060 class RCallback {
0061 const Callback_t fFun;
0062 const ULong64_t fEveryN;
0063 std::vector<ULong64_t> fCounters;
0064
0065 public:
0066 RCallback(ULong64_t everyN, Callback_t &&f, unsigned int nSlots)
0067 : fFun(std::move(f)), fEveryN(everyN), fCounters(nSlots, 0ull)
0068 {
0069 }
0070
0071 void operator()(unsigned int slot)
0072 {
0073 auto &c = fCounters[slot];
0074 ++c;
0075 if (c == fEveryN) {
0076 c = 0ull;
0077 fFun(slot);
0078 }
0079 }
0080 };
0081
0082 class ROneTimeCallback {
0083 const Callback_t fFun;
0084 std::vector<int> fHasBeenCalled;
0085
0086 public:
0087 ROneTimeCallback(Callback_t &&f, unsigned int nSlots) : fFun(std::move(f)), fHasBeenCalled(nSlots, 0) {}
0088
0089 void operator()(unsigned int slot)
0090 {
0091 if (fHasBeenCalled[slot] == 1)
0092 return;
0093 fFun(slot);
0094 fHasBeenCalled[slot] = 1;
0095 }
0096 };
0097
0098 struct RDSRangeRAII;
0099
0100 }
0101 }
0102 }
0103
0104 namespace ROOT {
0105 namespace Detail {
0106 namespace RDF {
0107 namespace RDFInternal = ROOT::Internal::RDF;
0108
0109 class RFilterBase;
0110 class RRangeBase;
0111 class RDefineBase;
0112 using ROOT::RDF::RDataSource;
0113
0114 /// RNodeBase subclass that owns the data source (unique_ptr fDataSource) and holds the lists of booked
0115 /// actions, filters, ranges, defines and variations. Copying and moving are disabled (all four are deleted).
0116 class RLoopManager : public RNodeBase {
0117 using ColumnNames_t = std::vector<std::string>;
0118 enum class ELoopType { // Type of fLoopType; kInvalid is that member's in-class default.
0119 kInvalid,
0120 kNoFiles,
0121 kNoFilesMT,
0122 kDataSource,
0123 kDataSourceMT
0124 };
0125
0126 friend struct RCallCleanUpTask;
0127 friend struct ROOT::Internal::RDF::RDSRangeRAII;
0128
0129
0130
0131
0132 // Type-erased holder (std::any). NOTE(review): presumably assigned by SetTTreeLifeline() to keep a
0133 // TTree-related object alive while this loop manager exists; confirm against the implementation file.
0134 std::any fTTreeLifeline{};
0135 // Raw pointers to booked nodes. NOTE(review): presumably non-owning and maintained by Register()/Deregister().
0136 std::vector<RDFInternal::RActionBase *> fBookedActions;
0137 std::vector<RDFInternal::RActionBase *> fRunActions;
0138 std::vector<RFilterBase *> fBookedFilters;
0139 std::vector<RFilterBase *> fBookedNamedFilters;
0140 std::vector<RRangeBase *> fBookedRanges;
0141 std::vector<RDefineBase *> fBookedDefines;
0142 std::vector<RDFInternal::RVariationBase *> fBookedVariations;
0143 // Entry range bounds: fBeginEntry defaults to 0, fEndEntry to the largest Long64_t value.
0144 Long64_t fBeginEntry{0};
0145 Long64_t fEndEntry{std::numeric_limits<Long64_t>::max()};
0146
0147 // Maps a string key to an RSample pointer. NOTE(review): presumably pointing into fSamples below; confirm.
0148 std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> fSampleMap;
0149 // RSample objects stored by value.
0150 std::vector<ROOT::RDF::Experimental::RSample> fSamples;
0151 // Column names. NOTE(review): presumably what GetDefaultColumnNames() returns; confirm.
0152 ColumnNames_t fDefaultColumns;
0153 // GetNEmptyEntries() reports the length of this range as second - first.
0154 std::pair<ULong64_t, ULong64_t> fEmptyEntryRange{};
0155 unsigned int fNSlots{1}; // Returned by GetNSlots(); defaults to 1.
0156 bool fMustRunNamedFilters{true};
0157
0158 ELoopType fLoopType{ELoopType::kInvalid};
0159 std::unique_ptr<RDataSource> fDataSource{}; // Owning pointer; GetDataSource() hands out the raw pointer.
0160
0161 // Wrappers that fire their callable each time a per-slot call counter reaches a threshold (see RCallback).
0162 std::vector<RDFInternal::RCallback> fCallbacksEveryNEvents;
0163
0164 // Wrappers that skip their callable once it has run for a given slot (see ROneTimeCallback).
0165 std::vector<RDFInternal::ROneTimeCallback> fCallbacksOnce;
0166
0167 // Sample callbacks keyed by a void pointer. NOTE(review): presumably the nodePtr given to AddSampleCallback().
0168 std::unordered_map<void *, ROOT::RDF::SampleCallback_t> fSampleCallbacks;
0169 RDFInternal::RNewSampleNotifier fNewSampleNotifier;
0170 std::vector<ROOT::RDF::RSampleInfo> fSampleInfos;
0171 unsigned int fNRuns{0}; // Returned by GetNRuns(); starts at 0.
0172
0173 // Vector of string -> column reader maps. NOTE(review): presumably one map per slot, keyed by column; confirm.
0174 std::vector<std::unordered_map<std::string, std::unique_ptr<RColumnReaderBase>>> fDatasetColumnReaders;
0175
0176 // Held as a weak_ptr, so this member alone does not keep the RSlotStack alive; assigned by SetSlotStack().
0177 std::weak_ptr<ROOT::Internal::RSlotStack> fSlotStack;
0178 // Private helpers, only declared here. NOTE(review): the four Run* names mirror the non-invalid ELoopType values.
0179 void RunEmptySourceMT();
0180 void RunEmptySource();
0181 void RunDataSourceMT();
0182 void RunDataSource();
0183 void RunAndCheckFilters(unsigned int slot, Long64_t entry);
0184 void InitNodeSlots(TTreeReader *r, unsigned int slot);
0185 void InitNodes();
0186 void CleanUpNodes();
0187 void CleanUpTask(TTreeReader *r, unsigned int slot);
0188 void EvalChildrenCounts();
0189 void SetupSampleCallbacks(TTreeReader *r, unsigned int slot);
0190 void UpdateSampleInfo(unsigned int slot, const std::pair<ULong64_t, ULong64_t> &range);
0191 void UpdateSampleInfo(unsigned int slot, TTreeReader &r);
0192 std::shared_ptr<ROOT::Internal::RSlotStack> SlotStack() const;
0193
0194
0195
0196 // Set of branch names, edited through Insert/EraseSuppressErrorsForMissingBranch() and read through the getter.
0197 std::set<std::string> fSuppressErrorsForMissingBranches{};
0198 ROOT::Internal::RDF::RStringCache fCachedColNames; // Exposed by reference through GetColumnNamesCache().
0199 std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RDefinesWithReaders>>>
0200 fUniqueDefinesWithReaders; // NOTE(review): the string_view in the key does not own its string; it must outlive the entry.
0201 std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RVariationsWithReaders>>>
0202 fUniqueVariationsWithReaders; // NOTE(review): same lifetime requirement on the string_view key as above.
0203
0204 public:
0205 RLoopManager(const ColumnNames_t &defaultColumns = {});
0206 RLoopManager(TTree *tree, const ColumnNames_t &defaultBranches);
0207 RLoopManager(ULong64_t nEmptyEntries); // NOTE(review): not explicit, so a ULong64_t converts implicitly.
0208 RLoopManager(std::unique_ptr<RDataSource> ds, const ColumnNames_t &defaultBranches);
0209 RLoopManager(ROOT::RDF::Experimental::RDatasetSpec &&spec);
0210
0211
0212 // Copy and move operations are all deleted.
0213 RLoopManager(const RLoopManager &) = delete;
0214 RLoopManager &operator=(const RLoopManager &) = delete;
0215 RLoopManager(RLoopManager &&) = delete;
0216 RLoopManager &operator=(RLoopManager &&) = delete;
0217 ~RLoopManager() override;
0218
0219 void Jit();
0220 RLoopManager *GetLoopManagerUnchecked() final { return this; } // This object is the loop manager: returns itself.
0221 void Run(bool jit = true);
0222 const ColumnNames_t &GetDefaultColumnNames() const;
0223 ULong64_t GetNEmptyEntries() const { return fEmptyEntryRange.second - fEmptyEntryRange.first; }
0224 RDataSource *GetDataSource() const { return fDataSource.get(); } // Raw pointer; ownership stays with fDataSource.
0225 void Register(RDFInternal::RActionBase *actionPtr);
0226 void Deregister(RDFInternal::RActionBase *actionPtr);
0227 void Register(RFilterBase *filterPtr);
0228 void Deregister(RFilterBase *filterPtr);
0229 void Register(RRangeBase *rangePtr);
0230 void Deregister(RRangeBase *rangePtr);
0231 void Register(RDefineBase *definePtr);
0232 void Deregister(RDefineBase *definePtr);
0233 void Register(RDFInternal::RVariationBase *varPtr);
0234 void Deregister(RDFInternal::RVariationBase *varPtr);
0235 bool CheckFilters(unsigned int, Long64_t) final;
0236 unsigned int GetNSlots() const { return fNSlots; }
0237 void Report(ROOT::RDF::RCutFlowReport &rep) const final;
0238 // Empty body: this override adds nothing to the report.
0239 void PartialReport(ROOT::RDF::RCutFlowReport &) const final {}
0240 void IncrChildrenCount() final { ++fNChildren; } // fNChildren is not declared here; presumably inherited from RNodeBase.
0241 void StopProcessing() final { ++fNStopsReceived; } // fNStopsReceived is likewise presumably inherited from RNodeBase.
0242 void ToJitExec(const std::string &) const;
0243 void RegisterCallback(ULong64_t everyNEvents, std::function<void(unsigned int)> &&f);
0244 unsigned int GetNRuns() const { return fNRuns; }
0245 bool HasDataSourceColumnReaders(std::string_view col, const std::type_info &ti) const;
0246 void AddDataSourceColumnReaders(std::string_view col, std::vector<std::unique_ptr<RColumnReaderBase>> &&readers,
0247 const std::type_info &ti);
0248 RColumnReaderBase *GetDatasetColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti) const;
0249 RColumnReaderBase *AddDataSourceColumnReader(unsigned int slot, std::string_view col, const std::type_info &ti,
0250 TTreeReader *treeReader);
0251
0252 // Empty body: this override appends no name to the vector.
0253 void AddFilterName(std::vector<std::string> &) final {}
0254
0255 std::vector<std::string> GetFiltersNames();
0256
0257
0258
0259 std::vector<RNodeBase *> GetGraphEdges() const;
0260
0261
0262 std::vector<RDFInternal::RActionBase *> GetAllActions() const;
0263
0264 std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>
0265 GetGraph(std::unordered_map<void *, std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode>> &visitedMap) final;
0266
0267 void AddSampleCallback(void *nodePtr, ROOT::RDF::SampleCallback_t &&callback);
0268
0269 void SetEmptyEntryRange(std::pair<ULong64_t, ULong64_t> &&newRange);
0270 void ChangeBeginAndEndEntries(Long64_t begin, Long64_t end);
0271 void ChangeSpec(ROOT::RDF::Experimental::RDatasetSpec &&spec);
0272 // The three accessors below return mutable references to private members.
0273 ROOT::Internal::RDF::RStringCache &GetColumnNamesCache() { return fCachedColNames; }
0274 std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RDefinesWithReaders>>> &
0275 GetUniqueDefinesWithReaders()
0276 {
0277 return fUniqueDefinesWithReaders;
0278 }
0279 std::set<std::pair<std::string_view, std::unique_ptr<ROOT::Internal::RDF::RVariationsWithReaders>>> &
0280 GetUniqueVariationsWithReaders()
0281 {
0282 return fUniqueVariationsWithReaders;
0283 }
0284
0285 void SetTTreeLifeline(std::any lifeline);
0286
0287 // Assigns the given shared_ptr to the weak_ptr member fSlotStack.
0288 void SetSlotStack(const std::shared_ptr<ROOT::Internal::RSlotStack> &slotStack) { fSlotStack = slotStack; }
0289
0290 void SetDataSource(std::unique_ptr<ROOT::RDF::RDataSource> dataSource);
0291 // Inserts branchName into fSuppressErrorsForMissingBranches.
0292 void InsertSuppressErrorsForMissingBranch(const std::string &branchName)
0293 {
0294 fSuppressErrorsForMissingBranches.insert(branchName);
0295 }
0296 void EraseSuppressErrorsForMissingBranch(const std::string &branchName)
0297 {
0298 fSuppressErrorsForMissingBranches.erase(branchName);
0299 }
0300 const std::set<std::string> &GetSuppressErrorsForMissingBranches() const
0301 {
0302 return fSuppressErrorsForMissingBranches;
0303 }
0304
0305 // NOTE(review): the two declarations below use std::atomic, but <atomic> is not among this header's direct includes.
0306 void DataSourceThreadTask(const std::pair<ULong64_t, ULong64_t> &entryRange, ROOT::Internal::RSlotStack &slotStack,
0307 std::atomic<ULong64_t> &entryCount);
0308
0309 void
0310 TTreeThreadTask(TTreeReader &treeReader, ROOT::Internal::RSlotStack &slotStack, std::atomic<ULong64_t> &entryCount);
0311 };
0312
0313
0314
0315
0316
0317 /// Declaration of a factory taking a dataset name, one file name/glob and the default column names; it returns
0318 /// a shared_ptr to an RLoopManager. `checkFile` defaults to true. NOTE(review): judging by the name, the result
0319 /// reads a TTree and `checkFile` presumably toggles a file validity check; confirm against the implementation.
0320 std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
0321 CreateLMFromTTree(std::string_view datasetName, std::string_view fileNameGlob,
0322 const std::vector<std::string> &defaultColumns, bool checkFile = true);
0323
0324
0325
0326
0327
0328 /// Overload of CreateLMFromTTree taking a vector of file names/globs instead of a single one; it returns a
0329 /// shared_ptr to an RLoopManager and `checkFile` again defaults to true. NOTE(review): presumably all listed
0330 /// files are expected to contain the dataset named `datasetName`; confirm against the implementation.
0331 std::shared_ptr<ROOT::Detail::RDF::RLoopManager>
0332 CreateLMFromTTree(std::string_view datasetName, const std::vector<std::string> &fileNameGlobs,
0333 const std::vector<std::string> &defaultColumns, bool checkFile = true);
0334
0335
0336
0337
0338
0339 /// Declaration of a factory taking a dataset name, one file name/glob and the default column names; it returns
0340 /// a shared_ptr to an RLoopManager. NOTE(review): judging by the name, the result reads an RNTuple; confirm.
0341 std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromRNTuple(std::string_view datasetName,
0342 std::string_view fileNameGlob,
0343 const std::vector<std::string> &defaultColumns);
0344
0345
0346
0347
0348
0349 /// Overload of CreateLMFromRNTuple taking a vector of file names/globs instead of a single one; it returns a
0350 /// shared_ptr to an RLoopManager. NOTE(review): judging by the name, the result reads an RNTuple; confirm.
0351 std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromRNTuple(std::string_view datasetName,
0352 const std::vector<std::string> &fileNameGlobs,
0353 const std::vector<std::string> &defaultColumns);
0354
0355
0356
0357
0358
0359
0360 /// Declaration of a factory taking a dataset name, one file name/glob and the default column names; it returns
0361 /// a shared_ptr to an RLoopManager. NOTE(review): presumably picks the reader from what the file holds; confirm.
0362 std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromFile(std::string_view datasetName,
0363 std::string_view fileNameGlob,
0364 const std::vector<std::string> &defaultColumns);
0365
0366
0367
0368
0369
0370
0371 /// Overload of CreateLMFromFile taking a vector of file names/globs instead of a single one; it returns a
0372 /// shared_ptr to an RLoopManager. NOTE(review): presumably picks the reader from what the files hold; confirm.
0373 std::shared_ptr<ROOT::Detail::RDF::RLoopManager> CreateLMFromFile(std::string_view datasetName,
0374 const std::vector<std::string> &fileNameGlobs,
0375 const std::vector<std::string> &defaultColumns);
0376
0377 }
0378 }
0379 }
0380
0381 #endif