|
|
|||
Warning, file /include/root/ROOT/RDataSource.hxx was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001 // Author: Enrico Guiraud, Danilo Piparo CERN 09/2017 0002 0003 /************************************************************************* 0004 * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. * 0005 * All rights reserved. * 0006 * * 0007 * For the licensing terms see $ROOTSYS/LICENSE. * 0008 * For the list of contributors see $ROOTSYS/README/CREDITS. * 0009 *************************************************************************/ 0010 0011 #ifndef ROOT_RDATASOURCE 0012 #define ROOT_RDATASOURCE 0013 0014 #include "RDF/RColumnReaderBase.hxx" 0015 #include <string_view> 0016 #include "RtypesCore.h" // ULong64_t 0017 #include "TString.h" 0018 0019 #include <algorithm> // std::transform 0020 #include <cassert> 0021 #include <optional> 0022 #include <set> 0023 #include <string> 0024 #include <typeinfo> 0025 #include <unordered_map> 0026 #include <variant> 0027 #include <vector> 0028 #include <functional> 0029 0030 // Need to fwd-declare TTreeReader for CreateColumnReader 0031 class TTreeReader; 0032 namespace ROOT::Detail::RDF { 0033 class RLoopManager; 0034 } 0035 0036 namespace ROOT { 0037 namespace RDF { 0038 class RDataSource; 0039 class RSampleInfo; 0040 namespace Experimental { 0041 class RSample; 0042 } 0043 } 0044 } 0045 0046 /// Print a RDataSource at the prompt 0047 namespace cling { 0048 std::string printValue(ROOT::RDF::RDataSource *ds); 0049 } // namespace cling 0050 0051 namespace ROOT { 0052 0053 namespace Internal { 0054 namespace RDF { 0055 0056 /// Mother class of TTypedPointerHolder. The instances 0057 /// of this class can be put in a container. Upon destruction, 0058 /// the correct deletion of the pointer is performed in the 0059 /// derived class. 0060 class TPointerHolder { 0061 protected: 0062 void *fPointer{nullptr}; 0063 0064 public: 0065 TPointerHolder(void *ptr) : fPointer(ptr) {} 0066 void *GetPointer() { return fPointer; } 0067 void *GetPointerAddr() { return &fPointer; } 0068 virtual TPointerHolder *GetDeepCopy() = 0; 0069 virtual ~TPointerHolder(){}; 0070 }; 0071 0072 /// Class to wrap a pointer and delete the memory associated to it 0073 /// correctly 0074 template <typename T> 0075 class TTypedPointerHolder final : public TPointerHolder { 0076 public: 0077 TTypedPointerHolder(T *ptr) : TPointerHolder((void *)ptr) {} 0078 0079 TPointerHolder *GetDeepCopy() final 0080 { 0081 const auto typedPtr = static_cast<T *>(fPointer); 0082 return new TTypedPointerHolder(new T(*typedPtr)); 0083 } 0084 0085 ~TTypedPointerHolder() { delete static_cast<T *>(fPointer); } 0086 }; 0087 0088 std::string GetTypeNameWithOpts(const ROOT::RDF::RDataSource &ds, std::string_view colName, bool vector2RVec); 0089 const std::vector<std::string> &GetTopLevelFieldNames(const ROOT::RDF::RDataSource &ds); 0090 const std::vector<std::string> &GetColumnNamesNoDuplicates(const ROOT::RDF::RDataSource &ds); 0091 void CallInitializeWithOpts(ROOT::RDF::RDataSource &ds, const std::set<std::string> &suppressErrorsForMissingColumns); 0092 std::string DescribeDataset(ROOT::RDF::RDataSource &ds); 0093 ROOT::RDF::RSampleInfo 0094 CreateSampleInfo(const ROOT::RDF::RDataSource &ds, unsigned int slot, 0095 const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &sampleMap); 0096 void RunFinalChecks(const ROOT::RDF::RDataSource &ds, bool nodesLeftNotRun); 0097 void ProcessMT(ROOT::RDF::RDataSource &ds, ROOT::Detail::RDF::RLoopManager &lm); 0098 std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase> 0099 CreateColumnReader(ROOT::RDF::RDataSource &ds, unsigned int slot, std::string_view col, const std::type_info &tid, 0100 TTreeReader *treeReader); 0101 } // namespace RDF 0102 0103 } // ns Internal 0104 0105 namespace RDF { 0106 0107 // clang-format off 0108 /** 0109 \class ROOT::RDF::RDataSource 0110 \ingroup dataframe 0111 \brief RDataSource defines an API that RDataFrame can use to read arbitrary data formats. 0112 0113 A concrete RDataSource implementation (i.e. a class that inherits from RDataSource and implements all of its pure 0114 methods) provides an adaptor that RDataFrame can leverage to read any kind of tabular data formats. 0115 RDataFrame calls into RDataSource to retrieve information about the data, retrieve (thread-local) readers or "cursors" 0116 for selected columns and to advance the readers to the desired data entry. 0117 0118 The sequence of calls that RDataFrame (or any other client of a RDataSource) performs is the following: 0119 0120 - SetNSlots() : inform RDataSource of the desired level of parallelism 0121 - GetColumnReaders() : retrieve from RDataSource per-thread readers for the desired columns 0122 - Initialize() : inform RDataSource that an event-loop is about to start 0123 - GetEntryRanges() : retrieve from RDataSource a set of ranges of entries that can be processed concurrently 0124 - InitSlot() : inform RDataSource that a certain thread is about to start working on a certain range of entries 0125 - SetEntry() : inform RDataSource that a certain thread is about to start working on a certain entry 0126 - FinalizeSlot() : inform RDataSource that a certain thread finished working on a certain range of entries 0127 - Finalize() : inform RDataSource that an event-loop finished 0128 0129 RDataSource implementations must support running multiple event-loops consecutively (although sequentially) on the same dataset. 0130 - \b SetNSlots() is called once per RDataSource object, typically when it is associated to a RDataFrame. 0131 - \b GetColumnReaders() can be called several times, potentially with the same arguments, also in-between event-loops, but not during an event-loop. 0132 - \b GetEntryRanges() will be called several times, including during an event loop, as additional ranges are needed. It will not be called concurrently. 0133 - \b Initialize() and \b Finalize() are called once per event-loop, right before starting and right after finishing. 0134 - \b InitSlot(), \b SetEntry(), and \b FinalizeSlot() can be called concurrently from multiple threads, multiple times per event-loop. 0135 0136 Advanced users that plan to implement a custom RDataSource can check out existing implementations, e.g. RCsvDS or RNTupleDS. 0137 See the inheritance diagram below for the full list of existing concrete implementations. 0138 */ 0139 class RDataSource { 0140 // clang-format on 0141 protected: 0142 using Record_t = std::vector<void *>; 0143 friend std::string cling::printValue(::ROOT::RDF::RDataSource *); 0144 0145 virtual std::string AsString() { return "generic data source"; }; 0146 0147 unsigned int fNSlots{}; 0148 0149 std::optional<std::pair<ULong64_t, ULong64_t>> fGlobalEntryRange{}; 0150 0151 friend std::string ROOT::Internal::RDF::GetTypeNameWithOpts(const RDataSource &, std::string_view, bool); 0152 virtual std::string GetTypeNameWithOpts(std::string_view colName, bool) const { return GetTypeName(colName); } 0153 0154 friend const std::vector<std::string> &ROOT::Internal::RDF::GetTopLevelFieldNames(const ROOT::RDF::RDataSource &); 0155 virtual const std::vector<std::string> &GetTopLevelFieldNames() const { return GetColumnNames(); } 0156 0157 friend const std::vector<std::string> & 0158 ROOT::Internal::RDF::GetColumnNamesNoDuplicates(const ROOT::RDF::RDataSource &); 0159 virtual const std::vector<std::string> &GetColumnNamesNoDuplicates() const { return GetColumnNames(); } 0160 0161 friend void ROOT::Internal::RDF::CallInitializeWithOpts(ROOT::RDF::RDataSource &, const std::set<std::string> &); 0162 virtual void InitializeWithOpts(const std::set<std::string> &) { Initialize(); } 0163 0164 friend std::string ROOT::Internal::RDF::DescribeDataset(ROOT::RDF::RDataSource &); 0165 virtual std::string DescribeDataset() { return "Dataframe from datasource " + GetLabel(); } 0166 0167 friend ROOT::RDF::RSampleInfo 0168 ROOT::Internal::RDF::CreateSampleInfo(const ROOT::RDF::RDataSource &, unsigned int, 0169 const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &); 0170 virtual ROOT::RDF::RSampleInfo 0171 CreateSampleInfo(unsigned int, const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &) const; 0172 0173 friend void ROOT::Internal::RDF::RunFinalChecks(const ROOT::RDF::RDataSource &, bool); 0174 virtual void RunFinalChecks(bool) const {} 0175 0176 friend void ROOT::Internal::RDF::ProcessMT(RDataSource &, ROOT::Detail::RDF::RLoopManager &); 0177 virtual void ProcessMT(ROOT::Detail::RDF::RLoopManager &); 0178 0179 friend std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase> 0180 ROOT::Internal::RDF::CreateColumnReader(ROOT::RDF::RDataSource &, unsigned int, std::string_view, 0181 const std::type_info &, TTreeReader *); 0182 /** 0183 * \brief Creates a column reader for the requested column 0184 * 0185 * In the general case, this is just a redirect to the right GetColumnReaders overload. The signature notably also 0186 * has a TTreeReader * parameter. This is currently necessary to still allow the TTree-based MT scheduling via 0187 * TTreeProcessorMT. We use the TTreeProcessorMT::Process method to launch the same kernel across all threads. In 0188 * each thread task, TTreeProcessorMT creates a thread-local instance of a TTreeReader which is going to read the 0189 * range of events assigned to that task. That TTreeReader instance is what is passed to this method whenever a 0190 * column reader needs to be created in a thread task. In the future this method might be removed by either allowing 0191 * to request a handle to the thread-local TTreeReader instance programmatically from the TTreeProcessorMT, or 0192 * refactoring the TTreeProcessorMT scheduling into RTTreeDS altogether. 0193 */ 0194 virtual std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase> 0195 CreateColumnReader(unsigned int slot, std::string_view col, const std::type_info &tid, TTreeReader *) 0196 { 0197 return GetColumnReaders(slot, col, tid); 0198 } 0199 0200 public: 0201 RDataSource() = default; 0202 // Rule of five 0203 RDataSource(const RDataSource &) = delete; 0204 RDataSource &operator=(const RDataSource &) = delete; 0205 RDataSource(RDataSource &&) = delete; 0206 RDataSource &operator=(RDataSource &&) = delete; 0207 virtual ~RDataSource() = default; 0208 0209 // clang-format off 0210 /// \brief Inform RDataSource of the number of processing slots (i.e. worker threads) used by the associated RDataFrame. 0211 /// Slots numbers are used to simplify parallel execution: RDataFrame guarantees that different threads will always 0212 /// pass different slot values when calling methods concurrently. 0213 // clang-format on 0214 virtual void SetNSlots(unsigned int nSlots) 0215 { 0216 assert(fNSlots == 0); 0217 assert(nSlots > 0); 0218 fNSlots = nSlots; 0219 }; 0220 0221 /// \brief Returns the number of files from which the dataset is constructed 0222 virtual std::size_t GetNFiles() const { return 0; } 0223 0224 // clang-format off 0225 /// \brief Returns a reference to the collection of the dataset's column names 0226 // clang-format on 0227 virtual const std::vector<std::string> &GetColumnNames() const = 0; 0228 0229 /// \brief Checks if the dataset has a certain column 0230 /// \param[in] colName The name of the column 0231 virtual bool HasColumn(std::string_view colName) const = 0; 0232 0233 // clang-format off 0234 /// \brief Type of a column as a string, e.g. `GetTypeName("x") == "double"`. Required for jitting e.g. `df.Filter("x>0")`. 0235 /// \param[in] colName The name of the column 0236 // clang-format on 0237 virtual std::string GetTypeName(std::string_view colName) const = 0; 0238 0239 // clang-format off 0240 /// Called at most once per column by RDF. Return vector of pointers to pointers to column values - one per slot. 0241 /// \tparam T The type of the data stored in the column 0242 /// \param[in] columnName The name of the column 0243 /// 0244 /// These pointers are veritable cursors: it's a responsibility of the RDataSource implementation that they point to 0245 /// the "right" memory region. 0246 // clang-format on 0247 template <typename T> 0248 std::vector<T **> GetColumnReaders(std::string_view columnName) 0249 { 0250 auto typeErasedVec = GetColumnReadersImpl(columnName, typeid(T)); 0251 std::vector<T **> typedVec(typeErasedVec.size()); 0252 std::transform(typeErasedVec.begin(), typeErasedVec.end(), typedVec.begin(), 0253 [](void *p) { return static_cast<T **>(p); }); 0254 return typedVec; 0255 } 0256 0257 /// If the other GetColumnReaders overload returns an empty vector, this overload will be called instead. 0258 /// \param[in] slot The data processing slot that needs to be considered 0259 /// \param[in] name The name of the column for which a column reader needs to be returned 0260 /// \param[in] tid A type_info 0261 /// At least one of the two must return a non-empty/non-null value. 0262 virtual std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase> 0263 GetColumnReaders(unsigned int /*slot*/, std::string_view /*name*/, const std::type_info &) 0264 { 0265 return {}; 0266 } 0267 0268 // clang-format off 0269 /// \brief Return ranges of entries to distribute to tasks. 0270 /// They are required to be contiguous intervals with no entries skipped. Supposing a dataset with nEntries, the 0271 /// intervals must start at 0 and end at nEntries, e.g. [0-5],[5-10] for 10 entries. 0272 /// This function will be invoked repeatedly by RDataFrame as it needs additional entries to process. 0273 /// The same entry range should not be returned more than once. 0274 /// Returning an empty collection of ranges signals to RDataFrame that the processing can stop. 0275 // clang-format on 0276 virtual std::vector<std::pair<ULong64_t, ULong64_t>> GetEntryRanges() = 0; 0277 0278 // clang-format off 0279 /// \brief Advance the "cursors" returned by GetColumnReaders to the selected entry for a particular slot. 0280 /// \param[in] slot The data processing slot that needs to be considered 0281 /// \param[in] entry The entry which needs to be pointed to by the reader pointers 0282 /// Slots are adopted to accommodate parallel data processing. 0283 /// Different workers will loop over different ranges and 0284 /// will be labelled by different "slot" values. 0285 /// Returns *true* if the entry has to be processed, *false* otherwise. 0286 // clang-format on 0287 virtual bool SetEntry(unsigned int slot, ULong64_t entry) = 0; 0288 0289 // clang-format off 0290 /// \brief Convenience method called before starting an event-loop. 0291 /// This method might be called multiple times over the lifetime of a RDataSource, since 0292 /// users can run multiple event-loops with the same RDataFrame. 0293 /// Ideally, `Initialize` should set the state of the RDataSource so that multiple identical event-loops 0294 /// will produce identical results. 0295 // clang-format on 0296 virtual void Initialize() {} 0297 0298 // clang-format off 0299 /// \brief Convenience method called at the start of the data processing associated to a slot. 0300 /// \param[in] slot The data processing slot wihch needs to be initialized 0301 /// \param[in] firstEntry The first entry of the range that the task will process. 0302 /// This method might be called multiple times per thread per event-loop. 0303 // clang-format on 0304 virtual void InitSlot(unsigned int /*slot*/, ULong64_t /*firstEntry*/) {} 0305 0306 // clang-format off 0307 /// \brief Convenience method called at the end of the data processing associated to a slot. 0308 /// \param[in] slot The data processing slot wihch needs to be finalized 0309 /// This method might be called multiple times per thread per event-loop. 0310 // clang-format on 0311 virtual void FinalizeSlot(unsigned int /*slot*/) {} 0312 0313 // clang-format off 0314 /// \brief Convenience method called after concluding an event-loop. 0315 /// See Initialize for more details. 0316 // clang-format on 0317 virtual void Finalize() {} 0318 0319 /// \brief Return a string representation of the datasource type. 0320 /// The returned string will be used by ROOT::RDF::SaveGraph() to represent 0321 /// the datasource in the visualization of the computation graph. 0322 /// Concrete datasources can override the default implementation. 0323 virtual std::string GetLabel() { return "Custom Datasource"; } 0324 0325 /// \brief Restrict processing to a [begin, end) range of entries. 0326 /// \param entryRange The range of entries to process. 0327 virtual void SetGlobalEntryRange(std::pair<ULong64_t, ULong64_t> entryRange) 0328 { 0329 fGlobalEntryRange = std::move(entryRange); 0330 }; 0331 0332 protected: 0333 /// type-erased vector of pointers to pointers to column values - one per slot 0334 virtual Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) = 0; 0335 }; 0336 0337 } // ns RDF 0338 0339 } // ns ROOT 0340 0341 /// Print a RDataSource at the prompt 0342 namespace cling { 0343 inline std::string printValue(ROOT::RDF::RDataSource *ds) 0344 { 0345 return ds->AsString(); 0346 } 0347 } // namespace cling 0348 0349 #endif // ROOT_TDATASOURCE
| [ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
|
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |
|