Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:41

0001 """
0002 This module contains the base class for all agents.
0003 """
0004 
0005 import os
0006 import sys
0007 import time
0008 import signal
0009 import socket
0010 import stomp
0011 import requests
0012 import json
0013 import logging
0014 from pathlib import Path
0015 from typing import Optional
0016 from .api_utils import get_next_agent_id, api_request_with_retry
0017 from .config_utils import load_testbed_config, TestbedConfigError
0018 
0019 
0020 class APIError(Exception):
0021     """Exception raised for API-related failures."""
0022     
0023     def __init__(self, message, response=None, url=None, method=None):
0024         super().__init__(message)
0025         self.response = response
0026         self.url = url
0027         self.method = method
0028 
0029 def setup_environment():
0030     """Auto-activate venv and load environment variables."""
0031     script_dir = Path(__file__).resolve().parent.parent.parent.parent / "swf-testbed"
0032     
0033     # Auto-activate virtual environment if not already active
0034     if "VIRTUAL_ENV" not in os.environ:
0035         venv_path = script_dir / ".venv"
0036         if venv_path.exists():
0037             print("🔧 Auto-activating virtual environment...")
0038             venv_python = venv_path / "bin" / "python"
0039             if venv_python.exists():
0040                 os.environ["VIRTUAL_ENV"] = str(venv_path)
0041                 os.environ["PATH"] = f"{venv_path}/bin:{os.environ['PATH']}"
0042                 sys.executable = str(venv_python)
0043         else:
0044             print("❌ Error: No Python virtual environment found")
0045             return False
0046     
0047     # Load ~/.env environment variables (they're already exported)
0048     env_file = Path.home() / ".env"
0049     if env_file.exists():
0050         print("🔧 Loading environment variables from ~/.env...")
0051         with open(env_file) as f:
0052             for line in f:
0053                 line = line.strip()
0054                 if line and not line.startswith('#') and '=' in line:
0055                     if line.startswith('export '):
0056                         line = line[7:]  # Remove 'export '
0057                     key, value = line.split('=', 1)
0058                     value = value.strip('"\'')
0059                     # Skip entries with unexpanded shell variables (e.g., PATH=$PATH:...)
0060                     # These are already expanded by shell when it sourced ~/.env
0061                     if '$' in value:
0062                         continue
0063                     os.environ[key] = value
0064 
0065     # Unset proxy variables to prevent localhost routing through proxy
0066     for proxy_var in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']:
0067         if proxy_var in os.environ:
0068             del os.environ[proxy_var]
0069     
0070     return True
0071 
0072 # Auto-setup environment when module is imported (unless already done)
0073 if not os.getenv('SWF_ENV_LOADED'):
0074     setup_environment()
0075     os.environ['SWF_ENV_LOADED'] = 'true'
0076 
0077 # Import the centralized logging from swf-common-lib
0078 from swf_common_lib.rest_logging import setup_rest_logging
0079 
0080 # Configure base logging level with environment overrides
0081 _quiet = os.getenv('SWF_AGENT_QUIET', 'false').lower() in ('1', 'true', 'yes', 'on')
0082 _level_name = os.getenv('SWF_LOG_LEVEL', 'WARNING' if _quiet else 'INFO').upper()
0083 
0084 # Validate log level and provide clear error for invalid values
0085 # Use Python's built-in logging level definitions for maintainability
0086 _valid_levels = set(logging._nameToLevel.keys()) - {'NOTSET'}  # Exclude NOTSET from display
0087 if _level_name not in logging._nameToLevel:
0088     print(f"WARNING: Invalid SWF_LOG_LEVEL '{_level_name}'. Valid levels: {', '.join(sorted(_valid_levels))}. Using INFO.")
0089     _level = logging.INFO
0090 else:
0091     _level = logging._nameToLevel[_level_name]
0092 
0093 logging.basicConfig(level=_level, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
0094 
0095 # STOMP logging is very chatty; enable only if explicitly requested
0096 stomp_logger = logging.getLogger('stomp')
0097 if os.getenv('SWF_STOMP_DEBUG', 'false').lower() in ('1', 'true', 'yes', 'on'):
0098     stomp_logger.setLevel(logging.DEBUG)
0099     _stomp_handler = logging.StreamHandler()
0100     _stomp_handler.setLevel(logging.DEBUG)
0101     _stomp_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(message)s'))
0102     stomp_logger.addHandler(_stomp_handler)
0103 else:
0104     stomp_logger.setLevel(logging.WARNING)
0105 
0106 
0107 class BaseAgent(stomp.ConnectionListener):
0108     """
0109     A base class for creating standalone STF workflow agents.
0110 
0111     This class handles the common tasks of:
0112     - Connecting to the ActiveMQ message broker (and inheriting from stomp.ConnectionListener).
0113     - Communicating with the swf-monitor REST API.
0114     - Running a persistent process with graceful shutdown.
0115     """
0116 
0117     # Standard workflow message types
0118     WORKFLOW_MESSAGE_TYPES = {
0119         'run_imminent', 'start_run', 'pause_run', 'resume_run', 'end_run',
0120         'stf_gen', 'stf_ready', 'tf_file_registered'
0121     }
0122 
0123     def __init__(self, agent_type, subscription_queue, debug=False,
0124                  config_path: Optional[str] = None):
0125         """
0126         Initialize BaseAgent.
0127 
0128         Args:
0129             agent_type: Type of agent (e.g., 'DATA', 'PROCESSING')
0130             subscription_queue: ActiveMQ destination with explicit prefix.
0131                 Must start with '/queue/' (anycast) or '/topic/' (multicast).
0132                 Example: '/queue/workflow_control' or '/topic/epictopic'
0133             debug: Enable debug logging
0134             config_path: Path to testbed.toml config file
0135 
0136         Raises:
0137             ValueError: If subscription_queue doesn't have /queue/ or /topic/ prefix
0138         """
0139         # Validate destination has explicit prefix (required for Artemis routing)
0140         if not subscription_queue.startswith('/queue/') and not subscription_queue.startswith('/topic/'):
0141             raise ValueError(
0142                 f"subscription_queue must start with '/queue/' or '/topic/', got: '{subscription_queue}'. "
0143                 f"Use '/queue/{subscription_queue}' for anycast or '/topic/{subscription_queue}' for multicast."
0144             )
0145 
0146         self.agent_type = agent_type
0147         self.subscription_queue = subscription_queue
0148         self.DEBUG = debug
0149 
0150         # Resolve config path: explicit arg > SWF_TESTBED_CONFIG env var > default
0151         if config_path is None:
0152             env_config = os.getenv('SWF_TESTBED_CONFIG')
0153             if env_config:
0154                 # Env var is filename, resolve to workflows/ directory
0155                 if not env_config.startswith('workflows/') and '/' not in env_config:
0156                     config_path = f'workflows/{env_config}'
0157                 else:
0158                     config_path = env_config
0159             else:
0160                 config_path = 'workflows/testbed.toml'
0161         self.config_path = config_path  # Store for subclasses
0162 
0163         # Load testbed configuration (namespace)
0164         self.namespace = None
0165         try:
0166             config = load_testbed_config(config_path=config_path)
0167             self.namespace = config.namespace
0168             logging.info(f"Namespace: {self.namespace}")
0169         except TestbedConfigError as e:
0170             logging.error(f"Configuration error: {e}")
0171             raise
0172 
0173         # Configuration from environment variables (needed for agent ID API call)
0174         self.monitor_url = (os.getenv('SWF_MONITOR_URL') or '').rstrip('/')
0175         self.api_token = os.getenv('SWF_API_TOKEN')
0176 
0177         # Set up API session (needed for agent ID call)
0178         import requests
0179         self.api = requests.Session()
0180         if self.api_token:
0181             self.api.headers.update({'Authorization': f'Token {self.api_token}'})
0182 
0183         # Create unique agent name with username and sequential ID
0184         import getpass
0185         self.username = getpass.getuser()
0186         agent_id = self.get_next_agent_id()
0187         self.agent_name = f"{self.agent_type.lower()}-agent-{self.username}-{agent_id}"
0188 
0189         # Workflow context tracking (populated from messages)
0190         self.current_execution_id = None
0191         self.current_run_id = None
0192 
0193         # Process identification for agent management
0194         self.pid = os.getpid()
0195         self.hostname = socket.gethostname()
0196         self.operational_state = 'STARTING'  # STARTING, READY, PROCESSING, EXITED
0197 
0198         # Use HTTP URL for REST logging (no auth required)
0199         self.base_url = (os.getenv('SWF_MONITOR_HTTP_URL') or '').rstrip('/')
0200         self.mq_host = os.getenv('ACTIVEMQ_HOST', 'localhost')
0201         self.mq_port = int(os.getenv('ACTIVEMQ_PORT', 61612))  # STOMP port for Artemis on this system
0202         self.mq_user = os.getenv('ACTIVEMQ_USER', 'admin')
0203         self.mq_password = os.getenv('ACTIVEMQ_PASSWORD', 'admin')
0204         
0205         # SSL configuration
0206         self.use_ssl = os.getenv('ACTIVEMQ_USE_SSL', 'False').lower() == 'true'
0207         self.ssl_ca_certs = os.getenv('ACTIVEMQ_SSL_CA_CERTS', '')
0208         self.ssl_cert_file = os.getenv('ACTIVEMQ_SSL_CERT_FILE', '')
0209         self.ssl_key_file = os.getenv('ACTIVEMQ_SSL_KEY_FILE', '')
0210         
0211         # Set up centralized REST logging
0212         self.logger = setup_rest_logging('base_agent', self.agent_name, self.base_url)
0213 
0214         # Create connection with proper heartbeat configuration
0215         self.conn = stomp.Connection(
0216             host_and_ports=[(self.mq_host, self.mq_port)],
0217             vhost=self.mq_host,
0218             try_loopback_connect=False,
0219             heartbeats=(30000, 30000),  # Enable automatic heartbeat sending
0220             auto_content_length=False
0221         )
0222         
0223         # Configure SSL if enabled - must be done before set_listener
0224         if self.use_ssl:
0225             import ssl
0226             logging.info(f"Configuring SSL connection with CA certs: {self.ssl_ca_certs}")
0227             
0228             if self.ssl_ca_certs:
0229                 # Configure SSL transport
0230                 self.conn.transport.set_ssl(
0231                     for_hosts=[(self.mq_host, self.mq_port)],
0232                     ca_certs=self.ssl_ca_certs,
0233                     ssl_version=ssl.PROTOCOL_TLS_CLIENT
0234                 )
0235                 logging.info("SSL transport configured successfully")
0236             else:
0237                 logging.warning("SSL enabled but no CA certificate file specified")
0238         
0239         self.conn.set_listener('', self)
0240         
0241         # For localhost development, disable SSL verification and proxy
0242         if 'localhost' in self.monitor_url or '127.0.0.1' in self.monitor_url:
0243             self.api.verify = False
0244             # Disable proxy for localhost connections
0245             self.api.proxies = {
0246                 'http': '',
0247                 'https': ''
0248             }
0249             import urllib3
0250             urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
0251 
0252     def _log_extra(self, **kwargs):
0253         """
0254         Build extra dict for logging with common context fields.
0255 
0256         Automatically includes username, execution_id, and run_id when set.
0257         Subclasses can override to add additional fields, calling super()._log_extra().
0258 
0259         Usage:
0260             self.logger.info("Message", extra=self._log_extra(custom_field=value))
0261         """
0262         extra = {'username': self.username}
0263         if self.current_execution_id:
0264             extra['execution_id'] = self.current_execution_id
0265         if self.current_run_id:
0266             extra['run_id'] = self.current_run_id
0267         extra.update(kwargs)
0268         return extra
0269 
0270     def run(self):
0271         """
0272         Connects to the message broker and runs the agent's main loop.
0273         """
0274         # Register signal handlers for graceful shutdown
0275         def signal_handler(signum, frame):
0276             sig_name = signal.Signals(signum).name
0277             logging.info(f"Received {sig_name}, initiating graceful shutdown...")
0278             raise KeyboardInterrupt(f"Received {sig_name}")
0279 
0280         signal.signal(signal.SIGTERM, signal_handler)
0281         signal.signal(signal.SIGQUIT, signal_handler)
0282 
0283         logging.info(f"Starting {self.agent_name}...")
0284 
0285         # Connect if not already connected (some subclasses connect in __init__)
0286         if not getattr(self, 'mq_connected', False):
0287             max_retries = 3
0288             retry_delay = 5
0289             for attempt in range(1, max_retries + 1):
0290                 logging.info(f"Connecting to ActiveMQ at {self.mq_host}:{self.mq_port} (attempt {attempt}/{max_retries})")
0291                 try:
0292                     self.conn.connect(
0293                         self.mq_user,
0294                         self.mq_password,
0295                         wait=True,
0296                         version='1.1',
0297                         headers={
0298                             'client-id': self.agent_name,
0299                             'heart-beat': '30000,30000'
0300                         }
0301                     )
0302                     self.mq_connected = True
0303                     break
0304                 except Exception as e:
0305                     logging.warning(f"Connection attempt {attempt} failed: {e}")
0306                     if attempt < max_retries:
0307                         logging.info(f"Retrying in {retry_delay} seconds...")
0308                         time.sleep(retry_delay)
0309                     else:
0310                         logging.error(f"Failed to connect after {max_retries} attempts")
0311                         raise
0312 
0313         try:
0314             self.conn.subscribe(destination=self.subscription_queue, id=1, ack='auto')
0315             logging.info(f"Subscribed to queue: '{self.subscription_queue}'")
0316 
0317             # Register as subscriber in monitor
0318             self.register_subscriber()
0319 
0320             # Agent is now ready and waiting for work
0321             self.set_ready()
0322 
0323             # Initial registration/heartbeat
0324             try:
0325                 self.send_heartbeat()
0326             except Exception:
0327                 logging.warning("Initial heartbeat failed — server may be restarting, will retry")
0328 
0329             logging.info(f"{self.agent_name} is running. Press Ctrl+C to stop.")
0330             while True:
0331                 time.sleep(60) # Keep the main thread alive, heartbeats can be added here
0332 
0333                 # Check connection status and attempt reconnection if needed
0334                 if not self.mq_connected:
0335                     self._attempt_reconnect()
0336 
0337                 try:
0338                     self.send_heartbeat()
0339                 except Exception:
0340                     logging.warning("Heartbeat failed — server may be restarting, will retry next cycle")
0341 
0342         except KeyboardInterrupt:
0343             logging.info(f"Stopping {self.agent_name}...")
0344         except stomp.exception.ConnectFailedException as e:
0345             self.mq_connected = False
0346             logging.error(f"Failed to connect to ActiveMQ: {e}")
0347             logging.error("Please check the connection details and ensure ActiveMQ is running.")
0348         except Exception as e:
0349             self.mq_connected = False
0350             logging.error(f"An unexpected error occurred: {e}")
0351             import traceback
0352             traceback.print_exc()
0353         finally:
0354             # Report exit status before disconnecting
0355             try:
0356                 self.operational_state = 'EXITED'
0357                 self.report_agent_status("EXITED", "Agent shutdown")
0358             except Exception as e:
0359                 logging.warning(f"Failed to report exit status: {e}")
0360 
0361             if self.conn and self.conn.is_connected():
0362                 self.conn.disconnect()
0363                 self.mq_connected = False
0364                 logging.info("Disconnected from ActiveMQ.")
0365 
0366     def on_connected(self, frame):
0367         """Handle successful connection to ActiveMQ."""
0368         logging.info(f"Successfully connected to ActiveMQ: {frame.headers}")
0369         self.mq_connected = True
0370     
0371     def on_error(self, frame):
0372         logging.error(f'Received an error from ActiveMQ: body="{frame.body}", headers={frame.headers}, cmd="{frame.cmd}"')
0373         self.mq_connected = False
0374     
0375     def on_disconnected(self):
0376         """Handle disconnection from ActiveMQ."""
0377         logging.warning("Disconnected from ActiveMQ - will attempt reconnection")
0378         self.mq_connected = False
0379         # Send heartbeat to update status, but don't let failures crash the receiver thread
0380         try:
0381             self.send_heartbeat()
0382         except Exception as e:
0383             logging.warning(f"Heartbeat failed during disconnect: {e}")
0384 
0385     def _attempt_reconnect(self):
0386         """Attempt to reconnect to ActiveMQ."""
0387         if self.mq_connected:
0388             return True
0389             
0390         try:
0391             logging.info("Attempting to reconnect to ActiveMQ...")
0392             if self.conn.is_connected():
0393                 self.conn.disconnect()
0394             
0395             self.conn.connect(
0396                 self.mq_user,
0397                 self.mq_password,
0398                 wait=True,
0399                 version='1.1',
0400                 headers={
0401                     'client-id': self.agent_name,
0402                     'heart-beat': '30000,30000'  # Send heartbeat every 30sec, expect server every 30sec
0403                 }
0404             )
0405             
0406             self.conn.subscribe(destination=self.subscription_queue, id=1, ack='auto')
0407             self.mq_connected = True
0408             logging.info("Successfully reconnected to ActiveMQ")
0409             return True
0410             
0411         except Exception as e:
0412             logging.warning(f"Reconnection attempt failed: {e}")
0413             self.mq_connected = False
0414             return False
0415 
0416     def on_message(self, frame):
0417         """
0418         Callback for handling incoming messages.
0419         This method must be implemented by subclasses.
0420         """
0421         raise NotImplementedError("Subclasses must implement on_message")
0422 
0423     def log_received_message(self, frame, known_types=None):
0424         """
0425         Helper method to log received messages with type information and namespace filtering.
0426         Agents can call this at the start of their on_message method.
0427 
0428         Namespace filtering:
0429         - If agent has namespace AND message has different namespace → returns (None, None)
0430         - If message has no namespace → processes it (backward compat)
0431         - If agent has no namespace → processes all messages (backward compat)
0432 
0433         Args:
0434             frame: The STOMP message frame
0435             known_types: Optional set/list of known message types (defaults to WORKFLOW_MESSAGE_TYPES)
0436 
0437         Returns:
0438             tuple: (message_data, msg_type) for convenience, or (None, None) if filtered
0439 
0440         Raises:
0441             RuntimeError: If message parsing fails
0442         """
0443         if known_types is None:
0444             known_types = self.WORKFLOW_MESSAGE_TYPES
0445 
0446         try:
0447             message_data = json.loads(frame.body)
0448             msg_type = message_data.get('msg_type', 'unknown')
0449 
0450             # Namespace filtering
0451             msg_namespace = message_data.get('namespace')
0452             if self.namespace and msg_namespace and msg_namespace != self.namespace:
0453                 logging.debug(
0454                     f"Ignoring message from namespace '{msg_namespace}' (ours: '{self.namespace}')"
0455                 )
0456                 return None, None
0457 
0458             if msg_type not in known_types:
0459                 logging.info(f"{self.agent_type} agent received unknown message type: {msg_type}", extra={"msg_type": msg_type})
0460             else:
0461                 logging.info(f"{self.agent_type} agent received message: {msg_type}")
0462 
0463             return message_data, msg_type
0464         except json.JSONDecodeError as e:
0465             logging.error(f"CRITICAL: Failed to parse message JSON: {e}")
0466             raise RuntimeError(f"Message parsing failed - agent cannot continue: {e}") from e
0467 
0468     # -------------------------------------------------------------------------
0469     # Processing State API
0470     # -------------------------------------------------------------------------
0471 
0472     def set_processing(self):
0473         """
0474         Declare that the agent is actively doing work.
0475 
0476         Call this when the agent begins a unit of work (which may span multiple
0477         messages). The agent will report PROCESSING state in heartbeats until
0478         set_ready() is called.
0479 
0480         Example:
0481             def on_message(self, frame):
0482                 message_data, msg_type = self.log_received_message(frame)
0483                 if msg_type == 'start_run':
0484                     self.set_processing()
0485                     # ... do work ...
0486         """
0487         self.operational_state = 'PROCESSING'
0488         logging.info(f"{self.agent_name} state -> PROCESSING")
0489 
0490     def set_ready(self):
0491         """
0492         Declare that the agent is idle, waiting for work.
0493 
0494         Call this when a unit of work is complete and the agent is ready
0495         to receive new work.
0496 
0497         Example:
0498             def on_message(self, frame):
0499                 message_data, msg_type = self.log_received_message(frame)
0500                 if msg_type == 'end_run':
0501                     # ... finalize work ...
0502                     self.set_ready()
0503         """
0504         self.operational_state = 'READY'
0505         logging.info(f"{self.agent_name} state -> READY")
0506 
0507     def processing(self):
0508         """
0509         Context manager for bounded processing work.
0510 
0511         Use this when a unit of work is contained within a single code block.
0512         Automatically sets PROCESSING on entry and READY on exit.
0513 
0514         Example:
0515             def on_message(self, frame):
0516                 message_data, msg_type = self.log_received_message(frame)
0517                 if msg_type == 'process_stf':
0518                     with self.processing():
0519                         # ... do bounded work ...
0520                     # automatically back to READY
0521         """
0522         agent = self
0523 
0524         class ProcessingContext:
0525             def __enter__(self):
0526                 agent.set_processing()
0527                 return self
0528 
0529             def __exit__(self, exc_type, exc_val, exc_tb):
0530                 agent.set_ready()
0531                 return False  # Don't suppress exceptions
0532 
0533         return ProcessingContext()
0534 
0535     def get_next_agent_id(self):
0536         """Get the next agent ID from persistent state API."""
0537         return get_next_agent_id(self.monitor_url, self.api, logging.getLogger(__name__))
0538 
0539     def send_message(self, destination, message_body):
0540         """
0541         Sends a JSON message to a specific destination.
0542 
0543         Args:
0544             destination: ActiveMQ destination with explicit prefix.
0545                 Must start with '/queue/' (anycast) or '/topic/' (multicast).
0546             message_body: Dict to send as JSON. 'sender' and 'namespace' auto-injected.
0547 
0548         Raises:
0549             ValueError: If destination doesn't have /queue/ or /topic/ prefix
0550         """
0551         # Validate destination has explicit prefix
0552         if not destination.startswith('/queue/') and not destination.startswith('/topic/'):
0553             raise ValueError(
0554                 f"destination must start with '/queue/' or '/topic/', got: '{destination}'. "
0555                 f"Use '/queue/{destination}' for anycast or '/topic/{destination}' for multicast."
0556             )
0557 
0558         # Auto-inject sender and namespace
0559         message_body['sender'] = self.agent_name
0560         if self.namespace:
0561             message_body['namespace'] = self.namespace
0562         else:
0563             logging.warning(
0564                 f"Sending message without namespace (msg_type={message_body.get('msg_type', 'unknown')}). "
0565                 "Configure namespace in testbed.toml to enable namespace filtering."
0566             )
0567 
0568         try:
0569             self.conn.send(body=json.dumps(message_body), destination=destination)
0570             logging.info(f"Sent message to '{destination}': {message_body}")
0571         except Exception as e:
0572             logging.error(f"Failed to send message to '{destination}': {e}")
0573             
0574             # Check for SSL/connection errors that indicate disconnection
0575             if any(error_type in str(e).lower() for error_type in ['ssl', 'eof', 'connection', 'broken pipe']):
0576                 logging.warning("Connection error detected - attempting recovery")
0577                 self.mq_connected = False
0578                 time.sleep(1)  # Brief pause before retry
0579                 if self._attempt_reconnect():
0580                     try:
0581                         self.conn.send(body=json.dumps(message_body), destination=destination)
0582                         logging.info(f"Message sent successfully after reconnection to '{destination}'")
0583                     except Exception as retry_e:
0584                         logging.error(f"Retry failed after reconnection: {retry_e}")
0585                 else:
0586                     logging.error("Reconnection failed - message lost")
0587 
0588     def _api_request(self, method, endpoint, json_data=None):
0589         """
0590         Helper method to make a request to the monitor API.
0591         Retries on transient failures (connection errors, 502/503/504).
0592         Fails immediately on 4xx and redirects.
0593         """
0594         url = f"{self.monitor_url}/api{endpoint}"
0595         try:
0596             response = api_request_with_retry(
0597                 method, url, session=self.api, logger=logging.getLogger(__name__),
0598                 json=json_data, allow_redirects=False,
0599             )
0600             if 300 <= response.status_code < 400:
0601                 loc = response.headers.get('Location', 'unknown')
0602                 msg = (f"API redirect (HTTP {response.status_code}) to {loc}. "
0603                        f"If behind Apache/OIDC, ensure API requests aren't redirected and Authorization is forwarded.")
0604                 logging.error(msg)
0605                 raise APIError(msg, response=response, url=url, method=method.upper())
0606             response.raise_for_status()
0607             return response.json()
0608         except requests.exceptions.RequestException as e:
0609             if hasattr(e, 'response') and e.response is not None and e.response.status_code == 400:
0610                 response_text = e.response.text.lower()
0611                 if "already exists" in response_text and "subscriber" in response_text:
0612                     logging.info(f"Resource already exists (normal): {method.upper()} {url}")
0613                     return {"status": "already_exists"}
0614 
0615             logging.error(f"API request FAILED: {method.upper()} {url} - {e}")
0616             if hasattr(e, 'response') and e.response is not None:
0617                 logging.error(f"Response status: {e.response.status_code}")
0618                 logging.error(f"Response body: {e.response.text}")
0619             raise APIError(f"Critical API failure - agent cannot continue: {method.upper()} {url} - {e}",
0620                           response=getattr(e, 'response', None), url=url, method=method.upper()) from e
0621 
0622     def send_heartbeat(self):
0623         """Registers the agent and sends a heartbeat to the monitor."""
0624         if self.DEBUG:
0625             logging.info("Sending heartbeat to monitor...")
0626         
0627         # Determine overall status based on MQ connection
0628         status = "OK" if getattr(self, 'mq_connected', False) else "WARNING"
0629         
0630         # Build description with connection details
0631         mq_status = "connected" if getattr(self, 'mq_connected', False) else "disconnected"
0632         description = f"{self.agent_type} agent. MQ: {mq_status}"
0633         
0634         payload = {
0635             "instance_name": self.agent_name,
0636             "agent_type": self.agent_type,
0637             "status": status,
0638             "description": description,
0639             "mq_connected": getattr(self, 'mq_connected', False),
0640             "pid": self.pid,
0641             "hostname": self.hostname,
0642             "operational_state": self.operational_state,
0643         }
0644 
0645         # Include namespace if configured
0646         if self.namespace:
0647             payload["namespace"] = self.namespace
0648 
0649         result = self._api_request('post', '/systemagents/heartbeat/', payload)
0650         if result:
0651             if self.DEBUG:
0652                 logging.info(f"Heartbeat sent successfully. Status: {status}, MQ: {mq_status}")
0653         else:
0654             logging.warning("Failed to send heartbeat to monitor")
0655     
0656     def send_enhanced_heartbeat(self, workflow_metadata=None):
0657         """Send heartbeat with optional workflow metadata."""
0658         if self.DEBUG:
0659             logging.info("Sending heartbeat to monitor...")
0660         
0661         # Determine overall status based on MQ connection
0662         status = "OK" if getattr(self, 'mq_connected', False) else "WARNING"
0663         
0664         # Build description with connection details
0665         mq_status = "connected" if getattr(self, 'mq_connected', False) else "disconnected"
0666         description_parts = [f"{self.agent_type} agent", f"MQ: {mq_status}"]
0667         
0668         # Add workflow context if provided
0669         if workflow_metadata:
0670             for key, value in workflow_metadata.items():
0671                 description_parts.append(f"{key}: {value}")
0672         
0673         description = ". ".join(description_parts)
0674         
0675         payload = {
0676             "instance_name": self.agent_name,
0677             "agent_type": self.agent_type,
0678             "status": status,
0679             "description": description,
0680             "mq_connected": getattr(self, 'mq_connected', False),
0681             "pid": self.pid,
0682             "hostname": self.hostname,
0683             "operational_state": self.operational_state,
0684             # Include workflow metadata in agent record
0685             "workflow_enabled": True if workflow_metadata else False,
0686             "current_stf_count": workflow_metadata.get('active_tasks', 0) if workflow_metadata else 0,
0687             "total_stf_processed": workflow_metadata.get('completed_tasks', 0) if workflow_metadata else 0
0688         }
0689 
0690         # Include namespace if configured
0691         if self.namespace:
0692             payload["namespace"] = self.namespace
0693 
0694         result = self._api_request('post', '/systemagents/heartbeat/', payload)
0695         if result:
0696             if self.DEBUG:
0697                 logging.info(f"Heartbeat sent successfully")
0698             return True
0699         else:
0700             logging.warning("Failed to send heartbeat to monitor")
0701             return False
0702 
0703     def report_agent_status(self, status, message=None, error_details=None):
0704         """Report agent status change to monitor."""
0705         logging.info(f"Reporting agent status: {status}")
0706 
0707         description_parts = [f"{self.agent_type} agent"]
0708         if message:
0709             description_parts.append(message)
0710         if error_details:
0711             description_parts.append(f"Error: {error_details}")
0712 
0713         payload = {
0714             "instance_name": self.agent_name,
0715             "agent_type": self.agent_type,
0716             "status": status,
0717             "description": ". ".join(description_parts),
0718             "mq_connected": getattr(self, 'mq_connected', False),
0719             "pid": self.pid,
0720             "hostname": self.hostname,
0721             "operational_state": self.operational_state,
0722         }
0723 
0724         # Include namespace if configured
0725         if self.namespace:
0726             payload["namespace"] = self.namespace
0727 
0728         result = self._api_request('post', '/systemagents/heartbeat/', payload)
0729         if result:
0730             logging.info(f"Status reported successfully: {status}")
0731             return True
0732         else:
0733             logging.warning(f"Failed to report status: {status}")
0734             return False
0735 
0736     def check_monitor_health(self):
0737         """Check if monitor API is available."""
0738         try:
0739             result = self._api_request('get', '/systemagents/', None)
0740             if result is not None:
0741                 logging.info("Monitor API is healthy")
0742                 return True
0743             else:
0744                 logging.warning("Monitor API is not responding")
0745                 return False
0746         except Exception as e:
0747             logging.error(f"Monitor health check failed: {e}")
0748             return False
0749     
0750     def call_monitor_api(self, method, endpoint, json_data=None):
0751         """Generic monitor API call method for agent-specific implementations."""
0752         return self._api_request(method.lower(), endpoint, json_data)
0753     
0754     def register_subscriber(self):
0755         """Register this agent as a subscriber to its ActiveMQ queue."""
0756         logging.info(f"Registering subscriber for queue '{self.subscription_queue}'...")
0757         
0758         subscriber_data = {
0759             "subscriber_name": f"{self.agent_name}-{self.subscription_queue}",
0760             "description": f"{self.agent_type} agent subscribing to {self.subscription_queue}",
0761             "is_active": True,
0762             "fraction": 1.0  # Receives all messages
0763         }
0764         
0765         try:
0766             result = self._api_request('post', '/subscribers/', subscriber_data)
0767             if result:
0768                 if result.get('status') == 'already_exists':
0769                     logging.info(f"Subscriber already registered: {subscriber_data['subscriber_name']}")
0770                     return True
0771                 else:
0772                     logging.info(f"Subscriber registered successfully: {result.get('subscriber_name')}")
0773                     return True
0774             else:
0775                 logging.error("Failed to register subscriber")
0776                 return False
0777         except Exception as e:
0778             # Other registration failures are critical
0779             logging.error(f"Critical subscriber registration failure: {e}")
0780             raise e