# File indexing completed on 2026-04-27 07:41:45
0001
0002 """
0003 Workflow Runner - Executes workflow definitions with SimPy
0004
0005 Default mode: Persistent agent listening for workflow commands on 'workflow_control' queue.
0006 CLI mode (--run-once): Execute single workflow and exit.
0007 """
0008
0009 import os
0010 import re
0011 import subprocess
0012 import sys
0013 import json
0014 import tomllib
0015 import threading
0016 from pathlib import Path
0017 from datetime import datetime
0018 from typing import Optional, Dict, Any
0019
0020
0021 def setup_environment():
0022 """Auto-activate venv and load environment variables."""
0023 script_dir = Path(__file__).resolve().parent.parent
0024
0025
0026 if "VIRTUAL_ENV" not in os.environ:
0027 venv_path = script_dir / ".venv"
0028 if venv_path.exists():
0029 print("Auto-activating virtual environment...")
0030 venv_python = venv_path / "bin" / "python"
0031 if venv_python.exists():
0032 os.environ["VIRTUAL_ENV"] = str(venv_path)
0033 os.environ["PATH"] = f"{venv_path}/bin:{os.environ['PATH']}"
0034 sys.executable = str(venv_python)
0035 else:
0036 print("Error: No Python virtual environment found")
0037 return False
0038
0039
0040 env_file = Path.home() / ".env"
0041 if env_file.exists():
0042 print("Loading environment variables from ~/.env...")
0043 with open(env_file) as f:
0044 for line in f:
0045 line = line.strip()
0046 if line and not line.startswith('#') and '=' in line:
0047 if line.startswith('export '):
0048 line = line[7:]
0049 key, value = line.split('=', 1)
0050 value = value.strip('"\'')
0051 if '$' in value:
0052 continue
0053 os.environ[key] = value
0054
0055
0056 for proxy_var in ['http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY']:
0057 if proxy_var in os.environ:
0058 del os.environ[proxy_var]
0059
0060 return True
0061
0062
0063 if __name__ == "__main__":
0064 if not setup_environment():
0065 sys.exit(1)
0066
0067
0068 import simpy
0069 sys.path.insert(0, str(Path(__file__).parent.parent.parent / "swf-common-lib" / "src"))
0070 from swf_common_lib.base_agent import BaseAgent
0071 from swf_common_lib.api_utils import ensure_namespace
0072
0073
0074 def get_github_source_info(file_path: Path) -> Optional[Dict[str, str]]:
0075 """
0076 Discover GitHub source info for a file in a git checkout.
0077 Returns dict with org, repo, script_path, branch, or None if not in a git repo.
0078 """
0079 try:
0080 file_path = Path(file_path).resolve()
0081 file_dir = file_path.parent
0082
0083
0084 result = subprocess.run(
0085 ['git', 'rev-parse', '--show-toplevel'],
0086 cwd=file_dir, capture_output=True, text=True
0087 )
0088 if result.returncode != 0:
0089 return None
0090 git_root = Path(result.stdout.strip())
0091
0092
0093 result = subprocess.run(
0094 ['git', 'remote', 'get-url', 'origin'],
0095 cwd=git_root, capture_output=True, text=True
0096 )
0097 if result.returncode != 0:
0098 return None
0099 remote_url = result.stdout.strip()
0100
0101
0102 match = re.search(r'github\.com[:/]([^/]+)/([^/]+?)(?:\.git)?$', remote_url)
0103 if not match:
0104 return None
0105 org, repo = match.groups()
0106
0107
0108 script_path = str(file_path.relative_to(git_root))
0109
0110
0111 result = subprocess.run(
0112 ['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
0113 cwd=git_root, capture_output=True, text=True
0114 )
0115 branch = result.stdout.strip() if result.returncode == 0 else 'main'
0116
0117 return {
0118 'org': org,
0119 'repo': repo,
0120 'script_path': script_path,
0121 'branch': branch
0122 }
0123 except Exception:
0124 return None
0125
0126
0127 def get_git_version(directory: Path) -> Optional[Dict[str, str]]:
0128 """
0129 Get git version info for a directory.
0130 Returns dict with commit, tag (if on tag), branch.
0131 """
0132 try:
0133 directory = Path(directory).resolve()
0134
0135
0136 result = subprocess.run(
0137 ['git', 'rev-parse', 'HEAD'],
0138 cwd=directory, capture_output=True, text=True
0139 )
0140 if result.returncode != 0:
0141 return None
0142 commit = result.stdout.strip()
0143
0144
0145 result = subprocess.run(
0146 ['git', 'describe', '--tags', '--exact-match'],
0147 cwd=directory, capture_output=True, text=True
0148 )
0149 tag = result.stdout.strip() if result.returncode == 0 else None
0150
0151
0152 result = subprocess.run(
0153 ['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
0154 cwd=directory, capture_output=True, text=True
0155 )
0156 branch = result.stdout.strip() if result.returncode == 0 else None
0157
0158 version_info = {'commit': commit}
0159 if tag:
0160 version_info['tag'] = tag
0161 if branch:
0162 version_info['branch'] = branch
0163
0164 return version_info
0165 except Exception:
0166 return None
0167
0168
0169 class WorkflowRunner(BaseAgent):
0170 """
0171 Loads, registers, and executes workflow definitions.
0172
0173 Operates in two modes:
0174 - Persistent mode (default): Runs as agent listening for workflow commands
0175 - CLI mode (--run-once): Executes single workflow and exits
0176 """
0177
0178
0179 COMMAND_MESSAGE_TYPES = {'run_workflow', 'stop_workflow', 'status_request'}
0180
0181 def __init__(self, monitor_url: Optional[str] = None, debug: bool = False,
0182 config_path: Optional[str] = None, workflow_name: Optional[str] = None):
0183 """
0184 Initialize WorkflowRunner as an agent
0185
0186 Args:
0187 monitor_url: Optional override for SWF monitor URL (uses env if not provided)
0188 debug: Enable debug logging
0189 config_path: Path to testbed.toml config file
0190 workflow_name: Name of workflow to run (used for agent_type registration)
0191 """
0192 if workflow_name:
0193 agent_type = workflow_name.replace(' ', '_')
0194 else:
0195 agent_type = 'DAQ_Simulator'
0196
0197 super().__init__(
0198 agent_type=agent_type,
0199 subscription_queue='/queue/workflow_control',
0200 debug=debug,
0201 config_path=config_path
0202 )
0203
0204
0205 self.current_execution_id = None
0206 self.current_workflow_name = None
0207 self.workflow_thread = None
0208 self.stop_event = threading.Event()
0209
0210
0211 if monitor_url:
0212 self.monitor_url = monitor_url.rstrip('/')
0213
0214
0215 self.api_session = self.api
0216 self.workflows_dir = Path(__file__).parent
0217
0218
0219 self.testbed_overrides = {}
0220 if self.config_path:
0221 testbed_config_file = Path(self.config_path)
0222 if testbed_config_file.exists():
0223 with open(testbed_config_file, 'rb') as f:
0224 testbed_config = tomllib.load(f)
0225 for section, values in testbed_config.items():
0226 if isinstance(values, dict):
0227 self.testbed_overrides[section] = values
0228
0229
0230 self.conn.connect(
0231 self.mq_user,
0232 self.mq_password,
0233 wait=True,
0234 version='1.1',
0235 headers={
0236 'client-id': self.agent_name,
0237 'heart-beat': '30000,30000'
0238 }
0239 )
0240 self.mq_connected = True
0241
0242
0243 self.send_heartbeat()
0244
0245 self.logger.info(f"WorkflowRunner initialized and connected to ActiveMQ: {self.agent_name}")
0246
0247 def run_workflow(self, workflow_name: str, config_name: Optional[str] = None,
0248 duration: float = 3600, realtime: bool = False, **override_params) -> str:
0249 """
0250 Run a workflow by name
0251
0252 Args:
0253 workflow_name: Name of the workflow (e.g., 'stf_processing')
0254 config_name: Name of config file (defaults to {workflow_name}_default.toml)
0255 duration: Simulation duration in seconds
0256 realtime: If True, run in real-time mode (1 sim second = 1 wall-clock second)
0257 **override_params: Parameters to override from config
0258
0259 Returns:
0260 execution_id: The ID of the workflow execution
0261 """
0262
0263 workflow_code = self._load_workflow_code(workflow_name)
0264 config = self._load_workflow_config(workflow_name, config_name)
0265
0266
0267 for section, values in self.testbed_overrides.items():
0268 if section in config:
0269 config[section].update(values)
0270 else:
0271 config[section] = values
0272
0273
0274 if override_params:
0275 for section, values in config.items():
0276 if section != 'workflow' and isinstance(values, dict):
0277 for key, value in override_params.items():
0278 if key in values:
0279 values[key] = value
0280
0281
0282 executed_by = os.getenv('USER', 'unknown')
0283 execution_id = self._generate_execution_id(workflow_name, executed_by)
0284
0285
0286 self.current_execution_id = execution_id
0287
0288
0289 workflow_file = self.workflows_dir / f"{workflow_name}.py"
0290 self._register_workflow_definition(
0291 name=config['workflow']['name'],
0292 version=config['workflow']['version'],
0293 code=workflow_code,
0294 config=config,
0295 workflow_file=workflow_file
0296 )
0297
0298
0299 self._create_execution_record(
0300 execution_id=execution_id,
0301 workflow_name=config['workflow']['name'],
0302 workflow_version=config['workflow']['version'],
0303 config=config
0304 )
0305
0306
0307 self._execute_workflow(
0308 execution_id=execution_id,
0309 workflow_code=workflow_code,
0310 config=config,
0311 duration=duration,
0312 realtime=realtime
0313 )
0314
0315
0316 self._update_execution_status(execution_id, 'completed')
0317
0318 return execution_id
0319
0320 def _load_workflow_code(self, workflow_name: str) -> str:
0321 """Load workflow Python code from file"""
0322 workflow_file = self.workflows_dir / f"{workflow_name}.py"
0323 if not workflow_file.exists():
0324 raise FileNotFoundError(f"Workflow {workflow_name}.py not found in {self.workflows_dir}")
0325
0326 with open(workflow_file, 'r') as f:
0327 return f.read()
0328
0329 def _load_workflow_config(self, workflow_name: str, config_name: Optional[str] = None) -> Dict[str, Any]:
0330 """Load workflow TOML configuration with includes support.
0331
0332 Config files use descriptive section names (e.g., [daq_state_machine], [fast_processing]).
0333 Included configs are loaded first, then main config sections are added/merged.
0334 Sections are kept intact for explicit access by workflow code.
0335 """
0336 if config_name is None:
0337 config_name = f"{workflow_name}_default.toml"
0338 elif not config_name.endswith('.toml'):
0339 config_name = f"{config_name}.toml"
0340
0341 config_file = self.workflows_dir / config_name
0342 if not config_file.exists():
0343 raise FileNotFoundError(f"Config {config_name} not found in {self.workflows_dir}")
0344
0345
0346 with open(config_file, 'rb') as f:
0347 main_config = tomllib.load(f)
0348
0349
0350 includes = main_config.get('workflow', {}).get('includes', [])
0351
0352 if not includes:
0353 return main_config
0354
0355
0356 for include_file in includes:
0357 include_path = self.workflows_dir / include_file
0358 if not include_path.exists():
0359 raise FileNotFoundError(f"Included config {include_file} not found in {self.workflows_dir}")
0360
0361 with open(include_path, 'rb') as f:
0362 included_config = tomllib.load(f)
0363
0364 for section, values in included_config.items():
0365 if section != 'workflow' and section not in main_config:
0366 main_config[section] = values
0367
0368 return main_config
0369
0370 def _generate_execution_id(self, workflow_name: str, executed_by: str) -> str:
0371 """Generate human-readable execution ID"""
0372
0373 response = self.api_session.post(
0374 f"{self.monitor_url}/api/state/next-workflow-execution-id/",
0375 json={'workflow_name': workflow_name}
0376 )
0377
0378 if response.status_code == 200:
0379 data = response.json()
0380 sequence = data.get('sequence', 1)
0381 else:
0382
0383 print(f"WARNING: Persistent state API failed: {response.status_code}")
0384 count_response = self.api_session.get(
0385 f"{self.monitor_url}/api/workflow-executions/",
0386 params={'workflow_name': workflow_name}
0387 )
0388 if count_response.status_code == 200:
0389 count_data = count_response.json()
0390 if isinstance(count_data, dict) and 'results' in count_data:
0391 sequence = len(count_data['results']) + 1
0392 elif isinstance(count_data, list):
0393 sequence = len(count_data) + 1
0394 else:
0395 sequence = 1
0396 else:
0397 print(f"ERROR: Cannot get execution count: {count_response.status_code}")
0398 raise Exception(f"Failed to generate execution ID - API unavailable")
0399
0400
0401 return f"{workflow_name}-{executed_by}-{sequence:04d}"
0402
0403 def _register_workflow_definition(self, name: str, version: str, code: str, config: Dict[str, Any],
0404 workflow_file: Path = None):
0405 """Register or update workflow definition in database"""
0406
0407 check_url = f"{self.monitor_url}/api/workflow-definitions/"
0408 check_response = self.api_session.get(
0409 check_url,
0410 params={'workflow_name': name, 'version': version}
0411 )
0412
0413
0414 source_info = get_github_source_info(workflow_file) if workflow_file else None
0415
0416
0417 expanded_config = {
0418 'workflow': {
0419 'name': config['workflow']['name'],
0420 'version': config['workflow']['version']
0421 }
0422 }
0423 if source_info:
0424 expanded_config['source'] = source_info
0425
0426 if 'description' in config['workflow']:
0427 expanded_config['workflow']['description'] = config['workflow']['description']
0428
0429 for section, values in config.items():
0430 if section != 'workflow' and isinstance(values, dict):
0431 expanded_config[section] = values
0432
0433 payload = {
0434 'workflow_name': name,
0435 'version': version,
0436 'workflow_type': 'simulation',
0437 'definition': code,
0438 'parameter_values': expanded_config,
0439 'created_by': os.getenv('USER', 'unknown'),
0440 'created_at': datetime.now().isoformat()
0441 }
0442
0443 if check_response.status_code == 200:
0444 definitions = check_response.json()
0445 existing_definition = None
0446
0447
0448 if isinstance(definitions, list):
0449 existing_definition = definitions[0] if definitions else None
0450 else:
0451 results = definitions.get('results', [])
0452 existing_definition = results[0] if results else None
0453
0454 if existing_definition:
0455
0456 return existing_definition
0457
0458
0459 response = self.api_session.post(
0460 f"{self.monitor_url}/api/workflow-definitions/",
0461 json=payload
0462 )
0463 else:
0464
0465 response = self.api_session.post(
0466 f"{self.monitor_url}/api/workflow-definitions/",
0467 json=payload
0468 )
0469
0470 if response.status_code not in [200, 201]:
0471 print(f"Error: Failed to register workflow definition: {response.status_code}")
0472 try:
0473 error_detail = response.json()
0474 print(f"API error response: {error_detail}")
0475 except:
0476 print(f"Raw response: {response.text}")
0477 raise Exception(f"Failed to register workflow definition: {response.status_code}")
0478
0479 return response.json()
0480
0481 def _create_execution_record(self, execution_id: str, workflow_name: str,
0482 workflow_version: str, config: Dict[str, Any]):
0483 """Create workflow execution record with full config for auditability."""
0484
0485 def_response = self.api_session.get(
0486 f"{self.monitor_url}/api/workflow-definitions/",
0487 params={'workflow_name': workflow_name, 'version': workflow_version}
0488 )
0489
0490 if def_response.status_code != 200:
0491 print(f"Warning: Could not find workflow definition for {workflow_name} v{workflow_version}")
0492 return
0493
0494 definitions = def_response.json()
0495 if not definitions:
0496 print(f"Warning: No workflow definition found for {workflow_name} v{workflow_version}")
0497 return
0498
0499
0500 if isinstance(definitions, list):
0501 if not definitions:
0502 print(f"Warning: No workflow definition found for {workflow_name} v{workflow_version}")
0503 return
0504 workflow_definition_id = definitions[0]['id']
0505 else:
0506 results = definitions.get('results', [])
0507 if not results:
0508 print(f"Warning: No workflow definition found for {workflow_name} v{workflow_version}")
0509 return
0510 workflow_definition_id = results[0]['id']
0511
0512
0513 namespace = config.get('testbed', {}).get('namespace')
0514 if namespace:
0515 try:
0516 ensure_namespace(self.monitor_url, self.api_session, namespace, logger=self.logger)
0517 except Exception:
0518 pass
0519
0520
0521 git_version = get_git_version(self.workflows_dir)
0522 config_with_version = dict(config)
0523 if git_version:
0524 config_with_version['git_version'] = git_version
0525
0526 payload = {
0527 'execution_id': execution_id,
0528 'workflow_definition': workflow_definition_id,
0529 'namespace': namespace,
0530 'status': 'running',
0531 'executed_by': os.getenv('USER', 'unknown'),
0532 'start_time': datetime.now().isoformat(),
0533 'parameter_values': config_with_version
0534 }
0535
0536 response = self.api_session.post(
0537 f"{self.monitor_url}/api/workflow-executions/",
0538 json=payload
0539 )
0540
0541 if response.status_code not in [200, 201]:
0542 print(f"Error: Failed to create execution record: {response.status_code}")
0543 try:
0544 error_detail = response.json()
0545 print(f"API error response: {error_detail}")
0546 except:
0547 print(f"Raw response: {response.text}")
0548 raise Exception(f"Failed to create execution record: {response.status_code}")
0549
0550 def _on_simulation_step(self, env, execution_id: str) -> bool:
0551 """
0552 Called between simulation events.
0553
0554 Override or extend this method to add per-step behavior such as:
0555 - Progress reporting to monitor
0556 - Periodic heartbeats during long workflows
0557 - Logging/metrics collection
0558
0559 Args:
0560 env: SimPy environment
0561 execution_id: Current execution ID
0562
0563 Returns:
0564 True to continue simulation, False to stop
0565 """
0566
0567 if self.stop_event.is_set():
0568 self.logger.info("Stop requested - ending simulation")
0569 return False
0570
0571 return True
0572
0573 def _execute_workflow(self, execution_id: str, workflow_code: str,
0574 config: Dict[str, Any], duration: float,
0575 realtime: bool = False):
0576 """Execute workflow using SimPy with step callback support.
0577
0578 Args:
0579 execution_id: Unique identifier for this execution
0580 workflow_code: Python code containing WorkflowExecutor class
0581 config: Full workflow config with descriptive sections
0582 duration: Simulation duration in seconds
0583 realtime: If True, use RealtimeEnvironment (1 sim sec = 1 wall sec)
0584
0585 The simulation calls _on_simulation_step() between events to allow
0586 graceful stopping and other per-step behaviors.
0587 """
0588
0589 if realtime:
0590
0591
0592 self.logger.info("Using real-time simulation mode")
0593 env = simpy.rt.RealtimeEnvironment(factor=1, strict=False)
0594 else:
0595
0596 env = simpy.Environment()
0597
0598
0599 namespace = {
0600 'env': env,
0601 'config': config,
0602 'runner': self,
0603 'execution_id': execution_id
0604 }
0605
0606
0607 exec(workflow_code, namespace)
0608
0609
0610 if 'WorkflowExecutor' in namespace:
0611
0612 executor = namespace['WorkflowExecutor'](
0613 config=config,
0614 runner=self,
0615 execution_id=execution_id
0616 )
0617
0618
0619 workflow_process = env.process(executor.execute(env))
0620
0621
0622 end_time = duration if duration and duration > 0 else float('inf')
0623
0624 while True:
0625
0626 if not self._on_simulation_step(env, execution_id):
0627 break
0628
0629
0630 if workflow_process.processed:
0631 break
0632
0633
0634 if env.now >= end_time:
0635 self.logger.info(f"Duration limit reached: {duration}s")
0636 break
0637
0638
0639 try:
0640 env.step()
0641 except simpy.core.EmptySchedule:
0642
0643 break
0644 else:
0645 raise ValueError("WorkflowExecutor class not found in workflow code")
0646
0647 def _update_execution_status(self, execution_id: str, status: str):
0648 """Update workflow execution status"""
0649 payload = {
0650 'status': status,
0651 'end_time': datetime.now().isoformat() if status == 'completed' else None
0652 }
0653
0654 response = self.api_session.patch(
0655 f"{self.monitor_url}/api/workflow-executions/{execution_id}/",
0656 json=payload
0657 )
0658
0659 if response.status_code != 200:
0660 print(f"Warning: Failed to update execution status: {response.status_code}")
0661
0662 def initialize_state(self, state_id: int, execution_id: str, config: dict = None):
0663 """
0664 Initialize state machine record for workflow execution.
0665 Currently creates RunState (run-level subset).
0666 Future: broader state machine tracking.
0667
0668 Args:
0669 state_id: Run number / state identifier
0670 execution_id: Workflow execution ID
0671 config: Full workflow config for extracting workflow-specific params
0672 """
0673
0674 workflow_params = {}
0675 for section in ['fast_processing', 'daq_state_machine']:
0676 if config and section in config:
0677 workflow_params.update(config[section])
0678
0679 state_data = {
0680 'run_number': state_id,
0681 'phase': 'initializing',
0682 'state': 'imminent',
0683 'substate': 'preparing',
0684 'target_worker_count': workflow_params.get('target_worker_count', 0),
0685 'active_worker_count': 0,
0686 'stf_samples_received': 0,
0687 'slices_created': 0,
0688 'slices_queued': 0,
0689 'slices_processing': 0,
0690 'slices_completed': 0,
0691 'slices_failed': 0,
0692 'state_changed_at': datetime.now().isoformat(),
0693 'metadata': {
0694 'execution_id': execution_id,
0695 'stf_sampling_rate': workflow_params.get('stf_sampling_rate'),
0696 'slices_per_sample': workflow_params.get('slices_per_sample')
0697 }
0698 }
0699
0700 response = self.api_session.post(
0701 f"{self.monitor_url}/api/run-states/",
0702 json=state_data
0703 )
0704
0705 if response.status_code in [200, 201]:
0706 self.logger.info(f"State initialized: {state_id}")
0707 return True
0708
0709 self.logger.error(f"Failed to initialize state: {response.status_code}")
0710 try:
0711 self.logger.error(f"Response: {response.json()}")
0712 except Exception:
0713 self.logger.error(f"Response: {response.text}")
0714 return False
0715
0716
0717
0718
0719
0720 def on_message(self, frame):
0721 """
0722 Handle incoming workflow control messages.
0723
0724 Supported commands:
0725 - run_workflow: Start a workflow execution
0726 - stop_workflow: Stop current workflow (future)
0727 - status_request: Report current status
0728 """
0729 message_data, msg_type = self.log_received_message(
0730 frame, known_types=self.COMMAND_MESSAGE_TYPES
0731 )
0732
0733
0734 if message_data is None:
0735 return
0736
0737 if msg_type == 'run_workflow':
0738 self._handle_run_workflow(message_data)
0739 elif msg_type == 'stop_workflow':
0740 self._handle_stop_workflow(message_data)
0741 elif msg_type == 'status_request':
0742 self._handle_status_request(message_data)
0743 else:
0744 self.logger.debug(f"Ignoring unhandled message type: {msg_type}")
0745
0746 def _handle_run_workflow(self, message_data: Dict[str, Any]):
0747 """Handle run_workflow command - starts workflow in background thread."""
0748
0749 if self.operational_state == 'PROCESSING':
0750 self.logger.warning(
0751 f"Cannot start workflow - already running: {self.current_workflow_name} "
0752 f"(execution: {self.current_execution_id})"
0753 )
0754 return
0755
0756
0757 workflow_name = message_data.get('workflow_name')
0758 if not workflow_name:
0759 self.logger.error("run_workflow message missing 'workflow_name'")
0760 return
0761
0762 config_name = message_data.get('config')
0763 realtime = message_data.get('realtime', True)
0764 duration = message_data.get('duration', 0)
0765 params = message_data.get('params', {})
0766
0767
0768 self.stop_event.clear()
0769 self.set_processing()
0770 self.current_workflow_name = workflow_name
0771
0772 self.logger.info(
0773 f"Starting workflow: {workflow_name} (config: {config_name})",
0774 extra={'workflow_name': workflow_name}
0775 )
0776
0777
0778 self.workflow_thread = threading.Thread(
0779 target=self._run_workflow_thread,
0780 args=(workflow_name, config_name, duration, realtime, params),
0781 name=f"workflow-{workflow_name}",
0782 daemon=True
0783 )
0784 self.workflow_thread.start()
0785
0786 def _run_workflow_thread(self, workflow_name: str, config_name: Optional[str],
0787 duration: float, realtime: bool, params: Dict[str, Any]):
0788 """Run workflow in background thread with proper cleanup."""
0789 try:
0790 execution_id = self.run_workflow(
0791 workflow_name=workflow_name,
0792 config_name=config_name,
0793 duration=duration,
0794 realtime=realtime,
0795 **params
0796 )
0797
0798 if self.stop_event.is_set():
0799 self.logger.info(
0800 f"Workflow stopped: {workflow_name} (execution: {execution_id})",
0801 extra={'execution_id': execution_id, 'workflow_name': workflow_name}
0802 )
0803
0804 self._update_execution_status(execution_id, 'terminated')
0805 else:
0806 self.logger.info(
0807 f"Workflow completed: {workflow_name} (execution: {execution_id})",
0808 extra={'execution_id': execution_id, 'workflow_name': workflow_name}
0809 )
0810
0811 except Exception as e:
0812 exec_id = getattr(self, 'current_execution_id', None)
0813 self.logger.error(
0814 f"Workflow failed: {workflow_name} (execution: {exec_id or 'unknown'}): {e}",
0815 extra={'execution_id': exec_id, 'workflow_name': workflow_name}
0816 )
0817 if exec_id:
0818 self._update_execution_status(exec_id, 'failed')
0819
0820 finally:
0821 self.current_execution_id = None
0822 self.current_workflow_name = None
0823 self.workflow_thread = None
0824 self.set_ready()
0825
0826 def _handle_stop_workflow(self, message_data: Dict[str, Any]):
0827 """Handle stop_workflow command - signals workflow to stop gracefully."""
0828 if self.operational_state != 'PROCESSING':
0829 self.logger.info("No workflow running to stop")
0830 return
0831
0832
0833 requested_exec_id = message_data.get('execution_id')
0834 if requested_exec_id and requested_exec_id != self.current_execution_id:
0835 self.logger.info(
0836 f"Stop request for {requested_exec_id} ignored - "
0837 f"current execution is {self.current_execution_id}"
0838 )
0839 return
0840
0841 self.logger.info(
0842 f"Stopping workflow: {self.current_workflow_name} (execution: {self.current_execution_id})",
0843 extra={'execution_id': self.current_execution_id, 'workflow_name': self.current_workflow_name}
0844 )
0845 self.stop_event.set()
0846
0847 def _handle_status_request(self, message_data: Dict[str, Any]):
0848 """Handle status_request command - logs current status."""
0849 self.logger.info(
0850 f"Status: state={self.operational_state}, "
0851 f"workflow={self.current_workflow_name}, "
0852 f"execution={self.current_execution_id}"
0853 )
0854
0855
0856
0857
0858
0859
0860 def main():
0861 """
0862 Main entry point for WorkflowRunner.
0863
0864 Default mode: Persistent agent listening for workflow commands.
0865 --run-once mode: Execute single workflow and exit (backward compat).
0866 """
0867 import argparse
0868
0869 script_dir = Path(__file__).parent
0870
0871 parser = argparse.ArgumentParser(
0872 description='Workflow Runner - DAQ Simulator Agent',
0873 formatter_class=argparse.RawDescriptionHelpFormatter,
0874 epilog="""
0875 Examples:
0876 # Persistent mode (default) - listens for commands
0877 python workflow_runner.py
0878
0879 # Run single workflow and exit
0880 python workflow_runner.py --run-once stf_datataking --stf-count 5
0881
0882 # Run with specific config
0883 python workflow_runner.py --run-once stf_datataking --workflow-config fast_processing_default
0884 """
0885 )
0886
0887 parser.add_argument('--testbed-config', default=None,
0888 help='Testbed config file (default: SWF_TESTBED_CONFIG env var or workflows/testbed.toml)')
0889 parser.add_argument('--debug', action='store_true', help='Enable debug logging')
0890
0891
0892 parser.add_argument('--run-once', metavar='WORKFLOW',
0893 help='Run single workflow and exit (CLI mode)')
0894
0895
0896 parser.add_argument('--workflow-config', help='Workflow configuration file name')
0897 parser.add_argument('--duration', type=float, default=0,
0898 help='Max duration in seconds (0 = run until complete)')
0899 parser.add_argument('--stf-count', type=int, help='Override STF count')
0900 parser.add_argument('--physics-period-count', type=int, help='Override physics period count')
0901 parser.add_argument('--physics-period-duration', type=float, help='Override physics period duration')
0902 parser.add_argument('--stf-interval', type=float, help='Override STF interval')
0903 parser.add_argument('--realtime', action='store_true', default=True,
0904 help='Run in real-time mode (default: True)')
0905 parser.add_argument('--no-realtime', action='store_false', dest='realtime',
0906 help='Run as fast as possible (discrete-event simulation)')
0907
0908 args = parser.parse_args()
0909
0910
0911 workflow_params = {}
0912 if args.stf_count is not None:
0913 workflow_params['stf_count'] = args.stf_count
0914 if args.physics_period_count is not None:
0915 workflow_params['physics_period_count'] = args.physics_period_count
0916 if args.physics_period_duration is not None:
0917 workflow_params['physics_period_duration'] = args.physics_period_duration
0918 if args.stf_interval is not None:
0919 workflow_params['stf_interval'] = args.stf_interval
0920
0921 if args.run_once:
0922
0923 runner = WorkflowRunner(
0924 config_path=args.testbed_config,
0925 debug=args.debug,
0926 workflow_name=args.run_once
0927 )
0928
0929 runner.set_processing()
0930 try:
0931 execution_id = runner.run_workflow(
0932 workflow_name=args.run_once,
0933 config_name=args.workflow_config,
0934 duration=args.duration,
0935 realtime=args.realtime,
0936 **workflow_params
0937 )
0938 runner.logger.info(f"Workflow completed: {execution_id}")
0939 except Exception as e:
0940 runner.logger.error(f"Workflow failed: {e}")
0941 sys.exit(1)
0942 finally:
0943 runner.set_ready()
0944 else:
0945
0946 runner = WorkflowRunner(
0947 config_path=args.testbed_config,
0948 debug=args.debug
0949 )
0950 runner.run()
0951
0952
0953 if __name__ == "__main__":
0954 main()