File indexing completed on 2024-05-18 08:30:32
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024 #ifndef __ZMQ_ADDON_HPP_INCLUDED__
0025 #define __ZMQ_ADDON_HPP_INCLUDED__
0026
0027 #include "zmq.hpp"
0028
0029 #include <deque>
0030 #include <iomanip>
0031 #include <sstream>
0032 #include <stdexcept>
0033 #ifdef ZMQ_CPP11
0034 #include <limits>
0035 #include <functional>
0036 #include <unordered_map>
0037 #endif
0038
0039 namespace zmq
0040 {
0041 #ifdef ZMQ_CPP11
0042
0043 namespace detail
0044 {
0045 template<bool CheckN, class OutputIt>
0046 recv_result_t
0047 recv_multipart_n(socket_ref s, OutputIt out, size_t n, recv_flags flags)
0048 {
0049 size_t msg_count = 0;
0050 message_t msg;
0051 while (true) {
0052 if ZMQ_CONSTEXPR_IF (CheckN) {
0053 if (msg_count >= n)
0054 throw std::runtime_error(
0055 "Too many message parts in recv_multipart_n");
0056 }
0057 if (!s.recv(msg, flags)) {
0058
0059 assert(msg_count == 0);
0060 return {};
0061 }
0062 ++msg_count;
0063 const bool more = msg.more();
0064 *out++ = std::move(msg);
0065 if (!more)
0066 break;
0067 }
0068 return msg_count;
0069 }
0070
// Reports whether the host stores the least significant byte of an integer
// at the lowest address.
inline bool is_little_endian()
{
    const uint16_t probe = 0x01;
    // Inspect the first byte in memory of a 16-bit value holding 1.
    const uint8_t *const first_byte = reinterpret_cast<const uint8_t *>(&probe);
    return first_byte[0] == 0x01;
}
0076
// Stores `value` into the four bytes starting at `buf` in network (big-endian)
// byte order: most significant byte first.
//
// Extracting the bytes with shifts yields the same layout on every host, so
// no runtime endianness check is needed.
// `buf` must point to at least four writable bytes.
inline void write_network_order(unsigned char *buf, const uint32_t value)
{
    buf[0] = static_cast<unsigned char>((value >> 24) & 0xFFu);
    buf[1] = static_cast<unsigned char>((value >> 16) & 0xFFu);
    buf[2] = static_cast<unsigned char>((value >> 8) & 0xFFu);
    buf[3] = static_cast<unsigned char>(value & 0xFFu);
}
0089
// Reads a 32-bit unsigned integer stored in network (big-endian) byte order
// from the four bytes starting at `buf`.
//
// Assembling the value with shifts gives the same result on every host, so
// no runtime endianness check is needed.
// `buf` must point to at least four readable bytes.
inline uint32_t read_u32_network_order(const unsigned char *buf)
{
    return (static_cast<uint32_t>(buf[0]) << 24)
           | (static_cast<uint32_t>(buf[1]) << 16)
           | (static_cast<uint32_t>(buf[2]) << 8)
           | static_cast<uint32_t>(buf[3]);
}
0103 }
0104
0105
0106
0107
0108
0109
0110
0111
0112
0113
0114
0115
0116
0117 template<class OutputIt>
0118 ZMQ_NODISCARD recv_result_t recv_multipart(socket_ref s,
0119 OutputIt out,
0120 recv_flags flags = recv_flags::none)
0121 {
0122 return detail::recv_multipart_n<false>(s, std::move(out), 0, flags);
0123 }
0124
0125
0126
0127
0128
0129
0130
0131
0132
0133
0134
0135
0136
0137
0138
0139 template<class OutputIt>
0140 ZMQ_NODISCARD recv_result_t recv_multipart_n(socket_ref s,
0141 OutputIt out,
0142 size_t n,
0143 recv_flags flags = recv_flags::none)
0144 {
0145 return detail::recv_multipart_n<true>(s, std::move(out), n, flags);
0146 }
0147
0148
0149
0150
0151
0152
0153
0154
0155
0156
0157
0158
0159
0160 template<class Range
0161 #ifndef ZMQ_CPP11_PARTIAL
0162 ,
0163 typename = typename std::enable_if<
0164 detail::is_range<Range>::value
0165 && (std::is_same<detail::range_value_t<Range>, message_t>::value
0166 || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
0167 #endif
0168 >
0169 send_result_t
0170 send_multipart(socket_ref s, Range &&msgs, send_flags flags = send_flags::none)
0171 {
0172 using std::begin;
0173 using std::end;
0174 auto it = begin(msgs);
0175 const auto end_it = end(msgs);
0176 size_t msg_count = 0;
0177 while (it != end_it) {
0178 const auto next = std::next(it);
0179 const auto msg_flags =
0180 flags | (next == end_it ? send_flags::none : send_flags::sndmore);
0181 if (!s.send(*it, msg_flags)) {
0182
0183 assert(it == begin(msgs));
0184 return {};
0185 }
0186 ++msg_count;
0187 it = next;
0188 }
0189 return msg_count;
0190 }
0191
0192
0193
0194
0195
0196
0197
0198
0199
0200
0201
0202
0203
0204
0205
0206
0207
0208
0209
0210
0211 template<class Range
0212 #ifndef ZMQ_CPP11_PARTIAL
0213 ,
0214 typename = typename std::enable_if<
0215 detail::is_range<Range>::value
0216 && (std::is_same<detail::range_value_t<Range>, message_t>::value
0217 || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
0218 #endif
0219 >
0220 message_t encode(const Range &parts)
0221 {
0222 size_t mmsg_size = 0;
0223
0224
0225 for (const auto &part : parts) {
0226 const size_t part_size = part.size();
0227 if (part_size > std::numeric_limits<std::uint32_t>::max()) {
0228
0229 throw std::range_error("Invalid size, message part too large");
0230 }
0231 const size_t count_size =
0232 part_size < std::numeric_limits<std::uint8_t>::max() ? 1 : 5;
0233 mmsg_size += part_size + count_size;
0234 }
0235
0236 message_t encoded(mmsg_size);
0237 unsigned char *buf = encoded.data<unsigned char>();
0238 for (const auto &part : parts) {
0239 const uint32_t part_size = static_cast<uint32_t>(part.size());
0240 const unsigned char *part_data =
0241 static_cast<const unsigned char *>(part.data());
0242
0243 if (part_size < std::numeric_limits<std::uint8_t>::max()) {
0244
0245 *buf++ = (unsigned char) part_size;
0246 } else {
0247
0248 *buf++ = std::numeric_limits<uint8_t>::max();
0249 detail::write_network_order(buf, part_size);
0250 buf += sizeof(part_size);
0251 }
0252 std::memcpy(buf, part_data, part_size);
0253 buf += part_size;
0254 }
0255
0256 assert(static_cast<size_t>(buf - encoded.data<unsigned char>()) == mmsg_size);
0257 return encoded;
0258 }
0259
0260
0261
0262
0263
0264
0265
0266
0267
0268
0269
0270
0271
0272
0273
0274
0275 template<class OutputIt> OutputIt decode(const message_t &encoded, OutputIt out)
0276 {
0277 const unsigned char *source = encoded.data<unsigned char>();
0278 const unsigned char *const limit = source + encoded.size();
0279
0280 while (source < limit) {
0281 size_t part_size = *source++;
0282 if (part_size == std::numeric_limits<std::uint8_t>::max()) {
0283 if (static_cast<size_t>(limit - source) < sizeof(uint32_t)) {
0284 throw std::out_of_range(
0285 "Malformed encoding, overflow in reading size");
0286 }
0287 part_size = detail::read_u32_network_order(source);
0288
0289 source += sizeof(uint32_t);
0290 }
0291
0292 if (static_cast<size_t>(limit - source) < part_size) {
0293 throw std::out_of_range("Malformed encoding, overflow in reading part");
0294 }
0295 *out = message_t(source, part_size);
0296 ++out;
0297 source += part_size;
0298 }
0299
0300 assert(source == limit);
0301 return out;
0302 }
0303
0304 #endif
0305
0306
0307 #ifdef ZMQ_HAS_RVALUE_REFS
0308
0309
0310
0311
0312
0313
0314
0315
0316 class multipart_t
0317 {
0318 private:
0319 std::deque<message_t> m_parts;
0320
0321 public:
0322 typedef std::deque<message_t>::value_type value_type;
0323
0324 typedef std::deque<message_t>::iterator iterator;
0325 typedef std::deque<message_t>::const_iterator const_iterator;
0326
0327 typedef std::deque<message_t>::reverse_iterator reverse_iterator;
0328 typedef std::deque<message_t>::const_reverse_iterator const_reverse_iterator;
0329
0330
0331 multipart_t() {}
0332
0333
0334 multipart_t(socket_ref socket) { recv(socket); }
0335
0336
0337 multipart_t(const void *src, size_t size) { addmem(src, size); }
0338
0339
0340 multipart_t(const std::string &string) { addstr(string); }
0341
0342
0343 multipart_t(message_t &&message) { add(std::move(message)); }
0344
0345
0346 multipart_t(multipart_t &&other) { m_parts = std::move(other.m_parts); }
0347
0348
0349 multipart_t &operator=(multipart_t &&other)
0350 {
0351 m_parts = std::move(other.m_parts);
0352 return *this;
0353 }
0354
0355
0356 virtual ~multipart_t() { clear(); }
0357
0358 message_t &operator[](size_t n) { return m_parts[n]; }
0359
0360 const message_t &operator[](size_t n) const { return m_parts[n]; }
0361
0362 message_t &at(size_t n) { return m_parts.at(n); }
0363
0364 const message_t &at(size_t n) const { return m_parts.at(n); }
0365
0366 iterator begin() { return m_parts.begin(); }
0367
0368 const_iterator begin() const { return m_parts.begin(); }
0369
0370 const_iterator cbegin() const { return m_parts.cbegin(); }
0371
0372 reverse_iterator rbegin() { return m_parts.rbegin(); }
0373
0374 const_reverse_iterator rbegin() const { return m_parts.rbegin(); }
0375
0376 iterator end() { return m_parts.end(); }
0377
0378 const_iterator end() const { return m_parts.end(); }
0379
0380 const_iterator cend() const { return m_parts.cend(); }
0381
0382 reverse_iterator rend() { return m_parts.rend(); }
0383
0384 const_reverse_iterator rend() const { return m_parts.rend(); }
0385
0386
0387 void clear() { m_parts.clear(); }
0388
0389
0390 size_t size() const { return m_parts.size(); }
0391
0392
0393 bool empty() const { return m_parts.empty(); }
0394
0395
0396 bool recv(socket_ref socket, int flags = 0)
0397 {
0398 clear();
0399 bool more = true;
0400 while (more) {
0401 message_t message;
0402 #ifdef ZMQ_CPP11
0403 if (!socket.recv(message, static_cast<recv_flags>(flags)))
0404 return false;
0405 #else
0406 if (!socket.recv(&message, flags))
0407 return false;
0408 #endif
0409 more = message.more();
0410 add(std::move(message));
0411 }
0412 return true;
0413 }
0414
0415
0416 bool send(socket_ref socket, int flags = 0)
0417 {
0418 flags &= ~(ZMQ_SNDMORE);
0419 bool more = size() > 0;
0420 while (more) {
0421 message_t message = pop();
0422 more = size() > 0;
0423 #ifdef ZMQ_CPP11
0424 if (!socket.send(message, static_cast<send_flags>(
0425 (more ? ZMQ_SNDMORE : 0) | flags)))
0426 return false;
0427 #else
0428 if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
0429 return false;
0430 #endif
0431 }
0432 clear();
0433 return true;
0434 }
0435
0436
0437 void prepend(multipart_t &&other)
0438 {
0439 while (!other.empty())
0440 push(other.remove());
0441 }
0442
0443
0444 void append(multipart_t &&other)
0445 {
0446 while (!other.empty())
0447 add(other.pop());
0448 }
0449
0450
0451 void pushmem(const void *src, size_t size)
0452 {
0453 m_parts.push_front(message_t(src, size));
0454 }
0455
0456
0457 void addmem(const void *src, size_t size)
0458 {
0459 m_parts.push_back(message_t(src, size));
0460 }
0461
0462
0463 void pushstr(const std::string &string)
0464 {
0465 m_parts.push_front(message_t(string.data(), string.size()));
0466 }
0467
0468
0469 void addstr(const std::string &string)
0470 {
0471 m_parts.push_back(message_t(string.data(), string.size()));
0472 }
0473
0474
0475 template<typename T> void pushtyp(const T &type)
0476 {
0477 static_assert(!std::is_same<T, std::string>::value,
0478 "Use pushstr() instead of pushtyp<std::string>()");
0479 m_parts.push_front(message_t(&type, sizeof(type)));
0480 }
0481
0482
0483 template<typename T> void addtyp(const T &type)
0484 {
0485 static_assert(!std::is_same<T, std::string>::value,
0486 "Use addstr() instead of addtyp<std::string>()");
0487 m_parts.push_back(message_t(&type, sizeof(type)));
0488 }
0489
0490
0491 void push(message_t &&message) { m_parts.push_front(std::move(message)); }
0492
0493
0494 void add(message_t &&message) { m_parts.push_back(std::move(message)); }
0495
0496
0497 void push_back(message_t &&message) { m_parts.push_back(std::move(message)); }
0498
0499
0500 std::string popstr()
0501 {
0502 std::string string(m_parts.front().data<char>(), m_parts.front().size());
0503 m_parts.pop_front();
0504 return string;
0505 }
0506
0507
0508 template<typename T> T poptyp()
0509 {
0510 static_assert(!std::is_same<T, std::string>::value,
0511 "Use popstr() instead of poptyp<std::string>()");
0512 if (sizeof(T) != m_parts.front().size())
0513 throw std::runtime_error(
0514 "Invalid type, size does not match the message size");
0515 T type = *m_parts.front().data<T>();
0516 m_parts.pop_front();
0517 return type;
0518 }
0519
0520
0521 message_t pop()
0522 {
0523 message_t message = std::move(m_parts.front());
0524 m_parts.pop_front();
0525 return message;
0526 }
0527
0528
0529 message_t remove()
0530 {
0531 message_t message = std::move(m_parts.back());
0532 m_parts.pop_back();
0533 return message;
0534 }
0535
0536
0537 const message_t &front() { return m_parts.front(); }
0538
0539
0540 const message_t &back() { return m_parts.back(); }
0541
0542
0543 const message_t *peek(size_t index) const { return &m_parts[index]; }
0544
0545
0546 std::string peekstr(size_t index) const
0547 {
0548 std::string string(m_parts[index].data<char>(), m_parts[index].size());
0549 return string;
0550 }
0551
0552
0553 template<typename T> T peektyp(size_t index) const
0554 {
0555 static_assert(!std::is_same<T, std::string>::value,
0556 "Use peekstr() instead of peektyp<std::string>()");
0557 if (sizeof(T) != m_parts[index].size())
0558 throw std::runtime_error(
0559 "Invalid type, size does not match the message size");
0560 T type = *m_parts[index].data<T>();
0561 return type;
0562 }
0563
0564
0565 template<typename T> static multipart_t create(const T &type)
0566 {
0567 multipart_t multipart;
0568 multipart.addtyp(type);
0569 return multipart;
0570 }
0571
0572
0573 multipart_t clone() const
0574 {
0575 multipart_t multipart;
0576 for (size_t i = 0; i < size(); i++)
0577 multipart.addmem(m_parts[i].data(), m_parts[i].size());
0578 return multipart;
0579 }
0580
0581
0582 std::string str() const
0583 {
0584 std::stringstream ss;
0585 for (size_t i = 0; i < m_parts.size(); i++) {
0586 const unsigned char *data = m_parts[i].data<unsigned char>();
0587 size_t size = m_parts[i].size();
0588
0589
0590 bool isText = true;
0591 for (size_t j = 0; j < size; j++) {
0592 if (data[j] < 32 || data[j] > 127) {
0593 isText = false;
0594 break;
0595 }
0596 }
0597 ss << "\n[" << std::dec << std::setw(3) << std::setfill('0') << size
0598 << "] ";
0599 if (size >= 1000) {
0600 ss << "... (too big to print)";
0601 continue;
0602 }
0603 for (size_t j = 0; j < size; j++) {
0604 if (isText)
0605 ss << static_cast<char>(data[j]);
0606 else
0607 ss << std::hex << std::setw(2) << std::setfill('0')
0608 << static_cast<short>(data[j]);
0609 }
0610 }
0611 return ss.str();
0612 }
0613
0614
0615 bool equal(const multipart_t *other) const ZMQ_NOTHROW
0616 {
0617 return *this == *other;
0618 }
0619
0620 bool operator==(const multipart_t &other) const ZMQ_NOTHROW
0621 {
0622 if (size() != other.size())
0623 return false;
0624 for (size_t i = 0; i < size(); i++)
0625 if (at(i) != other.at(i))
0626 return false;
0627 return true;
0628 }
0629
0630 bool operator!=(const multipart_t &other) const ZMQ_NOTHROW
0631 {
0632 return !(*this == other);
0633 }
0634
0635 #ifdef ZMQ_CPP11
0636
0637
0638 message_t encode() const { return zmq::encode(*this); }
0639
0640
0641 void decode_append(const message_t &encoded)
0642 {
0643 zmq::decode(encoded, std::back_inserter(*this));
0644 }
0645
0646
0647 static multipart_t decode(const message_t &encoded)
0648 {
0649 multipart_t tmp;
0650 zmq::decode(encoded, std::back_inserter(tmp));
0651 return tmp;
0652 }
0653
0654 #endif
0655
0656 private:
0657
0658 multipart_t(const multipart_t &other) ZMQ_DELETED_FUNCTION;
0659 void operator=(const multipart_t &other) ZMQ_DELETED_FUNCTION;
0660 };
0661
0662 inline std::ostream &operator<<(std::ostream &os, const multipart_t &msg)
0663 {
0664 return os << msg.str();
0665 }
0666
0667 #endif
0668
0669 #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
0670 class active_poller_t
0671 {
0672 public:
0673 active_poller_t() = default;
0674 ~active_poller_t() = default;
0675
0676 active_poller_t(const active_poller_t &) = delete;
0677 active_poller_t &operator=(const active_poller_t &) = delete;
0678
0679 active_poller_t(active_poller_t &&src) = default;
0680 active_poller_t &operator=(active_poller_t &&src) = default;
0681
0682 using handler_type = std::function<void(event_flags)>;
0683
0684 void add(zmq::socket_ref socket, event_flags events, handler_type handler)
0685 {
0686 if (!handler)
0687 throw std::invalid_argument("null handler in active_poller_t::add");
0688 auto ret = handlers.emplace(
0689 socket, std::make_shared<handler_type>(std::move(handler)));
0690 if (!ret.second)
0691 throw error_t(EINVAL);
0692 try {
0693 base_poller.add(socket, events, ret.first->second.get());
0694 need_rebuild = true;
0695 }
0696 catch (...) {
0697
0698 handlers.erase(socket);
0699 throw;
0700 }
0701 }
0702
0703 void remove(zmq::socket_ref socket)
0704 {
0705 base_poller.remove(socket);
0706 handlers.erase(socket);
0707 need_rebuild = true;
0708 }
0709
0710 void modify(zmq::socket_ref socket, event_flags events)
0711 {
0712 base_poller.modify(socket, events);
0713 }
0714
0715 size_t wait(std::chrono::milliseconds timeout)
0716 {
0717 if (need_rebuild) {
0718 poller_events.resize(handlers.size());
0719 poller_handlers.clear();
0720 poller_handlers.reserve(handlers.size());
0721 for (const auto &handler : handlers) {
0722 poller_handlers.push_back(handler.second);
0723 }
0724 need_rebuild = false;
0725 }
0726 const auto count = base_poller.wait_all(poller_events, timeout);
0727 std::for_each(poller_events.begin(),
0728 poller_events.begin() + static_cast<ptrdiff_t>(count),
0729 [](decltype(base_poller)::event_type &event) {
0730 assert(event.user_data != nullptr);
0731 (*event.user_data)(event.events);
0732 });
0733 return count;
0734 }
0735
0736 ZMQ_NODISCARD bool empty() const noexcept { return handlers.empty(); }
0737
0738 size_t size() const noexcept { return handlers.size(); }
0739
0740 private:
0741 bool need_rebuild{false};
0742
0743 poller_t<handler_type> base_poller{};
0744 std::unordered_map<socket_ref, std::shared_ptr<handler_type>> handlers{};
0745 std::vector<decltype(base_poller)::event_type> poller_events{};
0746 std::vector<std::shared_ptr<handler_type>> poller_handlers{};
0747 };
0748 #endif
0749
0750
0751 }
0752
0753 #endif