File indexing completed on 2026-04-09 07:58:22
0001
"""Small, deterministic unit tests for prompt message construction.

The original test file attempted to exercise live brokers and contained
syntax errors. The tests here focus on the pure construction/contract of
messages, so they remain reliable in CI without requiring an ActiveMQ
instance.

Note that ``run()``, which is invoked only when this module is executed as
a script, does publish the constructed messages through ActiveMQ publishers.
"""
0009
0010 import datetime
0011 import logging
0012 import time
0013
0014 from idds.common.constants import Sections
0015 from idds.common.config import config_get
0016 from idds.common.utils import setup_logging, json_loads
0017
0018
0019
0020 from idds.prompt.brokers.activemq import Publisher
0021
# Configure logging for this module before obtaining its logger.
setup_logging(__name__)
logger = logging.getLogger(__name__)


# Namespace stamped into every message envelope and header set below.
namespace = config_get(Sections.Prompt, "namespace")

# Time-to-live placed in the "ttl" header of every message. Any failure while
# reading or int-converting the configured value falls back to the default.
try:
    timetolive = int(config_get(Sections.Prompt, "timetolive"))
except Exception:
    # NOTE(review): 12 * 3600 * 1000 is presumably 12 hours expressed in
    # milliseconds -- confirm the unit the broker expects.
    timetolive = 12 * 3600 * 1000
# Broker settings are stored as strings in the config and decoded with
# json_loads; unlike timetolive, these two reads have no fallback, so a
# missing or malformed option raises at import time.
worker_publisher_broker = json_loads(config_get(Sections.Prompt, "worker_publisher_broker"))
slice_publisher_broker = json_loads(config_get(Sections.Prompt, "slice_publisher_broker"))
0034
0035
def create_start_messages(namespace, run_id):
    """Build the ``run_imminent`` message together with its headers.

    Args:
        namespace: Value copied into the ``namespace`` field of both the
            message envelope and the headers.
        run_id: Value copied into the ``run_id`` field of both the message
            envelope and the headers.

    Returns:
        A ``(msg, headers)`` tuple of two dicts. ``msg`` holds the envelope
        keys ``namespace``, ``msg_type``, ``run_id``, ``created_at`` and
        ``content``; ``headers`` holds the delivery metadata, including the
        module-level ``timetolive`` under ``ttl``.
    """
    msg_type = "run_imminent"
    headers = {
        "persistent": "true",
        "ttl": timetolive,
        "vo": "eic",
        "namespace": namespace,
        "msg_type": msg_type,
        "run_id": run_id,
    }
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so created_at stays the same naive UTC value
    # that downstream consumers have always received.
    created_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    msg = {
        "namespace": namespace,
        "msg_type": msg_type,
        "run_id": run_id,
        "created_at": created_at,
        # The payload carries its own msg_type ("start_run"), which differs
        # from the envelope's "run_imminent".
        "content": {
            "num_workers": 2,
            "num_cores_per_worker": 10,
            "num_ram_per_core": 4000.0,
            "msg_type": "start_run",
            "req_id": 1,
            "ts": "20250914185722",
        },
    }
    return msg, headers
0061
0062
def create_slice_messages(namespace, run_id):
    """Build the ``slice`` message together with its headers.

    Args:
        namespace: Value copied into the ``namespace`` field of both the
            message envelope and the headers.
        run_id: Value copied into the ``run_id`` field of the envelope, the
            headers and the ``content`` payload.

    Returns:
        A ``(msg, headers)`` tuple of two dicts. ``msg`` holds the envelope
        keys ``namespace``, ``msg_type``, ``run_id``, ``created_at`` and
        ``content``; ``headers`` holds the delivery metadata, including the
        module-level ``timetolive`` under ``ttl``.
    """
    msg_type = "slice"
    headers = {
        "persistent": "true",
        "ttl": timetolive,
        "vo": "eic",
        "namespace": namespace,
        "msg_type": msg_type,
        "run_id": run_id,
    }
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so created_at stays the same naive UTC value
    # that downstream consumers have always received.
    created_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    msg = {
        "namespace": namespace,
        "msg_type": msg_type,
        "run_id": run_id,
        "created_at": created_at,
        # Fixed sample payload; its inner msg_type ("stf_gen") differs from
        # the envelope's "slice".
        "content": {
            "run_id": run_id,
            "state": "no_beam",
            "substate": "calib",
            "filename": "swf.20250914.185724.767135.no_beam.calib.stf",
            "start": "20250914185722420185",
            "end": "20250914185724767135",
            "checksum": "ad:3915264619",
            "size": 191,
            "msg_type": "stf_gen",
            "req_id": 1,
        },
    }
    return msg, headers
0092
0093
def create_stop_messages(namespace, run_id):
    """Build the ``run_stop`` message together with its headers.

    Args:
        namespace: Value copied into the ``namespace`` field of both the
            message envelope and the headers.
        run_id: Value copied into the ``run_id`` field of the envelope, the
            headers and the ``content`` payload.

    Returns:
        A ``(msg, headers)`` tuple of two dicts. ``msg`` holds the envelope
        keys ``namespace``, ``msg_type``, ``run_id``, ``created_at`` and
        ``content``; ``headers`` holds the delivery metadata, including the
        module-level ``timetolive`` under ``ttl``.
    """
    msg_type = "run_stop"
    headers = {
        "persistent": "true",
        "ttl": timetolive,
        "vo": "eic",
        "namespace": namespace,
        "msg_type": msg_type,
        "run_id": run_id,
    }
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so created_at stays the same naive UTC value
    # that downstream consumers have always received.
    created_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    msg = {
        "namespace": namespace,
        "msg_type": msg_type,
        "run_id": run_id,
        "created_at": created_at,
        "content": {"req_id": 1, "run_id": run_id, "ts": "20250914185722"},
    }
    return msg, headers
0112
0113
def validate_message_basics(msg):
    """Assert that *msg* is a dict containing every top-level envelope key.

    Raises ``AssertionError`` when *msg* is not a dict or when any of
    ``namespace``, ``msg_type``, ``run_id``, ``created_at`` or ``content``
    is absent. Returns ``None`` on success.
    """
    required_keys = ("namespace", "msg_type", "run_id", "created_at", "content")
    # The type check comes first so the membership test below only ever runs
    # against a dict.
    assert isinstance(msg, dict)
    assert all(key in msg for key in required_keys)
0118
0119
def test_message_construction():
    """Check that each message factory yields a valid envelope and msg_type.

    All three messages are built first, then every envelope is validated,
    and finally each ``msg_type`` is compared with its expected value.
    """
    run_id = int(time.time())
    expectations = (
        (create_start_messages, "run_imminent"),
        (create_slice_messages, "slice"),
        (create_stop_messages, "run_stop"),
    )

    # Each factory returns (msg, headers); only the message is inspected.
    built = [
        (factory(namespace, run_id)[0], expected_type)
        for factory, expected_type in expectations
    ]

    for message, _ in built:
        validate_message_basics(message)

    for message, expected_type in built:
        assert message["msg_type"] == expected_type
0133
0134
def run():
    """Publish a start, slice and stop message sequence through the brokers.

    The three messages are built for a run id taken from the current epoch
    second. A worker publisher and a slice publisher are then created, and
    the messages are published in the order start (worker), slice (slice),
    stop (worker), with a 10-second pause between consecutive publishes.
    Every publish is followed by an INFO log entry.
    """
    pause_seconds = 10
    run_id = int(time.time())

    # Build all messages up front, before any publisher is created.
    start, start_headers = create_start_messages(namespace, run_id)
    slice_m, slice_headers = create_slice_messages(namespace, run_id)
    stop, stop_headers = create_stop_messages(namespace, run_id)

    def make_publisher(name, broker):
        # Both publishers differ only in their name and broker settings.
        return Publisher(
            name=name,
            namespace=namespace,
            broker=broker,
            broadcast=True,
            logger=logger,
        )

    worker_publisher = make_publisher("WorkerPublisher", worker_publisher_broker)
    slice_publisher = make_publisher("SlicePublisher", slice_publisher_broker)

    worker_publisher.publish(start, headers=start_headers)
    logger.info(f"Published start message: {start} with headers: {start_headers}")

    time.sleep(pause_seconds)
    slice_publisher.publish(slice_m, headers=slice_headers)
    logger.info(f"Published slice message: {slice_m} with headers: {slice_headers}")

    time.sleep(pause_seconds)
    worker_publisher.publish(stop, headers=stop_headers)
    logger.info(f"Published stop message: {stop} with headers: {stop_headers}")
0164
0165
if __name__ == "__main__":
    # Script mode: check message construction first, then publish the
    # start/slice/stop sequence through the configured publishers.
    test_message_construction()
    run()