File indexing completed on 2026-04-25 08:29:10
0001 from django.test import TestCase
0002 from django.urls import reverse
0003 from django.contrib.auth.models import User
0004 from django.utils import timezone
0005 from rest_framework.test import APITestCase, APIClient
0006 from rest_framework import status
0007 from .models import SystemAgent, AppLog, Run, StfFile, Subscriber
0008 from .serializers import AppLogSerializer
0009 from django.core.management import call_command
0010 from io import StringIO
0011 import logging
0012 import uuid
0013 import re
0014
class SystemAgentAPITests(APITestCase):
    """Exercise the SystemAgent REST routes (list, create, patch, delete) as an authenticated user."""

    # Primary key assumed not to belong to any agent created by these tests.
    MISSING_PK = 999

    def setUp(self):
        """Create a uniquely named user, authenticate the client and seed one agent."""
        self.user = User.objects.create_user(
            username=f"testuser_{uuid.uuid4()}",
            password='testpassword',
        )
        self.client.force_authenticate(user=self.user)
        self.agent = SystemAgent.objects.create(
            instance_name='test_agent',
            agent_type='test',
            status='OK',
        )

    def _detail_url(self, pk):
        """Return the ``systemagent-detail`` URL for primary key *pk*."""
        return reverse('systemagent-detail', kwargs={'pk': pk})

    def test_list_agents(self):
        """GET on the list route is expected to answer 200."""
        listing = self.client.get(reverse('systemagent-list'))
        self.assertEqual(listing.status_code, status.HTTP_200_OK)

    def test_create_agent(self):
        """POST with all three fields is expected to answer 201."""
        payload = {'instance_name': 'new_agent', 'agent_type': 'test', 'status': 'OK'}
        created = self.client.post(reverse('systemagent-list'), payload, format='json')
        self.assertEqual(created.status_code, status.HTTP_201_CREATED)

    def test_partial_update_agent(self):
        """PATCH of ``status`` is expected to answer 200 and persist the new value."""
        patched = self.client.patch(
            self._detail_url(self.agent.pk),
            {'status': 'ERROR'},
            format='json',
        )
        self.assertEqual(patched.status_code, status.HTTP_200_OK)
        # Reload from the database so the assertion checks the stored value.
        self.agent.refresh_from_db()
        self.assertEqual(self.agent.status, 'ERROR')

    def test_delete_agent(self):
        """DELETE is expected to answer 204 and remove the row."""
        removed = self.client.delete(self._detail_url(self.agent.pk))
        self.assertEqual(removed.status_code, status.HTTP_204_NO_CONTENT)
        still_there = SystemAgent.objects.filter(pk=self.agent.pk).exists()
        self.assertFalse(still_there)

    def test_create_agent_bad_data(self):
        """POST carrying only ``instance_name`` is expected to answer 400."""
        incomplete = {'instance_name': 'new_agent'}
        rejected = self.client.post(reverse('systemagent-list'), incomplete, format='json')
        self.assertEqual(rejected.status_code, status.HTTP_400_BAD_REQUEST)

    def test_update_non_existent_agent(self):
        """PATCH against pk 999 is expected to answer 404."""
        missing = self.client.patch(
            self._detail_url(self.MISSING_PK),
            {'status': 'ERROR'},
            format='json',
        )
        self.assertEqual(missing.status_code, status.HTTP_404_NOT_FOUND)

    def test_delete_non_existent_agent(self):
        """DELETE against pk 999 is expected to answer 404."""
        missing = self.client.delete(self._detail_url(self.MISSING_PK))
        self.assertEqual(missing.status_code, status.HTTP_404_NOT_FOUND)
0063
class AppLogAPITests(APITestCase):
    """Creation and validation checks for POSTs to the ``applog-list`` endpoint."""

    def setUp(self):
        """Authenticate a fresh user and prepare a complete log payload."""
        self.user = User.objects.create_user(
            username=f"testuser_{uuid.uuid4()}",
            password='testpassword',
        )
        self.client.force_authenticate(user=self.user)
        self.url = reverse('applog-list')
        self.log_data = dict(
            app_name='test_app',
            instance_name='test_instance',
            timestamp=timezone.now().isoformat(),
            level=logging.INFO,
            levelname='INFO',
            message='This is a test log message.',
            module='test_module',
            funcname='test_func',
            lineno=123,
            process=456,
            thread=789,
        )

    def test_create_log(self):
        """
        Posting the full payload should create exactly one AppLog row.
        """
        response = self.client.post(self.url, self.log_data, format='json')
        # response.data is passed as the failure message to show validation errors.
        self.assertEqual(response.status_code, status.HTTP_201_CREATED, response.data)
        self.assertEqual(AppLog.objects.count(), 1)
        stored = AppLog.objects.get()
        self.assertEqual(stored.app_name, 'test_app')
        self.assertEqual(stored.message, self.log_data['message'])

    def test_create_log_invalid_level(self):
        """
        A payload whose ``level`` is 999 should be answered with 400.
        """
        bad_level = {**self.log_data, 'level': 999}
        response = self.client.post(self.url, bad_level, format='json')
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    def test_create_log_missing_field(self):
        """
        A payload without ``message`` should be answered with 400.
        """
        without_message = {
            key: value for key, value in self.log_data.items() if key != 'message'
        }
        response = self.client.post(self.url, without_message, format='json')
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
0112
0113
class AppLogUITests(TestCase):
    """Smoke tests for the HTML log list and log summary pages."""

    # (app_name, instance_name, level, levelname, message) for each seeded row.
    FIXTURE_LOGS = (
        ('app1', 'inst1', logging.INFO, 'INFO', 'info message 1'),
        ('app1', 'inst1', logging.WARNING, 'WARNING', 'warning message 1'),
        ('app1', 'inst2', logging.ERROR, 'ERROR', 'error message 1'),
        ('app2', 'inst1', logging.INFO, 'INFO', 'info message 2'),
    )

    def setUp(self):
        """Log in a uniquely named user and seed four AppLog rows sharing one timestamp."""
        unique_username = f"ui_user_{uuid.uuid4()}"
        self.user = User.objects.create_user(username=unique_username, password='password')
        self.client.login(username=unique_username, password='password')
        now = timezone.now()
        for app_name, instance_name, level, levelname, message in self.FIXTURE_LOGS:
            AppLog.objects.create(
                app_name=app_name,
                instance_name=instance_name,
                level=level,
                message=message,
                timestamp=now,
                levelname=levelname,
                module='m',
                funcname='f',
                lineno=1,
                process=1,
                thread=1,
            )

    @staticmethod
    def _table_rows(html):
        """Return every ``<tr>...</tr>`` span found in *html* (attribute-less ``<tr>`` only)."""
        return re.findall(r'<tr>.*?</tr>', html, re.DOTALL)

    def test_log_list_view(self):
        """The log list page renders as HTML with more than one table row."""
        response = self.client.get(reverse('monitor_app:log_list'))
        self.assertEqual(response.status_code, 200)
        html = response.content.decode()

        self.assertIn('<html', html.lower())

        # assertGreater reports the actual row count when the check fails.
        self.assertGreater(len(self._table_rows(html)), 1)

    def test_log_list_view_filtered(self):
        """The log list page still renders rows when app/instance query filters are given."""
        response = self.client.get(
            reverse('monitor_app:log_list') + '?app_name=app1&instance_name=inst1'
        )
        self.assertEqual(response.status_code, 200)
        html = response.content.decode()
        self.assertIn('<html', html.lower())
        self.assertGreater(len(self._table_rows(html)), 1)

    def test_log_summary_view(self):
        """The log summary page renders as HTML containing a table or a div."""
        response = self.client.get(reverse('monitor_app:log_summary'))
        self.assertEqual(response.status_code, 200)
        html = response.content.decode()
        self.assertIn('<html', html.lower())

        self.assertRegex(html, r'<table|<div')
0150
class MonitorAppUITests(TestCase):
    """Access-control and CRUD checks for the ``monitor_app`` HTML views."""

    # Primary key assumed not to belong to any agent created by these tests.
    MISSING_PK = 999

    def setUp(self):
        """Create one regular user, one staff user and one agent."""
        self.user = User.objects.create_user(username='ui_user', password='password')
        self.staff_user = User.objects.create_user(
            username='staff_user',
            password='password',
            is_staff=True,
        )
        self.agent = SystemAgent.objects.create(
            instance_name='ui_agent',
            agent_type='test',
            status='OK',
        )

    def _login_as(self, username):
        """Log the test client in as *username* using the shared fixture password."""
        self.client.login(username=username, password='password')

    def test_index_view_unauthenticated(self):
        """An anonymous request to the index is expected to be redirected (302)."""
        anonymous = self.client.get(reverse('monitor_app:index'))
        self.assertEqual(anonymous.status_code, 302)

    def test_index_view_authenticated(self):
        """A logged-in regular user is expected to get the index page (200)."""
        self._login_as('ui_user')
        page = self.client.get(reverse('monitor_app:index'))
        self.assertEqual(page.status_code, 200)

    def test_create_agent_view_as_staff(self):
        """A staff POST to the create view is expected to redirect and store the agent."""
        self._login_as('staff_user')
        form = {'instance_name': 'new_ui_agent', 'agent_type': 'test', 'status': 'OK'}
        outcome = self.client.post(reverse('monitor_app:system_agent_create'), form)
        self.assertEqual(outcome.status_code, 302)
        created = SystemAgent.objects.filter(instance_name='new_ui_agent').exists()
        self.assertTrue(created)

    def test_create_agent_view_as_non_staff(self):
        """A non-staff POST to the create view is expected to be refused with 403."""
        self._login_as('ui_user')
        form = {'instance_name': 'new_ui_agent', 'agent_type': 'test', 'status': 'OK'}
        outcome = self.client.post(reverse('monitor_app:system_agent_create'), form)
        self.assertEqual(outcome.status_code, 403)

    def test_delete_agent_view(self):
        """A staff POST to the delete view is expected to redirect and remove the agent."""
        self._login_as('staff_user')
        target = reverse('monitor_app:system_agent_delete', kwargs={'pk': self.agent.pk})
        outcome = self.client.post(target)
        self.assertEqual(outcome.status_code, 302)
        still_there = SystemAgent.objects.filter(pk=self.agent.pk).exists()
        self.assertFalse(still_there)

    def test_update_agent_view_get(self):
        """A staff GET of the update view is expected to show the agent's instance name."""
        self._login_as('staff_user')
        target = reverse('monitor_app:system_agent_update', kwargs={'pk': self.agent.pk})
        page = self.client.get(target)
        self.assertEqual(page.status_code, 200)
        self.assertContains(page, self.agent.instance_name)

    def test_update_non_existent_agent_view(self):
        """A staff GET of the update view for pk 999 is expected to answer 404."""
        self._login_as('staff_user')
        target = reverse('monitor_app:system_agent_update', kwargs={'pk': self.MISSING_PK})
        page = self.client.get(target)
        self.assertEqual(page.status_code, 404)

    def test_delete_non_existent_agent_view(self):
        """A staff GET of the delete view for pk 999 is expected to answer 404."""
        self._login_as('staff_user')
        target = reverse('monitor_app:system_agent_delete', kwargs={'pk': self.MISSING_PK})
        page = self.client.get(target)
        self.assertEqual(page.status_code, 404)
0206
class LogSummaryAPITests(TestCase):
    """Checks the JSON returned by ``/api/logs/summary/`` for a small set of seeded logs."""

    # (app_name, instance_name, level, levelname, message); lineno is the 1-based position.
    FIXTURE_LOGS = (
        ('app1', 'inst1', logging.ERROR, 'ERROR', 'Error 1'),
        ('app1', 'inst1', logging.INFO, 'INFO', 'Info 1'),
        ('app1', 'inst2', logging.ERROR, 'ERROR', 'Error 2'),
        ('app2', 'inst3', logging.CRITICAL, 'CRITICAL', 'Critical 1'),
    )

    def setUp(self):
        """Log in a uniquely named user and seed four AppLog rows sharing one timestamp."""
        self.username = f"testuser_{uuid.uuid4()}"
        self.user = User.objects.create_user(username=self.username, password="testpass")
        self.client.login(username=self.username, password="testpass")
        now = timezone.now()

        for lineno, (app_name, instance_name, level, levelname, message) in enumerate(
            self.FIXTURE_LOGS, start=1
        ):
            AppLog.objects.create(
                app_name=app_name,
                instance_name=instance_name,
                timestamp=now,
                level=level,
                levelname=levelname,
                message=message,
                module='mod',
                funcname='f',
                lineno=lineno,
                process=1,
                thread=1,
            )

    def tearDown(self):
        """Delete the user created in ``setUp``."""
        User.objects.filter(username=self.username).delete()

    def test_summary_api(self):
        """The summary is keyed app -> instance and carries per-level error counts."""
        url = '/api/logs/summary/'
        response = self.client.get(url)
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        data = response.json()
        self.assertIn('app1', data)
        self.assertIn('app2', data)
        self.assertIn('inst1', data['app1'])
        self.assertIn('inst2', data['app1'])
        self.assertIn('inst3', data['app2'])

        # .get(..., 0) turns a missing level key into a count mismatch, not a KeyError.
        self.assertEqual(data['app1']['inst1']['error_counts'].get('ERROR', 0), 1)
        self.assertEqual(data['app1']['inst2']['error_counts'].get('ERROR', 0), 1)
        self.assertEqual(data['app2']['inst3']['error_counts'].get('CRITICAL', 0), 1)

        # assertIsInstance reports the actual type on failure, unlike assertTrue(isinstance()).
        self.assertIsInstance(data['app1']['inst1']['recent_errors'], list)
0240
0241
class RunAPITests(APITestCase):
    """CRUD, uniqueness and authentication checks for the ``run`` REST endpoints."""

    def setUp(self):
        """Authenticate a fresh user and create run number 12345."""
        self.user = User.objects.create_user(
            username=f"testuser_{uuid.uuid4()}",
            password='testpassword',
        )
        self.client.force_authenticate(user=self.user)
        self.run = Run.objects.create(
            run_number=12345,
            start_time=timezone.now(),
            run_conditions={'beam_energy': 10.0, 'detector_config': 'standard'},
        )

    def _detail_url(self):
        """Return the ``run-detail`` URL of the run created in ``setUp``."""
        return reverse('run-detail', kwargs={'pk': self.run.run_id})

    def test_list_runs(self):
        """GET on the list route is expected to answer 200."""
        listing = self.client.get(reverse('run-list'))
        self.assertEqual(listing.status_code, status.HTTP_200_OK)

    def test_create_run(self):
        """POST of a new run number is expected to answer 201 and leave two runs stored."""
        list_url = reverse('run-list')
        payload = dict(
            run_number=12346,
            start_time=timezone.now().isoformat(),
            run_conditions={'beam_energy': 12.0, 'detector_config': 'high_rate'},
        )
        created = self.client.post(list_url, payload, format='json')
        self.assertEqual(created.status_code, status.HTTP_201_CREATED)
        self.assertEqual(Run.objects.count(), 2)

    def test_get_run(self):
        """GET on the detail route is expected to return the seeded run number."""
        fetched = self.client.get(self._detail_url())
        self.assertEqual(fetched.status_code, status.HTTP_200_OK)
        self.assertEqual(fetched.data['run_number'], 12345)

    def test_update_run(self):
        """PATCH of ``end_time`` is expected to answer 200 and persist a value."""
        detail_url = self._detail_url()
        changes = {'end_time': timezone.now().isoformat()}
        patched = self.client.patch(detail_url, changes, format='json')
        self.assertEqual(patched.status_code, status.HTTP_200_OK)
        # Reload from the database so the assertion checks the stored value.
        self.run.refresh_from_db()
        self.assertIsNotNone(self.run.end_time)

    def test_delete_run(self):
        """DELETE is expected to answer 204 and remove the row."""
        removed = self.client.delete(self._detail_url())
        self.assertEqual(removed.status_code, status.HTTP_204_NO_CONTENT)
        still_there = Run.objects.filter(pk=self.run.run_id).exists()
        self.assertFalse(still_there)

    def test_create_run_duplicate_number(self):
        """POST reusing run number 12345 is expected to answer 400."""
        list_url = reverse('run-list')
        duplicate = dict(
            run_number=12345,
            start_time=timezone.now().isoformat(),
        )
        rejected = self.client.post(list_url, duplicate, format='json')
        self.assertEqual(rejected.status_code, status.HTTP_400_BAD_REQUEST)

    def test_unauthenticated_access_denied(self):
        """With authentication cleared, the list route is expected to answer 401 or 403."""
        self.client.force_authenticate(user=None)
        denied = self.client.get(reverse('run-list'))
        acceptable = [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]
        self.assertIn(denied.status_code, acceptable)
0303
0304
class StfFileAPITests(APITestCase):
    """CRUD, validation and authentication checks for the ``stffile`` REST endpoints."""

    def setUp(self):
        """Authenticate a fresh user and create one run with one STF file attached."""
        self.user = User.objects.create_user(
            username=f"testuser_{uuid.uuid4()}",
            password='testpassword',
        )
        self.client.force_authenticate(user=self.user)
        self.run = Run.objects.create(run_number=12345, start_time=timezone.now())
        self.stf_file = StfFile.objects.create(
            run=self.run,
            machine_state="physics",
            file_url="https://example.com/files/test.stf",
            file_size_bytes=1024000,
            checksum="abc123def456",
        )

    def _detail_url(self):
        """Return the ``stffile-detail`` URL of the file created in ``setUp``."""
        return reverse('stffile-detail', kwargs={'pk': self.stf_file.file_id})

    def test_list_stf_files(self):
        """GET on the list route is expected to answer 200."""
        listing = self.client.get(reverse('stffile-list'))
        self.assertEqual(listing.status_code, status.HTTP_200_OK)

    def test_create_stf_file(self):
        """POST of a second file is expected to answer 201 and leave two files stored."""
        list_url = reverse('stffile-list')
        payload = dict(
            run=self.run.run_id,
            machine_state='cosmics',
            file_url='https://example.com/files/test2.stf',
            file_size_bytes=2048000,
            checksum='def789abc123',
            status='registered',
        )
        created = self.client.post(list_url, payload, format='json')
        self.assertEqual(created.status_code, status.HTTP_201_CREATED)
        self.assertEqual(StfFile.objects.count(), 2)

    def test_get_stf_file(self):
        """GET on the detail route is expected to return the seeded file URL."""
        fetched = self.client.get(self._detail_url())
        self.assertEqual(fetched.status_code, status.HTTP_200_OK)
        self.assertEqual(fetched.data['file_url'], "https://example.com/files/test.stf")

    def test_update_stf_file_status(self):
        """PATCH of ``status`` to 'processing' is expected to answer 200 and persist."""
        patched = self.client.patch(
            self._detail_url(),
            {'status': 'processing'},
            format='json',
        )
        self.assertEqual(patched.status_code, status.HTTP_200_OK)
        # Reload from the database so the assertion checks the stored value.
        self.stf_file.refresh_from_db()
        self.assertEqual(self.stf_file.status, 'processing')

    def test_delete_stf_file(self):
        """DELETE is expected to answer 204 and remove the row."""
        removed = self.client.delete(self._detail_url())
        self.assertEqual(removed.status_code, status.HTTP_204_NO_CONTENT)
        still_there = StfFile.objects.filter(pk=self.stf_file.file_id).exists()
        self.assertFalse(still_there)

    def test_create_stf_file_duplicate_url(self):
        """POST reusing the seeded ``file_url`` is expected to answer 400."""
        list_url = reverse('stffile-list')
        duplicate = dict(
            run=self.run.run_id,
            file_url='https://example.com/files/test.stf',
            file_size_bytes=1000,
            checksum='duplicate',
        )
        rejected = self.client.post(list_url, duplicate, format='json')
        self.assertEqual(rejected.status_code, status.HTTP_400_BAD_REQUEST)

    def test_invalid_status_value(self):
        """PATCH of ``status`` to 'invalid_status' is expected to answer 400."""
        rejected = self.client.patch(
            self._detail_url(),
            {'status': 'invalid_status'},
            format='json',
        )
        self.assertEqual(rejected.status_code, status.HTTP_400_BAD_REQUEST)

    def test_unauthenticated_access_denied(self):
        """With authentication cleared, the list route is expected to answer 401 or 403."""
        self.client.force_authenticate(user=None)
        denied = self.client.get(reverse('stffile-list'))
        acceptable = [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN]
        self.assertIn(denied.status_code, acceptable)
0383
0384
class SubscriberAPITests(APITestCase):
    """CRUD, uniqueness and authentication checks for the ``subscriber`` REST endpoints."""

    def setUp(self):
        """Authenticate a fresh user and create one active subscriber."""
        unique_username = f"testuser_{uuid.uuid4()}"
        self.user = User.objects.create_user(username=unique_username, password='testpassword')
        self.client.force_authenticate(user=self.user)
        self.subscriber = Subscriber.objects.create(
            subscriber_name="test_subscriber",
            fraction=0.5,
            description="Test subscriber for unit tests",
            is_active=True,
        )

    def _detail_url(self):
        """Return the ``subscriber-detail`` URL of the subscriber created in ``setUp``."""
        return reverse('subscriber-detail', kwargs={'pk': self.subscriber.subscriber_id})

    def test_list_subscribers(self):
        """GET on the list route is expected to answer 200."""
        url = reverse('subscriber-list')
        response = self.client.get(url)
        self.assertEqual(response.status_code, status.HTTP_200_OK)

    def test_create_subscriber(self):
        """POST of a new name is expected to answer 201 and leave two subscribers stored."""
        url = reverse('subscriber-list')
        data = {
            'subscriber_name': 'new_subscriber',
            'fraction': 0.8,
            'description': 'New test subscriber',
            'is_active': True,
        }
        response = self.client.post(url, data, format='json')
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        self.assertEqual(Subscriber.objects.count(), 2)

    def test_get_subscriber(self):
        """GET on the detail route is expected to return the seeded subscriber name."""
        response = self.client.get(self._detail_url())
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.data['subscriber_name'], "test_subscriber")

    def test_update_subscriber_status(self):
        """PATCH of ``is_active`` to False is expected to answer 200 and persist."""
        data = {'is_active': False}
        response = self.client.patch(self._detail_url(), data, format='json')
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.subscriber.refresh_from_db()
        self.assertFalse(self.subscriber.is_active)

    def test_update_subscriber_fraction(self):
        """PATCH of ``fraction`` to 0.3 is expected to answer 200 and persist."""
        data = {'fraction': 0.3}
        response = self.client.patch(self._detail_url(), data, format='json')
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.subscriber.refresh_from_db()
        self.assertEqual(self.subscriber.fraction, 0.3)

    def test_delete_subscriber(self):
        """DELETE is expected to answer 204 and remove the row."""
        response = self.client.delete(self._detail_url())
        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
        self.assertFalse(Subscriber.objects.filter(pk=self.subscriber.subscriber_id).exists())

    def test_create_subscriber_duplicate_name(self):
        """POST reusing the seeded subscriber name is expected to answer 400."""
        url = reverse('subscriber-list')
        data = {
            'subscriber_name': 'test_subscriber',
            'fraction': 0.1,
            'description': 'Duplicate name test',
        }
        response = self.client.post(url, data, format='json')
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)

    def test_invalid_fraction_range(self):
        """POST with ``fraction`` 1.5 must end in a well-defined outcome."""
        url = reverse('subscriber-list')
        data = {
            'subscriber_name': 'invalid_fraction_subscriber',
            'fraction': 1.5,
            'description': 'Invalid fraction test',
        }
        response = self.client.post(url, data, format='json')

        # Previously this test asserted nothing and could never fail.
        # NOTE(review): whether an out-of-range fraction is rejected depends on
        # validation not visible from this file. Tighten this to
        # HTTP_400_BAD_REQUEST once the range check is confirmed to exist.
        self.assertIn(
            response.status_code,
            [status.HTTP_201_CREATED, status.HTTP_400_BAD_REQUEST],
        )

    def test_unauthenticated_access_denied(self):
        """With authentication cleared, the list route is expected to answer 401 or 403."""
        self.client.force_authenticate(user=None)
        url = reverse('subscriber-list')
        response = self.client.get(url)
        self.assertIn(response.status_code, [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN])
0468
0469
class RestLoggingIntegrationTests(APITestCase):
    """
    Test the REST logging functionality end-to-end.

    Every test posts log records to the ``applog-list`` endpoint with the
    DRF test client and then checks what was stored in ``AppLog`` (or what
    the endpoint returns on GET).
    """

    def setUp(self):
        """Set up test client and authentication, starting from an empty AppLog table."""
        unique_username = f"testuser_{uuid.uuid4()}"
        self.user = User.objects.create_user(username=unique_username, password='testpassword')
        self.client.force_authenticate(user=self.user)
        self.logs_url = reverse('applog-list')
        AppLog.objects.all().delete()

    def _post_log(self, **fields):
        """POST one log record and return the response.

        The payload starts from the defaults below; any keyword argument
        overrides or extends it. Callers supply at least ``app_name`` and
        ``message``, which have no default.
        """
        payload = {
            'instance_name': 'test_instance',
            'timestamp': timezone.now().isoformat(),
            'level': logging.INFO,
            'levelname': 'INFO',
            'module': 'test_module',
            'funcname': 'test_function',
            'lineno': 1,
            'process': 1234,
            'thread': 5678,
        }
        payload.update(fields)
        return self.client.post(self.logs_url, data=payload, format='json')

    def test_direct_rest_log_creation(self):
        """Test creating logs directly via REST API."""
        response = self._post_log(
            app_name='test_app',
            message='Test log message via REST API',
            lineno=42,
            extra_data={'test': 'data'},
        )

        self.assertEqual(response.status_code, status.HTTP_201_CREATED)

        self.assertEqual(AppLog.objects.count(), 1)
        log = AppLog.objects.first()
        self.assertEqual(log.app_name, 'test_app')
        self.assertEqual(log.message, 'Test log message via REST API')
        self.assertEqual(log.level, logging.INFO)

    def test_multiple_log_levels(self):
        """Test logging different severity levels."""
        test_logs = [
            ('DEBUG', logging.DEBUG, 'Debug message'),
            ('INFO', logging.INFO, 'Info message'),
            ('WARNING', logging.WARNING, 'Warning message'),
            ('ERROR', logging.ERROR, 'Error message'),
            ('CRITICAL', logging.CRITICAL, 'Critical message'),
        ]

        for levelname, level_int, message in test_logs:
            # subTest names the failing level instead of stopping at the first failure.
            with self.subTest(levelname=levelname):
                response = self._post_log(
                    app_name='multi_level_test',
                    level=level_int,
                    levelname=levelname,
                    message=message,
                )
                self.assertEqual(response.status_code, status.HTTP_201_CREATED)

        self.assertEqual(AppLog.objects.count(), len(test_logs))

        levels_in_db = set(AppLog.objects.values_list('level', flat=True))
        expected_levels = {level_int for _, level_int, _ in test_logs}
        self.assertEqual(levels_in_db, expected_levels)

    def test_bulk_logging(self):
        """Test creating many log entries (simulating real usage)."""
        num_logs = 25
        num_instances = 5

        for i in range(num_logs):
            with self.subTest(index=i):
                response = self._post_log(
                    app_name='bulk_test_app',
                    # Cycle through a fixed number of instance names.
                    instance_name=f'instance_{i % num_instances}',
                    message=f'Bulk test log message {i+1}',
                    module='bulk_test_module',
                    funcname='bulk_test_function',
                    lineno=i + 1,
                )
                self.assertEqual(response.status_code, status.HTTP_201_CREATED)

        self.assertEqual(AppLog.objects.count(), num_logs)

        instance_counts = AppLog.objects.values('instance_name').distinct().count()
        self.assertEqual(instance_counts, num_instances)

    def test_log_retrieval(self):
        """Test retrieving logs via REST API."""
        for i in range(3):
            created = self._post_log(
                app_name='retrieval_test',
                message=f'Retrieval test message {i+1}',
                lineno=i + 1,
            )
            # Check the setup POSTs so a creation failure is reported as one,
            # rather than surfacing later as a confusing length mismatch.
            self.assertEqual(created.status_code, status.HTTP_201_CREATED)

        response = self.client.get(self.logs_url)
        self.assertEqual(response.status_code, status.HTTP_200_OK)

        response_data = response.json()

        # Accept either a dict with a 'results' list or a bare list response.
        if isinstance(response_data, dict) and 'results' in response_data:
            results = response_data['results']
        else:
            results = response_data

        self.assertEqual(len(results), 3)

        messages = [log['message'] for log in results]
        self.assertIn('Retrieval test message 1', messages)
        self.assertIn('Retrieval test message 2', messages)
        self.assertIn('Retrieval test message 3', messages)

    def test_invalid_log_data(self):
        """Test handling of invalid log data."""
        # Deliberately bypasses _post_log: only two fields are sent.
        invalid_data = {
            'app_name': 'test_app',
            'message': 'Invalid log entry',
        }

        response = self.client.post(
            self.logs_url,
            data=invalid_data,
            format='json',
        )

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(AppLog.objects.count(), 0)

    def test_agent_workflow_logging_simulation(self):
        """Test simulating how agents would send logs during workflow processing."""
        startup_logs = [
            ('INFO', 'Agent starting up'),
            ('INFO', 'Connecting to message queue'),
            ('INFO', 'Agent ready for processing'),
        ]
        processing_logs = [
            ('INFO', 'Processing file batch 1/10'),
            ('DEBUG', 'File validation successful'),
            ('WARNING', 'Processing took longer than expected'),
            ('INFO', 'Batch processing completed'),
        ]

        # Startup records are sent first, then processing records; each phase
        # reports a different originating function name.
        phases = (
            ('startup_sequence', startup_logs),
            ('process_batch', processing_logs),
        )
        for funcname, phase_logs in phases:
            for levelname, message in phase_logs:
                with self.subTest(funcname=funcname, message=message):
                    response = self._post_log(
                        app_name='workflow_agent',
                        instance_name='agent_001',
                        level=getattr(logging, levelname),
                        levelname=levelname,
                        message=message,
                        module='agent_workflow',
                        funcname=funcname,
                    )
                    self.assertEqual(response.status_code, status.HTTP_201_CREATED)

        total_logs = len(startup_logs) + len(processing_logs)
        self.assertEqual(AppLog.objects.count(), total_logs)

        agent_logs = AppLog.objects.filter(
            app_name='workflow_agent',
            instance_name='agent_001',
        )
        self.assertEqual(agent_logs.count(), total_logs)

        log_levels = set(agent_logs.values_list('levelname', flat=True))
        expected_levels = {'INFO', 'DEBUG', 'WARNING'}
        self.assertEqual(log_levels, expected_levels)
0723
0724
class ActiveMQSSLConnectionTests(TestCase):
    """
    Tests for ActiveMQ SSL connection functionality.

    They check the ACTIVEMQ_* Django settings, the configured CA certificate
    file, the importability of the listener module and, when a broker is
    reachable, an actual SSL connection attempt.
    """

    # Settings that must be defined for the ActiveMQ integration.
    REQUIRED_SETTINGS = (
        'ACTIVEMQ_USE_SSL',
        'ACTIVEMQ_SSL_CA_CERTS',
        'ACTIVEMQ_HOST',
        'ACTIVEMQ_PORT',
        'ACTIVEMQ_USER',
        'ACTIVEMQ_PASSWORD',
    )

    def setUp(self):
        """Record in ``self.stomp_available`` whether ``stomp`` (and ``ssl``) can be imported."""
        try:
            # Imported only to probe availability; the names are not used here.
            import stomp  # noqa: F401
            import ssl  # noqa: F401
            self.stomp_available = True
        except ImportError:
            self.stomp_available = False

    def test_activemq_ssl_configuration(self):
        """Test that ActiveMQ SSL settings are properly configured"""
        from django.conf import settings

        for name in self.REQUIRED_SETTINGS:
            # One sub-test per setting so every missing name is reported.
            with self.subTest(setting=name):
                self.assertTrue(hasattr(settings, name), f"Missing setting: {name}")

        # getattr keeps a missing setting a plain failure (reported above)
        # instead of an AttributeError here.
        if getattr(settings, 'ACTIVEMQ_USE_SSL', False):
            ca_certs = getattr(settings, 'ACTIVEMQ_SSL_CA_CERTS', None)
            self.assertIsNotNone(ca_certs)
            self.assertNotEqual(ca_certs, '')

    def test_certificate_file_exists(self):
        """Test that the SSL certificate file exists and is readable"""
        from django.conf import settings
        import os

        if not getattr(settings, 'ACTIVEMQ_USE_SSL', False):
            return
        cert_file = settings.ACTIVEMQ_SSL_CA_CERTS
        if not cert_file:
            return

        self.assertTrue(os.path.exists(cert_file),
                        f"Certificate file not found: {cert_file}")
        self.assertTrue(os.path.isfile(cert_file),
                        f"Certificate path is not a file: {cert_file}")

        # The file must contain PEM certificate begin/end markers.
        with open(cert_file, 'r') as f:
            content = f.read()
        self.assertIn('-----BEGIN CERTIFICATE-----', content)
        self.assertIn('-----END CERTIFICATE-----', content)

    def test_activemq_connection_mock(self):
        """Test ActiveMQ connection setup with mocked connection"""
        if not self.stomp_available:
            self.skipTest("stomp.py not available")

        from django.conf import settings
        from unittest.mock import patch, MagicMock
        import stomp

        with patch('stomp.Connection') as mock_connection_class:
            mock_connection = MagicMock()
            mock_connection_class.return_value = mock_connection

            host = getattr(settings, 'ACTIVEMQ_HOST', 'localhost')
            port = getattr(settings, 'ACTIVEMQ_PORT', 61612)
            use_ssl = getattr(settings, 'ACTIVEMQ_USE_SSL', False)

            # stomp.Connection is the patched mock here; no network I/O happens.
            conn = stomp.Connection(host_and_ports=[(host, port)], vhost=host, try_loopback_connect=False)

            mock_connection_class.assert_called_once_with(
                host_and_ports=[(host, port)],
                vhost=host,
                try_loopback_connect=False,
            )

            if use_ssl:
                ssl_ca_certs = getattr(settings, 'ACTIVEMQ_SSL_CA_CERTS', '')
                if ssl_ca_certs:
                    # NOTE(review): conn is a MagicMock, which creates attributes
                    # on access, so this check cannot fail as written.
                    self.assertTrue(hasattr(conn, 'transport'))

    def test_activemq_listener_import(self):
        """Test that the ActiveMQ listener module imports correctly"""
        try:
            from monitor_app.activemq_listener import start_activemq_listener, MessageListener
        except ImportError as e:
            self.fail(f"Failed to import ActiveMQ listener components: {e}")

        # Assertions sit outside the try so only the import is guarded.
        self.assertTrue(callable(start_activemq_listener))
        self.assertIsNotNone(MessageListener)

    def test_activemq_ssl_connection_attempt(self):
        """Test actual SSL connection attempt (skipped if the service is not reachable)"""
        if not self.stomp_available:
            self.skipTest("stomp.py not available")

        from django.conf import settings
        import stomp
        import ssl
        import socket

        if not getattr(settings, 'ACTIVEMQ_USE_SSL', False):
            self.skipTest("SSL not configured for ActiveMQ")

        host = settings.ACTIVEMQ_HOST
        port = settings.ACTIVEMQ_PORT
        ca_certs = settings.ACTIVEMQ_SSL_CA_CERTS

        # Plain TCP probe first, so an absent broker skips the test.
        try:
            # The context manager closes the socket even if connect_ex raises.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(2)
                result = sock.connect_ex((host, port))
        except Exception as e:
            # Deliberately broad: any probe failure means "cannot test".
            self.skipTest(f"Cannot test network connectivity: {e}")

        # Outside the try so this skip keeps its own message; skipTest raises
        # an Exception subclass that the handler above would re-wrap.
        if result != 0:
            self.skipTest(f"ActiveMQ service not reachable at {host}:{port}")

        conn = stomp.Connection(host_and_ports=[(host, port)], vhost=host, try_loopback_connect=False)

        if not ca_certs:
            return

        # Only the SSL setup is guarded by the "SSL configuration failed" message.
        try:
            conn.transport.set_ssl(
                for_hosts=[(host, port)],
                ca_certs=ca_certs,
                ssl_version=ssl.PROTOCOL_TLS_CLIENT,
            )
        except Exception as e:
            self.fail(f"SSL configuration failed: {e}")

        user = settings.ACTIVEMQ_USER
        password = settings.ACTIVEMQ_PASSWORD

        try:
            conn.connect(login=user, passcode=password, wait=True, version='1.2', timeout=5)
            conn.disconnect()
        except Exception as e:
            # A failed connection is tolerated only for connection-type errors.
            # ConnectionError and socket.error are both covered by OSError.
            # This assertion now fails under its own message rather than being
            # re-reported as an SSL configuration failure.
            self.assertIsInstance(
                e,
                (stomp.exception.ConnectFailedException, OSError),
            )