Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-11-03 10:02:56

0001 /// \file RNTupleDS.hxx
0002 /// \ingroup NTuple ROOT7
0003 /// \author Jakob Blomer <jblomer@cern.ch>
0004 /// \author Enrico Guiraud <enrico.guiraud@cern.ch>
0005 /// \date 2018-10-04
0006 /// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
0007 /// is welcome!
0008 
0009 /*************************************************************************
0010  * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
0011  * All rights reserved.                                                  *
0012  *                                                                       *
0013  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0014  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0015  *************************************************************************/
0016 
0017 #ifndef ROOT_RNTupleDS
0018 #define ROOT_RNTupleDS
0019 
0020 #include <ROOT/RDataSource.hxx>
0021 #include <ROOT/RNTupleUtil.hxx>
0022 #include <ROOT/RNTupleDescriptor.hxx>
0023 #include <string_view>
0024 
0025 #include <condition_variable>
0026 #include <cstdint>
0027 #include <memory>
0028 #include <mutex>
0029 #include <string>
0030 #include <thread>
0031 #include <vector>
0032 #include <unordered_map>
0033 
0034 namespace ROOT {
0035 class RFieldBase; // used via std::unique_ptr in the proto field containers of RNTupleDS
0036 class RDataFrame; // return type of the FromRNTuple() declarations at the end of this header
0037 class RNTuple;    // NOTE(review): not referenced elsewhere in this header -- confirm it is still needed
0038 } // namespace ROOT
0039 namespace ROOT::Internal::RDF {
0040 class RNTupleColumnReader; // friend of RNTupleDS; held by raw pointer in fActiveColumnReaders
0041 } // namespace ROOT::Internal::RDF
0042 namespace ROOT::Internal {
0043 class RPageSource; // owned via std::unique_ptr by the entry ranges and the staging area of RNTupleDS
0044 } // namespace ROOT::Internal
0045 
0046 namespace ROOT::RDF {
0047 class RNTupleDS final : public ROOT::RDF::RDataSource { // RDataSource implementation that reads from RNTuple page sources
0048    friend class ROOT::Internal::RDF::RNTupleColumnReader;
0049
0050    /// The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
0051    /// GetEntryRanges() swaps fNextRanges and fCurrentRanges and uses the list of
0052    /// REntryRangeDS records to return the list of ranges ready to use by the RDF loop manager.
0053    struct REntryRangeDS {
0054       std::unique_ptr<ROOT::Internal::RPageSource> fSource; ///< The page source that fFirstEntry and fLastEntry index into
0055       ULong64_t fFirstEntry = 0; ///< First entry index in fSource
0056       /// End entry index in fSource (exclusive), i.e. the number of entries in the range is fLastEntry - fFirstEntry
0057       ULong64_t fLastEntry = 0;
0058    };
0059
0060    /// A clone of the first page source's descriptor.
0061    ROOT::RNTupleDescriptor fPrincipalDescriptor;
0062
0063    /// The data source may be constructed with an ntuple name and a list of files.
0064    std::string fNTupleName;
0065    std::vector<std::string> fFileNames;
0066    /// The staging area is relevant for chains of files, i.e. when fFileNames is not empty. In this case,
0067    /// files are opened in the background in batches of size `fNSlots` and kept in the staging area.
0068    /// The first file (chains or no chains) is always opened on construction in order to process the schema.
0069    /// For all subsequent files, the corresponding page sources in the staging area have only executed
0070    /// `LoadStructure()`, i.e. they should have a compressed buffer of the meta-data available.
0071    /// Concretely:
0072    ///   1. We open the first file on construction to read the schema and then move the corresponding page source
0073    ///      into the staging area.
0074    ///   2. On `Initialize()`, we start the I/O background thread, which in turn opens the first batch of files.
0075    ///   3. At the beginning of `GetEntryRanges()`, we
0076    ///      a) wait for the I/O thread to finish,
0077    ///      b) call `PrepareNextRanges()` in the main thread to move the page sources from the staging area
0078    ///         into `fNextRanges`; this will also call `Attach()` on the page sources (i.e., deserialize the meta-data),
0079    ///         and
0080    ///      c) trigger staging of the next batch of files in the I/O background thread.
0081    ///   4. On `Finalize()`, the I/O background thread is stopped.
0082    std::vector<std::unique_ptr<ROOT::Internal::RPageSource>> fStagingArea;
0083    std::size_t fNextFileIndex = 0; ///< Index into fFileNames to the next file to process
0084
0085    /// We prepare a prototype field for every column. If a column reader is actually requested
0086    /// in GetColumnReaders(), we move a clone of the field into a new column reader for RDataFrame.
0087    /// Only the clone connects to the backing page store and acquires I/O resources.
0088    /// The field IDs are set in the context of the first source and used as keys in fFieldId2QualifiedName.
0089    std::vector<std::unique_ptr<ROOT::RFieldBase>> fProtoFields;
0090    /// Columns may be requested with types other than the one with which they were initially added as proto fields.
0091    /// For example, a column with a `ROOT::RVec<float>` proto field may instead be requested as a
0092    /// `std::vector<float>`. In case this happens, we create an alternative proto field and store it here, with the
0093    /// original index in `fProtoFields` as key. A single column can have more than one alternative proto field.
0094    std::unordered_map<std::size_t, std::vector<std::unique_ptr<ROOT::RFieldBase>>> fAlternativeProtoFields;
0095    /// Connects the IDs of active proto fields and their subfields to their fully qualified name (a.b.c.d).
0096    /// This enables the column reader to rewire the field IDs when the file changes (chain),
0097    /// using the fully qualified name as a search key in the descriptor of the other page sources.
0098    std::unordered_map<ROOT::DescriptorId_t, std::string> fFieldId2QualifiedName;
0099    std::vector<std::string> fColumnNames; ///< Column names as returned by GetColumnNames()
0100    std::vector<std::string> fColumnTypes; // NOTE(review): presumably the type names matching fColumnNames by index -- confirm
0101    /// List of column readers returned by GetColumnReaders() organized by slot. Used to reconnect readers
0102    /// to new page sources when the files in the chain change.
0103    std::vector<std::vector<ROOT::Internal::RDF::RNTupleColumnReader *>> fActiveColumnReaders;
0104
0105    ULong64_t fSeenEntries = 0;                ///< The number of entries so far returned by GetEntryRanges()
0106    std::vector<REntryRangeDS> fCurrentRanges; ///< Basis for the ranges returned by the last GetEntryRanges() call
0107    std::vector<REntryRangeDS> fNextRanges;    ///< Basis for the ranges populated by the PrepareNextRanges() call
0108    /// Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index in
0109    /// the fCurrentRanges vector.  This is necessary because the returned ranges get distributed arbitrarily
0110    /// onto slots.  In the InitSlot method, the column readers use this map to find the correct range to connect to.
0111    std::unordered_map<ULong64_t, std::size_t> fFirstEntry2RangeIdx;
0112
0113    /// The background thread that runs StageNextSources(); its main function is ExecStaging()
0114    std::thread fThreadStaging;
0115    /// Protects the shared state between the main thread and the I/O thread
0116    std::mutex fMutexStaging;
0117    /// Signal for the state information of fIsReadyForStaging and fHasNextSources
0118    std::condition_variable fCvStaging;
0119    /// Is true when the staging thread should start working
0120    bool fIsReadyForStaging = false;
0121    /// Is true when the staging thread has populated the next batch of files into fStagingArea
0122    bool fHasNextSources = false;
0123    /// Is true when the I/O thread should quit
0124    bool fStagingThreadShouldTerminate = false;
0125
0126    /// \brief Holds useful information about fields added to the RNTupleDS
0127    struct RFieldInfo {
0128       ROOT::DescriptorId_t fFieldId; ///< Descriptor ID of the field
0129       std::size_t fNRepetitions; // NOTE(review): presumably the repetition count of a fixed-size collection field -- confirm
0130       // A user-provided constructor enables `std::vector::emplace_back` for this type
0131       RFieldInfo(ROOT::DescriptorId_t fieldId, std::size_t nRepetitions)
0132          : fFieldId(fieldId), fNRepetitions(nRepetitions)
0133       {
0134       }
0135    };
0136
0137    /// Provides the RDF column "colName" given the field identified by fieldId. For records and collections,
0138    /// AddField recurses into the sub fields. The fieldInfos argument is a list of objects holding info
0139    /// about the fields of the outer collection(s) (w.r.t. fieldId). For instance, if fieldId refers to an
0140    /// `std::vector<Jet>`, with
0141    /// ~~~{.cpp}
0142    /// struct Jet {
0143    ///    float pt;
0144    ///    float eta;
0145    /// };
0146    /// ~~~
0147    /// AddField will recurse into `Jet.pt` and `Jet.eta` and provide the two inner fields as `ROOT::VecOps::RVec<float>`
0148    /// each.
0149    ///
0150    /// In case the field is a collection of type `ROOT::VecOps::RVec`, `std::vector` or `std::array`, its corresponding
0151    /// column is added as a `ROOT::VecOps::RVec`. Otherwise, the collection field's on-disk type is used. Note, however,
0152    /// that inner record members of such collections will still be added as `ROOT::VecOps::RVec` (e.g., `std::set<Jet>`
0153    /// will be added as a `std::set`, but `Jet.[pt|eta]` will be added as `ROOT::VecOps::RVec<float>`).
0154    void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId,
0155                  std::vector<RFieldInfo> fieldInfos, bool convertToRVec = true); // NOTE(review): convertToRVec is not described above; presumably toggles the RVec conversion -- confirm
0156
0157    /// The main function of the fThreadStaging background thread
0158    void ExecStaging();
0159    /// Starting from `fNextFileIndex`, opens the next `fNSlots` files. Calls `LoadStructure()` on the opened files.
0160    /// The very first file is already available from the constructor.
0161    void StageNextSources();
0162    /// Populates fNextRanges with the next set of entry ranges. Moves files from the staging area as necessary
0163    /// and aligns ranges with cluster boundaries for scheduling the tail of files.
0164    /// Upon return, the fNextRanges list is ordered.  It usually has fNSlots elements; fewer if there
0165    /// is not enough work to give at least one cluster to every slot.
0166    void PrepareNextRanges();
0167
0168    explicit RNTupleDS(std::unique_ptr<ROOT::Internal::RPageSource> pageSource); // private constructor; takes ownership of pageSource
0169
0170 public:
0171    RNTupleDS(std::string_view ntupleName, std::string_view fileName); ///< Construct from an ntuple name and a single file
0172    RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames); ///< Construct from an ntuple name and a list of files
0173    // Rule of five: the data source is neither copyable nor movable
0174    RNTupleDS(const RNTupleDS &) = delete;
0175    RNTupleDS &operator=(const RNTupleDS &) = delete;
0176    RNTupleDS(RNTupleDS &&) = delete;
0177    RNTupleDS &operator=(RNTupleDS &&) = delete;
0178    ~RNTupleDS() final;
0179
0180    void SetNSlots(unsigned int nSlots) final;
0181    std::size_t GetNFiles() const final { return fFileNames.empty() ? 1 : fFileNames.size(); } ///< Reports 1 when fFileNames is empty
0182    const std::vector<std::string> &GetColumnNames() const final { return fColumnNames; }
0183    bool HasColumn(std::string_view colName) const final;
0184    std::string GetTypeName(std::string_view colName) const final;
0185    std::vector<std::pair<ULong64_t, ULong64_t>> GetEntryRanges() final;
0186    std::string GetLabel() final { return "RNTupleDS"; }
0187
0188    void Initialize() final;
0189    void InitSlot(unsigned int slot, ULong64_t firstEntry) final;
0190    void FinalizeSlot(unsigned int slot) final;
0191    void Finalize() final;
0192
0193    std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
0194    GetColumnReaders(unsigned int /*slot*/, std::string_view /*name*/, const std::type_info &) final;
0195
0196    // Old API, unused; always returns true
0197    bool SetEntry(unsigned int, ULong64_t) final { return true; }
0198
0199 protected:
0200    Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final;
0201 };
0202 } // namespace ROOT::RDF
0203 
0204 namespace ROOT::RDF { // RDataFrame factory declarations; NOTE(review): presumably backed by RNTupleDS -- definitions are not in this header
0205 RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName); ///< Overload for a single file
0206 RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames); ///< Overload for a list of files
0207 } // namespace ROOT::RDF
0208 
0209 #endif