|
||||
File indexing completed on 2025-01-18 10:10:36
0001 /// \file ROOT/RClusterPool.hxx 0002 /// \ingroup NTuple ROOT7 0003 /// \author Jakob Blomer <jblomer@cern.ch> 0004 /// \date 2020-03-11 0005 /// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback 0006 /// is welcome! 0007 0008 /************************************************************************* 0009 * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers. * 0010 * All rights reserved. * 0011 * * 0012 * For the licensing terms see $ROOTSYS/LICENSE. * 0013 * For the list of contributors see $ROOTSYS/README/CREDITS. * 0014 *************************************************************************/ 0015 0016 #ifndef ROOT7_RClusterPool 0017 #define ROOT7_RClusterPool 0018 0019 #include <ROOT/RCluster.hxx> 0020 #include <ROOT/RNTupleUtil.hxx> 0021 0022 #include <condition_variable> 0023 #include <deque> 0024 #include <memory> 0025 #include <mutex> 0026 #include <future> 0027 #include <thread> 0028 #include <set> 0029 #include <vector> 0030 0031 namespace ROOT { 0032 namespace Experimental { 0033 0034 namespace Internal { 0035 class RPageSource; 0036 0037 // clang-format off 0038 /** 0039 \class ROOT::Experimental::Internal::RClusterPool 0040 \ingroup NTuple 0041 \brief Managed a set of clusters containing compressed and packed pages 0042 0043 The cluster pool steers the preloading of (partial) clusters. There is a two-step pipeline: in a first step, 0044 compressed pages are read from clusters into a memory buffer. The second pipeline step decompresses the pages 0045 and pushes them into the page pool. The actual logic of reading and unzipping is implemented by the page source. 0046 The cluster pool only orchestrates the work queues for reading and unzipping. It uses one extra I/O thread for 0047 reading waits for data from storage and generates no CPU load. 0048 0049 The unzipping step of the pipeline therefore behaves differently depending on whether or not implicit multi-threading 0050 is turned on. If it is turned off, i.e. in a single-threaded environment, the cluster pool will only read the 0051 compressed pages and the page source has to uncompresses pages at a later point when data from the page is requested. 0052 */ 0053 // clang-format on 0054 class RClusterPool { 0055 private: 0056 /// Request to load a subset of the columns of a particular cluster. 0057 /// Work items come in groups and are executed by the page source. 0058 struct RReadItem { 0059 /// Items with different bunch ids are scheduled for different vector reads 0060 std::int64_t fBunchId = -1; 0061 std::promise<std::unique_ptr<RCluster>> fPromise; 0062 RCluster::RKey fClusterKey; 0063 }; 0064 0065 /// Clusters that are currently being processed by the pipeline. Every in-flight cluster has a corresponding 0066 /// work item, first a read item and then an unzip item. 0067 struct RInFlightCluster { 0068 std::future<std::unique_ptr<RCluster>> fFuture; 0069 RCluster::RKey fClusterKey; 0070 /// By the time a cluster has been loaded, this cluster might not be necessary anymore. This can happen if 0071 /// there are jumps in the access pattern (i.e. the access pattern deviates from linear access). 0072 bool fIsExpired = false; 0073 0074 bool operator ==(const RInFlightCluster &other) const { 0075 return (fClusterKey.fClusterId == other.fClusterKey.fClusterId) && 0076 (fClusterKey.fPhysicalColumnSet == other.fClusterKey.fPhysicalColumnSet); 0077 } 0078 bool operator !=(const RInFlightCluster &other) const { return !(*this == other); } 0079 /// First order by cluster id, then by number of columns, than by the column ids in fColumns 0080 bool operator <(const RInFlightCluster &other) const; 0081 }; 0082 0083 /// Every cluster pool is responsible for exactly one page source that triggers loading of the clusters 0084 /// (GetCluster()) and is used for implementing the I/O and cluster memory allocation (PageSource::LoadClusters()). 0085 RPageSource &fPageSource; 0086 /// The number of clusters before the currently active cluster that should stay in the pool if present 0087 /// Reserved for later use. 0088 unsigned int fWindowPre = 0; 0089 /// The number of clusters that are being read in a single vector read. 0090 unsigned int fClusterBunchSize; 0091 /// Used as an ever-growing counter in GetCluster() to separate bunches of clusters from each other 0092 std::int64_t fBunchId = 0; 0093 /// The cache of clusters around the currently active cluster 0094 std::vector<std::unique_ptr<RCluster>> fPool; 0095 0096 /// Protects the shared state between the main thread and the I/O thread, namely the work queue and the in-flight 0097 /// clusters vector 0098 std::mutex fLockWorkQueue; 0099 /// The clusters that were handed off to the I/O thread 0100 std::vector<RInFlightCluster> fInFlightClusters; 0101 /// Signals a non-empty I/O work queue 0102 std::condition_variable fCvHasReadWork; 0103 /// The communication channel to the I/O thread 0104 std::deque<RReadItem> fReadQueue; 0105 0106 /// The I/O thread calls RPageSource::LoadClusters() asynchronously. The thread is mostly waiting for the 0107 /// data to arrive (blocked by the kernel) and therefore can safely run in addition to the application 0108 /// main threads. 0109 std::thread fThreadIo; 0110 0111 /// Every cluster id has at most one corresponding RCluster pointer in the pool 0112 RCluster *FindInPool(DescriptorId_t clusterId) const; 0113 /// Returns an index of an unused element in fPool; callers of this function (GetCluster() and WaitFor()) 0114 /// make sure that a free slot actually exists 0115 size_t FindFreeSlot() const; 0116 /// The I/O thread routine, there is exactly one I/O thread in-flight for every cluster pool 0117 void ExecReadClusters(); 0118 /// Returns the given cluster from the pool, which needs to contain at least the columns `physicalColumns`. 0119 /// Executed at the end of GetCluster when all missing data pieces have been sent to the load queue. 0120 /// Ideally, the function returns without blocking if the cluster is already in the pool. 0121 RCluster *WaitFor(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns); 0122 0123 public: 0124 static constexpr unsigned int kDefaultClusterBunchSize = 1; 0125 RClusterPool(RPageSource &pageSource, unsigned int clusterBunchSize); 0126 explicit RClusterPool(RPageSource &pageSource) : RClusterPool(pageSource, kDefaultClusterBunchSize) {} 0127 RClusterPool(const RClusterPool &other) = delete; 0128 RClusterPool &operator =(const RClusterPool &other) = delete; 0129 ~RClusterPool(); 0130 0131 /// Returns the requested cluster either from the pool or, in case of a cache miss, lets the I/O thread load 0132 /// the cluster in the pool, blocks until done, and then returns it. Triggers along the way the background loading 0133 /// of the following fWindowPost number of clusters. The returned cluster has at least all the pages of 0134 /// `physicalColumns` and possibly pages of other columns, too. If implicit multi-threading is turned on, the 0135 /// uncompressed pages of the returned cluster are already pushed into the page pool associated with the page source 0136 /// upon return. The cluster remains valid until the next call to GetCluster(). 0137 RCluster *GetCluster(DescriptorId_t clusterId, const RCluster::ColumnSet_t &physicalColumns); 0138 0139 /// Used by the unit tests to drain the queue of clusters to be preloaded 0140 void WaitForInFlightClusters(); 0141 }; // class RClusterPool 0142 0143 } // namespace Internal 0144 } // namespace Experimental 0145 } // namespace ROOT 0146 0147 #endif
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.3.7 LXR engine. The LXR team |