Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-09 07:58:22

0001 #!/usr/bin/env python
0002 """Small, deterministic unit tests for prompt message construction.
0003 
0004 The original test file attempted to exercise live brokers and contained
0005 syntax errors. These tests focus on the pure construction/contract of
0006 messages so they remain reliable in CI without requiring an ActiveMQ
0007 instance.
0008 """
0009 
0010 import datetime
0011 import logging
0012 import time
0013 
0014 from idds.common.constants import Sections
0015 from idds.common.config import config_get
0016 from idds.common.utils import setup_logging, json_loads
0017 
# NOTE: Publisher is imported eagerly at module import time (not lazily), so idds.prompt.brokers.activemq must be importable even when only the message-construction test is run.
0019 
0020 from idds.prompt.brokers.activemq import Publisher
0021 
# Call setup_logging for this module first, then obtain the module-level logger.
setup_logging(__name__)
logger = logging.getLogger(__name__)

# We only use the configured instance/namespace and broker strings (if present).
# All values below are read once, at import time, from the Prompt config section.
namespace = config_get(Sections.Prompt, "namespace")
# timetolive may be missing in some test environments; fall back to the default
# of 12 hours expressed in milliseconds.
try:
    timetolive = int(config_get(Sections.Prompt, "timetolive"))
except Exception:
    # Any failure while reading or converting the option selects the default.
    timetolive = 12 * 3600 * 1000
# The broker options are passed through json_loads before use.
# NOTE(review): unlike timetolive there is no fallback here, so a missing broker
# option presumably fails at import time -- confirm against config_get.
worker_publisher_broker = json_loads(config_get(Sections.Prompt, "worker_publisher_broker"))
slice_publisher_broker = json_loads(config_get(Sections.Prompt, "slice_publisher_broker"))
0034 
0035 
def create_start_messages(namespace, run_id):
    """Build the ``run_imminent`` message that announces an upcoming run.

    Args:
        namespace: Namespace stamped on both the headers and the message
            body, e.g. 'prod', 'dev1', 'dev_<userid>'.
        run_id: Run identifier, copied into the headers and the body.

    Returns:
        A ``(msg, headers)`` tuple of two dicts: the message body and the
        headers it should be published with.
    """
    headers = {
        "persistent": "true",
        "ttl": timetolive,  # module-level setting; default: 12 * 3600 * 1000 (milliseconds)
        "vo": "eic",
        "namespace": namespace,
        "msg_type": "run_imminent",
        "run_id": run_id,
    }
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so created_at stays the same naive UTC datetime
    # that consumers of this message have always received.
    created_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    msg = {
        "namespace": namespace,
        "msg_type": "run_imminent",
        "run_id": run_id,
        "created_at": created_at,
        # Fixed sample payload; note the inner msg_type differs from the envelope's.
        "content": {
            "num_workers": 2,
            "num_cores_per_worker": 10,
            "num_ram_per_core": 4000.0,
            "msg_type": "start_run",
            "req_id": 1,
            "ts": "20250914185722",
        },
    }
    return msg, headers
0061 
0062 
def create_slice_messages(namespace, run_id):
    """Build the ``slice`` message describing one sample STF file.

    Args:
        namespace: Namespace stamped on both the headers and the message
            body, e.g. 'prod', 'dev1', 'dev_<userid>'.
        run_id: Run identifier, copied into the headers, the body and the
            body's ``content``.

    Returns:
        A ``(msg, headers)`` tuple of two dicts: the message body and the
        headers it should be published with.
    """
    headers = {
        "persistent": "true",
        "ttl": timetolive,  # module-level setting; default: 12 * 3600 * 1000 (milliseconds)
        "vo": "eic",
        "namespace": namespace,
        "msg_type": "slice",
        "run_id": run_id,
    }
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so created_at stays the same naive UTC datetime
    # that consumers of this message have always received.
    created_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    msg = {
        "namespace": namespace,
        "msg_type": "slice",
        "run_id": run_id,
        "created_at": created_at,
        # Fixed sample payload; note the inner msg_type differs from the envelope's.
        "content": {
            "run_id": run_id,
            "state": "no_beam",
            "substate": "calib",
            "filename": "swf.20250914.185724.767135.no_beam.calib.stf",
            "start": "20250914185722420185",
            "end": "20250914185724767135",
            "checksum": "ad:3915264619",
            "size": 191,
            "msg_type": "stf_gen",
            "req_id": 1,
        },
    }
    return msg, headers
0092 
0093 
def create_stop_messages(namespace, run_id):
    """Build the ``run_stop`` message that ends a run.

    Args:
        namespace: Namespace stamped on both the headers and the message
            body, e.g. 'prod', 'dev1', 'dev_<userid>'.
        run_id: Run identifier, copied into the headers, the body and the
            body's ``content``.

    Returns:
        A ``(msg, headers)`` tuple of two dicts: the message body and the
        headers it should be published with.
    """
    headers = {
        "persistent": "true",
        "ttl": timetolive,  # module-level setting; default: 12 * 3600 * 1000 (milliseconds)
        "vo": "eic",
        "namespace": namespace,
        "msg_type": "run_stop",
        "run_id": run_id,
    }
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so created_at stays the same naive UTC datetime
    # that consumers of this message have always received.
    created_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    msg = {
        "namespace": namespace,
        "msg_type": "run_stop",
        "run_id": run_id,
        "created_at": created_at,
        "content": {"req_id": 1, "run_id": run_id, "ts": "20250914185722"},
    }
    return msg, headers
0112 
0113 
def validate_message_basics(msg):
    """Assert that *msg* is a dict carrying every top-level envelope key.

    Raises:
        AssertionError: If *msg* is not a dict or lacks any of the keys
            namespace, msg_type, run_id, created_at or content.
    """
    required_keys = ("namespace", "msg_type", "run_id", "created_at", "content")
    assert isinstance(msg, dict)
    absent = [key for key in required_keys if key not in msg]
    assert not absent
0118 
0119 
def test_message_construction():
    """Check that each builder yields a complete message of the expected type."""
    run_id = int(time.time())
    # Headers are returned by every builder but are not inspected by this test.
    start, _start_headers = create_start_messages(namespace, run_id)
    slice_m, _slice_headers = create_slice_messages(namespace, run_id)
    stop, _stop_headers = create_stop_messages(namespace, run_id)

    expectations = (
        (start, "run_imminent"),
        (slice_m, "slice"),
        (stop, "run_stop"),
    )

    # First pass: every message has the full envelope.
    for message, _ in expectations:
        validate_message_basics(message)

    # Second pass: every envelope advertises the right message type.
    for message, expected_type in expectations:
        assert message["msg_type"] == expected_type
0133 
0134 
def run(delay_seconds=10):
    """Publish a start, a slice and a stop message through live publishers.

    Builds one message of each kind for a run id derived from the current
    time. The start and stop messages go out through the worker publisher and
    the slice message through the slice publisher, with a pause between
    consecutive publications.

    Args:
        delay_seconds: Pause, in seconds, between consecutive publications.
            Defaults to 10, the value that used to be hard-coded.
    """
    run_id = int(time.time())
    start, start_headers = create_start_messages(namespace, run_id)
    slice_m, slice_headers = create_slice_messages(namespace, run_id)
    stop, stop_headers = create_stop_messages(namespace, run_id)

    worker_publisher = Publisher(
        name="WorkerPublisher",
        namespace=namespace,
        broker=worker_publisher_broker,
        broadcast=True,
        logger=logger,
    )
    slice_publisher = Publisher(
        name="SlicePublisher",
        namespace=namespace,
        broker=slice_publisher_broker,
        broadcast=True,
        logger=logger,
    )

    # Lazy %-style arguments render the same text as the former f-strings but
    # only format the (large) dicts when the INFO level is actually enabled.
    worker_publisher.publish(start, headers=start_headers)
    logger.info("Published start message: %s with headers: %s", start, start_headers)
    time.sleep(delay_seconds)
    slice_publisher.publish(slice_m, headers=slice_headers)
    logger.info("Published slice message: %s with headers: %s", slice_m, slice_headers)
    time.sleep(delay_seconds)
    worker_publisher.publish(stop, headers=stop_headers)
    logger.info("Published stop message: %s with headers: %s", stop, stop_headers)
0164 
0165 
# Script entry point: run the construction test first, then publish the
# sample messages via run().
if __name__ == "__main__":
    test_message_construction()
    run()