File indexing completed on 2025-01-30 10:10:13
0001
0002
0003
0004
0005
0006
0007
0008
0009 #pragma once
0010
0011 #include <functional>
0012 #include <memory>
0013
0014 #include <gloo/transport/tcp/error.h>
0015 #include <gloo/transport/tcp/loop.h>
0016 #include <gloo/transport/tcp/socket.h>
0017
0018 namespace gloo {
0019 namespace transport {
0020 namespace tcp {
0021
0022
0023
0024
0025
0026
0027
0028 template <typename T>
0029 class ReadValueOperation final
0030 : public Handler,
0031 public std::enable_shared_from_this<ReadValueOperation<T>> {
0032 public:
0033 using callback_t =
0034 std::function<void(std::shared_ptr<Socket>, const Error& error, T&& t)>;
0035
0036 ReadValueOperation(
0037 std::shared_ptr<Loop> loop,
0038 std::shared_ptr<Socket> socket,
0039 callback_t fn)
0040 : loop_(std::move(loop)),
0041 socket_(std::move(socket)),
0042 fn_(std::move(fn)) {}
0043
0044 void run() {
0045
0046
0047
0048 leak_ = this->shared_from_this();
0049
0050
0051 loop_->registerDescriptor(socket_->fd(), EPOLLIN | EPOLLONESHOT, this);
0052 }
0053
0054 void handleEvents(int events) override {
0055
0056
0057 auto self = std::move(this->leak_);
0058
0059
0060 auto rv = socket_->read(&t_, sizeof(t_));
0061 if (rv == -1) {
0062 fn_(socket_, SystemError("read", errno), std::move(t_));
0063 return;
0064 }
0065
0066
0067 if (rv < sizeof(t_)) {
0068 fn_(socket_, ShortReadError(rv, sizeof(t_)), std::move(t_));
0069 return;
0070 }
0071
0072 fn_(socket_, Error::kSuccess, std::move(t_));
0073 }
0074
0075 private:
0076 std::shared_ptr<Loop> loop_;
0077 std::shared_ptr<Socket> socket_;
0078 callback_t fn_;
0079 std::shared_ptr<ReadValueOperation<T>> leak_;
0080
0081 T t_;
0082 };
0083
0084 template <typename T>
0085 void read(
0086 std::shared_ptr<Loop> loop,
0087 std::shared_ptr<Socket> socket,
0088 typename ReadValueOperation<T>::callback_t fn) {
0089 auto x = std::make_shared<ReadValueOperation<T>>(
0090 std::move(loop), std::move(socket), std::move(fn));
0091 x->run();
0092 }
0093
0094
0095
0096
0097
0098
0099
0100 template <typename T>
0101 class WriteValueOperation final
0102 : public Handler,
0103 public std::enable_shared_from_this<WriteValueOperation<T>> {
0104 public:
0105 using callback_t =
0106 std::function<void(std::shared_ptr<Socket>, const Error& error)>;
0107
0108 WriteValueOperation(
0109 std::shared_ptr<Loop> loop,
0110 std::shared_ptr<Socket> socket,
0111 T t,
0112 callback_t fn)
0113 : loop_(std::move(loop)),
0114 socket_(std::move(socket)),
0115 fn_(std::move(fn)),
0116 t_(std::move(t)) {}
0117
0118 void run() {
0119
0120
0121
0122 leak_ = this->shared_from_this();
0123
0124
0125 loop_->registerDescriptor(socket_->fd(), EPOLLOUT | EPOLLONESHOT, this);
0126 }
0127
0128 void handleEvents(int events) override {
0129
0130
0131 auto leak = std::move(this->leak_);
0132
0133
0134 auto rv = socket_->write(&t_, sizeof(t_));
0135 if (rv == -1) {
0136 fn_(socket_, SystemError("write", errno));
0137 return;
0138 }
0139
0140
0141 if (rv < sizeof(t_)) {
0142 fn_(socket_, ShortWriteError(rv, sizeof(t_)));
0143 return;
0144 }
0145
0146 fn_(socket_, Error::kSuccess);
0147 }
0148
0149 private:
0150 std::shared_ptr<Loop> loop_;
0151 std::shared_ptr<Socket> socket_;
0152 callback_t fn_;
0153 std::shared_ptr<WriteValueOperation<T>> leak_;
0154
0155 T t_;
0156 };
0157
0158 template <typename T>
0159 void write(
0160 std::shared_ptr<Loop> loop,
0161 std::shared_ptr<Socket> socket,
0162 T t,
0163 typename WriteValueOperation<T>::callback_t fn) {
0164 auto x = std::make_shared<WriteValueOperation<T>>(
0165 std::move(loop), std::move(socket), std::move(t), std::move(fn));
0166 x->run();
0167 }
0168
0169 }
0170 }
0171 }