Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-26 08:40:22

0001 #!/usr/bin/env python3
0002 """
0003 Submit one PCS ProdTask to PanDA — the credentialed "doer" behind the
0004 prod-ops agent's ``submit_task`` capability, and usable standalone.
0005 
0006 This is the automated counterpart to the manual production recipe in
0007 docs/EPICPROD_OPS.md: it runs the same ``prun`` an operator runs, but
0008 non-interactively, reusing the cached long-lived OIDC token (it never
0009 deletes ``$PANDA_CONFIG_ROOT/.token``, which would force an interactive
0010 device flow). PCS remains the single source of truth — the prun command
0011 is fetched from the monitor's own artifact endpoint
0012 (``/pcs/api/prod-tasks/command/?name=<name>&fmt=panda``), not rebuilt here.
0013 
0014 Flow:
0015   1. GET the prun command for ``--task-name`` from the monitor.
0016   2. Run it in a clean sandbox dir under the panda-client environment
0017      (source the pclient setup, export PANDA_AUTH_VO), non-interactive.
0018   3. Parse ``jediTaskID=<N>`` from prun's output.
0019   4. POST the outcome back to ``/pcs/api/prod-tasks/record-submission/``
0020      so the ProdTask records its panda_task_id and flips to 'submitted'.
0021 
0022 Every failure is surfaced (stderr + non-zero exit); nothing is swallowed.
0023 
0024 Standalone:
0025     python scripts/submit-prod-task.py --task-name <ProdTask.name>
0026 """
0027 import argparse
0028 import os
0029 import re
0030 import subprocess
0031 import sys
0032 import tempfile
0033 import time
0034 import urllib.parse
0035 import urllib.request
0036 
# --- record-submission retry policy -----------------------------------------
# The record-submission endpoint is idempotent on the jediTaskID, so POSTing
# again after an ambiguous failure cannot double-record. Retrying closes
# transient failures that would otherwise leave a submitted task orphaned.
# Attempt k (1-based) that fails sleeps RECORD_BACKOFF * k seconds before the
# next attempt.
RECORD_ATTEMPTS = 3
RECORD_BACKOFF = 2          # seconds; scaled by the attempt number

# --- panda-client environment ------------------------------------------------
# "~" is expanded at import time for whichever user runs this module; the agent
# runs as the same user as an operator, so it resolves to the same pclient
# install under systemd.
DEFAULT_PCLIENT_SETUP = os.path.expanduser("~/pclient/run/setup.sh")
DEFAULT_AUTH_VO = "EIC.production"

# --- sandbox location --------------------------------------------------------
# prun packages its working directory, so sandboxes live on the large /data
# volume (overridable through $SWF_TMP_DIR).
SUBMIT_TMP_ROOT = os.path.join(
    os.environ.get("SWF_TMP_DIR", "/data/swf-tmp"),
    "submit",
)

# Captures N from the "jediTaskID=<N>" token in prun's output.
JEDITASKID_RE = re.compile(r"jediTaskID=(\d+)")
0050 
0051 
0052 def _log(msg):
0053     print(msg, file=sys.stderr, flush=True)
0054 
0055 
0056 def _api_get(base, path, query, token):
0057     url = f"{base.rstrip('/')}{path}?{urllib.parse.urlencode(query)}"
0058     req = urllib.request.Request(url)
0059     if token:
0060         req.add_header("Authorization", f"Token {token}")
0061     with urllib.request.urlopen(req, timeout=30) as r:
0062         return r.read().decode()
0063 
0064 
0065 def _api_post_json(base, path, query, body, token, owner=None):
0066     import json
0067     url = f"{base.rstrip('/')}{path}?{urllib.parse.urlencode(query)}"
0068     data = json.dumps(body).encode()
0069     req = urllib.request.Request(url, data=data, method="POST")
0070     req.add_header("Content-Type", "application/json")
0071     # record-submission is owner-gated (IsOwnerOrReadOnly: created_by ==
0072     # request.user.username). On-host, TunnelAuthentication trusts X-Remote-User
0073     # on localhost requests, so we authenticate as the task owner without a
0074     # secret. Off-host, a DRF token for that owner is the fallback.
0075     if owner:
0076         req.add_header("X-Remote-User", owner)
0077     if token:
0078         req.add_header("Authorization", f"Token {token}")
0079     with urllib.request.urlopen(req, timeout=30) as r:
0080         return r.read().decode()
0081 
0082 
def _write_runner(sandbox, pclient_setup, auth_vo, prun_cmd):
    """Write ``run-prun.sh`` into ``sandbox`` and return its path.

    The script runs under ``set -e``: it sources the panda-client setup,
    exports PANDA_AUTH_VO, then runs ``prun_cmd`` verbatim.
    """
    import shlex

    # bash used to expand ~ / $VARS in the unquoted setup path; do that here
    # instead, then quote both operator-supplied values so spaces or shell
    # metacharacters cannot split or inject into the wrapper. prun_cmd is
    # deliberately NOT quoted: it is a complete shell command produced by PCS.
    setup = os.path.expandvars(os.path.expanduser(pclient_setup))
    script = "\n".join([
        "#!/bin/bash",
        "set -e",
        f"source {shlex.quote(setup)}",
        f"export PANDA_AUTH_VO={shlex.quote(auth_vo)}",
        prun_cmd,
    ]) + "\n"
    runner = os.path.join(sandbox, "run-prun.sh")
    with open(runner, "w", encoding="utf-8") as f:
        f.write(script)
    return runner


def _run_prun(runner, sandbox, timeout):
    """Run ``runner`` with bash in ``sandbox``; return ``(returncode, output)``.

    ``output`` is stdout followed by stderr. On timeout ``returncode`` is None,
    the whole process group is SIGKILLed and ``output`` holds what was
    captured before the kill.
    """
    import signal

    grace = 10  # seconds to drain the pipes after the kill

    # start_new_session puts bash, prun and anything prun spawns in their own
    # process group. subprocess.run(timeout=...) would kill only bash, which
    # would leave prun running unsupervised and possibly completing a
    # submission that nobody records (and that a retry would duplicate).
    proc = subprocess.Popen(["bash", runner], cwd=sandbox,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            text=True, start_new_session=True)
    try:
        stdout, stderr = proc.communicate(timeout=timeout)
        rc = proc.returncode
    except subprocess.TimeoutExpired:
        try:
            os.killpg(proc.pid, signal.SIGKILL)  # pgid == bash's pid (new session)
        except ProcessLookupError:
            pass  # the group already exited
        try:
            # Retrying communicate() keeps the output read before the timeout.
            stdout, stderr = proc.communicate(timeout=grace)
        except subprocess.TimeoutExpired:
            # Something outside the group still holds the pipes; stop waiting.
            proc.kill()
            proc.wait()
            stdout, stderr = "", ""
        rc = None
    return rc, (stdout or "") + (stderr or "")


def _record_submission(args, jedi_task_id):
    """POST ``jedi_task_id`` to record-submission, retrying up to RECORD_ATTEMPTS.

    record-submission is idempotent on the jediTaskID, so a retry after a
    partial success is safe, and retrying closes transient POST failures that
    would otherwise orphan the submission.

    Returns:
        ``(True, None)`` once a POST succeeds, otherwise ``(False, last_error)``.
    """
    last_err = None
    for attempt in range(1, RECORD_ATTEMPTS + 1):
        try:
            _api_post_json(args.swf_monitor_url, "/pcs/api/prod-tasks/record-submission/",
                           {"name": args.task_name}, {"jedi_task_id": jedi_task_id},
                           args.token, owner=args.owner)
        except Exception as e:
            last_err = e
            _log(f"record-submission POST attempt {attempt}/{RECORD_ATTEMPTS} failed: {e}")
            if attempt < RECORD_ATTEMPTS:
                time.sleep(RECORD_BACKOFF * attempt)
        else:
            return True, None
    return False, last_err


def main():
    """Submit one PCS ProdTask to PanDA via prun and record the result in PCS.

    Steps: fetch the prun command from the monitor, run it in a fresh sandbox
    under the panda-client environment, parse the jediTaskID, then POST it to
    record-submission. Diagnostics go to stderr. The bare jediTaskID is
    printed to stdout on success, and also when only the final bookkeeping
    POST failed.

    The sandbox directory is left in place and its path is logged.
    NOTE(review): nothing here cleans it up.

    Returns:
        Process exit status:
            0  submitted and recorded
            2  no monitor URL configured
            3  prun command could not be fetched, or is not a prun command
            4  prun timed out (its whole process group is killed)
            5  prun exited non-zero
            6  prun succeeded but printed no jediTaskID
            7  submitted, but record-submission failed after all retries
    """
    ap = argparse.ArgumentParser(description="Submit a PCS ProdTask to PanDA via prun.")
    ap.add_argument("--task-name", required=True, help="ProdTask.name to submit")
    ap.add_argument("--swf-monitor-url",
                    default=os.environ.get("SWF_MONITOR_URL", "").rstrip("/"),
                    help="swf-monitor base URL incl. /swf-monitor app path")
    ap.add_argument("--token",
                    default=os.environ.get("SWFMON_TOKEN") or os.environ.get("SWF_MONITOR_TOKEN", ""),
                    help="DRF token for the monitor API record-submission POST "
                         "(SWFMON_TOKEN; read-only command GET needs none)")
    ap.add_argument("--owner", default=os.environ.get("SWF_TASK_OWNER", ""),
                    help="ProdTask.created_by (Django username); sent as X-Remote-User "
                         "so the on-host tunnel authenticates the owner-gated record write")
    ap.add_argument("--auth-vo", default=DEFAULT_AUTH_VO,
                    help="PANDA_AUTH_VO for the submission (default EIC.production)")
    ap.add_argument("--pclient-setup", default=DEFAULT_PCLIENT_SETUP,
                    help="panda-client environment setup to source")
    ap.add_argument("--timeout", type=int, default=300,
                    help="seconds before the prun run is abandoned")
    args = ap.parse_args()

    if not args.swf_monitor_url:
        _log("ERROR: no --swf-monitor-url / SWF_MONITOR_URL")
        return 2

    # 1. Fetch the prun command from PCS (single source of truth).
    try:
        prun_cmd = _api_get(args.swf_monitor_url, "/pcs/api/prod-tasks/command/",
                            {"name": args.task_name, "fmt": "panda"}, args.token).strip()
    except Exception as e:
        _log(f"ERROR: could not fetch prun command for '{args.task_name}': {e}")
        return 3
    if not prun_cmd.startswith("prun"):
        _log(f"ERROR: artifact endpoint did not return a prun command:\n{prun_cmd[:500]}")
        return 3
    _log(f"prun command for {args.task_name}:\n{prun_cmd}")

    # 2. Run prun in a clean sandbox under the pclient environment. The cached
    #    token is reused (never deleted) so this stays non-interactive.
    os.makedirs(SUBMIT_TMP_ROOT, exist_ok=True)
    # The task name only labels the sandbox dir. Neutralize path separators and
    # other unusual characters so mkdtemp always creates the directory directly
    # under SUBMIT_TMP_ROOT.
    safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", args.task_name)
    sandbox = tempfile.mkdtemp(prefix=f"{safe_name}.", dir=SUBMIT_TMP_ROOT)
    runner = _write_runner(sandbox, args.pclient_setup, args.auth_vo, prun_cmd)
    _log(f"sandbox: {sandbox}")

    rc, out = _run_prun(runner, sandbox, args.timeout)
    for line in out.splitlines():
        _log(f"  prun: {line}")
    if rc is None:
        _log(f"ERROR: prun timed out after {args.timeout}s")
        partial = JEDITASKID_RE.search(out)
        if partial:
            # The server may already have accepted the task: make the possible
            # orphan visible instead of silently dropping it.
            _log(f"WARNING: prun had already reported jediTaskID={partial.group(1)} "
                 f"before the timeout; the task may be submitted but is NOT recorded")
        return 4
    if rc != 0:
        _log(f"ERROR: prun exited rc={rc}")
        return 5

    # 3. Parse the JEDI task ID.
    m = JEDITASKID_RE.search(out)
    if not m:
        _log("ERROR: prun succeeded but no jediTaskID in output")
        return 6
    jedi_task_id = int(m.group(1))
    _log(f"SUBMITTED {args.task_name} -> jediTaskID={jedi_task_id}")

    # 4. Record the outcome back to PCS (with retries).
    recorded, last_err = _record_submission(args, jedi_task_id)
    if recorded:
        _log(f"recorded jediTaskID={jedi_task_id} on ProdTask {args.task_name}")
        print(jedi_task_id)
        return 0

    # The task IS submitted; only the bookkeeping POST failed after retries,
    # which leaves an orphan. Surface it loudly AND print the id (even on this
    # failure path) so the agent/operator can re-record it (record-submission
    # is idempotent).
    _log(f"WARNING: submitted (jediTaskID={jedi_task_id}) but record-submission "
         f"POST failed after {RECORD_ATTEMPTS} attempts: {last_err}")
    print(jedi_task_id)
    return 7
0177 
0178 
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the process exit status.
    raise SystemExit(main())