File indexing completed on 2026-06-26 08:40:22
0001
0002 """
0003 Submit one PCS ProdTask to PanDA via the client-API EVGEN path — the
0004 credentialed "doer" behind the prod-ops agent's ``submit_evgen_task``
0005 capability, and usable standalone.
0006
0007 This is the production reproduction of the proven condor-side recipe (eic/
0008 job_submission_condor, spec only): a noInput+noOutput task whose containerized
0009 payload xrootd-streams the EVGEN input from JLab and self-registers RECO to JLab
0010 Rucio. It is the EVGEN counterpart of scripts/submit-prod-task.py (the prun
0011 doer, now sidelined); like it, PCS stays the single source of truth — the
0012 submission spec is *fetched* from the monitor's artifact endpoint
0013 (``/pcs/api/prod-tasks/command/?name=<name>&fmt=evgen``), not rebuilt here.
0014
0015 Flow:
0016 1. GET the EVGEN spec for ``--task-name`` from the monitor.
0017 2. Assemble the submission sandbox (the one-row-per-job CSV manifest, the
0018 ``environment-*.sh`` the payload sources, the in-job dispatcher, and the
0019 JLab x509 proxy the payload uses to register output).
0020 3. Run the submission kernel (scripts/evgen_panda_submit.py) in a shell that
0021 has sourced the panda-client environment, reusing the cached OIDC token
0022 (never deleting it, which would force an interactive device flow).
0023 4. Parse ``jediTaskID=<N>`` and POST it to
0024 ``/pcs/api/prod-tasks/record-submission/`` so the ProdTask records its
0025 panda_task_id and flips to 'submitted'.
0026 5. Best-effort: write expected output inventory from the exact submitted spec.
0027
Every submission failure is surfaced (stderr + non-zero exit); only the
best-effort inventory sync of step 5 degrades to a warning. Exit codes match
submit-prod-task.py so the agent handler treats both doers alike: 0 success,
7 submitted-but-unrecorded (idempotent re-record), other non-zero failure.
0032
0033 Standalone:
0034 python scripts/submit-evgen-task.py --task-name <ProdTask.name> --proxy <x509>
0035 """
0036 import argparse
0037 import json
0038 import os
0039 import re
0040 import shutil
0041 import subprocess
0042 import sys
0043 import tempfile
0044 import time
0045 import urllib.parse
0046 import urllib.request
0047
# Retry policy for the record-submission POST: up to RECORD_ATTEMPTS tries,
# sleeping RECORD_BACKOFF * attempt seconds between consecutive failures.
RECORD_ATTEMPTS = 3
RECORD_BACKOFF = 2

# Defaults for the submission shell and its scratch area
# (SWF_TMP_DIR overrides the scratch base directory).
DEFAULT_PCLIENT_SETUP = os.path.expanduser("~/pclient/run/setup.sh")
DEFAULT_AUTH_VO = "EIC.production"
SUBMIT_TMP_ROOT = os.path.join(os.getenv("SWF_TMP_DIR", "/data/swf-tmp"), "submit-evgen")

# Helper scripts live next to this file; manage.py lives in ../src.
HERE = os.path.dirname(os.path.abspath(__file__))
KERNEL_SCRIPT, DISPATCHER_SCRIPT = (
    os.path.join(HERE, script_name)
    for script_name in ("evgen_panda_submit.py", "evgen_job_dispatcher.py")
)
MANAGE_PY = os.path.join(os.path.dirname(HERE), "src", "manage.py")

# Matches the JEDI task id the kernel reports, e.g. "jediTaskID=12345".
JEDITASKID_RE = re.compile(r"jediTaskID=(\d+)")
0062
0063
0064 def _log(msg):
0065 print(msg, file=sys.stderr, flush=True)
0066
0067
0068 def _api_get(base, path, query, token):
0069 url = f"{base.rstrip('/')}{path}?{urllib.parse.urlencode(query)}"
0070 req = urllib.request.Request(url)
0071 if token:
0072 req.add_header("Authorization", f"Token {token}")
0073 with urllib.request.urlopen(req, timeout=30) as r:
0074 return r.read().decode()
0075
0076
0077 def _api_post_json(base, path, query, body, token, owner=None):
0078 url = f"{base.rstrip('/')}{path}?{urllib.parse.urlencode(query)}"
0079 data = json.dumps(body).encode()
0080 req = urllib.request.Request(url, data=data, method="POST")
0081 req.add_header("Content-Type", "application/json")
0082 if owner:
0083 req.add_header("X-Remote-User", owner)
0084 if token:
0085 req.add_header("Authorization", f"Token {token}")
0086 with urllib.request.urlopen(req, timeout=30) as r:
0087 return r.read().decode()
0088
0089
0090 def _assemble_sandbox(spec, proxy_path, root):
0091 """Build the submission dir: spec.json + a sandbox/ holding the worker-facing
0092 files (manifest, env, dispatcher, proxy). Returns the submission dir path.
0093
0094 The kernel and spec live OUTSIDE sandbox/ so they are not shipped to the
0095 worker; only the four worker files travel in the tarball.
0096 """
0097 os.makedirs(root, exist_ok=True)
0098 workdir = tempfile.mkdtemp(prefix=f"{spec['csvBase']}.", dir=root)
0099 sandbox = os.path.join(workdir, "sandbox")
0100 os.makedirs(sandbox)
0101
0102 csv_base = spec['csvBase']
0103
0104 with open(os.path.join(sandbox, f"{csv_base}.csv"), "w") as f:
0105 f.write("\n".join(spec['csvRows']) + "\n")
0106
0107
0108
0109 proxy_base = os.path.basename(proxy_path)
0110 env = dict(spec.get('env') or {})
0111 env['X509_USER_PROXY'] = proxy_base
0112 with open(os.path.join(sandbox, f"environment-{csv_base}.sh"), "w") as f:
0113 for k, v in env.items():
0114 f.write(f'export {k}={v}\n')
0115
0116
0117 shutil.copy(DISPATCHER_SCRIPT, os.path.join(sandbox, "evgen_job_dispatcher.py"))
0118 shutil.copy(proxy_path, os.path.join(sandbox, proxy_base))
0119
0120 with open(os.path.join(workdir, "spec.json"), "w") as f:
0121 json.dump(spec, f, indent=2)
0122 return workdir
0123
0124
def _sync_expected_inventory(task_name, spec_path):
    """Best-effort local DB update for expected file inventory.

    This runs only after record-submission succeeds, so the PCS task has the
    JEDI id needed for expected rows. Failure here must not turn a successful
    submission into a failed submission.

    Runs ``<sys.executable> MANAGE_PY sync_epicprod_inventory --prod-task
    <task_name> --spec-file <spec_path>`` with a 60 s timeout and relays its
    stdout, then its stderr, line by line to our stderr. Each failure mode is
    logged as a WARNING and none of them raises:

    - manage.py is missing;
    - the sync times out;
    - the process cannot be launched (OSError);
    - the sync exits non-zero.
    """
    if not os.path.isfile(MANAGE_PY):
        _log(f"WARNING: cannot sync expected inventory; manage.py not found: {MANAGE_PY}")
        return
    cmd = [
        sys.executable, MANAGE_PY, "sync_epicprod_inventory",
        "--prod-task", task_name,
        "--spec-file", spec_path,
    ]
    try:
        # errors="replace": undecodable output must not escape as UnicodeDecodeError
        # from a step that is supposed to be best-effort.
        p = subprocess.run(cmd, capture_output=True, text=True, errors="replace", timeout=60)
    except subprocess.TimeoutExpired:
        _log("WARNING: expected inventory sync timed out after 60s")
        return
    except OSError as e:
        # Must not propagate: a caller's surrounding try could misattribute it.
        _log(f"WARNING: could not run expected inventory sync: {e}")
        return
    for stream in (p.stdout, p.stderr):
        for line in (stream or "").splitlines():
            _log(f" inventory-sync: {line}")
    if p.returncode != 0:
        _log(f"WARNING: expected inventory sync failed rc={p.returncode}")
0151
0152
def _parse_args(argv=None):
    """Parse the doer's command line; *argv* defaults to ``sys.argv[1:]``.

    Defaults are read from the environment: SWF_MONITOR_URL,
    SWFMON_TOKEN / SWF_MONITOR_TOKEN, SWF_TASK_OWNER and EVGEN_X509_PROXY.
    """
    ap = argparse.ArgumentParser(description="Submit a PCS ProdTask to PanDA (client-API EVGEN).")
    ap.add_argument("--task-name", required=True, help="ProdTask.name to submit")
    ap.add_argument("--swf-monitor-url",
                    default=os.environ.get("SWF_MONITOR_URL", "").rstrip("/"),
                    help="swf-monitor base URL incl. /swf-monitor app path")
    ap.add_argument("--token",
                    default=os.environ.get("SWFMON_TOKEN") or os.environ.get("SWF_MONITOR_TOKEN", ""),
                    help="DRF token for the record-submission POST")
    ap.add_argument("--owner", default=os.environ.get("SWF_TASK_OWNER", ""),
                    help="ProdTask.created_by; sent as X-Remote-User for the owner-gated record write")
    ap.add_argument("--proxy",
                    default=os.environ.get("EVGEN_X509_PROXY", ""),
                    help="JLab eicprod x509 proxy shipped in the sandbox for output "
                         "registration (EVGEN_X509_PROXY) — the same credential the condor "
                         "template ships. NOT X509_USER_PROXY/longproxy-for-rucio, which is "
                         "the BNL Rucio metadata credential and would not write to JLab.")
    ap.add_argument("--auth-vo", default=DEFAULT_AUTH_VO,
                    help="PANDA_AUTH_VO for the submission (default EIC.production)")
    ap.add_argument("--pclient-setup", default=DEFAULT_PCLIENT_SETUP,
                    help="panda-client environment setup to source")
    ap.add_argument("--timeout", type=int, default=300,
                    help="seconds before the submission run is abandoned")
    return ap.parse_args(argv)


def _fetch_spec(args):
    """Fetch, parse and validate the EVGEN spec for ``args.task_name``.

    Returns the spec dict (with ``userName`` defaulted to ``args.owner`` when
    the spec has none), or None after logging why the spec is unusable.
    """
    try:
        raw = _api_get(args.swf_monitor_url, "/pcs/api/prod-tasks/command/",
                       {"name": args.task_name, "fmt": "evgen"}, args.token)
    except Exception as e:
        _log(f"ERROR: could not fetch EVGEN spec for '{args.task_name}': {e}")
        return None
    try:
        spec = json.loads(raw)
    except ValueError:
        _log(f"ERROR: spec endpoint did not return JSON:\n{raw[:500]}")
        return None
    # A JSON list/scalar is as unusable as a dict missing its required keys.
    if not isinstance(spec, dict) or not spec.get('outDS') or not spec.get('csvRows'):
        _log(f"ERROR: incomplete EVGEN spec for '{args.task_name}': {raw[:500]}")
        return None
    if args.owner and not spec.get('userName'):
        spec['userName'] = args.owner
    return spec


def _write_runner(workdir, pclient_setup, auth_vo):
    """Write ``run-submit.sh`` into *workdir* and return its path.

    The script sources the panda-client setup, exports PANDA_AUTH_VO and runs
    the kernel on ``spec.json`` / ``sandbox`` relative to *workdir* (the run's
    cwd). Every interpolated value is shell-quoted so paths containing spaces
    or metacharacters can neither break nor inject into the script. The setup
    path is tilde-expanded first, because quoting disables bash's own ``~``
    expansion.
    """
    import shlex  # local: only this helper needs it

    runner = os.path.join(workdir, "run-submit.sh")
    lines = [
        "#!/bin/bash",
        "set -e",
        f"source {shlex.quote(os.path.expanduser(pclient_setup))}",
        f"export PANDA_AUTH_VO={shlex.quote(auth_vo)}",
        f"python3 {shlex.quote(KERNEL_SCRIPT)} --spec spec.json --workdir sandbox",
    ]
    with open(runner, "w", encoding="utf-8") as f:
        f.write("\n".join(lines) + "\n")
    return runner


def _record_submission(base, task_name, jedi_task_id, token, owner):
    """POST *jedi_task_id* to record-submission, retrying with linear backoff.

    Returns True as soon as one POST succeeds. Returns False after
    RECORD_ATTEMPTS failures; every failure, and the final give-up, is logged.
    """
    last_err = None
    for attempt in range(1, RECORD_ATTEMPTS + 1):
        try:
            _api_post_json(base, "/pcs/api/prod-tasks/record-submission/",
                           {"name": task_name}, {"jedi_task_id": jedi_task_id},
                           token, owner=owner)
        except Exception as e:
            last_err = e
            _log(f"record-submission POST attempt {attempt}/{RECORD_ATTEMPTS} failed: {e}")
            if attempt < RECORD_ATTEMPTS:
                time.sleep(RECORD_BACKOFF * attempt)
            continue
        _log(f"recorded jediTaskID={jedi_task_id} on ProdTask {task_name}")
        return True
    _log(f"WARNING: submitted (jediTaskID={jedi_task_id}) but record-submission "
         f"POST failed after {RECORD_ATTEMPTS} attempts: {last_err}")
    return False


def main(argv=None):
    """Submit one ProdTask's EVGEN spec to PanDA and record the jediTaskID in PCS.

    Args:
        argv: optional argument list used instead of ``sys.argv[1:]``.

    Returns the process exit code:
        0  submitted and recorded (jediTaskID printed on stdout)
        2  configuration error (no monitor URL, proxy file missing)
        3  spec fetch/parse/validation, sandbox or runner assembly failed
        4  submission run timed out
        5  submission shell could not start, or the kernel exited non-zero
        6  kernel succeeded but printed no jediTaskID
        7  submitted but the record-submission POST failed (jediTaskID still printed)
    """
    args = _parse_args(argv)

    if not args.swf_monitor_url:
        _log("ERROR: no --swf-monitor-url / SWF_MONITOR_URL")
        return 2
    if not args.proxy or not os.path.isfile(args.proxy):
        _log(f"ERROR: JLab x509 proxy not found (--proxy / EVGEN_X509_PROXY): {args.proxy!r}")
        return 2

    # 1. Fetch the spec from the monitor (PCS stays the source of truth).
    spec = _fetch_spec(args)
    if spec is None:
        return 3
    _log(f"EVGEN spec for {args.task_name}: outDS={spec['outDS']} "
         f"nJobs={spec.get('nJobs')} skipScout={spec.get('skipScout')}")

    # 2. Assemble the submission dir and the runner script.
    try:
        workdir = _assemble_sandbox(spec, args.proxy, SUBMIT_TMP_ROOT)
    except Exception as e:
        _log(f"ERROR: could not assemble sandbox: {e}")
        return 3
    _log(f"sandbox: {workdir}")
    try:
        runner = _write_runner(workdir, args.pclient_setup, args.auth_vo)
    except OSError as e:
        _log(f"ERROR: could not write submission runner: {e}")
        return 3

    # 3. Run the kernel inside the panda-client environment.
    try:
        # errors="replace": undecodable kernel output must not crash the doer,
        # possibly after PanDA has already accepted the task.
        p = subprocess.run(["bash", runner], cwd=workdir, capture_output=True,
                           text=True, errors="replace", timeout=args.timeout)
    except subprocess.TimeoutExpired:
        _log(f"ERROR: submission timed out after {args.timeout}s")
        return 4
    except OSError as e:
        _log(f"ERROR: could not start submission shell: {e}")
        return 5
    out = (p.stdout or "") + (p.stderr or "")
    for line in out.splitlines():
        _log(f" evgen-submit: {line}")
    if p.returncode != 0:
        _log(f"ERROR: kernel exited rc={p.returncode}")
        return 5

    # 4. Extract the JEDI task id and record it on the ProdTask.
    m = JEDITASKID_RE.search(out)
    if not m:
        _log("ERROR: submission succeeded but no jediTaskID in output")
        return 6
    jedi_task_id = int(m.group(1))
    _log(f"SUBMITTED {args.task_name} -> jediTaskID={jedi_task_id}")

    if not _record_submission(args.swf_monitor_url, args.task_name, jedi_task_id,
                              args.token, args.owner):
        print(jedi_task_id)
        return 7

    # 5. Best-effort inventory sync, deliberately OUTSIDE the record retry loop:
    # an error here must neither re-send the record POST nor change the exit code.
    try:
        _sync_expected_inventory(args.task_name, os.path.join(workdir, "spec.json"))
    except Exception as e:
        _log(f"WARNING: expected inventory sync raised: {e}")
    print(jedi_task_id)
    return 0
0264
0265
if __name__ == "__main__":
    # Run as a script: main()'s return value becomes the process exit status.
    raise SystemExit(main())