File indexing completed on 2026-04-27 07:41:43
0001 from django.test import TestCase
0002 from django.urls import reverse
0003 from django.contrib.auth.models import User
0004 from django.utils import timezone
0005 from rest_framework.test import APITestCase, APIClient
0006 from rest_framework import status
0007 from monitor_app.models import SystemAgent, AppLog, Run, StfFile, Subscriber
0008 from monitor_app.serializers import AppLogSerializer
0009 from django.core.management import call_command
0010 from io import StringIO
0011 import logging
0012 import uuid
0013 import re
0014
0015
class StfFileAPITests(APITestCase):
    """CRUD, validation and auth tests for the StfFile REST endpoints.

    Each test starts from the fixture built in ``setUp``: one user (already
    authenticated on ``self.client``), one ``Run`` and one ``StfFile``.
    Endpoints are resolved through the ``monitor_app:stffile-list`` and
    ``monitor_app:stffile-detail`` route names.
    """

    def setUp(self):
        """Create and authenticate a user, then one Run and one StfFile."""
        unique_username = f"testuser_{uuid.uuid4()}"
        self.user = User.objects.create_user(
            username=unique_username,
            password='testpassword',
        )
        self.client.force_authenticate(user=self.user)
        self.run = Run.objects.create(
            run_number=12345,
            start_time=timezone.now(),
        )
        self.stf_file = StfFile.objects.create(
            run=self.run,
            machine_state="physics",
            stf_filename="test_run12345_001.stf",
            file_size_bytes=1024000,
            checksum="abc123def456",
        )

    def _detail_url(self):
        """Return the detail URL of the StfFile created in ``setUp``."""
        return reverse(
            'monitor_app:stffile-detail',
            kwargs={'pk': self.stf_file.file_id},
        )

    def _create_payload(self, **overrides):
        """Return the POST body used by ``test_create_stf_file``.

        Keyword arguments replace individual fields, so a negative test can
        start from the payload the create test expects to succeed and change
        only the field under test.
        """
        payload = {
            'run': self.run.run_id,
            'machine_state': 'cosmics',
            'stf_filename': 'test_run12345_002.stf',
            'file_size_bytes': 2048000,
            'checksum': 'def789abc123',
            'status': 'registered',
        }
        payload.update(overrides)
        return payload

    def test_list_stf_files(self):
        """GET on the list endpoint returns 200 for an authenticated user."""
        url = reverse('monitor_app:stffile-list')
        response = self.client.get(url)
        self.assertEqual(response.status_code, status.HTTP_200_OK)

    def test_create_stf_file(self):
        """POSTing a new filename returns 201 and adds a second row."""
        url = reverse('monitor_app:stffile-list')
        response = self.client.post(url, self._create_payload(), format='json')
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
        self.assertEqual(StfFile.objects.count(), 2)

    def test_get_stf_file(self):
        """GET on the detail endpoint returns the fixture's filename."""
        response = self.client.get(self._detail_url())
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.assertEqual(response.data['stf_filename'], "test_run12345_001.stf")

    def test_update_stf_file_status(self):
        """PATCHing ``status`` returns 200 and persists the new value."""
        data = {'status': 'processing'}
        response = self.client.patch(self._detail_url(), data, format='json')
        self.assertEqual(response.status_code, status.HTTP_200_OK)
        self.stf_file.refresh_from_db()
        self.assertEqual(self.stf_file.status, 'processing')

    def test_delete_stf_file(self):
        """DELETE on the detail endpoint returns 204 and removes the row."""
        response = self.client.delete(self._detail_url())
        self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
        self.assertFalse(StfFile.objects.filter(pk=self.stf_file.file_id).exists())

    def test_create_stf_file_duplicate_filename(self):
        """POSTing a filename that already exists returns 400 and adds no row."""
        url = reverse('monitor_app:stffile-list')
        # Start from the payload the create test expects to succeed and reuse
        # the existing filename, so a 400 is not caused by an omitted field.
        data = self._create_payload(
            stf_filename='test_run12345_001.stf',
            file_size_bytes=1000,
            checksum='duplicate',
        )
        response = self.client.post(url, data, format='json')
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        # The rejected request must not have created a second StfFile.
        self.assertEqual(StfFile.objects.count(), 1)

    def test_invalid_status_value(self):
        """PATCHing an unknown status returns 400 and leaves the row unchanged."""
        status_before = self.stf_file.status
        data = {'status': 'invalid_status'}
        response = self.client.patch(self._detail_url(), data, format='json')
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        # The rejected value must not have been written to the database.
        self.stf_file.refresh_from_db()
        self.assertEqual(self.stf_file.status, status_before)

    def test_unauthenticated_access_denied(self):
        """Without credentials the list endpoint answers 401 or 403."""
        # Passing user=None removes the authentication forced in setUp.
        self.client.force_authenticate(user=None)
        url = reverse('monitor_app:stffile-list')
        response = self.client.get(url)
        # NOTE(review): either code is accepted; which one DRF returns
        # presumably depends on the project's authentication classes.
        self.assertIn(
            response.status_code,
            [status.HTTP_401_UNAUTHORIZED, status.HTTP_403_FORBIDDEN],
        )