# The ActiveMQ communications

## Purpose

This is a general-purpose package that simplifies
interfacing with the _ActiveMQ_ broker. It is agnostic
with regard to the contents of the messages sent and received.

## Environment variables

The `MQ_USER` and `MQ_PASSWD` environment variables must be set
for the package to work. `MQ_CAFILE` must also be set, to the
full path of the CA file.

`MQ_HOST` and `MQ_PORT` have default values in the code, which
work for testing out of the box.

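As a rough illustration of the rules above, the following sketch reads the five variables and fails early when a required one is missing. The function name `load_mq_settings` and the default host and port shown here are assumptions made for the example; they are not the package's actual helper or its real defaults.

```python
import os

# Variables the README lists as mandatory.
REQUIRED_VARS = ("MQ_USER", "MQ_PASSWD", "MQ_CAFILE")


def load_mq_settings(env=None, default_host="localhost", default_port=61613):
    """Collect broker settings from environment variables.

    The default host and port are placeholders for this sketch,
    not the values hard-coded in the package.
    """
    env = os.environ if env is None else env

    # Report every missing required variable at once.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )

    return {
        "user": env["MQ_USER"],
        "password": env["MQ_PASSWD"],
        "ca_file": env["MQ_CAFILE"],
        "host": env.get("MQ_HOST", default_host),
        "port": int(env.get("MQ_PORT", default_port)),
    }
```

Calling `load_mq_settings()` with no arguments reads from `os.environ`; passing a plain dictionary instead makes the check easy to exercise in tests.
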
## Classes

The _Sender_ and _Receiver_ classes inherit their common
functionality from the _Messenger_ base class. They can
be instantiated separately as needed, in one application or
several, and are agnostic with regard to the logic of the simulation.

## Messages

Currently, the _base agent_ class in **common-lib** defines
the following set of workflow message types:

```python
WORKFLOW_MESSAGE_TYPES = {
    'run_imminent', 'start_run', 'pause_run', 'resume_run', 'end_run',
    'stf_gen', 'data_ready'
}
```

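One way such a set might be used is to reject unknown message types before anything is sent. The sketch below assumes, purely for illustration, that messages are JSON objects carrying their type under a `msg_type` key; the `build_message` helper and that key name are hypothetical and not taken from the package.

```python
import json

WORKFLOW_MESSAGE_TYPES = {
    'run_imminent', 'start_run', 'pause_run', 'resume_run', 'end_run',
    'stf_gen', 'data_ready'
}


def build_message(msg_type, **fields):
    """Serialize a workflow message, refusing unknown types.

    The 'msg_type' key and the JSON encoding are assumptions of this sketch.
    """
    if msg_type not in WORKFLOW_MESSAGE_TYPES:
        raise ValueError(f"Unknown workflow message type: {msg_type!r}")
    return json.dumps({"msg_type": msg_type, **fields})
```

For example, `build_message('start_run', run_id=42)` produces a JSON string, while `build_message('bogus')` raises `ValueError`.
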
## Basic Durable Subscription Example

```python
import stomp
import time

class MyListener(stomp.ConnectionListener):
    def on_error(self, frame):
        print(f'Received an error: {frame.body}')

    def on_message(self, frame):
        print(f'Received message: {frame.body}')
        print(f'Headers: {frame.headers}')

# Connection parameters
host = 'localhost'
port = 61613  # Default STOMP port for ActiveMQ
client_id = 'my-client-id'  # Must be unique and persistent
subscription_name = 'my-durable-sub'
topic = '/topic/my.topic'

# Create connection
conn = stomp.Connection([(host, port)])

# Register the listener before connecting so that no frames are missed
conn.set_listener('', MyListener())

# Connect with client-id header (required for durable subscriptions)
conn.connect(wait=True, headers={'client-id': client_id})

# Subscribe with durable subscription
conn.subscribe(
    destination=topic,
    id=subscription_name,
    ack='auto',
    headers={
        'activemq.subscriptionName': subscription_name,
        'client-id': client_id
    }
)

print(f'Durable subscription created: {subscription_name}')
print('Waiting for messages...')

try:
    # Keep the connection alive
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print('\nDisconnecting...')
    conn.disconnect()
```

## Key Points for Durable Subscriptions

1. **Client ID**: Must be set and unique. This identifies the client across connections.

2. **Subscription Name**: Use the `activemq.subscriptionName` header to name your durable subscription.

3. **Topic (not Queue)**: Durable subscriptions only work with topics, not queues.

4. **Persistence**: Messages sent to the topic while the subscriber is disconnected will be delivered when it reconnects (as long as it uses the same client-id and subscription name).

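The points above can be sketched as a small helper that checks the inputs and builds the arguments for stomp.py's `connect` and `subscribe` calls. The helper name `durable_subscribe_args` is hypothetical, and the check on the `/topic/` prefix assumes the destination naming used in the example in this README.

```python
def durable_subscribe_args(topic, client_id, subscription_name):
    """Build connect headers and subscribe keyword arguments.

    Hypothetical helper: it only prepares dictionaries and does not
    talk to the broker.
    """
    # Durable subscriptions apply to topics, not queues.
    if not topic.startswith('/topic/'):
        raise ValueError(f"Durable subscriptions need a topic, got: {topic!r}")
    # Both identifiers are needed to find the subscription again later.
    if not client_id or not subscription_name:
        raise ValueError("client_id and subscription_name must be non-empty")

    connect_headers = {'client-id': client_id}
    subscribe_kwargs = {
        'destination': topic,
        'id': subscription_name,
        'ack': 'auto',
        'headers': {'activemq.subscriptionName': subscription_name},
    }
    return connect_headers, subscribe_kwargs
```

With an existing `stomp.Connection` named `conn`, the result could be used as `conn.connect(wait=True, headers=connect_headers)` followed by `conn.subscribe(**subscribe_kwargs)`. Reusing the same `client_id` and `subscription_name` on every run is what lets the broker match the reconnecting client to its stored messages.
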
## Unsubscribing from a Durable Subscription

To permanently remove a durable subscription, unsubscribe with the same subscription name while connected with the same client ID:

```python
conn.unsubscribe(
    id=subscription_name,
    headers={'activemq.subscriptionName': subscription_name}
)
```

## Using ActiveMQ Artemis

If you're using ActiveMQ Artemis (the newer broker), the subscription is named with the `durable-subscription-name` header instead. The `client-id` header on connect is still required:

```python
conn.subscribe(
    destination=topic,
    id=subscription_name,
    ack='auto',
    headers={
        'durable-subscription-name': subscription_name
    }
)
```
0127