Warning, file /include/zmq_addon.hpp was not indexed
or was modified since last indexation (in which case cross-reference links may be missing, inaccurate or erroneous).
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024 #ifndef __ZMQ_ADDON_HPP_INCLUDED__
0025 #define __ZMQ_ADDON_HPP_INCLUDED__
0026
0027 #include "zmq.hpp"
0028
0029 #include <deque>
0030 #include <iomanip>
0031 #include <sstream>
0032 #include <stdexcept>
0033 #ifdef ZMQ_CPP11
0034 #include <limits>
0035 #include <functional>
0036 #include <unordered_map>
0037 #endif
0038
0039 namespace zmq
0040 {
0041 #ifdef ZMQ_CPP11
0042
0043 namespace detail
0044 {
0045 template<bool CheckN, class OutputIt>
0046 recv_result_t
0047 recv_multipart_n(socket_ref s, OutputIt out, size_t n, recv_flags flags)
0048 {
0049 size_t msg_count = 0;
0050 message_t msg;
0051 while (true) {
0052 if ZMQ_CONSTEXPR_IF (CheckN) {
0053 if (msg_count >= n)
0054 throw std::runtime_error(
0055 "Too many message parts in recv_multipart_n");
0056 }
0057 if (!s.recv(msg, flags)) {
0058
0059 assert(msg_count == 0);
0060 return {};
0061 }
0062 ++msg_count;
0063 const bool more = msg.more();
0064 *out++ = std::move(msg);
0065 if (!more)
0066 break;
0067 }
0068 return msg_count;
0069 }
0070
0071 inline bool is_little_endian()
0072 {
0073 const uint16_t i = 0x01;
0074 return *reinterpret_cast<const uint8_t *>(&i) == 0x01;
0075 }
0076
0077 inline void write_network_order(unsigned char *buf, const uint32_t value)
0078 {
0079 if (is_little_endian()) {
0080 ZMQ_CONSTEXPR_VAR uint32_t mask = (std::numeric_limits<std::uint8_t>::max)();
0081 *buf++ = static_cast<unsigned char>((value >> 24) & mask);
0082 *buf++ = static_cast<unsigned char>((value >> 16) & mask);
0083 *buf++ = static_cast<unsigned char>((value >> 8) & mask);
0084 *buf++ = static_cast<unsigned char>(value & mask);
0085 } else {
0086 std::memcpy(buf, &value, sizeof(value));
0087 }
0088 }
0089
0090 inline uint32_t read_u32_network_order(const unsigned char *buf)
0091 {
0092 if (is_little_endian()) {
0093 return (static_cast<uint32_t>(buf[0]) << 24)
0094 + (static_cast<uint32_t>(buf[1]) << 16)
0095 + (static_cast<uint32_t>(buf[2]) << 8)
0096 + static_cast<uint32_t>(buf[3]);
0097 } else {
0098 uint32_t value;
0099 std::memcpy(&value, buf, sizeof(value));
0100 return value;
0101 }
0102 }
0103 }
0104
0105
0106
0107
0108
0109
0110
0111
0112
0113
0114
0115
0116
0117 template<class OutputIt>
0118 ZMQ_NODISCARD recv_result_t recv_multipart(socket_ref s,
0119 OutputIt out,
0120 recv_flags flags = recv_flags::none)
0121 {
0122 return detail::recv_multipart_n<false>(s, std::move(out), 0, flags);
0123 }
0124
0125
0126
0127
0128
0129
0130
0131
0132
0133
0134
0135
0136
0137
0138
0139 template<class OutputIt>
0140 ZMQ_NODISCARD recv_result_t recv_multipart_n(socket_ref s,
0141 OutputIt out,
0142 size_t n,
0143 recv_flags flags = recv_flags::none)
0144 {
0145 return detail::recv_multipart_n<true>(s, std::move(out), n, flags);
0146 }
0147
0148
0149
0150
0151
0152
0153
0154
0155
0156
0157
0158
0159
0160 template<class Range
0161 #ifndef ZMQ_CPP11_PARTIAL
0162 ,
0163 typename = typename std::enable_if<
0164 detail::is_range<Range>::value
0165 && (std::is_same<detail::range_value_t<Range>, message_t>::value
0166 || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
0167 #endif
0168 >
0169 send_result_t
0170 send_multipart(socket_ref s, Range &&msgs, send_flags flags = send_flags::none)
0171 {
0172 using std::begin;
0173 using std::end;
0174 auto it = begin(msgs);
0175 const auto end_it = end(msgs);
0176 size_t msg_count = 0;
0177 while (it != end_it) {
0178 const auto next = std::next(it);
0179 const auto msg_flags =
0180 flags | (next == end_it ? send_flags::none : send_flags::sndmore);
0181 if (!s.send(*it, msg_flags)) {
0182
0183 assert(it == begin(msgs));
0184 return {};
0185 }
0186 ++msg_count;
0187 it = next;
0188 }
0189 return msg_count;
0190 }
0191
0192
0193
0194
0195
0196
0197
0198
0199
0200
0201
0202
0203
0204
0205
0206
0207
0208
0209
0210
0211 template<class Range
0212 #ifndef ZMQ_CPP11_PARTIAL
0213 ,
0214 typename = typename std::enable_if<
0215 detail::is_range<Range>::value
0216 && (std::is_same<detail::range_value_t<Range>, message_t>::value
0217 || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
0218 #endif
0219 >
0220 message_t encode(const Range &parts)
0221 {
0222 size_t mmsg_size = 0;
0223
0224
0225 for (const auto &part : parts) {
0226 const size_t part_size = part.size();
0227 if (part_size > (std::numeric_limits<std::uint32_t>::max)()) {
0228
0229 throw std::range_error("Invalid size, message part too large");
0230 }
0231 const size_t count_size =
0232 part_size < (std::numeric_limits<std::uint8_t>::max)() ? 1 : 5;
0233 mmsg_size += part_size + count_size;
0234 }
0235
0236 message_t encoded(mmsg_size);
0237 unsigned char *buf = encoded.data<unsigned char>();
0238 for (const auto &part : parts) {
0239 const uint32_t part_size = static_cast<uint32_t>(part.size());
0240 const unsigned char *part_data =
0241 static_cast<const unsigned char *>(part.data());
0242
0243 if (part_size < (std::numeric_limits<std::uint8_t>::max)()) {
0244
0245 *buf++ = (unsigned char) part_size;
0246 } else {
0247
0248 *buf++ = (std::numeric_limits<uint8_t>::max)();
0249 detail::write_network_order(buf, part_size);
0250 buf += sizeof(part_size);
0251 }
0252 std::memcpy(buf, part_data, part_size);
0253 buf += part_size;
0254 }
0255
0256 assert(static_cast<size_t>(buf - encoded.data<unsigned char>()) == mmsg_size);
0257 return encoded;
0258 }
0259
0260
0261
0262
0263
0264
0265
0266
0267
0268
0269
0270
0271
0272
0273
0274
0275 template<class OutputIt> OutputIt decode(const message_t &encoded, OutputIt out)
0276 {
0277 const unsigned char *source = encoded.data<unsigned char>();
0278 const unsigned char *const limit = source + encoded.size();
0279
0280 while (source < limit) {
0281 size_t part_size = *source++;
0282 if (part_size == (std::numeric_limits<std::uint8_t>::max)()) {
0283 if (static_cast<size_t>(limit - source) < sizeof(uint32_t)) {
0284 throw std::out_of_range(
0285 "Malformed encoding, overflow in reading size");
0286 }
0287 part_size = detail::read_u32_network_order(source);
0288
0289 source += sizeof(uint32_t);
0290 }
0291
0292 if (static_cast<size_t>(limit - source) < part_size) {
0293 throw std::out_of_range("Malformed encoding, overflow in reading part");
0294 }
0295 *out = message_t(source, part_size);
0296 ++out;
0297 source += part_size;
0298 }
0299
0300 assert(source == limit);
0301 return out;
0302 }
0303
0304 #endif
0305
0306
0307 #ifdef ZMQ_HAS_RVALUE_REFS
0308
0309
0310
0311
0312
0313
0314
0315
0316 class multipart_t
0317 {
0318 private:
0319 std::deque<message_t> m_parts;
0320
0321 public:
0322 typedef std::deque<message_t>::value_type value_type;
0323
0324 typedef std::deque<message_t>::iterator iterator;
0325 typedef std::deque<message_t>::const_iterator const_iterator;
0326
0327 typedef std::deque<message_t>::reverse_iterator reverse_iterator;
0328 typedef std::deque<message_t>::const_reverse_iterator const_reverse_iterator;
0329
0330
0331 multipart_t() {}
0332
0333
0334 multipart_t(socket_ref socket) { recv(socket); }
0335
0336
0337 multipart_t(const void *src, size_t size) { addmem(src, size); }
0338
0339
0340 multipart_t(const std::string &string) { addstr(string); }
0341
0342
0343 multipart_t(message_t &&message) { add(std::move(message)); }
0344
0345
0346 multipart_t(multipart_t &&other) ZMQ_NOTHROW { m_parts = std::move(other.m_parts); }
0347
0348
0349 multipart_t &operator=(multipart_t &&other) ZMQ_NOTHROW
0350 {
0351 m_parts = std::move(other.m_parts);
0352 return *this;
0353 }
0354
0355
0356 virtual ~multipart_t() { clear(); }
0357
0358 message_t &operator[](size_t n) { return m_parts[n]; }
0359
0360 const message_t &operator[](size_t n) const { return m_parts[n]; }
0361
0362 message_t &at(size_t n) { return m_parts.at(n); }
0363
0364 const message_t &at(size_t n) const { return m_parts.at(n); }
0365
0366 iterator begin() { return m_parts.begin(); }
0367
0368 const_iterator begin() const { return m_parts.begin(); }
0369
0370 const_iterator cbegin() const { return m_parts.cbegin(); }
0371
0372 reverse_iterator rbegin() { return m_parts.rbegin(); }
0373
0374 const_reverse_iterator rbegin() const { return m_parts.rbegin(); }
0375
0376 iterator end() { return m_parts.end(); }
0377
0378 const_iterator end() const { return m_parts.end(); }
0379
0380 const_iterator cend() const { return m_parts.cend(); }
0381
0382 reverse_iterator rend() { return m_parts.rend(); }
0383
0384 const_reverse_iterator rend() const { return m_parts.rend(); }
0385
0386
0387 void clear() { m_parts.clear(); }
0388
0389
0390 size_t size() const { return m_parts.size(); }
0391
0392
0393 bool empty() const { return m_parts.empty(); }
0394
0395
0396 bool recv(socket_ref socket, int flags = 0)
0397 {
0398 clear();
0399 bool more = true;
0400 while (more) {
0401 message_t message;
0402 #ifdef ZMQ_CPP11
0403 if (!socket.recv(message, static_cast<recv_flags>(flags)))
0404 return false;
0405 #else
0406 if (!socket.recv(&message, flags))
0407 return false;
0408 #endif
0409 more = message.more();
0410 add(std::move(message));
0411 }
0412 return true;
0413 }
0414
0415
0416 bool send(socket_ref socket, int flags = 0)
0417 {
0418 flags &= ~(ZMQ_SNDMORE);
0419 bool more = size() > 0;
0420 while (more) {
0421 message_t message = pop();
0422 more = size() > 0;
0423 #ifdef ZMQ_CPP11
0424 if (!socket.send(message, static_cast<send_flags>(
0425 (more ? ZMQ_SNDMORE : 0) | flags)))
0426 return false;
0427 #else
0428 if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
0429 return false;
0430 #endif
0431 }
0432 clear();
0433 return true;
0434 }
0435
0436
0437 void prepend(multipart_t &&other)
0438 {
0439 while (!other.empty())
0440 push(other.remove());
0441 }
0442
0443
0444 void append(multipart_t &&other)
0445 {
0446 while (!other.empty())
0447 add(other.pop());
0448 }
0449
0450
0451 void pushmem(const void *src, size_t size)
0452 {
0453 m_parts.push_front(message_t(src, size));
0454 }
0455
0456
0457 void addmem(const void *src, size_t size)
0458 {
0459 m_parts.push_back(message_t(src, size));
0460 }
0461
0462
0463 void pushstr(const std::string &string)
0464 {
0465 m_parts.push_front(message_t(string.data(), string.size()));
0466 }
0467
0468
0469 void addstr(const std::string &string)
0470 {
0471 m_parts.push_back(message_t(string.data(), string.size()));
0472 }
0473
0474
0475 template<typename T> void pushtyp(const T &type)
0476 {
0477 static_assert(!std::is_same<T, std::string>::value,
0478 "Use pushstr() instead of pushtyp<std::string>()");
0479 m_parts.push_front(message_t(&type, sizeof(type)));
0480 }
0481
0482
0483 template<typename T> void addtyp(const T &type)
0484 {
0485 static_assert(!std::is_same<T, std::string>::value,
0486 "Use addstr() instead of addtyp<std::string>()");
0487 m_parts.push_back(message_t(&type, sizeof(type)));
0488 }
0489
0490
0491 void push(message_t &&message) { m_parts.push_front(std::move(message)); }
0492
0493
0494 void add(message_t &&message) { m_parts.push_back(std::move(message)); }
0495
0496
0497 void push_back(message_t &&message) { m_parts.push_back(std::move(message)); }
0498
0499
0500 std::string popstr()
0501 {
0502 std::string string(m_parts.front().data<char>(), m_parts.front().size());
0503 m_parts.pop_front();
0504 return string;
0505 }
0506
0507
0508 template<typename T> T poptyp()
0509 {
0510 static_assert(!std::is_same<T, std::string>::value,
0511 "Use popstr() instead of poptyp<std::string>()");
0512 if (sizeof(T) != m_parts.front().size())
0513 throw std::runtime_error(
0514 "Invalid type, size does not match the message size");
0515 T type = *m_parts.front().data<T>();
0516 m_parts.pop_front();
0517 return type;
0518 }
0519
0520
0521 message_t pop()
0522 {
0523 message_t message = std::move(m_parts.front());
0524 m_parts.pop_front();
0525 return message;
0526 }
0527
0528
0529 message_t remove()
0530 {
0531 message_t message = std::move(m_parts.back());
0532 m_parts.pop_back();
0533 return message;
0534 }
0535
0536
0537 const message_t &front() { return m_parts.front(); }
0538
0539
0540 const message_t &back() { return m_parts.back(); }
0541
0542
0543 const message_t *peek(size_t index) const { return &m_parts[index]; }
0544
0545
0546 std::string peekstr(size_t index) const
0547 {
0548 std::string string(m_parts[index].data<char>(), m_parts[index].size());
0549 return string;
0550 }
0551
0552
0553 template<typename T> T peektyp(size_t index) const
0554 {
0555 static_assert(!std::is_same<T, std::string>::value,
0556 "Use peekstr() instead of peektyp<std::string>()");
0557 if (sizeof(T) != m_parts[index].size())
0558 throw std::runtime_error(
0559 "Invalid type, size does not match the message size");
0560 T type = *m_parts[index].data<T>();
0561 return type;
0562 }
0563
0564
0565 template<typename T> static multipart_t create(const T &type)
0566 {
0567 multipart_t multipart;
0568 multipart.addtyp(type);
0569 return multipart;
0570 }
0571
0572
0573 multipart_t clone() const
0574 {
0575 multipart_t multipart;
0576 for (size_t i = 0; i < size(); i++)
0577 multipart.addmem(m_parts[i].data(), m_parts[i].size());
0578 return multipart;
0579 }
0580
0581
0582 std::string str() const
0583 {
0584 std::stringstream ss;
0585 for (size_t i = 0; i < m_parts.size(); i++) {
0586 const unsigned char *data = m_parts[i].data<unsigned char>();
0587 size_t size = m_parts[i].size();
0588
0589
0590 bool isText = true;
0591 for (size_t j = 0; j < size; j++) {
0592 if (data[j] < 32 || data[j] > 127) {
0593 isText = false;
0594 break;
0595 }
0596 }
0597 ss << "\n[" << std::dec << std::setw(3) << std::setfill('0') << size
0598 << "] ";
0599 if (size >= 1000) {
0600 ss << "... (too big to print)";
0601 continue;
0602 }
0603 for (size_t j = 0; j < size; j++) {
0604 if (isText)
0605 ss << static_cast<char>(data[j]);
0606 else
0607 ss << std::hex << std::setw(2) << std::setfill('0')
0608 << static_cast<short>(data[j]);
0609 }
0610 }
0611 return ss.str();
0612 }
0613
0614
0615 bool equal(const multipart_t *other) const ZMQ_NOTHROW
0616 {
0617 return *this == *other;
0618 }
0619
0620 bool operator==(const multipart_t &other) const ZMQ_NOTHROW
0621 {
0622 if (size() != other.size())
0623 return false;
0624 for (size_t i = 0; i < size(); i++)
0625 if (at(i) != other.at(i))
0626 return false;
0627 return true;
0628 }
0629
0630 bool operator!=(const multipart_t &other) const ZMQ_NOTHROW
0631 {
0632 return !(*this == other);
0633 }
0634
0635 #ifdef ZMQ_CPP11
0636
0637
0638 message_t encode() const { return zmq::encode(*this); }
0639
0640
0641 void decode_append(const message_t &encoded)
0642 {
0643 zmq::decode(encoded, std::back_inserter(*this));
0644 }
0645
0646
0647 static multipart_t decode(const message_t &encoded)
0648 {
0649 multipart_t tmp;
0650 zmq::decode(encoded, std::back_inserter(tmp));
0651 return tmp;
0652 }
0653
0654 #endif
0655
0656 private:
0657
0658 multipart_t(const multipart_t &other) ZMQ_DELETED_FUNCTION;
0659 void operator=(const multipart_t &other) ZMQ_DELETED_FUNCTION;
0660 };
0661
0662 inline std::ostream &operator<<(std::ostream &os, const multipart_t &msg)
0663 {
0664 return os << msg.str();
0665 }
0666
0667 #endif
0668
0669 #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
0670 class active_poller_t
0671 {
0672 public:
0673 active_poller_t() = default;
0674 ~active_poller_t() = default;
0675
0676 active_poller_t(const active_poller_t &) = delete;
0677 active_poller_t &operator=(const active_poller_t &) = delete;
0678
0679 active_poller_t(active_poller_t &&src) = default;
0680 active_poller_t &operator=(active_poller_t &&src) = default;
0681
0682 using handler_type = std::function<void(event_flags)>;
0683
0684 void add(zmq::socket_ref socket, event_flags events, handler_type handler)
0685 {
0686 if (!handler)
0687 throw std::invalid_argument("null handler in active_poller_t::add");
0688 auto ret = handlers.emplace(
0689 socket, std::make_shared<handler_type>(std::move(handler)));
0690 if (!ret.second)
0691 throw error_t(EINVAL);
0692 try {
0693 base_poller.add(socket, events, ret.first->second.get());
0694 need_rebuild = true;
0695 }
0696 catch (...) {
0697
0698 handlers.erase(socket);
0699 throw;
0700 }
0701 }
0702
0703 void remove(zmq::socket_ref socket)
0704 {
0705 base_poller.remove(socket);
0706 handlers.erase(socket);
0707 need_rebuild = true;
0708 }
0709
0710 void modify(zmq::socket_ref socket, event_flags events)
0711 {
0712 base_poller.modify(socket, events);
0713 }
0714
0715 size_t wait(std::chrono::milliseconds timeout)
0716 {
0717 if (need_rebuild) {
0718 poller_events.resize(handlers.size());
0719 poller_handlers.clear();
0720 poller_handlers.reserve(handlers.size());
0721 for (const auto &handler : handlers) {
0722 poller_handlers.push_back(handler.second);
0723 }
0724 need_rebuild = false;
0725 }
0726 const auto count = base_poller.wait_all(poller_events, timeout);
0727 std::for_each(poller_events.begin(),
0728 poller_events.begin() + static_cast<ptrdiff_t>(count),
0729 [](decltype(base_poller)::event_type &event) {
0730 assert(event.user_data != nullptr);
0731 (*event.user_data)(event.events);
0732 });
0733 return count;
0734 }
0735
0736 ZMQ_NODISCARD bool empty() const noexcept { return handlers.empty(); }
0737
0738 size_t size() const noexcept { return handlers.size(); }
0739
0740 private:
0741 bool need_rebuild{false};
0742
0743 poller_t<handler_type> base_poller{};
0744 std::unordered_map<socket_ref, std::shared_ptr<handler_type>> handlers{};
0745 std::vector<decltype(base_poller)::event_type> poller_events{};
0746 std::vector<std::shared_ptr<handler_type>> poller_handlers{};
0747 };
0748 #endif
0749
0750
0751 }
0752
0753 #endif