Warning, /iDDS/main/prompt.md is written in an unsupported language. File is not indexed.
0001 ## Prompt Agent
0002
0003 This document describes the message contract and workflow for the prompt "transceiver" agent used by iDDS.
0004
0005 Goals:
0006 - Define a minimal, consistent message format used by the transceiver, transformer, and worker handlers.
0007 - Provide concrete examples for common message types (run start/stop, slice, and transformer lifecycle events).
0008
0009 ---
0010
0011 ### 1. Message contract (headers)
0012
0013 All messages MUST include `instance`, `msg_type`, and `run_id` in the headers. Use `instance` to identify the deployment (for example `prod` or `dev_<userid>`), `msg_type` to indicate the purpose of the message, and `run_id` to scope messages to a specific run. Keeping these fields consistent makes it easier to extend message types: processors only need to understand changes to `content` while the outer contract remains stable.
0014
0015 Broker-side filtering (selectors) is used so messages are filtered before delivery. On ActiveMQ, use a `topic` when you want broadcasts (every subscriber receives the message) and a `queue` for point-to-point delivery (one consumer receives each message). In production you typically run multiple agent processes per `instance` behind a queue so work is distributed. To avoid fetching and discarding messages locally, use header selectors (broker-side) — e.g. a STOMP selector like:
0016
0017 ```python
0018 headers['selector'] = "instance='prod' AND run_id='12345'"
0019 ```
0020
0021 Defining standard headers enables consistent use of selectors across the system.
0022
0023 Example headers (Python dict):
0024
0025 ```python
0026 headers = {
0027 'persistent': 'true',
0028 'ttl': self.timetolive, # default: 12 * 3600 * 1000 (milliseconds)
0029 'vo': 'eic',
0030 'instance': instance_name, # e.g. 'prod', 'dev1', 'dev_<userid>'
0031 'msg_type': message_type,
0032 'run_id': run_id,
0033 }
0034 ```
0035
0036 Message body keys (recommended):
0037 - `instance` (string)
0038 - `msg_type` (string)
0039 - `run_id` (int or string)
0040 - `created_at` (UTC timestamp)
0041 - `content` (dict): message-specific payload
0042
0043 Result/response messages should include timing fields inside `content` to support latency measurements:
0044 - `requested_at` (copied from the request's `created_at`)
0045 - `processing_start_at`
0046 - `processed_at`
0047
0048 Example `slice_result`:
0049
0050 ```python
0051 msg = {
0052 'instance': instance_name,
0053 'msg_type': 'slice_result',
0054 'run_id': run_id,
0055 'created_at': datetime.datetime.utcnow(),
0056 'content': {
0057 'requested_at': slice_created_at,
0058 'processing_start_at': processing_start,
0059 'processed_at': processing_end,
0060 'result': { ... },
0061 }
0062 }
0063 ```
0064
0065 ---
0066
0067 ### 2. Input messages from the SWF Processing Agent
0068
0069 The `content` schema is flexible and can be adapted by EIC. Below are suggested, well-formed examples.
0070
0071 Run imminent (start):
0072
0073 ```python
0074 start_msg = {
0075 'instance': instance_name,
0076 'msg_type': 'run_imminent',
0077 'run_id': run_id,
0078 'created_at': datetime.datetime.utcnow(),
0079 'content': {
0080 'num_workers': 2,
0081 'num_cores_per_worker': 10,
0082 'num_ram_per_core': 4000.0, # MB
0083 # optional: copy of original DAQ/PA fields
0084 'msg_type': 'start_run',
0085 'req_id': 1,
0086 'ts': '20250914185722',
0087 }
0088 }
0089 ```
0090
0091 Slice message example:
0092
0093 ```python
0094 slice_msg = {
0095 'instance': instance_name,
0096 'msg_type': 'slice',
0097 'run_id': run_id,
0098 'created_at': datetime.datetime.utcnow(),
0099 'content': {
0100 'run_id': run_id,
0101 'state': 'no_beam',
0102 'substate': 'calib',
0103 'filename': 'swf.20250914.185724.767135.no_beam.calib.stf',
0104 'start': '20250914185722420185',
0105 'end': '20250914185724767135',
0106 'checksum': 'ad:3915264619',
0107 'size': 191,
0108 'msg_type': 'stf_gen',
0109 'req_id': 1,
0110 }
0111 }
0112 ```
0113
0114 Run stop (end):
0115
0116 ```python
0117 stop_msg = {
0118 'instance': instance_name,
0119 'msg_type': 'run_stop',
0120 'run_id': run_id,
0121 'created_at': datetime.datetime.utcnow(),
0122 'content': {
0123 'req_id': 1,
0124 'run_id': run_id,
0125 'ts': '20250914185722',
0126 }
0127 }
0128 ```
0129
0130 ---
0131
0132 ### 3. Worker handler behavior
0133
0134 When receiving `run_imminent`, the worker handler should:
0135
0136 - Create an iDDS workflow and PanDA tasks (via `create_workflow_task(msg)`).
0137 - Send an adjuster message to Harvester to start workers.
0138
0139 Example adjuster message to start workers:
0140
0141 ```python
0142 start_worker_msg = {
0143 'instance': instance_name,
0144 'msg_type': 'adjuster_worker',
0145 'run_id': run_id,
0146 'created_at': datetime.datetime.utcnow(),
0147 'content': {
0148 'num_workers': start_msg['content']['num_workers'],
0149 'num_cores_per_worker': start_msg['content']['num_cores_per_worker'],
0150 'num_ram_per_core': start_msg['content']['num_ram_per_core'],
0151 'requested_at': start_msg['created_at'],
0152 }
0153 }
0154 ```
0155
0156 When receiving `run_stop`, the handler should close the PanDA task and send stop adjuster messages:
0157
0158 ```python
0159 # close PanDA task to avoid retries
0160 task_id = get_task_id_from_run_id(stop_msg['run_id'])
0161 close_panda_task(task_id)
0162
0163 stop_worker_msg = {
0164 'instance': instance_name,
0165 'msg_type': 'adjuster_worker',
0166 'run_id': run_id,
0167 'created_at': datetime.datetime.utcnow(),
0168 'content': {
0169 'num_workers': 0,
0170 'num_cores_per_worker': 0,
0171 'num_ram_per_core': 0,
0172 'requested_at': stop_msg['created_at'],
0173 }
0174 }
0175
0176 stop_transformer_msg = {
0177 'instance': instance_name,
0178 'msg_type': 'stop_transformer',
0179 'run_id': run_id,
0180 'created_at': datetime.datetime.utcnow(),
0181 'content': {
0182 'requested_at': stop_msg['created_at'],
0183 }
0184 }
0185 ```
0186
0187 ---
0188
0189 ### 4. Creating workflow tasks (TODO)
0190
0191 Create iDDS workflow and PanDA task. The iDDS poller will monitor the task and the number of jobs. If the PanDA task has fewer jobs than expected (failures or other issues), iDDS can trigger PanDA to create new jobs.
0192
0193 One way to trigger job creation is to emit a synthetic Rucio `transfer-done` event that PanDA recognizes:
0194
0195 ```python
0196 transfer_done_msg = {
0197 'event_type': 'transfer-done',
0198 'created_at': datetime.datetime.utcnow(),
0199 'payload': {
0200 'activity': activity,
0201 'name': name,
0202 'scope': scope,
0203 'dst-rse': rse,
0204 }
0205 }
0206 ```
0207
0208 ---
0209
0210 ### 5. Slice handler
0211
0212 Currently not required in the baseline; keep this section for future transformer-side forwarding logic and debugging helpers.
0213
0214 ---
0215
0216 ### 6. Transformer lifecycle messages
0217
0218 The transformer (running inside a pilot) consumes `slice` messages from the queue and may also receive `stop_transformer` via a topic.
0219
0220 Key lifecycle messages:
0221
0222 ```python
0223 slice_result_msg = {
0224 'instance': instance_name,
0225 'msg_type': 'slice_result',
0226 'run_id': run_id,
0227 'created_at': datetime.datetime.utcnow(),
0228 'content': {
0229 'requested_at': slice_created_at,
0230 'processing_start_at': processing_start,
0231 'processed_at': processing_end,
0232 'result': { ... },
0233 }
0234 }
0235
0236 transformer_start_msg = {
0237 'instance': instance_name,
0238 'msg_type': 'transformer_start',
0239 'run_id': run_id,
0240 'created_at': datetime.datetime.utcnow(),
0241 'content': {'hostname': hostname, 'id': pilot_id}
0242 }
0243
0244 transformer_end_msg = {
0245 'instance': instance_name,
0246 'msg_type': 'transformer_end',
0247 'run_id': run_id,
0248 'created_at': datetime.datetime.utcnow(),
0249 'content': {'hostname': hostname, 'id': pilot_id}
0250 }
0251
0252 transformer_heartbeat_msg = {
0253 'instance': instance_name,
0254 'msg_type': 'transformer_heartbeat',
0255 'run_id': run_id,
0256 'created_at': datetime.datetime.utcnow(),
0257 'content': {'hostname': hostname, 'id': pilot_id}
0258 }
0259 ```
0260
0261 ---
0262
0263 ### 7. Transformer subscription example
0264
0265 Use `client-individual` ack and selectors to ensure transformers only receive their intended messages. Example STOMP headers:
0266
0267 ```python
0268 ack = 'client-individual'
0269 headers = {
0270 'vo': 'eic',
0271 'selector': "instance='{}' AND run_id='{}'".format(instance_name, run_id),
0272 'activemq.prefetchSize': '1',
0273 }
0274 ```
0275
0276 ---
0277
0278 Notes:
0279 - Keep message timestamps in UTC.
0280 - Preserve original PA/DAQ timestamps in `content` (useful for latency/evaluation).
0281 - The examples above use Python dict literals but the wire format should be your chosen serialization (JSON, msgpack, etc.).