Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:42

0001 """
0002 System and infrastructure MCP tools.
0003 
0004 Includes: system state, agents, namespaces, logs, testbed management.
0005 """
0006 
0007 import logging
0008 from datetime import timedelta
0009 from django.utils import timezone
0010 from asgiref.sync import sync_to_async
0011 
0012 from mcp_server import mcp_server as mcp
0013 
0014 from ..models import SystemAgent, RunState, PersistentState, SystemStateEvent, AppLog
0015 from ..workflow_models import WorkflowExecution, WorkflowMessage, Namespace
0016 from .common import _parse_time, _default_start_time, _monitor_url, _get_testbed_config_path, _get_username
0017 
0018 logger = logging.getLogger(__name__)
0019 
0020 
0021 # -----------------------------------------------------------------------------
0022 # System State
0023 # -----------------------------------------------------------------------------
0024 
@mcp.tool()
async def swf_get_system_state(username: str = None) -> dict:
    """
    Get comprehensive system state including agents, executions, run states, and persistent state.

    Use this tool first to get a high-level view of the entire system before drilling
    into specific details. This is the starting point for understanding testbed health.

    Args:
        username: Username to get context for. If not provided, a default is
                  resolved server-side.

    Returns:
    - timestamp: current server time (ISO format)
    - user_context: username, namespace, workflow_name, config, config_file and
      config_source taken from the testbed TOML config (None where unavailable)
    - agent_manager: status ('missing', 'exited', 'healthy', 'unhealthy') and
      last_heartbeat of the 'agent-manager-<username>' agent
    - workflow_runner: status ('missing', 'healthy', 'unhealthy'), name and
      last_heartbeat of a DAQ_Simulator/workflow_runner agent in the user's namespace
    - ready_to_run: boolean - True if workflow_runner is healthy
    - last_execution: most recent workflow execution for user's namespace (or None)
    - errors_last_hour: count of ERROR-or-higher logs in user's namespace
    - agents: total, active, exited, healthy (status OK and heartbeat within
      5 minutes) and unhealthy counts
    - executions: running count, completed in last hour
    - messages_last_10min: count of workflow messages sent in the last 10 minutes
    - run_states: the 5 highest-numbered run states
    - persistent_state: value returned by PersistentState.get_state()
    - recent_events: last 10 system state events
    """
    username = _get_username(username)

    @sync_to_async
    def fetch():
        # Refresh agent staleness before computing any health figures.
        SystemAgent.mark_stale_agents()

        now = timezone.now()
        recent_threshold = now - timedelta(minutes=5)
        one_hour_ago = now - timedelta(hours=1)

        # User context from testbed config
        testbed_toml, config_source = _get_testbed_config_path()
        user_context = {
            "username": username,
            "namespace": None,
            "workflow_name": None,
            "config": None,
            "config_file": str(testbed_toml.name) if testbed_toml else None,
            "config_source": config_source,
        }
        if testbed_toml and testbed_toml.exists():
            try:
                import tomllib
                with open(testbed_toml, 'rb') as f:
                    toml_data = tomllib.load(f)
                user_context["namespace"] = toml_data.get('testbed', {}).get('namespace')
                workflow_section = toml_data.get('workflow', {})
                user_context["workflow_name"] = workflow_section.get('name')
                user_context["config"] = workflow_section.get('config')
            except Exception:
                # Best effort: a broken or unreadable config must not take the
                # whole status report down, but it should leave a trace.
                logger.warning(
                    "Could not read testbed config %s", testbed_toml, exc_info=True
                )

        user_namespace = user_context.get("namespace")

        # Agent manager status
        agent_manager_name = f'agent-manager-{username}'
        agent_manager = {"status": "missing", "last_heartbeat": None}
        try:
            am = SystemAgent.objects.get(instance_name=agent_manager_name)
        except SystemAgent.DoesNotExist:
            pass  # keep the "missing" default
        else:
            if am.operational_state == 'EXITED':
                agent_manager["status"] = "exited"
            elif am.last_heartbeat and am.last_heartbeat >= recent_threshold:
                agent_manager["status"] = "healthy"
            else:
                agent_manager["status"] = "unhealthy"
            agent_manager["last_heartbeat"] = am.last_heartbeat.isoformat() if am.last_heartbeat else None

        # Workflow runner status: prefer the runner with the freshest heartbeat
        # inside the threshold; otherwise report any non-exited runner as unhealthy.
        workflow_runner = {"status": "missing", "name": None, "last_heartbeat": None}
        candidate_runners = SystemAgent.objects.filter(
            agent_type__in=['DAQ_Simulator', 'workflow_runner'],
            namespace=user_namespace,
        ).exclude(operational_state='EXITED')

        runner = candidate_runners.filter(
            last_heartbeat__gte=recent_threshold,
        ).order_by('-last_heartbeat').first()
        if runner is not None:
            workflow_runner["status"] = "healthy"
        else:
            runner = candidate_runners.first()
            if runner is not None:
                workflow_runner["status"] = "unhealthy"
        if runner is not None:
            workflow_runner["name"] = runner.instance_name
            workflow_runner["last_heartbeat"] = runner.last_heartbeat.isoformat() if runner.last_heartbeat else None

        ready_to_run = workflow_runner["status"] == "healthy"

        # Namespace-scoped figures are only meaningful when a namespace is known.
        last_execution = None
        errors_last_hour = 0
        if user_namespace:
            last_exec = WorkflowExecution.objects.filter(
                namespace=user_namespace
            ).order_by('-start_time').first()
            if last_exec:
                last_execution = {
                    "execution_id": last_exec.execution_id,
                    "status": last_exec.status,
                    "start_time": last_exec.start_time.isoformat() if last_exec.start_time else None,
                    "end_time": last_exec.end_time.isoformat() if last_exec.end_time else None,
                }

            errors_last_hour = AppLog.objects.filter(
                level__gte=logging.ERROR,
                timestamp__gte=one_hour_ago,
                extra_data__namespace=user_namespace,
            ).count()

        # Global agent stats
        total_agents = SystemAgent.objects.count()
        exited_agents = SystemAgent.objects.filter(operational_state='EXITED').count()
        active_agents = total_agents - exited_agents

        healthy_agents = SystemAgent.objects.filter(
            last_heartbeat__gte=recent_threshold,
            status='OK'
        ).exclude(operational_state='EXITED').count()

        # Execution stats
        running_executions = WorkflowExecution.objects.filter(status='running').count()
        recent_completed = WorkflowExecution.objects.filter(
            status='completed',
            end_time__gte=one_hour_ago
        ).count()

        # Message stats
        recent_messages = WorkflowMessage.objects.filter(
            sent_at__gte=now - timedelta(minutes=10)
        ).count()

        # Run states (five highest run numbers)
        run_states = [
            {
                "run_number": rs.run_number,
                "phase": rs.phase,
                "state": rs.state,
                "substate": rs.substate,
                "active_workers": rs.active_worker_count,
                "target_workers": rs.target_worker_count,
                "slices_queued": rs.slices_queued,
                "slices_processing": rs.slices_processing,
                "slices_completed": rs.slices_completed,
                "slices_failed": rs.slices_failed,
            }
            for rs in RunState.objects.all().order_by('-run_number')[:5]
        ]

        # Persistent state
        persistent_state = PersistentState.get_state()

        # Recent system events (ten newest)
        recent_events = [
            {
                "timestamp": e.timestamp.isoformat(),
                "run_number": e.run_number,
                "event_type": e.event_type,
                "state": e.state,
            }
            for e in SystemStateEvent.objects.order_by('-timestamp')[:10]
        ]

        return {
            "timestamp": now.isoformat(),
            "user_context": user_context,
            "agent_manager": agent_manager,
            "workflow_runner": workflow_runner,
            "ready_to_run": ready_to_run,
            "last_execution": last_execution,
            "errors_last_hour": errors_last_hour,
            "agents": {
                "total": total_agents,
                "active": active_agents,
                "exited": exited_agents,
                "healthy": healthy_agents,
                "unhealthy": active_agents - healthy_agents,
            },
            "executions": {
                "running": running_executions,
                "completed_last_hour": recent_completed,
            },
            "messages_last_10min": recent_messages,
            "run_states": run_states,
            "persistent_state": persistent_state,
            "recent_events": recent_events,
        }

    return await fetch()
0232 
0233 
0234 # -----------------------------------------------------------------------------
0235 # Agents
0236 # -----------------------------------------------------------------------------
0237 
@mcp.tool()
async def swf_list_agents(
    namespace: str = None,
    agent_type: str = None,
    status: str = None,
    execution_id: str = None,
    start_time: str = None,
    end_time: str = None,
) -> list:
    """
    List registered agents with filtering options.

    Agents are processes that participate in workflows (DAQ simulator, data agent,
    processing agent, fast monitoring agent). Each sends periodic heartbeats.

    By default, excludes agents whose status is EXITED. Use status='EXITED' to see
    only exited, or status='all' to see all agents regardless of status.

    Args:
        namespace: Filter to agents in this namespace (e.g., 'torre1', 'wenauseic')
        agent_type: Filter by type: 'daqsim', 'data', 'processing', 'fastmon', 'workflow_runner'
        status: Filter by status (case-insensitive): 'OK', 'WARNING', 'ERROR',
                'UNKNOWN', 'EXITED', or 'all'. Default (None) excludes EXITED agents.
        execution_id: Filter to agents that sent messages in this workflow execution
        start_time: Filter to agents with heartbeat >= this ISO datetime
        end_time: Filter to agents with heartbeat <= this ISO datetime

    Returns a dict with:
    - items: up to 100 agents, newest heartbeat first, each with name, agent_type,
      status, operational_state, namespace, last_heartbeat, workflow_enabled,
      total_stf_processed
    - total_count: number of agents matching the filters
    - has_more: True when total_count exceeds the 100-item cap
    - monitor_urls: link to the matching agents list page in the monitor
    """
    # NOTE(review): the return annotation says `list` but a dict envelope is
    # returned; the annotation is left untouched to keep the tool interface stable.
    from urllib.parse import urlencode

    @sync_to_async
    def fetch():
        SystemAgent.mark_stale_agents()

        qs = SystemAgent.objects.all().order_by('-last_heartbeat')

        if namespace:
            qs = qs.filter(namespace=namespace)
        if agent_type:
            qs = qs.filter(agent_type=agent_type)

        show_all = status is not None and status.lower() == 'all'
        if status is None:
            qs = qs.exclude(status='EXITED')
        elif not show_all:
            qs = qs.filter(status__iexact=status)

        start = _parse_time(start_time)
        end = _parse_time(end_time)
        if start:
            qs = qs.filter(last_heartbeat__gte=start)
        if end:
            qs = qs.filter(last_heartbeat__lte=end)

        if execution_id:
            agent_names = WorkflowMessage.objects.filter(
                execution_id=execution_id
            ).values_list('sender_agent', flat=True).distinct()
            qs = qs.filter(instance_name__in=agent_names)

        # Mirror the supported filters into the monitor link. urlencode escapes
        # reserved characters so arbitrary filter values cannot corrupt the URL.
        params = {}
        if namespace:
            params["namespace"] = namespace
        if agent_type:
            params["agent_type"] = agent_type
        if status and not show_all:
            params["status"] = status
        query_string = urlencode(params)
        url = _monitor_url(f"/workflow/agents/?{query_string}" if query_string else "/workflow/agents/")

        MAX_ITEMS = 100
        total_count = qs.count()
        items = [
            {
                "name": a.instance_name,
                "agent_type": a.agent_type,
                "status": a.status,
                "operational_state": a.operational_state,
                "namespace": a.namespace,
                "last_heartbeat": a.last_heartbeat.isoformat() if a.last_heartbeat else None,
                "workflow_enabled": a.workflow_enabled,
                "total_stf_processed": a.total_stf_processed,
            }
            for a in qs[:MAX_ITEMS]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > MAX_ITEMS,
            "monitor_urls": [
                {"title": "Agents List", "url": url},
            ],
        }

    return await fetch()
0332 
0333 
@mcp.tool()
async def swf_get_agent(name: str) -> dict:
    """
    Look up a single agent by its instance name and return its details.

    Call swf_list_agents first if the exact agent name is not known.

    Args:
        name: The exact agent instance name (e.g., 'daq_simulator_torre1')

    Returns: dict with name, agent_type, status, namespace, last_heartbeat,
    description, workflow_enabled, current_stf_count, total_stf_processed,
    last_stf_processed, metadata and monitor_urls; or a dict with a single
    'error' key when no agent has that name.
    """
    def _iso_or_none(moment):
        # Timestamps may be unset on the agent record.
        return moment.isoformat() if moment else None

    def _load():
        try:
            agent = SystemAgent.objects.get(instance_name=name)
        except SystemAgent.DoesNotExist:
            return {"error": f"Agent '{name}' not found. Use swf_list_agents to see available agents."}

        detail_link = {
            "title": "Agent Detail",
            "url": _monitor_url(f"/workflow/agents/{agent.instance_name}/"),
        }
        return {
            "name": agent.instance_name,
            "agent_type": agent.agent_type,
            "status": agent.status,
            "namespace": agent.namespace,
            "last_heartbeat": _iso_or_none(agent.last_heartbeat),
            "description": agent.description,
            "workflow_enabled": agent.workflow_enabled,
            "current_stf_count": agent.current_stf_count,
            "total_stf_processed": agent.total_stf_processed,
            "last_stf_processed": _iso_or_none(agent.last_stf_processed),
            "metadata": agent.metadata,
            "monitor_urls": [detail_link],
        }

    return await sync_to_async(_load)()
0371 
0372 
0373 # -----------------------------------------------------------------------------
0374 # Namespaces
0375 # -----------------------------------------------------------------------------
0376 
@mcp.tool()
async def swf_list_namespaces() -> list:
    """
    List testbed namespaces in alphabetical order.

    Namespaces provide isolation between different users' workflow runs.
    Each namespace has its own set of agents and workflow executions.

    Returns a dict with:
    - items: up to 100 namespaces, each with name, owner, description
    - total_count: number of namespaces on record
    - has_more: True when total_count exceeds the 100-item cap
    - monitor_urls: link to the namespaces list page in the monitor
    """
    page_limit = 100

    def _collect():
        ordered = Namespace.objects.all().order_by('name')
        total = ordered.count()

        entries = []
        for ns in ordered[:page_limit]:
            entries.append({
                "name": ns.name,
                "owner": ns.owner,
                "description": ns.description,
            })

        listing_link = {"title": "Namespaces List", "url": _monitor_url("/namespaces/")}
        return {
            "items": entries,
            "total_count": total,
            "has_more": total > page_limit,
            "monitor_urls": [listing_link],
        }

    return await sync_to_async(_collect)()
0410 
0411 
@mcp.tool()
async def swf_get_namespace(
    namespace: str,
    start_time: str = None,
    end_time: str = None,
) -> dict:
    """
    Describe one namespace and summarise its activity over a time window.

    A namespace without a Namespace record is still reported: owner and
    description are then None and the counts are computed as usual.

    Args:
        namespace: The namespace name (required)
        start_time: Count activity from this ISO datetime (default: last 24 hours)
        end_time: Count activity until this ISO datetime (default: now)

    Returns: name, owner, description, agent_count (all agents in the namespace,
    not limited to the time window), execution_count, message_count,
    active_users (users who ran executions in the time range), time_range and
    monitor_urls
    """
    def _summarise():
        # Start from the "unknown namespace" shape and fill it in if a record exists.
        summary = {"name": namespace, "owner": None, "description": None}
        try:
            record = Namespace.objects.get(name=namespace)
        except Namespace.DoesNotExist:
            pass
        else:
            summary = {
                "name": record.name,
                "owner": record.owner,
                "description": record.description,
            }

        window_start = _parse_time(start_time)
        if not window_start:
            window_start = _default_start_time(24)
        window_end = _parse_time(end_time)
        if not window_end:
            window_end = timezone.now()

        summary["agent_count"] = SystemAgent.objects.filter(namespace=namespace).count()

        executions = WorkflowExecution.objects.filter(
            namespace=namespace,
            start_time__gte=window_start,
            start_time__lte=window_end,
        )
        summary["execution_count"] = executions.count()
        users = list(executions.values_list('executed_by', flat=True).distinct())

        messages = WorkflowMessage.objects.filter(
            namespace=namespace,
            sent_at__gte=window_start,
            sent_at__lte=window_end,
        )
        summary["message_count"] = messages.count()

        summary["active_users"] = users
        summary["time_range"] = {
            "start": window_start.isoformat(),
            "end": window_end.isoformat(),
        }
        summary["monitor_urls"] = [
            {"title": "Namespace Detail", "url": _monitor_url(f"/workflow/namespaces/{namespace}/")},
        ]
        return summary

    return await sync_to_async(_summarise)()
0480 
0481 
0482 # -----------------------------------------------------------------------------
0483 # Logs
0484 # -----------------------------------------------------------------------------
0485 
@mcp.tool()
async def swf_list_logs(
    app_name: str = None,
    instance_name: str = None,
    execution_id: str = None,
    level: str = None,
    search: str = None,
    start_time: str = None,
    end_time: str = None,
) -> list:
    """
    List application log entries with filtering.

    All agents log to the central database via Python's logging module.
    Use this tool to discover errors, debug issues, and understand system behavior.

    DIAGNOSTIC USE CASES:
    - Workflow logs: list_logs(execution_id='stf_datataking-user-0044')
    - Debug a specific agent: list_logs(instance_name='daq_simulator-agent-user-123')
    - Find all errors: list_logs(level='ERROR')
    - Search for specific issues: list_logs(search='connection failed')

    Args:
        app_name: Filter by application name (e.g., 'daq_simulator', 'data_agent')
        instance_name: Filter by agent instance name
        execution_id: Filter by workflow execution ID (e.g., 'stf_datataking-wenauseic-0044')
        level: Minimum log level (case-insensitive) - returns this level and higher
               severity. DEBUG (all), INFO, WARNING, ERROR, CRITICAL.
               An unrecognized level name is ignored (no level filtering).
        search: Case-insensitive text search in log message
        start_time: Filter logs from this ISO datetime (default: last 24 hours)
        end_time: Filter logs until this ISO datetime

    Returns a dict with:
    - items: up to 200 log entries, newest first, each with id, timestamp,
      app_name, instance_name, level, message, module, funcname, lineno, extra_data
    - total_count: number of entries matching the filters
    - has_more: True when total_count exceeds the 200-item cap
    - monitor_urls: link to the matching logs list page in the monitor
    """
    # NOTE(review): the return annotation says `list` but a dict envelope is
    # returned; the annotation is left untouched to keep the tool interface stable.
    import logging as py_logging
    from urllib.parse import urlencode

    level_map = {
        'DEBUG': py_logging.DEBUG,
        'INFO': py_logging.INFO,
        'WARNING': py_logging.WARNING,
        'ERROR': py_logging.ERROR,
        'CRITICAL': py_logging.CRITICAL,
    }

    @sync_to_async
    def fetch():
        qs = AppLog.objects.all().order_by('-timestamp')

        if app_name:
            qs = qs.filter(app_name=app_name)
        if instance_name:
            qs = qs.filter(instance_name=instance_name)
        if execution_id:
            qs = qs.filter(extra_data__execution_id=execution_id)

        if level:
            min_level = level_map.get(level.upper())
            if min_level is not None:
                qs = qs.filter(level__gte=min_level)

        if search:
            qs = qs.filter(message__icontains=search)

        # A lower time bound is always applied; the upper bound is optional.
        start = _parse_time(start_time) or _default_start_time(24)
        end = _parse_time(end_time)
        qs = qs.filter(timestamp__gte=start)
        if end:
            qs = qs.filter(timestamp__lte=end)

        # Mirror the supported filters into the monitor link. urlencode escapes
        # reserved characters so arbitrary filter values cannot corrupt the URL.
        params = {}
        if instance_name:
            params["instance_name"] = instance_name
        if execution_id:
            params["execution_id"] = execution_id
        if level:
            params["level"] = level
        query_string = urlencode(params)
        url = _monitor_url(f"/logs/?{query_string}" if query_string else "/logs/")

        MAX_ITEMS = 200
        total_count = qs.count()
        items = [
            {
                "id": log.id,
                "timestamp": log.timestamp.isoformat() if log.timestamp else None,
                "app_name": log.app_name,
                "instance_name": log.instance_name,
                "level": log.levelname,
                "message": log.message,
                "module": log.module,
                "funcname": log.funcname,
                "lineno": log.lineno,
                "extra_data": log.extra_data,
            }
            for log in qs[:MAX_ITEMS]
        ]
        return {
            "items": items,
            "total_count": total_count,
            "has_more": total_count > MAX_ITEMS,
            "monitor_urls": [
                {"title": "Logs List", "url": url},
            ],
        }

    return await fetch()
0591 
0592 
@mcp.tool()
async def swf_get_log_entry(log_id: int) -> dict:
    """
    Fetch one log entry by its ID and return every recorded field.

    Args:
        log_id: The log entry ID (from list_logs)

    Returns: dict with id, timestamp, app_name, instance_name, level, message,
    module, funcname, lineno, process, thread, extra_data and monitor_urls;
    or a dict with a single 'error' key when no entry has that ID.
    """
    def _load():
        try:
            entry = AppLog.objects.get(id=log_id)
        except AppLog.DoesNotExist:
            return {"error": f"Log entry {log_id} not found."}

        # The timestamp may be unset on the record.
        stamp = entry.timestamp.isoformat() if entry.timestamp else None
        detail_link = {"title": "Log Detail", "url": _monitor_url(f"/logs/{entry.id}/")}
        return {
            "id": entry.id,
            "timestamp": stamp,
            "app_name": entry.app_name,
            "instance_name": entry.instance_name,
            "level": entry.levelname,
            "message": entry.message,
            "module": entry.module,
            "funcname": entry.funcname,
            "lineno": entry.lineno,
            "process": entry.process,
            "thread": entry.thread,
            "extra_data": entry.extra_data,
            "monitor_urls": [detail_link],
        }

    return await sync_to_async(_load)()
0628 
0629 
0630 # -----------------------------------------------------------------------------
0631 # Agent Management
0632 # -----------------------------------------------------------------------------
0633 
@mcp.tool()
async def swf_kill_agent(name: str) -> dict:
    """
    Kill an agent process by sending SIGKILL to its PID.

    Looks up the agent by instance_name, retrieves its pid and hostname,
    and sends SIGKILL if the agent is on the current host. Sets the agent's
    status and operational_state to EXITED, so it won't appear in default
    list_agents results. The record is marked EXITED even when no signal
    could be delivered (no pid, remote host, process gone, permission denied).

    Args:
        name: The exact agent instance name (e.g., 'daq_simulator-agent-wenauseic-308')

    Returns:
        On lookup failure: success=False and an error message.
        Otherwise: success=True with name, pid, hostname, killed (whether the
        signal was actually sent), kill_error (reason it was not, or None),
        old_state and new_state.
    """
    import os
    import signal
    import socket

    current_host = socket.gethostname()

    @sync_to_async
    def do_kill():
        try:
            agent = SystemAgent.objects.get(instance_name=name)
        except SystemAgent.DoesNotExist:
            return {
                "success": False,
                "error": f"Agent '{name}' not found. Use swf_list_agents to see available agents.",
            }

        pid = agent.pid
        hostname = agent.hostname
        old_state = agent.operational_state
        killed = False
        kill_error = None

        if pid:
            if hostname and hostname != current_host:
                kill_error = f"Agent on '{hostname}', not '{current_host}' - cannot kill remotely"
            elif not isinstance(pid, int) or pid <= 0:
                # A negative pid makes kill() target a whole process group
                # (and -1 every process we may signal); never pass one through.
                kill_error = f"Refusing to signal invalid pid {pid!r}"
            else:
                # NOTE(review): the pid comes from the agent record; if the
                # process has exited and the pid was reused, an unrelated
                # process would be signalled. Not detectable from here.
                try:
                    os.kill(pid, signal.SIGKILL)
                    killed = True
                except ProcessLookupError:
                    kill_error = f"Process {pid} not found (already dead)"
                except PermissionError:
                    kill_error = f"Permission denied to kill process {pid}"
                except Exception as e:
                    kill_error = str(e)

        # Mark the record as exited regardless of whether a signal was delivered.
        agent.operational_state = 'EXITED'
        agent.status = 'EXITED'
        agent.save(update_fields=['operational_state', 'status'])

        logger.info(
            "MCP kill_agent: '%s' pid=%s killed=%s error=%s",
            name, pid, killed, kill_error,
        )

        return {
            "success": True,
            "name": name,
            "pid": pid,
            "hostname": hostname,
            "killed": killed,
            "kill_error": kill_error,
            "old_state": old_state,
            "new_state": "EXITED",
        }

    return await do_kill()
0704 
0705 
0706 # -----------------------------------------------------------------------------
0707 # User Testbed Management
0708 # -----------------------------------------------------------------------------
0709 
@mcp.tool()
async def swf_check_agent_manager(username: str = None) -> dict:
    """
    Report whether a user's agent manager daemon is alive.

    The agent manager is a lightweight per-user daemon that listens for MCP commands
    to control testbed agents. It sends periodic heartbeats to the monitor.

    Args:
        username: The username to check. If not provided, a default is resolved
                  server-side.

    Returns:
        - alive: True if the agent manager has a heartbeat within the last
          5 minutes and its operational state is not EXITED
        - username: The resolved username
        - instance_name: The agent manager's instance name ('agent-manager-<username>')
        - namespace: Namespace stored on the agent manager record (None if no record)
        - last_heartbeat: When it last checked in
        - operational_state: Operational state stored on the record
        - control_queue: The queue to send commands to
        - agents_running: 'agents_running' flag from the record's metadata
          (False when absent)
        - status, description, supervisord_healthy: only when a record exists
        - how_to_start: Instructions, only when no record exists
    """
    username = _get_username(username)

    manager_name = f'agent-manager-{username}'
    queue_name = f'/queue/agent_control.{username}'

    def _inspect():
        try:
            manager = SystemAgent.objects.get(instance_name=manager_name)
        except SystemAgent.DoesNotExist:
            # No record at all: the daemon has never registered.
            return {
                "alive": False,
                "username": username,
                "instance_name": manager_name,
                "namespace": None,
                "last_heartbeat": None,
                "operational_state": None,
                "control_queue": queue_name,
                "agents_running": False,
                "how_to_start": f"Run 'testbed agent-manager' in {username}'s swf-testbed directory",
            }

        cutoff = timezone.now() - timedelta(minutes=5)
        heartbeat = manager.last_heartbeat
        heartbeat_fresh = heartbeat is not None and heartbeat >= cutoff
        is_alive = heartbeat_fresh and manager.operational_state != 'EXITED'

        # Metadata may be empty/None on the record.
        extra = manager.metadata or {}

        report = {
            "alive": is_alive,
            "username": username,
            "instance_name": manager_name,
            "namespace": manager.namespace,
            "last_heartbeat": heartbeat.isoformat() if heartbeat else None,
            "operational_state": manager.operational_state,
            "status": manager.status,
            "description": manager.description,
            "control_queue": queue_name,
            "agents_running": extra.get('agents_running', False),
            "supervisord_healthy": extra.get('supervisord_healthy'),
        }
        return report

    return await sync_to_async(_inspect)()
0777 
0778 
@mcp.tool()
async def swf_start_user_testbed(username: str = None, config_name: str = None) -> dict:
    """
    Start a user's testbed via their agent manager daemon.

    Sends a start_testbed command to the user's agent manager, which then
    starts supervisord-managed agents. The agent manager must be running first.

    Refuses to start if workflow agents are already running. User must call
    swf_stop_user_testbed first to ensure clean slate.

    Args:
        username: The username whose testbed to start. If not provided, uses current user.
        config_name: Config file name in workflows/ directory (default: testbed.toml).
                     If not provided, agent manager uses its already-loaded config
                     (from SWF_TESTBED_CONFIG env var or default).

    Returns:
        Dict that always contains "success" (bool) and "username".
        - Command sent: also "message", "config", "control_queue", "note".
        - Agent manager not alive: "error" and "how_to_start".
        - Agents already running: "error", "how_to_fix", "running_agents".
        - Broker send failed: "error".
    """
    import json
    from datetime import datetime

    username = _get_username(username)

    # First check if agent manager is alive; nothing would consume the command otherwise.
    manager_status = await swf_check_agent_manager(username)
    if not manager_status.get('alive'):
        return {
            "success": False,
            "error": f"Agent manager for '{username}' is not running",
            "how_to_start": f"Run 'testbed agent-manager' in {username}'s swf-testbed directory",
            "username": username,
        }

    # Check if workflow agents are already running (status is derived from
    # heartbeat freshness by swf_get_testbed_status).
    testbed_status = await swf_get_testbed_status(username)
    running_agents = [
        a['name']
        for a in testbed_status.get('agents', [])
        if a.get('status') == 'running'
    ]
    if running_agents:
        return {
            "success": False,
            "error": f"Cannot start: workflow agents already running: {running_agents}",
            # Use the registered tool name so an MCP client can act on the hint.
            "how_to_fix": "Call swf_stop_user_testbed first to stop existing agents",
            "username": username,
            "running_agents": running_agents,
        }

    control_queue = f'/queue/agent_control.{username}'

    @sync_to_async
    def send_command():
        # Imported lazily, inside the worker function, as in the rest of this module.
        from ..activemq_connection import ActiveMQConnectionManager
        mq = ActiveMQConnectionManager()

        start_msg = {
            'command': 'start_testbed',
            'timestamp': datetime.now().isoformat(),
            'source': 'mcp'
        }
        # Only include config_name when the caller asked for a specific config;
        # omitting it leaves the choice to the agent manager.
        if config_name:
            start_msg['config_name'] = config_name

        config_desc = config_name or 'agent manager default'

        if mq.send_message(control_queue, json.dumps(start_msg)):
            logger.info(
                f"MCP start_user_testbed: sent start_testbed for '{username}' "
                f"(config={config_desc})"
            )
            return {
                "success": True,
                "message": "Start command sent to agent manager",
                "username": username,
                "config": config_desc,
                "control_queue": control_queue,
                "note": "Agents will start asynchronously. Use swf_get_testbed_status to verify.",
            }
        else:
            return {
                "success": False,
                "error": "Failed to send start_testbed command to ActiveMQ",
                "username": username,
            }

    return await send_command()
0864 
0865 
@mcp.tool()
async def swf_stop_user_testbed(username: str = None) -> dict:
    """
    Stop a user's testbed by messaging their agent manager daemon.

    Publishes a stop_testbed command on the user's agent control queue; the
    agent manager is then expected to stop all supervisord-managed agents.

    Args:
        username: The username whose testbed to stop. If not provided, uses current user.

    Returns:
        Dict with "success" (bool) and "username". On success it also carries
        "message", "control_queue" and "note"; on failure it carries "error"
        (plus a "note" when the agent manager is not running).
    """
    import json
    from datetime import datetime

    username = _get_username(username)

    # Without a live agent manager nobody would consume the command.
    status = await swf_check_agent_manager(username)
    manager_alive = status.get('alive')
    if not manager_alive:
        return {
            "success": False,
            "error": f"Agent manager for '{username}' is not running",
            "username": username,
            "note": "If agents are still running, you can kill them directly with kill_agent()",
        }

    control_queue = f'/queue/agent_control.{username}'

    def _publish_stop():
        # Runs in a worker thread via sync_to_async.
        from ..activemq_connection import ActiveMQConnectionManager

        stop_msg = {
            'command': 'stop_testbed',
            'timestamp': datetime.now().isoformat(),
            'source': 'mcp'
        }

        broker = ActiveMQConnectionManager()
        delivered = broker.send_message(control_queue, json.dumps(stop_msg))
        if not delivered:
            return {
                "success": False,
                "error": "Failed to send message to ActiveMQ. Is the message broker running?",
                "username": username,
            }

        logger.info(f"MCP stop_user_testbed: sent stop_testbed command for user '{username}'")
        return {
            "success": True,
            "message": f"Stop command sent to {username}'s agent manager",
            "username": username,
            "control_queue": control_queue,
            "note": "Agents will stop asynchronously. Use swf_list_agents to verify.",
        }

    return await sync_to_async(_publish_stop)()
0924 
0925 
@mcp.tool()
async def swf_get_testbed_status(username: str = None) -> dict:
    """
    Get comprehensive status of a user's testbed.

    Shows agent manager status, namespace, and all workflow agents with their
    current state (running/stopped based on heartbeat freshness: an agent is
    'running' when its last heartbeat is at most 2 minutes old).

    Args:
        username: The username to check. If not provided, uses current user.

    Returns:
        - username: The resolved username
        - agent_manager: Status of the agent manager daemon (alive, namespace,
          operational_state, status, last_heartbeat, supervisord_healthy)
        - agents: List of workflow agents in the manager's namespace with
          name, type, status and last_heartbeat (agent managers and EXITED
          agents are excluded)
        - summary: Quick counts of running/stopped agents
        - running_workflows: Number of workflow executions with status
          'running' in the namespace
        - ready: True when the manager is alive and not in ERROR, at least
          one agent is running, and no workflow is running
        - error / note: Human-readable explanation of the current state
    """
    username = _get_username(username)

    manager_status = await swf_check_agent_manager(username)
    # The namespace comes from the agent manager record; it is None when the
    # manager has never registered, in which case no agents are listed.
    namespace = manager_status.get('namespace')

    @sync_to_async
    def fetch_agents():
        # Run the model's stale-agent bookkeeping before reading agent rows.
        SystemAgent.mark_stale_agents()

        now = timezone.now()
        healthy_threshold = now - timedelta(minutes=2)

        agents_info = []
        running_count = 0
        stopped_count = 0

        if namespace:
            agents = SystemAgent.objects.filter(
                namespace=namespace
            ).exclude(
                agent_type='agent_manager'
            ).exclude(
                operational_state='EXITED'
            ).order_by('-last_heartbeat')

            for agent in agents:
                # Explicit None check keeps this a real bool, matching the
                # liveness test used for the agent manager itself.
                is_running = (
                    agent.last_heartbeat is not None and
                    agent.last_heartbeat >= healthy_threshold
                )
                if is_running:
                    running_count += 1
                else:
                    stopped_count += 1

                agents_info.append({
                    'name': agent.instance_name,
                    'type': agent.agent_type,
                    'status': 'running' if is_running else 'stopped',
                    'last_heartbeat': agent.last_heartbeat.isoformat() if agent.last_heartbeat else None,
                })

        return {
            'agents': agents_info,
            'running': running_count,
            'stopped': stopped_count,
        }

    agents_data = await fetch_agents()

    alive = manager_status.get('alive', False)
    sv_healthy = manager_status.get('supervisord_healthy')
    manager_error = manager_status.get('status') == 'ERROR'

    @sync_to_async
    def check_running_workflows():
        if not namespace:
            return 0
        return WorkflowExecution.objects.filter(
            namespace=namespace, status='running'
        ).count()

    running_workflows = await check_running_workflows()
    has_agents = agents_data['running'] > 0
    ready = alive and not manager_error and has_agents and running_workflows == 0

    result = {
        'username': username,
        'agent_manager': {
            'alive': alive,
            'namespace': namespace,
            'operational_state': manager_status.get('operational_state'),
            'status': manager_status.get('status'),
            'last_heartbeat': manager_status.get('last_heartbeat'),
            'supervisord_healthy': sv_healthy,
        },
        'agents': agents_data['agents'],
        'summary': {
            'running': agents_data['running'],
            'stopped': agents_data['stopped'],
        },
        'running_workflows': running_workflows,
        'ready': ready,
    }

    if not alive:
        result['error'] = 'Agent manager is not running. Run /check-testbed to bootstrap infrastructure.'
    elif manager_error:
        # The manager status dict always carries a 'description' key when the
        # agent exists, but its value may be None/empty, so a .get() default
        # would never apply; fall back explicitly.
        description = manager_status.get('description') or 'unknown'
        result['error'] = f"Agent manager reports ERROR: {description}"
    elif running_workflows > 0:
        result['note'] = f'{running_workflows} workflow(s) currently running'
    elif has_agents:
        result['note'] = 'Testbed is idle and ready to run a workflow'
    else:
        result['note'] = 'No agents running. Start testbed first.'

    return result