File indexing completed on 2026-04-27 07:41:41
0001 """
0002 API utility functions for swf-monitor communication.
0003
0004 These utilities are shared between BaseAgent and other components that need
0005 to interact with the swf-monitor REST API but don't inherit from BaseAgent.
0006 """
0007
0008 import time
0009 import logging
0010 import requests
0011
0012
# Seconds to wait before each successive retry in api_request_with_retry.
# The tuple's length is also the maximum number of retries, so a single call
# makes at most len(RETRY_DELAYS) + 1 requests and sleeps at most 137s total.
RETRY_DELAYS = (
    2,
    5,
    10,
    20,
    40,
    60,
)

# HTTP status codes that api_request_with_retry treats as transient and retries.
# Besides the gateway/server errors this includes 404 and 500, so a genuine
# "not found" only surfaces after the whole RETRY_DELAYS schedule has elapsed.
# NOTE(review): presumably 404 covers the monitor being mid-restart behind a
# proxy — confirm before narrowing this set.
RETRYABLE_STATUS_CODES = {
    404,
    500,
    502,
    503,
    504,
}
0015
0016
def api_request_with_retry(method, url, session=None, logger=None, **kwargs):
    """
    Make an HTTP request, retrying transient failures on a fixed back-off schedule.

    Retries on: requests ConnectionError, Timeout, and any HTTP status listed in
    RETRYABLE_STATUS_CODES (currently 404, 500, 502, 503, 504). The wait before
    retry k is RETRY_DELAYS[k], so at most len(RETRY_DELAYS) + 1 requests are made.

    Any other response (2xx, 3xx, or a 4xx/5xx not in RETRYABLE_STATUS_CODES) is
    returned as-is without raising; callers that need an error should call
    ``response.raise_for_status()`` themselves. Other exceptions raised by the
    request propagate immediately.

    Args:
        method: HTTP method ('get', 'post', etc.)
        url: Full URL
        session: requests.Session to use (falls back to the requests module)
        logger: Logger instance (defaults to this module's logger)
        **kwargs: Passed to requests (json, timeout, etc.); timeout defaults to 10s

    Returns:
        requests.Response: the first response whose status is not retryable.

    Raises:
        requests.exceptions.HTTPError: the final attempt still returned a retryable status.
        requests.exceptions.ConnectionError / Timeout: the final attempt still failed.
    """
    if logger is None:
        logger = logging.getLogger(__name__)
    if session is None:
        session = requests

    kwargs.setdefault('timeout', 10)
    max_retries = len(RETRY_DELAYS)

    for attempt in range(max_retries + 1):
        is_last = attempt == max_retries
        try:
            response = session.request(method, url, **kwargs)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            if is_last:
                raise
            reason = f"{type(e).__name__} on {method.upper()} {url}"
        else:
            if response.status_code not in RETRYABLE_STATUS_CODES:
                return response
            if is_last:
                response.raise_for_status()
                # Only reached if a non-error code is ever added to RETRYABLE_STATUS_CODES.
                return response
            reason = f"Retryable HTTP {response.status_code} from {method.upper()} {url}"
            # Release the discarded response's connection back to the pool
            # (otherwise it stays checked out when the caller passed stream=True).
            response.close()

        delay = RETRY_DELAYS[attempt]
        logger.warning(f"{reason}, retry {attempt + 1}/{max_retries} in {delay}s")
        time.sleep(delay)
    # Not reachable: the final iteration always returns or raises.
0076
0077
def get_next_agent_id(monitor_url, api_session, logger=None):
    """
    Get the next agent ID from the persistent state API.

    Retries indefinitely, waiting 5s, 10s, 15s, ... (capped at 60s) between
    attempts, so that a transient API outage does not permanently kill the
    agent (supervisord would hit startretries and mark the process FATAL).
    Each attempt goes through api_request_with_retry, so a single attempt may
    already include its own short retries.

    Any exception while fetching or parsing the reply (network error, HTTP
    4xx/5xx, non-JSON body, an error status, or a success reply without an
    ``agent_id``) is logged and retried; the function only returns once a
    usable ID has been obtained.

    Args:
        monitor_url (str): Base URL of the swf-monitor service
        api_session (requests.Session): Configured session with auth headers
        logger (logging.Logger, optional): Logger for output, defaults to this module's logger

    Returns:
        str: Next agent ID as string
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    url = f"{monitor_url}/api/state/next-agent-id/"
    attempt = 0
    while True:
        try:
            response = api_request_with_retry('post', url, session=api_session, logger=logger)
            # api_request_with_retry returns non-retryable error responses
            # (e.g. 401/403) without raising, so check explicitly.
            response.raise_for_status()

            data = response.json()
            if data.get('status') != 'success':
                raise RuntimeError(f"API returned error: {data.get('error', 'Unknown error')}")

            agent_id = data.get('agent_id')
            if agent_id is None:
                # Without this check str(None) would hand out the literal ID "None".
                raise RuntimeError("API reported success but returned no agent_id")

            logger.info(f"Got next agent ID from persistent state: {agent_id}")
            return str(agent_id)

        except Exception as e:
            # Deliberately broad: every failure is treated as transient and retried.
            attempt += 1
            delay = min(60, 5 * attempt)
            logger.warning(
                f"Failed to get agent ID (attempt {attempt}): {e} — retrying in {delay}s"
            )
            time.sleep(delay)
0119
0120
def get_next_run_number(monitor_url, api_session, logger=None):
    """
    Get the next run number from the persistent state API.

    Makes a single call (api_request_with_retry still performs its own retries
    on transient network/HTTP failures) and fails hard on any problem.

    Args:
        monitor_url (str): Base URL of the swf-monitor service
        api_session (requests.Session): Configured session with auth headers
        logger (logging.Logger, optional): Logger for output, defaults to this module's logger

    Returns:
        str: Next run number as string

    Raises:
        RuntimeError: If the API call fails, returns an error status, or reports
            success without a ``run_number``. The underlying exception is
            chained as ``__cause__``.
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    url = f"{monitor_url}/api/state/next-run-number/"
    try:
        response = api_request_with_retry('post', url, session=api_session, logger=logger)
        # Non-retryable error responses (e.g. 401/403) come back without raising.
        response.raise_for_status()

        data = response.json()
        if data.get('status') != 'success':
            raise RuntimeError(f"API returned error: {data.get('error', 'Unknown error')}")

        run_number = data.get('run_number')
        if run_number is None:
            # Without this check str(None) would return the literal run number "None".
            raise RuntimeError("API reported success but returned no run_number")

    except Exception as e:
        logger.error(f"Failed to get next run number from API: {e}")
        raise RuntimeError(f"Critical failure getting run number: {e}") from e

    logger.info(f"Got next run number from persistent state: {run_number}")
    return str(run_number)
0155
0156
def ensure_namespace(monitor_url, api_session, name, owner=None, logger=None):
    """
    Ensure a namespace exists in the database, creating it if not.

    POSTs ``{'name': name, 'owner': owner}`` to ``/api/namespaces/ensure/`` and
    logs at INFO level when the reply reports that the namespace was created.

    Args:
        monitor_url (str): Base URL of the swf-monitor service
        api_session (requests.Session): Configured session with auth headers
        name (str): Namespace name
        owner (str, optional): Owner username; defaults to the ``USER``
            environment variable, or ``'unknown'`` if it is not set
        logger (logging.Logger, optional): Logger for output, defaults to this module's logger

    Returns:
        dict: The API's decoded JSON reply, whose ``status`` is ``'success'``.
            Presumably it also carries name, owner, description and a boolean
            ``created`` (only ``created`` is read here) — confirm against the API.

    Raises:
        RuntimeError: If the API replies with a non-success status.
        requests.exceptions.RequestException: If the request fails (including an
            HTTP 4xx/5xx via ``raise_for_status``); re-raised unchanged.
        ValueError: If the reply body is not valid JSON; re-raised unchanged.
    """
    import os

    if logger is None:
        logger = logging.getLogger(__name__)

    if owner is None:
        owner = os.getenv('USER', 'unknown')

    url = f"{monitor_url}/api/namespaces/ensure/"
    payload = {'name': name, 'owner': owner}
    try:
        response = api_request_with_retry('post', url, session=api_session, logger=logger, json=payload)
        # Non-retryable error responses (e.g. 400/403) come back without raising.
        response.raise_for_status()

        data = response.json()
        if data.get('status') != 'success':
            raise RuntimeError(f"API returned error: {data.get('error', 'Unknown error')}")

        if data.get('created'):
            logger.info(f"Created namespace '{name}' with owner '{owner}'")
        return data

    except Exception as e:
        # Log for context, then let the caller decide how fatal this is.
        logger.warning(f"Failed to ensure namespace '{name}': {e}")
        raise
0198 raise