Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:12

0001 #!/usr/bin/env python3
0002 """
0003 Workflow Orchestrator - Start agents and trigger workflows.
0004 
0005 Usage:
0006     testbed run <name>           # Run workflows/<name>.toml
0007     testbed run                  # Run workflows/testbed.toml
0008 """
0009 
0010 import os
0011 import sys
0012 import subprocess
0013 import time
0014 import tomllib
0015 from pathlib import Path
0016 
0017 
# Agent name mapping: testbed.toml key -> supervisord program name.
# start_agent() and verify_agent_pid() look agents up here; a key that is
# missing from this map is treated as an unknown agent.
AGENT_PROGRAM_MAP = {
    'data': 'example-data-agent',
    'processing': 'example-processing-agent',
    'fastmon': 'example-fastmon-agent',
    'fast_processing': 'fast-processing-agent',
    'stf-data': 'stf-data-agent',
    'stf-processing': 'stf-processing-agent',
}

# Supervisord config file name; resolved against the parent of this module's
# directory and passed to supervisord/supervisorctl via '-c'.
AGENTS_CONF = 'agents.supervisord.conf'
# NOTE(review): not referenced anywhere in the visible code; presumably the
# control socket path declared in the supervisord config - confirm.
AGENTS_SOCK = '/tmp/swf-agents-supervisor.sock'
0030 
0031 
0032 def load_config(config_name: str = None) -> dict:
0033     """
0034     Load workflow configuration.
0035 
0036     Args:
0037         config_name: Name of config file (without path/extension), or None for testbed.toml
0038 
0039     Returns:
0040         Merged configuration dict
0041     """
0042     workflows_dir = Path(__file__).parent
0043 
0044     if config_name is None:
0045         config_path = workflows_dir / 'testbed.toml'
0046     else:
0047         # Try exact name first, then with .toml, then _default.toml
0048         candidates = [
0049             workflows_dir / config_name,
0050             workflows_dir / f'{config_name}.toml',
0051             workflows_dir / f'{config_name}_default.toml',
0052         ]
0053         config_path = None
0054         for candidate in candidates:
0055             if candidate.exists():
0056                 config_path = candidate
0057                 break
0058         if config_path is None:
0059             raise FileNotFoundError(f"Config not found: {config_name} (tried {[str(c) for c in candidates]})")
0060 
0061     with open(config_path, 'rb') as f:
0062         config = tomllib.load(f)
0063 
0064     return config
0065 
0066 
def restart_supervisord() -> bool:
    """Bring up a fresh supervisord so it sees the current environment and config.

    A running instance is shut down first, so that env vars (like
    SWF_TESTBED_CONFIG) are re-read. Intended to be called only after the
    caller has checked that no agents are running.

    Returns:
        True if supervisord launched with exit status 0, False otherwise
    """
    base_dir = Path(__file__).parent.parent
    conf = str(base_dir / AGENTS_CONF)
    ctl = ['supervisorctl', '-c', conf]

    # 'status' exiting with 4 is taken to mean supervisorctl could not connect,
    # i.e. there is no running supervisord to shut down.
    probe = subprocess.run(
        ctl + ['status'],
        capture_output=True,
        text=True,
        cwd=base_dir
    )
    supervisord_is_up = probe.returncode != 4

    if supervisord_is_up:
        print("Restarting supervisord to pick up current environment...")
        subprocess.run(ctl + ['shutdown'], capture_output=True, cwd=base_dir)
        # Give the old instance a moment to exit
        time.sleep(1)

    print("Starting supervisord...")
    launch = subprocess.run(
        ['supervisord', '-c', conf],
        capture_output=True,
        text=True,
        cwd=base_dir
    )

    if launch.returncode == 0:
        # Give the new instance a moment to come up before it is used
        time.sleep(1)
        return True

    print(f"Error starting supervisord: {launch.stderr}")
    return False
0108 
0109 
def start_agent(agent_name: str) -> bool:
    """
    Start an agent via supervisorctl.

    Args:
        agent_name: Key from testbed.toml agents section (e.g., 'processing')

    Returns:
        True if agent started or already running; False if the agent name is
        unknown or supervisorctl reported a failure
    """
    program_name = AGENT_PROGRAM_MAP.get(agent_name)
    if not program_name:
        print(f"Unknown agent: {agent_name}")
        return False

    testbed_dir = Path(__file__).parent.parent

    result = subprocess.run(
        ['supervisorctl', '-c', str(testbed_dir / AGENTS_CONF), 'start', program_name],
        capture_output=True,
        text=True,
        cwd=testbed_dir
    )

    # An "already started" report counts as success even with a non-zero exit
    if result.returncode == 0 or 'already started' in result.stdout.lower():
        print(f"  {program_name}: started")
        return True

    # supervisorctl prints command errors (e.g. "ERROR (no such process)") on
    # stdout, so fall back to it when stderr is empty; otherwise the failure
    # reason would be blank.
    detail = result.stderr.strip() or result.stdout.strip()
    print(f"  {program_name}: failed to start - {detail}")
    return False
0140 
0141 
def verify_agent_pid(agent_name: str) -> bool:
    """
    Check via 'supervisorctl status' whether an agent is reported as RUNNING.

    Args:
        agent_name: Key from testbed.toml agents section

    Returns:
        True if the status output contains 'RUNNING'; False otherwise, or if
        the agent name has no entry in AGENT_PROGRAM_MAP
    """
    program = AGENT_PROGRAM_MAP.get(agent_name)
    if not program:
        return False

    base_dir = Path(__file__).parent.parent
    status_cmd = ['supervisorctl', '-c', str(base_dir / AGENTS_CONF), 'status', program]
    status = subprocess.run(status_cmd, capture_output=True, text=True, cwd=base_dir)

    # Expected output shape: "example-processing-agent   RUNNING   pid 12345, uptime 0:00:05"
    return 'RUNNING' in status.stdout
0167 
0168 
def get_running_agents() -> list:
    """Return the supervisord program names whose status line contains RUNNING.

    Returns an empty list when 'supervisorctl status' exits with 4, which is
    treated as "cannot connect - supervisord not running".
    """
    base_dir = Path(__file__).parent.parent
    status_cmd = ['supervisorctl', '-c', str(base_dir / AGENTS_CONF), 'status']
    status = subprocess.run(status_cmd, capture_output=True, text=True, cwd=base_dir)

    if status.returncode == 4:
        return []

    # Each line looks like: "program-name   RUNNING   pid 12345, uptime 0:00:05";
    # the first whitespace-separated field is the program name.
    return [
        entry.split()[0]
        for entry in status.stdout.strip().split('\n')
        if 'RUNNING' in entry
    ]
0191 
0192 
def reread_supervisord_config() -> bool:
    """Reread supervisord config to pick up changes.

    Returns:
        True if 'supervisorctl reread' exited with status 0, False otherwise
    """
    testbed_dir = Path(__file__).parent.parent

    result = subprocess.run(
        ['supervisorctl', '-c', str(testbed_dir / AGENTS_CONF), 'reread'],
        capture_output=True,
        text=True,
        cwd=testbed_dir
    )

    output = result.stdout.strip()

    if result.returncode == 0:
        if output:
            print(f"Config changes detected: {output}")
        return True

    # supervisorctl prints command errors on stdout, so fall back to it when
    # stderr is empty; otherwise the warning would carry no reason.
    detail = result.stderr.strip() or output
    print(f"Warning: Config reread failed: {detail}")
    return False
0211 
0212 
def start_workflow_runner() -> bool:
    """Start the workflow runner agent via supervisorctl.

    Returns:
        True if the 'workflow-runner' program started or was already started,
        False if supervisorctl reported a failure
    """
    testbed_dir = Path(__file__).parent.parent

    result = subprocess.run(
        ['supervisorctl', '-c', str(testbed_dir / AGENTS_CONF), 'start', 'workflow-runner'],
        capture_output=True,
        text=True,
        cwd=testbed_dir
    )

    # An "already started" report counts as success even with a non-zero exit
    if result.returncode == 0 or 'already started' in result.stdout.lower():
        print("  workflow-runner: started")
        return True

    # supervisorctl prints command errors on stdout, so fall back to it when
    # stderr is empty; otherwise the failure reason would be blank.
    detail = result.stderr.strip() or result.stdout.strip()
    print(f"  workflow-runner: failed to start - {detail}")
    return False
0230 
0231 
def send_run_workflow(config: dict) -> bool:
    """
    Publish a run_workflow command through CommandSender.

    Reads the workflow name, workflow config name and realtime flag from the
    [workflow] section (with defaults), forwards everything in [parameters]
    as keyword overrides, and uses the namespace from [testbed].

    Args:
        config: Configuration dict with workflow and parameters

    Returns:
        True once the command has been sent; errors from the sender propagate
    """
    # Deferred import so STOMP is only loaded when a command is actually sent
    from workflows.send_workflow_command import CommandSender

    wf = config.get('workflow', {})
    name = wf.get('name', 'stf_datataking')
    wf_config_name = wf.get('config', 'fast_processing_default')
    is_realtime = wf.get('realtime', True)

    overrides = config.get('parameters', {})

    sender = CommandSender()
    # Use the namespace of the loaded config rather than the sender's own default
    sender.namespace = config.get('testbed', {}).get('namespace')
    sender.connect()

    try:
        sender.send_run_workflow(
            name,
            config=wf_config_name,
            realtime=is_realtime,
            **overrides
        )
    finally:
        # Always release the connection, whether or not the send succeeded
        sender.disconnect()
    return True
0270 
0271 
def run(config_name: str | None = None) -> bool:
    """
    Start agents and trigger workflow.

    Refuses to start if agents are already running; existing agents must be
    stopped first to ensure a clean slate.

    Args:
        config_name: Name of config file, or None for testbed.toml

    Returns:
        True if the workflow command was sent. Agents that fail to start or
        to verify only produce warnings; they do not change the return value.
    """
    # Load configuration
    try:
        config = load_config(config_name)
    except FileNotFoundError as e:
        print(f"Error: {e}")
        return False

    namespace = config.get('testbed', {}).get('namespace')
    if not namespace:
        print("Error: namespace not set in [testbed] section")
        return False

    print(f"Namespace: {namespace}")

    # Check if any agents are already running - refuse if so.
    # Do this BEFORE restarting supervisord, which would stop them.
    running_agents = get_running_agents()
    if running_agents:
        print(f"Error: Cannot start - agents already running: {running_agents}")
        print("Run 'testbed stop-agents' first to stop existing agents.")
        return False

    # Restart supervisord to pick up current env vars and config
    if not restart_supervisord():
        print("Error: Failed to start supervisord")
        return False

    # Start workflow runner first
    print("Starting workflow runner...")
    if not start_workflow_runner():
        print("Error: Failed to start workflow runner")
        return False

    # Give it a moment to connect
    time.sleep(2)

    # Collect enabled agents separately from those that actually started, so
    # "none enabled" is not reported when agents were enabled but failed.
    agents_config = config.get('agents', {})
    enabled_agents = [
        agent_name
        for agent_name, agent_config in agents_config.items()
        if isinstance(agent_config, dict) and agent_config.get('enabled', False)
    ]

    print("Starting agents...")
    if not enabled_agents:
        print("Warning: No agents enabled in configuration")

    started_agents = [name for name in enabled_agents if start_agent(name)]

    # Brief pause for agents to initialize
    time.sleep(2)

    # Verify PIDs; an agent that never started already counts as a failure
    print("Verifying agents...")
    all_running = len(started_agents) == len(enabled_agents)
    for agent_name in started_agents:
        if verify_agent_pid(agent_name):
            print(f"  {agent_name}: running")
        else:
            print(f"  {agent_name}: NOT running")
            all_running = False

    if not all_running:
        print("Warning: Some agents failed to start")

    # Send run_workflow command
    print("Triggering workflow...")
    if send_run_workflow(config):
        workflow_name = config.get('workflow', {}).get('name', 'stf_datataking')
        print(f"Workflow '{workflow_name}' triggered. Use 'testbed status' to monitor.")
        return True
    else:
        print("Error: Failed to send workflow command")
        return False
0359 
0360 
if __name__ == '__main__':
    import argparse

    # Single optional positional argument: the config name to run
    cli = argparse.ArgumentParser(description='Start agents and run workflow')
    cli.add_argument('config', nargs='?', help='Config name (default: testbed.toml)')
    parsed = cli.parse_args()

    # Exit status 0 when the workflow was triggered, 1 otherwise
    exit_code = 0 if run(parsed.config) else 1
    sys.exit(exit_code)