Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2025-01-18 10:17:19

0001 
0002 // Copyright 2020, Jefferson Science Associates, LLC.
0003 // Subject to the terms in the LICENSE file found in the top-level directory.
0004 
0005 
0006 #ifndef JANA2_ZMQDATASOURCE_H
0007 #define JANA2_ZMQDATASOURCE_H
0008 
0009 #include <JANA/Streaming/JTransport.h>
0010 
0011 #include <zmq.h>
0012 #include <errno.h>
0013 
0014 class ZmqTransport : public JTransport {
0015 
0016 public:
0017     ZmqTransport(std::string socket_name, bool publish = false)
0018         : m_socket_name(socket_name)
0019         , m_publish(publish) {}
0020 
0021     ~ZmqTransport() {
0022         if (m_socket != nullptr) {
0023             zmq_close(m_socket);
0024         }
0025     }
0026 
0027     void initialize() override {
0028         m_context = zmq_ctx_new();
0029         int result;
0030         if (m_publish) {
0031             m_socket = zmq_socket(m_context, ZMQ_PUB);
0032             result = zmq_bind(m_socket, m_socket_name.c_str());
0033         }
0034         else {
0035             m_socket = zmq_socket(m_context, ZMQ_SUB);
0036             result = zmq_connect(m_socket, m_socket_name.c_str());
0037             zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0);  // Subscribe to everything
0038         }
0039         if (result == -1) {
0040             int errno_saved = errno;
0041             std::ostringstream os;
0042 
0043             os << "Unable to " << (m_publish ? "bind" : "connect") << " to zmq socket " << m_socket_name << ": ";
0044 
0045             switch (errno_saved) {
0046                 case EINVAL: os << "Invalid endpoint"; break;
0047                 case EPROTONOSUPPORT: os << "Transport protocol not supported"; break;
0048                 case ENOCOMPATPROTO: os << "Transport protocol not compatible with socket type"; break;
0049                 case EADDRINUSE: os << "Address in use"; break;
0050                 case EADDRNOTAVAIL: os << "Address not available"; break;
0051                 case ENODEV: os << "Address specifies nonexistent interface"; break;
0052                 case ETERM: os << "Context associated with this socket was terminated"; break;
0053                 case ENOTSOCK: os << "Invalid socket"; break;
0054                 case EMTHREAD: os << "No I/O thread available"; break;
0055             }
0056             std::cout << os.str();
0057             throw JException(os.str());
0058         }
0059     };
0060 
0061     JTransport::Result send(const JMessage& src_msg) override {
0062 
0063         int rc = zmq_send(m_socket, src_msg.as_buffer(), src_msg.get_buffer_size(), 0);
0064 
0065         if (rc == -1) {
0066             return JTransport::Result::FAILURE;
0067         }
0068         return JTransport::Result::SUCCESS;
0069     }
0070 
0071     JTransport::Result receive(JMessage& dest_msg) override {
0072 
0073         int rc_length = zmq_recv(m_socket, dest_msg.as_buffer(), dest_msg.get_buffer_capacity(), ZMQ_DONTWAIT);
0074 
0075         if (rc_length == -1) {
0076             return JTransport::Result::TRY_AGAIN;
0077         }
0078 
0079         if (dest_msg.is_end_of_stream()) {
0080             zmq_close(m_socket);
0081             m_socket = nullptr;
0082             return JTransport::Result::FINISHED;
0083         }
0084 
0085         return JTransport::Result::SUCCESS;
0086     }
0087 
0088 private:
0089     std::string m_socket_name = "tcp://127.0.0.1:5555";
0090     bool m_publish = false;
0091     void* m_context;
0092     void* m_socket;
0093 };
0094 
0095 
0096 #endif //JANA2_ZMQDATASOURCE_H