Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:45

0001 """
0002 Fast Processing Agent: Creates TF slices from STF samples for PanDA workers.
0003 
0004 This agent:
0005 1. Receives tf_file_registered messages from FastMon Agent (via epictopic)
0006 2. Creates TF slices from STF samples
3. Pushes TF slices to the PanDA transformer destination (TRANSFORMER_QUEUE, currently /topic/panda.slices)
0008 4. Maintains RunState and TFSlice records in the monitor database
0009 
0010 Pipeline: FastMon Agent [tf_file_registered] -> Fast Processing Agent [TF slices] -> PanDA Workers
0011 
0012 Message format specification: https://github.com/wguanicedew/iDDS/blob/dev/main/prompt.md
0013 """
0014 
import json
import uuid
from datetime import datetime

from swf_common_lib.base_agent import BaseAgent
0018 
0019 
class FastProcessingAgent(BaseAgent):
    """
    Fast Processing Agent for TF slice creation and distribution.

    Subscribes to epictopic, receives tf_file_registered from FastMon,
    creates TF slices, and pushes them to the PanDA transformer queue.
    """

    # Destination for slice messages to transformer workers (from Wen's iDDS design).
    # NOTE(review): despite the name this is a STOMP *topic*, not a queue, and the
    # module docstring mentions /queue/panda.transformer.slices — confirm whether
    # workers expect broadcast (topic) or load-balanced (queue) delivery.
    TRANSFORMER_QUEUE = '/topic/panda.slices'

    # Topic for run-lifecycle broadcasts (run_imminent / end_run) to workers.
    WORKER_BROADCAST_TOPIC = '/topic/panda.workers'

    # Queue on which transformer workers send slice_result messages back to us;
    # also part of this agent's subscriptions (see __init__).
    TRANSFORMER_RESULTS_QUEUE = '/queue/panda.results.fastprocessing'
0036 
0037     def __init__(self, debug=False, config_path=None):
0038         super().__init__(
0039             agent_type='Fast_Processing',
0040             subscription_queues=['/topic/epictopic', self.TRANSFORMER_RESULTS_QUEUE],
0041             debug=debug,
0042             config_path=config_path
0043         )
0044 
0045         # Workflow parameters (populated on run_imminent)
0046         self.workflow_params = {}
0047 
0048         # Processing state
0049         self.tf_files_received = 0
0050         self.slices_created = 0
0051 
0052         # Statistics
0053         self.stats = {
0054             'tf_files_received': 0,
0055             'slices_created': 0,
0056             'slices_sent': 0,
0057             'results_received': 0,
0058             'results_done': 0,
0059             'results_failed': 0
0060         }
0061 
0062     def on_message(self, frame):
0063         """Handle incoming workflow messages."""
0064         message_data, msg_type = self.log_received_message(frame)
0065         if message_data is None:
0066             return
0067 
0068         # Extract run context from each message (agents may start mid-run)
0069         self._update_run_context(message_data)
0070 
0071         try:
0072             if msg_type == 'run_imminent':
0073                 self.handle_run_imminent(message_data)
0074             elif msg_type == 'start_run':
0075                 self.handle_start_run(message_data)
0076             elif msg_type == 'tf_file_registered':
0077                 self.handle_tf_file_registered(message_data)
0078             elif msg_type == 'pause_run':
0079                 self.handle_pause_run(message_data)
0080             elif msg_type == 'resume_run':
0081                 self.handle_resume_run(message_data)
0082             elif msg_type == 'end_run':
0083                 self.handle_end_run(message_data)
0084             elif msg_type == 'slice_result':
0085                 self.handle_slice_result(message_data)
0086             else:
0087                 self.logger.debug(f"Ignoring message type: {msg_type}")
0088         except Exception as e:
0089             self.logger.error(f"Error processing {msg_type}: {e}",
0090                               extra=self._log_extra(error=str(e)))
0091             import traceback
0092             self.logger.error(traceback.format_exc())
0093 
0094     def _update_run_context(self, message_data):
0095         """
0096         Update run context from message. Agents may start mid-run and miss run_imminent,
0097         so we extract run_id/execution_id from every message and fetch params if needed.
0098         """
0099         run_id = message_data.get('run_id')
0100         execution_id = message_data.get('execution_id')
0101 
0102         # Update current run context if provided
0103         if run_id and run_id != self.current_run_id:
0104             self.current_run_id = run_id
0105             # Reset stats for new run
0106             self.tf_files_received = 0
0107             self.slices_created = 0
0108             self.stats = {
0109                 'tf_files_received': 0,
0110                 'slices_created': 0,
0111                 'slices_sent': 0
0112             }
0113 
0114         if execution_id and execution_id != self.current_execution_id:
0115             self.current_execution_id = execution_id
0116             # Fetch workflow params if we don't have them
0117             if not self.workflow_params:
0118                 self.workflow_params = self._fetch_workflow_parameters(execution_id)
0119                 if self.workflow_params:
0120                     import json
0121                     self.logger.info(f"Workflow parameters loaded (mid-run): {json.dumps(self.workflow_params, indent=2, sort_keys=True)}")
0122 
0123     def handle_run_imminent(self, message_data):
0124         """Handle run_imminent message."""
0125         self.logger.info(
0126             f"Run imminent: execution_id={self.current_execution_id}, run_id={self.current_run_id}",
0127             extra=self._log_extra()
0128         )
0129 
0130         self._log_system_event('run_imminent', {
0131             'execution_id': self.current_execution_id,
0132             'target_worker_count': self.workflow_params.get("fast_processing", {}).get('target_worker_count', 0),
0133             'stf_sampling_rate': self.workflow_params.get("fast_processing", {}).get('stf_sampling_rate', 0),
0134             'slices_per_sample': self.workflow_params.get("fast_processing", {}).get('slices_per_sample', 0)
0135         })
0136 
0137         # Build and broadcast a run_imminent message to workers
0138         try:
0139             # Compose message similar to _send_slice_to_queue format.
0140             # Put the incoming message_data inside 'content' and add execution_id
0141             # and target_worker_count so workers know how many to spin up.
0142             content = dict(message_data or {})
0143             content.update({
0144                 'execution_id': self.current_execution_id,
0145                 'target_worker_count': self.workflow_params.get("fast_processing", {}).get('target_worker_count', 1),
0146                 'slice_processing_time': self.workflow_params.get("fast_processing", {}).get('slice_processing_time', 1),
0147                 'worker_rampup_time': self.workflow_params.get("fast_processing", {}).get('worker_rampup_time', 1),
0148                 'worker_rampdown_time': self.workflow_params.get("fast_processing", {}).get('worker_rampdown_time', 1)
0149             })
0150 
0151             message = {
0152                 'msg_type': 'run_imminent',
0153                 'run_id': self.current_run_id,
0154                 'created_at': datetime.utcnow().isoformat(),
0155                 'content': content
0156             }
0157 
0158             # Topic for worker broadcasts
0159             worker_topic = self.WORKER_BROADCAST_TOPIC
0160 
0161             headers = {'persistent': 'false'}
0162             self.send_message(worker_topic, message, headers=headers)
0163 
0164             self.logger.info(f"Broadcasted run_imminent to workers: {worker_topic}",
0165                              extra=self._log_extra(destination=worker_topic))
0166         except Exception as e:
0167             self.logger.error(f"Failed to broadcast run_imminent to workers: {e}",
0168                               extra=self._log_extra(error=str(e)))
0169 
0170     def handle_start_run(self, message_data):
0171         """Handle start_run: Update RunState phase to 'physics'."""
0172         self.logger.info(f"Run started: run_id={self.current_run_id}",
0173                          extra=self._log_extra())
0174 
0175         # Agent is now actively processing this run
0176         self.set_processing()
0177 
0178         self._update_run_state(phase='physics', state='running', substate='physics')
0179 
0180         self._log_system_event('start_run', {
0181             'execution_id': self.current_execution_id
0182         })
0183 
0184     def handle_tf_file_registered(self, message_data):
0185         """
0186         Handle tf_file_registered from FastMon: Create TF slices, push to worker queue.
0187         """
0188         tf_filename = message_data.get('tf_filename')
0189         stf_filename = message_data.get('stf_filename')
0190 
0191         self.stats['tf_files_received'] += 1
0192         self.tf_files_received += 1
0193 
0194         self.logger.info(f"TF file registered: {tf_filename} (from STF: {stf_filename})",
0195                          extra=self._log_extra(tf_filename=tf_filename, stf_filename=stf_filename))
0196 
0197         # Get slices_per_sample from workflow params
0198         fast_processing = self.workflow_params.get('fast_processing', {})
0199         slices_per_sample = fast_processing.get('slices_per_sample', 15)
0200 
0201         # Create TF slices from this STF sample
0202         slices = self._create_tf_slices(stf_filename, slices_per_sample)
0203 
0204         # Push each slice to transformer queue
0205         for slice_data in slices:
0206             self._send_slice_to_queue(slice_data)
0207 
0208         # Update RunState with slice counts
0209         self._update_run_state_slices(len(slices))
0210 
0211         # Log event
0212         self._log_system_event('tf_file_processed', {
0213             'tf_filename': tf_filename,
0214             'stf_filename': stf_filename,
0215             'slices_created': len(slices)
0216         })
0217 
0218     def handle_pause_run(self, message_data):
0219         """Handle pause_run: Update RunState to standby."""
0220         self.logger.info(f"Run paused: run_id={self.current_run_id}",
0221                          extra=self._log_extra())
0222 
0223         self._update_run_state(substate='standby')
0224 
0225         self._log_system_event('pause_run', {
0226             'execution_id': self.current_execution_id
0227         })
0228 
0229     def handle_resume_run(self, message_data):
0230         """Handle resume_run: Update RunState back to physics."""
0231         self.logger.info(f"Run resumed: run_id={self.current_run_id}",
0232                          extra=self._log_extra())
0233 
0234         self._update_run_state(substate='physics')
0235 
0236         self._log_system_event('resume_run', {
0237             'execution_id': self.current_execution_id
0238         })
0239 
0240     def handle_end_run(self, message_data):
0241         """Handle end_run: Update RunState to completed."""
0242         total_stf = message_data.get('total_stf_files', 0)
0243 
0244         self.logger.info(
0245             f"Run ended: run_id={self.current_run_id}, "
0246             f"tf_files_received={self.stats['tf_files_received']}, "
0247             f"slices_created={self.stats['slices_created']}",
0248             extra=self._log_extra(total_stf=total_stf,
0249                                   tf_files_received=self.stats['tf_files_received'],
0250                                   slices_created=self.stats['slices_created'])
0251         )
0252 
0253         self._update_run_state(phase='completed', state='ended', substate=None)
0254 
0255         self._log_system_event('end_run', {
0256             'execution_id': self.current_execution_id,
0257             'total_tf_files_received': self.stats['tf_files_received'],
0258             'total_slices_created': self.stats['slices_created'],
0259             'total_slices_sent': self.stats['slices_sent']
0260         })
0261 
0262         # Broadcast end_run to workers so they can perform any teardown/cleanup
0263         try:
0264             # Compose message similar to _send_slice_to_queue format.
0265             # Put the incoming message_data inside 'content' and add execution_id
0266             # and target_worker_count so workers can finalize appropriately.
0267             content = dict(message_data or {})
0268             content.update({
0269                 'execution_id': self.current_execution_id
0270             })
0271 
0272             message = {
0273                 'msg_type': 'end_run',
0274                 'run_id': self.current_run_id,
0275                 'created_at': datetime.utcnow().isoformat(),
0276                 'content': content
0277             }
0278 
0279             worker_topic = self.WORKER_BROADCAST_TOPIC
0280 
0281             headers = {'persistent': 'false'}
0282             self.send_message(worker_topic, message, headers=headers)
0283 
0284             self.logger.info(f"Broadcasted end_run to workers: {worker_topic}",
0285                              extra=self._log_extra(destination=worker_topic))
0286         except Exception as e:
0287             self.logger.error(f"Failed to broadcast end_run to workers: {e}",
0288                               extra=self._log_extra(error=str(e)))
0289 
0290         # Clear current run state
0291         self.current_run_id = None
0292         self.current_execution_id = None
0293         self.workflow_params = {}
0294 
0295         # Agent is now idle, waiting for next run
0296         self.set_ready()
0297 
0298     def handle_slice_result(self, message_data):
0299         """Process slice_result messages from transformer workers."""
0300         self.stats['results_received'] += 1
0301 
0302         content = message_data.get('content', {})
0303         result = content.get('result') if isinstance(content, dict) else None
0304 
0305         self.logger.info(
0306             f"Slice result received: run={message_data.get('run_id')}, "
0307             f"state={content.get('state') if isinstance(content, dict) else 'unknown'}",
0308             extra=self._log_extra(run_id=message_data.get('run_id'))
0309         )
0310 
0311         # Track done/failed counts if result payload present
0312         try:
0313             inner_result = None
0314             if result and isinstance(result, dict):
0315                 inner_result = result.get('result') if isinstance(result.get('result'), dict) else None
0316 
0317             state = content.get('state') or (inner_result.get('state') if inner_result else None)
0318             if state == 'done' or (inner_result and inner_result.get('processed')):
0319                 self.stats['results_done'] += 1
0320             else:
0321                 self.stats['results_failed'] += 1
0322         except Exception:
0323             pass
0324 
0325         # Update TFSlice record in database
0326         self._update_tfslice_from_result(message_data, content, result)
0327 
0328         # Log system event for observability
0329         self._log_system_event('slice_result', {
0330             'message': message_data,
0331             'state': content.get('state') if isinstance(content, dict) else None,
0332             'results_received': self.stats['results_received'],
0333             'results_done': self.stats['results_done'],
0334             'results_failed': self.stats['results_failed']
0335         })
0336 
0337         self.logger.info(f"Handled slice_result: run={message_data.get('run_id')}, msg={message_data.get('msg_type')}",
0338                          extra=self._log_extra(run_id=message_data.get('run_id')))
0339 
0340     # -------------------------------------------------------------------------
0341     # Helper methods
0342     # -------------------------------------------------------------------------
0343 
0344     def _fetch_workflow_parameters(self, execution_id):
0345         """Fetch workflow parameters from WorkflowExecution API."""
0346         try:
0347             result = self.call_monitor_api(
0348                 'GET',
0349                 f'/workflow-executions/{execution_id}/'
0350             )
0351             if result:
0352                 return result.get('parameter_values', {})
0353             return {}
0354         except Exception as e:
0355             self.logger.error(f"Failed to fetch workflow parameters: {e}",
0356                               extra=self._log_extra(error=str(e)))
0357             return {}
0358 
0359     def _update_run_state(self, phase=None, state=None, substate=None):
0360         """Update RunState record."""
0361         update_data = {
0362             'state_changed_at': datetime.now().isoformat()
0363         }
0364         if phase is not None:
0365             update_data['phase'] = phase
0366         if state is not None:
0367             update_data['state'] = state
0368         if substate is not None:
0369             update_data['substate'] = substate
0370 
0371         try:
0372             result = self.call_monitor_api(
0373                 'PATCH',
0374                 f'/run-states/{self.current_run_id}/',
0375                 update_data
0376             )
0377             if result:
0378                 self.logger.debug(f"RunState updated: {update_data}", extra=self._log_extra())
0379         except Exception as e:
0380             self.logger.error(f"Error updating RunState: {e}",
0381                               extra=self._log_extra(error=str(e)))
0382 
0383     def _update_run_state_slices(self, new_slices_count):
0384         """Update RunState with new slice counts."""
0385         # We need to increment, so fetch current values first
0386         try:
0387             current = self.call_monitor_api('GET', f'/run-states/{self.current_run_id}/')
0388             if current:
0389                 update_data = {
0390                     'stf_samples_received': current.get('stf_samples_received', 0) + 1,
0391                     'slices_created': current.get('slices_created', 0) + new_slices_count,
0392                     'slices_queued': current.get('slices_queued', 0) + new_slices_count,
0393                     'state_changed_at': datetime.now().isoformat()
0394                 }
0395                 self.call_monitor_api(
0396                     'PATCH',
0397                     f'/run-states/{self.current_run_id}/',
0398                     update_data
0399                 )
0400         except Exception as e:
0401             self.logger.error(f"Error updating RunState slices: {e}",
0402                               extra=self._log_extra(error=str(e)))
0403 
0404     def _create_tf_slices(self, stf_filename, num_slices):
0405         """
0406         Create TF slice records in database.
0407 
0408         Returns list of slice data dictionaries for sending to queue.
0409         """
0410         slices = []
0411 
0412         # Assume ~1000 TFs per STF, divide into num_slices
0413         tfs_per_stf = 1000
0414         tfs_per_slice = tfs_per_stf // num_slices
0415 
0416         for i in range(num_slices):
0417             tf_first = i * tfs_per_slice
0418             tf_last = (i + 1) * tfs_per_slice - 1 if i < num_slices - 1 else tfs_per_stf - 1
0419             tf_count = tf_last - tf_first + 1
0420 
0421             # Generate TF filename for this slice
0422             tf_filename = f"{stf_filename.replace('.stf', '')}_slice_{i:03d}.tf"
0423 
0424             slice_data = {
0425                 'slice_id': i,
0426                 'tf_first': tf_first,
0427                 'tf_last': tf_last,
0428                 'tf_count': tf_count,
0429                 'tf_filename': tf_filename,
0430                 'stf_filename': stf_filename,
0431                 'run_number': self.current_run_id,
0432                 'status': 'queued',
0433                 'retries': 0,
0434                 'metadata': {
0435                     'execution_id': self.current_execution_id,
0436                     'created_by': self.agent_name
0437                 }
0438             }
0439 
0440             # Create in database
0441             try:
0442                 result = self.call_monitor_api('POST', '/tf-slices/', slice_data)
0443                 if result:
0444                     self.stats['slices_created'] += 1
0445                     self.slices_created += 1
0446                     # Add database ID to slice data for queue message
0447                     slice_data['db_id'] = result.get('id')
0448                     slices.append(slice_data)
0449                     self.logger.debug(f"TFSlice created: {tf_filename}",
0450                                       extra=self._log_extra(tf_filename=tf_filename))
0451                 else:
0452                     self.logger.warning(f"Failed to create TFSlice: {tf_filename}",
0453                                         extra=self._log_extra(tf_filename=tf_filename))
0454             except Exception as e:
0455                 self.logger.error(f"Error creating TFSlice {tf_filename}: {e}",
0456                                   extra=self._log_extra(tf_filename=tf_filename, error=str(e)))
0457 
0458         return slices
0459 
0460     def _send_slice_to_queue(self, slice_data):
0461         """
0462         Send slice message to transformer queue.
0463 
0464         Message format per Wen's iDDS design.
0465         """
0466         # Build message per iDDS format
0467         message = {
0468             'msg_type': 'slice',
0469             'run_id': self.current_run_id,
0470             'created_at': datetime.utcnow().isoformat(),
0471             'content': {
0472                 'run_id': self.current_run_id,
0473                 'execution_id': self.current_execution_id,
0474                 'req_id': str(uuid.uuid4()),
0475                 'filename': slice_data['stf_filename'],
0476                 'tf_filename': slice_data['tf_filename'],
0477                 'slice_id': slice_data['slice_id'],
0478                 'start': slice_data['tf_first'],
0479                 'end': slice_data['tf_last'],
0480                 'tf_count': slice_data['tf_count'],
0481                 'state': 'queued',
0482                 'substate': 'new'
0483             }
0484         }
0485 
0486         # Send to transformer queue with required headers
0487         try:
0488             # Use send_message with persistent=True and ttl for slice messages
0489             headers = {
0490                 'persistent': 'true',
0491                 'ttl': str(12 * 3600 * 1000)  # 12 hours in ms
0492             }
0493             self.send_message(self.TRANSFORMER_QUEUE, message, headers=headers)
0494 
0495             self.stats['slices_sent'] += 1
0496             self.logger.info(
0497                 f"Slice sent to queue: {slice_data['tf_filename']} -> {self.TRANSFORMER_QUEUE}",
0498                 extra=self._log_extra(tf_filename=slice_data['tf_filename'], destination=self.TRANSFORMER_QUEUE)
0499             )
0500         except Exception as e:
0501             self.logger.error(f"Failed to send slice to queue: {e}",
0502                               extra=self._log_extra(error=str(e)))
0503 
0504     def _log_system_event(self, event_type, event_data):
0505         """Log event to SystemStateEvent table."""
0506         event = {
0507             'timestamp': datetime.now().isoformat(),
0508             'run_number': self.current_run_id,
0509             'event_type': event_type,
0510             'state': self.workflow_params.get('state', 'unknown'),
0511             'substate': self.workflow_params.get('substate'),
0512             'event_data': event_data
0513         }
0514 
0515         try:
0516             self.call_monitor_api('POST', '/system-state-events/', event)
0517         except Exception as e:
0518             self.logger.debug(f"Failed to log system event: {e}",
0519                               extra=self._log_extra(event_type=event_type, error=str(e)))
0520 
    def _update_tfslice_from_result(self, message_data, content, result):
        """
        Update TFSlice record in database based on slice_result message.

        Args:
            message_data: Full slice_result message; run_id is read from it
                to look up the slice row.
            content: The message's 'content' payload; may be any type, so it
                is isinstance-checked before dict access (but see NOTE below).
            result: content['result'] as extracted by handle_slice_result.

        Looks up the TFSlice row by (run_number, slice_id), then PATCHes its
        status and worker-reported metadata. All failures are logged, never
        raised to the caller.
        """
        try:
            # Extract slice information from the result
            # The result structure is: content -> result -> result (nested)
            inner_result = None
            if result and isinstance(result, dict):
                inner_result = result.get('result') if isinstance(result.get('result'), dict) else None

            # Get slice_id directly from the result data
            slice_id = None
            tf_filename = None
            if inner_result and isinstance(inner_result, dict):
                slice_id = inner_result.get('slice_id')
                tf_filename = inner_result.get('tf_filename')

            # Without a slice_id there is no way to locate the DB row
            if slice_id is None:
                self.logger.debug("No slice_id in result, cannot update TFSlice record")
                return

            # Determine the final state
            state = content.get('state') if isinstance(content, dict) else None
            processed = inner_result.get('processed') if inner_result else None

            # Map worker state to slice status: anything other than an explicit
            # success indication is treated as a failure
            if state == 'done' or processed:
                slice_status = 'completed'
            else:
                slice_status = 'failed'

            # Build update payload; worker-reported fields are stored verbatim
            # in metadata. NOTE(review): unlike 'state' above, these content.get
            # calls are not isinstance-guarded — a non-dict content would raise
            # here and be swallowed by the outer except. Confirm content is
            # always a dict at this point.
            update_data = {
                'status': slice_status,
                'processed_at': content.get('processed_at') or datetime.now().isoformat(),
                'metadata': {
                    'worker_hostname': content.get('hostname'),
                    'panda_task_id': content.get('panda_task_id'),
                    'panda_id': content.get('panda_id'),
                    'harvester_id': content.get('harvester_id'),
                    'processing_start_at': content.get('processing_start_at'),
                    'result': result
                }
            }

            # Update the slice directly using slice_id from the message
            run_id = message_data.get('run_id')
            try:
                # Query for the slice by run_id and slice_id to get the database ID
                slices = self.call_monitor_api(
                    'GET',
                    f'/tf-slices/?run_number={run_id}&slice_id={slice_id}'
                )

                # Expect a list; only the first match is updated
                if slices and isinstance(slices, list) and len(slices) > 0:
                    db_id = slices[0].get('id')
                    if db_id:
                        # Update the slice using database ID
                        api_result = self.call_monitor_api(
                            'PATCH',
                            f'/tf-slices/{db_id}/',
                            update_data
                        )
                        if api_result:
                            self.logger.info(
                                f"TFSlice updated: slice_id={slice_id}, tf_filename={tf_filename} -> {slice_status}",
                                extra=self._log_extra(slice_id=slice_id, tf_filename=tf_filename, status=slice_status)
                            )
                        else:
                            self.logger.warning(
                                f"Failed to update TFSlice: slice_id={slice_id}",
                                extra=self._log_extra(slice_id=slice_id)
                            )
                else:
                    # Row not found — debug level only, since late/duplicate
                    # results are possible after a run ends
                    self.logger.debug(
                        f"TFSlice not found for slice_id: {slice_id}, run: {run_id}",
                        extra=self._log_extra(slice_id=slice_id, run_id=run_id)
                    )
            except Exception as e:
                self.logger.error(
                    f"Error querying/updating TFSlice slice_id={slice_id}: {e}",
                    extra=self._log_extra(slice_id=slice_id, error=str(e))
                )

        except Exception as e:
            self.logger.error(
                f"Error updating TFSlice from result: {e}",
                extra=self._log_extra(error=str(e))
            )
0609 
0610 
if __name__ == "__main__":
    # Command-line entry point: parse options and run the agent's message loop.
    # Fix: removed the unused `script_dir = Path(__file__).parent` (and its
    # pathlib import) — the value was never read.
    import argparse

    parser = argparse.ArgumentParser(
        description="Fast Processing Agent - samples STFs and creates TF slices"
    )
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    parser.add_argument("--testbed-config", default=None,
                        help="Testbed config file (default: SWF_TESTBED_CONFIG env var or workflows/testbed.toml)")
    args = parser.parse_args()

    agent = FastProcessingAgent(debug=args.debug, config_path=args.testbed_config)
    agent.run()