File indexing completed on 2025-01-18 10:10:44
0001
0002
0003
0004
0005
0006
0007
0008
0009 #ifndef ROOT_RIoUring
0010 #define ROOT_RIoUring
0011
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <string>

#include <liburing.h>
#include <liburing/io_uring.h>

#include "TError.h"
0021
0022 namespace ROOT {
0023 namespace Internal {
0024
0025 class RIoUring {
0026 private:
0027 struct io_uring fRing;
0028 std::uint32_t fDepth = 0;
0029
0030 public:
0031
0032
0033
0034 RIoUring() {
0035 std::uint32_t queueDepth = 1024;
0036 int ret;
0037 while (true) {
0038 ret = io_uring_queue_init(queueDepth, &fRing, 0 );
0039 if (ret == 0) {
0040 fDepth = queueDepth;
0041 break;
0042 }
0043 if (ret != -ENOMEM) {
0044 throw std::runtime_error("Error initializing io_uring: " + std::string(std::strerror(-ret)));
0045 }
0046
0047 queueDepth /= 2;
0048 if (queueDepth == 0) {
0049 throw std::runtime_error("Failed to allocate memory for the smallest possible "
0050 "io_uring instance. 'memlock' memory has been exhausted for this user");
0051 }
0052 }
0053 }
0054
0055
0056
0057 explicit RIoUring(std::uint32_t entriesHint) {
0058 struct io_uring_params params = {};
0059 int ret = io_uring_queue_init_params(entriesHint, &fRing, ¶ms);
0060 if (ret != 0) {
0061 throw std::runtime_error("Error initializing io_uring: " + std::string(std::strerror(-ret)));
0062 }
0063 fDepth = params.sq_entries;
0064 }
0065
0066 RIoUring(const RIoUring&) = delete;
0067 RIoUring& operator=(const RIoUring&) = delete;
0068
0069 ~RIoUring() {
0070
0071 io_uring_queue_exit(&fRing);
0072 }
0073
0074 std::uint32_t GetQueueDepth() {
0075 return fDepth;
0076 }
0077
0078
0079 struct io_uring *GetRawRing() {
0080 return &fRing;
0081 }
0082
0083
0084 struct RReadEvent {
0085
0086 void *fBuffer = nullptr;
0087
0088 std::uint64_t fOffset = 0;
0089
0090 std::size_t fSize = 0;
0091
0092 std::size_t fOutBytes = 0;
0093
0094 int fFileDes = -1;
0095 };
0096
0097
0098
0099 void SubmitReadsAndWait(RReadEvent* readEvents, unsigned int nReads) {
0100 unsigned int batch = 0;
0101 unsigned int batchSize = fDepth;
0102 unsigned int readPos = 0;
0103
0104 while (readPos < nReads) {
0105 if (readPos + batchSize > nReads) {
0106 batchSize = nReads - readPos;
0107 }
0108
0109 struct io_uring_sqe *sqe;
0110 for (std::size_t i = readPos; i < readPos + batchSize; ++i) {
0111 sqe = io_uring_get_sqe(&fRing);
0112 if (!sqe) {
0113 throw std::runtime_error("batch " + std::to_string(batch) + ": "
0114 + "get SQE failed for read request '" + std::to_string(i)
0115 + "', error: " + std::string(strerror(errno)));
0116 }
0117 if (readEvents[i].fFileDes == -1) {
0118 throw std::runtime_error("batch " + std::to_string(batch) + ": "
0119 + "bad fd (-1) for read request '" + std::to_string(i) + "'");
0120 }
0121 if (readEvents[i].fBuffer == nullptr) {
0122 throw std::runtime_error("batch " + std::to_string(batch) + ": "
0123 + "null read buffer for read request '" + std::to_string(i) + "'");
0124 }
0125 io_uring_prep_read(sqe,
0126 readEvents[i].fFileDes,
0127 readEvents[i].fBuffer,
0128 readEvents[i].fSize,
0129 readEvents[i].fOffset
0130 );
0131 sqe->flags |= IOSQE_ASYNC;
0132 sqe->user_data = i;
0133 }
0134
0135
0136 int submitted = io_uring_submit_and_wait(&fRing, batchSize);
0137 if (submitted <= 0) {
0138 throw std::runtime_error("batch " + std::to_string(batch) + ": "
0139 "ring submit failed, error: " + std::string(strerror(errno)));
0140 }
0141 if (submitted != static_cast<int>(batchSize)) {
0142 throw std::runtime_error("ring submitted " + std::to_string(submitted) +
0143 " events but requested " + std::to_string(batchSize));
0144 }
0145
0146 struct io_uring_cqe *cqe;
0147 int ret;
0148 for (int i = 0; i < submitted; ++i) {
0149 ret = io_uring_wait_cqe(&fRing, &cqe);
0150 if (ret < 0) {
0151 throw std::runtime_error("wait cqe failed, error: " + std::string(std::strerror(-ret)));
0152 }
0153 auto index = reinterpret_cast<std::size_t>(io_uring_cqe_get_data(cqe));
0154 if (index >= nReads) {
0155 throw std::runtime_error("bad cqe user data: " + std::to_string(index));
0156 }
0157 if (cqe->res < 0) {
0158 throw std::runtime_error("batch " + std::to_string(batch) + ": "
0159 + "read failed for ReadEvent[" + std::to_string(index) + "], "
0160 "error: " + std::string(std::strerror(-cqe->res)));
0161 }
0162 readEvents[index].fOutBytes = static_cast<std::size_t>(cqe->res);
0163 io_uring_cqe_seen(&fRing, cqe);
0164 }
0165 readPos += batchSize;
0166 batch += 1;
0167 }
0168 return;
0169 }
0170 };
0171
0172 }
0173 }
0174
0175 #endif