File indexing completed on 2026-04-09 07:58:18
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012 import datetime
0013 import email.utils
0014
0015 from idds.common.utils import setup_logging
0016
0017
# Runs at import time, passing this module's name to the shared iDDS helper.
# NOTE(review): presumably this configures the logger for this module --
# confirm against idds.common.utils.setup_logging.
setup_logging(__name__)
0019
0020
def _to_naive_utc(dt):
    """Return *dt* as a naive datetime, converting aware values to UTC first.

    Naive inputs are returned unchanged (no timezone is assumed for them).

    Raises ValueError if the UTC conversion moves the value out of the
    range representable by ``datetime.datetime``.
    """
    if dt.tzinfo is None:
        return dt
    try:
        return dt.astimezone(datetime.timezone.utc).replace(tzinfo=None)
    except OverflowError as err:
        # Keep the single-exception contract of _parse_datetime.
        raise ValueError(f"Datetime out of range after UTC conversion: {dt!r}") from err


def _parse_datetime(value):
    """Parse a datetime-like value robustly into a naive UTC datetime.

    Accepts datetime objects or strings in multiple common formats, tried
    in this order:
    - ISO 8601 (handled by datetime.fromisoformat); a trailing ``Z``/``z``
      is rewritten to ``+00:00`` so it also parses on interpreters whose
      ``fromisoformat`` does not accept the suffix.
    - RFC 2822 / HTTP-date (handled by email.utils.parsedate_to_datetime)
    - Falls back to trying a few common strptime patterns.

    Every timezone-aware result (including an aware datetime passed in) is
    converted to UTC and returned without tzinfo, so that two values parsed
    from different formats can always be subtracted from each other. Naive
    values are returned as-is.

    Returns a datetime.datetime or raises ValueError on failure.
    """
    if value is None:
        raise ValueError("None value")
    if isinstance(value, datetime.datetime):
        return _to_naive_utc(value)
    if not isinstance(value, str):
        raise ValueError(f"Unsupported datetime value type: {type(value)}")

    # 1) ISO 8601. Only the ISO attempt sees the rewritten "Z" suffix; the
    #    later parsers still receive the caller's original text.
    iso_text = (value[:-1] + "+00:00") if value.endswith(("Z", "z")) else value
    try:
        return _to_naive_utc(datetime.datetime.fromisoformat(iso_text))
    except ValueError:
        pass

    # 2) RFC 2822 / HTTP-date. Depending on the Python version, unparsable
    #    input surfaces as TypeError or ValueError; malformed offsets can
    #    also overflow while the timezone is being built.
    try:
        parsed = email.utils.parsedate_to_datetime(value)
        if parsed is not None:
            return _to_naive_utc(parsed)
    except (TypeError, ValueError, IndexError, OverflowError):
        pass

    # 3) Explicit strptime patterns (all of them yield naive datetimes).
    patterns = (
        "%a, %d %b %Y %H:%M:%S %Z",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S.%f",
    )
    for pattern in patterns:
        try:
            return datetime.datetime.strptime(value, pattern)
        except ValueError:
            continue

    raise ValueError(f"Could not parse datetime: {value}")
0070
0071
def _forward_slice(msg, run_id, transformer_publisher, timetolive, logger):
    """Publish a 'slice' message to the transformer, or warn if no publisher is set."""
    tf_header = {
        "persistent": "true",
        "ttl": timetolive,
        "vo": "eic",
        "msg_type": "slice",
        "run_id": str(run_id),
    }

    if transformer_publisher:
        transformer_publisher.publish(msg, headers=tf_header)
        if logger:
            logger.info(
                f"Forwarded slice to transformer: run_id={run_id}, filename={msg.get('content', {}).get('filename')}"
            )
    elif logger:
        logger.warning(
            f"No transformer_publisher available to forward slice: run_id={run_id}"
        )


def _log_slice_result(msg, run_id, logger):
    """Log a 'slice_result' message and, when both timestamps parse, its delay."""
    content = msg.get("content", {})
    requested_at = content.get("requested_at")
    processing_start_at = content.get("processing_start_at")
    processed_at = content.get("processed_at")
    result = content.get("result", {})

    if logger:
        logger.info(
            f"Slice result: run_id={run_id}, requested_at={requested_at}, "
            f"processing_start_at={processing_start_at}, processed_at={processed_at}, "
            f"result={result}"
        )

    if not (requested_at and processed_at):
        return

    try:
        requested_dt = _parse_datetime(requested_at)
        processed_dt = _parse_datetime(processed_at)
        delay = (processed_dt - requested_dt).total_seconds()
        if logger:
            logger.info(
                f"Slice processing delay: run_id={run_id}, delay={delay:.2f}s"
            )
    except Exception as ex:
        # The delay is a best-effort metric: an unparsable timestamp or a
        # naive/aware mismatch must not fail handling of the result itself.
        if logger:
            logger.debug(f"Could not calculate delay: {ex}")


def slice_handler(header, msg, task_id=None, handler_kwargs=None, logger=None):
    """
    Handle slice messages based on prompt.md specifications.

    Message types (selected by ``msg['msg_type']``):
    1. 'slice': Forward to transformer queue
       Format: {
           'msg_type': 'slice',
           'run_id': 20250914185722,
           'created_at': datetime.datetime.utcnow(),
           'content': {
               "run_id": 20250914185722,
               "state": "no_beam",
               "substate": "calib",
               "filename": "swf.20250914.185724.767135.no_beam.calib.stf",
               "start": "20250914185722420185",
               "end": "20250914185724767135",
               "checksum": "ad:3915264619",
               "size": 191,
               "msg_type": "stf_gen",
               "req_id": 1
           }
       }

    2. 'slice_result': Log the result from transformer
       Format: {
           'msg_type': 'slice_result',
           'run_id': 20250914185722,
           'created_at': datetime.datetime.utcnow(),
           'content': {
               'requested_at': <copied from slice's created_at>,
               'processing_start_at': <utctime>,
               'processed_at': <utctime>,
               'result': {'state': ..., 'attribute': 'value'}
           }
       }

    Any other message type is reported with a warning.

    Exceptions raised while dispatching a message are logged (when a logger
    is given) and suppressed; the function always returns None.

    :param header: Message header. Not read by this handler; ``run_id`` is
                   taken from ``msg``.
    :param msg: Message content (mapping with 'msg_type', 'run_id', 'content')
    :param task_id: Optional task ID. Not read by this handler.
    :param handler_kwargs: Optional mapping of handler settings:
        - 'transformer_publisher': object whose ``publish(msg, headers=...)``
          is called to forward 'slice' messages; if missing or falsy the
          slice is not forwarded and a warning is logged.
        - 'timetolive': value placed in the 'ttl' header of forwarded
          messages, in milliseconds (default: 12 * 3600 * 1000).
    :param logger: Optional logger; when falsy, nothing is logged.
    """
    # None default instead of a shared mutable {}; also tolerates callers
    # that pass handler_kwargs=None explicitly.
    if handler_kwargs is None:
        handler_kwargs = {}

    msg_type = msg.get("msg_type")
    run_id = msg.get("run_id")
    timetolive = handler_kwargs.get("timetolive", 12 * 3600 * 1000)
    transformer_publisher = handler_kwargs.get("transformer_publisher", None)

    try:
        if msg_type == "slice":
            _forward_slice(msg, run_id, transformer_publisher, timetolive, logger)
        elif msg_type == "slice_result":
            _log_slice_result(msg, run_id, logger)
        elif logger:
            logger.warning(
                f"Unknown message type in slice_handler: {msg_type}, run_id={run_id}"
            )
    except Exception as ex:
        # Top-level boundary of the handler: report and swallow so a bad
        # message cannot take down the caller.
        if logger:
            logger.error(
                f"Error in slice_handler for msg_type={msg_type}, run_id={run_id}: {ex}",
                exc_info=True,
            )
0182 )