File indexing completed on 2025-01-18 10:10:47
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016 #ifndef ROOT7_RPageStorage
0017 #define ROOT7_RPageStorage
0018
0019 #include <ROOT/RCluster.hxx>
0020 #include <ROOT/RNTupleDescriptor.hxx>
0021 #include <ROOT/RNTupleMetrics.hxx>
0022 #include <ROOT/RNTupleReadOptions.hxx>
0023 #include <ROOT/RNTupleWriteOptions.hxx>
0024 #include <ROOT/RNTupleUtil.hxx>
0025 #include <ROOT/RPage.hxx>
0026 #include <ROOT/RPageAllocator.hxx>
0027 #include <ROOT/RSpan.hxx>
0028 #include <string_view>
0029
0030 #include <atomic>
0031 #include <cstddef>
0032 #include <deque>
0033 #include <functional>
0034 #include <memory>
0035 #include <mutex>
0036 #include <shared_mutex>
0037 #include <unordered_set>
0038 #include <vector>
0039
0040 namespace ROOT {
0041 namespace Experimental {
0042
0043 class RFieldBase;
0044 class RNTupleModel;
0045
0046 namespace Internal {
0047 class RColumn;
0048 class RColumnElementBase;
0049 class RNTupleCompressor;
0050 class RNTupleDecompressor;
0051 struct RNTupleModelChangeset;
0052
/// Distinguishes the two kinds of page storage: writing sinks (RPageSink::GetType() returns kSink)
/// and reading sources (RPageSource::GetType() returns kSource).
enum class EPageStorageType {
   kSink = 0,
   kSource = 1,
};
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068 class RPageStorage {
0069 public:
0070
0071 class RTaskScheduler {
0072 public:
0073 virtual ~RTaskScheduler() = default;
0074
0075 virtual void AddTask(const std::function<void(void)> &taskFunc) = 0;
0076
0077 virtual void Wait() = 0;
0078 };
0079
0080
0081
0082
0083
0084 struct RSealedPage {
0085 const void *fBuffer = nullptr;
0086 std::uint32_t fSize = 0;
0087 std::uint32_t fNElements = 0;
0088
0089 RSealedPage() = default;
0090 RSealedPage(const void *b, std::uint32_t s, std::uint32_t n) : fBuffer(b), fSize(s), fNElements(n) {}
0091 RSealedPage(const RSealedPage &other) = delete;
0092 RSealedPage& operator =(const RSealedPage &other) = delete;
0093 RSealedPage(RSealedPage &&other) = default;
0094 RSealedPage& operator =(RSealedPage &&other) = default;
0095 };
0096
0097 using SealedPageSequence_t = std::deque<RSealedPage>;
0098
0099 struct RSealedPageGroup {
0100 DescriptorId_t fPhysicalColumnId;
0101 SealedPageSequence_t::const_iterator fFirst;
0102 SealedPageSequence_t::const_iterator fLast;
0103
0104 RSealedPageGroup(DescriptorId_t d, SealedPageSequence_t::const_iterator b, SealedPageSequence_t::const_iterator e)
0105 : fPhysicalColumnId(d), fFirst(b), fLast(e)
0106 {
0107 }
0108 };
0109
0110 protected:
0111 Detail::RNTupleMetrics fMetrics;
0112
0113 std::string fNTupleName;
0114 RTaskScheduler *fTaskScheduler = nullptr;
0115 void WaitForAllTasks()
0116 {
0117 if (!fTaskScheduler)
0118 return;
0119 fTaskScheduler->Wait();
0120 }
0121
0122 public:
0123 explicit RPageStorage(std::string_view name);
0124 RPageStorage(const RPageStorage &other) = delete;
0125 RPageStorage& operator =(const RPageStorage &other) = delete;
0126 RPageStorage(RPageStorage &&other) = default;
0127 RPageStorage& operator =(RPageStorage &&other) = default;
0128 virtual ~RPageStorage();
0129
0130
0131 virtual EPageStorageType GetType() = 0;
0132
0133 struct RColumnHandle {
0134 DescriptorId_t fPhysicalId = kInvalidDescriptorId;
0135 const RColumn *fColumn = nullptr;
0136
0137
0138
0139 explicit operator bool() const { return fPhysicalId != kInvalidDescriptorId && fColumn; }
0140 };
0141
0142 using ColumnHandle_t = RColumnHandle;
0143
0144
0145
0146 virtual ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) = 0;
0147
0148
0149 virtual void DropColumn(ColumnHandle_t columnHandle) = 0;
0150
0151
0152
0153 virtual void ReleasePage(RPage &page) = 0;
0154
0155
0156
0157 virtual Detail::RNTupleMetrics &GetMetrics() { return fMetrics; }
0158
0159
0160 const std::string &GetNTupleName() const { return fNTupleName; }
0161
0162 void SetTaskScheduler(RTaskScheduler *taskScheduler) { fTaskScheduler = taskScheduler; }
0163 };
0164
0165
0166
0167
0168
0169
0170
0171
0172
0173
0174
0175
0176
0177
0178
0179 class RPageSink : public RPageStorage {
0180 protected:
0181 std::unique_ptr<RNTupleWriteOptions> fOptions;
0182
0183
0184
0185
0186 std::unique_ptr<RNTupleCompressor> fCompressor;
0187
0188
0189
0190
0191
0192
0193 RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting);
0194
0195
0196 static RSealedPage SealPage(const RPage &page, const RColumnElementBase &element, int compressionSetting, void *buf,
0197 bool allowAlias = true);
0198
0199 private:
0200
0201 bool fIsInitialized = false;
0202
0203 public:
0204 RPageSink(std::string_view ntupleName, const RNTupleWriteOptions &options);
0205
0206 RPageSink(const RPageSink&) = delete;
0207 RPageSink& operator=(const RPageSink&) = delete;
0208 RPageSink(RPageSink&&) = default;
0209 RPageSink& operator=(RPageSink&&) = default;
0210 ~RPageSink() override;
0211
0212 EPageStorageType GetType() final { return EPageStorageType::kSink; }
0213
0214 const RNTupleWriteOptions &GetWriteOptions() const { return *fOptions; }
0215
0216 void DropColumn(ColumnHandle_t ) final {}
0217
0218 bool IsInitialized() const { return fIsInitialized; }
0219
0220
0221 virtual const RNTupleDescriptor &GetDescriptor() const = 0;
0222
0223
0224
0225 void Init(RNTupleModel &model)
0226 {
0227 if (fIsInitialized) {
0228 throw RException(R__FAIL("already initialized"));
0229 }
0230 fIsInitialized = true;
0231 InitImpl(model);
0232 }
0233
0234 protected:
0235 virtual void InitImpl(RNTupleModel &model) = 0;
0236
0237 public:
0238
0239
0240
0241 virtual void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) = 0;
0242
0243
0244 virtual void CommitPage(ColumnHandle_t columnHandle, const RPage &page) = 0;
0245
0246 virtual void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) = 0;
0247
0248 virtual void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) = 0;
0249
0250
0251 virtual std::uint64_t CommitCluster(NTupleSize_t nNewEntries) = 0;
0252
0253
0254 virtual void CommitClusterGroup() = 0;
0255
0256 virtual void CommitDataset() = 0;
0257
0258
0259
0260 virtual RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) = 0;
0261
0262
0263 class RSinkGuard {
0264 std::mutex *fLock;
0265
0266 public:
0267 explicit RSinkGuard(std::mutex *lock) : fLock(lock)
0268 {
0269 if (fLock != nullptr) {
0270 fLock->lock();
0271 }
0272 }
0273 RSinkGuard(const RSinkGuard &) = delete;
0274 RSinkGuard &operator=(const RSinkGuard &) = delete;
0275 RSinkGuard(RSinkGuard &&) = delete;
0276 RSinkGuard &operator=(RSinkGuard &&) = delete;
0277 ~RSinkGuard()
0278 {
0279 if (fLock != nullptr) {
0280 fLock->unlock();
0281 }
0282 }
0283 };
0284
0285 virtual RSinkGuard GetSinkGuard()
0286 {
0287
0288 return RSinkGuard(nullptr);
0289 }
0290 };
0291
0292
0293
0294
0295
0296
0297
0298
0299 class RPagePersistentSink : public RPageSink {
0300 private:
0301
0302 RNTupleSerializer::RContext fSerializationContext;
0303
0304
0305 std::uint64_t fNextClusterInGroup = 0;
0306
0307 NTupleSize_t fPrevClusterNEntries = 0;
0308
0309 std::vector<RClusterDescriptor::RColumnRange> fOpenColumnRanges;
0310
0311 std::vector<RClusterDescriptor::RPageRange> fOpenPageRanges;
0312
0313 protected:
0314 Internal::RNTupleDescriptorBuilder fDescriptorBuilder;
0315
0316
0317 struct RCounters {
0318 Detail::RNTupleAtomicCounter &fNPageCommitted;
0319 Detail::RNTupleAtomicCounter &fSzWritePayload;
0320 Detail::RNTupleAtomicCounter &fSzZip;
0321 Detail::RNTupleAtomicCounter &fTimeWallWrite;
0322 Detail::RNTupleAtomicCounter &fTimeWallZip;
0323 Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> &fTimeCpuWrite;
0324 Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> &fTimeCpuZip;
0325 };
0326 std::unique_ptr<RCounters> fCounters;
0327
0328 virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length) = 0;
0329
0330 virtual RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) = 0;
0331 virtual RNTupleLocator
0332 CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) = 0;
0333
0334
0335
0336
0337
0338
0339 virtual std::vector<RNTupleLocator> CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges);
0340
0341 virtual std::uint64_t CommitClusterImpl() = 0;
0342
0343
0344 virtual RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) = 0;
0345 virtual void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) = 0;
0346
0347
0348
0349
0350
0351
0352
0353 void EnableDefaultMetrics(const std::string &prefix);
0354
0355 public:
0356 RPagePersistentSink(std::string_view ntupleName, const RNTupleWriteOptions &options);
0357
0358 RPagePersistentSink(const RPagePersistentSink &) = delete;
0359 RPagePersistentSink &operator=(const RPagePersistentSink &) = delete;
0360 RPagePersistentSink(RPagePersistentSink &&) = default;
0361 RPagePersistentSink &operator=(RPagePersistentSink &&) = default;
0362 ~RPagePersistentSink() override;
0363
0364
0365 static std::unique_ptr<RPageSink> Create(std::string_view ntupleName, std::string_view location,
0366 const RNTupleWriteOptions &options = RNTupleWriteOptions());
0367
0368 ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) final;
0369
0370 const RNTupleDescriptor &GetDescriptor() const final { return fDescriptorBuilder.GetDescriptor(); }
0371
0372
0373 void InitImpl(RNTupleModel &model) final;
0374 void UpdateSchema(const RNTupleModelChangeset &changeset, NTupleSize_t firstEntry) final;
0375
0376
0377 void InitFromDescriptor(const RNTupleDescriptor &descriptor);
0378
0379 void CommitPage(ColumnHandle_t columnHandle, const RPage &page) final;
0380 void CommitSealedPage(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final;
0381 void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
0382 std::uint64_t CommitCluster(NTupleSize_t nEntries) final;
0383 void CommitClusterGroup() final;
0384 void CommitDataset() final;
0385 };
0386
0387
0388
0389
0390
0391
0392
0393
0394
0395
0396
0397
0398 class RPageSource : public RPageStorage {
0399 public:
0400
0401 struct REntryRange {
0402 NTupleSize_t fFirstEntry = kInvalidNTupleIndex;
0403 NTupleSize_t fNEntries = 0;
0404
0405
0406 bool IntersectsWith(const RClusterDescriptor &clusterDesc) const;
0407 };
0408
0409
0410 class RSharedDescriptorGuard {
0411 const RNTupleDescriptor &fDescriptor;
0412 std::shared_mutex &fLock;
0413
0414 public:
0415 RSharedDescriptorGuard(const RNTupleDescriptor &desc, std::shared_mutex &lock) : fDescriptor(desc), fLock(lock)
0416 {
0417 fLock.lock_shared();
0418 }
0419 RSharedDescriptorGuard(const RSharedDescriptorGuard &) = delete;
0420 RSharedDescriptorGuard &operator=(const RSharedDescriptorGuard &) = delete;
0421 RSharedDescriptorGuard(RSharedDescriptorGuard &&) = delete;
0422 RSharedDescriptorGuard &operator=(RSharedDescriptorGuard &&) = delete;
0423 ~RSharedDescriptorGuard() { fLock.unlock_shared(); }
0424 const RNTupleDescriptor *operator->() const { return &fDescriptor; }
0425 const RNTupleDescriptor &GetRef() const { return fDescriptor; }
0426 };
0427
0428
0429 class RExclDescriptorGuard {
0430 RNTupleDescriptor &fDescriptor;
0431 std::shared_mutex &fLock;
0432
0433 public:
0434 RExclDescriptorGuard(RNTupleDescriptor &desc, std::shared_mutex &lock) : fDescriptor(desc), fLock(lock)
0435 {
0436 fLock.lock();
0437 }
0438 RExclDescriptorGuard(const RExclDescriptorGuard &) = delete;
0439 RExclDescriptorGuard &operator=(const RExclDescriptorGuard &) = delete;
0440 RExclDescriptorGuard(RExclDescriptorGuard &&) = delete;
0441 RExclDescriptorGuard &operator=(RExclDescriptorGuard &&) = delete;
0442 ~RExclDescriptorGuard()
0443 {
0444 fDescriptor.IncGeneration();
0445 fLock.unlock();
0446 }
0447 RNTupleDescriptor *operator->() const { return &fDescriptor; }
0448 void MoveIn(RNTupleDescriptor &&desc) { fDescriptor = std::move(desc); }
0449 };
0450
0451 private:
0452 RNTupleDescriptor fDescriptor;
0453 mutable std::shared_mutex fDescriptorLock;
0454 REntryRange fEntryRange;
0455
0456 protected:
0457
0458 struct RCounters {
0459 Detail::RNTupleAtomicCounter &fNReadV;
0460 Detail::RNTupleAtomicCounter &fNRead;
0461 Detail::RNTupleAtomicCounter &fSzReadPayload;
0462 Detail::RNTupleAtomicCounter &fSzReadOverhead;
0463 Detail::RNTupleAtomicCounter &fSzUnzip;
0464 Detail::RNTupleAtomicCounter &fNClusterLoaded;
0465 Detail::RNTupleAtomicCounter &fNPageLoaded;
0466 Detail::RNTupleAtomicCounter &fNPagePopulated;
0467 Detail::RNTupleAtomicCounter &fTimeWallRead;
0468 Detail::RNTupleAtomicCounter &fTimeWallUnzip;
0469 Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> &fTimeCpuRead;
0470 Detail::RNTupleTickCounter<Detail::RNTupleAtomicCounter> &fTimeCpuUnzip;
0471 Detail::RNTupleCalcPerf &fBandwidthReadUncompressed;
0472 Detail::RNTupleCalcPerf &fBandwidthReadCompressed;
0473 Detail::RNTupleCalcPerf &fBandwidthUnzip;
0474 Detail::RNTupleCalcPerf &fFractionReadOverhead;
0475 Detail::RNTupleCalcPerf &fCompressionRatio;
0476 };
0477
0478
0479
0480 class RActivePhysicalColumns {
0481 private:
0482 std::vector<DescriptorId_t> fIDs;
0483 std::vector<std::size_t> fRefCounters;
0484
0485 public:
0486 void Insert(DescriptorId_t physicalColumnID);
0487 void Erase(DescriptorId_t physicalColumnID);
0488 RCluster::ColumnSet_t ToColumnSet() const;
0489 };
0490
0491 std::unique_ptr<RCounters> fCounters;
0492
0493 RNTupleReadOptions fOptions;
0494
0495 RActivePhysicalColumns fActivePhysicalColumns;
0496
0497
0498
0499
0500 std::unique_ptr<RNTupleDecompressor> fDecompressor;
0501
0502 virtual RNTupleDescriptor AttachImpl() = 0;
0503
0504 virtual void UnzipClusterImpl(RCluster * )
0505 { }
0506
0507
0508
0509
0510 void PrepareLoadCluster(
0511 const RCluster::RKey &clusterKey, ROnDiskPageMap &pageZeroMap,
0512 std::function<void(DescriptorId_t, NTupleSize_t, const RClusterDescriptor::RPageRange::RPageInfo &)> perPageFunc);
0513
0514
0515
0516
0517
0518
0519
0520 void EnableDefaultMetrics(const std::string &prefix);
0521
0522
0523 RExclDescriptorGuard GetExclDescriptorGuard() { return RExclDescriptorGuard(fDescriptor, fDescriptorLock); }
0524
0525 public:
0526 RPageSource(std::string_view ntupleName, const RNTupleReadOptions &fOptions);
0527 RPageSource(const RPageSource&) = delete;
0528 RPageSource& operator=(const RPageSource&) = delete;
0529 RPageSource(RPageSource &&) = delete;
0530 RPageSource &operator=(RPageSource &&) = delete;
0531 ~RPageSource() override;
0532
0533 static std::unique_ptr<RPageSource> Create(std::string_view ntupleName, std::string_view location,
0534 const RNTupleReadOptions &options = RNTupleReadOptions());
0535
0536 virtual std::unique_ptr<RPageSource> Clone() const = 0;
0537
0538 EPageStorageType GetType() final { return EPageStorageType::kSource; }
0539 const RNTupleReadOptions &GetReadOptions() const { return fOptions; }
0540
0541
0542
0543
0544
0545
0546
0547
0548 const RSharedDescriptorGuard GetSharedDescriptorGuard() const
0549 {
0550 return RSharedDescriptorGuard(fDescriptor, fDescriptorLock);
0551 }
0552
0553 ColumnHandle_t AddColumn(DescriptorId_t fieldId, const RColumn &column) override;
0554 void DropColumn(ColumnHandle_t columnHandle) override;
0555
0556
0557 void Attach() { GetExclDescriptorGuard().MoveIn(AttachImpl()); }
0558 NTupleSize_t GetNEntries();
0559 NTupleSize_t GetNElements(ColumnHandle_t columnHandle);
0560 ColumnId_t GetColumnId(ColumnHandle_t columnHandle);
0561
0562
0563
0564 void SetEntryRange(const REntryRange &range);
0565 REntryRange GetEntryRange() const { return fEntryRange; }
0566
0567
0568 virtual RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) = 0;
0569
0570 virtual RPage PopulatePage(ColumnHandle_t columnHandle, RClusterIndex clusterIndex) = 0;
0571
0572
0573
0574
0575
0576
0577 virtual void
0578 LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) = 0;
0579
0580
0581
0582
0583
0584
0585 RPage UnsealPage(const RSealedPage &sealedPage, const RColumnElementBase &element, DescriptorId_t physicalColumnId);
0586
0587
0588
0589
0590
0591
0592
0593
0594 virtual std::vector<std::unique_ptr<RCluster>> LoadClusters(std::span<RCluster::RKey> clusterKeys) = 0;
0595
0596
0597
0598
0599
0600
0601 void UnzipCluster(RCluster *cluster);
0602 };
0603
0604 }
0605
0606 }
0607 }
0608
0609 #endif