// File indexing completed on 2025-09-16 09:08:35
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014 #ifndef ROOT_RPageStorage
0015 #define ROOT_RPageStorage
0016
0017 #include <ROOT/RError.hxx>
0018 #include <ROOT/RCluster.hxx>
0019 #include <ROOT/RColumnElementBase.hxx>
0020 #include <ROOT/RNTupleDescriptor.hxx>
0021 #include <ROOT/RNTupleMetrics.hxx>
0022 #include <ROOT/RNTupleReadOptions.hxx>
0023 #include <ROOT/RNTupleSerialize.hxx>
0024 #include <ROOT/RNTupleWriteOptions.hxx>
0025 #include <ROOT/RNTupleUtil.hxx>
0026 #include <ROOT/RPage.hxx>
0027 #include <ROOT/RPagePool.hxx>
0028 #include <ROOT/RSpan.hxx>
0029 #include <string_view>
0030
0031 #include <atomic>
0032 #include <cassert>
0033 #include <cstddef>
0034 #include <deque>
0035 #include <functional>
0036 #include <map>
0037 #include <memory>
0038 #include <mutex>
0039 #include <set>
0040 #include <shared_mutex>
0041 #include <unordered_map>
0042 #include <unordered_set>
0043 #include <vector>
0044
0045 namespace ROOT {
0046
0047 class RNTupleModel;
0048
0049 namespace Internal {
0050
0051 class RPageAllocator;
0052 class RColumn;
0053 struct RNTupleModelChangeset;
0054
/// Distinguishes the two kinds of page storage: sinks write pages, sources read them back.
enum class EPageStorageType {
   kSink,   ///< Write path (see RPageSink)
   kSource, ///< Read path (see RPageSource)
};
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
/// Common functionality of page sinks and page sources. A page storage is identified by the name
/// of the RNTuple it writes or reads and owns shared infrastructure: metrics, the page allocator,
/// and an optional (externally provided) task scheduler.
class RPageStorage {
public:
   /// Number of trailing bytes reserved for a page checksum when checksumming is enabled.
   /// The checksum is stored as a 64-bit word behind the page payload.
   static constexpr std::size_t kNBytesPageChecksum = sizeof(std::uint64_t);

   /// Abstract interface for submitting independent tasks (e.g. parallel page (de)compression).
   /// A concrete scheduler is injected via SetTaskScheduler(); without one, work runs inline.
   class RTaskScheduler {
   public:
      virtual ~RTaskScheduler() = default;

      /// Schedule the given task; tasks may run concurrently with each other.
      virtual void AddTask(const std::function<void(void)> &taskFunc) = 0;

      /// Blocks until all tasks scheduled so far have finished.
      virtual void Wait() = 0;
   };

   /// A sealed page is the on-storage representation of a page: packed/compressed data,
   /// optionally followed by a kNBytesPageChecksum-byte checksum. The sealed page does NOT own
   /// its buffer; the caller must keep the buffer alive while the RSealedPage refers to it.
   struct RSealedPage {
   private:
      const void *fBuffer = nullptr; ///< Non-owning pointer to the sealed data (incl. optional checksum)
      std::size_t fBufferSize = 0;   ///< Total buffer size in bytes, including the checksum if present
      std::uint32_t fNElements = 0;  ///< Number of column elements contained in the page
      bool fHasChecksum = false;     ///< If true, the last kNBytesPageChecksum bytes are the checksum

   public:
      RSealedPage() = default;
      RSealedPage(const void *buffer, std::size_t bufferSize, std::uint32_t nElements, bool hasChecksum = false)
         : fBuffer(buffer), fBufferSize(bufferSize), fNElements(nElements), fHasChecksum(hasChecksum)
      {
      }
      RSealedPage(const RSealedPage &other) = default;
      RSealedPage &operator=(const RSealedPage &other) = default;
      RSealedPage(RSealedPage &&other) = default;
      RSealedPage &operator=(RSealedPage &&other) = default;

      const void *GetBuffer() const { return fBuffer; }
      void SetBuffer(const void *buffer) { fBuffer = buffer; }

      /// Returns the payload size, i.e. the buffer size minus the trailing checksum (if any).
      std::size_t GetDataSize() const
      {
         // fHasChecksum is 0 or 1, so this subtracts the checksum bytes only when present
         assert(fBufferSize >= fHasChecksum * kNBytesPageChecksum);
         return fBufferSize - fHasChecksum * kNBytesPageChecksum;
      }
      std::size_t GetBufferSize() const { return fBufferSize; }
      void SetBufferSize(std::size_t bufferSize) { fBufferSize = bufferSize; }

      std::uint32_t GetNElements() const { return fNElements; }
      void SetNElements(std::uint32_t nElements) { fNElements = nElements; }

      bool GetHasChecksum() const { return fHasChecksum; }
      void SetHasChecksum(bool hasChecksum) { fHasChecksum = hasChecksum; }

      /// Computes and stores the checksum behind the payload if fHasChecksum is set
      /// (no-op otherwise; implemented in the source file).
      void ChecksumIfEnabled();
      /// Verifies the stored checksum if fHasChecksum is set; returns an error result on mismatch.
      RResult<void> VerifyChecksumIfEnabled() const;
      /// Returns the stored checksum value; fails if the page carries no checksum.
      RResult<std::uint64_t> GetChecksum() const;
   };

   using SealedPageSequence_t = std::deque<RSealedPage>;

   /// A contiguous range of sealed pages that all belong to the same physical column;
   /// used for vector (bulk) commit operations, see RPageSink::CommitSealedPageV().
   struct RSealedPageGroup {
      ROOT::DescriptorId_t fPhysicalColumnId;
      SealedPageSequence_t::const_iterator fFirst; ///< First page of the range
      SealedPageSequence_t::const_iterator fLast;  ///< One past the last page of the range
      RSealedPageGroup() = default;
      RSealedPageGroup(ROOT::DescriptorId_t d, SealedPageSequence_t::const_iterator b,
                       SealedPageSequence_t::const_iterator e)
         : fPhysicalColumnId(d), fFirst(b), fLast(e)
      {
      }
   };

protected:
   ROOT::Experimental::Detail::RNTupleMetrics fMetrics; ///< Performance counters, see GetMetrics()

   /// Allocates the memory of pages handed out by this storage
   std::unique_ptr<ROOT::Internal::RPageAllocator> fPageAllocator;

   std::string fNTupleName; ///< Name of the RNTuple written or read by this storage
   RTaskScheduler *fTaskScheduler = nullptr; ///< Optional and not owned; nullptr means no parallel tasks
   /// Waits for the task scheduler to drain; no-op if no scheduler is set.
   void WaitForAllTasks()
   {
      if (!fTaskScheduler)
         return;
      fTaskScheduler->Wait();
   }

public:
   explicit RPageStorage(std::string_view name);
   RPageStorage(const RPageStorage &other) = delete;
   RPageStorage &operator=(const RPageStorage &other) = delete;
   RPageStorage(RPageStorage &&other) = default;
   RPageStorage &operator=(RPageStorage &&other) = default;
   virtual ~RPageStorage();

   /// Whether the concrete class is a sink or a source.
   virtual EPageStorageType GetType() = 0;

   /// Associates an in-memory column object with its on-storage physical column id.
   struct RColumnHandle {
      ROOT::DescriptorId_t fPhysicalId = ROOT::kInvalidDescriptorId;
      ROOT::Internal::RColumn *fColumn = nullptr; ///< Not owned

      /// Returns true for a valid handle, i.e. both the physical id and the column pointer are set.
      explicit operator bool() const { return fPhysicalId != ROOT::kInvalidDescriptorId && fColumn; }
   };

   /// Opaque handle type used by callers of AddColumn()/DropColumn()
   using ColumnHandle_t = RColumnHandle;

   /// Registers a new column belonging to the given field; the returned handle is used
   /// in all subsequent per-column calls.
   virtual ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) = 0;

   /// Unregisters a column previously registered with AddColumn().
   virtual void DropColumn(ColumnHandle_t columnHandle) = 0;
   ROOT::DescriptorId_t GetColumnId(ColumnHandle_t columnHandle) const { return columnHandle.fPhysicalId; }

   /// Returns the storage's metrics; derived classes may override to expose additional counters.
   virtual ROOT::Experimental::Detail::RNTupleMetrics &GetMetrics() { return fMetrics; }

   /// Returns the name of the RNTuple that this storage writes or reads.
   const std::string &GetNTupleName() const { return fNTupleName; }

   /// Installs the task scheduler used for parallelizable work; pass nullptr to disable.
   void SetTaskScheduler(RTaskScheduler *taskScheduler) { fTaskScheduler = taskScheduler; }
};
0199
0200
0201
0202
0203
0204
0205
0206
0207
0208
0209
/// Keeps the sum of the write-page sizes of all registered columns within a fixed budget.
/// Columns request page-size changes through TryUpdate(); when the budget would be exceeded,
/// TryEvict() attempts to reclaim memory from other registered columns.
class RWritePageMemoryManager {
private:
   /// Bookkeeping record for one registered column
   struct RColumnInfo {
      ROOT::Internal::RColumn *fColumn = nullptr; ///< Not owned
      std::size_t fCurrentPageSize = 0;           ///< The column's current write page size in bytes
      std::size_t fInitialPageSize = 0;           ///< The column's initial write page size in bytes

      /// Ordering used by fColumnsSortedByPageSize (defined in the source file)
      bool operator>(const RColumnInfo &other) const;
   };

   /// Sum of the current write page sizes of all registered columns, in bytes
   std::size_t fCurrentAllocatedBytes = 0;
   /// Upper bound for fCurrentAllocatedBytes
   /// NOTE(review): presumably 0 means "no limit" -- confirm against the implementation
   std::size_t fMaxAllocatedBytes = 0;

   /// Registered columns ordered by std::greater, i.e. largest page size first
   std::set<RColumnInfo, std::greater<RColumnInfo>> fColumnsSortedByPageSize;

   /// Tries to free enough budget so that targetAvailableSize bytes become available;
   /// pageSizeLimit constrains which columns are considered (semantics implemented in the
   /// source file). Returns false if the requested amount could not be made available.
   bool TryEvict(std::size_t targetAvailableSize, std::size_t pageSizeLimit);

public:
   explicit RWritePageMemoryManager(std::size_t maxAllocatedBytes) : fMaxAllocatedBytes(maxAllocatedBytes) {}

   /// Requests to change the write page size of the given column to newWritePageSize.
   /// Returns false if the budget would be exceeded and no memory could be reclaimed.
   bool TryUpdate(ROOT::Internal::RColumn &column, std::size_t newWritePageSize);
};
0241
0242
0243
0244
0245
0246
0247
0248
0249
0250
0251
0252
0253
0254
0255
/// Abstract interface to write pages and clusters of an RNTuple (the write-path counterpart
/// of RPageSource). Concrete sinks implement the pure virtual commit methods.
class RPageSink : public RPageStorage {
public:
   /// Callback type invoked with this sink when the dataset is committed,
   /// see RegisterOnCommitDatasetCallback()
   using Callback_t = std::function<void(RPageSink &)>;

   /// A cluster whose pages have been staged (written out) but whose metadata has not yet been
   /// committed; produced by StageCluster() and consumed by CommitStagedClusters().
   struct RStagedCluster {
      std::uint64_t fNBytesWritten = 0;       ///< Bytes written to storage for this cluster
      ROOT::NTupleSize_t fNEntries = 0;       ///< Number of entries covered by this cluster

      /// Per-column metadata of the staged cluster
      struct RColumnInfo {
         ROOT::RClusterDescriptor::RPageRange fPageRange;
         ROOT::NTupleSize_t fNElements = ROOT::kInvalidNTupleIndex;
         std::uint32_t fCompressionSettings;  ///< NOTE(review): not default-initialized -- set by StageCluster()?
         bool fIsSuppressed = false;          ///< True if the column was suppressed in this cluster
      };

      std::vector<RColumnInfo> fColumnInfos; ///< One entry per column of the cluster
   };

protected:
   std::unique_ptr<ROOT::RNTupleWriteOptions> fOptions; ///< Write options, cloned at construction

   /// Set to true by Init(); guards against double initialization
   bool fIsInitialized = false;

   /// Seals (packs/compresses) the given page using this sink's options; uses fSealPageBuffer
   /// as the output buffer. NOTE(review): reuse of a single scratch buffer suggests this
   /// overload is not safe for concurrent use on the same sink -- confirm.
   RSealedPage SealPage(const ROOT::Internal::RPage &page, const ROOT::Internal::RColumnElementBase &element);

private:
   std::vector<Callback_t> fOnDatasetCommitCallbacks; ///< Invoked on CommitDataset()
   std::vector<unsigned char> fSealPageBuffer; ///< Scratch buffer for the protected SealPage() overload

   /// Constrains the total memory used by the columns' write pages
   RWritePageMemoryManager fWritePageMemoryManager;

public:
   RPageSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options);

   RPageSink(const RPageSink &) = delete;
   RPageSink &operator=(const RPageSink &) = delete;
   RPageSink(RPageSink &&) = default;
   RPageSink &operator=(RPageSink &&) = default;
   ~RPageSink() override;

   EPageStorageType GetType() final { return EPageStorageType::kSink; }

   /// Returns the write options this sink was created with.
   const ROOT::RNTupleWriteOptions &GetWriteOptions() const { return *fOptions; }

   /// Dropping columns is a no-op on the write path.
   void DropColumn(ColumnHandle_t /* columnHandle */) final {}

   /// Returns true once Init() has been called.
   bool IsInitialized() const { return fIsInitialized; }

   /// Returns the descriptor of the RNTuple being written, as known so far.
   virtual const ROOT::RNTupleDescriptor &GetDescriptor() const = 0;

   /// Returns the number of entries written so far.
   virtual ROOT::NTupleSize_t GetNEntries() const = 0;

   /// One-time initialization from the model; delegates to InitImpl().
   /// Throws an RException if the sink was already initialized.
   void Init(RNTupleModel &model)
   {
      if (fIsInitialized) {
         throw RException(R__FAIL("already initialized"));
      }
      fIsInitialized = true;
      InitImpl(model);
   }

protected:
   /// Implementation-specific part of Init()
   virtual void InitImpl(RNTupleModel &model) = 0;
   /// Implementation-specific part of CommitDataset()
   virtual void CommitDatasetImpl() = 0;

public:
   /// Parameter struct for the static SealPage() method
   struct RSealPageConfig {
      const ROOT::Internal::RPage *fPage = nullptr; ///< Input page to be sealed; not owned
      const ROOT::Internal::RColumnElementBase *fElement =
         nullptr; ///< Describes the on-disk element representation; not owned
      std::uint32_t fCompressionSettings = 0; ///< Compression algorithm and level to apply

      /// If true, space for a kNBytesPageChecksum-byte checksum is part of the sealed output
      bool fWriteChecksum = true;

      /// If true, the sealed page may alias the input page buffer when no transformation is needed
      /// NOTE(review): inferred from the name -- confirm against the implementation
      bool fAllowAlias = false;

      /// Caller-provided output buffer; must be large enough for the sealed page
      void *fBuffer = nullptr;
   };

   /// Seals a page according to the given configuration (implemented in the source file).
   static RSealedPage SealPage(const RSealPageConfig &config);

   /// Incorporates incremental model changes into the written metadata, effective from the
   /// given first entry number.
   virtual void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) = 0;

   /// Adds or updates extra type information in the written metadata.
   virtual void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) = 0;

   /// Marks the given column as suppressed in the current cluster.
   virtual void CommitSuppressedColumn(ColumnHandle_t columnHandle) = 0;

   /// Writes a page to storage; the column must have been registered via AddColumn().
   virtual void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) = 0;

   /// Writes an already-sealed page for the given physical column.
   virtual void
   CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) = 0;

   /// Bulk variant of CommitSealedPage(): writes several ranges of sealed pages in one call.
   virtual void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) = 0;

   /// Stages the current cluster covering nNewEntries additional entries; the returned
   /// staged cluster is later finalized by CommitStagedClusters().
   virtual RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) = 0;

   /// Commits previously staged clusters, logically appending them to the RNTuple.
   virtual void CommitStagedClusters(std::span<RStagedCluster> clusters) = 0;

   /// Convenience method: stages and immediately commits a single cluster.
   /// Returns the number of bytes written for that cluster.
   virtual std::uint64_t CommitCluster(ROOT::NTupleSize_t nNewEntries)
   {
      RStagedCluster stagedClusters[] = {StageCluster(nNewEntries)};
      CommitStagedClusters(stagedClusters);
      return stagedClusters[0].fNBytesWritten;
   }

   /// Closes the current cluster group, i.e. commits the metadata of the clusters written since
   /// the last call.
   virtual void CommitClusterGroup() = 0;

   /// Registers a callback to be invoked on this sink when the dataset is committed.
   void RegisterOnCommitDatasetCallback(Callback_t callback) { fOnDatasetCommitCallbacks.emplace_back(callback); }

   /// Finalizes the dataset (implemented in the source file); registered commit callbacks
   /// receive this sink.
   void CommitDataset();

   /// Returns a page, allocated by this sink, that can hold nElements elements of the
   /// handle's column type.
   virtual ROOT::Internal::RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements);

   /// RAII lock guard around sink operations. A null mutex makes the guard a no-op, which is
   /// the default behavior (see GetSinkGuard()).
   class RSinkGuard {
      std::mutex *fLock; ///< Not owned; may be nullptr

   public:
      explicit RSinkGuard(std::mutex *lock) : fLock(lock)
      {
         if (fLock != nullptr) {
            fLock->lock();
         }
      }
      RSinkGuard(const RSinkGuard &) = delete;
      RSinkGuard &operator=(const RSinkGuard &) = delete;
      RSinkGuard(RSinkGuard &&) = delete;
      RSinkGuard &operator=(RSinkGuard &&) = delete;
      ~RSinkGuard()
      {
         if (fLock != nullptr) {
            fLock->unlock();
         }
      }
   };

   /// Returns a guard protecting sink operations; the default guard does not lock anything.
   /// Derived sinks that need serialized access return a guard holding a real mutex.
   virtual RSinkGuard GetSinkGuard()
   {
      // Default: no synchronization required.
      return RSinkGuard(nullptr);
   }
};
0427
0428
0429
0430
0431
0432
0433
0434
/// Base class for sinks with a physical storage backend. Maintains the descriptor build-up and
/// serialization state shared by all persistent sinks and delegates the byte-level writing to
/// the protected *Impl() methods.
class RPagePersistentSink : public RPageSink {
private:
   /// Serialization state used while writing the on-disk metadata
   ROOT::Internal::RNTupleSerializer::RContext fSerializationContext;

   /// Index of the next cluster within the currently open cluster group
   std::uint64_t fNextClusterInGroup = 0;
   /// Entry count up to (and including) the previously committed cluster
   ROOT::NTupleSize_t fPrevClusterNEntries = 0;
   /// Per-column element ranges of the cluster currently being filled
   std::vector<ROOT::RClusterDescriptor::RColumnRange> fOpenColumnRanges;
   /// Per-column page ranges of the cluster currently being filled
   std::vector<ROOT::RClusterDescriptor::RPageRange> fOpenPageRanges;

   /// Streamer information collected for the dataset's metadata
   ROOT::Internal::RNTupleSerializer::StreamerInfoMap_t fStreamerInfos;

protected:
   /// Optional capabilities of the concrete sink implementation
   struct RFeatures {
      bool fCanMergePages = false; ///< NOTE(review): presumably enables page merging in vector commits -- confirm
   };

   RFeatures fFeatures;
   ROOT::Internal::RNTupleDescriptorBuilder fDescriptorBuilder; ///< Backs GetDescriptor()

   /// Default write-path performance counters; registered by EnableDefaultMetrics()
   struct RCounters {
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fNPageCommitted;
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fSzWritePayload;
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fSzZip;
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fTimeWallWrite;
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fTimeWallZip;
      ROOT::Experimental::Detail::RNTupleTickCounter<ROOT::Experimental::Detail::RNTupleAtomicCounter> &fTimeCpuWrite;
      ROOT::Experimental::Detail::RNTupleTickCounter<ROOT::Experimental::Detail::RNTupleAtomicCounter> &fTimeCpuZip;
   };
   std::unique_ptr<RCounters> fCounters;

   /// Implementation-specific part of InitImpl(RNTupleModel&): writes the serialized header.
   virtual void InitImpl(unsigned char *serializedHeader, std::uint32_t length) = 0;

   /// Writes the page to storage and returns its on-disk location.
   virtual RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) = 0;
   /// Writes the sealed page to storage and returns its on-disk location.
   virtual RNTupleLocator
   CommitSealedPageImpl(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) = 0;

   /// Bulk write of sealed pages; not pure virtual, so a default implementation exists in the
   /// source file. The mask selects which of the pages in `ranges` are to be written; one
   /// locator is returned per written page.
   virtual std::vector<RNTupleLocator>
   CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges, const std::vector<bool> &mask);

   /// Returns the number of bytes written to storage for the staged cluster.
   virtual std::uint64_t StageClusterImpl() = 0;

   /// Writes the serialized page list of the closed cluster group and returns its locator.
   virtual RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) = 0;
   /// Writes the serialized footer, finalizing the dataset on storage.
   virtual void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) = 0;

   /// Registers the default write-path counters (see RCounters) under the given name prefix.
   void EnableDefaultMetrics(const std::string &prefix);

public:
   RPagePersistentSink(std::string_view ntupleName, const ROOT::RNTupleWriteOptions &options);

   RPagePersistentSink(const RPagePersistentSink &) = delete;
   RPagePersistentSink &operator=(const RPagePersistentSink &) = delete;
   RPagePersistentSink(RPagePersistentSink &&) = default;
   RPagePersistentSink &operator=(RPagePersistentSink &&) = default;
   ~RPagePersistentSink() override;

   /// Factory: creates a concrete persistent sink for the given location
   /// NOTE(review): presumably dispatches on the location (e.g. file extension) -- confirm.
   static std::unique_ptr<RPageSink> Create(std::string_view ntupleName, std::string_view location,
                                            const ROOT::RNTupleWriteOptions &options = ROOT::RNTupleWriteOptions());

   ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) final;

   /// Returns the descriptor as built up so far by fDescriptorBuilder.
   const ROOT::RNTupleDescriptor &GetDescriptor() const final { return fDescriptorBuilder.GetDescriptor(); }

   ROOT::NTupleSize_t GetNEntries() const final { return fPrevClusterNEntries; }

   /// Initializes the sink from the model (serializes the header); final here,
   /// derived classes implement the byte-level InitImpl() overload instead.
   void InitImpl(RNTupleModel &model) final;
   void UpdateSchema(const ROOT::Internal::RNTupleModelChangeset &changeset, ROOT::NTupleSize_t firstEntry) final;
   void UpdateExtraTypeInfo(const ROOT::RExtraTypeInfoDescriptor &extraTypeInfo) final;

   /// Initializes the sink from an existing descriptor instead of a model (e.g. when copying or
   /// merging an RNTuple) and returns the model re-created from that descriptor.
   /// If copyClusters is false, cluster information is not taken over
   /// NOTE(review): exact copyClusters semantics implemented in the source file -- confirm.
   [[nodiscard]] std::unique_ptr<RNTupleModel>
   InitFromDescriptor(const ROOT::RNTupleDescriptor &descriptor, bool copyClusters);

   void CommitSuppressedColumn(ColumnHandle_t columnHandle) final;
   void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final;
   void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final;
   void CommitSealedPageV(std::span<RPageStorage::RSealedPageGroup> ranges) final;
   RStagedCluster StageCluster(ROOT::NTupleSize_t nNewEntries) final;
   void CommitStagedClusters(std::span<RStagedCluster> clusters) final;
   void CommitClusterGroup() final;
   void CommitDatasetImpl() final;
};
0543
0544
0545
0546
0547
0548
0549
0550
0551
0552
0553
0554
/// Abstract interface to read pages of an RNTuple (the read-path counterpart of RPageSink).
/// The descriptor is protected by a shared mutex; use GetSharedDescriptorGuard() for read
/// access and GetExclDescriptorGuard() for write access.
class RPageSource : public RPageStorage {
public:
   /// Restricts reading to a contiguous range of entries, see SetEntryRange().
   struct REntryRange {
      ROOT::NTupleSize_t fFirstEntry = ROOT::kInvalidNTupleIndex;
      ROOT::NTupleSize_t fNEntries = 0;

      /// Returns true if the given cluster overlaps with this entry range
      /// (implemented in the source file).
      bool IntersectsWith(const ROOT::RClusterDescriptor &clusterDesc) const;
   };

   /// RAII read access to the descriptor: holds the shared lock for its lifetime.
   class RSharedDescriptorGuard {
      const ROOT::RNTupleDescriptor &fDescriptor;
      std::shared_mutex &fLock;

   public:
      RSharedDescriptorGuard(const ROOT::RNTupleDescriptor &desc, std::shared_mutex &lock)
         : fDescriptor(desc), fLock(lock)
      {
         fLock.lock_shared();
      }
      // Non-copyable and non-movable: the guard must release exactly once.
      RSharedDescriptorGuard(const RSharedDescriptorGuard &) = delete;
      RSharedDescriptorGuard &operator=(const RSharedDescriptorGuard &) = delete;
      RSharedDescriptorGuard(RSharedDescriptorGuard &&) = delete;
      RSharedDescriptorGuard &operator=(RSharedDescriptorGuard &&) = delete;
      ~RSharedDescriptorGuard() { fLock.unlock_shared(); }
      const ROOT::RNTupleDescriptor *operator->() const { return &fDescriptor; }
      const ROOT::RNTupleDescriptor &GetRef() const { return fDescriptor; }
   };

   /// RAII write access to the descriptor: holds the exclusive lock for its lifetime and
   /// bumps the descriptor generation on release, signaling that cached state may be stale.
   class RExclDescriptorGuard {
      ROOT::RNTupleDescriptor &fDescriptor;
      std::shared_mutex &fLock;

   public:
      RExclDescriptorGuard(ROOT::RNTupleDescriptor &desc, std::shared_mutex &lock) : fDescriptor(desc), fLock(lock)
      {
         fLock.lock();
      }
      // Non-copyable and non-movable: the guard must release exactly once.
      RExclDescriptorGuard(const RExclDescriptorGuard &) = delete;
      RExclDescriptorGuard &operator=(const RExclDescriptorGuard &) = delete;
      RExclDescriptorGuard(RExclDescriptorGuard &&) = delete;
      RExclDescriptorGuard &operator=(RExclDescriptorGuard &&) = delete;
      ~RExclDescriptorGuard()
      {
         // Any exclusive access is assumed to have modified the descriptor
         fDescriptor.IncGeneration();
         fLock.unlock();
      }
      ROOT::RNTupleDescriptor *operator->() const { return &fDescriptor; }
      /// Replaces the guarded descriptor wholesale (e.g. after Attach()).
      void MoveIn(ROOT::RNTupleDescriptor desc) { fDescriptor = std::move(desc); }
   };

private:
   ROOT::RNTupleDescriptor fDescriptor;        ///< Access only through the descriptor guards
   mutable std::shared_mutex fDescriptorLock;  ///< mutable so const readers can take the shared lock
   REntryRange fEntryRange;                    ///< Set via SetEntryRange()
   bool fHasStructure = false;                 ///< Tracks LoadStructure() -- NOTE(review): inferred, confirm
   bool fIsAttached = false;                   ///< Tracks Attach() -- NOTE(review): inferred, confirm
   bool fHasStreamerInfosRegistered = false;   ///< Tracks RegisterStreamerInfos() -- NOTE(review): inferred, confirm

   /// The most recently used cluster, tracked via UpdateLastUsedCluster()
   ROOT::DescriptorId_t fLastUsedCluster = ROOT::kInvalidDescriptorId;

   /// Maps entry index to cluster id for clusters that were preloaded
   /// NOTE(review): key/value semantics inferred from the types -- confirm against the .cxx
   std::map<ROOT::NTupleSize_t, ROOT::DescriptorId_t> fPreloadedClusters;

   /// Records that the given cluster was accessed (implemented in the source file).
   void UpdateLastUsedCluster(ROOT::DescriptorId_t clusterId);

protected:
   /// Default read-path performance counters; registered by EnableDefaultMetrics()
   struct RCounters {
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fNReadV;
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fNRead;
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fSzReadPayload;
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fSzReadOverhead;
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fSzUnzip;
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fNClusterLoaded;
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fNPageRead;
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fNPageUnsealed;
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fTimeWallRead;
      ROOT::Experimental::Detail::RNTupleAtomicCounter &fTimeWallUnzip;
      ROOT::Experimental::Detail::RNTupleTickCounter<ROOT::Experimental::Detail::RNTupleAtomicCounter> &fTimeCpuRead;
      ROOT::Experimental::Detail::RNTupleTickCounter<ROOT::Experimental::Detail::RNTupleAtomicCounter> &fTimeCpuUnzip;
      ROOT::Experimental::Detail::RNTupleCalcPerf &fBandwidthReadUncompressed;
      ROOT::Experimental::Detail::RNTupleCalcPerf &fBandwidthReadCompressed;
      ROOT::Experimental::Detail::RNTupleCalcPerf &fBandwidthUnzip;
      ROOT::Experimental::Detail::RNTupleCalcPerf &fFractionReadOverhead;
      ROOT::Experimental::Detail::RNTupleCalcPerf &fCompressionRatio;
   };

   /// Tracks which physical columns are currently in use (registered via AddColumn()),
   /// with per-element-identifier reference counting.
   class RActivePhysicalColumns {
   public:
      struct RColumnInfo {
         ROOT::Internal::RColumnElementBase::RIdentifier fElementId;
         std::size_t fRefCounter = 0; ///< Number of active registrations for this element id
      };

   private:
      /// Per physical column, the list of element identifiers under which it was inserted
      std::unordered_map<ROOT::DescriptorId_t, std::vector<RColumnInfo>> fColumnInfos;

   public:
      /// Adds (or ref-counts) a registration of the column under the given element id.
      void Insert(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId);
      /// Removes (or decrements) a registration previously added with Insert().
      void Erase(ROOT::DescriptorId_t physicalColumnId, ROOT::Internal::RColumnElementBase::RIdentifier elementId);
      /// Returns the set of currently active physical column ids.
      ROOT::Internal::RCluster::ColumnSet_t ToColumnSet() const;
      bool HasColumnInfos(ROOT::DescriptorId_t physicalColumnId) const
      {
         return fColumnInfos.count(physicalColumnId) > 0;
      }
      /// Precondition: HasColumnInfos(physicalColumnId) -- throws std::out_of_range otherwise.
      const std::vector<RColumnInfo> &GetColumnInfos(ROOT::DescriptorId_t physicalColumnId) const
      {
         return fColumnInfos.at(physicalColumnId);
      }
   };

   /// Summarizes where a page of a given cluster is located, used by LoadPageImpl().
   struct RClusterInfo {
      ROOT::DescriptorId_t fClusterId = 0;
      /// Location of the page in the cluster
      ROOT::RClusterDescriptor::RPageInfoExtended fPageInfo;
      /// The first element index of the column in this cluster
      std::uint64_t fColumnOffset = 0;
   };

   std::unique_ptr<RCounters> fCounters;

   ROOT::RNTupleReadOptions fOptions; ///< Read options, see GetReadOptions()

   RActivePhysicalColumns fActivePhysicalColumns; ///< Columns currently connected via AddColumn()

   /// Caches loaded pages, shared between LoadPage() calls
   ROOT::Internal::RPagePool fPagePool;

   /// Implementation-specific part of LoadStructure()
   virtual void LoadStructureImpl() = 0;
   /// Implementation-specific part of Attach(): produces the full descriptor
   virtual ROOT::RNTupleDescriptor AttachImpl(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode) = 0;
   /// Implementation-specific part of Clone()
   virtual std::unique_ptr<RPageSource> CloneImpl() const = 0;

   /// Decompresses a cluster in memory; has a default implementation (not pure virtual).
   virtual void UnzipClusterImpl(ROOT::Internal::RCluster *cluster);

   /// Loads the page of the given column containing the element at idxInCluster,
   /// using the pre-resolved cluster/page location info.
   virtual ROOT::Internal::RPageRef
   LoadPageImpl(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo, ROOT::NTupleSize_t idxInCluster) = 0;

   /// Iterates over the pages that the given cluster key selects, invoking perPageFunc for
   /// each page and collecting pages of suppressed columns in pageZeroMap
   /// NOTE(review): pageZeroMap semantics inferred from the name -- confirm against the .cxx.
   void PrepareLoadCluster(
      const ROOT::Internal::RCluster::RKey &clusterKey, ROOT::Internal::ROnDiskPageMap &pageZeroMap,
      std::function<void(ROOT::DescriptorId_t, ROOT::NTupleSize_t, const ROOT::RClusterDescriptor::RPageInfo &)>
         perPageFunc);

   /// Registers the default read-path counters (see RCounters) under the given name prefix.
   void EnableDefaultMetrics(const std::string &prefix);

   /// Note that the exclusive guard is only available to derived classes; public users get
   /// read-only access via GetSharedDescriptorGuard().
   RExclDescriptorGuard GetExclDescriptorGuard() { return RExclDescriptorGuard(fDescriptor, fDescriptorLock); }

public:
   RPageSource(std::string_view ntupleName, const ROOT::RNTupleReadOptions &fOptions);
   RPageSource(const RPageSource &) = delete;
   RPageSource &operator=(const RPageSource &) = delete;
   RPageSource(RPageSource &&) = delete;
   RPageSource &operator=(RPageSource &&) = delete;
   ~RPageSource() override;

   /// Factory: creates a concrete page source for the given location
   /// NOTE(review): presumably dispatches on the location (e.g. file extension) -- confirm.
   static std::unique_ptr<RPageSource> Create(std::string_view ntupleName, std::string_view location,
                                              const ROOT::RNTupleReadOptions &options = ROOT::RNTupleReadOptions());

   /// Creates an independent source reading the same data (delegates to CloneImpl()).
   std::unique_ptr<RPageSource> Clone() const;

   /// Unseals (decompresses/unpacks) a sealed page into a newly allocated page
   /// (implemented in the source file).
   RResult<ROOT::Internal::RPage> static UnsealPage(const RSealedPage &sealedPage,
                                                    const ROOT::Internal::RColumnElementBase &element,
                                                    ROOT::Internal::RPageAllocator &pageAlloc);

   EPageStorageType GetType() final { return EPageStorageType::kSource; }
   const ROOT::RNTupleReadOptions &GetReadOptions() const { return fOptions; }

   /// Takes the shared descriptor lock for the lifetime of the returned guard;
   /// do not hold the guard longer than necessary.
   const RSharedDescriptorGuard GetSharedDescriptorGuard() const
   {
      return RSharedDescriptorGuard(fDescriptor, fDescriptorLock);
   }

   ColumnHandle_t AddColumn(ROOT::DescriptorId_t fieldId, ROOT::Internal::RColumn &column) override;
   void DropColumn(ColumnHandle_t columnHandle) override;

   /// Loads the structural information of the on-disk data (delegates to LoadStructureImpl());
   /// a lighter-weight step than Attach().
   void LoadStructure();

   /// Open the RNTuple for reading: builds the full descriptor (delegates to AttachImpl()).
   void Attach(ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode mode =
                  ROOT::Internal::RNTupleSerializer::EDescriptorDeserializeMode::kForReading);
   ROOT::NTupleSize_t GetNEntries();
   ROOT::NTupleSize_t GetNElements(ColumnHandle_t columnHandle);

   /// Restricts subsequent reading to the given entry range; affects which clusters are
   /// considered for (pre)loading.
   void SetEntryRange(const REntryRange &range);
   REntryRange GetEntryRange() const { return fEntryRange; }

   /// Loads the page containing the element with the given global (dataset-wide) index.
   virtual ROOT::Internal::RPageRef LoadPage(ColumnHandle_t columnHandle, ROOT::NTupleSize_t globalIndex);

   /// Loads the page containing the element with the given cluster-local index.
   virtual ROOT::Internal::RPageRef LoadPage(ColumnHandle_t columnHandle, RNTupleLocalIndex localIndex);

   /// Loads (into sealedPage) the sealed page containing the element at the given cluster-local
   /// index, without unsealing it.
   virtual void
   LoadSealedPage(ROOT::DescriptorId_t physicalColumnId, RNTupleLocalIndex localIndex, RSealedPage &sealedPage) = 0;

   /// Loads the clusters identified by the given keys (cluster id plus column selection);
   /// returns one RCluster per key.
   virtual std::vector<std::unique_ptr<ROOT::Internal::RCluster>>
   LoadClusters(std::span<ROOT::Internal::RCluster::RKey> clusterKeys) = 0;

   /// Decompresses the cluster (delegates to UnzipClusterImpl()).
   void UnzipCluster(ROOT::Internal::RCluster *cluster);

   /// Convenience overload of the static UnsealPage() that uses this source's page allocator.
   RResult<ROOT::Internal::RPage>
   UnsealPage(const RSealedPage &sealedPage, const ROOT::Internal::RColumnElementBase &element);

   /// Registers the dataset's streamer infos (implemented in the source file);
   /// fHasStreamerInfosRegistered suggests this happens at most once -- confirm.
   void RegisterStreamerInfos();
};
0826
0827 }
0828 }
0829
0830 #endif