File indexing completed on 2026-04-27 07:41:45
0001 """
0002 Example Processing Agent: Handles data ready messages.
0003 """
0004
0005 from swf_common_lib.base_agent import BaseAgent
0006 import json
0007 import time
0008 from datetime import datetime
0009
class ProcessingAgent(BaseAgent):
    """
    An example agent that simulates the role of the Processing Agent.

    It subscribes to '/queue/processing_agent' and reacts to 'stf_ready',
    'run_imminent', 'start_run' and 'end_run' messages. STF "processing" is
    simulated: the agent sleeps for a fixed interval, derives output file
    names from the input name and announces them on '/queue/monitoring_agent'.

    Helpers used through ``self`` but not defined in this class (logger,
    _log_extra, call_monitor_api, send_message, send_enhanced_heartbeat,
    report_agent_status, set_processing, set_ready, log_received_message,
    agent_name, namespace) are expected to be provided by BaseAgent.
    """

    # Seconds of simulated work per STF file. The processing_complete message
    # reports the same value in milliseconds, so the two cannot drift apart.
    _SIMULATED_PROCESSING_SECONDS = 0.5

    # Algorithm label attached to the task and completion records sent to the monitor.
    _PROCESSING_ALGORITHM = 'eic_reconstruction_v1.0'

    # Extension of incoming STF files and the extensions of the simulated outputs.
    _INPUT_SUFFIX = '.dat'
    _OUTPUT_SUFFIXES = ('.dst', '.hist.root')

    def __init__(self, debug=False, config_path=None):
        """
        Create the agent and its in-memory bookkeeping.

        Args:
            debug: Forwarded unchanged to BaseAgent as ``debug``.
            config_path: Forwarded unchanged to BaseAgent as ``config_path``.
        """
        super().__init__(
            agent_type='STF_Processing',
            subscription_queue='/queue/processing_agent',
            debug=debug,
            config_path=config_path,
        )
        # filename -> {'task_id', 'started_at', 'input_data'} for tasks that
        # register_processing_task() recorded and that are not yet completed.
        self.active_processing = {}
        # Counters reported in every heartbeat (see send_processing_agent_heartbeat).
        self.processing_stats = {'total_processed': 0, 'failed_count': 0}

    def on_message(self, frame):
        """
        Handles incoming workflow messages (stf_ready, run_imminent, start_run, end_run).

        The frame is decoded by log_received_message(); if that yields no
        message data the frame is ignored. Message types other than the four
        listed above are ignored after the heartbeat has been sent.

        Args:
            frame: The raw message frame, passed straight to log_received_message().

        Raises:
            RuntimeError: If the selected handler raises. The original
                exception is chained as the cause.
        """
        message_data, msg_type = self.log_received_message(frame)
        if message_data is None:
            return

        # Remember the workflow context carried by the message so that later
        # messages and log records can refer to it.
        if 'execution_id' in message_data:
            self.current_execution_id = message_data['execution_id']
        if 'run_id' in message_data:
            self.current_run_id = message_data['run_id']

        # A heartbeat is sent for every decoded message, before it is handled.
        self.send_processing_agent_heartbeat()
        try:
            if msg_type == 'stf_ready':
                self.handle_stf_ready(message_data)
            elif msg_type == 'run_imminent':
                self.handle_run_imminent(message_data)
            elif msg_type == 'start_run':
                self.handle_start_run(message_data)
            elif msg_type == 'end_run':
                self.handle_end_run(message_data)
        except Exception as e:
            # Only stf_ready messages represent a processing task, so only
            # their failures feed the 'failed_tasks' figure in the heartbeat.
            if msg_type == 'stf_ready':
                self.processing_stats['failed_count'] += 1
            self.logger.error(
                f"CRITICAL: Message processing failed - {str(e)}",
                extra=self._log_extra(error=str(e))
            )
            # Imported here because it is only needed on the failure path.
            import traceback
            self.logger.error(f"Traceback: {traceback.format_exc()}")
            raise RuntimeError(f"Critical message processing failure: {e}") from e

    def update_file_processing_status(self, filename, status, monitor_file_id=None):
        """
        Update STF file processing status in monitor.

        Args:
            filename: Name of the STF file; used for log messages only.
            status: New status value sent to the monitor.
            monitor_file_id: Identifier used in the '/stf-files/<id>/' URL.
                Any falsy value (None, 0, '') is treated as "not provided".

        Returns:
            True if the PATCH call returned a truthy result, False if no id
            was given or the call returned a falsy result.
        """
        if not monitor_file_id:
            self.logger.warning(f"No monitor file ID provided for {filename}", extra=self._log_extra())
            return False

        self.logger.info(f"Updating file {filename} processing status to {status}...", extra=self._log_extra())

        update_data = {
            'status': status,
            'metadata': {
                'processed_by': self.agent_name,
                'processing_stage': 'reconstruction',
                'updated_at': datetime.now().isoformat()
            }
        }

        result = self.call_monitor_api('PATCH', f'/stf-files/{monitor_file_id}/', update_data)
        if result:
            self.logger.info(f"File {filename} processing status updated to {status}", extra=self._log_extra())
            return True

        self.logger.warning(f"Failed to update file {filename} processing status", extra=self._log_extra())
        return False

    def register_processing_task(self, filename, input_data):
        """
        Register a processing task in the monitor.

        On success the task is also recorded in ``self.active_processing`` so
        that complete_processing_task() can find it later.

        Args:
            filename: Input file name; also the key in ``active_processing``.
            input_data: Mapping describing the input. 'run_id', 'size_bytes'
                and 'checksum' are read with ``.get`` and may be absent.

        Returns:
            The 'stage_id' value from the monitor response (None if the
            response has no such key), or None when the POST call returned a
            falsy result.
        """
        self.logger.info(f"Registering processing task for {filename}...", extra=self._log_extra())

        task_data = {
            'agent_name': self.agent_name,
            'agent_type': 'processing',
            'task_type': 'reconstruction',
            'input_filename': filename,
            'run_id': input_data.get('run_id'),
            'status': 'started',
            'started_at': datetime.now().isoformat(),
            'task_metadata': {
                'input_size_bytes': input_data.get('size_bytes'),
                'input_checksum': input_data.get('checksum'),
                'processing_algorithm': self._PROCESSING_ALGORITHM
            }
        }

        result = self.call_monitor_api('POST', '/workflow-stages/', task_data)
        if not result:
            self.logger.warning(f"Failed to register processing task for {filename}", extra=self._log_extra())
            return None

        task_id = result.get('stage_id')
        self.active_processing[filename] = {
            'task_id': task_id,
            # Local start time, taken after the monitor accepted the task; it
            # is the reference for 'processing_time_seconds' on completion.
            'started_at': datetime.now(),
            'input_data': input_data
        }
        self.logger.info(f"Processing task registered for {filename} with ID {task_id}", extra=self._log_extra())
        return task_id

    def complete_processing_task(self, filename, output_files):
        """
        Mark processing task as completed in monitor.

        Args:
            filename: Input file name used when the task was registered.
            output_files: Value stored under 'output_files' in the completion
                record sent to the monitor.

        Returns:
            True if the monitor accepted the update. False if no task is
            registered for ``filename`` or the PATCH call returned a falsy
            result; in the latter case the task stays in ``active_processing``.
        """
        if filename not in self.active_processing:
            self.logger.warning(f"No active processing task found for {filename}", extra=self._log_extra())
            return False

        task_info = self.active_processing[filename]
        task_id = task_info['task_id']

        processing_time = (datetime.now() - task_info['started_at']).total_seconds()

        self.logger.info(f"Completing processing task for {filename}...", extra=self._log_extra())

        completion_data = {
            'status': 'completed',
            'completed_at': datetime.now().isoformat(),
            'processing_time_seconds': processing_time,
            'output_metadata': {
                'output_files': output_files,
                'success': True,
                'algorithm_version': self._PROCESSING_ALGORITHM
            }
        }

        result = self.call_monitor_api('PATCH', f'/workflow-stages/{task_id}/', completion_data)
        if not result:
            self.logger.warning(f"Failed to complete processing task for {filename}", extra=self._log_extra())
            return False

        self.processing_stats['total_processed'] += 1
        del self.active_processing[filename]
        self.logger.info(f"Processing task completed for {filename}", extra=self._log_extra())
        return True

    def send_processing_agent_heartbeat(self):
        """
        Send enhanced heartbeat with processing agent context.

        Returns:
            Whatever send_enhanced_heartbeat() returns for the metadata built
            from the current task counters.
        """
        workflow_metadata = {
            'active_tasks': len(self.active_processing),
            'completed_tasks': self.processing_stats['total_processed'],
            'failed_tasks': self.processing_stats['failed_count']
        }

        return self.send_enhanced_heartbeat(workflow_metadata)

    def handle_run_imminent(self, message_data):
        """
        Handle run_imminent message - prepare for processing tasks.

        Logs the message and reports an 'OK' agent status naming the run.
        No resources are actually allocated here.

        Args:
            message_data: Decoded message; 'run_id' and 'simulation_tick' are
                read with ``.get`` and may be absent.
        """
        run_id = message_data.get('run_id')
        self.logger.info(
            "Processing run_imminent message",
            extra=self._log_extra(simulation_tick=message_data.get('simulation_tick'))
        )

        self.report_agent_status('OK', f'Preparing for run {run_id}')

        self.logger.info("Prepared processing resources for run", extra=self._log_extra())

    def handle_start_run(self, message_data):
        """
        Handle start_run message - run is starting physics.

        Switches the agent to its processing state via set_processing() and
        sends a heartbeat.

        Args:
            message_data: Decoded message; only 'simulation_tick' is read.
        """
        self.logger.info(
            "Processing start_run message",
            extra=self._log_extra(simulation_tick=message_data.get('simulation_tick'))
        )

        self.set_processing()

        self.send_processing_agent_heartbeat()

        self.logger.info("Ready to process data for run", extra=self._log_extra())

    def handle_end_run(self, message_data):
        """
        Handle end_run message - run has ended.

        Sends a heartbeat, reports 'WARNING' if tasks are still registered in
        ``active_processing`` (otherwise 'OK'), then returns the agent to its
        ready state and clears the remembered execution and run ids.

        Args:
            message_data: Decoded message; 'run_id', 'total_files' (default 0)
                and 'simulation_tick' are read with ``.get``.
        """
        run_id = message_data.get('run_id')
        total_files = message_data.get('total_files', 0)
        self.logger.info(
            "Processing end_run message",
            extra=self._log_extra(total_files=total_files, simulation_tick=message_data.get('simulation_tick'))
        )

        self.send_processing_agent_heartbeat()

        active_tasks = len(self.active_processing)
        if active_tasks > 0:
            self.report_agent_status('WARNING', f'Run {run_id} ended with {active_tasks} tasks still processing')
        else:
            self.report_agent_status('OK', f'Run {run_id} processing completed successfully')

        self.logger.info("Processing complete for run", extra=self._log_extra(total_files=total_files))

        # The run is over: go back to the ready state and forget its context.
        self.set_ready()
        self.current_execution_id = None
        self.current_run_id = None
        self.logger.info("Waiting for next run...")

    @classmethod
    def _derive_output_files(cls, filename):
        """
        Return the simulated output file names for an input STF file name.

        Only a trailing '.dat' extension is replaced; a '.dat' occurring
        elsewhere in the name is left alone. A name without that extension
        gets the output suffix appended, so an output never shares the input
        file's name.
        """
        if filename.endswith(cls._INPUT_SUFFIX):
            stem = filename[:-len(cls._INPUT_SUFFIX)]
        else:
            stem = filename
        return [stem + suffix for suffix in cls._OUTPUT_SUFFIXES]

    def handle_stf_ready(self, message_data):
        """
        Handle stf_ready message - process STF file.

        Processing is simulated with a fixed sleep. Afterwards a
        'processing_complete' message describing the derived output files is
        sent to '/queue/monitoring_agent' and the processed counter is bumped.

        Args:
            message_data: Decoded message. 'filename' is required; 'file_url',
                'checksum', 'size_bytes', 'processed_by' and 'simulation_tick'
                are read with ``.get`` and may be absent.

        Raises:
            ValueError: If the message has no (or an empty) 'filename'.
        """
        filename = message_data.get('filename')
        if not filename:
            # Fail before doing any work, with a message that names the
            # problem, instead of an AttributeError on None further down.
            raise ValueError("stf_ready message is missing 'filename'")

        file_url = message_data.get('file_url')
        checksum = message_data.get('checksum')
        size_bytes = message_data.get('size_bytes')
        processed_by = message_data.get('processed_by')

        self.logger.info(
            "Processing STF data",
            extra=self._log_extra(
                stf_filename=filename, size_bytes=size_bytes,
                processed_by=processed_by, simulation_tick=message_data.get('simulation_tick')
            )
        )

        # Stand-in for the real reconstruction work.
        time.sleep(self._SIMULATED_PROCESSING_SECONDS)

        output_files = self._derive_output_files(filename)

        processing_complete_message = {
            "msg_type": "processing_complete",
            "namespace": self.namespace,
            "filename": filename,
            "run_id": self.current_run_id,
            "input_file_url": file_url,
            "input_checksum": checksum,
            "input_size_bytes": size_bytes,
            "output_files": output_files,
            "processing_time_ms": int(round(self._SIMULATED_PROCESSING_SECONDS * 1000)),
            "simulation_tick": message_data.get('simulation_tick'),
            "processed_by": self.agent_name
        }

        self.register_processing_results(processing_complete_message)

        self.send_message('/queue/monitoring_agent', processing_complete_message)

        # Count the file only once its completion has been announced, so a
        # failure above is not reported as both processed and failed.
        self.processing_stats['total_processed'] += 1

        self.logger.info(
            "Sent processing_complete message",
            extra=self._log_extra(stf_filename=filename, destination="monitoring_agent")
        )

    def register_processing_results(self, processing_data):
        """
        Register processing results with monitor API (legacy method for compatibility).

        This method no longer talks to the monitor; it only writes a log
        record. Any exception raised while doing so is logged and suppressed
        so that it cannot interrupt the caller.

        Args:
            processing_data: Mapping with 'filename' and, optionally,
                'output_files' (its length is logged).
        """
        filename = processing_data.get('filename')

        try:
            self.logger.info(
                "Processing results registered via new processing agent specific methods",
                extra=self._log_extra(
                    stf_filename=filename,
                    output_files=len(processing_data.get('output_files', []))
                )
            )
        except Exception as e:
            self.logger.error(
                "Error in legacy processing results registration",
                extra=self._log_extra(stf_filename=filename, error=str(e))
            )
0298
0299
if __name__ == "__main__":
    # Command-line entry point: parse the options, build the agent and hand
    # control to its run() method.
    import argparse

    parser = argparse.ArgumentParser(description="Processing Agent - handles workflow data processing")
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    parser.add_argument("--testbed-config", default=None,
                        help="Testbed config file (default: SWF_TESTBED_CONFIG env var or workflows/testbed.toml)")
    args = parser.parse_args()

    # When --testbed-config is omitted, None is passed on as config_path.
    agent = ProcessingAgent(debug=args.debug, config_path=args.testbed_config)
    agent.run()