Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:19

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 
0006 #ifndef _ZmqTransport_h_
0007 #define _ZmqTransport_h_
0008 
0009 #include <JANA/Services/JParameterManager.h>
0010 #include <JANA/Streaming/JTransport.h>
0011 
0012 #include <zmq.h>
0013 #include <errno.h>
0014 
0015 class ZmqTransport : public JTransport {
0016 
0017 public:
0018 
0019     ZmqTransport(std::string socket_name, bool publish = false)
0020         : m_socket_name(socket_name)
0021         , m_publish(publish) {}
0022 
0023     ~ZmqTransport() {
0024         if (m_socket != nullptr) {
0025             zmq_close(m_socket);
0026         }
0027     }
0028 
0029     void initialize() override {
0030 
0031         m_context = zmq_ctx_new();
0032         int result;
0033         if (m_publish) {
0034             m_socket = zmq_socket(m_context, ZMQ_PUB);
0035             result = zmq_bind(m_socket, m_socket_name.c_str());
0036         }
0037         else {
0038             m_socket = zmq_socket(m_context, ZMQ_SUB);
0039             result = zmq_connect(m_socket, m_socket_name.c_str());
0040             zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0);  // Subscribe to everything
0041         }
0042         if (result == -1) {
0043             int errno_saved = errno;
0044             std::ostringstream os;
0045             os << "Unable to " << (m_publish ? "bind" : "connect") << " to zmq socket " << m_socket_name << ": ";
0046             switch (errno_saved) {
0047                 case EINVAL: os << "Invalid endpoint"; break;
0048                 case EPROTONOSUPPORT: os << "Transport protocol not supported"; break;
0049                 case ENOCOMPATPROTO: os << "Transport protocol not compatible with socket type"; break;
0050                 case EADDRINUSE: os << "Address in use"; break;
0051                 case EADDRNOTAVAIL: os << "Address not available"; break;
0052                 case ENODEV: os << "Address specifies nonexistent interface"; break;
0053                 case ETERM: os << "Context associated with this socket was terminated"; break;
0054                 case ENOTSOCK: os << "Invalid socket"; break;
0055                 case EMTHREAD: os << "No I/O thread available"; break;
0056             }
0057             std::cout << os.str();
0058             throw JException(os.str());
0059         }
0060 
0061     };
0062 
0063     JTransport::Result send(const JMessage& src_msg) override {
0064 
0065         int rc = zmq_send(m_socket, src_msg.as_buffer(), src_msg.get_buffer_size(), 0);
0066         if (rc == -1) {
0067             return JTransport::Result::FAILURE;
0068         }
0069         return JTransport::Result::SUCCESS;
0070     }
0071 
0072     JTransport::Result receive(JMessage& dest_msg) override {
0073 
0074         int rc_length = zmq_recv(m_socket, dest_msg.as_buffer(), dest_msg.get_buffer_capacity(), ZMQ_DONTWAIT);
0075         if (rc_length == -1) {
0076             return JTransport::Result::TRY_AGAIN;
0077         }
0078         if (dest_msg.is_end_of_stream()) {
0079             zmq_close(m_socket);
0080             m_socket = nullptr;
0081             return JTransport::Result::FINISHED;
0082         }
0083         return JTransport::Result::SUCCESS;
0084     }
0085 
0086 private:
0087 
0088     std::string m_socket_name = "tcp://127.0.0.1:5555";
0089     bool m_publish = false;
0090     void* m_context;
0091     void* m_socket;
0092 
0093 };
0094 
0095 #endif // _ZmqTransport_h_