Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:45

0001 """
0002 Example Data Agent: Handles STF generation messages.
0003 """
0004 
0005 from swf_common_lib.base_agent import BaseAgent
0006 import json
0007 import requests
0008 from datetime import datetime
0009 
class DataAgent(BaseAgent):
    """
    An example agent that simulates the role of the Data Agent.

    It listens on '/topic/epictopic' for DAQ lifecycle messages
    ('run_imminent', 'start_run', 'stf_gen', 'end_run'), mirrors run and
    STF-file state into the monitor REST API, and re-broadcasts
    'stf_ready' messages for downstream processing agents.
    """

    def __init__(self, debug=False, config_path=None):
        super().__init__(agent_type='DATA', subscription_queue='/topic/epictopic', debug=debug,
                         config_path=config_path)
        self.active_runs = {}   # run_id -> {'monitor_run_id', 'files_created', 'total_files'}
        self.active_files = {}  # STF filename -> {'file_id', 'run_id', 'status'}

    def on_message(self, frame):
        """
        Dispatch an incoming DAQ message to its type-specific handler.

        Recognizes 'stf_gen', 'run_imminent', 'start_run' and 'end_run';
        other message types are logged by the base helper and ignored.
        Any handler failure is logged with its traceback and escalated as
        a RuntimeError rather than silently swallowed.
        """
        # Use base class helper for consistent logging
        message_data, msg_type = self.log_received_message(frame)
        if message_data is None:
            return

        # Extract workflow context from the message so subsequent log
        # records carry the execution/run identifiers.
        if 'execution_id' in message_data:
            self.current_execution_id = message_data['execution_id']
        if 'run_id' in message_data:
            self.current_run_id = message_data['run_id']

        try:
            if msg_type == 'stf_gen':
                self.handle_stf_gen(message_data)
            elif msg_type == 'run_imminent':
                self.handle_run_imminent(message_data)
            elif msg_type == 'start_run':
                self.handle_start_run(message_data)
            elif msg_type == 'end_run':
                self.handle_end_run(message_data)
        except Exception as e:
            self.logger.error(
                f"CRITICAL: Message processing failed - {str(e)}",
                extra=self._log_extra(error=str(e))
            )
            import traceback
            self.logger.error(f"Traceback: {traceback.format_exc()}")
            raise RuntimeError(f"Critical message processing failure: {e}") from e

    # Data agent specific monitor integration methods
    def create_run_record(self, run_id, run_conditions):
        """
        Create a run record in the monitor and start tracking it locally.

        Args:
            run_id: DAQ run identifier (string convertible to int).
            run_conditions: dict of run conditions passed through to the API.

        Returns:
            The monitor-assigned run id, or None when the API returned no data.

        Raises:
            RuntimeError: propagated from the monitor API; a 400 response is
                logged in full before re-raising so the rejected payload can
                be examined (no recovery handling is implemented yet).
        """
        self.logger.info(f"Creating run record {run_id} in monitor...")

        run_data = {
            'run_number': int(run_id),  # Convert string run_id to integer
            'start_time': datetime.now().isoformat(),
            'run_conditions': run_conditions
        }

        try:
            result = self.call_monitor_api('POST', '/runs/', run_data)
            if result:
                monitor_run_id = result.get('run_id')
                self.active_runs[run_id] = {
                    'monitor_run_id': monitor_run_id,
                    'files_created': 0,
                    'total_files': 0
                }
                self.logger.info(f"Run {run_id} registered in monitor with ID {monitor_run_id}")
                return monitor_run_id
            else:
                self.logger.error(f"Failed to register run {run_id} in monitor - API returned no data")
                return None
        except RuntimeError as e:
            if "400 Client Error" in str(e):
                # Report the actual error details so we can see what it is,
                # then crash deliberately so proper handling can be added.
                self.logger.error(f"Run {run_id} registration failed with 400 error: {str(e)}")
            # Re-raise all API errors (including the 400 logged above).
            raise

    def update_run_status(self, run_id, status='completed'):
        """
        Mark a tracked run as finished in the monitor.

        Returns True on success, False when the run is unknown or the API
        returned no data.

        NOTE(review): only 'end_time' is sent in the PATCH body; the
        'status' argument is used for logging only — confirm whether the
        monitor derives run status from end_time.
        """
        if run_id not in self.active_runs:
            self.logger.warning(f"Run {run_id} not found in active runs")
            return False

        monitor_run_id = self.active_runs[run_id]['monitor_run_id']
        self.logger.info(f"Updating run {run_id} status to {status} in monitor...")

        update_data = {
            'end_time': datetime.now().isoformat()
        }

        result = self.call_monitor_api('PATCH', f'/runs/{monitor_run_id}/', update_data)
        if result:
            self.logger.info(f"Run {run_id} status updated successfully")
            return True
        else:
            self.logger.warning(f"Failed to update run {run_id} status")
            return False

    def register_stf_file(self, run_id, filename, file_size=None, start=None, end=None, state=None, substate=None, sequence=None):
        """
        Register an STF file in the monitor and track it locally.

        Args:
            run_id: DAQ run identifier the file belongs to.
            filename: STF filename to register.
            file_size: size in bytes, if known.
            start, end: timing fields forwarded into the file metadata.
            state, substate: machine state fields; state defaults to 'unknown'.
            sequence: sequence number forwarded into the file metadata.

        Returns:
            The monitor-assigned file id, or None when the run is untracked,
            the run's own monitor registration failed, the API returned no
            data, or the API rejected the payload with a 400. Other API
            errors propagate unchanged.
        """
        if run_id not in self.active_runs:
            self.logger.warning(f"Cannot register file {filename} - run {run_id} not active")
            return None

        monitor_run_id = self.active_runs[run_id]['monitor_run_id']

        # Skip registration if run registration failed
        if monitor_run_id is None:
            self.logger.warning(f"Skipping STF file registration for {filename} - run {run_id} was not registered in monitor")
            return None

        self.logger.info(f"Registering STF file {filename} in monitor...")

        file_data = {
            'run': monitor_run_id,
            'stf_filename': filename,
            'file_size_bytes': file_size,
            'machine_state': state or 'unknown',
            'status': 'registered',
            'metadata': {
                'created_by': self.agent_name,
                'substate': substate,
                'start': start,
                'end': end,
                'sequence': sequence
            }
        }

        try:
            result = self.call_monitor_api('POST', '/stf-files/', file_data)
            if result:
                file_id = result.get('file_id')
                self.active_files[filename] = {
                    'file_id': file_id,
                    'run_id': run_id,
                    'status': 'registered'
                }
                self.active_runs[run_id]['files_created'] += 1
                self.logger.info(f"STF file {filename} registered with ID {file_id}")
                return file_id
            else:
                self.logger.warning(f"Failed to register STF file {filename} - API returned no data")
                return None
        except RuntimeError as e:
            if "400 Client Error" in str(e):
                # Log the rejection details but keep running - one bad file
                # registration should not take the agent down.
                self.logger.error(f"STF file {filename} registration failed with 400 error: {str(e)}")
                return None
            else:
                # Re-raise other API errors
                raise

    def update_stf_file_status(self, filename, status):
        """
        Update a tracked STF file's status in the monitor.

        Returns True on success, False when the file is untracked or the
        API returned no data. The local tracking entry is only updated
        after the API call succeeds.
        """
        if filename not in self.active_files:
            self.logger.warning(f"File {filename} not found in active files")
            return False

        file_info = self.active_files[filename]
        file_id = file_info['file_id']
        self.logger.info(f"Updating STF file {filename} status to {status}...")

        update_data = {
            'status': status,
            'metadata': {'processed_by': self.agent_name, 'updated_at': datetime.now().isoformat()}
        }

        result = self.call_monitor_api('PATCH', f'/stf-files/{file_id}/', update_data)
        if result:
            self.active_files[filename]['status'] = status
            self.logger.info(f"STF file {filename} status updated to {status}")
            return True
        else:
            self.logger.warning(f"Failed to update STF file {filename} status")
            return False

    def send_data_agent_heartbeat(self):
        """Send enhanced heartbeat carrying run/file counts as workflow metadata."""
        workflow_metadata = {
            'active_runs': len(self.active_runs),
            'active_files': len(self.active_files),
            'completed_tasks': sum(run['files_created'] for run in self.active_runs.values())
        }

        return self.send_enhanced_heartbeat(workflow_metadata)

    def handle_run_imminent(self, message_data):
        """Handle run_imminent message - create dataset in Rucio"""
        run_id = message_data.get('run_id')
        run_conditions = message_data.get('run_conditions', {})
        self.logger.info("Processing run_imminent message",
                        extra=self._log_extra(simulation_tick=message_data.get('simulation_tick')))

        # Create run record in monitor
        monitor_run_id = self.create_run_record(run_id, run_conditions)

        # TODO: Call Rucio to create dataset for this run

        # Simulate dataset creation
        if monitor_run_id:
            self.logger.info("Created dataset for run", extra=self._log_extra(monitor_run_id=monitor_run_id))
        else:
            self.logger.warning("Dataset created but monitor registration failed", extra=self._log_extra())

    def handle_start_run(self, message_data):
        """Handle start_run message - run is starting physics"""
        self.logger.info("Processing start_run message",
                        extra=self._log_extra(simulation_tick=message_data.get('simulation_tick')))

        # Send enhanced heartbeat with run context
        self.send_data_agent_heartbeat()

        self.logger.info("Run started", extra=self._log_extra())

    def handle_end_run(self, message_data):
        """Handle end_run message - run has ended"""
        run_id = message_data.get('run_id')
        total_files = message_data.get('total_files', 0)
        self.logger.info("Processing end_run message",
                        extra=self._log_extra(total_files=total_files, simulation_tick=message_data.get('simulation_tick')))

        # Update run status in monitor API
        if run_id in self.active_runs:
            self.active_runs[run_id]['total_files'] = total_files
            self.update_run_status(run_id, 'completed')

        # TODO: Finalize dataset in Rucio

        # Send final heartbeat and clean up local tracking for this run
        self.send_data_agent_heartbeat()
        if run_id in self.active_runs:
            del self.active_runs[run_id]

        self.logger.info("Run ended", extra=self._log_extra(total_files=total_files))

    def handle_stf_gen(self, message_data):
        """
        Handle stf_gen message - new STF file available.

        Registers the file with the monitor, then forwards a 'stf_ready'
        message (echoing the file's identity, timing, state and sequence
        fields) to the processing agent on '/topic/epictopic', and finally
        marks the file as 'processed' in the monitor.
        """
        filename = message_data.get('filename')
        run_id = message_data.get('run_id')
        file_url = message_data.get('file_url')
        checksum = message_data.get('checksum')
        size_bytes = message_data.get('size_bytes')
        # Capture timing, state, and sequence fields
        start = message_data.get('start')
        end = message_data.get('end')
        state = message_data.get('state')
        substate = message_data.get('substate')
        sequence = message_data.get('sequence')

        self.logger.info("Processing STF file",
                        extra=self._log_extra(stf_filename=filename, size_bytes=size_bytes,
                                             simulation_tick=message_data.get('simulation_tick')))

        # Register STF file and workflow with monitor
        self.register_stf_file(run_id, filename, size_bytes, start, end, state, substate, sequence)

        # TODO: Register STF file with Rucio
        # TODO: Initiate transfer to E1 facilities

        # Simulate processing time
        import time
        time.sleep(0.1)

        # Send stf_ready message to processing agent
        # namespace is also auto-injected by BaseAgent.send_message()
        stf_ready_message = {
            "msg_type": "stf_ready",
            "namespace": self.namespace,
            "filename": filename,
            "run_id": run_id,
            "file_url": file_url,
            "checksum": checksum,
            "size_bytes": size_bytes,
            "start": start,
            "end": end,
            "state": state,
            "substate": substate,
            "sequence": sequence,
            "simulation_tick": message_data.get('simulation_tick'),
            "processed_by": self.agent_name
        }

        self.send_message('/topic/epictopic', stf_ready_message)

        # Update STF file status to processed
        self.update_stf_file_status(filename, 'processed')

        self.logger.info("Sent stf_ready message",
                        extra=self._log_extra(stf_filename=filename, destination="epictopic"))

    def _parse_time_string(self, time_str):
        """
        Parse a DAQ-simulator time string ('%Y%m%d%H%M%S') into ISO format.

        An empty/None input yields the current time in ISO format; an
        unparseable input is logged and escalated as RuntimeError.
        """
        if not time_str:
            return datetime.now().isoformat()
        try:
            # Convert from format like '20250801143000' to ISO format
            dt = datetime.strptime(time_str, '%Y%m%d%H%M%S')
            return dt.isoformat()
        except ValueError as e:
            self.logger.error(f"Failed to parse time string '{time_str}': {e}")
            raise RuntimeError(f"Critical time parsing failure: {e}") from e
0321 
0322 
if __name__ == "__main__":
    import argparse

    # Command-line entry point: parse options and run the agent loop.
    parser = argparse.ArgumentParser(description="Data Agent - handles STF files and run management")
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    parser.add_argument("--testbed-config", default=None,
                        help="Testbed config file (default: SWF_TESTBED_CONFIG env var or workflows/testbed.toml)")
    args = parser.parse_args()

    agent = DataAgent(debug=args.debug, config_path=args.testbed_config)
    agent.run()