Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-30 10:22:39

0001 /// \file ROOT/RPageStorageDaos.hxx
0002 /// \ingroup NTuple ROOT7
0003 /// \author Javier Lopez-Gomez <j.lopez@cern.ch>
0004 /// \date 2020-11-03
0005 /// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
0006 /// is welcome!
0007 
0008 /*************************************************************************
0009  * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers.               *
0010  * All rights reserved.                                                  *
0011  *                                                                       *
0012  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0013  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0014  *************************************************************************/
0015 
0016 #ifndef ROOT7_RPageStorageDaos
0017 #define ROOT7_RPageStorageDaos
0018 
0019 #include <ROOT/RError.hxx>
0020 #include <ROOT/RPageStorage.hxx>
0021 #include <ROOT/RNTuple.hxx>
0022 #include <ROOT/RNTupleSerialize.hxx>
0023 #include <ROOT/RNTupleZip.hxx>
0024 #include <string_view>
0025 
0026 #include <array>
0027 #include <atomic>
0028 #include <cstdio>
0029 #include <memory>
0030 #include <string>
0031 #include <optional>
0032 
0033 namespace ROOT {
0034 namespace Experimental {
0035 
0036 namespace Internal {
0037 using ntuple_index_t = std::uint32_t; ///< Type of the `fNTupleIndex` members of the DAOS page sink and page source
0038 class RCluster; // Forward declarations: none of the classes listed here is defined in this header
0039 class RClusterPool;
0040 class RDaosPool;
0041 class RDaosContainer;
0042 class RPageAllocatorHeap;
0043 class RPagePool;
0044 enum EDaosLocatorFlags { // NOTE(review): presumably bit flags attached to a page locator; usage is not visible here
0045    // Indicates that the referenced page is "caged", i.e. it is stored in a larger blob that contains multiple pages.
0046    kCagedPage = 0x01,
0047 };
0048 
0049 // clang-format off
0050 /**
0051 \class ROOT::Experimental::Internal::RDaosNTupleAnchor
0052 \ingroup NTuple
0053 \brief Entry point for an RNTuple in a DAOS container. It encodes essential
0054 information to read the ntuple; currently, it contains (un)compressed size of
0055 the header/footer blobs and the object class for user data OIDs.
0056 The length of a serialized anchor cannot be greater than the value returned by the `GetSize` function.
0057 */
0058 // clang-format on
0059 struct RDaosNTupleAnchor {
0060    /// Allows for evolving the struct in future versions
0061    std::uint64_t fVersionAnchor = 1;
0062    /// Version of the binary format supported by the writer
0063    std::uint16_t fVersionEpoch = RNTuple::kVersionEpoch;
0064    std::uint16_t fVersionMajor = RNTuple::kVersionMajor;
0065    std::uint16_t fVersionMinor = RNTuple::kVersionMinor;
0066    std::uint16_t fVersionPatch = RNTuple::kVersionPatch;
0067    /// The size of the compressed ntuple header
0068    std::uint32_t fNBytesHeader = 0;
0069    /// The size of the uncompressed ntuple header
0070    std::uint32_t fLenHeader = 0;
0071    /// The size of the compressed ntuple footer
0072    std::uint32_t fNBytesFooter = 0;
0073    /// The size of the uncompressed ntuple footer
0074    std::uint32_t fLenFooter = 0;
0075    /// The object class for user data OIDs, e.g. `SX`
0076    std::string fObjClass{};
0077    /// Member-wise equality: true only if every field, including `fObjClass`, compares equal
0078    bool operator ==(const RDaosNTupleAnchor &other) const {
0079       return fVersionAnchor == other.fVersionAnchor && fVersionEpoch == other.fVersionEpoch &&
0080              fVersionMajor == other.fVersionMajor && fVersionMinor == other.fVersionMinor &&
0081              fVersionPatch == other.fVersionPatch && fNBytesHeader == other.fNBytesHeader &&
0082              fLenHeader == other.fLenHeader && fNBytesFooter == other.fNBytesFooter && fLenFooter == other.fLenFooter &&
0083              fObjClass == other.fObjClass;
0084    }
0085    /// NOTE(review): presumably writes the serialized anchor into `buffer` and returns the number of bytes -- confirm
0086    std::uint32_t Serialize(void *buffer) const;
0087    RResult<std::uint32_t> Deserialize(const void *buffer, std::uint32_t bufSize); ///< Presumably the inverse of Serialize() -- confirm
0088    /// Upper bound for the length of a serialized anchor
0089    static std::uint32_t GetSize();
0090 }; // struct RDaosNTupleAnchor
0091 
0092 // clang-format off
0093 /**
0094 \class ROOT::Experimental::Internal::RPageSinkDaos
0095 \ingroup NTuple
0096 \brief Storage provider that writes ntuple pages into a DAOS container
0097 
0098 Currently, an object is allocated for ntuple metadata (anchor/header/footer).
0099 Objects can correspond to pages or clusters of pages depending on the RNTuple-DAOS mapping strategy.
0100 */
0101 // clang-format on
0102 class RPageSinkDaos : public RPagePersistentSink {
0103 private:
0104    std::unique_ptr<RPageAllocatorHeap> fPageAllocator; ///< NOTE(review): presumably backs ReservePage()/ReleasePage() -- confirm
0105 
0106    /// \brief Underlying DAOS container. An internal `std::shared_ptr` keeps the pool connection alive.
0107    /// ISO C++ ensures the correct destruction order, i.e., `~RDaosContainer` is invoked first
0108    /// (which calls `daos_cont_close()`); the destructor for the `std::shared_ptr<RDaosPool>` is invoked
0109    /// after (which calls `daos_pool_disconnect()`).
0110    std::unique_ptr<RDaosContainer> fDaosContainer;
0111    /// Page identifier for the next committed page; it is automatically incremented in `CommitSealedPageImpl()`
0112    std::atomic<std::uint64_t> fPageId{0};
0113    /// Cluster group counter for the next committed cluster pagelist; incremented in `CommitClusterGroupImpl()`
0114    std::atomic<std::uint64_t> fClusterGroupId{0};
0115    /// \brief A URI to a DAOS pool of the form 'daos://pool-label/container-label'
0116    std::string fURI;
0117    /// Tracks the number of bytes committed to the current cluster
0118    std::uint64_t fNBytesCurrentCluster{0};
0119 
0120    RDaosNTupleAnchor fNTupleAnchor; ///< Anchor for the ntuple handled by this sink; presumably persisted by WriteNTupleAnchor()
0121    ntuple_index_t fNTupleIndex{0}; ///< NOTE(review): meaning not visible in this header; initialized to 0
0122    uint32_t fCageSizeLimit{}; ///< Value-initialized. NOTE(review): presumably a size limit for "caged" page blobs -- confirm
0123 
0124 protected:
0125    using RPagePersistentSink::InitImpl; // keeps any other base-class InitImpl overloads visible next to the override below
0126    void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final;
0127    RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final;
0128    RNTupleLocator
0129    CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final;
0130    std::vector<RNTupleLocator> CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges) final;
0131    std::uint64_t CommitClusterImpl() final;
0132    RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final;
0133    void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final;
0134    void WriteNTupleHeader(const void *data, size_t nbytes, size_t lenHeader); // NOTE(review): nbytes/lenHeader presumably feed fNBytesHeader/fLenHeader
0135    void WriteNTupleFooter(const void *data, size_t nbytes, size_t lenFooter); // NOTE(review): nbytes/lenFooter presumably feed fNBytesFooter/fLenFooter
0136    void WriteNTupleAnchor();
0137 
0138 public:
0139    RPageSinkDaos(std::string_view ntupleName, std::string_view uri, const RNTupleWriteOptions &options);
0140    ~RPageSinkDaos() override;
0141 
0142    RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final;
0143    void ReleasePage(RPage &page) final;
0144 }; // class RPageSinkDaos
0145 
0146 // clang-format off
0147 /**
0148 \class ROOT::Experimental::Internal::RPageSourceDaos
0149 \ingroup NTuple
0150 \brief Storage provider that reads ntuple pages from a DAOS container
0151 */
0152 // clang-format on
0153 class RPageSourceDaos : public RPageSource {
0154 private:
0155    /// Summarizes cluster-level information that is necessary to populate a certain page.
0156    /// Used by PopulatePageFromCluster().
0157    struct RClusterInfo {
0158       DescriptorId_t fClusterId = 0;
0159       /// Location of the page on disk
0160       RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo;
0161       /// The first element number of the page's column in the given cluster
0162       std::uint64_t fColumnOffset = 0;
0163    };
0164 
0165    ntuple_index_t fNTupleIndex{0}; ///< NOTE(review): meaning not visible in this header; initialized to 0
0166 
0167    /// Populated pages might be shared; the page pool might, at some point, be used by multiple page sources
0168    std::shared_ptr<RPagePool> fPagePool;
0169    /// The last cluster from which a page got populated.  Points into fClusterPool->fPool
0170    RCluster *fCurrentCluster = nullptr;
0171    /// A container that stores object data (header/footer, pages, etc.)
0172    std::unique_ptr<RDaosContainer> fDaosContainer;
0173    /// A URI to a DAOS pool of the form 'daos://pool-label/container-label'
0174    std::string fURI;
0175    /// The cluster pool asynchronously preloads the next few clusters
0176    std::unique_ptr<RClusterPool> fClusterPool;
0177 
0178    RNTupleDescriptorBuilder fDescriptorBuilder; ///< NOTE(review): presumably used by AttachImpl() to assemble the descriptor -- confirm
0179 
0180    RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo,
0181                                  ClusterSize_t::ValueType idxInCluster);
0182 
0183 protected:
0184    RNTupleDescriptor AttachImpl() final;
0185    void UnzipClusterImpl(RCluster *cluster) final;
0186 
0187 public:
0188    RPageSourceDaos(std::string_view ntupleName, std::string_view uri, const RNTupleReadOptions &options);
0189    /// The cloned page source creates a new connection to the pool/container.
0190    /// The meta-data (header and footer) is reread and parsed by the clone.
0191    std::unique_ptr<RPageSource> Clone() const final;
0192    ~RPageSourceDaos() override;
0193 
0194    RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final; // overload addressing the page by a global index
0195    RPage PopulatePage(ColumnHandle_t columnHandle, RClusterIndex clusterIndex) final; // overload addressing the page by an RClusterIndex
0196    void ReleasePage(RPage &page) final;
0197 
0198    void LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) final;
0199 
0200    std::vector<std::unique_ptr<RCluster>> LoadClusters(std::span<RCluster::RKey> clusterKeys) final;
0201 
0202    /// Return the object class used for user data OIDs in this ntuple.
0203    std::string GetObjectClass() const;
0204 }; // class RPageSourceDaos
0205 
0206 } // namespace Internal
0207 
0208 } // namespace Experimental
0209 } // namespace ROOT
0210 
0211 #endif