Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:45

0001 import typer
0002 import shutil
0003 from pathlib import Path
0004 import subprocess
0005 import os
0006 import sys
0007 import psutil
0008 
# Root Typer application; every @app.command() below registers a subcommand.
app = typer.Typer(help="ePIC Streaming Workflow Testbed CLI")

# supervisord.conf template copied by `init`; located three directory levels
# above this module file.
SUPERVISORD_CONF_TEMPLATE = Path(__file__).parent.parent.parent.joinpath("supervisord.conf")
0012 
@app.command()
def init():
    """
    Initializes the testbed environment by creating a supervisord.conf file
    and a logs directory.
    """
    print("Initializing testbed environment...")

    # Both artifacts are created relative to the current working directory.
    logs_dir = Path("logs")
    logs_dir.mkdir(exist_ok=True)
    print(f"Created directory: {logs_dir.resolve()}")

    dest_conf = Path("supervisord.conf")
    if dest_conf.exists():
        # Never overwrite a config the user may already have edited.
        print(f"{dest_conf} already exists. Skipping.")
        return

    # Report a missing template clearly instead of letting shutil.copy raise
    # a raw FileNotFoundError traceback.
    if not SUPERVISORD_CONF_TEMPLATE.is_file():
        print(f"Error: template {SUPERVISORD_CONF_TEMPLATE} not found; "
              f"cannot create {dest_conf}.")
        raise typer.Exit(code=1)

    shutil.copy(SUPERVISORD_CONF_TEMPLATE, dest_conf)
    print(f"Created {dest_conf}")
0033 
def _setup_environment():
    """Export SWF_HOME and return it as a Path.

    SWF_HOME is the parent of the directory three levels above this module.
    It is written into ``os.environ``, so this process and any subprocess
    started afterwards see it.
    """
    testbed_dir = Path(__file__).parent.parent.parent.absolute()
    home = testbed_dir.parent
    os.environ["SWF_HOME"] = str(home)
    print(f"SWF_HOME set to: {home}")
    return home
0041 
@app.command()
def start():
    """
    Starts the testbed services using supervisord and docker compose.
    """
    # Export SWF_HOME so docker compose / supervisord and their children inherit it.
    _setup_environment()

    # Both tools below read their config files from the current directory.
    if not Path("docker-compose.yml").is_file():
        print("Error: docker-compose.yml not found in the current directory. "
              "Please ensure you are in the project root and the file exists.")
        raise typer.Exit(code=1)
    if not Path("supervisord.conf").is_file():
        print("Error: supervisord.conf not found in the current directory. "
              "Please ensure you have the correct configuration file present.")
        raise typer.Exit(code=1)

    print("Starting testbed services...")
    print("--- Starting Docker services ---")
    docker = subprocess.run(["docker", "compose", "up", "-d"])
    if docker.returncode != 0:
        # Stop here rather than starting supervisord programs on top of a
        # half-started Docker stack.
        print(f"Error: 'docker compose up -d' failed (exit code {docker.returncode}). "
              "Not starting supervisord services.")
        raise typer.Exit(code=1)

    print("--- Starting supervisord services ---")
    if _check_supervisord_running():
        print("supervisord is already running.")
    else:
        print("supervisord is not running, starting it now...")
        launched = subprocess.run(["supervisord", "-c", "supervisord.conf"])
        if launched.returncode != 0:
            # Otherwise `supervisorctl start all` below would fail with a less
            # obvious connection error.
            print(f"Error: supervisord failed to start (exit code {launched.returncode}).")
            raise typer.Exit(code=1)
    subprocess.run(["supervisorctl", "-c", "supervisord.conf", "start", "all"])
0070 
@app.command()
def stop():
    """
    Stops the testbed services.
    """
    print("Stopping testbed services...")
    # Order matters: supervisord-managed programs first, then the Docker stack.
    shutdown_steps = (
        ("--- Stopping supervisord services ---",
         ["supervisorctl", "-c", "supervisord.conf", "stop", "all"]),
        ("--- Stopping Docker services ---",
         ["docker", "compose", "down"]),
    )
    for banner, command in shutdown_steps:
        print(banner)
        subprocess.run(command)
0081 
@app.command()
def status():
    """
    Checks the status of the testbed services.
    """
    # Docker containers, then supervisord programs, then the monitor's view.
    report_steps = (
        ("--- Docker services status ---",
         ["docker", "compose", "ps"]),
        ("\n--- supervisord services status ---",
         ["supervisorctl", "-c", "supervisord.conf", "status"]),
    )
    for banner, command in report_steps:
        print(banner)
        subprocess.run(command)
    _print_workflow_status()
0092 
def _check_supervisord_running() -> bool:
    """Return True if ``supervisorctl status`` can reach supervisord via ./supervisord.conf.

    Exit status 0 (all programs OK) and 3 (some programs not running) both
    mean the daemon answered. Any other exit status, a 5-second timeout, or a
    missing ``supervisorctl`` executable is treated as "not running".
    """
    answered_codes = (0, 3)
    try:
        probe = subprocess.run(
            ["supervisorctl", "-c", "supervisord.conf", "status"],
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError):
        return False
    return probe.returncode in answered_codes
0107 
def _check_postgres_connection():
    """Check with ``pg_isready`` whether PostgreSQL accepts connections.

    Connection parameters come from DB_HOST, DB_PORT, DB_USER and DB_NAME
    (defaults: localhost, 5432, admin, swfdb).

    Returns:
        True if ``pg_isready`` exits with status 0, False if it reports any
        other status or is not installed.
    """
    db_host = os.getenv("DB_HOST", "localhost")
    db_port = os.getenv("DB_PORT", "5432")
    db_user = os.getenv("DB_USER", "admin")
    db_name = os.getenv("DB_NAME", "swfdb")

    print(f"--- Checking PostgreSQL connection at {db_host}:{db_port} ---")
    try:
        # No check=True: pg_isready reports the server state via its exit
        # status (0 accepting, 1 rejecting, 2 no response, 3 no attempt), so a
        # non-zero status is an answer to report, not a tool failure.
        result = subprocess.run(
            ["pg_isready", "-h", db_host, "-p", db_port, "-U", db_user, "-d", db_name],
            capture_output=True,
            text=True,
        )
    except FileNotFoundError as e:
        print(f"Error checking PostgreSQL status: {e}")
        print("Please ensure PostgreSQL is running and `pg_isready` is in your PATH.")
        return False

    # Show pg_isready's one-line summary in both the success and failure cases.
    summary = (result.stdout or result.stderr).strip()
    if summary:
        print(summary)
    if result.returncode != 0:
        print("Warning: PostgreSQL is not ready.")
        return False
    return True
0132 
def _check_activemq_connection():
    """Check, using ``lsof``, whether a TCP socket is listening on the ActiveMQ port.

    The port comes from ACTIVEMQ_PORT (default 61616). A missing ``lsof``
    executable is reported the same way as no listener.

    Returns:
        True if lsof lists a TCP socket in LISTEN state on exactly that port,
        False otherwise.
    """
    amq_port = os.getenv("ACTIVEMQ_PORT", "61616")
    print(f"--- Checking ActiveMQ connection on port {amq_port} ---")
    # Let lsof filter on protocol, exact port and state instead of piping
    # through `grep ':PORT'`. This avoids a shell for an environment-supplied
    # value, and the substring grep also matched longer ports (':6161' hit
    # ':61616'). lsof exits 0 only when it lists at least one matching file.
    try:
        result = subprocess.run(
            ["lsof", "-n", "-P", f"-iTCP:{amq_port}", "-sTCP:LISTEN"],
            capture_output=True,
        )
        listening = result.returncode == 0
    except FileNotFoundError:
        listening = False

    if listening:
        print(f"ActiveMQ appears to be running and listening on port {amq_port}.")
        return True
    print(f"Warning: Could not detect a service listening on port {amq_port}.")
    print("Please ensure ActiveMQ is running.")
    return False
0145 
0146 
def _get_workflow_status():
    """Query the monitor REST API for running workflow executions and live agents.

    The monitor base URL comes from SWF_MONITOR_HTTP_URL (default
    ``http://localhost:8002``). When SWF_API_TOKEN is set, it is sent as a
    ``Token`` Authorization header.

    Returns:
        dict with keys
          "executions": records from /api/workflow-executions/ with status "running";
          "agents":     records from /api/systemagents/ whose operational_state
                        is not "EXITED";
          "error":      None on success, otherwise a message for the first
                        failed request (network error, non-200 status, or a
                        body that is not valid JSON).
    """
    import requests

    monitor_url = os.getenv("SWF_MONITOR_HTTP_URL", "http://localhost:8002")
    api_token = os.getenv("SWF_API_TOKEN", "")

    headers = {}
    if api_token:
        headers["Authorization"] = f"Token {api_token}"

    def fetch_list(path, params=None):
        # GET one API collection and return its items. Both paginated
        # ({"results": [...]}) and plain-list bodies are accepted.
        # NOTE(review): verify=False disables TLS certificate checks for https URLs.
        resp = requests.get(
            f"{monitor_url}{path}",
            params=params,
            headers=headers,
            timeout=5,
            verify=False,
        )
        if resp.status_code != 200:
            # Surface auth/server errors instead of reporting an empty result.
            raise RuntimeError(f"GET {monitor_url}{path} returned HTTP {resp.status_code}")
        data = resp.json()
        return data.get("results", data) if isinstance(data, dict) else data

    results = {"executions": [], "agents": [], "error": None}
    try:
        results["executions"] = fetch_list(
            "/api/workflow-executions/", params={"status": "running"}
        )
        # NOTE(review): agents are filtered only on operational_state; no
        # heartbeat-age filter is applied.
        results["agents"] = [
            a for a in fetch_list("/api/systemagents/")
            if a.get("operational_state") != "EXITED"
        ]
    except (requests.exceptions.RequestException, ValueError, RuntimeError) as e:
        # ValueError covers response bodies that are not valid JSON.
        results["error"] = str(e)

    return results
0196 
0197 
def _print_workflow_status():
    """Print the monitor's view of running workflows and active agents.

    If the monitor query returned an error, only that error is printed. At
    most ten agents are listed individually; any beyond that are summarized
    as a count.
    """
    print("\n--- Workflow Status ---")

    snapshot = _get_workflow_status()
    if snapshot["error"]:
        print(f"Could not query monitor API: {snapshot['error']}")
        return

    running = snapshot["executions"]
    if not running:
        print("Running workflows: 0")
    else:
        print(f"Running workflows: {len(running)}")
        for execution in running:
            exec_id = execution.get("execution_id", "unknown")
            namespace = execution.get("namespace", "")
            started = execution.get("start_time")
            # Keep the first 19 characters (presumably an ISO-8601 timestamp
            # trimmed to seconds).
            started = started[:19] if started else ""
            print(f"  {exec_id} (namespace: {namespace}, started: {started})")

    max_listed = 10  # keep the output short when many agents are registered
    agents = snapshot["agents"]
    if not agents:
        print("\nActive agents: 0")
        return

    print(f"\nActive agents: {len(agents)}")
    for agent in agents[:max_listed]:
        name = agent.get("instance_name", "unknown")
        state = agent.get("operational_state", "?")
        kind = agent.get("agent_type", "?")
        print(f"  {name}: {state} ({kind})")
    overflow = len(agents) - max_listed
    if overflow > 0:
        print(f"  ... and {overflow} more")
0233 
@app.command("start-local")
def start_local():
    """
    Starts the local testbed services using supervisord.
    """
    # Export SWF_HOME so supervisord and the programs it launches inherit it.
    _setup_environment()

    # Same precondition as `start`: supervisord and supervisorctl below read
    # this file from the current directory.
    if not Path("supervisord.conf").is_file():
        print("Error: supervisord.conf not found in the current directory. "
              "Please ensure you have the correct configuration file present.")
        raise typer.Exit(code=1)

    print("Starting local testbed services...")

    # No Docker here: PostgreSQL and ActiveMQ must already be reachable.
    # Run both checks (no short-circuit) so every problem is reported at once.
    db_ok = _check_postgres_connection()
    amq_ok = _check_activemq_connection()

    if not db_ok or not amq_ok:
        print("\nError: One or more background services are not available. Aborting.")
        raise typer.Abort()

    print("\n--- Starting supervisord services ---")
    if _check_supervisord_running():
        print("supervisord is already running.")
    else:
        print("supervisord is not running, starting it now...")
        launched = subprocess.run(["supervisord", "-c", "supervisord.conf"])
        if launched.returncode != 0:
            # Otherwise `supervisorctl start all` below would fail with a less
            # obvious connection error.
            print(f"Error: supervisord failed to start (exit code {launched.returncode}).")
            raise typer.Exit(code=1)

    subprocess.run(["supervisorctl", "-c", "supervisord.conf", "start", "all"])
0259 
@app.command("stop-local")
def stop_local():
    """
    Stops the local testbed services.
    """
    # Only the supervisord-managed programs are stopped; the supervisord
    # daemon itself keeps running. To stop it too, run:
    #   supervisorctl -c supervisord.conf shutdown
    print("--- Stopping local supervisord services ---")
    stop_all_cmd = ["supervisorctl", "-c", "supervisord.conf", "stop", "all"]
    subprocess.run(stop_all_cmd)
0269 
@app.command("status-local")
def status_local():
    """
    Checks the status of the locally running testbed services.
    """
    # SWF_HOME must be exported before supervisorctl is invoked below.
    _setup_environment()

    print("--- Local services status ---")
    # Informational only: the results are printed, not acted upon.
    for probe in (_check_postgres_connection, _check_activemq_connection):
        probe()

    print("\n--- supervisord services status ---")
    if not _check_supervisord_running():
        print("supervisord is not running.")
    else:
        print("supervisord is running.")
        subprocess.run(["supervisorctl", "-c", "supervisord.conf", "status"])
    _print_workflow_status()
0289 
0290 
@app.command("agent-manager")
def agent_manager():
    """
    Start the user agent manager daemon.

    This lightweight daemon listens for MCP commands to control your testbed.
    It manages agent processes via supervisord and sends heartbeats to the monitor.

    Run this once and leave it running. MCP can then start/stop your testbed remotely.

    Example:
        testbed agent-manager          # Start in foreground
        testbed agent-manager &        # Start in background
        nohup testbed agent-manager &  # Start and persist after logout
    """
    # Export SWF_HOME first so that processes launched by the manager inherit it.
    _setup_environment()

    # Imported lazily: the manager module is loaded only when this command runs.
    from .user_agent_manager import UserAgentManager

    UserAgentManager().run()
0312 
0313 
# File name of the agents' supervisord config; `stop-agents` looks for it
# three directory levels above this module.
AGENTS_CONF = "agents.supervisord.conf"
0315 
0316 
@app.command("stop-agents")
def stop_agents():
    """
    Stop all workflow agents.

    Stops agents started by 'testbed run'. Use this before restarting
    with a different configuration.
    """
    _setup_environment()

    # The agents' supervisord config is resolved relative to this module,
    # not the current working directory.
    testbed_root = Path(__file__).parent.parent.parent
    conf_path = testbed_root / AGENTS_CONF

    if not conf_path.exists():
        print(f"Error: {AGENTS_CONF} not found. Run 'testbed init' first.")
        raise typer.Exit(code=1)

    print("Stopping workflow agents...")
    try:
        result = subprocess.run(
            ["supervisorctl", "-c", str(conf_path), "stop", "all"],
            capture_output=True,
            text=True,
            cwd=testbed_root,
        )
    except FileNotFoundError:
        print("Error stopping agents: supervisorctl not found in PATH.")
        raise typer.Exit(code=1) from None

    if result.returncode == 4:
        # Exit status 4 is treated as "supervisord not reachable for this config".
        print("No agents running (supervisord not active)")
    elif result.returncode != 0:
        # supervisorctl may write its error text to stdout rather than stderr,
        # so fall back to stdout rather than printing an empty message.
        details = result.stderr.strip() or result.stdout.strip()
        print(f"Error stopping agents: {details}")
        raise typer.Exit(code=1)
    else:
        print(result.stdout)
        print("Agents stopped.")
0351 
0352 
@app.command()
def run(
    config_name: str = typer.Argument(
        None,
        help="Config name (e.g., 'fast_processing' loads workflows/fast_processing.toml). "
             "If not specified, uses workflows/testbed.toml"
    )
):
    """
    Start agents and run a workflow.

    Examples:
        testbed run                    # Run using workflows/testbed.toml
        testbed run fast_processing    # Run using workflows/fast_processing.toml
    """
    _setup_environment()

    # The `workflows` package lives at the testbed root (three levels above
    # this module); put that directory first on the import path so the
    # orchestrator can be imported.
    root_dir = str(Path(__file__).parent.parent.parent)
    sys.path.insert(0, root_dir)

    from workflows.orchestrator import run as orchestrator_run

    # The orchestrator returns a truthy value on success; anything else maps
    # to exit code 1.
    if not orchestrator_run(config_name):
        raise typer.Exit(code=1)
0379 
0380 
# Running this module directly dispatches to the Typer CLI app defined above.
if __name__ == "__main__":
    app()