File indexing completed on 2025-09-17 09:14:30
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016 #ifndef ROOT_RNTupleFillContext
0017 #define ROOT_RNTupleFillContext
0018
0019 #include <ROOT/RConfig.hxx> // for R__unlikely
0020 #include <ROOT/REntry.hxx>
0021 #include <ROOT/RError.hxx>
0022 #include <ROOT/RPageStorage.hxx>
0023 #include <ROOT/RRawPtrWriteEntry.hxx>
0024 #include <ROOT/RNTupleFillStatus.hxx>
0025 #include <ROOT/RNTupleMetrics.hxx>
0026 #include <ROOT/RNTupleModel.hxx>
0027 #include <ROOT/RNTupleUtil.hxx>
0028
0029 #include <cstddef>
0030 #include <cstdint>
0031 #include <memory>
0032 #include <vector>
0033
0034 namespace ROOT {
0035 namespace Experimental {
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052 class RNTupleFillContext {
0053 friend class ROOT::RNTupleWriter;
0054 friend class RNTupleParallelWriter;
0055
0056 private:
0057 std::unique_ptr<ROOT::Internal::RPageSink> fSink;
0058
0059 std::unique_ptr<ROOT::RNTupleModel> fModel;
0060
0061 Detail::RNTupleMetrics fMetrics;
0062
0063 ROOT::NTupleSize_t fLastFlushed = 0;
0064 ROOT::NTupleSize_t fNEntries = 0;
0065
0066 std::size_t fUnzippedClusterSize = 0;
0067
0068 std::uint64_t fNBytesFlushed = 0;
0069
0070
0071 std::uint64_t fNBytesFilled = 0;
0072
0073 std::size_t fMaxUnzippedClusterSize;
0074
0075 std::size_t fUnzippedClusterSizeEst;
0076
0077
0078
0079 bool fStagedClusterCommitting = false;
0080
0081 std::vector<ROOT::Internal::RPageSink::RStagedCluster> fStagedClusters;
0082
0083 template <typename Entry>
0084 void FillNoFlushImpl(Entry &entry, ROOT::RNTupleFillStatus &status)
0085 {
0086 if (R__unlikely(entry.GetModelId() != fModel->GetModelId()))
0087 throw RException(R__FAIL("mismatch between entry and model"));
0088
0089 const std::size_t bytesWritten = entry.Append();
0090 fUnzippedClusterSize += bytesWritten;
0091 fNEntries++;
0092
0093 status.fNEntriesSinceLastFlush = fNEntries - fLastFlushed;
0094 status.fUnzippedClusterSize = fUnzippedClusterSize;
0095 status.fLastEntrySize = bytesWritten;
0096 status.fShouldFlushCluster =
0097 (fUnzippedClusterSize >= fMaxUnzippedClusterSize) || (fUnzippedClusterSize >= fUnzippedClusterSizeEst);
0098 }
0099 template <typename Entry>
0100 std::size_t FillImpl(Entry &entry)
0101 {
0102 ROOT::RNTupleFillStatus status;
0103 FillNoFlush(entry, status);
0104 if (status.ShouldFlushCluster())
0105 FlushCluster();
0106 return status.GetLastEntrySize();
0107 }
0108
0109 RNTupleFillContext(std::unique_ptr<ROOT::RNTupleModel> model, std::unique_ptr<ROOT::Internal::RPageSink> sink);
0110 RNTupleFillContext(const RNTupleFillContext &) = delete;
0111 RNTupleFillContext &operator=(const RNTupleFillContext &) = delete;
0112
0113 public:
0114 ~RNTupleFillContext();
0115
0116
0117
0118
0119
0120
0121 void FillNoFlush(ROOT::REntry &entry, ROOT::RNTupleFillStatus &status) { FillNoFlushImpl(entry, status); }
0122
0123
0124
0125 std::size_t Fill(ROOT::REntry &entry) { return FillImpl(entry); }
0126
0127
0128
0129
0130
0131
0132 void FillNoFlush(Detail::RRawPtrWriteEntry &entry, ROOT::RNTupleFillStatus &status)
0133 {
0134 FillNoFlushImpl(entry, status);
0135 }
0136
0137
0138
0139 std::size_t Fill(Detail::RRawPtrWriteEntry &entry) { return FillImpl(entry); }
0140
0141
0142
0143 void FlushColumns();
0144
0145 void FlushCluster();
0146
0147 void CommitStagedClusters();
0148
0149 const ROOT::RNTupleModel &GetModel() const { return *fModel; }
0150 std::unique_ptr<ROOT::REntry> CreateEntry() const { return fModel->CreateEntry(); }
0151 std::unique_ptr<Detail::RRawPtrWriteEntry> CreateRawPtrWriteEntry() const
0152 {
0153 return fModel->CreateRawPtrWriteEntry();
0154 }
0155
0156
0157 ROOT::NTupleSize_t GetLastFlushed() const { return fLastFlushed; }
0158
0159 ROOT::NTupleSize_t GetNEntries() const { return fNEntries; }
0160
0161 void EnableStagedClusterCommitting(bool val = true)
0162 {
0163 if (!val && !fStagedClusters.empty()) {
0164 throw RException(R__FAIL("cannot disable staged committing with pending clusters"));
0165 }
0166 fStagedClusterCommitting = val;
0167 }
0168 bool IsStagedClusterCommittingEnabled() const { return fStagedClusterCommitting; }
0169
0170 void EnableMetrics() { fMetrics.Enable(); }
0171 const Detail::RNTupleMetrics &GetMetrics() const { return fMetrics; }
0172 };
0173
0174 }
0175 }
0176
0177 #endif