Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-30 10:10:13

0001 /**
0002  * Copyright (c) 2017-present, Facebook, Inc.
0003  * All rights reserved.
0004  *
0005  * This source code is licensed under the BSD-style license found in the
0006  * LICENSE file in the root directory of this source tree.
0007  */
0008 
0009 #pragma once
0010 
0011 #include <functional>
0012 #include <memory>
0013 
0014 #include <gloo/transport/tcp/error.h>
0015 #include <gloo/transport/tcp/loop.h>
0016 #include <gloo/transport/tcp/socket.h>
0017 
0018 namespace gloo {
0019 namespace transport {
0020 namespace tcp {
0021 
0022 // ReadValueOperation asynchronously reads a value of type T from the
0023 // socket specified at construction. Upon completion or error, the
0024 // callback is called. Its lifetime is coupled with completion of the
0025 // operation, so the called doesn't need to hold on to the instance.
0026 // It does so by storing a shared_ptr to itself (effectively a leak)
0027 // until the event loop calls back.
0028 template <typename T>
0029 class ReadValueOperation final
0030     : public Handler,
0031       public std::enable_shared_from_this<ReadValueOperation<T>> {
0032  public:
0033   using callback_t =
0034       std::function<void(std::shared_ptr<Socket>, const Error& error, T&& t)>;
0035 
0036   ReadValueOperation(
0037       std::shared_ptr<Loop> loop,
0038       std::shared_ptr<Socket> socket,
0039       callback_t fn)
0040       : loop_(std::move(loop)),
0041         socket_(std::move(socket)),
0042         fn_(std::move(fn)) {}
0043 
0044   void run() {
0045     // Cannot initialize leak until after the object has been
0046     // constructed, because the std::make_shared initialization
0047     // doesn't run after construction of the underlying object.
0048     leak_ = this->shared_from_this();
0049     // Register with loop only after we've leaked the shared_ptr,
0050     // because we unleak it when the event loop thread calls.
0051     loop_->registerDescriptor(socket_->fd(), EPOLLIN | EPOLLONESHOT, this);
0052   }
0053 
0054   void handleEvents(int events) override {
0055     // Move leaked shared_ptr to the stack so that this object
0056     // destroys itself once this function returns.
0057     auto self = std::move(this->leak_);
0058 
0059     // Read T.
0060     auto rv = socket_->read(&t_, sizeof(t_));
0061     if (rv == -1) {
0062       fn_(socket_, SystemError("read", errno), std::move(t_));
0063       return;
0064     }
0065 
0066     // Check for short read (assume we can read in a single call).
0067     if (rv < sizeof(t_)) {
0068       fn_(socket_, ShortReadError(rv, sizeof(t_)), std::move(t_));
0069       return;
0070     }
0071 
0072     fn_(socket_, Error::kSuccess, std::move(t_));
0073   }
0074 
0075  private:
0076   std::shared_ptr<Loop> loop_;
0077   std::shared_ptr<Socket> socket_;
0078   callback_t fn_;
0079   std::shared_ptr<ReadValueOperation<T>> leak_;
0080 
0081   T t_;
0082 };
0083 
0084 template <typename T>
0085 void read(
0086     std::shared_ptr<Loop> loop,
0087     std::shared_ptr<Socket> socket,
0088     typename ReadValueOperation<T>::callback_t fn) {
0089   auto x = std::make_shared<ReadValueOperation<T>>(
0090       std::move(loop), std::move(socket), std::move(fn));
0091   x->run();
0092 }
0093 
0094 // WriteValueOperation asynchronously writes a value of type T to the
0095 // socket specified at construction. Upon completion or error, the
0096 // callback is called. Its lifetime is coupled with completion of the
0097 // operation, so the called doesn't need to hold on to the instance.
0098 // It does so by storing a shared_ptr to itself (effectively a leak)
0099 // until the event loop calls back.
0100 template <typename T>
0101 class WriteValueOperation final
0102     : public Handler,
0103       public std::enable_shared_from_this<WriteValueOperation<T>> {
0104  public:
0105   using callback_t =
0106       std::function<void(std::shared_ptr<Socket>, const Error& error)>;
0107 
0108   WriteValueOperation(
0109       std::shared_ptr<Loop> loop,
0110       std::shared_ptr<Socket> socket,
0111       T t,
0112       callback_t fn)
0113       : loop_(std::move(loop)),
0114         socket_(std::move(socket)),
0115         fn_(std::move(fn)),
0116         t_(std::move(t)) {}
0117 
0118   void run() {
0119     // Cannot initialize leak until after the object has been
0120     // constructed, because the std::make_shared initialization
0121     // doesn't run after construction of the underlying object.
0122     leak_ = this->shared_from_this();
0123     // Register with loop only after we've leaked the shared_ptr,
0124     // because we unleak it when the event loop thread calls.
0125     loop_->registerDescriptor(socket_->fd(), EPOLLOUT | EPOLLONESHOT, this);
0126   }
0127 
0128   void handleEvents(int events) override {
0129     // Move leaked shared_ptr to the stack so that this object
0130     // destroys itself once this function returns.
0131     auto leak = std::move(this->leak_);
0132 
0133     // Write T.
0134     auto rv = socket_->write(&t_, sizeof(t_));
0135     if (rv == -1) {
0136       fn_(socket_, SystemError("write", errno));
0137       return;
0138     }
0139 
0140     // Check for short write (assume we can write in a single call).
0141     if (rv < sizeof(t_)) {
0142       fn_(socket_, ShortWriteError(rv, sizeof(t_)));
0143       return;
0144     }
0145 
0146     fn_(socket_, Error::kSuccess);
0147   }
0148 
0149  private:
0150   std::shared_ptr<Loop> loop_;
0151   std::shared_ptr<Socket> socket_;
0152   callback_t fn_;
0153   std::shared_ptr<WriteValueOperation<T>> leak_;
0154 
0155   T t_;
0156 };
0157 
0158 template <typename T>
0159 void write(
0160     std::shared_ptr<Loop> loop,
0161     std::shared_ptr<Socket> socket,
0162     T t,
0163     typename WriteValueOperation<T>::callback_t fn) {
0164   auto x = std::make_shared<WriteValueOperation<T>>(
0165       std::move(loop), std::move(socket), std::move(t), std::move(fn));
0166   x->run();
0167 }
0168 
0169 } // namespace tcp
0170 } // namespace transport
0171 } // namespace gloo