File indexing completed on 2026-04-25 08:29:13
0001
0002 """
0003 Workflow Simulator Agent - Executes workflow definitions using WorkflowRunner
0004 Integrates SimPy-based workflows with SWF agent infrastructure
0005 """
0006
0007 import os
0008 import sys
0009 import json
0010 import logging
0011 import argparse
0012 from pathlib import Path
0013
def setup_environment():
    """Auto-activate the project venv and load variables from ``~/.env``.

    The function performs three steps, in order:

    1. If ``VIRTUAL_ENV`` is not already set, look for a ``.venv`` directory
       one level above this script's directory. When its ``bin/python``
       exists, export ``VIRTUAL_ENV``, prepend the venv ``bin`` directory to
       ``PATH`` and point ``sys.executable`` at the venv interpreter. The
       running interpreter is not re-executed; only these values change.
    2. If ``~/.env`` exists, copy its ``KEY=VALUE`` lines into
       ``os.environ``. Blank lines, ``#`` comments, lines without ``=``,
       lines with an empty key, and values containing ``$`` (unexpanded
       shell references) are skipped. A leading ``export `` is accepted,
       and surrounding whitespace and quotes are stripped.
    3. Remove the HTTP(S) proxy variables from the environment.

    Returns:
        bool: ``False`` when ``VIRTUAL_ENV`` is unset and no ``.venv``
        directory exists; ``True`` otherwise.
    """
    project_root = Path(__file__).resolve().parent.parent

    if "VIRTUAL_ENV" not in os.environ:
        venv_path = project_root / ".venv"
        if not venv_path.exists():
            print("❌ Error: No Python virtual environment found")
            return False

        print("🔧 Auto-activating virtual environment...")
        venv_python = venv_path / "bin" / "python"
        if venv_python.exists():
            os.environ["VIRTUAL_ENV"] = str(venv_path)
            # PATH may legitimately be unset; avoid a KeyError and avoid a
            # trailing ':' (which would put the current directory on PATH).
            venv_bin = f"{venv_path}/bin"
            current_path = os.environ.get("PATH", "")
            os.environ["PATH"] = f"{venv_bin}:{current_path}" if current_path else venv_bin
            sys.executable = str(venv_python)

    env_file = Path.home() / ".env"
    if env_file.exists():
        print("🔧 Loading environment variables from ~/.env...")
        with open(env_file, encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#') or '=' not in line:
                    continue
                if line.startswith('export '):
                    line = line[len('export '):]
                key, value = line.split('=', 1)
                # Tolerate "KEY = value": trim whitespace before the quotes.
                key = key.strip()
                value = value.strip().strip('"\'')
                # An empty name cannot be exported, and values with '$' would
                # need shell expansion that is not performed here.
                if not key or '$' in value:
                    continue
                os.environ[key] = value

    # Proxy settings are dropped unconditionally; pop() is a no-op when the
    # variable is absent.
    for proxy_var in ('http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY'):
        os.environ.pop(proxy_var, None)

    return True
0055
# When executed as a script, prepare the environment before the project
# imports that follow; exit with status 1 if setup reports failure.
if __name__ == "__main__" and not setup_environment():
    sys.exit(1)
0059
0060
0061 from swf_common_lib.base_agent import BaseAgent
0062 sys.path.append(str(Path(__file__).parent.parent / "workflows"))
0063 from workflow_runner import WorkflowRunner
0064
0065
class WorkflowSimulatorAgent(BaseAgent):
    """Agent that executes workflows using WorkflowRunner.

    The agent registers with ``BaseAgent`` as type ``WORKFLOW_SIMULATOR`` on
    ``/queue/workflow_simulator`` and reacts to three control message types:
    ``start_workflow``, ``stop_workflow`` and ``workflow_status_request``.
    Progress is reported through heartbeats and through messages sent to
    ``/queue/workflow_status``.
    """

    def __init__(self, workflow_name, config_name=None, duration=3600, config_path=None, **workflow_params):
        """Create the agent and its WorkflowRunner.

        Args:
            workflow_name: Name of the workflow passed to ``run_workflow``.
            config_name: Workflow configuration name passed to ``run_workflow``.
            duration: Duration value passed to ``run_workflow``.
            config_path: Config path handed to both ``BaseAgent`` and
                ``WorkflowRunner``.
            **workflow_params: Extra keyword arguments forwarded unchanged to
                ``run_workflow``.
        """
        super().__init__(agent_type='WORKFLOW_SIMULATOR', subscription_queue='/queue/workflow_simulator',
                         config_path=config_path)

        self.workflow_name = workflow_name
        self.config_name = config_name
        self.duration = duration
        self.workflow_params = workflow_params
        self.execution_id = None
        self.config_path = config_path

        # self.monitor_url is presumably set by BaseAgent.__init__ — verify.
        # NOTE(review): the runner is always built with
        # workflow_name='STF_Datataking', independent of the workflow_name
        # argument above — confirm this is intentional.
        self.workflow_runner = WorkflowRunner(
            self.monitor_url, config_path=config_path, workflow_name='STF_Datataking'
        )

        self.workflow_status = "initialized"
        self.current_execution = None

    def on_message(self, frame):
        """Handle incoming workflow control messages.

        Unknown message types are logged and ignored. Any exception raised
        while decoding or dispatching marks the agent status as ``"error"``
        and is reported in a heartbeat instead of propagating.
        """
        try:
            message_data, msg_type = self.log_received_message(frame)

            if msg_type == 'start_workflow':
                self.handle_start_workflow(message_data)
            elif msg_type == 'stop_workflow':
                self.handle_stop_workflow(message_data)
            elif msg_type == 'workflow_status_request':
                self.handle_status_request(message_data)
            else:
                self.logger.info(f"Workflow simulator received unhandled message type: {msg_type}")

        except Exception as e:
            # Keep the traceback in the log; the message text alone rarely
            # identifies where the failure happened.
            self.logger.error(f"Error processing message: {e}", exc_info=True)
            self.workflow_status = "error"
            self.send_enhanced_heartbeat({
                'workflow_status': self.workflow_status,
                'error': str(e)
            })

    def handle_start_workflow(self, message_data):
        """Start workflow execution and report its outcome.

        Calls ``run_workflow`` and only continues once that call returns.
        On success the status becomes ``"completed"`` and a
        ``workflow_completed`` message is broadcast; on any exception the
        status becomes ``"failed"`` and a ``workflow_failed`` message is
        broadcast. Exceptions are not re-raised.
        """
        try:
            self.logger.info(f"Starting workflow: {self.workflow_name}")
            self.workflow_status = "running"
            # Forget the previous run's id so a status request made during or
            # after this run never reports an id belonging to an older run.
            self.execution_id = None

            self.send_enhanced_heartbeat({
                'workflow_status': self.workflow_status,
                'workflow_name': self.workflow_name,
                'execution_phase': 'starting'
            })

            self.execution_id = self.workflow_runner.run_workflow(
                workflow_name=self.workflow_name,
                config_name=self.config_name,
                duration=self.duration,
                **self.workflow_params
            )

            self.workflow_status = "completed"
            self.logger.info(f"Workflow completed successfully: {self.execution_id}")

            self.send_enhanced_heartbeat({
                'workflow_status': self.workflow_status,
                'execution_id': self.execution_id,
                'execution_phase': 'completed'
            })

            self.broadcast_workflow_status('workflow_completed', {
                'execution_id': self.execution_id,
                'workflow_name': self.workflow_name,
                'status': 'completed'
            })

        except Exception as e:
            self.logger.error(f"Workflow execution failed: {e}", exc_info=True)
            self.workflow_status = "failed"
            self.send_enhanced_heartbeat({
                'workflow_status': self.workflow_status,
                'error': str(e),
                'execution_phase': 'failed'
            })

            self.broadcast_workflow_status('workflow_failed', {
                'workflow_name': self.workflow_name,
                'status': 'failed',
                'error': str(e)
            })

    def handle_stop_workflow(self, message_data):
        """Handle workflow stop request.

        Records the ``"stopped"`` status and reports it in a heartbeat. This
        method does not itself call into the workflow runner.
        """
        self.logger.info("Received workflow stop request")
        self.workflow_status = "stopped"
        self.send_enhanced_heartbeat({
            'workflow_status': self.workflow_status,
            'execution_phase': 'stopped'
        })

    def handle_status_request(self, message_data):
        """Handle workflow status request by broadcasting the current state."""
        self.logger.info("Sending workflow status")
        self.broadcast_workflow_status('workflow_status_response', {
            'workflow_name': self.workflow_name,
            'status': self.workflow_status,
            'execution_id': self.execution_id
        })

    def broadcast_workflow_status(self, msg_type, data):
        """Broadcast workflow status to other agents.

        Args:
            msg_type: Value stored under ``'msg_type'`` in the message.
            data: Mapping merged into the message last, so its keys override
                the standard ``msg_type``/``timestamp``/``agent_name`` fields
                on collision.
        """
        message = {
            'msg_type': msg_type,
            # Plain ISO-8601 text. The previous json.dumps(str(...)) encoded
            # the value twice, leaving literal quote characters inside the
            # timestamp string.
            'timestamp': self.get_current_time().isoformat(),
            'agent_name': self.agent_name,
            **data
        }

        self.send_message('/queue/workflow_status', message)
        self.logger.info(f"Broadcast {msg_type}: {data}")

    def get_current_time(self):
        """Get current timestamp as a naive local ``datetime``."""
        from datetime import datetime
        return datetime.now()
0199
0200
0201
def main():
    """Command-line entry point: parse options, build the agent, run once.

    Optional override flags that were not supplied are left out of the
    keyword arguments passed on, so the workflow's own defaults apply.
    The process exits with status 1 if the workflow run raises.
    """
    default_testbed_config = Path(__file__).parent / 'testbed.toml'

    cli = argparse.ArgumentParser(description='Workflow Simulator Agent')
    cli.add_argument('workflow_name', help='Name of the workflow to execute')
    cli.add_argument('--testbed-config', default=str(default_testbed_config),
                     help='Testbed config file (default: testbed.toml)')
    cli.add_argument('--workflow-config', help='Workflow configuration file name')
    cli.add_argument('--duration', type=float, default=0,
                     help='Max duration in seconds (0 = run until workflow completes)')

    # Optional per-run overrides: (flag, value type, help text).
    override_flags = (
        ('--stf-count', int, 'Override STF count (generates exactly N files)'),
        ('--physics-period-count', int, 'Override physics period count'),
        ('--physics-period-duration', float, 'Override physics period duration'),
        ('--stf-interval', float, 'Override STF interval'),
    )
    for flag, value_type, help_text in override_flags:
        cli.add_argument(flag, type=value_type, help=help_text)

    cli.add_argument('--realtime', action='store_true',
                     help='Run simulation in real-time (1 sim second = 1 wall-clock second)')

    options = cli.parse_args()

    # Forward only the overrides the user actually provided.
    override_names = ('stf_count', 'physics_period_count', 'physics_period_duration', 'stf_interval')
    workflow_params = {
        name: getattr(options, name)
        for name in override_names
        if getattr(options, name) is not None
    }

    simulator = WorkflowSimulatorAgent(
        workflow_name=options.workflow_name,
        config_name=options.workflow_config,
        duration=options.duration,
        config_path=options.testbed_config,
        **workflow_params
    )

    # The workflow is run directly through the agent's runner rather than
    # waiting for a start_workflow message.
    try:
        simulator.logger.info(f"Starting workflow execution: {options.workflow_name}")

        execution_id = simulator.workflow_runner.run_workflow(
            workflow_name=options.workflow_name,
            config_name=options.workflow_config,
            duration=options.duration,
            realtime=options.realtime,
            **workflow_params
        )

        simulator.logger.info(f"Workflow completed successfully: {execution_id}")

    except Exception as e:
        simulator.logger.error(f"Workflow execution failed: {e}")
        sys.exit(1)
0259
0260
# Script entry point: run the command-line interface only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    main()