Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:45

#!/usr/bin/env python3
"""
User Agent Manager - A lightweight per-user daemon for testbed control.

Listens on /queue/agent_control.<username> for JSON commands from MCP
(each message is an object with a ``command`` key):
- start_testbed: Start agents and workflow runner (optional ``config_name``)
- stop_testbed: Stop all agents
- restart: Stop all agents, spawn a fresh agent manager process, and exit
- status: Report current status (sent to ``reply_to`` if given)
- ping: Liveness check (answer sent to ``reply_to`` if given)

Sends periodic heartbeats to the monitor API so MCP can check if it's alive.
"""
0012 
0013 import getpass
0014 import json
0015 import os
0016 import signal
0017 import subprocess
0018 import sys
0019 import time
0020 import tomllib
0021 from datetime import datetime
0022 from pathlib import Path
0023 
0024 import stomp
0025 from swf_common_lib.rest_logging import setup_rest_logging
0026 
# Seconds between periodic heartbeats posted by the run loop.
HEARTBEAT_INTERVAL = 30

# Supervisord config file, resolved relative to the testbed directory,
# that defines the agent and workflow-runner programs.
AGENTS_CONF = 'agents.supervisord.conf'

# Keys of the testbed.toml ``[agents]`` table -> supervisord program names.
AGENT_PROGRAM_MAP = dict(
    data='example-data-agent',
    processing='example-processing-agent',
    fastmon='example-fastmon-agent',
    fast_processing='fast-processing-agent',
)
0040 
0041 
class UserAgentManager(stomp.ConnectionListener):
    """
    Lightweight agent manager for a single user.

    Manages the user's testbed agents via supervisord (programs defined in
    ``AGENTS_CONF`` under the testbed directory), receives commands on the
    per-user ActiveMQ control queue, and posts heartbeats to the monitor API.
    """

    # supervisorctl exit status this class treats as "cannot connect to
    # supervisord", i.e. supervisord is not running.
    _SUPERVISORCTL_NO_CONNECT = 4

    # Upper bounds (seconds) for supervisord to stop answering after
    # ``shutdown`` and to start answering ``status`` after being launched.
    SUPERVISORD_SHUTDOWN_TIMEOUT = 10
    SUPERVISORD_STARTUP_TIMEOUT = 5

    def __init__(self, testbed_dir: Path | None = None):
        """Read settings from the environment and set up MQ/API clients.

        Args:
            testbed_dir: Testbed checkout root. Defaults to three directories
                above this file.
        """
        self.username = getpass.getuser()
        self.control_queue = f'/queue/agent_control.{self.username}'
        self.testbed_dir = testbed_dir or Path(__file__).parent.parent.parent

        # ActiveMQ connection settings from environment
        self.mq_host = os.getenv('ACTIVEMQ_HOST', 'localhost')
        self.mq_port = int(os.getenv('ACTIVEMQ_PORT', 61612))
        self.mq_user = os.getenv('ACTIVEMQ_USER', 'admin')
        self.mq_password = os.getenv('ACTIVEMQ_PASSWORD', 'admin')
        self.use_ssl = os.getenv('ACTIVEMQ_USE_SSL', 'false').lower() == 'true'
        self.ssl_ca_certs = os.getenv('ACTIVEMQ_SSL_CA_CERTS', '')

        # Monitor API for heartbeats (use production monitor)
        self.monitor_url = os.getenv('SWF_MONITOR_URL', 'https://pandaserver02.sdcc.bnl.gov/swf-monitor')
        self.api_token = os.getenv('SWF_API_TOKEN')

        # Set up API session with auth token (like BaseAgent)
        import requests
        self.api = requests.Session()
        if self.api_token:
            self.api.headers.update({'Authorization': f'Token {self.api_token}'})
        # NOTE(review): TLS certificate verification is disabled for every
        # monitor request (to allow self-signed certs); a CA bundle would be safer.
        self.api.verify = False

        # State
        self.running = True
        self.last_heartbeat = None
        self.agents_running = False
        self.namespace = None  # Set when config is loaded
        self.config = None  # Current testbed config
        # Set by the SIGUSR1 handler, consumed by the run loop.
        self._heartbeat_requested = False

        # Set up REST logging (must be before anything that uses self.logger)
        self.instance_name = f'agent-manager-{self.username}'
        base_url = os.getenv('SWF_MONITOR_HTTP_URL', 'http://localhost:8002')
        self.logger = setup_rest_logging('agent_manager', self.instance_name, base_url)

        # Auto-load config from SWF_TESTBED_CONFIG env var on startup
        env_config = os.getenv('SWF_TESTBED_CONFIG')
        if env_config:
            self.load_config(env_config)

        # Set up connection (matching BaseAgent configuration)
        self.conn = stomp.Connection(
            host_and_ports=[(self.mq_host, self.mq_port)],
            vhost=self.mq_host,
            try_loopback_connect=False,
            heartbeats=(30000, 30000),
            auto_content_length=False
        )

        # Configure SSL if enabled
        if self.use_ssl and self.ssl_ca_certs:
            import ssl
            self.conn.transport.set_ssl(
                for_hosts=[(self.mq_host, self.mq_port)],
                ca_certs=self.ssl_ca_certs,
                ssl_version=ssl.PROTOCOL_TLS_CLIENT
            )

        self.conn.set_listener('', self)

        # Signal handling
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGUSR1, self._sigusr1_handler)

    def _signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM by asking the run loop to exit."""
        self.logger.info(f"\nReceived signal {signum}, shutting down...")
        self.running = False

    def _sigusr1_handler(self, signum, frame):
        """SIGUSR1 requests an immediate heartbeat refresh.

        Only a flag is set here. The heartbeat itself runs subprocesses and an
        HTTP request, which should not execute inside a signal handler: it
        could re-enter a heartbeat already in progress on the main thread.
        ``run`` notices the flag within about one second.
        """
        self._heartbeat_requested = True

    def connect(self):
        """Connect to ActiveMQ and subscribe to control queue."""
        self.logger.info(f"Connecting to ActiveMQ at {self.mq_host}:{self.mq_port}...")
        self.conn.connect(self.mq_user, self.mq_password, wait=True)

        self.logger.info(f"Subscribing to {self.control_queue}...")
        self.conn.subscribe(destination=self.control_queue, id='control', ack='auto')

        self.logger.info(f"User Agent Manager ready for {self.username}")

    def disconnect(self):
        """Disconnect from ActiveMQ if currently connected."""
        if self.conn.is_connected():
            self.conn.disconnect()

    def on_message(self, frame):
        """Dispatch one JSON control message from the control queue.

        The body must be a JSON object with a ``command`` key plus, depending
        on the command, ``config_name`` or ``reply_to``. Errors are logged
        rather than propagated to the STOMP listener.
        """
        try:
            message = json.loads(frame.body)
            if not isinstance(message, dict):
                self.logger.error(f"Ignoring non-object control message: {frame.body!r}")
                return
            command = message.get('command')

            self.logger.info(f"[{datetime.now().strftime('%H:%M:%S')}] Received command: {command}")

            if command == 'start_testbed':
                config_name = message.get('config_name')
                if not self.handle_start_testbed(config_name):
                    self.logger.error(f"start_testbed FAILED (config: {config_name or 'default'})")
            elif command == 'stop_testbed':
                if not self.handle_stop_testbed():
                    self.logger.error("stop_testbed FAILED")
            elif command == 'restart':
                self.handle_restart()
            elif command == 'status':
                self.handle_status(message.get('reply_to'))
            elif command == 'ping':
                self.handle_ping(message.get('reply_to'))
            else:
                self.logger.info(f"Unknown command: {command}")

        except Exception as e:
            self.logger.error(f"Error processing message: {e}")

    def on_error(self, frame):
        """Log STOMP ERROR frames."""
        self.logger.error(f"STOMP error: {frame.body}")

    def on_disconnected(self):
        """Attempt a single reconnect (after 5s) unless shutting down."""
        if self.running:
            self.logger.info("Disconnected from ActiveMQ, attempting reconnect...")
            time.sleep(5)
            try:
                self.connect()
            except Exception as e:
                self.logger.error(f"Reconnect failed: {e}")

    def _resolve_config_path(self, config_name: str | None = None) -> Path:
        """Map a config name or relative path to a file under the testbed dir.

        ``None`` means SWF_TESTBED_CONFIG, falling back to
        ``workflows/testbed.toml``. A name containing ``/`` is joined to the
        testbed dir as-is. A bare name is looked up in ``workflows/`` with
        ``.toml`` appended if missing.
        """
        if config_name is None:
            config_name = os.getenv('SWF_TESTBED_CONFIG', 'workflows/testbed.toml')

        if '/' in config_name:
            return self.testbed_dir / config_name
        if not config_name.endswith('.toml'):
            config_name = f'{config_name}.toml'
        return self.testbed_dir / 'workflows' / config_name

    def load_config(self, config_name: str = None) -> dict:
        """Load testbed config and update namespace.

        Args:
            config_name: Config file name (default: from SWF_TESTBED_CONFIG env var,
                         or workflows/testbed.toml). Can be just name (looked up
                         in workflows/) or relative path.

        Returns:
            Parsed config dict, or ``{}`` if the file does not exist (in which
            case ``self.config``/``self.namespace`` are left unchanged).

        Raises:
            tomllib.TOMLDecodeError: if the file is not valid TOML.
        """
        config_path = self._resolve_config_path(config_name)

        if not config_path.exists():
            self.logger.error(f"Config not found: {config_path}")
            return {}

        self.logger.info(f"Loading config: {config_path}")
        with open(config_path, 'rb') as f:
            self.config = tomllib.load(f)

        # Update namespace from config
        self.namespace = self.config.get('testbed', {}).get('namespace')
        if self.namespace:
            self.logger.info(f"Namespace: {self.namespace}")
        else:
            self.logger.warning("No namespace in config")

        return self.config

    def get_enabled_agents(self) -> list:
        """Get list of supervisord program names for enabled agents."""
        if not self.config:
            return []

        enabled = []
        for agent_name, agent_conf in self.config.get('agents', {}).items():
            if not isinstance(agent_conf, dict):
                self.logger.warning(f"Agent '{agent_name}' config is not a table - skipped")
                continue
            if not agent_conf.get('enabled', False):
                continue
            program_name = AGENT_PROGRAM_MAP.get(agent_name)
            if program_name:
                enabled.append(program_name)
            else:
                self.logger.warning(f"Unknown agent '{agent_name}' - no program mapping")

        return enabled

    def handle_start_testbed(self, config_name: str = None):
        """Start the testbed agents and workflow runner.

        Refuses to start if agents are already running. User must call
        stop_testbed first to ensure clean slate. Also refuses if the config
        that needs loading does not exist, rather than silently starting with
        a previously loaded (or no) config.

        Returns:
            True on success, False on any failure (details are logged).
        """
        self.logger.info(f"Starting testbed (config: {config_name or 'current'})...")

        # Load config if specified, otherwise use already-loaded config
        if config_name or not self.config:
            config_path = self._resolve_config_path(config_name)
            if not config_path.exists():
                self.logger.error(f"Config not found: {config_path}")
                return False
            self.load_config(config_name)

        # Check if any agents are already running - refuse if so
        # Do this BEFORE restarting supervisord
        running_agents = self._get_running_agents()
        if running_agents:
            self.logger.error(
                f"Cannot start: agents already running: {running_agents}. "
                "Run stop_testbed first to ensure clean slate."
            )
            return False

        # Restart supervisord to pick up current env vars and config
        if not self._restart_supervisord():
            self.logger.error("Failed to restart supervisord")
            return False

        # Start workflow runner
        if not self._start_program('workflow-runner'):
            self.logger.error("Failed to start workflow-runner")
            return False

        # Start enabled agents from config
        enabled_agents = self.get_enabled_agents()
        if not enabled_agents:
            self.logger.warning("No agents enabled in config")

        failed = [agent for agent in enabled_agents if not self._start_program(agent)]
        if failed:
            self.logger.error(f"Failed to start agents: {failed}")
            return False

        self.agents_running = True
        self.logger.info("Testbed started")
        return True

    def handle_stop_testbed(self):
        """Stop all supervisord programs. Returns True if stopped (or nothing to stop)."""
        self.logger.info("Stopping testbed...")

        result = self._supervisorctl('stop', 'all')

        if result.returncode not in (0, self._SUPERVISORCTL_NO_CONNECT):
            # Non-status commands do not necessarily use the "can't connect"
            # exit code; if supervisord is unreachable there is nothing running.
            if self._supervisorctl('status').returncode != self._SUPERVISORCTL_NO_CONNECT:
                self.logger.error(f"Error stopping testbed: {self._output_detail(result)}")
                return False

        self.agents_running = False
        self.logger.info("Testbed stopped")
        return True

    def handle_restart(self):
        """Stop the testbed, spawn a fresh agent manager process, and exit."""
        self.logger.info("Restarting agent manager...")
        self.handle_stop_testbed()

        # Spawn new agent manager process. The child gets its own copy of the
        # log descriptor, so the parent's handle is closed once Popen returns.
        testbed = self._get_venv_bin('testbed')
        with open('/tmp/agent-manager.log', 'a') as log_file:
            subprocess.Popen(
                ['nohup', testbed, 'agent-manager'],
                stdout=log_file,
                stderr=subprocess.STDOUT,
                cwd=self.testbed_dir,
                start_new_session=True
            )

        self.logger.info("New agent manager spawned, exiting")
        # Clean disconnect before exit; running=False stops on_disconnected
        # from trying to reconnect.
        self.running = False
        try:
            self.disconnect()
        except Exception:
            pass
        os._exit(0)

    def handle_status(self, reply_to: str = None):
        """Build a status report; also send it to ``reply_to`` if given."""
        result = self._supervisorctl('status')

        status = {
            'username': self.username,
            'agents_running': self.agents_running,
            'supervisord_status': result.stdout,
            'timestamp': datetime.now().isoformat()
        }

        if reply_to:
            self.conn.send(destination=reply_to, body=json.dumps(status))

        return status

    def handle_ping(self, reply_to: str = None):
        """Respond to ping (liveness check); also send to ``reply_to`` if given."""
        response = {
            'status': 'alive',
            'username': self.username,
            'timestamp': datetime.now().isoformat()
        }

        if reply_to:
            self.conn.send(destination=reply_to, body=json.dumps(response))

        return response

    def _get_venv_bin(self, cmd: str) -> str:
        """Get full path to command in venv bin directory."""
        venv_bin = self.testbed_dir / '.venv' / 'bin' / cmd
        if venv_bin.exists():
            return str(venv_bin)
        # Fallback to bare command (may work if venv is activated)
        return cmd

    def _supervisorctl(self, *args: str) -> subprocess.CompletedProcess:
        """Run ``supervisorctl -c <AGENTS_CONF> <args...>`` in the testbed dir."""
        return subprocess.run(
            [self._get_venv_bin('supervisorctl'), '-c', str(self.testbed_dir / AGENTS_CONF), *args],
            capture_output=True,
            text=True,
            cwd=self.testbed_dir
        )

    @staticmethod
    def _output_detail(result: subprocess.CompletedProcess) -> str:
        """Best error text from a supervisorctl run: stderr, else stdout.

        supervisorctl writes its messages to stdout (the ``already started``
        check in ``_start_program`` relies on that), so stderr alone is often empty.
        """
        return result.stderr.strip() or result.stdout.strip()

    @staticmethod
    def _parse_running(status_output: str) -> list:
        """Program names from ``supervisorctl status`` lines that contain RUNNING."""
        # Line format: "program-name   RUNNING   pid 12345, uptime 0:00:05"
        return [line.split()[0] for line in status_output.splitlines() if 'RUNNING' in line]

    def _wait_for_supervisord(self, responding: bool, timeout: float) -> bool:
        """Poll ``supervisorctl status`` until supervisord is (or is not) reachable.

        Returns True once the requested state is observed, False if
        ``timeout`` seconds pass first.
        """
        deadline = time.monotonic() + timeout
        while True:
            reachable = self._supervisorctl('status').returncode != self._SUPERVISORCTL_NO_CONNECT
            if reachable == responding:
                return True
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.5)

    def _ensure_supervisord(self) -> bool:
        """Start supervisord if not running."""
        conf_path = self.testbed_dir / AGENTS_CONF

        if not conf_path.exists():
            self.logger.error(f"{AGENTS_CONF} not found in {self.testbed_dir}")
            return False

        if self._supervisorctl('status').returncode == self._SUPERVISORCTL_NO_CONNECT:
            self.logger.info("Starting supervisord...")
            start_result = subprocess.run(
                [self._get_venv_bin('supervisord'), '-c', str(conf_path)],
                capture_output=True,
                text=True,
                cwd=self.testbed_dir
            )
            if start_result.returncode != 0:
                self.logger.error(f"Error starting supervisord: {start_result.stderr}")
                return False
            time.sleep(1)

        return True

    def _restart_supervisord(self) -> bool:
        """Restart supervisord to pick up current environment and config.

        Always restarts to ensure fresh env vars (like SWF_TESTBED_CONFIG) are available.
        Called after verifying no agents are running.
        """
        conf_path = self.testbed_dir / AGENTS_CONF

        if not conf_path.exists():
            self.logger.error(f"{AGENTS_CONF} not found in {self.testbed_dir}")
            return False

        if self._supervisorctl('status').returncode != self._SUPERVISORCTL_NO_CONNECT:
            self.logger.info("Restarting supervisord to pick up current environment...")
            self._supervisorctl('shutdown')
            # ``shutdown`` may return before the old daemon is fully gone; wait
            # rather than race a fresh start against it.
            if not self._wait_for_supervisord(responding=False,
                                              timeout=self.SUPERVISORD_SHUTDOWN_TIMEOUT):
                self.logger.warning("Supervisord still responding after shutdown; starting anyway")

        # Start fresh
        self.logger.info("Starting supervisord...")
        start_result = subprocess.run(
            [self._get_venv_bin('supervisord'), '-c', str(conf_path)],
            capture_output=True,
            text=True,
            cwd=self.testbed_dir
        )

        if start_result.returncode != 0:
            self.logger.error(f"Error starting supervisord: {start_result.stderr}")
            return False

        # Verify supervisord is actually responding
        if not self._wait_for_supervisord(responding=True,
                                          timeout=self.SUPERVISORD_STARTUP_TIMEOUT):
            self.logger.error(
                f"Supervisord started (exit 0) but not responding. "
                f"stderr: {start_result.stderr.strip()}"
            )
            return False

        return True

    def _start_program(self, program_name: str) -> bool:
        """Start a supervisord program; an already-started program counts as success."""
        result = self._supervisorctl('start', program_name)

        if result.returncode == 0 or 'already started' in result.stdout.lower():
            self.logger.info(f"  {program_name}: started")
            return True
        self.logger.error(f"{program_name}: failed - {self._output_detail(result)}")
        return False

    def _get_running_agents(self) -> list:
        """Get list of currently running program names ([] if supervisord is down)."""
        result = self._supervisorctl('status')
        if result.returncode == self._SUPERVISORCTL_NO_CONNECT:
            return []
        return self._parse_running(result.stdout)

    def _reread_supervisord_config(self) -> bool:
        """Reread supervisord config to pick up changes."""
        result = self._supervisorctl('reread')

        if result.returncode == 0:
            if result.stdout.strip():
                self.logger.info(f"Config changes detected: {result.stdout.strip()}")
            return True
        self.logger.warning(f"Config reread failed: {self._output_detail(result)}")
        return False

    def _check_supervisord_health(self) -> dict:
        """Verify supervisord health: responding, genuinely idle, or zombie.

        Returns a dict with ``healthy`` plus either ``error`` (unhealthy) or
        ``state`` ('idle' / 'running', the latter with ``running_agents``).
        """
        result = self._supervisorctl('status')

        if result.returncode == self._SUPERVISORCTL_NO_CONNECT:
            # Not responding — look for a stale supervisord of THIS user only;
            # another user's daemon with the same config name is not ours.
            try:
                check = subprocess.run(
                    ['pgrep', '-u', self.username, '-f', f'supervisord.*{AGENTS_CONF}'],
                    capture_output=True, text=True
                )
            except (OSError, subprocess.SubprocessError):
                check = None  # best effort: pgrep unavailable
            if check is not None and check.returncode == 0 and check.stdout.strip():
                pid = check.stdout.split()[0]
                return {'healthy': False, 'error': f'not responding but stale process exists (PID {pid})'}
            return {'healthy': True, 'state': 'idle'}

        return {'healthy': True, 'state': 'running', 'running_agents': self._parse_running(result.stdout)}

    def _post_heartbeat(self, data: dict):
        """POST ``data`` to the monitor heartbeat endpoint; return the response."""
        return self.api.post(
            f"{self.monitor_url}/api/systemagents/heartbeat/",
            json=data,
            timeout=5,
        )

    def send_heartbeat(self):
        """Send heartbeat to monitor API (using authenticated session like BaseAgent).

        Status is ERROR when the supervisord health check fails. The
        description and metadata also report the live ActiveMQ connection
        state. Failures are logged, not raised.
        """
        try:
            sv_health = self._check_supervisord_health()
            mq_connected = self.conn.is_connected()

            desc_parts = [f'Agent manager for {self.username}']
            if self.namespace:
                desc_parts.append(f'namespace: {self.namespace}')
            # Report the real state: after a failed reconnect the manager keeps
            # running but can no longer receive commands.
            desc_parts.append('MQ: connected' if mq_connected else 'MQ: disconnected')

            if sv_health['healthy']:
                status = 'OK'
                state = sv_health.get('state', 'running')
                desc_parts.append(f"supervisord: {state}")
            else:
                status = 'ERROR'
                desc_parts.append(f"supervisord: {sv_health['error']}")
                self.logger.error(f"Supervisord health check failed: {sv_health['error']}")

            response = self._post_heartbeat({
                'instance_name': self.instance_name,
                'agent_type': 'agent_manager',
                'status': status,
                'operational_state': 'READY',
                'namespace': self.namespace,
                'pid': os.getpid(),
                'hostname': os.uname().nodename,
                'description': '. '.join(desc_parts),
                'metadata': {
                    'supervisord_healthy': sv_health['healthy'],
                    'mq_connected': mq_connected,
                },
            })

            if response.ok:
                self.last_heartbeat = datetime.now()
            else:
                self.logger.warning(f"Heartbeat rejected by monitor: HTTP {response.status_code}")

        except Exception as e:
            self.logger.error(f"Heartbeat failed: {e}")

    def run(self):
        """Main run loop: connect, then heartbeat until a shutdown signal."""
        self.connect()

        # Send immediate heartbeat so MCP can detect us quickly
        self.send_heartbeat()
        # Monotonic clock so wall-clock adjustments can't stall heartbeats.
        last_heartbeat_time = time.monotonic()

        self.logger.info(f"Listening for commands on {self.control_queue}")
        self.logger.info("Press Ctrl+C to stop")

        while self.running:
            try:
                now = time.monotonic()
                # Periodic heartbeat, or an on-demand one requested via SIGUSR1.
                if self._heartbeat_requested or now - last_heartbeat_time >= HEARTBEAT_INTERVAL:
                    self._heartbeat_requested = False
                    self.send_heartbeat()
                    last_heartbeat_time = now

                time.sleep(1)

            except KeyboardInterrupt:
                break

        self.logger.info("Shutting down...")
        self._send_exit_heartbeat()
        self.disconnect()

    def _send_exit_heartbeat(self):
        """Send final heartbeat marking this agent as EXITED (errors are logged)."""
        try:
            self._post_heartbeat({
                'instance_name': self.instance_name,
                'agent_type': 'agent_manager',
                'status': 'EXITED',
                'operational_state': 'EXITED',
                'namespace': self.namespace,
                'pid': os.getpid(),
                'hostname': os.uname().nodename,
                'description': f'Agent manager for {self.username}. Shut down.',
            })
            self.logger.info("Exit heartbeat sent")
        except Exception as e:
            self.logger.error(f"Failed to send exit heartbeat: {e}")
0649 
0650 
def _load_env_file(env_file: Path) -> None:
    """Export ``KEY=VALUE`` lines from ``env_file`` into ``os.environ``.

    Blank lines, ``#`` comments and lines without ``=`` are skipped. An
    optional ``export `` prefix is dropped, whitespace around the key and
    value is trimmed, and surrounding quotes are stripped from the value.
    Existing environment variables are overwritten.
    """
    with open(env_file, encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            line = line.removeprefix('export ')
            key, value = line.split('=', 1)
            key = key.strip()
            if not key:
                # os.environ rejects an empty name; ignore malformed "=value" lines.
                continue
            # Trim whitespace before quotes so `KEY = "v"` yields `v`, not ` "v`.
            os.environ[key] = value.strip().strip('"\'')


def main():
    """Entry point: load ~/.env into the environment, then run the manager."""
    env_file = Path.home() / '.env'
    if env_file.exists():
        _load_env_file(env_file)

    manager = UserAgentManager()
    manager.run()


if __name__ == '__main__':
    main()