File indexing completed on 2026-04-27 07:41:45
0001
0002 """
User Agent Manager - A lightweight per-user daemon for testbed control.

Listens on /queue/agent_control.<username> for JSON commands from MCP:
- start_testbed: Start agents and workflow runner
- stop_testbed: Stop all agents
- restart: Stop agents, spawn a fresh agent manager process, and exit
- status: Report current status
- ping: Liveness check

Sends periodic heartbeats to the monitor REST API so MCP can check if it's alive.
0011 """
0012
0013 import getpass
0014 import json
0015 import os
0016 import signal
0017 import subprocess
0018 import sys
0019 import time
0020 import tomllib
0021 from datetime import datetime
0022 from pathlib import Path
0023
0024 import stomp
0025 from swf_common_lib.rest_logging import setup_rest_logging
0026
0027
# Seconds between periodic heartbeats sent from the main loop.
HEARTBEAT_INTERVAL = 30

# Supervisord config file, resolved relative to the testbed directory.
AGENTS_CONF = 'agents.supervisord.conf'

# Agent names as they appear under [agents] in the testbed TOML config,
# mapped to the supervisord program that runs each one.
AGENT_PROGRAM_MAP = dict(
    data='example-data-agent',
    processing='example-processing-agent',
    fastmon='example-fastmon-agent',
    fast_processing='fast-processing-agent',
)
0040
0041
class UserAgentManager(stomp.ConnectionListener):
    """
    Lightweight agent manager for a single user.

    Manages the user's testbed agents via supervisord. Commands arrive as JSON
    messages on the per-user STOMP queue ``/queue/agent_control.<username>``
    (see ``on_message``). Liveness is reported by POSTing heartbeats to the
    monitor REST API (see ``send_heartbeat``).
    """

    def __init__(self, testbed_dir: Path | None = None):
        """Read settings from the environment and build (but not open) the STOMP connection.

        Args:
            testbed_dir: Root of the testbed checkout. Defaults to three
                directory levels above this file.

        Call ``run()`` (or ``connect()``) afterwards to connect and subscribe.
        """
        self.username = getpass.getuser()
        self.control_queue = f'/queue/agent_control.{self.username}'
        self.testbed_dir = testbed_dir or Path(__file__).parent.parent.parent

        # ActiveMQ connection settings.
        self.mq_host = os.getenv('ACTIVEMQ_HOST', 'localhost')
        self.mq_port = int(os.getenv('ACTIVEMQ_PORT', 61612))
        self.mq_user = os.getenv('ACTIVEMQ_USER', 'admin')
        self.mq_password = os.getenv('ACTIVEMQ_PASSWORD', 'admin')
        self.use_ssl = os.getenv('ACTIVEMQ_USE_SSL', 'false').lower() == 'true'
        self.ssl_ca_certs = os.getenv('ACTIVEMQ_SSL_CA_CERTS', '')

        # Monitor REST API used for heartbeats.
        self.monitor_url = os.getenv('SWF_MONITOR_URL', 'https://pandaserver02.sdcc.bnl.gov/swf-monitor')
        self.api_token = os.getenv('SWF_API_TOKEN')

        import requests
        self.api = requests.Session()
        if self.api_token:
            self.api.headers.update({'Authorization': f'Token {self.api_token}'})
        # NOTE(review): TLS certificate verification is disabled for the
        # monitor API. Kept for compatibility; consider a CA bundle path.
        self.api.verify = False

        # Runtime state.
        self.running = True
        self.last_heartbeat = None
        self.agents_running = False
        self.namespace = None
        self.config = None
        # Set by the SIGUSR1 handler; serviced by the main loop in run().
        self._heartbeat_requested = False

        # Logging is routed through the monitor's REST logging endpoint.
        self.instance_name = f'agent-manager-{self.username}'
        base_url = os.getenv('SWF_MONITOR_HTTP_URL', 'http://localhost:8002')
        self.logger = setup_rest_logging('agent_manager', self.instance_name, base_url)

        # Preload the config named by the environment so the namespace is
        # known before the first heartbeat.
        env_config = os.getenv('SWF_TESTBED_CONFIG')
        if env_config:
            self.load_config(env_config)

        self.conn = stomp.Connection(
            host_and_ports=[(self.mq_host, self.mq_port)],
            vhost=self.mq_host,
            try_loopback_connect=False,
            heartbeats=(30000, 30000),
            auto_content_length=False,
        )

        if self.use_ssl:
            if self.ssl_ca_certs:
                import ssl
                self.conn.transport.set_ssl(
                    for_hosts=[(self.mq_host, self.mq_port)],
                    ca_certs=self.ssl_ca_certs,
                    ssl_version=ssl.PROTOCOL_TLS_CLIENT,
                )
            else:
                # SSL is only configured when a CA bundle is given; make the
                # resulting plain-text connection visible instead of silent.
                self.logger.warning(
                    "ACTIVEMQ_USE_SSL is true but ACTIVEMQ_SSL_CA_CERTS is not set; "
                    "connecting WITHOUT SSL"
                )

        self.conn.set_listener('', self)

        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)
        signal.signal(signal.SIGUSR1, self._sigusr1_handler)

    def _signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM by asking the main loop in run() to exit."""
        self.logger.info(f"\nReceived signal {signum}, shutting down...")
        self.running = False

    def _sigusr1_handler(self, signum, frame):
        """SIGUSR1 requests an immediate heartbeat refresh.

        Only sets a flag; run() sends the heartbeat on its next loop iteration
        (within about a second). Sending from inside the handler could re-enter
        send_heartbeat() while the main loop is already executing it.
        """
        self._heartbeat_requested = True

    def connect(self):
        """Connect to ActiveMQ and subscribe to the control queue."""
        self.logger.info(f"Connecting to ActiveMQ at {self.mq_host}:{self.mq_port}...")
        self.conn.connect(self.mq_user, self.mq_password, wait=True)

        self.logger.info(f"Subscribing to {self.control_queue}...")
        self.conn.subscribe(destination=self.control_queue, id='control', ack='auto')

        self.logger.info(f"User Agent Manager ready for {self.username}")

    def disconnect(self):
        """Disconnect from ActiveMQ if currently connected."""
        if self.conn.is_connected():
            self.conn.disconnect()

    def on_message(self, frame):
        """Dispatch a JSON control message received on the control queue.

        The body must be a JSON object with a ``command`` key. Optional keys:
        ``config_name`` (start_testbed) and ``reply_to`` (status, ping).
        All errors are logged rather than raised, so one bad message does not
        break the listener.
        """
        try:
            message = json.loads(frame.body)
            if not isinstance(message, dict):
                self.logger.error(
                    f"Ignoring control message: expected a JSON object, got {type(message).__name__}"
                )
                return
            command = message.get('command')

            self.logger.info(f"[{datetime.now().strftime('%H:%M:%S')}] Received command: {command}")

            if command == 'start_testbed':
                config_name = message.get('config_name')
                if not self.handle_start_testbed(config_name):
                    self.logger.error(f"start_testbed FAILED (config: {config_name or 'default'})")
            elif command == 'stop_testbed':
                if not self.handle_stop_testbed():
                    self.logger.error("stop_testbed FAILED")
            elif command == 'restart':
                self.handle_restart()
            elif command == 'status':
                self.handle_status(message.get('reply_to'))
            elif command == 'ping':
                self.handle_ping(message.get('reply_to'))
            else:
                self.logger.info(f"Unknown command: {command}")

        except Exception as e:
            self.logger.error(f"Error processing message: {e}")

    def on_error(self, frame):
        """Log STOMP ERROR frames."""
        self.logger.error(f"STOMP error: {frame.body}")

    def on_disconnected(self):
        """Try a single reconnect (after 5s) unless we are shutting down."""
        if self.running:
            self.logger.info("Disconnected from ActiveMQ, attempting reconnect...")
            time.sleep(5)
            try:
                self.connect()
            except Exception as e:
                # A failed reconnect is surfaced by send_heartbeat, which
                # reports the real MQ connection state.
                self.logger.error(f"Reconnect failed: {e}")

    def load_config(self, config_name: str | None = None) -> dict:
        """Load a testbed TOML config and update ``self.config`` / ``self.namespace``.

        Args:
            config_name: Config file name (default: from SWF_TESTBED_CONFIG env var,
                or workflows/testbed.toml). A bare name is looked up in
                workflows/ (``.toml`` appended if missing); anything containing
                '/' is treated as a path relative to the testbed directory.

        Returns:
            The parsed config dict, or ``{}`` if the file is missing or is not
            valid TOML. On failure ``self.config`` and ``self.namespace`` are
            left unchanged.
        """
        if config_name is None:
            config_name = os.getenv('SWF_TESTBED_CONFIG', 'workflows/testbed.toml')

        if '/' in config_name:
            config_path = self.testbed_dir / config_name
        else:
            if not config_name.endswith('.toml'):
                config_name = f'{config_name}.toml'
            config_path = self.testbed_dir / 'workflows' / config_name

        if not config_path.exists():
            self.logger.error(f"Config not found: {config_path}")
            return {}

        self.logger.info(f"Loading config: {config_path}")
        try:
            with open(config_path, 'rb') as f:
                config = tomllib.load(f)
        except tomllib.TOMLDecodeError as e:
            self.logger.error(f"Invalid TOML in {config_path}: {e}")
            return {}
        self.config = config

        self.namespace = self.config.get('testbed', {}).get('namespace')
        if self.namespace:
            self.logger.info(f"Namespace: {self.namespace}")
        else:
            self.logger.warning("No namespace in config")

        return self.config

    def get_enabled_agents(self) -> list:
        """Return supervisord program names for agents with ``enabled = true`` in the config.

        Agents without an entry in AGENT_PROGRAM_MAP are skipped with a warning.
        """
        if not self.config:
            return []

        enabled = []
        for agent_name, agent_conf in self.config.get('agents', {}).items():
            if not agent_conf.get('enabled', False):
                continue
            program_name = AGENT_PROGRAM_MAP.get(agent_name)
            if program_name:
                enabled.append(program_name)
            else:
                self.logger.warning(f"Unknown agent '{agent_name}' - no program mapping")

        return enabled

    def handle_start_testbed(self, config_name: str | None = None):
        """Start the testbed agents and workflow runner.

        Refuses to start if agents are already running. The user must call
        stop_testbed first to ensure a clean slate. If an explicitly requested
        config cannot be loaded, nothing is started (rather than silently
        reusing a previously loaded config).

        Returns:
            True if supervisord, the workflow runner and every enabled agent
            started; False otherwise.
        """
        self.logger.info(f"Starting testbed (config: {config_name or 'current'})...")

        if config_name:
            if not self.load_config(config_name):
                self.logger.error(
                    f"Cannot start: config '{config_name}' could not be loaded or is empty"
                )
                return False
        elif not self.config:
            self.load_config()

        running_agents = self._get_running_agents()
        if running_agents:
            self.logger.error(
                f"Cannot start: agents already running: {running_agents}. "
                "Run stop_testbed first to ensure clean slate."
            )
            return False

        # Restart so supervisord's children inherit the current environment.
        if not self._restart_supervisord():
            self.logger.error("Failed to restart supervisord")
            return False

        if not self._start_program('workflow-runner'):
            self.logger.error("Failed to start workflow-runner")
            return False

        enabled_agents = self.get_enabled_agents()
        if not enabled_agents:
            self.logger.warning("No agents enabled in config")

        failed = [agent for agent in enabled_agents if not self._start_program(agent)]
        if failed:
            self.logger.error(f"Failed to start agents: {failed}")
            return False

        self.agents_running = True
        self.logger.info("Testbed started")
        return True

    def handle_stop_testbed(self):
        """Stop all supervisord programs. Returns True on success.

        Exit code 4 (supervisord not reachable) is accepted as "already stopped".
        """
        self.logger.info("Stopping testbed...")
        result = self._supervisorctl('stop', 'all')

        if result.returncode not in (0, 4):
            detail = result.stderr.strip() or result.stdout.strip()
            self.logger.error(f"Error stopping testbed: {detail}")
            return False

        self.agents_running = False
        self.logger.info("Testbed stopped")
        return True

    def handle_restart(self):
        """Stop the testbed, spawn a fresh agent manager process, and exit this one."""
        self.logger.info("Restarting agent manager...")
        self.handle_stop_testbed()

        testbed = self._get_venv_bin('testbed')
        # The child gets its own copy of the log fd, so the parent's handle can
        # be closed as soon as Popen returns.
        with open('/tmp/agent-manager.log', 'a') as log_file:
            subprocess.Popen(
                ['nohup', testbed, 'agent-manager'],
                stdout=log_file,
                stderr=subprocess.STDOUT,
                cwd=self.testbed_dir,
                start_new_session=True,
            )

        self.logger.info("New agent manager spawned, exiting")

        self.running = False
        try:
            self.disconnect()
        except Exception:
            pass  # best effort: the process hard-exits immediately below
        # os._exit terminates at once, so run()'s shutdown path (including the
        # EXITED heartbeat) does not execute.
        os._exit(0)

    def handle_status(self, reply_to: str | None = None):
        """Collect supervisord status; send it to ``reply_to`` if given, and return it."""
        result = self._supervisorctl('status')

        status = {
            'username': self.username,
            'agents_running': self.agents_running,
            'supervisord_status': result.stdout,
            'timestamp': datetime.now().isoformat(),
        }

        if reply_to:
            self.conn.send(destination=reply_to, body=json.dumps(status))

        return status

    def handle_ping(self, reply_to: str | None = None):
        """Respond to a liveness check; send to ``reply_to`` if given, and return the response."""
        response = {
            'status': 'alive',
            'username': self.username,
            'timestamp': datetime.now().isoformat(),
        }

        if reply_to:
            self.conn.send(destination=reply_to, body=json.dumps(response))

        return response

    def _get_venv_bin(self, cmd: str) -> str:
        """Return the testbed venv's copy of ``cmd`` if present, else ``cmd`` (PATH lookup)."""
        venv_bin = self.testbed_dir / '.venv' / 'bin' / cmd
        if venv_bin.exists():
            return str(venv_bin)
        return cmd

    def _conf_path(self) -> Path:
        """Path to this testbed's supervisord config file."""
        return self.testbed_dir / AGENTS_CONF

    def _supervisorctl(self, *args: str) -> subprocess.CompletedProcess:
        """Run ``supervisorctl -c <conf> <args...>`` in testbed_dir, capturing text output."""
        return subprocess.run(
            [self._get_venv_bin('supervisorctl'), '-c', str(self._conf_path()), *args],
            capture_output=True,
            text=True,
            cwd=self.testbed_dir,
        )

    def _launch_supervisord(self) -> subprocess.CompletedProcess:
        """Run ``supervisord -c <conf>`` in testbed_dir, capturing text output."""
        return subprocess.run(
            [self._get_venv_bin('supervisord'), '-c', str(self._conf_path())],
            capture_output=True,
            text=True,
            cwd=self.testbed_dir,
        )

    @staticmethod
    def _parse_running(status_output: str) -> list:
        """Return the program names on ``supervisorctl status`` lines containing RUNNING."""
        return [
            line.split()[0]
            for line in status_output.splitlines()
            if 'RUNNING' in line
        ]

    def _ensure_supervisord(self) -> bool:
        """Start supervisord if it is not running. Returns False on failure."""
        conf_path = self._conf_path()
        if not conf_path.exists():
            self.logger.error(f"{AGENTS_CONF} not found in {self.testbed_dir}")
            return False

        # `status` exit code 4 is treated throughout as "supervisord not running".
        if self._supervisorctl('status').returncode == 4:
            self.logger.info("Starting supervisord...")
            start_result = self._launch_supervisord()
            if start_result.returncode != 0:
                self.logger.error(f"Error starting supervisord: {start_result.stderr}")
                return False
            time.sleep(1)

        return True

    def _wait_for_supervisord_exit(self, timeout: float = 10.0, poll: float = 0.5) -> bool:
        """Wait (at least 1s, at most ``timeout``s) until ``status`` returns exit code 4.

        Returns True once supervisord stops responding, False on timeout.
        """
        deadline = time.monotonic() + timeout
        time.sleep(1)
        while self._supervisorctl('status').returncode != 4:
            if time.monotonic() >= deadline:
                self.logger.warning(f"supervisord still responding {timeout:.0f}s after shutdown")
                return False
            time.sleep(poll)
        return True

    def _restart_supervisord(self) -> bool:
        """Restart supervisord to pick up the current environment and config.

        Always restarts to ensure fresh env vars (like SWF_TESTBED_CONFIG) are
        available. Called after verifying no agents are running. Returns False
        if supervisord fails to start or does not respond afterwards.
        """
        conf_path = self._conf_path()
        if not conf_path.exists():
            self.logger.error(f"{AGENTS_CONF} not found in {self.testbed_dir}")
            return False

        if self._supervisorctl('status').returncode != 4:
            self.logger.info("Restarting supervisord to pick up current environment...")
            self._supervisorctl('shutdown')
            # Poll rather than a fixed sleep so a slow shutdown does not make
            # the following launch fail.
            self._wait_for_supervisord_exit()

        self.logger.info("Starting supervisord...")
        start_result = self._launch_supervisord()
        if start_result.returncode != 0:
            self.logger.error(f"Error starting supervisord: {start_result.stderr}")
            return False

        time.sleep(1)

        # A zero exit from supervisord does not guarantee it is serving requests.
        if self._supervisorctl('status').returncode == 4:
            self.logger.error(
                f"Supervisord started (exit 0) but not responding. "
                f"stderr: {start_result.stderr.strip()}"
            )
            return False

        return True

    def _start_program(self, program_name: str) -> bool:
        """Start one supervisord program; an already-started program counts as success."""
        result = self._supervisorctl('start', program_name)

        if result.returncode == 0 or 'already started' in result.stdout.lower():
            self.logger.info(f" {program_name}: started")
            return True

        # supervisorctl's messages (e.g. the 'already started' text checked
        # above) appear on stdout, so fall back to it when stderr is empty.
        detail = result.stderr.strip() or result.stdout.strip()
        self.logger.error(f"{program_name}: failed - {detail}")
        return False

    def _get_running_agents(self) -> list:
        """Return names of programs supervisord reports as RUNNING ([] if it is down)."""
        result = self._supervisorctl('status')
        if result.returncode == 4:
            return []
        return self._parse_running(result.stdout)

    def _reread_supervisord_config(self) -> bool:
        """Run ``supervisorctl reread`` to pick up config changes. Returns True on success."""
        result = self._supervisorctl('reread')

        if result.returncode != 0:
            self.logger.warning(f"Config reread failed: {result.stderr.strip()}")
            return False

        if result.stdout.strip():
            self.logger.info(f"Config changes detected: {result.stdout.strip()}")
        return True

    def _check_supervisord_health(self) -> dict:
        """Classify supervisord as running, genuinely idle, or a stale (unresponsive) process.

        Returns:
            ``{'healthy': True, 'state': 'running', 'running_agents': [...]}``,
            ``{'healthy': True, 'state': 'idle'}``, or
            ``{'healthy': False, 'error': <message>}``.
        """
        result = self._supervisorctl('status')
        if result.returncode != 4:
            return {
                'healthy': True,
                'state': 'running',
                'running_agents': self._parse_running(result.stdout),
            }

        # Not responding: idle, unless a supervisord for this config is still
        # alive. Restrict pgrep to our own UID -- other users on the same host
        # run their own managers with the same config filename.
        try:
            check = subprocess.run(
                ['pgrep', '-u', str(os.getuid()), '-f', f'supervisord.*{AGENTS_CONF}'],
                capture_output=True,
                text=True,
            )
        except OSError:
            # pgrep unavailable: best effort, report idle.
            return {'healthy': True, 'state': 'idle'}

        pids = check.stdout.split()
        if check.returncode == 0 and pids:
            return {'healthy': False, 'error': f'not responding but stale process exists (PID {pids[0]})'}
        return {'healthy': True, 'state': 'idle'}

    def _post_heartbeat(self, data: dict):
        """POST ``data`` to the monitor's heartbeat endpoint and return the response."""
        return self.api.post(
            f"{self.monitor_url}/api/systemagents/heartbeat/",
            json=data,
            timeout=5,
        )

    def send_heartbeat(self):
        """Send a heartbeat to the monitor API (using authenticated session like BaseAgent).

        Status is 'OK' only when supervisord is healthy AND the STOMP connection
        is up; without MQ the manager cannot receive commands. Errors are logged,
        never raised.
        """
        try:
            sv_health = self._check_supervisord_health()
            mq_connected = self.conn.is_connected()

            desc_parts = [f'Agent manager for {self.username}']
            if self.namespace:
                desc_parts.append(f'namespace: {self.namespace}')
            desc_parts.append('MQ: connected' if mq_connected else 'MQ: disconnected')

            if sv_health['healthy']:
                desc_parts.append(f"supervisord: {sv_health.get('state', 'running')}")
            else:
                desc_parts.append(f"supervisord: {sv_health['error']}")
                self.logger.error(f"Supervisord health check failed: {sv_health['error']}")

            status = 'OK' if sv_health['healthy'] and mq_connected else 'ERROR'

            data = {
                'instance_name': self.instance_name,
                'agent_type': 'agent_manager',
                'status': status,
                'operational_state': 'READY',
                'namespace': self.namespace,
                'pid': os.getpid(),
                'hostname': os.uname().nodename,
                'description': '. '.join(desc_parts),
                'metadata': {'supervisord_healthy': sv_health['healthy']},
            }

            response = self._post_heartbeat(data)
            if response.ok:
                self.last_heartbeat = datetime.now()
            else:
                self.logger.warning(f"Heartbeat rejected: HTTP {response.status_code}")

        except Exception as e:
            self.logger.error(f"Heartbeat failed: {e}")

    def run(self):
        """Main loop: connect, then heartbeat every HEARTBEAT_INTERVAL seconds until stopped.

        A SIGUSR1 request (see _sigusr1_handler) triggers an extra heartbeat on
        the next iteration. On exit, sends an EXITED heartbeat and disconnects.
        """
        self.connect()

        self.send_heartbeat()
        # Monotonic clock: wall-clock jumps must not stall or burst heartbeats.
        last_heartbeat_time = time.monotonic()

        self.logger.info(f"Listening for commands on {self.control_queue}")
        self.logger.info("Press Ctrl+C to stop")

        while self.running:
            try:
                now = time.monotonic()
                if self._heartbeat_requested or now - last_heartbeat_time >= HEARTBEAT_INTERVAL:
                    self._heartbeat_requested = False
                    self.send_heartbeat()
                    last_heartbeat_time = now

                time.sleep(1)

            except KeyboardInterrupt:
                # Safety net; SIGINT is normally handled by _signal_handler.
                break

        self.logger.info("Shutting down...")
        self._send_exit_heartbeat()
        self.disconnect()

    def _send_exit_heartbeat(self):
        """Send a final heartbeat marking this agent manager as EXITED (errors are logged)."""
        try:
            data = {
                'instance_name': self.instance_name,
                'agent_type': 'agent_manager',
                'status': 'EXITED',
                'operational_state': 'EXITED',
                'namespace': self.namespace,
                'pid': os.getpid(),
                'hostname': os.uname().nodename,
                'description': f'Agent manager for {self.username}. Shut down.',
            }
            response = self._post_heartbeat(data)
            if response.ok:
                self.logger.info("Exit heartbeat sent")
            else:
                self.logger.warning(f"Exit heartbeat rejected: HTTP {response.status_code}")
        except Exception as e:
            self.logger.error(f"Failed to send exit heartbeat: {e}")
0649
0650
def _load_env_file(env_file: Path) -> None:
    """Export ``KEY=value`` lines from ``env_file`` into ``os.environ``.

    Skips blank lines, ``#`` comment lines and lines without '='. Accepts an
    optional ``export`` prefix and whitespace around '=', and removes one pair
    of matching surrounding quotes from the value. Existing environment
    variables are overwritten.
    """
    with open(env_file, encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            if line.startswith('export '):
                line = line[len('export '):]
            key, value = line.split('=', 1)
            key = key.strip()
            value = value.strip()
            # Only strip a matching quote pair, so inner quotes survive
            # (e.g. "it's 'x'" keeps its trailing single quote).
            if len(value) >= 2 and value[0] == value[-1] and value[0] in '"\'':
                value = value[1:-1]
            # An empty name is illegal for os.environ; skip such lines.
            if key:
                os.environ[key] = value


def main():
    """Entry point: load ~/.env (if present) into the environment, then run the manager."""
    env_file = Path.home() / '.env'
    if env_file.exists():
        _load_env_file(env_file)

    manager = UserAgentManager()
    manager.run()


if __name__ == '__main__':
    main()