File indexing completed on 2025-01-18 10:00:11
0001
0002
0003
0004
0005
0006
0007
0008
0009 #pragma once
0010
#include <math.h>
#include <stddef.h>
#include <string.h>

#include <algorithm>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>

#include "gloo/algorithm.h"
#include "gloo/common/error.h"
#include "gloo/context.h"
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058 namespace gloo {
0059
namespace bcube {

/**
 * One process ("node") in the BCube schedule.
 *
 * For every algorithm step a Node keeps three tables:
 *   - the ranks it exchanges data with in that step (never its own rank),
 *   - how many elements it is responsible for in that step,
 *   - the element offset into the user buffer where that slice begins.
 * All tables are sized to `steps` on construction and hold empty / zero
 * entries until setPerStepAttributes() fills them in.
 */
class Node {
 public:
  /**
   * @param rank  Rank of the process represented by this node.
   * @param steps Number of algorithm steps (length of every per-step table).
   */
  explicit Node(int rank, int steps)
      : rank_(rank),
        peersPerStep_(steps),
        numElemsPerStep_(steps),
        ptrOffsetPerStep_(steps) {}

  /// Rank of the process this node represents.
  int getRank() const {
    return rank_;
  }

  /**
   * Records this node's schedule for one step.
   *
   * Peers are appended to whatever is already recorded for `step`; the
   * element count and offset overwrite the previous values.
   *
   * @param step      Step being described.
   * @param peerRanks All ranks of the node's group for `step`; may include
   *                  this node's own rank, which is skipped.
   * @param numElems  Number of elements this node handles in `step`.
   * @param offset    Element offset of that slice in the user buffer.
   */
  void setPerStepAttributes(
      int step,
      const std::vector<int>& peerRanks,
      int numElems,
      int offset) {
    std::vector<int>& stepPeers = peersPerStep_[step];
    for (const int candidate : peerRanks) {
      if (candidate == rank_) {
        continue; // a node is never its own peer
      }
      stepPeers.push_back(candidate);
    }
    numElemsPerStep_[step] = numElems;
    ptrOffsetPerStep_[step] = offset;
  }

  /// Ranks this node exchanges data with in `step` (excluding itself).
  const std::vector<int>& getPeersPerStep(int step) const {
    return peersPerStep_[step];
  }

  /// Number of elements this node is responsible for in `step`.
  int getNumElemsPerStep(int step) const {
    return numElemsPerStep_[step];
  }

  /// Element offset of this node's slice in `step`.
  int getPtrOffsetPerStep(int step) const {
    return ptrOffsetPerStep_[step];
  }

 private:
  // Rank of the represented process.
  const int rank_;

  // peersPerStep_[s]: ranks exchanged with in step s.
  std::vector<std::vector<int>> peersPerStep_;

  // numElemsPerStep_[s]: element count handled in step s.
  std::vector<int> numElemsPerStep_;

  // ptrOffsetPerStep_[s]: element offset of the slice handled in step s.
  std::vector<int> ptrOffsetPerStep_;
};

/**
 * The set of nodes that talk to one another within a single step.
 *
 * Candidate members are firstNode, firstNode + peerDistance,
 * firstNode + 2 * peerDistance, ... (`base` candidates in total); only ranks
 * below `nodes` are kept. The group's slice starts where the first node's
 * slice of the previous step started (offset 0 in step 0).
 */
class Group {
 public:
  /**
   * @param step          Step this group belongs to.
   * @param firstNode     First member; its schedule for `step - 1` defines
   *                      the slice this group works on.
   * @param peerDistance  Rank distance between consecutive candidates.
   * @param base          Number of candidate members.
   * @param nodes         Total number of processes; ranks >= nodes dropped.
   * @param totalNumElems Element count of the full buffer (used in step 0).
   */
  Group(
      int step,
      const Node& firstNode,
      int peerDistance,
      int base,
      int nodes,
      int totalNumElems)
      : nodeRanks_(
            getNodeRanks(firstNode.getRank(), peerDistance, base, nodes)),
        ptrOffset_(
            step != 0 ? firstNode.getPtrOffsetPerStep(step - 1) : 0),
        numElems_(computeNumElems(
            step,
            firstNode,
            static_cast<int>(nodeRanks_.size()),
            totalNumElems)) {}

  /// Ranks of all members, in ascending candidate order.
  const std::vector<int>& getNodeRanks() const {
    return nodeRanks_;
  }

  /// Element offset where this group's slice begins.
  int getPtrOffset() const {
    return ptrOffset_;
  }

  /// Number of elements this group splits among its members.
  int getNumElems() const {
    return numElems_;
  }

 private:
  // Declaration order matters: numElems_ is computed from nodeRanks_.
  const std::vector<int> nodeRanks_;
  const int ptrOffset_;
  const int numElems_;

  /**
   * Elements this group handles. Step 0 starts from the full `count`; later
   * steps inherit the first node's element count of the previous step. The
   * result is never smaller than the member count, so each member gets at
   * least one element.
   */
  static int
  computeNumElems(int step, const Node& firstNode, int peers, int count) {
    const int inherited =
        step != 0 ? firstNode.getNumElemsPerStep(step - 1) : count;
    return inherited < peers ? peers : inherited;
  }

  /**
   * Ranks firstNodeRank + k * peerDistance for k in [0, base), keeping only
   * those below `nodes`.
   */
  static std::vector<int>
  getNodeRanks(int firstNodeRank, int peerDistance, int base, int nodes) {
    std::vector<int> members;
    for (int k = 0; k < base; ++k) {
      const int candidate = firstNodeRank + k * peerDistance;
      if (!(candidate >= nodes)) {
        members.push_back(candidate);
      }
    }
    return members;
  }
};

}
0243
0244
0245
0246
0247
0248
0249
0250
0251
0252
0253
0254
0255 template <typename T>
0256 class AllreduceBcube : public Algorithm {
0257 public:
0258 AllreduceBcube(
0259 const std::shared_ptr<Context>& context,
0260 const std::vector<T*> ptrs,
0261 const int count,
0262 const ReductionFunction<T>* fn = ReductionFunction<T>::sum)
0263 : Algorithm(context),
0264 myRank_(this->context_->rank),
0265 base_(this->context_->base),
0266 nodes_(this->contextSize_),
0267 ptrs_(ptrs),
0268 totalNumElems_(count),
0269 bytes_(totalNumElems_ * sizeof(T)),
0270 steps_(computeSteps(nodes_, base_)),
0271 fn_(fn),
0272 recvBufs_(steps_ * base_) {
0273 if (totalNumElems_ == 0 || nodes_ == 1) {
0274 return;
0275 }
0276 setupNodes();
0277
0278
0279
0280
0281
0282
0283 int slotOffset_ = this->context_->nextSlot(
0284 2 * this->contextSize_ * (this->contextSize_ - 1));
0285
0286 int bufIdx = 0;
0287 for (int step = 0; step < steps_; ++step) {
0288 for (int destRank : getPeersPerStep(myRank_, step)) {
0289 int recvSize = std::max(
0290 getNumElemsPerStep(myRank_, step),
0291 getNumElemsPerStep(destRank, step));
0292 auto& pair = this->context_->getPair(destRank);
0293 auto slot = slotOffset_ +
0294 2 * (std::min(myRank_, destRank) * nodes_ +
0295 std::max(myRank_, destRank));
0296 sendDataBufs_[destRank] =
0297 pair->createSendBuffer(slot, ptrs_[0], bytes_);
0298 recvBufs_[bufIdx].resize(recvSize);
0299 recvDataBufs_[destRank] = pair->createRecvBuffer(
0300 slot, &recvBufs_[bufIdx][0], recvSize * sizeof(T));
0301 recvBufIdx_[destRank] = bufIdx;
0302 ++bufIdx;
0303 ++slot;
0304 sendNotificationBufs_[destRank] =
0305 pair->createSendBuffer(slot, &dummy_, sizeof(dummy_));
0306 recvNotificationBufs_[destRank] =
0307 pair->createRecvBuffer(slot, &dummy_, sizeof(dummy_));
0308 }
0309 }
0310 }
0311
0312 #ifdef DEBUG
0313 #define DEBUG_PRINT_STAGE(stage) \
0314 do { \
0315 printStageBuffer(stage); \
0316 } while (false)
0317 #define DEBUG_PRINT_SEND(stage) \
0318 do { \
0319 printStepBuffer( \
0320 stage, step, myRank_, destRank, &ptrs_[0][0], sendCount, ptrOffset); \
0321 } while (false)
0322 #define DEBUG_PRINT_RECV(stage) \
0323 do { \
0324 printStepBuffer( \
0325 stage, \
0326 step, \
0327 srcRank, \
0328 myRank_, \
0329 &recvBufs_[recvBufIdx_[srcRank]][0], \
0330 recvCount); \
0331 } while (false)
0332 #else
0333 #define DEBUG_PRINT_STAGE(stage)
0334 #define DEBUG_PRINT_SEND(stage)
0335 #define DEBUG_PRINT_RECV(stage)
0336 #endif
0337
0338 void run() {
0339 if (totalNumElems_ == 0) {
0340 return;
0341 }
0342
0343 for (int i = 1; i < ptrs_.size(); i++) {
0344 fn_->call(ptrs_[0], ptrs_[i], totalNumElems_);
0345 }
0346
0347 if (nodes_ == 1) {
0348
0349 for (int i = 1; i < ptrs_.size(); i++) {
0350 memcpy(ptrs_[i], ptrs_[0], bytes_);
0351 }
0352 return;
0353 }
0354
0355
0356 DEBUG_PRINT_STAGE("start");
0357 for (int step = 0; step < steps_; ++step) {
0358 for (int destRank : getPeersPerStep(myRank_, step)) {
0359 int sendCount = getNumElemsPerStep(destRank, step);
0360 int ptrOffset = getPtrOffsetPerStep(destRank, step);
0361 DEBUG_PRINT_SEND("reduce-scatter");
0362 sendDataBufs_[destRank]->send(
0363 ptrOffset * sizeof(T), sendCount * sizeof(T));
0364 }
0365
0366 for (int srcRank : getPeersPerStep(myRank_, step)) {
0367 int recvCount = getNumElemsPerStep(myRank_, step);
0368 int ptrOffset = getPtrOffsetPerStep(myRank_, step);
0369 recvDataBufs_[srcRank]->waitRecv();
0370 DEBUG_PRINT_RECV("reduce-scatter");
0371 fn_->call(
0372 &ptrs_[0][ptrOffset],
0373 &recvBufs_[recvBufIdx_[srcRank]][0],
0374 recvCount);
0375
0376
0377
0378
0379 sendNotificationBufs_[srcRank]->send();
0380 }
0381 }
0382
0383 DEBUG_PRINT_STAGE("reduce-scattered");
0384
0385
0386 for (int step = steps_ - 1; step >= 0; --step) {
0387 for (int destRank : getPeersPerStep(myRank_, step)) {
0388 int sendCount = getNumElemsPerStep(myRank_, step);
0389 int ptrOffset = getPtrOffsetPerStep(myRank_, step);
0390
0391
0392
0393
0394 recvNotificationBufs_[destRank]->waitRecv();
0395 DEBUG_PRINT_SEND("all-gather");
0396 sendDataBufs_[destRank]->send(
0397 ptrOffset * sizeof(T), sendCount * sizeof(T));
0398 }
0399
0400 for (int srcRank : getPeersPerStep(myRank_, step)) {
0401 int recvCount = getNumElemsPerStep(srcRank, step);
0402 int ptrOffset = getPtrOffsetPerStep(srcRank, step);
0403 recvDataBufs_[srcRank]->waitRecv();
0404 DEBUG_PRINT_RECV("all-gather");
0405 std::memcpy(
0406 &ptrs_[0][ptrOffset],
0407 &recvBufs_[recvBufIdx_[srcRank]][0],
0408 recvCount * sizeof(T));
0409 if (step == 0) {
0410
0411
0412
0413
0414 sendNotificationBufs_[srcRank]->send();
0415 }
0416 }
0417 }
0418
0419 DEBUG_PRINT_STAGE("all-reduced");
0420
0421
0422 for (int i = 1; i < ptrs_.size(); i++) {
0423 memcpy(ptrs_[i], ptrs_[0], bytes_);
0424 }
0425
0426
0427
0428
0429
0430
0431 for (int peerRank : getPeersPerStep(myRank_, 0)) {
0432 recvNotificationBufs_[peerRank]->waitRecv();
0433 }
0434 }
0435
0436 private:
0437
0438
0439
0440 static constexpr int wordsPerSection = 4;
0441
0442
0443
0444 static constexpr int wordsPerLine = 4 * wordsPerSection;
0445
0446
0447
0448 const int myRank_{0};
0449
0450
0451
0452 const int base_{2};
0453
0454
0455
0456 const int nodes_{0};
0457
0458
0459
0460 const std::vector<T*> ptrs_{nullptr};
0461
0462
0463
0464 const int totalNumElems_{0};
0465
0466
0467
0468 const int bytes_{0};
0469
0470
0471
0472 const size_t steps_{0};
0473
0474
0475
0476 const ReductionFunction<T>* fn_{nullptr};
0477
0478
0479
0480 std::vector<std::vector<T>> recvBufs_;
0481
0482
0483
0484 std::unordered_map<int, int> recvBufIdx_;
0485
0486
0487
0488 std::unordered_map<int, std::unique_ptr<transport::Buffer>> sendDataBufs_;
0489
0490
0491
0492 std::unordered_map<int, std::unique_ptr<transport::Buffer>> recvDataBufs_;
0493
0494
0495
0496 int dummy_;
0497
0498
0499
0500
0501 std::unordered_map<int, std::unique_ptr<transport::Buffer>>
0502 sendNotificationBufs_;
0503
0504
0505
0506
0507 std::unordered_map<int, std::unique_ptr<transport::Buffer>>
0508 recvNotificationBufs_;
0509
0510
0511
0512 std::vector<bcube::Node> allNodes_;
0513
0514
0515
0516
0517
0518 static int computeSteps(int nodes, int peers) {
0519 float lg2n = log2(nodes);
0520 float lg2p = log2(peers);
0521 return ceil(lg2n / lg2p);
0522 }
0523
0524
0525
0526
0527 static bool printCheck(int ) {
0528 return false;
0529 }
0530
0531
0532
0533
0534
0535 static void printBreak(T* p, int x) {
0536 if (0 == x % wordsPerLine) {
0537 std::cout << std::endl
0538 << &p[x] << " " << std::setfill('0') << std::setw(5) << x
0539 << ": ";
0540 } else if (0 == x % wordsPerSection) {
0541 std::cout << "- ";
0542 }
0543 }
0544
0545
0546
0547
0548
0549
0550 static void printElems(T* p, int count, int start = 0) {
0551 auto alignedStart = (start / wordsPerLine) * wordsPerLine;
0552 for (int x = alignedStart; x < start + count; ++x) {
0553 printBreak(p, x);
0554 if (x < start) {
0555 std::cout << "..... ";
0556 } else {
0557 std::cout << std::setfill('0') << std::setw(5) << p[x] << " ";
0558 }
0559 }
0560 }
0561
0562
0563
0564
0565 void printStageBuffer(const std::string& msg) {
0566 if (printCheck(myRank_)) {
0567 std::cout << "rank (" << myRank_ << ") " << msg << ": ";
0568 printElems(&ptrs_[0][0], totalNumElems_);
0569 std::cout << std::endl;
0570 }
0571 }
0572
0573
0574
0575
0576
0577
0578
0579
0580
0581
0582 void printStepBuffer(
0583 const std::string& stage,
0584 int step,
0585 int srcRank,
0586 int destRank,
0587 T* p,
0588 int count,
0589 int start = 0) {
0590 if (printCheck(myRank_)) {
0591 std::cout << stage << ": step (" << step << ") "
0592 << "srcRank (" << srcRank << ") -> "
0593 << "destRank (" << destRank << "): ";
0594 printElems(p, count, start);
0595 std::cout << std::endl;
0596 }
0597 }
0598
0599
0600
0601
0602
0603
0604 const std::vector<int>& getPeersPerStep(int rank, int step) {
0605 return allNodes_[rank].getPeersPerStep(step);
0606 }
0607
0608
0609
0610
0611
0612 int getNumElemsPerStep(int rank, int step) {
0613 return allNodes_[rank].getNumElemsPerStep(step);
0614 }
0615
0616
0617
0618
0619
0620
0621 int getPtrOffsetPerStep(int rank, int step) {
0622 return allNodes_[rank].getPtrOffsetPerStep(step);
0623 }
0624
0625
0626
0627 void createNodes() {
0628 for (int rank = 0; rank < nodes_; ++rank) {
0629 allNodes_.emplace_back(rank, steps_);
0630 }
0631 }
0632
0633
0634
0635
0636
0637 void updateGroupNodes(int step, const bcube::Group& group) {
0638 const std::vector<int>& peers = group.getNodeRanks();
0639 const int peersSz = peers.size();
0640 int ptrOffset = group.getPtrOffset();
0641 int count = group.getNumElems() / peersSz;
0642 const int countRem = group.getNumElems() % peersSz;
0643 if (0 == count) {
0644 count = 1;
0645 }
0646 for (int i = 0; i < peersSz; ++i) {
0647 bcube::Node& node = allNodes_[peers[i]];
0648 if (peersSz - 1 != i) {
0649 node.setPerStepAttributes(step, peers, count, ptrOffset);
0650 ptrOffset += count;
0651 } else {
0652
0653
0654
0655
0656 node.setPerStepAttributes(step, peers, count + countRem, ptrOffset);
0657 ptrOffset += count + countRem;
0658 }
0659 ptrOffset %= totalNumElems_;
0660 }
0661 }
0662
0663
0664
0665
0666
0667
0668
0669 void setupNodes() {
0670
0671 createNodes();
0672
0673
0674 int peerDistance = 1;
0675 for (int step = 0; step < steps_; ++step) {
0676 std::vector<bcube::Group> groups;
0677
0678 for (int rank = 0; rank < nodes_; ++rank) {
0679 const bcube::Node& firstNode = allNodes_[rank];
0680
0681 if (0 == firstNode.getPeersPerStep(step).size()) {
0682
0683 groups.emplace_back(
0684 step, firstNode, peerDistance, base_, nodes_, totalNumElems_);
0685
0686 updateGroupNodes(step, groups.back());
0687 }
0688 }
0689
0690 peerDistance *= base_;
0691 }
0692 }
0693 };
0694
0695 }