Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:45

0001 """
0002 Example Processing Agent: Handles data ready messages.
0003 """
0004 
0005 from swf_common_lib.base_agent import BaseAgent
0006 import json
0007 import time
0008 from datetime import datetime
0009 
class ProcessingAgent(BaseAgent):
    """
    An example agent that simulates the role of the Processing Agent.

    It listens on /queue/processing_agent for workflow messages
    ('stf_ready', 'run_imminent', 'start_run', 'end_run'), simulates STF
    reconstruction, records task progress via the monitor API, and emits
    'processing_complete' messages to the monitoring agent queue.
    """

    def __init__(self, debug=False, config_path=None):
        """Initialise base-agent plumbing and in-memory processing state.

        Args:
            debug: Enable debug logging in the base agent.
            config_path: Optional testbed config path (BaseAgent applies
                its own default when None).
        """
        super().__init__(agent_type='STF_Processing', subscription_queue='/queue/processing_agent', debug=debug,
                         config_path=config_path)
        # filename -> {'task_id', 'started_at', 'input_data'} for in-flight files
        self.active_processing = {}
        # NOTE(review): 'failed_count' is reported in heartbeats but is never
        # incremented anywhere in this class — confirm where failures should
        # be counted (e.g. on_message except path or task-completion failure).
        self.processing_stats = {'total_processed': 0, 'failed_count': 0}

    def on_message(self, frame):
        """
        Handles incoming workflow messages (stf_ready, run_imminent, start_run, end_run).

        Unknown message types are logged by the base helper and otherwise
        ignored. A failure while handling a known type is logged (with
        traceback) and re-raised as RuntimeError so the messaging layer
        can react.
        """
        # Use base class helper for consistent logging
        message_data, msg_type = self.log_received_message(frame)
        if message_data is None:
            return

        # Extract workflow context from message
        if 'execution_id' in message_data:
            self.current_execution_id = message_data['execution_id']
        if 'run_id' in message_data:
            self.current_run_id = message_data['run_id']

        # Update heartbeat on message activity
        self.send_processing_agent_heartbeat()
        try:
            if msg_type == 'stf_ready':
                self.handle_stf_ready(message_data)
            elif msg_type == 'run_imminent':
                self.handle_run_imminent(message_data)
            elif msg_type == 'start_run':
                self.handle_start_run(message_data)
            elif msg_type == 'end_run':
                self.handle_end_run(message_data)
        except Exception as e:
            self.logger.error(
                f"CRITICAL: Message processing failed - {str(e)}",
                extra=self._log_extra(error=str(e))
            )
            # Local import keeps the happy path free of the traceback module.
            import traceback
            self.logger.error(f"Traceback: {traceback.format_exc()}")
            raise RuntimeError(f"Critical message processing failure: {e}") from e

    # Processing agent specific monitor integration methods
    def update_file_processing_status(self, filename, status, monitor_file_id=None):
        """Update STF file processing status in monitor.

        Args:
            filename: STF filename (used for logging only).
            status: New status value to PATCH into the monitor record.
            monitor_file_id: Monitor's ID for the file; without it there is
                nothing to update and the call is a no-op.

        Returns:
            True when the monitor accepted the update, False otherwise.
        """
        if not monitor_file_id:
            # Fix: interpolate the filename (the message previously logged
            # the literal text "(unknown)" instead of the file's name).
            self.logger.warning(f"No monitor file ID provided for {filename}", extra=self._log_extra())
            return False

        self.logger.info(f"Updating file {filename} processing status to {status}...", extra=self._log_extra())

        update_data = {
            'status': status,
            'metadata': {
                'processed_by': self.agent_name,
                'processing_stage': 'reconstruction',
                'updated_at': datetime.now().isoformat()
            }
        }

        result = self.call_monitor_api('PATCH', f'/stf-files/{monitor_file_id}/', update_data)
        if result:
            self.logger.info(f"File {filename} processing status updated to {status}", extra=self._log_extra())
            return True
        else:
            self.logger.warning(f"Failed to update file {filename} processing status", extra=self._log_extra())
            return False

    def register_processing_task(self, filename, input_data):
        """Register a processing task in the monitor.

        Args:
            filename: Input STF filename.
            input_data: Message payload for the file; 'run_id',
                'size_bytes' and 'checksum' keys are forwarded as task
                metadata when present.

        Returns:
            The monitor-assigned stage/task ID, or None on failure.
        """
        self.logger.info(f"Registering processing task for {filename}...", extra=self._log_extra())

        task_data = {
            'agent_name': self.agent_name,
            'agent_type': 'processing',
            'task_type': 'reconstruction',
            'input_filename': filename,
            'run_id': input_data.get('run_id'),
            'status': 'started',
            'started_at': datetime.now().isoformat(),
            'task_metadata': {
                'input_size_bytes': input_data.get('size_bytes'),
                'input_checksum': input_data.get('checksum'),
                'processing_algorithm': 'eic_reconstruction_v1.0'
            }
        }

        result = self.call_monitor_api('POST', '/workflow-stages/', task_data)
        if result:
            task_id = result.get('stage_id')
            # Track the task locally so complete_processing_task can find it.
            self.active_processing[filename] = {
                'task_id': task_id,
                'started_at': datetime.now(),
                'input_data': input_data
            }
            self.logger.info(f"Processing task registered for {filename} with ID {task_id}", extra=self._log_extra())
            return task_id
        else:
            self.logger.warning(f"Failed to register processing task for {filename}", extra=self._log_extra())
            return None

    def complete_processing_task(self, filename, output_files):
        """Mark processing task as completed in monitor.

        Args:
            filename: Input STF filename previously registered via
                register_processing_task.
            output_files: List of output file names produced.

        Returns:
            True when the monitor recorded the completion, False when the
            task is unknown or the monitor call failed.
        """
        if filename not in self.active_processing:
            self.logger.warning(f"No active processing task found for {filename}", extra=self._log_extra())
            return False

        task_info = self.active_processing[filename]
        task_id = task_info['task_id']

        # Wall-clock duration since the task was registered locally.
        processing_time = (datetime.now() - task_info['started_at']).total_seconds()

        self.logger.info(f"Completing processing task for {filename}...", extra=self._log_extra())

        completion_data = {
            'status': 'completed',
            'completed_at': datetime.now().isoformat(),
            'processing_time_seconds': processing_time,
            'output_metadata': {
                'output_files': output_files,
                'success': True,
                'algorithm_version': 'eic_reconstruction_v1.0'
            }
        }

        result = self.call_monitor_api('PATCH', f'/workflow-stages/{task_id}/', completion_data)
        if result:
            self.processing_stats['total_processed'] += 1
            del self.active_processing[filename]
            self.logger.info(f"Processing task completed for {filename}", extra=self._log_extra())
            return True
        else:
            self.logger.warning(f"Failed to complete processing task for {filename}", extra=self._log_extra())
            return False

    def send_processing_agent_heartbeat(self):
        """Send enhanced heartbeat with processing agent context.

        Returns:
            Whatever BaseAgent.send_enhanced_heartbeat returns.
        """
        workflow_metadata = {
            'active_tasks': len(self.active_processing),
            'completed_tasks': self.processing_stats['total_processed'],
            'failed_tasks': self.processing_stats['failed_count']
        }

        return self.send_enhanced_heartbeat(workflow_metadata)

    def handle_run_imminent(self, message_data):
        """Handle run_imminent message - prepare for processing tasks"""
        run_id = message_data.get('run_id')
        self.logger.info(
            "Processing run_imminent message",
            extra=self._log_extra(simulation_tick=message_data.get('simulation_tick'))
        )

        # Report agent status for run preparation
        self.report_agent_status('OK', f'Preparing for run {run_id}')

        # TODO: Initialize processing resources for this run

        # Simulate preparation
        self.logger.info("Prepared processing resources for run", extra=self._log_extra())

    def handle_start_run(self, message_data):
        """Handle start_run message - run is starting physics"""
        self.logger.info(
            "Processing start_run message",
            extra=self._log_extra(simulation_tick=message_data.get('simulation_tick'))
        )

        # Agent is now actively processing this run
        self.set_processing()

        # Send enhanced heartbeat with run context
        self.send_processing_agent_heartbeat()

        # TODO: Start monitoring for stf_ready messages
        self.logger.info("Ready to process data for run", extra=self._log_extra())

    def handle_end_run(self, message_data):
        """Handle end_run message - run has ended"""
        run_id = message_data.get('run_id')
        total_files = message_data.get('total_files', 0)
        self.logger.info(
            "Processing end_run message",
            extra=self._log_extra(total_files=total_files, simulation_tick=message_data.get('simulation_tick'))
        )

        # Report final statistics via heartbeat
        self.send_processing_agent_heartbeat()

        # Report completion status: warn if tasks are still in flight.
        active_tasks = len(self.active_processing)
        if active_tasks > 0:
            self.report_agent_status('WARNING', f'Run {run_id} ended with {active_tasks} tasks still processing')
        else:
            self.report_agent_status('OK', f'Run {run_id} processing completed successfully')

        # TODO: Finalize processing tasks for this run

        self.logger.info("Processing complete for run", extra=self._log_extra(total_files=total_files))

        # Agent is now idle, waiting for next run
        self.set_ready()
        self.current_execution_id = None  # Clear context for next run
        self.current_run_id = None
        self.logger.info("Waiting for next run...")

    def handle_stf_ready(self, message_data):
        """Handle stf_ready message - simulate processing of one STF file.

        Produces derived output filenames, updates local stats, registers
        the result with the monitor (legacy path), and forwards a
        'processing_complete' message to the monitoring agent queue.
        """
        filename = message_data.get('filename')
        file_url = message_data.get('file_url')
        checksum = message_data.get('checksum')
        size_bytes = message_data.get('size_bytes')
        processed_by = message_data.get('processed_by')

        self.logger.info(
            "Processing STF data",
            extra=self._log_extra(
                stf_filename=filename, size_bytes=size_bytes,
                processed_by=processed_by, simulation_tick=message_data.get('simulation_tick')
            )
        )

        # Simulate processing time ('time' is imported at module level;
        # the redundant local import was removed).
        time.sleep(0.5)  # Simulate compute-intensive processing

        # Define output files derived from the input name.
        output_files = [
            f"{filename.replace('.dat', '.dst')}",
            f"{filename.replace('.dat', '.hist.root')}"
        ]

        # Update processing stats
        self.processing_stats['total_processed'] += 1

        # Send processing_complete message
        # namespace is also auto-injected by BaseAgent.send_message()
        processing_complete_message = {
            "msg_type": "processing_complete",
            "namespace": self.namespace,
            "filename": filename,
            "run_id": self.current_run_id,
            "input_file_url": file_url,
            "input_checksum": checksum,
            "input_size_bytes": size_bytes,
            "output_files": output_files,
            "processing_time_ms": 500,  # matches the simulated 0.5 s sleep above
            "simulation_tick": message_data.get('simulation_tick'),
            "processed_by": self.agent_name
        }

        # Register processing results with monitor (legacy method - keep for compatibility)
        self.register_processing_results(processing_complete_message)

        # Send to monitoring/analysis agents
        self.send_message('/queue/monitoring_agent', processing_complete_message)
        self.logger.info(
            "Sent processing_complete message",
            extra=self._log_extra(stf_filename=filename, destination="monitoring_agent")
        )

    def register_processing_results(self, processing_data):
        """Register processing results with monitor API (legacy method for compatibility).

        The real registration is handled by the processing-agent-specific
        methods above; this only logs for backward compatibility.
        """
        filename = processing_data.get('filename')

        try:
            # This is now handled by the new processing agent specific methods above
            # Keeping this method for backward compatibility and logging
            self.logger.info(
                "Processing results registered via new processing agent specific methods",
                extra=self._log_extra(
                    stf_filename=filename,
                    output_files=len(processing_data.get('output_files', []))
                )
            )

        except Exception as e:
            self.logger.error(
                "Error in legacy processing results registration",
                extra=self._log_extra(stf_filename=filename, error=str(e))
            )
0298 
0299 
if __name__ == "__main__":
    import argparse

    # Fix: removed the unused 'script_dir = Path(__file__).parent' local
    # (and its pathlib import) — it was computed and never read.
    parser = argparse.ArgumentParser(description="Processing Agent - handles workflow data processing")
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    parser.add_argument("--testbed-config", default=None,
                        help="Testbed config file (default: SWF_TESTBED_CONFIG env var or workflows/testbed.toml)")
    args = parser.parse_args()

    # Instantiate the agent with CLI options and enter its message loop.
    agent = ProcessingAgent(debug=args.debug, config_path=args.testbed_config)
    agent.run()