File indexing completed on 2025-01-18 10:00:11
0001
0002
0003
0004
0005
0006
0007
0008
0009 #pragma once
0010
0011 #include <functional>
0012 #include <memory>
0013
0014 #include "gloo/context.h"
0015 #include "gloo/transport/unbound_buffer.h"
0016
0017 namespace gloo {
0018
0019 class ReduceOptions {
0020 public:
0021 using Func = std::function<void(void*, const void*, const void*, size_t)>;
0022
0023 explicit ReduceOptions(const std::shared_ptr<Context>& context)
0024 : context(context), timeout(context->getTimeout()) {}
0025
0026 template <typename T>
0027 void setInput(std::unique_ptr<transport::UnboundBuffer> buf) {
0028 this->elements = buf->size / sizeof(T);
0029 this->elementSize = sizeof(T);
0030 this->in = std::move(buf);
0031 }
0032
0033 template <typename T>
0034 void setInput(T* ptr, size_t elements) {
0035 this->elements = elements;
0036 this->elementSize = sizeof(T);
0037 this->in = context->createUnboundBuffer(ptr, elements * sizeof(T));
0038 }
0039
0040 template <typename T>
0041 void setOutput(std::unique_ptr<transport::UnboundBuffer> buf) {
0042 this->elements = buf->size / sizeof(T);
0043 this->elementSize = sizeof(T);
0044 this->out = std::move(buf);
0045 }
0046
0047 template <typename T>
0048 void setOutput(T* ptr, size_t elements) {
0049 this->elements = elements;
0050 this->elementSize = sizeof(T);
0051 this->out = context->createUnboundBuffer(ptr, elements * sizeof(T));
0052 }
0053
0054 void setRoot(int root) {
0055 this->root = root;
0056 }
0057
0058 void setReduceFunction(Func fn) {
0059 this->reduce = fn;
0060 }
0061
0062 void setTag(uint32_t tag) {
0063 this->tag = tag;
0064 }
0065
0066 void setMaxSegmentSize(size_t maxSegmentSize) {
0067 this->maxSegmentSize = maxSegmentSize;
0068 }
0069
0070 void setTimeout(std::chrono::milliseconds timeout) {
0071 this->timeout = timeout;
0072 }
0073
0074 protected:
0075 std::shared_ptr<Context> context;
0076 std::unique_ptr<transport::UnboundBuffer> in;
0077 std::unique_ptr<transport::UnboundBuffer> out;
0078
0079
0080 size_t elements = 0;
0081
0082
0083 size_t elementSize = 0;
0084
0085
0086 int root = -1;
0087
0088
0089 Func reduce;
0090
0091
0092
0093 uint32_t tag = 0;
0094
0095
0096
0097
0098 static constexpr size_t kMaxSegmentSize = 1024 * 1024;
0099
0100
0101
0102
0103
0104 size_t maxSegmentSize = kMaxSegmentSize;
0105
0106
0107 std::chrono::milliseconds timeout;
0108
0109 friend void reduce(ReduceOptions&);
0110 };
0111
0112 void reduce(ReduceOptions& opts);
0113
0114 }