Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-26 08:40:21

0001 #!/usr/bin/env python3
"""
ePIC production operations agent.

An always-on agent built on the shared testbed agent infrastructure
(`swf_common_lib.base_agent.BaseAgent`) that performs the credentialed
production-operations actions the web tier structurally cannot: it runs as
`wenauseic`, so it holds the Rucio proxy and can drive xrootd.

It is event-driven, not polled (apart from one periodic system-status refresh,
below). Requests arrive as JSON messages on an anycast control queue (handled
once by the single consumer). Each action is a `msg_type` dispatched to a
`_handle_<msg_type>` method, so growing the agent = adding a handler. The
actual work is delegated to standalone scripts (the "doers"), keeping each
capability usable on its own and the agent a thin, testbed-native event front
end. Long-running doers (fetch, submit, the Rucio/catalog refreshes, the
inventory and system-status syncs) run on BaseAgent's worker pool via
``run_in_background`` so the single receiver thread is never blocked; control
messages (health_ping, shutdown) act inline. A daemon thread additionally
enqueues a system-status refresh every EPICPROD_SYSTEM_STATUS_INTERVAL seconds
(a value <= 0 disables it).

This is a system-level singleton (not a per-user testbed agent). It runs under a
fixed 'prodops' namespace (from prodops.toml) so it is identifiable in the
monitor and every caller addresses it explicitly, and is managed by systemd like
the swf-*-bot units. The cleaner-killer cron reaps stale/duplicate instances and
keeps one alive.

Capabilities:
  fetch_payload_log  — retrieve + cache one PanDA job's payload log
                       (delegates to scripts/cache-payload-log.py).
  submit_task        — submit one PCS ProdTask to PanDA via prun, reusing the
                       cached production token (delegates to
                       scripts/submit-prod-task.py).
  submit_evgen_task  — submit one PCS ProdTask to PanDA through the client-API
                       EVGEN path (prun sidelined), reusing the cached
                       production token (delegates to
                       scripts/submit-evgen-task.py).
  rucio_snapshot_update — refresh the JLab Rucio output snapshot for the current
                       (+last) campaign and rematch produced datasets onto each
                       task's overrides['outputs'] (delegates to
                       scripts/rucio-snapshot-update.py).
  evgen_rucio_update — assimilate the JLab Rucio EVGEN inventory (epic:/EVGEN/*)
                       and resolve each PCS evgen Dataset onto metadata['rucio']
                       (delegates to scripts/import_evgen_rucio.py --apply).
  catalog_import     — run a PCS catalog import for one source (csv /
                       epic-prod) (delegates to scripts/pcs-catalog-import.py).
  sync_epicprod_inventory — refresh the monitor's ePIC production job/file
                       inventory and parsed failure diagnosis for a PanDA job,
                       a JEDI task, or a ProdTask name (delegates to
                       src/manage.py sync_epicprod_inventory).
  refresh_system_status — refresh cached System status rows for services,
                       agents, and external monitor endpoints (delegates to
                       scripts/refresh-system-status.py).
  health_ping        — liveness probe; replies 'pong' to reply_to.
  shutdown           — deliberate stop; exits EXIT_DELIBERATE so systemd leaves
                       the singleton down instead of restarting it.

See docs/EPICPROD_OPS.md.
"""
0048 import json
0049 import logging
0050 import os
0051 import signal
0052 import subprocess
0053 import sys
0054 import threading
0055 import time
0056 from datetime import datetime, timezone
0057 from pathlib import Path
0058 
0059 from swf_common_lib.base_agent import BaseAgent
0060 
# Every doer ships in the repository one level above this agent's directory:
# <repo>/scripts/* for the standalone scripts, <repo>/src/manage.py for Django.
_AGENT_DIR = Path(__file__).resolve().parent
_REPO_ROOT = _AGENT_DIR.parent
_SCRIPTS_DIR = _REPO_ROOT / "scripts"


def _env_int(name, default):
    """Return environment variable *name* as an int, else int(*default*)."""
    return int(os.environ.get(name, default))


# Anycast control queue: one consumer handles each request exactly once.
OPS_QUEUE = os.environ.get("EPICPROD_OPS_QUEUE", "/queue/epicprod.ops")

# Managed scratch/cache root (shared with the doer and the web view).
SWF_TMP_DIR = os.environ.get("SWF_TMP_DIR", "/data/swf-tmp")

# Hard backstop on the fetch doer subprocess. Longer than the doer's own xrdcp
# timeout so the doer fails first and cleanly; this only catches a wedged run.
FETCH_TIMEOUT = _env_int("EPICPROD_FETCH_TIMEOUT", "180")

# Failure marker written into the cache dir; the web view reads it to surface
# the error and bound retries. A later success replaces the whole dir.
ERROR_MARKER = ".error"

# Payload-log fetch and prun submission doers.
FETCH_SCRIPT = _SCRIPTS_DIR / "cache-payload-log.py"
SUBMIT_SCRIPT = _SCRIPTS_DIR / "submit-prod-task.py"
# Backstop on the prun submission subprocess (sandbox upload + JEDI insert).
SUBMIT_TIMEOUT = _env_int("EPICPROD_SUBMIT_TIMEOUT", "300")

# Client-API EVGEN submission doer (sidelines prun): assembles the sandbox
# (manifest + env + dispatcher + JLab proxy) and submits noInput+noOutput.
SUBMIT_EVGEN_SCRIPT = _SCRIPTS_DIR / "submit-evgen-task.py"
SUBMIT_EVGEN_TIMEOUT = _env_int("EPICPROD_SUBMIT_EVGEN_TIMEOUT", "300")

# Update-from-Rucio doer: a live JLab Rucio fetch (current + last campaign)
# plus the per-task rematch — slow and network-bound, so generously bounded.
RUCIO_SNAPSHOT_SCRIPT = _SCRIPTS_DIR / "rucio-snapshot-update.py"
RUCIO_SNAPSHOT_TIMEOUT = _env_int("EPICPROD_RUCIO_SNAPSHOT_TIMEOUT", "900")
CATALOG_IMPORT_SCRIPT = _SCRIPTS_DIR / "pcs-catalog-import.py"
CATALOG_IMPORT_TIMEOUT = _env_int("EPICPROD_CATALOG_IMPORT_TIMEOUT", "1800")

# EVGEN-input assimilation doer: a live JLab Rucio fetch of epic:/EVGEN/* plus
# the per-dataset match — slow and network-bound, so generously bounded.
EVGEN_RUCIO_SCRIPT = _SCRIPTS_DIR / "import_evgen_rucio.py"
EVGEN_RUCIO_TIMEOUT = _env_int("EPICPROD_EVGEN_RUCIO_TIMEOUT", "900")

# Inventory refresh doer: a Django management command that reads PanDA/log
# evidence and writes monitor-side EpicProdJob/EpicProdFile rows.
MANAGE_PY = _REPO_ROOT / "src" / "manage.py"
INVENTORY_TIMEOUT = _env_int("EPICPROD_INVENTORY_TIMEOUT", "180")

# System status refresh doer: cached monitor/system rows, intentionally a
# standalone script rather than a Django management command.
SYSTEM_STATUS_SCRIPT = _SCRIPTS_DIR / "refresh-system-status.py"
SYSTEM_STATUS_TIMEOUT = _env_int("EPICPROD_SYSTEM_STATUS_TIMEOUT", "60")
SYSTEM_STATUS_INTERVAL = _env_int("EPICPROD_SYSTEM_STATUS_INTERVAL", "300")
SYSTEM_STATUS_INITIAL_DELAY = _env_int("EPICPROD_SYSTEM_STATUS_INITIAL_DELAY", "30")

# Dedicated namespace config shipped beside the agent. A fixed 'prodops'
# namespace makes the singleton identifiable in the monitor and lets callers
# address it explicitly — every message to this agent carries namespace=prodops.
PRODOPS_CONFIG = _AGENT_DIR / "prodops.toml"

# Deliberate-shutdown sentinel: main() exits with this so systemd's
# RestartPreventExitStatus knows the stop was on purpose and must not restart.
# Distinct from 0 — a persistent agent exiting 0 unbidden is still a failure.
EXIT_DELIBERATE = 100
0120 
0121 
0122 class EpicProdOpsAgent(BaseAgent):
0123     """Production operations agent — dispatches ops messages to handlers."""
0124 
0125     KNOWN_TYPES = {"fetch_payload_log", "submit_task", "submit_evgen_task",
0126                    "rucio_snapshot_update", "evgen_rucio_update", "catalog_import",
0127                    "sync_epicprod_inventory", "refresh_system_status",
0128                    "health_ping", "shutdown"}
0129 
0130     def __init__(self):
0131         # System-level singleton (not a per-user testbed agent): its namespace is
0132         # the fixed 'prodops' from PRODOPS_CONFIG, so it is identifiable in the
0133         # monitor and callers address it explicitly. See docs/EPICPROD_OPS.md.
0134         super().__init__(agent_type="PRODOPS", subscription_queue=OPS_QUEUE,
0135                          config_path=str(PRODOPS_CONFIG))
0136         self._deliberate = False
0137         self._system_status_thread = threading.Thread(
0138             target=self._system_status_periodic_loop,
0139             name="system-status-refresh",
0140             daemon=True,
0141         )
0142         self._system_status_thread.start()
0143 
0144     def on_message(self, frame):
0145         message_data, msg_type = self.log_received_message(frame, known_types=self.KNOWN_TYPES)
0146         if message_data is None:          # namespace-filtered — ignore
0147             return
0148         handler = getattr(self, f"_handle_{msg_type}", None)
0149         if handler is None:
0150             self.logger.warning(f"PRODOPS: no handler for msg_type '{msg_type}'")
0151             return
0152         # Handlers return immediately: control messages (health_ping/shutdown) act
0153         # inline; work messages (fetch_payload_log/submit_task) validate here and
0154         # enqueue their doer via run_in_background, so the receiver thread is never
0155         # blocked. The worker pool drives PROCESSING state — no processing() wrap.
0156         try:
0157             handler(message_data)
0158         except Exception as e:
0159             self.logger.error(f"PRODOPS: handler '{msg_type}' raised: {e}")
0160 
0161     # -- handlers ------------------------------------------------------------
0162 
0163     def _handle_health_ping(self, m):
0164         """Liveness probe: reply 'pong' to the caller's reply_to queue.
0165 
0166         The cleaner-killer cron pings over the bus — for a messaging service the
0167         message path *is* the health — and restarts the unit if no pong arrives.
0168         Replies via conn.send directly, mirroring the agent-manager's ping reply.
0169         """
0170         reply_to = m.get("reply_to")
0171         if not reply_to:
0172             self.logger.warning("PRODOPS health_ping: no reply_to, dropping")
0173             return
0174         pong = {"msg_type": "pong", "agent": self.agent_name, "pid": self.pid}
0175         self.conn.send(destination=reply_to, body=json.dumps(pong))
0176         self.logger.info(f"PRODOPS health_ping -> pong to {reply_to}")
0177 
0178     def _handle_shutdown(self, m):
0179         """Deliberate-shutdown back door. An operator (or controller) can ask the
0180         singleton to step down over the bus; we unwind through BaseAgent's normal
0181         SIGTERM path and main() then exits EXIT_DELIBERATE so systemd
0182         (RestartPreventExitStatus) leaves it stopped instead of restarting it.
0183         Distinct from `systemctl stop`, the host-level back door."""
0184         self.logger.warning(
0185             f"PRODOPS: deliberate shutdown requested by {m.get('sender', '?')}")
0186         self._deliberate = True
0187         os.kill(self.pid, signal.SIGTERM)   # reuse BaseAgent's graceful unwind
0188 
0189     def _handle_fetch_payload_log(self, m):
0190         """Validate, then run the fetch on the worker pool — it blocks on Rucio
0191         + xrootd. Deduped per job so two requests for the same job don't extract
0192         into one cache dir at once."""
0193         missing = [k for k in ("scope", "lfn", "jeditaskid", "pandaid") if not m.get(k)]
0194         if missing:
0195             self.logger.error(f"PRODOPS fetch_payload_log: missing fields {missing}")
0196             return
0197         self.run_in_background(
0198             self._do_fetch_payload_log, m,
0199             dedup_key=f"fetch:{m['jeditaskid']}:{m['pandaid']}",
0200             label=f"fetch_payload_log pandaid={m['pandaid']}")
0201 
0202     def _do_fetch_payload_log(self, m):
0203         """Fetch + cache one job's payload log via the standalone helper."""
0204         jobdir = os.path.join(SWF_TMP_DIR, "panda-logs", str(m["jeditaskid"]), str(m["pandaid"]))
0205         cmd = [
0206             sys.executable, str(FETCH_SCRIPT),
0207             "--scope", str(m["scope"]),
0208             "--lfn", str(m["lfn"]),
0209             "--jeditaskid", str(m["jeditaskid"]),
0210             "--pandaid", str(m["pandaid"]),
0211         ]
0212         if m.get("force"):           # operator override: re-fetch even if cached
0213             cmd.append("--force")
0214         self.logger.info(f"PRODOPS fetch_payload_log: pandaid={m['pandaid']} task={m['jeditaskid']}")
0215         try:
0216             p = subprocess.run(cmd, capture_output=True, text=True, timeout=FETCH_TIMEOUT)
0217         except subprocess.TimeoutExpired:
0218             self.logger.error(
0219                 f"PRODOPS fetch_payload_log TIMEOUT after {FETCH_TIMEOUT}s pandaid={m['pandaid']}")
0220             self._mark_error(jobdir, f"fetch timed out after {FETCH_TIMEOUT}s")
0221             return
0222         for line in (p.stderr or "").splitlines():
0223             self.logger.info(f"  cache-payload-log: {line}")
0224         if p.returncode != 0:
0225             stderr = (p.stderr or "").strip()
0226             reason = stderr.splitlines()[-1] if stderr else f"rc={p.returncode}"
0227             self.logger.error(
0228                 f"PRODOPS fetch_payload_log FAILED rc={p.returncode} pandaid={m['pandaid']}")
0229             self._mark_error(jobdir, reason)
0230         else:
0231             self.logger.info(f"PRODOPS fetch_payload_log done: pandaid={m['pandaid']}")
0232             # Push completion to the browser via the SSE relay (rides the topic
0233             # the monitor consumes; the page matches on pandaid). See docs/SSE_PUSH.md.
0234             self.send_message('/topic/epictopic', {
0235                 'msg_type': 'payload_log_ready',
0236                 'pandaid': str(m['pandaid']),
0237                 'jeditaskid': str(m['jeditaskid']),
0238             })
0239 
0240     def _handle_submit_task(self, m):
0241         """Validate, then run the submission on the worker pool. Deduped per
0242         task so two near-simultaneous triggers can't fire two submissions —
0243         the status / panda_task_id gates close that window later; this closes
0244         the in-flight window at the agent."""
0245         task_name = m.get("task_name")
0246         if not task_name:
0247             self.logger.error("PRODOPS submit_task: missing task_name")
0248             return
0249         self.run_in_background(
0250             self._do_submit_task, m,
0251             dedup_key=f"submit:{task_name}", label=f"submit_task {task_name}")
0252 
0253     def _do_submit_task(self, m):
0254         """Submit one PCS ProdTask to PanDA via the standalone doer.
0255 
0256         The web tier ('Submit to PanDA') publishes {task_name}; the doer
0257         fetches the prun command from PCS, runs it non-interactively with the
0258         cached long-lived production token, and records the jediTaskID back.
0259         We hold the token; the web tier structurally cannot. The outcome lands
0260         on the ProdTask (panda_task_id + status) via record-submission, which
0261         the UI reads — so, like fetch_payload_log, there is no bus reply."""
0262         task_name = m["task_name"]
0263         cmd = [sys.executable, str(SUBMIT_SCRIPT), "--task-name", str(task_name)]
0264         if m.get("owner"):          # X-Remote-User for the owner-gated record write
0265             cmd += ["--owner", str(m["owner"])]
0266         self.logger.info(f"PRODOPS submit_task: {task_name}")
0267         try:
0268             p = subprocess.run(cmd, capture_output=True, text=True, timeout=SUBMIT_TIMEOUT)
0269         except subprocess.TimeoutExpired:
0270             self.logger.error(f"PRODOPS submit_task TIMEOUT after {SUBMIT_TIMEOUT}s: {task_name}")
0271             # NO silent failure: tell the waiting page the submission timed out.
0272             self.send_message('/topic/epictopic', {
0273                 'msg_type': 'prodtask_submit_failed', 'task_name': task_name,
0274                 'reason': f'submission timed out after {SUBMIT_TIMEOUT}s'})
0275             return
0276         stderr = p.stderr or ""
0277         for line in stderr.splitlines():
0278             self.logger.info(f"  submit-prod-task: {line}")
0279         jedi_task_id = (p.stdout or '').strip()
0280         reason = stderr.splitlines()[-1] if stderr else f"rc={p.returncode}"
0281         # Every outcome emits an event over the SSE relay (docs/SSE_PUSH.md) so the
0282         # compose page is never left polling a submission that already resolved.
0283         if p.returncode == 0 and jedi_task_id:
0284             self.logger.info(
0285                 f"PRODOPS submit_task done: {task_name} -> jediTaskID={jedi_task_id}")
0286             self.send_message('/topic/epictopic', {
0287                 'msg_type': 'prodtask_submitted',
0288                 'task_name': task_name, 'jedi_task_id': jedi_task_id})
0289         elif p.returncode == 7 and jedi_task_id:
0290             # Submitted to PanDA, but the record-back POST failed after retries —
0291             # an orphan. Surface the id; the task stays ready/unrecorded and
0292             # record-submission is idempotent, so it can be re-recorded.
0293             self.logger.error(
0294                 f"PRODOPS submit_task UNRECORDED: {task_name} submitted as "
0295                 f"jediTaskID={jedi_task_id} but record-back failed")
0296             self.send_message('/topic/epictopic', {
0297                 'msg_type': 'prodtask_submit_unrecorded', 'task_name': task_name,
0298                 'jedi_task_id': jedi_task_id, 'reason': reason})
0299         else:
0300             self.logger.error(f"PRODOPS submit_task FAILED rc={p.returncode}: {task_name}")
0301             self.send_message('/topic/epictopic', {
0302                 'msg_type': 'prodtask_submit_failed',
0303                 'task_name': task_name, 'reason': reason})
0304 
0305     def _handle_submit_evgen_task(self, m):
0306         """Validate, then run the client-API EVGEN submission on the worker
0307         pool. This is the live Submit-button path (prun is sidelined). Deduped
0308         per task so two near-simultaneous triggers can't fire two submissions."""
0309         task_name = m.get("task_name")
0310         if not task_name:
0311             self.logger.error("PRODOPS submit_evgen_task: missing task_name")
0312             return
0313         self.run_in_background(
0314             self._do_submit_evgen_task, m,
0315             dedup_key=f"submit:{task_name}", label=f"submit_evgen_task {task_name}")
0316 
0317     def _do_submit_evgen_task(self, m):
0318         """Submit one PCS ProdTask to PanDA via the client-API EVGEN doer.
0319 
0320         The doer fetches the EVGEN spec from PCS, assembles the sandbox
0321         (manifest + env + dispatcher + JLab proxy), and submits noInput+noOutput
0322         with the cached production token; the payload stages its EVGEN input and
0323         self-registers RECO to JLab Rucio. Outcome lands on the ProdTask
0324         (panda_task_id + status) via record-submission, and every outcome emits
0325         the same SSE events as the prun path, so the compose page is unchanged
0326         and never left polling. The JLab proxy + monitor URL/token travel in the
0327         agent's environment (SWF_MONITOR_URL, SWFMON_TOKEN, EVGEN_X509_PROXY)."""
0328         task_name = m["task_name"]
0329         cmd = [sys.executable, str(SUBMIT_EVGEN_SCRIPT), "--task-name", str(task_name)]
0330         if m.get("owner"):          # X-Remote-User for the owner-gated record write
0331             cmd += ["--owner", str(m["owner"])]
0332         self.logger.info(f"PRODOPS submit_evgen_task: {task_name}")
0333         try:
0334             p = subprocess.run(cmd, capture_output=True, text=True, timeout=SUBMIT_EVGEN_TIMEOUT)
0335         except subprocess.TimeoutExpired:
0336             self.logger.error(f"PRODOPS submit_evgen_task TIMEOUT after {SUBMIT_EVGEN_TIMEOUT}s: {task_name}")
0337             self.send_message('/topic/epictopic', {
0338                 'msg_type': 'prodtask_submit_failed', 'task_name': task_name,
0339                 'reason': f'submission timed out after {SUBMIT_EVGEN_TIMEOUT}s'})
0340             return
0341         stderr = p.stderr or ""
0342         for line in stderr.splitlines():
0343             self.logger.info(f"  submit-evgen-task: {line}")
0344         jedi_task_id = (p.stdout or '').strip()
0345         reason = stderr.splitlines()[-1] if stderr else f"rc={p.returncode}"
0346         if p.returncode == 0 and jedi_task_id:
0347             self.logger.info(
0348                 f"PRODOPS submit_evgen_task done: {task_name} -> jediTaskID={jedi_task_id}")
0349             self.send_message('/topic/epictopic', {
0350                 'msg_type': 'prodtask_submitted',
0351                 'task_name': task_name, 'jedi_task_id': jedi_task_id})
0352         elif p.returncode == 7 and jedi_task_id:
0353             # Submitted, but the record-back POST failed after retries — an
0354             # orphan; surface the id (record-submission is idempotent).
0355             self.logger.error(
0356                 f"PRODOPS submit_evgen_task UNRECORDED: {task_name} submitted as "
0357                 f"jediTaskID={jedi_task_id} but record-back failed")
0358             self.send_message('/topic/epictopic', {
0359                 'msg_type': 'prodtask_submit_unrecorded', 'task_name': task_name,
0360                 'jedi_task_id': jedi_task_id, 'reason': reason})
0361         else:
0362             self.logger.error(f"PRODOPS submit_evgen_task FAILED rc={p.returncode}: {task_name}")
0363             self.send_message('/topic/epictopic', {
0364                 'msg_type': 'prodtask_submit_failed',
0365                 'task_name': task_name, 'reason': reason})
0366 
0367     def _handle_sync_epicprod_inventory(self, m):
0368         """Refresh one job/task inventory record on the worker pool."""
0369         if not (m.get("pandaid") or m.get("jeditaskid") or m.get("task_name")):
0370             self.logger.error(
0371                 "PRODOPS sync_epicprod_inventory: need pandaid, jeditaskid, or task_name")
0372             return
0373         if m.get("pandaid"):
0374             key = f"pandaid:{m['pandaid']}"
0375             label = f"sync_epicprod_inventory pandaid={m['pandaid']}"
0376         elif m.get("jeditaskid"):
0377             key = f"jeditaskid:{m['jeditaskid']}"
0378             label = f"sync_epicprod_inventory jeditaskid={m['jeditaskid']}"
0379         else:
0380             key = f"task:{m['task_name']}"
0381             label = f"sync_epicprod_inventory task={m['task_name']}"
0382         self.run_in_background(
0383             self._do_sync_epicprod_inventory, m,
0384             dedup_key=f"sync_inventory:{key}",
0385             label=label)
0386 
0387     def _do_sync_epicprod_inventory(self, m):
0388         cmd = [sys.executable, str(MANAGE_PY), "sync_epicprod_inventory"]
0389         if m.get("pandaid"):
0390             cmd += ["--pandaid", str(m["pandaid"])]
0391         elif m.get("jeditaskid"):
0392             cmd += ["--jeditaskid", str(m["jeditaskid"])]
0393         else:
0394             cmd += ["--prod-task", str(m["task_name"])]
0395         self.logger.info(f"PRODOPS sync_epicprod_inventory: {' '.join(cmd)}")
0396         try:
0397             p = subprocess.run(cmd, capture_output=True, text=True,
0398                                timeout=INVENTORY_TIMEOUT)
0399         except subprocess.TimeoutExpired:
0400             reason = f"timed out after {INVENTORY_TIMEOUT}s"
0401             self.logger.error(f"PRODOPS sync_epicprod_inventory TIMEOUT: {reason}")
0402             self.send_message('/topic/epictopic', {
0403                 'msg_type': 'epicprod_inventory_ready', 'ok': False,
0404                 'pandaid': m.get('pandaid'), 'jeditaskid': m.get('jeditaskid'),
0405                 'task_name': m.get('task_name'), 'error': reason})
0406             return
0407         for line in (p.stdout or "").splitlines():
0408             self.logger.info(f"  sync-epicprod-inventory: {line}")
0409         for line in (p.stderr or "").splitlines():
0410             self.logger.info(f"  sync-epicprod-inventory: {line}")
0411         ok = p.returncode == 0
0412         reason = ''
0413         if not ok:
0414             stderr = (p.stderr or "").strip()
0415             reason = stderr.splitlines()[-1] if stderr else f"rc={p.returncode}"
0416             self.logger.error(f"PRODOPS sync_epicprod_inventory FAILED rc={p.returncode}")
0417         else:
0418             self.logger.info("PRODOPS sync_epicprod_inventory done")
0419         self.send_message('/topic/epictopic', {
0420             'msg_type': 'epicprod_inventory_ready', 'ok': ok,
0421             'pandaid': m.get('pandaid'), 'jeditaskid': m.get('jeditaskid'),
0422             'task_name': m.get('task_name'), 'error': reason})
0423 
0424     def _handle_refresh_system_status(self, m):
0425         """Refresh cached system status rows through the shared doer."""
0426         self.run_in_background(
0427             self._do_refresh_system_status, m,
0428             dedup_key="refresh_system_status",
0429             label="refresh_system_status")
0430 
0431     def _do_refresh_system_status(self, m):
0432         cmd = [
0433             sys.executable, str(SYSTEM_STATUS_SCRIPT),
0434             "--source", str(m.get("source") or "ops_agent"),
0435         ]
0436         selected = m.get("selected") or m.get("only") or []
0437         if isinstance(selected, str):
0438             selected = [selected]
0439         for name in selected:
0440             cmd += ["--only", str(name)]
0441         self.logger.info(f"PRODOPS refresh_system_status: {' '.join(cmd)}")
0442         try:
0443             p = subprocess.run(cmd, capture_output=True, text=True,
0444                                timeout=SYSTEM_STATUS_TIMEOUT)
0445         except subprocess.TimeoutExpired:
0446             reason = f"timed out after {SYSTEM_STATUS_TIMEOUT}s"
0447             self.logger.error(f"PRODOPS refresh_system_status TIMEOUT: {reason}")
0448             self.send_message('/topic/epictopic', {
0449                 'msg_type': 'system_status_ready', 'ok': False, 'error': reason})
0450             return
0451         for line in (p.stdout or "").splitlines():
0452             self.logger.info(f"  refresh-system-status: {line}")
0453         for line in (p.stderr or "").splitlines():
0454             self.logger.info(f"  refresh-system-status: {line}")
0455         ok = p.returncode == 0
0456         reason = ''
0457         if not ok:
0458             stderr = (p.stderr or "").strip()
0459             reason = stderr.splitlines()[-1] if stderr else f"rc={p.returncode}"
0460             self.logger.error(f"PRODOPS refresh_system_status FAILED rc={p.returncode}")
0461         else:
0462             self.logger.info("PRODOPS refresh_system_status done")
0463         self.send_message('/topic/epictopic', {
0464             'msg_type': 'system_status_ready', 'ok': ok, 'error': reason})
0465 
0466     def _system_status_periodic_loop(self):
0467         """Keep cached system knowledge fresh without page-load probes."""
0468         if SYSTEM_STATUS_INTERVAL <= 0:
0469             self.logger.info("PRODOPS periodic system status refresh disabled")
0470             return
0471         time.sleep(max(SYSTEM_STATUS_INITIAL_DELAY, 0))
0472         while True:
0473             self.run_in_background(
0474                 self._do_refresh_system_status,
0475                 {'source': 'ops_agent_periodic'},
0476                 dedup_key="refresh_system_status",
0477                 label="refresh_system_status periodic")
0478             time.sleep(SYSTEM_STATUS_INTERVAL)
0479 
0480     def _handle_rucio_snapshot_update(self, m):
0481         """Refresh the JLab Rucio snapshot + rematch outputs, off the receiver
0482         thread — the doer makes a live JLab fetch and matches every task, far
0483         too slow for the web request (which is why it moved here). Deduped to one
0484         refresh at a time; it scans the whole current/last campaign."""
0485         self.run_in_background(
0486             self._do_rucio_snapshot_update, m,
0487             dedup_key="rucio_snapshot_update", label="rucio_snapshot_update")
0488 
0489     def _do_rucio_snapshot_update(self, m):
0490         """Run the snapshot-refresh doer; push completion on success AND failure
0491         so the catalog never hangs on 'Updating…'. See docs/SSE_PUSH.md."""
0492         cmd = [sys.executable, str(RUCIO_SNAPSHOT_SCRIPT)]
0493         self.logger.info("PRODOPS rucio_snapshot_update: refreshing current/last snapshot")
0494         try:
0495             p = subprocess.run(cmd, capture_output=True, text=True,
0496                                timeout=RUCIO_SNAPSHOT_TIMEOUT)
0497         except subprocess.TimeoutExpired:
0498             self.logger.error(
0499                 f"PRODOPS rucio_snapshot_update TIMEOUT after {RUCIO_SNAPSHOT_TIMEOUT}s")
0500             self.send_message('/topic/epictopic', {
0501                 'msg_type': 'rucio_snapshot_ready', 'ok': False,
0502                 'error': f'timed out after {RUCIO_SNAPSHOT_TIMEOUT}s'})
0503             return
0504         for line in (p.stdout or "").splitlines():
0505             self.logger.info(f"  rucio-snapshot-update: {line}")
0506         for line in (p.stderr or "").splitlines():
0507             self.logger.info(f"  rucio-snapshot-update: {line}")
0508         if p.returncode != 0:
0509             stderr = (p.stderr or "").strip()
0510             reason = stderr.splitlines()[-1] if stderr else f"rc={p.returncode}"
0511             self.logger.error(f"PRODOPS rucio_snapshot_update FAILED rc={p.returncode}")
0512             self.send_message('/topic/epictopic', {
0513                 'msg_type': 'rucio_snapshot_ready', 'ok': False, 'error': reason})
0514         else:
0515             self.logger.info("PRODOPS rucio_snapshot_update done")
0516             self.send_message('/topic/epictopic', {
0517                 'msg_type': 'rucio_snapshot_ready', 'ok': True})
0518 
0519     def _handle_evgen_rucio_update(self, m):
0520         """Assimilate the JLab Rucio EVGEN inventory off the receiver thread — a
0521         live JLab fetch + per-dataset match, too slow for the web request.
0522         Deduped to one assimilation at a time."""
0523         self.run_in_background(
0524             self._do_evgen_rucio_update, m,
0525             dedup_key="evgen_rucio_update", label="evgen_rucio_update")
0526 
0527     def _do_evgen_rucio_update(self, m):
0528         """Run the EVGEN assimilation doer (--apply); push completion on success
0529         AND failure so the catalog never hangs on 'Updating…'. See docs/SSE_PUSH.md."""
0530         cmd = [sys.executable, str(EVGEN_RUCIO_SCRIPT), "--apply"]
0531         self.logger.info("PRODOPS evgen_rucio_update: assimilating JLab Rucio EVGEN")
0532         try:
0533             p = subprocess.run(cmd, capture_output=True, text=True,
0534                                timeout=EVGEN_RUCIO_TIMEOUT)
0535         except subprocess.TimeoutExpired:
0536             self.logger.error(
0537                 f"PRODOPS evgen_rucio_update TIMEOUT after {EVGEN_RUCIO_TIMEOUT}s")
0538             self.send_message('/topic/epictopic', {
0539                 'msg_type': 'evgen_rucio_ready', 'ok': False,
0540                 'error': f'timed out after {EVGEN_RUCIO_TIMEOUT}s'})
0541             return
0542         for line in (p.stdout or "").splitlines():
0543             self.logger.info(f"  import-evgen-rucio: {line}")
0544         for line in (p.stderr or "").splitlines():
0545             self.logger.info(f"  import-evgen-rucio: {line}")
0546         if p.returncode != 0:
0547             stderr = (p.stderr or "").strip()
0548             reason = stderr.splitlines()[-1] if stderr else f"rc={p.returncode}"
0549             self.logger.error(f"PRODOPS evgen_rucio_update FAILED rc={p.returncode}")
0550             self.send_message('/topic/epictopic', {
0551                 'msg_type': 'evgen_rucio_ready', 'ok': False, 'error': reason})
0552         else:
0553             self.logger.info("PRODOPS evgen_rucio_update done")
0554             self.send_message('/topic/epictopic', {
0555                 'msg_type': 'evgen_rucio_ready', 'ok': True})
0556 
0557     def _handle_catalog_import(self, m):
0558         """Run a PCS catalog import (csv / epic-prod) off the receiver thread —
0559         the epic-prod walk of ~4900 datasets is far too slow for the web request
0560         (which is why it moved here). Deduped per source."""
0561         source = m.get('source', '')
0562         self.run_in_background(
0563             self._do_catalog_import, m,
0564             dedup_key=f"catalog_import:{source}", label=f"catalog_import:{source}")
0565 
0566     def _do_catalog_import(self, m):
0567         """Run the catalog-import doer; push completion on success AND failure so
0568         the catalog never hangs on 'Updating…'. See docs/SSE_PUSH.md."""
0569         source = m.get('source', '')
0570         cmd = [sys.executable, str(CATALOG_IMPORT_SCRIPT), source]
0571         self.logger.info(f"PRODOPS catalog_import: importing {source}")
0572         try:
0573             p = subprocess.run(cmd, capture_output=True, text=True,
0574                                timeout=CATALOG_IMPORT_TIMEOUT)
0575         except subprocess.TimeoutExpired:
0576             self.logger.error(
0577                 f"PRODOPS catalog_import {source} TIMEOUT after {CATALOG_IMPORT_TIMEOUT}s")
0578             self.send_message('/topic/epictopic', {
0579                 'msg_type': 'catalog_import_ready', 'source': source, 'ok': False,
0580                 'error': f'timed out after {CATALOG_IMPORT_TIMEOUT}s'})
0581             return
0582         for line in (p.stdout or "").splitlines():
0583             self.logger.info(f"  pcs-catalog-import: {line}")
0584         for line in (p.stderr or "").splitlines():
0585             self.logger.info(f"  pcs-catalog-import: {line}")
0586         if p.returncode != 0:
0587             stderr = (p.stderr or "").strip()
0588             reason = stderr.splitlines()[-1] if stderr else f"rc={p.returncode}"
0589             self.logger.error(f"PRODOPS catalog_import {source} FAILED rc={p.returncode}")
0590             self.send_message('/topic/epictopic', {
0591                 'msg_type': 'catalog_import_ready', 'source': source, 'ok': False,
0592                 'error': reason})
0593         else:
0594             summary = (p.stdout or "").strip().splitlines()
0595             self.logger.info(f"PRODOPS catalog_import {source} done")
0596             self.send_message('/topic/epictopic', {
0597                 'msg_type': 'catalog_import_ready', 'source': source, 'ok': True,
0598                 'summary': summary[-1] if summary else ''})
0599 
0600     # -- helpers -------------------------------------------------------------
0601 
0602     def _mark_error(self, jobdir, reason):
0603         """Record a failed fetch in the cache dir (attempt count + reason) so the
0604         web view can surface it and stop auto-retrying past the cap."""
0605         try:
0606             os.makedirs(jobdir, exist_ok=True)
0607             path = os.path.join(jobdir, ERROR_MARKER)
0608             attempts = 0
0609             if os.path.isfile(path):
0610                 try:
0611                     with open(path) as f:
0612                         attempts = int(json.load(f).get("attempts", 0))
0613                 except (ValueError, OSError):
0614                     attempts = 0
0615             with open(path, "w") as f:
0616                 json.dump({"attempts": attempts + 1, "last_error": reason,
0617                            "ts": datetime.now(timezone.utc).isoformat()}, f)
0618         except OSError as e:
0619             self.logger.error(f"PRODOPS could not write {ERROR_MARKER} in {jobdir}: {e}")
0620 
0621 
def main():
    """Entry point: run the prod-ops agent, then exit with a systemd-aware code.

    ``run()`` returns once the agent stops. If the stop was a deliberate bus
    'shutdown' (``_deliberate`` is truthy), exit with EXIT_DELIBERATE so
    systemd does not restart the unit. Any other stop exits 0 and, per the
    unit's restart policy, is restarted (burst-capped).
    """
    prodops = EpicProdOpsAgent()
    prodops.run()
    if prodops._deliberate:
        code = EXIT_DELIBERATE
    else:
        code = 0
    sys.exit(code)


if __name__ == "__main__":
    # Launch the agent only when executed as a script, never on import.
    main()