File indexing completed on 2026-04-25 08:29:10
0001 import ssl
0002 import logging
0003 import threading
0004 from django.conf import settings
0005
0006 try:
0007 import stomp
0008 except ImportError:
0009 stomp = None
0010
class ActiveMQConnectionManager:
    """
    Singleton connection manager for ActiveMQ integration.

    Handles the connection lifecycle (connect, SSL setup, subscribe,
    disconnect, reconnect) and keeps a single STOMP connection per process.

    Attributes:
        conn: The active ``stomp.Connection`` or ``None`` when disconnected.
        listener: The message listener attached to ``conn`` (or ``None``).
        logger: Logger used for all connection diagnostics.
        initialized: ``True`` once the one-time instance setup has run.
    """
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Cheap unlocked check first; re-check under the lock so that only
        # one thread ever creates the shared instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # __init__ runs on every ``ActiveMQConnectionManager()`` call, so the
        # state must only be set up once or an existing connection is lost.
        if getattr(self, 'initialized', False):
            return
        # Guard the one-time setup with the class lock: without it two threads
        # racing on the first instantiation could both reset ``conn``.
        with self._lock:
            if getattr(self, 'initialized', False):
                return
            self.conn = None
            self.listener = None
            self.logger = logging.getLogger(__name__)
            # Set last so a concurrent reader never sees a half-built instance.
            self.initialized = True

    def connect(self) -> bool:
        """
        Establish the connection to ActiveMQ and subscribe to the heartbeat topic.

        Connection parameters are read from Django settings
        (``ACTIVEMQ_HOST``, ``ACTIVEMQ_PORT``, ``ACTIVEMQ_USE_SSL``,
        ``ACTIVEMQ_USER``, ``ACTIVEMQ_PASSWORD``, ``ACTIVEMQ_HEARTBEAT_TOPIC``).

        Returns:
            True if already connected or the connection succeeded,
            False if stomp.py is unavailable or any step failed.
        """
        if stomp is None:
            self.logger.error("stomp.py library not available - cannot connect to ActiveMQ")
            return False

        if self.is_connected():
            self.logger.debug("ActiveMQ already connected")
            return True

        try:
            host = getattr(settings, 'ACTIVEMQ_HOST', 'localhost')
            port = getattr(settings, 'ACTIVEMQ_PORT', 61612)

            self.logger.info("Connecting to ActiveMQ at %s:%s", host, port)

            # Heartbeat intervals handed to stomp.py, in milliseconds
            # (outgoing, incoming).
            heartbeats = (5000, 10000)
            self.conn = stomp.Connection(
                host_and_ports=[(host, port)],
                vhost=host,
                try_loopback_connect=False,
                heartbeats=heartbeats,
            )

            # SSL must be configured on the transport before connecting.
            if getattr(settings, 'ACTIVEMQ_USE_SSL', False):
                self._configure_ssl(host, port)

            # Imported here rather than at module level.
            # NOTE(review): presumably to avoid a circular import, since the
            # processor receives this manager - confirm before moving it.
            from .activemq_processor import WorkflowMessageProcessor
            self.listener = WorkflowMessageProcessor(self)
            self.conn.set_listener('', self.listener)

            # NOTE(review): the fallback credentials are the well-known
            # 'admin'/'admin' pair; deployments should always override them.
            user = getattr(settings, 'ACTIVEMQ_USER', 'admin')
            password = getattr(settings, 'ACTIVEMQ_PASSWORD', 'admin')
            topic = getattr(settings, 'ACTIVEMQ_HEARTBEAT_TOPIC', 'epictopic')

            self.conn.connect(
                user,
                password,
                wait=True,
                version='1.1',
                headers={
                    'client-id': 'swf-monitor-django'
                },
            )

            self.conn.subscribe(destination=topic, id=1, ack='auto')

            self.logger.info("Successfully connected to ActiveMQ and subscribed to %s", topic)
            return True

        except Exception as e:
            self.logger.error("Failed to connect to ActiveMQ: %s", e)
            # Close a half-established connection instead of just dropping the
            # reference, otherwise a session that connected but failed later
            # (e.g. on subscribe) would stay open with nobody owning it.
            self._discard_connection()
            return False

    def _discard_connection(self) -> None:
        """Forget the current connection and listener, closing the connection if it is open."""
        stale, self.conn, self.listener = self.conn, None, None
        if stale is None:
            return
        try:
            if stale.is_connected():
                stale.disconnect()
        except Exception as e:
            # Best effort only: the caller is already handling a failure.
            self.logger.debug("Ignoring error while closing failed ActiveMQ connection: %s", e)

    def _configure_ssl(self, host, port) -> None:
        """
        Configure SSL for the ActiveMQ connection held in ``self.conn``.

        Reads ``ACTIVEMQ_SSL_CA_CERTS``, ``ACTIVEMQ_SSL_CERT_FILE`` and
        ``ACTIVEMQ_SSL_KEY_FILE`` from Django settings. A client certificate
        is only used when both the certificate and the key file are given.

        Args:
            host: Broker host the SSL configuration applies to.
            port: Broker port the SSL configuration applies to.

        Raises:
            Exception: Any error raised while applying the SSL settings is
                logged and re-raised so that ``connect`` aborts.
        """
        try:
            ssl_ca_certs = getattr(settings, 'ACTIVEMQ_SSL_CA_CERTS', '')
            ssl_cert_file = getattr(settings, 'ACTIVEMQ_SSL_CERT_FILE', '')
            ssl_key_file = getattr(settings, 'ACTIVEMQ_SSL_KEY_FILE', '')

            if not ssl_ca_certs:
                # NOTE(review): nothing is configured on the transport in this
                # case, so the connection proceeds without the SSL settings
                # even though ACTIVEMQ_USE_SSL is on - confirm this is intended.
                self.logger.warning("SSL enabled but no CA certificate file specified")
                return

            ssl_args = {
                'ca_certs': ssl_ca_certs,
                'ssl_version': ssl.PROTOCOL_TLS_CLIENT,
            }

            if ssl_cert_file and ssl_key_file:
                # stomp.py's set_ssl() names these 'cert_file' / 'key_file';
                # 'certfile' / 'keyfile' would raise TypeError.
                ssl_args['cert_file'] = ssl_cert_file
                ssl_args['key_file'] = ssl_key_file

            self.conn.transport.set_ssl(
                for_hosts=[(host, port)],
                **ssl_args
            )
            self.logger.info("SSL configured with CA certs: %s", ssl_ca_certs)

        except Exception as e:
            self.logger.error("Failed to configure SSL: %s", e)
            raise

    def disconnect(self) -> None:
        """
        Disconnect from ActiveMQ.

        Errors raised by the underlying disconnect are logged, not propagated.
        The connection and listener references are always cleared afterwards.
        """
        if self.is_connected():
            try:
                self.conn.disconnect()
                self.logger.info("Disconnected from ActiveMQ")
            except Exception as e:
                self.logger.error("Error disconnecting from ActiveMQ: %s", e)
        self.conn = None
        self.listener = None

    def reconnect(self) -> bool:
        """
        Drop the current connection (if any) and connect again.

        Returns:
            The result of ``connect()``.
        """
        self.logger.info("Attempting to reconnect to ActiveMQ...")
        self.disconnect()
        return self.connect()

    def is_connected(self) -> bool:
        """
        Check if the connection is active.

        Returns:
            True only when a connection object exists and reports itself as
            connected; always a real ``bool`` (never ``None``).
        """
        return self.conn is not None and bool(self.conn.is_connected())

    def send_message(self, destination: str, body: str) -> bool:
        """
        Send a message to a destination queue/topic.

        Connects first if there is no active connection.

        Args:
            destination: Queue or topic name (e.g., '/queue/workflow_control')
            body: Message body (typically JSON string)

        Returns:
            True if sent successfully, False otherwise
        """
        if not self.is_connected() and not self.connect():
            self.logger.error("Cannot send message - not connected to ActiveMQ")
            return False

        try:
            self.conn.send(destination=destination, body=body)
            self.logger.info("Sent message to %s", destination)
            return True
        except Exception as e:
            self.logger.error("Failed to send message to %s: %s", destination, e)
            return False