File indexing completed on 2025-01-18 10:10:47
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 #ifndef ROOT7_RPageSinkBuf
0019 #define ROOT7_RPageSinkBuf
0020
0021 #include <ROOT/RNTupleMetrics.hxx>
0022 #include <ROOT/RPageStorage.hxx>
0023
0024 #include <deque>
0025 #include <iterator>
0026 #include <memory>
0027 #include <tuple>
0028
0029 namespace ROOT {
0030 namespace Experimental {
0031 namespace Internal {
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
/// A page sink (derived from RPageSink) that owns a second, "inner" RPageSink
/// (fInnerSink) and keeps one RColumnBuf of buffered pages per column
/// (fBufferedColumns).
///
/// All member functions marked `final` below override virtual functions of a base
/// class and are only declared here; their definitions are not part of this header.
/// NOTE(review): presumably pages are collected per column and handed to the inner
/// sink when a cluster is committed -- confirm against the implementation file.
0043 class RPageSinkBuf : public RPageSink {
0044 private:
0045
0046
/// Per-column buffer: the column handle, the pages buffered for that column, and
/// the sequence of sealed pages registered for it.
/// Non-copyable, movable. The destructor calls DropBufferedPages().
0047 class RColumnBuf {
0048 public:
/// One buffered page together with the storage used for its sealed form.
0049 struct RPageZipItem {
/// The buffered page.
0050 RPage fPage;
0051
/// Owning byte buffer, (re)allocated by AllocateSealedPageBuf().
/// NOTE(review): presumably backs the data referenced by fSealedPage -- confirm.
0052 std::unique_ptr<unsigned char[]> fBuf;
/// Non-owning pointer to the sealed page; nullptr until it is set by the user of
/// this struct (nothing in this header assigns it).
0053 RPageStorage::RSealedPage *fSealedPage = nullptr;
/// True once fSealedPage has been set to a non-null pointer.
0054 bool IsSealed() const { return fSealedPage != nullptr; }
/// Replaces fBuf with a freshly allocated array of nBytes bytes; any previously
/// held buffer is released by the unique_ptr assignment.
/// The new array is default-initialized, i.e. its contents are indeterminate.
0055 void AllocateSealedPageBuf(std::size_t nBytes)
0056 {
0057 fBuf = std::unique_ptr<unsigned char[]>(new unsigned char[nBytes]);
0058 }
0059 };
0060 public:
0061 RColumnBuf() = default;
0062 RColumnBuf(const RColumnBuf&) = delete;
0063 RColumnBuf& operator=(const RColumnBuf&) = delete;
0064 RColumnBuf(RColumnBuf&&) = default;
// NOTE(review): the defaulted move assignment replaces fBufferedPages and
// fSealedPages member-wise and does not call DropBufferedPages() for the
// previous contents, unlike the destructor -- confirm this is intended.
0065 RColumnBuf& operator=(RColumnBuf&&) = default;
0066 ~RColumnBuf() { DropBufferedPages(); }
0067
0068
0069
/// Appends a default-constructed RPageZipItem to fBufferedPages and returns a
/// reference to it for the caller to fill in.
/// The column handle is recorded only while fCol still tests false; a handle
/// passed on later calls does not overwrite the stored one.
/// The returned reference stays valid across further BufferPage() calls because
/// std::deque::emplace_back does not invalidate references to existing elements.
0070 RPageZipItem &BufferPage(RPageStorage::ColumnHandle_t columnHandle)
0071 {
0072 if (!fCol) {
0073 fCol = columnHandle;
0074 }
0075
0076
0077 return fBufferedPages.emplace_back();
0078 }
/// Returns the stored column handle (default-constructed if BufferPage() was
/// never called).
0079 const RPageStorage::ColumnHandle_t &GetHandle() const { return fCol; }
/// True if no pages are currently buffered.
0080 bool IsEmpty() const { return fBufferedPages.empty(); }
/// True if the number of buffered pages equals the number of registered sealed
/// pages. Only the sizes are compared, not the individual items.
0081 bool HasSealedPagesOnly() const { return fBufferedPages.size() == fSealedPages.size(); }
/// Read-only access to the registered sealed pages.
0082 const RPageStorage::SealedPageSequence_t &GetSealedPages() const { return fSealedPages; }
0083
/// Return type of DrainBufferedPages(): the buffered pages and the sealed pages.
/// The two element types must differ, since DrainBufferedPages() selects the
/// tuple elements by type.
0084 using BufferedPages_t = std::tuple<std::deque<RPageZipItem>, RPageStorage::SealedPageSequence_t>;
0085
0086
0087
0088
/// Moves both containers out of this object by swapping them with the
/// default-constructed elements of a fresh tuple, and returns that tuple.
/// Afterwards fBufferedPages and fSealedPages hold default-constructed
/// containers; fCol is left unchanged.
0089 BufferedPages_t DrainBufferedPages()
0090 {
0091 BufferedPages_t drained;
0092 std::swap(fBufferedPages, std::get<decltype(fBufferedPages)>(drained));
0093 std::swap(fSealedPages, std::get<decltype(fSealedPages)>(drained));
0094 return drained;
0095 }
/// Declared here, defined elsewhere; invoked by the destructor.
0096 void DropBufferedPages();
0097
0098
0099
/// Appends a default-constructed sealed page to fSealedPages and returns a
/// reference to it.
/// NOTE(review): the unqualified RSealedPage is presumably
/// RPageStorage::RSealedPage found through the enclosing class -- confirm.
0100 RSealedPage &RegisterSealedPage()
0101 {
0102 return fSealedPages.emplace_back();
0103 }
0104
0105 private:
/// Handle of the buffered column; assigned by the first BufferPage() call that
/// finds it testing false.
0106 RPageStorage::ColumnHandle_t fCol;
0107
0108
/// Pages buffered via BufferPage(). Kept in a std::deque so that references
/// handed out by BufferPage() survive later appends.
0109 std::deque<RPageZipItem> fBufferedPages;
0110
0111
0112
0113
/// Sealed pages registered via RegisterSealedPage().
0114 RPageStorage::SealedPageSequence_t fSealedPages;
0115 };
0116
0117 private:
0118
/// References to three metrics counters used by this sink.
/// NOTE(review): judging by the names, these count parallel compression tasks and
/// wall/CPU time spent in a critical section -- confirm where they are updated.
0119 struct RCounters {
0120 Detail::RNTuplePlainCounter &fParallelZip;
0121 Detail::RNTuplePlainCounter &fTimeWallCriticalSection;
0122 Detail::RNTupleTickCounter<Detail::RNTuplePlainCounter> &fTimeCpuCriticalSection;
0123 };
/// Owned counter bundle; not initialized in this header.
0124 std::unique_ptr<RCounters> fCounters;
0125
/// The wrapped sink, owned by this object.
0126 std::unique_ptr<RPageSink> fInnerSink;
0127
0128
/// A model owned by this object.
/// NOTE(review): presumably the model the inner sink is initialized with -- confirm.
0129 std::unique_ptr<RNTupleModel> fInnerModel;
0130
/// One buffer per column.
// NOTE(review): std::vector (here) and std::span (CommitSealedPageV) are used, but
// <vector> and <span> are not included directly by this header; presumably they
// arrive through the ROOT includes -- confirm.
0131 std::vector<RColumnBuf> fBufferedColumns;
/// Field and column counters, both starting at 0; maintained outside this header.
0132 DescriptorId_t fNFields = 0;
0133 DescriptorId_t fNColumns = 0;
0134
/// Private helper, defined elsewhere.
0135 void ConnectFields(const std::vector<RFieldBase *> &fields, NTupleSize_t firstEntry);
0136
0137 public:
/// Takes ownership of the sink to wrap. `explicit` prevents implicit conversion
/// from a std::unique_ptr<RPageSink>.
0138 explicit RPageSinkBuf(std::unique_ptr<RPageSink> inner);
/// Not copyable; move operations are defaulted.
0139 RPageSinkBuf(const RPageSinkBuf&) = delete;
0140 RPageSinkBuf& operator=(const RPageSinkBuf&) = delete;
0141 RPageSinkBuf(RPageSinkBuf&&) = default;
0142 RPageSinkBuf& operator=(RPageSinkBuf&&) = default;
/// Defined elsewhere.
0143 ~RPageSinkBuf() override;
0144
// --- Final overrides of the base-class interface; all defined elsewhere. ---
0145 ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final;
0146
0147 const RNTupleDescriptor &GetDescriptor() const final;
0148
0149 void InitImpl(RNTupleModel &model) final;
0150 void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final;
0151
0152 void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final;
0153 void CommitSealedPage(DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final;
0154 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
0155 std::uint64_t CommitCluster(NTupleSize_t nNewEntries) final;
0156 void CommitClusterGroup() final;
0157 void CommitDataset() final;
0158
0159 RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final;
0160 void ReleasePage(RPage &page) final;
0161 };
0162
0163 }
0164 }
0165 }
0166
0167 #endif