|
|
|||
Warning, file /include/root/ROOT/RNTupleDS.hxx was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001 /// \file RNTupleDS.hxx 0002 /// \ingroup NTuple ROOT7 0003 /// \author Jakob Blomer <jblomer@cern.ch> 0004 /// \author Enrico Guiraud <enrico.guiraud@cern.ch> 0005 /// \date 2018-10-04 0006 /// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback 0007 /// is welcome! 0008 0009 /************************************************************************* 0010 * Copyright (C) 1995-2025, Rene Brun and Fons Rademakers. * 0011 * All rights reserved. * 0012 * * 0013 * For the licensing terms see $ROOTSYS/LICENSE. * 0014 * For the list of contributors see $ROOTSYS/README/CREDITS. * 0015 *************************************************************************/ 0016 0017 #ifndef ROOT_RNTupleDS 0018 #define ROOT_RNTupleDS 0019 0020 #include <ROOT/RDataSource.hxx> 0021 #include <ROOT/RNTupleDescriptor.hxx> 0022 #include <ROOT/RNTupleTypes.hxx> 0023 #include <string_view> 0024 0025 #include <condition_variable> 0026 #include <cstdint> 0027 #include <memory> 0028 #include <mutex> 0029 #include <optional> 0030 #include <string> 0031 #include <thread> 0032 #include <vector> 0033 #include <unordered_map> 0034 0035 // Follow RDF namespace convention 0036 namespace ROOT { 0037 class RDataFrame; 0038 } 0039 namespace ROOT::Internal::RDF { 0040 /** 0041 * \brief Internal overload of the function that allows passing a range of entries 0042 * 0043 * The event range will be respected when processing this RNTuple. It is assumed 0044 * that processing happens within one thread only. 0045 */ 0046 ROOT::RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames, 0047 const std::pair<ULong64_t, ULong64_t> &range); 0048 /** 0049 * \brief Retrieves the cluster boundaries and the number of entries for the input RNTuple 0050 * 0051 * \param[in] ntupleName The name of the RNTuple dataset 0052 * \param[in] location The location of the RNTuple dataset (e.g. a path to a file) 0053 * 0054 * \note This function is a helper for the Python side to avoid having to deal 0055 * with the shared descriptor guard. 0056 */ 0057 std::pair<std::vector<ROOT::Internal::RNTupleClusterBoundaries>, ROOT::NTupleSize_t> 0058 GetClustersAndEntries(std::string_view ntupleName, std::string_view location); 0059 } // namespace ROOT::Internal::RDF 0060 0061 namespace ROOT { 0062 class RFieldBase; 0063 class RDataFrame; 0064 class RNTuple; 0065 } // namespace ROOT 0066 namespace ROOT::Internal::RDF { 0067 class RNTupleColumnReader; 0068 } 0069 namespace ROOT::Internal { 0070 class RPageSource; 0071 } 0072 0073 namespace ROOT::RDF { 0074 class RNTupleDS final : public ROOT::RDF::RDataSource { 0075 friend class ROOT::Internal::RDF::RNTupleColumnReader; 0076 0077 /// The PrepareNextRanges() method populates the fNextRanges list with REntryRangeDS records. 0078 /// The GetEntryRanges() swaps fNextRanges and fCurrentRanges and uses the list of 0079 /// REntryRangeDS records to return the list of ranges ready to use by the RDF loop manager. 0080 struct REntryRangeDS { 0081 std::unique_ptr<ROOT::Internal::RPageSource> fSource; 0082 ULong64_t fFirstEntry = 0; ///< First entry index in fSource 0083 /// End entry index in fSource, e.g. the number of entries in the range is fLastEntry - fFirstEntry 0084 ULong64_t fLastEntry = 0; 0085 std::string_view fFileName; ///< Storage location of the current RNTuple 0086 }; 0087 0088 /// A clone of the first pages source's descriptor. 0089 ROOT::RNTupleDescriptor fPrincipalDescriptor; 0090 0091 /// The data source may be constructed with an ntuple name and a list of files 0092 std::string fNTupleName; 0093 std::vector<std::string> fFileNames; 0094 /// The staging area is relevant for chains of files, i.e. when fFileNames is not empty. In this case, 0095 /// files are opened in the background in batches of size `fNSlots` and kept in the staging area. 0096 /// The first file (chains or no chains) is always opened on construction in order to process the schema. 0097 /// For all subsequent files, the corresponding page sources in the staging area only executed `LoadStructure()`, 0098 /// i.e. they should have a compressed buffer of the meta-data available. 0099 /// Concretely: 0100 /// 1. We open the first file on construction to read the schema and then move the corresponding page source 0101 /// in the staging area. 0102 /// 2. On `Initialize()`, we start the I/O background thread, which in turn opens the first batch of files. 0103 /// 3. At the beginning of `GetEntryRanges()`, we 0104 /// a) wait for the I/O thread to finish, 0105 /// b) call `PrepareNextRanges()` in the main thread to move the page sources from the staging area 0106 /// into `fNextRanges`; this will also call `Attach()` on the page sources (i.e., deserialize the meta-data), 0107 /// and 0108 /// c) trigger staging of the next batch of files in the I/O background thread. 0109 /// 4. On `Finalize()`, the I/O background thread is stopped. 0110 std::vector<std::unique_ptr<ROOT::Internal::RPageSource>> fStagingArea; 0111 std::size_t fNextFileIndex = 0; ///< Index into fFileNames to the next file to process 0112 0113 /// We prepare a prototype field for every column. If a column reader is actually requested 0114 /// in GetColumnReaders(), we move a clone of the field into a new column reader for RDataFrame. 0115 /// Only the clone connects to the backing page store and acquires I/O resources. 0116 /// The field IDs are set in the context of the first source and used as keys in fFieldId2QualifiedName. 0117 std::vector<std::unique_ptr<ROOT::RFieldBase>> fProtoFields; 0118 /// Columns may be requested with types other than with which they were initially added as proto fields. For example, 0119 /// a column with a `ROOT::RVec<float>` proto field may instead be requested as a `std::vector<float>`. In case this 0120 /// happens, we create an alternative proto field and store it here, with the original index in `fProtoFields` as 0121 /// key. A single column can have more than one alternative proto fields. 0122 std::unordered_map<std::size_t, std::vector<std::unique_ptr<ROOT::RFieldBase>>> fAlternativeProtoFields; 0123 /// Connects the IDs of active proto fields and their subfields to their fully qualified name (a.b.c.d). 0124 /// This enables the column reader to rewire the field IDs when the file changes (chain), 0125 /// using the fully qualified name as a search key in the descriptor of the other page sources. 0126 std::unordered_map<ROOT::DescriptorId_t, std::string> fFieldId2QualifiedName; 0127 std::vector<std::string> fColumnNames; 0128 std::vector<std::string> fColumnTypes; 0129 /// List of column readers returned by GetColumnReaders() organized by slot. Used to reconnect readers 0130 /// to new page sources when the files in the chain change. 0131 std::vector<std::vector<ROOT::Internal::RDF::RNTupleColumnReader *>> fActiveColumnReaders; 0132 0133 ULong64_t fSeenEntriesNoGlobalRange = 0; ///< The number of entries seen so far in GetEntryRanges() 0134 0135 std::vector<REntryRangeDS> fCurrentRanges; ///< Basis for the ranges returned by the last GetEntryRanges() call 0136 std::vector<REntryRangeDS> fNextRanges; ///< Basis for the ranges populated by the PrepareNextRanges() call 0137 /// Maps the first entries from the ranges of the last GetEntryRanges() call to their corresponding index in 0138 /// the fCurrentRanges vectors. This is necessary because the returned ranges get distributed arbitrarily 0139 /// onto slots. In the InitSlot method, the column readers use this map to find the correct range to connect to. 0140 std::unordered_map<ULong64_t, std::size_t> fFirstEntry2RangeIdx; 0141 // Keep track of the scheduled entries - necessary for processing of GlobalEntries 0142 std::vector<std::pair<ULong64_t, ULong64_t>> fOriginalRanges; 0143 /// One element per slot, corresponding to the current range index for that slot, as filled by InitSlot 0144 std::vector<std::size_t> fSlotsToRangeIdxs; 0145 0146 /// The background thread that runs StageNextSources() 0147 std::thread fThreadStaging; 0148 /// Protects the shared state between the main thread and the I/O thread 0149 std::mutex fMutexStaging; 0150 /// Signal for the state information of fIsReadyForStaging and fHasNextSources 0151 std::condition_variable fCvStaging; 0152 /// Is true when the staging thread should start working 0153 bool fIsReadyForStaging = false; 0154 /// Is true when the staging thread has populated the next batch of files to fStagingArea 0155 bool fHasNextSources = false; 0156 /// Is true when the I/O thread should quit 0157 bool fStagingThreadShouldTerminate = false; 0158 0159 /// \brief Holds useful information about fields added to the RNTupleDS 0160 struct RFieldInfo { 0161 ROOT::DescriptorId_t fFieldId; 0162 std::size_t fNRepetitions; 0163 // Enable `std::vector::emplace_back` for this type 0164 RFieldInfo(ROOT::DescriptorId_t fieldId, std::size_t nRepetitions) 0165 : fFieldId(fieldId), fNRepetitions(nRepetitions) 0166 { 0167 } 0168 }; 0169 0170 /// Provides the RDF column "colName" given the field identified by fieldID. For records and collections, 0171 /// AddField recurses into the sub fields. The fieldInfos argument is a list of objects holding info 0172 /// about the fields of the outer collection(s) (w.r.t. fieldId). For instance, if fieldId refers to an 0173 /// `std::vector<Jet>`, with 0174 /// ~~~{.cpp} 0175 /// struct Jet { 0176 /// float pt; 0177 /// float eta; 0178 /// }; 0179 /// ~~~ 0180 /// AddField will recurse into `Jet.pt` and `Jet.eta` and provide the two inner fields as `ROOT::VecOps::RVec<float>` 0181 /// each. 0182 /// 0183 /// In case the field is a collection of type `ROOT::VecOps::RVec`, `std::vector` or `std::array`, its corresponding 0184 /// column is added as a `ROOT::VecOps::RVec`. Otherwise, the collection field's on-disk type is used. Note, however, 0185 /// that inner record members of such collections will still be added as `ROOT::VecOps::RVec` (e.g., `std::set<Jet> 0186 /// will be added as a `std::set`, but `Jet.[pt|eta] will be added as `ROOT::VecOps::RVec<float>). 0187 void AddField(const ROOT::RNTupleDescriptor &desc, std::string_view colName, ROOT::DescriptorId_t fieldId, 0188 std::vector<RFieldInfo> fieldInfos, bool convertToRVec = true); 0189 0190 /// The main function of the fThreadStaging background thread 0191 void ExecStaging(); 0192 /// Starting from `fNextFileIndex`, opens the next `fNSlots` files. Calls `LoadStructure()` on the opened files. 0193 /// The very first file is already available from the constructor. 0194 void StageNextSources(); 0195 /// Populates fNextRanges with the next set of entry ranges. Moves files from the staging area as necessary 0196 /// and aligns ranges with cluster boundaries for scheduling the tail of files. 0197 /// Upon return, the fNextRanges list is ordered. It has usually fNSlots elements; fewer if there 0198 /// is not enough work to give at least one cluster to every slot. 0199 void PrepareNextRanges(); 0200 0201 explicit RNTupleDS(std::unique_ptr<ROOT::Internal::RPageSource> pageSource); 0202 0203 ROOT::RFieldBase *GetFieldWithTypeChecks(std::string_view fieldName, const std::type_info &tid); 0204 0205 friend ROOT::RDataFrame ROOT::Internal::RDF::FromRNTuple(std::string_view ntupleName, 0206 const std::vector<std::string> &fileNames, 0207 const std::pair<ULong64_t, ULong64_t> &range); 0208 0209 explicit RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames, 0210 const std::pair<ULong64_t, ULong64_t> &range); 0211 0212 public: 0213 RNTupleDS(std::string_view ntupleName, std::string_view fileName); 0214 RNTupleDS(std::string_view ntupleName, const std::vector<std::string> &fileNames); 0215 // Rule of five 0216 RNTupleDS(const RNTupleDS &) = delete; 0217 RNTupleDS &operator=(const RNTupleDS &) = delete; 0218 RNTupleDS(RNTupleDS &&) = delete; 0219 RNTupleDS &operator=(RNTupleDS &&) = delete; 0220 ~RNTupleDS() final; 0221 0222 void SetNSlots(unsigned int nSlots) final; 0223 std::size_t GetNFiles() const final { return fFileNames.empty() ? 1 : fFileNames.size(); } 0224 const std::vector<std::string> &GetColumnNames() const final { return fColumnNames; } 0225 bool HasColumn(std::string_view colName) const final; 0226 std::string GetTypeName(std::string_view colName) const final; 0227 std::vector<std::pair<ULong64_t, ULong64_t>> GetEntryRanges() final; 0228 std::string GetLabel() final { return "RNTupleDS"; } 0229 0230 void Initialize() final; 0231 void InitSlot(unsigned int slot, ULong64_t firstEntry) final; 0232 void FinalizeSlot(unsigned int slot) final; 0233 void Finalize() final; 0234 0235 std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase> 0236 GetColumnReaders(unsigned int /*slot*/, std::string_view /*name*/, const std::type_info &) final; 0237 0238 ROOT::RDF::RSampleInfo 0239 CreateSampleInfo(unsigned int, 0240 const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &) const final; 0241 0242 // Old API, unused 0243 bool SetEntry(unsigned int, ULong64_t) final { return true; } 0244 0245 protected: 0246 Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) final; 0247 }; 0248 } // namespace ROOT::RDF 0249 0250 namespace ROOT::RDF { 0251 RDataFrame FromRNTuple(std::string_view ntupleName, std::string_view fileName); 0252 RDataFrame FromRNTuple(std::string_view ntupleName, const std::vector<std::string> &fileNames); 0253 } // namespace ROOT::RDF 0254 0255 #endif
| [ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
|
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
|