Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:10:47

0001 /// \file ROOT/RPageStorage.hxx
0002 /// \ingroup NTuple ROOT7
0003 /// \author Jakob Blomer <jblomer@cern.ch>
0004 /// \date 2018-07-19
0005 /// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
0006 /// is welcome!
0007 
0008 /*************************************************************************
0009  * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers.               *
0010  * All rights reserved.                                                  *
0011  *                                                                       *
0012  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0013  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0014  *************************************************************************/
0015 
0016 #ifndef ROOT7_RPageStorage
0017 #define ROOT7_RPageStorage
0018 
0019 #include <ROOT/RCluster.hxx>
0020 #include <ROOT/RNTupleDescriptor.hxx>
0021 #include <ROOT/RNTupleMetrics.hxx>
0022 #include <ROOT/RNTupleReadOptions.hxx>
0023 #include <ROOT/RNTupleWriteOptions.hxx>
0024 #include <ROOT/RNTupleUtil.hxx>
0025 #include <ROOT/RPage.hxx>
0026 #include <ROOT/RPageAllocator.hxx>
0027 #include <ROOT/RSpan.hxx>
0028 #include <string_view>
0029 
0030 #include <atomic>
0031 #include <cstddef>
0032 #include <deque>
0033 #include <functional>
0034 #include <memory>
0035 #include <mutex>
0036 #include <shared_mutex>
0037 #include <unordered_set>
0038 #include <vector>
0039 
0040 namespace ROOT {
0041 namespace Experimental {
0042 
0043 class RFieldBase;
0044 class RNTupleModel;
0045 
0046 namespace Internal {
0047 class RColumn;
0048 class RColumnElementBase;
0049 class RNTupleCompressor;
0050 class RNTupleDecompressor;
0051 struct RNTupleModelChangeset;
0052 
0053 enum class EPageStorageType { // Kind of a page storage, as reported by RPageStorage::GetType()
0054    kSink,   ///< Returned by RPageSink::GetType()
0055    kSource, ///< Returned by RPageSource::GetType()
0056 };
0057 
0058 // clang-format off
0059 /**
0060 \class ROOT::Experimental::Internal::RPageStorage
0061 \ingroup NTuple
0062 \brief Common functionality of an ntuple storage for both reading and writing
0063 
0064 The RPageStorage provides access to a storage container that keeps the bits of pages and clusters comprising
0065 an ntuple.  Concrete implementations can use a TFile, a raw file, an object store, and so on.
0066 */
0067 // clang-format on
0068 class RPageStorage {
0069 public:
0070    /// The interface of a task scheduler to schedule page (de)compression tasks
0071    class RTaskScheduler {
0072    public:
0073       virtual ~RTaskScheduler() = default;
0074       /// Take a callable that represents a task
0075       virtual void AddTask(const std::function<void(void)> &taskFunc) = 0;
0076       /// Blocks until all scheduled tasks finished
0077       virtual void Wait() = 0;
0078    };
0079 
0080    /// A sealed page contains the bytes of a page as written to storage (packed & compressed).  It is used
0081    /// as an input to UnsealPages() as well as to transfer pages between different storage media.
0082    /// RSealedPage does _not_ own the buffer it is pointing to in order to not interfere with the memory management
0083    /// of concrete page sink and page source implementations.  The type is move-only: copying is deleted.
0084    struct RSealedPage {
0085       const void *fBuffer = nullptr; ///< Points to the sealed bytes; not owned by the sealed page
0086       std::uint32_t fSize = 0;
0087       std::uint32_t fNElements = 0;
0088 
0089       RSealedPage() = default;
0090       RSealedPage(const void *b, std::uint32_t s, std::uint32_t n) : fBuffer(b), fSize(s), fNElements(n) {}
0091       RSealedPage(const RSealedPage &other) = delete;
0092       RSealedPage& operator =(const RSealedPage &other) = delete;
0093       RSealedPage(RSealedPage &&other) = default;
0094       RSealedPage& operator =(RSealedPage &&other) = default;
0095    };
0096 
0097    using SealedPageSequence_t = std::deque<RSealedPage>;
0098    /// A range of sealed pages referring to the same column that can be used for vector commit.
0099    struct RSealedPageGroup {
0100       DescriptorId_t fPhysicalColumnId;
0101       SealedPageSequence_t::const_iterator fFirst;
0102       SealedPageSequence_t::const_iterator fLast;
0103 
0104       RSealedPageGroup(DescriptorId_t d, SealedPageSequence_t::const_iterator b, SealedPageSequence_t::const_iterator e)
0105          : fPhysicalColumnId(d), fFirst(b), fLast(e)
0106       {
0107       }
0108    };
0109 
0110 protected:
0111    Detail::RNTupleMetrics fMetrics;
0112 
0113    std::string fNTupleName;
0114    RTaskScheduler *fTaskScheduler = nullptr; ///< Optional; nullptr means that no task scheduler is set
0115    void WaitForAllTasks() // Calls Wait() on the task scheduler if one is set; otherwise returns immediately
0116    {
0117       if (!fTaskScheduler)
0118          return;
0119       fTaskScheduler->Wait();
0120    }
0121 
0122 public:
0123    explicit RPageStorage(std::string_view name);
0124    RPageStorage(const RPageStorage &other) = delete;
0125    RPageStorage& operator =(const RPageStorage &other) = delete;
0126    RPageStorage(RPageStorage &&other) = default;
0127    RPageStorage& operator =(RPageStorage &&other) = default;
0128    virtual ~RPageStorage();
0129 
0130    /// Whether the concrete implementation is a sink or a source
0131    virtual EPageStorageType GetType() = 0;
0132 
0133    struct RColumnHandle {
0134       DescriptorId_t fPhysicalId = kInvalidDescriptorId;
0135       const RColumn *fColumn = nullptr;
0136 
0137       /// Returns true for a valid column handle; fColumn and fPhysicalId should always either both
0138       /// be valid or both be invalid.
0139       explicit operator bool() const { return fPhysicalId != kInvalidDescriptorId && fColumn; }
0140    };
0141    /// The column handle identifies a column with the current open page storage
0142    using ColumnHandle_t = RColumnHandle;
0143 
0144    /// Register a new column.  When reading, the column must exist in the ntuple on disk corresponding to the meta-data.
0145    /// When writing, every column can only be attached once.
0146    virtual ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) = 0;
0147    /// Unregisters a column.  A page source decreases the reference counter for the corresponding active column.
0148    /// For a page sink, dropping columns is currently a no-op.
0149    virtual void DropColumn(ColumnHandle_t columnHandle) = 0;
0150 
0151    /// Every page store needs to be able to free pages it handed out.  But sinks and sources have different means
0152    /// of allocating pages.
0153    virtual void ReleasePage(RPage &page) = 0;
0154 
0155    /// Returns the default metrics object.  Subclasses might alternatively provide their own metrics object by
0156    /// overriding this.
0157    virtual Detail::RNTupleMetrics &GetMetrics() { return fMetrics; }
0158 
0159    /// Returns the NTuple name.
0160    const std::string &GetNTupleName() const { return fNTupleName; }
0161 
0162    void SetTaskScheduler(RTaskScheduler *taskScheduler) { fTaskScheduler = taskScheduler; } // nullptr unsets it
0163 }; // class RPageStorage
0164 
0165 // clang-format off
0166 /**
0167 \class ROOT::Experimental::Internal::RPageSink
0168 \ingroup NTuple
0169 \brief Abstract interface to write data into an ntuple
0170 
0171 The page sink takes the list of columns and afterwards a series of page commits and cluster commits.
0172 The user is responsible to commit clusters at a consistent point, i.e. when all pages corresponding to data
0173 up to the given entry number are committed.
0174 
0175 An object of this class may either be a wrapper (for example a RPageSinkBuf) or a "persistent" sink,
0176 inheriting from RPagePersistentSink.
0177 */
0178 // clang-format on
0179 class RPageSink : public RPageStorage {
0180 protected:
0181    std::unique_ptr<RNTupleWriteOptions> fOptions;
0182 
0183    /// Helper to zip pages and header/footer; includes a 16MB (kMAXZIPBUF) zip buffer.
0184    /// There could be concrete page sinks that don't need a compressor.  Therefore, and in order to stay consistent
0185    /// with the page source, we leave it up to the derived class whether or not the compressor gets constructed.
0186    std::unique_ptr<RNTupleCompressor> fCompressor;
0187 
0188    /// Helper for streaming a page. This is commonly used in derived, concrete page sinks. Note that if
0189    /// compressionSetting is 0 (uncompressed) and the page is mappable, the returned sealed page will
0190    /// point directly to the input page buffer.  Otherwise, the sealed page references an internal buffer
0191    /// of fCompressor.  Thus, the buffer pointed to by the RSealedPage should never be freed.
0192    /// Usage of this method requires construction of fCompressor.
0193    RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting);
0194 
0195    /// Seal a page using the provided buffer.
0196    static RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting, void *buf,
0197                                bool allowAlias = true);
0198 
0199 private:
0200    /// Flag if sink was initialized; set by Init()
0201    bool fIsInitialized = false;
0202 
0203 public:
0204    RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options);
0205 
0206    RPageSink(const RPageSink&) = delete;
0207    RPageSink& operator=(const RPageSink&) = delete;
0208    RPageSink(RPageSink&&) = default;
0209    RPageSink& operator=(RPageSink&&) = default;
0210    ~RPageSink() override;
0211 
0212    EPageStorageType GetType() final { return EPageStorageType::kSink; }
0213    /// Returns the sink's write options.
0214    const RNTupleWriteOptions &GetWriteOptions() const { return *fOptions; }
0215 
0216    void DropColumn(ColumnHandle_t /*columnHandle*/) final {}
0217 
0218    bool IsInitialized() const { return fIsInitialized; }
0219 
0220    /// Return the RNTupleDescriptor being constructed.
0221    virtual const RNTupleDescriptor &GetDescriptor() const = 0;
0222 
0223    /// Physically creates the storage container to hold the ntuple (e.g., a key in a TFile or an S3 bucket).
0224    /// Init() associates column handles to the columns referenced by the model.  Throws an RException if called twice.
0225    void Init(RNTupleModel &model)
0226    {
0227       if (fIsInitialized) {
0228          throw RException(R__FAIL("already initialized"));
0229       }
0230       fIsInitialized = true; // set before InitImpl() runs, so the flag stays set even if InitImpl() throws
0231       InitImpl(model);
0232    }
0233 
0234 protected:
0235    virtual void InitImpl(RNTupleModel &model) = 0;
0236 
0237 public:
0238    /// Incorporate incremental changes to the model into the ntuple descriptor. This happens, e.g. if new fields were
0239    /// added after the initial call to `RPageSink::Init(RNTupleModel &)`.
0240    /// `firstEntry` specifies the global index for the first stored element in the added columns.
0241    virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) = 0;
0242 
0243    /// Write a page to the storage. The column must have been added before.
0244    virtual void CommitPage(ColumnHandle_t columnHandle, const RPage &page) = 0;
0245    /// Write a preprocessed page to storage. The column must have been added before.
0246    virtual void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) = 0;
0247    /// Write a vector of preprocessed pages to storage. The corresponding columns must have been added before.
0248    virtual void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) = 0;
0249    /// Finalize the current cluster and create a new one for the following data.
0250    /// Returns the number of bytes written to storage (excluding meta-data).
0251    virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries) = 0;
0252    /// Write out the page locations (page list envelope) for all the committed clusters since the last call of
0253    /// CommitClusterGroup (or the beginning of writing).
0254    virtual void CommitClusterGroup() = 0;
0255    /// Finalize the current cluster and the entire data set.
0256    virtual void CommitDataset() = 0;
0257 
0258    /// Get a new, empty page for the given column that can be filled with up to nElements.  If nElements is zero,
0259    /// the page sink picks an appropriate size.
0260    virtual RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) = 0;
0261 
0262    /// An RAII wrapper used to synchronize a page sink. See GetSinkGuard().
0263    class RSinkGuard {
0264       std::mutex *fLock; ///< May be nullptr, in which case neither constructor nor destructor touch a mutex
0265 
0266    public:
0267       explicit RSinkGuard(std::mutex *lock) : fLock(lock)
0268       {
0269          if (fLock != nullptr) {
0270             fLock->lock();
0271          }
0272       }
0273       RSinkGuard(const RSinkGuard &) = delete;
0274       RSinkGuard &operator=(const RSinkGuard &) = delete;
0275       RSinkGuard(RSinkGuard &&) = delete;
0276       RSinkGuard &operator=(RSinkGuard &&) = delete;
0277       ~RSinkGuard()
0278       {
0279          if (fLock != nullptr) {
0280             fLock->unlock();
0281          }
0282       }
0283    };
0284 
0285    virtual RSinkGuard GetSinkGuard()
0286    {
0287       // By default, there is no lock and the guard does nothing.
0288       return RSinkGuard(nullptr);
0289    }
0290 }; // class RPageSink
0291 
0292 // clang-format off
0293 /**
0294 \class ROOT::Experimental::Internal::RPagePersistentSink
0295 \ingroup NTuple
0296 \brief Base class for a sink with a physical storage backend
0297 */
0298 // clang-format on
0299 class RPagePersistentSink : public RPageSink {
0300 private:
0301    /// Used to map the IDs of the descriptor to the physical IDs issued during header/footer serialization
0302    RNTupleSerializer::RContext fSerializationContext;
0303 
0304    /// Remembers the starting cluster id for the next cluster group
0305    std::uint64_t fNextClusterInGroup = 0;
0306    /// Used to calculate the number of entries in the current cluster
0307    NTupleSize_t fPrevClusterNEntries = 0;
0308    /// Keeps track of the number of elements in the currently open cluster. Indexed by column id.
0309    std::vector<RClusterDescriptor::RColumnRange> fOpenColumnRanges;
0310    /// Keeps track of the written pages in the currently open cluster. Indexed by column id.
0311    std::vector<RClusterDescriptor::RPageRange> fOpenPageRanges;
0312 
0313 protected:
0314    Internal::RNTupleDescriptorBuilder fDescriptorBuilder;
0315 
0316    /// Default I/O performance counters that get registered in fMetrics
0317    struct RCounters {
0318       Detail::RNTupleAtomicCounter &fNPageCommitted;
0319       Detail::RNTupleAtomicCounter &fSzWritePayload;
0320       Detail::RNTupleAtomicCounter &fSzZip;
0321       Detail::RNTupleAtomicCounter &fTimeWallWrite;
0322       Detail::RNTupleAtomicCounter &fTimeWallZip;
0323       Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> &fTimeCpuWrite;
0324       Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> &fTimeCpuZip;
0325    };
0326    std::unique_ptr<RCounters> fCounters;
0327 
0328    virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length) = 0;
0329 
0330    virtual RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) = 0;
0331    virtual RNTupleLocator
0332    CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) = 0;
0333    /// Vector commit of preprocessed pages. The `ranges` array specifies a range of sealed pages to be
0334    /// committed for each column.  The returned vector contains, in order, the RNTupleLocator for each
0335    /// page on each range in `ranges`, i.e. the first N entries refer to the N pages in `ranges[0]`,
0336    /// followed by M entries that refer to the M pages in `ranges[1]`, etc.
0337    /// The default is to call `CommitSealedPageImpl` for each page; derived classes may provide an
0338    /// optimized implementation though.
0339    virtual std::vector<RNTupleLocator> CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges);
0340    /// Returns the number of bytes written to storage (excluding metadata)
0341    virtual std::uint64_t CommitClusterImpl() = 0;
0342    /// Returns the locator of the page list envelope of the given buffer that contains the serialized page list.
0343    /// Typically, the implementation takes care of compressing and writing the provided buffer.
0344    virtual RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) = 0;
0345    virtual void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) = 0;
0346 
0347    /// Enables the default set of metrics provided by RPageSink. `prefix` will be used as the prefix for
0348    /// the counters registered in the internal RNTupleMetrics object.
0349    /// This set of counters can be extended by a subclass by calling `fMetrics.MakeCounter<...>()`.
0350    ///
0351    /// A subclass using the default set of metrics is always responsible for updating the counters
0352    /// appropriately, e.g. `fCounters->fNPageCommitted.Inc()`
0353    void EnableDefaultMetrics(const std::string &prefix);
0354 
0355 public:
0356    RPagePersistentSink(std::string_view ntupleName, const RNTupleWriteOptions &options);
0357 
0358    RPagePersistentSink(const RPagePersistentSink &) = delete;
0359    RPagePersistentSink &operator=(const RPagePersistentSink &) = delete;
0360    RPagePersistentSink(RPagePersistentSink &&) = default;
0361    RPagePersistentSink &operator=(RPagePersistentSink &&) = default;
0362    ~RPagePersistentSink() override;
0363 
0364    /// Guess the concrete derived page sink from the location
0365    static std::unique_ptr<RPageSink> Create(std::string_view ntupleName, std::string_view location,
0366                                             const RNTupleWriteOptions &options = RNTupleWriteOptions());
0367 
0368    ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final;
0369 
0370    const RNTupleDescriptor &GetDescriptor() const final { return fDescriptorBuilder.GetDescriptor(); }
0371 
0372    /// Updates the descriptor and calls InitImpl() that handles the backend-specific details (file, DAOS, etc.)
0373    void InitImpl(RNTupleModel &model) final;
0374    void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final;
0375 
0376    /// Initialize sink based on an existing descriptor and fill into the descriptor builder.
0377    void InitFromDescriptor(const RNTupleDescriptor &descriptor);
0378 
0379    void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final;
0380    void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final;
0381    void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
0382    std::uint64_t CommitCluster(NTupleSize_t nEntries) final;
0383    void CommitClusterGroup() final;
0384    void CommitDataset() final;
0385 }; // class RPagePersistentSink
0386 
0387 // clang-format off
0388 /**
0389 \class ROOT::Experimental::Internal::RPageSource
0390 \ingroup NTuple
0391 \brief Abstract interface to read data from an ntuple
0392 
0393 The page source is initialized with the columns of interest. Alias columns from projected fields are mapped to the
0394 corresponding physical columns. Pages from the columns of interest can then be mapped into memory.
0395 The page source also gives access to the ntuple's meta-data.
0396 */
0397 // clang-format on
0398 class RPageSource : public RPageStorage {
0399 public:
0400    /// Used in SetEntryRange / GetEntryRange
0401    struct REntryRange {
0402       NTupleSize_t fFirstEntry = kInvalidNTupleIndex;
0403       NTupleSize_t fNEntries = 0;
0404 
0405       /// Returns true if the given cluster has entries within the entry range
0406       bool IntersectsWith(const RClusterDescriptor &clusterDesc) const;
0407    };
0408 
0409    /// An RAII wrapper used for the read-only access to RPageSource::fDescriptor. See GetSharedDescriptorGuard().
0410    class RSharedDescriptorGuard {
0411       const RNTupleDescriptor &fDescriptor;
0412       std::shared_mutex &fLock;
0413 
0414    public:
0415       RSharedDescriptorGuard(const RNTupleDescriptor &desc, std::shared_mutex &lock) : fDescriptor(desc), fLock(lock)
0416       {
0417          fLock.lock_shared();
0418       }
0419       RSharedDescriptorGuard(const RSharedDescriptorGuard &) = delete;
0420       RSharedDescriptorGuard &operator=(const RSharedDescriptorGuard &) = delete;
0421       RSharedDescriptorGuard(RSharedDescriptorGuard &&) = delete;
0422       RSharedDescriptorGuard &operator=(RSharedDescriptorGuard &&) = delete;
0423       ~RSharedDescriptorGuard() { fLock.unlock_shared(); }
0424       const RNTupleDescriptor *operator->() const { return &fDescriptor; }
0425       const RNTupleDescriptor &GetRef() const { return fDescriptor; }
0426    };
0427 
0428    /// An RAII wrapper used for the writable access to RPageSource::fDescriptor. See GetExclDescriptorGuard().
0429    class RExclDescriptorGuard {
0430       RNTupleDescriptor &fDescriptor;
0431       std::shared_mutex &fLock;
0432 
0433    public:
0434       RExclDescriptorGuard(RNTupleDescriptor &desc, std::shared_mutex &lock) : fDescriptor(desc), fLock(lock)
0435       {
0436          fLock.lock();
0437       }
0438       RExclDescriptorGuard(const RExclDescriptorGuard &) = delete;
0439       RExclDescriptorGuard &operator=(const RExclDescriptorGuard &) = delete;
0440       RExclDescriptorGuard(RExclDescriptorGuard &&) = delete;
0441       RExclDescriptorGuard &operator=(RExclDescriptorGuard &&) = delete;
0442       ~RExclDescriptorGuard()
0443       {
0444          fDescriptor.IncGeneration(); // called unconditionally, i.e. also if the descriptor was not modified
0445          fLock.unlock();
0446       }
0447       RNTupleDescriptor *operator->() const { return &fDescriptor; }
0448       void MoveIn(RNTupleDescriptor &&desc) { fDescriptor = std::move(desc); }
0449    };
0450 
0451 private:
0452    RNTupleDescriptor fDescriptor;
0453    mutable std::shared_mutex fDescriptorLock;
0454    REntryRange fEntryRange; ///< Used by the cluster pool to prevent reading beyond the given range
0455 
0456 protected:
0457    /// Default I/O performance counters that get registered in fMetrics
0458    struct RCounters {
0459       Detail::RNTupleAtomicCounter &fNReadV;
0460       Detail::RNTupleAtomicCounter &fNRead;
0461       Detail::RNTupleAtomicCounter &fSzReadPayload;
0462       Detail::RNTupleAtomicCounter &fSzReadOverhead;
0463       Detail::RNTupleAtomicCounter &fSzUnzip;
0464       Detail::RNTupleAtomicCounter &fNClusterLoaded;
0465       Detail::RNTupleAtomicCounter &fNPageLoaded;
0466       Detail::RNTupleAtomicCounter &fNPagePopulated;
0467       Detail::RNTupleAtomicCounter &fTimeWallRead;
0468       Detail::RNTupleAtomicCounter &fTimeWallUnzip;
0469       Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> &fTimeCpuRead;
0470       Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> &fTimeCpuUnzip;
0471       Detail::RNTupleCalcPerf &fBandwidthReadUncompressed;
0472       Detail::RNTupleCalcPerf &fBandwidthReadCompressed;
0473       Detail::RNTupleCalcPerf &fBandwidthUnzip;
0474       Detail::RNTupleCalcPerf &fFractionReadOverhead;
0475       Detail::RNTupleCalcPerf &fCompressionRatio;
0476    };
0477 
0478    /// Keeps track of the requested physical column IDs. When using alias columns (projected fields), physical
0479    /// columns may be requested multiple times.
0480    class RActivePhysicalColumns {
0481    private:
0482       std::vector<DescriptorId_t> fIDs;
0483       std::vector<std::size_t> fRefCounters;
0484 
0485    public:
0486       void Insert(DescriptorId_t physicalColumnID);
0487       void Erase(DescriptorId_t physicalColumnID);
0488       RCluster::ColumnSet_t ToColumnSet() const;
0489    };
0490 
0491    std::unique_ptr<RCounters> fCounters;
0492 
0493    RNTupleReadOptions fOptions;
0494    /// The active columns are implicitly defined by the model fields or views
0495    RActivePhysicalColumns fActivePhysicalColumns;
0496 
0497    /// Helper to unzip pages and header/footer; comprises a 16MB (kMAXZIPBUF) unzip buffer.
0498    /// Not all page sources need a decompressor (e.g. virtual ones for chains and friends don't), thus we
0499    /// leave it up to the derived class whether or not the decompressor gets constructed.
0500    std::unique_ptr<RNTupleDecompressor> fDecompressor;
0501 
0502    virtual RNTupleDescriptor AttachImpl() = 0;
0503    // Only called if a task scheduler is set. No-op by default.
0504    virtual void UnzipClusterImpl(RCluster * /* cluster */)
0505       { }
0506 
0507    /// Prepare a page range read for the column set in `clusterKey`.  Specifically, pages referencing the
0508    /// `kTypePageZero` locator are filled in `pageZeroMap`; otherwise, `perPageFunc` is called for each page. This is
0509    /// commonly used as part of `LoadClusters()` in derived classes.
0510    void PrepareLoadCluster(
0511       const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap,
0512       std::function<void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc);
0513 
0514    /// Enables the default set of metrics provided by RPageSource. `prefix` will be used as the prefix for
0515    /// the counters registered in the internal RNTupleMetrics object.
0516    /// A subclass using the default set of metrics is responsible for updating the counters
0517    /// appropriately, e.g. `fCounters->fNRead.Inc()`
0518    /// Alternatively, a subclass might provide its own RNTupleMetrics object by overriding the
0519    /// GetMetrics() member function.
0520    void EnableDefaultMetrics(const std::string &prefix);
0521 
0522    /// Note that the underlying lock is not recursive. See GetSharedDescriptorGuard() for further information.
0523    RExclDescriptorGuard GetExclDescriptorGuard() { return RExclDescriptorGuard(fDescriptor, fDescriptorLock); }
0524 
0525 public:
0526    RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions); // NOTE(review): parameter is named like the member fOptions; consider renaming it to `options`
0527    RPageSource(const RPageSource&) = delete;
0528    RPageSource& operator=(const RPageSource&) = delete;
0529    RPageSource(RPageSource &&) = delete;
0530    RPageSource &operator=(RPageSource &&) = delete;
0531    ~RPageSource() override;
0532    /// Guess the concrete derived page source from the file name (location)
0533    static std::unique_ptr<RPageSource> Create(std::string_view ntupleName, std::string_view location,
0534                                               const RNTupleReadOptions &options = RNTupleReadOptions());
0535    /// Open the same storage multiple times, e.g. for reading in multiple threads
0536    virtual std::unique_ptr<RPageSource> Clone() const = 0;
0537 
0538    EPageStorageType GetType() final { return EPageStorageType::kSource; }
0539    const RNTupleReadOptions &GetReadOptions() const { return fOptions; }
0540 
0541    /// Takes the read lock for the descriptor. Multiple threads can take the lock concurrently.
0542    /// The underlying std::shared_mutex, however, is neither read nor write recursive:
0543    /// within one thread, only one lock (shared or exclusive) must be acquired at the same time. This requires special
0544    /// care in sections protected by GetSharedDescriptorGuard() and GetExclDescriptorGuard() especially to avoid that
0545    /// the locks are acquired indirectly (e.g. by a call to GetNEntries()).
0546    /// As a general guideline, no other method of the page source should be called (directly or indirectly) in a
0547    /// guarded section.
0548    const RSharedDescriptorGuard GetSharedDescriptorGuard() const
0549    {
0550       return RSharedDescriptorGuard(fDescriptor, fDescriptorLock);
0551    }
0552 
0553    ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) override;
0554    void DropColumn(ColumnHandle_t columnHandle) override;
0555 
0556    /// Open the physical storage container for the tree.  Moves the result of AttachImpl() into fDescriptor
0557    void Attach() { GetExclDescriptorGuard().MoveIn(AttachImpl()); }
0558    NTupleSize_t GetNEntries();
0559    NTupleSize_t GetNElements(ColumnHandle_t columnHandle);
0560    ColumnId_t GetColumnId(ColumnHandle_t columnHandle);
0561 
0562    /// Promise to only read from the given entry range. If set, prevents the cluster pool from reading-ahead beyond
0563    /// the given range. The range needs to be within [0, GetNEntries()).
0564    void SetEntryRange(const REntryRange &range);
0565    REntryRange GetEntryRange() const { return fEntryRange; }
0566 
0567    /// Allocates and fills a page that contains the index-th element
0568    virtual RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) = 0;
0569    /// Another version of PopulatePage that allows to specify cluster-relative indexes
0570    virtual RPage PopulatePage(ColumnHandle_t columnHandle, RClusterIndex clusterIndex) = 0;
0571 
0572    /// Read the packed and compressed bytes of a page into the memory buffer provided by sealedPage. The sealed page
0573    /// can be used subsequently in a call to RPageSink::CommitSealedPage.
0574    /// The fSize and fNElements member of the sealedPage parameters are always set. If sealedPage.fBuffer is nullptr,
0575    /// no data will be copied but the returned size information can be used by the caller to allocate a large enough
0576    /// buffer and call LoadSealedPage again.
0577    virtual void
0578    LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) = 0;
0579 
0580    /// Helper for unstreaming a page. This is commonly used in derived, concrete page sources.  The implementation
0581    /// currently always makes a memory copy, even if the sealed page is uncompressed and in the final memory layout.
0582    /// The optimization of directly mapping pages is left to the concrete page source implementations.
0583    /// Usage of this method requires construction of fDecompressor. Memory is allocated via
0584    /// `RPageAllocatorHeap`; use `RPageAllocatorHeap::DeletePage()` to deallocate returned pages.
0585    RPage UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId);
0586 
0587    /// Populates all the pages of the given cluster ids and columns; it is possible that some columns do not
0588    /// contain any pages.  The page source may load more columns than the minimal necessary set from `columns`.
0589    /// To indicate which columns have been loaded, LoadClusters() must mark them with SetColumnAvailable().
0590    /// That includes the ones from the `columns` that don't have pages; otherwise subsequent requests
0591    /// for the cluster would assume an incomplete cluster and trigger loading again.
0592    /// LoadClusters() is typically called from the I/O thread of a cluster pool, i.e. the method runs
0593    /// concurrently to other methods of the page source.
0594    virtual std::vector<std::unique_ptr<RCluster>> LoadClusters(std::span<RCluster::RKey> clusterKeys) = 0;
0595 
0596    /// Parallel decompression and unpacking of the pages in the given cluster. The unzipped pages are supposed
0597    /// to be preloaded in a page pool attached to the source. The method is triggered by the cluster pool's
0598    /// unzip thread. It is an optional optimization, the method can safely do nothing. In particular, the
0599    /// actual implementation will only run if a task scheduler is set. In practice, a task scheduler is set
0600    /// if implicit multi-threading is turned on.
0601    void UnzipCluster(RCluster *cluster);
0602 }; // class RPageSource
0603 
0604 } // namespace Internal
0605 
0606 } // namespace Experimental
0607 } // namespace ROOT
0608 
0609 #endif