Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:18

0001 #!/usr/bin/env python
0002 #
0003 # Licensed under the Apache License, Version 2.0 (the "License");
0004 # You may not use this file except in compliance with the License.
0005 # You may obtain a copy of the License at
0006 # http://www.apache.org/licenses/LICENSE-2.0
0007 #
0008 # Authors:
0009 # - Wen Guan, <wen.guan@cern.ch>, 2025
0010 
0011 
0012 import datetime
0013 import email.utils
0014 
0015 from idds.common.utils import setup_logging
0016 
0017 
0018 setup_logging(__name__)
0019 
0020 
0021 def _parse_datetime(value):
0022     """Parse a datetime-like value robustly.
0023 
0024     Accepts datetime objects or strings in multiple common formats:
0025     - ISO 8601 (handled by datetime.fromisoformat)
0026     - RFC 2822 / HTTP-date (handled by email.utils.parsedate_to_datetime)
0027     - Falls back to trying a few common strptime patterns.
0028 
0029     Returns a datetime.datetime or raises ValueError on failure.
0030     """
0031     if value is None:
0032         raise ValueError("None value")
0033     if isinstance(value, datetime.datetime):
0034         return value
0035     if not isinstance(value, str):
0036         raise ValueError(f"Unsupported datetime value type: {type(value)}")
0037 
0038     # Try ISO format first
0039     try:
0040         return datetime.datetime.fromisoformat(value)
0041     except Exception:
0042         pass
0043 
0044     # Try RFC 2822 / HTTP-date formats
0045     try:
0046         dt = email.utils.parsedate_to_datetime(value)
0047         if dt is not None:
0048             # Normalize to UTC naive datetime for consistent subtraction
0049             if dt.tzinfo is not None:
0050                 dt = dt.astimezone(datetime.timezone.utc).replace(tzinfo=None)
0051             return dt
0052     except Exception:
0053         pass
0054 
0055     # Common fallbacks
0056     patterns = [
0057         "%a, %d %b %Y %H:%M:%S %Z",
0058         "%Y-%m-%d %H:%M:%S",
0059         "%Y-%m-%dT%H:%M:%S",
0060         "%Y-%m-%dT%H:%M:%S.%f",
0061     ]
0062     for p in patterns:
0063         try:
0064             dt = datetime.datetime.strptime(value, p)
0065             return dt
0066         except Exception:
0067             continue
0068 
0069     raise ValueError(f"Could not parse datetime: {value}")
0070 
0071 
def slice_handler(header, msg, task_id=None, handler_kwargs=None, logger=None):
    """
    Handle slice messages based on prompt.md specifications.

    Message types:
    1. 'slice': Forward to transformer queue
       Format: {
         'msg_type': 'slice',
         'run_id': 20250914185722,
         'created_at': datetime.datetime.utcnow(),
         'content': {
           "run_id": 20250914185722,
           "state": "no_beam",
           "substate": "calib",
           "filename": "swf.20250914.185724.767135.no_beam.calib.stf",
           "start": "20250914185722420185",
           "end": "20250914185724767135",
           "checksum": "ad:3915264619",
           "size": 191,
           "msg_type": "stf_gen",
           "req_id": 1
         }
       }

    2. 'slice_result': Log the result from transformer
       Format: {
         'msg_type': 'slice_result',
         'run_id': 20250914185722,
         'created_at': datetime.datetime.utcnow(),
         'content': {
           'requested_at': <copied from slice's created_at>,
           'processing_start_at': <utctime>,
           'processed_at': <utctime>,
           'result': {'state': ..., 'attribute': 'value'}
         }
       }

    Any other message type is reported with a warning. Errors raised while
    handling a message are logged (when a logger is given) and not re-raised.

    :param header: Message header (currently not read by this handler;
                   'run_id' and 'msg_type' are taken from msg)
    :param msg: Message content (dict-like, see formats above)
    :param task_id: Optional task ID (currently unused)
    :param handler_kwargs: Optional dict of handler options:
        - 'transformer_publisher': Publisher instance used to send
          'slice' messages to the transformer (default None)
        - 'timetolive': Value for the 'ttl' header of forwarded messages
          (default 12 * 3600 * 1000)
    :param logger: Optional logger; when None nothing is logged
    """
    # None (rather than a shared mutable {}) is the default; an explicit
    # None from the caller is treated the same as "no options".
    handler_kwargs = handler_kwargs or {}

    msg_type = msg.get("msg_type")
    run_id = msg.get("run_id")
    timetolive = handler_kwargs.get("timetolive", 12 * 3600 * 1000)
    transformer_publisher = handler_kwargs.get("transformer_publisher", None)

    try:
        # A message may carry "content": None; treat it like a missing content.
        content = msg.get("content") or {}

        if msg_type == "slice":
            # Forward slice message to transformer queue
            tf_header = {
                "persistent": "true",
                "ttl": timetolive,
                "vo": "eic",
                "msg_type": "slice",
                "run_id": str(run_id),
            }

            # Forward the entire message to transformer
            if transformer_publisher:
                transformer_publisher.publish(msg, headers=tf_header)
                if logger:
                    logger.info(
                        f"Forwarded slice to transformer: run_id={run_id}, filename={content.get('filename')}"
                    )
            else:
                if logger:
                    logger.warning(
                        f"No transformer_publisher available to forward slice: run_id={run_id}"
                    )

        elif msg_type == "slice_result":
            # Log the slice result with timing information
            requested_at = content.get("requested_at")
            processing_start_at = content.get("processing_start_at")
            processed_at = content.get("processed_at")
            result = content.get("result", {})

            if logger:
                logger.info(
                    f"Slice result: run_id={run_id}, requested_at={requested_at}, "
                    f"processing_start_at={processing_start_at}, processed_at={processed_at}, "
                    f"result={result}"
                )
            # Calculate processing delays if timestamps are available
            if requested_at and processed_at:
                try:
                    requested_dt = _parse_datetime(requested_at)
                    processed_dt = _parse_datetime(processed_at)

                    delay = (processed_dt - requested_dt).total_seconds()
                    if logger:
                        logger.info(
                            f"Slice processing delay: run_id={run_id}, delay={delay:.2f}s"
                        )
                except Exception as ex:
                    # The delay is informational only; a bad timestamp must
                    # not turn into a handler error.
                    if logger:
                        logger.debug(f"Could not calculate delay: {ex}")
        else:
            if logger:
                logger.warning(
                    f"Unknown message type in slice_handler: {msg_type}, run_id={run_id}"
                )
    except Exception as ex:
        # Top-level boundary: report the failure and keep the consumer alive.
        if logger:
            logger.error(
                f"Error in slice_handler for msg_type={msg_type}, run_id={run_id}: {ex}",
                exc_info=True,
            )