File indexing completed on 2026-04-27 07:41:41
0001 """
0002 This module contains the base class for all agents.
0003 """
0004
0005 import os
0006 import sys
0007 import time
0008 import signal
0009 import socket
0010 import stomp
0011 import requests
0012 import json
0013 import logging
0014 from pathlib import Path
0015 from typing import Optional
0016 from .api_utils import get_next_agent_id, api_request_with_retry
0017 from .config_utils import load_testbed_config, TestbedConfigError
0018
0019
class APIError(Exception):
    """Raised when a call to the swf-monitor REST API fails.

    Besides the message, the failure context is kept on the instance so
    callers can inspect it:

    Attributes:
        response: The HTTP response associated with the failure, or None.
        url: The URL that was requested, or None.
        method: The HTTP method that was used, or None.
    """

    def __init__(self, message, response=None, url=None, method=None):
        super().__init__(message)
        self.response, self.url, self.method = response, url, method
0028
def setup_environment():
    """Auto-activate the testbed venv and load environment variables.

    Steps:
      1. If ``VIRTUAL_ENV`` is not set, look for ``.venv`` inside the sibling
         ``swf-testbed`` checkout. If it has ``bin/python``, export
         ``VIRTUAL_ENV``, prepend its ``bin`` to ``PATH`` and point
         ``sys.executable`` at that interpreter.
      2. Parse ``~/.env`` (``KEY=value`` or ``export KEY=value`` lines,
         ``#`` comments ignored) into ``os.environ``. Whitespace around keys
         and values is trimmed and surrounding quotes are stripped. Values
         containing ``$`` are skipped because no shell expansion is done.
      3. Remove the HTTP(S) proxy variables from the environment.

    Returns:
        False if no virtual environment is active and no ``.venv`` was
        found; in that case ``~/.env`` is NOT loaded. Otherwise True.
    """
    script_dir = Path(__file__).resolve().parent.parent.parent.parent / "swf-testbed"

    if "VIRTUAL_ENV" not in os.environ:
        venv_path = script_dir / ".venv"
        if venv_path.exists():
            print("🔧 Auto-activating virtual environment...")
            venv_python = venv_path / "bin" / "python"
            if venv_python.exists():
                os.environ["VIRTUAL_ENV"] = str(venv_path)
                existing_path = os.environ.get("PATH")
                # Avoid KeyError / a trailing ':' (current dir) if PATH is unset.
                os.environ["PATH"] = (
                    f"{venv_path}/bin:{existing_path}" if existing_path else f"{venv_path}/bin"
                )
                sys.executable = str(venv_python)
        else:
            print("❌ Error: No Python virtual environment found")
            return False

    env_file = Path.home() / ".env"
    if env_file.exists():
        print("🔧 Loading environment variables from ~/.env...")
        with open(env_file, encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#') or '=' not in line:
                    continue
                if line.startswith('export '):
                    line = line[len('export '):]
                key, value = line.split('=', 1)
                # "KEY = value" must set KEY, not "KEY " with value " value".
                key = key.strip()
                value = value.strip().strip('"\'')
                # An empty name ("=value") is not a valid environment
                # variable and would make os.environ raise.
                if not key:
                    continue
                # References to other variables would need shell expansion,
                # which is not performed here.
                if '$' in value:
                    continue
                os.environ[key] = value

    for proxy_var in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']:
        if proxy_var in os.environ:
            del os.environ[proxy_var]

    return True
0071
0072
# One-time environment bootstrap. The SWF_ENV_LOADED marker is written to
# os.environ, so child processes started afterwards inherit it and skip
# setup_environment().
if os.environ.get('SWF_ENV_LOADED') in (None, ''):
    setup_environment()
    os.environ['SWF_ENV_LOADED'] = 'true'
0076
0077
0078 from swf_common_lib.rest_logging import setup_rest_logging
0079
0080
# SWF_AGENT_QUIET lowers the *default* level to WARNING; an explicit
# SWF_LOG_LEVEL always takes precedence.
_quiet = os.getenv('SWF_AGENT_QUIET', 'false').lower() in ('1', 'true', 'yes', 'on')
if _quiet:
    _level_name = os.getenv('SWF_LOG_LEVEL', 'WARNING').upper()
else:
    _level_name = os.getenv('SWF_LOG_LEVEL', 'INFO').upper()

# Level names advertised in the warning below. NOTSET is not listed, but the
# lookup that follows still accepts it.
_valid_levels = set(logging._nameToLevel.keys()) - {'NOTSET'}
_level = logging._nameToLevel.get(_level_name)
if _level is None:
    print(f"WARNING: Invalid SWF_LOG_LEVEL '{_level_name}'. Valid levels: {', '.join(sorted(_valid_levels))}. Using INFO.")
    _level = logging.INFO

logging.basicConfig(level=_level, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
0094
0095
# The 'stomp' logger is held at WARNING unless SWF_STOMP_DEBUG is 1/true/yes/on,
# in which case it logs at DEBUG through its own stream handler.
stomp_logger = logging.getLogger('stomp')
if os.getenv('SWF_STOMP_DEBUG', 'false').lower() not in ('1', 'true', 'yes', 'on'):
    stomp_logger.setLevel(logging.WARNING)
else:
    stomp_logger.setLevel(logging.DEBUG)
    _stomp_handler = logging.StreamHandler()
    _stomp_handler.setLevel(logging.DEBUG)
    _stomp_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(name)s - %(message)s')
    )
    stomp_logger.addHandler(_stomp_handler)
0105
0106
class BaseAgent(stomp.ConnectionListener):
    """
    Base class for standalone STF workflow agents.

    Shared responsibilities:
    - Connecting to the ActiveMQ message broker. The agent is itself a
      ``stomp.ConnectionListener`` and registers itself as the listener.
    - Communicating with the swf-monitor REST API.
    - Running a persistent process with graceful shutdown.
    """

    # Message types that log_received_message() treats as known by default.
    WORKFLOW_MESSAGE_TYPES = {
        'run_imminent',
        'start_run',
        'pause_run',
        'resume_run',
        'end_run',
        'stf_gen',
        'stf_ready',
        'tf_file_registered',
    }
0122
def __init__(self, agent_type, subscription_queue, debug=False,
             config_path: Optional[str] = None):
    """
    Initialize BaseAgent.

    Loads the testbed config and prepares the monitor REST session. It then
    obtains a unique agent name from the monitor and builds, but does not
    open, the STOMP connection. The broker connection is opened by run().

    Args:
        agent_type: Type of agent (e.g., 'DATA', 'PROCESSING')
        subscription_queue: ActiveMQ destination with explicit prefix.
            Must start with '/queue/' (anycast) or '/topic/' (multicast).
            Example: '/queue/workflow_control' or '/topic/epictopic'
        debug: Enable debug logging
        config_path: Path to testbed.toml config file. When None, the
            SWF_TESTBED_CONFIG environment variable is used (a bare file
            name is looked up under 'workflows/'). Otherwise it falls back
            to 'workflows/testbed.toml'.

    Raises:
        ValueError: If subscription_queue doesn't have /queue/ or /topic/ prefix
        TestbedConfigError: If the testbed config cannot be loaded.
    """
    if not subscription_queue.startswith(('/queue/', '/topic/')):
        raise ValueError(
            f"subscription_queue must start with '/queue/' or '/topic/', got: '{subscription_queue}'. "
            f"Use '/queue/{subscription_queue}' for anycast or '/topic/{subscription_queue}' for multicast."
        )

    self.agent_type = agent_type
    self.subscription_queue = subscription_queue
    self.DEBUG = debug

    # Config path precedence: explicit argument > SWF_TESTBED_CONFIG > default.
    if config_path is None:
        env_config = os.getenv('SWF_TESTBED_CONFIG')
        if not env_config:
            config_path = 'workflows/testbed.toml'
        elif '/' in env_config:
            # Already a path: use it as given.
            config_path = env_config
        else:
            # Bare file name: look it up in the workflows directory.
            config_path = f'workflows/{env_config}'
    self.config_path = config_path

    self.namespace = None
    try:
        config = load_testbed_config(config_path=config_path)
    except TestbedConfigError as e:
        logging.error(f"Configuration error: {e}")
        raise
    self.namespace = config.namespace
    logging.info(f"Namespace: {self.namespace}")

    # Monitor REST API session.
    self.monitor_url = (os.getenv('SWF_MONITOR_URL') or '').rstrip('/')
    self.api_token = os.getenv('SWF_API_TOKEN')
    self.api = requests.Session()
    if self.api_token:
        self.api.headers.update({'Authorization': f'Token {self.api_token}'})

    # Local monitor: disable TLS verification and bypass proxies. This must be
    # configured BEFORE the first API request (get_next_agent_id() below).
    # Otherwise that request runs with different session settings than every
    # later one.
    if 'localhost' in self.monitor_url or '127.0.0.1' in self.monitor_url:
        self.api.verify = False
        self.api.proxies = {
            'http': '',
            'https': ''
        }
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # Agent name: <type>-agent-<user>-<id allocated by the monitor>.
    import getpass
    self.username = getpass.getuser()
    agent_id = self.get_next_agent_id()
    self.agent_name = f"{self.agent_type.lower()}-agent-{self.username}-{agent_id}"

    # Workflow context; _log_extra() adds these to log records when set.
    self.current_execution_id = None
    self.current_run_id = None

    # Process identity and state reported in heartbeats.
    self.pid = os.getpid()
    self.hostname = socket.gethostname()
    self.operational_state = 'STARTING'
    # Broker connection flag. run() and the stomp callbacks update it.
    self.mq_connected = False

    # Broker / logging endpoints from the environment.
    self.base_url = (os.getenv('SWF_MONITOR_HTTP_URL') or '').rstrip('/')
    self.mq_host = os.getenv('ACTIVEMQ_HOST', 'localhost')
    self.mq_port = int(os.getenv('ACTIVEMQ_PORT', 61612))
    self.mq_user = os.getenv('ACTIVEMQ_USER', 'admin')
    self.mq_password = os.getenv('ACTIVEMQ_PASSWORD', 'admin')

    self.use_ssl = os.getenv('ACTIVEMQ_USE_SSL', 'False').lower() == 'true'
    self.ssl_ca_certs = os.getenv('ACTIVEMQ_SSL_CA_CERTS', '')
    # NOTE(review): the cert/key files are read but not passed to set_ssl()
    # below, so no client certificate is configured here.
    self.ssl_cert_file = os.getenv('ACTIVEMQ_SSL_CERT_FILE', '')
    self.ssl_key_file = os.getenv('ACTIVEMQ_SSL_KEY_FILE', '')

    self.logger = setup_rest_logging('base_agent', self.agent_name, self.base_url)

    self.conn = stomp.Connection(
        host_and_ports=[(self.mq_host, self.mq_port)],
        vhost=self.mq_host,
        try_loopback_connect=False,
        heartbeats=(30000, 30000),
        auto_content_length=False
    )

    if self.use_ssl:
        import ssl
        logging.info(f"Configuring SSL connection with CA certs: {self.ssl_ca_certs}")
        if self.ssl_ca_certs:
            self.conn.transport.set_ssl(
                for_hosts=[(self.mq_host, self.mq_port)],
                ca_certs=self.ssl_ca_certs,
                ssl_version=ssl.PROTOCOL_TLS_CLIENT
            )
            logging.info("SSL transport configured successfully")
        else:
            logging.warning("SSL enabled but no CA certificate file specified")

    # The agent is its own listener (on_message, on_connected, ...).
    self.conn.set_listener('', self)
0251
0252 def _log_extra(self, **kwargs):
0253 """
0254 Build extra dict for logging with common context fields.
0255
0256 Automatically includes username, execution_id, and run_id when set.
0257 Subclasses can override to add additional fields, calling super()._log_extra().
0258
0259 Usage:
0260 self.logger.info("Message", extra=self._log_extra(custom_field=value))
0261 """
0262 extra = {'username': self.username}
0263 if self.current_execution_id:
0264 extra['execution_id'] = self.current_execution_id
0265 if self.current_run_id:
0266 extra['run_id'] = self.current_run_id
0267 extra.update(kwargs)
0268 return extra
0269
def run(self):
    """
    Connect to the message broker and run the agent's main loop.

    Sequence:
      1. Install SIGTERM handling, and SIGQUIT handling where the platform
         defines it. The handler turns the signal into KeyboardInterrupt for
         a graceful stop.
      2. Unless ``mq_connected`` is already true, connect to ActiveMQ with up
         to 3 attempts, 5 seconds apart.
      3. Subscribe to ``subscription_queue``, register the subscriber with
         the monitor, mark the agent READY and send an initial heartbeat.
      4. Every 60 seconds: reconnect if the broker connection was lost, then
         send a heartbeat. Heartbeat failures are logged and retried on the
         next cycle.

    On exit the EXITED status is reported (best effort) and the broker
    connection is closed. Errors during this cleanup are logged, not raised.

    Raises:
        Exception: The error from the last attempt if every connection
            attempt in step 2 fails. Errors after step 2 are handled and
            logged instead of raised.
    """

    def signal_handler(signum, frame):
        sig_name = signal.Signals(signum).name
        logging.info(f"Received {sig_name}, initiating graceful shutdown...")
        raise KeyboardInterrupt(f"Received {sig_name}")

    signal.signal(signal.SIGTERM, signal_handler)
    # signal.SIGQUIT is POSIX-only (absent e.g. on Windows).
    if hasattr(signal, 'SIGQUIT'):
        signal.signal(signal.SIGQUIT, signal_handler)

    logging.info(f"Starting {self.agent_name}...")

    # Skip connecting if the flag says a connection is already up.
    if not getattr(self, 'mq_connected', False):
        max_retries = 3
        retry_delay = 5
        for attempt in range(1, max_retries + 1):
            logging.info(f"Connecting to ActiveMQ at {self.mq_host}:{self.mq_port} (attempt {attempt}/{max_retries})")
            try:
                self.conn.connect(
                    self.mq_user,
                    self.mq_password,
                    wait=True,
                    version='1.1',
                    headers={
                        'client-id': self.agent_name,
                        'heart-beat': '30000,30000'
                    }
                )
                self.mq_connected = True
                break
            except Exception as e:
                logging.warning(f"Connection attempt {attempt} failed: {e}")
                if attempt < max_retries:
                    logging.info(f"Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                else:
                    logging.error(f"Failed to connect after {max_retries} attempts")
                    raise

    try:
        self.conn.subscribe(destination=self.subscription_queue, id=1, ack='auto')
        logging.info(f"Subscribed to queue: '{self.subscription_queue}'")

        self.register_subscriber()

        self.set_ready()

        # The monitor may be briefly unavailable; the loop below retries.
        try:
            self.send_heartbeat()
        except Exception:
            logging.warning("Initial heartbeat failed — server may be restarting, will retry")

        logging.info(f"{self.agent_name} is running. Press Ctrl+C to stop.")
        while True:
            time.sleep(60)

            if not self.mq_connected:
                self._attempt_reconnect()

            try:
                self.send_heartbeat()
            except Exception:
                logging.warning("Heartbeat failed — server may be restarting, will retry next cycle")

    except KeyboardInterrupt:
        logging.info(f"Stopping {self.agent_name}...")
    except stomp.exception.ConnectFailedException as e:
        self.mq_connected = False
        logging.error(f"Failed to connect to ActiveMQ: {e}")
        logging.error("Please check the connection details and ensure ActiveMQ is running.")
    except Exception as e:
        self.mq_connected = False
        logging.error(f"An unexpected error occurred: {e}")
        import traceback
        traceback.print_exc()
    finally:
        try:
            self.operational_state = 'EXITED'
            self.report_agent_status("EXITED", "Agent shutdown")
        except Exception as e:
            logging.warning(f"Failed to report exit status: {e}")

        # A failing disconnect (e.g. dead socket) must not escape run() from
        # this finally block or leave the connection flag stale.
        try:
            if self.conn and self.conn.is_connected():
                self.conn.disconnect()
                logging.info("Disconnected from ActiveMQ.")
        except Exception as e:
            logging.warning(f"Error while disconnecting from ActiveMQ: {e}")
        finally:
            self.mq_connected = False
0365
def on_connected(self, frame):
    """stomp listener callback: the broker accepted the connection; mark MQ as up."""
    connect_headers = frame.headers
    logging.info(f"Successfully connected to ActiveMQ: {connect_headers}")
    self.mq_connected = True
0370
def on_error(self, frame):
    """stomp listener callback: log an ERROR frame from the broker and mark MQ as down."""
    body, headers, cmd = frame.body, frame.headers, frame.cmd
    logging.error(f'Received an error from ActiveMQ: body="{body}", headers={headers}, cmd="{cmd}"')
    self.mq_connected = False
0374
def on_disconnected(self):
    """
    stomp listener callback: the broker connection was lost.

    Marks MQ as disconnected (run()'s loop attempts a reconnect on its next
    cycle). It then sends a best-effort heartbeat so the monitor sees the
    change; heartbeat errors are logged and swallowed.
    """
    logging.warning("Disconnected from ActiveMQ - will attempt reconnection")
    self.mq_connected = False

    try:
        self.send_heartbeat()
    except Exception as heartbeat_error:
        logging.warning(f"Heartbeat failed during disconnect: {heartbeat_error}")
0384
0385 def _attempt_reconnect(self):
0386 """Attempt to reconnect to ActiveMQ."""
0387 if self.mq_connected:
0388 return True
0389
0390 try:
0391 logging.info("Attempting to reconnect to ActiveMQ...")
0392 if self.conn.is_connected():
0393 self.conn.disconnect()
0394
0395 self.conn.connect(
0396 self.mq_user,
0397 self.mq_password,
0398 wait=True,
0399 version='1.1',
0400 headers={
0401 'client-id': self.agent_name,
0402 'heart-beat': '30000,30000'
0403 }
0404 )
0405
0406 self.conn.subscribe(destination=self.subscription_queue, id=1, ack='auto')
0407 self.mq_connected = True
0408 logging.info("Successfully reconnected to ActiveMQ")
0409 return True
0410
0411 except Exception as e:
0412 logging.warning(f"Reconnection attempt failed: {e}")
0413 self.mq_connected = False
0414 return False
0415
def on_message(self, frame):
    """
    stomp listener callback for an incoming message.

    Abstract hook that subclasses must override. They may, for example,
    start with ``self.log_received_message(frame)``.

    Raises:
        NotImplementedError: Always, in the base class.
    """
    error_text = "Subclasses must implement on_message"
    raise NotImplementedError(error_text)
0422
def log_received_message(self, frame, known_types=None):
    """
    Parse, namespace-filter and log an incoming message.

    Agents can call this at the start of their on_message method.

    Namespace filtering:
    - If the agent has a namespace AND the message has a different one,
      the message is ignored and (None, None) is returned.
    - If the message has no namespace, it is processed (backward compat).
    - If the agent has no namespace, all messages are processed
      (backward compat).

    Args:
        frame: The STOMP message frame; its ``body`` must be a JSON object.
        known_types: Optional set/list of known message types (defaults to
            WORKFLOW_MESSAGE_TYPES). Unknown types are still returned, just
            logged as unknown.

    Returns:
        tuple: (message_data, msg_type), or (None, None) if filtered out.
        ``msg_type`` is 'unknown' when the message has no 'msg_type' key.

    Raises:
        RuntimeError: If the body is not valid JSON, or is valid JSON that is
            not an object (e.g. a list or a string).
    """
    if known_types is None:
        known_types = self.WORKFLOW_MESSAGE_TYPES

    # TypeError covers a missing/None body, which json.loads cannot decode.
    try:
        message_data = json.loads(frame.body)
    except (json.JSONDecodeError, TypeError) as e:
        logging.error(f"CRITICAL: Failed to parse message JSON: {e}")
        raise RuntimeError(f"Message parsing failed - agent cannot continue: {e}") from e

    # Everything below (and callers) treats the payload as a dict; fail with
    # the documented error instead of an AttributeError on .get().
    if not isinstance(message_data, dict):
        type_name = type(message_data).__name__
        logging.error(f"CRITICAL: Message JSON is not an object: {type_name}")
        raise RuntimeError(
            f"Message parsing failed - agent cannot continue: expected a JSON object, got {type_name}"
        )

    msg_type = message_data.get('msg_type', 'unknown')

    msg_namespace = message_data.get('namespace')
    if self.namespace and msg_namespace and msg_namespace != self.namespace:
        logging.debug(
            f"Ignoring message from namespace '{msg_namespace}' (ours: '{self.namespace}')"
        )
        return None, None

    if msg_type not in known_types:
        logging.info(f"{self.agent_type} agent received unknown message type: {msg_type}", extra={"msg_type": msg_type})
    else:
        logging.info(f"{self.agent_type} agent received message: {msg_type}")

    return message_data, msg_type
0467
0468
0469
0470
0471
def set_processing(self):
    """
    Mark the agent as actively working.

    Use this when a unit of work begins; that unit may span several
    messages. Heartbeats report PROCESSING until set_ready() is called.

    Example:
        def on_message(self, frame):
            message_data, msg_type = self.log_received_message(frame)
            if msg_type == 'start_run':
                self.set_processing()
                # ... do work ...
    """
    new_state = 'PROCESSING'
    self.operational_state = new_state
    logging.info(f"{self.agent_name} state -> {new_state}")
0489
def set_ready(self):
    """
    Mark the agent as idle and waiting for work.

    Use this once a unit of work is finished and the agent can accept new
    work. Heartbeats report READY afterwards.

    Example:
        def on_message(self, frame):
            message_data, msg_type = self.log_received_message(frame)
            if msg_type == 'end_run':
                # ... finalize work ...
                self.set_ready()
    """
    new_state = 'READY'
    self.operational_state = new_state
    logging.info(f"{self.agent_name} state -> {new_state}")
0506
def processing(self):
    """
    Return a context manager that brackets a bounded unit of work.

    On entry it calls set_processing(). On exit it always calls
    set_ready(), including when the body raises. Exceptions are never
    suppressed.

    Example:
        def on_message(self, frame):
            message_data, msg_type = self.log_received_message(frame)
            if msg_type == 'process_stf':
                with self.processing():
                    # ... do bounded work ...
                # automatically back to READY
    """

    class ProcessingContext:
        def __init__(self, owner):
            self._owner = owner

        def __enter__(self):
            self._owner.set_processing()
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self._owner.set_ready()
            # False: let any exception from the with-body propagate.
            return False

    return ProcessingContext(self)
0534
def get_next_agent_id(self):
    """
    Ask the monitor's persistent-state API for the next agent ID.

    Delegates to the module-level ``get_next_agent_id`` helper from
    ``.api_utils``. It passes the monitor URL, this agent's HTTP session and
    this module's logger.
    """
    module_logger = logging.getLogger(__name__)
    return get_next_agent_id(self.monitor_url, self.api, module_logger)
0538
def send_message(self, destination, message_body):
    """
    Send ``message_body`` as JSON to an ActiveMQ destination.

    ``sender`` is written into ``message_body`` in place, and so is
    ``namespace`` when configured; without a namespace a warning is logged.
    Send failures are logged, not raised. If the error text looks like a
    connection problem, one reconnect and one resend are attempted.

    Args:
        destination: ActiveMQ destination with explicit prefix.
            Must start with '/queue/' (anycast) or '/topic/' (multicast).
        message_body: Dict to send as JSON. 'sender' and 'namespace' auto-injected.

    Raises:
        ValueError: If destination doesn't have /queue/ or /topic/ prefix
    """
    if not destination.startswith(('/queue/', '/topic/')):
        raise ValueError(
            f"destination must start with '/queue/' or '/topic/', got: '{destination}'. "
            f"Use '/queue/{destination}' for anycast or '/topic/{destination}' for multicast."
        )

    message_body['sender'] = self.agent_name
    if not self.namespace:
        logging.warning(
            f"Sending message without namespace (msg_type={message_body.get('msg_type', 'unknown')}). "
            "Configure namespace in testbed.toml to enable namespace filtering."
        )
    else:
        message_body['namespace'] = self.namespace

    try:
        self.conn.send(body=json.dumps(message_body), destination=destination)
        logging.info(f"Sent message to '{destination}': {message_body}")
    except Exception as send_error:
        logging.error(f"Failed to send message to '{destination}': {send_error}")

        # Only errors that look connection-related get a reconnect + resend.
        error_text = str(send_error).lower()
        connection_markers = ('ssl', 'eof', 'connection', 'broken pipe')
        if not any(marker in error_text for marker in connection_markers):
            return

        logging.warning("Connection error detected - attempting recovery")
        self.mq_connected = False
        time.sleep(1)
        if not self._attempt_reconnect():
            logging.error("Reconnection failed - message lost")
            return

        try:
            self.conn.send(body=json.dumps(message_body), destination=destination)
            logging.info(f"Message sent successfully after reconnection to '{destination}'")
        except Exception as retry_e:
            logging.error(f"Retry failed after reconnection: {retry_e}")
0587
def _api_request(self, method, endpoint, json_data=None):
    """
    Call the monitor API at ``{monitor_url}/api{endpoint}`` and return the decoded JSON.

    Retries transient failures (connection errors, 502/503/504) via
    ``api_request_with_retry``. It fails immediately on 4xx and on
    redirects, because redirects are not followed.

    Returns:
        The decoded JSON body. For a 400 whose body mentions an existing
        subscriber, returns ``{"status": "already_exists"}`` instead.

    Raises:
        APIError: On a redirect or any other request failure.
    """
    url = f"{self.monitor_url}/api{endpoint}"
    try:
        response = api_request_with_retry(
            method,
            url,
            session=self.api,
            logger=logging.getLogger(__name__),
            json=json_data,
            allow_redirects=False,
        )
        if 300 <= response.status_code < 400:
            # A redirect usually means an auth proxy intercepted the API call.
            location = response.headers.get('Location', 'unknown')
            msg = (f"API redirect (HTTP {response.status_code}) to {location}. "
                   f"If behind Apache/OIDC, ensure API requests aren't redirected and Authorization is forwarded.")
            logging.error(msg)
            raise APIError(msg, response=response, url=url, method=method.upper())
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        failed_response = getattr(e, 'response', None)

        # Duplicate subscriber registration is an expected, benign outcome.
        if failed_response is not None and failed_response.status_code == 400:
            body_text = failed_response.text.lower()
            if "already exists" in body_text and "subscriber" in body_text:
                logging.info(f"Resource already exists (normal): {method.upper()} {url}")
                return {"status": "already_exists"}

        logging.error(f"API request FAILED: {method.upper()} {url} - {e}")
        if failed_response is not None:
            logging.error(f"Response status: {failed_response.status_code}")
            logging.error(f"Response body: {failed_response.text}")
        raise APIError(f"Critical API failure - agent cannot continue: {method.upper()} {url} - {e}",
                       response=failed_response, url=url, method=method.upper()) from e
0621
def send_heartbeat(self):
    """
    Register the agent with the monitor and send a heartbeat.

    POSTs the agent's identity, ActiveMQ connectivity and
    ``operational_state`` to ``/systemagents/heartbeat/``. Status is "OK"
    while ``mq_connected`` is true and "WARNING" otherwise. ``namespace`` is
    included when configured.

    Returns:
        True if the monitor returned a non-empty response, False otherwise.
        This matches send_enhanced_heartbeat() and report_agent_status().

    Raises:
        APIError: Propagated from _api_request when the request fails.
    """
    if self.DEBUG:
        logging.info("Sending heartbeat to monitor...")

    # Read the flag once. The stomp callbacks also write it, so re-reading
    # could produce a payload whose fields disagree with each other.
    mq_connected = getattr(self, 'mq_connected', False)
    status = "OK" if mq_connected else "WARNING"
    mq_status = "connected" if mq_connected else "disconnected"

    payload = {
        "instance_name": self.agent_name,
        "agent_type": self.agent_type,
        "status": status,
        "description": f"{self.agent_type} agent. MQ: {mq_status}",
        "mq_connected": mq_connected,
        "pid": self.pid,
        "hostname": self.hostname,
        "operational_state": self.operational_state,
    }
    if self.namespace:
        payload["namespace"] = self.namespace

    result = self._api_request('post', '/systemagents/heartbeat/', payload)
    if not result:
        logging.warning("Failed to send heartbeat to monitor")
        return False

    if self.DEBUG:
        logging.info(f"Heartbeat sent successfully. Status: {status}, MQ: {mq_status}")
    return True
0655
def send_enhanced_heartbeat(self, workflow_metadata=None):
    """
    Send a heartbeat that optionally carries workflow metadata.

    Every ``workflow_metadata`` item is appended to the description as
    "key: value". Its 'active_tasks' and 'completed_tasks' entries populate
    ``current_stf_count`` and ``total_stf_processed`` (default 0).
    ``workflow_enabled`` is true only for truthy metadata.

    Returns:
        True if the monitor returned a non-empty response, False otherwise.
    """
    if self.DEBUG:
        logging.info("Sending heartbeat to monitor...")

    connected = getattr(self, 'mq_connected', False)
    mq_status = "connected" if connected else "disconnected"
    metadata = workflow_metadata or {}

    header_parts = [f"{self.agent_type} agent", f"MQ: {mq_status}"]
    metadata_parts = [f"{key}: {value}" for key, value in metadata.items()]
    description = ". ".join(header_parts + metadata_parts)

    payload = {
        "instance_name": self.agent_name,
        "agent_type": self.agent_type,
        "status": "OK" if connected else "WARNING",
        "description": description,
        "mq_connected": connected,
        "pid": self.pid,
        "hostname": self.hostname,
        "operational_state": self.operational_state,
        "workflow_enabled": bool(workflow_metadata),
        "current_stf_count": metadata.get('active_tasks', 0),
        "total_stf_processed": metadata.get('completed_tasks', 0),
    }
    if self.namespace:
        payload["namespace"] = self.namespace

    if self._api_request('post', '/systemagents/heartbeat/', payload):
        if self.DEBUG:
            logging.info("Heartbeat sent successfully")
        return True
    logging.warning("Failed to send heartbeat to monitor")
    return False
0702
def report_agent_status(self, status, message=None, error_details=None):
    """
    Report an agent status change to the monitor via the heartbeat endpoint.

    The description is "<type> agent", followed by ``message`` and by
    "Error: <error_details>" when those are truthy, joined with ". ".

    Returns:
        True if the monitor returned a non-empty response, False otherwise.
    """
    logging.info(f"Reporting agent status: {status}")

    candidate_parts = [
        f"{self.agent_type} agent",
        message,
        f"Error: {error_details}" if error_details else None,
    ]
    description = ". ".join(part for part in candidate_parts if part)

    payload = {
        "instance_name": self.agent_name,
        "agent_type": self.agent_type,
        "status": status,
        "description": description,
        "mq_connected": getattr(self, 'mq_connected', False),
        "pid": self.pid,
        "hostname": self.hostname,
        "operational_state": self.operational_state,
    }
    if self.namespace:
        payload["namespace"] = self.namespace

    if self._api_request('post', '/systemagents/heartbeat/', payload):
        logging.info(f"Status reported successfully: {status}")
        return True
    logging.warning(f"Failed to report status: {status}")
    return False
0735
def check_monitor_health(self):
    """
    Probe the monitor API by listing ``/systemagents/``.

    Returns:
        True if the request returned a non-None result. False if it returned
        None or raised; the error is logged, never propagated.
    """
    try:
        reply = self._api_request('get', '/systemagents/', None)
    except Exception as probe_error:
        logging.error(f"Monitor health check failed: {probe_error}")
        return False

    healthy = reply is not None
    if healthy:
        logging.info("Monitor API is healthy")
    else:
        logging.warning("Monitor API is not responding")
    return healthy
0749
def call_monitor_api(self, method, endpoint, json_data=None):
    """
    Generic monitor API call for agent-specific endpoints.

    Thin wrapper around ``_api_request`` that lower-cases ``method``. Retry
    and error behavior are those of ``_api_request``.
    """
    normalized_method = method.lower()
    return self._api_request(normalized_method, endpoint, json_data)
0753
def register_subscriber(self):
    """
    Register this agent with the monitor as a subscriber of its queue.

    POSTs to ``/subscribers/``. A reply of ``{"status": "already_exists"}``
    counts as success; ``_api_request`` produces that reply for a duplicate
    subscriber.

    Returns:
        True on success or if already registered. False if the API returned
        an empty result.

    Raises:
        Exception: Any error from the API call is logged and re-raised.
    """
    logging.info(f"Registering subscriber for queue '{self.subscription_queue}'...")

    subscriber_name = f"{self.agent_name}-{self.subscription_queue}"
    subscriber_data = {
        "subscriber_name": subscriber_name,
        "description": f"{self.agent_type} agent subscribing to {self.subscription_queue}",
        "is_active": True,
        "fraction": 1.0,
    }

    try:
        result = self._api_request('post', '/subscribers/', subscriber_data)
        if not result:
            logging.error("Failed to register subscriber")
            return False
        if result.get('status') == 'already_exists':
            logging.info(f"Subscriber already registered: {subscriber_name}")
        else:
            logging.info(f"Subscriber registered successfully: {result.get('subscriber_name')}")
        return True
    except Exception as registration_error:
        logging.error(f"Critical subscriber registration failure: {registration_error}")
        raise