File indexing completed on 2026-06-26 08:40:22
0001
0002 """
0003 Cleaner-killer for the ePIC production-operations agent (a system singleton).
0004
0005 The prod-ops agent subscribes to the anycast queue /queue/epicprod.ops, so a
0006 second instance anywhere would *steal* requests. systemd keeps exactly one
0007 instance under its own management, but it cannot see a process that is alive yet
0008 wedged, nor a stray agent started outside the unit. This cron job closes both
0009 gaps. Run order matters:
0010
0011 1. Reap duplicates — keep only the systemd-managed instance: host-gated
0012 SIGKILL of any other live PRODOPS agent, found by its registry-saved
0013 pid+hostname (the same source the MCP swf_kill_agent uses — that is the
0014 interactive, per-dev shape; this is the autonomous, system-singleton shape).
0015 Never a process-name match.
0016 2. Liveness — with duplicates gone, ping the one remaining agent over the bus
0017 (health_ping -> pong). For a messaging service the message path *is* the
0018 health, so a process that is alive but not consuming still fails this. No
0019 pong -> restart the unit (a failed restart is an alarm).
0020
0021 Optional --prune-days N removes cached job logs older than N days (reclaimable;
0022 a miss just re-fetches).
0023
0024 Standalone, no Django. MQ config from the same env vars BaseAgent uses
0025 (ACTIVEMQ_*). Intended for cron. See docs/EPICPROD_OPS.md.
0026 """
0027 import argparse
0028 import json
0029 import logging
0030 import os
0031 import shutil
0032 import signal
0033 import socket
0034 import subprocess
0035 import sys
0036 import time
0037 from datetime import datetime, timezone
0038
0039 import requests
0040
# --- systemd unit and message bus -------------------------------------------
UNIT = os.getenv("EPICPROD_OPS_UNIT", "epicprod-ops-agent")
OPS_QUEUE = os.getenv("EPICPROD_OPS_QUEUE", "/queue/epicprod.ops")
# Seconds liveness() waits for a pong before deciding the agent is unhealthy.
PING_TIMEOUT = int(os.getenv("EPICPROD_PING_TIMEOUT", "10"))

# --- job-log cache cleaned by --prune-days ----------------------------------
SWF_TMP_DIR = os.getenv("SWF_TMP_DIR", "/data/swf-tmp")

# --- monitor registry (source of each agent's saved pid + hostname) ---------
MONITOR_URL = os.getenv("SWF_MONITOR_URL", "").rstrip("/")
API_TOKEN = os.getenv("SWF_API_TOKEN", "")
# Explicit CA bundle path when configured, otherwise True (requests' default
# certificate verification).
CA_BUNDLE = os.getenv("REQUESTS_CA_BUNDLE") or True
# Maximum heartbeat age, in seconds, for a registry record to count as live.
HEARTBEAT_FRESH_SEC = int(os.getenv("EPICPROD_HEARTBEAT_FRESH_SEC", "120"))

# ExecMainStatus value that marks a deliberate agent exit; liveness() leaves a
# unit stopped when its last exit carried this code.
EXIT_DELIBERATE = int(os.getenv("EPICPROD_EXIT_DELIBERATE", "100"))

log = logging.getLogger("prodops-cleaner-killer")
0058
0059
0060 def _run(cmd):
0061 return subprocess.run(cmd, capture_output=True, text=True)
0062
0063
0064
0065
def systemd_main_pid():
    """Return the MainPID that systemd reports for UNIT, or None.

    None means one of three things:
      * ``systemctl show`` failed (this is logged);
      * its output was not an integer;
      * the unit has no main process (systemd reports 0).
    """
    res = _run(["systemctl", "show", "-p", "MainPID", "--value", UNIT])
    if res.returncode:
        log.error(f"systemctl show '{UNIT}' failed rc={res.returncode}: {res.stderr.strip()}")
        return None
    raw = res.stdout.strip()
    try:
        main_pid = int(raw)
    except ValueError:
        main_pid = 0
    return main_pid if main_pid else None
0077
0078
0079 def _heartbeat_fresh(ts):
0080 """True if the registry heartbeat is recent enough that the process is really
0081 alive — so we never SIGKILL a pid that a dead agent left behind (and the OS
0082 may have reused)."""
0083 if not ts:
0084 return False
0085 try:
0086 t = datetime.fromisoformat(ts)
0087 except ValueError:
0088 return False
0089 if t.tzinfo is None:
0090 t = t.replace(tzinfo=timezone.utc)
0091 return (datetime.now(timezone.utc) - t).total_seconds() <= HEARTBEAT_FRESH_SEC
0092
0093
def registry_prodops():
    """Return the PRODOPS records from the monitor's agent registry.

    Each record carries the pid + hostname the agent saved to the monitor on
    registration. The list endpoint may answer with either:
      * a paginated object (``results`` plus a ``next`` link), or
      * a plain JSON list.
    Every page is followed. A duplicate that appears only on a later page
    would otherwise never be reaped.

    Returns:
        A list of dict records whose ``agent_type`` is ``"PRODOPS"``, or None
        if SWF_MONITOR_URL / SWF_API_TOKEN are not configured.

    Raises:
        requests.RequestException: transport error or non-2xx response.
        ValueError: the body is not JSON or has an unexpected shape, or the
            pagination links loop.
    """
    if not MONITOR_URL or not API_TOKEN:
        log.error("reap: SWF_MONITOR_URL / SWF_API_TOKEN not set — cannot read registry")
        return None
    from urllib.parse import urlsplit, urlunsplit

    base = urlsplit(MONITOR_URL)
    headers = {"Authorization": f"Token {API_TOKEN}"}
    url = f"{MONITOR_URL}/api/systemagents/"
    seen = set()
    records = []
    while url:
        if url in seen:
            raise ValueError(f"registry pagination loops at {url}")
        seen.add(url)
        r = requests.get(url, headers=headers, verify=CA_BUNDLE, timeout=20)
        r.raise_for_status()
        data = r.json()
        if isinstance(data, list):
            records.extend(data)
            url = None
        elif isinstance(data, dict) and isinstance(data.get("results"), list):
            records.extend(data["results"])
            nxt = data.get("next")
            # Re-anchor the next-page link on MONITOR_URL's scheme and host, so
            # the API token is only ever sent to the configured monitor. This
            # also copes with a link built with the wrong scheme/host (e.g.
            # behind a proxy).
            url = urlunsplit((base.scheme, base.netloc, *urlsplit(nxt)[2:])) if nxt else None
        else:
            raise ValueError(f"unexpected registry response from {url}: {type(data).__name__}")
    return [a for a in records if isinstance(a, dict) and a.get("agent_type") == "PRODOPS"]
0107
0108
def reap_duplicates():
    """Keep only the systemd-managed prod-ops agent alive.

    Duplicates are found by the pid + hostname each agent saved to the
    registry (the same source swf_kill_agent uses), never by a process-name
    match.

    A record is acted on only if it has a pid, is not EXITED and has a fresh
    heartbeat. This keeps a pid left behind by a dead agent (and possibly
    reused by the OS) from being touched. Each remaining record is handled as:
      * on another host        -> cannot be killed from here; logged as ALARM;
      * this host, systemd pid -> the managed singleton; kept;
      * this host, other pid   -> SIGKILL.

    The host gate comes before the MainPID comparison. A pid only means
    something on its own host, so a remote agent that merely shares the local
    MainPID must still raise the alarm rather than be skipped.

    Nothing is killed if the unit's MainPID is unknown or the registry cannot
    be read; both cases are logged.
    """
    main = systemd_main_pid()
    if main is None:
        log.warning(f"unit '{UNIT}' MainPID unknown — not reaping (letting systemd settle)")
        return
    try:
        agents = registry_prodops()
    except Exception as e:
        log.error(f"reap: registry read failed: {e}")
        return
    if agents is None:
        return
    this_host = socket.gethostname()
    me = os.getpid()
    killed = failed = 0
    for a in agents:
        name = a.get("instance_name")
        raw_pid, host = a.get("pid"), a.get("hostname")
        if not raw_pid or a.get("operational_state") == "EXITED":
            continue
        if not _heartbeat_fresh(a.get("last_heartbeat")):
            continue
        if host != this_host:
            log.warning(f"reap: live PRODOPS '{name}' on {host} pid={raw_pid} — "
                        f"cannot kill remotely; it may be stealing from the ops queue (ALARM)")
            continue
        # The registry is external data: coerce the pid before trusting it.
        try:
            pid = int(raw_pid)
        except (TypeError, ValueError):
            log.error(f"reap: '{name}' has non-integer pid {raw_pid!r} — skipping")
            continue
        # kill(0, ...) signals our own process group, kill(-N, ...) process
        # group N and kill(-1, ...) every process we may signal; pid 1 is init.
        # None of these can be a duplicate agent.
        if pid <= 1:
            log.error(f"reap: '{name}' has invalid pid {pid} — skipping")
            continue
        if pid == main:
            continue
        if pid == me:
            # A record naming this cleaner's own pid can only be stale.
            continue
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            log.info(f"reap: '{name}' pid={pid} already gone")
        except PermissionError:
            failed += 1
            log.error(f"reap: cannot kill pid={pid}: permission denied")
        except OSError as e:
            failed += 1
            log.error(f"reap: cannot kill pid={pid}: {e}")
        else:
            killed += 1
            log.warning(f"REAPED duplicate PRODOPS '{name}' pid={pid} "
                        f"(registry-saved; systemd main={main})")
    # Only claim a sole instance when no local kill attempt failed.
    if not killed and not failed:
        log.info(f"reap: systemd main pid {main} is the sole live PRODOPS on {this_host}")
0149
0150
0151
0152
def _ping_ops_agent():
    """Send one health_ping to OPS_QUEUE and wait for any reply.

    Broker settings come from the ACTIVEMQ_* env vars. The reply_to queue is
    private to this run (hostname + pid).

    Returns:
        True if a message arrived on the reply queue within PING_TIMEOUT
        seconds; False if none did; None if the check itself could not run
        (stomp.py missing, or connect/subscribe/send failed). Restarting the
        agent would not fix the None cases, so the caller must not restart.
    """
    try:
        import stomp
    except ImportError:
        log.error("stomp.py not available — cannot run liveness check")
        return None
    import threading

    host = os.environ.get("ACTIVEMQ_HOST", "localhost")
    port = int(os.environ.get("ACTIVEMQ_PORT", "61612"))
    user = os.environ.get("ACTIVEMQ_USER", "admin")
    password = os.environ.get("ACTIVEMQ_PASSWORD", "admin")
    use_ssl = os.environ.get("ACTIVEMQ_USE_SSL", "False").lower() == "true"
    ca_certs = os.environ.get("ACTIVEMQ_SSL_CA_CERTS", "")
    reply_q = f"/queue/epicprod.ops.pong.{socket.gethostname()}.{os.getpid()}"
    if use_ssl and not ca_certs:
        # TLS is only configured when a CA bundle is given; make the fallback
        # to a plaintext connection visible instead of silent.
        log.warning("liveness: ACTIVEMQ_USE_SSL is true but ACTIVEMQ_SSL_CA_CERTS is empty — "
                    "connecting without TLS")

    # on_message is delivered asynchronously, so signal through a thread-safe
    # Event and block on it rather than polling a flag.
    pong = threading.Event()

    class _Listener(stomp.ConnectionListener):
        def on_message(self, frame):
            # Any message on the private reply queue counts as the pong.
            pong.set()

    conn = stomp.Connection(
        host_and_ports=[(host, port)], vhost=host,
        try_loopback_connect=False, heartbeats=(30000, 30000),
        auto_content_length=False,
    )
    if use_ssl and ca_certs:
        import ssl
        conn.transport.set_ssl(for_hosts=[(host, port)], ca_certs=ca_certs,
                               ssl_version=ssl.PROTOCOL_TLS_CLIENT)
    conn.set_listener("", _Listener())
    try:
        try:
            conn.connect(user, password, wait=True)
            conn.subscribe(destination=reply_q, id="pong", ack="auto")
            conn.send(destination=OPS_QUEUE,
                      body=json.dumps({"msg_type": "health_ping",
                                       "namespace": "prodops", "reply_to": reply_q}))
        except Exception as e:
            log.error(f"liveness: MQ connect/send failed: {e}")
            return None
        return pong.wait(PING_TIMEOUT)
    finally:
        # Also runs when subscribe/send failed after a successful connect, so
        # the connection is never left open. Disconnect is best effort.
        try:
            conn.disconnect()
        except Exception:
            pass


def _restart_unit():
    """Restart UNIT after a missed pong, unless its last exit was deliberate."""
    st = _run(["systemctl", "show", "-p", "ExecMainStatus", "--value", UNIT])
    if st.stdout.strip() == str(EXIT_DELIBERATE):
        log.info(f"liveness: no pong, but '{UNIT}' exited deliberately "
                 f"(code {EXIT_DELIBERATE}) — leaving it stopped")
        return
    log.warning(f"liveness: no pong in {PING_TIMEOUT}s — restarting unit '{UNIT}'")
    # Best effort, so the result is ignored. reset-failed clears the unit's
    # failed state and start-limit counter so that the restart is not refused.
    _run(["sudo", "systemctl", "reset-failed", UNIT])
    r = _run(["sudo", "systemctl", "restart", UNIT])
    if r.returncode != 0:
        log.error(f"ALARM: restart of '{UNIT}' FAILED rc={r.returncode}: {r.stderr.strip()}")
    else:
        log.warning(f"restarted '{UNIT}'")


def liveness():
    """Ping the prod-ops agent over the bus and restart its unit if it is silent.

    For a messaging service the message path is the health check: a process
    that is alive but not consuming gets no pong and is restarted.
    Nothing is restarted when:
      * the ping could not be sent (stomp.py missing, or broker errors);
      * the unit's last exit status equals EXIT_DELIBERATE.
    A failed restart is logged as an ALARM.
    """
    healthy = _ping_ops_agent()
    if healthy is None:
        return
    if healthy:
        log.info("liveness: pong received — agent healthy")
        return
    _restart_unit()
0222
0223
0224
0225
def prune(days, root=None):
    """Delete cached per-job log directories older than *days* days.

    The cache is laid out as ``<root>/<task>/<job>/``. Only ``<job>``
    directories whose mtime is before the cutoff are removed. The cache is
    reclaimable: a removed job's logs are simply re-fetched.

    Symlinks are never followed or removed. Per-entry errors are logged and
    skipped rather than aborting the whole prune.

    Args:
        days: age threshold in days. A negative value is refused, because it
            would put the cutoff in the future and wipe the entire cache.
        root: cache root; defaults to ``$SWF_TMP_DIR/panda-logs``.

    Returns:
        Number of job directories actually removed.
    """
    if days < 0:
        log.error(f"prune: refusing negative age {days}d")
        return 0
    if root is None:
        root = os.path.join(SWF_TMP_DIR, "panda-logs")
    if not os.path.isdir(root):
        return 0
    cutoff = time.time() - days * 86400
    removed = 0
    try:
        tasks = os.listdir(root)
    except OSError as e:
        log.error(f"prune: {root}: {e}")
        return 0
    for task in tasks:
        tdir = os.path.join(root, task)
        # Do not descend through symlinks into directories outside the cache.
        if os.path.islink(tdir) or not os.path.isdir(tdir):
            continue
        try:
            jobs = os.listdir(tdir)
        except OSError as e:
            log.error(f"prune: {tdir}: {e}")
            continue
        for job in jobs:
            jdir = os.path.join(tdir, job)
            try:
                if os.path.islink(jdir) or not os.path.isdir(jdir):
                    continue
                if os.path.getmtime(jdir) >= cutoff:
                    continue
                # Remove as much as possible, then verify. Only count a
                # directory that is actually gone.
                shutil.rmtree(jdir, ignore_errors=True)
                if os.path.lexists(jdir):
                    log.error(f"prune: {jdir}: could not be fully removed")
                else:
                    removed += 1
            except OSError as e:
                log.error(f"prune: {jdir}: {e}")
    log.info(f"prune: removed {removed} cached job-log dir(s) older than {days}d")
    return removed
0245
0246
def main():
    """Cron entry point: reap duplicates, check liveness, optionally prune.

    Reaping runs first, so the liveness ping can only be answered by the
    single systemd-managed instance.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s prodops-cleaner-killer %(levelname)s %(message)s",
        stream=sys.stderr,
    )

    def _non_negative_int(text):
        """argparse type: an integer >= 0. A negative age would prune everything."""
        try:
            value = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError(f"expected an integer, got {text!r}") from None
        if value < 0:
            raise argparse.ArgumentTypeError(f"must be >= 0, got {value}")
        return value

    ap = argparse.ArgumentParser(
        description="Reap duplicate prod-ops agents and keep the singleton alive.")
    ap.add_argument("--prune-days", type=_non_negative_int, default=None, metavar="N",
                    help="also remove cached job logs older than N days")
    ap.add_argument("--no-liveness", action="store_true",
                    help="skip the MQ ping / restart (reap [+ prune] only)")
    a = ap.parse_args()

    reap_duplicates()
    if not a.no_liveness:
        liveness()
    if a.prune_days is not None:
        prune(a.prune_days)


if __name__ == "__main__":
    main()