Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2024-05-18 08:30:32

0001 /*
0002     Copyright (c) 2016-2017 ZeroMQ community
0003     Copyright (c) 2016 VOCA AS / Harald Nøkland
0004 
0005     Permission is hereby granted, free of charge, to any person obtaining a copy
0006     of this software and associated documentation files (the "Software"), to
0007     deal in the Software without restriction, including without limitation the
0008     rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
0009     sell copies of the Software, and to permit persons to whom the Software is
0010     furnished to do so, subject to the following conditions:
0011 
0012     The above copyright notice and this permission notice shall be included in
0013     all copies or substantial portions of the Software.
0014 
0015     THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
0016     IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
0017     FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
0018     AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
0019     LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
0020     FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
0021     IN THE SOFTWARE.
0022 */
0023 
0024 #ifndef __ZMQ_ADDON_HPP_INCLUDED__
0025 #define __ZMQ_ADDON_HPP_INCLUDED__
0026 
0027 #include "zmq.hpp"
0028 
0029 #include <deque>
0030 #include <iomanip>
0031 #include <sstream>
0032 #include <stdexcept>
0033 #ifdef ZMQ_CPP11
0034 #include <limits>
0035 #include <functional>
0036 #include <unordered_map>
0037 #endif
0038 
0039 namespace zmq
0040 {
0041 #ifdef ZMQ_CPP11
0042 
0043 namespace detail
0044 {
0045 template<bool CheckN, class OutputIt>
0046 recv_result_t
0047 recv_multipart_n(socket_ref s, OutputIt out, size_t n, recv_flags flags)
0048 {
0049     size_t msg_count = 0;
0050     message_t msg;
0051     while (true) {
0052         if ZMQ_CONSTEXPR_IF (CheckN) {
0053             if (msg_count >= n)
0054                 throw std::runtime_error(
0055                   "Too many message parts in recv_multipart_n");
0056         }
0057         if (!s.recv(msg, flags)) {
0058             // zmq ensures atomic delivery of messages
0059             assert(msg_count == 0);
0060             return {};
0061         }
0062         ++msg_count;
0063         const bool more = msg.more();
0064         *out++ = std::move(msg);
0065         if (!more)
0066             break;
0067     }
0068     return msg_count;
0069 }
0070 
// Reports whether the byte at the lowest address of a 16-bit integer holding
// the value 1 is the one that carries that value (little-endian layout).
inline bool is_little_endian()
{
    const uint16_t probe = 0x01;
    const uint8_t *first_byte = reinterpret_cast<const uint8_t *>(&probe);
    return first_byte[0] == 0x01;
}
0076 
// Stores value into the four bytes starting at buf, most significant byte
// first (network order). buf must point to at least four writable bytes.
//
// The bytes are extracted by significance with shifts, so the result does
// not depend on the host byte order and no runtime endianness check is
// needed.
inline void write_network_order(unsigned char *buf, const uint32_t value)
{
    buf[0] = static_cast<unsigned char>((value >> 24) & 0xFFu);
    buf[1] = static_cast<unsigned char>((value >> 16) & 0xFFu);
    buf[2] = static_cast<unsigned char>((value >> 8) & 0xFFu);
    buf[3] = static_cast<unsigned char>(value & 0xFFu);
}
0089 
// Reads a 32-bit unsigned integer from the four bytes starting at buf, where
// buf[0] is the most significant byte (network order). buf must point to at
// least four readable bytes.
//
// The value is assembled by significance with shifts, so the result does not
// depend on the host byte order and no runtime endianness check is needed.
inline uint32_t read_u32_network_order(const unsigned char *buf)
{
    return (static_cast<uint32_t>(buf[0]) << 24)
           | (static_cast<uint32_t>(buf[1]) << 16)
           | (static_cast<uint32_t>(buf[2]) << 8)
           | static_cast<uint32_t>(buf[3]);
}
0103 } // namespace detail
0104 
0105 /*  Receive a multipart message.
0106     
0107     Writes the zmq::message_t objects to OutputIterator out.
0108     The out iterator must handle an unspecified number of writes,
0109     e.g. by using std::back_inserter.
0110     
0111     Returns: the number of messages received or nullopt (on EAGAIN).
0112     Throws: if recv throws. Any exceptions thrown
0113     by the out iterator will be propagated and the message
0114     may have been only partially received with pending
0115     message parts. It is advised to close this socket in that event.
0116 */
0117 template<class OutputIt>
0118 ZMQ_NODISCARD recv_result_t recv_multipart(socket_ref s,
0119                                            OutputIt out,
0120                                            recv_flags flags = recv_flags::none)
0121 {
0122     return detail::recv_multipart_n<false>(s, std::move(out), 0, flags);
0123 }
0124 
0125 /*  Receive a multipart message.
0126     
0127     Writes at most n zmq::message_t objects to OutputIterator out.
0128     If the number of message parts of the incoming message exceeds n
0129     then an exception will be thrown.
0130     
0131     Returns: the number of messages received or nullopt (on EAGAIN).
0132     Throws: if recv throws. Throws std::runtime_error if the number
0133     of message parts exceeds n (exactly n messages will have been written
0134     to out). Any exceptions thrown
0135     by the out iterator will be propagated and the message
0136     may have been only partially received with pending
0137     message parts. It is advised to close this socket in that event.
0138 */
0139 template<class OutputIt>
0140 ZMQ_NODISCARD recv_result_t recv_multipart_n(socket_ref s,
0141                                              OutputIt out,
0142                                              size_t n,
0143                                              recv_flags flags = recv_flags::none)
0144 {
0145     return detail::recv_multipart_n<true>(s, std::move(out), n, flags);
0146 }
0147 
0148 /*  Send a multipart message.
0149     
0150     The range must be a ForwardRange of zmq::message_t,
0151     zmq::const_buffer or zmq::mutable_buffer.
0152     The flags may be zmq::send_flags::sndmore if there are 
0153     more message parts to be sent after the call to this function.
0154     
0155     Returns: the number of messages sent (exactly msgs.size()) or nullopt (on EAGAIN).
0156     Throws: if send throws. Any exceptions thrown
0157     by the msgs range will be propagated and the message
0158     may have been only partially sent. It is advised to close this socket in that event.
0159 */
0160 template<class Range
0161 #ifndef ZMQ_CPP11_PARTIAL
0162          ,
0163          typename = typename std::enable_if<
0164            detail::is_range<Range>::value
0165            && (std::is_same<detail::range_value_t<Range>, message_t>::value
0166                || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
0167 #endif
0168          >
0169 send_result_t
0170 send_multipart(socket_ref s, Range &&msgs, send_flags flags = send_flags::none)
0171 {
0172     using std::begin;
0173     using std::end;
0174     auto it = begin(msgs);
0175     const auto end_it = end(msgs);
0176     size_t msg_count = 0;
0177     while (it != end_it) {
0178         const auto next = std::next(it);
0179         const auto msg_flags =
0180           flags | (next == end_it ? send_flags::none : send_flags::sndmore);
0181         if (!s.send(*it, msg_flags)) {
0182             // zmq ensures atomic delivery of messages
0183             assert(it == begin(msgs));
0184             return {};
0185         }
0186         ++msg_count;
0187         it = next;
0188     }
0189     return msg_count;
0190 }
0191 
0192 /* Encode a multipart message.
0193 
0194    The range must be a ForwardRange of zmq::message_t.  A
0195    zmq::multipart_t or STL container may be passed for encoding.
0196 
0197    Returns: a zmq::message_t holding the encoded multipart data.
0198 
0199    Throws: std::range_error is thrown if the size of any single part
0200    can not fit in an unsigned 32 bit integer.
0201 
0202    The encoding is compatible with that used by the CZMQ function
0203    zmsg_encode(), see https://rfc.zeromq.org/spec/50/.
0204    Each part consists of a size followed by the data.
0205    These are placed contiguously into the output message.  A part of
0206    size less than 255 bytes will have a single byte size value.
0207    Larger parts will have a five byte size value with the first byte
0208    set to 0xFF and the remaining four bytes holding the size of the
0209    part's data.
0210 */
0211 template<class Range
0212 #ifndef ZMQ_CPP11_PARTIAL
0213          ,
0214          typename = typename std::enable_if<
0215            detail::is_range<Range>::value
0216            && (std::is_same<detail::range_value_t<Range>, message_t>::value
0217                || detail::is_buffer<detail::range_value_t<Range>>::value)>::type
0218 #endif
0219          >
0220 message_t encode(const Range &parts)
0221 {
0222     size_t mmsg_size = 0;
0223 
0224     // First pass check sizes
0225     for (const auto &part : parts) {
0226         const size_t part_size = part.size();
0227         if (part_size > std::numeric_limits<std::uint32_t>::max()) {
0228             // Size value must fit into uint32_t.
0229             throw std::range_error("Invalid size, message part too large");
0230         }
0231         const size_t count_size =
0232           part_size < std::numeric_limits<std::uint8_t>::max() ? 1 : 5;
0233         mmsg_size += part_size + count_size;
0234     }
0235 
0236     message_t encoded(mmsg_size);
0237     unsigned char *buf = encoded.data<unsigned char>();
0238     for (const auto &part : parts) {
0239         const uint32_t part_size = static_cast<uint32_t>(part.size());
0240         const unsigned char *part_data =
0241           static_cast<const unsigned char *>(part.data());
0242 
0243         if (part_size < std::numeric_limits<std::uint8_t>::max()) {
0244             // small part
0245             *buf++ = (unsigned char) part_size;
0246         } else {
0247             // big part
0248             *buf++ = std::numeric_limits<uint8_t>::max();
0249             detail::write_network_order(buf, part_size);
0250             buf += sizeof(part_size);
0251         }
0252         std::memcpy(buf, part_data, part_size);
0253         buf += part_size;
0254     }
0255 
0256     assert(static_cast<size_t>(buf - encoded.data<unsigned char>()) == mmsg_size);
0257     return encoded;
0258 }
0259 
0260 /*  Decode an encoded message to multiple parts.
0261 
0262     The given output iterator must be a ForwardIterator to a container
0263     holding zmq::message_t such as a zmq::multipart_t or various STL
0264     containers.
0265 
0266     Returns the ForwardIterator advanced once past the last decoded
0267     part.
0268 
0269     Throws: a std::out_of_range is thrown if the encoded part sizes
0270     lead to exceeding the message data bounds.
0271 
0272     The decoding assumes the message is encoded in the manner
0273     performed by zmq::encode(), see https://rfc.zeromq.org/spec/50/.
0274  */
0275 template<class OutputIt> OutputIt decode(const message_t &encoded, OutputIt out)
0276 {
0277     const unsigned char *source = encoded.data<unsigned char>();
0278     const unsigned char *const limit = source + encoded.size();
0279 
0280     while (source < limit) {
0281         size_t part_size = *source++;
0282         if (part_size == std::numeric_limits<std::uint8_t>::max()) {
0283             if (static_cast<size_t>(limit - source) < sizeof(uint32_t)) {
0284                 throw std::out_of_range(
0285                   "Malformed encoding, overflow in reading size");
0286             }
0287             part_size = detail::read_u32_network_order(source);
0288             // the part size is allowed to be less than 0xFF
0289             source += sizeof(uint32_t);
0290         }
0291 
0292         if (static_cast<size_t>(limit - source) < part_size) {
0293             throw std::out_of_range("Malformed encoding, overflow in reading part");
0294         }
0295         *out = message_t(source, part_size);
0296         ++out;
0297         source += part_size;
0298     }
0299 
0300     assert(source == limit);
0301     return out;
0302 }
0303 
0304 #endif
0305 
0306 
0307 #ifdef ZMQ_HAS_RVALUE_REFS
0308 
0309 /*
0310     This class handles multipart messaging. It is the C++ equivalent of zmsg.h,
0311     which is part of CZMQ (the high-level C binding). Furthermore, it is a major
0312     improvement compared to zmsg.hpp, which is part of the examples in the ØMQ
0313     Guide. Unnecessary copying is avoided by using move semantics to efficiently
0314     add/remove parts.
0315 */
0316 class multipart_t
0317 {
0318   private:
0319     std::deque<message_t> m_parts;
0320 
0321   public:
0322     typedef std::deque<message_t>::value_type value_type;
0323 
0324     typedef std::deque<message_t>::iterator iterator;
0325     typedef std::deque<message_t>::const_iterator const_iterator;
0326 
0327     typedef std::deque<message_t>::reverse_iterator reverse_iterator;
0328     typedef std::deque<message_t>::const_reverse_iterator const_reverse_iterator;
0329 
0330     // Default constructor
0331     multipart_t() {}
0332 
0333     // Construct from socket receive
0334     multipart_t(socket_ref socket) { recv(socket); }
0335 
0336     // Construct from memory block
0337     multipart_t(const void *src, size_t size) { addmem(src, size); }
0338 
0339     // Construct from string
0340     multipart_t(const std::string &string) { addstr(string); }
0341 
0342     // Construct from message part
0343     multipart_t(message_t &&message) { add(std::move(message)); }
0344 
0345     // Move constructor
0346     multipart_t(multipart_t &&other) { m_parts = std::move(other.m_parts); }
0347 
0348     // Move assignment operator
0349     multipart_t &operator=(multipart_t &&other)
0350     {
0351         m_parts = std::move(other.m_parts);
0352         return *this;
0353     }
0354 
0355     // Destructor
0356     virtual ~multipart_t() { clear(); }
0357 
0358     message_t &operator[](size_t n) { return m_parts[n]; }
0359 
0360     const message_t &operator[](size_t n) const { return m_parts[n]; }
0361 
0362     message_t &at(size_t n) { return m_parts.at(n); }
0363 
0364     const message_t &at(size_t n) const { return m_parts.at(n); }
0365 
0366     iterator begin() { return m_parts.begin(); }
0367 
0368     const_iterator begin() const { return m_parts.begin(); }
0369 
0370     const_iterator cbegin() const { return m_parts.cbegin(); }
0371 
0372     reverse_iterator rbegin() { return m_parts.rbegin(); }
0373 
0374     const_reverse_iterator rbegin() const { return m_parts.rbegin(); }
0375 
0376     iterator end() { return m_parts.end(); }
0377 
0378     const_iterator end() const { return m_parts.end(); }
0379 
0380     const_iterator cend() const { return m_parts.cend(); }
0381 
0382     reverse_iterator rend() { return m_parts.rend(); }
0383 
0384     const_reverse_iterator rend() const { return m_parts.rend(); }
0385 
0386     // Delete all parts
0387     void clear() { m_parts.clear(); }
0388 
0389     // Get number of parts
0390     size_t size() const { return m_parts.size(); }
0391 
0392     // Check if number of parts is zero
0393     bool empty() const { return m_parts.empty(); }
0394 
0395     // Receive multipart message from socket
0396     bool recv(socket_ref socket, int flags = 0)
0397     {
0398         clear();
0399         bool more = true;
0400         while (more) {
0401             message_t message;
0402 #ifdef ZMQ_CPP11
0403             if (!socket.recv(message, static_cast<recv_flags>(flags)))
0404                 return false;
0405 #else
0406             if (!socket.recv(&message, flags))
0407                 return false;
0408 #endif
0409             more = message.more();
0410             add(std::move(message));
0411         }
0412         return true;
0413     }
0414 
0415     // Send multipart message to socket
0416     bool send(socket_ref socket, int flags = 0)
0417     {
0418         flags &= ~(ZMQ_SNDMORE);
0419         bool more = size() > 0;
0420         while (more) {
0421             message_t message = pop();
0422             more = size() > 0;
0423 #ifdef ZMQ_CPP11
0424             if (!socket.send(message, static_cast<send_flags>(
0425                                         (more ? ZMQ_SNDMORE : 0) | flags)))
0426                 return false;
0427 #else
0428             if (!socket.send(message, (more ? ZMQ_SNDMORE : 0) | flags))
0429                 return false;
0430 #endif
0431         }
0432         clear();
0433         return true;
0434     }
0435 
0436     // Concatenate other multipart to front
0437     void prepend(multipart_t &&other)
0438     {
0439         while (!other.empty())
0440             push(other.remove());
0441     }
0442 
0443     // Concatenate other multipart to back
0444     void append(multipart_t &&other)
0445     {
0446         while (!other.empty())
0447             add(other.pop());
0448     }
0449 
0450     // Push memory block to front
0451     void pushmem(const void *src, size_t size)
0452     {
0453         m_parts.push_front(message_t(src, size));
0454     }
0455 
0456     // Push memory block to back
0457     void addmem(const void *src, size_t size)
0458     {
0459         m_parts.push_back(message_t(src, size));
0460     }
0461 
0462     // Push string to front
0463     void pushstr(const std::string &string)
0464     {
0465         m_parts.push_front(message_t(string.data(), string.size()));
0466     }
0467 
0468     // Push string to back
0469     void addstr(const std::string &string)
0470     {
0471         m_parts.push_back(message_t(string.data(), string.size()));
0472     }
0473 
0474     // Push type (fixed-size) to front
0475     template<typename T> void pushtyp(const T &type)
0476     {
0477         static_assert(!std::is_same<T, std::string>::value,
0478                       "Use pushstr() instead of pushtyp<std::string>()");
0479         m_parts.push_front(message_t(&type, sizeof(type)));
0480     }
0481 
0482     // Push type (fixed-size) to back
0483     template<typename T> void addtyp(const T &type)
0484     {
0485         static_assert(!std::is_same<T, std::string>::value,
0486                       "Use addstr() instead of addtyp<std::string>()");
0487         m_parts.push_back(message_t(&type, sizeof(type)));
0488     }
0489 
0490     // Push message part to front
0491     void push(message_t &&message) { m_parts.push_front(std::move(message)); }
0492 
0493     // Push message part to back
0494     void add(message_t &&message) { m_parts.push_back(std::move(message)); }
0495 
0496     // Alias to allow std::back_inserter()
0497     void push_back(message_t &&message) { m_parts.push_back(std::move(message)); }
0498 
0499     // Pop string from front
0500     std::string popstr()
0501     {
0502         std::string string(m_parts.front().data<char>(), m_parts.front().size());
0503         m_parts.pop_front();
0504         return string;
0505     }
0506 
0507     // Pop type (fixed-size) from front
0508     template<typename T> T poptyp()
0509     {
0510         static_assert(!std::is_same<T, std::string>::value,
0511                       "Use popstr() instead of poptyp<std::string>()");
0512         if (sizeof(T) != m_parts.front().size())
0513             throw std::runtime_error(
0514               "Invalid type, size does not match the message size");
0515         T type = *m_parts.front().data<T>();
0516         m_parts.pop_front();
0517         return type;
0518     }
0519 
0520     // Pop message part from front
0521     message_t pop()
0522     {
0523         message_t message = std::move(m_parts.front());
0524         m_parts.pop_front();
0525         return message;
0526     }
0527 
0528     // Pop message part from back
0529     message_t remove()
0530     {
0531         message_t message = std::move(m_parts.back());
0532         m_parts.pop_back();
0533         return message;
0534     }
0535 
0536     // get message part from front
0537     const message_t &front() { return m_parts.front(); }
0538 
0539     // get message part from back
0540     const message_t &back() { return m_parts.back(); }
0541 
0542     // Get pointer to a specific message part
0543     const message_t *peek(size_t index) const { return &m_parts[index]; }
0544 
0545     // Get a string copy of a specific message part
0546     std::string peekstr(size_t index) const
0547     {
0548         std::string string(m_parts[index].data<char>(), m_parts[index].size());
0549         return string;
0550     }
0551 
0552     // Peek type (fixed-size) from front
0553     template<typename T> T peektyp(size_t index) const
0554     {
0555         static_assert(!std::is_same<T, std::string>::value,
0556                       "Use peekstr() instead of peektyp<std::string>()");
0557         if (sizeof(T) != m_parts[index].size())
0558             throw std::runtime_error(
0559               "Invalid type, size does not match the message size");
0560         T type = *m_parts[index].data<T>();
0561         return type;
0562     }
0563 
0564     // Create multipart from type (fixed-size)
0565     template<typename T> static multipart_t create(const T &type)
0566     {
0567         multipart_t multipart;
0568         multipart.addtyp(type);
0569         return multipart;
0570     }
0571 
0572     // Copy multipart
0573     multipart_t clone() const
0574     {
0575         multipart_t multipart;
0576         for (size_t i = 0; i < size(); i++)
0577             multipart.addmem(m_parts[i].data(), m_parts[i].size());
0578         return multipart;
0579     }
0580 
0581     // Dump content to string
0582     std::string str() const
0583     {
0584         std::stringstream ss;
0585         for (size_t i = 0; i < m_parts.size(); i++) {
0586             const unsigned char *data = m_parts[i].data<unsigned char>();
0587             size_t size = m_parts[i].size();
0588 
0589             // Dump the message as text or binary
0590             bool isText = true;
0591             for (size_t j = 0; j < size; j++) {
0592                 if (data[j] < 32 || data[j] > 127) {
0593                     isText = false;
0594                     break;
0595                 }
0596             }
0597             ss << "\n[" << std::dec << std::setw(3) << std::setfill('0') << size
0598                << "] ";
0599             if (size >= 1000) {
0600                 ss << "... (too big to print)";
0601                 continue;
0602             }
0603             for (size_t j = 0; j < size; j++) {
0604                 if (isText)
0605                     ss << static_cast<char>(data[j]);
0606                 else
0607                     ss << std::hex << std::setw(2) << std::setfill('0')
0608                        << static_cast<short>(data[j]);
0609             }
0610         }
0611         return ss.str();
0612     }
0613 
0614     // Check if equal to other multipart
0615     bool equal(const multipart_t *other) const ZMQ_NOTHROW
0616     {
0617         return *this == *other;
0618     }
0619 
0620     bool operator==(const multipart_t &other) const ZMQ_NOTHROW
0621     {
0622         if (size() != other.size())
0623             return false;
0624         for (size_t i = 0; i < size(); i++)
0625             if (at(i) != other.at(i))
0626                 return false;
0627         return true;
0628     }
0629 
0630     bool operator!=(const multipart_t &other) const ZMQ_NOTHROW
0631     {
0632         return !(*this == other);
0633     }
0634 
0635 #ifdef ZMQ_CPP11
0636 
0637     // Return single part message_t encoded from this multipart_t.
0638     message_t encode() const { return zmq::encode(*this); }
0639 
0640     // Decode encoded message into multiple parts and append to self.
0641     void decode_append(const message_t &encoded)
0642     {
0643         zmq::decode(encoded, std::back_inserter(*this));
0644     }
0645 
0646     // Return a new multipart_t containing the decoded message_t.
0647     static multipart_t decode(const message_t &encoded)
0648     {
0649         multipart_t tmp;
0650         zmq::decode(encoded, std::back_inserter(tmp));
0651         return tmp;
0652     }
0653 
0654 #endif
0655 
0656   private:
0657     // Disable implicit copying (moving is more efficient)
0658     multipart_t(const multipart_t &other) ZMQ_DELETED_FUNCTION;
0659     void operator=(const multipart_t &other) ZMQ_DELETED_FUNCTION;
0660 }; // class multipart_t
0661 
0662 inline std::ostream &operator<<(std::ostream &os, const multipart_t &msg)
0663 {
0664     return os << msg.str();
0665 }
0666 
0667 #endif // ZMQ_HAS_RVALUE_REFS
0668 
0669 #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
0670 class active_poller_t
0671 {
0672   public:
0673     active_poller_t() = default;
0674     ~active_poller_t() = default;
0675 
0676     active_poller_t(const active_poller_t &) = delete;
0677     active_poller_t &operator=(const active_poller_t &) = delete;
0678 
0679     active_poller_t(active_poller_t &&src) = default;
0680     active_poller_t &operator=(active_poller_t &&src) = default;
0681 
0682     using handler_type = std::function<void(event_flags)>;
0683 
0684     void add(zmq::socket_ref socket, event_flags events, handler_type handler)
0685     {
0686         if (!handler)
0687             throw std::invalid_argument("null handler in active_poller_t::add");
0688         auto ret = handlers.emplace(
0689           socket, std::make_shared<handler_type>(std::move(handler)));
0690         if (!ret.second)
0691             throw error_t(EINVAL); // already added
0692         try {
0693             base_poller.add(socket, events, ret.first->second.get());
0694             need_rebuild = true;
0695         }
0696         catch (...) {
0697             // rollback
0698             handlers.erase(socket);
0699             throw;
0700         }
0701     }
0702 
0703     void remove(zmq::socket_ref socket)
0704     {
0705         base_poller.remove(socket);
0706         handlers.erase(socket);
0707         need_rebuild = true;
0708     }
0709 
0710     void modify(zmq::socket_ref socket, event_flags events)
0711     {
0712         base_poller.modify(socket, events);
0713     }
0714 
0715     size_t wait(std::chrono::milliseconds timeout)
0716     {
0717         if (need_rebuild) {
0718             poller_events.resize(handlers.size());
0719             poller_handlers.clear();
0720             poller_handlers.reserve(handlers.size());
0721             for (const auto &handler : handlers) {
0722                 poller_handlers.push_back(handler.second);
0723             }
0724             need_rebuild = false;
0725         }
0726         const auto count = base_poller.wait_all(poller_events, timeout);
0727         std::for_each(poller_events.begin(),
0728                       poller_events.begin() + static_cast<ptrdiff_t>(count),
0729                       [](decltype(base_poller)::event_type &event) {
0730                           assert(event.user_data != nullptr);
0731                           (*event.user_data)(event.events);
0732                       });
0733         return count;
0734     }
0735 
0736     ZMQ_NODISCARD bool empty() const noexcept { return handlers.empty(); }
0737 
0738     size_t size() const noexcept { return handlers.size(); }
0739 
0740   private:
0741     bool need_rebuild{false};
0742 
0743     poller_t<handler_type> base_poller{};
0744     std::unordered_map<socket_ref, std::shared_ptr<handler_type>> handlers{};
0745     std::vector<decltype(base_poller)::event_type> poller_events{};
0746     std::vector<std::shared_ptr<handler_type>> poller_handlers{};
0747 };     // class active_poller_t
0748 #endif //  defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
0749 
0750 
0751 } // namespace zmq
0752 
0753 #endif // __ZMQ_ADDON_HPP_INCLUDED__