Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:45

0001 #!/usr/bin/env python3
0002 """
0003 Workflow Runner - Executes workflow definitions with SimPy
0004 
0005 Default mode: Persistent agent listening for workflow commands on 'workflow_control' queue.
0006 CLI mode (--run-once): Execute single workflow and exit.
0007 """
0008 
0009 import os
0010 import re
0011 import subprocess
0012 import sys
0013 import json
0014 import tomllib
0015 import threading
0016 from pathlib import Path
0017 from datetime import datetime
0018 from typing import Optional, Dict, Any
0019 
0020 
def setup_environment():
    """Auto-activate the project venv and load environment variables.

    Steps, in order:

    1. When ``VIRTUAL_ENV`` is not set, look for ``.venv`` two levels above
       this file. If it contains ``bin/python``, export ``VIRTUAL_ENV``,
       prepend its ``bin`` directory to ``PATH`` and point ``sys.executable``
       at that interpreter. A missing ``.venv`` directory is an error.
    2. Read ``KEY=VALUE`` lines from ``~/.env`` into ``os.environ``. Blank
       lines, ``#`` comments and lines without ``=`` are ignored, a leading
       ``export `` is dropped, whitespace around the key and the value is
       removed, and surrounding quote characters are stripped from the value.
       Values containing ``$`` are skipped (no variable expansion is done),
       as are lines with an empty key.
    3. Remove the HTTP(S) proxy variables from the environment.

    Returns:
        bool: ``False`` when no ``.venv`` directory exists and no virtual
        environment is active, ``True`` otherwise.
    """
    script_dir = Path(__file__).resolve().parent.parent  # Go up to swf-testbed root

    # Auto-activate virtual environment if not already active
    if "VIRTUAL_ENV" not in os.environ:
        venv_path = script_dir / ".venv"
        if not venv_path.exists():
            print("Error: No Python virtual environment found")
            return False

        print("Auto-activating virtual environment...")
        venv_python = venv_path / "bin" / "python"
        if venv_python.exists():
            os.environ["VIRTUAL_ENV"] = str(venv_path)
            # Tolerate an unset PATH instead of raising KeyError.
            current_path = os.environ.get("PATH")
            venv_bin = f"{venv_path}/bin"
            os.environ["PATH"] = f"{venv_bin}:{current_path}" if current_path else venv_bin
            sys.executable = str(venv_python)

    # Load ~/.env environment variables
    env_file = Path.home() / ".env"
    if env_file.exists():
        print("Loading environment variables from ~/.env...")
        with open(env_file) as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#') or '=' not in line:
                    continue
                line = line.removeprefix('export ')
                key, _, value = line.partition('=')
                # Strip whitespace first so "KEY = \"value\"" yields KEY -> value
                # rather than "KEY " -> ' "value'.
                key = key.strip()
                value = value.strip().strip('"\'')
                # An empty name is rejected by os.environ; '$' means the value
                # needs shell expansion, which is not performed here.
                if not key or '$' in value:
                    continue
                os.environ[key] = value

    # Unset proxy variables for localhost connections
    for proxy_var in ('http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY'):
        os.environ.pop(proxy_var, None)

    return True
0061 
0062 
# When run as a script, prepare the environment before the project imports
# below; abort with exit status 1 if setup reports failure.
if __name__ == "__main__" and not setup_environment():
    sys.exit(1)
0066 
0067 # Project imports (after environment setup)
0068 import simpy
0069 sys.path.insert(0, str(Path(__file__).parent.parent.parent / "swf-common-lib" / "src"))
0070 from swf_common_lib.base_agent import BaseAgent
0071 from swf_common_lib.api_utils import ensure_namespace
0072 
0073 
def get_github_source_info(file_path: Path) -> Optional[Dict[str, str]]:
    """
    Look up where a checked-out file lives on GitHub.

    Runs ``git`` in the file's directory to find the repository root, the
    ``origin`` remote URL and the current branch name.

    Returns:
        Dict with keys ``org``, ``repo``, ``script_path`` (path relative to
        the repository root) and ``branch``. ``None`` when the file is not in
        a git checkout, there is no ``origin`` remote, the remote is not a
        github.com URL, or any step raises.
    """
    def git(arguments, where):
        # All git queries share the same capture settings.
        return subprocess.run(['git', *arguments], cwd=where, capture_output=True, text=True)

    try:
        target = Path(file_path).resolve()

        # Repository root
        toplevel = git(['rev-parse', '--show-toplevel'], target.parent)
        if toplevel.returncode != 0:
            return None
        repo_root = Path(toplevel.stdout.strip())

        # URL of the 'origin' remote
        origin = git(['remote', 'get-url', 'origin'], repo_root)
        if origin.returncode != 0:
            return None

        # Accept both https (github.com/org/repo) and ssh (github.com:org/repo) forms
        parsed = re.search(r'github\.com[:/]([^/]+)/([^/]+?)(?:\.git)?$', origin.stdout.strip())
        if parsed is None:
            return None

        # Path of the file inside the repository
        relative = str(target.relative_to(repo_root))

        # Current branch name; fall back to 'main' when git cannot report it
        head = git(['rev-parse', '--abbrev-ref', 'HEAD'], repo_root)
        if head.returncode == 0:
            branch_name = head.stdout.strip()
        else:
            branch_name = 'main'

        return {
            'org': parsed.group(1),
            'repo': parsed.group(2),
            'script_path': relative,
            'branch': branch_name
        }
    except Exception:
        return None
0125 
0126 
def get_git_version(directory: Path) -> Optional[Dict[str, str]]:
    """
    Describe the git revision checked out in a directory.

    Returns:
        Dict that always holds ``commit`` (full hash of HEAD) and, when git
        reports them, ``tag`` (only if HEAD is exactly on a tag) and
        ``branch``. ``None`` when the commit cannot be determined or any
        step raises.
    """
    try:
        workdir = Path(directory).resolve()

        def git_output(*arguments):
            # Stripped stdout on success, None when git exits non-zero.
            completed = subprocess.run(
                ['git', *arguments],
                cwd=workdir, capture_output=True, text=True
            )
            if completed.returncode != 0:
                return None
            return completed.stdout.strip()

        # Commit hash is mandatory
        commit_hash = git_output('rev-parse', 'HEAD')
        if commit_hash is None:
            return None

        # Tag (exact match only) and branch are optional extras
        exact_tag = git_output('describe', '--tags', '--exact-match')
        branch_name = git_output('rev-parse', '--abbrev-ref', 'HEAD')

        details = {'commit': commit_hash}
        for label, text in (('tag', exact_tag), ('branch', branch_name)):
            if text:
                details[label] = text
        return details
    except Exception:
        return None
0167 
0168 
0169 class WorkflowRunner(BaseAgent):
0170     """
0171     Loads, registers, and executes workflow definitions.
0172 
0173     Operates in two modes:
0174     - Persistent mode (default): Runs as agent listening for workflow commands
0175     - CLI mode (--run-once): Executes single workflow and exits
0176     """
0177 
0178     # Message types this agent handles
0179     COMMAND_MESSAGE_TYPES = {'run_workflow', 'stop_workflow', 'status_request'}
0180 
0181     def __init__(self, monitor_url: Optional[str] = None, debug: bool = False,
0182                  config_path: Optional[str] = None, workflow_name: Optional[str] = None):
0183         """
0184         Initialize WorkflowRunner as an agent
0185 
0186         Args:
0187             monitor_url: Optional override for SWF monitor URL (uses env if not provided)
0188             debug: Enable debug logging
0189             config_path: Path to testbed.toml config file
0190             workflow_name: Name of workflow to run (used for agent_type registration)
0191         """
0192         if workflow_name:
0193             agent_type = workflow_name.replace(' ', '_')
0194         else:
0195             agent_type = 'DAQ_Simulator'
0196 
0197         super().__init__(
0198             agent_type=agent_type,
0199             subscription_queue='/queue/workflow_control',
0200             debug=debug,
0201             config_path=config_path
0202         )
0203 
0204         # Track current workflow execution
0205         self.current_execution_id = None
0206         self.current_workflow_name = None
0207         self.workflow_thread = None
0208         self.stop_event = threading.Event()
0209 
0210         # Override monitor_url if provided
0211         if monitor_url:
0212             self.monitor_url = monitor_url.rstrip('/')
0213 
0214         # Use self.api from BaseAgent (already configured with auth)
0215         self.api_session = self.api
0216         self.workflows_dir = Path(__file__).parent  # workflows/ directory
0217 
0218         # Load testbed config overrides (all sections including [testbed] for namespace)
0219         self.testbed_overrides = {}
0220         if self.config_path:
0221             testbed_config_file = Path(self.config_path)
0222             if testbed_config_file.exists():
0223                 with open(testbed_config_file, 'rb') as f:
0224                     testbed_config = tomllib.load(f)
0225                     for section, values in testbed_config.items():
0226                         if isinstance(values, dict):
0227                             self.testbed_overrides[section] = values
0228 
0229         # Connect to ActiveMQ (we only send, don't need to subscribe)
0230         self.conn.connect(
0231             self.mq_user,
0232             self.mq_password,
0233             wait=True,
0234             version='1.1',
0235             headers={
0236                 'client-id': self.agent_name,
0237                 'heart-beat': '30000,30000'
0238             }
0239         )
0240         self.mq_connected = True
0241 
0242         # Register agent in SystemAgent table
0243         self.send_heartbeat()
0244 
0245         self.logger.info(f"WorkflowRunner initialized and connected to ActiveMQ: {self.agent_name}")
0246 
0247     def run_workflow(self, workflow_name: str, config_name: Optional[str] = None,
0248                      duration: float = 3600, realtime: bool = False, **override_params) -> str:
0249         """
0250         Run a workflow by name
0251 
0252         Args:
0253             workflow_name: Name of the workflow (e.g., 'stf_processing')
0254             config_name: Name of config file (defaults to {workflow_name}_default.toml)
0255             duration: Simulation duration in seconds
0256             realtime: If True, run in real-time mode (1 sim second = 1 wall-clock second)
0257             **override_params: Parameters to override from config
0258 
0259         Returns:
0260             execution_id: The ID of the workflow execution
0261         """
0262         # Load workflow definition and config
0263         workflow_code = self._load_workflow_code(workflow_name)
0264         config = self._load_workflow_config(workflow_name, config_name)
0265 
0266         # Apply testbed config overrides (by section)
0267         for section, values in self.testbed_overrides.items():
0268             if section in config:
0269                 config[section].update(values)
0270             else:
0271                 config[section] = values
0272 
0273         # Apply CLI parameter overrides (highest priority) to all matching sections
0274         if override_params:
0275             for section, values in config.items():
0276                 if section != 'workflow' and isinstance(values, dict):
0277                     for key, value in override_params.items():
0278                         if key in values:
0279                             values[key] = value
0280 
0281         # Generate execution ID
0282         executed_by = os.getenv('USER', 'unknown')
0283         execution_id = self._generate_execution_id(workflow_name, executed_by)
0284 
0285         # Track for status reporting
0286         self.current_execution_id = execution_id
0287 
0288         # Register/update workflow definition in database
0289         workflow_file = self.workflows_dir / f"{workflow_name}.py"
0290         self._register_workflow_definition(
0291             name=config['workflow']['name'],
0292             version=config['workflow']['version'],
0293             code=workflow_code,
0294             config=config,
0295             workflow_file=workflow_file
0296         )
0297 
0298         # Create execution record (store full config for auditability)
0299         self._create_execution_record(
0300             execution_id=execution_id,
0301             workflow_name=config['workflow']['name'],
0302             workflow_version=config['workflow']['version'],
0303             config=config
0304         )
0305 
0306         # Execute the workflow
0307         self._execute_workflow(
0308             execution_id=execution_id,
0309             workflow_code=workflow_code,
0310             config=config,
0311             duration=duration,
0312             realtime=realtime
0313         )
0314 
0315         # Update execution status to completed
0316         self._update_execution_status(execution_id, 'completed')
0317 
0318         return execution_id
0319 
0320     def _load_workflow_code(self, workflow_name: str) -> str:
0321         """Load workflow Python code from file"""
0322         workflow_file = self.workflows_dir / f"{workflow_name}.py"
0323         if not workflow_file.exists():
0324             raise FileNotFoundError(f"Workflow {workflow_name}.py not found in {self.workflows_dir}")
0325 
0326         with open(workflow_file, 'r') as f:
0327             return f.read()
0328 
0329     def _load_workflow_config(self, workflow_name: str, config_name: Optional[str] = None) -> Dict[str, Any]:
0330         """Load workflow TOML configuration with includes support.
0331 
0332         Config files use descriptive section names (e.g., [daq_state_machine], [fast_processing]).
0333         Included configs are loaded first, then main config sections are added/merged.
0334         Sections are kept intact for explicit access by workflow code.
0335         """
0336         if config_name is None:
0337             config_name = f"{workflow_name}_default.toml"
0338         elif not config_name.endswith('.toml'):
0339             config_name = f"{config_name}.toml"
0340 
0341         config_file = self.workflows_dir / config_name
0342         if not config_file.exists():
0343             raise FileNotFoundError(f"Config {config_name} not found in {self.workflows_dir}")
0344 
0345         # Load main config
0346         with open(config_file, 'rb') as f:
0347             main_config = tomllib.load(f)
0348 
0349         # Check for includes in workflow section
0350         includes = main_config.get('workflow', {}).get('includes', [])
0351 
0352         if not includes:
0353             return main_config
0354 
0355         # Load included configs and add their sections to main_config
0356         for include_file in includes:
0357             include_path = self.workflows_dir / include_file
0358             if not include_path.exists():
0359                 raise FileNotFoundError(f"Included config {include_file} not found in {self.workflows_dir}")
0360 
0361             with open(include_path, 'rb') as f:
0362                 included_config = tomllib.load(f)
0363                 # Add each section from included file (don't overwrite existing)
0364                 for section, values in included_config.items():
0365                     if section != 'workflow' and section not in main_config:
0366                         main_config[section] = values
0367 
0368         return main_config
0369 
0370     def _generate_execution_id(self, workflow_name: str, executed_by: str) -> str:
0371         """Generate human-readable execution ID"""
0372         # Get next ID from persistent state API
0373         response = self.api_session.post(
0374             f"{self.monitor_url}/api/state/next-workflow-execution-id/",
0375             json={'workflow_name': workflow_name}
0376         )
0377 
0378         if response.status_code == 200:
0379             data = response.json()
0380             sequence = data.get('sequence', 1)
0381         else:
0382             # NO RANDOM FALLBACK - get proper count from database
0383             print(f"WARNING: Persistent state API failed: {response.status_code}")
0384             count_response = self.api_session.get(
0385                 f"{self.monitor_url}/api/workflow-executions/",
0386                 params={'workflow_name': workflow_name}
0387             )
0388             if count_response.status_code == 200:
0389                 count_data = count_response.json()
0390                 if isinstance(count_data, dict) and 'results' in count_data:
0391                     sequence = len(count_data['results']) + 1
0392                 elif isinstance(count_data, list):
0393                     sequence = len(count_data) + 1
0394                 else:
0395                     sequence = 1
0396             else:
0397                 print(f"ERROR: Cannot get execution count: {count_response.status_code}")
0398                 raise Exception(f"Failed to generate execution ID - API unavailable")
0399 
0400         # Format: workflow-username-NNNN
0401         return f"{workflow_name}-{executed_by}-{sequence:04d}"
0402 
0403     def _register_workflow_definition(self, name: str, version: str, code: str, config: Dict[str, Any],
0404                                        workflow_file: Path = None):
0405         """Register or update workflow definition in database"""
0406         # Check if workflow definition already exists
0407         check_url = f"{self.monitor_url}/api/workflow-definitions/"
0408         check_response = self.api_session.get(
0409             check_url,
0410             params={'workflow_name': name, 'version': version}
0411         )
0412 
0413         # Discover GitHub source info from workflow file
0414         source_info = get_github_source_info(workflow_file) if workflow_file else None
0415 
0416         # Build expanded config for database storage
0417         expanded_config = {
0418             'workflow': {
0419                 'name': config['workflow']['name'],
0420                 'version': config['workflow']['version']
0421             }
0422         }
0423         if source_info:
0424             expanded_config['source'] = source_info
0425         # Preserve description if present
0426         if 'description' in config['workflow']:
0427             expanded_config['workflow']['description'] = config['workflow']['description']
0428         # Copy all parameter sections (daq_state_machine, fast_processing, etc.)
0429         for section, values in config.items():
0430             if section != 'workflow' and isinstance(values, dict):
0431                 expanded_config[section] = values
0432 
0433         payload = {
0434             'workflow_name': name,
0435             'version': version,
0436             'workflow_type': 'simulation',
0437             'definition': code,
0438             'parameter_values': expanded_config,
0439             'created_by': os.getenv('USER', 'unknown'),
0440             'created_at': datetime.now().isoformat()
0441         }
0442 
0443         if check_response.status_code == 200:
0444             definitions = check_response.json()
0445             existing_definition = None
0446 
0447             # Handle both list and paginated response formats
0448             if isinstance(definitions, list):
0449                 existing_definition = definitions[0] if definitions else None
0450             else:
0451                 results = definitions.get('results', [])
0452                 existing_definition = results[0] if results else None
0453 
0454             if existing_definition:
0455                 # Definition is immutable - don't update, just return existing
0456                 return existing_definition
0457 
0458             # Create new definition
0459             response = self.api_session.post(
0460                 f"{self.monitor_url}/api/workflow-definitions/",
0461                 json=payload
0462             )
0463         else:
0464             # Create new definition
0465             response = self.api_session.post(
0466                 f"{self.monitor_url}/api/workflow-definitions/",
0467                 json=payload
0468             )
0469 
0470         if response.status_code not in [200, 201]:
0471             print(f"Error: Failed to register workflow definition: {response.status_code}")
0472             try:
0473                 error_detail = response.json()
0474                 print(f"API error response: {error_detail}")
0475             except:
0476                 print(f"Raw response: {response.text}")
0477             raise Exception(f"Failed to register workflow definition: {response.status_code}")
0478 
0479         return response.json()
0480 
0481     def _create_execution_record(self, execution_id: str, workflow_name: str,
0482                                  workflow_version: str, config: Dict[str, Any]):
0483         """Create workflow execution record with full config for auditability."""
0484         # Get workflow definition ID
0485         def_response = self.api_session.get(
0486             f"{self.monitor_url}/api/workflow-definitions/",
0487             params={'workflow_name': workflow_name, 'version': workflow_version}
0488         )
0489 
0490         if def_response.status_code != 200:
0491             print(f"Warning: Could not find workflow definition for {workflow_name} v{workflow_version}")
0492             return
0493 
0494         definitions = def_response.json()
0495         if not definitions:
0496             print(f"Warning: No workflow definition found for {workflow_name} v{workflow_version}")
0497             return
0498 
0499         # Handle both list and paginated response formats
0500         if isinstance(definitions, list):
0501             if not definitions:
0502                 print(f"Warning: No workflow definition found for {workflow_name} v{workflow_version}")
0503                 return
0504             workflow_definition_id = definitions[0]['id']
0505         else:
0506             results = definitions.get('results', [])
0507             if not results:
0508                 print(f"Warning: No workflow definition found for {workflow_name} v{workflow_version}")
0509                 return
0510             workflow_definition_id = results[0]['id']
0511 
0512         # Get namespace from testbed config and ensure it exists in database
0513         namespace = config.get('testbed', {}).get('namespace')
0514         if namespace:
0515             try:
0516                 ensure_namespace(self.monitor_url, self.api_session, namespace, logger=self.logger)
0517             except Exception:
0518                 pass  # Warning already logged by ensure_namespace
0519 
0520         # Add git version to config for auditability
0521         git_version = get_git_version(self.workflows_dir)
0522         config_with_version = dict(config)
0523         if git_version:
0524             config_with_version['git_version'] = git_version
0525 
0526         payload = {
0527             'execution_id': execution_id,
0528             'workflow_definition': workflow_definition_id,
0529             'namespace': namespace,
0530             'status': 'running',
0531             'executed_by': os.getenv('USER', 'unknown'),
0532             'start_time': datetime.now().isoformat(),
0533             'parameter_values': config_with_version
0534         }
0535 
0536         response = self.api_session.post(
0537             f"{self.monitor_url}/api/workflow-executions/",
0538             json=payload
0539         )
0540 
0541         if response.status_code not in [200, 201]:
0542             print(f"Error: Failed to create execution record: {response.status_code}")
0543             try:
0544                 error_detail = response.json()
0545                 print(f"API error response: {error_detail}")
0546             except:
0547                 print(f"Raw response: {response.text}")
0548             raise Exception(f"Failed to create execution record: {response.status_code}")
0549 
0550     def _on_simulation_step(self, env, execution_id: str) -> bool:
0551         """
0552         Called between simulation events.
0553 
0554         Override or extend this method to add per-step behavior such as:
0555         - Progress reporting to monitor
0556         - Periodic heartbeats during long workflows
0557         - Logging/metrics collection
0558 
0559         Args:
0560             env: SimPy environment
0561             execution_id: Current execution ID
0562 
0563         Returns:
0564             True to continue simulation, False to stop
0565         """
0566         # Check stop flag
0567         if self.stop_event.is_set():
0568             self.logger.info("Stop requested - ending simulation")
0569             return False
0570 
0571         return True
0572 
0573     def _execute_workflow(self, execution_id: str, workflow_code: str,
0574                          config: Dict[str, Any], duration: float,
0575                          realtime: bool = False):
0576         """Execute workflow using SimPy with step callback support.
0577 
0578         Args:
0579             execution_id: Unique identifier for this execution
0580             workflow_code: Python code containing WorkflowExecutor class
0581             config: Full workflow config with descriptive sections
0582             duration: Simulation duration in seconds
0583             realtime: If True, use RealtimeEnvironment (1 sim sec = 1 wall sec)
0584 
0585         The simulation calls _on_simulation_step() between events to allow
0586         graceful stopping and other per-step behaviors.
0587         """
0588         # Create SimPy environment
0589         if realtime:
0590             # RealtimeEnvironment ties simulation time to wall-clock time
0591             # factor=1 means 1 simulation second = 1 real second
0592             self.logger.info("Using real-time simulation mode")
0593             env = simpy.rt.RealtimeEnvironment(factor=1, strict=False)
0594         else:
0595             # Standard discrete-event simulation (runs as fast as possible)
0596             env = simpy.Environment()
0597 
0598         # Prepare execution namespace with runner access
0599         namespace = {
0600             'env': env,
0601             'config': config,
0602             'runner': self,
0603             'execution_id': execution_id
0604         }
0605 
0606         # Execute workflow code to get WorkflowExecutor class
0607         exec(workflow_code, namespace)
0608 
0609         # Instantiate and run workflow
0610         if 'WorkflowExecutor' in namespace:
0611             # Pass config, runner (for messaging), and execution_id
0612             executor = namespace['WorkflowExecutor'](
0613                 config=config,
0614                 runner=self,
0615                 execution_id=execution_id
0616             )
0617 
0618             # Start workflow process
0619             workflow_process = env.process(executor.execute(env))
0620 
0621             # Run simulation with step callbacks
0622             end_time = duration if duration and duration > 0 else float('inf')
0623 
0624             while True:
0625                 # Step callback - check stop flag and other per-step actions
0626                 if not self._on_simulation_step(env, execution_id):
0627                     break
0628 
0629                 # Check if workflow process completed
0630                 if workflow_process.processed:
0631                     break
0632 
0633                 # Check duration limit
0634                 if env.now >= end_time:
0635                     self.logger.info(f"Duration limit reached: {duration}s")
0636                     break
0637 
0638                 # Run next simulation event
0639                 try:
0640                     env.step()
0641                 except simpy.core.EmptySchedule:
0642                     # No more events - simulation complete
0643                     break
0644         else:
0645             raise ValueError("WorkflowExecutor class not found in workflow code")
0646 
0647     def _update_execution_status(self, execution_id: str, status: str):
0648         """Update workflow execution status"""
0649         payload = {
0650             'status': status,
0651             'end_time': datetime.now().isoformat() if status == 'completed' else None
0652         }
0653 
0654         response = self.api_session.patch(
0655             f"{self.monitor_url}/api/workflow-executions/{execution_id}/",
0656             json=payload
0657         )
0658 
0659         if response.status_code != 200:
0660             print(f"Warning: Failed to update execution status: {response.status_code}")
0661 
0662     def initialize_state(self, state_id: int, execution_id: str, config: dict = None):
0663         """
0664         Initialize state machine record for workflow execution.
0665         Currently creates RunState (run-level subset).
0666         Future: broader state machine tracking.
0667 
0668         Args:
0669             state_id: Run number / state identifier
0670             execution_id: Workflow execution ID
0671             config: Full workflow config for extracting workflow-specific params
0672         """
0673         # Extract workflow-specific params for metadata
0674         workflow_params = {}
0675         for section in ['fast_processing', 'daq_state_machine']:
0676             if config and section in config:
0677                 workflow_params.update(config[section])
0678 
0679         state_data = {
0680             'run_number': state_id,
0681             'phase': 'initializing',
0682             'state': 'imminent',
0683             'substate': 'preparing',
0684             'target_worker_count': workflow_params.get('target_worker_count', 0),
0685             'active_worker_count': 0,
0686             'stf_samples_received': 0,
0687             'slices_created': 0,
0688             'slices_queued': 0,
0689             'slices_processing': 0,
0690             'slices_completed': 0,
0691             'slices_failed': 0,
0692             'state_changed_at': datetime.now().isoformat(),
0693             'metadata': {
0694                 'execution_id': execution_id,
0695                 'stf_sampling_rate': workflow_params.get('stf_sampling_rate'),
0696                 'slices_per_sample': workflow_params.get('slices_per_sample')
0697             }
0698         }
0699 
0700         response = self.api_session.post(
0701             f"{self.monitor_url}/api/run-states/",
0702             json=state_data
0703         )
0704 
0705         if response.status_code in [200, 201]:
0706             self.logger.info(f"State initialized: {state_id}")
0707             return True
0708 
0709         self.logger.error(f"Failed to initialize state: {response.status_code}")
0710         try:
0711             self.logger.error(f"Response: {response.json()}")
0712         except Exception:
0713             self.logger.error(f"Response: {response.text}")
0714         return False
0715 
0716     # -------------------------------------------------------------------------
0717     # Persistent Agent Mode - Message Handling
0718     # -------------------------------------------------------------------------
0719 
0720     def on_message(self, frame):
0721         """
0722         Handle incoming workflow control messages.
0723 
0724         Supported commands:
0725         - run_workflow: Start a workflow execution
0726         - stop_workflow: Stop current workflow (future)
0727         - status_request: Report current status
0728         """
0729         message_data, msg_type = self.log_received_message(
0730             frame, known_types=self.COMMAND_MESSAGE_TYPES
0731         )
0732 
0733         # Namespace filtering - log_received_message returns (None, None) if filtered
0734         if message_data is None:
0735             return
0736 
0737         if msg_type == 'run_workflow':
0738             self._handle_run_workflow(message_data)
0739         elif msg_type == 'stop_workflow':
0740             self._handle_stop_workflow(message_data)
0741         elif msg_type == 'status_request':
0742             self._handle_status_request(message_data)
0743         else:
0744             self.logger.debug(f"Ignoring unhandled message type: {msg_type}")
0745 
0746     def _handle_run_workflow(self, message_data: Dict[str, Any]):
0747         """Handle run_workflow command - starts workflow in background thread."""
0748         # Check if already running a workflow
0749         if self.operational_state == 'PROCESSING':
0750             self.logger.warning(
0751                 f"Cannot start workflow - already running: {self.current_workflow_name} "
0752                 f"(execution: {self.current_execution_id})"
0753             )
0754             return
0755 
0756         # Extract workflow parameters from message
0757         workflow_name = message_data.get('workflow_name')
0758         if not workflow_name:
0759             self.logger.error("run_workflow message missing 'workflow_name'")
0760             return
0761 
0762         config_name = message_data.get('config')
0763         realtime = message_data.get('realtime', True)
0764         duration = message_data.get('duration', 0)
0765         params = message_data.get('params', {})
0766 
0767         # Clear stop flag and set state
0768         self.stop_event.clear()
0769         self.set_processing()
0770         self.current_workflow_name = workflow_name
0771 
0772         self.logger.info(
0773             f"Starting workflow: {workflow_name} (config: {config_name})",
0774             extra={'workflow_name': workflow_name}
0775         )
0776 
0777         # Start workflow in background thread
0778         self.workflow_thread = threading.Thread(
0779             target=self._run_workflow_thread,
0780             args=(workflow_name, config_name, duration, realtime, params),
0781             name=f"workflow-{workflow_name}",
0782             daemon=True
0783         )
0784         self.workflow_thread.start()
0785 
0786     def _run_workflow_thread(self, workflow_name: str, config_name: Optional[str],
0787                              duration: float, realtime: bool, params: Dict[str, Any]):
0788         """Run workflow in background thread with proper cleanup."""
0789         try:
0790             execution_id = self.run_workflow(
0791                 workflow_name=workflow_name,
0792                 config_name=config_name,
0793                 duration=duration,
0794                 realtime=realtime,
0795                 **params
0796             )
0797 
0798             if self.stop_event.is_set():
0799                 self.logger.info(
0800                     f"Workflow stopped: {workflow_name} (execution: {execution_id})",
0801                     extra={'execution_id': execution_id, 'workflow_name': workflow_name}
0802                 )
0803                 # Mark as terminated in database
0804                 self._update_execution_status(execution_id, 'terminated')
0805             else:
0806                 self.logger.info(
0807                     f"Workflow completed: {workflow_name} (execution: {execution_id})",
0808                     extra={'execution_id': execution_id, 'workflow_name': workflow_name}
0809                 )
0810 
0811         except Exception as e:
0812             exec_id = getattr(self, 'current_execution_id', None)
0813             self.logger.error(
0814                 f"Workflow failed: {workflow_name} (execution: {exec_id or 'unknown'}): {e}",
0815                 extra={'execution_id': exec_id, 'workflow_name': workflow_name}
0816             )
0817             if exec_id:
0818                 self._update_execution_status(exec_id, 'failed')
0819 
0820         finally:
0821             self.current_execution_id = None
0822             self.current_workflow_name = None
0823             self.workflow_thread = None
0824             self.set_ready()
0825 
0826     def _handle_stop_workflow(self, message_data: Dict[str, Any]):
0827         """Handle stop_workflow command - signals workflow to stop gracefully."""
0828         if self.operational_state != 'PROCESSING':
0829             self.logger.info("No workflow running to stop")
0830             return
0831 
0832         # Check execution_id if provided (for targeted stop)
0833         requested_exec_id = message_data.get('execution_id')
0834         if requested_exec_id and requested_exec_id != self.current_execution_id:
0835             self.logger.info(
0836                 f"Stop request for {requested_exec_id} ignored - "
0837                 f"current execution is {self.current_execution_id}"
0838             )
0839             return
0840 
0841         self.logger.info(
0842             f"Stopping workflow: {self.current_workflow_name} (execution: {self.current_execution_id})",
0843             extra={'execution_id': self.current_execution_id, 'workflow_name': self.current_workflow_name}
0844         )
0845         self.stop_event.set()
0846 
0847     def _handle_status_request(self, message_data: Dict[str, Any]):
0848         """Handle status_request command - logs current status."""
0849         self.logger.info(
0850             f"Status: state={self.operational_state}, "
0851             f"workflow={self.current_workflow_name}, "
0852             f"execution={self.current_execution_id}"
0853         )
0854 
0855 
0856 # =============================================================================
0857 # CLI Entry Point
0858 # =============================================================================
0859 
def main():
    """
    Main entry point for WorkflowRunner.

    Default mode: Persistent agent listening for workflow commands.
    --run-once mode: Execute single workflow and exit (backward compat).

    In --run-once mode the runner is put into the processing state, the named
    workflow is executed with any override flags given on the command line,
    and the runner is set back to ready afterwards. If the workflow raises,
    the error is logged and the process exits with status 1.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Workflow Runner - DAQ Simulator Agent',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Persistent mode (default) - listens for commands
  python workflow_runner.py

  # Run single workflow and exit
  python workflow_runner.py --run-once stf_datataking --stf-count 5

  # Run with specific config
  python workflow_runner.py --run-once stf_datataking --workflow-config fast_processing_default
        """
    )

    parser.add_argument('--testbed-config', default=None,
                        help='Testbed config file (default: SWF_TESTBED_CONFIG env var or workflows/testbed.toml)')
    parser.add_argument('--debug', action='store_true', help='Enable debug logging')

    # Mode selection
    parser.add_argument('--run-once', metavar='WORKFLOW',
                        help='Run single workflow and exit (CLI mode)')

    # Workflow parameters (only used with --run-once)
    parser.add_argument('--workflow-config', help='Workflow configuration file name')
    parser.add_argument('--duration', type=float, default=0,
                        help='Max duration in seconds (0 = run until complete)')
    parser.add_argument('--stf-count', type=int, help='Override STF count')
    parser.add_argument('--physics-period-count', type=int, help='Override physics period count')
    parser.add_argument('--physics-period-duration', type=float, help='Override physics period duration')
    parser.add_argument('--stf-interval', type=float, help='Override STF interval')
    # --realtime is already the default; the flag exists so it can be stated
    # explicitly, while --no-realtime flips the same destination to False.
    parser.add_argument('--realtime', action='store_true', default=True,
                        help='Run in real-time mode (default: True)')
    parser.add_argument('--no-realtime', action='store_false', dest='realtime',
                        help='Run as fast as possible (discrete-event simulation)')

    args = parser.parse_args()

    # Build workflow parameters: forward only the overrides that were actually
    # given, so unspecified values are not passed to run_workflow at all.
    override_names = (
        'stf_count',
        'physics_period_count',
        'physics_period_duration',
        'stf_interval',
    )
    workflow_params = {
        name: getattr(args, name)
        for name in override_names
        if getattr(args, name) is not None
    }

    if args.run_once:
        # CLI mode: run single workflow and exit
        runner = WorkflowRunner(
            config_path=args.testbed_config,
            debug=args.debug,
            workflow_name=args.run_once
        )

        runner.set_processing()
        try:
            execution_id = runner.run_workflow(
                workflow_name=args.run_once,
                config_name=args.workflow_config,
                duration=args.duration,
                realtime=args.realtime,
                **workflow_params
            )
            runner.logger.info(f"Workflow completed: {execution_id}")
        except Exception as e:
            runner.logger.error(f"Workflow failed: {e}")
            sys.exit(1)
        finally:
            # Runs on success and on the sys.exit(1) path alike.
            runner.set_ready()
    else:
        # Persistent mode: run as agent listening for commands
        runner = WorkflowRunner(
            config_path=args.testbed_config,
            debug=args.debug
        )
        runner.run()
0951 
0952 
0953 if __name__ == "__main__":
0954     main()