File indexing completed on 2025-01-18 10:17:19
0001
0002
0003
0004
0005
0006 #ifndef JANA2_ZMQDATASOURCE_H
0007 #define JANA2_ZMQDATASOURCE_H
0008
0009 #include <JANA/Streaming/JTransport.h>
0010
0011 #include <zmq.h>
0012 #include <errno.h>
0013
0014 class ZmqTransport : public JTransport {
0015
0016 public:
0017 ZmqTransport(std::string socket_name, bool publish = false)
0018 : m_socket_name(socket_name)
0019 , m_publish(publish) {}
0020
0021 ~ZmqTransport() {
0022 if (m_socket != nullptr) {
0023 zmq_close(m_socket);
0024 }
0025 }
0026
0027 void initialize() override {
0028 m_context = zmq_ctx_new();
0029 int result;
0030 if (m_publish) {
0031 m_socket = zmq_socket(m_context, ZMQ_PUB);
0032 result = zmq_bind(m_socket, m_socket_name.c_str());
0033 }
0034 else {
0035 m_socket = zmq_socket(m_context, ZMQ_SUB);
0036 result = zmq_connect(m_socket, m_socket_name.c_str());
0037 zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0);
0038 }
0039 if (result == -1) {
0040 int errno_saved = errno;
0041 std::ostringstream os;
0042
0043 os << "Unable to " << (m_publish ? "bind" : "connect") << " to zmq socket " << m_socket_name << ": ";
0044
0045 switch (errno_saved) {
0046 case EINVAL: os << "Invalid endpoint"; break;
0047 case EPROTONOSUPPORT: os << "Transport protocol not supported"; break;
0048 case ENOCOMPATPROTO: os << "Transport protocol not compatible with socket type"; break;
0049 case EADDRINUSE: os << "Address in use"; break;
0050 case EADDRNOTAVAIL: os << "Address not available"; break;
0051 case ENODEV: os << "Address specifies nonexistent interface"; break;
0052 case ETERM: os << "Context associated with this socket was terminated"; break;
0053 case ENOTSOCK: os << "Invalid socket"; break;
0054 case EMTHREAD: os << "No I/O thread available"; break;
0055 }
0056 std::cout << os.str();
0057 throw JException(os.str());
0058 }
0059 };
0060
0061 JTransport::Result send(const JMessage& src_msg) override {
0062
0063 int rc = zmq_send(m_socket, src_msg.as_buffer(), src_msg.get_buffer_size(), 0);
0064
0065 if (rc == -1) {
0066 return JTransport::Result::FAILURE;
0067 }
0068 return JTransport::Result::SUCCESS;
0069 }
0070
0071 JTransport::Result receive(JMessage& dest_msg) override {
0072
0073 int rc_length = zmq_recv(m_socket, dest_msg.as_buffer(), dest_msg.get_buffer_capacity(), ZMQ_DONTWAIT);
0074
0075 if (rc_length == -1) {
0076 return JTransport::Result::TRY_AGAIN;
0077 }
0078
0079 if (dest_msg.is_end_of_stream()) {
0080 zmq_close(m_socket);
0081 m_socket = nullptr;
0082 return JTransport::Result::FINISHED;
0083 }
0084
0085 return JTransport::Result::SUCCESS;
0086 }
0087
0088 private:
0089 std::string m_socket_name = "tcp://127.0.0.1:5555";
0090 bool m_publish = false;
0091 void* m_context;
0092 void* m_socket;
0093 };
0094
0095
0096 #endif