Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-26 08:40:22

0001 #!/usr/bin/env python3
0002 """
0003 Submit one PCS ProdTask to PanDA via the client-API EVGEN path — the
0004 credentialed "doer" behind the prod-ops agent's ``submit_evgen_task``
0005 capability, and usable standalone.
0006 
0007 This is the production reproduction of the proven condor-side recipe (eic/
0008 job_submission_condor, spec only): a noInput+noOutput task whose containerized
0009 payload xrootd-streams the EVGEN input from JLab and self-registers RECO to JLab
0010 Rucio. It is the EVGEN counterpart of scripts/submit-prod-task.py (the prun
0011 doer, now sidelined); like it, PCS stays the single source of truth — the
0012 submission spec is *fetched* from the monitor's artifact endpoint
0013 (``/pcs/api/prod-tasks/command/?name=<name>&fmt=evgen``), not rebuilt here.
0014 
0015 Flow:
0016   1. GET the EVGEN spec for ``--task-name`` from the monitor.
0017   2. Assemble the submission sandbox (the one-row-per-job CSV manifest, the
0018      ``environment-*.sh`` the payload sources, the in-job dispatcher, and the
0019      JLab x509 proxy the payload uses to register output).
0020   3. Run the submission kernel (scripts/evgen_panda_submit.py) in a shell that
0021      has sourced the panda-client environment, reusing the cached OIDC token
0022      (never deleting it, which would force an interactive device flow).
0023   4. Parse ``jediTaskID=<N>`` and POST it to
0024      ``/pcs/api/prod-tasks/record-submission/`` so the ProdTask records its
0025      panda_task_id and flips to 'submitted'.
0026   5. Best-effort: write expected output inventory from the exact submitted spec.
0027 
0028 Every failure is surfaced (stderr + non-zero exit); nothing is swallowed. Exit
0029 codes match submit-prod-task.py so the agent handler treats both doers alike:
0030 0 success, 7 submitted-but-unrecorded (idempotent re-record), other non-zero
0031 failure.
0032 
0033 Standalone:
0034     python scripts/submit-evgen-task.py --task-name <ProdTask.name> --proxy <x509>
0035 """
import argparse
import json
import os
import re
import shlex
import shutil
import subprocess
import sys
import tempfile
import time
import urllib.parse
import urllib.request
0047 
# Record-submission POST retry policy: up to RECORD_ATTEMPTS tries, sleeping
# RECORD_BACKOFF * attempt seconds between consecutive tries.
RECORD_ATTEMPTS = 3
RECORD_BACKOFF = 2          # seconds, multiplied by attempt number

DEFAULT_PCLIENT_SETUP = os.path.expanduser("~/pclient/run/setup.sh")
DEFAULT_AUTH_VO = "EIC.production"
# Per-submission work dirs are created under here. An unset *or empty*
# SWF_TMP_DIR falls back to /data/swf-tmp; with os.environ.get(name, default)
# an empty value would have produced a cwd-relative "submit-evgen" directory.
SUBMIT_TMP_ROOT = os.path.join(os.environ.get("SWF_TMP_DIR") or "/data/swf-tmp", "submit-evgen")

# The submission kernel + the in-job dispatcher live beside this doer.
HERE = os.path.dirname(os.path.abspath(__file__))
KERNEL_SCRIPT = os.path.join(HERE, "evgen_panda_submit.py")
DISPATCHER_SCRIPT = os.path.join(HERE, "evgen_job_dispatcher.py")
# manage.py is looked up in src/ next to this script's parent directory; it is
# run for the best-effort expected-inventory sync.
MANAGE_PY = os.path.join(os.path.dirname(HERE), "src", "manage.py")

# Extracts the JEDI task id from the submission kernel's combined output.
JEDITASKID_RE = re.compile(r"jediTaskID=(\d+)")
0062 
0063 
0064 def _log(msg):
0065     print(msg, file=sys.stderr, flush=True)
0066 
0067 
0068 def _api_get(base, path, query, token):
0069     url = f"{base.rstrip('/')}{path}?{urllib.parse.urlencode(query)}"
0070     req = urllib.request.Request(url)
0071     if token:
0072         req.add_header("Authorization", f"Token {token}")
0073     with urllib.request.urlopen(req, timeout=30) as r:
0074         return r.read().decode()
0075 
0076 
0077 def _api_post_json(base, path, query, body, token, owner=None):
0078     url = f"{base.rstrip('/')}{path}?{urllib.parse.urlencode(query)}"
0079     data = json.dumps(body).encode()
0080     req = urllib.request.Request(url, data=data, method="POST")
0081     req.add_header("Content-Type", "application/json")
0082     if owner:
0083         req.add_header("X-Remote-User", owner)
0084     if token:
0085         req.add_header("Authorization", f"Token {token}")
0086     with urllib.request.urlopen(req, timeout=30) as r:
0087         return r.read().decode()
0088 
0089 
0090 def _assemble_sandbox(spec, proxy_path, root):
0091     """Build the submission dir: spec.json + a sandbox/ holding the worker-facing
0092     files (manifest, env, dispatcher, proxy). Returns the submission dir path.
0093 
0094     The kernel and spec live OUTSIDE sandbox/ so they are not shipped to the
0095     worker; only the four worker files travel in the tarball.
0096     """
0097     os.makedirs(root, exist_ok=True)
0098     workdir = tempfile.mkdtemp(prefix=f"{spec['csvBase']}.", dir=root)
0099     sandbox = os.path.join(workdir, "sandbox")
0100     os.makedirs(sandbox)
0101 
0102     csv_base = spec['csvBase']
0103     # One-row-per-job manifest (file,ext,nevents,ichunk).
0104     with open(os.path.join(sandbox, f"{csv_base}.csv"), "w") as f:
0105         f.write("\n".join(spec['csvRows']) + "\n")
0106 
0107     # environment-*.sh — the payload run.sh sources it by glob. The proxy
0108     # basename rides here; run.sh reads X509_USER_PROXY back.
0109     proxy_base = os.path.basename(proxy_path)
0110     env = dict(spec.get('env') or {})
0111     env['X509_USER_PROXY'] = proxy_base
0112     with open(os.path.join(sandbox, f"environment-{csv_base}.sh"), "w") as f:
0113         for k, v in env.items():
0114             f.write(f'export {k}={v}\n')
0115 
0116     # In-job dispatcher (named to match spec['exec']) and the proxy.
0117     shutil.copy(DISPATCHER_SCRIPT, os.path.join(sandbox, "evgen_job_dispatcher.py"))
0118     shutil.copy(proxy_path, os.path.join(sandbox, proxy_base))
0119 
0120     with open(os.path.join(workdir, "spec.json"), "w") as f:
0121         json.dump(spec, f, indent=2)
0122     return workdir
0123 
0124 
def _sync_expected_inventory(task_name, spec_path, timeout=60):
    """Best-effort local DB update for expected file inventory.

    Runs ``manage.py sync_epicprod_inventory --prod-task <task_name>
    --spec-file <spec_path>`` with the current interpreter and relays every
    stdout/stderr line of that command to our stderr.

    This runs only after record-submission succeeds, so the PCS task has the
    JEDI id needed for expected rows. Failure here must not turn a successful
    submission into a failed submission. A missing manage.py, a launch error,
    a timeout or a non-zero exit is logged as a WARNING and the function
    returns None; nothing is raised.

    Args:
        task_name: ProdTask.name, passed as ``--prod-task``.
        spec_path: path of the submitted spec.json, passed as ``--spec-file``.
        timeout: seconds to wait for the command (default 60).
    """
    if not os.path.isfile(MANAGE_PY):
        _log(f"WARNING: cannot sync expected inventory; manage.py not found: {MANAGE_PY}")
        return
    cmd = [
        sys.executable, MANAGE_PY, "sync_epicprod_inventory",
        "--prod-task", task_name,
        "--spec-file", spec_path,
    ]
    try:
        # errors="replace": an undecodable byte in the command's output must
        # not raise UnicodeDecodeError out of this best-effort step.
        p = subprocess.run(cmd, capture_output=True, text=True, errors="replace",
                           timeout=timeout)
    except subprocess.TimeoutExpired:
        _log(f"WARNING: expected inventory sync timed out after {timeout}s")
        return
    except (OSError, subprocess.SubprocessError) as e:
        # e.g. interpreter not executable; previously this escaped to the caller.
        _log(f"WARNING: expected inventory sync could not run: {e}")
        return
    for line in (p.stdout or "").splitlines() + (p.stderr or "").splitlines():
        _log(f"  inventory-sync: {line}")
    if p.returncode != 0:
        _log(f"WARNING: expected inventory sync failed rc={p.returncode}")
0151 
0152 
def main():
    """Submit one ProdTask's EVGEN spec to PanDA and record the result in PCS.

    Returns the process exit code. Diagnostics go to stderr; on exit codes 0
    and 7 the jediTaskID is the only thing printed on stdout.

        0  submitted and recorded
        2  bad invocation: no monitor URL, or the JLab x509 proxy is missing
        3  spec fetch/parse failure, or the sandbox/runner could not be written
        4  submission run timed out (whether PanDA got the task is not
           determined here)
        5  submission kernel could not be launched or exited non-zero
        6  kernel succeeded but printed no jediTaskID
        7  submitted, but the record-submission POST failed every attempt
    """
    ap = argparse.ArgumentParser(description="Submit a PCS ProdTask to PanDA (client-API EVGEN).")
    ap.add_argument("--task-name", required=True, help="ProdTask.name to submit")
    ap.add_argument("--swf-monitor-url",
                    default=os.environ.get("SWF_MONITOR_URL", "").rstrip("/"),
                    help="swf-monitor base URL incl. /swf-monitor app path")
    ap.add_argument("--token",
                    default=os.environ.get("SWFMON_TOKEN") or os.environ.get("SWF_MONITOR_TOKEN", ""),
                    help="DRF token for the record-submission POST")
    ap.add_argument("--owner", default=os.environ.get("SWF_TASK_OWNER", ""),
                    help="ProdTask.created_by; sent as X-Remote-User for the owner-gated record write")
    ap.add_argument("--proxy",
                    default=os.environ.get("EVGEN_X509_PROXY", ""),
                    help="JLab eicprod x509 proxy shipped in the sandbox for output "
                         "registration (EVGEN_X509_PROXY) — the same credential the condor "
                         "template ships. NOT X509_USER_PROXY/longproxy-for-rucio, which is "
                         "the BNL Rucio metadata credential and would not write to JLab.")
    ap.add_argument("--auth-vo", default=DEFAULT_AUTH_VO,
                    help="PANDA_AUTH_VO for the submission (default EIC.production)")
    ap.add_argument("--pclient-setup", default=DEFAULT_PCLIENT_SETUP,
                    help="panda-client environment setup to source")
    ap.add_argument("--timeout", type=int, default=300,
                    help="seconds before the submission run is abandoned")
    args = ap.parse_args()

    if not args.swf_monitor_url:
        _log("ERROR: no --swf-monitor-url / SWF_MONITOR_URL")
        return 2
    if not args.proxy or not os.path.isfile(args.proxy):
        # The payload registers RECO to JLab Rucio with this proxy; without it
        # the job cannot write output. Fail loudly rather than submit a job
        # destined to fail at output.
        _log(f"ERROR: JLab x509 proxy not found (--proxy / EVGEN_X509_PROXY): {args.proxy!r}")
        return 2

    # 1. Fetch the EVGEN spec from PCS (single source of truth).
    try:
        raw = _api_get(args.swf_monitor_url, "/pcs/api/prod-tasks/command/",
                       {"name": args.task_name, "fmt": "evgen"}, args.token)
    except Exception as e:
        _log(f"ERROR: could not fetch EVGEN spec for '{args.task_name}': {e}")
        return 3
    try:
        spec = json.loads(raw)
    except ValueError:
        _log(f"ERROR: spec endpoint did not return JSON:\n{raw[:500]}")
        return 3
    # Valid JSON that is not an object (list, string, ...) would otherwise
    # crash on spec.get() below.
    if not isinstance(spec, dict) or not spec.get('outDS') or not spec.get('csvRows'):
        _log(f"ERROR: incomplete EVGEN spec for '{args.task_name}': {raw[:500]}")
        return 3
    if args.owner and not spec.get('userName'):
        spec['userName'] = args.owner
    _log(f"EVGEN spec for {args.task_name}: outDS={spec['outDS']} "
         f"nJobs={spec.get('nJobs')} skipScout={spec.get('skipScout')}")

    # 2. Assemble the sandbox.
    try:
        workdir = _assemble_sandbox(spec, args.proxy, SUBMIT_TMP_ROOT)
    except Exception as e:
        _log(f"ERROR: could not assemble sandbox: {e}")
        return 3
    _log(f"sandbox: {workdir}")

    # 3. Run the kernel under the panda-client environment (cached OIDC token).
    # Paths/values are shell-quoted so spaces or metacharacters cannot break
    # the script; "~" is expanded here because quoting stops bash doing it.
    runner = os.path.join(workdir, "run-submit.sh")
    pclient_setup = os.path.expanduser(args.pclient_setup)
    try:
        with open(runner, "w", encoding="utf-8") as f:
            f.write("#!/bin/bash\nset -e\n")
            f.write(f"source {shlex.quote(pclient_setup)}\n")
            f.write(f"export PANDA_AUTH_VO={shlex.quote(args.auth_vo)}\n")
            f.write(f"python3 {shlex.quote(KERNEL_SCRIPT)} --spec spec.json --workdir sandbox\n")
    except OSError as e:
        _log(f"ERROR: could not write submission runner {runner}: {e}")
        return 3
    try:
        # errors="replace": the task may already be submitted when output is
        # decoded, so an odd byte must not crash us and lose the jediTaskID.
        p = subprocess.run(["bash", runner], cwd=workdir, capture_output=True,
                           text=True, errors="replace", timeout=args.timeout)
    except subprocess.TimeoutExpired:
        _log(f"ERROR: submission timed out after {args.timeout}s")
        return 4
    except OSError as e:
        _log(f"ERROR: could not launch submission runner: {e}")
        return 5
    out = (p.stdout or "") + (p.stderr or "")
    for line in out.splitlines():
        _log(f"  evgen-submit: {line}")
    if p.returncode != 0:
        _log(f"ERROR: kernel exited rc={p.returncode}")
        return 5

    # 4. Parse the JEDI task ID.
    m = JEDITASKID_RE.search(out)
    if not m:
        _log("ERROR: submission succeeded but no jediTaskID in output")
        return 6
    jedi_task_id = int(m.group(1))
    _log(f"SUBMITTED {args.task_name} -> jediTaskID={jedi_task_id}")

    # 5. Record the outcome back to PCS (idempotent; retry a transient failure).
    # Only the POST itself sits in the retry loop, so a problem in the
    # follow-up steps can never trigger a spurious re-POST or exit 7.
    last_err = None
    for attempt in range(1, RECORD_ATTEMPTS + 1):
        try:
            _api_post_json(args.swf_monitor_url, "/pcs/api/prod-tasks/record-submission/",
                           {"name": args.task_name}, {"jedi_task_id": jedi_task_id},
                           args.token, owner=args.owner)
            break
        except Exception as e:
            last_err = e
            _log(f"record-submission POST attempt {attempt}/{RECORD_ATTEMPTS} failed: {e}")
            if attempt < RECORD_ATTEMPTS:
                time.sleep(RECORD_BACKOFF * attempt)
    else:
        # Every attempt failed (the loop never hit `break`).
        _log(f"WARNING: submitted (jediTaskID={jedi_task_id}) but record-submission "
             f"POST failed after {RECORD_ATTEMPTS} attempts: {last_err}")
        print(jedi_task_id)
        return 7

    _log(f"recorded jediTaskID={jedi_task_id} on ProdTask {args.task_name}")
    try:
        _sync_expected_inventory(args.task_name, os.path.join(workdir, "spec.json"))
    except Exception as e:
        # Best-effort: never let the inventory sync fail a recorded submission.
        _log(f"WARNING: expected inventory sync raised: {e}")
    print(jedi_task_id)
    return 0
0264 
0265 
if __name__ == "__main__":
    # main() returns the exit code; SystemExit hands it to the interpreter.
    raise SystemExit(main())