Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-26 08:40:22

0001 #!/usr/bin/env python3
0002 """
0003 Fetch and cache the payload log for one PanDA job.
0004 
0005 Resolves the job's Rucio log tarball (`*.log.tgz`) to a replica PFN, copies it
0006 with `xrdcp` using the long-lived proxy, and extracts the log members into the
0007 managed scratch cache so the swf-monitor job view can serve them as text.
0008 
0009 Runs as the account that can read the Rucio proxy and run xrdcp (wenauseic) —
0010 the web view only ever reads the world-readable output, never the proxy. The
0011 view/drain-worker invokes this with the log DID it already knows from
0012 `study_job` (scope + lfn + jeditaskid + pandaid).
0013 
0014 See docs/EPICPROD_OPS.md.
0015 
0016 Config (env, with defaults matching the bots / rucio MCP):
0017   RUCIO_URL, RUCIO_ACCOUNT, RUCIO_VO, X509_USER_PROXY,
0018   REQUESTS_CA_BUNDLE (TLS verify), SWF_TMP_DIR
0019 """
0020 import argparse
0021 import json
0022 import os
0023 import shutil
0024 import subprocess
0025 import sys
0026 import tarfile
0027 
# Configuration read from the environment, with site defaults.
RUCIO_URL = os.environ.get("RUCIO_URL", "https://nprucio01.sdcc.bnl.gov:443")
RUCIO_ACCOUNT = os.environ.get("RUCIO_ACCOUNT", "panda")
RUCIO_VO = os.environ.get("RUCIO_VO", "eic")
X509_PROXY = os.environ.get("X509_USER_PROXY", "/data/wenauseic/longproxy-for-rucio")
# Value handed to requests' `verify=`: a CA bundle path when one is configured
# (REQUESTS_CA_BUNDLE first, then SSL_CERT_FILE), otherwise plain True.
CA_BUNDLE = (
    os.environ.get("REQUESTS_CA_BUNDLE")
    or os.environ.get("SSL_CERT_FILE")
    or True
)
SWF_TMP_DIR = os.environ.get("SWF_TMP_DIR", "/data/swf-tmp")
# Timeout in seconds for one xrdcp transfer (override with env XRDCP_TIMEOUT).
XRDCP_TIMEOUT = int(os.environ.get("XRDCP_TIMEOUT", "120"))

# Basenames of the tarball members that are copied into the cache for the
# operator-facing job view.
KEEP = {
    "payload.stdout",
    "payload.stderr",
    "pilotlog.txt",
    "pandatracerlog.txt",
}

# Sentinel file written last into a job's cache dir. Cache-hit / skip checks
# look for this file, never for an individual member, because a log may
# legitimately lack e.g. payload.stdout.
DONE_MARKER = ".done"
0042 
0043 
def log(msg):
    """Print *msg* as a single line on stderr, flushing right away so that
    progress is visible even when stderr is piped to a parent process."""
    print(msg, end="\n", file=sys.stderr, flush=True)
0046 
0047 
def fail(msg, code=1):
    """Log *msg* prefixed with ``ERROR:`` and terminate with exit status *code*.

    Never returns normally: sys.exit() raises SystemExit(code).
    """
    log("ERROR: {}".format(msg))
    sys.exit(code)
0051 
0052 
def _rucio_token():
    """Authenticate against Rucio's ``/auth/x509`` endpoint with the proxy and
    return the ``X-Rucio-Auth-Token`` response header.

    HTTP error statuses propagate via raise_for_status(); a response without
    the token header exits via fail().
    """
    import requests
    hdrs = {"X-Rucio-Account": RUCIO_ACCOUNT}
    if RUCIO_VO:  # only send the VO header when a VO is configured
        hdrs["X-Rucio-VO"] = RUCIO_VO
    resp = requests.get(
        f"{RUCIO_URL}/auth/x509",
        headers=hdrs,
        # The proxy file is passed as both the client certificate and its key.
        cert=(X509_PROXY, X509_PROXY),
        verify=CA_BUNDLE,
        timeout=20,
    )
    resp.raise_for_status()
    token = resp.headers.get("X-Rucio-Auth-Token")
    if not token:
        fail("no X-Rucio-Auth-Token in /auth/x509 response")
    return token
0065 
0066 
def resolve_pfn(scope, name):
    """Look up the replicas of ``scope:name`` in Rucio and return one PFN.

    The response is read as newline-delimited JSON records. Within a record an
    ``root://`` (xrootd) PFN is preferred; a non-xrootd PFN is only used when
    nothing has been chosen yet. The states of the last record are logged
    alongside the chosen PFN. Exits via fail() if no PFN is found.
    """
    import requests
    req_headers = {
        "X-Rucio-Auth-Token": _rucio_token(),
        "Content-Type": "application/json",
        "Accept": "application/x-json-stream",
    }
    resp = requests.post(
        f"{RUCIO_URL}/replicas/list",
        headers=req_headers,
        json={"dids": [{"scope": scope, "name": name}]},
        verify=CA_BUNDLE,
        timeout=60,
    )
    resp.raise_for_status()

    chosen, state = None, None
    for raw in resp.text.splitlines():
        raw = raw.strip()
        if not raw:
            continue
        record = json.loads(raw)
        pfns = record.get("pfns") or {}
        xrootd = next((url for url in pfns if url.startswith("root://")), None)
        if xrootd is not None:
            chosen = xrootd            # xrootd door wins whenever present
        elif not chosen and pfns:
            chosen = next(iter(pfns))  # otherwise fall back to the first PFN
        states = record.get("states") or {}
        state = ",".join(f"{rse}={st}" for rse, st in states.items())

    if not chosen:
        fail(f"no replica PFN for {scope}:{name}")
    log(f"replica: {chosen}  [{state}]")
    return chosen
0097 
0098 
def xrdcp(pfn, dest):
    """Copy the remote file *pfn* to the local path *dest* using ``xrdcp``.

    The child process gets X509_USER_PROXY pointed at the configured proxy.
    ``-f`` overwrites an existing *dest*; ``--nopbar`` suppresses the progress
    bar. The transfer is limited to XRDCP_TIMEOUT seconds.

    Exits via fail() if xrdcp cannot be started (e.g. not on PATH), times
    out, or returns a non-zero status.
    """
    env = dict(os.environ, X509_USER_PROXY=X509_PROXY)
    cmd = ["xrdcp", "-f", "--nopbar", pfn, dest]
    log(f"xrdcp -> {dest}")
    try:
        # errors="replace": undecodable bytes in xrdcp's output must not turn
        # a transfer failure into a UnicodeDecodeError traceback.
        p = subprocess.run(cmd, env=env, capture_output=True, text=True,
                           errors="replace", timeout=XRDCP_TIMEOUT)
    except subprocess.TimeoutExpired:
        fail(f"xrdcp timed out after {XRDCP_TIMEOUT}s")
    except OSError as e:
        # Binary missing or not executable for this account.
        fail(f"cannot run xrdcp: {e}")
    if p.returncode != 0:
        # Some errors are reported on stdout only; don't emit an empty reason.
        detail = p.stderr.strip() or p.stdout.strip()
        fail(f"xrdcp failed (rc={p.returncode}): {detail}")
0110 
0111 
def extract(tgz, jobdir):
    """Extract the KEEP members of the gzipped tarball *tgz* into *jobdir*.

    Only regular-file members whose basename is in KEEP are written, and
    always under that basename alone, so member paths cannot escape the cache
    dir. If a basename occurs more than once in the tarball, the last
    occurrence's content is kept, and the name is recorded once.

    Output is staged in ``<jobdir>.partial``. The DONE_MARKER sentinel, which
    lists the kept members, is written last. The staging dir then replaces
    *jobdir* by rename. Member files and the marker are chmod 0644 and the dir
    02775, so the web view can read them.

    Exits via fail(), after removing the staging dir, if the tarball cannot be
    read or written out, or if it contains none of the KEEP members.
    """
    import zlib  # for zlib.error raised by corrupt gzip data mid-read

    tmp = jobdir + ".partial"
    shutil.rmtree(tmp, ignore_errors=True)
    os.makedirs(tmp, exist_ok=True)
    kept = set()  # set: a repeated basename is recorded only once
    try:
        with tarfile.open(tgz, "r:gz") as tf:
            for m in tf.getmembers():
                if not m.isfile():
                    continue
                base = os.path.basename(m.name)
                if base not in KEEP:
                    continue
                src = tf.extractfile(m)
                if src is None:
                    continue
                out = os.path.join(tmp, base)
                with open(out, "wb") as f:
                    shutil.copyfileobj(src, f)
                os.chmod(out, 0o644)
                kept.add(base)
    except (tarfile.TarError, EOFError, OSError, zlib.error) as e:
        # Corrupt/truncated tarball or a local write error: don't leave a
        # half-populated staging dir behind.
        shutil.rmtree(tmp, ignore_errors=True)
        fail(f"cannot extract {tgz}: {e}")
    if not kept:
        shutil.rmtree(tmp, ignore_errors=True)
        fail("tarball contained none of the expected log members")
    # Write the success sentinel last, before the rename, so a cache dir is
    # only ever "hit" once fully populated. Records the kept members.
    marker = os.path.join(tmp, DONE_MARKER)
    with open(marker, "w") as f:
        f.write(",".join(sorted(kept)) + "\n")
    os.chmod(marker, 0o644)  # same readability as the members, regardless of umask
    os.chmod(tmp, 0o2775)
    shutil.rmtree(jobdir, ignore_errors=True)
    os.rename(tmp, jobdir)
    log(f"cached: {jobdir}  ({', '.join(sorted(kept))})")
0145 
0146 
def _require_numeric_id(label, value):
    """Exit via fail() unless *value* is a non-empty string of ASCII digits."""
    if not value or any(c not in "0123456789" for c in value):
        fail(f"--{label} must be a numeric id, got {value!r}")


def main():
    """CLI entry point: fetch and cache one PanDA job's log tarball members.

    The cache dir is SWF_TMP_DIR/panda-logs/<jeditaskid>/<pandaid>. It counts
    as already cached when it contains DONE_MARKER, unless --force is given.
    The tarball is downloaded into SWF_TMP_DIR/downloads and removed
    afterwards, whether or not extraction succeeded.
    """
    ap = argparse.ArgumentParser(description="Fetch + cache a PanDA job payload log.")
    ap.add_argument("--scope", required=True)
    ap.add_argument("--lfn", required=True, help="log tarball LFN (…log.tgz)")
    ap.add_argument("--jeditaskid", required=True)
    ap.add_argument("--pandaid", required=True)
    ap.add_argument("--force", action="store_true", help="re-fetch even if cached")
    a = ap.parse_args()

    # Both ids become path components of jobdir, which extract() rmtree()s
    # and replaces. os.path.join discards everything before an absolute part,
    # so accept digits only to keep jobdir under SWF_TMP_DIR.
    _require_numeric_id("jeditaskid", a.jeditaskid)
    _require_numeric_id("pandaid", a.pandaid)

    jobdir = os.path.join(SWF_TMP_DIR, "panda-logs", str(a.jeditaskid), str(a.pandaid))
    if os.path.exists(os.path.join(jobdir, DONE_MARKER)) and not a.force:
        log(f"already cached: {jobdir}")
        return
    if not os.path.exists(X509_PROXY):
        fail(f"x509 proxy not found: {X509_PROXY}")

    pfn = resolve_pfn(a.scope, a.lfn)
    dldir = os.path.join(SWF_TMP_DIR, "downloads")
    os.makedirs(dldir, exist_ok=True)
    # Name the download after the LFN's final component only, so an LFN with
    # "/" or ".." cannot place (and later remove) a file outside dldir.
    tgz = os.path.join(dldir, f"{os.path.basename(a.lfn)}.{a.pandaid}")
    try:
        xrdcp(pfn, tgz)
        os.makedirs(os.path.dirname(jobdir), exist_ok=True)
        extract(tgz, jobdir)
    finally:
        # Best-effort cleanup; the file may not exist if xrdcp failed early.
        try:
            os.remove(tgz)
        except OSError:
            pass


if __name__ == "__main__":
    main()