File indexing completed on 2026-04-27 07:41:42
0001 """
0002 Test for SSE stream functionality using Django's test infrastructure.
0003 """
0004
0005 import json
0006 import time
0007 import threading
0008 from django.test import TestCase, TransactionTestCase
0009 from django.contrib.auth.models import User
0010 from rest_framework.authtoken.models import Token
0011 from channels.layers import get_channel_layer
0012 from asgiref.sync import async_to_sync
0013 from unittest.mock import patch, MagicMock
0014 from monitor_app.sse_views import SSEMessageBroadcaster
0015
0016
class TestSSEBroadcaster(TransactionTestCase):
    """Test SSE message broadcasting functionality."""

    def setUp(self):
        """Create a user, an auth token, and the broadcaster under test."""
        self.user = User.objects.create_user('testuser', password='testpass')
        self.token = Token.objects.create(user=self.user)

        self.broadcaster = SSEMessageBroadcaster()

    def test_message_broadcast_to_clients(self):
        """Test that messages are broadcast to connected clients."""
        client_id = "test-client-1"
        mock_request = MagicMock()
        mock_request.META = {'REMOTE_ADDR': '127.0.0.1'}

        # Filters name the same message type and agent as the message
        # broadcast below.
        filters = {
            'msg_types': ['test_event'],
            'agents': ['test-agent'],
        }

        client_queue = self.broadcaster.add_client(client_id, mock_request, filters)

        # Registration must be visible in the broadcaster's client table.
        self.assertIn(client_id, self.broadcaster.client_queues)

        test_message = {
            'msg_type': 'test_event',
            'processed_by': 'test-agent',
            'run_id': 'test-run-001',
            'data': 'test payload',
        }

        self.broadcaster.broadcast_message(test_message)

        # The message should land on this client's queue.
        received = client_queue.get(timeout=1)
        self.assertEqual(received['msg_type'], 'test_event')
        self.assertEqual(received['processed_by'], 'test-agent')

        # Removal is asserted here, so it stays inline rather than in a
        # cleanup hook.
        self.broadcaster.remove_client(client_id)
        self.assertNotIn(client_id, self.broadcaster.client_queues)

    def test_message_filtering(self):
        """Test that messages are filtered correctly."""
        client_id = "test-client-2"
        mock_request = MagicMock()
        mock_request.META = {'REMOTE_ADDR': '127.0.0.1'}

        # This client only asks for 'data_ready' messages.
        filters = {'msg_types': ['data_ready']}
        client_queue = self.broadcaster.add_client(client_id, mock_request, filters)
        # Unregister the client even when an assertion below fails.
        self.addCleanup(self.broadcaster.remove_client, client_id)

        # A message of a different type must not reach the queue.
        self.broadcaster.broadcast_message({
            'msg_type': 'stf_gen',
            'processed_by': 'daq-simulator',
        })

        self.assertTrue(client_queue.empty())

        # A message of the requested type must be delivered.
        self.broadcaster.broadcast_message({
            'msg_type': 'data_ready',
            'processed_by': 'data-agent',
        })

        received = client_queue.get(timeout=1)
        self.assertEqual(received['msg_type'], 'data_ready')

    def test_channel_layer_integration(self):
        """Test integration with Django Channels if available.

        The test is skipped when no channel layer is configured, when the
        layer is an in-memory one, or when no message reaches the client
        queue within the timeout. If a message does arrive, its type is
        asserted and a mismatch fails the test.
        """
        # Imported locally because only this test needs it.
        # NOTE(review): assumes client_queue raises queue.Empty on timeout
        # (i.e. it is a queue.Queue-compatible object) - confirm against
        # SSEMessageBroadcaster.add_client.
        from queue import Empty

        channel_layer = get_channel_layer()
        if channel_layer is None:
            self.skipTest("No channel layer configured")

        if 'InMemory' in channel_layer.__class__.__name__:
            self.skipTest("InMemoryChannelLayer doesn't support cross-process communication")

        client_id = "test-client-3"
        mock_request = MagicMock()
        mock_request.META = {'REMOTE_ADDR': '127.0.0.1'}

        # No filters are passed for this client.
        client_queue = self.broadcaster.add_client(client_id, mock_request)
        # skipTest() and failed assertions both raise, so unregister through
        # a cleanup hook instead of a trailing statement.
        self.addCleanup(self.broadcaster.remove_client, client_id)

        test_payload = {
            'msg_type': 'channel_test',
            'timestamp': time.time(),
        }

        async_to_sync(channel_layer.group_send)(
            'workflow_events',
            {'type': 'broadcast', 'payload': test_payload},
        )

        # Presumably gives the channel layer time to relay the message
        # before the queue is read.
        time.sleep(0.5)

        # Only the queue read is guarded: a timeout is reported as a skip,
        # while a wrong message still fails the assertion below.
        try:
            received = client_queue.get(timeout=2)
        except Empty:
            self.skipTest("No message received from the channel layer within the timeout")

        self.assertEqual(received['msg_type'], 'channel_test')
0145
0146
class TestSSEEndpoint(TestCase):
    """Exercise the SSE HTTP endpoints through Django's test client."""

    def setUp(self):
        """Create the user and the auth token used by the requests below."""
        self.user = User.objects.create_user('testuser', password='testpass')
        self.token = Token.objects.create(user=self.user)

    def test_sse_endpoint_requires_auth(self):
        """The stream endpoint rejects anonymous requests and accepts a token."""
        stream_url = '/api/messages/stream/'

        # Without credentials the request must be refused.
        anonymous_response = self.client.get(stream_url)
        self.assertIn(anonymous_response.status_code, [401, 403])

        # With the token header the endpoint answers with an event stream.
        auth_header = f'Token {self.token.key}'
        authed_response = self.client.get(
            stream_url,
            HTTP_AUTHORIZATION=auth_header,
        )
        self.assertEqual(authed_response.status_code, 200)
        self.assertEqual(authed_response['Content-Type'], 'text/event-stream')

    def test_sse_status_endpoint(self):
        """The status endpoint returns JSON with the connected-client fields."""
        auth_header = f'Token {self.token.key}'
        status_response = self.client.get(
            '/api/messages/stream/status/',
            HTTP_AUTHORIZATION=auth_header,
        )

        self.assertEqual(status_response.status_code, 200)

        # The body must be JSON containing both client fields.
        payload = json.loads(status_response.content)
        self.assertIn('connected_clients', payload)
        self.assertIn('client_ids', payload)