File indexing completed on 2026-06-26 08:40:22
0001
0002 """
0003 Submit one PCS ProdTask to PanDA — the credentialed "doer" behind the
0004 prod-ops agent's ``submit_task`` capability, and usable standalone.
0005
0006 This is the automated counterpart to the manual production recipe in
0007 docs/EPICPROD_OPS.md: it runs the same ``prun`` an operator runs, but
0008 non-interactively, reusing the cached long-lived OIDC token (it never
0009 deletes ``$PANDA_CONFIG_ROOT/.token``, which would force an interactive
0010 device flow). PCS remains the single source of truth — the prun command
0011 is fetched from the monitor's own artifact endpoint
0012 (``/pcs/api/prod-tasks/command/?name=<name>&fmt=panda``), not rebuilt here.
0013
0014 Flow:
0015 1. GET the prun command for ``--task-name`` from the monitor.
0016 2. Run it in a clean sandbox dir under the panda-client environment
0017 (source the pclient setup, export PANDA_AUTH_VO), non-interactive.
0018 3. Parse ``jediTaskID=<N>`` from prun's output.
0019 4. POST the outcome back to ``/pcs/api/prod-tasks/record-submission/``
0020 so the ProdTask records its panda_task_id and flips to 'submitted'.
0021
0022 Every failure is surfaced (stderr + non-zero exit); nothing is swallowed.
0023
0024 Standalone:
0025 python scripts/submit-prod-task.py --task-name <ProdTask.name>
0026 """
0027 import argparse
0028 import os
0029 import re
0030 import subprocess
0031 import sys
0032 import tempfile
0033 import time
0034 import urllib.parse
0035 import urllib.request
0036
0037
0038
# Retry policy for the record-submission POST in main(): the POST is tried up
# to RECORD_ATTEMPTS times, sleeping RECORD_BACKOFF * attempt seconds between
# consecutive tries.
RECORD_ATTEMPTS = 3
RECORD_BACKOFF = 2

# Defaults for the panda-client environment the prun command runs under.
DEFAULT_PCLIENT_SETUP = os.path.expanduser("~/pclient/run/setup.sh")
DEFAULT_AUTH_VO = "EIC.production"

# Parent directory for the per-submission sandbox directories; the base comes
# from $SWF_TMP_DIR when that variable is set.
SUBMIT_TMP_ROOT = os.path.join(
    os.environ.get("SWF_TMP_DIR", "/data/swf-tmp"),
    "submit",
)

# Extracts the numeric task id from prun's output.
JEDITASKID_RE = re.compile(r"jediTaskID=(\d+)")
0050
0051
0052 def _log(msg):
0053 print(msg, file=sys.stderr, flush=True)
0054
0055
0056 def _api_get(base, path, query, token):
0057 url = f"{base.rstrip('/')}{path}?{urllib.parse.urlencode(query)}"
0058 req = urllib.request.Request(url)
0059 if token:
0060 req.add_header("Authorization", f"Token {token}")
0061 with urllib.request.urlopen(req, timeout=30) as r:
0062 return r.read().decode()
0063
0064
0065 def _api_post_json(base, path, query, body, token, owner=None):
0066 import json
0067 url = f"{base.rstrip('/')}{path}?{urllib.parse.urlencode(query)}"
0068 data = json.dumps(body).encode()
0069 req = urllib.request.Request(url, data=data, method="POST")
0070 req.add_header("Content-Type", "application/json")
0071
0072
0073
0074
0075 if owner:
0076 req.add_header("X-Remote-User", owner)
0077 if token:
0078 req.add_header("Authorization", f"Token {token}")
0079 with urllib.request.urlopen(req, timeout=30) as r:
0080 return r.read().decode()
0081
0082
def _runner_script_text(pclient_setup, auth_vo, prun_cmd):
    """Return the bash script that runs ``prun_cmd`` in the panda-client env.

    The setup path and the VO are shell-quoted so values containing spaces or
    shell metacharacters cannot break (or alter) the generated script. The
    path is expanded in Python first because quoting would otherwise stop bash
    from expanding a leading ``~`` or ``$VAR``. ``prun_cmd`` is written
    verbatim: it comes from the monitor and is already a shell command line.
    """
    import shlex  # local import: only this helper needs it

    setup_path = os.path.expanduser(os.path.expandvars(pclient_setup))
    return "\n".join([
        "#!/bin/bash",
        "set -e",
        f"source {shlex.quote(setup_path)}",
        f"export PANDA_AUTH_VO={shlex.quote(auth_vo)}",
        prun_cmd,
    ]) + "\n"


def _as_text(captured):
    """Return captured subprocess output as ``str`` (it may be bytes or None)."""
    if captured is None:
        return ""
    if isinstance(captured, bytes):
        return captured.decode(errors="replace")
    return captured


def main():
    """Submit one ProdTask via prun and record the resulting jediTaskID.

    Steps: fetch the prun command from the monitor, run it from a fresh
    sandbox directory under ``SUBMIT_TMP_ROOT``, parse ``jediTaskID=<N>`` from
    its output, then POST the id back to the monitor (with retries). All
    diagnostics go to stderr; the jediTaskID alone is printed to stdout once
    prun has produced one.

    Returns the process exit code:
        0  submitted and recorded
        2  no monitor URL configured
        3  prun command could not be fetched, or the reply is not a prun command
        4  prun timed out
        5  prun exited non-zero
        6  prun succeeded but printed no jediTaskID
        7  submitted, but the record-submission POST failed after all attempts
    """
    ap = argparse.ArgumentParser(description="Submit a PCS ProdTask to PanDA via prun.")
    ap.add_argument("--task-name", required=True, help="ProdTask.name to submit")
    ap.add_argument("--swf-monitor-url",
                    default=os.environ.get("SWF_MONITOR_URL", "").rstrip("/"),
                    help="swf-monitor base URL incl. /swf-monitor app path")
    ap.add_argument("--token",
                    default=os.environ.get("SWFMON_TOKEN") or os.environ.get("SWF_MONITOR_TOKEN", ""),
                    help="DRF token for the monitor API record-submission POST "
                         "(SWFMON_TOKEN; read-only command GET needs none)")
    ap.add_argument("--owner", default=os.environ.get("SWF_TASK_OWNER", ""),
                    help="ProdTask.created_by (Django username); sent as X-Remote-User "
                         "so the on-host tunnel authenticates the owner-gated record write")
    ap.add_argument("--auth-vo", default=DEFAULT_AUTH_VO,
                    help="PANDA_AUTH_VO for the submission (default EIC.production)")
    ap.add_argument("--pclient-setup", default=DEFAULT_PCLIENT_SETUP,
                    help="panda-client environment setup to source")
    ap.add_argument("--timeout", type=int, default=300,
                    help="seconds before the prun run is abandoned")
    args = ap.parse_args()

    if not args.swf_monitor_url:
        _log("ERROR: no --swf-monitor-url / SWF_MONITOR_URL")
        return 2

    # 1. Fetch the prun command; the monitor is the source of truth for it.
    try:
        prun_cmd = _api_get(args.swf_monitor_url, "/pcs/api/prod-tasks/command/",
                            {"name": args.task_name, "fmt": "panda"}, args.token).strip()
    except Exception as e:
        _log(f"ERROR: could not fetch prun command for '{args.task_name}': {e}")
        return 3
    if not prun_cmd.startswith("prun"):
        _log(f"ERROR: artifact endpoint did not return a prun command:\n{prun_cmd[:500]}")
        return 3
    _log(f"prun command for {args.task_name}:\n{prun_cmd}")

    # 2. Run it from a fresh sandbox directory. The task name is sanitised for
    #    use as a directory prefix: a '/' in it would make mkdtemp fail.
    os.makedirs(SUBMIT_TMP_ROOT, exist_ok=True)
    safe_prefix = re.sub(r"[^A-Za-z0-9._-]+", "_", args.task_name)
    sandbox = tempfile.mkdtemp(prefix=f"{safe_prefix}.", dir=SUBMIT_TMP_ROOT)
    runner = os.path.join(sandbox, "run-prun.sh")
    with open(runner, "w") as f:
        f.write(_runner_script_text(args.pclient_setup, args.auth_vo, prun_cmd))
    _log(f"sandbox: {sandbox}")
    try:
        p = subprocess.run(["bash", runner], cwd=sandbox, capture_output=True,
                           text=True, timeout=args.timeout)
    except subprocess.TimeoutExpired as e:
        # Surface whatever prun printed before it was killed; the exception
        # may carry the partial output as bytes even though text=True.
        partial = _as_text(e.stdout) + _as_text(e.stderr)
        for line in partial.splitlines():
            _log(f" prun: {line}")
        _log(f"ERROR: prun timed out after {args.timeout}s")
        late = JEDITASKID_RE.search(partial)
        if late:
            _log(f"WARNING: jediTaskID={late.group(1)} was printed before the timeout; "
                 f"the task may already be submitted and was NOT recorded")
        return 4
    out = (p.stdout or "") + (p.stderr or "")
    for line in out.splitlines():
        _log(f" prun: {line}")
    if p.returncode != 0:
        _log(f"ERROR: prun exited rc={p.returncode}")
        return 5

    # 3. Parse the task id from prun's combined stdout + stderr.
    m = JEDITASKID_RE.search(out)
    if not m:
        _log("ERROR: prun succeeded but no jediTaskID in output")
        return 6
    jedi_task_id = int(m.group(1))
    _log(f"SUBMITTED {args.task_name} -> jediTaskID={jedi_task_id}")

    # 4. Record the outcome on the ProdTask, retrying with a growing pause.
    last_err = None
    for attempt in range(1, RECORD_ATTEMPTS + 1):
        try:
            _api_post_json(args.swf_monitor_url, "/pcs/api/prod-tasks/record-submission/",
                           {"name": args.task_name}, {"jedi_task_id": jedi_task_id},
                           args.token, owner=args.owner)
            _log(f"recorded jediTaskID={jedi_task_id} on ProdTask {args.task_name}")
            print(jedi_task_id)
            return 0
        except Exception as e:
            last_err = e
            _log(f"record-submission POST attempt {attempt}/{RECORD_ATTEMPTS} failed: {e}")
            if attempt < RECORD_ATTEMPTS:
                time.sleep(RECORD_BACKOFF * attempt)

    # The task exists in PanDA even though recording failed: still print the
    # id so the caller can reconcile, but exit non-zero.
    _log(f"WARNING: submitted (jediTaskID={jedi_task_id}) but record-submission "
         f"POST failed after {RECORD_ATTEMPTS} attempts: {last_err}")
    print(jedi_task_id)
    return 7
0177
0178
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())