Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:45

0001 #!/usr/bin/env python3
0002 """
0003 Utility functions for the Fast Monitor Agent.
0004 
0005 """
0006 
0007 import logging
0008 import hashlib
0009 import random
0010 import re
0011 from datetime import datetime, timedelta
0012 from typing import List, Dict, Any
0013 
0014 # File status constants (matching Django FileStatus choices)
class FileStatus:
    """
    Namespace of plain-string status values assigned to file records.

    The attributes are ordinary ``str`` constants, so they can be placed
    directly into dictionaries that are sent to the REST API.
    """

    REGISTERED = "registered"
    PROCESSING = "processing"
    PROCESSED = "processed"
    FAILED = "failed"
    DONE = "done"
0021 
0022 
def validate_config(config: dict) -> None:
    """
    Validate the configuration parameters for message-driven agent.

    Args:
        config: Configuration dictionary; must contain "selection_fraction"

    Raises:
        ValueError: If a required key is missing, or if selection_fraction
            is not a number between 0.0 and 1.0 (inclusive)
    """
    required_keys = [
        "selection_fraction",
    ]

    for key in required_keys:
        if key not in config:
            raise ValueError(f"Missing required configuration key: {key}")

    fraction = config["selection_fraction"]
    try:
        in_range = 0.0 <= fraction <= 1.0
    except TypeError as exc:
        # Values that cannot be ordered against a float (e.g. a string or
        # None) are reported as an invalid configuration, like every other
        # validation failure, instead of leaking a bare TypeError.
        raise ValueError(
            f"selection_fraction must be a number between 0.0 and 1.0, got {fraction!r}"
        ) from exc

    if not in_range:
        raise ValueError("selection_fraction must be between 0.0 and 1.0")
0035 
0036 
0037 
0038 
0039 
0040 
def calculate_checksum(file_path: str, logger: logging.Logger) -> str:
    """
    Compute the MD5 hex digest of a file's contents.

    The file is read in 4096-byte blocks so it is never loaded into memory
    as a whole.

    Args:
        file_path: Path to the file as string
        logger: Logger instance used to report failures

    Returns:
        Hexadecimal MD5 digest, or an empty string if any error occurred
        while opening or reading the file (the error is logged)
    """
    digest = hashlib.md5()
    try:
        with open(file_path, "rb") as stream:
            while True:
                block = stream.read(4096)
                if block == b"":
                    # End of file reached
                    break
                digest.update(block)
    except Exception as e:
        logger.error(f"Error calculating checksum for {file_path}: {e}")
        return ""
    return digest.hexdigest()
0061 
0062 
0063 
0064 
0065 
0066 
0067 
0068 
def simulate_tf_subsamples(stf_file: Dict[str, Any], config: dict, logger: logging.Logger, agent_name: str) -> List[Dict[str, Any]]:
    """
    Produce simulated Time Frame (TF) metadata entries for one Super Time Frame (STF) file.

    Args:
        stf_file: STF data dictionary; the keys read here are "filename",
            "size_bytes", "state", "substate", "start" and "end"
        config: Configuration dictionary; optional keys "tf_files_per_stf"
            (default 2), "tf_size_fraction" (default 0.15) and
            "tf_sequence_start" (default 1)
        logger: Logger instance
        agent_name: Name stored in each TF's metadata under "agent_name"

    Returns:
        List of TF metadata dictionaries, or an empty list if any error
        occurred (the error is logged)
    """
    try:
        count = config.get("tf_files_per_stf", 2)
        size_fraction = config.get("tf_size_fraction", 0.15)
        first_sequence = config.get("tf_sequence_start", 1)

        parent_size = stf_file.get("size_bytes", 0)
        # Strip only the last extension from the STF filename
        stem = stf_file.get("filename", "unknown").rsplit('.', 1)[0]

        def build_entry(sequence_number):
            # One TF record: name derived from the STF stem, size drawn as
            # the configured fraction of the STF size with gaussian jitter
            # (mean 1.0, sigma 0.1).
            return {
                "tf_filename": f"{stem}_tf_{sequence_number:03d}.tf",
                "file_size_bytes": int(parent_size * size_fraction * random.gauss(1.0, 0.1)),
                "sequence_number": sequence_number,
                # The STF filename serves as the parent identifier
                "stf_parent": stf_file.get("filename"),
                "metadata": {
                    "simulation": True,
                    "created_from": stf_file.get('filename'),
                    "tf_size_fraction": size_fraction,
                    "agent_name": agent_name,
                    "state": stf_file.get('state'),
                    "substate": stf_file.get('substate'),
                    "start": stf_file.get('start'),
                    "end": stf_file.get('end'),
                },
            }

        return [build_entry(first_sequence + offset) for offset in range(count)]

    except Exception as e:
        logger.error(f"Unexpected error simulating TF subsamples: {e}")
        return []
0125 
0126 
def record_tf_file(tf_metadata: Dict[str, Any], config: dict, agent, logger: logging.Logger) -> Dict[str, Any]:
    """
    Record a Time Frame (TF) file in the database using REST API.

    Args:
        tf_metadata: TF metadata dictionary from simulate_tf_subsamples;
            "tf_filename" and "file_size_bytes" are required,
            "stf_parent" and "metadata" are optional
        config: Configuration dictionary (not read by this function)
        agent: Object providing call_monitor_api(method, path, data) for API access
        logger: Logger instance

    Returns:
        The dictionary returned by the FastMonFile API call, or an empty
        dict if anything failed (the error is logged, never raised)
    """
    # Placeholder used in the error log when the filename itself cannot be
    # read from tf_metadata (missing key or malformed metadata). Looking the
    # key up again inside the except handler would raise a second time and
    # escape this function.
    tf_filename = "<unknown>"
    try:
        tf_filename = tf_metadata["tf_filename"]

        # Prepare FastMonFile data for API
        tf_file_data = {
            "stf_file": tf_metadata.get("stf_parent", None),  # STF filename as parent identifier
            "tf_filename": tf_filename,
            "file_size_bytes": tf_metadata["file_size_bytes"],
            "status": FileStatus.REGISTERED,
            "metadata": tf_metadata.get("metadata", {}),
        }

        # Create TF file record via FastMonFile API
        tf_file = agent.call_monitor_api('post', '/fastmon-files/', tf_file_data)
        # The response may expose its identifier under either key
        tf_file_id = tf_file.get('tf_file_id') or tf_file.get('id') or 'unknown'
        logger.debug(f"Recorded TF file: {tf_filename} -> {tf_file_id}")
        return tf_file

    except Exception as e:
        logger.error(f"Error recording TF file {tf_filename}: {e}")
        return {}
0159 
0160 
def create_tf_message(tf_file: Dict[str, Any], stf_file: Dict[str, Any], agent_name: str) -> Dict[str, Any]:
    """
    Build the notification message announcing a registered TF file.

    Args:
        tf_file: TF file data from the FastMonFile API
        stf_file: Parent STF file data
        agent_name: Name of the agent sending the message

    Returns:
        Message dictionary ready for broadcasting; fields absent from the
        inputs are present with value None
    """
    message = {"msg_type": "tf_file_registered"}

    # Run context is copied from the parent STF record.
    # namespace is also auto-injected by BaseAgent.send_message()
    for field in ("namespace", "run_id", "execution_id"):
        message[field] = stf_file.get(field)

    message["processed_by"] = agent_name

    # Identity and size are copied from the TF record
    for field in ("tf_file_id", "tf_filename", "file_size_bytes"):
        message[field] = tf_file.get(field)

    # Prefer the explicit 'stf_filename' key, fall back to 'filename'
    message["stf_filename"] = stf_file.get('stf_filename') or stf_file.get('filename')
    message["status"] = tf_file.get('status')
    message["timestamp"] = datetime.now().isoformat()

    return message
0191 
0192 
def create_status_message(agent_name: str, status: str, message_text: str, run_id: str = None) -> Dict[str, Any]:
    """
    Build a status notification message for the agent.

    Args:
        agent_name: Name of the agent sending the message
        status: Status of the operation (e.g., 'started', 'completed', 'error')
        message_text: Human-readable message describing the status
        run_id: Optional run identifier; included only when truthy

    Returns:
        Message dictionary ready for broadcasting
    """
    payload = dict(
        msg_type="fastmon_status",
        processed_by=agent_name,
        status=status,
        message=message_text,
        timestamp=datetime.now().isoformat(),
    )

    if not run_id:
        # No run context supplied: the message carries no "run_id" key
        return payload

    payload["run_id"] = run_id
    return payload