# Server-Sent Events (SSE) Real-Time Streaming

Real-time workflow message streaming for remote monitoring and visualization via HTTPS Server-Sent Events.

## Overview

The SWF Testbed streams workflow messages in real time to remote clients via Server-Sent Events (SSE) over HTTPS. This lets distributed agents and external systems receive live workflow updates without direct ActiveMQ access or complex networking configuration.

**Important**: ActiveMQ message senders require no modifications. All messages sent to ActiveMQ are automatically available to SSE clients. Only receiving agents need SSE-specific configuration.

**Key Benefits:**
- **Firewall-friendly**: Uses standard HTTPS connections
- **Real-time**: Sub-second latency for workflow messages
- **Scalable**: Supports multiple concurrent remote clients
- **Secure**: Token-based authentication, with TLS in production
- **Filtered**: Clients can subscribe to specific message types or workflows

## Architecture

```
Workflow Agents → ActiveMQ → Monitor → SSE Broadcaster → Remote Agents
     ↓              ↓           ↓           ↓
  data-agent    epictopic   Database   Server-side
processing-agent            Logging    Filtering
  daq-simulator
```

**Data Flow:**
1. **Workflow agents** send messages to ActiveMQ (normal operation)
2. **Monitor** consumes the messages, stores them in the database, and enriches them with metadata
3. **SSE Broadcaster** applies per-client filters and forwards only matching messages
4. **Remote clients** receive filtered real-time message streams over the network

## SSE Clients (Receivers)

### Remote SSE Receiver

The `remote_sse_receiver.py` script connects to the production monitor and displays workflow messages in real time.

#### Prerequisites

```bash
export SWF_API_TOKEN="your-production-api-token"
export SWF_SSE_RECEIVER_NAME="descriptive-client-name"
```

**Important:** `SWF_SSE_RECEIVER_NAME` must be set to a descriptive, unique name that identifies your remote agent. This name will appear in the monitor's agent registry and workflow views.
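
A client written from scratch may want to check both variables at startup rather than fail later during connection. The following is a minimal sketch of such a check; the `ReceiverConfig` type and `load_receiver_config` function are hypothetical names used for illustration and are not taken from `remote_sse_receiver.py`.

```python
import os
from typing import Mapping, NamedTuple


class ReceiverConfig(NamedTuple):
    """Settings a receiver needs before connecting (hypothetical helper type)."""
    api_token: str
    receiver_name: str


def load_receiver_config(env: Mapping[str, str] = os.environ) -> ReceiverConfig:
    """Read the two required variables and fail early if either is missing."""
    token = env.get("SWF_API_TOKEN", "").strip()
    name = env.get("SWF_SSE_RECEIVER_NAME", "").strip()
    if not token:
        raise RuntimeError("SWF_API_TOKEN must be set to a valid API token")
    if not name:
        raise RuntimeError(
            "SWF_SSE_RECEIVER_NAME must be set to a descriptive agent name"
        )
    return ReceiverConfig(api_token=token, receiver_name=name)
```

Passing the environment as a mapping keeps the check easy to exercise in tests, for example `load_receiver_config({"SWF_API_TOKEN": "t", "SWF_SSE_RECEIVER_NAME": "my-fastmon"})`.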

#### Usage

```bash
# Basic usage (receives ALL messages)
cd /eic/u/wenauseic/github/swf-testbed
source .venv/bin/activate && source ~/.env
export SWF_SSE_RECEIVER_NAME="my-fastmon"
python example_agents/remote_sse_receiver.py

# Filter by message type
python example_agents/remote_sse_receiver.py --message sse_test

# Filter by multiple message types
python example_agents/remote_sse_receiver.py --message data_ready,processing_complete

# Filter by agent
python example_agents/remote_sse_receiver.py --agent sse_sender-agent

# Combine filters
python example_agents/remote_sse_receiver.py --message sse_test --agent sse_sender-agent
```

#### Features

- **Authentication**: Automatic token-based authentication
- **Auto-reconnection**: Handles network interruptions gracefully
- **Command-line filtering**: Filter by message types and agents via the `--message` and `--agent` options
- **Agent registration**: Registers as an active monitoring client
- **Real-time display**: Formatted message output with timestamps

#### Output Example

```
📡 Connecting to SSE stream: https://pandaserver02.sdcc.bnl.gov/swf-monitor/api/messages/stream/
🔌 Testing SSE endpoint...
✅ SSE stream opened - waiting for messages... (Ctrl+C to exit)
------------------------------------------------------------
[14:30:25] 🔗 Connected to SSE stream
[14:30:25] 📋 Client ID: a1b2c3d4-e5f6-7890-abcd-ef1234567890
[14:30:28] 📨 Message received:
            Type: stf_gen
            From: daq-simulator-agent
            Run:  run-2025-001
            File: stf_001.dat
------------------------------------------------------------
```

### Integration Requirements

SSE clients must:
1. **Set a descriptive name**: `SWF_SSE_RECEIVER_NAME` environment variable
2. **Authenticate**: Valid `SWF_API_TOKEN` for production access
3. **Register as an agent**: Inherit from `BaseAgent` so that the client appears in monitor views and gets robust connection handling
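
For clients that do not reuse `remote_sse_receiver.py`, the stream still has to be decoded from the SSE wire format: `field: value` lines, with a blank line ending each event and lines starting with `:` acting as comments. The sketch below is a simplified parser for that format. The function name `iter_sse_events` is hypothetical, and the assumption that each `data` field carries one JSON-encoded workflow message is not confirmed by this document.

```python
import json
from typing import Dict, Iterable, Iterator, List


def iter_sse_events(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per SSE event from an iterable of text lines.

    Simplified: events without any data line are dropped, and the
    'retry' field is treated like any other field.
    """
    fields: Dict[str, str] = {}
    data_parts: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event.
            if data_parts:
                fields["data"] = "\n".join(data_parts)
                yield fields
            fields, data_parts = {}, []
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one leading space after the colon is not data
        if name == "data":
            data_parts.append(value)
        else:
            fields[name] = value


# Example with an in-memory stream (payload shape is an assumption).
stream = [
    ": keep-alive\n",
    "event: message\n",
    'data: {"msg_type": "stf_gen", "run_id": "run-2025-001"}\n',
    "\n",
]
events = list(iter_sse_events(stream))
payload = json.loads(events[0]["data"])
print(payload["msg_type"])
```

In a real client, `lines` would come from the decoded HTTPS response body rather than a list.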

## Server-Side Filtering and Network Efficiency

### How Filtering Works

**All ActiveMQ messages** are processed by the SSE system, but **server-side filtering prevents unwanted messages from being sent over the network**.

#### Code Evidence:
1. **Every ActiveMQ message** triggers SSE processing:
   ```python
   # activemq_processor.py:158
   broadcaster.broadcast_message(enriched)
   ```

2. **Server applies filters BEFORE sending to clients**:
   ```python
   # sse_views.py:127
   if self._message_matches_filters(message_data, self.client_filters.get(client_id, {})):
       client_queue.put_nowait(message_data)  # Only messages that pass the filters are queued
   ```

3. **Filter implementation rejects unwanted messages**:
   ```python
   # sse_views.py:160-161
   if 'msg_types' in filters:
       if message.get('msg_type') not in filters['msg_types']:
           return False  # Message filtered out, NOT sent to client
   ```
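
The three excerpts above fit together roughly as follows. This is a self-contained sketch of per-client fan-out with server-side filtering, not the monitor's actual code: the class name `FilteringBroadcaster`, the `add_client` method, the queue size, and the drop-on-full behavior are assumptions made for illustration. Only the `msg_types` check mirrors the excerpt from `sse_views.py`.

```python
import queue
from typing import Any, Dict, Optional


class FilteringBroadcaster:
    """Hypothetical broadcaster: one bounded queue and one filter set per client."""

    def __init__(self, max_queue_size: int = 100) -> None:
        self.client_queues: Dict[str, "queue.Queue[dict]"] = {}
        self.client_filters: Dict[str, Dict[str, Any]] = {}
        self.max_queue_size = max_queue_size

    def add_client(self, client_id: str,
                   filters: Optional[Dict[str, Any]] = None) -> "queue.Queue[dict]":
        """Register a client and return the queue its stream will read from."""
        client_queue: "queue.Queue[dict]" = queue.Queue(maxsize=self.max_queue_size)
        self.client_queues[client_id] = client_queue
        self.client_filters[client_id] = filters or {}
        return client_queue

    def _message_matches_filters(self, message: dict, filters: Dict[str, Any]) -> bool:
        # Same shape as the msg_types check quoted above.
        if "msg_types" in filters:
            if message.get("msg_type") not in filters["msg_types"]:
                return False
        return True

    def broadcast_message(self, message: dict) -> int:
        """Queue the message for every matching client; return how many got it."""
        delivered = 0
        for client_id, client_queue in self.client_queues.items():
            filters = self.client_filters.get(client_id, {})
            if not self._message_matches_filters(message, filters):
                continue  # filtered out before anything is sent to this client
            try:
                client_queue.put_nowait(message)
                delivered += 1
            except queue.Full:
                pass  # assumption: a slow client loses the message instead of blocking
        return delivered


broadcaster = FilteringBroadcaster()
all_queue = broadcaster.add_client("dashboard")
stf_queue = broadcaster.add_client("fastmon", {"msg_types": ["stf_gen"]})
broadcaster.broadcast_message({"msg_type": "stf_gen", "run_id": "run-2025-001"})
broadcaster.broadcast_message({"msg_type": "end_run", "run_id": "run-2025-001"})
print(all_queue.qsize(), stf_queue.qsize())
```

Because the filter runs before `put_nowait`, the `fastmon` client's queue only ever holds `stf_gen` messages, so nothing else is written to its connection.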

### Network Traffic Control

- ✅ **No unnecessary network traffic**: Filtered-out messages never leave the server
- ✅ **Server-side filtering**: Applied before network transmission
- ✅ **Per-client filtering**: Each client receives only matching messages

### Filter Types and Examples

#### Available Filters:
- `msg_types` - Filter by message type
- `agents` - Filter by sender agent (`processed_by` field)
- `run_ids` - Filter by workflow run ID
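
As an illustration of how the three filters might behave together, the sketch below checks a message against a filter dictionary. The function name `message_matches_filters` is hypothetical. Mapping `run_ids` to the `run_id` field, combining filters with AND, and treating a missing or empty filter as "match everything" are assumptions, not behavior confirmed by the monitor code.

```python
from typing import Any, Dict

# Filter name -> message field it is compared against.
# msg_types/msg_type and agents/processed_by follow this document;
# run_ids/run_id is assumed.
FILTER_FIELDS = {
    "msg_types": "msg_type",
    "agents": "processed_by",
    "run_ids": "run_id",
}


def message_matches_filters(message: Dict[str, Any], filters: Dict[str, Any]) -> bool:
    """Return True if the message passes every filter that is set."""
    for filter_name, field in FILTER_FIELDS.items():
        allowed = filters.get(filter_name)
        if allowed and message.get(field) not in allowed:
            return False
    return True


sample = {
    "msg_type": "stf_gen",
    "processed_by": "daq-simulator-agent",
    "run_id": "run-2025-001",
}

# Matches: the type is listed and no other filter is set.
print(message_matches_filters(sample, {"msg_types": ["stf_gen", "data_ready"]}))
# Does not match: the type is fine but the agent filter excludes the sender.
print(message_matches_filters(sample, {"msg_types": ["stf_gen"], "agents": ["data-agent"]}))
```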

## Message Types and Content

SSE clients receive workflow messages (subject to their filters) with enriched metadata:

### Standard Workflow Messages
- `run_imminent` - New run about to start
- `start_run` - Run initialization complete
- `stf_gen` - New STF data file available
- `data_ready` - Data processing ready
- `processing_complete` - Processing finished
- `pause_run` / `resume_run` - Run control
- `end_run` - Run completed

### Message Structure
```json
{
  "msg_type": "stf_gen",
  "processed_by": "daq-simulator-agent",
  "run_id": "run-2025-001",
  "filename": "stf_001.dat",
  "message": "STF file generated",
  "sender_agent": "daq-simulator-agent",
  "recipient_agent": "data-agent",
  "queue_name": "epictopic",
  "sent_at": "2025-01-15T14:30:25.123Z"
}
```
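
A receiver would typically decode this JSON and pull out a few fields for display. The sketch below parses the example message and builds a one-line summary. The helper names `parse_sent_at` and `summarize` and the summary layout are illustrative assumptions, not the format used by `remote_sse_receiver.py`.

```python
import json
from datetime import datetime, timezone

RAW_MESSAGE = """
{
  "msg_type": "stf_gen",
  "processed_by": "daq-simulator-agent",
  "run_id": "run-2025-001",
  "filename": "stf_001.dat",
  "message": "STF file generated",
  "sender_agent": "daq-simulator-agent",
  "recipient_agent": "data-agent",
  "queue_name": "epictopic",
  "sent_at": "2025-01-15T14:30:25.123Z"
}
"""


def parse_sent_at(value: str) -> datetime:
    """Parse the ISO 8601 timestamp; older Pythons reject a trailing 'Z'."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def summarize(message: dict) -> str:
    """Build a short one-line description of a workflow message."""
    sent = parse_sent_at(message["sent_at"]).astimezone(timezone.utc)
    return (
        f"[{sent:%H:%M:%S}] {message['msg_type']} "
        f"from {message['processed_by']} "
        f"run={message.get('run_id')} file={message.get('filename')}"
    )


message = json.loads(RAW_MESSAGE)
print(summarize(message))
# [14:30:25] stf_gen from daq-simulator-agent run=run-2025-001 file=stf_001.dat
```

Using `.get()` for `run_id` and `filename` keeps the summary usable for message types that may not carry those fields.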

### Test Messages
- `sse_test` - Connectivity and functionality testing

## Security and Authentication

### Production Authentication
SSE streaming requires a valid API token:

```bash
# Get token from monitor admin
export SWF_API_TOKEN="your-token-here"
```

### Network Security
- **HTTPS only**: All SSE connections use TLS encryption
- **Token authentication**: No credentials in URLs or query parameters
- **Firewall friendly**: Outbound HTTPS (port 443) only
- **No incoming connections**: Clients connect to monitor, not vice versa
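
The points above suggest that the token travels in a request header rather than in the URL. A minimal sketch of building such a request with the standard library follows. The `Authorization: Token <value>` scheme is an assumption (it is the Django REST framework convention, and this document does not state the header format), and `build_stream_request` is a hypothetical helper name. The URL is the one shown in the receiver's output example.

```python
import urllib.request

STREAM_URL = "https://pandaserver02.sdcc.bnl.gov/swf-monitor/api/messages/stream/"


def build_stream_request(token: str, url: str = STREAM_URL) -> urllib.request.Request:
    """Prepare (but do not send) an authenticated request for the SSE stream."""
    if not url.lower().startswith("https://"):
        raise ValueError("SSE stream URL must use HTTPS")
    return urllib.request.Request(
        url,
        headers={
            # Assumed header scheme; check with the monitor admin.
            "Authorization": f"Token {token}",
            # Standard media type for Server-Sent Events.
            "Accept": "text/event-stream",
        },
    )


request = build_stream_request("your-token-here")
print(request.full_url)
```

Keeping the token out of `request.full_url` means it does not end up in proxy or access logs that record request URLs.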

## Deployment Considerations

### Monitor Requirements
- **Redis**: Required for multi-process SSE fanout in production
- **Apache configuration**: Proper headers for SSE streaming
- **Database storage**: All messages persisted for audit/replay

### Client Requirements
- **Persistent connection**: SSE requires long-lived HTTPS connections
- **Reconnection logic**: Handle temporary network disruptions
- **Resource management**: Proper cleanup on shutdown
- **Agent registration**: Must appear in monitor's agent registry
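
One common way to meet the reconnection requirement is a retry loop with exponential backoff. The sketch below shows that pattern under stated assumptions: `run_with_reconnect` and its parameters are hypothetical, the retry limits are arbitrary, and this is not necessarily how `remote_sse_receiver.py` or `BaseAgent` handle reconnection.

```python
import time
from typing import Callable


def run_with_reconnect(
    connect_and_stream: Callable[[], None],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Call connect_and_stream, retrying with exponential backoff on network errors.

    Simplified: the attempt counter is not reset after a connection that
    streamed successfully for a while before dropping.
    """
    attempt = 0
    while True:
        try:
            connect_and_stream()
            return  # the stream ended without an error
        except (ConnectionError, TimeoutError):
            attempt += 1
            if attempt >= max_attempts:
                raise  # give up and surface the last error
            # Delays grow 1s, 2s, 4s, ... up to max_delay.
            sleep(min(max_delay, base_delay * 2 ** (attempt - 1)))
```

The `sleep` parameter exists so that tests can record delays instead of waiting. In a long-running client, wrapping the call in `try`/`finally` is a natural place for the cleanup on shutdown mentioned above.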

### Scaling
- **Multiple clients**: Monitor supports concurrent SSE connections
- **Message filtering**: Clients can filter by message type, agent, or run
- **Latency**: Sub-second delivery for real-time monitoring

## Development and Testing

### Test Message Generation
Use `remote_sse_sender.py` to generate test messages:

```bash
# One-shot test batch
export SWF_SENDER_ONESHOT=1
python example_agents/remote_sse_sender.py

# Continuous test messages (every 30s)
# Clear the one-shot flag first if it was exported in this shell
unset SWF_SENDER_ONESHOT
python example_agents/remote_sse_sender.py
```

## Troubleshooting

### Common Issues

**Connection Refused (ECONNREFUSED)**
- Check monitor URL and network connectivity
- Verify production monitor is running

**HTTP 401 Unauthorized**
- Verify `SWF_API_TOKEN` is set and valid
- Check that the token hasn't expired

**Missing Agent Name Error**
```
❌ Error: SWF_SSE_RECEIVER_NAME must be set to a descriptive agent name
```
Set a descriptive client identifier:
```bash
export SWF_SSE_RECEIVER_NAME="workflow-dashboard"
```

### Monitoring SSE Health
Check connected clients in the monitor web interface:
- **System Agents**: View registered SSE receivers
- **Workflow Messages**: See message delivery status
- **API Status**: `/api/messages/stream/status/` endpoint

---

For technical implementation details, see [Monitor SSE Documentation](../swf-monitor/docs/SSE_RELAY.md).