File indexing completed on 2026-04-27 07:41:42
0001 """
0002 System and infrastructure MCP tools.
0003
0004 Includes: system state, agents, namespaces, logs, testbed management.
0005 """
0006
0007 import logging
0008 from datetime import timedelta
0009 from django.utils import timezone
0010 from asgiref.sync import sync_to_async
0011
0012 from mcp_server import mcp_server as mcp
0013
0014 from ..models import SystemAgent, RunState, PersistentState, SystemStateEvent, AppLog
0015 from ..workflow_models import WorkflowExecution, WorkflowMessage, Namespace
0016 from .common import _parse_time, _default_start_time, _monitor_url, _get_testbed_config_path, _get_username
0017
0018 logger = logging.getLogger(__name__)
0019
0020
0021
0022
0023
0024
def _load_user_context(username):
    """
    Build the ``user_context`` section reported by swf_get_system_state.

    The testbed config path comes from _get_testbed_config_path(). When that
    file exists it is parsed as TOML and ``[testbed].namespace``,
    ``[workflow].name`` and ``[workflow].config`` are copied out. Reading is
    best-effort: any failure is logged and the affected fields stay None.
    """
    testbed_toml, config_source = _get_testbed_config_path()
    context = {
        "username": username,
        "namespace": None,
        "workflow_name": None,
        "config": None,
        "config_file": str(testbed_toml.name) if testbed_toml else None,
        "config_source": config_source,
    }
    if not (testbed_toml and testbed_toml.exists()):
        return context

    try:
        # Imported inside the try so an interpreter without tomllib (<3.11)
        # degrades to "no config" instead of failing the whole report.
        import tomllib
        with open(testbed_toml, 'rb') as f:
            toml_data = tomllib.load(f)
        context["namespace"] = toml_data.get('testbed', {}).get('namespace')
        workflow_section = toml_data.get('workflow', {})
        context["workflow_name"] = workflow_section.get('name')
        context["config"] = workflow_section.get('config')
    except Exception:
        # A broken or unreadable config must not break the status report, but
        # the failure should be visible in the logs rather than swallowed.
        logger.warning("Could not read testbed config %s", testbed_toml, exc_info=True)
    return context


def _agent_manager_summary(username, recent_threshold):
    """
    Report the status of ``agent-manager-<username>``.

    status is 'missing' (no record), 'exited' (operational_state EXITED),
    'healthy' (heartbeat at/after recent_threshold) or 'unhealthy'.
    """
    summary = {"status": "missing", "last_heartbeat": None}
    try:
        am = SystemAgent.objects.get(instance_name=f'agent-manager-{username}')
    except SystemAgent.DoesNotExist:
        return summary

    if am.operational_state == 'EXITED':
        summary["status"] = "exited"
    elif am.last_heartbeat and am.last_heartbeat >= recent_threshold:
        summary["status"] = "healthy"
    else:
        summary["status"] = "unhealthy"
    summary["last_heartbeat"] = am.last_heartbeat.isoformat() if am.last_heartbeat else None
    return summary


def _workflow_runner_summary(namespace, recent_threshold):
    """
    Find a non-EXITED DAQ_Simulator / workflow_runner agent in *namespace*.

    Prefers the freshest agent whose heartbeat is at/after recent_threshold
    ('healthy'). Otherwise it falls back to any non-EXITED runner ('unhealthy').
    Returns status 'missing' when there is none.
    """
    # NOTE(review): namespace may be None here; Django turns namespace=None
    # into IS NULL, so runners without a namespace are matched. Confirm intent.
    candidates = SystemAgent.objects.filter(
        agent_type__in=['DAQ_Simulator', 'workflow_runner'],
        namespace=namespace,
    ).exclude(operational_state='EXITED')

    # A single first() query replaces the former exists() + first() pair.
    runner = candidates.filter(
        last_heartbeat__gte=recent_threshold,
    ).order_by('-last_heartbeat').first()
    status = "healthy"
    if runner is None:
        runner = candidates.first()
        status = "unhealthy"
    if runner is None:
        return {"status": "missing", "name": None, "last_heartbeat": None}
    return {
        "status": status,
        "name": runner.instance_name,
        "last_heartbeat": runner.last_heartbeat.isoformat() if runner.last_heartbeat else None,
    }


@mcp.tool()
async def swf_get_system_state(username: str = None) -> dict:
    """
    Get comprehensive system state including agents, executions, run states, and persistent state.

    Use this tool first to get a high-level view of the entire system before drilling
    into specific details. This is the starting point for understanding testbed health.

    Args:
        username: Username whose agent manager ('agent-manager-<username>') is
            reported. When omitted it is resolved by _get_username(). The
            testbed config (testbed.toml) is located by _get_testbed_config_path().

    Returns:
        - timestamp: current server time
        - user_context: username, namespace, workflow name/config, config file and source
        - agent_manager: 'missing' / 'exited' / 'healthy' / 'unhealthy' status of the user's agent manager
        - workflow_runner: healthy DAQ_Simulator/workflow_runner in the user's namespace that can accept start_workflow
        - ready_to_run: boolean - True if workflow_runner is healthy
        - last_execution: most recent workflow execution for user's namespace
        - errors_last_hour: count of ERROR-or-higher logs in user's namespace
        - agents: total, active, exited, healthy (status OK, heartbeat <5min), unhealthy counts
        - executions: running count, completed in last hour
        - messages_last_10min: workflow message count in the last 10 minutes
        - run_states: the 5 most recent fast processing run states
        - persistent_state: system-wide persistent state (next IDs, etc.)
        - recent_events: last 10 system state events
    """
    username = _get_username(username)

    @sync_to_async
    def fetch():
        SystemAgent.mark_stale_agents()

        now = timezone.now()
        recent_threshold = now - timedelta(minutes=5)
        hour_ago = now - timedelta(hours=1)

        user_context = _load_user_context(username)
        user_namespace = user_context.get("namespace")

        agent_manager = _agent_manager_summary(username, recent_threshold)
        workflow_runner = _workflow_runner_summary(user_namespace, recent_threshold)
        ready_to_run = workflow_runner["status"] == "healthy"

        # Namespace-scoped figures are only meaningful once a namespace is known.
        last_execution = None
        errors_last_hour = 0
        if user_namespace:
            last_exec = WorkflowExecution.objects.filter(
                namespace=user_namespace
            ).order_by('-start_time').first()
            if last_exec:
                last_execution = {
                    "execution_id": last_exec.execution_id,
                    "status": last_exec.status,
                    "start_time": last_exec.start_time.isoformat() if last_exec.start_time else None,
                    "end_time": last_exec.end_time.isoformat() if last_exec.end_time else None,
                }
            errors_last_hour = AppLog.objects.filter(
                level__gte=logging.ERROR,
                timestamp__gte=hour_ago,
                extra_data__namespace=user_namespace,
            ).count()

        total_agents = SystemAgent.objects.count()
        exited_agents = SystemAgent.objects.filter(operational_state='EXITED').count()
        active_agents = total_agents - exited_agents
        healthy_agents = SystemAgent.objects.filter(
            last_heartbeat__gte=recent_threshold,
            status='OK',
        ).exclude(operational_state='EXITED').count()

        running_executions = WorkflowExecution.objects.filter(status='running').count()
        recent_completed = WorkflowExecution.objects.filter(
            status='completed',
            end_time__gte=hour_ago,
        ).count()

        recent_messages = WorkflowMessage.objects.filter(
            sent_at__gte=now - timedelta(minutes=10)
        ).count()

        run_states = [
            {
                "run_number": rs.run_number,
                "phase": rs.phase,
                "state": rs.state,
                "substate": rs.substate,
                "active_workers": rs.active_worker_count,
                "target_workers": rs.target_worker_count,
                "slices_queued": rs.slices_queued,
                "slices_processing": rs.slices_processing,
                "slices_completed": rs.slices_completed,
                "slices_failed": rs.slices_failed,
            }
            for rs in RunState.objects.all().order_by('-run_number')[:5]
        ]

        persistent_state = PersistentState.get_state()

        recent_events = [
            {
                "timestamp": e.timestamp.isoformat(),
                "run_number": e.run_number,
                "event_type": e.event_type,
                "state": e.state,
            }
            for e in SystemStateEvent.objects.order_by('-timestamp')[:10]
        ]

        return {
            "timestamp": now.isoformat(),
            "user_context": user_context,
            "agent_manager": agent_manager,
            "workflow_runner": workflow_runner,
            "ready_to_run": ready_to_run,
            "last_execution": last_execution,
            "errors_last_hour": errors_last_hour,
            "agents": {
                "total": total_agents,
                "active": active_agents,
                "exited": exited_agents,
                "healthy": healthy_agents,
                "unhealthy": active_agents - healthy_agents,
            },
            "executions": {
                "running": running_executions,
                "completed_last_hour": recent_completed,
            },
            "messages_last_10min": recent_messages,
            "run_states": run_states,
            "persistent_state": persistent_state,
            "recent_events": recent_events,
        }

    return await fetch()
0232
0233
0234
0235
0236
0237
@mcp.tool()
async def swf_list_agents(
    namespace: str = None,
    agent_type: str = None,
    status: str = None,
    execution_id: str = None,
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    List registered agents with filtering options.

    Agents are processes that participate in workflows (DAQ simulator, data agent,
    processing agent, fast monitoring agent). Each sends periodic heartbeats.

    By default, excludes agents whose status is EXITED. Use status='EXITED' to see
    only exited agents, or status='all' to see all agents regardless of status.

    Args:
        namespace: Filter to agents in this namespace (e.g., 'torre1', 'wenauseic')
        agent_type: Filter by type: 'daqsim', 'data', 'processing', 'fastmon', 'workflow_runner'
        status: Filter by status (case-insensitive): 'OK', 'WARNING', 'ERROR', 'UNKNOWN',
            'EXITED', or 'all'. Default (None) excludes EXITED agents.
        execution_id: Filter to agents that sent messages in this workflow execution
        start_time: Filter to agents with heartbeat >= this ISO datetime
        end_time: Filter to agents with heartbeat <= this ISO datetime

    Returns:
        dict with:
        - items: up to 100 agents, newest heartbeat first, each with name, agent_type,
          status, operational_state, namespace, last_heartbeat, workflow_enabled,
          total_stf_processed
        - total_count: number of matching agents
        - has_more: True if total_count exceeds the 100 returned items
        - monitor_urls: link to the matching agents list in the monitor
    """
    from urllib.parse import urlencode

    MAX_ITEMS = 100

    @sync_to_async
    def fetch():
        SystemAgent.mark_stale_agents()

        qs = SystemAgent.objects.all().order_by('-last_heartbeat')

        if namespace:
            qs = qs.filter(namespace=namespace)
        if agent_type:
            qs = qs.filter(agent_type=agent_type)

        # Default view hides EXITED agents; 'all' disables status filtering.
        if status is None:
            qs = qs.exclude(status='EXITED')
        elif status.lower() != 'all':
            qs = qs.filter(status__iexact=status)

        start = _parse_time(start_time)
        end = _parse_time(end_time)
        if start:
            qs = qs.filter(last_heartbeat__gte=start)
        if end:
            qs = qs.filter(last_heartbeat__lte=end)

        if execution_id:
            # Agents "participated" if they sent at least one message in the execution.
            agent_names = WorkflowMessage.objects.filter(
                execution_id=execution_id
            ).values_list('sender_agent', flat=True).distinct()
            qs = qs.filter(instance_name__in=agent_names)

        # urlencode escapes values so arbitrary filter text yields a valid URL.
        params = {}
        if namespace:
            params["namespace"] = namespace
        if agent_type:
            params["agent_type"] = agent_type
        if status and status.lower() != 'all':
            params["status"] = status
        query_string = urlencode(params)
        url = _monitor_url(f"/workflow/agents/?{query_string}" if query_string else "/workflow/agents/")

        total_count = qs.count()
        items = [
            {
                "name": a.instance_name,
                "agent_type": a.agent_type,
                "status": a.status,
                "operational_state": a.operational_state,
                "namespace": a.namespace,
                "last_heartbeat": a.last_heartbeat.isoformat() if a.last_heartbeat else None,
                "workflow_enabled": a.workflow_enabled,
                "total_stf_processed": a.total_stf_processed,
            }
            for a in qs[:MAX_ITEMS]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > MAX_ITEMS,
            "monitor_urls": [
                {"title": "Agents List", "url": url},
            ],
        }

    return await fetch()
0332
0333
@mcp.tool()
async def swf_get_agent(name: str) -> dict:
    """
    Get detailed information about a specific agent.

    Use swf_list_agents first to see available agent names if you don't know them.

    Args:
        name: The exact agent instance name (e.g., 'daq_simulator_torre1')

    Returns: name, agent_type, status, namespace, last_heartbeat, description,
        workflow_enabled, current_stf_count, total_stf_processed, last_stf_processed,
        metadata and monitor_urls; or {"error": ...} when no agent has that name.
    """
    def as_iso(moment):
        # Timestamp fields may be empty; falsy values are reported as None.
        return moment.isoformat() if moment else None

    @sync_to_async
    def lookup():
        try:
            agent = SystemAgent.objects.get(instance_name=name)
        except SystemAgent.DoesNotExist:
            return {"error": f"Agent '{name}' not found. Use swf_list_agents to see available agents."}

        detail_url = _monitor_url(f"/workflow/agents/{agent.instance_name}/")
        return {
            "name": agent.instance_name,
            "agent_type": agent.agent_type,
            "status": agent.status,
            "namespace": agent.namespace,
            "last_heartbeat": as_iso(agent.last_heartbeat),
            "description": agent.description,
            "workflow_enabled": agent.workflow_enabled,
            "current_stf_count": agent.current_stf_count,
            "total_stf_processed": agent.total_stf_processed,
            "last_stf_processed": as_iso(agent.last_stf_processed),
            "metadata": agent.metadata,
            "monitor_urls": [
                {"title": "Agent Detail", "url": detail_url},
            ],
        }

    return await lookup()
0371
0372
0373
0374
0375
0376
@mcp.tool()
async def swf_list_namespaces() -> dict:
    """
    List all testbed namespaces.

    Namespaces provide isolation between different users' workflow runs.
    Each namespace has its own set of agents and workflow executions.

    Returns:
        dict with:
        - items: up to 100 namespaces ordered by name, each with name, owner, description
        - total_count: number of namespaces
        - has_more: True if total_count exceeds the 100 returned items
        - monitor_urls: link to the namespaces list in the monitor
    """
    MAX_ITEMS = 100

    @sync_to_async
    def fetch():
        qs = Namespace.objects.all().order_by('name')
        total_count = qs.count()
        items = [
            {
                "name": n.name,
                "owner": n.owner,
                "description": n.description,
            }
            for n in qs[:MAX_ITEMS]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > MAX_ITEMS,
            "monitor_urls": [
                {"title": "Namespaces List", "url": _monitor_url("/namespaces/")},
            ],
        }

    return await fetch()
0410
0411
@mcp.tool()
async def swf_get_namespace(
    namespace: str,
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    Get detailed information about a namespace including activity counts.

    Works even if no Namespace record exists: owner and description are then None,
    and the activity counts are still computed from agents/executions/messages
    tagged with that namespace name.

    Args:
        namespace: The namespace name (required)
        start_time: Count activity from this ISO datetime (default: last 24 hours)
        end_time: Count activity until this ISO datetime (default: now)

    Returns: name, owner, description, agent_count, execution_count, message_count,
        active_users (distinct executed_by values of executions started in the time
        range), time_range and monitor_urls
    """
    @sync_to_async
    def fetch():
        try:
            ns = Namespace.objects.get(name=namespace)
            ns_info = {
                "name": ns.name,
                "owner": ns.owner,
                "description": ns.description,
            }
        except Namespace.DoesNotExist:
            ns_info = {
                "name": namespace,
                "owner": None,
                "description": None,
            }

        start = _parse_time(start_time) or _default_start_time(24)
        end = _parse_time(end_time) or timezone.now()

        agent_count = SystemAgent.objects.filter(namespace=namespace).count()

        execution_qs = WorkflowExecution.objects.filter(
            namespace=namespace,
            start_time__gte=start,
            start_time__lte=end,
        )
        execution_count = execution_qs.count()
        # Order by the selected column itself. Under any other ordering
        # (including a model Meta.ordering) Django adds the ordering columns
        # to SELECT DISTINCT, and the same user could then appear repeatedly.
        active_users = list(
            execution_qs.order_by('executed_by')
            .values_list('executed_by', flat=True)
            .distinct()
        )

        message_count = WorkflowMessage.objects.filter(
            namespace=namespace,
            sent_at__gte=start,
            sent_at__lte=end,
        ).count()

        return {
            **ns_info,
            "agent_count": agent_count,
            "execution_count": execution_count,
            "message_count": message_count,
            "active_users": active_users,
            "time_range": {
                "start": start.isoformat(),
                "end": end.isoformat(),
            },
            "monitor_urls": [
                {"title": "Namespace Detail", "url": _monitor_url(f"/workflow/namespaces/{namespace}/")},
            ],
        }

    return await fetch()
0480
0481
0482
0483
0484
0485
@mcp.tool()
async def swf_list_logs(
    app_name: str = None,
    instance_name: str = None,
    execution_id: str = None,
    level: str = None,
    search: str = None,
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    List application log entries with filtering.

    All agents log to the central database via Python's logging module.
    Use this tool to discover errors, debug issues, and understand system behavior.

    DIAGNOSTIC USE CASES:
    - Workflow logs: swf_list_logs(execution_id='stf_datataking-user-0044')
    - Debug a specific agent: swf_list_logs(instance_name='daq_simulator-agent-user-123')
    - Find all errors: swf_list_logs(level='ERROR')
    - Search for specific issues: swf_list_logs(search='connection failed')

    Args:
        app_name: Filter by application name (e.g., 'daq_simulator', 'data_agent')
        instance_name: Filter by agent instance name
        execution_id: Filter by workflow execution ID (e.g., 'stf_datataking-wenauseic-0044')
        level: Minimum log level - returns this level and higher severity.
            DEBUG (all), INFO, WARNING, ERROR, CRITICAL (case-insensitive;
            unrecognised values do not filter)
        search: Case-insensitive text search in log message
        start_time: Filter logs from this ISO datetime (default: last 24 hours)
        end_time: Filter logs until this ISO datetime

    Returns:
        dict with:
        - items: up to 200 entries, newest first, each with id, timestamp, app_name,
          instance_name, level, message, module, funcname, lineno, extra_data
        - total_count: number of matching entries
        - has_more: True if total_count exceeds the 200 returned items
        - monitor_urls: link to the matching logs list in the monitor
    """
    from urllib.parse import urlencode

    MAX_ITEMS = 200
    level_map = {
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARNING': logging.WARNING,
        'ERROR': logging.ERROR,
        'CRITICAL': logging.CRITICAL,
    }

    @sync_to_async
    def fetch():
        qs = AppLog.objects.all().order_by('-timestamp')

        if app_name:
            qs = qs.filter(app_name=app_name)
        if instance_name:
            qs = qs.filter(instance_name=instance_name)
        if execution_id:
            qs = qs.filter(extra_data__execution_id=execution_id)

        if level and level.upper() in level_map:
            qs = qs.filter(level__gte=level_map[level.upper()])

        if search:
            qs = qs.filter(message__icontains=search)

        start = _parse_time(start_time) or _default_start_time(24)
        end = _parse_time(end_time)
        qs = qs.filter(timestamp__gte=start)
        if end:
            qs = qs.filter(timestamp__lte=end)

        # urlencode escapes values so arbitrary filter text yields a valid URL.
        params = {}
        if instance_name:
            params["instance_name"] = instance_name
        if execution_id:
            params["execution_id"] = execution_id
        if level:
            params["level"] = level
        query_string = urlencode(params)
        url = _monitor_url(f"/logs/?{query_string}" if query_string else "/logs/")

        total_count = qs.count()
        items = [
            {
                "id": log.id,
                "timestamp": log.timestamp.isoformat() if log.timestamp else None,
                "app_name": log.app_name,
                "instance_name": log.instance_name,
                "level": log.levelname,
                "message": log.message,
                "module": log.module,
                "funcname": log.funcname,
                "lineno": log.lineno,
                "extra_data": log.extra_data,
            }
            for log in qs[:MAX_ITEMS]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > MAX_ITEMS,
            "monitor_urls": [
                {"title": "Logs List", "url": url},
            ],
        }

    return await fetch()
0591
0592
@mcp.tool()
async def swf_get_log_entry(log_id: int) -> dict:
    """
    Get full details of a specific log entry.

    Args:
        log_id: The log entry ID (as returned by swf_list_logs)

    Returns: every stored field of the entry (including process, thread and
        extra_data) plus monitor_urls, or {"error": ...} if the id is unknown.
    """
    @sync_to_async
    def load_entry():
        try:
            entry = AppLog.objects.get(id=log_id)
        except AppLog.DoesNotExist:
            return {"error": f"Log entry {log_id} not found."}

        stamp = entry.timestamp
        return {
            "id": entry.id,
            "timestamp": stamp.isoformat() if stamp else None,
            "app_name": entry.app_name,
            "instance_name": entry.instance_name,
            "level": entry.levelname,
            "message": entry.message,
            "module": entry.module,
            "funcname": entry.funcname,
            "lineno": entry.lineno,
            "process": entry.process,
            "thread": entry.thread,
            "extra_data": entry.extra_data,
            "monitor_urls": [
                {"title": "Log Detail", "url": _monitor_url(f"/logs/{entry.id}/")},
            ],
        }

    return await load_entry()
0628
0629
0630
0631
0632
0633
@mcp.tool()
async def swf_kill_agent(name: str) -> dict:
    """
    Kill an agent process by sending SIGKILL to its PID.

    Looks up the agent by instance_name, retrieves its pid and hostname, and
    sends SIGKILL if the agent is on the current host. Regardless of whether
    the signal was sent, the agent's status and operational_state are set to
    EXITED, so it won't appear in default list_agents results.

    The signal is only sent when:
    - the recorded pid is set and is a positive integer; os.kill treats 0 as
      "my process group" and -1 as "every process I may signal";
    - the recorded hostname (if any) matches this host; and
    - the pid is not this MCP server's own process.

    NOTE(review): the pid is whatever the agent last registered; if that
    process died and the pid was reused, an unrelated process could be hit.

    Args:
        name: The exact agent instance name (e.g., 'daq_simulator-agent-wenauseic-308')

    Returns:
        {"success": False, "error": ...} if the agent is unknown. Otherwise
        success=True with pid, hostname, killed (whether SIGKILL was delivered),
        kill_error (why it was not, or None), old_state and new_state.
    """
    import os
    import signal
    import socket

    current_host = socket.gethostname()
    own_pid = os.getpid()

    @sync_to_async
    def do_kill():
        try:
            agent = SystemAgent.objects.get(instance_name=name)
        except SystemAgent.DoesNotExist:
            return {
                "success": False,
                "error": f"Agent '{name}' not found. Use swf_list_agents to see available agents.",
            }

        pid = agent.pid
        hostname = agent.hostname
        old_state = agent.operational_state
        killed = False
        kill_error = None

        if pid:
            if hostname and hostname != current_host:
                kill_error = f"Agent on '{hostname}', not '{current_host}' - cannot kill remotely"
            elif not isinstance(pid, int) or pid < 0:
                kill_error = f"Refusing to signal invalid pid {pid!r}"
            elif pid == own_pid:
                kill_error = f"Refusing to kill pid {pid}: it is the MCP server process itself"
            else:
                try:
                    os.kill(pid, signal.SIGKILL)
                    killed = True
                except ProcessLookupError:
                    kill_error = f"Process {pid} not found (already dead)"
                except PermissionError:
                    kill_error = f"Permission denied to kill process {pid}"
                except Exception as e:
                    kill_error = str(e)

        # Mark the record EXITED even if no signal was sent.
        agent.operational_state = 'EXITED'
        agent.status = 'EXITED'
        agent.save(update_fields=['operational_state', 'status'])

        logger.info(f"MCP kill_agent: '{name}' pid={pid} killed={killed} error={kill_error}")

        return {
            "success": True,
            "name": name,
            "pid": pid,
            "hostname": hostname,
            "killed": killed,
            "kill_error": kill_error,
            "old_state": old_state,
            "new_state": "EXITED",
        }

    return await do_kill()
0704
0705
0706
0707
0708
0709
@mcp.tool()
async def swf_check_agent_manager(username: str = None) -> dict:
    """
    Check if a user's agent manager daemon is alive.

    The agent manager is a lightweight per-user daemon that listens for MCP commands
    to control testbed agents. It sends periodic heartbeats to the monitor.

    Args:
        username: The username to check. If not provided, uses current user.

    Returns:
        Always the same keys, whether or not the agent manager record exists:
        - alive: True if heartbeat within 5 minutes and not EXITED
        - username, instance_name ('agent-manager-<username>')
        - namespace: namespace recorded on the agent manager
        - last_heartbeat: when it last checked in (ISO string or None)
        - operational_state, status, description: as recorded (None if missing)
        - control_queue: the queue to send commands to
        - agents_running, supervisord_healthy: from the agent's metadata
        - how_to_start: instructions, present only when not alive
    """
    username = _get_username(username)

    instance_name = f'agent-manager-{username}'
    control_queue = f'/queue/agent_control.{username}'
    how_to_start = f"Run 'testbed agent-manager' in {username}'s swf-testbed directory"

    @sync_to_async
    def fetch():
        try:
            agent = SystemAgent.objects.get(instance_name=instance_name)
        except SystemAgent.DoesNotExist:
            return {
                "alive": False,
                "username": username,
                "instance_name": instance_name,
                "namespace": None,
                "last_heartbeat": None,
                "operational_state": None,
                "status": None,
                "description": None,
                "control_queue": control_queue,
                "agents_running": False,
                "supervisord_healthy": None,
                "how_to_start": how_to_start,
            }

        recent_threshold = timezone.now() - timedelta(minutes=5)
        alive = (
            agent.last_heartbeat is not None and
            agent.last_heartbeat >= recent_threshold and
            agent.operational_state != 'EXITED'
        )

        metadata = agent.metadata or {}

        result = {
            "alive": alive,
            "username": username,
            "instance_name": instance_name,
            "namespace": agent.namespace,
            "last_heartbeat": agent.last_heartbeat.isoformat() if agent.last_heartbeat else None,
            "operational_state": agent.operational_state,
            "status": agent.status,
            "description": agent.description,
            "control_queue": control_queue,
            "agents_running": metadata.get('agents_running', False),
            "supervisord_healthy": metadata.get('supervisord_healthy'),
        }
        # A stale or EXITED manager needs restarting just like a missing one.
        if not alive:
            result["how_to_start"] = how_to_start
        return result

    return await fetch()
0777
0778
@mcp.tool()
async def swf_start_user_testbed(username: str = None, config_name: str = None) -> dict:
    """
    Start a user's testbed via their agent manager daemon.

    Publishes a start_testbed command on the user's control queue
    ('/queue/agent_control.<username>'). The agent manager then starts the
    supervisord-managed agents. Two preconditions are checked first:

    1. the agent manager must be alive (swf_check_agent_manager);
    2. no workflow agent may report 'running' (swf_get_testbed_status).
       Call stop_user_testbed first to ensure a clean slate.

    Args:
        username: The username whose testbed to start. If not provided, uses current user.
        config_name: Config file name in workflows/ directory (default: testbed.toml).
            If not provided, no config_name is sent and the agent manager uses its
            already-loaded config (from SWF_TESTBED_CONFIG env var or default).

    Returns:
        Success/failure status. Failures carry an error plus how_to_start or
        how_to_fix hints; success reports the config and control queue used.
    """
    import json
    from datetime import datetime

    user = _get_username(username)

    manager = await swf_check_agent_manager(user)
    if not manager.get('alive'):
        return {
            "success": False,
            "error": f"Agent manager for '{user}' is not running",
            "how_to_start": f"Run 'testbed agent-manager' in {user}'s swf-testbed directory",
            "username": user,
        }

    testbed = await swf_get_testbed_status(user)
    busy = [entry['name'] for entry in testbed.get('agents', []) if entry.get('status') == 'running']
    if busy:
        return {
            "success": False,
            "error": f"Cannot start: workflow agents already running: {busy}",
            "how_to_fix": "Call stop_user_testbed first to stop existing agents",
            "username": user,
            "running_agents": busy,
        }

    queue_name = f'/queue/agent_control.{user}'

    @sync_to_async
    def publish_start():
        from ..activemq_connection import ActiveMQConnectionManager
        broker = ActiveMQConnectionManager()

        command = {
            'command': 'start_testbed',
            'timestamp': datetime.now().isoformat(),
            'source': 'mcp',
        }
        if config_name:
            command['config_name'] = config_name

        config_label = config_name or 'agent manager default'

        if not broker.send_message(queue_name, json.dumps(command)):
            return {
                "success": False,
                "error": "Failed to send start_testbed command to ActiveMQ",
                "username": user,
            }

        logger.info(
            f"MCP start_user_testbed: sent start_testbed for '{user}' "
            f"(config={config_label})"
        )
        return {
            "success": True,
            "message": "Start command sent to agent manager",
            "username": user,
            "config": config_label,
            "control_queue": queue_name,
            "note": "Agents will start asynchronously. Use swf_get_testbed_status to verify.",
        }

    return await publish_start()
0864
0865
@mcp.tool()
async def swf_stop_user_testbed(username: str = None) -> dict:
    """
    Stop a user's testbed via their agent manager daemon.

    Publishes a stop_testbed command on '/queue/agent_control.<username>'.
    The agent manager then stops all supervisord-managed agents. Nothing is
    sent if the agent manager is not alive.

    Args:
        username: The username whose testbed to stop. If not provided, uses current user.

    Returns:
        Success/failure status. Success includes the control queue used.
    """
    import json
    from datetime import datetime

    user = _get_username(username)

    manager = await swf_check_agent_manager(user)
    if not manager.get('alive'):
        return {
            "success": False,
            "error": f"Agent manager for '{user}' is not running",
            "username": user,
            "note": "If agents are still running, you can kill them directly with kill_agent()",
        }

    queue_name = f'/queue/agent_control.{user}'

    @sync_to_async
    def publish_stop():
        from ..activemq_connection import ActiveMQConnectionManager

        payload = json.dumps({
            'command': 'stop_testbed',
            'timestamp': datetime.now().isoformat(),
            'source': 'mcp',
        })

        delivered = ActiveMQConnectionManager().send_message(queue_name, payload)
        if not delivered:
            return {
                "success": False,
                "error": "Failed to send message to ActiveMQ. Is the message broker running?",
                "username": user,
            }

        logger.info(f"MCP stop_user_testbed: sent stop_testbed command for user '{user}'")
        return {
            "success": True,
            "message": f"Stop command sent to {user}'s agent manager",
            "username": user,
            "control_queue": queue_name,
            "note": "Agents will stop asynchronously. Use swf_list_agents to verify.",
        }

    return await publish_stop()
0924
0925
@mcp.tool()
async def swf_get_testbed_status(username: str = None) -> dict:
    """
    Get comprehensive status of a user's testbed.

    Combines the agent manager check with the non-EXITED, non-agent_manager
    agents in the manager's namespace. Each agent is classed as 'running' if
    its heartbeat is within the last 2 minutes and 'stopped' otherwise. Also
    counts workflow executions currently in status 'running'.

    Args:
        username: The username to check. If not provided, uses current user.

    Returns:
        - username
        - agent_manager: alive, namespace, operational_state, status, last_heartbeat,
          supervisord_healthy
        - agents: list of workflow agents (name, type, status, last_heartbeat)
        - summary: counts of running/stopped agents
        - running_workflows: number of running executions in the namespace
        - ready: manager alive, not in ERROR, agents running and no workflow running
        - error or note: human-readable explanation of the state
    """
    user = _get_username(username)

    manager = await swf_check_agent_manager(user)
    namespace = manager.get('namespace')

    @sync_to_async
    def collect_agents():
        SystemAgent.mark_stale_agents()
        cutoff = timezone.now() - timedelta(minutes=2)

        entries = []
        if namespace:
            candidates = (
                SystemAgent.objects.filter(namespace=namespace)
                .exclude(agent_type='agent_manager')
                .exclude(operational_state='EXITED')
                .order_by('-last_heartbeat')
            )
            for agent in candidates:
                beat = agent.last_heartbeat
                fresh = bool(beat and beat >= cutoff)
                entries.append({
                    'name': agent.instance_name,
                    'type': agent.agent_type,
                    'status': 'running' if fresh else 'stopped',
                    'last_heartbeat': beat.isoformat() if beat else None,
                })

        running = sum(1 for entry in entries if entry['status'] == 'running')
        return {
            'agents': entries,
            'running': running,
            'stopped': len(entries) - running,
        }

    @sync_to_async
    def count_running_workflows():
        if not namespace:
            return 0
        return WorkflowExecution.objects.filter(namespace=namespace, status='running').count()

    agents_data = await collect_agents()
    workflows_running = await count_running_workflows()

    alive = manager.get('alive', False)
    manager_error = manager.get('status') == 'ERROR'
    has_agents = agents_data['running'] > 0
    ready = alive and not manager_error and has_agents and workflows_running == 0

    result = {
        'username': user,
        'agent_manager': {
            'alive': alive,
            'namespace': namespace,
            'operational_state': manager.get('operational_state'),
            'status': manager.get('status'),
            'last_heartbeat': manager.get('last_heartbeat'),
            'supervisord_healthy': manager.get('supervisord_healthy'),
        },
        'agents': agents_data['agents'],
        'summary': {
            'running': agents_data['running'],
            'stopped': agents_data['stopped'],
        },
        'running_workflows': workflows_running,
        'ready': ready,
    }

    # Exactly one explanation is attached, checked in order of severity.
    if not alive:
        result['error'] = 'Agent manager is not running. Run /check-testbed to bootstrap infrastructure.'
    elif manager_error:
        result['error'] = f"Agent manager reports ERROR: {manager.get('description', 'unknown')}"
    elif workflows_running > 0:
        result['note'] = f'{workflows_running} workflow(s) currently running'
    elif has_agents:
        result['note'] = 'Testbed is idle and ready to run a workflow'
    else:
        result['note'] = 'No agents running. Start testbed first.'

    return result