# Fast Processing Workflow

This document describes the fast processing workflow for near real-time detector data processing via PanDA/iDDS workers.

## Overview

The fast processing workflow enables rapid processing of detector data by:
1. Simulating DAQ data taking (STF generation)
2. Sampling Time Frames (TF) from Super Time Frames (STF)
3. Creating TF slices for parallel processing
4. Distributing slices to PanDA workers running reconstruction payloads

### Pipeline Overview

**Worker Payload:** Each PanDA worker receives a TF slice as input and runs a reconstruction payload. Currently the payload is a placeholder; in production it will be EICrecon for ePIC detector reconstruction.

## Message Flow

### Agent Pipeline

```mermaid
flowchart LR
    DS["DAQ Simulator"]
    DA["Data Agent"]
    FM["FastMon Agent"]
    FP["Fast Processing Agent"]
    W["PanDA Workers<br/>(EICrecon)"]

    DS -->|"stf_gen<br/>STF"| DA
    DA -->|"stf_ready"| FM
    FM -->|"tf_file_registered<br/>STF sample"| FP
    FP -->|"TF slices"| W
```

### Complete Workflow Sequence

```mermaid
sequenceDiagram
    participant DS as DAQ Simulator
    participant DA as Data Agent
    participant FM as FastMon Agent
    participant FP as Fast Processing Agent
    participant PQ as PanDA Queue
    participant MON as Monitor DB

    Note over DS,MON: === Run Initialization ===

    DS->>DA: run_imminent (execution_id, run_id)
    DS->>FM: run_imminent
    DS->>FP: run_imminent
    DA->>MON: Create dataset for run
    FP->>MON: Initialize RunState

    DS->>DA: start_run (run_id)
    DS->>FM: start_run
    DS->>FP: start_run
    FP->>MON: Update RunState (phase=physics)

    Note over DS,MON: === STF Processing (repeat for each STF) ===

    DS->>DA: stf_gen (filename, sequence)
    DS->>FM: stf_gen
    DS->>FP: stf_gen

    DA->>MON: Register STF file
    DA->>DA: Process STF metadata
    DA->>FM: stf_ready (filename, run_id)
    DA->>FP: stf_ready

    FM->>FM: Sample TFs from STF
    FM->>MON: Register TF samples
    FM->>FP: tf_file_registered (tf_filename, stf_filename)

    FP->>FP: Create TF slices
    FP->>MON: Register TF slices

    loop For each slice
        FP->>PQ: slice message
    end

    FP->>MON: Update RunState (slices_created)

    Note over DS,MON: === Run Completion ===

    DS->>DA: end_run (total_stf_files)
    DS->>FM: end_run
    DS->>FP: end_run

    DA->>MON: Finalize dataset
    FP->>MON: Update RunState (phase=completed)
```
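The fan-out in the sequence diagram can be read as a routing table: each message type has one sender and a fixed set of receivers. The sketch below transcribes the diagram into a plain dict so an agent author can check which messages a given agent must handle. `ROUTES`, `recipients()`, and `messages_for()` are hypothetical names, not testbed code, and `pause_run`/`resume_run` are omitted because the diagram does not draw them.

```python
from typing import Dict, FrozenSet, List, Tuple

# Hypothetical routing table transcribed from the sequence diagram:
# message type -> (sender, receivers).
_ALL_AGENTS = frozenset({"data", "fastmon", "fast_processing"})

ROUTES: Dict[str, Tuple[str, FrozenSet[str]]] = {
    "run_imminent": ("daq_simulator", _ALL_AGENTS),
    "start_run": ("daq_simulator", _ALL_AGENTS),
    "stf_gen": ("daq_simulator", _ALL_AGENTS),
    "stf_ready": ("data", frozenset({"fastmon", "fast_processing"})),
    "tf_file_registered": ("fastmon", frozenset({"fast_processing"})),
    "slice": ("fast_processing", frozenset({"panda_queue"})),
    "end_run": ("daq_simulator", _ALL_AGENTS),
}


def recipients(msg_type: str) -> FrozenSet[str]:
    """Return the receivers of msg_type; raises KeyError for unknown types."""
    return ROUTES[msg_type][1]


def messages_for(agent: str) -> List[str]:
    """List the message types an agent receives, in table order."""
    return [msg for msg, (_, receivers) in ROUTES.items() if agent in receivers]
```

For example, `messages_for("fastmon")` lists every broadcast plus `stf_ready`, but not `tf_file_registered`, which FastMon only sends.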

## Data Products

### Hierarchy

```
Run (run_id: 101993)
├── STF Files (Super Time Frames)
│   ├── swf.101993.000001.stf
│   ├── swf.101993.000002.stf
│   └── swf.101993.000003.stf
│
├── TF Samples (Time Frame samples from FastMon)
│   ├── swf.101993.000001_tf_001.tf
│   ├── swf.101993.000001_tf_002.tf
│   └── ...
│
└── TF Slices (for PanDA workers)
    ├── swf.101993.000001_slice_000.tf
    ├── swf.101993.000001_slice_001.tf
    └── ... (15 slices per STF sample)
```
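The file names in the tree follow a regular pattern. The helpers below build and parse the three forms. They assume what the examples suggest: STF sequence numbers are zero-padded to six digits, TF sample and slice indices to three, TF samples count from 1, and slices count from 0. They are hypothetical helpers, not the testbed's own functions.

```python
import re

# Assumed naming rules inferred from the example tree (not confirmed elsewhere):
# 6-digit STF sequence, 3-digit TF/slice index, TF samples from 1, slices from 0.


def stf_name(run_id: int, seq: int) -> str:
    return f"swf.{run_id}.{seq:06d}.stf"


def tf_sample_name(run_id: int, seq: int, tf_index: int) -> str:
    return f"swf.{run_id}.{seq:06d}_tf_{tf_index:03d}.tf"


def slice_name(run_id: int, seq: int, slice_index: int) -> str:
    return f"swf.{run_id}.{seq:06d}_slice_{slice_index:03d}.tf"


# Matches either "<run>.<seq>.stf" or "<run>.<seq>_<tf|slice>_<idx>.tf".
_NAME_RE = re.compile(
    r"^swf\.(?P<run>\d+)\.(?P<seq>\d+)"
    r"(?:\.stf|_(?P<kind>tf|slice)_(?P<idx>\d+)\.tf)$"
)


def parse_name(name: str) -> dict:
    """Split a data-product file name into run, sequence, kind, and index."""
    m = _NAME_RE.match(name)
    if m is None:
        raise ValueError(f"unrecognized file name: {name!r}")
    return {
        "run_id": int(m["run"]),
        "sequence": int(m["seq"]),
        "kind": m["kind"] or "stf",
        "index": int(m["idx"]) if m["idx"] is not None else None,
    }
```

Parsing is handy when a single listing mixes STF files, TF samples, and slices and you need to group them by STF sequence.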

### Data Product Details

| Product | Created By | Stored In | Purpose |
|---------|-----------|-----------|---------|
| **STF File** | DAQ Simulator | STFFile table | Raw detector data unit |
| **TF Sample** | FastMon Agent | FastMonFile table | Sampled subset for fast monitoring |
| **TF Slice** | Fast Processing Agent | TFSlice table | Processing unit for PanDA workers |

## Message Types

### Broadcast Messages (DAQ Simulator → All Agents)

| Message | Payload | Purpose |
|---------|---------|---------|
| `run_imminent` | `execution_id`, `run_id`, `workflow_params` | Prepare for new run |
| `start_run` | `run_id`, `state=physics` | Begin data taking |
| `stf_gen` | `filename`, `sequence`, `run_id` | New STF available |
| `pause_run` | `run_id` | Temporary halt |
| `resume_run` | `run_id` | Resume from pause |
| `end_run` | `run_id`, `total_stf_files` | Run complete |

### Agent-to-Agent Messages

| Message | From | To | Payload |
|---------|------|----|---------|
| `stf_ready` | Data Agent | FastMon, Fast Processing | `filename`, `checksum`, `size_bytes` |
| `tf_file_registered` | FastMon Agent | Fast Processing Agent | `tf_filename`, `stf_filename`, `tf_count` |
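A receiving agent can check an incoming payload against these tables before acting on it. The sketch below copies the field sets from the two tables into a hypothetical `REQUIRED_FIELDS` dict. For `stf_ready` it follows the table rather than the sequence diagram, which labels that message `(filename, run_id)`. Whether the real agents reject incomplete messages is not stated here, so treat this as an assumption.

```python
# Hypothetical required-field sets copied from the message tables above.
REQUIRED_FIELDS = {
    # Broadcasts from the DAQ Simulator
    "run_imminent": {"execution_id", "run_id", "workflow_params"},
    "start_run": {"run_id", "state"},
    "stf_gen": {"filename", "sequence", "run_id"},
    "pause_run": {"run_id"},
    "resume_run": {"run_id"},
    "end_run": {"run_id", "total_stf_files"},
    # Agent-to-agent messages
    "stf_ready": {"filename", "checksum", "size_bytes"},
    "tf_file_registered": {"tf_filename", "stf_filename", "tf_count"},
}


def missing_fields(msg_type: str, payload: dict) -> set:
    """Return the required fields absent from payload (empty set if complete)."""
    if msg_type not in REQUIRED_FIELDS:
        raise ValueError(f"unknown message type: {msg_type!r}")
    return REQUIRED_FIELDS[msg_type] - set(payload)
```

Returning the missing names, rather than a boolean, lets the agent log exactly what was wrong with a malformed message.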

### Queue Messages (Fast Processing → PanDA)

| Message | Destination | Payload |
|---------|-------------|---------|
| `slice` | `/queue/panda.transformer.slices` | `slice_id`, `tf_filename`, `start`, `end`, `tf_count` |
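The table lists each `slice` message's fields but does not say how a TF sample's frames are split across slices. The sketch below assumes an even split of a sample's TFs into `slices_per_sample` contiguous ranges, with `start` inclusive and `end` exclusive, and with `tf_count` taken as the number of TFs in that slice. The `slice_id` format and the `make_slices` helper are hypothetical, so the agent's real boundaries and IDs may differ.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SliceMessage:
    slice_id: str     # hypothetical format: "<tf_filename>:<3-digit index>"
    tf_filename: str
    start: int        # first TF index in the slice (inclusive, assumed)
    end: int          # one past the last TF index (exclusive, assumed)
    tf_count: int     # TFs in this slice (assumed meaning)


def make_slices(tf_filename: str, total_tfs: int,
                slices_per_sample: int = 15) -> List[SliceMessage]:
    """Split total_tfs frames into contiguous, near-equal ranges."""
    if total_tfs < 0 or slices_per_sample <= 0:
        raise ValueError("need total_tfs >= 0 and slices_per_sample > 0")
    base, extra = divmod(total_tfs, slices_per_sample)
    messages: List[SliceMessage] = []
    start = 0
    for i in range(slices_per_sample):
        size = base + (1 if i < extra else 0)  # first `extra` slices take one more TF
        if size == 0:
            continue  # fewer TFs than slices: skip empty ranges
        messages.append(SliceMessage(f"{tf_filename}:{i:03d}", tf_filename,
                                     start, start + size, size))
        start += size
    return messages
```

`dataclasses.asdict(msg)` turns each `SliceMessage` into a plain dict that can be serialized as the queue payload.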

## Configuration

### fast_processing_default.toml

```toml
[testbed]
namespace = "torre2"

[agents.data]
enabled = true

[agents.fastmon]
enabled = true

[agents.fast_processing]
enabled = true

[fast_processing]
stf_count = 10               # STF files to generate
physics_period_count = 1     # Physics periods per run
target_worker_count = 30     # Target PanDA workers
stf_sampling_rate = 1.0      # Fraction of STFs to sample (1.0 = 100%)
slices_per_sample = 15       # TF slices per STF sample
slice_processing_time = 30   # Seconds per slice (for planning)
```

### Workflow Parameters Flow

```mermaid
flowchart TB
    subgraph Config["Configuration"]
        TOML["fast_processing_default.toml"]
    end

    subgraph Execution["Workflow Execution"]
        WE["WorkflowExecution<br/>(parameter_values)"]
    end

    subgraph Agents["Agents"]
        DS["DAQ Simulator<br/>(reads config directly)"]
        FP["Fast Processing Agent<br/>(fetches from API)"]
    end

    TOML -->|"loaded at start"| DS
    DS -->|"creates execution"| WE
    WE -->|"GET /workflow-executions/{id}/"| FP
```
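On the Fast Processing side, the diagram shows parameters arriving from a `GET /workflow-executions/{id}/` call. The sketch below uses only the standard library and rests on several assumptions. The API base URL is deployment-specific, and the response is taken to be JSON with a `parameter_values` object that overrides local defaults. `DEFAULTS`, `execution_url`, `effective_params`, and `fetch_params` are all hypothetical names, and the sketch does no authentication or retries.

```python
import json
import urllib.request
from typing import Callable

# Hypothetical local fallbacks; in the testbed the defaults live in the TOML file.
DEFAULTS = {"slices_per_sample": 15, "stf_sampling_rate": 1.0}


def execution_url(base_url: str, execution_id: str) -> str:
    """Build the endpoint shown in the diagram; base_url is deployment-specific."""
    return f"{base_url.rstrip('/')}/workflow-executions/{execution_id}/"


def effective_params(execution: dict, defaults: dict = DEFAULTS) -> dict:
    """Overlay the execution's parameter_values (assumed JSON key) on defaults."""
    return {**defaults, **(execution.get("parameter_values") or {})}


def fetch_params(base_url: str, execution_id: str,
                 opener: Callable = urllib.request.urlopen) -> dict:
    """GET the execution record and return the merged parameters."""
    with opener(execution_url(base_url, execution_id)) as resp:
        return effective_params(json.load(resp))
```

Passing `opener` as a parameter lets tests substitute a fake response instead of hitting the monitor.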

## State Transitions

### RunState Lifecycle

```mermaid
stateDiagram-v2
    [*] --> imminent: run_imminent received

    imminent --> physics: start_run received
    physics --> standby: pause_run received
    standby --> physics: resume_run received
    physics --> completed: end_run received

    completed --> [*]

    note right of physics
        Active data taking
        STF files generated
        TF slices created
    end note

    note right of standby
        Paused state
        No new STFs
        Workers idle
    end note
```
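The RunState diagram maps directly onto a transition table. The sketch below encodes only the edges drawn above, with `None` standing for the `[*]` start node. The diagram has no `standby → completed` edge, so this sketch rejects `end_run` during a pause; whether the agent does the same is not stated. `TRANSITIONS`, `next_phase`, and `replay` are hypothetical names.

```python
from typing import Iterable, Optional

# (current phase, message) -> next phase, transcribed from the diagram.
# None represents "no RunState yet" (the [*] start node).
TRANSITIONS = {
    (None, "run_imminent"): "imminent",
    ("imminent", "start_run"): "physics",
    ("physics", "pause_run"): "standby",
    ("standby", "resume_run"): "physics",
    ("physics", "end_run"): "completed",
}


class InvalidTransition(Exception):
    """Raised when a message arrives in a phase the diagram does not allow."""


def next_phase(phase: Optional[str], message: str) -> str:
    try:
        return TRANSITIONS[(phase, message)]
    except KeyError:
        raise InvalidTransition(f"{message!r} not allowed in phase {phase!r}") from None


def replay(messages: Iterable[str]) -> Optional[str]:
    """Apply messages in order, starting from no RunState."""
    phase: Optional[str] = None
    for message in messages:
        phase = next_phase(phase, message)
    return phase
```

Replaying the messages logged for an execution through `replay()` is a cheap way to spot out-of-order broadcasts.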

### TF Slice Lifecycle

```mermaid
stateDiagram-v2
    [*] --> queued: Created by Fast Processing Agent

    queued --> processing: Worker picks up slice
    processing --> completed: Processing successful
    processing --> failed: Processing error

    failed --> queued: Retry (max 3)

    completed --> [*]
    failed --> [*]: Max retries exceeded
```
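The retry loop can be modeled as a small state object. The diagram says "max 3" but does not say whether that counts retries or total attempts. This sketch assumes three retries after the first attempt, so a slice can be attempted four times. It also collapses the transient `failed → queued` hop into a single step. `SliceState` and `MAX_RETRIES` are hypothetical names.

```python
from dataclasses import dataclass

MAX_RETRIES = 3  # "Retry (max 3)" in the diagram; read here as 3 retries


@dataclass
class SliceState:
    status: str = "queued"
    retries: int = 0

    def pick_up(self) -> None:
        """queued --> processing: a worker takes the slice."""
        if self.status != "queued":
            raise ValueError(f"cannot pick up a slice in state {self.status!r}")
        self.status = "processing"

    def finish(self, ok: bool) -> None:
        """Record the outcome of one processing attempt."""
        if self.status != "processing":
            raise ValueError(f"cannot finish a slice in state {self.status!r}")
        if ok:
            self.status = "completed"
        elif self.retries < MAX_RETRIES:
            # failed --> queued: requeue and count the retry
            self.retries += 1
            self.status = "queued"
        else:
            # failed --> [*]: retry budget exhausted, slice stays failed
            self.status = "failed"
```

Under this reading, a slice that fails on every attempt ends in `failed` after four attempts, with `retries == 3`.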

## Monitoring

### Key Metrics

| Metric | Source | Purpose |
|--------|--------|---------|
| `stf_count` | WorkflowExecution | Total STFs in run |
| `tf_files_received` | Fast Processing Agent | TF samples received from FastMon (`tf_file_registered` messages) |
| `slices_created` | RunState | Total slices generated |
| `slices_queued` | RunState | Slices waiting for workers |
| `slices_completed` | RunState | Slices successfully processed by workers |
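One way to cross-check the RunState slice counters is to recompute them from per-slice statuses, such as those held in the TFSlice table. The sketch below assumes each slice record carries a status string that matches the TF slice lifecycle states. `slice_metrics` is a hypothetical helper; the agent itself may maintain these counters incrementally instead.

```python
from collections import Counter
from typing import Iterable


def slice_metrics(statuses: Iterable[str]) -> dict:
    """Recompute RunState-style slice counters from per-slice statuses."""
    counts = Counter(statuses)
    return {
        "slices_created": sum(counts.values()),  # every slice ever created
        "slices_queued": counts["queued"],       # waiting for a worker
        "slices_completed": counts["completed"], # finished successfully
    }
```

A mismatch between these recomputed values and the stored RunState counters would point to a lost or double-counted update.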

### Monitoring Queries (MCP)

```python
# Workflow status
get_workflow_monitor(execution_id='stf_datataking-wenauseic-0049')

# Messages during execution
list_messages(execution_id='stf_datataking-wenauseic-0049')

# STF files for a run
list_stf_files(run_number=101993)

# TF slices for a run
list_tf_slices(run_number=101993)

# Agent logs
list_logs(execution_id='stf_datataking-wenauseic-0049')
```

## Example Execution

From a recent test run (execution `stf_datataking-wenauseic-0049`):

```
Run 101993 Summary:
├── Duration: ~14 seconds
├── STF files: 3 (all processed)
├── Messages:
│   ├── run_imminent → all agents prepared
│   ├── start_run → physics phase began
│   ├── stf_gen (x3) → STF files generated
│   ├── stf_ready (x3) → Data Agent processed
│   └── end_run → Run completed
└── Agents involved:
    ├── daq_simulator-agent-wenauseic-484
    ├── data-agent-wenauseic-481
    ├── fastmon-agent-wenauseic-482
    └── fast_processing-agent-wenauseic-483
```

## See Also

- [Agent Management](agent-management.md) - Starting and stopping agents
- [Architecture Overview](architecture.md) - System design
- [Operations Guide](operations.md) - Day-to-day operations