File indexing completed on 2026-04-27 07:41:45
0001
0002 """
0003 Utility functions for the Fast Monitor Agent.
0004
0005 """
0006
0007 import logging
0008 import hashlib
0009 import random
0010 import re
0011 from datetime import datetime, timedelta
0012 from typing import List, Dict, Any
0013
0014
class FileStatus:
    """
    Status labels for files tracked by the Fast Monitor Agent.

    The labels are plain string constants rather than Enum members, so a value
    can be used directly anywhere a ``str`` status is expected (for example
    the ``status`` field of the payload built in ``record_tf_file``).
    """

    # Status assigned by record_tf_file when a TF file is first posted.
    REGISTERED: str = 'registered'
    PROCESSING: str = 'processing'
    PROCESSED: str = 'processed'
    FAILED: str = 'failed'
    DONE: str = 'done'
0021
0022
def validate_config(config: dict) -> None:
    """
    Validate the configuration parameters for the message-driven agent.

    Args:
        config: Agent configuration mapping.

    Raises:
        ValueError: If a required key is missing, if ``selection_fraction`` is
            not comparable with a number (e.g. a string or None), or if it is
            outside the closed interval [0.0, 1.0] (NaN is also rejected).
    """
    required_keys = [
        "selection_fraction",
    ]

    for key in required_keys:
        if key not in config:
            raise ValueError(f"Missing required configuration key: {key}")

    fraction = config["selection_fraction"]
    try:
        # NaN fails both comparisons, so it is reported as out of range below.
        in_range = 0.0 <= fraction <= 1.0
    except TypeError as err:
        # Non-numeric values (e.g. "0.5" read from a text config, or None) would
        # otherwise leak a TypeError instead of this function's ValueError contract.
        raise ValueError(
            f"selection_fraction must be a number, got {type(fraction).__name__}"
        ) from err

    if not in_range:
        raise ValueError("selection_fraction must be between 0.0 and 1.0")
0035
0036
0037
0038
0039
0040
def calculate_checksum(file_path: str, logger: logging.Logger, chunk_size: int = 65536) -> str:
    """
    Calculate the MD5 checksum of a file.

    The file is read in binary mode in fixed-size chunks, so large files are
    hashed without being loaded into memory at once. Note that MD5 is not
    collision-resistant and should not be relied on for security purposes.

    Args:
        file_path: Path to the file as string
        logger: Logger instance used to report failures
        chunk_size: Number of bytes read per iteration (default 64 KiB); must
            be positive.

    Returns:
        Hex-encoded MD5 digest, or an empty string if the checksum could not be
        computed (the error is logged).

    Raises:
        ValueError: If ``chunk_size`` is not positive. A read size of 0 would
            end the loop immediately and silently return the digest of no data.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    try:
        # Created inside the try so that environments where hashlib.md5() itself
        # can raise (e.g. some FIPS-restricted OpenSSL builds) still fall back to "".
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(chunk_size), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()
    except Exception as e:
        # Deliberate best-effort: callers receive "" rather than an exception.
        logger.error(f"Error calculating checksum for {file_path}: {e}")
        return ""
0061
0062
0063
0064
0065
0066
0067
0068
0069 def simulate_tf_subsamples(stf_file: Dict[str, Any], config: dict, logger: logging.Logger, agent_name: str) -> List[Dict[str, Any]]:
0070 """
0071 Simulate creation of Time Frame (TF) subsamples from a Super Time Frame (STF) file.
0072
0073 Args:
0074 stf_file: STF data dictionary (follows the keys from daq agent)
0075 config: Configuration dictionary
0076 logger: Logger instance
0077
0078 Returns:
0079 List of TF metadata dictionaries
0080 """
0081 try:
0082 tf_files_per_stf = config.get("tf_files_per_stf", 2)
0083 tf_size_fraction = config.get("tf_size_fraction", 0.15)
0084 tf_sequence_start = config.get("tf_sequence_start", 1)
0085
0086 tf_subsamples = []
0087 stf_size = stf_file.get("size_bytes", 0)
0088
0089 base_filename = stf_file.get("filename", "unknown").rsplit('.', 1)[0]
0090
0091 for i in range(tf_files_per_stf):
0092 sequence_number = tf_sequence_start + i
0093
0094
0095 tf_filename = f"{base_filename}_tf_{sequence_number:03d}.tf"
0096
0097
0098 tf_size = int(stf_size * tf_size_fraction * random.gauss(1.0, 0.1))
0099
0100
0101 tf_metadata = {
0102 "tf_filename": tf_filename,
0103 "file_size_bytes": tf_size,
0104 "sequence_number": sequence_number,
0105 "stf_parent": stf_file.get("filename"),
0106 "metadata": {
0107 "simulation": True,
0108 "created_from": stf_file.get('filename'),
0109 "tf_size_fraction": tf_size_fraction,
0110 "agent_name": agent_name,
0111 "state": stf_file.get('state'),
0112 "substate": stf_file.get('substate'),
0113 "start": stf_file.get('start'),
0114 "end": stf_file.get('end'),
0115 }
0116 }
0117
0118 tf_subsamples.append(tf_metadata)
0119
0120 return tf_subsamples
0121
0122 except Exception as e:
0123 logger.error(f"Unexpected error simulating TF subsamples: {e}")
0124 return []
0125
0126
0127 def record_tf_file(tf_metadata: Dict[str, Any], config: dict, agent, logger: logging.Logger) -> Dict[str, Any]:
0128 """
0129 Record a Time Frame (TF) file in the database using REST API.
0130
0131 Args:
0132 tf_metadata: TF metadata dictionary from simulate_tf_subsamples
0133 config: Configuration dictionary
0134 agent: BaseAgent instance for API access
0135 logger: Logger instance
0136
0137 Returns:
0138 FastMonFile data dictionary or None if failed
0139 """
0140 try:
0141
0142 tf_file_data = {
0143 "stf_file": tf_metadata.get("stf_parent", None),
0144 "tf_filename": tf_metadata["tf_filename"],
0145 "file_size_bytes": tf_metadata["file_size_bytes"],
0146 "status": FileStatus.REGISTERED,
0147 "metadata": tf_metadata.get("metadata", {})
0148 }
0149
0150
0151 tf_file = agent.call_monitor_api('post', '/fastmon-files/', tf_file_data)
0152 tf_file_id = tf_file.get('tf_file_id') or tf_file.get('id') or 'unknown'
0153 logger.debug(f"Recorded TF file: {tf_metadata['tf_filename']} -> {tf_file_id}")
0154 return tf_file
0155
0156 except Exception as e:
0157 logger.error(f"Error recording TF file {tf_metadata['tf_filename']}: {e}")
0158 return {}
0159
0160
0161 def create_tf_message(tf_file: Dict[str, Any], stf_file: Dict[str, Any], agent_name: str) -> Dict[str, Any]:
0162 """
0163 Create a message for TF file registration notifications.
0164
0165 Args:
0166 tf_file: TF file data from the FastMonFile API
0167 stf_file: Parent STF file data
0168 agent_name: Name of the agent sending the message
0169
0170 Returns:
0171 Message dictionary ready for broadcasting
0172 """
0173 from datetime import datetime
0174
0175
0176 message = {
0177 "msg_type": "tf_file_registered",
0178 "namespace": stf_file.get('namespace'),
0179 "run_id": stf_file.get('run_id'),
0180 "execution_id": stf_file.get('execution_id'),
0181 "processed_by": agent_name,
0182 "tf_file_id": tf_file.get('tf_file_id'),
0183 "tf_filename": tf_file.get('tf_filename'),
0184 "file_size_bytes": tf_file.get('file_size_bytes'),
0185 "stf_filename": stf_file.get('stf_filename') or stf_file.get('filename'),
0186 "status": tf_file.get('status'),
0187 "timestamp": datetime.now().isoformat(),
0188 }
0189
0190 return message
0191
0192
0193 def create_status_message(agent_name: str, status: str, message_text: str, run_id: str = None) -> Dict[str, Any]:
0194 """
0195 Create a status message for agent notifications.
0196
0197 Args:
0198 agent_name: Name of the agent sending the message
0199 status: Status of the operation (e.g., 'started', 'completed', 'error')
0200 message_text: Human-readable message describing the status
0201 run_id: Optional run identifier
0202
0203 Returns:
0204 Message dictionary ready for broadcasting
0205 """
0206 from datetime import datetime
0207
0208 message = {
0209 "msg_type": "fastmon_status",
0210 "processed_by": agent_name,
0211 "status": status,
0212 "message": message_text,
0213 "timestamp": datetime.now().isoformat()
0214 }
0215
0216 if run_id:
0217 message["run_id"] = run_id
0218
0219 return message