Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-12-16 10:30:00

0001 /// \file ROOT/RNTupleFillContext.hxx
0002 /// \ingroup NTuple
0003 /// \author Jakob Blomer <jblomer@cern.ch>
0004 /// \date 2024-02-22
0005 
0006 /*************************************************************************
0007  * Copyright (C) 1995-2024, Rene Brun and Fons Rademakers.               *
0008  * All rights reserved.                                                  *
0009  *                                                                       *
0010  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0011  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0012  *************************************************************************/
0013 
0014 #ifndef ROOT_RNTupleFillContext
0015 #define ROOT_RNTupleFillContext
0016 
0017 #include <ROOT/RConfig.hxx> // for R__unlikely
0018 #include <ROOT/REntry.hxx>
0019 #include <ROOT/RError.hxx>
0020 #include <ROOT/RPageStorage.hxx>
0021 #include <ROOT/RRawPtrWriteEntry.hxx>
0022 #include <ROOT/RNTupleFillStatus.hxx>
0023 #include <ROOT/RNTupleMetrics.hxx>
0024 #include <ROOT/RNTupleModel.hxx>
0025 #include <ROOT/RNTupleTypes.hxx>
0026 
0027 #include <cstddef>
0028 #include <cstdint>
0029 #include <memory>
0030 #include <vector>
0031 
0032 namespace ROOT {
0033 
0034 // clang-format off
0035 /**
0036 \class ROOT::RNTupleFillContext
0037 \ingroup NTuple
0038 \brief A context for filling entries (data) into clusters of an RNTuple
0039 
0040 An output cluster can be filled with entries. The caller has to make sure that the data that gets filled into a cluster
0041 is not modified for the duration of the Fill() call. The fill call serializes the C++ object into the column format and
0042 writes data into the corresponding column page buffers. Writing of the buffers to storage is deferred and can be
0043 triggered by FlushCluster() or by destroying the context. On I/O errors, an exception is thrown.
0044 
0045 Instances of this class are not meant to be used in isolation and can be created from an RNTupleParallelWriter. For
0046 sequential writing, please refer to RNTupleWriter.
0047 */
0048 // clang-format on
0049 class RNTupleFillContext {
0050    friend class ROOT::RNTupleWriter;
0051    friend class RNTupleParallelWriter;
0052 
0053 private:
0054    /// The page sink's parallel page compression scheduler if IMT is on.
0055    /// Needs to be destructed after the page sink is destructed and so declared before.
0056    std::unique_ptr<ROOT::Internal::RPageStorage::RTaskScheduler> fZipTasks;
0057    std::unique_ptr<ROOT::Internal::RPageSink> fSink;
0058    /// Needs to be destructed before fSink
0059    std::unique_ptr<ROOT::RNTupleModel> fModel;
0060 
0061    Experimental::Detail::RNTupleMetrics fMetrics;
0062 
0063    ROOT::NTupleSize_t fLastFlushed = 0;
0064    ROOT::NTupleSize_t fNEntries = 0;
0065    /// Keeps track of the number of bytes written into the current cluster
0066    std::size_t fUnzippedClusterSize = 0;
0067    /// The total number of bytes written to storage (i.e., after compression)
0068    std::uint64_t fNBytesFlushed = 0;
0069    /// The total number of bytes filled into all the so far committed clusters,
0070    /// i.e. the uncompressed size of the written clusters
0071    std::uint64_t fNBytesFilled = 0;
0072    /// Limit for committing cluster no matter the other tunables
0073    std::size_t fMaxUnzippedClusterSize;
0074    /// Estimator of uncompressed cluster size, taking into account the estimated compression ratio
0075    std::size_t fUnzippedClusterSizeEst;
0076 
0077    /// Whether to enable staged cluster committing, where only an explicit call to CommitStagedClusters() will logically
0078    /// append the clusters to the RNTuple.
0079    bool fStagedClusterCommitting = false;
0080    /// Vector of currently staged clusters.
0081    std::vector<ROOT::Internal::RPageSink::RStagedCluster> fStagedClusters;
0082 
0083    template <typename Entry>
0084    void FillNoFlushImpl(Entry &entry, ROOT::RNTupleFillStatus &status)
0085    {
0086       if (R__unlikely(entry.GetModelId() != fModel->GetModelId()))
0087          throw RException(R__FAIL("mismatch between entry and model"));
0088 
0089       const std::size_t bytesWritten = entry.Append();
0090       fUnzippedClusterSize += bytesWritten;
0091       fNEntries++;
0092 
0093       status.fNEntriesSinceLastFlush = fNEntries - fLastFlushed;
0094       status.fUnzippedClusterSize = fUnzippedClusterSize;
0095       status.fLastEntrySize = bytesWritten;
0096       status.fShouldFlushCluster =
0097          (fUnzippedClusterSize >= fMaxUnzippedClusterSize) || (fUnzippedClusterSize >= fUnzippedClusterSizeEst);
0098    }
0099    template <typename Entry>
0100    std::size_t FillImpl(Entry &entry)
0101    {
0102       ROOT::RNTupleFillStatus status;
0103       FillNoFlush(entry, status);
0104       if (status.ShouldFlushCluster())
0105          FlushCluster();
0106       return status.GetLastEntrySize();
0107    }
0108 
0109    RNTupleFillContext(std::unique_ptr<ROOT::RNTupleModel> model, std::unique_ptr<ROOT::Internal::RPageSink> sink);
0110    RNTupleFillContext(const RNTupleFillContext &) = delete;
0111    RNTupleFillContext &operator=(const RNTupleFillContext &) = delete;
0112 
0113 public:
0114    ~RNTupleFillContext();
0115 
0116    /// Fill an entry into this context, but don't commit the cluster. The calling code must pass an RNTupleFillStatus
0117    /// and check RNTupleFillStatus::ShouldFlushCluster.
0118    ///
0119    /// This method will check the entry's model ID to ensure it comes from the context's own model or throw an exception
0120    /// otherwise.
0121    void FillNoFlush(ROOT::REntry &entry, ROOT::RNTupleFillStatus &status) { FillNoFlushImpl(entry, status); }
0122    /// Fill an entry into this context.  This method will check the entry's model ID to ensure it comes from the
0123    /// context's own model or throw an exception otherwise.
0124    /// \return The number of uncompressed bytes written.
0125    std::size_t Fill(ROOT::REntry &entry) { return FillImpl(entry); }
0126 
0127    /// Fill an RRawPtrWriteEntry into this context, but don't commit the cluster. The calling code must pass an
0128    /// RNTupleFillStatus and check RNTupleFillStatus::ShouldFlushCluster.
0129    ///
0130    /// This method will check the entry's model ID to ensure it comes from the context's own model or throw an exception
0131    /// otherwise.
0132    void FillNoFlush(Experimental::Detail::RRawPtrWriteEntry &entry, ROOT::RNTupleFillStatus &status)
0133    {
0134       FillNoFlushImpl(entry, status);
0135    }
0136    /// Fill an RRawPtrWriteEntry into this context.  This method will check the entry's model ID to ensure it comes
0137    /// from the context's own model or throw an exception otherwise.
0138    /// \return The number of uncompressed bytes written.
0139    std::size_t Fill(Experimental::Detail::RRawPtrWriteEntry &entry) { return FillImpl(entry); }
0140 
0141    /// Flush column data, preparing for CommitCluster or to reduce memory usage. This will trigger compression of pages,
0142    /// but not actually write to storage.
0143    void FlushColumns();
0144    /// Flush so far filled entries to storage
0145    void FlushCluster();
0146    /// Logically append staged clusters to the RNTuple.
0147    void CommitStagedClusters();
0148 
0149    const ROOT::RNTupleModel &GetModel() const { return *fModel; }
0150    std::unique_ptr<ROOT::REntry> CreateEntry() const { return fModel->CreateEntry(); }
0151    std::unique_ptr<Experimental::Detail::RRawPtrWriteEntry> CreateRawPtrWriteEntry() const
0152    {
0153       return fModel->CreateRawPtrWriteEntry();
0154    }
0155 
0156    /// Return the entry number that was last flushed in a cluster.
0157    ROOT::NTupleSize_t GetLastFlushed() const { return fLastFlushed; }
0158    /// Return the number of entries filled so far.
0159    ROOT::NTupleSize_t GetNEntries() const { return fNEntries; }
0160 
0161    void EnableStagedClusterCommitting(bool val = true)
0162    {
0163       if (!val && !fStagedClusters.empty()) {
0164          throw RException(R__FAIL("cannot disable staged committing with pending clusters"));
0165       }
0166       fStagedClusterCommitting = val;
0167    }
0168    bool IsStagedClusterCommittingEnabled() const { return fStagedClusterCommitting; }
0169 
0170    void EnableMetrics() { fMetrics.Enable(); }
0171    const Experimental::Detail::RNTupleMetrics &GetMetrics() const { return fMetrics; }
0172 };
0173 
0174 } // namespace ROOT
0175 
0176 #endif // ROOT_RNTupleFillContext