# Workflow Orchestration Framework

## Overview

The Workflow Orchestration Framework provides a structured approach for defining, executing, and monitoring complex multi-step processes using TOML configuration, Python+SimPy execution logic, ActiveMQ messaging, and database persistence.

**Key Architecture Principles:**
- **Namespace Isolation**: Workflows operate within namespaces, allowing users to distinguish their workflows from others
- **One Workflow, Multiple Configurations**: The same DAQ datataking workflow serves different downstream processing strategies through TOML configuration
- **Configuration Composition**: TOML configs can include base configurations for reusability
- **ActiveMQ Messaging**: Workflows broadcast real messages that agents respond to
- **Database as Truth**: Fully expanded configurations are stored in the database for reproducibility
- **Agent-Driven Processing**: Workflows simulate the DAQ; agents implement downstream processing logic
- **Immutable Definitions**: Once created, workflow definitions are not modified; each execution records its specific git version
## Configuration Format and Composition

### Basic Configuration
```toml
# workflows/stf_processing_default.toml
[workflow]
name = "stf_datataking"
version = "1.0"
description = "STF datataking workflow for standard processing"
includes = ["daq_state_machine.toml"]

[stf_processing]
# Uses daq_state_machine defaults; add stf_processing-specific values here
```

### Configuration Composition via Includes

Configurations support composition through the `includes` directive. Each config file uses a descriptive section name (e.g., `[daq_state_machine]`, `[fast_processing]`) that identifies the component:

```toml
[workflow]
name = "stf_datataking"
version = "1.0"
includes = ["daq_state_machine.toml"]

[fast_processing]
# Values here override daq_state_machine values and add fast_processing-specific config
stf_count = 10
target_worker_count = 30
```

**Loading Order:**
1. Include files load in array order
2. Later includes override earlier ones
3. Main config parameters have final say

**Database Storage:**
The fully expanded configuration (with all includes merged) is saved to the database, ensuring execution records are complete snapshots with no external file dependencies.

## Namespace Configuration

Namespaces allow users to isolate their workflows from others. All workflows and agents operate within a namespace, and messages include the namespace for filtering.

### Testbed Configuration

Before running workflows, configure your namespace in `workflows/testbed.toml`:

```toml
# workflows/testbed.toml
[testbed]
# Namespace for this testbed instance.
# Examples: "epic-fastmon-dev", "collab-dec29", "wenauseic-test1"
namespace = "your-namespace"

# Optional: Override workflow config values
[fast_processing]
stf_count = 5                   # Fewer STF files for quick testing
```

**Namespace Purpose:**
- Distinguish your workflows from other users' workflows
- Filter messages and data in the monitor UI
- Allow multiple users to collaborate in one namespace
- Enable parallel testing without interference

**Important:** The namespace must be set before running workflows. An empty namespace will cause an error.

## Source Traceability

Workflow definitions and executions include source tracking for reproducibility:

**Workflow Definitions** store:
- GitHub org/repo/path/branch at time of creation
- Link to source script in repository

**Workflow Executions** store:
- Git commit hash at time of execution
- Branch/tag information
- Links to specific version of source code

The monitor UI displays these as clickable GitHub links.

## STF Datataking Workflow Implementation

The unified STF datataking workflow simulates the ePIC DAQ state machine, broadcasting ActiveMQ messages that downstream agents respond to. The same workflow code serves both standard STF processing and fast processing; configuration determines the behavior.

### Workflow Code

```python
# workflows/stf_datataking.py
from datetime import datetime

class WorkflowExecutor:
    def __init__(self, config, runner, execution_id):
        self.config = config
        self.runner = runner  # WorkflowRunner instance (BaseAgent)
        self.execution_id = execution_id
        self.stf_sequence = 0
        self.run_id = None

        # Build merged params: daq_state_machine base, with workflow-specific overrides
        self.daq = config.get('daq_state_machine', {})
        for section in ['fast_processing', 'stf_processing']:
            if section in config:
                self.daq = {**self.daq, **config[section]}

    def execute(self, env):
        # Generate run ID for this execution
        from swf_common_lib.api_utils import get_next_run_number
        self.run_id = get_next_run_number(
            self.runner.monitor_url,
            self.runner.api_session,
            self.runner.logger
        )

        # State 1: no_beam / not_ready (Collider not operating)
        yield env.timeout(self.daq['no_beam_not_ready_delay'])

        # State 2: beam / not_ready (Run start imminent)
        yield env.process(self.broadcast_run_imminent(env))
        yield env.timeout(self.daq['broadcast_delay'])
        yield env.timeout(self.daq['beam_not_ready_delay'])

        # State 3: beam / ready (Ready for physics)
        yield env.timeout(self.daq['beam_ready_delay'])

        # Physics periods loop with standby between them
        period = 0
        while self.daq['physics_period_count'] == 0 or period < self.daq['physics_period_count']:
            # Broadcast start or resume message
            if period == 0:
                yield env.process(self.broadcast_run_start(env))
            else:
                yield env.process(self.broadcast_resume_run(env))
            yield env.timeout(self.daq['broadcast_delay'])

            # STF generation during physics
            yield from self.generate_stfs_during_physics(env, self.daq['physics_period_duration'])

            period += 1

            # Standby between physics periods
            if self.daq['physics_period_count'] == 0 or period < self.daq['physics_period_count']:
                yield env.process(self.broadcast_pause_run(env))
                yield env.timeout(self.daq['broadcast_delay'])
                yield env.timeout(self.daq['standby_duration'])

        # State 7: beam / not_ready + broadcast run end
        yield env.process(self.broadcast_run_end(env))
        yield env.timeout(self.daq['broadcast_delay'])
        yield env.timeout(self.daq['beam_not_ready_end_delay'])

    def broadcast_stf_gen(self, env, stf_filename):
        """Broadcast STF generation via ActiveMQ."""
        message = {
            "msg_type": "stf_gen",
            "execution_id": self.execution_id,
            "run_id": self.run_id,
            "filename": stf_filename,
            "sequence": self.stf_sequence,
            "timestamp": datetime.now().isoformat(),
            "simulation_tick": env.now,
            "state": "run",
            "substate": "physics"
        }
        self.runner.send_message('/topic/epictopic', message)
        yield env.timeout(0.1)
```

**Key Features:**
- **ActiveMQ Messaging**: Broadcasts workflow events via `runner.send_message()`
- **Parameter Distribution**: Messages include `execution_id` for agents to query WorkflowExecution
- **8-State DAQ Machine**: Matches ePIC DAQ simulator state transitions
- **SimPy Timing**: Simulates timing for state transitions and STF generation

## Fast Processing Configuration

Fast processing uses the **same stf_datataking.py workflow** as standard STF processing, but with different configuration parameters. The distinction is in downstream agent behavior, not workflow code.

### Configuration

```toml
# workflows/fast_processing_default.toml
[workflow]
name = "stf_datataking"
version = "1.0"
description = "STF datataking workflow for fast processing"
includes = ["daq_state_machine.toml"]

[fast_processing]
# Count-based STF generation
stf_count = 10                  # Generate exactly 10 STF files
physics_period_count = 1        # Single physics period

# Fast processing parameters
target_worker_count = 30        # Target number of workers
stf_sampling_rate = 0.05        # FastMon sampling fraction (5%)
slices_per_sample = 15          # TF slices per STF sample
slice_processing_time = 30      # Processing time per slice (seconds)
worker_rampup_time = 300        # Worker startup time (5 min)
worker_rampdown_time = 60       # Graceful shutdown time (1 min)
```

### Agent-Driven Processing

The workflow broadcasts the same DAQ messages (`run_imminent`, `start_run`, `stf_gen`, `end_run`). The difference is in how agents respond:

**Standard STF Processing:**
- `processing_agent` processes complete STF files
- Creates PanDA tasks for full reconstruction

**Fast Processing:**
- `fastmon_agent` samples STF files, creates TF samples
- `fast_processing_agent` creates slices from TF samples
- Workers process slices for near real-time shifter monitoring

### Performance Metrics

With default parameters:
- STF rate: 0.5 Hz → 1 STF every 2 seconds
- Sampling: 5% of STFs → 1 TF sample every 40 seconds (0.025 samples/sec)
- Slices created: 15 slices/sample × 0.025 samples/sec ≈ 0.4 slices/sec
- Processing capacity: each worker completes 1 slice per 30 sec, so 30 workers ÷ 30 sec/slice = 1 slice/sec total
- Headroom: 1 slice/sec capacity against ~0.4 slices/sec demand, so 30 workers sustain real-time processing at 0.5 Hz

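These numbers can be checked directly from the fast_processing defaults above (0.5 Hz is the stated STF rate; the other values mirror the TOML config):

```python
# Back-of-envelope capacity check using the fast_processing defaults.
def capacity_check(stf_rate_hz=0.5, stf_sampling_rate=0.05,
                   slices_per_sample=15, slice_processing_time=30,
                   target_worker_count=30):
    sample_rate = stf_rate_hz * stf_sampling_rate                 # TF samples/sec
    slice_demand = sample_rate * slices_per_sample                # slices/sec to process
    slice_capacity = target_worker_count / slice_processing_time  # slices/sec available
    return slice_demand, slice_capacity

demand, capacity = capacity_check()
print(f"demand {demand:.3f} slices/s, capacity {capacity:.1f} slices/s")
# demand 0.375 slices/s, capacity 1.0 slices/s
```

Capacity exceeds demand by more than 2×, which is the headroom claimed above.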
### Database Integration

During execution, the workflow populates:
- `RunState`: phase, worker counts, slice statistics
- `TFSlice`: individual slice tracking with status
- `Worker`: worker lifecycle and performance
- `SystemStateEvent`: complete event history for replay

## Workflow Management Framework

### Database Models
```python
from django.db import models

class WorkflowDefinition(models.Model):
    workflow_name = models.CharField(max_length=200)
    version = models.CharField(max_length=50)
    workflow_type = models.CharField(max_length=100)  # e.g. "fast_processing", "stf_processing", "custom"
    definition = models.TextField(max_length=5000)  # Python code content
    parameter_values = models.JSONField()  # Default parameter values and schema
    created_by = models.CharField(max_length=100)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        unique_together = ('workflow_name', 'version')

class WorkflowExecution(models.Model):
    execution_id = models.CharField(primary_key=True, max_length=100)  # e.g. "stf_processing-wenauseic-0001"
    workflow_definition = models.ForeignKey(WorkflowDefinition, on_delete=models.CASCADE)
    parameter_values = models.JSONField()  # Actual values used for this execution
    performance_metrics = models.JSONField(null=True)
    execution_environment = models.JSONField(null=True)
    start_time = models.DateTimeField()
    end_time = models.DateTimeField(null=True)
    status = models.CharField(max_length=20)  # running, completed, failed
    executed_by = models.CharField(max_length=100)
    error_message = models.TextField(null=True)
    stack_trace = models.TextField(null=True)
```

### Running Workflows

**One-time CLI mode** - run a single workflow and exit:

```bash
# Run workflow with count-based completion (preferred)
python workflows/workflow_runner.py --run-once stf_datataking --workflow-config fast_processing_default --stf-count 10

# Run in REAL-TIME mode (for testing with downstream agents)
python workflows/workflow_runner.py --run-once stf_datataking --workflow-config fast_processing_default --stf-count 5 --realtime

# Run with duration limit
python workflows/workflow_runner.py --run-once stf_datataking --workflow-config fast_processing_default --duration 120

# Override specific parameters
python workflows/workflow_runner.py --run-once stf_datataking \
    --workflow-config fast_processing_default \
    --stf-count 20 \
    --physics-period-count 2 \
    --stf-interval 1.5
```

**Persistent agent mode** - run as a daemon, receive commands via ActiveMQ:

```bash
# Start persistent agent (listens on workflow_control queue)
python workflows/workflow_runner.py

# Send commands via CLI utility
python workflows/send_workflow_command.py run --workflow stf_datataking --stf-count 5 --no-realtime
python workflows/send_workflow_command.py stop --execution-id <exec_id>
python workflows/send_workflow_command.py status

# Or via MCP tools: start_workflow, stop_workflow, list_workflow_executions
```

**Command Line Arguments:**
- `workflow_name` - Name of workflow Python file (e.g., `stf_datataking`)
- `--workflow-config` - TOML configuration file name (with or without .toml extension)
- `--testbed-config` - Path to testbed.toml (default: `workflows/testbed.toml`)
- `--stf-count` - Generate exactly N STF files then complete (preferred over duration)
- `--duration` - Maximum simulation duration in seconds (0 = run until workflow completes)
- `--realtime` - Run in real-time mode (see Simulation Modes below)
- `--physics-period-count` - Override physics period count
- `--physics-period-duration` - Override physics period duration (seconds)
- `--stf-interval` - Override STF generation interval (seconds)

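As a sketch, the flag surface above maps onto an argparse parser roughly like this (illustrative only; defaults, help text, and internals of the real `workflow_runner.py` may differ):

```python
import argparse

def build_parser() -> argparse.ArgumentParser:
    """Approximate CLI surface of workflow_runner.py (sketch)."""
    p = argparse.ArgumentParser(prog="workflow_runner.py")
    # --run-once takes the workflow name, as in the examples above
    p.add_argument("--run-once", dest="workflow_name", metavar="WORKFLOW")
    p.add_argument("--workflow-config", help="TOML config name, .toml optional")
    p.add_argument("--testbed-config", default="workflows/testbed.toml")
    p.add_argument("--stf-count", type=int, help="generate exactly N STFs")
    p.add_argument("--duration", type=float, default=0,
                   help="max simulation seconds (0 = run to completion)")
    p.add_argument("--realtime", action="store_true")
    p.add_argument("--physics-period-count", type=int)
    p.add_argument("--physics-period-duration", type=float)
    p.add_argument("--stf-interval", type=float)
    return p
```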
### Simulation Modes

The workflow simulator supports two execution modes:

**Fast Simulation Mode (default)**
- SimPy discrete-event simulation runs as fast as possible
- A 120-second workflow completes in ~2 seconds of wall-clock time
- Useful for testing workflow logic and database integration
- Messages are broadcast instantly without timing constraints

**Real-Time Mode (`--realtime`)**
- Uses SimPy's `RealtimeEnvironment` to tie simulation time to wall-clock time
- A 120-second workflow takes ~120 seconds to complete
- Essential for testing with downstream agents (e.g., `fast_processing_agent`)
- Messages are paced realistically, allowing agents to process them in sequence
- Use `strict=False` to allow the simulation to catch up if it falls behind

**What Happens:**
1. WorkflowRunner (a BaseAgent) connects to ActiveMQ and registers as an agent
2. Configuration files are loaded with includes merged
3. The fully expanded config is saved to WorkflowDefinition and WorkflowExecution
4. The workflow broadcasts ActiveMQ messages during execution
5. Downstream agents receive messages and query WorkflowExecution for parameters

### Execution ID Generation

Workflow executions use human-readable IDs following the pattern:
```
workflow-username-NNNN
```

For example: `stf_processing-wenauseic-0001`

The sequence numbers are generated atomically via the monitor API to ensure uniqueness.

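The pattern can be expressed as a one-line formatter (a hypothetical helper; the real sequence number comes from the monitor API's atomic counter, not local state):

```python
def format_execution_id(workflow: str, username: str, seq: int) -> str:
    """Build a human-readable execution ID: workflow-username-NNNN.

    `seq` is assumed to be an atomically allocated sequence number
    obtained from the monitor API.
    """
    return f"{workflow}-{username}-{seq:04d}"
```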
## Directory Structure
```
workflows/
├── testbed.toml                         # Testbed instance config (namespace, overrides)
├── stf_datataking.py                    # Unified DAQ datataking workflow
├── daq_state_machine.toml               # Base DAQ parameters (included by others)
├── stf_processing_default.toml          # STF processing configuration
├── fast_processing_default.toml         # Fast processing configuration
├── workflow_runner.py                   # Workflow execution agent (BaseAgent, persistent or CLI)
└── send_workflow_command.py             # CLI utility to send commands to persistent agent
```

## Web Interface

The monitor provides web-based workflow management:

- **Workflow Definitions**: View and manage workflow templates at `/workflows/`
- **Workflow Executions**: Monitor execution instances and their status
- **Execution Details**: View parameters, duration, and performance metrics

All workflow code and configuration data is displayed with syntax highlighting for improved readability.

## Integration with Agent Infrastructure

Workflows integrate seamlessly with the agent-based messaging system:

**WorkflowRunner as Agent:**
- Inherits from `BaseAgent`
- Registers using the workflow name as agent type (e.g., `STF_Datataking`)
- Sends messages to the `epictopic` ActiveMQ topic
- Connects to the monitor API for database operations

**Agent Communication:**
- Workflows broadcast DAQ state transition messages
- Messages include `execution_id` for parameter lookup
- Agents query `/api/workflow-executions/{execution_id}/` to get full parameters
- Same messages, different agent responses = different workflows

**Example Message Flow:**
1. WorkflowRunner broadcasts `run_imminent` with `execution_id`
2. `fast_processing_agent` receives the message, queries WorkflowExecution
3. Agent extracts `target_worker_count`, `slices_per_sample` from the parameters
4. Agent initiates worker preparation based on configuration
5. Workflow broadcasts `stf_gen` messages
6. Agent creates TF slices and distributes them to workers

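From the agent's side, steps 2-3 of the flow above can be sketched as a message handler. This is illustrative only, not the real `fast_processing_agent`; the `monitor_url`/`api_session` attributes are assumed to come from `BaseAgent`, and the endpoint follows the `/api/workflow-executions/{execution_id}/` pattern described above:

```python
import json

class FastProcessingAgentSketch:
    """Illustrative handler for run_imminent / stf_gen messages."""

    def __init__(self, monitor_url, api_session):
        self.monitor_url = monitor_url
        self.api_session = api_session  # requests.Session-like object
        self.params = {}

    def on_message(self, frame_body: str):
        message = json.loads(frame_body)
        if message.get("msg_type") == "run_imminent":
            # Look up the full parameter set for this execution
            execution_id = message["execution_id"]
            resp = self.api_session.get(
                f"{self.monitor_url}/api/workflow-executions/{execution_id}/"
            )
            self.params = resp.json().get("parameter_values", {})
        elif message.get("msg_type") == "stf_gen":
            self.handle_stf(message["filename"], message["sequence"])

    def handle_stf(self, filename, sequence):
        # Sample STFs and create TF slices per self.params (omitted here)
        pass
```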
This architecture decouples workflow orchestration (DAQ simulation) from processing logic (agent behavior), enabling multiple downstream processing strategies with the same workflow code.

## Supervisord Agent Orchestration

### Problem

- Each agent requires its own terminal/process
- No way to define "this workflow needs these agents"
- No single command to start/stop an agent group
- Multiple testbed.toml files (workflows/, example_agents/) where there should be only one

### Architecture Decision

Use **supervisord** for agent management (already in the project), NOT subprocesses.

**Agent behavior:**
- Agents are **persistent** - they start, wait for work, process it, close out, go back to waiting
- Agents should not exit after processing - they're long-running services
- This is the production architecture we're building toward

### Solution Design

**1. Single testbed.toml in workflows/**

All agents use `workflows/testbed.toml`. The `example_agents/testbed.toml` has been deleted.

```toml
[testbed]
namespace = "torre1"

[workflow]
name = "stf_datataking"
config = "fast_processing_default"
realtime = true

[agents.processing]
enabled = true
script = "example_agents/example_processing_agent.py"

[agents.data]
enabled = false
script = "example_agents/example_data_agent.py"

[agents.fastmon]
enabled = false
script = "example_agents/example_fastmon_agent.py"

[agents.fast_processing]
enabled = false
script = "example_agents/fast_processing_agent.py"

[parameters]
# Optional workflow parameter overrides
# stf_count = 5
# physics_period_count = 2
```

**2. Workflow Orchestrator**

CLI command: `testbed run <name>` - starts agents, triggers workflow from `<name>.toml`, returns immediately.

```
testbed run fast_processing
    ├── Load workflows/fast_processing.toml (or fast_processing_default.toml)
    ├── Start enabled agents via supervisorctl
    │   └── Verify PID exists (not health check, just process alive)
    ├── Send run_workflow command to WorkflowRunner
    │   └── Via ActiveMQ workflow_control queue
    └── Return immediately (async operations running)
```

**Key design points:**
- **Non-blocking**: Launches async operations, returns immediately. No Ctrl+C required.
- **No health check**: Just verify the agent PID exists
- **Status via separate commands**: `testbed status`, monitor page, MCP tools

**3. supervisord Integration**

Agents are managed via a separate supervisord config (`agents.supervisord.conf`):
- Uses a separate socket: `/tmp/swf-agents-supervisor.sock`
- `autostart=false` - agents start on demand via `testbed run`
- Programs: workflow-runner, example-data-agent, example-processing-agent, example-fastmon-agent, fast-processing-agent

**4. Status Reporting**

`testbed status` and `testbed status-local` commands report:
- Running workflow executions (from the monitor API)
- Active agents with operational state and type
- Standard service status (Docker or system services, supervisord)

### Files

| File | Description |
|------|-------------|
| `workflows/testbed.toml` | Testbed instance config with [agents] section |
| `agents.supervisord.conf` | Agent-specific supervisord configuration |
| `workflows/orchestrator.py` | Orchestration module - start agents, trigger workflow |
| `src/swf_testbed_cli/main.py` | CLI with `run` and enhanced `status` commands |

### Usage

```bash
# Run workflow with default testbed.toml settings
testbed run

# Run with specific config
testbed run fast_processing

# Check status (includes workflow and agent info from monitor)
testbed status
testbed status-local

# Stop agents
supervisorctl -c agents.supervisord.conf stop all
```