# Server-Sent Events (SSE) Relay via Channels + Redis

Date: 2025-08-26

## Overview

SWF Monitor forwards workflow messages it consumes from ActiveMQ to remote HTTP clients using Server-Sent Events (SSE). This provides a WAN/firewall-friendly, one-way push over HTTPS.

For user documentation on SSE streaming including client setup and usage examples, see [SSE Real-Time Streaming](../../swf-testbed/docs/sse-streaming.md) in the testbed repository.

Production runs Django under mod_wsgi (one or more daemon processes) with the ActiveMQ listener in a separate process, and `/swf-monitor/mcp/` on a separate uvicorn ASGI worker. To reliably fan out events to all SSE clients across processes, we use Django Channels with a Redis channel layer as an inter-process relay. WebSockets are not required or enabled for this feature.

Important: Redis/Channels is REQUIRED in any environment that must support remote ActiveMQ client recipients via SSE. The in-memory fallback is for single-process development only and is not suitable for production.

## Architecture

1. ActiveMQ listener (management command/process) consumes messages and persists them (WorkflowMessage, SystemAgent).
2. Listener publishes enriched message payloads to a Channels group (default: `workflow_events`).
3. Each web/WSGI process starts a small background subscriber that joins the group and forwards messages into the in-memory `SSEMessageBroadcaster`.
4. SSE clients connect to `/api/messages/stream/` and receive messages from per-client queues with heartbeats and optional filters.

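Steps 3 and 4 above can be pictured with a minimal, standard-library-only sketch of an in-process fan-out. It is not the actual `SSEMessageBroadcaster`: the class name `Broadcaster` and the methods `add_client`, `remove_client`, and `broadcast` are hypothetical, chosen only to illustrate one queue per connected client.

```python
import queue
import threading


class Broadcaster:
    """Hypothetical stand-in for SSEMessageBroadcaster: one queue per client."""

    def __init__(self):
        self._clients = {}  # client_id -> queue.Queue
        # The background subscriber and the request threads share this state.
        self._lock = threading.Lock()

    def add_client(self, client_id):
        """Register a client and return the queue its SSE response reads from."""
        client_queue = queue.Queue()
        with self._lock:
            self._clients[client_id] = client_queue
        return client_queue

    def remove_client(self, client_id):
        """Forget a client, for example when its connection closes."""
        with self._lock:
            self._clients.pop(client_id, None)

    def broadcast(self, message):
        """Copy one message into every client queue; return the client count."""
        with self._lock:
            targets = list(self._clients.values())
        for client_queue in targets:
            client_queue.put(message)
        return len(targets)


broadcaster = Broadcaster()
q_a = broadcaster.add_client("a")
q_b = broadcaster.add_client("b")
delivered = broadcaster.broadcast({"msg_type": "example"})
```

In this picture the background subscriber would call `broadcast` once per message received from the Channels group, so every process delivers to its own locally connected clients.
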
Key files:
- `monitor_app/activemq_processor.py`: persists and publishes to Channels group (and in-process fallback)
- `monitor_app/sse_views.py`: SSE endpoints, broadcaster, and background subscriber loop
- `swf_monitor_project/settings.py`: `CHANNEL_LAYERS` (Redis if `REDIS_URL`), `SSE_CHANNEL_GROUP`

## Configuration

Environment variables (loaded via `.env`):
- `REDIS_URL`: for example `redis://localhost:6379/0`. Enables the Redis-backed channel layer. If unset, the channel layer falls back to in-memory (single-process only).
- `SSE_CHANNEL_GROUP`: Channels group name (default: `workflow_events`).

Django settings detect `REDIS_URL` and configure `CHANNEL_LAYERS` accordingly.

## Authentication and CORS

The SSE endpoint (`/api/messages/stream/`) implements manual token authentication (DRF was removed to avoid content negotiation issues). The browser `EventSource` API cannot set an `Authorization` header, so browser clients should use session authentication for same-origin access. For cross-origin browser use, configure CORS for credentialed requests (no wildcard origins) and ensure cookies are allowed.

Non-browser (headless) clients may pass tokens using standard HTTP clients; avoid placing tokens in query strings unless explicitly approved.

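A headless client could be sketched with the standard library alone, as below. The `Authorization: Token <key>` header scheme is an assumption (this document does not state the exact format the endpoint expects), and `build_stream_request` and `iter_sse_data` are hypothetical helper names. The parser handles only `data:` fields and comment lines, which is enough to illustrate the wire format.

```python
import urllib.request


def build_stream_request(base_url, token):
    """Build a request for the SSE endpoint with the token in a header."""
    url = base_url.rstrip("/") + "/api/messages/stream/"
    return urllib.request.Request(
        url,
        headers={
            "Authorization": f"Token {token}",  # ASSUMPTION: header scheme
            "Accept": "text/event-stream",
        },
    )


def iter_sse_data(lines):
    """Yield the data payload of each SSE event from an iterable of byte lines."""
    data = []
    for raw in lines:
        line = raw.decode("utf-8").rstrip("\r\n")
        if line.startswith(":"):
            continue  # comment line, commonly used as a keep-alive
        if line == "":
            # A blank line terminates the current event.
            if data:
                yield "\n".join(data)
                data = []
            continue
        if line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE format strips a single leading space after the colon.
            data.append(value[1:] if value.startswith(" ") else value)
```

With a real deployment URL and token, the response object returned by `urllib.request.urlopen(build_stream_request(base_url, token))` can be passed directly to `iter_sse_data`, since it iterates over byte lines.
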
## Apache/WSGI Streaming

Ensure production is configured so streaming responses are not buffered or terminated prematurely:
- Use mod_wsgi daemon mode and appropriate timeouts to allow long-lived connections.
- Disable proxy buffering if present; `X-Accel-Buffering: no` is Nginx-specific and not used with Apache.

## Bash snippets

Activate environment and install requirements:

```bash
cd /eic/u/wenauseic/github/swf-testbed
source .venv/bin/activate
source ~/.env
pip install -r /eic/u/wenauseic/github/swf-monitor/requirements.txt
```

Set Redis configuration for the monitor (example):

```bash
export REDIS_URL=redis://localhost:6379/0
export SSE_CHANNEL_GROUP=workflow_events
```

Restart services (examples; adapt to your deployment):

```bash
# Restart the ActiveMQ listener and reload Apache
# supervisorctl restart swf-monitor-listener
# sudo systemctl reload httpd
```

## Filters and payload enrichment

Forwarded SSE payloads are enriched to support filters and diagnostics:
- `sender_agent`, `recipient_agent`
- `queue_name`
- `sent_at`

SSE clients can filter via query params: `msg_types`, `agents`, `run_ids`.

## Reliability & backpressure

SSE is best-effort real-time; there is no replay on reconnect. Each client has a bounded queue (drop-oldest on overflow). Heartbeats are emitted about every 30 seconds by default.

## Testing

The SSE functionality is comprehensively tested in `monitor_app/tests/test_sse_stream.py`:
- **Unit tests**: Core broadcasting logic, message filtering, client management
- **Integration tests**: Channel layer communication (when Redis is available)
- **HTTP endpoint tests**: Authentication, response format, status reporting

Run SSE-specific tests:
```bash
./run_tests.py src/monitor_app/tests/test_sse_stream.py
```

The tests use Django's test infrastructure rather than external HTTP connections, providing fast, reliable validation of SSE functionality without network dependencies.

## Operational notes

- If `REDIS_URL` is unset, cross-process fanout will not work; use only for single-process dev. For production and any deployment serving remote recipients, `REDIS_URL` must be configured and Redis must be running.
- Database write amplification from per-event stats is minimized in the hot path; consider batching if high-volume SSE usage is expected.