Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:41

0001 """
0002 API utility functions for swf-monitor communication.
0003 
0004 These utilities are shared between BaseAgent and other components that need
0005 to interact with the swf-monitor REST API but don't inherit from BaseAgent.
0006 """
0007 
0008 import time
0009 import logging
0010 import requests
0011 
0012 
# Backoff schedule in seconds used by api_request_with_retry(): entry i is the
# wait before retry i + 1, so a request is attempted at most
# len(RETRY_DELAYS) + 1 times before the failure is surfaced to the caller.
RETRY_DELAYS = (
    2, 5, 10, 20,
    40, 60,
)

# HTTP status codes that api_request_with_retry() treats as transient and retries.
RETRYABLE_STATUS_CODES = {
    # NOTE(review): a 4xx is retried as well -- presumably to ride out routes
    # being briefly unavailable while swf-monitor restarts; confirm intent.
    404,
    500, 502, 503, 504,
}
0015 
0016 
def api_request_with_retry(method, url, session=None, logger=None, **kwargs):
    """
    Make an HTTP request, retrying on transient failures with increasing delays.

    The request is attempted up to len(RETRY_DELAYS) + 1 times; before retry
    i + 1 the function sleeps RETRY_DELAYS[i] seconds.

    Retried:
        - requests.exceptions.ConnectionError and requests.exceptions.Timeout
        - any HTTP status in RETRYABLE_STATUS_CODES (note: this includes
          404 and 500, not only gateway errors)

    Not retried:
        - every other response, including other 4xx codes, is returned as-is
          WITHOUT raise_for_status(); callers must check the status themselves.
          Redirects are followed (or not) by requests itself.
        - other exceptions propagate immediately.

    Note: retries are applied regardless of HTTP method, so a non-idempotent
    request (e.g. POST) that reached the server but timed out may be
    processed more than once.

    Args:
        method: HTTP method ('get', 'post', etc.)
        url: Full URL
        session: requests.Session to use (falls back to the requests module)
        logger: Logger instance; defaults to this module's logger
        **kwargs: Passed to requests (json, timeout, etc.); timeout defaults to 10s

    Returns:
        requests.Response for the first non-retryable response

    Raises:
        requests.exceptions.ConnectionError / Timeout: if the last attempt
            still fails at the transport level
        requests.exceptions.HTTPError: if the last attempt still returns a
            retryable status code
    """
    if logger is None:
        logger = logging.getLogger(__name__)
    if session is None:
        session = requests

    kwargs.setdefault('timeout', 10)

    max_retries = len(RETRY_DELAYS)
    for attempt in range(max_retries + 1):
        try:
            response = session.request(method, url, **kwargs)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
            if attempt >= max_retries:
                raise
            delay = RETRY_DELAYS[attempt]
            logger.warning(
                f"{type(e).__name__} on {method.upper()} {url}, "
                f"retry {attempt + 1}/{len(RETRY_DELAYS)} in {delay}s"
            )
            time.sleep(delay)
            continue

        if response.status_code not in RETRYABLE_STATUS_CODES:
            return response

        if attempt >= max_retries:
            # Out of retries: surface the failure as an HTTPError. All codes in
            # RETRYABLE_STATUS_CODES are >= 400, so this raises in practice.
            response.raise_for_status()
            return response

        delay = RETRY_DELAYS[attempt]
        logger.warning(
            f"Retryable HTTP {response.status_code} from {method.upper()} {url}, "
            f"retry {attempt + 1}/{len(RETRY_DELAYS)} in {delay}s"
        )
        # Release the discarded response's connection before sleeping; this
        # matters when the caller passed stream=True and the body is unread.
        response.close()
        time.sleep(delay)
0076 
0077 
def get_next_agent_id(monitor_url, api_session, logger=None):
    """
    Get the next agent ID from the persistent state API.

    Retries indefinitely with capped backoff so that a transient API outage
    does not permanently kill the agent (supervisord would hit startretries
    and mark the process FATAL). Each attempt is an api_request_with_retry()
    call, which does its own short-term retrying first; any exception from an
    attempt (HTTP error, bad JSON, API-reported error, or a success response
    lacking 'agent_id') is logged and the attempt is repeated after
    min(60, 5 * attempt) seconds.

    Args:
        monitor_url (str): Base URL of the swf-monitor service
        api_session (requests.Session): Configured session with auth headers
        logger (logging.Logger, optional): Logger for output; defaults to
            this module's logger

    Returns:
        str: Next agent ID as string
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    url = f"{monitor_url}/api/state/next-agent-id/"
    attempt = 0
    while True:
        try:
            response = api_request_with_retry('post', url, session=api_session, logger=logger)
            response.raise_for_status()

            data = response.json()
            if data.get('status') != 'success':
                raise RuntimeError(f"API returned error: {data.get('error', 'Unknown error')}")

            agent_id = data.get('agent_id')
            if agent_id is None:
                # Without this check str(None) would hand back the literal ID "None".
                raise RuntimeError("API response missing 'agent_id'")

            logger.info(f"Got next agent ID from persistent state: {agent_id}")
            return str(agent_id)

        except Exception as e:
            attempt += 1
            delay = min(60, 5 * attempt)  # 5, 10, 15, ... capped at 60s
            logger.warning(
                f"Failed to get agent ID (attempt {attempt}): {e} — retrying in {delay}s"
            )
            time.sleep(delay)
0119 
0120 
def get_next_run_number(monitor_url, api_session, logger=None):
    """
    Get the next run number from the persistent state API.

    Only the short-term retrying inside api_request_with_retry() is applied;
    any remaining failure is logged and converted to RuntimeError.

    Args:
        monitor_url (str): Base URL of the swf-monitor service
        api_session (requests.Session): Configured session with auth headers
        logger (logging.Logger, optional): Logger for output; defaults to
            this module's logger

    Returns:
        str: Next run number as string

    Raises:
        RuntimeError: If the API call fails, the API reports an error, or a
            success response lacks 'run_number'. The underlying exception is
            chained as __cause__.
    """
    if logger is None:
        logger = logging.getLogger(__name__)

    try:
        url = f"{monitor_url}/api/state/next-run-number/"
        response = api_request_with_retry('post', url, session=api_session, logger=logger)
        response.raise_for_status()

        data = response.json()
        if data.get('status') != 'success':
            raise RuntimeError(f"API returned error: {data.get('error', 'Unknown error')}")

        run_number = data.get('run_number')
        if run_number is None:
            # Without this check str(None) would hand back the literal run number "None".
            raise RuntimeError("API response missing 'run_number'")

        logger.info(f"Got next run number from persistent state: {run_number}")
        return str(run_number)

    except Exception as e:
        logger.error(f"Failed to get next run number from API: {e}")
        raise RuntimeError(f"Critical failure getting run number: {e}") from e
0155 
0156 
def ensure_namespace(monitor_url, api_session, name, owner=None, logger=None):
    """
    Ensure a namespace exists in the database, creating it if not.

    Posts {'name': name, 'owner': owner} to /api/namespaces/ensure/ and logs
    at INFO level when the API reports that the namespace was newly created.

    Args:
        monitor_url (str): Base URL of the swf-monitor service
        api_session (requests.Session): Configured session with auth headers
        name (str): Namespace name
        owner (str, optional): Owner username; defaults to the $USER
            environment variable, or 'unknown' if that is unset or empty
        logger (logging.Logger, optional): Logger for output; defaults to
            this module's logger

    Returns:
        dict: The decoded JSON response, whose 'status' is 'success'. Its
            truthy 'created' flag marks a new namespace; it is expected to
            also carry name, owner and description (not checked here).

    Raises:
        RuntimeError: If the API responds with a non-success status.
        Exception: Any other failure (e.g. requests.exceptions.RequestException,
            including HTTPError from raise_for_status(), or a JSON decode
            error) is logged as a warning and re-raised unchanged, not
            wrapped in RuntimeError.
    """
    import os

    if logger is None:
        logger = logging.getLogger(__name__)

    if owner is None:
        # `or` also covers USER being set to an empty string.
        owner = os.getenv('USER') or 'unknown'

    try:
        url = f"{monitor_url}/api/namespaces/ensure/"
        payload = {'name': name, 'owner': owner}
        response = api_request_with_retry('post', url, session=api_session, logger=logger, json=payload)
        response.raise_for_status()

        data = response.json()
        if data.get('status') != 'success':
            raise RuntimeError(f"API returned error: {data.get('error', 'Unknown error')}")

        if data.get('created'):
            logger.info(f"Created namespace '{name}' with owner '{owner}'")
        return data

    except Exception as e:
        logger.warning(f"Failed to ensure namespace '{name}': {e}")
        raise
0187 
0188         data = response.json()
0189         if data.get('status') == 'success':
0190             if data.get('created'):
0191                 logger.info(f"Created namespace '{name}' with owner '{owner}'")
0192             return data
0193         else:
0194             raise RuntimeError(f"API returned error: {data.get('error', 'Unknown error')}")
0195 
0196     except Exception as e:
0197         logger.warning(f"Failed to ensure namespace '{name}': {e}")
0198         raise