File indexing completed on 2026-04-25 08:29:10
0001 import json
0002 import logging
0003 import threading
0004 import time
0005 from django.utils import timezone
0006 from django.db import connection
0007 from .models import SystemAgent
0008 from .workflow_models import WorkflowMessage
0009 from django.conf import settings
0010 from channels.layers import get_channel_layer
0011 from asgiref.sync import async_to_sync
0012
0013 try:
0014 import stomp
0015 except ImportError:
0016 stomp = None
0017
class WorkflowMessageProcessor(stomp.ConnectionListener if stomp else object):
    """
    ActiveMQ (STOMP) message listener that handles both agent heartbeat
    messages and workflow messages.

    Runs on the stomp.py listener thread, asynchronously within Django,
    without blocking the web server. When the optional ``stomp`` dependency
    is not installed the class falls back to a plain ``object`` base so the
    module remains importable (the listener simply cannot be registered).
    """

    def __init__(self, connection_manager):
        """
        Args:
            connection_manager: Object exposing ``is_connected()`` and
                ``reconnect()``; used to re-establish the broker connection
                after an unexpected disconnect.
        """
        self.logger = logging.getLogger(__name__)
        self.connection_manager = connection_manager
        # Seconds to wait before attempting a reconnect after a disconnect.
        self.reconnect_delay = 10

    def on_message(self, frame):
        """Decode an incoming ActiveMQ frame and dispatch it to a handler.

        Heartbeats and workflow messages are distinguished by shape (see
        ``_is_heartbeat_message`` / ``_is_workflow_message``); anything else
        is logged at DEBUG and dropped. All errors are swallowed after
        logging so a bad message never kills the listener thread.
        """
        try:
            # Close any stale DB connection held by this long-running
            # listener thread; Django transparently opens a fresh one on
            # the next ORM call.
            connection.close()

            data = json.loads(frame.body)

            if self._is_heartbeat_message(data):
                self._process_heartbeat(data)
            elif self._is_workflow_message(data):
                self._process_workflow_message(data, frame)
            else:
                self.logger.debug(f"Unrecognized message format: {frame.body}")

        except json.JSONDecodeError:
            self.logger.error(f"Failed to decode JSON from message: {frame.body}")
        except Exception as e:
            self.logger.error(f"Error processing message: {e}")
            self.logger.debug(f"Message body: {frame.body}")

    def on_error(self, frame):
        """Log ERROR frames received from ActiveMQ."""
        self.logger.error(f"ActiveMQ error: {frame.body}")

    def on_disconnected(self):
        """Schedule a delayed reconnect on a daemon thread after a drop.

        The delay avoids a tight reconnect loop when the broker is down;
        the reconnect is skipped if the manager reports the connection was
        already restored in the meantime.
        """
        self.logger.warning("Disconnected from ActiveMQ - scheduling reconnection")

        def delayed_reconnect():
            time.sleep(self.reconnect_delay)
            if not self.connection_manager.is_connected():
                self.connection_manager.reconnect()

        thread = threading.Thread(target=delayed_reconnect, daemon=True)
        thread.start()

    def _is_heartbeat_message(self, data):
        """Return True if ``data`` looks like an agent heartbeat (has both
        ``agent_name`` and ``status`` keys)."""
        return 'agent_name' in data and 'status' in data

    def _is_workflow_message(self, data):
        """Return True if ``data`` looks like a workflow message (has a
        ``msg_type`` key)."""
        return 'msg_type' in data

    def _process_heartbeat(self, data):
        """Create or refresh the ``SystemAgent`` row for a heartbeat.

        On first sight the agent is created with type 'Unknown' and
        workflow processing enabled; on subsequent heartbeats the status,
        last-heartbeat timestamp and (if needed) the workflow flag are
        updated. ORM errors are logged and swallowed.
        """
        agent_name = data.get('agent_name')
        status = data.get('status')

        if not agent_name:
            self.logger.warning(f"Heartbeat message missing agent_name: {data}")
            return

        try:
            agent, created = SystemAgent.objects.get_or_create(
                instance_name=agent_name,
                defaults={
                    'agent_type': 'Unknown',
                    'status': status if status else 'UNKNOWN',
                    'last_heartbeat': timezone.now(),
                    'workflow_enabled': True
                }
            )

            if not created:
                # Only overwrite status when the heartbeat actually carries one.
                if status:
                    agent.status = status
                agent.last_heartbeat = timezone.now()

                # Re-enable workflow processing for agents that were disabled
                # but are evidently alive again.
                if not agent.workflow_enabled:
                    agent.workflow_enabled = True
                agent.save()

            self.logger.debug(f"Updated SystemAgent {agent_name} with status {agent.status}")

        except Exception as e:
            self.logger.error(f"Error processing heartbeat for agent {agent_name}: {e}")

    def _process_workflow_message(self, data, frame):
        """Persist a workflow message and relay it to SSE subscribers.

        Stores the raw payload plus processing metadata in
        ``WorkflowMessage``, then broadcasts an enriched copy via the
        Channels layer and (best-effort) an in-process SSE broadcaster.
        Broadcast failures are logged at DEBUG and never abort the store.
        """
        try:
            msg_type = data.get('msg_type')
            run_id = data.get('run_id')
            execution_id = data.get('execution_id')
            filename = data.get('filename')

            sender_agent = data.get('sender')
            namespace = data.get('namespace')
            recipient_agent = data.get('recipient')

            # Provenance metadata identifying which Django process handled
            # this message and when.
            import os
            metadata = {
                'created_by': f"{os.getcwd()}:{os.getpid()}",
                'process_pid': os.getpid(),
                'processed_at': timezone.now().isoformat(),
                'django_instance': os.environ.get('DJANGO_SETTINGS_MODULE', 'unknown')
            }

            # Not yet linked to a Workflow row; association happens elsewhere.
            workflow_message = WorkflowMessage.objects.create(
                workflow=None,
                message_type=msg_type,
                sender_agent=sender_agent,
                recipient_agent=recipient_agent,
                namespace=namespace,
                execution_id=execution_id,
                run_id=run_id,
                message_content=data,
                message_metadata=metadata,
                sent_at=timezone.now(),
                queue_name=getattr(frame, 'destination', 'epictopic'),
                is_successful=True
            )

            # Enrich a copy of the payload with routing fields for consumers;
            # setdefault preserves any values the producer already supplied.
            enriched = dict(data)
            enriched.setdefault('sender_agent', sender_agent)
            enriched.setdefault('recipient_agent', recipient_agent)
            enriched.setdefault('namespace', namespace)
            enriched.setdefault('queue_name', getattr(frame, 'destination', 'epictopic'))
            enriched.setdefault('sent_at', timezone.now().isoformat())

            # Relay via the Channels layer (cross-process), if configured.
            try:
                channel_layer = get_channel_layer()
                if channel_layer is not None:
                    group = getattr(settings, 'SSE_CHANNEL_GROUP', 'workflow_events')
                    async_to_sync(channel_layer.group_send)(
                        group,
                        {"type": "broadcast", "payload": enriched}
                    )
            except Exception as e:
                self.logger.debug(f"Channels group_send failed or unavailable: {e}")

            # Also relay to in-process SSE listeners (best-effort; import is
            # deferred to avoid a hard dependency at module load).
            try:
                from .sse_views import SSEMessageBroadcaster
                broadcaster = SSEMessageBroadcaster()
                broadcaster.broadcast_message(enriched)
            except Exception as e:
                self.logger.debug(f"In-process SSE broadcast skipped/failed: {e}")

            # Fixed: interpolate the filename (the previous message logged a
            # literal placeholder and left `filename` unused).
            self.logger.info(f"Stored and relayed workflow message: {msg_type} for run {run_id}, filename {filename}")

        except Exception as e:
            self.logger.error(f"Error processing workflow message: {e}")
            self.logger.debug(f"Message data: {data}")
0180