Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:10

0001 import json
0002 import logging
0003 import threading
0004 import time
0005 from django.utils import timezone
0006 from django.db import connection
0007 from .models import SystemAgent
0008 from .workflow_models import WorkflowMessage
0009 from django.conf import settings
0010 from channels.layers import get_channel_layer
0011 from asgiref.sync import async_to_sync
0012 
try:
    import stomp
except ImportError:
    # stomp.py is optional at import time; the class below then derives
    # from ``object`` so this module can still be imported.
    stomp = None


class WorkflowMessageProcessor(stomp.ConnectionListener if stomp else object):
    """
    ActiveMQ message processor that handles both heartbeat and workflow messages.

    Subclasses ``stomp.ConnectionListener`` when stomp.py is importable and
    plain ``object`` otherwise. Heartbeats update ``SystemAgent`` rows;
    workflow messages are stored as ``WorkflowMessage`` rows and relayed to
    SSE consumers.
    """

    def __init__(self, connection_manager):
        """
        Args:
            connection_manager: Object exposing ``is_connected()`` and
                ``reconnect()``; used to re-establish the broker connection
                after a disconnect.
        """
        self.logger = logging.getLogger(__name__)
        self.connection_manager = connection_manager
        self.reconnect_delay = 10  # seconds

    def on_message(self, frame):
        """
        Process an incoming ActiveMQ message.

        The frame body is parsed as JSON and dispatched to the heartbeat or
        workflow handler. Payloads that match neither shape (including JSON
        values that are not objects) are logged at debug level and dropped.
        No exception escapes this method.
        """
        try:
            # Close any lingering database connections to prevent connection leaks
            connection.close()

            data = json.loads(frame.body)

            if self._is_heartbeat_message(data):
                self._process_heartbeat(data)
            elif self._is_workflow_message(data):
                self._process_workflow_message(data, frame)
            else:
                self.logger.debug(f"Unrecognized message format: {frame.body}")

        except json.JSONDecodeError:
            self.logger.error(f"Failed to decode JSON from message: {frame.body}")
        except Exception as e:
            self.logger.error(f"Error processing message: {e}")
            self.logger.debug(f"Message body: {frame.body}")

    def on_error(self, frame):
        """Log an error frame reported by ActiveMQ."""
        self.logger.error(f"ActiveMQ error: {frame.body}")

    def on_disconnected(self):
        """
        Handle disconnection from ActiveMQ.

        Starts a daemon thread that waits ``reconnect_delay`` seconds and then
        asks the connection manager to reconnect, unless it already reports
        being connected.
        """
        self.logger.warning("Disconnected from ActiveMQ - scheduling reconnection")

        # Schedule reconnection in a separate thread to avoid blocking
        def delayed_reconnect():
            time.sleep(self.reconnect_delay)
            if not self.connection_manager.is_connected():
                self.connection_manager.reconnect()

        thread = threading.Thread(target=delayed_reconnect, daemon=True)
        thread.start()

    def _is_heartbeat_message(self, data):
        """
        Return True if ``data`` is a JSON object carrying both
        ``agent_name`` and ``status`` keys.

        Non-dict payloads (strings, numbers, lists, ``None``) are never
        heartbeats; checking the type first avoids substring matches on
        strings and ``TypeError`` on scalars.
        """
        return isinstance(data, dict) and 'agent_name' in data and 'status' in data

    def _is_workflow_message(self, data):
        """
        Return True if ``data`` is a JSON object carrying a ``msg_type`` key.

        Non-dict payloads are never workflow messages.
        """
        return isinstance(data, dict) and 'msg_type' in data

    @staticmethod
    def _resolve_destination(frame, default='epictopic'):
        """
        Return the destination a frame was delivered on.

        A ``destination`` attribute on the frame is honoured first (kept for
        backward compatibility); otherwise the ``destination`` entry of
        ``frame.headers`` is used, which is where stomp.py frames carry it.
        Falls back to ``default`` when neither yields a value.
        """
        destination = getattr(frame, 'destination', None)
        if not destination:
            headers = getattr(frame, 'headers', None)
            if isinstance(headers, dict):
                destination = headers.get('destination')
        return destination or default

    def _process_heartbeat(self, data):
        """
        Process agent heartbeat messages to update SystemAgent records.

        Creates the ``SystemAgent`` keyed by ``instance_name`` if it does not
        exist; otherwise refreshes its status (when provided), heartbeat
        timestamp and workflow-enabled flag. Database errors are logged, not
        raised.
        """
        agent_name = data.get('agent_name')
        status = data.get('status')

        if not agent_name:
            self.logger.warning(f"Heartbeat message missing agent_name: {data}")
            return

        try:
            # One timestamp for this heartbeat, whether the row is new or existing.
            now = timezone.now()
            agent, created = SystemAgent.objects.get_or_create(
                instance_name=agent_name,
                defaults={
                    'agent_type': 'Unknown',
                    'status': status if status else 'UNKNOWN',
                    'last_heartbeat': now,
                    'workflow_enabled': True  # All agents are workflow-enabled by default
                }
            )

            if not created:
                if status:
                    agent.status = status
                agent.last_heartbeat = now
                # Ensure existing agents are marked as workflow-enabled
                if not agent.workflow_enabled:
                    agent.workflow_enabled = True
                agent.save()

            self.logger.debug(f"Updated SystemAgent {agent_name} with status {agent.status}")

        except Exception as e:
            self.logger.error(f"Error processing heartbeat for agent {agent_name}: {e}")

    def _process_workflow_message(self, data, frame):
        """
        Process workflow messages and store them in WorkflowMessage model.

        After persisting the record, an enriched copy of the payload is
        published to the Channels group named by ``settings.SSE_CHANNEL_GROUP``
        (default ``workflow_events``) and also handed to the in-process
        ``SSEMessageBroadcaster``. Both relays are best-effort: their failures
        are logged at debug level only. Any other error is logged and
        swallowed.

        Args:
            data: Decoded JSON object of the message.
            frame: The received frame; used to determine the queue name.
        """
        try:
            # Local import: ``os`` is only needed for the originator metadata.
            import os

            msg_type = data.get('msg_type')
            run_id = data.get('run_id')
            execution_id = data.get('execution_id')
            filename = data.get('filename')

            # Extract sender, namespace, and recipient from message
            sender_agent = data.get('sender')
            namespace = data.get('namespace')
            recipient_agent = data.get('recipient')

            # Single timestamp so the stored record, its metadata and the
            # broadcast payload all agree.
            now = timezone.now()
            queue_name = self._resolve_destination(frame)

            # Create metadata with originator tracking
            metadata = {
                'created_by': f"{os.getcwd()}:{os.getpid()}",  # Include PID for better instance tracking
                'process_pid': os.getpid(),
                'processed_at': now.isoformat(),
                'django_instance': os.environ.get('DJANGO_SETTINGS_MODULE', 'unknown')
            }

            # Create WorkflowMessage record
            WorkflowMessage.objects.create(
                workflow=None,
                message_type=msg_type,
                sender_agent=sender_agent,
                recipient_agent=recipient_agent,
                namespace=namespace,
                execution_id=execution_id,
                run_id=run_id,
                message_content=data,
                message_metadata=metadata,
                sent_at=now,
                queue_name=queue_name,
                is_successful=True  # Assume successful since we received it
            )

            # Enrich message for downstream consumers (SSE filters rely on these)
            enriched = dict(data)
            enriched.setdefault('sender_agent', sender_agent)
            enriched.setdefault('recipient_agent', recipient_agent)
            enriched.setdefault('namespace', namespace)
            enriched.setdefault('queue_name', queue_name)
            enriched.setdefault('sent_at', now.isoformat())

            # Publish to Channels group for cross-process fanout (preferred)
            try:
                channel_layer = get_channel_layer()
                if channel_layer is not None:
                    group = getattr(settings, 'SSE_CHANNEL_GROUP', 'workflow_events')
                    async_to_sync(channel_layer.group_send)(
                        group,
                        {"type": "broadcast", "payload": enriched}
                    )
            except Exception as e:
                self.logger.debug(f"Channels group_send failed or unavailable: {e}")

            # Also attempt in-process broadcast (useful in single-process dev)
            try:
                from .sse_views import SSEMessageBroadcaster
                broadcaster = SSEMessageBroadcaster()
                broadcaster.broadcast_message(enriched)
            except Exception as e:
                self.logger.debug(f"In-process SSE broadcast skipped/failed: {e}")

            self.logger.info(f"Stored and relayed workflow message: {msg_type} for run {run_id}, filename {filename}")

        except Exception as e:
            self.logger.error(f"Error processing workflow message: {e}")
            self.logger.debug(f"Message data: {data}")
0180