File indexing completed on 2026-04-25 08:29:08
0001
0002
0003
0004
0005
0006 import os, sys, stomp, ssl, time
0007
0008
0009 mq_user = os.environ.get('MQ_USER', None)
0010 mq_passwd = os.environ.get('MQ_PASSWD', None)
0011
0012 mq_port = int(os.environ.get('MQ_PORT', 61612))
0013
0014 mq_host = os.environ.get('MQ_HOST', 'pandaserver02.sdcc.bnl.gov')
0015 mq_cafile = os.environ.get('MQ_CAFILE', '')
0016
0017 mq_subscription_name = os.environ.get('MQ_SUBSCRIPTION_NAME', 'epic_streaming_testbed')
0018 mq_topic = os.environ.get('MQ_TOPIC', 'epictopic')
0019
0020
0021
0022 class Messenger:
0023 """
0024 A messenger class for sending and receiving messages using ActiveMQ/Artemis,
0025 for communication with other components in the ePIC streaming testbed system.
0026
0027 This class provides methods to connect to an ActiveMQ/Artemis server, send messages,
0028 subscribe to topics, and receive messages. It uses the `stomp.py` library
0029 for communication with the ActiveMQ server.
0030 """
0031
0032
0033 def __init__(self, host=mq_host, port=mq_port, username=mq_user, password=mq_passwd, client_id=None, verbose=False):
0034 self.host = host
0035 self.port = port
0036 self.username = username
0037 self.password = password
0038 self.client_id = client_id
0039
0040 if(not self.username or not self.password):
0041 raise ValueError("MQ_USER and MQ_PASSWD environment variables must be set.")
0042
0043 if(mq_cafile == ''):
0044 raise ValueError("MQ_CAFILE environment variable must be set to a valid CA file path.")
0045
0046 self.verbose = verbose
0047 if self.verbose:
0048 print(f"Initializing Messenger with host={self.host}, port={self.port}, username={self.username}")
0049
0050 heartbeats = (5000, 10000)
0051 self.conn = stomp.Connection(host_and_ports=[(host, port)], vhost=host,try_loopback_connect=False, heartbeats=heartbeats)
0052 if not self.conn: raise Exception("Connection object is not initialized.")
0053
0054
0055 if not os.path.exists(mq_cafile):
0056 raise FileNotFoundError(f"MQ_CAFILE '{mq_cafile}' does not exist.")
0057
0058 self.conn.transport.set_ssl(
0059 for_hosts=[(mq_host, mq_port)],
0060 ca_certs=mq_cafile,
0061 ssl_version=ssl.PROTOCOL_TLS_CLIENT
0062 )
0063
0064
0065 def disconnect(self):
0066 """Disconnect from the ActiveMQ server."""
0067 if self.conn:
0068 self.conn.disconnect()
0069
0070
0071
0072
0073
0074
0075 def connect(self):
0076 print('** Base class: Connecting to ActiveMQ server... **')
0077
0078
0079 def send(self):
0080 print('** Base class: Sending message to ActiveMQ server... **')
0081
0082
0083 class Sender(Messenger):
0084 def __init__(self, host=mq_host, port=mq_port, username=mq_user, password=mq_passwd, client_id=None, verbose=False):
0085 super().__init__(host=mq_host, port=mq_port, username=mq_user, password=mq_passwd, client_id=client_id, verbose=verbose)
0086 if self.verbose:
0087 print(f"*** Initializing Sender with topic={mq_topic} ***")
0088
0089
0090 def connect(self):
0091 if self.verbose: print('*** Sender connecting to ActiveMQ server... ***')
0092 try:
0093 self.conn.connect(login=self.username, passcode=self.password, wait=True, version='1.2')
0094 if self.conn.is_connected():
0095 if self.verbose:
0096 print("*** Sender connected to MQ server at {}:{} ***".format(self.host, self.port))
0097 else:
0098
0099 print("*** Sender not connected to MQ server at {}:{} ***".format(self.host, self.port))
0100 except Exception as e:
0101 print("Sender connection failed:", type(e).__name__, e)
0102
0103
0104 def send(self, destination=mq_topic, body='heartbeat', headers={'persistent': 'true'}):
0105 self.conn.send(destination=destination, body=body, headers=headers)
0106
0107
0108 class Listener(stomp.ConnectionListener):
0109 def __init__(self, processor=None, verbose=False):
0110 super().__init__()
0111 self.processor = processor
0112 self.verbose = verbose
0113
0114
0115 def on_connected(self, headers):
0116 if self.verbose:
0117 print(f'''*** Connected to broker: {headers} ***''')
0118
0119 def on_message(self, frame):
0120 if self.processor:
0121 self.processor(frame.body)
0122
0123 def on_error(self, frame):
0124 print(f"Error from broker: {frame}")
0125
0126 def on_disconnected(self):
0127 print("Disconnected from broker")
0128
0129
0130
0131
0132
0133
0134 class Receiver(Messenger):
0135 def __init__(self, host=mq_host, port=mq_port, username=mq_user, password=mq_passwd, client_id=None, verbose=False, processor=None):
0136 super().__init__(host=mq_host, port=mq_port, username=mq_user, password=mq_passwd, client_id=client_id, verbose=verbose)
0137 self.processor = processor
0138
0139 if self.verbose:
0140 print(f"*** Initializing Receiver with host={self.host}, port={self.port}, username={self.username}, client_id={self.client_id}, topic={mq_topic} ***")
0141
0142
0143 def connect(self):
0144
0145 self.conn.set_listener('', Listener(verbose=self.verbose, processor=self.processor))
0146
0147
0148
0149
0150 try:
0151 self.conn.connect(login=self.username, passcode=self.password, wait=True, version='1.2', headers={'client-id': self.client_id})
0152 if self.conn.is_connected():
0153 if self.verbose: print("*** Receiver connected to MQ server at {}:{}, topic {} ***".format(self.host, self.port, mq_topic))
0154 else:
0155 if self.verbose: print("*** Receiver not connected to MQ server at {}:{} ***".format(self.host, self.port))
0156 except Exception as e:
0157 print("Receiver connection failed:", type(e).__name__, e)
0158
0159
0160 self.conn.subscribe(
0161 destination=mq_topic,
0162 id=1,
0163 ack='auto',
0164 headers={
0165 'activemq.subscriptionName': mq_subscription_name,
0166 'client-id': self.client_id
0167 }
0168 )
0169