Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:13

0001 #!/usr/bin/env python3
0002 """
0003 Workflow Simulator Agent - Executes workflow definitions using WorkflowRunner
0004 Integrates SimPy-based workflows with SWF agent infrastructure
0005 """
0006 
0007 import os
0008 import sys
0009 import json
0010 import logging
0011 import argparse
0012 from pathlib import Path
0013 
0014 def setup_environment():
0015     """Auto-activate venv and load environment variables."""
0016     script_dir = Path(__file__).resolve().parent.parent  # Go up to swf-testbed root
0017 
0018     # Auto-activate virtual environment if not already active
0019     if "VIRTUAL_ENV" not in os.environ:
0020         venv_path = script_dir / ".venv"
0021         if venv_path.exists():
0022             print("🔧 Auto-activating virtual environment...")
0023             venv_python = venv_path / "bin" / "python"
0024             if venv_python.exists():
0025                 os.environ["VIRTUAL_ENV"] = str(venv_path)
0026                 os.environ["PATH"] = f"{venv_path}/bin:{os.environ['PATH']}"
0027                 sys.executable = str(venv_python)
0028         else:
0029             print("❌ Error: No Python virtual environment found")
0030             return False
0031 
0032     # Load ~/.env environment variables (they're already exported)
0033     env_file = Path.home() / ".env"
0034     if env_file.exists():
0035         print("🔧 Loading environment variables from ~/.env...")
0036         with open(env_file) as f:
0037             for line in f:
0038                 line = line.strip()
0039                 if line and not line.startswith('#') and '=' in line:
0040                     if line.startswith('export '):
0041                         line = line[7:]  # Remove 'export '
0042                     key, value = line.split('=', 1)
0043                     value = value.strip('"\'')
0044                     # Skip entries with unexpanded shell variables
0045                     if '$' in value:
0046                         continue
0047                     os.environ[key] = value
0048 
0049     # Unset proxy variables to prevent localhost routing through proxy
0050     for proxy_var in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']:
0051         if proxy_var in os.environ:
0052             del os.environ[proxy_var]
0053 
0054     return True
0055 
0056 if __name__ == "__main__":
0057     if not setup_environment():
0058         sys.exit(1)
0059 
0060 # Import after environment setup
0061 from swf_common_lib.base_agent import BaseAgent
0062 sys.path.append(str(Path(__file__).parent.parent / "workflows"))
0063 from workflow_runner import WorkflowRunner
0064 
0065 
0066 class WorkflowSimulatorAgent(BaseAgent):
0067     """Agent that executes workflows using WorkflowRunner"""
0068 
0069     def __init__(self, workflow_name, config_name=None, duration=3600, config_path=None, **workflow_params):
0070         super().__init__(agent_type='WORKFLOW_SIMULATOR', subscription_queue='/queue/workflow_simulator',
0071                          config_path=config_path)
0072 
0073         self.workflow_name = workflow_name
0074         self.config_name = config_name
0075         self.duration = duration
0076         self.workflow_params = workflow_params
0077         self.execution_id = None
0078         self.config_path = config_path
0079 
0080         self.workflow_runner = WorkflowRunner(
0081             self.monitor_url, config_path=config_path, workflow_name='STF_Datataking'
0082         )
0083 
0084         # Enhanced status for workflow execution
0085         self.workflow_status = "initialized"
0086         self.current_execution = None
0087 
0088     def on_message(self, frame):
0089         """Handle incoming workflow control messages"""
0090         try:
0091             message_data, msg_type = self.log_received_message(frame)
0092 
0093             if msg_type == 'start_workflow':
0094                 self.handle_start_workflow(message_data)
0095             elif msg_type == 'stop_workflow':
0096                 self.handle_stop_workflow(message_data)
0097             elif msg_type == 'workflow_status_request':
0098                 self.handle_status_request(message_data)
0099             else:
0100                 self.logger.info(f"Workflow simulator received unhandled message type: {msg_type}")
0101 
0102         except Exception as e:
0103             self.logger.error(f"Error processing message: {e}")
0104             self.workflow_status = "error"
0105             self.send_enhanced_heartbeat({
0106                 'workflow_status': self.workflow_status,
0107                 'error': str(e)
0108             })
0109 
0110     def handle_start_workflow(self, message_data):
0111         """Start workflow execution"""
0112         try:
0113             self.logger.info(f"Starting workflow: {self.workflow_name}")
0114             self.workflow_status = "running"
0115 
0116             # Send enhanced heartbeat with workflow metadata
0117             self.send_enhanced_heartbeat({
0118                 'workflow_status': self.workflow_status,
0119                 'workflow_name': self.workflow_name,
0120                 'execution_phase': 'starting'
0121             })
0122 
0123             # Execute workflow
0124             self.execution_id = self.workflow_runner.run_workflow(
0125                 workflow_name=self.workflow_name,
0126                 config_name=self.config_name,
0127                 duration=self.duration,
0128                 **self.workflow_params
0129             )
0130 
0131             self.workflow_status = "completed"
0132             self.logger.info(f"Workflow completed successfully: {self.execution_id}")
0133 
0134             # Send completion heartbeat
0135             self.send_enhanced_heartbeat({
0136                 'workflow_status': self.workflow_status,
0137                 'execution_id': self.execution_id,
0138                 'execution_phase': 'completed'
0139             })
0140 
0141             # Broadcast workflow completion
0142             self.broadcast_workflow_status('workflow_completed', {
0143                 'execution_id': self.execution_id,
0144                 'workflow_name': self.workflow_name,
0145                 'status': 'completed'
0146             })
0147 
0148         except Exception as e:
0149             self.logger.error(f"Workflow execution failed: {e}")
0150             self.workflow_status = "failed"
0151             self.send_enhanced_heartbeat({
0152                 'workflow_status': self.workflow_status,
0153                 'error': str(e),
0154                 'execution_phase': 'failed'
0155             })
0156 
0157             # Broadcast failure
0158             self.broadcast_workflow_status('workflow_failed', {
0159                 'workflow_name': self.workflow_name,
0160                 'status': 'failed',
0161                 'error': str(e)
0162             })
0163 
0164     def handle_stop_workflow(self, message_data):
0165         """Handle workflow stop request"""
0166         self.logger.info("Received workflow stop request")
0167         self.workflow_status = "stopped"
0168         self.send_enhanced_heartbeat({
0169             'workflow_status': self.workflow_status,
0170             'execution_phase': 'stopped'
0171         })
0172 
0173     def handle_status_request(self, message_data):
0174         """Handle workflow status request"""
0175         self.logger.info("Sending workflow status")
0176         self.broadcast_workflow_status('workflow_status_response', {
0177             'workflow_name': self.workflow_name,
0178             'status': self.workflow_status,
0179             'execution_id': self.execution_id
0180         })
0181 
0182     def broadcast_workflow_status(self, msg_type, data):
0183         """Broadcast workflow status to other agents"""
0184         message = {
0185             'msg_type': msg_type,
0186             'timestamp': json.dumps(str(self.get_current_time()), default=str),
0187             'agent_name': self.agent_name,
0188             **data
0189         }
0190 
0191         # Broadcast to workflow status queue
0192         self.send_message('/queue/workflow_status', message)
0193         self.logger.info(f"Broadcast {msg_type}: {data}")
0194 
0195     def get_current_time(self):
0196         """Get current timestamp"""
0197         from datetime import datetime
0198         return datetime.now()
0199 
0200 
0201 
0202 def main():
0203     """Main entry point with command line argument support"""
0204     script_dir = Path(__file__).parent
0205 
0206     parser = argparse.ArgumentParser(description='Workflow Simulator Agent')
0207     parser.add_argument('workflow_name', help='Name of the workflow to execute')
0208     parser.add_argument('--testbed-config', default=str(script_dir / 'testbed.toml'),
0209                         help='Testbed config file (default: testbed.toml)')
0210     parser.add_argument('--workflow-config', help='Workflow configuration file name')
0211     parser.add_argument('--duration', type=float, default=0, help='Max duration in seconds (0 = run until workflow completes)')
0212     parser.add_argument('--stf-count', type=int, help='Override STF count (generates exactly N files)')
0213     parser.add_argument('--physics-period-count', type=int, help='Override physics period count')
0214     parser.add_argument('--physics-period-duration', type=float, help='Override physics period duration')
0215     parser.add_argument('--stf-interval', type=float, help='Override STF interval')
0216     parser.add_argument('--realtime', action='store_true',
0217                         help='Run simulation in real-time (1 sim second = 1 wall-clock second)')
0218 
0219     args = parser.parse_args()
0220 
0221     # Build workflow parameters from arguments
0222     workflow_params = {}
0223     if args.stf_count is not None:
0224         workflow_params['stf_count'] = args.stf_count
0225     if args.physics_period_count is not None:
0226         workflow_params['physics_period_count'] = args.physics_period_count
0227     if args.physics_period_duration is not None:
0228         workflow_params['physics_period_duration'] = args.physics_period_duration
0229     if args.stf_interval is not None:
0230         workflow_params['stf_interval'] = args.stf_interval
0231 
0232     # Create workflow simulator
0233     simulator = WorkflowSimulatorAgent(
0234         workflow_name=args.workflow_name,
0235         config_name=args.workflow_config,
0236         duration=args.duration,
0237         config_path=args.testbed_config,
0238         **workflow_params
0239     )
0240 
0241     # Run workflow directly
0242     try:
0243         simulator.logger.info(f"Starting workflow execution: {args.workflow_name}")
0244 
0245         # Execute workflow
0246         execution_id = simulator.workflow_runner.run_workflow(
0247             workflow_name=args.workflow_name,
0248             config_name=args.workflow_config,
0249             duration=args.duration,
0250             realtime=args.realtime,
0251             **workflow_params
0252         )
0253 
0254         simulator.logger.info(f"Workflow completed successfully: {execution_id}")
0255 
0256     except Exception as e:
0257         simulator.logger.error(f"Workflow execution failed: {e}")
0258         sys.exit(1)
0259 
0260 
0261 if __name__ == "__main__":
0262     main()