File indexing completed on 2026-04-27 07:41:45
0001
0002 """
0003 Fast Monitoring Agent for SWF Fast Monitoring System.
0004
0005 This agent receives stf_ready messages from the data agent, samples Time Frames (TF) from
0006 Super Time Frames (STF), and records TF metadata in the fast monitoring database.
0007 The TFs are then broadcast via ActiveMQ to fast monitoring clients.
0008
0009 Designed to run continuously under supervisord.
0010 """
0011
0012 import sys
0013 import json
0014 from datetime import datetime
0015
0016 from swf_common_lib.base_agent import BaseAgent, setup_environment
0017 import example_fastmon_utils as fastmon_utils
0018
0019
class FastMonitorAgent(BaseAgent):
    """
    Agent that receives stf_ready messages, samples TFs from STFs and records them in the database.
    Then broadcasts the TF notifications via ActiveMQ.
    """

    def __init__(self, config: dict, debug=False, config_path=None):
        """
        Initialize the fast monitoring agent.

        Args:
            config: configuration dictionary containing:
                - selection_fraction: Fraction of TFs to select (0.0-1.0)
                - tf_files_per_stf: Number of TF files to generate per STF
                - tf_size_fraction: Fraction of STF size for each TF
                - tf_sequence_start: Starting sequence number for TF files
            debug: Enable debug logging for heartbeat messages
            config_path: Path to testbed.toml config file
        """
        super().__init__(agent_type='fastmon', subscription_queue='/topic/epictopic', debug=debug,
                         config_path=config_path)
        self.running = True
        # TF notifications are published on the same topic the agent subscribes to.
        self.destination = '/topic/epictopic'

        # Validate the configuration before announcing a successful start, so the
        # log never reports success for an agent whose config was rejected.
        # NOTE(review): presumably validate_config raises on invalid input -- confirm.
        self.config = config
        fastmon_utils.validate_config(self.config)

        # Running counters describing the work done by this agent instance.
        self.stf_messages_processed = 0
        self.last_message_time = None
        self.processing_stats = {'total_stf_messages': 0, 'total_tf_files_created': 0}

        self.logger.info("Fast Monitor Agent initialized successfully")
        self.logger.info(f"Fast Monitor Agent initialized with config: {self.config}")

    def send_tf_file_notification(self, tf_file: dict, stf_file: dict):
        """
        Send notification to clients about a newly registered TF file via ActiveMQ.

        Failures are logged and swallowed so that one failed notification does
        not interrupt processing of the remaining TF files.

        Args:
            tf_file: TF file data from the FastMonFile API
            stf_file: Parent STF file data
        """
        try:
            message = fastmon_utils.create_tf_message(tf_file, stf_file, self.agent_name)
            self.send_message(self.destination, message)
            self.logger.debug(f"Sent TF file notification via ActiveMQ: {tf_file.get('tf_filename')}")
        except Exception as e:
            self.logger.error(f"Failed to send TF file notification: {e}")

    def on_message(self, frame):
        """
        Handle incoming stf_ready messages for fast monitoring.
        This agent processes STF metadata and creates TF samples.

        Args:
            frame: incoming message frame, passed to log_received_message for decoding
        """
        message_data, msg_type = self.log_received_message(frame, {'stf_ready'})
        if message_data is None:
            # Nothing to process for this frame.
            return

        # Remember the execution/run identifiers carried by the message, when present.
        if 'execution_id' in message_data:
            self.current_execution_id = message_data['execution_id']
        if 'run_id' in message_data:
            self.current_run_id = message_data['run_id']

        self.send_heartbeat()

        try:
            if msg_type == 'stf_ready':
                # The returned list of TF records is not needed by the handler.
                self.sample_timeframes(message_data)
            else:
                self.logger.warning(f"Ignoring unknown message type {msg_type}",
                                    extra=self._log_extra(msg_type=msg_type))
        except Exception as e:
            # Keep the message loop alive: log and report instead of propagating.
            self.logger.error("Error processing message",
                              extra=self._log_extra(error=str(e)))
            self.report_agent_status('ERROR', f'Message processing error: {str(e)}')

    def sample_timeframes(self, message_data):
        """
        Handle stf_ready message and sample STFs into TFs
        Records each TF through fastmon_utils.record_tf_file and notifies clients.

        Args:
            message_data: decoded stf_ready message; must provide a non-empty 'filename'

        Returns:
            List of the TF file records that were successfully recorded
            (empty when the message carries no filename).
        """
        self.logger.info("Processing stf_ready message", extra=self._log_extra())

        # Statistics count every stf_ready message, including ones rejected below.
        self.last_message_time = datetime.now()
        self.stf_messages_processed += 1
        self.processing_stats['total_stf_messages'] += 1

        tf_files_registered = []
        self.logger.debug(f"Message data received: {message_data}", extra=self._log_extra())

        stf_filename = message_data.get('filename')
        if not stf_filename:
            self.logger.error("No filename provided in message", extra=self._log_extra())
            return tf_files_registered

        tf_subsamples = fastmon_utils.simulate_tf_subsamples(message_data, self.config, self.logger, self.agent_name)

        for tf_metadata in tf_subsamples:
            self.logger.debug(f"Processing {tf_metadata}")
            tf_file = fastmon_utils.record_tf_file(tf_metadata, self.config, self, self.logger)
            if not tf_file:
                # A falsy result means nothing was recorded for this TF; skip it.
                continue
            self.send_tf_file_notification(tf_file, message_data)
            tf_files_registered.append(tf_file)

        # Every recorded TF is appended exactly once, so the list length is the count.
        tf_files_created = len(tf_files_registered)
        self.processing_stats['total_tf_files_created'] += tf_files_created

        self.logger.info(f"Registered {tf_files_created} TF subsamples for STF file {stf_filename}",
                         extra=self._log_extra(stf_filename=stf_filename, tf_files_created=tf_files_created))
        return tf_files_registered
0152
0153
0154
0155
0156
0157
0158
def main():
    """
    Main entry point for the agent.

    Parses the command line options, builds the sampling configuration,
    constructs the FastMonitorAgent and hands control to its run() method.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Fast Monitor Agent')
    parser.add_argument('--debug', action='store_true', help='Enable debug logging for heartbeat messages')
    parser.add_argument('--testbed-config', default=None,
                        help='Testbed config file (default: SWF_TESTBED_CONFIG env var or workflows/testbed.toml)')
    args = parser.parse_args()

    # Sampling parameters handed to the agent; see FastMonitorAgent.__init__
    # for the meaning of each key.
    config = {
        "selection_fraction": 0.1,
        "tf_files_per_stf": 7,
        "tf_size_fraction": 0.15,
        "tf_sequence_start": 1,
    }

    agent = FastMonitorAgent(config, debug=args.debug, config_path=args.testbed_config)
    agent.run()
0186
0187
if __name__ == "__main__":
    # Start the agent only when the environment was prepared successfully;
    # otherwise terminate with a non-zero exit status.
    if setup_environment():
        main()
    else:
        sys.exit(1)
0193 main()