File indexing completed on 2025-10-31 09:04:31
0001 
0002 
0003 
0004 
0005 
0006 
0007 
0008 
0009 #pragma once
0010 
0011 #include <functional>
0012 #include <memory>
0013 
0014 #include <gloo/transport/tcp/error.h>
0015 #include <gloo/transport/tcp/loop.h>
0016 #include <gloo/transport/tcp/socket.h>
0017 
0018 namespace gloo {
0019 namespace transport {
0020 namespace tcp {
0021 
0022 
0023 
0024 
0025 
0026 
0027 
0028 template <typename T>
0029 class ReadValueOperation final
0030     : public Handler,
0031       public std::enable_shared_from_this<ReadValueOperation<T>> {
0032  public:
0033   using callback_t =
0034       std::function<void(std::shared_ptr<Socket>, const Error& error, T&& t)>;
0035 
0036   ReadValueOperation(
0037       std::shared_ptr<Loop> loop,
0038       std::shared_ptr<Socket> socket,
0039       callback_t fn)
0040       : loop_(std::move(loop)),
0041         socket_(std::move(socket)),
0042         fn_(std::move(fn)) {}
0043 
0044   void run() {
0045     
0046     
0047     
0048     leak_ = this->shared_from_this();
0049     
0050     
0051     loop_->registerDescriptor(socket_->fd(), EPOLLIN | EPOLLONESHOT, this);
0052   }
0053 
0054   void handleEvents(int events) override {
0055     
0056     
0057     auto self = std::move(this->leak_);
0058 
0059     
0060     auto rv = socket_->read(&t_, sizeof(t_));
0061     if (rv == -1) {
0062       fn_(socket_, SystemError("read", errno), std::move(t_));
0063       return;
0064     }
0065 
0066     
0067     if (rv < sizeof(t_)) {
0068       fn_(socket_, ShortReadError(rv, sizeof(t_)), std::move(t_));
0069       return;
0070     }
0071 
0072     fn_(socket_, Error::kSuccess, std::move(t_));
0073   }
0074 
0075  private:
0076   std::shared_ptr<Loop> loop_;
0077   std::shared_ptr<Socket> socket_;
0078   callback_t fn_;
0079   std::shared_ptr<ReadValueOperation<T>> leak_;
0080 
0081   T t_;
0082 };
0083 
0084 template <typename T>
0085 void read(
0086     std::shared_ptr<Loop> loop,
0087     std::shared_ptr<Socket> socket,
0088     typename ReadValueOperation<T>::callback_t fn) {
0089   auto x = std::make_shared<ReadValueOperation<T>>(
0090       std::move(loop), std::move(socket), std::move(fn));
0091   x->run();
0092 }
0093 
0094 
0095 
0096 
0097 
0098 
0099 
0100 template <typename T>
0101 class WriteValueOperation final
0102     : public Handler,
0103       public std::enable_shared_from_this<WriteValueOperation<T>> {
0104  public:
0105   using callback_t =
0106       std::function<void(std::shared_ptr<Socket>, const Error& error)>;
0107 
0108   WriteValueOperation(
0109       std::shared_ptr<Loop> loop,
0110       std::shared_ptr<Socket> socket,
0111       T t,
0112       callback_t fn)
0113       : loop_(std::move(loop)),
0114         socket_(std::move(socket)),
0115         fn_(std::move(fn)),
0116         t_(std::move(t)) {}
0117 
0118   void run() {
0119     
0120     
0121     
0122     leak_ = this->shared_from_this();
0123     
0124     
0125     loop_->registerDescriptor(socket_->fd(), EPOLLOUT | EPOLLONESHOT, this);
0126   }
0127 
0128   void handleEvents(int events) override {
0129     
0130     
0131     auto leak = std::move(this->leak_);
0132 
0133     
0134     auto rv = socket_->write(&t_, sizeof(t_));
0135     if (rv == -1) {
0136       fn_(socket_, SystemError("write", errno));
0137       return;
0138     }
0139 
0140     
0141     if (rv < sizeof(t_)) {
0142       fn_(socket_, ShortWriteError(rv, sizeof(t_)));
0143       return;
0144     }
0145 
0146     fn_(socket_, Error::kSuccess);
0147   }
0148 
0149  private:
0150   std::shared_ptr<Loop> loop_;
0151   std::shared_ptr<Socket> socket_;
0152   callback_t fn_;
0153   std::shared_ptr<WriteValueOperation<T>> leak_;
0154 
0155   T t_;
0156 };
0157 
0158 template <typename T>
0159 void write(
0160     std::shared_ptr<Loop> loop,
0161     std::shared_ptr<Socket> socket,
0162     T t,
0163     typename WriteValueOperation<T>::callback_t fn) {
0164   auto x = std::make_shared<WriteValueOperation<T>>(
0165       std::move(loop), std::move(socket), std::move(t), std::move(fn));
0166   x->run();
0167 }
0168 
0169 } 
0170 } 
0171 }