Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:10:44

0001 /*************************************************************************
0002  * Copyright (C) 1995-2020, Rene Brun and Fons Rademakers.               *
0003  * All rights reserved.                                                  *
0004  *                                                                       *
0005  * For the licensing terms see $ROOTSYS/LICENSE.                         *
0006  * For the list of contributors see $ROOTSYS/README/CREDITS.             *
0007  *************************************************************************/
0008 
0009 #ifndef ROOT_RIoUring
0010 #define ROOT_RIoUring
0011 
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <limits>
#include <stdexcept>
#include <string>

#include <liburing.h>
#include <liburing/io_uring.h>

#include "TError.h"
0021 
0022 namespace ROOT {
0023 namespace Internal {
0024 
0025 class RIoUring {
0026 private:
0027    struct io_uring fRing;
0028    std::uint32_t fDepth = 0;
0029 
0030 public:
0031    // Create an io_uring instance. The ring selects an appropriate queue depth. which can be queried
0032    // afterwards using GetQueueDepth(). The depth is typically 1024 or lower. Throws an exception if
0033    // ring setup fails.
0034    RIoUring() {
0035       std::uint32_t queueDepth = 1024;
0036       int ret;
0037       while (true) {
0038          ret = io_uring_queue_init(queueDepth, &fRing, 0 /* no flags */);
0039          if (ret == 0) {
0040             fDepth = queueDepth;
0041             break; // ring setup succeeded
0042          }
0043          if (ret != -ENOMEM) {
0044             throw std::runtime_error("Error initializing io_uring: " + std::string(std::strerror(-ret)));
0045          }
0046          // try again with a smaller queue for ENOMEM
0047          queueDepth /= 2;
0048          if (queueDepth == 0) {
0049             throw std::runtime_error("Failed to allocate memory for the smallest possible "
0050                "io_uring instance. 'memlock' memory has been exhausted for this user");
0051          }
0052       }
0053    }
0054 
0055    // Create a io_uring instance that can hold at least `entriesHint` submission entries. The actual
0056    // queue depth is rounded up to the next power of 2. Throws an exception if ring setup fails.
0057    explicit RIoUring(std::uint32_t entriesHint) {
0058       struct io_uring_params params = {}; /* zero initialize param struct, no flags */
0059       int ret = io_uring_queue_init_params(entriesHint, &fRing, &params);
0060       if (ret != 0) {
0061          throw std::runtime_error("Error initializing io_uring: " + std::string(std::strerror(-ret)));
0062       }
0063       fDepth = params.sq_entries;
0064    }
0065 
0066    RIoUring(const RIoUring&) = delete;
0067    RIoUring& operator=(const RIoUring&) = delete;
0068 
0069    ~RIoUring() {
0070       // todo(max) try submitting any pending events before exiting
0071       io_uring_queue_exit(&fRing);
0072    }
0073 
0074    std::uint32_t GetQueueDepth() {
0075       return fDepth;
0076    }
0077 
0078    /// Access the raw io_uring instance.
0079    struct io_uring *GetRawRing() {
0080       return &fRing;
0081    }
0082 
0083    /// Basic read event composed of IO data and a target file descriptor.
0084    struct RReadEvent {
0085       /// The destination for reading
0086       void *fBuffer = nullptr;
0087       /// The file offset
0088       std::uint64_t fOffset = 0;
0089       /// The number of desired bytes
0090       std::size_t fSize = 0;
0091       /// The number of actually read bytes, set by the RIoUring instance
0092       std::size_t fOutBytes = 0;
0093       /// The file descriptor
0094       int fFileDes = -1;
0095    };
0096 
0097    /// Submit a number of read events and wait for completion. Events are submitted in batches if
0098    /// the number of events is larger than the submission queue depth.
0099    void SubmitReadsAndWait(RReadEvent* readEvents, unsigned int nReads) {
0100       unsigned int batch = 0;
0101       unsigned int batchSize = fDepth;
0102       unsigned int readPos = 0;
0103 
0104       while (readPos < nReads) {
0105          if (readPos + batchSize > nReads) {
0106             batchSize = nReads - readPos;
0107          }
0108          // prep reads
0109          struct io_uring_sqe *sqe;
0110          for (std::size_t i = readPos; i < readPos + batchSize; ++i) {
0111             sqe = io_uring_get_sqe(&fRing);
0112             if (!sqe) {
0113                throw std::runtime_error("batch " + std::to_string(batch) + ": "
0114                   + "get SQE failed for read request '" + std::to_string(i)
0115                   + "', error: " + std::string(strerror(errno)));
0116             }
0117             if (readEvents[i].fFileDes == -1) {
0118                throw std::runtime_error("batch " + std::to_string(batch) + ": "
0119                   + "bad fd (-1) for read request '" + std::to_string(i) + "'");
0120             }
0121             if (readEvents[i].fBuffer == nullptr) {
0122                throw std::runtime_error("batch " + std::to_string(batch) + ": "
0123                   + "null read buffer for read request '" + std::to_string(i) + "'");
0124             }
0125             io_uring_prep_read(sqe,
0126                readEvents[i].fFileDes,
0127                readEvents[i].fBuffer,
0128                readEvents[i].fSize,
0129                readEvents[i].fOffset
0130             );
0131             sqe->flags |= IOSQE_ASYNC; // maximize read event throughput
0132             sqe->user_data = i;
0133          }
0134 
0135          // todo(max) check for any difference between submit vs. submit and wait for large nReq
0136          int submitted = io_uring_submit_and_wait(&fRing, batchSize);
0137          if (submitted <= 0) {
0138             throw std::runtime_error("batch " + std::to_string(batch) + ": "
0139                "ring submit failed, error: " + std::string(strerror(errno)));
0140          }
0141          if (submitted != static_cast<int>(batchSize)) {
0142             throw std::runtime_error("ring submitted " + std::to_string(submitted) +
0143                " events but requested " + std::to_string(batchSize));
0144          }
0145          // reap reads
0146          struct io_uring_cqe *cqe;
0147          int ret;
0148          for (int i = 0; i < submitted; ++i) {
0149             ret = io_uring_wait_cqe(&fRing, &cqe);
0150             if (ret < 0) {
0151                throw std::runtime_error("wait cqe failed, error: " + std::string(std::strerror(-ret)));
0152             }
0153             auto index = reinterpret_cast<std::size_t>(io_uring_cqe_get_data(cqe));
0154             if (index >= nReads) {
0155                throw std::runtime_error("bad cqe user data: " + std::to_string(index));
0156             }
0157             if (cqe->res < 0) {
0158                throw std::runtime_error("batch " + std::to_string(batch) + ": "
0159                   + "read failed for ReadEvent[" + std::to_string(index) + "], "
0160                   "error: " + std::string(std::strerror(-cqe->res)));
0161             }
0162             readEvents[index].fOutBytes = static_cast<std::size_t>(cqe->res);
0163             io_uring_cqe_seen(&fRing, cqe);
0164          }
0165          readPos += batchSize;
0166          batch += 1;
0167       }
0168       return;
0169    }
0170 };
0171 
0172 } // namespace Internal
0173 } // namespace ROOT
0174 
0175 #endif