Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-09-16 09:08:35

0001 /// \file ROOT/RPageSinkBuf.hxx
0002 /// \ingroup NTuple
0003 /// \author Jakob Blomer <jblomer@cern.ch>
0004 /// \author Max Orok <maxwellorok@gmail.com>
0005 /// \author Javier Lopez-Gomez <javier.lopez.gomez@cern.ch>
0006 /// \date 2021-03-17
0007 
0008 /*************************************************************************
0009  * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers.               *
0010  * All rights reserved.                                                  *
0011  *                                                                       *
0012  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0013  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0014  *************************************************************************/
0015 
0016 #ifndef ROOT_RPageSinkBuf
0017 #define ROOT_RPageSinkBuf
0018 
0019 #include <ROOT/RNTupleMetrics.hxx>
0020 #include <ROOT/RPageStorage.hxx>
0021 
0022 #include <deque>
0023 #include <functional>
0024 #include <iterator>
0025 #include <memory>
0026 #include <tuple>
0027 
0028 namespace ROOT {
0029 namespace Internal {
0030 
0031 // clang-format off
0032 /**
0033 \class ROOT::Internal::RPageSinkBuf
0034 \ingroup NTuple
0035 \brief Wrapper sink that coalesces cluster column page writes
0036 */
0037 // clang-format on
0038 class RPageSinkBuf : public RPageSink {
0039 private:
0040    /// A buffered column. The column is not responsible for RPage memory management (i.e. ReservePage),
0041    /// which is handled by the enclosing RPageSinkBuf.
        /// The type is move-only: copy operations are deleted, move operations are defaulted.
0042    class RColumnBuf {
0043    public:
           /// One buffered page together with its compression scratch buffer and a pointer to its
           /// sealed counterpart.
0044       struct RPageZipItem {
              /// The buffered page itself.
0045          RPage fPage;
0046          // Compression scratch buffer for fSealedPage.
0047          std::unique_ptr<unsigned char[]> fBuf;
              /// Null by default; a non-null value marks the item as sealed (see IsSealed()).
              /// NOTE(review): presumably points to an element of the owning column's fSealedPages -- confirm
              /// in the implementation file.
0048          RPageStorage::RSealedPage *fSealedPage = nullptr;
              /// Returns true if fSealedPage is non-null.
0049          bool IsSealed() const { return fSealedPage != nullptr; }
0050       };
0051    public:
0052       RColumnBuf() = default;
0053       RColumnBuf(const RColumnBuf&) = delete;
0054       RColumnBuf& operator=(const RColumnBuf&) = delete;
0055       RColumnBuf(RColumnBuf&&) = default;
0056       RColumnBuf& operator=(RColumnBuf&&) = default;
           /// Calls DropBufferedPages() on destruction.
0057       ~RColumnBuf() { DropBufferedPages(); }
0058 
0059       /// Returns a reference to the newly buffered page. The reference remains
0060       /// valid until DropBufferedPages().
           /// The new item is default-constructed at the end of fBufferedPages. `columnHandle` is stored in
           /// fCol only if fCol currently evaluates to false; otherwise the argument is ignored.
0061       RPageZipItem &BufferPage(RPageStorage::ColumnHandle_t columnHandle)
0062       {
0063          if (!fCol) {
0064             fCol = columnHandle;
0065          }
0066          // Safety: Insertion at the end of a deque never invalidates references
0067          // to existing elements.
0068          return fBufferedPages.emplace_back();
0069       }
           /// Returns the column handle recorded by BufferPage() (default-constructed if none was recorded).
0070       const RPageStorage::ColumnHandle_t &GetHandle() const { return fCol; }
           /// Returns true if no page is currently buffered.
0071       bool IsEmpty() const { return fBufferedPages.empty(); }
           /// Returns true if there are exactly as many sealed pages as buffered pages
           /// (this is also the case when both containers are empty).
0072       bool HasSealedPagesOnly() const { return fBufferedPages.size() == fSealedPages.size(); }
           /// Read-only access to the sequence of sealed pages registered so far.
0073       const RPageStorage::SealedPageSequence_t &GetSealedPages() const { return fSealedPages; }
0074 
           /// Defined out of line; also invoked by the destructor. References returned by BufferPage()
           /// must not be used afterwards.
           /// NOTE(review): presumably clears both fBufferedPages and fSealedPages -- confirm in the implementation.
0075       void DropBufferedPages();
0076 
0077       // The returned reference points to a default-constructed RSealedPage. It can be used
0078       // to fill in data after sealing.
           // The new element is appended to the end of fSealedPages.
0079       RSealedPage &RegisterSealedPage()
0080       {
0081          return fSealedPages.emplace_back();
0082       }
0083 
0084    private:
           /// Handle of the column served by this buffer; assigned in BufferPage().
0085       RPageStorage::ColumnHandle_t fCol;
0086       /// Using a deque guarantees that references to existing elements are never invalidated
0087       /// by BufferPage() appending to the end of the deque.
0088       std::deque<RPageZipItem> fBufferedPages;
0089       /// Pages that have been already sealed by a concurrent task. A vector commit can be issued if all
0090       /// buffered pages have been sealed.
0091       /// Note that each RSealedPage refers to the same buffer as `fBufferedPages[i].fBuf` for some value of `i`, and
0092       /// thus owned by RPageZipItem
0093       RPageStorage::SealedPageSequence_t fSealedPages;
0094    };
0095 
0096 private:
0097    /// I/O performance counters that get registered in fMetrics
        /// All members are references; the counter objects themselves are owned elsewhere.
        /// NOTE(review): the names suggest wall-clock and CPU time spent zipping and in the critical section --
        /// confirm against the place where the counters are created.
0098    struct RCounters {
0099       ROOT::Experimental::Detail::RNTuplePlainCounter &fParallelZip;
0100       ROOT::Experimental::Detail::RNTupleAtomicCounter &fTimeWallZip;
0101       ROOT::Experimental::Detail::RNTuplePlainCounter &fTimeWallCriticalSection;
0102       ROOT::Experimental::Detail::RNTupleTickCounter<ROOT::Experimental::Detail::RNTupleAtomicCounter> &fTimeCpuZip;
0103       ROOT::Experimental::Detail::RNTupleTickCounter<ROOT::Experimental::Detail::RNTuplePlainCounter>
0104          &fTimeCpuCriticalSection;
0105    };
        /// Owning pointer to the counter bundle; where it is populated is not visible in this header.
0106    std::unique_ptr<RCounters> fCounters;
0107    /// The inner sink, responsible for actually performing I/O.
0108    std::unique_ptr<RPageSink> fInnerSink;
0109    /// The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
0110    /// For the unbuffered case, the RNTupleModel is instead managed by a RNTupleWriter.
0111    std::unique_ptr<ROOT::RNTupleModel> fInnerModel;
0112    /// Vector of buffered column pages. Indexed by column id.
0113    std::vector<RColumnBuf> fBufferedColumns;
0114    /// Columns committed as suppressed are stored and passed to the inner sink at cluster commit
0115    std::vector<ColumnHandle_t> fSuppressedColumns;
        /// Both counters start at zero.
        /// NOTE(review): presumably the number of fields / columns already connected to the inner sink -- confirm.
0116    ROOT::DescriptorId_t fNFields = 0;
0117    ROOT::DescriptorId_t fNColumns = 0;
0118 
        /// Private helpers, defined out of line.
        /// NOTE(review): FlushClusterImpl presumably runs the shared flush logic and invokes `FlushClusterFn`
        /// for the operation-specific part -- confirm in the implementation file.
0119    void ConnectFields(const std::vector<ROOT::RFieldBase *> &fields, ROOT::NTupleSize_t firstEntry);
0120    void FlushClusterImpl(std::function<void(void)> FlushClusterFn);
0121 
0122 public:
        /// Takes ownership of the passed sink (see fInnerSink). Like RColumnBuf, the class declares its copy
        /// operations deleted and its move operations defaulted.
0123    explicit RPageSinkBuf(std::unique_ptr<RPageSink> inner);
0124    RPageSinkBuf(const RPageSinkBuf&) = delete;
0125    RPageSinkBuf& operator=(const RPageSinkBuf&) = delete;
0126    RPageSinkBuf(RPageSinkBuf&&) = default;
0127    RPageSinkBuf& operator=(RPageSinkBuf&&) = default;
0128    ~RPageSinkBuf() override;
0129 
        // The member functions below are all declared `final`; except for GetNEntries() they are defined out of line.
0130    ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, RColumn &column) final;
0131 
0132    const ROOT::RNTupleDescriptor &GetDescriptor() const final;
0133 
        /// Forwards to the inner sink.
0134    ROOT::NTupleSize_t GetNEntries() const final { return fInnerSink->GetNEntries(); }
0135 
0136    void InitImpl(ROOT::RNTupleModel &model) final;
0137    void UpdateSchema(const RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final;
0138    void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final;
0139 
0140    void CommitSuppressedColumn(ColumnHandle_t columnHandle) final;
0141    void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final;
0142    void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final;
0143    void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
0144    std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries) final;
0145    RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final;
0146    void CommitStagedClusters(std::span<RStagedCluster> clusters) final;
0147    void CommitClusterGroup() final;
0148    void CommitDatasetImpl() final;
0149 
0150    RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final;
0151 }; // RPageSinkBuf
0152 
0153 } // namespace Internal
0154 } // namespace ROOT
0155 
0156 #endif