Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:45

0001 #!/usr/bin/env python3
0002 """
0003 Fast Monitoring Agent for SWF Fast Monitoring System.
0004 
0005 This agent receives stf_ready messages from the data agent, samples Time Frames (TF) from
0006 Super Time Frames (STF), and records TF metadata in the fast monitoring database.
0007 The TFs are then broadcast via ActiveMQ to fast monitoring clients.
0008 
0009 Designed to run continuously under supervisord.
0010 """
0011 
0012 import sys
0013 import json
0014 from datetime import datetime
0015 
0016 from swf_common_lib.base_agent import BaseAgent, setup_environment
0017 import example_fastmon_utils as fastmon_utils
0018 
0019 
0020 class FastMonitorAgent(BaseAgent):
0021     """
0022     Agent that receives stf_ready messages, samples TFs from STFs and records them in the database.
0023     Then broadcasts the TF notifications via ActiveMQ.
0024     """
0025 
0026     def __init__(self, config: dict, debug=False, config_path=None):
0027         """
0028         Initialize the fast monitoring agent.
0029 
0030         Args:
0031             config: configuration dictionary containing:
0032                 - selection_fraction: Fraction of TFs to select (0.0-1.0)
0033                 - tf_files_per_stf: Number of TF files to generate per STF
0034                 - tf_size_fraction: Fraction of STF size for each TF
0035                 - tf_sequence_start: Starting sequence number for TF files
0036             debug: Enable debug logging for heartbeat messages
0037             config_path: Path to testbed.toml config file
0038         """
0039 
0040         # Initialize base agent with fast monitoring specific parameters
0041         super().__init__(agent_type='fastmon', subscription_queue='/topic/epictopic', debug=debug,
0042                          config_path=config_path)
0043         self.running = True
0044         self.destination = '/topic/epictopic'
0045 
0046         self.logger.info("Fast Monitor Agent initialized successfully")
0047 
0048         self.config = config
0049         
0050         # Validate configuration
0051         fastmon_utils.validate_config(self.config)
0052         self.logger.info(f"Fast Monitor Agent initialized with config: {self.config}")
0053         
0054         # Fast monitoring specific state
0055         self.stf_messages_processed = 0
0056         self.last_message_time = None
0057         self.processing_stats = {'total_stf_messages': 0, 'total_tf_files_created': 0}
0058 
0059     def send_tf_file_notification(self, tf_file: dict, stf_file: dict):
0060         """
0061         Send notification to clients about a newly registered TF file via ActiveMQ.
0062         
0063         Args:
0064             tf_file: TF file data from the FastMonFile API
0065             stf_file: Parent STF file data
0066         """
0067         try:
0068             # Create message using utility function
0069             message = fastmon_utils.create_tf_message(tf_file, stf_file, self.agent_name)
0070 
0071             # Send message via ActiveMQ (monitor will forward to SSE clients)
0072             self.send_message(self.destination, message)
0073             
0074             self.logger.debug(f"Sent TF file notification via ActiveMQ: {tf_file.get('tf_filename')}")
0075             
0076         except Exception as e:
0077             self.logger.error(f"Failed to send TF file notification: {e}")
0078 
0079 
0080 
0081     def on_message(self, frame):
0082         """
0083         Handle incoming stf_ready messages for fast monitoring.
0084         This agent processes STF metadata and creates TF samples.
0085         """
0086         # Use base class helper for consistent logging
0087         message_data, msg_type = self.log_received_message(frame, {'stf_ready'})
0088         if message_data is None:
0089             return
0090 
0091         # Extract workflow context from message
0092         if 'execution_id' in message_data:
0093             self.current_execution_id = message_data['execution_id']
0094         if 'run_id' in message_data:
0095             self.current_run_id = message_data['run_id']
0096 
0097         # Update heartbeat on message activity
0098         self.send_heartbeat()
0099 
0100         try:
0101             # A "stf_ready" call from the data agent
0102             if msg_type == 'stf_ready':
0103                 tf_files = self.sample_timeframes(message_data)
0104             else:
0105                 self.logger.warning(f"Ignoring unknown message type {msg_type}",
0106                                    extra=self._log_extra(msg_type=msg_type))
0107 
0108         except Exception as e:
0109             self.logger.error("Error processing message",
0110                             extra=self._log_extra(error=str(e)))
0111             self.report_agent_status('ERROR', f'Message processing error: {str(e)}')
0112 
0113 
0114     def sample_timeframes(self, message_data):
0115         """
0116         Handle stf_ready message and sample STFs into TFs
0117         Registers the TFs in the swf-monitor database and notifies clients.
0118         """
0119         self.logger.info("Processing stf_ready message", extra=self._log_extra())
0120 
0121         # Update message tracking stats
0122         self.last_message_time = datetime.now()
0123         self.stf_messages_processed += 1
0124         self.processing_stats['total_stf_messages'] += 1
0125 
0126         tf_files_registered = []
0127         self.logger.debug(f"Message data received: {message_data}", extra=self._log_extra())
0128         if not message_data.get('filename'):
0129             self.logger.error("No filename provided in message", extra=self._log_extra())
0130             return tf_files_registered
0131 
0132         tf_subsamples = fastmon_utils.simulate_tf_subsamples(message_data, self.config, self.logger, self.agent_name)
0133 
0134         # Record each TF file in the FastMonFile table
0135         # TODO: register in bulk
0136         tf_files_created = 0
0137         for tf_metadata in tf_subsamples:
0138             self.logger.debug(f"Processing {tf_metadata}")
0139             tf_file = fastmon_utils.record_tf_file(tf_metadata, self.config, self, self.logger)
0140             if tf_file:
0141                 tf_files_created += 1
0142                 # Broadcast tf_file_registered to downstream consumers
0143                 self.send_tf_file_notification(tf_file, message_data)
0144             tf_files_registered.append(tf_file)
0145 
0146         # Update TF creation stats
0147         self.processing_stats['total_tf_files_created'] += tf_files_created
0148 
0149         self.logger.info(f"Registered {tf_files_created} TF subsamples for STF file {message_data.get('filename')}",
0150                         extra=self._log_extra(stf_filename=message_data.get('filename'), tf_files_created=tf_files_created))
0151         return tf_files_registered
0152 
0153 
0154 
0155     
0156 
0157 
0158 
0159 def main():
0160     """Main entry point for the agent."""
0161     import argparse
0162     from pathlib import Path
0163 
0164     script_dir = Path(__file__).parent
0165 
0166     parser = argparse.ArgumentParser(description='Fast Monitor Agent')
0167     parser.add_argument('--debug', action='store_true', help='Enable debug logging for heartbeat messages')
0168     parser.add_argument('--testbed-config', default=None,
0169                         help='Testbed config file (default: SWF_TESTBED_CONFIG env var or workflows/testbed.toml)')
0170     args = parser.parse_args()
0171 
0172     # Configuration for message-driven agent
0173     config = {
0174         "selection_fraction": 0.1,  # 10% of files
0175         # TF simulation parameters
0176         "tf_files_per_stf": 7,  # Number of TF files to generate per STF
0177         "tf_size_fraction": 0.15,  # Fraction of STF size for each TF
0178         "tf_sequence_start": 1,  # Starting sequence number for TF files
0179     }
0180 
0181     # Create agent with config and debug flag
0182     agent = FastMonitorAgent(config, debug=args.debug, config_path=args.testbed_config)
0183 
0184     # Run in message-driven mode (reacts to stf_ready messages)
0185     agent.run()
0186 
0187 
0188 if __name__ == "__main__":
0189     # Setup environment first
0190     if not setup_environment():
0191         sys.exit(1)
0192 
0193     main()