File indexing completed on 2026-04-27 07:41:45
0001 """
0002 Fast Processing Agent: Creates TF slices from STF samples for PanDA workers.
0003
0004 This agent:
0005 1. Receives tf_file_registered messages from FastMon Agent (via epictopic)
0006 2. Creates TF slices from STF samples
3. Pushes TF slices to the PanDA transformer destination (/topic/panda.slices)
0008 4. Maintains RunState and TFSlice records in the monitor database
0009
0010 Pipeline: FastMon Agent [tf_file_registered] -> Fast Processing Agent [TF slices] -> PanDA Workers
0011
0012 Message format specification: https://github.com/wguanicedew/iDDS/blob/dev/main/prompt.md
0013 """
0014
0015 import uuid
0016 from datetime import datetime
0017 from swf_common_lib.base_agent import BaseAgent
0018
0019
class FastProcessingAgent(BaseAgent):
    """
    Fast Processing Agent for TF slice creation and distribution.

    Subscribes to epictopic, receives tf_file_registered from FastMon,
    creates TF slices, and pushes them to the PanDA transformer queue.
    """

    # Destination slices are sent to. NOTE(review): despite the "QUEUE" name
    # this is a /topic/ destination — confirm whether workers expect a queue.
    TRANSFORMER_QUEUE = '/topic/panda.slices'

    # Topic for broadcasting run lifecycle events (run_imminent/end_run) to workers.
    WORKER_BROADCAST_TOPIC = '/topic/panda.workers'

    # Queue on which transformer workers send slice_result messages back to us.
    TRANSFORMER_RESULTS_QUEUE = '/queue/panda.results.fastprocessing'
0036
0037 def __init__(self, debug=False, config_path=None):
0038 super().__init__(
0039 agent_type='Fast_Processing',
0040 subscription_queues=['/topic/epictopic', self.TRANSFORMER_RESULTS_QUEUE],
0041 debug=debug,
0042 config_path=config_path
0043 )
0044
0045
0046 self.workflow_params = {}
0047
0048
0049 self.tf_files_received = 0
0050 self.slices_created = 0
0051
0052
0053 self.stats = {
0054 'tf_files_received': 0,
0055 'slices_created': 0,
0056 'slices_sent': 0,
0057 'results_received': 0,
0058 'results_done': 0,
0059 'results_failed': 0
0060 }
0061
0062 def on_message(self, frame):
0063 """Handle incoming workflow messages."""
0064 message_data, msg_type = self.log_received_message(frame)
0065 if message_data is None:
0066 return
0067
0068
0069 self._update_run_context(message_data)
0070
0071 try:
0072 if msg_type == 'run_imminent':
0073 self.handle_run_imminent(message_data)
0074 elif msg_type == 'start_run':
0075 self.handle_start_run(message_data)
0076 elif msg_type == 'tf_file_registered':
0077 self.handle_tf_file_registered(message_data)
0078 elif msg_type == 'pause_run':
0079 self.handle_pause_run(message_data)
0080 elif msg_type == 'resume_run':
0081 self.handle_resume_run(message_data)
0082 elif msg_type == 'end_run':
0083 self.handle_end_run(message_data)
0084 elif msg_type == 'slice_result':
0085 self.handle_slice_result(message_data)
0086 else:
0087 self.logger.debug(f"Ignoring message type: {msg_type}")
0088 except Exception as e:
0089 self.logger.error(f"Error processing {msg_type}: {e}",
0090 extra=self._log_extra(error=str(e)))
0091 import traceback
0092 self.logger.error(traceback.format_exc())
0093
0094 def _update_run_context(self, message_data):
0095 """
0096 Update run context from message. Agents may start mid-run and miss run_imminent,
0097 so we extract run_id/execution_id from every message and fetch params if needed.
0098 """
0099 run_id = message_data.get('run_id')
0100 execution_id = message_data.get('execution_id')
0101
0102
0103 if run_id and run_id != self.current_run_id:
0104 self.current_run_id = run_id
0105
0106 self.tf_files_received = 0
0107 self.slices_created = 0
0108 self.stats = {
0109 'tf_files_received': 0,
0110 'slices_created': 0,
0111 'slices_sent': 0
0112 }
0113
0114 if execution_id and execution_id != self.current_execution_id:
0115 self.current_execution_id = execution_id
0116
0117 if not self.workflow_params:
0118 self.workflow_params = self._fetch_workflow_parameters(execution_id)
0119 if self.workflow_params:
0120 import json
0121 self.logger.info(f"Workflow parameters loaded (mid-run): {json.dumps(self.workflow_params, indent=2, sort_keys=True)}")
0122
0123 def handle_run_imminent(self, message_data):
0124 """Handle run_imminent message."""
0125 self.logger.info(
0126 f"Run imminent: execution_id={self.current_execution_id}, run_id={self.current_run_id}",
0127 extra=self._log_extra()
0128 )
0129
0130 self._log_system_event('run_imminent', {
0131 'execution_id': self.current_execution_id,
0132 'target_worker_count': self.workflow_params.get("fast_processing", {}).get('target_worker_count', 0),
0133 'stf_sampling_rate': self.workflow_params.get("fast_processing", {}).get('stf_sampling_rate', 0),
0134 'slices_per_sample': self.workflow_params.get("fast_processing", {}).get('slices_per_sample', 0)
0135 })
0136
0137
0138 try:
0139
0140
0141
0142 content = dict(message_data or {})
0143 content.update({
0144 'execution_id': self.current_execution_id,
0145 'target_worker_count': self.workflow_params.get("fast_processing", {}).get('target_worker_count', 1),
0146 'slice_processing_time': self.workflow_params.get("fast_processing", {}).get('slice_processing_time', 1),
0147 'worker_rampup_time': self.workflow_params.get("fast_processing", {}).get('worker_rampup_time', 1),
0148 'worker_rampdown_time': self.workflow_params.get("fast_processing", {}).get('worker_rampdown_time', 1)
0149 })
0150
0151 message = {
0152 'msg_type': 'run_imminent',
0153 'run_id': self.current_run_id,
0154 'created_at': datetime.utcnow().isoformat(),
0155 'content': content
0156 }
0157
0158
0159 worker_topic = self.WORKER_BROADCAST_TOPIC
0160
0161 headers = {'persistent': 'false'}
0162 self.send_message(worker_topic, message, headers=headers)
0163
0164 self.logger.info(f"Broadcasted run_imminent to workers: {worker_topic}",
0165 extra=self._log_extra(destination=worker_topic))
0166 except Exception as e:
0167 self.logger.error(f"Failed to broadcast run_imminent to workers: {e}",
0168 extra=self._log_extra(error=str(e)))
0169
0170 def handle_start_run(self, message_data):
0171 """Handle start_run: Update RunState phase to 'physics'."""
0172 self.logger.info(f"Run started: run_id={self.current_run_id}",
0173 extra=self._log_extra())
0174
0175
0176 self.set_processing()
0177
0178 self._update_run_state(phase='physics', state='running', substate='physics')
0179
0180 self._log_system_event('start_run', {
0181 'execution_id': self.current_execution_id
0182 })
0183
0184 def handle_tf_file_registered(self, message_data):
0185 """
0186 Handle tf_file_registered from FastMon: Create TF slices, push to worker queue.
0187 """
0188 tf_filename = message_data.get('tf_filename')
0189 stf_filename = message_data.get('stf_filename')
0190
0191 self.stats['tf_files_received'] += 1
0192 self.tf_files_received += 1
0193
0194 self.logger.info(f"TF file registered: {tf_filename} (from STF: {stf_filename})",
0195 extra=self._log_extra(tf_filename=tf_filename, stf_filename=stf_filename))
0196
0197
0198 fast_processing = self.workflow_params.get('fast_processing', {})
0199 slices_per_sample = fast_processing.get('slices_per_sample', 15)
0200
0201
0202 slices = self._create_tf_slices(stf_filename, slices_per_sample)
0203
0204
0205 for slice_data in slices:
0206 self._send_slice_to_queue(slice_data)
0207
0208
0209 self._update_run_state_slices(len(slices))
0210
0211
0212 self._log_system_event('tf_file_processed', {
0213 'tf_filename': tf_filename,
0214 'stf_filename': stf_filename,
0215 'slices_created': len(slices)
0216 })
0217
0218 def handle_pause_run(self, message_data):
0219 """Handle pause_run: Update RunState to standby."""
0220 self.logger.info(f"Run paused: run_id={self.current_run_id}",
0221 extra=self._log_extra())
0222
0223 self._update_run_state(substate='standby')
0224
0225 self._log_system_event('pause_run', {
0226 'execution_id': self.current_execution_id
0227 })
0228
0229 def handle_resume_run(self, message_data):
0230 """Handle resume_run: Update RunState back to physics."""
0231 self.logger.info(f"Run resumed: run_id={self.current_run_id}",
0232 extra=self._log_extra())
0233
0234 self._update_run_state(substate='physics')
0235
0236 self._log_system_event('resume_run', {
0237 'execution_id': self.current_execution_id
0238 })
0239
0240 def handle_end_run(self, message_data):
0241 """Handle end_run: Update RunState to completed."""
0242 total_stf = message_data.get('total_stf_files', 0)
0243
0244 self.logger.info(
0245 f"Run ended: run_id={self.current_run_id}, "
0246 f"tf_files_received={self.stats['tf_files_received']}, "
0247 f"slices_created={self.stats['slices_created']}",
0248 extra=self._log_extra(total_stf=total_stf,
0249 tf_files_received=self.stats['tf_files_received'],
0250 slices_created=self.stats['slices_created'])
0251 )
0252
0253 self._update_run_state(phase='completed', state='ended', substate=None)
0254
0255 self._log_system_event('end_run', {
0256 'execution_id': self.current_execution_id,
0257 'total_tf_files_received': self.stats['tf_files_received'],
0258 'total_slices_created': self.stats['slices_created'],
0259 'total_slices_sent': self.stats['slices_sent']
0260 })
0261
0262
0263 try:
0264
0265
0266
0267 content = dict(message_data or {})
0268 content.update({
0269 'execution_id': self.current_execution_id
0270 })
0271
0272 message = {
0273 'msg_type': 'end_run',
0274 'run_id': self.current_run_id,
0275 'created_at': datetime.utcnow().isoformat(),
0276 'content': content
0277 }
0278
0279 worker_topic = self.WORKER_BROADCAST_TOPIC
0280
0281 headers = {'persistent': 'false'}
0282 self.send_message(worker_topic, message, headers=headers)
0283
0284 self.logger.info(f"Broadcasted end_run to workers: {worker_topic}",
0285 extra=self._log_extra(destination=worker_topic))
0286 except Exception as e:
0287 self.logger.error(f"Failed to broadcast end_run to workers: {e}",
0288 extra=self._log_extra(error=str(e)))
0289
0290
0291 self.current_run_id = None
0292 self.current_execution_id = None
0293 self.workflow_params = {}
0294
0295
0296 self.set_ready()
0297
0298 def handle_slice_result(self, message_data):
0299 """Process slice_result messages from transformer workers."""
0300 self.stats['results_received'] += 1
0301
0302 content = message_data.get('content', {})
0303 result = content.get('result') if isinstance(content, dict) else None
0304
0305 self.logger.info(
0306 f"Slice result received: run={message_data.get('run_id')}, "
0307 f"state={content.get('state') if isinstance(content, dict) else 'unknown'}",
0308 extra=self._log_extra(run_id=message_data.get('run_id'))
0309 )
0310
0311
0312 try:
0313 inner_result = None
0314 if result and isinstance(result, dict):
0315 inner_result = result.get('result') if isinstance(result.get('result'), dict) else None
0316
0317 state = content.get('state') or (inner_result.get('state') if inner_result else None)
0318 if state == 'done' or (inner_result and inner_result.get('processed')):
0319 self.stats['results_done'] += 1
0320 else:
0321 self.stats['results_failed'] += 1
0322 except Exception:
0323 pass
0324
0325
0326 self._update_tfslice_from_result(message_data, content, result)
0327
0328
0329 self._log_system_event('slice_result', {
0330 'message': message_data,
0331 'state': content.get('state') if isinstance(content, dict) else None,
0332 'results_received': self.stats['results_received'],
0333 'results_done': self.stats['results_done'],
0334 'results_failed': self.stats['results_failed']
0335 })
0336
0337 self.logger.info(f"Handled slice_result: run={message_data.get('run_id')}, msg={message_data.get('msg_type')}",
0338 extra=self._log_extra(run_id=message_data.get('run_id')))
0339
0340
0341
0342
0343
0344 def _fetch_workflow_parameters(self, execution_id):
0345 """Fetch workflow parameters from WorkflowExecution API."""
0346 try:
0347 result = self.call_monitor_api(
0348 'GET',
0349 f'/workflow-executions/{execution_id}/'
0350 )
0351 if result:
0352 return result.get('parameter_values', {})
0353 return {}
0354 except Exception as e:
0355 self.logger.error(f"Failed to fetch workflow parameters: {e}",
0356 extra=self._log_extra(error=str(e)))
0357 return {}
0358
0359 def _update_run_state(self, phase=None, state=None, substate=None):
0360 """Update RunState record."""
0361 update_data = {
0362 'state_changed_at': datetime.now().isoformat()
0363 }
0364 if phase is not None:
0365 update_data['phase'] = phase
0366 if state is not None:
0367 update_data['state'] = state
0368 if substate is not None:
0369 update_data['substate'] = substate
0370
0371 try:
0372 result = self.call_monitor_api(
0373 'PATCH',
0374 f'/run-states/{self.current_run_id}/',
0375 update_data
0376 )
0377 if result:
0378 self.logger.debug(f"RunState updated: {update_data}", extra=self._log_extra())
0379 except Exception as e:
0380 self.logger.error(f"Error updating RunState: {e}",
0381 extra=self._log_extra(error=str(e)))
0382
0383 def _update_run_state_slices(self, new_slices_count):
0384 """Update RunState with new slice counts."""
0385
0386 try:
0387 current = self.call_monitor_api('GET', f'/run-states/{self.current_run_id}/')
0388 if current:
0389 update_data = {
0390 'stf_samples_received': current.get('stf_samples_received', 0) + 1,
0391 'slices_created': current.get('slices_created', 0) + new_slices_count,
0392 'slices_queued': current.get('slices_queued', 0) + new_slices_count,
0393 'state_changed_at': datetime.now().isoformat()
0394 }
0395 self.call_monitor_api(
0396 'PATCH',
0397 f'/run-states/{self.current_run_id}/',
0398 update_data
0399 )
0400 except Exception as e:
0401 self.logger.error(f"Error updating RunState slices: {e}",
0402 extra=self._log_extra(error=str(e)))
0403
0404 def _create_tf_slices(self, stf_filename, num_slices):
0405 """
0406 Create TF slice records in database.
0407
0408 Returns list of slice data dictionaries for sending to queue.
0409 """
0410 slices = []
0411
0412
0413 tfs_per_stf = 1000
0414 tfs_per_slice = tfs_per_stf // num_slices
0415
0416 for i in range(num_slices):
0417 tf_first = i * tfs_per_slice
0418 tf_last = (i + 1) * tfs_per_slice - 1 if i < num_slices - 1 else tfs_per_stf - 1
0419 tf_count = tf_last - tf_first + 1
0420
0421
0422 tf_filename = f"{stf_filename.replace('.stf', '')}_slice_{i:03d}.tf"
0423
0424 slice_data = {
0425 'slice_id': i,
0426 'tf_first': tf_first,
0427 'tf_last': tf_last,
0428 'tf_count': tf_count,
0429 'tf_filename': tf_filename,
0430 'stf_filename': stf_filename,
0431 'run_number': self.current_run_id,
0432 'status': 'queued',
0433 'retries': 0,
0434 'metadata': {
0435 'execution_id': self.current_execution_id,
0436 'created_by': self.agent_name
0437 }
0438 }
0439
0440
0441 try:
0442 result = self.call_monitor_api('POST', '/tf-slices/', slice_data)
0443 if result:
0444 self.stats['slices_created'] += 1
0445 self.slices_created += 1
0446
0447 slice_data['db_id'] = result.get('id')
0448 slices.append(slice_data)
0449 self.logger.debug(f"TFSlice created: {tf_filename}",
0450 extra=self._log_extra(tf_filename=tf_filename))
0451 else:
0452 self.logger.warning(f"Failed to create TFSlice: {tf_filename}",
0453 extra=self._log_extra(tf_filename=tf_filename))
0454 except Exception as e:
0455 self.logger.error(f"Error creating TFSlice {tf_filename}: {e}",
0456 extra=self._log_extra(tf_filename=tf_filename, error=str(e)))
0457
0458 return slices
0459
0460 def _send_slice_to_queue(self, slice_data):
0461 """
0462 Send slice message to transformer queue.
0463
0464 Message format per Wen's iDDS design.
0465 """
0466
0467 message = {
0468 'msg_type': 'slice',
0469 'run_id': self.current_run_id,
0470 'created_at': datetime.utcnow().isoformat(),
0471 'content': {
0472 'run_id': self.current_run_id,
0473 'execution_id': self.current_execution_id,
0474 'req_id': str(uuid.uuid4()),
0475 'filename': slice_data['stf_filename'],
0476 'tf_filename': slice_data['tf_filename'],
0477 'slice_id': slice_data['slice_id'],
0478 'start': slice_data['tf_first'],
0479 'end': slice_data['tf_last'],
0480 'tf_count': slice_data['tf_count'],
0481 'state': 'queued',
0482 'substate': 'new'
0483 }
0484 }
0485
0486
0487 try:
0488
0489 headers = {
0490 'persistent': 'true',
0491 'ttl': str(12 * 3600 * 1000)
0492 }
0493 self.send_message(self.TRANSFORMER_QUEUE, message, headers=headers)
0494
0495 self.stats['slices_sent'] += 1
0496 self.logger.info(
0497 f"Slice sent to queue: {slice_data['tf_filename']} -> {self.TRANSFORMER_QUEUE}",
0498 extra=self._log_extra(tf_filename=slice_data['tf_filename'], destination=self.TRANSFORMER_QUEUE)
0499 )
0500 except Exception as e:
0501 self.logger.error(f"Failed to send slice to queue: {e}",
0502 extra=self._log_extra(error=str(e)))
0503
0504 def _log_system_event(self, event_type, event_data):
0505 """Log event to SystemStateEvent table."""
0506 event = {
0507 'timestamp': datetime.now().isoformat(),
0508 'run_number': self.current_run_id,
0509 'event_type': event_type,
0510 'state': self.workflow_params.get('state', 'unknown'),
0511 'substate': self.workflow_params.get('substate'),
0512 'event_data': event_data
0513 }
0514
0515 try:
0516 self.call_monitor_api('POST', '/system-state-events/', event)
0517 except Exception as e:
0518 self.logger.debug(f"Failed to log system event: {e}",
0519 extra=self._log_extra(event_type=event_type, error=str(e)))
0520
    def _update_tfslice_from_result(self, message_data, content, result):
        """Update TFSlice record in database based on slice_result message.

        Extracts slice identity from the (possibly doubly-nested) worker result,
        maps the reported state to a slice status, then looks up and PATCHes the
        matching TFSlice record. All failures are logged, never raised.
        """
        try:
            # Workers may nest the payload one level deep: content.result.result.
            inner_result = None
            if result and isinstance(result, dict):
                inner_result = result.get('result') if isinstance(result.get('result'), dict) else None

            # Identify which slice this result refers to.
            slice_id = None
            tf_filename = None
            if inner_result and isinstance(inner_result, dict):
                slice_id = inner_result.get('slice_id')
                tf_filename = inner_result.get('tf_filename')

            if slice_id is None:
                self.logger.debug("No slice_id in result, cannot update TFSlice record")
                return

            # Outcome signals: top-level state and/or the worker's processed flag.
            state = content.get('state') if isinstance(content, dict) else None
            processed = inner_result.get('processed') if inner_result else None

            # Anything not explicitly done/processed counts as failed.
            if state == 'done' or processed:
                slice_status = 'completed'
            else:
                slice_status = 'failed'

            # Worker-reported provenance is kept in metadata for later auditing.
            update_data = {
                'status': slice_status,
                'processed_at': content.get('processed_at') or datetime.now().isoformat(),
                'metadata': {
                    'worker_hostname': content.get('hostname'),
                    'panda_task_id': content.get('panda_task_id'),
                    'panda_id': content.get('panda_id'),
                    'harvester_id': content.get('harvester_id'),
                    'processing_start_at': content.get('processing_start_at'),
                    'result': result
                }
            }

            run_id = message_data.get('run_id')
            try:
                # slice_id is only unique within a run, so query by both.
                slices = self.call_monitor_api(
                    'GET',
                    f'/tf-slices/?run_number={run_id}&slice_id={slice_id}'
                )

                if slices and isinstance(slices, list) and len(slices) > 0:
                    db_id = slices[0].get('id')
                    if db_id:
                        api_result = self.call_monitor_api(
                            'PATCH',
                            f'/tf-slices/{db_id}/',
                            update_data
                        )
                        if api_result:
                            self.logger.info(
                                f"TFSlice updated: slice_id={slice_id}, tf_filename={tf_filename} -> {slice_status}",
                                extra=self._log_extra(slice_id=slice_id, tf_filename=tf_filename, status=slice_status)
                            )
                        else:
                            self.logger.warning(
                                f"Failed to update TFSlice: slice_id={slice_id}",
                                extra=self._log_extra(slice_id=slice_id)
                            )
                else:
                    # Debug level: a missing record is expected for late/replayed results.
                    self.logger.debug(
                        f"TFSlice not found for slice_id: {slice_id}, run: {run_id}",
                        extra=self._log_extra(slice_id=slice_id, run_id=run_id)
                    )
            except Exception as e:
                self.logger.error(
                    f"Error querying/updating TFSlice slice_id={slice_id}: {e}",
                    extra=self._log_extra(slice_id=slice_id, error=str(e))
                )

        except Exception as e:
            self.logger.error(
                f"Error updating TFSlice from result: {e}",
                extra=self._log_extra(error=str(e))
            )
0609
0610
if __name__ == "__main__":
    import argparse

    # FIX: removed unused `script_dir = Path(__file__).parent` (and its
    # pathlib import) — nothing in this entry point referenced it.
    parser = argparse.ArgumentParser(
        description="Fast Processing Agent - samples STFs and creates TF slices"
    )
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    parser.add_argument("--testbed-config", default=None,
                        help="Testbed config file (default: SWF_TESTBED_CONFIG env var or workflows/testbed.toml)")
    args = parser.parse_args()

    agent = FastProcessingAgent(debug=args.debug, config_path=args.testbed_config)
    agent.run()