# The ActiveMQ communications

## Purpose

This is a general-purpose package created to simplify
interfacing with the _ActiveMQ_ broker. It is agnostic
to the contents of the messages sent and received.

## Environment variables

The `MQ_USER` and `MQ_PASSWD` environment variables must be set
for the package to work. The same goes for `MQ_CAFILE`, which must
contain the full path to the CA file.

`MQ_HOST` and `MQ_PORT` have default values in the code that work
for testing right away.

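As an illustration of the rules above, here is a minimal sketch of how an application might collect these settings before creating a connection. The `load_mq_settings` function and the `MissingMQSetting` exception are hypothetical names, not part of the package, and the fallback values for `MQ_HOST` and `MQ_PORT` are placeholders, since the package's real defaults are not listed in this document.

```python
import os


class MissingMQSetting(RuntimeError):
    """Raised when a required MQ_* environment variable is unset or invalid."""


def load_mq_settings(env=os.environ):
    """Collect the MQ_* settings into a plain dict (hypothetical helper)."""
    required = ('MQ_USER', 'MQ_PASSWD', 'MQ_CAFILE')
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise MissingMQSetting('Missing environment variables: ' + ', '.join(missing))

    cafile = env['MQ_CAFILE']
    if not os.path.isabs(cafile):
        raise MissingMQSetting(f'MQ_CAFILE must be a full path, got {cafile!r}')

    return {
        'user': env['MQ_USER'],
        'password': env['MQ_PASSWD'],
        'cafile': cafile,
        # Placeholder fallbacks (assumptions), not the package's actual defaults.
        'host': env.get('MQ_HOST', 'localhost'),
        'port': int(env.get('MQ_PORT', '61613')),
    }
```

Failing early with the names of the missing variables tends to be easier to debug than a connection error raised later by the broker.
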
## Classes

The _Sender_ and _Receiver_ classes inherit their common
functionality from the base class, _Messenger_. They can
be instantiated separately as needed, in one application or
several, and are agnostic to the logic of the simulation.

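The relationship between the three classes can be pictured with the sketch below. It is not the package's code: the constructor argument and the `send` and `listen` method names are assumptions made purely for illustration, and the real _Messenger_, _Sender_ and _Receiver_ may have different signatures.

```python
class Messenger:
    """Base class: holds what both directions share (here, only the connection)."""

    def __init__(self, connection):
        # `connection` is any object offering stomp-like send/subscribe methods.
        self.connection = connection


class Sender(Messenger):
    def send(self, destination, body):
        # The body is passed through untouched: the class never inspects it.
        self.connection.send(destination=destination, body=body)


class Receiver(Messenger):
    def listen(self, destination, subscription_id):
        # Delivery is left to whatever listener the connection has registered.
        self.connection.subscribe(destination=destination, id=subscription_id, ack='auto')
```

Because each class only needs a connection object, a process that just publishes can create a `Sender` alone, and one that just consumes can create a `Receiver` alone.
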
## Messages

Currently, the _base agent_ class in **common-lib** contains
a set of workflow message types, defined as follows:

```python
WORKFLOW_MESSAGE_TYPES = {
    'run_imminent', 'start_run', 'pause_run', 'resume_run', 'end_run',
    'stf_gen', 'data_ready'
}
```

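One way an agent might use such a set is to reject unknown message types before anything is sent. The sketch below is an assumption about usage, not code from **common-lib**: the `encode_workflow_message` helper, the `msg_type` key and the choice of JSON as the wire format are all hypothetical.

```python
import json

# Copied from the set shown above.
WORKFLOW_MESSAGE_TYPES = {
    'run_imminent', 'start_run', 'pause_run', 'resume_run', 'end_run',
    'stf_gen', 'data_ready'
}


def encode_workflow_message(msg_type, **fields):
    """Return a JSON string for a known workflow message type (hypothetical helper)."""
    if msg_type not in WORKFLOW_MESSAGE_TYPES:
        raise ValueError(f'Unknown workflow message type: {msg_type!r}')
    # 'msg_type' as the key name is an assumption made for this example.
    return json.dumps({'msg_type': msg_type, **fields})
```

The resulting string could then be handed to a sender as an opaque body, which keeps the messaging layer agnostic to message contents.
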
## Basic Durable Subscription Example

```python
import time

import stomp


class MyListener(stomp.ConnectionListener):
    def on_error(self, frame):
        print(f'Received an error: {frame.body}')

    def on_message(self, frame):
        print(f'Received message: {frame.body}')
        print(f'Headers: {frame.headers}')


# Connection parameters
host = 'localhost'
port = 61613  # Default STOMP port for ActiveMQ
client_id = 'my-client-id'  # Must be unique and stay the same across reconnects
subscription_name = 'my-durable-sub'
topic = '/topic/my.topic'

# Create connection
conn = stomp.Connection([(host, port)])

# Register the listener before connecting
conn.set_listener('', MyListener())

# Connect with client-id header (required for durable subscriptions)
conn.connect(wait=True, headers={'client-id': client_id})

# Subscribe with durable subscription
conn.subscribe(
    destination=topic,
    id=subscription_name,
    ack='auto',
    headers={
        'activemq.subscriptionName': subscription_name,
        'client-id': client_id
    }
)

print(f'Durable subscription created: {subscription_name}')
print('Waiting for messages...')

try:
    # Keep the connection alive
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print('\nDisconnecting...')
    conn.disconnect()
```

## Key Points for Durable Subscriptions

1. **Client ID**: Must be set and unique. This identifies the client across connections.

2. **Subscription Name**: Use the `activemq.subscriptionName` header to name your durable subscription.

3. **Topic (not Queue)**: Durable subscriptions only work with topics, not queues.

4. **Persistence**: Messages sent to the topic while the subscriber is disconnected will be delivered when it reconnects (as long as it uses the same client-id and subscription name).

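The first three points can be gathered into a small helper that builds the arguments for a durable subscription and rejects obviously wrong input. This is a sketch only: `durable_subscription_args` is a hypothetical name, and the `/topic/` prefix check assumes the destination naming used in the example above.

```python
def durable_subscription_args(topic, client_id, subscription_name):
    """Build CONNECT headers and SUBSCRIBE keyword arguments (hypothetical helper)."""
    # Point 3: durable subscriptions apply to topics, not queues.
    if not topic.startswith('/topic/'):
        raise ValueError(f'Durable subscriptions need a topic destination, got {topic!r}')
    # Points 1 and 2: both identifiers must be present.
    if not client_id or not subscription_name:
        raise ValueError('Both client_id and subscription_name must be non-empty')

    connect_headers = {'client-id': client_id}
    subscribe_kwargs = {
        'destination': topic,
        'id': subscription_name,
        'ack': 'auto',
        'headers': {'activemq.subscriptionName': subscription_name},
    }
    return connect_headers, subscribe_kwargs
```

With a `stomp.Connection` named `conn`, the two results would be passed as `conn.connect(wait=True, headers=connect_headers)` and `conn.subscribe(**subscribe_kwargs)`. Reusing the same `client_id` and `subscription_name` on every start is what lets the broker match the reconnecting client to its stored subscription.
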
## Unsubscribing from a Durable Subscription

To permanently remove a durable subscription, unsubscribe with its subscription name while connected under the same client-id:

```python
conn.unsubscribe(
    id=subscription_name,
    headers={'activemq.subscriptionName': subscription_name}
)
```

## Using ActiveMQ Artemis

If you're using ActiveMQ Artemis (the newer broker), the subscription headers are slightly different:

```python
conn.subscribe(
    destination=topic,
    id=subscription_name,
    ack='auto',
    headers={
        'durable': 'true',
        'subscription-name': subscription_name
    }
)
```