Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:08

0001 # The MQ Communications Module

0002 # This module provides classes for sending and receiving messages using ActiveMQ/Artemis.

0003 # It is designed to facilitate communication in the ePIC streaming testbed system.

0004 # It uses the `stomp.py` library for communication with the ActiveMQ server.

0005 
0006 import os, sys, stomp, ssl, time
0007 
0008 ###################################################################

0009 mq_user     = os.environ.get('MQ_USER',     None) # this will fail if not set

0010 mq_passwd   = os.environ.get('MQ_PASSWD',   None)
0011 
0012 mq_port     = int(os.environ.get('MQ_PORT', 61612))
0013 
0014 mq_host     = os.environ.get('MQ_HOST',     'pandaserver02.sdcc.bnl.gov')
0015 mq_cafile   = os.environ.get('MQ_CAFILE',   '')
0016 
0017 mq_subscription_name = os.environ.get('MQ_SUBSCRIPTION_NAME', 'epic_streaming_testbed')
0018 mq_topic    = os.environ.get('MQ_TOPIC',    'epictopic')
0019 
0020 
0021 ###################################################################

0022 class Messenger:
0023     """

0024     A messenger class for sending and receiving messages using ActiveMQ/Artemis,

0025     for communication with other components in the ePIC streaming testbed system.

0026 

0027     This class provides methods to connect to an ActiveMQ/Artemis server, send messages,

0028     subscribe to topics, and receive messages. It uses the `stomp.py` library

0029     for communication with the ActiveMQ server.

0030     """
0031 
0032     # ---

0033     def __init__(self, host=mq_host, port=mq_port, username=mq_user, password=mq_passwd, client_id=None, verbose=False):
0034         self.host       = host
0035         self.port       = port
0036         self.username   = username
0037         self.password   = password
0038         self.client_id  = client_id
0039 
0040         if(not self.username or not self.password):
0041             raise ValueError("MQ_USER and MQ_PASSWD environment variables must be set.")
0042 
0043         if(mq_cafile == ''):
0044             raise ValueError("MQ_CAFILE environment variable must be set to a valid CA file path.")
0045 
0046         self.verbose = verbose
0047         if self.verbose:
0048             print(f"Initializing Messenger with host={self.host}, port={self.port}, username={self.username}")
0049         
0050         heartbeats = (5000, 10000) # (client, server) heartbeats in milliseconds

0051         self.conn = stomp.Connection(host_and_ports=[(host, port)], vhost=host,try_loopback_connect=False, heartbeats=heartbeats)
0052         if not self.conn: raise Exception("Connection object is not initialized.")
0053     
0054         # Set SSL parameters for the connection

0055         if not os.path.exists(mq_cafile):      
0056             raise FileNotFoundError(f"MQ_CAFILE '{mq_cafile}' does not exist.")
0057 
0058         self.conn.transport.set_ssl(
0059             for_hosts=[(mq_host, mq_port)],
0060             ca_certs=mq_cafile,
0061             ssl_version=ssl.PROTOCOL_TLS_CLIENT
0062         )
0063 
0064     # ---

0065     def disconnect(self):
0066         """Disconnect from the ActiveMQ server."""
0067         if self.conn:
0068             self.conn.disconnect()
0069 
0070     
0071     # ^ Upstream is commmon for sender and receiver ^

0072 
0073     # ---

0074     # The connect and send methods are intended to be overridden in subclasses.

0075     def connect(self):
0076         print('** Base class: Connecting to ActiveMQ server... **')
0077 
0078     # ---

0079     def send(self):
0080         print('** Base class: Sending message to ActiveMQ server... **')
0081 
0082 ###################################################################

0083 class Sender(Messenger):
0084     def __init__(self, host=mq_host, port=mq_port, username=mq_user, password=mq_passwd, client_id=None, verbose=False):
0085         super().__init__(host=mq_host, port=mq_port, username=mq_user, password=mq_passwd, client_id=client_id, verbose=verbose)
0086         if self.verbose:
0087             print(f"*** Initializing Sender with topic={mq_topic} ***")
0088 
0089     # ---

0090     def connect(self):
0091         if self.verbose: print('*** Sender connecting to ActiveMQ server... ***')
0092         try:
0093             self.conn.connect(login=self.username, passcode=self.password, wait=True, version='1.2')
0094             if self.conn.is_connected():
0095                 if self.verbose:
0096                     print("*** Sender connected to MQ server at {}:{} ***".format(self.host, self.port))
0097             else:
0098                 # if self.verbose:

0099                 print("*** Sender not connected to MQ server at {}:{} ***".format(self.host, self.port))
0100         except Exception as e:
0101             print("Sender connection failed:", type(e).__name__, e)
0102 
0103     # ---

0104     def send(self, destination=mq_topic, body='heartbeat', headers={'persistent': 'true'}):
0105         self.conn.send(destination=destination, body=body, headers=headers)
0106 
0107 ###################################################################

0108 class Listener(stomp.ConnectionListener):
0109     def __init__(self, processor=None, verbose=False):
0110         super().__init__()
0111         self.processor  = processor
0112         self.verbose    = verbose
0113 
0114 
0115     def on_connected(self, headers):
0116         if self.verbose:
0117             print(f'''*** Connected to broker: {headers} ***''')
0118 
0119     def on_message(self, frame):
0120         if self.processor:
0121             self.processor(frame.body)
0122 
0123     def on_error(self, frame):
0124         print(f"Error from broker: {frame}")
0125 
0126     def on_disconnected(self):
0127         print("Disconnected from broker")
0128 
0129 
0130 # ---

0131 # The Receiver class is a subclass of Messenger that is used to receive messages from the ActiveMQ server.

0132 # It inherits the connection and disconnection methods from Messenger and can be extended to add more functionality.

0133 
0134 class Receiver(Messenger):
0135     def __init__(self, host=mq_host, port=mq_port, username=mq_user, password=mq_passwd, client_id=None, verbose=False, processor=None):
0136         super().__init__(host=mq_host, port=mq_port, username=mq_user, password=mq_passwd, client_id=client_id, verbose=verbose)
0137         self.processor = processor
0138         # self.client_id = client_id - should be done in the base.

0139         if self.verbose:
0140             print(f"*** Initializing Receiver with host={self.host}, port={self.port}, username={self.username}, client_id={self.client_id}, topic={mq_topic} ***")
0141 
0142     # ---

0143     def connect(self):
0144         # Attach listener

0145         self.conn.set_listener('', Listener(verbose=self.verbose, processor=self.processor))
0146         
0147         # Optionally, attach a debug listener:

0148         # self.conn.set_listener('debug', stomp.PrintingListener())

0149         # Connect with a durable client-id

0150         try:
0151             self.conn.connect(login=self.username, passcode=self.password, wait=True, version='1.2', headers={'client-id': self.client_id})
0152             if self.conn.is_connected():
0153                 if self.verbose: print("*** Receiver connected to MQ server at {}:{}, topic {} ***".format(self.host, self.port, mq_topic))
0154             else:
0155                 if self.verbose: print("*** Receiver not connected to MQ server at {}:{} ***".format(self.host, self.port))
0156         except Exception as e:
0157             print("Receiver connection failed:", type(e).__name__, e)
0158 
0159         # Subscribe with durable subscription name

0160         self.conn.subscribe(
0161             destination=mq_topic,
0162             id=1,
0163             ack='auto',
0164             headers={
0165                 'activemq.subscriptionName': mq_subscription_name,
0166                 'client-id': self.client_id
0167                 }
0168             )
0169