Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:44

0001 """REST client — talks to swf-remote's /api/panda/* proxy on loopback.
0002 
0003 We intentionally do NOT reach pandaserver02 directly. swf-remote owns the
0004 SSH tunnel; every consumer goes through it. Running this engine from a
0005 non-ec2dev host just requires pointing SWF_REMOTE_BASE_URL elsewhere (or
0006 setting up an SSH tunnel to a host that has one). No other code changes.
0007 """
0008 from __future__ import annotations
0009 
0010 import httpx
0011 
0012 
0013 class FetchError(RuntimeError):
0014     pass
0015 
0016 
0017 class Client:
0018     def __init__(self, base_url: str, timeout: int = 20):
0019         self.base_url = base_url.rstrip("/")
0020         self.timeout = timeout
0021 
0022     def _get(self, path: str, params: dict | None = None) -> dict:
0023         url = f"{self.base_url}{path}"
0024         try:
0025             r = httpx.get(url, params=params, timeout=self.timeout, verify=True)
0026         except httpx.HTTPError as e:
0027             raise FetchError(f"request failed: {url}: {e}") from e
0028         if r.status_code >= 400:
0029             raise FetchError(f"{r.status_code} {r.reason_phrase} from {url}: {r.text[:200]}")
0030         try:
0031             return r.json()
0032         except ValueError as e:
0033             raise FetchError(f"non-json response from {url}: {r.text[:200]}") from e
0034 
0035     def list_tasks(self, *, days: int = 1, status: str | None = None,
0036                    username: str | None = None, taskname: str | None = None,
0037                    processingtype: str | None = None,
0038                    limit: int = 50, before_id: int | None = None) -> dict:
0039         params = {"days": days, "limit": limit}
0040         for k, v in (("status", status), ("username", username),
0041                      ("taskname", taskname),
0042                      ("processingtype", processingtype),
0043                      ("before_id", before_id)):
0044             if v is not None:
0045                 params[k] = v
0046         return self._get("/api/panda/tasks/", params)
0047 
0048     def iter_all_tasks(self, **filters):
0049         """Paginate through all matching tasks. Yields task dicts."""
0050         before_id = None
0051         while True:
0052             batch = self.list_tasks(before_id=before_id, **filters)
0053             for item in batch.get("items", []):
0054                 yield item
0055             if not batch.get("has_more"):
0056                 return
0057             before_id = batch.get("next_before_id")
0058             if before_id is None:
0059                 return
0060 
0061     def get_task(self, jeditaskid: int) -> dict:
0062         return self._get(f"/api/panda/tasks/{jeditaskid}/")
0063 
0064     def activity(self, *, days: int = 1) -> dict:
0065         return self._get("/api/panda/activity/", {"days": days})