Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:10:47

0001 /// \file ROOT/RPageSinkBuf.hxx
0002 /// \ingroup NTuple ROOT7
0003 /// \author Jakob Blomer <jblomer@cern.ch>
0004 /// \author Max Orok <maxwellorok@gmail.com>
0005 /// \author Javier Lopez-Gomez <javier.lopez.gomez@cern.ch>
0006 /// \date 2021-03-17
0007 /// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
0008 /// is welcome!
0009 
0010 /*************************************************************************
0011  * Copyright (C) 1995-2021, Rene Brun and Fons Rademakers.               *
0012  * All rights reserved.                                                  *
0013  *                                                                       *
0014  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0015  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0016  *************************************************************************/
0017 
0018 #ifndef ROOT7_RPageSinkBuf
0019 #define ROOT7_RPageSinkBuf
0020 
0021 #include <ROOT/RNTupleMetrics.hxx>
0022 #include <ROOT/RPageStorage.hxx>
0023 
0024 #include <deque>
0025 #include <iterator>
0026 #include <memory>
0027 #include <tuple>
0028 
0029 namespace ROOT {
0030 namespace Experimental {
0031 namespace Internal {
0032 
0033 // clang-format off
0034 /**
0035 \class ROOT::Experimental::Internal::RPageSinkBuf
0036 \ingroup NTuple
0037 \brief Wrapper sink that coalesces cluster column page writes
0038 *
0039 * TODO(jblomer): The interplay of derived class and RPageSink is not yet optimally designed for page storage wrapper
0040 * classes like this one. Header and footer serialization, e.g., are done twice.  To be revised.
0041 */
0042 // clang-format on
0043 class RPageSinkBuf : public RPageSink {
0044 private:
0045    /// A buffered column. The column is not responsible for RPage memory management (i.e.
0046    /// ReservePage/ReleasePage), which is handled by the enclosing RPageSinkBuf.
0047    class RColumnBuf {
0048    public:
0049       struct RPageZipItem { ///< One buffered page together with its scratch buffer and sealed-page pointer
0050          RPage fPage;
0051          // Compression scratch buffer for fSealedPage.
0052          std::unique_ptr<unsigned char[]> fBuf;
0053          RPageStorage::RSealedPage *fSealedPage = nullptr; ///< nullptr until a sealed page is attached
0054          bool IsSealed() const { return fSealedPage != nullptr; } ///< True iff fSealedPage is non-null
0055          void AllocateSealedPageBuf(std::size_t nBytes) ///< Replaces fBuf with a new array of nBytes bytes
0056          {
0057             fBuf = std::unique_ptr<unsigned char[]>(new unsigned char[nBytes]); // default-initialized: contents indeterminate
0058          }
0059       };
0060    public:
0061       RColumnBuf() = default;
0062       RColumnBuf(const RColumnBuf&) = delete;
0063       RColumnBuf& operator=(const RColumnBuf&) = delete;
0064       RColumnBuf(RColumnBuf&&) = default;
0065       RColumnBuf& operator=(RColumnBuf&&) = default; // NOTE(review): defaulted; does not call DropBufferedPages() on the target - confirm intended
0066       ~RColumnBuf() { DropBufferedPages(); } ///< Destruction invokes DropBufferedPages() (defined out of line)
0067 
0068       /// Appends an empty item to the page buffer and returns a reference to it. The reference remains
0069       /// valid until the return value of DrainBufferedPages() is destroyed.
0070       RPageZipItem &BufferPage(RPageStorage::ColumnHandle_t columnHandle)
0071       {
0072          if (!fCol) { // adopt the passed handle only while fCol still tests false
0073             fCol = columnHandle;
0074          }
0075          // Safety: Insertion at the end of a deque never invalidates references
0076          // to existing elements.
0077          return fBufferedPages.emplace_back();
0078       }
0079       const RPageStorage::ColumnHandle_t &GetHandle() const { return fCol; } ///< Handle stored by BufferPage()
0080       bool IsEmpty() const { return fBufferedPages.empty(); } ///< True if no pages are currently buffered
0081       bool HasSealedPagesOnly() const { return fBufferedPages.size() == fSealedPages.size(); } ///< True if both sequences have equal size
0082       const RPageStorage::SealedPageSequence_t &GetSealedPages() const { return fSealedPages; }
0083 
0084       using BufferedPages_t = std::tuple<std::deque<RPageZipItem>, RPageStorage::SealedPageSequence_t>;
0085       /// When the return value of DrainBufferedPages() is destroyed, all references
0086       /// returned by BufferPage() and RegisterSealedPage() are invalidated.
0087       /// This function gives up on the ownership of the buffered pages.  Thus, `ReleasePage()` must be called
0088       /// accordingly.
0089       BufferedPages_t DrainBufferedPages()
0090       {
0091          BufferedPages_t drained;
0092          std::swap(fBufferedPages, std::get<decltype(fBufferedPages)>(drained)); // swap with the empty tuple slot: the member is left empty
0093          std::swap(fSealedPages, std::get<decltype(fSealedPages)>(drained));
0094          return drained;
0095       }
0096       void DropBufferedPages(); ///< Defined out of line; also called by the destructor
0097 
0098       // The returned reference points to a default-constructed RSealedPage. It can be used
0099       // to fill in data after sealing.
0100       RSealedPage &RegisterSealedPage()
0101       {
0102          return fSealedPages.emplace_back();
0103       }
0104 
0105    private:
0106       RPageStorage::ColumnHandle_t fCol; ///< Assigned in BufferPage() while it still tests false
0107       /// Using a deque guarantees that element references are never invalidated
0108       /// by appends to the end of the deque by BufferPage (iterators, by contrast, are invalidated).
0109       std::deque<RPageZipItem> fBufferedPages;
0110       /// Pages that have been already sealed by a concurrent task. A vector commit can be issued if all
0111       /// buffered pages have been sealed.
0112       /// Note that each RSealedPage refers to the same buffer as `fBufferedPages[i].fBuf` for some value of `i`, and
0113       /// is thus owned by the corresponding RPageZipItem.
0114       RPageStorage::SealedPageSequence_t fSealedPages;
0115    };
0116 
0117 private:
0118    /// I/O performance counters that get registered in fMetrics
0119    struct RCounters {
0120       Detail::RNTuplePlainCounter &fParallelZip;
0121       Detail::RNTuplePlainCounter &fTimeWallCriticalSection;
0122       Detail::RNTupleTickCounter<Detail::RNTuplePlainCounter> &fTimeCpuCriticalSection;
0123    };
0124    std::unique_ptr<RCounters> fCounters;
0125    /// The inner sink, responsible for actually performing I/O.
0126    std::unique_ptr<RPageSink> fInnerSink;
0127    /// The buffered page sink maintains a copy of the RNTupleModel for the inner sink.
0128    /// For the unbuffered case, the RNTupleModel is instead managed by a RNTupleWriter.
0129    std::unique_ptr<RNTupleModel> fInnerModel;
0130    /// Vector of buffered column pages. Indexed by column id.
0131    std::vector<RColumnBuf> fBufferedColumns;
0132    DescriptorId_t fNFields = 0; // NOTE(review): presumably the number of fields connected so far - confirm in the .cxx
0133    DescriptorId_t fNColumns = 0; // NOTE(review): presumably the number of columns added so far - confirm in the .cxx
0134 
0135    void ConnectFields(const std::vector<RFieldBase *> &fields, NTupleSize_t firstEntry);
0136 
0137 public:
0138    explicit RPageSinkBuf(std::unique_ptr<RPageSink> inner); ///< Takes ownership of `inner`
0139    RPageSinkBuf(const RPageSinkBuf&) = delete; // non-copyable
0140    RPageSinkBuf& operator=(const RPageSinkBuf&) = delete;
0141    RPageSinkBuf(RPageSinkBuf&&) = default;
0142    RPageSinkBuf& operator=(RPageSinkBuf&&) = default;
0143    ~RPageSinkBuf() override;
0144 
0145    ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final;
0146 
0147    const RNTupleDescriptor &GetDescriptor() const final;
0148 
0149    void InitImpl(RNTupleModel &model) final;
0150    void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final;
0151 
0152    void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final;
0153    void CommitSealedPage(DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final;
0154    void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
0155    std::uint64_t CommitCluster(NTupleSize_t nNewEntries) final;
0156    void CommitClusterGroup() final;
0157    void CommitDataset() final;
0158 
0159    RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final;
0160    void ReleasePage(RPage &page) final;
0161 }; // RPageSinkBuf
0162 
0163 } // namespace Internal
0164 } // namespace Experimental
0165 } // namespace ROOT
0166 
0167 #endif