Back to home page

EIC code displayed by LXR

 
 

    


Warning, file /include/root/ROOT/RDataSource.hxx was not indexed or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).

0001 // Author: Enrico Guiraud, Danilo Piparo CERN  09/2017
0002 
0003 /*************************************************************************
0004  * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers.               *
0005  * All rights reserved.                                                  *
0006  *                                                                       *
0007  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0008  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0009  *************************************************************************/
0010 
0011 #ifndef ROOT_RDATASOURCE
0012 #define ROOT_RDATASOURCE
0013 
0014 #include "RDF/RColumnReaderBase.hxx"
0015 #include <string_view>
0016 #include "RtypesCore.h" // ULong64_t
0017 #include "TString.h"
0018 
0019 #include <algorithm> // std::transform
0020 #include <cassert>
0021 #include <optional>
0022 #include <set>
0023 #include <string>
0024 #include <typeinfo>
0025 #include <unordered_map>
0026 #include <variant>
0027 #include <vector>
0028 #include <functional>
0029 
0030 // Need to fwd-declare TTreeReader for CreateColumnReader
0031 class TTreeReader;
0032 namespace ROOT::Detail::RDF {
0033 class RLoopManager;
0034 }
0035 
0036 namespace ROOT {
0037 namespace RDF {
0038 class RDataSource;
0039 class RSampleInfo;
0040 namespace Experimental {
0041 class RSample;
0042 }
0043 }
0044 }
0045 
0046 /// Print a RDataSource at the prompt
0047 namespace cling {
0048 std::string printValue(ROOT::RDF::RDataSource *ds);
0049 } // namespace cling
0050 
0051 namespace ROOT {
0052 
0053 namespace Internal {
0054 namespace RDF {
0055 
/// Type-erased base class of TTypedPointerHolder. Instances can be kept in a
/// container through base-class pointers; the typed deletion of the wrapped
/// pointer is performed by the derived class upon destruction.
class TPointerHolder {
protected:
   void *fPointer{nullptr};

public:
   explicit TPointerHolder(void *ptr) : fPointer(ptr) {}

   // The derived class deletes fPointer in its destructor, so a member-wise copy
   // would lead to a double delete. Duplicate a holder with GetDeepCopy() instead.
   TPointerHolder(const TPointerHolder &) = delete;
   TPointerHolder &operator=(const TPointerHolder &) = delete;
   TPointerHolder(TPointerHolder &&) = delete;
   TPointerHolder &operator=(TPointerHolder &&) = delete;

   /// Return the wrapped pointer, type-erased.
   void *GetPointer() { return fPointer; }
   /// Return the address of the wrapped pointer (i.e. a `void **` returned as `void *`).
   void *GetPointerAddr() { return &fPointer; }
   /// Return a newly allocated holder; the caller is responsible for deleting it.
   virtual TPointerHolder *GetDeepCopy() = 0;
   virtual ~TPointerHolder() = default;
};

/// Class to wrap a pointer to a `T` and delete the memory associated to it
/// correctly, i.e. through a `T *`, when the holder is destroyed.
template <typename T>
class TTypedPointerHolder final : public TPointerHolder {
public:
   // C-style cast kept on purpose: unlike static_cast it also compiles when T is const-qualified.
   TTypedPointerHolder(T *ptr) : TPointerHolder((void *)ptr) {}

   // The holder deletes the wrapped object: copying it would delete the object twice.
   TTypedPointerHolder(const TTypedPointerHolder &) = delete;
   TTypedPointerHolder &operator=(const TTypedPointerHolder &) = delete;

   /// Return a newly allocated holder wrapping a copy-constructed `T`.
   /// A holder wrapping a null pointer yields another holder wrapping a null pointer.
   TPointerHolder *GetDeepCopy() final
   {
      const auto typedPtr = static_cast<T *>(fPointer);
      // Do not dereference a null pointer: there is nothing to copy in that case.
      return new TTypedPointerHolder(typedPtr ? new T(*typedPtr) : nullptr);
   }

   // Deleting a null pointer is a no-op, so no check is needed here.
   ~TTypedPointerHolder() override { delete static_cast<T *>(fPointer); }
};
0087 
0088 std::string GetTypeNameWithOpts(const ROOT::RDF::RDataSource &ds, std::string_view colName, bool vector2RVec);
0089 const std::vector<std::string> &GetTopLevelFieldNames(const ROOT::RDF::RDataSource &ds);
0090 const std::vector<std::string> &GetColumnNamesNoDuplicates(const ROOT::RDF::RDataSource &ds);
0091 void CallInitializeWithOpts(ROOT::RDF::RDataSource &ds, const std::set<std::string> &suppressErrorsForMissingColumns);
0092 std::string DescribeDataset(ROOT::RDF::RDataSource &ds);
0093 ROOT::RDF::RSampleInfo
0094 CreateSampleInfo(const ROOT::RDF::RDataSource &ds, unsigned int slot,
0095                  const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &sampleMap);
0096 void RunFinalChecks(const ROOT::RDF::RDataSource &ds, bool nodesLeftNotRun);
0097 void ProcessMT(ROOT::RDF::RDataSource &ds, ROOT::Detail::RDF::RLoopManager &lm);
0098 std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
0099 CreateColumnReader(ROOT::RDF::RDataSource &ds, unsigned int slot, std::string_view col, const std::type_info &tid,
0100                    TTreeReader *treeReader);
0101 } // namespace RDF
0102 
0103 } // ns Internal
0104 
0105 namespace RDF {
0106 
0107 // clang-format off
0108 /**
0109 \class ROOT::RDF::RDataSource
0110 \ingroup dataframe
0111 \brief RDataSource defines an API that RDataFrame can use to read arbitrary data formats.
0112 
0113 A concrete RDataSource implementation (i.e. a class that inherits from RDataSource and implements all of its pure
0114 virtual methods) provides an adaptor that RDataFrame can leverage to read any kind of tabular data format.
0115 RDataFrame calls into RDataSource to retrieve information about the data, retrieve (thread-local) readers or "cursors"
0116 for selected columns and to advance the readers to the desired data entry.
0117 
0118 The sequence of calls that RDataFrame (or any other client of a RDataSource) performs is the following:
0119 
0120  - SetNSlots() : inform RDataSource of the desired level of parallelism
0121  - GetColumnReaders() : retrieve from RDataSource per-thread readers for the desired columns
0122  - Initialize() : inform RDataSource that an event-loop is about to start
0123  - GetEntryRanges() : retrieve from RDataSource a set of ranges of entries that can be processed concurrently
0124  - InitSlot() : inform RDataSource that a certain thread is about to start working on a certain range of entries
0125  - SetEntry() : inform RDataSource that a certain thread is about to start working on a certain entry
0126  - FinalizeSlot() : inform RDataSource that a certain thread finished working on a certain range of entries
0127  - Finalize() : inform RDataSource that an event-loop finished
0128 
0129 RDataSource implementations must support running multiple event-loops consecutively (although sequentially) on the same dataset.
0130  - \b SetNSlots() is called once per RDataSource object, typically when it is associated to a RDataFrame.
0131  - \b GetColumnReaders() can be called several times, potentially with the same arguments, also in-between event-loops, but not during an event-loop.
0132  - \b GetEntryRanges() will be called several times, including during an event loop, as additional ranges are needed.  It will not be called concurrently.
0133  - \b Initialize() and \b Finalize() are called once per event-loop,  right before starting and right after finishing.
0134  - \b InitSlot(), \b SetEntry(), and \b FinalizeSlot() can be called concurrently from multiple threads, multiple times per event-loop.
0135 
0136  Advanced users that plan to implement a custom RDataSource can check out existing implementations, e.g. RCsvDS or RNTupleDS.
0137  See the inheritance diagram below for the full list of existing concrete implementations.
0138 */
0139 class RDataSource {
0140    // clang-format on
0141 protected:
0142    using Record_t = std::vector<void *>;
0143    friend std::string cling::printValue(::ROOT::RDF::RDataSource *);
0144 
0145    virtual std::string AsString() { return "generic data source"; };
0146 
0147    unsigned int fNSlots{};
0148 
0149    std::optional<std::pair<ULong64_t, ULong64_t>> fGlobalEntryRange{};
0150 
0151    friend std::string ROOT::Internal::RDF::GetTypeNameWithOpts(const RDataSource &, std::string_view, bool);
0152    virtual std::string GetTypeNameWithOpts(std::string_view colName, bool) const { return GetTypeName(colName); }
0153 
0154    friend const std::vector<std::string> &ROOT::Internal::RDF::GetTopLevelFieldNames(const ROOT::RDF::RDataSource &);
0155    virtual const std::vector<std::string> &GetTopLevelFieldNames() const { return GetColumnNames(); }
0156 
0157    friend const std::vector<std::string> &
0158    ROOT::Internal::RDF::GetColumnNamesNoDuplicates(const ROOT::RDF::RDataSource &);
0159    virtual const std::vector<std::string> &GetColumnNamesNoDuplicates() const { return GetColumnNames(); }
0160 
0161    friend void ROOT::Internal::RDF::CallInitializeWithOpts(ROOT::RDF::RDataSource &, const std::set<std::string> &);
0162    virtual void InitializeWithOpts(const std::set<std::string> &) { Initialize(); }
0163 
0164    friend std::string ROOT::Internal::RDF::DescribeDataset(ROOT::RDF::RDataSource &);
0165    virtual std::string DescribeDataset() { return "Dataframe from datasource " + GetLabel(); }
0166 
0167    friend ROOT::RDF::RSampleInfo
0168    ROOT::Internal::RDF::CreateSampleInfo(const ROOT::RDF::RDataSource &, unsigned int,
0169                                          const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &);
0170    virtual ROOT::RDF::RSampleInfo
0171    CreateSampleInfo(unsigned int, const std::unordered_map<std::string, ROOT::RDF::Experimental::RSample *> &) const;
0172 
0173    friend void ROOT::Internal::RDF::RunFinalChecks(const ROOT::RDF::RDataSource &, bool);
0174    virtual void RunFinalChecks(bool) const {}
0175 
0176    friend void ROOT::Internal::RDF::ProcessMT(RDataSource &, ROOT::Detail::RDF::RLoopManager &);
0177    virtual void ProcessMT(ROOT::Detail::RDF::RLoopManager &);
0178 
0179    friend std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
0180    ROOT::Internal::RDF::CreateColumnReader(ROOT::RDF::RDataSource &, unsigned int, std::string_view,
0181                                            const std::type_info &, TTreeReader *);
0182    /**
0183     * \brief Creates a column reader for the requested column
0184     *
0185     * In the general case, this is just a redirect to the right GetColumnReaders overload. The signature notably also
0186     * has a TTreeReader * parameter. This is currently necessary to still allow the TTree-based MT scheduling via
0187     * TTreeProcessorMT. We use the TTreeProcessorMT::Process method to launch the same kernel across all threads. In
0188     * each thread task, TTreeProcessorMT creates a thread-local instance of a TTreeReader which is going to read the
0189     * range of events assigned to that task. That TTreeReader instance is what is passed to this method whenever a
0190     * column reader needs to be created in a thread task. In the future this method might be removed by either allowing
0191     * to request a handle to the thread-local TTreeReader instance programmatically from the TTreeProcessorMT, or
0192     * refactoring the TTreeProcessorMT scheduling into RTTreeDS altogether.
0193     */
0194    virtual std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
0195    CreateColumnReader(unsigned int slot, std::string_view col, const std::type_info &tid, TTreeReader *)
0196    {
0197       return GetColumnReaders(slot, col, tid);
0198    }
0199 
0200 public:
0201    RDataSource() = default;
0202    // Rule of five
0203    RDataSource(const RDataSource &) = delete;
0204    RDataSource &operator=(const RDataSource &) = delete;
0205    RDataSource(RDataSource &&) = delete;
0206    RDataSource &operator=(RDataSource &&) = delete;
0207    virtual ~RDataSource() = default;
0208 
0209    // clang-format off
0210    /// \brief Inform RDataSource of the number of processing slots (i.e. worker threads) used by the associated RDataFrame.
0211    /// Slots numbers are used to simplify parallel execution: RDataFrame guarantees that different threads will always
0212    /// pass different slot values when calling methods concurrently.
0213    // clang-format on
0214    virtual void SetNSlots(unsigned int nSlots)
0215    {
0216       assert(fNSlots == 0);
0217       assert(nSlots > 0);
0218       fNSlots = nSlots;
0219    };
0220 
0221    /// \brief Returns the number of files from which the dataset is constructed
0222    virtual std::size_t GetNFiles() const { return 0; }
0223 
0224    // clang-format off
0225    /// \brief Returns a reference to the collection of the dataset's column names
0226    // clang-format on
0227    virtual const std::vector<std::string> &GetColumnNames() const = 0;
0228 
0229    /// \brief Checks if the dataset has a certain column
0230    /// \param[in] colName The name of the column
0231    virtual bool HasColumn(std::string_view colName) const = 0;
0232 
0233    // clang-format off
0234    /// \brief Type of a column as a string, e.g. `GetTypeName("x") == "double"`. Required for jitting e.g. `df.Filter("x>0")`.
0235    /// \param[in] colName The name of the column
0236    // clang-format on
0237    virtual std::string GetTypeName(std::string_view colName) const = 0;
0238 
0239    // clang-format off
0240    /// Called at most once per column by RDF. Return vector of pointers to pointers to column values - one per slot.
0241    /// \tparam T The type of the data stored in the column
0242    /// \param[in] columnName The name of the column
0243    ///
0244    /// These pointers are veritable cursors: it's a responsibility of the RDataSource implementation that they point to
0245    /// the "right" memory region.
0246    // clang-format on
0247    template <typename T>
0248    std::vector<T **> GetColumnReaders(std::string_view columnName)
0249    {
0250       auto typeErasedVec = GetColumnReadersImpl(columnName, typeid(T));
0251       std::vector<T **> typedVec(typeErasedVec.size());
0252       std::transform(typeErasedVec.begin(), typeErasedVec.end(), typedVec.begin(),
0253                      [](void *p) { return static_cast<T **>(p); });
0254       return typedVec;
0255    }
0256 
0257    /// If the other GetColumnReaders overload returns an empty vector, this overload will be called instead.
0258    /// \param[in] slot The data processing slot that needs to be considered
0259    /// \param[in] name The name of the column for which a column reader needs to be returned
0260    /// \param[in] tid A type_info
0261    /// At least one of the two must return a non-empty/non-null value.
0262    virtual std::unique_ptr<ROOT::Detail::RDF::RColumnReaderBase>
0263    GetColumnReaders(unsigned int /*slot*/, std::string_view /*name*/, const std::type_info &)
0264    {
0265       return {};
0266    }
0267 
0268    // clang-format off
0269    /// \brief Return ranges of entries to distribute to tasks.
0270    /// They are required to be contiguous intervals with no entries skipped. Supposing a dataset with nEntries, the
0271    /// intervals must start at 0 and end at nEntries, e.g. [0-5],[5-10] for 10 entries.
0272    /// This function will be invoked repeatedly by RDataFrame as it needs additional entries to process.
0273    /// The same entry range should not be returned more than once.
0274    /// Returning an empty collection of ranges signals to RDataFrame that the processing can stop.
0275    // clang-format on
0276    virtual std::vector<std::pair<ULong64_t, ULong64_t>> GetEntryRanges() = 0;
0277 
0278    // clang-format off
0279    /// \brief Advance the "cursors" returned by GetColumnReaders to the selected entry for a particular slot.
0280    /// \param[in] slot The data processing slot that needs to be considered
0281    /// \param[in] entry The entry which needs to be pointed to by the reader pointers
0282    /// Slots are adopted to accommodate parallel data processing. 
0283    /// Different workers will loop over different ranges and
0284    /// will be labelled by different "slot" values.
0285    /// Returns *true* if the entry has to be processed, *false* otherwise.
0286    // clang-format on
0287    virtual bool SetEntry(unsigned int slot, ULong64_t entry) = 0;
0288 
0289    // clang-format off
0290    /// \brief Convenience method called before starting an event-loop.
0291    /// This method might be called multiple times over the lifetime of a RDataSource, since
0292    /// users can run multiple event-loops with the same RDataFrame.
0293    /// Ideally, `Initialize` should set the state of the RDataSource so that multiple identical event-loops
0294    /// will produce identical results.
0295    // clang-format on
0296    virtual void Initialize() {}
0297 
0298    // clang-format off
0299    /// \brief Convenience method called at the start of the data processing associated to a slot.
0300    /// \param[in] slot The data processing slot wihch needs to be initialized
0301    /// \param[in] firstEntry The first entry of the range that the task will process.
0302    /// This method might be called multiple times per thread per event-loop.
0303    // clang-format on
0304    virtual void InitSlot(unsigned int /*slot*/, ULong64_t /*firstEntry*/) {}
0305 
0306    // clang-format off
0307    /// \brief Convenience method called at the end of the data processing associated to a slot.
0308    /// \param[in] slot The data processing slot wihch needs to be finalized
0309    /// This method might be called multiple times per thread per event-loop.
0310    // clang-format on
0311    virtual void FinalizeSlot(unsigned int /*slot*/) {}
0312 
0313    // clang-format off
0314    /// \brief Convenience method called after concluding an event-loop.
0315    /// See Initialize for more details.
0316    // clang-format on
0317    virtual void Finalize() {}
0318 
0319    /// \brief Return a string representation of the datasource type.
0320    /// The returned string will be used by ROOT::RDF::SaveGraph() to represent
0321    /// the datasource in the visualization of the computation graph.
0322    /// Concrete datasources can override the default implementation.
0323    virtual std::string GetLabel() { return "Custom Datasource"; }
0324 
0325    /// \brief Restrict processing to a [begin, end) range of entries.
0326    /// \param entryRange The range of entries to process.
0327    virtual void SetGlobalEntryRange(std::pair<ULong64_t, ULong64_t> entryRange)
0328    {
0329       fGlobalEntryRange = std::move(entryRange);
0330    };
0331 
0332 protected:
0333    /// type-erased vector of pointers to pointers to column values - one per slot
0334    virtual Record_t GetColumnReadersImpl(std::string_view name, const std::type_info &) = 0;
0335 };
0336 
0337 } // ns RDF
0338 
0339 } // ns ROOT
0340 
0341 /// Print a RDataSource at the prompt
0342 namespace cling {
0343 inline std::string printValue(ROOT::RDF::RDataSource *ds)
0344 {
0345    return ds->AsString();
0346 }
0347 } // namespace cling
0348 
0349 #endif // ROOT_RDATASOURCE