File indexing completed on 2025-09-16 09:08:35
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016 #ifndef ROOT_RPageSinkBuf
0017 #define ROOT_RPageSinkBuf
0018
0019 #include <ROOT/RNTupleMetrics.hxx>
0020 #include <ROOT/RPageStorage.hxx>
0021
0022 #include <deque>
0023 #include <functional>
0024 #include <iterator>
0025 #include <memory>
0026 #include <tuple>
0027
0028 namespace ROOT {
0029 namespace Internal {
0030
0031
0032
0033
0034
0035
0036
0037
0038 class RPageSinkBuf : public RPageSink {
0039 private:
0040
0041
0042 class RColumnBuf {
0043 public:
0044 struct RPageZipItem {
0045 RPage fPage;
0046
0047 std::unique_ptr<unsigned char[]> fBuf;
0048 RPageStorage::RSealedPage *fSealedPage = nullptr;
0049 bool IsSealed() const { return fSealedPage != nullptr; }
0050 };
0051 public:
0052 RColumnBuf() = default;
0053 RColumnBuf(const RColumnBuf&) = delete;
0054 RColumnBuf& operator=(const RColumnBuf&) = delete;
0055 RColumnBuf(RColumnBuf&&) = default;
0056 RColumnBuf& operator=(RColumnBuf&&) = default;
0057 ~RColumnBuf() { DropBufferedPages(); }
0058
0059
0060
0061 RPageZipItem &BufferPage(RPageStorage::ColumnHandle_t columnHandle)
0062 {
0063 if (!fCol) {
0064 fCol = columnHandle;
0065 }
0066
0067
0068 return fBufferedPages.emplace_back();
0069 }
0070 const RPageStorage::ColumnHandle_t &GetHandle() const { return fCol; }
0071 bool IsEmpty() const { return fBufferedPages.empty(); }
0072 bool HasSealedPagesOnly() const { return fBufferedPages.size() == fSealedPages.size(); }
0073 const RPageStorage::SealedPageSequence_t &GetSealedPages() const { return fSealedPages; }
0074
0075 void DropBufferedPages();
0076
0077
0078
0079 RSealedPage &RegisterSealedPage()
0080 {
0081 return fSealedPages.emplace_back();
0082 }
0083
0084 private:
0085 RPageStorage::ColumnHandle_t fCol;
0086
0087
0088 std::deque<RPageZipItem> fBufferedPages;
0089
0090
0091
0092
0093 RPageStorage::SealedPageSequence_t fSealedPages;
0094 };
0095
0096 private:
0097
0098 struct RCounters {
0099 ROOT::Experimental::Detail::RNTuplePlainCounter &fParallelZip;
0100 ROOT::Experimental::Detail::RNTupleAtomicCounter &fTimeWallZip;
0101 ROOT::Experimental::Detail::RNTuplePlainCounter &fTimeWallCriticalSection;
0102 ROOT::Experimental::Detail::RNTupleTickCounter<ROOT::Experimental::Detail::RNTupleAtomicCounter> &fTimeCpuZip;
0103 ROOT::Experimental::Detail::RNTupleTickCounter<ROOT::Experimental::Detail::RNTuplePlainCounter>
0104 &fTimeCpuCriticalSection;
0105 };
0106 std::unique_ptr<RCounters> fCounters;
0107
0108 std::unique_ptr<RPageSink> fInnerSink;
0109
0110
0111 std::unique_ptr<ROOT::RNTupleModel> fInnerModel;
0112
0113 std::vector<RColumnBuf> fBufferedColumns;
0114
0115 std::vector<ColumnHandle_t> fSuppressedColumns;
0116 ROOT::DescriptorId_t fNFields = 0;
0117 ROOT::DescriptorId_t fNColumns = 0;
0118
0119 void ConnectFields(const std::vector<ROOT::RFieldBase *> &fields, ROOT::NTupleSize_t firstEntry);
0120 void FlushClusterImpl(std::function<void(void)> FlushClusterFn);
0121
0122 public:
0123 explicit RPageSinkBuf(std::unique_ptr<RPageSink> inner);
0124 RPageSinkBuf(const RPageSinkBuf&) = delete;
0125 RPageSinkBuf& operator=(const RPageSinkBuf&) = delete;
0126 RPageSinkBuf(RPageSinkBuf&&) = default;
0127 RPageSinkBuf& operator=(RPageSinkBuf&&) = default;
0128 ~RPageSinkBuf() override;
0129
0130 ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, RColumn &column) final;
0131
0132 const ROOT::RNTupleDescriptor &GetDescriptor() const final;
0133
0134 ROOT::NTupleSize_t GetNEntries() const final { return fInnerSink->GetNEntries(); }
0135
0136 void InitImpl(ROOT::RNTupleModel &model) final;
0137 void UpdateSchema(const RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final;
0138 void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final;
0139
0140 void CommitSuppressedColumn(ColumnHandle_t columnHandle) final;
0141 void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final;
0142 void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RSealedPage &sealedPage) final;
0143 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
0144 std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries) final;
0145 RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final;
0146 void CommitStagedClusters(std::span<RStagedCluster> clusters) final;
0147 void CommitClusterGroup() final;
0148 void CommitDatasetImpl() final;
0149
0150 RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final;
0151 };
0152
0153 }
0154 }
0155
0156 #endif