## Prompt Agent

This document describes the message contract and workflow for the prompt "transceiver" agent used by iDDS.

Goals:
- Define a minimal, consistent message format used by the transceiver, transformer, and worker handlers.
- Provide concrete examples for common message types (run start/stop, slice, and transformer lifecycle events).

---

### 1. Message contract (headers)

All messages MUST include `instance`, `msg_type`, and `run_id` in the headers. Use `instance` to identify the deployment (for example `prod` or `dev_<userid>`), `msg_type` to indicate the purpose of the message, and `run_id` to scope messages to a specific run. Keeping these fields consistent makes it easier to extend message types: processors only need to understand changes to `content` while the outer contract remains stable.
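
Because these three headers are mandatory, a receiver can reject malformed messages before dispatching them. The following is a minimal sketch. The name `missing_required_headers` and the choice to treat empty strings as missing are assumptions for illustration; neither is part of iDDS.

```python
REQUIRED_HEADERS = ("instance", "msg_type", "run_id")


def missing_required_headers(headers):
    """Return the mandatory header names that are absent or empty (hypothetical helper)."""
    return [name for name in REQUIRED_HEADERS if headers.get(name) in (None, "")]
```

A handler can log and drop any message for which the returned list is non-empty.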

Use broker-side filtering with selectors so that messages are filtered before delivery. On ActiveMQ, use a `topic` for broadcasts (every subscriber receives the message) and a `queue` for point-to-point delivery (each message goes to one consumer). In production you typically run several agent processes per `instance` on the same queue, so the work is distributed among them. Rather than fetching messages and discarding the unwanted ones locally, set a selector on the subscription and let the broker match the header values. For example, with STOMP:

```python
headers['selector'] = "instance='prod' AND run_id='12345'"
```

Because every message carries the same standard headers, the same selectors work consistently across the whole system.
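
Selectors use SQL-92-style syntax. A string literal is enclosed in single quotes, and a single quote inside the literal is written as two single quotes. A small builder keeps selectors uniform and prevents broken quoting when a value (for example the user ID in `dev_<userid>`) contains a quote. This is a sketch: `build_selector` is a hypothetical helper, and it assumes every header is compared as a string.

```python
def quote_literal(value):
    """Render a value as a selector string literal, doubling embedded single quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def build_selector(**fields):
    """AND together equality tests, e.g. instance='prod' AND run_id='12345'."""
    if not fields:
        raise ValueError("at least one header field is required")
    return " AND ".join(
        "{}={}".format(name, quote_literal(value)) for name, value in fields.items()
    )
```

`headers['selector'] = build_selector(instance='prod', run_id='12345')` produces the selector in the example above. Keyword arguments preserve their order, so the clauses appear in the order given.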

Example headers (Python dict):

```python
headers = {
    'persistent': 'true',
    'ttl': self.timetolive,     # sender attribute; default 12 * 3600 * 1000 ms (12 hours)
    'vo': 'eic',
    'instance': instance_name,  # e.g. 'prod', 'dev1', 'dev_<userid>'
    'msg_type': message_type,
    'run_id': str(run_id),      # STOMP header values are strings; selectors compare them as strings
}
```
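
To make every producer set the same keys, build the headers in one place. The sketch below assumes the 12-hour default TTL in milliseconds noted above and emits all values as strings, because STOMP header values travel as text. `make_headers` is a hypothetical name, not an iDDS function.

```python
DEFAULT_TTL_MS = 12 * 3600 * 1000  # 12 hours in milliseconds


def make_headers(instance, msg_type, run_id, vo="eic", ttl_ms=DEFAULT_TTL_MS):
    """Return the standard STOMP headers for a prompt-agent message (sketch)."""
    return {
        "persistent": "true",
        "ttl": str(ttl_ms),
        "vo": vo,
        "instance": instance,
        "msg_type": msg_type,
        "run_id": str(run_id),
    }
```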

Message body keys (recommended):
- `instance` (string)
- `msg_type` (string)
- `run_id` (int or string)
- `created_at` (UTC timestamp)
- `content` (dict): message-specific payload
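
`created_at` is a `datetime`, so the body needs an encoder before it can be sent. The following is a minimal sketch. It assumes JSON as the serialization and ISO 8601 strings for timestamps; both are assumptions, since the Notes at the end leave the wire format open.

```python
import datetime
import json


def make_message(instance, msg_type, run_id, content, created_at=None):
    """Build a body with the recommended top-level keys."""
    if created_at is None:
        created_at = datetime.datetime.now(datetime.timezone.utc)
    return {
        "instance": instance,
        "msg_type": msg_type,
        "run_id": run_id,
        "created_at": created_at,
        "content": content,
    }


def _encode_default(obj):
    # json.dumps calls this hook for objects it cannot serialize natively.
    if isinstance(obj, datetime.datetime):
        return obj.isoformat()
    raise TypeError("Object of type {} is not JSON serializable".format(type(obj).__name__))


def to_wire(msg):
    """Serialize a message body (including nested datetimes) to a JSON string."""
    return json.dumps(msg, default=_encode_default)
```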

Result/response messages should include timing fields inside `content` to support latency measurements:
- `requested_at` (copied from the request's `created_at`)
- `processing_start_at`
- `processed_at`

Example `slice_result`:

```python
msg = {
    'instance': instance_name,
    'msg_type': 'slice_result',
    'run_id': run_id,
    'created_at': datetime.datetime.utcnow(),
    'content': {
        'requested_at': slice_created_at,
        'processing_start_at': processing_start,
        'processed_at': processing_end,
        'result': { ... },
    }
}
```
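
With these three fields, the consumer of a result can split end-to-end latency into queueing time and processing time. The sketch assumes the fields have already been decoded back into `datetime` objects. The helper name `latency_breakdown` and the `received_at` argument are hypothetical.

```python
import datetime


def latency_breakdown(content, received_at):
    """Split latency (in seconds) using the timing fields of a result's content."""
    requested = content["requested_at"]
    started = content["processing_start_at"]
    finished = content["processed_at"]
    return {
        "queue_wait_s": (started - requested).total_seconds(),
        "processing_s": (finished - started).total_seconds(),
        "return_s": (received_at - finished).total_seconds(),
        "end_to_end_s": (received_at - requested).total_seconds(),
    }
```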

---

### 2. Input messages from the SWF Processing Agent

The `content` schema is flexible and can be adapted by EIC. Below are suggested, well-formed examples.

Run imminent (start):

```python
start_msg = {
    'instance': instance_name,
    'msg_type': 'run_imminent',
    'run_id': run_id,
    'created_at': datetime.datetime.utcnow(),
    'content': {
        'num_workers': 2,
        'num_cores_per_worker': 10,
        'num_ram_per_core': 4000.0,  # MB
        # optional: copy of original DAQ/PA fields
        'msg_type': 'start_run',
        'req_id': 1,
        'ts': '20250914185722',
    }
}
```

Slice message example:

```python
slice_msg = {
    'instance': instance_name,
    'msg_type': 'slice',
    'run_id': run_id,
    'created_at': datetime.datetime.utcnow(),
    'content': {
        'run_id': run_id,
        'state': 'no_beam',
        'substate': 'calib',
        'filename': 'swf.20250914.185724.767135.no_beam.calib.stf',
        'start': '20250914185722420185',
        'end': '20250914185724767135',
        'checksum': 'ad:3915264619',
        'size': 191,
        'msg_type': 'stf_gen',
        'req_id': 1,
    }
}
```
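
The DAQ fields `start`, `end`, and `ts` appear to be compact timestamps of the form `YYYYMMDDhhmmss`, with microseconds appended for `start` and `end`. That layout is inferred from the example values, not from a published specification. Under that assumption, a consumer can recover the time span covered by a slice:

```python
import datetime

# Formats inferred from the example values above (an assumption, not a spec).
SLICE_TS_FORMAT = "%Y%m%d%H%M%S%f"  # e.g. '20250914185722420185'
RUN_TS_FORMAT = "%Y%m%d%H%M%S"      # e.g. '20250914185722'


def parse_slice_time(value):
    return datetime.datetime.strptime(value, SLICE_TS_FORMAT)


def slice_duration_seconds(content):
    """Duration covered by a slice, from its 'start' and 'end' fields."""
    start = parse_slice_time(content["start"])
    end = parse_slice_time(content["end"])
    return (end - start).total_seconds()
```

For the slice example above, `slice_duration_seconds(slice_msg['content'])` is about 2.347 s.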

Run stop (end):

```python
stop_msg = {
    'instance': instance_name,
    'msg_type': 'run_stop',
    'run_id': run_id,
    'created_at': datetime.datetime.utcnow(),
    'content': {
        'req_id': 1,
        'run_id': run_id,
        'ts': '20250914185722',
    }
}
```

---

### 3. Worker handler behavior

When receiving `run_imminent`, the worker handler should:

- Create an iDDS workflow and PanDA tasks (via `create_workflow_task(msg)`).
- Send an adjuster message to Harvester to start workers.

Example adjuster message to start workers:

```python
start_worker_msg = {
    'instance': instance_name,
    'msg_type': 'adjuster_worker',
    'run_id': run_id,
    'created_at': datetime.datetime.utcnow(),
    'content': {
        'num_workers': start_msg['content']['num_workers'],
        'num_cores_per_worker': start_msg['content']['num_cores_per_worker'],
        'num_ram_per_core': start_msg['content']['num_ram_per_core'],
        'requested_at': start_msg['created_at'],
    }
}
```

When receiving `run_stop`, the handler should close the PanDA task, send an adjuster message that scales the workers down to zero, and send a `stop_transformer` message to the running transformers:

```python
# close PanDA task to avoid retries
task_id = get_task_id_from_run_id(stop_msg['run_id'])
close_panda_task(task_id)

stop_worker_msg = {
    'instance': instance_name,
    'msg_type': 'adjuster_worker',
    'run_id': run_id,
    'created_at': datetime.datetime.utcnow(),
    'content': {
        'num_workers': 0,
        'num_cores_per_worker': 0,
        'num_ram_per_core': 0,
        'requested_at': stop_msg['created_at'],
    }
}

stop_transformer_msg = {
    'instance': instance_name,
    'msg_type': 'stop_transformer',
    'run_id': run_id,
    'created_at': datetime.datetime.utcnow(),
    'content': {
        'requested_at': stop_msg['created_at'],
    }
}
```
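
The two handlers above can be wired into a single dispatcher keyed on `msg_type`. This is a hypothetical sketch, not the iDDS implementation. `create_workflow_task`, `get_task_id_from_run_id`, and `close_panda_task` are the placeholder names used above, passed in here as callables. `send` stands in for whatever publishes a message to the broker.

```python
import datetime


def _now():
    return datetime.datetime.now(datetime.timezone.utc)


def make_worker_handler(instance, send, create_workflow_task,
                        get_task_id_from_run_id, close_panda_task):
    """Return a callable that reacts to run_imminent and run_stop messages."""

    def adjuster(msg, num_workers, cores, ram):
        return {
            "instance": instance,
            "msg_type": "adjuster_worker",
            "run_id": msg["run_id"],
            "created_at": _now(),
            "content": {
                "num_workers": num_workers,
                "num_cores_per_worker": cores,
                "num_ram_per_core": ram,
                "requested_at": msg["created_at"],
            },
        }

    def handle(msg):
        if msg["msg_type"] == "run_imminent":
            create_workflow_task(msg)
            c = msg["content"]
            send(adjuster(msg, c["num_workers"], c["num_cores_per_worker"],
                          c["num_ram_per_core"]))
        elif msg["msg_type"] == "run_stop":
            # Close the task first so PanDA does not retry jobs for this run.
            close_panda_task(get_task_id_from_run_id(msg["run_id"]))
            send(adjuster(msg, 0, 0, 0))
            send({
                "instance": instance,
                "msg_type": "stop_transformer",
                "run_id": msg["run_id"],
                "created_at": _now(),
                "content": {"requested_at": msg["created_at"]},
            })
        # Other message types (e.g. 'slice') are ignored by this handler.

    return handle
```

Injecting the side-effecting functions keeps the routing logic testable without a broker or a PanDA server.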

---

### 4. Creating workflow tasks (TODO)

Create the iDDS workflow and the PanDA task. The iDDS poller then monitors the task and its number of jobs. If the PanDA task has fewer jobs than expected (because of failures or other issues), iDDS can trigger PanDA to create new jobs.
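
At its core, the poller's check compares the number of jobs the run needs against the jobs that are still healthy. The following is a minimal sketch. The set of "healthy" state names below is an assumption for illustration, since PanDA's real job-state model is richer. `jobs_to_recreate` is a hypothetical helper.

```python
# Hypothetical grouping of job states; adjust to the states your poller actually sees.
HEALTHY_STATES = {"defined", "activated", "starting", "running", "finished"}


def jobs_to_recreate(expected_jobs, job_states):
    """Number of new jobs needed so that `expected_jobs` are alive or done."""
    healthy = sum(1 for state in job_states if state in HEALTHY_STATES)
    return max(0, expected_jobs - healthy)
```

When the result is positive, iDDS would then ask PanDA for that many jobs, for example with the synthetic `transfer-done` event described next.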

One way to trigger job creation is to emit a synthetic Rucio `transfer-done` event that PanDA recognizes:

```python
transfer_done_msg = {
    'event_type': 'transfer-done',
    'created_at': datetime.datetime.utcnow(),
    'payload': {
        'activity': activity,
        'name': name,
        'scope': scope,
        'dst-rse': rse,
    }
}
```

---

### 5. Slice handler

Currently not required in the baseline; keep this section for future transformer-side forwarding logic and debugging helpers.

---

### 6. Transformer lifecycle messages

The transformer (running inside a pilot) consumes `slice` messages from the queue and may also receive `stop_transformer` via a topic.
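
A transformer's main loop can be sketched with the standard library, using `queue.Queue` as a stand-in for the broker subscription so the example runs without ActiveMQ. This is a simplification: in the real setup `stop_transformer` arrives on a topic and slices arrive on a queue, but here both are merged into one inbox. The loop processes `slice` messages, fills in the timing fields, and exits on `stop_transformer`. `process_slice` is a hypothetical user callback.

```python
import datetime
import queue


def _now():
    return datetime.datetime.now(datetime.timezone.utc)


def run_transformer(inbox, send, process_slice, instance, run_id):
    """Consume slice messages until a stop_transformer message arrives."""
    while True:
        msg = inbox.get()  # blocks, like waiting for the next broker delivery
        if msg["msg_type"] == "stop_transformer":
            break
        if msg["msg_type"] != "slice":
            continue  # ignore message types this loop does not handle
        started = _now()
        result = process_slice(msg["content"])
        finished = _now()
        send({
            "instance": instance,
            "msg_type": "slice_result",
            "run_id": run_id,
            "created_at": finished,
            "content": {
                "requested_at": msg["created_at"],
                "processing_start_at": started,
                "processed_at": finished,
                "result": result,
            },
        })


if __name__ == "__main__":
    demo = queue.Queue()
    demo.put({"msg_type": "slice", "created_at": _now(), "content": {"size": 191}})
    demo.put({"msg_type": "stop_transformer", "created_at": _now(), "content": {}})
    run_transformer(demo, print, lambda content: {"size": content["size"]}, "dev_test", 1)
```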

Key lifecycle messages:

```python
slice_result_msg = {
    'instance': instance_name,
    'msg_type': 'slice_result',
    'run_id': run_id,
    'created_at': datetime.datetime.utcnow(),
    'content': {
        'requested_at': slice_created_at,
        'processing_start_at': processing_start,
        'processed_at': processing_end,
        'result': { ... },
    }
}

transformer_start_msg = {
    'instance': instance_name,
    'msg_type': 'transformer_start',
    'run_id': run_id,
    'created_at': datetime.datetime.utcnow(),
    'content': {'hostname': hostname, 'id': pilot_id}
}

transformer_end_msg = {
    'instance': instance_name,
    'msg_type': 'transformer_end',
    'run_id': run_id,
    'created_at': datetime.datetime.utcnow(),
    'content': {'hostname': hostname, 'id': pilot_id}
}

transformer_heartbeat_msg = {
    'instance': instance_name,
    'msg_type': 'transformer_heartbeat',
    'run_id': run_id,
    'created_at': datetime.datetime.utcnow(),
    'content': {'hostname': hostname, 'id': pilot_id}
}
```
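
On the receiving side, heartbeats let the agent notice pilots that died without sending `transformer_end`. The sketch tracks liveness by the pilot `id` in `content` and uses a fixed timeout. The 300 s default is an arbitrary placeholder, and `TransformerRegistry` is a hypothetical class.

```python
import datetime


class TransformerRegistry:
    """Track transformer liveness from lifecycle messages (hypothetical helper)."""

    def __init__(self, timeout_s=300):
        self.timeout = datetime.timedelta(seconds=timeout_s)
        self.last_seen = {}  # pilot id -> created_at of last start/heartbeat

    def update(self, msg):
        pilot_id = msg["content"]["id"]
        if msg["msg_type"] in ("transformer_start", "transformer_heartbeat"):
            self.last_seen[pilot_id] = msg["created_at"]
        elif msg["msg_type"] == "transformer_end":
            self.last_seen.pop(pilot_id, None)  # clean shutdown: stop tracking

    def stale(self, now):
        """Pilot ids whose last start/heartbeat is older than the timeout."""
        return sorted(pid for pid, seen in self.last_seen.items()
                      if now - seen > self.timeout)
```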

---

### 7. Transformer subscription example

Use a selector so that each transformer receives only the messages intended for it, and `client-individual` acknowledgement so that each message is acknowledged separately after it has been processed. Example STOMP headers:

```python
ack = 'client-individual'
headers = {
    'vo': 'eic',
    'selector': "instance='{}' AND run_id='{}'".format(instance_name, run_id),
    'activemq.prefetchSize': '1',
}
```
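
With stomp.py, these settings correspond to the `ack` and `headers` arguments of `Connection.subscribe(destination, id, ack, headers)`. The sketch below only assembles those arguments, so it runs without a broker. The destination `/queue/eic.slices` and the subscription id are placeholders, not values used by iDDS. A prefetch size of 1 lets the broker dispatch at most one unacknowledged message to this consumer at a time. Remaining slices therefore go to other idle transformers instead of waiting in one consumer's buffer.

```python
def quote_literal(value):
    # Selector string literal: wrap in single quotes, double any embedded quote.
    return "'" + str(value).replace("'", "''") + "'"


def transformer_subscription(instance, run_id, destination="/queue/eic.slices",
                             subscription_id="transformer-1"):
    """Keyword arguments for a stomp.py subscribe() call (sketch)."""
    return {
        "destination": destination,
        "id": subscription_id,
        "ack": "client-individual",
        "headers": {
            "vo": "eic",
            "selector": "instance={} AND run_id={}".format(
                quote_literal(instance), quote_literal(run_id)),
            "activemq.prefetchSize": "1",
        },
    }
```

With a connected `stomp.Connection` named `conn`, the call would be `conn.subscribe(**transformer_subscription('prod', 12345))`. In `client-individual` mode, each message must then be acknowledged explicitly once its slice has been processed.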
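
---

Notes:
- Keep message timestamps in UTC. `datetime.datetime.utcnow()` returns a naive datetime and is deprecated as of Python 3.12; `datetime.datetime.now(datetime.timezone.utc)` returns a timezone-aware UTC timestamp.
- Preserve original PA/DAQ timestamps in `content` (useful for latency/evaluation).
- The examples above use Python dict literals, but on the wire the messages should use your chosen serialization (JSON, msgpack, etc.). Python's `json` module cannot encode `datetime` values directly, so convert them first (for example, to ISO 8601 strings).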