Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-30 10:10:12

0001 /**
0002  * Copyright (c) 2018-present, Facebook, Inc.
0003  * All rights reserved.
0004  *
0005  * This source code is licensed under the BSD-style license found in the
0006  * LICENSE file in the root directory of this source tree.
0007  */
0008 
0009 #pragma once
0010 
0011 #include <deque>
0012 #include <memory>
0013 #include <mutex>
0014 #include <tuple>
0015 #include <unordered_map>
0016 #include <unordered_set>
0017 
0018 #include "gloo/common/memory.h"
0019 #include "gloo/common/store.h"
0020 #include "gloo/transport/context.h"
0021 
0022 namespace gloo {
0023 namespace transport {
0024 namespace tcp {
0025 
0026 // Forward declarations: they let the declarations below name these tcp
// classes without pulling in their headers. Context itself is defined
// further down in this header; the other three are not defined here.
0027 class Context;
0028 class Device;
0029 class Pair;
0030 class UnboundBuffer;
0031 
// Context for the tcp transport. It derives from the transport-level
// ::gloo::transport::Context and from std::enable_shared_from_this, so a
// shared_ptr to the context can be obtained from inside its own methods.
// Every member function is only declared here; the definitions live outside
// this header.
0032 class Context : public ::gloo::transport::Context,
0033                       public std::enable_shared_from_this<Context> {
0034  public:
  // Constructs a context bound to `device`.
  // NOTE(review): presumably `rank` is this process's index and `size` the
  // number of participants -- confirm against the base class.
0035   Context(std::shared_ptr<Device> device, int rank, int size);
0036 
0037   virtual ~Context();
0038 
  // Overrides the base-class method and takes the store by reference.
  // NOTE(review): presumably creates a pair per peer and connects them using
  // information exchanged through `store` -- confirm in the implementation.
  // NOTE(review): `virtual` is redundant next to `override`.
0039   virtual void createAndConnectAllPairs(IStore& store) override;
0040 
  // Overrides the base-class method. Returns a reference to the unique_ptr
  // that holds the pair for `rank`, not a freshly created unique_ptr.
0041   std::unique_ptr<transport::Pair>& createPair(int rank) override;
0042 
  // Overrides the base-class method. Returns an owning pointer to a
  // transport-level UnboundBuffer for the memory at `ptr`.
  // NOTE(review): presumably `size` is a byte count -- confirm.
0043   std::unique_ptr<transport::UnboundBuffer> createUnboundBuffer(
0044       void* ptr,
0045       size_t size) override;
0046 
0047  protected:
  // Shared-ownership handle to a tcp Device.
  // NOTE(review): presumably the device passed to the constructor -- confirm.
0048   std::shared_ptr<Device> device_;
0049 
  // One queued receive request. Fields, in order: a weak non-owning pointer
  // to a tcp UnboundBuffer, two size_t values and a set of ints.
  // NOTE(review): presumably the size_t values are the offset and nbytes
  // given to recvFromAny and the set holds the acceptable source ranks --
  // confirm in the implementation.
0050   using pendingRecvTuple = std::tuple<
0051       WeakNonOwningPtr<UnboundBuffer>,
0052       size_t,
0053       size_t,
0054       std::unordered_set<int>>;
0055 
0056   // Pending receive operations, keyed by slot; each slot maps to a deque
  // of pendingRecvTuple entries.
0057   std::unordered_map<uint64_t, std::deque<pendingRecvTuple>> pendingRecv_;
0058 
0059   // Registers the specified unbound buffer for a receive operation on
0060   // `slot` from any of the ranks in `srcRanks` (taken by value).
  // NOTE(review): std::vector is used here, but this header does not include
  // <vector> directly; presumably it arrives through another include --
  // confirm.
0061   void recvFromAny(
0062       UnboundBuffer* buf,
0063       uint64_t slot,
0064       size_t offset,
0065       size_t nbytes,
0066       std::vector<int> srcRanks);
0067 
  // Takes the same arguments as recvFromAny, except that `srcRanks` is passed
  // by const reference, and returns an int.
  // NOTE(review): presumably the returned int is the rank selected to satisfy
  // the receive, with some sentinel when none is found -- confirm.
0068   int recvFromAnyFindRank(
0069       UnboundBuffer* buf,
0070       uint64_t slot,
0071       size_t offset,
0072       size_t nbytes,
0073       const std::vector<int>& srcRanks);
0074 
0075   // May be called only by ContextMutator::findRecvFromAny, where the
0076   // context lock is already held.
  // NOTE(review): `buf`, `offset` and `nbytes` are pointer parameters,
  // presumably outputs describing the matched pending receive, and the bool
  // presumably reports whether a match for `slot` and `rank` exists --
  // confirm.
0077   bool findRecvFromAny(
0078       uint64_t slot,
0079       int rank,
0080       WeakNonOwningPtr<tcp::UnboundBuffer>* buf,
0081       size_t* offset,
0082       size_t* nbytes);
0083 
0084   // Sets an exception on every pair in this context. This is called when
0085   // waiting for a send or recv operation on an unbound buffer times
0086   // out; all pairs should be signaled and closed in that event.
0087   void signalException(const std::string& msg);
0088 
  // The friend declarations below give these classes access to the protected
  // members above. Unqualified `UnboundBuffer` and `Pair` name the classes
  // forward-declared in this (tcp) namespace, not the transport:: types
  // returned by the public methods.
0089   friend class ContextMutator;
0090 
0091   friend class UnboundBuffer;
0092 
0093   friend class Pair;
0094 };
0095 
// Plain data record with three fields: a hostname, an address stored as raw
// bytes, and a list of pair identifiers.
// NOTE(review): presumably describes how to reach one rank of the context --
// confirm against the code that creates and consumes it.
// NOTE(review): std::string, std::vector and ssize_t are used here, but this
// header includes none of <string>, <vector> or <sys/types.h> directly;
// presumably they arrive through other includes -- confirm.
0096 struct Rank {
0097   std::string hostname;
  // Address as raw bytes; its encoding is not visible in this header.
0098   std::vector<char> addressBytes;
  // NOTE(review): presumably one identifier per pair -- confirm.
0099   std::vector<ssize_t> pairIdentifiers;
0100 
  // Member-wise constructor: copies `hostname`, `addrBytes` and
  // `pairIdentifiers` into the corresponding fields. Being `explicit`, it
  // cannot be used for copy-list-initialization from a braced list.
0101   explicit Rank(
0102       const std::string& hostname,
0103       const std::vector<char>& addrBytes,
0104       const std::vector<ssize_t>& pairIdentifiers)
0105       : hostname(hostname),
0106         addressBytes(addrBytes),
0107         pairIdentifiers(pairIdentifiers) {}
  // Constructs a Rank from a byte buffer; only declared here.
  // NOTE(review): presumably the inverse of bytes() -- confirm.
0108   explicit Rank(const std::vector<char>& bytes);
0109 
  // Returns a byte buffer and does not modify this object (const).
  // NOTE(review): presumably the serialized form accepted by the byte-buffer
  // constructor -- confirm.
0110   std::vector<char> bytes() const;
0111 };
0112 
0113 } // namespace tcp
0114 } // namespace transport
0115 } // namespace gloo