File indexing completed on 2026-04-10 08:39:00
0001
0002 import os
0003 import unittest
0004
0005 from pandaserver.api.v1.http_client import HttpClient, api_url_ssl
0006
0007
0008
# IDs of the job and the task the tests operate on. They are read from the
# environment once, at import time; each falls back to -1 when its variable
# is not set.
PANDA_JOB_ID = int(os.getenv("PANDA_JOB_ID_TEST", "-1"))
JEDI_TASK_ID = int(os.getenv("JEDI_TASK_ID_TEST", "-1"))
0011
0012
class TestJobAPI(unittest.TestCase):
    """Tests for the endpoints published under ``{api_url_ssl}/job/``.

    Each test sends one request through ``HttpClient`` and compares a
    normalised version of the response with the expected dictionary. The job
    and task that the requests refer to come from the module-level
    ``PANDA_JOB_ID`` and ``JEDI_TASK_ID`` constants.

    NOTE(review): the requests presumably need a reachable PanDA server and
    valid credentials -- confirm before running this in CI.
    """

    # Fields every successful response is expected to carry once the
    # variable parts ("data" and "message") have been removed.
    _SUCCESS = {"status": 0, "success": True}

    def setUp(self):
        """Create a fresh HTTP client for every test."""
        self.http_client = HttpClient()

    def _query(self, send, endpoint, data, drop=("data", "message")):
        """Call a job endpoint and return its normalised response.

        Args:
            send: Bound client method used for the request, i.e.
                ``self.http_client.get`` or ``self.http_client.post``.
            endpoint: Endpoint name appended to ``{api_url_ssl}/job/``.
            data: Request parameters handed to ``send`` unchanged.
            drop: Response keys that vary between runs and are therefore
                removed before the comparison. Each one must be present.

        Returns:
            The response object returned by ``send`` (modified in place),
            with the returned status stored under ``"status"`` and the
            ``drop`` keys deleted.
        """
        url = f"{api_url_ssl}/job/{endpoint}"
        print(f"Testing URL: {url}")
        status, output = send(url, data)
        print(output)
        output["status"] = status
        for key in drop:
            # A missing key is reported as a test failure that shows the
            # response, rather than as a bare KeyError raised by ``del``.
            self.assertIn(key, output, f"response from {url} has no '{key}' field: {output}")
            del output[key]
        return output

    def test_get_status(self):
        """``get_status`` succeeds for the configured job."""
        output = self._query(self.http_client.get, "get_status", {"job_ids": [PANDA_JOB_ID]})
        self.assertEqual(output, self._SUCCESS)

    def test_get_description(self):
        """``get_description`` succeeds for the configured job."""
        output = self._query(self.http_client.get, "get_description", {"job_ids": [PANDA_JOB_ID]})
        self.assertEqual(output, self._SUCCESS)

    def test_get_description_incl_archive(self):
        """``get_description_incl_archive`` succeeds for the configured job."""
        output = self._query(
            self.http_client.get,
            "get_description_incl_archive",
            {"job_ids": [PANDA_JOB_ID]},
        )
        self.assertEqual(output, self._SUCCESS)

    def test_get_metadata_for_analysis_jobs(self):
        """``get_metadata_for_analysis_jobs`` succeeds for the configured task."""
        output = self._query(
            self.http_client.get,
            "get_metadata_for_analysis_jobs",
            {"task_id": JEDI_TASK_ID},
            drop=("data",),
        )

        if JEDI_TASK_ID == -1:
            # -1 is the fallback used when JEDI_TASK_ID_TEST is unset; the
            # test expects this exact message for that task ID.
            expected_message = "No metadata found"
        else:
            # For any other task the message is not checked, so it is
            # blanked on both sides of the comparison.
            expected_message = ""
            output["message"] = ""

        self.assertEqual(output, {**self._SUCCESS, "message": expected_message})

    def test_kill(self):
        """``kill`` (POST) succeeds for the configured job."""
        output = self._query(self.http_client.post, "kill", {"job_ids": [PANDA_JOB_ID]})
        self.assertEqual(output, self._SUCCESS)

    def test_reassign(self):
        """``reassign`` (POST) succeeds and returns no data."""
        output = self._query(
            self.http_client.post,
            "reassign",
            {"job_ids": [PANDA_JOB_ID]},
            drop=("message",),
        )
        # "data" is kept for this endpoint: the response must carry it as None.
        self.assertEqual(output, {**self._SUCCESS, "data": None})
0109
0110
if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner,
    # which discovers and runs the TestCase classes defined in this module.
    unittest.main()