Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /include/root/ROOT/RNTupleDS.hxx was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 /// \file RNTupleDS.hxx
0002 /// \ingroup NTuple ROOT7
0003 /// \author Jakob Blomer <jblomer@cern.ch>
0004 /// \author Enrico Guiraud <enrico.guiraud@cern.ch>
0005 /// \date 2018-10-04
0006 /// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
0007 /// is welcome!
0008 
0009 /*************************************************************************
0010  * Copyright (C) 1995-2025, Rene Brun and Fons Rademakers.               *
0011  * All rights reserved.                                                  *
0012  *                                                                       *
0013  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0014  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0015  *************************************************************************/
0016 
0017 #ifndef ROOT_RNTupleDS
0018 #define ROOT_RNTupleDS
0019 
0020 #include <ROOT/RDataSource.hxx>
0021 #include <ROOT/RNTupleDescriptor.hxx>
0022 #include <ROOT/RNTupleTypes.hxx>
0023 #include <string_view>
0024 
0025 #include <condition_variable>
0026 #include <cstdint>
0027 #include <memory>
0028 #include <mutex>
0029 #include <optional>
0030 #include <string>
0031 #include <thread>
0032 #include <vector>
0033 #include <unordered_map>
0034 
0035 // Follow RDF namespace convention
0036 namespace ROOT {
0037 class RDataFrame; // Forward declaration: needed as the return type of the FromRNTuple() declarations in this header
0038 }
0039 namespace ROOT::Internal::RDF {
0040 /**
0041  * \brief Internal overload of FromRNTuple() that allows passing a range of entries
0042  *
0043  * The entry range will be respected when processing this RNTuple. It is assumed
0044  * that processing happens within one thread only.
0045  */
0046 ROOT::RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames,
0047                              const std::pair<ULong64_t, ULong64_t> &range);
0048 /**
0049  * \brief Retrieves the cluster boundaries and the number of entries for the input RNTuple
0050  *
0051  * \param[in] ntupleName The name of the RNTuple dataset
0052  * \param[in] location The location of the RNTuple dataset (e.g. a path to a file)
0053  * \return A pair holding the cluster boundaries (first) and the number of entries (second)
0054  * \note This function is a helper for the Python side to avoid having to deal
0055  *       with the shared descriptor guard.
0056  */
0057 std::pair<std::vector<ROOT::Internal::RNTupleClusterBoundaries>, ROOT::NTupleSize_t>
0058 GetClustersAndEntries(std::string_view ntupleName, std::string_view location);
0059 } // namespace ROOT::Internal::RDF
0060 
0061 namespace ROOT {
0062 class RFieldBase; // Held through std::unique_ptr in RNTupleDS (proto fields)
0063 class RDataFrame; // NOTE(review): already forward-declared earlier in this header; redundant but harmless
0064 class RNTuple;    // NOTE(review): not referenced in the visible part of this header -- confirm it is still needed
0065 } // namespace ROOT
0066 namespace ROOT::Internal::RDF {
0067 class RNTupleColumnReader; // Friend of RNTupleDS; referenced by pointer in fActiveColumnReaders
0068 }
0069 namespace ROOT::Internal {
0070 class RPageSource; // Held through std::unique_ptr in RNTupleDS (entry ranges and staging area)
0071 }
0072 
0073 namespace ROOT::RDF {
0074 class RNTupleDS final : public ROOT::RDF::RDataSource {
0075    friend class ROOT::Internal::RDF::RNTupleColumnReader;
0076 
0077    /// The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records.
0078    /// The GetEntryRanges() method swaps fNextRanges and fCurrentRanges and uses the list of
0079    /// REntryRangeDS records to return the list of ranges ready to use by the RDF loop manager.
0080    struct REntryRangeDS {
0081       std::unique_ptr<ROOT::Internal::RPageSource> fSource;
0082       ULong64_t fFirstEntry = 0; ///< First entry index in fSource
0083       /// End entry index in fSource (exclusive), i.e. the number of entries in the range is fLastEntry - fFirstEntry
0084       ULong64_t fLastEntry = 0;
0085       std::string_view fFileName; ///< Storage location of the current RNTuple (non-owning view; the viewed string must outlive this record)
0086    };
0087 
0088    /// A clone of the first page source's descriptor.
0089    ROOT::RNTupleDescriptor fPrincipalDescriptor;
0090 
0091    /// The data source may be constructed with an ntuple name and a list of files
0092    std::string fNTupleName;
0093    std::vector<std::string> fFileNames;
0094    /// The staging area is relevant for chains of files, i.e. when fFileNames is not empty. In this case,
0095    /// files are opened in the background in batches of size `fNSlots` and kept in the staging area.
0096    /// The first file (chains or no chains) is always opened on construction in order to process the schema.
0097    /// For all subsequent files, the corresponding page sources in the staging area have only executed `LoadStructure()`,
0098    /// i.e. they should have a compressed buffer of the meta-data available.
0099    /// Concretely:
0100    ///   1. We open the first file on construction to read the schema and then move the corresponding page source
0101    ///      into the staging area.
0102    ///   2. On `Initialize()`, we start the I/O background thread, which in turn opens the first batch of files.
0103    ///   3. At the beginning of `GetEntryRanges()`, we
0104    ///      a) wait for the I/O thread to finish,
0105    ///      b) call `PrepareNextRanges()` in the main thread to move the page sources from the staging area
0106    ///         into `fNextRanges`; this will also call `Attach()` on the page sources (i.e., deserialize the meta-data),
0107    ///         and
0108    ///      c) trigger staging of the next batch of files in the I/O background thread.
0109    ///   4. On `Finalize()`, the I/O background thread is stopped.
0110    std::vector<std::unique_ptr<ROOT::Internal::RPageSource>> fStagingArea;
0111    std::size_t fNextFileIndex = 0; ///< Index into fFileNames to the next file to process
0112 
0113    /// We prepare a prototype field for every column. If a column reader is actually requested
0114    /// in GetColumnReaders(), we move a clone of the field into a new column reader for RDataFrame.
0115    /// Only the clone connects to the backing page store and acquires I/O resources.
0116    /// The field IDs are set in the context of the first source and used as keys in fFieldId2QualifiedName.
0117    std::vector<std::unique_ptr<ROOT::RFieldBase>> fProtoFields;
0118    /// Columns may be requested with types other than those with which they were initially added as proto fields. For example,
0119    /// a column with a `ROOT::RVec<float>` proto field may instead be requested as a `std::vector<float>`. In case this
0120    /// happens, we create an alternative proto field and store it here, with the original index in `fProtoFields` as
0121    /// key. A single column can have more than one alternative proto field.
0122    std::unordered_map<std::size_t, std::vector<std::unique_ptr<ROOT::RFieldBase>>> fAlternativeProtoFields;
0123    /// Connects the IDs of active proto fields and their subfields to their fully qualified name (a.b.c.d).
0124    /// This enables the column reader to rewire the field IDs when the file changes (chain),
0125    /// using the fully qualified name as a search key in the descriptor of the other page sources.
0126    std::unordered_map<ROOT::DescriptorId_t, std::string> fFieldId2QualifiedName;
0127    std::vector<std::string> fColumnNames; ///< The list returned by GetColumnNames()
0128    std::vector<std::string> fColumnTypes; ///< NOTE(review): presumably the type name of each entry in fColumnNames -- confirm in the implementation
0129    /// List of column readers returned by GetColumnReaders() organized by slot. Used to reconnect readers
0130    /// to new page sources when the files in the chain change.
0131    std::vector<std::vector<ROOT::Internal::RDF::RNTupleColumnReader *>> fActiveColumnReaders;
0132 
0133    ULong64_t fSeenEntriesNoGlobalRange = 0; ///< The number of entries seen so far in GetEntryRanges()
0134 
0135    std::vector<REntryRangeDS> fCurrentRanges; ///< Basis for the ranges returned by the last GetEntryRanges() call
0136    std::vector<REntryRangeDS> fNextRanges;    ///< Basis for the ranges populated by the PrepareNextRanges() call
0137    /// Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index in
0138    /// the fCurrentRanges vector.  This is necessary because the returned ranges get distributed arbitrarily
0139    /// onto slots.  In the InitSlot method, the column readers use this map to find the correct range to connect to.
0140    std::unordered_map<ULong64_t, std::size_t> fFirstEntry2RangeIdx;
0141    /// Keeps track of the scheduled entries - necessary for processing of GlobalEntries
0142    std::vector<std::pair<ULong64_t, ULong64_t>> fOriginalRanges;
0143    /// One element per slot, corresponding to the current range index for that slot, as filled by InitSlot
0144    std::vector<std::size_t> fSlotsToRangeIdxs;
0145 
0146    /// The background thread that runs StageNextSources()
0147    std::thread fThreadStaging;
0148    /// Protects the shared state between the main thread and the I/O thread
0149    std::mutex fMutexStaging;
0150    /// Signal for the state information of fIsReadyForStaging and fHasNextSources
0151    std::condition_variable fCvStaging;
0152    /// Is true when the staging thread should start working
0153    bool fIsReadyForStaging = false;
0154    /// Is true when the staging thread has populated the next batch of files to fStagingArea
0155    bool fHasNextSources = false;
0156    /// Is true when the I/O thread should quit
0157    bool fStagingThreadShouldTerminate = false;
0158 
0159    /// \brief Holds useful information about fields added to the RNTupleDS
0160    struct RFieldInfo {
0161       ROOT::DescriptorId_t fFieldId;
0162       std::size_t fNRepetitions;
0163       // Enable `std::vector::emplace_back` for this type
0164       RFieldInfo(ROOT::DescriptorId_t fieldId, std::size_t nRepetitions)
0165          : fFieldId(fieldId), fNRepetitions(nRepetitions)
0166       {
0167       }
0168    };
0169 
0170    /// Provides the RDF column "colName" given the field identified by fieldId. For records and collections,
0171    /// AddField recurses into the sub fields. The fieldInfos argument is a list of objects holding info
0172    /// about the fields of the outer collection(s) (w.r.t. fieldId). For instance, if fieldId refers to an
0173    /// `std::vector<Jet>`, with
0174    /// ~~~{.cpp}
0175    /// struct Jet {
0176    ///    float pt;
0177    ///    float eta;
0178    /// };
0179    /// ~~~
0180    /// AddField will recurse into `Jet.pt` and `Jet.eta` and provide the two inner fields as `ROOT::VecOps::RVec<float>`
0181    /// each.
0182    ///
0183    /// In case the field is a collection of type `ROOT::VecOps::RVec`, `std::vector` or `std::array`, its corresponding
0184    /// column is added as a `ROOT::VecOps::RVec`. Otherwise, the collection field's on-disk type is used. Note, however,
0185    /// that inner record members of such collections will still be added as `ROOT::VecOps::RVec` (e.g., `std::set<Jet>`
0186    /// will be added as a `std::set`, but `Jet.[pt|eta]` will be added as `ROOT::VecOps::RVec<float>`).
0187    void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId,
0188                  std::vector<RFieldInfo> fieldInfos, bool convertToRVec = true);
0189 
0190    /// The main function of the fThreadStaging background thread
0191    void ExecStaging();
0192    /// Starting from `fNextFileIndex`, opens the next `fNSlots` files. Calls `LoadStructure()` on the opened files.
0193    /// The very first file is already available from the constructor.
0194    void StageNextSources();
0195    /// Populates fNextRanges with the next set of entry ranges. Moves files from the staging area as necessary
0196    /// and aligns ranges with cluster boundaries for scheduling the tail of files.
0197    /// Upon return, the fNextRanges list is ordered.  It usually has fNSlots elements; fewer if there
0198    /// is not enough work to give at least one cluster to every slot.
0199    void PrepareNextRanges();
0200 
0201    explicit RNTupleDS(std::unique_ptr<ROOT::Internal::RPageSource> pageSource);
0202 
0203    ROOT::RFieldBase *GetFieldWithTypeChecks(std::string_view fieldName, const std::type_info &tid);
0204 
0205    friend ROOT::RDataFrame ROOT::Internal::RDF::FromRNTuple(std::string_view ntupleName,
0206                                                             const std::vector<std::string> &fileNames,
0207                                                             const std::pair<ULong64_t, ULong64_t> &range);
0208 
0209    explicit RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames,
0210                       const std::pair<ULong64_t, ULong64_t> &range);
0211 
0212 public:
0213    RNTupleDS(std::string_view ntupleName, std::string_view fileName);
0214    RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames);
0215    // Rule of five: the data source is neither copyable nor movable
0216    RNTupleDS(const RNTupleDS &) = delete;
0217    RNTupleDS &operator=(const RNTupleDS &) = delete;
0218    RNTupleDS(RNTupleDS &&) = delete;
0219    RNTupleDS &operator=(RNTupleDS &&) = delete;
0220    ~RNTupleDS() final;
0221 
0222    void SetNSlots(unsigned int nSlots) final;
0223    std::size_t GetNFiles() const final { return fFileNames.empty() ? 1 : fFileNames.size(); } // an empty file list still counts as one file
0224    const std::vector<std::string> &GetColumnNames() const final { return fColumnNames; }
0225    bool HasColumn(std::string_view colName) const final;
0226    std::string GetTypeName(std::string_view colName) const final;
0227    std::vector<std::pair<ULong64_t, ULong64_t>> GetEntryRanges() final;
0228    std::string GetLabel() final { return "RNTupleDS"; }
0229 
0230    void Initialize() final;
0231    void InitSlot(unsigned int slot, ULong64_t firstEntry) final;
0232    void FinalizeSlot(unsigned int slot) final;
0233    void Finalize() final;
0234 
0235    std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
0236    GetColumnReaders(unsigned int /*slot*/, std::string_view /*name*/, const std::type_info &) final;
0237 
0238    ROOT::RDF::RSampleInfo
0239    CreateSampleInfo(unsigned int,
0240                     const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &) const final;
0241 
0242    // Old API, unused
0243    bool SetEntry(unsigned int, ULong64_t) final { return true; }
0244 
0245 protected:
0246    Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final;
0247 };
0248 } // namespace ROOT::RDF
0249 
0250 namespace ROOT::RDF {
0251 RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName); ///< Overload taking the RNTuple name and a single file name
0252 RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames); ///< Overload taking the RNTuple name and a list of file names
0253 } // namespace ROOT::RDF
0254 
0255 #endif