File indexing completed on 2026-06-26 08:40:22
0001
0002 """
0003 Fetch and cache the payload log for one PanDA job.
0004
0005 Resolves the job's Rucio log tarball (`*.log.tgz`) to a replica PFN, copies it
0006 with `xrdcp` using the long-lived proxy, and extracts the log members into the
0007 managed scratch cache so the swf-monitor job view can serve them as text.
0008
0009 Runs as the account that can read the Rucio proxy and run xrdcp (wenauseic) —
0010 the web view only ever reads the world-readable output, never the proxy. The
0011 view/drain-worker invokes this with the log DID it already knows from
0012 `study_job` (scope + lfn + jeditaskid + pandaid).
0013
0014 See docs/EPICPROD_OPS.md.
0015
Config (env, with defaults matching the bots / rucio MCP):
    RUCIO_URL, RUCIO_ACCOUNT, RUCIO_VO, X509_USER_PROXY,
    REQUESTS_CA_BUNDLE, else SSL_CERT_FILE (TLS verify; requests' default if neither is set),
    SWF_TMP_DIR, XRDCP_TIMEOUT (seconds per xrdcp transfer, default 120)
0019 """
0020 import argparse
0021 import json
0022 import os
0023 import shutil
0024 import subprocess
0025 import sys
0026 import tarfile
0027
# Rucio server and identity used for token auth and replica lookup.
RUCIO_URL = os.getenv("RUCIO_URL", "https://nprucio01.sdcc.bnl.gov:443")
RUCIO_ACCOUNT = os.getenv("RUCIO_ACCOUNT", "panda")
RUCIO_VO = os.getenv("RUCIO_VO", "eic")
# Proxy file: presented as client cert+key to Rucio and exported to xrdcp
# as X509_USER_PROXY.
X509_PROXY = os.getenv("X509_USER_PROXY", "/data/wenauseic/longproxy-for-rucio")
# `verify=` for requests: an explicit CA bundle path when one is configured,
# otherwise True (requests' default certificate verification).
CA_BUNDLE = os.getenv("REQUESTS_CA_BUNDLE") or os.getenv("SSL_CERT_FILE") or True
# Root of the scratch area; panda-logs/ and downloads/ are created below it.
SWF_TMP_DIR = os.getenv("SWF_TMP_DIR", "/data/swf-tmp")
# Upper bound, in seconds, for a single xrdcp transfer.
XRDCP_TIMEOUT = int(os.getenv("XRDCP_TIMEOUT", "120"))

# Basenames of tarball members that are copied into the cache.
KEEP = set((
    "payload.stdout",
    "payload.stderr",
    "pilotlog.txt",
    "pandatracerlog.txt",
))

# Written last into a job's cache dir; its presence means the cache is complete.
DONE_MARKER = ".done"
0042
0043
def log(msg):
    """Print *msg* as a single line on stderr and flush it immediately.

    Diagnostics go to stderr only; nothing is written to stdout.
    """
    stream = sys.stderr
    print(msg, file=stream, flush=True)
0046
0047
def fail(msg, code=1):
    """Report a fatal error and terminate the process.

    Logs ``ERROR: <msg>`` through log() and raises SystemExit(*code*)
    (default 1), which is exactly what sys.exit() does. Callers'
    ``finally`` blocks therefore still run.
    """
    log(f"ERROR: {msg}")
    raise SystemExit(code)
0051
0052
def _rucio_token():
    """Fetch a Rucio auth token using X.509 proxy authentication.

    GETs ``{RUCIO_URL}/auth/x509``, presenting X509_PROXY as both the client
    certificate and key. The request sends the account header and, when
    RUCIO_VO is non-empty, the VO header.

    Returns the value of the ``X-Rucio-Auth-Token`` response header.
    Raises requests.HTTPError on a non-2xx reply; exits via fail() when the
    header is absent or empty.
    """
    import requests

    auth_headers = {"X-Rucio-Account": RUCIO_ACCOUNT}
    if RUCIO_VO:
        auth_headers["X-Rucio-VO"] = RUCIO_VO
    reply = requests.get(
        f"{RUCIO_URL}/auth/x509",
        headers=auth_headers,
        cert=(X509_PROXY, X509_PROXY),  # proxy file holds cert and key
        verify=CA_BUNDLE,
        timeout=20,
    )
    reply.raise_for_status()

    token = reply.headers.get("X-Rucio-Auth-Token")
    if not token:
        fail("no X-Rucio-Auth-Token in /auth/x509 response")
    return token
0065
0066
def resolve_pfn(scope, name):
    """Return a replica PFN for the Rucio DID ``scope:name``.

    POSTs the DID to ``{RUCIO_URL}/replicas/list``, authenticated with a
    fresh token from _rucio_token(), and walks the newline-delimited JSON
    reply one record at a time. A ``root://`` PFN is preferred; otherwise the
    first PFN listed is used. The chosen PFN is logged together with the
    last record's ``states`` mapping, rendered as ``key=value`` pairs.

    Raises requests.HTTPError on a non-2xx reply; exits via fail() when no
    record carries any PFN.
    """
    import requests

    token = _rucio_token()
    reply = requests.post(
        f"{RUCIO_URL}/replicas/list",
        headers={
            "X-Rucio-Auth-Token": token,
            "Content-Type": "application/json",
            "Accept": "application/x-json-stream",
        },
        json={"dids": [{"scope": scope, "name": name}]},
        verify=CA_BUNDLE,
        timeout=60,
    )
    reply.raise_for_status()

    chosen = None
    state = None
    for raw in reply.text.splitlines():
        raw = raw.strip()
        if not raw:
            continue  # blank separator lines carry no record
        record = json.loads(raw)
        urls = record.get("pfns") or {}
        xrootd_url = next((u for u in urls if u.startswith("root://")), None)
        if xrootd_url is not None:
            chosen = xrootd_url
        elif not chosen and urls:
            chosen = next(iter(urls))  # no root:// PFN: take the first one
        states = record.get("states") or {}
        state = ",".join(f"{k}={v}" for k, v in states.items())

    if not chosen:
        fail(f"no replica PFN for {scope}:{name}")
    log(f"replica: {chosen} [{state}]")
    return chosen
0097
0098
def xrdcp(pfn, dest):
    """Copy the replica at *pfn* to the local path *dest* using ``xrdcp``.

    Runs ``xrdcp -f --nopbar`` (overwrite *dest*, no progress bar) with
    X509_USER_PROXY set to the configured proxy. The transfer is bounded by
    XRDCP_TIMEOUT seconds.

    Exits via fail() when the transfer times out, when xrdcp exits non-zero,
    or when the executable cannot be started (e.g. not on PATH). Every failure
    therefore uses the script's normal ERROR path instead of a traceback.
    """
    env = dict(os.environ, X509_USER_PROXY=X509_PROXY)
    log(f"xrdcp -> {dest}")
    try:
        proc = subprocess.run(
            ["xrdcp", "-f", "--nopbar", pfn, dest],
            env=env,
            capture_output=True,
            text=True,
            timeout=XRDCP_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        fail(f"xrdcp timed out after {XRDCP_TIMEOUT}s")
    except OSError as exc:
        # Missing or non-executable binary raises instead of returning an rc.
        fail(f"could not run xrdcp: {exc}")
    if proc.returncode != 0:
        # Prefer stderr, but don't report an empty reason if xrdcp wrote to stdout.
        detail = proc.stderr.strip() or proc.stdout.strip()
        fail(f"xrdcp failed (rc={proc.returncode}): {detail}")
0110
0111
def extract(tgz, jobdir):
    """Extract the kept log members of *tgz* into *jobdir*.

    Only regular-file members whose basename is in KEEP are written, and each
    is written under that basename alone, so member paths cannot escape the
    cache directory (no path traversal). Output first goes into
    ``<jobdir>.partial``. DONE_MARKER, listing the kept names, is written
    last, and the directory is then renamed to *jobdir*. Member files and the
    marker are chmod 0644 and the directory 02775, so the output is
    world-readable.

    The rename itself is atomic, but any existing *jobdir* is removed just
    before it. A reader can therefore briefly see no directory during a
    refresh. If a basename occurs more than once in the archive, the last
    copy wins and the name is listed once.

    Exits via fail() when the archive cannot be read or extracted, or when it
    contains none of the KEEP members. The partial directory is removed first
    in both cases.
    """
    import zlib

    tmp = jobdir + ".partial"
    shutil.rmtree(tmp, ignore_errors=True)
    os.makedirs(tmp, exist_ok=True)
    kept = set()  # a set, so duplicate basenames are not listed twice
    try:
        # "r:*" = transparent compression; reads the usual gzip tarball too.
        with tarfile.open(tgz, "r:*") as tf:
            for member in tf.getmembers():
                if not member.isfile():
                    continue
                base = os.path.basename(member.name)
                if base not in KEEP:
                    continue
                src = tf.extractfile(member)
                if src is None:
                    continue
                out = os.path.join(tmp, base)
                with src, open(out, "wb") as dst:
                    shutil.copyfileobj(src, dst)
                os.chmod(out, 0o644)
                kept.add(base)
    except (tarfile.TarError, EOFError, zlib.error, OSError) as exc:
        # Corrupt/truncated archive or a local write failure: report it via
        # the normal ERROR path and don't leave a half-filled .partial dir.
        shutil.rmtree(tmp, ignore_errors=True)
        fail(f"could not extract {tgz}: {exc}")
    if not kept:
        shutil.rmtree(tmp, ignore_errors=True)
        fail("tarball contained none of the expected log members")

    names = sorted(kept)
    marker = os.path.join(tmp, DONE_MARKER)
    with open(marker, "w") as f:
        f.write(",".join(names) + "\n")
    os.chmod(marker, 0o644)  # the marker is part of the readable output too
    os.chmod(tmp, 0o2775)
    shutil.rmtree(jobdir, ignore_errors=True)
    os.rename(tmp, jobdir)
    log(f"cached: {jobdir} ({', '.join(names)})")
0145
0146
def main():
    """Command-line entry point: fetch and cache one PanDA job's payload log.

    The cache lives in ``$SWF_TMP_DIR/panda-logs/<jeditaskid>/<pandaid>/``.
    When that directory already holds DONE_MARKER, the run is a no-op unless
    ``--force`` is given. Otherwise the steps are:

    1. Resolve the log tarball's replica PFN in Rucio.
    2. Copy it into ``$SWF_TMP_DIR/downloads/`` with xrdcp.
    3. Extract the kept members into the cache.

    The downloaded tarball is removed afterwards, including on failure.
    """
    ap = argparse.ArgumentParser(description="Fetch + cache a PanDA job payload log.")
    ap.add_argument("--scope", required=True)
    ap.add_argument("--lfn", required=True, help="log tarball LFN (…log.tgz)")
    # IDs are parsed as integers because they become path components of
    # jobdir, which extract() rmtree()s. A value such as ".." would otherwise
    # aim that rmtree at a parent cache directory.
    ap.add_argument("--jeditaskid", required=True, type=int)
    ap.add_argument("--pandaid", required=True, type=int)
    ap.add_argument("--force", action="store_true", help="re-fetch even if cached")
    a = ap.parse_args()

    jobdir = os.path.join(SWF_TMP_DIR, "panda-logs", str(a.jeditaskid), str(a.pandaid))
    if os.path.exists(os.path.join(jobdir, DONE_MARKER)) and not a.force:
        log(f"already cached: {jobdir}")
        return
    if not os.path.exists(X509_PROXY):
        fail(f"x509 proxy not found: {X509_PROXY}")

    pfn = resolve_pfn(a.scope, a.lfn)
    dldir = os.path.join(SWF_TMP_DIR, "downloads")
    os.makedirs(dldir, exist_ok=True)
    # Only the LFN's last component names the local file. With a '/' in the
    # LFN the download would target a missing subdirectory. With an absolute
    # LFN, os.path.join would drop dldir entirely, and the file would be
    # overwritten by xrdcp -f and then deleted outside the scratch area.
    tgz = os.path.join(dldir, f"{os.path.basename(a.lfn)}.{a.pandaid}")
    try:
        xrdcp(pfn, tgz)
        os.makedirs(os.path.dirname(jobdir), exist_ok=True)
        extract(tgz, jobdir)
    finally:
        # Best-effort cleanup; the download may not exist if xrdcp failed early.
        try:
            os.remove(tgz)
        except OSError:
            pass


if __name__ == "__main__":
    main()