Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:12

0001 #!/usr/bin/env python3
0002 """
0003 Remote SSE Receiver: Connects to the monitor's SSE stream to receive workflow messages.
0004 
0005 This script receives real-time workflow messages via Server-Sent Events (SSE)
0006 from the swf-monitor running under production Apache. It connects to the SSE
0007 endpoint and logs received messages.
0008 
0009 Command-line filtering examples (based on remote_sse_sender.py messages):
0010   python remote_sse_receiver.py --message sse_test
0011   python remote_sse_receiver.py --message stf_ready,processing_complete  
0012   python remote_sse_receiver.py --agent sse_sender-agent
0013   python remote_sse_receiver.py --message sse_test --agent sse_sender-agent
0014 """
0015 
0016 import os
0017 import sys
0018 import time
0019 import json
0020 import signal
0021 import argparse
0022 from pathlib import Path
0023 
0024 import requests
0025 from swf_common_lib.base_agent import BaseAgent
0026 
0027 # Canonical production base URL (can be overridden by SWF_MONITOR_PROD_URL)
0028 DEFAULT_MONITOR_BASE = "https://pandaserver02.sdcc.bnl.gov/swf-monitor"
0029 
0030 def setup_environment() -> None:
0031     """Load environment variables from ~/.env file if present."""
0032     env_file = Path.home() / ".env"
0033     if env_file.exists():
0034         with env_file.open() as f:
0035             for line in f:
0036                 line = line.strip()
0037                 if not line or line.startswith('#') or '=' not in line:
0038                     continue
0039                 if line.startswith('export '):
0040                     line = line[7:]
0041                 key, value = line.split('=', 1)
0042                 os.environ[key.strip()] = value.strip("'\"")
0043 
0044 
0045 class RemoteSSEReceiver(BaseAgent):
0046     """Production-only SSE client for swf-monitor that registers as an agent."""
0047 
0048     def __init__(self, msg_types=None, agents=None) -> None:
0049         setup_environment()
0050 
0051         # Production monitor base URL (env override, otherwise production default)
0052         env_url = os.getenv('SWF_MONITOR_PROD_URL')
0053         if env_url:
0054             monitor_base = env_url.rstrip('/')
0055         else:
0056             monitor_base = DEFAULT_MONITOR_BASE
0057             print(f"ā„¹ļø  Using default production URL: {monitor_base} (override with SWF_MONITOR_PROD_URL)")
0058 
0059         # Override monitor URLs for production
0060         os.environ['SWF_MONITOR_URL'] = monitor_base
0061         os.environ['SWF_MONITOR_HTTP_URL'] = monitor_base
0062 
0063         # Initialize BaseAgent with descriptive type and name
0064         super().__init__(agent_type='sse_receiver', subscription_queue='/topic/epictopic')
0065         
0066         # Override agent name if user provided one
0067         user_agent_name = os.getenv('SWF_SSE_RECEIVER_NAME')
0068         if user_agent_name:
0069             self.agent_name = user_agent_name
0070         self.monitor_base = monitor_base
0071         self.msg_types = msg_types
0072         self.agents = agents
0073 
0074         # HTTP session: production defaults (verify=True, env proxies honored)
0075         self.session = requests.Session()
0076         self.session.headers.update({
0077             'Authorization': f'Token {self.api_token}',
0078             'Cache-Control': 'no-cache',
0079             'Accept': 'text/event-stream',
0080             'Connection': 'keep-alive',
0081         })
0082 
0083         # Simple shutdown - just exit immediately
0084         signal.signal(signal.SIGINT, lambda signum, frame: sys.exit(0))
0085         signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit(0))
0086 
0087         print("šŸ”§ Remote SSE Receiver initialized")
0088         print(f"   Monitor base: {self.monitor_base}")
0089         print(f"   Token prefix: {self.api_token[:12]}...")
0090         ca_bundle = os.getenv('REQUESTS_CA_BUNDLE')
0091         if ca_bundle:
0092             print(f"   CA bundle:   {ca_bundle}")
0093 
0094 
0095     def connect_and_receive(self) -> None:
0096         """Connect to SSE stream and process messages in a loop."""
0097         # Build stream URL with filters
0098         stream_url = f"{self.monitor_base}/api/messages/stream/"
0099         params = []
0100         if self.msg_types:
0101             params.append(f"msg_types={','.join(self.msg_types)}")
0102         if self.agents:
0103             params.append(f"agents={','.join(self.agents)}")
0104         if params:
0105             stream_url += "?" + "&".join(params)
0106         status_url = f"{self.monitor_base}/api/messages/stream/status/"
0107         print(f"šŸ“” Connecting to SSE stream: {stream_url}")
0108 
0109         while True:
0110             try:
0111                 # Status precheck
0112                 print("šŸ”Œ Testing SSE endpoint...")
0113                 # Do not follow redirects; 302 likely indicates auth not reaching Django
0114                 # The status endpoint is a regular DRF endpoint, not an SSE stream, so override Accept header
0115                 status_resp = self.session.get(status_url, timeout=20, allow_redirects=False, headers={'Accept': 'application/json'})
0116                 if status_resp.status_code != 200:
0117                     if status_resp.status_code in (401, 403):
0118                         print(f"āŒ Auth failed (HTTP {status_resp.status_code}). Check SWF_API_TOKEN (token may be missing/invalid).")
0119                         www = status_resp.headers.get('WWW-Authenticate')
0120                         if www:
0121                             print(f"   WWW-Authenticate: {www}")
0122                         print("   If running via Apache, ensure 'WSGIPassAuthorization On' is enabled so the Authorization header reaches Django.")
0123                     elif 300 <= status_resp.status_code < 400:
0124                         loc = status_resp.headers.get('Location', 'unknown')
0125                         print(f"āŒ Got redirect (HTTP {status_resp.status_code}) to {loc}. This usually means Authorization isn't being passed through.")
0126                         print("   Enable 'WSGIPassAuthorization On' in Apache for the /swf-monitor app and reload Apache.")
0127                     else:
0128                         print(f"āŒ SSE endpoint not available: HTTP {status_resp.status_code}")
0129                     print("   Retrying in 15 seconds...")
0130                     time.sleep(15)
0131                     continue
0132 
0133                 # Open the SSE stream (blocks quietly while waiting for events)
0134                 # Do not follow redirects; treat as auth failure
0135                 response = self.session.get(stream_url, stream=True, timeout=(10, 3600), allow_redirects=False)
0136                 if response.status_code != 200:
0137                     if response.status_code in (401, 403):
0138                         print(f"āŒ Auth failed opening stream (HTTP {response.status_code}). Check SWF_API_TOKEN.")
0139                         www = response.headers.get('WWW-Authenticate')
0140                         if www:
0141                             print(f"   WWW-Authenticate: {www}")
0142                         print("   If behind Apache, ensure 'WSGIPassAuthorization On' is configured.")
0143                     elif 300 <= response.status_code < 400:
0144                         loc = response.headers.get('Location', 'unknown')
0145                         print(f"āŒ Redirect when opening stream (HTTP {response.status_code}) to {loc}. Authorization likely not forwarded by Apache.")
0146                         print("   Configure 'WSGIPassAuthorization On' and reload Apache.")
0147                     else:
0148                         print(f"āŒ Failed to open stream: HTTP {response.status_code}")
0149                         print("   Response Headers:")
0150                         for key, value in response.headers.items():
0151                             print(f"     {key}: {value}")
0152                         print("   Response Body:")
0153                         print(f"     {response.text}")
0154                     print("   Retrying in 15 seconds...")
0155                     time.sleep(15)
0156                     continue
0157 
0158                 print("āœ… SSE stream opened - waiting for events... (Ctrl+C to exit)")
0159                 
0160                 # Register this SSE receiver as an active agent
0161                 self.send_heartbeat()
0162                 
0163                 print("-" * 60)
0164                 # streaming until broken or stopped
0165                 self._process_sse_stream(response)
0166 
0167             except requests.exceptions.ReadTimeout as e:
0168                 print(f"ā±ļø  Read timeout while waiting for messages: {e}")
0169                 print("   Reconnecting in 15 seconds...")
0170                 time.sleep(15)
0171             except requests.exceptions.RequestException as e:
0172                 print(f"āŒ Connection error: {e}")
0173                 print("   Retrying in 15 seconds...")
0174                 time.sleep(15)
0175             except Exception as e:
0176                 print(f"āŒ Unexpected error: {e}")
0177                 time.sleep(15)
0178 
0179 
0180     def _process_sse_stream(self, response) -> None:
0181         event_buffer = []
0182         try:
0183             for line in response.iter_lines(decode_unicode=True, chunk_size=1):
0184                 if line is None:
0185                     continue
0186                 line = line.strip()
0187                 if not line:
0188                     if event_buffer:
0189                         self._handle_sse_event(event_buffer)
0190                         event_buffer = []
0191                 else:
0192                     event_buffer.append(line)
0193         except KeyboardInterrupt:
0194             print("\nšŸ“” Received interrupt - closing connection...")
0195         except Exception as e:
0196             print(f"āŒ Error processing stream: {e}")
0197         finally:
0198             try:
0199                 response.close()
0200             except Exception:
0201                 pass
0202 
0203     def _handle_sse_event(self, event_lines) -> None:
0204         event_type = "message"
0205         event_data = ""
0206         for line in event_lines:
0207             if line.startswith('event: '):
0208                 event_type = line[7:]
0209             elif line.startswith('data: '):
0210                 event_data = line[6:]
0211 
0212         timestamp = time.strftime("%H:%M:%S")
0213         if event_type == "connected":
0214             print(f"[{timestamp}] šŸ”— Connected to SSE stream")
0215             try:
0216                 data = json.loads(event_data)
0217                 client_id = data.get('client_id', 'unknown')
0218                 print(f"[{timestamp}] šŸ“‹ Client ID: {client_id}")
0219             except Exception:
0220                 pass
0221         elif event_type == "heartbeat":
0222             # Stay quiet on heartbeats to avoid log spam
0223             return
0224         else:
0225             try:
0226                 data = json.loads(event_data)
0227                 msg_type = data.get('msg_type', 'unknown')
0228                 processed_by = data.get('processed_by', 'unknown')
0229                 run_id = data.get('run_id', 'N/A')
0230                 print(f"[{timestamp}] šŸ“Ø Message received:")
0231                 print(f"         Message: {msg_type}")
0232                 print(f"           Agent: {processed_by}")
0233                 print(f"            Run:  {run_id}")
0234                 if 'message' in data:
0235                     print(f"            Text: {data['message']}")
0236                 if 'filename' in data:
0237                     print(f"            File: {data['filename']}")
0238                 print("-" * 60)
0239             except json.JSONDecodeError:
0240                 print(f"[{timestamp}] šŸ“Ø Non-JSON message: {event_data}")
0241             except Exception as e:
0242                 print(f"[{timestamp}] āŒ Error parsing message: {e}")
0243 
0244 
0245 def main() -> None:
0246     parser = argparse.ArgumentParser(
0247         description="Remote SSE Receiver: Connect to workflow monitor SSE stream",
0248         formatter_class=argparse.RawDescriptionHelpFormatter,
0249         epilog="""
0250 Examples:
0251   python remote_sse_receiver.py                           # Receive all messages
0252   python remote_sse_receiver.py --message stf_gen         # Only STF generation messages
0253   python remote_sse_receiver.py --agent daq-simulator     # Only messages from daq-simulator
0254   python remote_sse_receiver.py --message stf_gen,stf_ready --agent daq-simulator
0255 
0256 Environment variables:
0257   SWF_SSE_RECEIVER_NAME - Required: descriptive agent name
0258   SWF_API_TOKEN         - Required: monitor API token
0259   SWF_MONITOR_PROD_URL  - Optional: override monitor URL
0260         """)
0261     
0262     parser.add_argument('--message', '--msg-type', dest='msg_types',
0263                         help='Filter by message type(s), comma-separated (e.g., stf_gen,stf_ready)')
0264     parser.add_argument('--agent', dest='agents', 
0265                         help='Filter by agent name(s), comma-separated (e.g., daq-simulator,data-agent)')
0266     
0267     args = parser.parse_args()
0268     
0269     # Parse comma-separated values
0270     msg_types = None
0271     if args.msg_types:
0272         msg_types = [t.strip() for t in args.msg_types.split(',')]
0273     
0274     agents = None
0275     if args.agents:
0276         agents = [a.strip() for a in args.agents.split(',')]
0277     
0278     try:
0279         receiver = RemoteSSEReceiver(msg_types=msg_types, agents=agents)
0280         if msg_types or agents:
0281             filters = []
0282             if msg_types:
0283                 filters.append(f"messages: {', '.join(msg_types)}")
0284             if agents:
0285                 filters.append(f"agents: {', '.join(agents)}")
0286             print(f"šŸ” Filtering enabled - {' | '.join(filters)}")
0287         else:
0288             print("šŸ“¬ Receiving all messages (no filtering)")
0289         
0290         receiver.connect_and_receive()
0291     except KeyboardInterrupt:
0292         print("\nšŸ“” Received interrupt signal - exiting...")
0293         sys.exit(0)
0294     except Exception as e:
0295         print(f"āŒ Error: {e}")
0296         sys.exit(1)
0297 
0298 
0299 if __name__ == "__main__":
0300     main()