File indexing completed on 2026-04-27 07:41:45
0001 """
0002 Example Data Agent: Handles STF generation messages.
0003 """
0004
0005 from swf_common_lib.base_agent import BaseAgent
0006 import json
0007 import requests
0008 from datetime import datetime
0009
class DataAgent(BaseAgent):
    """
    An example agent that simulates the role of the Data Agent.

    It listens for 'stf_gen' messages and sends 'stf_ready' messages. Along
    the way it mirrors run and STF-file bookkeeping into the monitor through
    ``call_monitor_api`` and keeps two in-memory indexes (``active_runs`` and
    ``active_files``) describing what it has registered so far.
    """

    def __init__(self, debug=False, config_path=None):
        """
        Create the agent with type 'DATA', subscribed to '/topic/epictopic'.

        Args:
            debug: Forwarded unchanged to ``BaseAgent.__init__``.
            config_path: Forwarded unchanged to ``BaseAgent.__init__``.
        """
        super().__init__(agent_type='DATA', subscription_queue='/topic/epictopic', debug=debug,
                         config_path=config_path)
        # run_id -> {'monitor_run_id', 'files_created', 'total_files'}
        self.active_runs = {}
        # filename -> {'file_id', 'run_id', 'status'}
        self.active_files = {}

    def on_message(self, frame):
        """
        Handles incoming DAQ messages (stf_gen, run_imminent, start_run, end_run).

        Any other message type is ignored. If a handler raises, the error and
        its traceback are logged and a ``RuntimeError`` chained to the original
        exception is raised.

        Args:
            frame: The received frame; handed to ``log_received_message``,
                which yields ``(message_data, msg_type)``.

        Raises:
            RuntimeError: When the selected handler raises any exception.
        """
        message_data, msg_type = self.log_received_message(frame)
        if message_data is None:
            return

        # Track the most recent execution/run context carried by the message.
        if 'execution_id' in message_data:
            self.current_execution_id = message_data['execution_id']
        if 'run_id' in message_data:
            self.current_run_id = message_data['run_id']

        try:
            if msg_type == 'stf_gen':
                self.handle_stf_gen(message_data)
            elif msg_type == 'run_imminent':
                self.handle_run_imminent(message_data)
            elif msg_type == 'start_run':
                self.handle_start_run(message_data)
            elif msg_type == 'end_run':
                self.handle_end_run(message_data)
        except Exception as e:
            self.logger.error(
                f"CRITICAL: Message processing failed - {str(e)}",
                extra=self._log_extra(error=str(e))
            )
            import traceback
            self.logger.error(f"Traceback: {traceback.format_exc()}")
            raise RuntimeError(f"Critical message processing failure: {e}") from e

    def create_run_record(self, run_id, run_conditions):
        """
        Create a run record in the monitor.

        On success the run is added to ``active_runs`` with zeroed file
        counters.

        Args:
            run_id: Run identifier; converted with ``int()`` for the payload.
            run_conditions: Sent as-is under 'run_conditions'.

        Returns:
            The 'run_id' value from the monitor response, or ``None`` when
            the API call returned no data.

        Raises:
            RuntimeError: Re-raised from ``call_monitor_api``. Errors whose
                text contains "400 Client Error" are logged first.
        """
        self.logger.info(f"Creating run record {run_id} in monitor...")

        run_data = {
            'run_number': int(run_id),
            'start_time': datetime.now().isoformat(),
            'run_conditions': run_conditions
        }

        try:
            result = self.call_monitor_api('POST', '/runs/', run_data)
        except RuntimeError as e:
            # Every RuntimeError propagates; only 400s get an extra log line.
            if "400 Client Error" in str(e):
                error_msg = str(e)
                self.logger.error(f"Run {run_id} registration failed with 400 error: {error_msg}")
            raise

        if not result:
            self.logger.error(f"Failed to register run {run_id} in monitor - API returned no data")
            return None

        monitor_run_id = result.get('run_id')
        self.active_runs[run_id] = {
            'monitor_run_id': monitor_run_id,
            'files_created': 0,
            'total_files': 0
        }
        self.logger.info(f"Run {run_id} registered in monitor with ID {monitor_run_id}")
        return monitor_run_id

    def update_run_status(self, run_id, status='completed'):
        """
        Update run status in the monitor.

        The request currently carries only an 'end_time' timestamp.

        Args:
            run_id: Key of the run in ``active_runs``.
            status: Only used in the log message.
                NOTE(review): it is not part of the PATCH payload - confirm
                whether the monitor's run resource should receive it.

        Returns:
            ``True`` if the monitor call returned data. ``False`` if the run
            is unknown, has no monitor ID, or the call returned no data.
        """
        if run_id not in self.active_runs:
            self.logger.warning(f"Run {run_id} not found in active runs")
            return False

        monitor_run_id = self.active_runs[run_id]['monitor_run_id']
        # Same guard as register_stf_file: without it the request would be
        # sent to the literal URL '/runs/None/'.
        if monitor_run_id is None:
            self.logger.warning(
                f"Skipping status update for run {run_id} - run was not registered in monitor")
            return False

        self.logger.info(f"Updating run {run_id} status to {status} in monitor...")

        update_data = {
            'end_time': datetime.now().isoformat()
        }

        result = self.call_monitor_api('PATCH', f'/runs/{monitor_run_id}/', update_data)
        if result:
            self.logger.info(f"Run {run_id} status updated successfully")
            return True

        self.logger.warning(f"Failed to update run {run_id} status")
        return False

    def register_stf_file(self, run_id, filename, file_size=None, start=None, end=None, state=None, substate=None, sequence=None):
        """
        Register an STF file in the monitor.

        On success the file is added to ``active_files`` and the owning run's
        'files_created' counter is incremented.

        Args:
            run_id: Key of the owning run in ``active_runs``.
            filename: STF file name; also the key used in ``active_files``.
            file_size: Sent as 'file_size_bytes'.
            start, end, substate, sequence: Stored under 'metadata'.
            state: Sent as 'machine_state'; falsy values become 'unknown'.

        Returns:
            The 'file_id' value from the monitor response. ``None`` when the
            run is not active, has no monitor ID, the API returned no data,
            or the API failed with a "400 Client Error".

        Raises:
            RuntimeError: Any non-400 error from ``call_monitor_api``.
        """
        if run_id not in self.active_runs:
            self.logger.warning(f"Cannot register file {filename} - run {run_id} not active")
            return None

        monitor_run_id = self.active_runs[run_id]['monitor_run_id']

        # A run can be tracked locally without a monitor ID; nothing to link to.
        if monitor_run_id is None:
            self.logger.warning(f"Skipping STF file registration for {filename} - run {run_id} was not registered in monitor")
            return None

        self.logger.info(f"Registering STF file {filename} in monitor...")

        file_data = {
            'run': monitor_run_id,
            'stf_filename': filename,
            'file_size_bytes': file_size,
            'machine_state': state or 'unknown',
            'status': 'registered',
            'metadata': {
                'created_by': self.agent_name,
                'substate': substate,
                'start': start,
                'end': end,
                'sequence': sequence
            }
        }

        try:
            result = self.call_monitor_api('POST', '/stf-files/', file_data)
            if result:
                file_id = result.get('file_id')
                self.active_files[filename] = {
                    'file_id': file_id,
                    'run_id': run_id,
                    'status': 'registered'
                }
                self.active_runs[run_id]['files_created'] += 1
                self.logger.info(f"STF file {filename} registered with ID {file_id}")
                return file_id
            else:
                self.logger.warning(f"Failed to register STF file {filename} - API returned no data")
                return None
        except RuntimeError as e:
            if "400 Client Error" in str(e):
                # Unlike run registration, a rejected file is not fatal.
                error_msg = str(e)
                self.logger.error(f"STF file {filename} registration failed with 400 error: {error_msg}")
                return None
            else:
                raise

    def update_stf_file_status(self, filename, status):
        """
        Update STF file status in the monitor.

        Args:
            filename: Key of the file in ``active_files``.
            status: New status; sent to the monitor and, on success, stored
                in ``active_files``.

        Returns:
            ``True`` if the monitor call returned data. ``False`` if the file
            is unknown, has no monitor ID, or the call returned no data.
        """
        if filename not in self.active_files:
            self.logger.warning(f"File {filename} not found in active files")
            return False

        file_info = self.active_files[filename]
        file_id = file_info['file_id']
        # register_stf_file stores whatever result.get('file_id') returned,
        # which may be None; do not PATCH the literal URL '/stf-files/None/'.
        if file_id is None:
            self.logger.warning(
                f"Skipping status update for STF file {filename} - no monitor file ID recorded")
            return False

        self.logger.info(f"Updating STF file {filename} status to {status}...")

        update_data = {
            'status': status,
            'metadata': {'processed_by': self.agent_name, 'updated_at': datetime.now().isoformat()}
        }

        result = self.call_monitor_api('PATCH', f'/stf-files/{file_id}/', update_data)
        if result:
            file_info['status'] = status
            self.logger.info(f"STF file {filename} status updated to {status}")
            return True

        self.logger.warning(f"Failed to update STF file {filename} status")
        return False

    def send_data_agent_heartbeat(self):
        """
        Send enhanced heartbeat with data agent context.

        The context holds the number of tracked runs, the number of tracked
        files, and the sum of 'files_created' over the tracked runs.

        Returns:
            Whatever ``send_enhanced_heartbeat`` returns.
        """
        workflow_metadata = {
            'active_runs': len(self.active_runs),
            'active_files': len(self.active_files),
            'completed_tasks': sum(run['files_created'] for run in self.active_runs.values())
        }

        return self.send_enhanced_heartbeat(workflow_metadata)

    def handle_run_imminent(self, message_data):
        """
        Handle run_imminent message - register the upcoming run in the monitor.

        NOTE(review): the log messages mention a dataset, but no dataset is
        created here; only ``create_run_record`` is called.

        Args:
            message_data: Parsed message; 'run_id', 'run_conditions'
                (default ``{}``) and 'simulation_tick' are read.
        """
        run_id = message_data.get('run_id')
        run_conditions = message_data.get('run_conditions', {})
        self.logger.info("Processing run_imminent message",
                         extra=self._log_extra(simulation_tick=message_data.get('simulation_tick')))

        monitor_run_id = self.create_run_record(run_id, run_conditions)

        if monitor_run_id:
            self.logger.info("Created dataset for run", extra=self._log_extra(monitor_run_id=monitor_run_id))
        else:
            self.logger.warning("Dataset created but monitor registration failed", extra=self._log_extra())

    def handle_start_run(self, message_data):
        """
        Handle start_run message - run is starting physics.

        Sends a heartbeat and logs the event; no run state is modified.

        Args:
            message_data: Parsed message; only 'simulation_tick' is read.
        """
        self.logger.info("Processing start_run message",
                         extra=self._log_extra(simulation_tick=message_data.get('simulation_tick')))

        self.send_data_agent_heartbeat()

        self.logger.info("Run started", extra=self._log_extra())

    def handle_end_run(self, message_data):
        """
        Handle end_run message - run has ended.

        Records the reported file total, closes the run in the monitor, sends
        a final heartbeat, then forgets the run and the files registered
        under it.

        Args:
            message_data: Parsed message; 'run_id', 'total_files'
                (default 0) and 'simulation_tick' are read.
        """
        run_id = message_data.get('run_id')
        total_files = message_data.get('total_files', 0)
        self.logger.info("Processing end_run message",
                         extra=self._log_extra(total_files=total_files, simulation_tick=message_data.get('simulation_tick')))

        if run_id in self.active_runs:
            self.active_runs[run_id]['total_files'] = total_files
            self.update_run_status(run_id, 'completed')

        # Heartbeat is sent before cleanup so it still reports the ending run.
        self.send_data_agent_heartbeat()

        self.active_runs.pop(run_id, None)
        # Drop file entries of the finished run; otherwise active_files grows
        # for the lifetime of the agent.
        finished = [name for name, info in self.active_files.items() if info['run_id'] == run_id]
        for name in finished:
            del self.active_files[name]

        self.logger.info("Run ended", extra=self._log_extra(total_files=total_files))

    def handle_stf_gen(self, message_data):
        """
        Handle stf_gen message - new STF file available.

        Registers the file in the monitor, publishes an 'stf_ready' message
        carrying the same file description to '/topic/epictopic', then marks
        the file 'processed' in the monitor. The 'stf_ready' message is sent
        whether or not registration succeeded.

        Args:
            message_data: Parsed message; 'filename', 'run_id', 'file_url',
                'checksum', 'size_bytes', 'start', 'end', 'state',
                'substate', 'sequence' and 'simulation_tick' are read.
        """
        filename = message_data.get('filename')
        run_id = message_data.get('run_id')
        file_url = message_data.get('file_url')
        checksum = message_data.get('checksum')
        size_bytes = message_data.get('size_bytes')

        start = message_data.get('start')
        end = message_data.get('end')
        state = message_data.get('state')
        substate = message_data.get('substate')
        sequence = message_data.get('sequence')

        self.logger.info("Processing STF file",
                         extra=self._log_extra(stf_filename=filename, size_bytes=size_bytes,
                                               simulation_tick=message_data.get('simulation_tick')))

        self.register_stf_file(run_id, filename, size_bytes, start, end, state, substate, sequence)

        # Short fixed pause - presumably stands in for real processing time.
        import time
        time.sleep(0.1)

        stf_ready_message = {
            "msg_type": "stf_ready",
            "namespace": self.namespace,
            "filename": filename,
            "run_id": run_id,
            "file_url": file_url,
            "checksum": checksum,
            "size_bytes": size_bytes,
            "start": start,
            "end": end,
            "state": state,
            "substate": substate,
            "sequence": sequence,
            "simulation_tick": message_data.get('simulation_tick'),
            "processed_by": self.agent_name
        }

        self.send_message('/topic/epictopic', stf_ready_message)

        self.update_stf_file_status(filename, 'processed')

        self.logger.info("Sent stf_ready message",
                         extra=self._log_extra(stf_filename=filename, destination="epictopic"))

    def _parse_time_string(self, time_str):
        """
        Parse time string from DAQ simulator format to ISO format.

        Args:
            time_str: Timestamp formatted as '%Y%m%d%H%M%S'. A falsy value
                yields the current local time instead.

        Returns:
            The timestamp as an ISO 8601 string (no timezone information).

        Raises:
            RuntimeError: If ``time_str`` does not match the expected format.
        """
        if not time_str:
            return datetime.now().isoformat()
        try:
            dt = datetime.strptime(time_str, '%Y%m%d%H%M%S')
            return dt.isoformat()
        except ValueError as e:
            self.logger.error(f"Failed to parse time string '{time_str}': {e}")
            raise RuntimeError(f"Critical time parsing failure: {e}") from e
0321
0322
if __name__ == "__main__":
    # Script entry point: parse the command line, build the agent and hand
    # control to its run() method.
    import argparse

    parser = argparse.ArgumentParser(description="Data Agent - handles STF files and run management")
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    parser.add_argument("--testbed-config", default=None,
                        help="Testbed config file (default: SWF_TESTBED_CONFIG env var or workflows/testbed.toml)")
    args = parser.parse_args()

    agent = DataAgent(debug=args.debug, config_path=args.testbed_config)
    agent.run()