# Fast Processing Workflow

This document describes the fast processing workflow for near real-time detector data processing via PanDA/iDDS workers.

## Overview

The fast processing workflow enables rapid processing of detector data by:
1. Simulating DAQ data taking (STF generation)
2. Sampling Time Frames (TF) from Super Time Frames (STF)
3. Creating TF slices for parallel processing
4. Distributing slices to PanDA workers running reconstruction payloads

### Pipeline Overview

![Fast Processing Pipeline](images/fast-processing-pipeline-v5.svg)

**Worker Payload:** Each PanDA worker receives a TF slice as input and runs a reconstruction payload. Currently the payload is a placeholder; in production it will be EICrecon for ePIC detector reconstruction.

## Message Flow

### Agent Pipeline

```mermaid
flowchart LR
    DS["DAQ Simulator"]
    DA["Data Agent"]
    FM["FastMon Agent"]
    FP["Fast Processing Agent"]
    W["PanDA Workers<br/>(EICrecon)"]

    DS -->|"stf_gen<br/>STF"| DA
    DA -->|"stf_ready"| FM
    FM -->|"tf_file_registered<br/>STF sample"| FP
    FP -->|"TF slices"| W
```

### Complete Workflow Sequence

```mermaid
sequenceDiagram
    participant DS as DAQ Simulator
    participant DA as Data Agent
    participant FM as FastMon Agent
    participant FP as Fast Processing Agent
    participant PQ as PanDA Queue
    participant MON as Monitor DB

    Note over DS,MON: === Run Initialization ===

    DS->>DA: run_imminent (execution_id, run_id)
    DS->>FM: run_imminent
    DS->>FP: run_imminent
    DA->>MON: Create dataset for run
    FP->>MON: Initialize RunState

    DS->>DA: start_run (run_id)
    DS->>FM: start_run
    DS->>FP: start_run
    FP->>MON: Update RunState (phase=physics)

    Note over DS,MON: === STF Processing (repeat for each STF) ===

    DS->>DA: stf_gen (filename, sequence)
    DS->>FM: stf_gen
    DS->>FP: stf_gen

    DA->>MON: Register STF file
    DA->>DA: Process STF metadata
    DA->>FM: stf_ready (filename, run_id)
    DA->>FP: stf_ready

    FM->>FM: Sample TFs from STF
    FM->>MON: Register TF samples
    FM->>FP: tf_file_registered (tf_filename, stf_filename)

    FP->>FP: Create TF slices
    FP->>MON: Register TF slices

    loop For each slice
        FP->>PQ: slice message
    end

    FP->>MON: Update RunState (slices_created)

    Note over DS,MON: === Run Completion ===

    DS->>DA: end_run (total_stf_files)
    DS->>FM: end_run
    DS->>FP: end_run

    DA->>MON: Finalize dataset
    FP->>MON: Update RunState (phase=completed)
```

## Data Products

### Hierarchy

```
Run (run_id: 101993)
├── STF Files (Super Time Frames)
│   ├── swf.101993.000001.stf
│   ├── swf.101993.000002.stf
│   └── swf.101993.000003.stf
│
├── TF Samples (Time Frame samples from FastMon)
│   ├── swf.101993.000001_tf_001.tf
│   ├── swf.101993.000001_tf_002.tf
│   └── ...
│
└── TF Slices (for PanDA workers)
    ├── swf.101993.000001_slice_000.tf
    ├── swf.101993.000001_slice_001.tf
    └── ... (15 slices per STF sample)
```

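The naming pattern in the tree above can be sketched as three small helpers. This is an illustration inferred only from the example filenames; the function names are hypothetical, and the zero-padding widths (six digits for the STF sequence, three for sample and slice indices) are assumptions read off the listing rather than a documented convention.

```python
def stf_name(run_id: int, sequence: int) -> str:
    """STF filename, e.g. swf.101993.000001.stf (6-digit sequence assumed)."""
    return f"swf.{run_id}.{sequence:06d}.stf"


def tf_sample_name(run_id: int, sequence: int, sample: int) -> str:
    """TF sample filename; the listing numbers samples from 001."""
    return f"swf.{run_id}.{sequence:06d}_tf_{sample:03d}.tf"


def slice_name(run_id: int, sequence: int, slice_index: int) -> str:
    """TF slice filename; the listing numbers slices from 000."""
    return f"swf.{run_id}.{sequence:06d}_slice_{slice_index:03d}.tf"


print(stf_name(101993, 1))
print(tf_sample_name(101993, 1, 1))
print(slice_name(101993, 1, 0))
```
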
### Data Product Details

| Product | Created By | Stored In | Purpose |
|---------|-----------|-----------|---------|
| **STF File** | DAQ Simulator | STFFile table | Raw detector data unit |
| **TF Sample** | FastMon Agent | FastMonFile table | Sampled subset for fast monitoring |
| **TF Slice** | Fast Processing Agent | TFSlice table | Processing unit for PanDA workers |

## Message Types

### Broadcast Messages (DAQ Simulator → All Agents)

| Message | Payload | Purpose |
|---------|---------|---------|
| `run_imminent` | `execution_id`, `run_id`, `workflow_params` | Prepare for new run |
| `start_run` | `run_id`, `state=physics` | Begin data taking |
| `stf_gen` | `filename`, `sequence`, `run_id` | New STF available |
| `pause_run` | `run_id` | Temporary halt |
| `resume_run` | `run_id` | Resume from pause |
| `end_run` | `run_id`, `total_stf_files` | Run complete |

### Agent-to-Agent Messages

| Message | From | To | Payload |
|---------|------|----|---------|
| `stf_ready` | Data Agent | FastMon, Fast Processing | `filename`, `checksum`, `size_bytes` |
| `tf_file_registered` | FastMon Agent | Fast Processing Agent | `tf_filename`, `stf_filename`, `tf_count` |

### Queue Messages (Fast Processing → PanDA)

| Message | Destination | Payload |
|---------|-------------|---------|
| `slice` | `/queue/panda.transformer.slices` | `slice_id`, `tf_filename`, `start`, `end`, `tf_count` |

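As a rough sketch of how one TF sample could be turned into `slice` payloads with the fields listed above: the function name is hypothetical, and several details are assumptions not confirmed by this document, namely that `start` and `end` are time-frame indices forming a half-open range, that `tf_count` counts the time frames in the slice, and that slices are contiguous and as even as possible.

```python
import json


def make_slice_messages(tf_filename: str, tf_count: int,
                        slices_per_sample: int = 15) -> list[dict]:
    """Partition tf_count time frames into contiguous [start, end) slices."""
    base, extra = divmod(tf_count, slices_per_sample)
    messages = []
    start = 0
    for slice_id in range(slices_per_sample):
        # The first `extra` slices take one additional time frame each.
        size = base + (1 if slice_id < extra else 0)
        if size == 0:
            break  # fewer time frames than slices: stop early
        end = start + size
        messages.append({
            "slice_id": slice_id,
            "tf_filename": tf_filename,
            "start": start,
            "end": end,          # exclusive (assumption)
            "tf_count": size,    # frames in this slice (assumption)
        })
        start = end
    return messages


messages = make_slice_messages("swf.101993.000001_tf_001.tf", tf_count=100)
print(len(messages))
print(json.dumps(messages[0]))
```

Sending each payload to `/queue/panda.transformer.slices` is left out, since the messaging client used by the agents is not described here.
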
## Configuration

### fast_processing_default.toml

```toml
[testbed]
namespace = "torre2"

[agents.data]
enabled = true

[agents.fastmon]
enabled = true

[agents.fast_processing]
enabled = true

[fast_processing]
stf_count = 10              # STF files to generate
physics_period_count = 1    # Physics periods per run
target_worker_count = 30    # Target PanDA workers
stf_sampling_rate = 1.0     # Fraction of STFs to sample (1.0 = 100%)
slices_per_sample = 15      # TF slices per STF sample
slice_processing_time = 30  # Seconds per slice (for planning)
```

### Workflow Parameters Flow

```mermaid
flowchart TB
    subgraph Config["Configuration"]
        TOML["fast_processing_default.toml"]
    end

    subgraph Execution["Workflow Execution"]
        WE["WorkflowExecution<br/>(parameter_values)"]
    end

    subgraph Agents["Agents"]
        DS["DAQ Simulator<br/>(reads config directly)"]
        FP["Fast Processing Agent<br/>(fetches from API)"]
    end

    TOML -->|"loaded at start"| DS
    DS -->|"creates execution"| WE
    WE -->|"GET /workflow-executions/{id}/"| FP
```

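The fetch step in the diagram might look roughly like the sketch below. Only the `/workflow-executions/{id}/` path and the `parameter_values` name come from this document. The base URL is hypothetical, the response is assumed to be a JSON object with a top-level `parameter_values` key, and authentication is omitted. A stub opener stands in for the network so the example runs offline.

```python
import io
import json
from urllib.request import urlopen


def execution_url(base_url: str, execution_id: str) -> str:
    """Build the URL for GET /workflow-executions/{id}/ under a base URL."""
    return f"{base_url.rstrip('/')}/workflow-executions/{execution_id}/"


def fetch_parameter_values(base_url: str, execution_id: str,
                           opener=urlopen) -> dict:
    """Fetch one execution record and return its parameter_values.

    The JSON shape is an assumption; adjust to the real API response.
    """
    with opener(execution_url(base_url, execution_id)) as response:
        record = json.load(response)
    return record.get("parameter_values", {})


def fake_opener(url: str) -> io.BytesIO:
    """Offline stand-in for urlopen that returns a canned JSON body."""
    return io.BytesIO(b'{"parameter_values": {"slices_per_sample": 15}}')


params = fetch_parameter_values(
    "https://monitor.example/api",  # hypothetical base URL
    "stf_datataking-wenauseic-0049",
    opener=fake_opener,
)
print(params)
```
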
## State Transitions

### RunState Lifecycle

```mermaid
stateDiagram-v2
    [*] --> imminent: run_imminent received

    imminent --> physics: start_run received
    physics --> standby: pause_run received
    standby --> physics: resume_run received
    physics --> completed: end_run received

    completed --> [*]

    note right of physics
        Active data taking
        STF files generated
        TF slices created
    end note

    note right of standby
        Paused state
        No new STFs
        Workers idle
    end note
```

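The diagram above maps directly onto a lookup table keyed by the current phase and the incoming message. This is a minimal sketch; the `next_phase` helper is hypothetical, and rejecting unlisted transitions with an error is a design choice made here, not documented agent behavior.

```python
# (current phase, message) -> next phase; None means no run state exists yet.
TRANSITIONS = {
    (None, "run_imminent"): "imminent",
    ("imminent", "start_run"): "physics",
    ("physics", "pause_run"): "standby",
    ("standby", "resume_run"): "physics",
    ("physics", "end_run"): "completed",
}


def next_phase(phase, message):
    """Return the phase reached from `phase` on `message`, or raise ValueError."""
    try:
        return TRANSITIONS[(phase, message)]
    except KeyError:
        raise ValueError(f"no transition from {phase!r} on {message!r}") from None


phase = None
for message in ["run_imminent", "start_run", "pause_run", "resume_run", "end_run"]:
    phase = next_phase(phase, message)
    print(message, "->", phase)
```
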
### TF Slice Lifecycle

```mermaid
stateDiagram-v2
    [*] --> queued: Created by Fast Processing Agent

    queued --> processing: Worker picks up slice
    processing --> completed: Processing successful
    processing --> failed: Processing error

    failed --> queued: Retry (max 3)

    completed --> [*]
    failed --> [*]: Max retries exceeded
```

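A compact way to express the retry rule is shown below. The `Slice` class and helper names are hypothetical, and "max 3" is interpreted as up to three re-queues after the first attempt (four attempts in total), which is an assumption. The sketch also collapses the transient `failed` state into an immediate re-queue while retries remain.

```python
from dataclasses import dataclass

MAX_RETRIES = 3  # assumed to mean three re-queues after the first attempt


@dataclass
class Slice:
    slice_id: int
    status: str = "queued"
    retries: int = 0


def pick_up(s: Slice) -> None:
    """A worker takes a queued slice."""
    if s.status != "queued":
        raise ValueError(f"slice {s.slice_id} is {s.status}, not queued")
    s.status = "processing"


def finish(s: Slice, ok: bool) -> None:
    """Record the outcome of a processing attempt."""
    if s.status != "processing":
        raise ValueError(f"slice {s.slice_id} is {s.status}, not processing")
    if ok:
        s.status = "completed"
    elif s.retries < MAX_RETRIES:
        s.retries += 1
        s.status = "queued"   # failed, but retried
    else:
        s.status = "failed"   # max retries exceeded, terminal


s = Slice(slice_id=0)
for _ in range(4):  # four failing attempts in a row
    pick_up(s)
    finish(s, ok=False)
print(s.status, s.retries)
```
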
## Monitoring

### Key Metrics

| Metric | Source | Purpose |
|--------|--------|---------|
| `stf_count` | WorkflowExecution | Total STFs in run |
| `tf_files_received` | Fast Processing Agent | TF samples processed |
| `slices_created` | RunState | Total slices generated |
| `slices_queued` | RunState | Slices waiting for workers |
| `slices_completed` | RunState | Successfully processed |

### Monitoring Queries (MCP)

```python
# Workflow status
get_workflow_monitor(execution_id='stf_datataking-wenauseic-0049')

# Messages during execution
list_messages(execution_id='stf_datataking-wenauseic-0049')

# STF files for a run
list_stf_files(run_number=101993)

# TF slices for a run
list_tf_slices(run_number=101993)

# Agent logs
list_logs(execution_id='stf_datataking-wenauseic-0049')
```

## Example Execution

From a recent test run (execution `stf_datataking-wenauseic-0049`):

```
Run 101993 Summary:
├── Duration: ~14 seconds
├── STF files: 3 (all processed)
├── Messages:
│   ├── run_imminent → all agents prepared
│   ├── start_run → physics phase began
│   ├── stf_gen (x3) → STF files generated
│   ├── stf_ready (x3) → Data Agent processed
│   └── end_run → Run completed
└── Agents involved:
    ├── daq_simulator-agent-wenauseic-484
    ├── data-agent-wenauseic-481
    ├── fastmon-agent-wenauseic-482
    └── fast_processing-agent-wenauseic-483
```

## See Also

- [Agent Management](agent-management.md) - Starting and stopping agents
- [Architecture Overview](architecture.md) - System design
- [Operations Guide](operations.md) - Day-to-day operations