File indexing completed on 2026-04-25 08:29:12
0001
0002 """
0003 Workflow Orchestrator - Start agents and trigger workflows.
0004
0005 Usage:
0006 testbed run <name> # Run workflows/<name>.toml
0007 testbed run # Run workflows/testbed.toml
0008 """
0009
0010 import os
0011 import sys
0012 import subprocess
0013 import time
0014 import tomllib
0015 from pathlib import Path
0016
0017
0018
# Translation table from the short agent keys used in a workflow TOML
# ``[agents]`` section to the supervisord program names passed to
# supervisorctl by start_agent() / verify_agent_pid().
AGENT_PROGRAM_MAP = dict(
    [
        ('data', 'example-data-agent'),
        ('processing', 'example-processing-agent'),
        ('fastmon', 'example-fastmon-agent'),
        ('fast_processing', 'fast-processing-agent'),
        ('stf-data', 'stf-data-agent'),
        ('stf-processing', 'stf-processing-agent'),
    ]
)

# Supervisord config file for the agents; the functions below resolve it
# relative to the testbed directory (parent of this module's directory).
AGENTS_CONF = 'agents.supervisord.conf'
# NOTE(review): presumably the supervisord socket path configured in
# AGENTS_CONF; not referenced by the code in this file — confirm.
AGENTS_SOCK = '/tmp/swf-agents-supervisor.sock'
0030
0031
0032 def load_config(config_name: str = None) -> dict:
0033 """
0034 Load workflow configuration.
0035
0036 Args:
0037 config_name: Name of config file (without path/extension), or None for testbed.toml
0038
0039 Returns:
0040 Merged configuration dict
0041 """
0042 workflows_dir = Path(__file__).parent
0043
0044 if config_name is None:
0045 config_path = workflows_dir / 'testbed.toml'
0046 else:
0047
0048 candidates = [
0049 workflows_dir / config_name,
0050 workflows_dir / f'{config_name}.toml',
0051 workflows_dir / f'{config_name}_default.toml',
0052 ]
0053 config_path = None
0054 for candidate in candidates:
0055 if candidate.exists():
0056 config_path = candidate
0057 break
0058 if config_path is None:
0059 raise FileNotFoundError(f"Config not found: {config_name} (tried {[str(c) for c in candidates]})")
0060
0061 with open(config_path, 'rb') as f:
0062 config = tomllib.load(f)
0063
0064 return config
0065
0066
def restart_supervisord(*, timeout: float = 10.0) -> bool:
    """Restart supervisord to pick up the current environment and config.

    Always restarts (shutdown of any existing daemon, then a fresh start) so
    that fresh env vars (like SWF_TESTBED_CONFIG) from the calling process
    are available. Called after verifying no agents are running.

    Args:
        timeout: Maximum seconds to wait for an old daemon to stop answering
            and, after starting, for the new daemon to answer.

    Returns:
        True if supervisord was started and is reachable, False otherwise.
    """
    testbed_dir = Path(__file__).parent.parent
    conf_path = str(testbed_dir / AGENTS_CONF)

    def _supervisorctl(*args: str) -> subprocess.CompletedProcess:
        # Run one supervisorctl subcommand against this testbed's config.
        return subprocess.run(
            ['supervisorctl', '-c', conf_path, *args],
            capture_output=True,
            text=True,
            cwd=testbed_dir,
        )

    def _wait_until(reachable: bool) -> bool:
        # Poll `status` until the daemon's reachability matches `reachable`.
        # Exit code 4 from `status` is treated in this module as
        # "supervisord not running".
        deadline = time.monotonic() + timeout
        while True:
            if (_supervisorctl('status').returncode != 4) == reachable:
                return True
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.25)

    try:
        if _supervisorctl('status').returncode != 4:
            print("Restarting supervisord to pick up current environment...")
            _supervisorctl('shutdown')
            # `shutdown` only asks the daemon to exit; wait until it is gone
            # instead of guessing with a fixed sleep, otherwise the new
            # daemon can collide with the old one.
            if not _wait_until(reachable=False):
                print(f"Error: supervisord did not shut down within {timeout}s")
                return False

        print("Starting supervisord...")
        start_result = subprocess.run(
            ['supervisord', '-c', conf_path],
            capture_output=True,
            text=True,
            cwd=testbed_dir,
        )

        if start_result.returncode != 0:
            print(f"Error starting supervisord: {start_result.stderr}")
            return False

        # Make sure the new daemon actually answers before callers use it.
        if not _wait_until(reachable=True):
            print(f"Error: supervisord not reachable within {timeout}s after start")
            return False
    except OSError as e:
        # supervisorctl / supervisord missing or not executable.
        print(f"Error running supervisor command: {e}")
        return False

    return True
0108
0109
def start_agent(agent_name: str) -> bool:
    """
    Start an agent via supervisorctl.

    Args:
        agent_name: Key from the config's ``[agents]`` section
            (e.g., 'processing'); translated via AGENT_PROGRAM_MAP.

    Returns:
        True if the agent started or was already running; False for an
        unknown agent name or a failed start.
    """
    program_name = AGENT_PROGRAM_MAP.get(agent_name)
    if not program_name:
        print(f"Unknown agent: {agent_name}")
        return False

    testbed_dir = Path(__file__).parent.parent

    result = subprocess.run(
        ['supervisorctl', '-c', str(testbed_dir / AGENTS_CONF), 'start', program_name],
        capture_output=True,
        text=True,
        cwd=testbed_dir,
    )

    if result.returncode == 0 or 'already started' in result.stdout.lower():
        print(f" {program_name}: started")
        return True

    # supervisorctl reports per-process outcomes on stdout (the
    # 'already started' check above relies on that), so fall back to stdout
    # when stderr is empty instead of printing a blank reason.
    detail = result.stderr.strip() or result.stdout.strip()
    print(f" {program_name}: failed to start - {detail}")
    return False
0140
0141
def verify_agent_pid(agent_name: str) -> bool:
    """
    Check whether supervisord reports an agent's program as RUNNING.

    Args:
        agent_name: Key from the config's ``[agents]`` section.

    Returns:
        False when the name has no AGENT_PROGRAM_MAP entry; otherwise whether
        the text 'RUNNING' appears in ``supervisorctl status <program>`` output.
    """
    program = AGENT_PROGRAM_MAP.get(agent_name)
    if not program:
        return False

    base_dir = Path(__file__).parent.parent
    conf = str(base_dir / AGENTS_CONF)
    status = subprocess.run(
        ['supervisorctl', '-c', conf, 'status', program],
        cwd=base_dir,
        text=True,
        capture_output=True,
    )
    return status.stdout.find('RUNNING') != -1
0167
0168
def get_running_agents() -> list:
    """Return the supervisord program names whose status line shows RUNNING.

    An empty list is returned when ``supervisorctl status`` exits with code 4,
    which this module treats as supervisord not running.
    """
    base_dir = Path(__file__).parent.parent
    status = subprocess.run(
        ['supervisorctl', '-c', str(base_dir / AGENTS_CONF), 'status'],
        cwd=base_dir,
        text=True,
        capture_output=True,
    )
    if status.returncode == 4:
        return []

    # The first whitespace-separated field of each status line is the
    # program name.
    return [
        line.split()[0]
        for line in status.stdout.strip().split('\n')
        if 'RUNNING' in line
    ]
0191
0192
def reread_supervisord_config() -> bool:
    """Ask supervisord to reread its config file and report any changes.

    Returns:
        True if ``supervisorctl reread`` exited 0, False otherwise.
    """
    testbed_dir = Path(__file__).parent.parent

    result = subprocess.run(
        ['supervisorctl', '-c', str(testbed_dir / AGENTS_CONF), 'reread'],
        capture_output=True,
        text=True,
        cwd=testbed_dir,
    )

    output = result.stdout.strip()
    if result.returncode == 0:
        # supervisorctl prints "No config updates to processes" when nothing
        # changed; only announce genuine changes.
        if output and output != 'No config updates to processes':
            print(f"Config changes detected: {output}")
        return True

    # Error text may be reported on stdout rather than stderr.
    detail = result.stderr.strip() or output
    print(f"Warning: Config reread failed: {detail}")
    return False
0211
0212
def start_workflow_runner() -> bool:
    """Start the ``workflow-runner`` supervisord program.

    Returns:
        True if supervisorctl reports it started or already started.
    """
    testbed_dir = Path(__file__).parent.parent

    result = subprocess.run(
        ['supervisorctl', '-c', str(testbed_dir / AGENTS_CONF), 'start', 'workflow-runner'],
        capture_output=True,
        text=True,
        cwd=testbed_dir,
    )

    if result.returncode == 0 or 'already started' in result.stdout.lower():
        print(" workflow-runner: started")
        return True

    # supervisorctl reports per-process outcomes on stdout (see the
    # 'already started' check), so use it when stderr is empty.
    detail = result.stderr.strip() or result.stdout.strip()
    print(f" workflow-runner: failed to start - {detail}")
    return False
0230
0231
def send_run_workflow(config: dict) -> bool:
    """
    Send run_workflow command to WorkflowRunner via ActiveMQ.

    Args:
        config: Parsed configuration dict. Reads ``[workflow]`` keys
            ``name`` (default 'stf_datataking'), ``config`` (default
            'fast_processing_default') and ``realtime`` (default True);
            every key of ``[parameters]`` is forwarded as an extra keyword
            argument; ``[testbed].namespace`` is set on the sender.

    Returns:
        True if the message was sent, False if creating/connecting the
        sender or sending the command raised.
    """
    # Local import kept from the original; presumably so the messaging
    # dependencies are only loaded when a workflow is actually triggered.
    from workflows.send_workflow_command import CommandSender

    workflow_config = config.get('workflow', {})
    workflow_name = workflow_config.get('name', 'stf_datataking')
    workflow_config_name = workflow_config.get('config', 'fast_processing_default')
    realtime = workflow_config.get('realtime', True)

    params = config.get('parameters', {})

    namespace = config.get('testbed', {}).get('namespace')

    # CommandSender's failure modes are not visible from here, so any error
    # is reported and turned into the documented False return instead of a
    # traceback.
    try:
        sender = CommandSender()
        sender.namespace = namespace
        sender.connect()
    except Exception as e:
        print(f"Error: could not connect to send workflow command: {e}")
        return False

    try:
        sender.send_run_workflow(
            workflow_name,
            config=workflow_config_name,
            realtime=realtime,
            **params,
        )
    except Exception as e:
        # Includes TypeError if [parameters] repeats 'config' or 'realtime'.
        print(f"Error: failed to send run_workflow command: {e}")
        return False
    finally:
        sender.disconnect()
    return True
0270
0271
def run(config_name: str | None = None) -> bool:
    """
    Start agents and trigger workflow.

    Refuses to start if any supervisord program is already RUNNING; the
    user must run 'testbed stop-agents' first (the hint printed in that
    case) to ensure a clean slate.

    Args:
        config_name: Name of config file, or None for testbed.toml

    Returns:
        True if the workflow command was sent; False on a config, supervisord,
        workflow-runner or send failure. Agents that fail to start or verify
        only produce warnings.
    """
    try:
        config = load_config(config_name)
    except (FileNotFoundError, tomllib.TOMLDecodeError) as e:
        # A malformed TOML file is a user error too: report, don't traceback.
        print(f"Error: {e}")
        return False

    namespace = config.get('testbed', {}).get('namespace')
    if not namespace:
        print("Error: namespace not set in [testbed] section")
        return False

    print(f"Namespace: {namespace}")

    running_agents = get_running_agents()
    if running_agents:
        print(f"Error: Cannot start - agents already running: {running_agents}")
        print("Run 'testbed stop-agents' first to stop existing agents.")
        return False

    if not restart_supervisord():
        print("Error: Failed to start supervisord")
        return False

    print("Starting workflow runner...")
    if not start_workflow_runner():
        print("Error: Failed to start workflow runner")
        return False

    time.sleep(2)

    agents_config = config.get('agents', {})
    # Track requested and successfully started agents separately so that an
    # enabled agent that fails to start is reported as a failure rather than
    # as "no agents enabled".
    requested_agents = [
        agent_name
        for agent_name, agent_config in agents_config.items()
        if isinstance(agent_config, dict) and agent_config.get('enabled', False)
    ]

    print("Starting agents...")
    started_agents = []
    for agent_name in requested_agents:
        if start_agent(agent_name):
            started_agents.append(agent_name)

    if not requested_agents:
        print("Warning: No agents enabled in configuration")

    time.sleep(2)

    print("Verifying agents...")
    all_running = len(started_agents) == len(requested_agents)
    for agent_name in started_agents:
        if verify_agent_pid(agent_name):
            print(f" {agent_name}: running")
        else:
            print(f" {agent_name}: NOT running")
            all_running = False

    if not all_running:
        print("Warning: Some agents failed to start")

    print("Triggering workflow...")
    if not send_run_workflow(config):
        print("Error: Failed to send workflow command")
        return False

    workflow_name = config.get('workflow', {}).get('name', 'stf_datataking')
    print(f"Workflow '{workflow_name}' triggered. Use 'testbed status' to monitor.")
    return True
0359
0360
if __name__ == '__main__':
    # Command-line entry point: one optional positional config name.
    import argparse

    cli = argparse.ArgumentParser(description='Start agents and run workflow')
    cli.add_argument(
        'config',
        nargs='?',
        help='Config name (default: testbed.toml)',
    )
    options = cli.parse_args()
    # Exit status 0 when run() succeeds, 1 otherwise.
    sys.exit(int(not run(options.config)))