# Server-Sent Events (SSE) Real-Time Streaming

Real-time workflow message streaming for remote monitoring and visualization via HTTPS Server-Sent Events.

## Overview

The SWF Testbed streams workflow messages in real time to remote clients over HTTPS using Server-Sent Events (SSE). This lets distributed agents and external systems receive live workflow updates without direct ActiveMQ access or complex networking configuration.

**Important:** ActiveMQ message senders require no modifications. ALL messages sent to ActiveMQ are automatically available to SSE clients. Only receiving agents need SSE-specific configuration.

**Key Benefits:**
- **Firewall-friendly**: Uses standard HTTPS connections
- **Real-time**: Sub-second latency for workflow messages
- **Scalable**: Supports multiple concurrent remote clients
- **Secure**: Token-based authentication with production TLS
- **Filtered**: Clients can subscribe to specific message types or workflows

## Architecture

```
Workflow Agents → ActiveMQ → Monitor → SSE Broadcaster → Remote Agents
       ↓             ↓          ↓             ↓
data-agent        epictopic  Database  Server-side
processing-agent             Logging   Filtering
daq-simulator
```

**Data Flow:**
1. **Workflow agents** send messages to ActiveMQ (normal operation)
2. **Monitor** consumes the messages, stores them in the database, and enriches them with metadata
3. **SSE Broadcaster** applies per-client filters and forwards only matching messages
4. **Remote clients** receive filtered real-time message streams over the network

## SSE Clients (Receivers)

### Remote SSE Receiver

The `remote_sse_receiver.py` script connects to the production monitor and displays workflow messages in real time.

#### Prerequisites

```bash
export SWF_API_TOKEN="your-production-api-token"
export SWF_SSE_RECEIVER_NAME="descriptive-client-name"
```

**Important:** `SWF_SSE_RECEIVER_NAME` must be set to a descriptive, unique name that identifies your remote agent. This name will appear in the monitor's agent registry and workflow views.

#### Usage

```bash
# Basic usage (receives ALL messages)
cd /eic/u/wenauseic/github/swf-testbed
source .venv/bin/activate && source ~/.env
export SWF_SSE_RECEIVER_NAME="my-fastmon"
python example_agents/remote_sse_receiver.py

# Filter by message type
python example_agents/remote_sse_receiver.py --message sse_test

# Filter by multiple message types
python example_agents/remote_sse_receiver.py --message data_ready,processing_complete

# Filter by agent
python example_agents/remote_sse_receiver.py --agent sse_sender-agent

# Combine filters
python example_agents/remote_sse_receiver.py --message sse_test --agent sse_sender-agent
```

#### Features

- **Authentication**: Automatic token-based authentication
- **Auto-reconnection**: Handles network interruptions gracefully
- **Command-line filtering**: Filter by message types and agents via the `--message` and `--agent` options
- **Agent registration**: Registers as active monitoring client
- **Real-time display**: Formatted message output with timestamps

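To make the receiving side more concrete, here is a minimal sketch of parsing the `text/event-stream` wire format that SSE uses. It is not the code in `remote_sse_receiver.py`: the helper name `iter_sse_events` is hypothetical, and it assumes that each event carries a JSON object in its `data:` lines.

```python
import json
from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one dict per SSE event from an iterable of text lines.

    Hypothetical helper: assumes every event's data is a JSON object.
    """
    event_type = "message"  # default event name in the SSE format
    data_lines = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_lines:
                payload = json.loads("\n".join(data_lines))
                yield {"event": event_type, "data": payload}
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # drop the single optional leading space
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)
        # Other fields (id, retry) are ignored in this sketch.


sample = [
    ": keep-alive\n",
    "event: message\n",
    'data: {"msg_type": "stf_gen", "run_id": "run-2025-001"}\n',
    "\n",
]
events = list(iter_sse_events(sample))
print(events[0]["data"]["msg_type"])
```

In a real client the `lines` argument would come from the open HTTPS response rather than a list, but the parsing logic stays the same.
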
#### Output Example

```
📡 Connecting to SSE stream: https://pandaserver02.sdcc.bnl.gov/swf-monitor/api/messages/stream/
🔌 Testing SSE endpoint...
✅ SSE stream opened - waiting for messages... (Ctrl+C to exit)
------------------------------------------------------------
[14:30:25] 🔗 Connected to SSE stream
[14:30:25] 📋 Client ID: a1b2c3d4-e5f6-7890-abcd-ef1234567890
[14:30:28] 📨 Message received:
    Type: stf_gen
    From: daq-simulator-agent
    Run: run-2025-001
    File: stf_001.dat
------------------------------------------------------------
```

### Integration Requirements

SSE clients must:
1. **Set a descriptive name**: `SWF_SSE_RECEIVER_NAME` environment variable
2. **Authenticate**: Valid `SWF_API_TOKEN` for production access
3. **Register as an agent**: Inherit from `BaseAgent` to appear in monitor views and for robust connection handling

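The first two requirements can be checked before any connection is attempted. The sketch below is one possible way to do that; `load_receiver_config` is a hypothetical helper, not part of the testbed, and the `BaseAgent` requirement is left out because its interface is not described here.

```python
import os


def load_receiver_config(env=None):
    """Return (name, token) or raise ValueError if either is missing.

    Hypothetical helper; the real receiver may validate differently.
    """
    env = os.environ if env is None else env
    name = env.get("SWF_SSE_RECEIVER_NAME", "").strip()
    token = env.get("SWF_API_TOKEN", "").strip()
    if not name:
        raise ValueError(
            "SWF_SSE_RECEIVER_NAME must be set to a descriptive agent name"
        )
    if not token:
        raise ValueError("SWF_API_TOKEN must be set for production access")
    return name, token


# Example with an explicit mapping instead of the real environment.
name, token = load_receiver_config(
    {"SWF_SSE_RECEIVER_NAME": "my-fastmon", "SWF_API_TOKEN": "example-token"}
)
print(f"receiver {name!r} configured")
```

Failing early with a clear message avoids registering an anonymous client or opening a stream that the server will reject.
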
## Server-Side Filtering and Network Efficiency

### **How Filtering Works**

**ALL ActiveMQ messages** are processed by the SSE system, but **server-side filtering prevents unwanted messages from being sent over the network**.

#### Code Evidence:
1. **Every ActiveMQ message** triggers SSE processing:
   ```python
   # activemq_processor.py:158
   broadcaster.broadcast_message(enriched)
   ```

2. **Server applies filters BEFORE sending to clients**:
   ```python
   # sse_views.py:127
   if self._message_matches_filters(message_data, self.client_filters.get(client_id, {})):
       client_queue.put_nowait(message_data)  # Only matching messages are queued
   ```

3. **Filter implementation rejects unwanted messages**:
   ```python
   # sse_views.py:160-161
   if 'msg_types' in filters:
       if message.get('msg_type') not in filters['msg_types']:
           return False  # Message filtered out, NOT sent to client
   ```

### **Network Traffic Control**

- ✅ **No unnecessary network traffic**: Messages rejected by a client's filters are never sent to that client
- ✅ **Server-side filtering**: Applied before network transmission
- ✅ **Per-client filtering**: Each client receives only matching messages

### **Filter Types and Examples**

#### Available Filters:
- `msg_types` - Filter by message type
- `agents` - Filter by sender agent (`processed_by` field)
- `run_ids` - Filter by workflow run ID

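The three filters can be pictured as a single predicate over a message. The sketch below is an illustration rather than the monitor's code: the function name is hypothetical, it assumes that filters combine with AND, that `run_ids` is compared against the `run_id` field, and that an empty or missing filter means "no restriction".

```python
def message_matches_filters(message: dict, filters: dict) -> bool:
    """Return True if the message passes every filter that is present.

    Sketch only: field names follow this document (msg_type,
    processed_by, run_id); the real implementation may differ.
    """
    checks = (
        ("msg_types", "msg_type"),
        ("agents", "processed_by"),
        ("run_ids", "run_id"),
    )
    for filter_key, message_field in checks:
        allowed = filters.get(filter_key)
        if allowed and message.get(message_field) not in allowed:
            return False  # rejected by this filter
    return True  # no filter rejected the message


msg = {
    "msg_type": "stf_gen",
    "processed_by": "daq-simulator-agent",
    "run_id": "run-2025-001",
}
print(message_matches_filters(msg, {}))
print(message_matches_filters(msg, {"msg_types": ["data_ready"]}))
print(message_matches_filters(
    msg, {"msg_types": ["stf_gen"], "agents": ["daq-simulator-agent"]}
))
```

Under these assumptions the three calls print `True`, `False`, and `True`: an unfiltered client sees everything, a non-matching type is rejected, and a message must satisfy all filters that are set.
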
## Message Types and Content

SSE clients receive workflow messages (subject to their filters) with enriched metadata:

### Standard Workflow Messages
- `run_imminent` - New run about to start
- `start_run` - Run initialization complete
- `stf_gen` - New STF data file available
- `data_ready` - Data processing ready
- `processing_complete` - Processing finished
- `pause_run` / `resume_run` - Run control
- `end_run` - Run completed

### Message Structure
```json
{
  "msg_type": "stf_gen",
  "processed_by": "daq-simulator-agent",
  "run_id": "run-2025-001",
  "filename": "stf_001.dat",
  "message": "STF file generated",
  "sender_agent": "daq-simulator-agent",
  "recipient_agent": "data-agent",
  "queue_name": "epictopic",
  "sent_at": "2025-01-15T14:30:25.123Z"
}
```

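As an illustration of consuming this structure, the sketch below turns one JSON payload into a short text block similar to the receiver's display. The function `format_message` is hypothetical, and it uses the `sent_at` timestamp (UTC) for the time prefix, which is an assumption; the actual receiver may print its own local receive time.

```python
import json
from datetime import datetime


def format_message(raw: str) -> str:
    """Format one JSON workflow message for terminal display (sketch)."""
    msg = json.loads(raw)
    # Replace the trailing 'Z' so fromisoformat() accepts the timestamp
    # on Python versions before 3.11.
    sent = datetime.fromisoformat(msg["sent_at"].replace("Z", "+00:00"))
    lines = [
        f"[{sent:%H:%M:%S}] Message received:",
        f"    Type: {msg.get('msg_type', 'unknown')}",
        f"    From: {msg.get('processed_by', 'unknown')}",
    ]
    # Optional fields are shown only when present.
    if "run_id" in msg:
        lines.append(f"    Run: {msg['run_id']}")
    if "filename" in msg:
        lines.append(f"    File: {msg['filename']}")
    return "\n".join(lines)


raw = json.dumps({
    "msg_type": "stf_gen",
    "processed_by": "daq-simulator-agent",
    "run_id": "run-2025-001",
    "filename": "stf_001.dat",
    "sent_at": "2025-01-15T14:30:25.123Z",
})
text = format_message(raw)
print(text)
```

Using `.get()` with a fallback for the type and sender keeps the formatter tolerant of messages that omit those fields.
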
### Test Messages
- `sse_test` - Connectivity and functionality testing

## Security and Authentication

### Production Authentication
SSE streaming requires valid API tokens:

```bash
# Get token from monitor admin
export SWF_API_TOKEN="your-token-here"
```

### Network Security
- **HTTPS only**: All SSE connections use TLS encryption
- **Token authentication**: No credentials in URLs or query parameters
- **Firewall friendly**: Outbound HTTPS (port 443) only
- **No incoming connections**: Clients connect to monitor, not vice versa

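These rules translate into a few checks when building the stream request. The following is a sketch using only the Python standard library; `build_stream_request` is a hypothetical helper, and the `Authorization: Token <value>` header scheme is an assumption that should be confirmed against the monitor's API documentation.

```python
import urllib.request
from urllib.parse import urlsplit


def build_stream_request(url: str, token: str) -> urllib.request.Request:
    """Build an HTTPS request for the SSE stream (sketch, not sent here)."""
    parts = urlsplit(url)
    if parts.scheme != "https":
        raise ValueError("SSE stream URL must use https")
    if parts.username or parts.password:
        raise ValueError("do not embed credentials in the URL")
    headers = {
        # ASSUMPTION: the header scheme is not specified in this document.
        "Authorization": f"Token {token}",
        "Accept": "text/event-stream",
    }
    return urllib.request.Request(url, headers=headers)


stream_url = "https://pandaserver02.sdcc.bnl.gov/swf-monitor/api/messages/stream/"
request = build_stream_request(stream_url, "example-token")
print(request.get_header("Accept"))
```

Keeping the token in a header rather than in the URL means it does not end up in server access logs or shell history that record full URLs.
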
## Deployment Considerations

### Monitor Requirements
- **Redis**: Required for multi-process SSE fanout in production
- **Apache configuration**: Proper headers for SSE streaming
- **Database storage**: All messages persisted for audit/replay

### Client Requirements
- **Persistent connection**: SSE requires long-lived HTTPS connections
- **Reconnection logic**: Handle temporary network disruptions
- **Resource management**: Proper cleanup on shutdown
- **Agent registration**: Must appear in monitor's agent registry

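One common way to meet the reconnection requirement is retrying with exponential backoff. The sketch below shows that pattern under stated assumptions: `run_with_reconnect` and the `connect` callable are hypothetical, and the delay values are illustrative rather than the testbed's settings.

```python
import time


def run_with_reconnect(connect, max_attempts=5, base_delay=1.0,
                       max_delay=30.0, sleep=time.sleep):
    """Call connect() until it returns normally, backing off after failures.

    connect is a hypothetical callable that opens the stream, processes
    messages, and raises ConnectionError (or another OSError) when the
    link drops.
    """
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return connect()
        except OSError as exc:
            if attempt == max_attempts:
                raise  # give up after the final attempt
            print(f"attempt {attempt} failed ({exc}); retrying in {delay:.0f}s")
            sleep(delay)
            delay = min(delay * 2, max_delay)  # double the wait, capped


# Demonstration with a fake connection that fails twice, then succeeds.
attempts = {"count": 0}


def flaky_connect():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("stream dropped")
    return "stream closed cleanly"


delays = []
result = run_with_reconnect(flaky_connect, sleep=delays.append)
print(result, delays)
```

Passing `sleep` as a parameter lets the demonstration record the delays instead of actually waiting; a real client would keep the default `time.sleep`.
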
### Scaling
- **Multiple clients**: Monitor supports concurrent SSE connections
- **Message filtering**: Clients can filter by message type, agent, or run
- **Latency**: Sub-second delivery for real-time monitoring

## Development and Testing

### Test Message Generation
Use `remote_sse_sender.py` to generate test messages:

```bash
# One-shot test batch
export SWF_SENDER_ONESHOT=1
python example_agents/remote_sse_sender.py

# Continuous test messages (every 30s)
# Clear the one-shot flag first if it was exported in this shell
unset SWF_SENDER_ONESHOT
python example_agents/remote_sse_sender.py
```

## Troubleshooting

### Common Issues

**Connection Refused (ECONNREFUSED)**
- Check the monitor URL and network connectivity
- Verify that the production monitor is running

**HTTP 401 Unauthorized**
- Verify that `SWF_API_TOKEN` is set and valid
- Check that the token hasn't expired

**Missing Agent Name Error**
```
❌ Error: SWF_SSE_RECEIVER_NAME must be set to a descriptive agent name
```
Set a descriptive client identifier:
```bash
export SWF_SSE_RECEIVER_NAME="workflow-dashboard"
```

### Monitoring SSE Health
Check connected clients in the monitor web interface:
- **System Agents**: View registered SSE receivers
- **Workflow Messages**: See message delivery status
- **API Status**: `/api/messages/stream/status/` endpoint

---

For technical implementation details, see [Monitor SSE Documentation](../swf-monitor/docs/SSE_RELAY.md).