Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-26 08:40:22

0001 #!/usr/bin/env python3
0002 """
0003 Cleaner-killer for the ePIC production-operations agent (a system singleton).
0004 
0005 The prod-ops agent subscribes to the anycast queue /queue/epicprod.ops, so a
0006 second instance anywhere would *steal* requests. systemd keeps exactly one
0007 instance under its own management, but it cannot see a process that is alive yet
0008 wedged, nor a stray agent started outside the unit. This cron job closes both
0009 gaps. Run order matters:
0010 
0011   1. Reap duplicates — keep only the systemd-managed instance: host-gated
0012      SIGKILL of any other live PRODOPS agent, found by its registry-saved
0013      pid+hostname (the same source the MCP swf_kill_agent uses — that is the
0014      interactive, per-dev shape; this is the autonomous, system-singleton shape).
0015      Never a process-name match.
0016   2. Liveness — with duplicates gone, ping the one remaining agent over the bus
0017      (health_ping -> pong). For a messaging service the message path *is* the
0018      health, so a process that is alive but not consuming still fails this. No
0019      pong -> restart the unit (a failed restart is an alarm).
0020 
0021 Optional --prune-days N removes cached job logs older than N days (reclaimable;
0022 a miss just re-fetches).
0023 
0024 Standalone, no Django. MQ config from the same env vars BaseAgent uses
0025 (ACTIVEMQ_*). Intended for cron. See docs/EPICPROD_OPS.md.
0026 """
0027 import argparse
0028 import json
0029 import logging
0030 import os
0031 import shutil
0032 import signal
0033 import socket
0034 import subprocess
0035 import sys
0036 import time
0037 from datetime import datetime, timezone
0038 
0039 import requests
0040 
# Every knob below is read from the environment so cron/systemd can override it;
# the second argument to os.getenv is the default used when the variable is unset.
UNIT = os.getenv("EPICPROD_OPS_UNIT", "epicprod-ops-agent")          # systemd unit name
OPS_QUEUE = os.getenv("EPICPROD_OPS_QUEUE", "/queue/epicprod.ops")    # anycast ops queue
SWF_TMP_DIR = os.getenv("SWF_TMP_DIR", "/data/swf-tmp")                # parent of panda-logs/
PING_TIMEOUT = int(os.getenv("EPICPROD_PING_TIMEOUT", "10"))           # seconds to await a pong

# Monitor registry: the authoritative record of each agent's saved pid+hostname.
# SWF_MONITOR_URL (set in production.env) already includes the /swf-monitor app
# path, so it is used as-is; only a trailing slash is trimmed.
MONITOR_URL = os.getenv("SWF_MONITOR_URL", "").rstrip("/")
API_TOKEN = os.getenv("SWF_API_TOKEN", "")
# Passed as requests' `verify=`: an explicit CA bundle path when one is set,
# otherwise True (requests' default certificate verification).
CA_BUNDLE = os.getenv("REQUESTS_CA_BUNDLE") or True
# A registry heartbeat older than this many seconds marks the agent as stale.
HEARTBEAT_FRESH_SEC = int(os.getenv("EPICPROD_HEARTBEAT_FRESH_SEC", "120"))

# Exit status the agent uses for a deliberate bus 'shutdown'. It must match
# EXIT_DELIBERATE in epicprod_ops_agent.py and RestartPreventExitStatus in the
# unit file: a unit whose last main exit was this is down on purpose and is not
# restarted.
EXIT_DELIBERATE = int(os.getenv("EPICPROD_EXIT_DELIBERATE", "100"))

log = logging.getLogger("prodops-cleaner-killer")
0058 
0059 
0060 def _run(cmd):
0061     return subprocess.run(cmd, capture_output=True, text=True)
0062 
0063 
0064 # -- 1. reap duplicates ------------------------------------------------------
0065 
def systemd_main_pid():
    """Return the MainPID systemd reports for UNIT, or None.

    None covers three cases: `systemctl show` exited non-zero (logged as an
    error), its output was not an integer, or the reported pid is 0 (the unit
    has no running main process).
    """
    res = _run(["systemctl", "show", "-p", "MainPID", "--value", UNIT])
    if res.returncode:
        log.error(f"systemctl show '{UNIT}' failed rc={res.returncode}: {res.stderr.strip()}")
        return None
    raw = res.stdout.strip()
    try:
        value = int(raw)
    except ValueError:
        return None
    return value if value else None
0077 
0078 
0079 def _heartbeat_fresh(ts):
0080     """True if the registry heartbeat is recent enough that the process is really
0081     alive — so we never SIGKILL a pid that a dead agent left behind (and the OS
0082     may have reused)."""
0083     if not ts:
0084         return False
0085     try:
0086         t = datetime.fromisoformat(ts)
0087     except ValueError:
0088         return False
0089     if t.tzinfo is None:
0090         t = t.replace(tzinfo=timezone.utc)
0091     return (datetime.now(timezone.utc) - t).total_seconds() <= HEARTBEAT_FRESH_SEC
0092 
0093 
def registry_prodops():
    """Return the PRODOPS agent records from the monitor registry.

    Each record carries the pid+hostname the agent saved to the monitor on
    registration. Every page of a paginated response is read; see the note on
    `next` below.

    Returns:
        A list of dict records whose agent_type is "PRODOPS", or None when
        SWF_MONITOR_URL / SWF_API_TOKEN are not configured (logged).

    Raises:
        Whatever requests raises for transport errors, raise_for_status() for
        HTTP errors, or .json() for an unparsable body. The caller catches these.
    """
    from urllib.parse import urlsplit

    if not MONITOR_URL or not API_TOKEN:
        log.error("reap: SWF_MONITOR_URL / SWF_API_TOKEN not set — cannot read registry")
        return None
    base = f"{MONITOR_URL}/api/systemagents/"
    url = base
    seen = set()  # guards against a server that keeps returning the same page
    items = []
    while url and url not in seen:
        seen.add(url)
        r = requests.get(url,
                         headers={"Authorization": f"Token {API_TOKEN}"},
                         verify=CA_BUNDLE, timeout=20)
        r.raise_for_status()
        data = r.json()
        if isinstance(data, dict) and "results" in data:
            # Paginated envelope: reading only the first page would silently
            # miss PRODOPS records listed on later pages.
            items.extend(data["results"] or [])
            nxt = data.get("next")
            # Reuse only the query string of `next`, applied to our own base URL.
            # The token must never be sent to whatever host/scheme an absolute
            # `next` link happens to name.
            query = urlsplit(nxt).query if nxt else ""
            url = f"{base}?{query}" if query else None
        else:
            # Unpaginated list, or a lone object (wrapped so it is not iterated
            # as dict keys).
            items.extend(data if isinstance(data, list) else [data])
            url = None
    return [a for a in items if isinstance(a, dict) and a.get("agent_type") == "PRODOPS"]
0107 
0108 
def _as_pid(value):
    """Coerce a registry pid value to an int > 1, or return None if unusable.

    The pid arrives via JSON, so a string such as "1234" must compare equal to
    the int MainPID. Values <= 1 are rejected because os.kill(0, ...) and
    os.kill(-1, ...) signal a process group / every permitted process, and pid 1
    is init.
    """
    try:
        pid = int(value)
    except (TypeError, ValueError):
        return None
    return pid if pid > 1 else None


def reap_duplicates():
    """Keep only the systemd-managed prod-ops instance alive.

    Duplicates are found by the pid+hostname each PRODOPS agent saved to the
    monitor registry (what swf_kill_agent uses), never by process-name match.
    A registry record is SIGKILLed only if all of the following hold:
      * it has a pid and is not marked EXITED,
      * its heartbeat is fresh (so the pid is not a leftover the OS reused),
      * its pid is usable (an int > 1),
      * its pid is neither the unit's MainPID nor this process,
      * its hostname equals this host. A pid means nothing on another host, so
        live remote duplicates are only reported (ALARM).
    Nothing is reaped when the MainPID is unknown or the registry cannot be
    read. Kills that fail for lack of permission are reported as an ALARM.
    """
    main = systemd_main_pid()
    if main is None:
        log.warning(f"unit '{UNIT}' MainPID unknown — not reaping (letting systemd settle)")
        return
    try:
        agents = registry_prodops()
    except Exception as e:
        log.error(f"reap: registry read failed: {e}")
        return
    if agents is None:
        return

    this_host = socket.gethostname()
    me = os.getpid()
    killed = 0
    failed = 0
    for a in agents:
        name = a.get("instance_name")
        raw_pid = a.get("pid")
        if not raw_pid or a.get("operational_state") == "EXITED":
            continue
        if not _heartbeat_fresh(a.get("last_heartbeat")):
            continue                                   # stale: process gone, leave the pid alone
        pid = _as_pid(raw_pid)
        if pid is None:
            log.warning(f"reap: '{name}' has unusable registry pid {raw_pid!r} — skipped")
            continue
        if pid == main or pid == me:
            continue                                   # the systemd-managed instance (or us)
        host = a.get("hostname")
        if host != this_host:
            log.warning(f"reap: live PRODOPS '{name}' on {host} pid={pid} — "
                        f"cannot kill remotely; it may be stealing from the ops queue (ALARM)")
            continue
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            log.info(f"reap: '{name}' pid={pid} already gone")
            continue
        except PermissionError:
            log.error(f"reap: cannot kill pid={pid}: permission denied")
            failed += 1
            continue
        killed += 1
        log.warning(f"REAPED duplicate PRODOPS '{name}' pid={pid} "
                    f"(registry-saved; systemd main={main})")
    if failed:
        # Do not claim the singleton is alone when a duplicate survived.
        log.error(f"reap: {failed} duplicate PRODOPS on {this_host} could not be killed (ALARM)")
    elif not killed:
        log.info(f"reap: systemd main pid {main} is the sole live PRODOPS on {this_host}")
0149 
0150 
0151 # -- 2. liveness -------------------------------------------------------------
0152 
def _mq_ping():
    """Send one health_ping to OPS_QUEUE and wait up to PING_TIMEOUT for a reply.

    Connects with the ACTIVEMQ_* env vars and the connection settings BaseAgent
    uses. It subscribes to a reply queue named after this host and pid, then
    sends a health_ping whose reply_to names that queue. The connection is
    always disconnected before returning.

    Returns:
        True if any message arrived on the reply queue in time; False if none
        did; None if the check could not be made at all (stomp.py missing, or
        connect/subscribe/send failed). A broker-side fault is not evidence the
        agent is wedged, so the caller must not restart on None.
    """
    try:
        import stomp
    except ImportError:
        log.error("stomp.py not available — cannot run liveness check")
        return None
    import threading

    host = os.environ.get("ACTIVEMQ_HOST", "localhost")
    port = int(os.environ.get("ACTIVEMQ_PORT", "61612"))
    user = os.environ.get("ACTIVEMQ_USER", "admin")
    password = os.environ.get("ACTIVEMQ_PASSWORD", "admin")
    use_ssl = os.environ.get("ACTIVEMQ_USE_SSL", "False").lower() == "true"
    ca_certs = os.environ.get("ACTIVEMQ_SSL_CA_CERTS", "")
    reply_q = f"/queue/epicprod.ops.pong.{socket.gethostname()}.{os.getpid()}"

    pong = threading.Event()

    class _Listener(stomp.ConnectionListener):
        def on_message(self, frame):
            pong.set()

    # Mirror BaseAgent's connection footprint.
    conn = stomp.Connection(
        host_and_ports=[(host, port)], vhost=host,
        try_loopback_connect=False, heartbeats=(30000, 30000),
        auto_content_length=False,
    )
    if use_ssl and ca_certs:
        import ssl
        conn.transport.set_ssl(for_hosts=[(host, port)], ca_certs=ca_certs,
                               ssl_version=ssl.PROTOCOL_TLS_CLIENT)
    elif use_ssl:
        log.warning("liveness: ACTIVEMQ_USE_SSL is true but ACTIVEMQ_SSL_CA_CERTS is empty "
                    "— connecting without TLS")
    conn.set_listener("", _Listener())
    try:
        try:
            conn.connect(user, password, wait=True)
            conn.subscribe(destination=reply_q, id="pong", ack="auto")
            conn.send(destination=OPS_QUEUE,
                      body=json.dumps({"msg_type": "health_ping",
                                       "namespace": "prodops", "reply_to": reply_q}))
        except Exception as e:
            log.error(f"liveness: MQ connect/send failed: {e}")
            return None
        # Event.wait returns as soon as the listener sees a reply (no polling).
        return pong.wait(PING_TIMEOUT)
    finally:
        try:
            conn.disconnect()
        except Exception:
            pass  # best effort: the process is about to exit anyway


def _restart_unit():
    """Restart UNIT after a missed pong, unless it is down on purpose.

    If the unit's last main-process exit status equals EXIT_DELIBERATE, the
    singleton is meant to be down and is left stopped. Otherwise the unit's
    failed state is reset and it is restarted via sudo. A failed restart is
    logged as an ALARM.
    """
    st = _run(["systemctl", "show", "-p", "ExecMainStatus", "--value", UNIT])
    if st.stdout.strip() == str(EXIT_DELIBERATE):
        log.info(f"liveness: no pong, but '{UNIT}' exited deliberately "
                 f"(code {EXIT_DELIBERATE}) — leaving it stopped")
        return
    log.warning(f"liveness: no pong in {PING_TIMEOUT}s — restarting unit '{UNIT}'")
    # `sudo -n` never prompts for a password. Cron provides no terminal to
    # answer one, and an interactive run must not hang here. Missing sudo
    # rights then fail at once and surface as the ALARM below.
    # reset-failed first: a unit that tripped the start-limit burst sits in
    # 'failed' and a bare restart is refused until the rate-limit state is cleared.
    rf = _run(["sudo", "-n", "systemctl", "reset-failed", UNIT])
    if rf.returncode != 0:
        log.info(f"liveness: reset-failed of '{UNIT}' rc={rf.returncode}: {rf.stderr.strip()}")
    r = _run(["sudo", "-n", "systemctl", "restart", UNIT])
    if r.returncode != 0:
        log.error(f"ALARM: restart of '{UNIT}' FAILED rc={r.returncode}: {r.stderr.strip()}")
    else:
        log.warning(f"restarted '{UNIT}'")


def liveness():
    """Ping the prod-ops agent over the bus and restart its unit if it is silent.

    For a messaging service the message path is the health check, so a process
    that is alive but not consuming fails it. Nothing is restarted when the
    ping itself could not be performed (stomp.py missing, MQ unreachable).
    """
    ok = _mq_ping()
    if ok is None:
        return
    if ok:
        log.info("liveness: pong received — agent healthy")
        return
    _restart_unit()
0222 
0223 
0224 # -- 3. prune ----------------------------------------------------------------
0225 
def _subdirs(path):
    """List the real (non-symlink) subdirectories of *path*; [] if unlistable (logged)."""
    try:
        with os.scandir(path) as entries:
            return [e.path for e in entries if e.is_dir(follow_symlinks=False)]
    except OSError as e:
        log.error(f"prune: cannot list {path}: {e}")
        return []


def prune(days, root=None):
    """Delete cached job-log directories older than *days* days.

    Layout walked: <root>/<task>/<job>/. Each <job> directory whose own mtime is
    before the cutoff is removed with its contents. The cache is reclaimable: a
    miss just re-fetches the log.

    Symlinks under *root* are never followed or removed. A link planted in the
    cache therefore cannot lead the pruner to delete anything outside it.

    Args:
        days: age threshold in days; must be >= 0. A negative value would put
            the cutoff in the future and wipe every entry, so it is refused
            (logged) and nothing is removed.
        root: cache root; defaults to <SWF_TMP_DIR>/panda-logs.

    Returns:
        The number of job directories actually removed. Failures are logged
        and not counted. Returns 0 when *root* is not a directory.
    """
    if root is None:
        root = os.path.join(SWF_TMP_DIR, "panda-logs")
    if days < 0:
        log.error(f"prune: refusing negative age {days}d — nothing removed")
        return 0
    if not os.path.isdir(root):
        return 0
    cutoff = time.time() - days * 86400
    removed = 0
    for task_dir in _subdirs(root):
        for job_dir in _subdirs(task_dir):
            try:
                if os.lstat(job_dir).st_mtime >= cutoff:
                    continue
                shutil.rmtree(job_dir)
            except OSError as e:
                log.error(f"prune: {job_dir}: {e}")
                continue
            removed += 1
    log.info(f"prune: removed {removed} cached job-log dir(s) older than {days}d")
    return removed
0245 
0246 
def _non_negative_int(text):
    """argparse type for --prune-days: an int >= 0 (a negative age would prune everything)."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"not an integer: {text!r}") from None
    if value < 0:
        raise argparse.ArgumentTypeError(f"must be >= 0, got {value}")
    return value


def main():
    """Cron entry point: reap duplicates, check liveness, optionally prune.

    Steps run in order: reap, then liveness unless --no-liveness, then prune if
    --prune-days is given. An unexpected exception in one step is logged with
    its traceback and does not prevent the later steps from running. If any
    step failed that way, the process exits with status 1 so cron reports it.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s prodops-cleaner-killer %(levelname)s %(message)s",
        stream=sys.stderr,
    )
    ap = argparse.ArgumentParser(
        description="Reap duplicate prod-ops agents and keep the singleton alive.")
    ap.add_argument("--prune-days", type=_non_negative_int, default=None,
                    help="also remove cached job logs older than N days")
    ap.add_argument("--no-liveness", action="store_true",
                    help="skip the MQ ping / restart (reap [+ prune] only)")
    args = ap.parse_args()

    # Order matters: reap first, so that with local duplicates gone the liveness
    # ping tests the remaining instance.
    steps = [("reap", reap_duplicates)]
    if not args.no_liveness:
        steps.append(("liveness", liveness))
    if args.prune_days is not None:
        steps.append(("prune", lambda: prune(args.prune_days)))

    failed = []
    for name, step in steps:
        try:
            step()
        except Exception:
            # Top-level boundary: log with traceback and keep going, so a broken
            # reap cannot silently disable the liveness restart.
            log.exception(f"{name}: unexpected failure")
            failed.append(name)
    if failed:
        sys.exit(1)
0266 
0267 
# Script entry: main() returns None on success, and sys.exit(None) exits 0.
if __name__ == "__main__":
    sys.exit(main())