EIC code displayed by LXR

0001 /// \file ROOT/RPageStorage.hxx
0002 /// \ingroup NTuple
0003 /// \author Jakob Blomer <jblomer@cern.ch>
0004 /// \date 2018-07-19
0005 
0006 /*************************************************************************
0007  * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers.               *
0008  * All rights reserved.                                                  *
0009  *                                                                       *
0010  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0011  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0012  *************************************************************************/
0013 
0014 #ifndef ROOT_RPageStorage
0015 #define ROOT_RPageStorage
0016 
0017 #include <ROOT/RError.hxx>
0018 #include <ROOT/RCluster.hxx>
0019 #include <ROOT/RColumnElementBase.hxx>
0020 #include <ROOT/RNTupleDescriptor.hxx>
0021 #include <ROOT/RNTupleMetrics.hxx>
0022 #include <ROOT/RNTupleReadOptions.hxx>
0023 #include <ROOT/RNTupleSerialize.hxx>
0024 #include <ROOT/RNTupleWriteOptions.hxx>
0025 #include <ROOT/RNTupleUtil.hxx>
0026 #include <ROOT/RPage.hxx>
0027 #include <ROOT/RPagePool.hxx>
0028 #include <ROOT/RSpan.hxx>
0029 #include <string_view>
0030 
0031 #include <atomic>
0032 #include <cassert>
0033 #include <cstddef>
0034 #include <deque>
0035 #include <functional>
0036 #include <map>
0037 #include <memory>
0038 #include <mutex>
0039 #include <set>
0040 #include <shared_mutex>
0041 #include <unordered_map>
0042 #include <unordered_set>
0043 #include <vector>
0044 
0045 namespace ROOT {
0046 
0047 class RNTupleModel;
0048 
0049 namespace Internal {
0050 
0051 class RPageAllocator;
0052 class RColumn;
0053 struct RNTupleModelChangeset;
0054 
0055 enum class EPageStorageType {
0056    kSink,
0057    kSource,
0058 };
0059 
0060 // clang-format off
0061 /**
0062 \class ROOT::Internal::RPageStorage
0063 \ingroup NTuple
0064 \brief Common functionality of an ntuple storage for both reading and writing
0065 
0066 RPageStorage provides access to a storage container that keeps the bits of pages and clusters comprising
0067 an ntuple.  Concrete implementations can use a TFile, a raw file, an object store, and so on.
0068 */
0069 // clang-format on
0070 class RPageStorage {
0071 public:
0072    /// The page checksum is a 64-bit xxhash3
0073    static constexpr std::size_t kNBytesPageChecksum = sizeof(std::uint64_t);
0074 
0075    /// The interface of a task scheduler to schedule page (de)compression tasks
0076    class RTaskScheduler {
0077    public:
0078       virtual ~RTaskScheduler() = default;
0079       /// Take a callable that represents a task
0080       virtual void AddTask(const std::function<void(void)> &taskFunc) = 0;
0081       /// Blocks until all scheduled tasks have finished
0082       virtual void Wait() = 0;
0083    };
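   // Editorial example, not part of the original header: a minimal RTaskScheduler sketch that runs
   // every task synchronously, so Wait() has nothing left to do. It only illustrates the interface;
   // a real scheduler would dispatch the tasks to a thread pool.
   // ~~~ {.cpp}
   // class RSequentialTaskScheduler : public RPageStorage::RTaskScheduler {
   // public:
   //    void AddTask(const std::function<void(void)> &taskFunc) final { taskFunc(); }
   //    void Wait() final {} // every task already ran inside AddTask()
   // };
   // ~~~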
0084 
0085    /// A sealed page contains the bytes of a page as written to storage (packed & compressed).  It is used
0086    /// as an input to UnsealPage() as well as to transfer pages between different storage media.
0087    /// RSealedPage does _not_ own the buffer it points to, so as not to interfere with the memory management
0088    /// of concrete page sink and page source implementations.
0089    struct RSealedPage {
0090    private:
0091       const void *fBuffer = nullptr;
0092       std::size_t fBufferSize = 0; ///< Size of the page payload and the trailing checksum (if available)
0093       std::uint32_t fNElements = 0;
0094       bool fHasChecksum = false; ///< If set, the last 8 bytes of the buffer are the xxhash3 of the rest of the buffer
0095 
0096    public:
0097       RSealedPage() = default;
0098       RSealedPage(const void *buffer, std::size_t bufferSize, std::uint32_t nElements, bool hasChecksum = false)
0099          : fBuffer(buffer), fBufferSize(bufferSize), fNElements(nElements), fHasChecksum(hasChecksum)
0100       {
0101       }
0102       RSealedPage(const RSealedPage &other) = default;
0103       RSealedPage &operator=(const RSealedPage &other) = default;
0104       RSealedPage(RSealedPage &&other) = default;
0105       RSealedPage &operator=(RSealedPage &&other) = default;
0106 
0107       const void *GetBuffer() const { return fBuffer; }
0108       void SetBuffer(const void *buffer) { fBuffer = buffer; }
0109 
0110       std::size_t GetDataSize() const
0111       {
0112          assert(fBufferSize >= fHasChecksum * kNBytesPageChecksum);
0113          return fBufferSize - fHasChecksum * kNBytesPageChecksum;
0114       }
0115       std::size_t GetBufferSize() const { return fBufferSize; }
0116       void SetBufferSize(std::size_t bufferSize) { fBufferSize = bufferSize; }
0117 
0118       std::uint32_t GetNElements() const { return fNElements; }
0119       void SetNElements(std::uint32_t nElements) { fNElements = nElements; }
0120 
0121       bool GetHasChecksum() const { return fHasChecksum; }
0122       void SetHasChecksum(bool hasChecksum) { fHasChecksum = hasChecksum; }
0123 
0124       void ChecksumIfEnabled();
0125       RResult<void> VerifyChecksumIfEnabled() const;
0126       /// Returns a failure if the sealed page has no checksum
0127       RResult<std::uint64_t> GetChecksum() const;
0128    };
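   // Editorial sketch (hypothetical helper): wrapping existing bytes in an RSealedPage view and
   // verifying the trailing checksum. The buffer is assumed to stay alive and to end with the
   // 8-byte xxhash3 checksum; the RSealedPage itself never owns the memory.
   // ~~~ {.cpp}
   // void InspectSealedPage(const void *buffer, std::size_t bufferSize, std::uint32_t nElements)
   // {
   //    RPageStorage::RSealedPage sealedPage(buffer, bufferSize, nElements, /*hasChecksum=*/true);
   //    sealedPage.VerifyChecksumIfEnabled().ThrowOnError(); // assumes RResult<void>::ThrowOnError()
   //    std::size_t payloadSize = sealedPage.GetDataSize();  // bufferSize minus the checksum bytes
   //    (void)payloadSize;
   // }
   // ~~~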
0129 
0130    using SealedPageSequence_t = std::deque<RSealedPage>;
0131    /// A range of sealed pages referring to the same column that can be used for vector commit
0132    struct RSealedPageGroup {
0133       ROOT::DescriptorId_t fPhysicalColumnId;
0134       SealedPageSequence_t::const_iterator fFirst;
0135       SealedPageSequence_t::const_iterator fLast;
0136 
0137       RSealedPageGroup() = default;
0138       RSealedPageGroup(ROOT::DescriptorId_t d, SealedPageSequence_t::const_iterator b,
0139                        SealedPageSequence_t::const_iterator e)
0140          : fPhysicalColumnId(d), fFirst(b), fLast(e)
0141       {
0142       }
0143    };
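   // Editorial sketch: collecting a column's sealed pages in a SealedPageSequence_t and describing
   // them with one RSealedPageGroup, e.g. to hand them to RPageSink::CommitSealedPageV() in a single
   // vector commit. The buffers, sizes, and `columnId` are assumptions for illustration.
   // ~~~ {.cpp}
   // RPageStorage::SealedPageSequence_t pages;
   // pages.emplace_back(bufferA, sizeA, nElementsA);
   // pages.emplace_back(bufferB, sizeB, nElementsB);
   // std::vector<RPageStorage::RSealedPageGroup> groups;
   // groups.emplace_back(columnId, pages.cbegin(), pages.cend());
   // // sink.CommitSealedPageV(groups);
   // ~~~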
0144 
0145 protected:
0146    ROOT::Experimental::Detail::RNTupleMetrics fMetrics;
0147 
0148    /// For the time being, we will use the heap allocator for all sources and sinks. This may change in the future.
0149    std::unique_ptr<ROOT::Internal::RPageAllocator> fPageAllocator;
0150 
0151    std::string fNTupleName;
0152    RTaskScheduler *fTaskScheduler = nullptr;
0153    void WaitForAllTasks()
0154    {
0155       if (!fTaskScheduler)
0156          return;
0157       fTaskScheduler->Wait();
0158    }
0159 
0160 public:
0161    explicit RPageStorage(std::string_view name);
0162    RPageStorage(const RPageStorage &other) = delete;
0163    RPageStorage &operator=(const RPageStorage &other) = delete;
0164    RPageStorage(RPageStorage &&other) = default;
0165    RPageStorage &operator=(RPageStorage &&other) = default;
0166    virtual ~RPageStorage();
0167 
0168    /// Whether the concrete implementation is a sink or a source
0169    virtual EPageStorageType GetType() = 0;
0170 
0171    struct RColumnHandle {
0172       ROOT::DescriptorId_t fPhysicalId = ROOT::kInvalidDescriptorId;
0173       ROOT::Internal::RColumn *fColumn = nullptr;
0174 
0175       /// Returns true for a valid column handle; fColumn and fPhysicalId should always either both
0176       /// be valid or both be invalid.
0177       explicit operator bool() const { return fPhysicalId != ROOT::kInvalidDescriptorId && fColumn; }
0178    };
0179    /// The column handle identifies a column within the currently open page storage
0180    using ColumnHandle_t = RColumnHandle;
0181 
0182    /// Register a new column.  When reading, the column must exist in the ntuple on disk corresponding to the metadata.
0183    /// When writing, every column can only be attached once.
0184    virtual ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) = 0;
0185    /// Unregisters a column.  A page source decreases the reference counter for the corresponding active column.
0186    /// For a page sink, dropping columns is currently a no-op.
0187    virtual void DropColumn(ColumnHandle_t columnHandle) = 0;
0188    ROOT::DescriptorId_t GetColumnId(ColumnHandle_t columnHandle) const { return columnHandle.fPhysicalId; }
0189 
0190    /// Returns the default metrics object.  Subclasses might alternatively provide their own metrics object by
0191    /// overriding this.
0192    virtual ROOT::Experimental::Detail::RNTupleMetrics &GetMetrics() { return fMetrics; }
0193 
0194    /// Returns the NTuple name.
0195    const std::string &GetNTupleName() const { return fNTupleName; }
0196 
0197    void SetTaskScheduler(RTaskScheduler *taskScheduler) { fTaskScheduler = taskScheduler; }
0198 }; // class RPageStorage
0199 
0200 // clang-format off
0201 /**
0202 \class ROOT::Internal::RWritePageMemoryManager
0203 \ingroup NTuple
0204 \brief Helper to maintain a memory budget for the write pages of a set of columns
0205 
0206 The memory manager keeps track of the sum of bytes used by the write pages of a set of columns.
0207 It flushes (and shrinks) large write pages of other columns when an attempt to expand a page would exceed the budget.
0208 */
0209 // clang-format on
0210 class RWritePageMemoryManager {
0211 private:
0212    struct RColumnInfo {
0213       ROOT::Internal::RColumn *fColumn = nullptr;
0214       std::size_t fCurrentPageSize = 0;
0215       std::size_t fInitialPageSize = 0;
0216 
0217       bool operator>(const RColumnInfo &other) const;
0218    };
0219 
0220    /// Sum of all the write page sizes (their capacity) of the columns in `fColumnsSortedByPageSize`
0221    std::size_t fCurrentAllocatedBytes = 0;
0222    /// Maximum allowed value for `fCurrentAllocatedBytes`, set from RNTupleWriteOptions::fPageBufferBudget
0223    std::size_t fMaxAllocatedBytes = 0;
0224    /// All columns that called `ReservePage()` (hence `TryUpdate()`) at least once,
0225    /// sorted by their current write page size from large to small
0226    std::set<RColumnInfo, std::greater<RColumnInfo>> fColumnsSortedByPageSize;
0227 
0228    /// Flush columns in order of allocated write page size until the sum of all write page allocations
0229    /// leaves space for at least targetAvailableSize bytes. Only use columns with a write page size larger
0230    /// than pageSizeLimit.
0231    bool TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit);
0232 
0233 public:
0234    explicit RWritePageMemoryManager(std::size_t maxAllocatedBytes) : fMaxAllocatedBytes(maxAllocatedBytes) {}
0235 
0236    /// Try to register the new write page size for the given column. Flush large columns to make space, if necessary.
0237    /// If not enough space is available after all (sum of write pages would be larger than fMaxAllocatedBytes),
0238    /// return false.
0239    bool TryUpdate(ROOT::Internal::RColumn &column, std::size_t newWritePageSize);
0240 };
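// Editorial sketch: how a sink might consult the memory manager before growing a column's write
// page. `column` and `newWritePageSize` are hypothetical; the budget value mirrors
// RNTupleWriteOptions::fPageBufferBudget.
// ~~~ {.cpp}
// RWritePageMemoryManager memoryManager(64 * 1024 * 1024); // 64 MiB page buffer budget
// if (!memoryManager.TryUpdate(column, newWritePageSize)) {
//    // Not enough budget even after flushing other columns' large write pages:
//    // keep the current (smaller) write page instead of growing it.
// }
// ~~~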
0241 
0242 // clang-format off
0243 /**
0244 \class ROOT::Internal::RPageSink
0245 \ingroup NTuple
0246 \brief Abstract interface to write data into an ntuple
0247 
0248 The page sink takes the list of columns and afterwards a series of page commits and cluster commits.
0249 The user is responsible for committing clusters at a consistent point, i.e. when all pages corresponding to data
0250 up to the given entry number are committed.
0251 
0252 An object of this class may either be a wrapper (for example an RPageSinkBuf) or a "persistent" sink,
0253 inheriting from RPagePersistentSink.
0254 */
0255 // clang-format on
0256 class RPageSink : public RPageStorage {
0257 public:
0258    using Callback_t = std::function<void(RPageSink &)>;
0259 
0260    /// Cluster that was staged, but not yet logically appended to the RNTuple
0261    struct RStagedCluster {
0262       std::uint64_t fNBytesWritten = 0;
0263       ROOT::NTupleSize_t fNEntries = 0;
0264 
0265       struct RColumnInfo {
0266          ROOT::RClusterDescriptor::RPageRange fPageRange;
0267          ROOT::NTupleSize_t fNElements = ROOT::kInvalidNTupleIndex;
0268          std::uint32_t fCompressionSettings;
0269          bool fIsSuppressed = false;
0270       };
0271 
0272       std::vector<RColumnInfo> fColumnInfos;
0273    };
0274 
0275 protected:
0276    std::unique_ptr<ROOT::RNTupleWriteOptions> fOptions;
0277 
0278    /// Flag if sink was initialized
0279    bool fIsInitialized = false;
0280 
0281    /// Helper for streaming a page. This is commonly used in derived, concrete page sinks. Note that if
0282    /// compressionSetting is 0 (uncompressed) and the page is mappable and not checksummed, the returned sealed page
0283    /// will point directly to the input page buffer. Otherwise, the sealed page references fSealPageBuffer.  Thus,
0284    /// the buffer pointed to by the RSealedPage should never be freed.
0285    RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element);
0286 
0287 private:
0288    std::vector<Callback_t> fOnDatasetCommitCallbacks;
0289    std::vector<unsigned char> fSealPageBuffer; ///< Used as destination buffer in the simple SealPage overload
0290 
0291    /// Used in ReservePage to maintain the page buffer budget
0292    RWritePageMemoryManager fWritePageMemoryManager;
0293 
0294 public:
0295    RPageSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options);
0296 
0297    RPageSink(const RPageSink &) = delete;
0298    RPageSink &operator=(const RPageSink &) = delete;
0299    RPageSink(RPageSink &&) = default;
0300    RPageSink &operator=(RPageSink &&) = default;
0301    ~RPageSink() override;
0302 
0303    EPageStorageType GetType() final { return EPageStorageType::kSink; }
0304    /// Returns the sink's write options.
0305    const ROOT::RNTupleWriteOptions &GetWriteOptions() const { return *fOptions; }
0306 
0307    void DropColumn(ColumnHandle_t /*columnHandle*/) final {}
0308 
0309    bool IsInitialized() const { return fIsInitialized; }
0310 
0311    /// Return the RNTupleDescriptor being constructed.
0312    virtual const ROOT::RNTupleDescriptor &GetDescriptor() const = 0;
0313 
0314    virtual ROOT::NTupleSize_t GetNEntries() const = 0;
0315 
0316    /// Physically creates the storage container to hold the ntuple (e.g., keys in a TFile or an S3 bucket).
0317    /// Init() associates column handles to the columns referenced by the model.
0318    void Init(RNTupleModel &model)
0319    {
0320       if (fIsInitialized) {
0321          throw RException(R__FAIL("already initialized"));
0322       }
0323       fIsInitialized = true;
0324       InitImpl(model);
0325    }
0326 
0327 protected:
0328    virtual void InitImpl(RNTupleModel &model) = 0;
0329    virtual void CommitDatasetImpl() = 0;
0330 
0331 public:
0332    /// Parameters for the SealPage() method
0333    struct RSealPageConfig {
0334       const ROOT::Internal::RPage *fPage = nullptr; ///< Input page to be sealed
0335       const ROOT::Internal::RColumnElementBase *fElement =
0336          nullptr;                                   ///< Corresponds to the page's elements, for size calculation etc.
0337       std::uint32_t fCompressionSettings = 0;       ///< Compression algorithm and level to apply
0338       /// Adds an 8-byte little-endian xxhash3 checksum to the page payload. The buffer has to be large enough
0339       /// to store the additional 8 bytes.
0340       bool fWriteChecksum = true;
0341       /// If false, the output buffer must not point to the input page buffer, which would otherwise be an option
0342       /// if the page is mappable and should not be compressed
0343       bool fAllowAlias = false;
0344       /// Location for sealed output. The memory buffer has to be large enough.
0345       void *fBuffer = nullptr;
0346    };
0347 
0348    /// Seal a page using the provided info.
0349    static RSealedPage SealPage(const RSealPageConfig &config);
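   // Editorial sketch: sealing a page into a caller-provided buffer via RSealPageConfig. The names
   // `page`, `element`, and `outputBuffer` are assumptions; the buffer must be large enough for the
   // compressed payload plus, if requested, the 8-byte checksum.
   // ~~~ {.cpp}
   // RSealPageConfig config;
   // config.fPage = &page;               // ROOT::Internal::RPage to be sealed
   // config.fElement = &element;         // matching RColumnElementBase
   // config.fCompressionSettings = 505;  // e.g. zstd, level 5
   // config.fWriteChecksum = true;
   // config.fAllowAlias = false;         // never alias the input page buffer
   // config.fBuffer = outputBuffer;
   // RSealedPage sealedPage = SealPage(config);
   // ~~~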
0350 
0351    /// Incorporate incremental changes to the model into the ntuple descriptor. This happens, e.g. if new fields were
0352    /// added after the initial call to `RPageSink::Init(RNTupleModel &)`.
0353    /// `firstEntry` specifies the global index for the first stored element in the added columns.
0354    virtual void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) = 0;
0355    /// Adds an extra type information record to the schema. The extra type information will be written to the
0356    /// extension header. The information in the record will be merged with the existing information, e.g.
0357    /// duplicate streamer info records will be removed. This method is called by the "on commit dataset" callback
0358    /// registered by specific fields (e.g., streamer field) and during merging.
0359    virtual void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) = 0;
0360 
0361    /// Commits a suppressed column for the current cluster. Can be called anytime before CommitCluster().
0362    /// For any given column and cluster, there must be no calls to both CommitSuppressedColumn() and page commits.
0363    virtual void CommitSuppressedColumn(ColumnHandle_t columnHandle) = 0;
0364    /// Write a page to the storage. The column must have been added before.
0365    virtual void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) = 0;
0366    /// Write a preprocessed page to storage. The column must have been added before.
0367    virtual void
0368    CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) = 0;
0369    /// Write a vector of preprocessed pages to storage. The corresponding columns must have been added before.
0370    virtual void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) = 0;
0371    /// Stage the current cluster and create a new one for the following data.
0372    /// Returns the object that must be passed to CommitStagedClusters to logically append the staged cluster to the
0373    /// ntuple descriptor.
0374    virtual RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) = 0;
0375    /// Commit staged clusters, logically appending them to the ntuple descriptor.
0376    virtual void CommitStagedClusters(std::span<RStagedCluster> clusters) = 0;
0377    /// Finalize the current cluster and create a new one for the following data.
0378    /// Returns the number of bytes written to storage (excluding metadata).
0379    virtual std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries)
0380    {
0381       RStagedCluster stagedClusters[] = {StageCluster(nNewEntries)};
0382       CommitStagedClusters(stagedClusters);
0383       return stagedClusters[0].fNBytesWritten;
0384    }
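   // Editorial sketch: the default CommitCluster() above stages and commits a single cluster; a
   // merging tool might instead stage several clusters and append them in one call. `sink` and the
   // entry counts are placeholders for illustration.
   // ~~~ {.cpp}
   // std::vector<RPageSink::RStagedCluster> staged;
   // staged.push_back(sink.StageCluster(nEntriesFirst));
   // // ... commit the pages of the next cluster ...
   // staged.push_back(sink.StageCluster(nEntriesSecond));
   // sink.CommitStagedClusters(staged);
   // ~~~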
0385    /// Write out the page locations (page list envelope) for all the committed clusters since the last call of
0386    /// CommitClusterGroup (or the beginning of writing).
0387    virtual void CommitClusterGroup() = 0;
0388 
0389    /// The registered callback is executed at the beginning of CommitDataset().
0390    void RegisterOnCommitDatasetCallback(Callback_t callback) { fOnDatasetCommitCallbacks.emplace_back(callback); }
0391    /// Run the registered callbacks and finalize the current cluster and the entire data set.
0392    void CommitDataset();
0393 
0394    /// Get a new, empty page for the given column that can be filled with up to nElements;
0395    /// nElements must be larger than zero.
0396    virtual ROOT::Internal::RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements);
0397 
0398    /// An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
0399    class RSinkGuard {
0400       std::mutex *fLock;
0401 
0402    public:
0403       explicit RSinkGuard(std::mutex *lock) : fLock(lock)
0404       {
0405          if (fLock != nullptr) {
0406             fLock->lock();
0407          }
0408       }
0409       RSinkGuard(const RSinkGuard &) = delete;
0410       RSinkGuard &operator=(const RSinkGuard &) = delete;
0411       RSinkGuard(RSinkGuard &&) = delete;
0412       RSinkGuard &operator=(RSinkGuard &&) = delete;
0413       ~RSinkGuard()
0414       {
0415          if (fLock != nullptr) {
0416             fLock->unlock();
0417          }
0418       }
0419    };
0420 
0421    virtual RSinkGuard GetSinkGuard()
0422    {
0423       // By default, there is no lock and the guard does nothing.
0424       return RSinkGuard(nullptr);
0425    }
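   // Editorial sketch: a concrete sink that serializes access through GetSinkGuard(), e.g. when
   // several producers funnel data into one sink. `RMySerializedSink` and its mutex are hypothetical.
   // ~~~ {.cpp}
   // class RMySerializedSink : public RPageSink {
   //    std::mutex fSinkMutex;
   // public:
   //    RSinkGuard GetSinkGuard() final { return RSinkGuard(&fSinkMutex); }
   //    // ... plus implementations of the remaining pure virtual methods ...
   // };
   // ~~~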
0426 }; // class RPageSink
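// Editorial sketch of the overall write sequence against the RPageSink interface, assuming a
// concrete sink and an RNTupleModel already exist (both hypothetical here):
// ~~~ {.cpp}
// sink->Init(model);                       // create storage, attach the model's columns
// // ... fill write pages and CommitPage() / CommitSealedPage() per column ...
// sink->CommitCluster(nEntriesInCluster);  // stage and logically append the current cluster
// sink->CommitClusterGroup();              // write the page list for the committed clusters
// sink->CommitDataset();                   // run callbacks, finalize the cluster group and footer
// ~~~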
0427 
0428 // clang-format off
0429 /**
0430 \class ROOT::Internal::RPagePersistentSink
0431 \ingroup NTuple
0432 \brief Base class for a sink with a physical storage backend
0433 */
0434 // clang-format on
0435 class RPagePersistentSink : public RPageSink {
0436 private:
0437    /// Used to map the IDs of the descriptor to the physical IDs issued during header/footer serialization
0438    ROOT::Internal::RNTupleSerializer::RContext fSerializationContext;
0439 
0440    /// Remembers the starting cluster id for the next cluster group
0441    std::uint64_t fNextClusterInGroup = 0;
0442    /// Used to calculate the number of entries in the current cluster
0443    ROOT::NTupleSize_t fPrevClusterNEntries = 0;
0444    /// Keeps track of the number of elements in the currently open cluster. Indexed by column id.
0445    std::vector<ROOT::RClusterDescriptor::RColumnRange> fOpenColumnRanges;
0446    /// Keeps track of the written pages in the currently open cluster. Indexed by column id.
0447    std::vector<ROOT::RClusterDescriptor::RPageRange> fOpenPageRanges;
0448 
0449    /// Union of the streamer info records that are sent from streamer fields to the sink before committing the dataset.
0450    ROOT::Internal::RNTupleSerializer::StreamerInfoMap_t fStreamerInfos;
0451 
0452 protected:
0453    /// Set of optional features supported by the persistent sink
0454    struct RFeatures {
0455       bool fCanMergePages = false;
0456    };
0457 
0458    RFeatures fFeatures;
0459    ROOT::Internal::RNTupleDescriptorBuilder fDescriptorBuilder;
0460 
0461    /// Default I/O performance counters that get registered in fMetrics
0462    struct RCounters {
0463       ROOT::Experimental::Detail::RNTupleAtomicCounter &fNPageCommitted;
0464       ROOT::Experimental::Detail::RNTupleAtomicCounter &fSzWritePayload;
0465       ROOT::Experimental::Detail::RNTupleAtomicCounter &fSzZip;
0466       ROOT::Experimental::Detail::RNTupleAtomicCounter &fTimeWallWrite;
0467       ROOT::Experimental::Detail::RNTupleAtomicCounter &fTimeWallZip;
0468       ROOT::Experimental::Detail::RNTupleTickCounter<ROOT::Experimental::Detail::RNTupleAtomicCounter> &fTimeCpuWrite;
0469       ROOT::Experimental::Detail::RNTupleTickCounter<ROOT::Experimental::Detail::RNTupleAtomicCounter> &fTimeCpuZip;
0470    };
0471    std::unique_ptr<RCounters> fCounters;
0472 
0473    virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length) = 0;
0474 
0475    virtual RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) = 0;
0476    virtual RNTupleLocator
0477    CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) = 0;
0478    /// Vector commit of preprocessed pages. The `ranges` array specifies a range of sealed pages to be
0479    /// committed for each column.  The returned vector contains, in order, the RNTupleLocator for each
0480    /// page on each range in `ranges`, i.e. the first N entries refer to the N pages in `ranges[0]`,
0481    /// followed by M entries that refer to the M pages in `ranges[1]`, etc.
0482    /// The mask allows certain pages to be skipped when writing; it contains one entry per page.
0483    /// For every `false` value in the mask, the corresponding locator is skipped (missing) in the output vector.
0484    /// The default is to call `CommitSealedPageImpl` for each page; derived classes may provide an
0485    /// optimized implementation though.
0486    virtual std::vector<RNTupleLocator>
0487    CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges, const std::vector<bool> &mask);
0488    /// Returns the number of bytes written to storage (excluding metadata)
0489    virtual std::uint64_t StageClusterImpl() = 0;
0490    /// Returns the locator of the page list envelope for the given buffer, which contains the serialized page list.
0491    /// Typically, the implementation takes care of compressing and writing the provided buffer.
0492    virtual RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) = 0;
0493    virtual void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) = 0;
0494 
0495    /// Enables the default set of metrics provided by RPageSink. `prefix` will be used as the prefix for
0496    /// the counters registered in the internal RNTupleMetrics object.
0497    /// This set of counters can be extended by a subclass by calling `fMetrics.MakeCounter<...>()`.
0498    ///
0499    /// A subclass using the default set of metrics is always responsible for updating the counters
0500    /// appropriately, e.g. `fCounters->fNPageCommitted.Inc()`
0501    void EnableDefaultMetrics(const std::string &prefix);
0502 
0503 public:
0504    RPagePersistentSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options);
0505 
0506    RPagePersistentSink(const RPagePersistentSink &) = delete;
0507    RPagePersistentSink &operator=(const RPagePersistentSink &) = delete;
0508    RPagePersistentSink(RPagePersistentSink &&) = default;
0509    RPagePersistentSink &operator=(RPagePersistentSink &&) = default;
0510    ~RPagePersistentSink() override;
0511 
0512    /// Guess the concrete derived page sink from the location
0513    static std::unique_ptr<RPageSink> Create(std::string_view ntupleName, std::string_view location,
0514                                             const ROOT::RNTupleWriteOptions &options = ROOT::RNTupleWriteOptions());
0515 
0516    ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) final;
0517 
0518    const ROOT::RNTupleDescriptor &GetDescriptor() const final { return fDescriptorBuilder.GetDescriptor(); }
0519 
0520    ROOT::NTupleSize_t GetNEntries() const final { return fPrevClusterNEntries; }
0521 
0522    /// Updates the descriptor and calls InitImpl() that handles the backend-specific details (file, DAOS, etc.)
0523    void InitImpl(RNTupleModel &model) final;
0524    void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final;
0525    void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final;
0526 
0527    /// Initialize the sink from an existing descriptor and fill it into the descriptor builder, optionally copying
0528    /// over the descriptor's clusters to this sink's descriptor.
0529    /// \return The model created from the new sink's descriptor. This model should be kept alive
0530    /// for at least as long as the sink.
0531    [[nodiscard]] std::unique_ptr<RNTupleModel>
0532    InitFromDescriptor(const ROOT::RNTupleDescriptor &descriptor, bool copyClusters);
0533 
0534    void CommitSuppressedColumn(ColumnHandle_t columnHandle) final;
0535    void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final;
0536    void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final;
0537    void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
0538    RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final;
0539    void CommitStagedClusters(std::span<RStagedCluster> clusters) final;
0540    void CommitClusterGroup() final;
0541    void CommitDatasetImpl() final;
0542 }; // class RPagePersistentSink
0543 
0544 // clang-format off
0545 /**
0546 \class ROOT::Internal::RPageSource
0547 \ingroup NTuple
0548 \brief Abstract interface to read data from an ntuple
0549 
0550 The page source is initialized with the columns of interest. Alias columns from projected fields are mapped to the
0551 corresponding physical columns. Pages from the columns of interest can then be mapped into memory.
0552 The page source also gives access to the ntuple's metadata.
0553 */
0554 // clang-format on
0555 class RPageSource : public RPageStorage {
0556 public:
0557    /// Used in SetEntryRange / GetEntryRange
0558    struct REntryRange {
0559       ROOT::NTupleSize_t fFirstEntry = ROOT::kInvalidNTupleIndex;
0560       ROOT::NTupleSize_t fNEntries = 0;
0561 
0562       /// Returns true if the given cluster has entries within the entry range
0563       bool IntersectsWith(const ROOT::RClusterDescriptor &clusterDesc) const;
0564    };
0565 
0566    /// An RAII wrapper used for the read-only access to `RPageSource::fDescriptor`. See `GetSharedDescriptorGuard()`.
0567    class RSharedDescriptorGuard {
0568       const ROOT::RNTupleDescriptor &fDescriptor;
0569       std::shared_mutex &fLock;
0570 
0571    public:
0572       RSharedDescriptorGuard(const ROOT::RNTupleDescriptor &desc, std::shared_mutex &lock)
0573          : fDescriptor(desc), fLock(lock)
0574       {
0575          fLock.lock_shared();
0576       }
0577       RSharedDescriptorGuard(const RSharedDescriptorGuard &) = delete;
0578       RSharedDescriptorGuard &operator=(const RSharedDescriptorGuard &) = delete;
0579       RSharedDescriptorGuard(RSharedDescriptorGuard &&) = delete;
0580       RSharedDescriptorGuard &operator=(RSharedDescriptorGuard &&) = delete;
0581       ~RSharedDescriptorGuard() { fLock.unlock_shared(); }
0582       const ROOT::RNTupleDescriptor *operator->() const { return &fDescriptor; }
0583       const ROOT::RNTupleDescriptor &GetRef() const { return fDescriptor; }
0584    };
0585 
0586    /// An RAII wrapper used for the writable access to `RPageSource::fDescriptor`. See `GetExclDescriptorGuard()`.
0587    class RExclDescriptorGuard {
0588       ROOT::RNTupleDescriptor &fDescriptor;
0589       std::shared_mutex &fLock;
0590 
0591    public:
0592       RExclDescriptorGuard(ROOT::RNTupleDescriptor &desc, std::shared_mutex &lock) : fDescriptor(desc), fLock(lock)
0593       {
0594          fLock.lock();
0595       }
0596       RExclDescriptorGuard(const RExclDescriptorGuard &) = delete;
0597       RExclDescriptorGuard &operator=(const RExclDescriptorGuard &) = delete;
0598       RExclDescriptorGuard(RExclDescriptorGuard &&) = delete;
0599       RExclDescriptorGuard &operator=(RExclDescriptorGuard &&) = delete;
0600       ~RExclDescriptorGuard()
0601       {
0602          fDescriptor.IncGeneration();
0603          fLock.unlock();
0604       }
0605       ROOT::RNTupleDescriptor *operator->() const { return &fDescriptor; }
0606       void MoveIn(ROOT::RNTupleDescriptor desc) { fDescriptor = std::move(desc); }
0607    };
0608 
0609 private:
0610    ROOT::RNTupleDescriptor fDescriptor;
0611    mutable std::shared_mutex fDescriptorLock;
0612    REntryRange fEntryRange;    ///< Used by the cluster pool to prevent reading beyond the given range
0613    bool fHasStructure = false; ///< Set to true once `LoadStructure()` is called
0614    bool fIsAttached = false;   ///< Set to true once `Attach()` is called
0615    bool fHasStreamerInfosRegistered = false; ///< Set to true when RegisterStreamerInfos() is called.
0616 
0617    /// Remembers the last cluster id from which a page was requested
0618    ROOT::DescriptorId_t fLastUsedCluster = ROOT::kInvalidDescriptorId;
0619    /// Clusters from where pages got preloaded in UnzipClusterImpl(), ordered by first entry number
0620    /// of the clusters. If the last used cluster changes in LoadPage(), all unused pages from
0621    /// previous clusters are evicted from the page pool.
0622    std::map<ROOT::NTupleSize_t, ROOT::DescriptorId_t> fPreloadedClusters;
0623 
0624    /// Does nothing if fLastUsedCluster == clusterId. Otherwise, updates fLastUsedCluster
0625    /// and evicts unused pages of all previous clusters from the page pool.
0626    /// Must not be called while the descriptor guard is taken.
0627    void UpdateLastUsedCluster(ROOT::DescriptorId_t clusterId);
0628 
0629 protected:
0630    /// Default I/O performance counters that get registered in `fMetrics`
0631    struct RCounters {
0632       ROOT::Experimental::Detail::RNTupleAtomicCounter &fNReadV;
0633       ROOT::Experimental::Detail::RNTupleAtomicCounter &fNRead;
0634       ROOT::Experimental::Detail::RNTupleAtomicCounter &fSzReadPayload;
0635       ROOT::Experimental::Detail::RNTupleAtomicCounter &fSzReadOverhead;
0636       ROOT::Experimental::Detail::RNTupleAtomicCounter &fSzUnzip;
0637       ROOT::Experimental::Detail::RNTupleAtomicCounter &fNClusterLoaded;
0638       ROOT::Experimental::Detail::RNTupleAtomicCounter &fNPageRead;
0639       ROOT::Experimental::Detail::RNTupleAtomicCounter &fNPageUnsealed;
0640       ROOT::Experimental::Detail::RNTupleAtomicCounter &fTimeWallRead;
0641       ROOT::Experimental::Detail::RNTupleAtomicCounter &fTimeWallUnzip;
0642       ROOT::Experimental::Detail::RNTupleTickCounter<ROOT::Experimental::Detail::RNTupleAtomicCounter> &fTimeCpuRead;
0643       ROOT::Experimental::Detail::RNTupleTickCounter<ROOT::Experimental::Detail::RNTupleAtomicCounter> &fTimeCpuUnzip;
0644       ROOT::Experimental::Detail::RNTupleCalcPerf &fBandwidthReadUncompressed;
0645       ROOT::Experimental::Detail::RNTupleCalcPerf &fBandwidthReadCompressed;
0646       ROOT::Experimental::Detail::RNTupleCalcPerf &fBandwidthUnzip;
0647       ROOT::Experimental::Detail::RNTupleCalcPerf &fFractionReadOverhead;
0648       ROOT::Experimental::Detail::RNTupleCalcPerf &fCompressionRatio;
0649    };
0650 
0651    /// Keeps track of the requested physical column IDs and their in-memory target type via a column element identifier.
0652    /// When using alias columns (projected fields), physical columns may be requested multiple times.
0653    class RActivePhysicalColumns {
0654    public:
0655       struct RColumnInfo {
0656          ROOT::Internal::RColumnElementBase::RIdentifier fElementId;
0657          std::size_t fRefCounter = 0;
0658       };
0659 
0660    private:
0661       /// Maps physical column IDs to all the requested in-memory representations.
0662       /// A pair of physical column ID and in-memory representation can be requested multiple times, which is
0663       /// indicated by the reference counter.
0664       /// We can only have a handful of possible in-memory representations for a given column,
0665       /// so it is fine to search them linearly.
0666       std::unordered_map<ROOT::DescriptorId_t, std::vector<RColumnInfo>> fColumnInfos;
0667 
0668    public:
0669       void Insert(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId);
0670       void Erase(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId);
0671       ROOT::Internal::RCluster::ColumnSet_t ToColumnSet() const;
0672       bool HasColumnInfos(ROOT::DescriptorId_t physicalColumnId) const
0673       {
0674          return fColumnInfos.count(physicalColumnId) > 0;
0675       }
0676       const std::vector<RColumnInfo> &GetColumnInfos(ROOT::DescriptorId_t physicalColumnId) const
0677       {
0678          return fColumnInfos.at(physicalColumnId);
0679       }
0680    };
0681 
0682    /// Summarizes the cluster-level information that is necessary to load a certain page.
0683    /// Used by LoadPageImpl().
0684    struct RClusterInfo {
0685       ROOT::DescriptorId_t fClusterId = 0;
0686       /// Location of the page on disk
0687       ROOT::RClusterDescriptor::RPageInfoExtended fPageInfo;
0688       /// The first element number of the page's column in the given cluster
0689       std::uint64_t fColumnOffset = 0;
0690    };
0691 
0692    std::unique_ptr<RCounters> fCounters;
0693 
0694    ROOT::RNTupleReadOptions fOptions;
0695    /// The active columns are implicitly defined by the model fields or views
0696    RActivePhysicalColumns fActivePhysicalColumns;
0697 
0698    /// Pages that are unzipped with IMT are staged into the page pool
0699    ROOT::Internal::RPagePool fPagePool;
0700 
0701    virtual void LoadStructureImpl() = 0;
0702    /// `LoadStructureImpl()` has been called before `AttachImpl()` is called
0703    virtual ROOT::RNTupleDescriptor AttachImpl(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode) = 0;
0704    /// Returns a new, unattached page source for the same data set
0705    virtual std::unique_ptr<RPageSource> CloneImpl() const = 0;
0706    // Only called if a task scheduler is set. No-op by default.
0707    virtual void UnzipClusterImpl(ROOT::Internal::RCluster *cluster);
0708    // Returns a page from storage if not found in the page pool. Should be able to handle zero page locators.
0709    virtual ROOT::Internal::RPageRef
0710    LoadPageImpl(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ROOT::NTupleSize_t idxInCluster) = 0;
0711 
0712    /// Prepare a page range read for the column set in `clusterKey`.  Specifically, pages referencing the
0713    /// `kTypePageZero` locator are filled in `pageZeroMap`; otherwise, `perPageFunc` is called for each page. This is
0714    /// commonly used as part of `LoadClusters()` in derived classes.
0715    void PrepareLoadCluster(
0716       const ROOT::Internal::RCluster::RKey &clusterKey, ROOT::Internal::ROnDiskPageMap &pageZeroMap,
0717       std::function<void(ROOT::DescriptorId_t, ROOT::NTupleSize_t, const ROOT::RClusterDescriptor::RPageInfo &)>
0718          perPageFunc);
0719 
0720    /// Enables the default set of metrics provided by RPageSource. `prefix` will be used as the prefix for
0721    /// the counters registered in the internal RNTupleMetrics object.
0722    /// A subclass using the default set of metrics is responsible for updating the counters
0723    /// appropriately, e.g. `fCounters->fNRead.Inc()`
0724    /// Alternatively, a subclass might provide its own RNTupleMetrics object by overriding the
0725    /// `GetMetrics()` member function.
0726    void EnableDefaultMetrics(const std::string &prefix);
0727 
0728    /// Note that the underlying lock is not recursive. See `GetSharedDescriptorGuard()` for further information.
0729    RExclDescriptorGuard GetExclDescriptorGuard() { return RExclDescriptorGuard(fDescriptor, fDescriptorLock); }
0730 
0731 public:
0732    RPageSource(std::string_view ntupleName, const ROOT::RNTupleReadOptions &options);
0733    RPageSource(const RPageSource &) = delete;
0734    RPageSource &operator=(const RPageSource &) = delete;
0735    RPageSource(RPageSource &&) = delete;
0736    RPageSource &operator=(RPageSource &&) = delete;
0737    ~RPageSource() override;
0738    /// Guess the concrete derived page source from the file name (location)
0739    static std::unique_ptr<RPageSource> Create(std::string_view ntupleName, std::string_view location,
0740                                               const ROOT::RNTupleReadOptions &options = ROOT::RNTupleReadOptions());
0741    /// Open the same storage multiple times, e.g. for reading in multiple threads.
0742    /// If the source is already attached, the clone will be attached, too. The clone will, however, use
0743    /// its own connection to the underlying storage (e.g., file descriptor, XRootD handle, etc.).
0744    std::unique_ptr<RPageSource> Clone() const;
0745 
0746    /// Helper for unstreaming a page. This is commonly used in derived, concrete page sources.  The implementation
0747    /// currently always makes a memory copy, even if the sealed page is uncompressed and in the final memory layout.
0748    /// The optimization of directly mapping pages is left to the concrete page source implementations.
0749    RResult<ROOT::Internal::RPage> static UnsealPage(const RSealedPage &sealedPage,
0750                                                     const ROOT::Internal::RColumnElementBase &element,
0751                                                     ROOT::Internal::RPageAllocator &pageAlloc);
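   // Editorial sketch: turning a sealed page back into an in-memory RPage. The matching column
   // element and page allocator are assumed to be available, and the RResult is assumed to be
   // unwrapped with Unwrap().
   // ~~~ {.cpp}
   // auto pageResult = RPageSource::UnsealPage(sealedPage, element, pageAllocator);
   // ROOT::Internal::RPage page = pageResult.Unwrap();
   // ~~~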
0752 
0753    EPageStorageType GetType() final { return EPageStorageType::kSource; }
0754    const ROOT::RNTupleReadOptions &GetReadOptions() const { return fOptions; }
0755 
0756    /// Takes the read lock for the descriptor. Multiple threads can take the lock concurrently.
0757    /// The underlying `std::shared_mutex`, however, is neither read nor write recursive:
0758    /// within one thread, only one lock (shared or exclusive) must be acquired at the same time. This requires special
0759    /// care in sections protected by `GetSharedDescriptorGuard()` and `GetExclDescriptorGuard()` especially to avoid
0760    /// that the locks are acquired indirectly (e.g. by a call to `GetNEntries()`). As a general guideline, no other
0761    /// method of the page source should be called (directly or indirectly) in a guarded section.
0762    const RSharedDescriptorGuard GetSharedDescriptorGuard() const
0763    {
0764       return RSharedDescriptorGuard(fDescriptor, fDescriptorLock);
0765    }
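   // Editorial sketch: querying metadata under the shared descriptor guard. No other page source
   // method should be called while the guard is held; the descriptor accessor shown is an assumption.
   // ~~~ {.cpp}
   // ROOT::NTupleSize_t nEntries = 0;
   // {
   //    auto descGuard = source.GetSharedDescriptorGuard();
   //    nEntries = descGuard->GetNEntries(); // read directly from the descriptor, not from the source
   // }
   // ~~~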
0766 
0767    ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) override;
0768    void DropColumn(ColumnHandle_t columnHandle) override;
0769 
0770    /// Loads header and footer without decompressing or deserializing them. This can be used to asynchronously open
0771    /// a file in the background. The method is idempotent and it is called as a first step in `Attach()`.
0772    /// Page sources may or may not make use of splitting the loading and processing of metadata.
0773    /// Therefore, `LoadStructure()` may do nothing and defer loading the metadata to `Attach()`.
0774    void LoadStructure();
0775    /// Open the physical storage container and deserialize header and footer
0776    void Attach(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode =
0777                   ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode::kForReading);
0778    ROOT::NTupleSize_t GetNEntries();
0779    ROOT::NTupleSize_t GetNElements(ColumnHandle_t columnHandle);
0780 
0781    /// Promise to only read from the given entry range. If set, prevents the cluster pool from reading ahead beyond
0782    /// the given range. The range needs to be within `[0, GetNEntries())`.
0783    void SetEntryRange(const REntryRange &range);
0784    REntryRange GetEntryRange() const { return fEntryRange; }
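   // Editorial sketch: restricting read-ahead to a sub-range of entries, e.g. for one task of a
   // partitioned job. The numbers are placeholders; the range must lie within [0, GetNEntries()).
   // ~~~ {.cpp}
   // RPageSource::REntryRange range;
   // range.fFirstEntry = 1000;
   // range.fNEntries = 500;
   // source.SetEntryRange(range);
   // ~~~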
0785 
0786    /// Allocates and fills a page that contains the index-th element. The default implementation searches for
0787    /// the page and calls LoadPageImpl(). Returns a default-constructed RPage for suppressed columns.
0788    virtual ROOT::Internal::RPageRef LoadPage(ColumnHandle_t columnHandle, ROOT::NTupleSize_t globalIndex);
0789    /// Another version of `LoadPage` that allows specifying cluster-relative indexes.
0790    /// Returns a default-constructed RPage for suppressed columns.
0791    virtual ROOT::Internal::RPageRef LoadPage(ColumnHandle_t columnHandle, RNTupleLocalIndex localIndex);
0792 
0793    /// Read the packed and compressed bytes of a page into the memory buffer provided by `sealedPage`. The sealed page
0794    /// can be used subsequently in a call to `RPageSink::CommitSealedPage`.
0795    /// The `fBufferSize` and `fNElements` members of the `sealedPage` parameter are always set. If `sealedPage.fBuffer` is
0796    /// `nullptr`, no data will be copied but the returned size information can be used by the caller to allocate a large
0797    /// enough buffer and call `LoadSealedPage` again.
0798    virtual void
0799    LoadSealedPage(ROOT::DescriptorId_t physicalColumnId, RNTupleLocalIndex localIndex, RSealedPage &sealedPage) = 0;
0800 
0801    /// Populates all the pages of the given cluster ids and columns; it is possible that some columns do not
0802    /// contain any pages.  The page source may load more columns than the minimal necessary set from `columns`.
0803    /// To indicate which columns have been loaded, `LoadClusters()` must mark them with `SetColumnAvailable()`.
0804    /// That includes the ones from `columns` that don't have pages; otherwise subsequent requests
0805    /// for the cluster would assume an incomplete cluster and trigger loading again.
0806    /// `LoadClusters()` is typically called from the I/O thread of a cluster pool, i.e. the method runs
0807    /// concurrently to other methods of the page source.
0808    virtual std::vector<std::unique_ptr<ROOT::Internal::RCluster>>
0809    LoadClusters(std::span<ROOT::Internal::RCluster::RKey> clusterKeys) = 0;
0810 
0811    /// Parallel decompression and unpacking of the pages in the given cluster. The unzipped pages are supposed
0812    /// to be preloaded in a page pool attached to the source. The method is triggered by the cluster pool's
0813    /// unzip thread. It is an optional optimization; the method can safely do nothing. In particular, the
0814    /// actual implementation will only run if a task scheduler is set. In practice, a task scheduler is set
0815    /// if implicit multi-threading is turned on.
0816    void UnzipCluster(ROOT::Internal::RCluster *cluster);
0817 
0818    // TODO(gparolini): for symmetry with SealPage(), we should either make this private or SealPage() public.
0819    RResult<ROOT::Internal::RPage>
0820    UnsealPage(const RSealedPage &sealedPage, const ROOT::Internal::RColumnElementBase &element);
0821 
0822    /// Builds the streamer info records from the descriptor's extra type info section. This is necessary when
0823    /// connecting streamer fields so that emulated classes can be read.
0824    void RegisterStreamerInfos();
0825 }; // class RPageSource
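// Editorial sketch of the read path: create a page source for a given file, attach it, register a
// column of interest, and load the page containing a given global index. The ntuple name, file
// name, `fieldId`, and `column` are placeholders.
// ~~~ {.cpp}
// auto source = ROOT::Internal::RPageSource::Create("ntpl", "data.root");
// source->Attach();
// auto columnHandle = source->AddColumn(fieldId, column); // column: ROOT::Internal::RColumn &
// auto pageRef = source->LoadPage(columnHandle, /*globalIndex=*/0);
// ~~~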
0826 
0827 } // namespace Internal
0828 } // namespace ROOT
0829 
0830 #endif