File indexing completed on 2026-04-10 08:39:00
0001
0002 import os
0003 import unittest
0004 from datetime import datetime, timedelta
0005
0006 from pandaserver.api.v1.http_client import HttpClient, api_url_ssl
0007
0008
# Identifiers of the staging request / dataset that the tests act on, taken
# from the environment.  An unset or empty variable yields None instead of
# crashing at import time (int(None) raises TypeError) or silently turning
# into the literal string "None".  A non-integer REQUEST_ID still raises
# ValueError so that a misconfiguration is noticed immediately.
_request_id_raw = os.environ.get("REQUEST_ID")
REQUEST_ID = int(_request_id_raw) if _request_id_raw else None
DATASET = os.environ.get("DATASET") or None
0011
0012
class TestDataCarouselAPI(unittest.TestCase):
    """Tests for the ``data_carousel`` endpoints reachable under ``api_url_ssl``.

    Every test POSTs a single identifier -- either ``REQUEST_ID`` under the key
    ``request_id`` or ``DATASET`` under the key ``dataset`` -- to one endpoint
    and then checks the ``data`` part of the reply.
    """

    def setUp(self):
        """Create a fresh HTTP client for each test."""
        self.http_client = HttpClient()

    def _post_and_check(self, endpoint: str, key: str, value) -> dict:
        """POST ``{key: value}`` to a data_carousel endpoint and return its ``data``.

        The URL, the returned status and the raw output are printed to help
        when inspecting a run by hand.

        A non-zero status or a reply without ``data`` leaves nothing to verify.
        In that case the test is marked as skipped rather than being reported
        as passed without having checked anything.

        Args:
            endpoint: Last path component of the endpoint, e.g. ``force_to_staging``.
            key: Name of the identifier sent in the request body.
            value: Value of that identifier.

        Returns:
            The ``data`` mapping of the reply, after checking that it echoes
            the identifier that was sent.
        """
        url = f"{api_url_ssl}/data_carousel/{endpoint}"
        print(f"Testing URL: {url}")
        status, output = self.http_client.post(url, {key: value})
        print(status, output)

        # `output` is only inspected when the status is 0, as the short-circuit
        # keeps `.get` from being called on the output of a failed request.
        if status != 0 or not output.get("data"):
            self.skipTest(f"{endpoint}: status={status} and/or no data returned; nothing to verify")

        data = output["data"]
        self.assertEqual(data.get(key), value)
        return data

    def test_change_staging_destination_by_request_id(self):
        """Changing the destination by request ID yields an integer ``new_request_id``."""
        data = self._post_and_check("change_staging_destination", "request_id", REQUEST_ID)
        self.assertIsInstance(data.get("new_request_id"), int)

    def test_change_staging_destination_by_dataset(self):
        """Changing the destination by dataset yields an integer ``new_request_id``."""
        data = self._post_and_check("change_staging_destination", "dataset", DATASET)
        self.assertIsInstance(data.get("new_request_id"), int)

    def test_change_staging_source_by_request_id(self):
        """Changing the source by request ID yields a string ``ddm_rule_id``."""
        data = self._post_and_check("change_staging_source", "request_id", REQUEST_ID)
        self.assertIsInstance(data.get("ddm_rule_id"), str)

    def test_change_staging_source_by_dataset(self):
        """Changing the source by dataset yields a string ``ddm_rule_id``."""
        data = self._post_and_check("change_staging_source", "dataset", DATASET)
        self.assertIsInstance(data.get("ddm_rule_id"), str)

    def test_force_to_staging_by_request_id(self):
        """Forcing by request ID reports status ``staging`` and a string ``ddm_rule_id``."""
        data = self._post_and_check("force_to_staging", "request_id", REQUEST_ID)
        self.assertEqual(data.get("status"), "staging")
        self.assertIsInstance(data.get("ddm_rule_id"), str)

    def test_force_to_staging_by_dataset(self):
        """Forcing by dataset reports status ``staging`` and a string ``ddm_rule_id``."""
        data = self._post_and_check("force_to_staging", "dataset", DATASET)
        self.assertEqual(data.get("status"), "staging")
        self.assertIsInstance(data.get("ddm_rule_id"), str)
0078
0079
0080
# Run the test suite with the standard unittest runner when this file is
# executed directly; importing the module does not start a run.
if __name__ == "__main__":
    unittest.main()