# Workflow Orchestration Framework

## Overview

The Workflow Orchestration Framework provides a structured approach for defining, executing, and monitoring complex multi-step processes using TOML configuration, Python+SimPy execution logic, ActiveMQ messaging, and database persistence.

**Key Architecture Principles:**
- **Namespace Isolation**: Workflows operate within namespaces, allowing users to distinguish their workflows from those of others
- **One Workflow, Multiple Configurations**: The same DAQ datataking workflow serves different downstream processing strategies through TOML configuration
- **Configuration Composition**: TOML configs can include base configurations for reusability
- **ActiveMQ Messaging**: Workflows broadcast real messages that agents respond to
- **Database as Truth**: Fully expanded configurations are stored in the database for reproducibility
- **Agent-Driven Processing**: Workflows simulate DAQ; agents implement downstream processing logic
- **Immutable Definitions**: Once created, workflow definitions are not modified; each execution records its specific git version

## Configuration Format and Composition

### Basic Configuration
```toml
# workflows/stf_processing_default.toml
[workflow]
name = "stf_datataking"
version = "1.0"
description = "STF datataking workflow for standard processing"
includes = ["daq_state_machine.toml"]

[stf_processing]
# Uses daq_state_machine defaults; add stf_processing-specific values here
```

### Configuration Composition via Includes

Configurations support composition through the `includes` directive. Each config file uses a descriptive section name (e.g., `[daq_state_machine]`, `[fast_processing]`) that identifies the component:

```toml
[workflow]
name = "stf_datataking"
version = "1.0"
includes = ["daq_state_machine.toml"]

[fast_processing]
# Values here override daq_state_machine values and add fast_processing-specific config
stf_count = 10
target_worker_count = 30
```

**Loading Order:**
1. Include files load in array order
2. Later includes override earlier ones
3. Main config parameters have final say

**Database Storage:**
The fully expanded configuration (with all includes merged) is saved to the database, ensuring execution records are complete snapshots with no external file dependencies.

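The loading order above can be sketched as a merge over already-parsed TOML dictionaries. This is a minimal illustration, not the framework's loader: `deep_merge`, `expand_config`, the `site_overrides.toml` file name, and all parameter values are hypothetical, and recursive merging of nested tables is an assumption about how overrides combine.

```python
from copy import deepcopy


def deep_merge(base, override):
    """Return a new dict in which values from `override` win over `base`."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


def expand_config(main, load_include):
    """Merge includes in array order, then let the main config have the final say."""
    expanded = {}
    for name in main.get("workflow", {}).get("includes", []):
        expanded = deep_merge(expanded, load_include(name))
    return deep_merge(expanded, main)


# Stand-ins for parsed TOML files (hypothetical names and values).
files = {
    "daq_state_machine.toml": {"daq_state_machine": {"stf_interval": 2.0, "standby_duration": 30}},
    "site_overrides.toml": {"daq_state_machine": {"standby_duration": 10}},
}
main = {
    "workflow": {"name": "stf_datataking", "includes": list(files)},
    "daq_state_machine": {"stf_interval": 1.5},
    "fast_processing": {"stf_count": 10},
}

# The expanded dict is what would be stored as the complete snapshot.
expanded = expand_config(main, files.__getitem__)
```

Here the later include lowers `standby_duration` and the main config sets `stf_interval` last, matching the three loading-order rules.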
## Namespace Configuration

Namespaces allow users to isolate their workflows from others. All workflows and agents operate within a namespace, and messages include the namespace for filtering.

### Testbed Configuration

Before running workflows, configure your namespace in `workflows/testbed.toml`:

```toml
# workflows/testbed.toml
[testbed]
# Namespace for this testbed instance.
# Examples: "epic-fastmon-dev", "collab-dec29", "wenauseic-test1"
namespace = "your-namespace"

# Optional: Override workflow config values
[fast_processing]
stf_count = 5  # Fewer STF files for quick testing
```

**Namespace Purpose:**
- Distinguish your workflows from other users' workflows
- Filter messages and data in the monitor UI
- Allow multiple users to collaborate in one namespace
- Enable parallel testing without interference

**Important:** The namespace must be set before running workflows. An empty namespace will cause an error.

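The namespace requirement can be illustrated with a small check over the parsed `testbed.toml`. This is a sketch under assumptions: `require_namespace` and `in_namespace` are hypothetical helpers, and the `namespace` message field name is assumed rather than taken from the framework.

```python
def require_namespace(testbed_config):
    """Return the configured namespace, or raise if it is missing or blank."""
    namespace = str(testbed_config.get("testbed", {}).get("namespace", "")).strip()
    if not namespace:
        raise ValueError("testbed.toml: [testbed].namespace must be set to a non-empty value")
    return namespace


def in_namespace(message, namespace):
    """True when a message carries the given namespace (field name is assumed)."""
    return message.get("namespace") == namespace


ns = require_namespace({"testbed": {"namespace": "epic-fastmon-dev"}})
```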
## Source Traceability

Workflow definitions and executions include source tracking for reproducibility:

**Workflow Definitions** store:
- GitHub org/repo/path/branch at time of creation
- Link to source script in repository

**Workflow Executions** store:
- Git commit hash at time of execution
- Branch/tag information
- Links to specific version of source code

The monitor UI displays these as clickable GitHub links.

## STF Datataking Workflow Implementation

The unified STF datataking workflow simulates the ePIC DAQ state machine, broadcasting ActiveMQ messages that downstream agents respond to. The same workflow code serves both standard STF processing and fast processing; configuration determines the behavior.

### Workflow Code

```python
# workflows/stf_datataking.py
from datetime import datetime


class WorkflowExecutor:
    def __init__(self, config, runner, execution_id):
        self.config = config
        self.runner = runner  # WorkflowRunner instance (BaseAgent)
        self.execution_id = execution_id
        self.stf_sequence = 0
        self.run_id = None

        # Build merged params: daq_state_machine base, with workflow-specific overrides
        self.daq = config.get('daq_state_machine', {})
        for section in ['fast_processing', 'stf_processing']:
            if section in config:
                self.daq = {**self.daq, **config[section]}

    def execute(self, env):
        # Generate run ID for this execution
        from swf_common_lib.api_utils import get_next_run_number
        self.run_id = get_next_run_number(
            self.runner.monitor_url,
            self.runner.api_session,
            self.runner.logger
        )

        # State 1: no_beam / not_ready (Collider not operating)
        yield env.timeout(self.daq['no_beam_not_ready_delay'])

        # State 2: beam / not_ready (Run start imminent)
        yield env.process(self.broadcast_run_imminent(env))
        yield env.timeout(self.daq['broadcast_delay'])
        yield env.timeout(self.daq['beam_not_ready_delay'])

        # State 3: beam / ready (Ready for physics)
        yield env.timeout(self.daq['beam_ready_delay'])

        # Physics periods loop with standby between them
        # (physics_period_count == 0 means loop without a period limit)
        period = 0
        while self.daq['physics_period_count'] == 0 or period < self.daq['physics_period_count']:
            # Broadcast start or resume message
            if period == 0:
                yield env.process(self.broadcast_run_start(env))
            else:
                yield env.process(self.broadcast_resume_run(env))
            yield env.timeout(self.daq['broadcast_delay'])

            # STF generation during physics
            yield from self.generate_stfs_during_physics(env, self.daq['physics_period_duration'])

            period += 1

            # Standby between physics periods
            if self.daq['physics_period_count'] == 0 or period < self.daq['physics_period_count']:
                yield env.process(self.broadcast_pause_run(env))
                yield env.timeout(self.daq['broadcast_delay'])
                yield env.timeout(self.daq['standby_duration'])

        # State 7: beam / not_ready + broadcast run end
        yield env.process(self.broadcast_run_end(env))
        yield env.timeout(self.daq['broadcast_delay'])
        yield env.timeout(self.daq['beam_not_ready_end_delay'])

    def broadcast_stf_gen(self, env, stf_filename):
        """Broadcast STF generation via ActiveMQ."""
        message = {
            "msg_type": "stf_gen",
            "execution_id": self.execution_id,
            "run_id": self.run_id,
            "filename": stf_filename,
            "sequence": self.stf_sequence,
            "timestamp": datetime.now().isoformat(),
            "simulation_tick": env.now,
            "state": "run",
            "substate": "physics"
        }
        self.runner.send_message('/topic/epictopic', message)
        yield env.timeout(0.1)

    # The other broadcast_* methods and generate_stfs_during_physics()
    # are not shown in this excerpt.
```

**Key Features:**
- **ActiveMQ Messaging**: Broadcasts workflow events via `runner.send_message()`
- **Parameter Distribution**: Messages include `execution_id` for agents to query WorkflowExecution
- **8-State DAQ Machine**: Matches ePIC DAQ simulator state transitions
- **SimPy Timing**: Simulates timing for state transitions and STF generation

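The ordering of run-control broadcasts produced by the physics-period loop can be sketched without SimPy. This is an illustration only: `broadcast_sequence` is a hypothetical helper, and the labels mirror the `broadcast_*` method names rather than confirmed `msg_type` values. Because a `physics_period_count` of 0 makes the workflow loop without a period limit, the sketch accepts only positive counts.

```python
def broadcast_sequence(physics_period_count):
    """List run-control broadcasts in the order the workflow loop emits them."""
    if physics_period_count <= 0:
        raise ValueError("sketch covers bounded runs only (count must be >= 1)")
    events = ["run_imminent"]
    period = 0
    while period < physics_period_count:
        # First period starts the run; later periods resume it.
        events.append("run_start" if period == 0 else "resume_run")
        period += 1
        # A pause (standby) separates periods but does not follow the last one.
        if period < physics_period_count:
            events.append("pause_run")
    events.append("run_end")
    return events


single = broadcast_sequence(1)
triple = broadcast_sequence(3)
```

STF generation messages (`stf_gen`) would be interleaved after each start or resume; they are left out to keep the ordering visible.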
## Fast Processing Configuration

Fast processing uses the **same stf_datataking.py workflow** as standard STF processing, but with different configuration parameters. The distinction is in downstream agent behavior, not workflow code.

### Configuration

```toml
# workflows/fast_processing_default.toml
[workflow]
name = "stf_datataking"
version = "1.0"
description = "STF datataking workflow for fast processing"
includes = ["daq_state_machine.toml"]

[fast_processing]
# Count-based STF generation
stf_count = 10               # Generate exactly 10 STF files
physics_period_count = 1     # Single physics period

# Fast processing parameters
target_worker_count = 30     # Target number of workers
stf_sampling_rate = 0.05     # FastMon sampling fraction (5%)
slices_per_sample = 15       # TF slices per STF sample
slice_processing_time = 30   # Processing time per slice (seconds)
worker_rampup_time = 300     # Worker startup time (5 min)
worker_rampdown_time = 60    # Graceful shutdown time (1 min)
```

### Agent-Driven Processing

The workflow broadcasts the same DAQ messages (`run_imminent`, `start_run`, `stf_gen`, `end_run`). The difference is in how agents respond:

**Standard STF Processing:**
- `processing_agent` processes complete STF files
- Creates PanDA tasks for full reconstruction

**Fast Processing:**
- `fastmon_agent` samples STF files, creates TF samples
- `fast_processing_agent` creates slices from TF samples
- Workers process slices for near real-time shifter monitoring

### Performance Metrics

With default parameters:
- STF rate: 0.5 Hz → 1 STF every 2 seconds
- Slice creation if every STF were sampled: 15 slices/sample × 0.5 Hz = 7.5 slices/sec
- Slice creation with `stf_sampling_rate = 0.05` (assuming the fraction applies per STF): 7.5 × 0.05 = 0.375 slices/sec
- Processing capacity: 30 workers ÷ 30 sec/slice = 1 slice/sec in aggregate (1/30 slice/sec per worker)
- Total capacity: 1 slice/sec exceeds the 0.375 slices/sec demand, so 30 workers keep up in real time at 0.5 Hz

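The capacity estimate can be checked with a few lines of arithmetic. This sketch uses the values from `fast_processing_default.toml` plus the 0.5 Hz STF rate quoted above; applying `stf_sampling_rate` per STF is an assumption about how FastMon sampling works, and the variable names other than the config keys are illustrative.

```python
# Values from the fast processing configuration and the quoted STF rate.
stf_rate_hz = 0.5             # one STF every 2 seconds
stf_sampling_rate = 0.05      # fraction of STFs sampled (assumed per STF)
slices_per_sample = 15
slice_processing_time = 30.0  # seconds per slice
target_worker_count = 30

# Demand: slices created per second after sampling.
slice_demand = stf_rate_hz * stf_sampling_rate * slices_per_sample

# Capacity: slices completed per second across all workers.
slice_capacity = target_worker_count / slice_processing_time

# Fraction of total worker capacity needed to keep up in real time.
utilization = slice_demand / slice_capacity

print(f"demand={slice_demand:.3f}/s capacity={slice_capacity:.3f}/s utilization={utilization:.1%}")
```

A utilization below 100% means the worker pool can keep pace; without sampling (`stf_sampling_rate = 1.0`) the same calculation gives a demand of 7.5 slices/sec, well above capacity.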
### Database Integration

During execution, the workflow populates:
- `RunState`: phase, worker counts, slice statistics
- `TFSlice`: individual slice tracking with status
- `Worker`: worker lifecycle and performance
- `SystemStateEvent`: complete event history for replay


## Workflow Management Framework

### Database Models
```python
from django.db import models


class WorkflowDefinition(models.Model):
    workflow_name = models.CharField(max_length=200)
    version = models.CharField(max_length=50)
    workflow_type = models.CharField(max_length=100)  # e.g. "fast_processing", "stf_processing", "custom"
    definition = models.TextField(max_length=5000)  # Python code content
    parameter_values = models.JSONField()  # Default parameter values and schema
    created_by = models.CharField(max_length=100)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        unique_together = ('workflow_name', 'version')


class WorkflowExecution(models.Model):
    execution_id = models.CharField(primary_key=True, max_length=100)  # e.g. "stf_processing-wenauseic-0001"
    workflow_definition = models.ForeignKey(WorkflowDefinition, on_delete=models.CASCADE)
    parameter_values = models.JSONField()  # Actual values used for this execution
    performance_metrics = models.JSONField(null=True)
    execution_environment = models.JSONField(null=True)
    start_time = models.DateTimeField()
    end_time = models.DateTimeField(null=True)
    status = models.CharField(max_length=20)  # running, completed, failed
    executed_by = models.CharField(max_length=100)
    error_message = models.TextField(null=True)
    stack_trace = models.TextField(null=True)
```

### Running Workflows

**One-time CLI mode** - run a single workflow and exit:

```bash
# Run workflow with count-based completion (preferred)
python workflows/workflow_runner.py --run-once stf_datataking --workflow-config fast_processing_default --stf-count 10

# Run in REAL-TIME mode (for testing with downstream agents)
python workflows/workflow_runner.py --run-once stf_datataking --workflow-config fast_processing_default --stf-count 5 --realtime

# Run with duration limit
python workflows/workflow_runner.py --run-once stf_datataking --workflow-config fast_processing_default --duration 120

# Override specific parameters
python workflows/workflow_runner.py --run-once stf_datataking \
    --workflow-config fast_processing_default \
    --stf-count 20 \
    --physics-period-count 2 \
    --stf-interval 1.5
```

**Persistent agent mode** - run as daemon, receive commands via ActiveMQ:

```bash
# Start persistent agent (listens on workflow_control queue)
python workflows/workflow_runner.py

# Send commands via CLI utility
python workflows/send_workflow_command.py run --workflow stf_datataking --stf-count 5 --no-realtime
python workflows/send_workflow_command.py stop --execution-id <exec_id>
python workflows/send_workflow_command.py status

# Or via MCP tools: start_workflow, stop_workflow, list_workflow_executions
```

**Command Line Arguments:**
- `workflow_name` - Name of workflow Python file (e.g., `stf_datataking`)
- `--workflow-config` - TOML configuration file name (with or without .toml extension)
- `--testbed-config` - Path to testbed.toml (default: `workflows/testbed.toml`)
- `--stf-count` - Generate exactly N STF files then complete (preferred over duration)
- `--duration` - Maximum simulation duration in seconds (0 = run until workflow completes)
- `--realtime` - Run in real-time mode (see Simulation Modes below)
- `--physics-period-count` - Override physics period count
- `--physics-period-duration` - Override physics period duration (seconds)
- `--stf-interval` - Override STF generation interval (seconds)

### Simulation Modes

The workflow simulator supports two execution modes:

**Fast Simulation Mode (default)**
- SimPy discrete-event simulation runs as fast as possible
- A 120-second workflow completes in ~2 seconds of wall-clock time
- Useful for testing workflow logic and database integration
- Messages are broadcast instantly without timing constraints

**Real-Time Mode (`--realtime`)**
- Uses SimPy's `RealtimeEnvironment` to tie simulation time to wall-clock time
- A 120-second workflow takes ~120 seconds to complete
- Essential for testing with downstream agents (e.g., `fast_processing_agent`)
- Messages are paced realistically, allowing agents to process them in sequence
- Pass `strict=False` so SimPy does not raise `RuntimeError` when a step overruns its real-time budget; the simulation then continues and catches up if it falls behind

**What Happens:**
1. WorkflowRunner (a BaseAgent) connects to ActiveMQ and registers as an agent
2. Configuration files are loaded with includes merged
3. The fully expanded config is saved to WorkflowDefinition and WorkflowExecution
4. Workflow broadcasts ActiveMQ messages during execution
5. Downstream agents receive messages and query WorkflowExecution for parameters

### Execution ID Generation

Workflow executions use human-readable IDs following the pattern:
```
workflow-username-NNNN
```

For example: `stf_processing-wenauseic-0001`

The sequence numbers are generated atomically via the monitor API to ensure uniqueness.

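The ID pattern can be sketched as a pair of formatting and parsing helpers. Both function names are hypothetical, and the sequence number is taken as an input here because the real value comes from the monitor API; the parser assumes the username contains no hyphens.

```python
def format_execution_id(workflow, username, sequence):
    """Build an ID of the form workflow-username-NNNN (zero-padded to 4 digits)."""
    return f"{workflow}-{username}-{sequence:04d}"


def parse_execution_id(execution_id):
    """Split an ID from the right so hyphens in the workflow name are preserved."""
    workflow, username, sequence = execution_id.rsplit("-", 2)
    return workflow, username, int(sequence)


example_id = format_execution_id("stf_processing", "wenauseic", 1)
parts = parse_execution_id(example_id)
```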
## Directory Structure
```
workflows/
├── testbed.toml                  # Testbed instance config (namespace, overrides)
├── stf_datataking.py             # Unified DAQ datataking workflow
├── daq_state_machine.toml        # Base DAQ parameters (included by others)
├── stf_processing_default.toml   # STF processing configuration
├── fast_processing_default.toml  # Fast processing configuration
├── workflow_runner.py            # Workflow execution agent (BaseAgent, persistent or CLI)
└── send_workflow_command.py      # CLI utility to send commands to persistent agent
```

## Web Interface

The monitor provides web-based workflow management:

- **Workflow Definitions**: View and manage workflow templates at `/workflows/`
- **Workflow Executions**: Monitor execution instances and their status
- **Execution Details**: View parameters, duration, and performance metrics

All workflow code and configuration data is displayed with syntax highlighting for improved readability.

## Integration with Agent Infrastructure

Workflows integrate with the agent-based messaging system:

**WorkflowRunner as Agent:**
- Inherits from `BaseAgent`
- Registers using the workflow name as agent type (e.g., `STF_Datataking`)
- Sends messages to `epictopic` ActiveMQ topic
- Connects to monitor API for database operations

**Agent Communication:**
- Workflows broadcast DAQ state transition messages
- Messages include `execution_id` for parameter lookup
- Agents query `/api/workflow-executions/{execution_id}/` to get full parameters
- Same messages, different agent responses = different workflows

**Example Message Flow:**
1. WorkflowRunner broadcasts `run_imminent` with `execution_id`
2. `fast_processing_agent` receives message, queries WorkflowExecution
3. Agent extracts `target_worker_count`, `slices_per_sample` from parameters
4. Agent initiates worker preparation based on configuration
5. Workflow broadcasts `stf_gen` messages
6. Agent creates TF slices and distributes to workers

This architecture decouples workflow orchestration (DAQ simulation) from processing logic (agent behavior), enabling multiple downstream processing strategies with the same workflow code.

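Steps 1 to 3 of the message flow can be sketched as a handler that looks up parameters by `execution_id`. This is an illustration under assumptions, not the agent's code: `handle_run_imminent` is hypothetical, the lookup is injected as a callable instead of performing an HTTP request, and the response shape (a `parameter_values` object containing a `fast_processing` table) is assumed.

```python
def handle_run_imminent(message, fetch_execution):
    """Return worker-preparation settings for a run_imminent message, else None."""
    if message.get("msg_type") != "run_imminent":
        return None  # Other message types are handled elsewhere.
    # In a real agent this would query /api/workflow-executions/{execution_id}/.
    execution = fetch_execution(message["execution_id"])
    params = execution["parameter_values"].get("fast_processing", {})
    return {
        "target_worker_count": params["target_worker_count"],
        "slices_per_sample": params["slices_per_sample"],
    }


# Stub standing in for the monitor API (hypothetical response shape).
def fake_fetch(execution_id):
    return {
        "execution_id": execution_id,
        "parameter_values": {
            "fast_processing": {"target_worker_count": 30, "slices_per_sample": 15}
        },
    }


plan = handle_run_imminent(
    {"msg_type": "run_imminent", "execution_id": "stf_processing-wenauseic-0001"},
    fake_fetch,
)
ignored = handle_run_imminent({"msg_type": "stf_gen", "execution_id": "x"}, fake_fetch)
```

Injecting the lookup keeps the handler testable without a running monitor, and it reflects the design point that messages carry only the `execution_id` while parameters live in the database.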
## Supervisord Agent Orchestration

### Problem

- Each agent requires its own terminal/process
- No way to define "this workflow needs these agents"
- No single command to start/stop agent group
- Multiple testbed.toml files (workflows/, example_agents/) - should be ONE

### Architecture Decision

Use **supervisord** for agent management (already in project), NOT subprocesses.

**Agent behavior:**
- Agents are **persistent** - they start, wait for work, process it, close out, go back to waiting
- Agents should not exit after processing - they're long-running services
- This is the production architecture we're building toward

### Solution Design

**1. Single testbed.toml in workflows/**

All agents use `workflows/testbed.toml`. The `example_agents/testbed.toml` has been deleted.

```toml
[testbed]
namespace = "torre1"

[workflow]
name = "stf_datataking"
config = "fast_processing_default"
realtime = true

[agents.processing]
enabled = true
script = "example_agents/example_processing_agent.py"

[agents.data]
enabled = false
script = "example_agents/example_data_agent.py"

[agents.fastmon]
enabled = false
script = "example_agents/example_fastmon_agent.py"

[agents.fast_processing]
enabled = false
script = "example_agents/fast_processing_agent.py"

[parameters]
# Optional workflow parameter overrides
# stf_count = 5
# physics_period_count = 2
```

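Selecting which agents to start from the `[agents.*]` tables can be sketched as a filter over the parsed configuration. `enabled_agents` is a hypothetical helper, and treating a missing `enabled` key as disabled is an assumption; the dictionary mirrors the TOML above as a TOML parser would return it.

```python
def enabled_agents(testbed_config):
    """Map agent name to script path for every agent marked enabled."""
    agents = testbed_config.get("agents", {})
    return {
        name: spec["script"]
        for name, spec in agents.items()
        if spec.get("enabled", False)  # missing key treated as disabled (assumption)
    }


# Parsed form of the [agents.*] tables shown above.
config = {
    "testbed": {"namespace": "torre1"},
    "agents": {
        "processing": {"enabled": True, "script": "example_agents/example_processing_agent.py"},
        "data": {"enabled": False, "script": "example_agents/example_data_agent.py"},
        "fastmon": {"enabled": False, "script": "example_agents/example_fastmon_agent.py"},
        "fast_processing": {"enabled": False, "script": "example_agents/fast_processing_agent.py"},
    },
}

to_start = enabled_agents(config)
```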
**2. Workflow Orchestrator**

CLI command: `testbed run <name>` - starts agents, triggers workflow from `<name>.toml`, returns immediately.

```
testbed run fast_processing
│
├── Load workflows/fast_processing.toml (or fast_processing_default.toml)
│
├── Start enabled agents via supervisorctl
│   └── Verify PID exists (not health check, just process alive)
│
├── Send run_workflow command to WorkflowRunner
│   └── Via ActiveMQ workflow_control queue
│
└── Return immediately (async operations running)
```

**Key design points:**
- **Non-blocking**: Launches async operations, returns immediately. No Ctrl+C required.
- **No health check**: Just verify agent PID exists
- **Status via separate commands**: `testbed status`, monitor page, MCP tools

**3. supervisord Integration**

Agents are managed via a separate supervisord config (`agents.supervisord.conf`):
- Uses separate socket: `/tmp/swf-agents-supervisor.sock`
- `autostart=false` - agents start on demand via `testbed run`
- Programs: workflow-runner, example-data-agent, example-processing-agent, example-fastmon-agent, fast-processing-agent

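Starting programs on demand against the separate agent config can be sketched by building `supervisorctl` argument lists. `supervisorctl_command` is a hypothetical helper and not the orchestrator's code; it only constructs the command, which a caller could then pass to `subprocess.run(cmd, check=True)`.

```python
AGENTS_CONF = "agents.supervisord.conf"
ALLOWED_ACTIONS = {"start", "stop", "status"}


def supervisorctl_command(action, programs, conf=AGENTS_CONF):
    """Build the argv for a supervisorctl call against the agent config."""
    if action not in ALLOWED_ACTIONS:
        raise ValueError(f"unsupported action: {action}")
    # -c selects the config file; program names follow the action.
    return ["supervisorctl", "-c", conf, action, *programs]


start_cmd = supervisorctl_command("start", ["workflow-runner", "example-processing-agent"])
stop_cmd = supervisorctl_command("stop", ["all"])
```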
**4. Status Reporting**

`testbed status` and `testbed status-local` commands report:
- Running workflow executions (from monitor API)
- Active agents with operational state and type
- Standard service status (Docker or system services, supervisord)

### Files

| File | Description |
|------|-------------|
| `workflows/testbed.toml` | Testbed instance config with [agents] section |
| `agents.supervisord.conf` | Agent-specific supervisord configuration |
| `workflows/orchestrator.py` | Orchestration module - start agents, trigger workflow |
| `src/swf_testbed_cli/main.py` | CLI with `run` and enhanced `status` commands |

### Usage

```bash
# Run workflow with default testbed.toml settings
testbed run

# Run with specific config
testbed run fast_processing

# Check status (includes workflow and agent info from monitor)
testbed status
testbed status-local

# Stop agents
supervisorctl -c agents.supervisord.conf stop all
```