File indexing completed on 2026-04-27 07:41:45
0001 import typer
0002 import shutil
0003 from pathlib import Path
0004 import subprocess
0005 import os
0006 import sys
0007 import psutil
0008
# Root Typer application object; the @app.command() functions defined in this
# module register themselves on it as CLI subcommands.
app = typer.Typer(help="ePIC Streaming Workflow Testbed CLI")

# Template supervisord configuration, expected in the third ancestor directory
# of this file. `init` copies it into the current working directory.
SUPERVISORD_CONF_TEMPLATE = Path(__file__).parent.parent.parent.joinpath("supervisord.conf")
0012
@app.command()
def init():
    """
    Initializes the testbed environment by creating a supervisord.conf file
    and a logs directory.
    """
    print("Initializing testbed environment...")

    # Make sure ./logs exists; an already-present directory is not an error.
    log_directory = Path("logs")
    log_directory.mkdir(exist_ok=True)
    print(f"Created directory: {log_directory.resolve()}")

    # Copy the template only when no config is present, so an existing
    # supervisord.conf in the working directory is never overwritten.
    target_conf = Path("supervisord.conf")
    if not target_conf.exists():
        shutil.copy(SUPERVISORD_CONF_TEMPLATE, target_conf)
        print(f"Created {target_conf}")
        return
    print(f"{target_conf} already exists. Skipping.")
0033
def _setup_environment():
    """Export SWF_HOME for this process and the child processes it spawns.

    The value is the parent of the (absolute) third ancestor directory of
    this file. It is written to ``os.environ``, echoed to stdout and returned.

    Returns:
        Path: the directory stored in the ``SWF_HOME`` environment variable.
    """
    home_dir = Path(__file__).parent.parent.parent.absolute().parent
    # os.environ only accepts strings, hence the explicit conversion.
    os.environ["SWF_HOME"] = str(home_dir)
    print(f"SWF_HOME set to: {home_dir}")
    return home_dir
0041
@app.command()
def start():
    """
    Starts the testbed services using supervisord and docker compose.
    """
    _setup_environment()

    # Both configuration files are looked up in the current working directory.
    if not Path("docker-compose.yml").is_file():
        print("Error: docker-compose.yml not found in the current directory. "
              "Please ensure you are in the project root and the file exists.")
        raise typer.Exit(code=1)
    if not Path("supervisord.conf").is_file():
        print("Error: supervisord.conf not found in the current directory. "
              "Please ensure you have the correct configuration file present.")
        raise typer.Exit(code=1)

    print("Starting testbed services...")
    print("--- Starting Docker services ---")
    try:
        docker_result = subprocess.run(["docker", "compose", "up", "-d"])
    except FileNotFoundError:
        # Without this the user would get a raw traceback when docker is
        # not installed or not on PATH.
        print("Error: 'docker' executable not found in PATH.")
        raise typer.Exit(code=1) from None
    if docker_result.returncode != 0:
        # Do not launch supervisord-managed programs when the container
        # services failed to come up.
        print(f"Error: 'docker compose up -d' failed with exit code "
              f"{docker_result.returncode}. Aborting.")
        raise typer.Exit(code=1)

    print("--- Starting supervisord services ---")
    if not _check_supervisord_running():
        print("supervisord is not running, starting it now...")
        subprocess.run(["supervisord", "-c", "supervisord.conf"])
    else:
        print("supervisord is already running.")

    # Issued whether supervisord was just launched or was already running,
    # the same sequence the start-local command uses.
    subprocess.run(["supervisorctl", "-c", "supervisord.conf", "start", "all"])
0070
@app.command()
def stop():
    """
    Stops the testbed services.
    """
    print("Stopping testbed services...")

    # Shutdown order: supervisord-managed programs first, then the containers.
    # Exit codes of the external commands are not inspected.
    shutdown_steps = (
        ("--- Stopping supervisord services ---",
         ["supervisorctl", "-c", "supervisord.conf", "stop", "all"]),
        ("--- Stopping Docker services ---",
         ["docker", "compose", "down"]),
    )
    for banner, command in shutdown_steps:
        print(banner)
        subprocess.run(command)
0081
@app.command()
def status():
    """
    Checks the status of the testbed services.
    """
    docker_ps = ["docker", "compose", "ps"]
    supervisor_status = ["supervisorctl", "-c", "supervisord.conf", "status"]

    # Each tool writes its own report straight to the terminal.
    print("--- Docker services status ---")
    subprocess.run(docker_ps)

    print("\n--- supervisord services status ---")
    subprocess.run(supervisor_status)

    # Finish with the workflow/agent summary obtained from the monitor API.
    _print_workflow_status()
0092
def _check_supervisord_running() -> bool:
    """Probe supervisord by running ``supervisorctl status``.

    Uses ``supervisord.conf`` from the current working directory and waits at
    most five seconds for the command.

    Returns:
        True when the command exits with code 0 or 3; False for any other
        exit code, on timeout, or when ``supervisorctl`` cannot be executed.
    """
    probe = ["supervisorctl", "-c", "supervisord.conf", "status"]
    try:
        completed = subprocess.run(probe, capture_output=True, text=True, timeout=5)
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
        return False
    # NOTE(review): exit code 3 presumably means the daemon answered but some
    # programs are not running - confirm against the supervisorctl docs.
    return completed.returncode == 0 or completed.returncode == 3
0107
def _check_postgres_connection():
    """Check PostgreSQL availability by running ``pg_isready``.

    Connection parameters come from the environment variables DB_HOST,
    DB_PORT, DB_USER and DB_NAME, defaulting to localhost / 5432 / admin /
    swfdb.

    Returns:
        True when ``pg_isready`` exits successfully and its output contains
        "accepting connections"; False otherwise, including when the
        executable is missing or exits with a non-zero status.
    """
    host = os.getenv("DB_HOST", "localhost")
    port = os.getenv("DB_PORT", "5432")
    user = os.getenv("DB_USER", "admin")
    database = os.getenv("DB_NAME", "swfdb")

    print(f"--- Checking PostgreSQL connection at {host}:{port} ---")
    probe = ["pg_isready", "-h", host, "-p", port, "-U", user, "-d", database]
    try:
        # check=True turns a non-zero exit status into CalledProcessError.
        completed = subprocess.run(probe, capture_output=True, text=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError) as exc:
        print(f"Error checking PostgreSQL status: {exc}")
        print("Please ensure PostgreSQL is running and `pg_isready` is in your PATH.")
        return False

    print(completed.stdout.strip())
    if "accepting connections" in completed.stdout:
        return True
    print("Warning: PostgreSQL is not ready.")
    return False
0132
def _check_activemq_connection():
    """Checks if ActiveMQ is listening on its port.

    The port is read from the ACTIVEMQ_PORT environment variable (default
    61616) and must be an integer in the range 1-65535. The check asks
    ``lsof`` for a TCP socket in LISTEN state on exactly that port. When
    ``lsof`` is not installed or does not answer in time, a plain TCP connect
    to localhost is used instead.

    Returns:
        True when a listener was detected, False otherwise (including when
        ACTIVEMQ_PORT is not a valid port number).
    """
    import socket  # local import: only needed for the fallback probe

    amq_port = os.getenv("ACTIVEMQ_PORT", "61616")
    print(f"--- Checking ActiveMQ connection on port {amq_port} ---")

    # Validate before use: the value comes from the environment and must never
    # reach a command line unchecked.
    try:
        port = int(amq_port)
    except ValueError:
        port = -1
    if not 1 <= port <= 65535:
        print(f"Warning: ACTIVEMQ_PORT={amq_port!r} is not a valid TCP port number.")
        return False

    try:
        # Argument list instead of a shell pipeline: no shell interpretation,
        # and lsof matches the exact port rather than a ':PORT' substring.
        # lsof exits with 0 only when at least one matching socket was found.
        result = subprocess.run(
            ["lsof", "-nP", f"-iTCP:{port}", "-sTCP:LISTEN"],
            capture_output=True,
            timeout=10,
        )
        listening = result.returncode == 0
    except (FileNotFoundError, subprocess.TimeoutExpired):
        # lsof is unavailable or hung; probe the port directly instead.
        try:
            with socket.create_connection(("localhost", port), timeout=2):
                listening = True
        except OSError:
            listening = False

    if listening:
        print(f"ActiveMQ appears to be running and listening on port {amq_port}.")
        return True
    print(f"Warning: Could not detect a service listening on port {amq_port}.")
    print("Please ensure ActiveMQ is running.")
    return False
0145
0146
def _get_workflow_status():
    """Query monitor API for running workflows and agent states.

    The base URL comes from SWF_MONITOR_HTTP_URL (default
    http://localhost:8002). When SWF_API_TOKEN is set it is sent as an
    ``Authorization: Token ...`` header.

    Returns:
        dict with three keys:
          "executions": items from /api/workflow-executions/?status=running
          "agents":     items from /api/systemagents/ whose
                        "operational_state" is not "EXITED"
          "error":      None on success, otherwise a message describing the
                        first failure (connection problem, non-200 response,
                        invalid JSON or unexpected payload shape)
    """
    import requests

    monitor_url = os.getenv("SWF_MONITOR_HTTP_URL", "http://localhost:8002")
    api_token = os.getenv("SWF_API_TOKEN", "")
    headers = {"Authorization": f"Token {api_token}"} if api_token else {}

    results = {"executions": [], "agents": [], "error": None}

    def fetch_items(endpoint, params=None):
        """GET one list endpoint and return its items as a list."""
        # NOTE(review): verify=False disables TLS certificate verification.
        # Kept as in the original behavior; confirm it is still required.
        resp = requests.get(
            f"{monitor_url}{endpoint}",
            params=params,
            headers=headers,
            timeout=5,
            verify=False,
        )
        if resp.status_code != 200:
            # Report the failure instead of pretending the list is empty.
            raise requests.exceptions.HTTPError(
                f"{endpoint} returned HTTP {resp.status_code}", response=resp
            )
        data = resp.json()
        # Accept either a bare list or a dict wrapping the list in "results".
        items = data.get("results", data) if isinstance(data, dict) else data
        if not isinstance(items, list):
            raise ValueError(f"{endpoint} returned an unexpected payload")
        return items

    try:
        results["executions"] = fetch_items(
            "/api/workflow-executions/", {"status": "running"}
        )
        results["agents"] = [
            agent
            for agent in fetch_items("/api/systemagents/")
            if agent.get("operational_state") != "EXITED"
        ]
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError also covers JSON decoding errors on requests versions
        # whose decode error is not a RequestException subclass.
        results["error"] = str(e)

    return results
0196
0197
def _print_workflow_status():
    """Print the running workflows and active agents reported by the monitor.

    Fetches the data through ``_get_workflow_status()``. When that result
    carries an error, only the error line is printed. Otherwise every running
    workflow is listed, followed by at most ten agents and a count of the
    agents that were not shown.
    """
    print("\n--- Workflow Status ---")

    report = _get_workflow_status()
    failure = report["error"]
    if failure:
        print(f"Could not query monitor API: {failure}")
        return

    running = report["executions"]
    if not running:
        print("Running workflows: 0")
    else:
        print(f"Running workflows: {len(running)}")
        for execution in running:
            exec_id = execution.get("execution_id", "unknown")
            namespace = execution.get("namespace", "")
            raw_start = execution.get("start_time")
            # Keep only the first 19 characters of the timestamp string.
            started = raw_start[:19] if raw_start else ""
            print(f"  {exec_id} (namespace: {namespace}, started: {started})")

    active = report["agents"]
    if not active:
        print("\nActive agents: 0")
        return

    print(f"\nActive agents: {len(active)}")
    for agent in active[:10]:
        name = agent.get("instance_name", "unknown")
        state = agent.get("operational_state", "?")
        kind = agent.get("agent_type", "?")
        print(f"  {name}: {state} ({kind})")
    hidden = len(active) - 10
    if hidden > 0:
        print(f"  ... and {hidden} more")
0233
@app.command("start-local")
def start_local():
    """
    Starts the local testbed services using supervisord.
    """
    _setup_environment()

    print("Starting local testbed services...")

    # Run both probes before deciding, so each one prints its own diagnostics.
    postgres_ready = _check_postgres_connection()
    activemq_ready = _check_activemq_connection()
    if not (postgres_ready and activemq_ready):
        print("\nError: One or more background services are not available. Aborting.")
        raise typer.Abort()

    print("\n--- Starting supervisord services ---")
    if _check_supervisord_running():
        print("supervisord is already running.")
    else:
        print("supervisord is not running, starting it now...")
        subprocess.run(["supervisord", "-c", "supervisord.conf"])

    # Ask the daemon to start every configured program.
    start_all = ["supervisorctl", "-c", "supervisord.conf", "start", "all"]
    subprocess.run(start_all)
0259
@app.command("stop-local")
def stop_local():
    """
    Stops the local testbed services.
    """
    print("--- Stopping local supervisord services ---")
    # Only the supervisord-managed programs are stopped by this command.
    stop_all = ["supervisorctl", "-c", "supervisord.conf", "stop", "all"]
    subprocess.run(stop_all)
0267
0268
0269
@app.command("status-local")
def status_local():
    """
    Checks the status of the locally running testbed services.
    """
    _setup_environment()

    print("--- Local services status ---")
    # The probes print their own findings; their return values are not used.
    _check_postgres_connection()
    _check_activemq_connection()

    print("\n--- supervisord services status ---")
    if not _check_supervisord_running():
        print("supervisord is not running.")
    else:
        print("supervisord is running.")
        subprocess.run(["supervisorctl", "-c", "supervisord.conf", "status"])

    _print_workflow_status()
0289
0290
@app.command("agent-manager")
def agent_manager():
    """
    Start the user agent manager daemon.

    This lightweight daemon listens for MCP commands to control your testbed.
    It manages agent processes via supervisord and sends heartbeats to the monitor.

    Run this once and leave it running. MCP can then start/stop your testbed remotely.

    Example:
        testbed agent-manager # Start in foreground
        testbed agent-manager & # Start in background
        nohup testbed agent-manager & # Start and persist after logout
    """
    _setup_environment()

    # Imported here rather than at module level, so the other commands do not
    # load the manager module.
    from .user_agent_manager import UserAgentManager

    UserAgentManager().run()
0312
0313
# File name of the supervisord configuration for workflow agents; the
# stop-agents command resolves it against the testbed root directory.
AGENTS_CONF = "agents.supervisord.conf"
0315
0316
@app.command("stop-agents")
def stop_agents():
    """
    Stop all workflow agents.

    Stops agents started by 'testbed run'. Use this before restarting
    with a different configuration.
    """
    _setup_environment()

    # The agents config is resolved relative to this file, not the current
    # working directory, and supervisorctl runs from that same directory.
    root_dir = Path(__file__).parent.parent.parent
    agents_conf = root_dir / AGENTS_CONF
    if not agents_conf.exists():
        print(f"Error: {AGENTS_CONF} not found. Run 'testbed init' first.")
        raise typer.Exit(code=1)

    print("Stopping workflow agents...")
    completed = subprocess.run(
        ["supervisorctl", "-c", str(agents_conf), "stop", "all"],
        capture_output=True,
        text=True,
        cwd=root_dir,
    )

    exit_code = completed.returncode
    if exit_code == 0:
        print(completed.stdout)
        print("Agents stopped.")
        return
    if exit_code == 4:
        # Exit code 4 is reported as "nothing to stop", not as a failure.
        print("No agents running (supervisord not active)")
        return
    print(f"Error stopping agents: {completed.stderr}")
    raise typer.Exit(code=1)
0351
0352
@app.command()
def run(
    config_name: str = typer.Argument(
        None,
        help="Config name (e.g., 'fast_processing' loads workflows/fast_processing.toml). "
             "If not specified, uses workflows/testbed.toml"
    )
):
    """
    Start agents and run a workflow.

    Examples:
        testbed run # Run using workflows/testbed.toml
        testbed run fast_processing # Run using workflows/fast_processing.toml
    """
    _setup_environment()

    # Put the testbed root at the front of sys.path so that the `workflows`
    # package below can be imported.
    root_dir = Path(__file__).parent.parent.parent
    sys.path.insert(0, str(root_dir))

    from workflows.orchestrator import run as launch_workflow

    # A falsy result from the orchestrator becomes process exit status 1.
    if not launch_workflow(config_name):
        raise typer.Exit(code=1)
0379
0380
# Invoke the Typer application when this file is executed as a script.
if __name__ == "__main__":
    app()