File indexing completed on 2026-04-25 08:29:12
0001
0002 """
0003 Remote SSE Receiver: Connects to the monitor's SSE stream to receive workflow messages.
0004
0005 This script receives real-time workflow messages via Server-Sent Events (SSE)
0006 from the swf-monitor running under production Apache. It connects to the SSE
0007 endpoint and logs received messages.
0008
0009 Command-line filtering examples (based on remote_sse_sender.py messages):
0010 python remote_sse_receiver.py --message sse_test
0011 python remote_sse_receiver.py --message stf_ready,processing_complete
0012 python remote_sse_receiver.py --agent sse_sender-agent
0013 python remote_sse_receiver.py --message sse_test --agent sse_sender-agent
0014 """
0015
0016 import os
0017 import sys
0018 import time
0019 import json
0020 import signal
0021 import argparse
0022 from pathlib import Path
0023
0024 import requests
0025 from swf_common_lib.base_agent import BaseAgent
0026
0027
0028 DEFAULT_MONITOR_BASE = "https://pandaserver02.sdcc.bnl.gov/swf-monitor"
0029
0030 def setup_environment() -> None:
0031 """Load environment variables from ~/.env file if present."""
0032 env_file = Path.home() / ".env"
0033 if env_file.exists():
0034 with env_file.open() as f:
0035 for line in f:
0036 line = line.strip()
0037 if not line or line.startswith('#') or '=' not in line:
0038 continue
0039 if line.startswith('export '):
0040 line = line[7:]
0041 key, value = line.split('=', 1)
0042 os.environ[key.strip()] = value.strip("'\"")
0043
0044
0045 class RemoteSSEReceiver(BaseAgent):
0046 """Production-only SSE client for swf-monitor that registers as an agent."""
0047
0048 def __init__(self, msg_types=None, agents=None) -> None:
0049 setup_environment()
0050
0051
0052 env_url = os.getenv('SWF_MONITOR_PROD_URL')
0053 if env_url:
0054 monitor_base = env_url.rstrip('/')
0055 else:
0056 monitor_base = DEFAULT_MONITOR_BASE
0057 print(f"ā¹ļø Using default production URL: {monitor_base} (override with SWF_MONITOR_PROD_URL)")
0058
0059
0060 os.environ['SWF_MONITOR_URL'] = monitor_base
0061 os.environ['SWF_MONITOR_HTTP_URL'] = monitor_base
0062
0063
0064 super().__init__(agent_type='sse_receiver', subscription_queue='/topic/epictopic')
0065
0066
0067 user_agent_name = os.getenv('SWF_SSE_RECEIVER_NAME')
0068 if user_agent_name:
0069 self.agent_name = user_agent_name
0070 self.monitor_base = monitor_base
0071 self.msg_types = msg_types
0072 self.agents = agents
0073
0074
0075 self.session = requests.Session()
0076 self.session.headers.update({
0077 'Authorization': f'Token {self.api_token}',
0078 'Cache-Control': 'no-cache',
0079 'Accept': 'text/event-stream',
0080 'Connection': 'keep-alive',
0081 })
0082
0083
0084 signal.signal(signal.SIGINT, lambda signum, frame: sys.exit(0))
0085 signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))
0086
0087 print("š§ Remote SSE Receiver initialized")
0088 print(f" Monitor base: {self.monitor_base}")
0089 print(f" Token prefix: {self.api_token[:12]}...")
0090 ca_bundle = os.getenv('REQUESTS_CA_BUNDLE')
0091 if ca_bundle:
0092 print(f" CA bundle: {ca_bundle}")
0093
0094
0095 def connect_and_receive(self) -> None:
0096 """Connect to SSE stream and process messages in a loop."""
0097
0098 stream_url = f"{self.monitor_base}/api/messages/stream/"
0099 params = []
0100 if self.msg_types:
0101 params.append(f"msg_types={','.join(self.msg_types)}")
0102 if self.agents:
0103 params.append(f"agents={','.join(self.agents)}")
0104 if params:
0105 stream_url += "?" + "&".join(params)
0106 status_url = f"{self.monitor_base}/api/messages/stream/status/"
0107 print(f"š” Connecting to SSE stream: {stream_url}")
0108
0109 while True:
0110 try:
0111
0112 print("š Testing SSE endpoint...")
0113
0114
0115 status_resp = self.session.get(status_url, timeout=20, allow_redirects=False, headers={'Accept': 'application/json'})
0116 if status_resp.status_code != 200:
0117 if status_resp.status_code in (401, 403):
0118 print(f"ā Auth failed (HTTP {status_resp.status_code}). Check SWF_API_TOKEN (token may be missing/invalid).")
0119 www = status_resp.headers.get('WWW-Authenticate')
0120 if www:
0121 print(f" WWW-Authenticate: {www}")
0122 print(" If running via Apache, ensure 'WSGIPassAuthorization On' is enabled so the Authorization header reaches Django.")
0123 elif 300 <= status_resp.status_code < 400:
0124 loc = status_resp.headers.get('Location', 'unknown')
0125 print(f"ā Got redirect (HTTP {status_resp.status_code}) to {loc}. This usually means Authorization isn't being passed through.")
0126 print(" Enable 'WSGIPassAuthorization On' in Apache for the /swf-monitor app and reload Apache.")
0127 else:
0128 print(f"ā SSE endpoint not available: HTTP {status_resp.status_code}")
0129 print(" Retrying in 15 seconds...")
0130 time.sleep(15)
0131 continue
0132
0133
0134
0135 response = self.session.get(stream_url, stream=True, timeout=(10, 3600), allow_redirects=False)
0136 if response.status_code != 200:
0137 if response.status_code in (401, 403):
0138 print(f"ā Auth failed opening stream (HTTP {response.status_code}). Check SWF_API_TOKEN.")
0139 www = response.headers.get('WWW-Authenticate')
0140 if www:
0141 print(f" WWW-Authenticate: {www}")
0142 print(" If behind Apache, ensure 'WSGIPassAuthorization On' is configured.")
0143 elif 300 <= response.status_code < 400:
0144 loc = response.headers.get('Location', 'unknown')
0145 print(f"ā Redirect when opening stream (HTTP {response.status_code}) to {loc}. Authorization likely not forwarded by Apache.")
0146 print(" Configure 'WSGIPassAuthorization On' and reload Apache.")
0147 else:
0148 print(f"ā Failed to open stream: HTTP {response.status_code}")
0149 print(" Response Headers:")
0150 for key, value in response.headers.items():
0151 print(f" {key}: {value}")
0152 print(" Response Body:")
0153 print(f" {response.text}")
0154 print(" Retrying in 15 seconds...")
0155 time.sleep(15)
0156 continue
0157
0158 print("ā
SSE stream opened - waiting for events... (Ctrl+C to exit)")
0159
0160
0161 self.send_heartbeat()
0162
0163 print("-" * 60)
0164
0165 self._process_sse_stream(response)
0166
0167 except requests.exceptions.ReadTimeout as e:
0168 print(f"ā±ļø Read timeout while waiting for messages: {e}")
0169 print(" Reconnecting in 15 seconds...")
0170 time.sleep(15)
0171 except requests.exceptions.RequestException as e:
0172 print(f"ā Connection error: {e}")
0173 print(" Retrying in 15 seconds...")
0174 time.sleep(15)
0175 except Exception as e:
0176 print(f"ā Unexpected error: {e}")
0177 time.sleep(15)
0178
0179
0180 def _process_sse_stream(self, response) -> None:
0181 event_buffer = []
0182 try:
0183 for line in response.iter_lines(decode_unicode=True, chunk_size=1):
0184 if line is None:
0185 continue
0186 line = line.strip()
0187 if not line:
0188 if event_buffer:
0189 self._handle_sse_event(event_buffer)
0190 event_buffer = []
0191 else:
0192 event_buffer.append(line)
0193 except KeyboardInterrupt:
0194 print("\nš” Received interrupt - closing connection...")
0195 except Exception as e:
0196 print(f"ā Error processing stream: {e}")
0197 finally:
0198 try:
0199 response.close()
0200 except Exception:
0201 pass
0202
0203 def _handle_sse_event(self, event_lines) -> None:
0204 event_type = "message"
0205 event_data = ""
0206 for line in event_lines:
0207 if line.startswith('event: '):
0208 event_type = line[7:]
0209 elif line.startswith('data: '):
0210 event_data = line[6:]
0211
0212 timestamp = time.strftime("%H:%M:%S")
0213 if event_type == "connected":
0214 print(f"[{timestamp}] š Connected to SSE stream")
0215 try:
0216 data = json.loads(event_data)
0217 client_id = data.get('client_id', 'unknown')
0218 print(f"[{timestamp}] š Client ID: {client_id}")
0219 except Exception:
0220 pass
0221 elif event_type == "heartbeat":
0222
0223 return
0224 else:
0225 try:
0226 data = json.loads(event_data)
0227 msg_type = data.get('msg_type', 'unknown')
0228 processed_by = data.get('processed_by', 'unknown')
0229 run_id = data.get('run_id', 'N/A')
0230 print(f"[{timestamp}] šØ Message received:")
0231 print(f" Message: {msg_type}")
0232 print(f" Agent: {processed_by}")
0233 print(f" Run: {run_id}")
0234 if 'message' in data:
0235 print(f" Text: {data['message']}")
0236 if 'filename' in data:
0237 print(f" File: {data['filename']}")
0238 print("-" * 60)
0239 except json.JSONDecodeError:
0240 print(f"[{timestamp}] šØ Non-JSON message: {event_data}")
0241 except Exception as e:
0242 print(f"[{timestamp}] ā Error parsing message: {e}")
0243
0244
0245 def main() -> None:
0246 parser = argparse.ArgumentParser(
0247 description="Remote SSE Receiver: Connect to workflow monitor SSE stream",
0248 formatter_class=argparse.RawDescriptionHelpFormatter,
0249 epilog="""
0250 Examples:
0251 python remote_sse_receiver.py # Receive all messages
0252 python remote_sse_receiver.py --message stf_gen # Only STF generation messages
0253 python remote_sse_receiver.py --agent daq-simulator # Only messages from daq-simulator
0254 python remote_sse_receiver.py --message stf_gen,stf_ready --agent daq-simulator
0255
0256 Environment variables:
0257 SWF_SSE_RECEIVER_NAME - Required: descriptive agent name
0258 SWF_API_TOKEN - Required: monitor API token
0259 SWF_MONITOR_PROD_URL - Optional: override monitor URL
0260 """)
0261
0262 parser.add_argument('--message', '--msg-type', dest='msg_types',
0263 help='Filter by message type(s), comma-separated (e.g., stf_gen,stf_ready)')
0264 parser.add_argument('--agent', dest='agents',
0265 help='Filter by agent name(s), comma-separated (e.g., daq-simulator,data-agent)')
0266
0267 args = parser.parse_args()
0268
0269
0270 msg_types = None
0271 if args.msg_types:
0272 msg_types = [t.strip() for t in args.msg_types.split(',')]
0273
0274 agents = None
0275 if args.agents:
0276 agents = [a.strip() for a in args.agents.split(',')]
0277
0278 try:
0279 receiver = RemoteSSEReceiver(msg_types=msg_types, agents=agents)
0280 if msg_types or agents:
0281 filters = []
0282 if msg_types:
0283 filters.append(f"messages: {', '.join(msg_types)}")
0284 if agents:
0285 filters.append(f"agents: {', '.join(agents)}")
0286 print(f"š Filtering enabled - {' | '.join(filters)}")
0287 else:
0288 print("š¬ Receiving all messages (no filtering)")
0289
0290 receiver.connect_and_receive()
0291 except KeyboardInterrupt:
0292 print("\nš” Received interrupt signal - exiting...")
0293 sys.exit(0)
0294 except Exception as e:
0295 print(f"ā Error: {e}")
0296 sys.exit(1)
0297
0298
0299 if __name__ == "__main__":
0300 main()