Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:12

0001 #!/usr/bin/env python3
0002 """
0003 Remote SSE Sender: Sends test messages through ActiveMQ that will be relayed to SSE clients.
0004 
0005 This script demonstrates sending workflow messages that the monitor will broadcast
0006 to connected SSE clients. It inherits from BaseAgent to use standard logging and
0007 ActiveMQ connectivity.
0008 """
0009 
0010 from swf_common_lib.base_agent import BaseAgent
0011 import os
0012 import json
0013 import time
0014 import sys
0015 
0016 class RemoteSSESender(BaseAgent):
0017     """
0018     Example agent that sends test messages to demonstrate SSE functionality.
0019     Sends a few test messages and exits.
0020     """
0021 
0022     def __init__(self):
0023         # Force production monitor for this example agent
0024         prod_base = os.getenv('SWF_MONITOR_PROD_URL', 'https://pandaserver02.sdcc.bnl.gov/swf-monitor').rstrip('/')
0025         # Override any localhost defaults to avoid misdirected heartbeats/logging
0026         os.environ['SWF_MONITOR_URL'] = prod_base
0027         os.environ['SWF_MONITOR_HTTP_URL'] = prod_base
0028 
0029         super().__init__(agent_type='sse_sender', subscription_queue='/topic/epictopic')
0030         self.logger.info(f"Monitor base set to: {prod_base}")
0031         self.messages_to_send = [
0032             {
0033                 'msg_type': 'sse_test',
0034                 'processed_by': self.agent_name,
0035                 'run_id': 'test-run-001',
0036                 'message': 'Hello from SSE sender!',
0037                 'data': 'This is a test message for SSE demonstration'
0038             },
0039             {
0040                 'msg_type': 'stf_ready',
0041                 'processed_by': self.agent_name, 
0042                 'run_id': 'test-run-001',
0043                 'filename': 'test_file_001.dat',
0044                 'message': 'Simulated data ready event'
0045             },
0046             {
0047                 'msg_type': 'processing_complete',
0048                 'processed_by': self.agent_name,
0049                 'run_id': 'test-run-001', 
0050                 'filename': 'test_file_001.dat',
0051                 'message': 'Simulated processing complete event'
0052             }
0053         ]
0054 
0055     def run_sender(self):
0056         """Send test messages; loop by default, one-shot if enabled via env."""
0057         # Loop by default. Set SWF_SENDER_ONESHOT=1 (or true/yes/on) to send a single batch.
0058         oneshot = os.getenv('SWF_SENDER_ONESHOT', '0').lower() in ('1', 'true', 'yes', 'on')
0059         batch_interval = int(os.getenv('SWF_SENDER_BATCH_INTERVAL', '30'))
0060         mode = 'one-shot' if oneshot else f'loop every {batch_interval}s'
0061         self.logger.info(f"Starting Remote SSE Sender ({mode})")
0062 
0063         try:
0064             while True:
0065                 # Ensure connection (fixed 2s delay on failure)
0066                 if not self.conn or not self.conn.is_connected():
0067                     try:
0068                         self.logger.info("Connecting to ActiveMQ ...")
0069                         self.conn.connect(
0070                             self.mq_user,
0071                             self.mq_password,
0072                             wait=True,
0073                             version='1.1',
0074                             headers={
0075                                 'client-id': self.agent_name,
0076                                 'heart-beat': '10000,30000'
0077                             }
0078                         )
0079                         self.logger.info("Connected to ActiveMQ")
0080                         self.mq_connected = True
0081                         
0082                         # Register agent with monitor via heartbeat
0083                         self.send_heartbeat()
0084                     except Exception as e:
0085                         self.logger.error(f"Failed to connect to ActiveMQ: {e}")
0086                         self.mq_connected = False
0087                         time.sleep(2)
0088                         continue
0089 
0090                 # Send one batch
0091                 self.logger.info(f"Sending batch of {len(self.messages_to_send)} messages")
0092                 for i, message in enumerate(self.messages_to_send, 1):
0093                     try:
0094                         self.logger.debug(f"Sending message {i}/{len(self.messages_to_send)}: {message['msg_type']}")
0095                         self.send_message('/topic/epictopic', message)
0096                         self.logger.debug(
0097                             f"Sent message: {message['msg_type']} run={message.get('run_id','N/A')}"
0098                         )
0099                     except Exception as e:
0100                         self.logger.error(f"Failed to send message {i}: {e}")
0101                         # If a message fails to send, the connection might be dead.
0102                         # Break the inner loop and let the outer loop try to reconnect.
0103                         break
0104                     
0105                     # Sleep between messages (only if not the last message)
0106                     if i < len(self.messages_to_send):
0107                         time.sleep(1)
0108 
0109                 # Exit in one-shot mode; otherwise sleep and repeat
0110                 if oneshot:
0111                     self.logger.info("Completed one-shot batch; exiting.")
0112                     break
0113 
0114                 self.logger.info(f"Batch sent. Waiting {batch_interval} seconds.")
0115                 time.sleep(batch_interval)
0116         finally:
0117             # Disconnect when the loop is exited (e.g., one-shot mode, Ctrl-C)
0118             if self.conn and self.conn.is_connected():
0119                 try:
0120                     self.conn.disconnect()
0121                     self.logger.info("Disconnected from ActiveMQ")
0122                 except Exception as e:
0123                     self.logger.error(f"Error during disconnect: {e}")
0124 
0125 def main():
0126     """Main entry point."""
0127     try:
0128         sender = RemoteSSESender()
0129         sender.run_sender()
0130         
0131     except KeyboardInterrupt:
0132         print("\nReceived interrupt signal - exiting...")
0133         sys.exit(0)
0134     except Exception as e:
0135         print(f"Error: {e}")
0136         sys.exit(1)
0137 
0138 if __name__ == "__main__":
0139     main()