File indexing completed on 2025-01-18 10:17:19
0001
0002
0003
0004
0005
0006 #ifndef _ZmqTransport_h_
0007 #define _ZmqTransport_h_
0008
0009 #include <JANA/Services/JParameterManager.h>
0010 #include <JANA/Streaming/JTransport.h>
0011
0012 #include <zmq.h>
0013 #include <errno.h>
0014
0015 class ZmqTransport : public JTransport {
0016
0017 public:
0018
0019 ZmqTransport(std::string socket_name, bool publish = false)
0020 : m_socket_name(socket_name)
0021 , m_publish(publish) {}
0022
0023 ~ZmqTransport() {
0024 if (m_socket != nullptr) {
0025 zmq_close(m_socket);
0026 }
0027 }
0028
0029 void initialize() override {
0030
0031 m_context = zmq_ctx_new();
0032 int result;
0033 if (m_publish) {
0034 m_socket = zmq_socket(m_context, ZMQ_PUB);
0035 result = zmq_bind(m_socket, m_socket_name.c_str());
0036 }
0037 else {
0038 m_socket = zmq_socket(m_context, ZMQ_SUB);
0039 result = zmq_connect(m_socket, m_socket_name.c_str());
0040 zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0);
0041 }
0042 if (result == -1) {
0043 int errno_saved = errno;
0044 std::ostringstream os;
0045 os << "Unable to " << (m_publish ? "bind" : "connect") << " to zmq socket " << m_socket_name << ": ";
0046 switch (errno_saved) {
0047 case EINVAL: os << "Invalid endpoint"; break;
0048 case EPROTONOSUPPORT: os << "Transport protocol not supported"; break;
0049 case ENOCOMPATPROTO: os << "Transport protocol not compatible with socket type"; break;
0050 case EADDRINUSE: os << "Address in use"; break;
0051 case EADDRNOTAVAIL: os << "Address not available"; break;
0052 case ENODEV: os << "Address specifies nonexistent interface"; break;
0053 case ETERM: os << "Context associated with this socket was terminated"; break;
0054 case ENOTSOCK: os << "Invalid socket"; break;
0055 case EMTHREAD: os << "No I/O thread available"; break;
0056 }
0057 std::cout << os.str();
0058 throw JException(os.str());
0059 }
0060
0061 };
0062
0063 JTransport::Result send(const JMessage& src_msg) override {
0064
0065 int rc = zmq_send(m_socket, src_msg.as_buffer(), src_msg.get_buffer_size(), 0);
0066 if (rc == -1) {
0067 return JTransport::Result::FAILURE;
0068 }
0069 return JTransport::Result::SUCCESS;
0070 }
0071
0072 JTransport::Result receive(JMessage& dest_msg) override {
0073
0074 int rc_length = zmq_recv(m_socket, dest_msg.as_buffer(), dest_msg.get_buffer_capacity(), ZMQ_DONTWAIT);
0075 if (rc_length == -1) {
0076 return JTransport::Result::TRY_AGAIN;
0077 }
0078 if (dest_msg.is_end_of_stream()) {
0079 zmq_close(m_socket);
0080 m_socket = nullptr;
0081 return JTransport::Result::FINISHED;
0082 }
0083 return JTransport::Result::SUCCESS;
0084 }
0085
0086 private:
0087
0088 std::string m_socket_name = "tcp://127.0.0.1:5555";
0089 bool m_publish = false;
0090 void* m_context;
0091 void* m_socket;
0092
0093 };
0094
0095 #endif