Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:10

0001 import ssl
0002 import logging
0003 import threading
0004 from django.conf import settings
0005 
0006 try:
0007     import stomp
0008 except ImportError:
0009     stomp = None
0010 
0011 class ActiveMQConnectionManager:
0012     """
0013     Singleton connection manager for ActiveMQ integration.
0014     Handles connection lifecycle and ensures single connection per Django instance.
0015     """
0016     _instance = None
0017     _lock = threading.Lock()
0018     
0019     def __new__(cls):
0020         if cls._instance is None:
0021             with cls._lock:
0022                 if cls._instance is None:
0023                     cls._instance = super().__new__(cls)
0024         return cls._instance
0025     
0026     def __init__(self):
0027         if hasattr(self, 'initialized'):
0028             return
0029         self.conn = None
0030         self.listener = None
0031         self.initialized = True
0032         self.logger = logging.getLogger(__name__)
0033     
0034     def connect(self):
0035         """Establish connection to ActiveMQ"""
0036         if stomp is None:
0037             self.logger.error("stomp.py library not available - cannot connect to ActiveMQ")
0038             return False
0039             
0040         if self.conn and self.conn.is_connected():
0041             self.logger.debug("ActiveMQ already connected")
0042             return True
0043             
0044         try:
0045             host = getattr(settings, 'ACTIVEMQ_HOST', 'localhost')
0046             port = getattr(settings, 'ACTIVEMQ_PORT', 61612)
0047             
0048             self.logger.info(f"Connecting to ActiveMQ at {host}:{port}")
0049             
0050             # Create connection matching working example agents
0051             # Use heartbeats parameter like swf-common-lib does
0052             heartbeats = (5000, 10000)  # (client, server) heartbeats in milliseconds
0053             self.conn = stomp.Connection(
0054                 host_and_ports=[(host, port)],
0055                 vhost=host, 
0056                 try_loopback_connect=False,
0057                 heartbeats=heartbeats
0058             )
0059             
0060             # Configure SSL if enabled - MUST be done before set_listener
0061             if getattr(settings, 'ACTIVEMQ_USE_SSL', False):
0062                 self._configure_ssl(host, port)
0063             
0064             # Set up message listener
0065             from .activemq_processor import WorkflowMessageProcessor
0066             self.listener = WorkflowMessageProcessor(self)
0067             self.conn.set_listener('', self.listener)
0068             
0069             # Connect and subscribe with proper STOMP version and headers
0070             user = getattr(settings, 'ACTIVEMQ_USER', 'admin')
0071             password = getattr(settings, 'ACTIVEMQ_PASSWORD', 'admin')
0072             topic = getattr(settings, 'ACTIVEMQ_HEARTBEAT_TOPIC', 'epictopic')
0073             
0074             self.conn.connect(
0075                 user,
0076                 password,
0077                 wait=True,
0078                 version='1.1',
0079                 headers={
0080                     'client-id': 'swf-monitor-django'
0081                 }
0082             )
0083 
0084             # Subscribe to workflow topic (broadcast messages from agents)
0085             self.conn.subscribe(destination=topic, id=1, ack='auto')
0086 
0087             self.logger.info(f"Successfully connected to ActiveMQ and subscribed to {topic}")
0088             return True
0089             
0090         except Exception as e:
0091             self.logger.error(f"Failed to connect to ActiveMQ: {e}")
0092             self.conn = None
0093             return False
0094     
0095     def _configure_ssl(self, host, port):
0096         """Configure SSL for ActiveMQ connection"""
0097         try:
0098             ssl_ca_certs = getattr(settings, 'ACTIVEMQ_SSL_CA_CERTS', '')
0099             ssl_cert_file = getattr(settings, 'ACTIVEMQ_SSL_CERT_FILE', '')
0100             ssl_key_file = getattr(settings, 'ACTIVEMQ_SSL_KEY_FILE', '')
0101             
0102             if ssl_ca_certs:
0103                 ssl_args = {
0104                     'ca_certs': ssl_ca_certs,
0105                     'ssl_version': ssl.PROTOCOL_TLS_CLIENT
0106                 }
0107                 
0108                 # Add client cert and key if provided
0109                 if ssl_cert_file and ssl_key_file:
0110                     ssl_args['certfile'] = ssl_cert_file
0111                     ssl_args['keyfile'] = ssl_key_file
0112                 
0113                 self.conn.transport.set_ssl(
0114                     for_hosts=[(host, port)],
0115                     **ssl_args
0116                 )
0117                 self.logger.info(f"SSL configured with CA certs: {ssl_ca_certs}")
0118             else:
0119                 self.logger.warning("SSL enabled but no CA certificate file specified")
0120                 
0121         except Exception as e:
0122             self.logger.error(f"Failed to configure SSL: {e}")
0123             raise
0124     
0125     def disconnect(self):
0126         """Disconnect from ActiveMQ"""
0127         if self.conn and self.conn.is_connected():
0128             try:
0129                 self.conn.disconnect()
0130                 self.logger.info("Disconnected from ActiveMQ")
0131             except Exception as e:
0132                 self.logger.error(f"Error disconnecting from ActiveMQ: {e}")
0133         self.conn = None
0134         self.listener = None
0135     
0136     def reconnect(self):
0137         """Attempt to reconnect to ActiveMQ"""
0138         self.logger.info("Attempting to reconnect to ActiveMQ...")
0139         self.disconnect()
0140         return self.connect()
0141     
0142     def is_connected(self):
0143         """Check if connection is active"""
0144         return self.conn and self.conn.is_connected()
0145 
0146     def send_message(self, destination: str, body: str) -> bool:
0147         """
0148         Send a message to a destination queue/topic.
0149 
0150         Args:
0151             destination: Queue or topic name (e.g., '/queue/workflow_control')
0152             body: Message body (typically JSON string)
0153 
0154         Returns:
0155             True if sent successfully, False otherwise
0156         """
0157         if not self.is_connected():
0158             if not self.connect():
0159                 self.logger.error("Cannot send message - not connected to ActiveMQ")
0160                 return False
0161 
0162         try:
0163             self.conn.send(destination=destination, body=body)
0164             self.logger.info(f"Sent message to {destination}")
0165             return True
0166         except Exception as e:
0167             self.logger.error(f"Failed to send message to {destination}: {e}")
0168             return False