Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-10 08:39:00

0001 # Description: Unit tests for the Job API methods
0002 import os
0003 import unittest
0004 
0005 from pandaserver.api.v1.http_client import HttpClient, api_url_ssl
0006 
0007 # To run tests with real IDs, set environment variables:
0008 #   PANDA_JOB_ID_TEST=<job_id>  JEDI_TASK_ID_TEST=<task_id>
0009 PANDA_JOB_ID = int(os.environ.get("PANDA_JOB_ID_TEST", -1))
0010 JEDI_TASK_ID = int(os.environ.get("JEDI_TASK_ID_TEST", -1))
0011 
0012 
0013 class TestJobAPI(unittest.TestCase):
0014     def setUp(self):
0015         self.http_client = HttpClient()
0016 
0017     def test_get_status(self):
0018         # def get_status(req: PandaRequest, job_ids: List[int], timeout: int = 60) -> Dict:
0019         url = f"{api_url_ssl}/job/get_status"
0020         print(f"Testing URL: {url}")
0021         data = {"job_ids": [PANDA_JOB_ID]}
0022         status, output = self.http_client.get(url, data)
0023         print(output)
0024         output["status"] = status
0025         del output["data"]
0026         del output["message"]
0027 
0028         # Returns success=True regardless of whether job exists (returns empty or None entries)
0029         expected_response = {"status": 0, "success": True}
0030         self.assertEqual(output, expected_response)
0031 
0032     def test_get_description(self):
0033         # def get_description(req: PandaRequest, job_ids: List[int]) -> Dict:
0034         url = f"{api_url_ssl}/job/get_description"
0035         print(f"Testing URL: {url}")
0036         data = {"job_ids": [PANDA_JOB_ID]}
0037         status, output = self.http_client.get(url, data)
0038         print(output)
0039         output["status"] = status
0040         del output["data"]
0041         del output["message"]
0042 
0043         # Returns success=True regardless (returns [None] for unknown job IDs)
0044         expected_response = {"status": 0, "success": True}
0045         self.assertEqual(output, expected_response)
0046 
0047     def test_get_description_incl_archive(self):
0048         # def get_description_incl_archive(req: PandaRequest, job_ids: List[int]) -> Dict:
0049         url = f"{api_url_ssl}/job/get_description_incl_archive"
0050         print(f"Testing URL: {url}")
0051         data = {"job_ids": [PANDA_JOB_ID]}
0052         status, output = self.http_client.get(url, data)
0053         print(output)
0054         output["status"] = status
0055         del output["data"]
0056         del output["message"]
0057 
0058         # Returns success=True regardless (returns [None] for unknown job IDs)
0059         expected_response = {"status": 0, "success": True}
0060         self.assertEqual(output, expected_response)
0061 
0062     def test_get_metadata_for_analysis_jobs(self):
0063         # def get_metadata_for_analysis_jobs(req: PandaRequest, task_id: int) -> Dict:
0064         url = f"{api_url_ssl}/job/get_metadata_for_analysis_jobs"
0065         print(f"Testing URL: {url}")
0066         data = {"task_id": JEDI_TASK_ID}
0067         status, output = self.http_client.get(url, data)
0068         print(output)
0069         output["status"] = status
0070         del output["data"]
0071 
0072         if JEDI_TASK_ID == -1:
0073             # Fake task returns success=True with "No metadata found" message
0074             expected_response = {"status": 0, "success": True, "message": "No metadata found"}
0075         else:
0076             # Real task
0077             expected_response = {"status": 0, "success": True, "message": ""}
0078             output["message"] = ""
0079         self.assertEqual(output, expected_response)
0080 
0081     def test_kill(self):
0082         # def kill(req, job_ids: List[int], code: int = None, ...) -> Dict:
0083         url = f"{api_url_ssl}/job/kill"
0084         print(f"Testing URL: {url}")
0085         data = {"job_ids": [PANDA_JOB_ID]}
0086         status, output = self.http_client.post(url, data)
0087         print(output)
0088         output["status"] = status
0089         del output["data"]
0090         del output["message"]
0091 
0092         # killJobs always returns success=True
0093         expected_response = {"status": 0, "success": True}
0094         self.assertEqual(output, expected_response)
0095 
0096     def test_reassign(self):
0097         # def reassign(req: PandaRequest, job_ids: List[int]) -> Dict:
0098         url = f"{api_url_ssl}/job/reassign"
0099         print(f"Testing URL: {url}")
0100         data = {"job_ids": [PANDA_JOB_ID]}
0101         status, output = self.http_client.post(url, data)
0102         print(output)
0103         output["status"] = status
0104         del output["message"]
0105 
0106         # reassignJobs always returns success=True
0107         expected_response = {"status": 0, "success": True, "data": None}
0108         self.assertEqual(output, expected_response)
0109 
0110 
0111 if __name__ == "__main__":
0112     unittest.main()