File indexing completed on 2026-04-25 08:29:12
0001
0002 """
0003 Remote SSE Sender: Sends test messages through ActiveMQ that will be relayed to SSE clients.
0004
0005 This script demonstrates sending workflow messages that the monitor will broadcast
0006 to connected SSE clients. It inherits from BaseAgent to use standard logging and
0007 ActiveMQ connectivity.
0008 """
0009
0010 from swf_common_lib.base_agent import BaseAgent
0011 import os
0012 import json
0013 import time
0014 import sys
0015
class RemoteSSESender(BaseAgent):
    """
    Example agent that sends test messages to demonstrate SSE functionality.

    Publishes a fixed batch of three demo messages to ``/topic/epictopic``.
    By default the batch is re-sent in a loop; set ``SWF_SENDER_ONESHOT`` to
    send a single batch and return (see ``run_sender``).
    """

    def __init__(self):
        """
        Point the agent at the production monitor and build the demo batch.

        ``SWF_MONITOR_PROD_URL`` overrides the default monitor base URL; any
        trailing slash is removed.
        """
        prod_base = os.getenv('SWF_MONITOR_PROD_URL', 'https://pandaserver02.sdcc.bnl.gov/swf-monitor').rstrip('/')

        # Exported before BaseAgent.__init__ runs.
        # NOTE(review): presumably BaseAgent reads these variables during
        # initialisation - confirm against swf_common_lib.
        os.environ['SWF_MONITOR_URL'] = prod_base
        os.environ['SWF_MONITOR_HTTP_URL'] = prod_base

        super().__init__(agent_type='sse_sender', subscription_queue='/topic/epictopic')
        self.logger.info(f"Monitor base set to: {prod_base}")

        # Fixed demo payloads, sent in this order on every batch.
        # self.agent_name is expected to be provided by BaseAgent.__init__.
        self.messages_to_send = [
            {
                'msg_type': 'sse_test',
                'processed_by': self.agent_name,
                'run_id': 'test-run-001',
                'message': 'Hello from SSE sender!',
                'data': 'This is a test message for SSE demonstration'
            },
            {
                'msg_type': 'stf_ready',
                'processed_by': self.agent_name,
                'run_id': 'test-run-001',
                'filename': 'test_file_001.dat',
                'message': 'Simulated data ready event'
            },
            {
                'msg_type': 'processing_complete',
                'processed_by': self.agent_name,
                'run_id': 'test-run-001',
                'filename': 'test_file_001.dat',
                'message': 'Simulated processing complete event'
            }
        ]

    def _connect_for_sending(self):
        """
        Make sure the ActiveMQ connection is up.

        Returns:
            True when the connection is ready for sending, False when the
            connection attempt failed and the caller should retry later.

        Raises:
            RuntimeError: if the agent has no connection object at all, since
                retrying can never succeed in that case.
        """
        if self.conn is None:
            raise RuntimeError("ActiveMQ connection object is not initialized")

        if self.conn and self.conn.is_connected():
            return True

        try:
            self.logger.info("Connecting to ActiveMQ ...")
            self.conn.connect(
                self.mq_user,
                self.mq_password,
                wait=True,
                version='1.1',
                headers={
                    'client-id': self.agent_name,
                    'heart-beat': '10000,30000'
                }
            )
            self.logger.info("Connected to ActiveMQ")
            self.mq_connected = True

            # Kept inside the try: a heartbeat failure is handled like a
            # failed connection attempt, exactly as before.
            self.send_heartbeat()
        except Exception as e:
            self.logger.error(f"Failed to connect to ActiveMQ: {e}")
            self.mq_connected = False
            return False
        return True

    def _send_demo_batch(self):
        """
        Publish the demo messages in order, pausing 1 second between them.

        Sending stops at the first failure so the caller can go back through
        the connection check before the next batch.

        Returns:
            The number of messages sent before the batch finished or failed.
        """
        total = len(self.messages_to_send)
        self.logger.info(f"Sending batch of {total} messages")
        for i, message in enumerate(self.messages_to_send, 1):
            try:
                self.logger.debug(f"Sending message {i}/{total}: {message['msg_type']}")
                self.send_message('/topic/epictopic', message)
                self.logger.debug(
                    f"Sent message: {message['msg_type']} run={message.get('run_id','N/A')}"
                )
            except Exception as e:
                self.logger.error(f"Failed to send message {i}: {e}")
                # Abandon the rest of the batch; i - 1 messages went out.
                return i - 1

            # No delay after the last message of the batch.
            if i < total:
                time.sleep(1)
        return total

    def run_sender(self):
        """Send test messages; loop by default, one-shot if enabled via env.

        Environment:
            SWF_SENDER_ONESHOT: '1', 'true', 'yes' or 'on' (case-insensitive)
                sends a single batch and returns.
            SWF_SENDER_BATCH_INTERVAL: seconds to wait between batches in
                loop mode (default 30).

        Raises:
            ValueError: if SWF_SENDER_BATCH_INTERVAL is not an integer, or is
                negative while running in loop mode.
            RuntimeError: if the agent has no connection object.
        """
        oneshot = os.getenv('SWF_SENDER_ONESHOT', '0').lower() in ('1', 'true', 'yes', 'on')
        batch_interval = int(os.getenv('SWF_SENDER_BATCH_INTERVAL', '30'))
        # time.sleep() rejects negative values; fail before anything is sent
        # instead of after the first batch.
        if not oneshot and batch_interval < 0:
            raise ValueError(
                f"SWF_SENDER_BATCH_INTERVAL must be non-negative, got {batch_interval}"
            )
        mode = 'one-shot' if oneshot else f'loop every {batch_interval}s'
        self.logger.info(f"Starting Remote SSE Sender ({mode})")

        try:
            while True:
                if not self._connect_for_sending():
                    # Short back-off before the next connection attempt.
                    time.sleep(2)
                    continue

                total = len(self.messages_to_send)
                sent = self._send_demo_batch()
                complete = sent == total

                if oneshot:
                    if complete:
                        self.logger.info("Completed one-shot batch; exiting.")
                    else:
                        self.logger.warning(
                            f"One-shot batch incomplete ({sent}/{total} messages sent); exiting."
                        )
                    break

                if complete:
                    self.logger.info(f"Batch sent. Waiting {batch_interval} seconds.")
                else:
                    self.logger.warning(
                        f"Batch incomplete ({sent}/{total} messages sent). "
                        f"Waiting {batch_interval} seconds."
                    )
                time.sleep(batch_interval)
        finally:
            # Best-effort cleanup: a disconnect error is logged, never raised.
            if self.conn and self.conn.is_connected():
                try:
                    self.conn.disconnect()
                    self.logger.info("Disconnected from ActiveMQ")
                except Exception as e:
                    self.logger.error(f"Error during disconnect: {e}")
0124
def main():
    """
    Main entry point.

    Builds a RemoteSSESender and runs it. Returns normally when the sender
    finishes; exits with status 0 on Ctrl-C and status 1 on any other error,
    after printing a short message.
    """
    exit_status = None
    try:
        RemoteSSESender().run_sender()
    except KeyboardInterrupt:
        print("\nReceived interrupt signal - exiting...")
        exit_status = 0
    except Exception as exc:
        print(f"Error: {exc}")
        exit_status = 1

    # Only terminate the interpreter when something interrupted the run.
    if exit_status is not None:
        sys.exit(exit_status)
0137
# Run the sender only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()