|
|
|||
File indexing completed on 2025-11-03 10:02:56
0001 /// \file RNTupleDS.hxx 0002 /// \ingroup NTuple ROOT7 0003 /// \author Jakob Blomer <jblomer@cern.ch> 0004 /// \author Enrico Guiraud <enrico.guiraud@cern.ch> 0005 /// \date 2018-10-04 0006 /// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback 0007 /// is welcome! 0008 0009 /************************************************************************* 0010 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. * 0011 * All rights reserved. * 0012 * * 0013 * For the licensing terms see $ROOTSYS/LICENSE. * 0014 * For the list of contributors see $ROOTSYS/README/CREDITS. * 0015 *************************************************************************/ 0016 0017 #ifndef ROOT_RNTupleDS 0018 #define ROOT_RNTupleDS 0019 0020 #include <ROOT/RDataSource.hxx> 0021 #include <ROOT/RNTupleUtil.hxx> 0022 #include <ROOT/RNTupleDescriptor.hxx> 0023 #include <string_view> 0024 0025 #include <condition_variable> 0026 #include <cstdint> 0027 #include <memory> 0028 #include <mutex> 0029 #include <string> 0030 #include <thread> 0031 #include <vector> 0032 #include <unordered_map> 0033 0034 namespace ROOT { 0035 class RFieldBase; 0036 class RDataFrame; 0037 class RNTuple; 0038 } // namespace ROOT 0039 namespace ROOT::Internal::RDF { 0040 class RNTupleColumnReader; 0041 } 0042 namespace ROOT::Internal { 0043 class RPageSource; 0044 } 0045 0046 namespace ROOT::RDF { 0047 class RNTupleDS final : public ROOT::RDF::RDataSource { 0048 friend class ROOT::Internal::RDF::RNTupleColumnReader; 0049 0050 /// The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records. 0051 /// The GetEntryRanges() swaps fNextRanges and fCurrentRanges and uses the list of 0052 /// REntryRangeDS records to return the list of ranges ready to use by the RDF loop manager. 0053 struct REntryRangeDS { 0054 std::unique_ptr<ROOT::Internal::RPageSource> fSource; 0055 ULong64_t fFirstEntry = 0; ///< First entry index in fSource 0056 /// End entry index in fSource, e.g. the number of entries in the range is fLastEntry - fFirstEntry 0057 ULong64_t fLastEntry = 0; 0058 }; 0059 0060 /// A clone of the first pages source's descriptor. 0061 ROOT::RNTupleDescriptor fPrincipalDescriptor; 0062 0063 /// The data source may be constructed with an ntuple name and a list of files 0064 std::string fNTupleName; 0065 std::vector<std::string> fFileNames; 0066 /// The staging area is relevant for chains of files, i.e. when fFileNames is not empty. In this case, 0067 /// files are opened in the background in batches of size `fNSlots` and kept in the staging area. 0068 /// The first file (chains or no chains) is always opened on construction in order to process the schema. 0069 /// For all subsequent files, the corresponding page sources in the staging area only executed `LoadStructure()`, 0070 /// i.e. they should have a compressed buffer of the meta-data available. 0071 /// Concretely: 0072 /// 1. We open the first file on construction to read the schema and then move the corresponding page source 0073 /// in the staging area. 0074 /// 2. On `Initialize()`, we start the I/O background thread, which in turn opens the first batch of files. 0075 /// 3. At the beginning of `GetEntryRanges()`, we 0076 /// a) wait for the I/O thread to finish, 0077 /// b) call `PrepareNextRanges()` in the main thread to move the page sources from the staging area 0078 /// into `fNextRanges`; this will also call `Attach()` on the page sources (i.e., deserialize the meta-data), 0079 /// and 0080 /// c) trigger staging of the next batch of files in the I/O background thread. 0081 /// 4. On `Finalize()`, the I/O background thread is stopped. 0082 std::vector<std::unique_ptr<ROOT::Internal::RPageSource>> fStagingArea; 0083 std::size_t fNextFileIndex = 0; ///< Index into fFileNames to the next file to process 0084 0085 /// We prepare a prototype field for every column. If a column reader is actually requested 0086 /// in GetColumnReaders(), we move a clone of the field into a new column reader for RDataFrame. 0087 /// Only the clone connects to the backing page store and acquires I/O resources. 0088 /// The field IDs are set in the context of the first source and used as keys in fFieldId2QualifiedName. 0089 std::vector<std::unique_ptr<ROOT::RFieldBase>> fProtoFields; 0090 /// Columns may be requested with types other than with which they were initially added as proto fields. For example, 0091 /// a column with a `ROOT::RVec<float>` proto field may instead be requested as a `std::vector<float>`. In case this 0092 /// happens, we create an alternative proto field and store it here, with the original index in `fProtoFields` as 0093 /// key. A single column can have more than one alternative proto fields. 0094 std::unordered_map<std::size_t, std::vector<std::unique_ptr<ROOT::RFieldBase>>> fAlternativeProtoFields; 0095 /// Connects the IDs of active proto fields and their subfields to their fully qualified name (a.b.c.d). 0096 /// This enables the column reader to rewire the field IDs when the file changes (chain), 0097 /// using the fully qualified name as a search key in the descriptor of the other page sources. 0098 std::unordered_map<ROOT::DescriptorId_t, std::string> fFieldId2QualifiedName; 0099 std::vector<std::string> fColumnNames; 0100 std::vector<std::string> fColumnTypes; 0101 /// List of column readers returned by GetColumnReaders() organized by slot. Used to reconnect readers 0102 /// to new page sources when the files in the chain change. 0103 std::vector<std::vector<ROOT::Internal::RDF::RNTupleColumnReader *>> fActiveColumnReaders; 0104 0105 ULong64_t fSeenEntries = 0; ///< The number of entries so far returned by GetEntryRanges() 0106 std::vector<REntryRangeDS> fCurrentRanges; ///< Basis for the ranges returned by the last GetEntryRanges() call 0107 std::vector<REntryRangeDS> fNextRanges; ///< Basis for the ranges populated by the PrepareNextRanges() call 0108 /// Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index in 0109 /// the fCurrentRanges vectors. This is necessary because the returned ranges get distributed arbitrarily 0110 /// onto slots. In the InitSlot method, the column readers use this map to find the correct range to connect to. 0111 std::unordered_map<ULong64_t, std::size_t> fFirstEntry2RangeIdx; 0112 0113 /// The background thread that runs StageNextSources() 0114 std::thread fThreadStaging; 0115 /// Protects the shared state between the main thread and the I/O thread 0116 std::mutex fMutexStaging; 0117 /// Signal for the state information of fIsReadyForStaging and fHasNextSources 0118 std::condition_variable fCvStaging; 0119 /// Is true when the staging thread should start working 0120 bool fIsReadyForStaging = false; 0121 /// Is true when the staging thread has populated the next batch of files to fStagingArea 0122 bool fHasNextSources = false; 0123 /// Is true when the I/O thread should quit 0124 bool fStagingThreadShouldTerminate = false; 0125 0126 /// \brief Holds useful information about fields added to the RNTupleDS 0127 struct RFieldInfo { 0128 ROOT::DescriptorId_t fFieldId; 0129 std::size_t fNRepetitions; 0130 // Enable `std::vector::emplace_back` for this type 0131 RFieldInfo(ROOT::DescriptorId_t fieldId, std::size_t nRepetitions) 0132 : fFieldId(fieldId), fNRepetitions(nRepetitions) 0133 { 0134 } 0135 }; 0136 0137 /// Provides the RDF column "colName" given the field identified by fieldID. For records and collections, 0138 /// AddField recurses into the sub fields. The fieldInfos argument is a list of objects holding info 0139 /// about the fields of the outer collection(s) (w.r.t. fieldId). For instance, if fieldId refers to an 0140 /// `std::vector<Jet>`, with 0141 /// ~~~{.cpp} 0142 /// struct Jet { 0143 /// float pt; 0144 /// float eta; 0145 /// }; 0146 /// ~~~ 0147 /// AddField will recurse into `Jet.pt` and `Jet.eta` and provide the two inner fields as `ROOT::VecOps::RVec<float>` 0148 /// each. 0149 /// 0150 /// In case the field is a collection of type `ROOT::VecOps::RVec`, `std::vector` or `std::array`, its corresponding 0151 /// column is added as a `ROOT::VecOps::RVec`. Otherwise, the collection field's on-disk type is used. Note, however, 0152 /// that inner record members of such collections will still be added as `ROOT::VecOps::RVec` (e.g., `std::set<Jet> 0153 /// will be added as a `std::set`, but `Jet.[pt|eta] will be added as `ROOT::VecOps::RVec<float>). 0154 void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId, 0155 std::vector<RFieldInfo> fieldInfos, bool convertToRVec = true); 0156 0157 /// The main function of the fThreadStaging background thread 0158 void ExecStaging(); 0159 /// Starting from `fNextFileIndex`, opens the next `fNSlots` files. Calls `LoadStructure()` on the opened files. 0160 /// The very first file is already available from the constructor. 0161 void StageNextSources(); 0162 /// Populates fNextRanges with the next set of entry ranges. Moves files from the staging area as necessary 0163 /// and aligns ranges with cluster boundaries for scheduling the tail of files. 0164 /// Upon return, the fNextRanges list is ordered. It has usually fNSlots elements; fewer if there 0165 /// is not enough work to give at least one cluster to every slot. 0166 void PrepareNextRanges(); 0167 0168 explicit RNTupleDS(std::unique_ptr<ROOT::Internal::RPageSource> pageSource); 0169 0170 public: 0171 RNTupleDS(std::string_view ntupleName, std::string_view fileName); 0172 RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames); 0173 // Rule of five 0174 RNTupleDS(const RNTupleDS &) = delete; 0175 RNTupleDS &operator=(const RNTupleDS &) = delete; 0176 RNTupleDS(RNTupleDS &&) = delete; 0177 RNTupleDS &operator=(RNTupleDS &&) = delete; 0178 ~RNTupleDS() final; 0179 0180 void SetNSlots(unsigned int nSlots) final; 0181 std::size_t GetNFiles() const final { return fFileNames.empty() ? 1 : fFileNames.size(); } 0182 const std::vector<std::string> &GetColumnNames() const final { return fColumnNames; } 0183 bool HasColumn(std::string_view colName) const final; 0184 std::string GetTypeName(std::string_view colName) const final; 0185 std::vector<std::pair<ULong64_t, ULong64_t>> GetEntryRanges() final; 0186 std::string GetLabel() final { return "RNTupleDS"; } 0187 0188 void Initialize() final; 0189 void InitSlot(unsigned int slot, ULong64_t firstEntry) final; 0190 void FinalizeSlot(unsigned int slot) final; 0191 void Finalize() final; 0192 0193 std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase> 0194 GetColumnReaders(unsigned int /*slot*/, std::string_view /*name*/, const std::type_info &) final; 0195 0196 // Old API, unused 0197 bool SetEntry(unsigned int, ULong64_t) final { return true; } 0198 0199 protected: 0200 Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final; 0201 }; 0202 } // namespace ROOT::RDF 0203 0204 namespace ROOT::RDF { 0205 RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName); 0206 RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames); 0207 } // namespace ROOT::RDF 0208 0209 #endif
| [ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
|
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
|