/**
 * Copyright (c) 2017-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under the BSD-style license found in the
 * LICENSE file in the root directory of this source tree.
 */

#pragma once

#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <memory>
#include <vector>

#include "gloo/algorithm.h"
#include "gloo/context.h"

namespace gloo {

template <typename T>
class AllreduceRingChunked : public Algorithm {
 public:
  AllreduceRingChunked(
      const std::shared_ptr<Context>& context,
      const std::vector<T*>& ptrs,
      const int count,
      const ReductionFunction<T>* fn = ReductionFunction<T>::sum)
      : Algorithm(context),
        ptrs_(ptrs),
        count_(count),
        bytes_(count_ * sizeof(T)),
        fn_(fn) {
    // Use chunks of no less than 1024 bytes (256 * sizeof(float))
    constexpr unsigned long minSize = 256;
    chunks_ = this->contextSize_ * 2;
#ifdef _WIN32
    chunkSize_ =
        std::max((size_t)minSize, (size_t)((count_ + chunks_ - 1) / chunks_));
#else
    chunkSize_ = std::max(minSize, (count_ + chunks_ - 1) / chunks_);
#endif
    chunkBytes_ = chunkSize_ * sizeof(T);
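    // For example (illustrative numbers only): with contextSize_ = 4
    // and count_ = 1000 floats, chunks_ = 8 and
    // chunkSize_ = max(256, ceil(1000 / 8)) = 256, so chunkBytes_ = 1024.
    // Chunks that fall past the end of the input are clipped or empty;
    // see the range checks in run() below.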

    // Allocate inboxes. Each inbox holds a single chunk, so it must
    // cover the chunkBytes_ registered with createRecvBuffer below.
    for (int i = 0; i < 2; i++) {
      inbox_[i] = static_cast<T*>(malloc(chunkBytes_));
    }

    if (count_ == 0 || this->contextSize_ == 1) {
      return;
    }

    auto& leftPair = this->getLeftPair();
    auto& rightPair = this->getRightPair();
    for (int i = 0; i < 2; i++) {
      auto slot = this->context_->nextSlot();

      // Buffer to send to (rank+1).
      sendDataBuf_[i] =
        rightPair->createSendBuffer(slot, ptrs_[0], bytes_);
      // Buffer that (rank-1) writes to.
      recvDataBuf_[i] =
        leftPair->createRecvBuffer(slot, inbox_[i], chunkBytes_);
    }

    // Dummy buffers for localized barrier.
    // Before sending to the right, we only need to know that the node
    // on the right is done using the inbox that's about to be written
    // into. No need for a global barrier.
    auto notificationSlot = this->context_->nextSlot();
    sendNotificationBuf_ =
      leftPair->createSendBuffer(notificationSlot, &dummy_, sizeof(dummy_));
    recvNotificationBuf_ =
      rightPair->createRecvBuffer(notificationSlot, &dummy_, sizeof(dummy_));
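
    // To sketch the resulting handshake: each rank's
    // sendNotificationBuf_ writes into the recvNotificationBuf_ of its
    // left neighbor, i.e. the rank that writes into this rank's inbox.
    // Conversely, waitRecv() on recvNotificationBuf_ confirms that the
    // right neighbor is done reading its inbox before this rank
    // overwrites it with the next send.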
  }

  virtual ~AllreduceRingChunked() {
    for (int i = 0; i < 2; i++) {
      if (inbox_[i] != nullptr) {
        free(inbox_[i]);
      }
    }
  }

  void run() {
    if (count_ == 0) {
      return;
    }

    // Reduce specified pointers into ptrs_[0]
    for (int i = 1; i < ptrs_.size(); i++) {
      fn_->call(ptrs_[0], ptrs_[i], count_);
    }

    if (this->contextSize_ == 1) {
      // Broadcast ptrs_[0]
      for (int i = 1; i < ptrs_.size(); i++) {
        memcpy(ptrs_[i], ptrs_[0], bytes_);
      }
      return;
    }

    // Kick off copying initial chunks
    copyChunkAtOffset(2 * this->contextRank_);
    copyChunkAtOffset(2 * this->contextRank_ + 1);

    // Start with reduction of previously copied chunk
    for (int round = 2; round < chunks_; round++) {
      // We loop over all chunks starting at 2, since we just sent two
      // chunks to fill both buffers. Imagine a square grid with
      // chunks of memory laid out vertically and nodes horizontally.
      // The diagonal of this grid marks which node sends which
      // chunks of memory in the prelude. Processing happens by moving
      // this diagonal forward and having it wrap around the edge. This
      // means that the node with rank 0 at round 2 will process the
      // last chunk. This explains why we subtract the round in the
      // offset equation below.
      //
      // Because we're dealing with double buffering in this
      // implementation, we have twice the number of chunks and
      // process them in pairs. This explains why we ignore the LSB on
      // the round number when subtracting it. The LSB is later added
      // to flip back and forth between the two buffers for this pair
      // of chunks. The number of chunks is finally added to make sure
      // we can wrap correctly (no modulo against negative number).
      //
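      // As a concrete example, with contextSize_ = 2 (so chunks_ = 4),
      // rank 0 sends chunks 0 and 1 in the prelude, then computes
      // chunkOffset = 2 at round 2 and chunkOffset = 3 at round 3,
      // while rank 1 computes chunkOffset = 0 and 1 for those rounds.
      //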
      auto chunkOffset = ((2 * this->contextRank_) - (round & ~0x1) +
                          (round & 0x1) + chunks_) %
          chunks_;
      auto offset = chunkOffset * chunkSize_;
      auto length = chunkSize_;
      if (offset + length <= count_) {
        // Chunk completely in range, reduce the full chunk.
      } else if (offset < count_) {
        // Chunk partially in range, reduce a partial chunk.
        length = count_ - offset;
      } else {
        // Chunk out of range, reduce nothing.
        length = 0;
      }
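      // For example, with count_ = 1000 and chunkSize_ = 256, the
      // chunk at chunkOffset 3 starts at offset 768 and is clipped to
      // length 232, and any chunk starting at or beyond element 1000
      // contributes nothing.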

      // Wait for inbox write to complete
      recvDataBuf_[chunkOffset & 1]->waitRecv();

      // Reduce
      if (length > 0) {
        fn_->call(&ptrs_[0][offset], inbox_[chunkOffset & 1], length);
      }

      // Send notification to node on the left that
      // this node is ready for an inbox write.
      sendNotificationBuf_->send();

      // Wait for notification from node on the right
      // to be sure this node can start an inbox write.
      recvNotificationBuf_->waitRecv();

      // Copy accumulated chunk
      copyChunkAtOffset(chunkOffset);
    }

    // Second pass around the ring to broadcast the result.
    // End at chunks_ - 2 since that's where the accumulation
    // stopped in the previous set of rounds.
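    // For example, with contextSize_ = 4 (chunks_ = 8) this pass runs
    // six rounds, alternating between the two buffers, which gives
    // each fully reduced chunk pair the three hops it needs to reach
    // the other three nodes.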
    for (int round = 0; round < (chunks_ - 2); round++) {
      auto chunkOffset = ((2 * this->contextRank_) - (round & ~0x1) +
                          (round & 0x1) + chunks_) %
          chunks_;
      auto offset = chunkOffset * chunkSize_;
      auto length = chunkSize_;
      if (offset + length <= count_) {
        // Chunk completely in range, copy full chunk.
      } else if (offset < count_) {
        // Chunk partially in range, copy partial chunk.
        length = count_ - offset;
      } else {
        // Chunk out of range, copy nothing.
        length = 0;
      }

      // Wait for inbox write to complete
      recvDataBuf_[chunkOffset & 1]->waitRecv();

      // Copy
      if (length > 0) {
        memcpy(&ptrs_[0][offset], inbox_[chunkOffset & 1], length * sizeof(T));
      }

      // Skip copying in the last two rounds
      if (round < (chunks_ - 4)) {
        // Send notification to node on the left that
        // this node is ready for an inbox write.
        sendNotificationBuf_->send();

        // Wait for notification from node on the right
        // to be sure this node can start an inbox write.
        recvNotificationBuf_->waitRecv();

        // Copy accumulated chunks
        copyChunkAtOffset(chunkOffset);
      }
    }

    // Final barrier to make sure every node has finished.
    // Otherwise, a second allreduce call might interfere
    // with one that is still in progress on some nodes.
    sendNotificationBuf_->send();
    recvNotificationBuf_->waitRecv();

    // Broadcast ptrs_[0]
    for (int i = 1; i < ptrs_.size(); i++) {
      memcpy(ptrs_[i], ptrs_[0], bytes_);
    }
  }

 protected:
  void copyChunkAtOffset(int chunkOffset) {
    // Populate the inbox of the next participant in the ring.
    auto offset = (chunkOffset % chunks_) * chunkSize_;
    auto length = chunkSize_;
    if (offset + length <= count_) {
      // Chunk completely in range, copy full chunk.
    } else if (offset < count_) {
      // Chunk partially in range, copy partial chunk.
      length = count_ - offset;
    } else {
      // Chunk out of range, copy _something_.
      // When nothing is put on the wire for empty chunks, @pietern
      // has seen this algorithm hang. This is probably related to the
      // chunk iteration order described in the run function.
      offset = 0;
      length = 1;
    }

    // Initiate write to inbox of node on the right.
    sendDataBuf_[chunkOffset & 0x1]->send(
        offset * sizeof(T), length * sizeof(T));
  }
0237 
0238   std::vector<T*> ptrs_;
0239   const int count_;
0240   const int bytes_;
0241   const ReductionFunction<T>* fn_;
0242 
0243   size_t chunks_;
0244   size_t chunkSize_;
0245   size_t chunkBytes_;
0246 
0247   T* inbox_[2];
0248   std::unique_ptr<transport::Buffer> sendDataBuf_[2];
0249   std::unique_ptr<transport::Buffer> recvDataBuf_[2];
0250 
0251   int dummy_;
0252   std::unique_ptr<transport::Buffer> sendNotificationBuf_;
0253   std::unique_ptr<transport::Buffer> recvNotificationBuf_;
0254 };
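
// Example usage (a minimal sketch; it assumes an already-connected
// gloo::Context named "context", e.g. produced by a rendezvous store,
// which is not shown here):
//
//   std::vector<float> data(1000, 1.0f);
//   std::vector<float*> ptrs = {data.data()};
//   AllreduceRingChunked<float> allreduce(context, ptrs, data.size());
//   allreduce.run();
//   // Each element of data now holds the sum across all ranks.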

} // namespace gloo