File indexing completed on 2025-01-30 10:22:39
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016 #ifndef ROOT7_RPageStorageDaos
0017 #define ROOT7_RPageStorageDaos
0018
0019 #include <ROOT/RError.hxx>
0020 #include <ROOT/RPageStorage.hxx>
0021 #include <ROOT/RNTuple.hxx>
0022 #include <ROOT/RNTupleSerialize.hxx>
0023 #include <ROOT/RNTupleZip.hxx>
0024 #include <string_view>
0025
0026 #include <array>
0027 #include <atomic>
0028 #include <cstdio>
0029 #include <memory>
0030 #include <string>
0031 #include <optional>
0032
0033 namespace ROOT {
0034 namespace Experimental {
0035
0036 namespace Internal {
0037 using ntuple_index_t = std::uint32_t;
0038 class RCluster;
0039 class RClusterPool;
0040 class RDaosPool;
0041 class RDaosContainer;
0042 class RPageAllocatorHeap;
0043 class RPagePool;
0044 enum EDaosLocatorFlags {
0045
0046 kCagedPage = 0x01,
0047 };
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059 struct RDaosNTupleAnchor {
0060
0061 std::uint64_t fVersionAnchor = 1;
0062
0063 std::uint16_t fVersionEpoch = RNTuple::kVersionEpoch;
0064 std::uint16_t fVersionMajor = RNTuple::kVersionMajor;
0065 std::uint16_t fVersionMinor = RNTuple::kVersionMinor;
0066 std::uint16_t fVersionPatch = RNTuple::kVersionPatch;
0067
0068 std::uint32_t fNBytesHeader = 0;
0069
0070 std::uint32_t fLenHeader = 0;
0071
0072 std::uint32_t fNBytesFooter = 0;
0073
0074 std::uint32_t fLenFooter = 0;
0075
0076 std::string fObjClass{};
0077
0078 bool operator ==(const RDaosNTupleAnchor &other) const {
0079 return fVersionAnchor == other.fVersionAnchor && fVersionEpoch == other.fVersionEpoch &&
0080 fVersionMajor == other.fVersionMajor && fVersionMinor == other.fVersionMinor &&
0081 fVersionPatch == other.fVersionPatch && fNBytesHeader == other.fNBytesHeader &&
0082 fLenHeader == other.fLenHeader && fNBytesFooter == other.fNBytesFooter && fLenFooter == other.fLenFooter &&
0083 fObjClass == other.fObjClass;
0084 }
0085
0086 std::uint32_t Serialize(void *buffer) const;
0087 RResult<std::uint32_t> Deserialize(const void *buffer, std::uint32_t bufSize);
0088
0089 static std::uint32_t GetSize();
0090 };
0091
0092
0093
0094
0095
0096
0097
0098
0099
0100
0101
0102 class RPageSinkDaos : public RPagePersistentSink {
0103 private:
0104 std::unique_ptr<RPageAllocatorHeap> fPageAllocator;
0105
0106
0107
0108
0109
0110 std::unique_ptr<RDaosContainer> fDaosContainer;
0111
0112 std::atomic<std::uint64_t> fPageId{0};
0113
0114 std::atomic<std::uint64_t> fClusterGroupId{0};
0115
0116 std::string fURI;
0117
0118 std::uint64_t fNBytesCurrentCluster{0};
0119
0120 RDaosNTupleAnchor fNTupleAnchor;
0121 ntuple_index_t fNTupleIndex{0};
0122 uint32_t fCageSizeLimit{};
0123
0124 protected:
0125 using RPagePersistentSink::InitImpl;
0126 void InitImpl(unsigned char *serializedHeader, std::uint32_t length) final;
0127 RNTupleLocator CommitPageImpl(ColumnHandle_t columnHandle, const RPage &page) final;
0128 RNTupleLocator
0129 CommitSealedPageImpl(DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final;
0130 std::vector<RNTupleLocator> CommitSealedPageVImpl(std::span<RPageStorage::RSealedPageGroup> ranges) final;
0131 std::uint64_t CommitClusterImpl() final;
0132 RNTupleLocator CommitClusterGroupImpl(unsigned char *serializedPageList, std::uint32_t length) final;
0133 void CommitDatasetImpl(unsigned char *serializedFooter, std::uint32_t length) final;
0134 void WriteNTupleHeader(const void *data, size_t nbytes, size_t lenHeader);
0135 void WriteNTupleFooter(const void *data, size_t nbytes, size_t lenFooter);
0136 void WriteNTupleAnchor();
0137
0138 public:
0139 RPageSinkDaos(std::string_view ntupleName, std::string_view uri, const RNTupleWriteOptions &options);
0140 ~RPageSinkDaos() override;
0141
0142 RPage ReservePage(ColumnHandle_t columnHandle, std::size_t nElements) final;
0143 void ReleasePage(RPage &page) final;
0144 };
0145
0146
0147
0148
0149
0150
0151
0152
0153 class RPageSourceDaos : public RPageSource {
0154 private:
0155
0156
0157 struct RClusterInfo {
0158 DescriptorId_t fClusterId = 0;
0159
0160 RClusterDescriptor::RPageRange::RPageInfoExtended fPageInfo;
0161
0162 std::uint64_t fColumnOffset = 0;
0163 };
0164
0165 ntuple_index_t fNTupleIndex{0};
0166
0167
0168 std::shared_ptr<RPagePool> fPagePool;
0169
0170 RCluster *fCurrentCluster = nullptr;
0171
0172 std::unique_ptr<RDaosContainer> fDaosContainer;
0173
0174 std::string fURI;
0175
0176 std::unique_ptr<RClusterPool> fClusterPool;
0177
0178 RNTupleDescriptorBuilder fDescriptorBuilder;
0179
0180 RPage PopulatePageFromCluster(ColumnHandle_t columnHandle, const RClusterInfo &clusterInfo,
0181 ClusterSize_t::ValueType idxInCluster);
0182
0183 protected:
0184 RNTupleDescriptor AttachImpl() final;
0185 void UnzipClusterImpl(RCluster *cluster) final;
0186
0187 public:
0188 RPageSourceDaos(std::string_view ntupleName, std::string_view uri, const RNTupleReadOptions &options);
0189
0190
0191 std::unique_ptr<RPageSource> Clone() const final;
0192 ~RPageSourceDaos() override;
0193
0194 RPage PopulatePage(ColumnHandle_t columnHandle, NTupleSize_t globalIndex) final;
0195 RPage PopulatePage(ColumnHandle_t columnHandle, RClusterIndex clusterIndex) final;
0196 void ReleasePage(RPage &page) final;
0197
0198 void LoadSealedPage(DescriptorId_t physicalColumnId, RClusterIndex clusterIndex, RSealedPage &sealedPage) final;
0199
0200 std::vector<std::unique_ptr<RCluster>> LoadClusters(std::span<RCluster::RKey> clusterKeys) final;
0201
0202
0203 std::string GetObjectClass() const;
0204 };
0205
0206 }
0207
0208 }
0209 }
0210
0211 #endif