Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:10:36

0001 /// \file ROOT/RClusterPool.hxx
0002 /// \ingroup NTuple ROOT7
0003 /// \author Jakob Blomer <jblomer@cern.ch>
0004 /// \date 2020-03-11
0005 /// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
0006 /// is welcome!
0007 
0008 /*************************************************************************
0009  * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
0010  * All rights reserved.                                                  *
0011  *                                                                       *
0012  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0013  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0014  *************************************************************************/
0015 
0016 #ifndef ROOT7_RClusterPool
0017 #define ROOT7_RClusterPool
0018 
0019 #include <ROOT/RCluster.hxx>
0020 #include <ROOT/RNTupleUtil.hxx>
0021 
0022 #include <condition_variable>
0023 #include <deque>
0024 #include <memory>
0025 #include <mutex>
0026 #include <future>
0027 #include <thread>
0028 #include <set>
0029 #include <vector>
0030 
0031 namespace ROOT {
0032 namespace Experimental {
0033 
0034 namespace Internal {
0035 class RPageSource;
0036 
0037 // clang-format off
0038 /**
0039 \class ROOT::Experimental::Internal::RClusterPool
0040 \ingroup NTuple
0041 \brief Managed a set of clusters containing compressed and packed pages
0042 
0043 The cluster pool steers the preloading of (partial) clusters. There is a two-step pipeline: in a first step,
0044 compressed pages are read from clusters into a memory buffer. The second pipeline step decompresses the pages
0045 and pushes them into the page pool. The actual logic of reading and unzipping is implemented by the page source.
0046 The cluster pool only orchestrates the work queues for reading and unzipping. It uses one extra I/O thread for
0047 reading waits for data from storage and generates no CPU load.
0048 
0049 The unzipping step of the pipeline therefore behaves differently depending on whether or not implicit multi-threading
0050 is turned on. If it is turned off, i.e. in a single-threaded environment, the cluster pool will only read the
0051 compressed pages and the page source has to uncompresses pages at a later point when data from the page is requested.
0052 */
0053 // clang-format on
0054 class RClusterPool {
0055 private:
0056    /// Request to load a subset of the columns of a particular cluster.
0057    /// Work items come in groups and are executed by the page source.
0058    struct RReadItem {
0059       /// Items with different bunch ids are scheduled for different vector reads
0060       std::int64_t fBunchId = -1;
0061       std::promise<std::unique_ptr<RCluster>> fPromise;
0062       RCluster::RKey fClusterKey;
0063    };
0064 
0065    /// Clusters that are currently being processed by the pipeline.  Every in-flight cluster has a corresponding
0066    /// work item, first a read item and then an unzip item.
0067    struct RInFlightCluster {
0068       std::future<std::unique_ptr<RCluster>> fFuture;
0069       RCluster::RKey fClusterKey;
0070       /// By the time a cluster has been loaded, this cluster might not be necessary anymore. This can happen if
0071       /// there are jumps in the access pattern (i.e. the access pattern deviates from linear access).
0072       bool fIsExpired = false;
0073 
0074       bool operator ==(const RInFlightCluster &other) const {
0075          return (fClusterKey.fClusterId == other.fClusterKey.fClusterId) &&
0076                 (fClusterKey.fPhysicalColumnSet == other.fClusterKey.fPhysicalColumnSet);
0077       }
0078       bool operator !=(const RInFlightCluster &other) const { return !(*this == other); }
0079       /// First order by cluster id, then by number of columns, than by the column ids in fColumns
0080       bool operator <(const RInFlightCluster &other) const;
0081    };
0082 
0083    /// Every cluster pool is responsible for exactly one page source that triggers loading of the clusters
0084    /// (GetCluster()) and is used for implementing the I/O and cluster memory allocation (PageSource::LoadClusters()).
0085    RPageSource &fPageSource;
0086    /// The number of clusters before the currently active cluster that should stay in the pool if present
0087    /// Reserved for later use.
0088    unsigned int fWindowPre = 0;
0089    /// The number of clusters that are being read in a single vector read.
0090    unsigned int fClusterBunchSize;
0091    /// Used as an ever-growing counter in GetCluster() to separate bunches of clusters from each other
0092    std::int64_t fBunchId = 0;
0093    /// The cache of clusters around the currently active cluster
0094    std::vector<std::unique_ptr<RCluster>> fPool;
0095 
0096    /// Protects the shared state between the main thread and the I/O thread, namely the work queue and the in-flight
0097    /// clusters vector
0098    std::mutex fLockWorkQueue;
0099    /// The clusters that were handed off to the I/O thread
0100    std::vector<RInFlightCluster> fInFlightClusters;
0101    /// Signals a non-empty I/O work queue
0102    std::condition_variable fCvHasReadWork;
0103    /// The communication channel to the I/O thread
0104    std::deque<RReadItem> fReadQueue;
0105 
0106    /// The I/O thread calls RPageSource::LoadClusters() asynchronously.  The thread is mostly waiting for the
0107    /// data to arrive (blocked by the kernel) and therefore can safely run in addition to the application
0108    /// main threads.
0109    std::thread fThreadIo;
0110 
0111    /// Every cluster id has at most one corresponding RCluster pointer in the pool
0112    RCluster *FindInPool(DescriptorId_t clusterId) const;
0113    /// Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor())
0114    /// make sure that a free slot actually exists
0115    size_t FindFreeSlot() const;
0116    /// The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool
0117    void ExecReadClusters();
0118    /// Returns the given cluster from the pool, which needs to contain at least the columns `physicalColumns`.
0119    /// Executed at the end of GetCluster when all missing data pieces have been sent to the load queue.
0120    /// Ideally, the function returns without blocking if the cluster is already in the pool.
0121    RCluster *WaitFor(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns);
0122 
0123 public:
0124    static constexpr unsigned int kDefaultClusterBunchSize = 1;
0125    RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize);
0126    explicit RClusterPool(RPageSource &pageSource) : RClusterPool(pageSource, kDefaultClusterBunchSize) {}
0127    RClusterPool(const RClusterPool &other) = delete;
0128    RClusterPool &operator =(const RClusterPool &other) = delete;
0129    ~RClusterPool();
0130 
0131    /// Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread load
0132    /// the cluster in the pool, blocks until done, and then returns it.  Triggers along the way the background loading
0133    /// of the following fWindowPost number of clusters.  The returned cluster has at least all the pages of
0134    /// `physicalColumns` and possibly pages of other columns, too.  If implicit multi-threading is turned on, the
0135    /// uncompressed pages of the returned cluster are already pushed into the page pool associated with the page source
0136    /// upon return. The cluster remains valid until the next call to GetCluster().
0137    RCluster *GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns);
0138 
0139    /// Used by the unit tests to drain the queue of clusters to be preloaded
0140    void WaitForInFlightClusters();
0141 }; // class RClusterPool
0142 
0143 } // namespace Internal
0144 } // namespace Experimental
0145 } // namespace ROOT
0146 
0147 #endif