File indexing completed on 2026-06-26 08:40:21
0001
0002 """
0003 ePIC production operations agent.
0004
0005 An always-on agent built on the shared testbed agent infrastructure
0006 (`swf_common_lib.base_agent.BaseAgent`) that performs the credentialed
0007 production-operations actions the web tier structurally cannot: it runs as
0008 `wenauseic`, so it holds the Rucio proxy and can drive xrootd.
0009
0010 It is event-driven, not polled. Requests arrive as JSON messages on an anycast
0011 control queue (handled once by the single consumer). Each action is a
0012 `msg_type` dispatched to a `_handle_<msg_type>` method, so growing the agent =
0013 adding a handler. The actual work is delegated to standalone scripts (the
0014 "doers"), keeping each capability usable on its own and the agent a thin,
0015 testbed-native event front end. Long-running doers (fetch, submit) run on
0016 BaseAgent's worker pool via ``run_in_background`` so the single receiver thread
0017 is never blocked; control messages (health_ping, shutdown) act inline.
0018
0019 This is a system-level singleton (not a per-user testbed agent). It runs under a
0020 fixed 'prodops' namespace (from prodops.toml) so it is identifiable in the
0021 monitor and every caller addresses it explicitly, and is managed by systemd like
0022 the swf-*-bot units. The cleaner-killer cron reaps stale/duplicate instances and
0023 keeps one alive.
0024
0025 Capabilities:
0026 fetch_payload_log — retrieve + cache one PanDA job's payload log
0027 (delegates to scripts/cache-payload-log.py).
0028 submit_task — submit one PCS ProdTask to PanDA via prun, reusing the
0029 cached production token (delegates to
0030 scripts/submit-prod-task.py).
0031 rucio_snapshot_update — refresh the JLab Rucio output snapshot for the current
0032 (+last) campaign and rematch produced datasets onto each
0033 task's overrides['outputs'] (delegates to
0034 scripts/rucio-snapshot-update.py).
0035 evgen_rucio_update — assimilate the JLab Rucio EVGEN inventory (epic:/EVGEN/*)
0036 and resolve each PCS evgen Dataset onto metadata['rucio']
0037 (delegates to scripts/import_evgen_rucio.py --apply).
0038 sync_epicprod_inventory — refresh the monitor's ePIC production job/file
0039 inventory and parsed failure diagnosis for a PanDA job.
0040 refresh_system_status — refresh cached System status rows for services,
0041 agents, and external monitor endpoints.
0042 health_ping — liveness probe; replies 'pong' to reply_to.
0043 shutdown — deliberate stop; exits EXIT_DELIBERATE so systemd leaves
0044 the singleton down instead of restarting it.
0045
0046 See docs/EPICPROD_OPS.md.
0047 """
0048 import json
0049 import logging
0050 import os
0051 import signal
0052 import subprocess
0053 import sys
0054 import threading
0055 import time
0056 from datetime import datetime, timezone
0057 from pathlib import Path
0058
0059 from swf_common_lib.base_agent import BaseAgent
0060
0061
# Filesystem anchors. Doer scripts live in scripts/ and manage.py in src/,
# both under the directory two levels above this file.
_AGENT_DIR = Path(__file__).resolve().parent
_PROJECT_DIR = _AGENT_DIR.parent
_SCRIPTS_DIR = _PROJECT_DIR / "scripts"


def _env_int(name, default):
    """Return environment variable *name* as an int, or int(*default*) if unset."""
    return int(os.environ.get(name, default))


# Queue the agent subscribes to for ops requests.
OPS_QUEUE = os.environ.get("EPICPROD_OPS_QUEUE", "/queue/epicprod.ops")

# Root of the scratch area; payload logs are cached under <root>/panda-logs/.
SWF_TMP_DIR = os.environ.get("SWF_TMP_DIR", "/data/swf-tmp")

# Name of the per-job marker file written when a payload-log fetch fails.
ERROR_MARKER = ".error"

# Exit status used by main() after a deliberate (message-requested) shutdown.
EXIT_DELIBERATE = 100

# Agent configuration file, next to this module.
PRODOPS_CONFIG = _AGENT_DIR / "prodops.toml"

# --- fetch_payload_log -----------------------------------------------------
FETCH_SCRIPT = _SCRIPTS_DIR / "cache-payload-log.py"
FETCH_TIMEOUT = _env_int("EPICPROD_FETCH_TIMEOUT", "180")  # seconds

# --- submit_task (prun path) -----------------------------------------------
SUBMIT_SCRIPT = _SCRIPTS_DIR / "submit-prod-task.py"
SUBMIT_TIMEOUT = _env_int("EPICPROD_SUBMIT_TIMEOUT", "300")  # seconds

# --- submit_evgen_task (client-API path) -----------------------------------
SUBMIT_EVGEN_SCRIPT = _SCRIPTS_DIR / "submit-evgen-task.py"
SUBMIT_EVGEN_TIMEOUT = _env_int("EPICPROD_SUBMIT_EVGEN_TIMEOUT", "300")  # seconds

# --- rucio_snapshot_update / catalog_import --------------------------------
RUCIO_SNAPSHOT_SCRIPT = _SCRIPTS_DIR / "rucio-snapshot-update.py"
RUCIO_SNAPSHOT_TIMEOUT = _env_int("EPICPROD_RUCIO_SNAPSHOT_TIMEOUT", "900")  # seconds
CATALOG_IMPORT_SCRIPT = _SCRIPTS_DIR / "pcs-catalog-import.py"
CATALOG_IMPORT_TIMEOUT = _env_int("EPICPROD_CATALOG_IMPORT_TIMEOUT", "1800")  # seconds

# --- evgen_rucio_update ----------------------------------------------------
EVGEN_RUCIO_SCRIPT = _SCRIPTS_DIR / "import_evgen_rucio.py"
EVGEN_RUCIO_TIMEOUT = _env_int("EPICPROD_EVGEN_RUCIO_TIMEOUT", "900")  # seconds

# --- sync_epicprod_inventory (a manage.py command) -------------------------
MANAGE_PY = _PROJECT_DIR / "src" / "manage.py"
INVENTORY_TIMEOUT = _env_int("EPICPROD_INVENTORY_TIMEOUT", "180")  # seconds

# --- refresh_system_status -------------------------------------------------
SYSTEM_STATUS_SCRIPT = _SCRIPTS_DIR / "refresh-system-status.py"
SYSTEM_STATUS_TIMEOUT = _env_int("EPICPROD_SYSTEM_STATUS_TIMEOUT", "60")  # seconds
# Seconds between periodic refreshes; a value <= 0 disables the periodic loop.
SYSTEM_STATUS_INTERVAL = _env_int("EPICPROD_SYSTEM_STATUS_INTERVAL", "300")
# Seconds to wait after start-up before the first periodic refresh.
SYSTEM_STATUS_INITIAL_DELAY = _env_int("EPICPROD_SYSTEM_STATUS_INITIAL_DELAY", "30")
0120
0121
class EpicProdOpsAgent(BaseAgent):
    """Production operations agent — dispatches ops messages to handlers.

    ``on_message`` routes each message to the method named
    ``_handle_<msg_type>``. Handlers for slow work validate the request and
    queue a ``_do_*`` worker through ``run_in_background``; the worker runs a
    standalone doer script as a subprocess and publishes the outcome on
    ``/topic/epictopic``.
    """

    # msg_type values handed to log_received_message as the known set.
    KNOWN_TYPES = {"fetch_payload_log", "submit_task", "submit_evgen_task",
                   "rucio_snapshot_update", "evgen_rucio_update", "catalog_import",
                   "sync_epicprod_inventory", "refresh_system_status",
                   "health_ping", "shutdown"}

    def __init__(self):
        """Subscribe to the ops queue and start the periodic status thread."""
        super().__init__(agent_type="PRODOPS", subscription_queue=OPS_QUEUE,
                         config_path=str(PRODOPS_CONFIG))
        # Set by _handle_shutdown; main() reads it to choose the exit status.
        self._deliberate = False
        # Daemon thread so it never keeps the process alive at exit.
        self._system_status_thread = threading.Thread(
            target=self._system_status_periodic_loop,
            name="system-status-refresh",
            daemon=True,
        )
        self._system_status_thread.start()

    def on_message(self, frame):
        """Dispatch one incoming frame to its ``_handle_<msg_type>`` method.

        Messages for which ``log_received_message`` returns no data are
        ignored. A handler exception is logged with its traceback and
        swallowed here instead of propagating to the caller.
        """
        message_data, msg_type = self.log_received_message(frame, known_types=self.KNOWN_TYPES)
        if message_data is None:
            return
        handler = getattr(self, f"_handle_{msg_type}", None)
        if handler is None:
            self.logger.warning(f"PRODOPS: no handler for msg_type '{msg_type}'")
            return
        try:
            handler(message_data)
        except Exception as e:
            # exc_info keeps the traceback; the message text alone rarely
            # identifies which line of a handler failed.
            self.logger.error(f"PRODOPS: handler '{msg_type}' raised: {e}", exc_info=True)

    # ---- shared doer plumbing ----------------------------------------------

    @staticmethod
    def _run_doer(cmd, timeout):
        """Run a doer command and return its ``CompletedProcess``.

        stdout/stderr are captured as text. Bytes that cannot be decoded are
        replaced instead of raising ``UnicodeDecodeError``, so odd output from
        an external tool cannot abort the worker before it reports an outcome.
        ``subprocess.TimeoutExpired`` propagates to the caller.
        """
        return subprocess.run(cmd, capture_output=True, text=True,
                              errors="replace", timeout=timeout)

    def _log_doer_output(self, tag, p, include_stdout=True):
        """Log a finished doer's output line by line, prefixed with *tag*.

        stdout lines (when *include_stdout*) are logged before stderr lines.
        """
        streams = (p.stdout, p.stderr) if include_stdout else (p.stderr,)
        for stream in streams:
            for line in (stream or "").splitlines():
                self.logger.info(f" {tag}: {line}")

    @staticmethod
    def _failure_reason(p):
        """Return the last line of the doer's stripped stderr, else ``rc=<code>``.

        stderr is stripped first so trailing blank lines cannot produce an
        empty reason.
        """
        stderr = (p.stderr or "").strip()
        return stderr.splitlines()[-1] if stderr else f"rc={p.returncode}"

    # ---- control messages (handled inline) ---------------------------------

    def _handle_health_ping(self, m):
        """Liveness probe: reply 'pong' to the caller's reply_to queue.

        The cleaner-killer cron pings over the bus — for a messaging service the
        message path *is* the health — and restarts the unit if no pong arrives.
        Replies via conn.send directly, mirroring the agent-manager's ping reply.
        """
        reply_to = m.get("reply_to")
        if not reply_to:
            self.logger.warning("PRODOPS health_ping: no reply_to, dropping")
            return
        pong = {"msg_type": "pong", "agent": self.agent_name, "pid": self.pid}
        self.conn.send(destination=reply_to, body=json.dumps(pong))
        self.logger.info(f"PRODOPS health_ping -> pong to {reply_to}")

    def _handle_shutdown(self, m):
        """Deliberate-shutdown back door. An operator (or controller) can ask the
        singleton to step down over the bus; we unwind through BaseAgent's normal
        SIGTERM path and main() then exits EXIT_DELIBERATE so systemd
        (RestartPreventExitStatus) leaves it stopped instead of restarting it.
        Distinct from `systemctl stop`, the host-level back door."""
        self.logger.warning(
            f"PRODOPS: deliberate shutdown requested by {m.get('sender', '?')}")
        # Set the flag before signalling so main() sees it once run() returns.
        self._deliberate = True
        os.kill(self.pid, signal.SIGTERM)

    # ---- fetch_payload_log --------------------------------------------------

    def _handle_fetch_payload_log(self, m):
        """Validate, then run the fetch on the worker pool — it blocks on Rucio
        + xrootd. Deduped per job so two requests for the same job don't extract
        into one cache dir at once."""
        missing = [k for k in ("scope", "lfn", "jeditaskid", "pandaid") if not m.get(k)]
        if missing:
            self.logger.error(f"PRODOPS fetch_payload_log: missing fields {missing}")
            return
        self.run_in_background(
            self._do_fetch_payload_log, m,
            dedup_key=f"fetch:{m['jeditaskid']}:{m['pandaid']}",
            label=f"fetch_payload_log pandaid={m['pandaid']}")

    def _do_fetch_payload_log(self, m):
        """Fetch + cache one job's payload log via the standalone helper.

        A timeout or non-zero exit is recorded in the job's cache directory
        through ``_mark_error``.
        """
        jobdir = os.path.join(SWF_TMP_DIR, "panda-logs", str(m["jeditaskid"]), str(m["pandaid"]))
        cmd = [
            sys.executable, str(FETCH_SCRIPT),
            "--scope", str(m["scope"]),
            "--lfn", str(m["lfn"]),
            "--jeditaskid", str(m["jeditaskid"]),
            "--pandaid", str(m["pandaid"]),
        ]
        if m.get("force"):
            cmd.append("--force")
        self.logger.info(f"PRODOPS fetch_payload_log: pandaid={m['pandaid']} task={m['jeditaskid']}")
        try:
            p = self._run_doer(cmd, FETCH_TIMEOUT)
        except subprocess.TimeoutExpired:
            self.logger.error(
                f"PRODOPS fetch_payload_log TIMEOUT after {FETCH_TIMEOUT}s pandaid={m['pandaid']}")
            self._mark_error(jobdir, f"fetch timed out after {FETCH_TIMEOUT}s")
            return
        self._log_doer_output("cache-payload-log", p, include_stdout=False)
        if p.returncode != 0:
            self.logger.error(
                f"PRODOPS fetch_payload_log FAILED rc={p.returncode} pandaid={m['pandaid']}")
            self._mark_error(jobdir, self._failure_reason(p))
        else:
            self.logger.info(f"PRODOPS fetch_payload_log done: pandaid={m['pandaid']}")
        # NOTE(review): the source's indentation was lost here; this push is
        # assumed to follow the if/else, i.e. it is sent for both the success
        # and the failed-exit outcome (the message carries no ok flag). Confirm.
        self.send_message('/topic/epictopic', {
            'msg_type': 'payload_log_ready',
            'pandaid': str(m['pandaid']),
            'jeditaskid': str(m['jeditaskid']),
        })

    # ---- submit_task / submit_evgen_task ------------------------------------

    def _submit_via_doer(self, m, script, timeout, op, tag):
        """Run one submission doer and publish its outcome.

        *script* is called with ``--task-name`` (and ``--owner`` when given).
        Its stripped stdout is taken as the jediTaskID. Outcomes published on
        ``/topic/epictopic``:

        * rc 0 with a task id      -> ``prodtask_submitted``
        * rc 7 with a task id      -> ``prodtask_submit_unrecorded``
        * anything else / timeout  -> ``prodtask_submit_failed``

        *op* names the operation in agent log lines; *tag* prefixes the
        doer's relayed stderr lines.
        """
        task_name = m["task_name"]
        cmd = [sys.executable, str(script), "--task-name", str(task_name)]
        if m.get("owner"):
            cmd += ["--owner", str(m["owner"])]
        self.logger.info(f"PRODOPS {op}: {task_name}")
        try:
            p = self._run_doer(cmd, timeout)
        except subprocess.TimeoutExpired:
            self.logger.error(f"PRODOPS {op} TIMEOUT after {timeout}s: {task_name}")
            self.send_message('/topic/epictopic', {
                'msg_type': 'prodtask_submit_failed', 'task_name': task_name,
                'reason': f'submission timed out after {timeout}s'})
            return
        # stdout carries the task id, so only stderr is relayed to the log.
        self._log_doer_output(tag, p, include_stdout=False)
        jedi_task_id = (p.stdout or '').strip()
        reason = self._failure_reason(p)
        if p.returncode == 0 and jedi_task_id:
            self.logger.info(
                f"PRODOPS {op} done: {task_name} -> jediTaskID={jedi_task_id}")
            self.send_message('/topic/epictopic', {
                'msg_type': 'prodtask_submitted',
                'task_name': task_name, 'jedi_task_id': jedi_task_id})
        elif p.returncode == 7 and jedi_task_id:
            # Submitted, but the doer could not record the id back.
            self.logger.error(
                f"PRODOPS {op} UNRECORDED: {task_name} submitted as "
                f"jediTaskID={jedi_task_id} but record-back failed")
            self.send_message('/topic/epictopic', {
                'msg_type': 'prodtask_submit_unrecorded', 'task_name': task_name,
                'jedi_task_id': jedi_task_id, 'reason': reason})
        else:
            self.logger.error(f"PRODOPS {op} FAILED rc={p.returncode}: {task_name}")
            self.send_message('/topic/epictopic', {
                'msg_type': 'prodtask_submit_failed',
                'task_name': task_name, 'reason': reason})

    def _handle_submit_task(self, m):
        """Validate, then run the submission on the worker pool. Deduped per
        task so two near-simultaneous triggers can't fire two submissions —
        the status / panda_task_id gates close that window later; this closes
        the in-flight window at the agent."""
        task_name = m.get("task_name")
        if not task_name:
            self.logger.error("PRODOPS submit_task: missing task_name")
            return
        self.run_in_background(
            self._do_submit_task, m,
            dedup_key=f"submit:{task_name}", label=f"submit_task {task_name}")

    def _do_submit_task(self, m):
        """Submit one PCS ProdTask to PanDA via the standalone doer.

        The web tier ('Submit to PanDA') publishes {task_name}; the doer
        fetches the prun command from PCS, runs it non-interactively with the
        cached long-lived production token, and records the jediTaskID back.
        We hold the token; the web tier structurally cannot. The outcome lands
        on the ProdTask (panda_task_id + status) via record-submission, and is
        also published on /topic/epictopic (see ``_submit_via_doer``)."""
        self._submit_via_doer(m, SUBMIT_SCRIPT, SUBMIT_TIMEOUT,
                              op="submit_task", tag="submit-prod-task")

    def _handle_submit_evgen_task(self, m):
        """Validate, then run the client-API EVGEN submission on the worker
        pool. This is the live Submit-button path (prun is sidelined). Deduped
        per task so two near-simultaneous triggers can't fire two submissions;
        the dedup key is shared with submit_task."""
        task_name = m.get("task_name")
        if not task_name:
            self.logger.error("PRODOPS submit_evgen_task: missing task_name")
            return
        self.run_in_background(
            self._do_submit_evgen_task, m,
            dedup_key=f"submit:{task_name}", label=f"submit_evgen_task {task_name}")

    def _do_submit_evgen_task(self, m):
        """Submit one PCS ProdTask to PanDA via the client-API EVGEN doer.

        The doer fetches the EVGEN spec from PCS, assembles the sandbox
        (manifest + env + dispatcher + JLab proxy), and submits noInput+noOutput
        with the cached production token; the payload stages its EVGEN input and
        self-registers RECO to JLab Rucio. Outcome lands on the ProdTask
        (panda_task_id + status) via record-submission, and every outcome emits
        the same SSE events as the prun path, so the compose page is unchanged
        and never left polling. The JLab proxy + monitor URL/token travel in the
        agent's environment (SWF_MONITOR_URL, SWFMON_TOKEN, EVGEN_X509_PROXY)."""
        self._submit_via_doer(m, SUBMIT_EVGEN_SCRIPT, SUBMIT_EVGEN_TIMEOUT,
                              op="submit_evgen_task", tag="submit-evgen-task")

    # ---- sync_epicprod_inventory ------------------------------------------

    def _handle_sync_epicprod_inventory(self, m):
        """Refresh one job/task inventory record on the worker pool.

        Needs one of pandaid, jeditaskid or task_name; when several are given
        they take precedence in that order, both for the dedup key and for
        the argument passed to the command.
        """
        if m.get("pandaid"):
            key = f"pandaid:{m['pandaid']}"
            label = f"sync_epicprod_inventory pandaid={m['pandaid']}"
        elif m.get("jeditaskid"):
            key = f"jeditaskid:{m['jeditaskid']}"
            label = f"sync_epicprod_inventory jeditaskid={m['jeditaskid']}"
        elif m.get("task_name"):
            key = f"task:{m['task_name']}"
            label = f"sync_epicprod_inventory task={m['task_name']}"
        else:
            self.logger.error(
                "PRODOPS sync_epicprod_inventory: need pandaid, jeditaskid, or task_name")
            return
        self.run_in_background(
            self._do_sync_epicprod_inventory, m,
            dedup_key=f"sync_inventory:{key}",
            label=label)

    def _do_sync_epicprod_inventory(self, m):
        """Run ``manage.py sync_epicprod_inventory`` for one selector and
        publish ``epicprod_inventory_ready`` (ok True/False) when it ends."""
        cmd = [sys.executable, str(MANAGE_PY), "sync_epicprod_inventory"]
        if m.get("pandaid"):
            cmd += ["--pandaid", str(m["pandaid"])]
        elif m.get("jeditaskid"):
            cmd += ["--jeditaskid", str(m["jeditaskid"])]
        else:
            cmd += ["--prod-task", str(m["task_name"])]
        self.logger.info(f"PRODOPS sync_epicprod_inventory: {' '.join(cmd)}")
        try:
            p = self._run_doer(cmd, INVENTORY_TIMEOUT)
        except subprocess.TimeoutExpired:
            reason = f"timed out after {INVENTORY_TIMEOUT}s"
            self.logger.error(f"PRODOPS sync_epicprod_inventory TIMEOUT: {reason}")
            self.send_message('/topic/epictopic', {
                'msg_type': 'epicprod_inventory_ready', 'ok': False,
                'pandaid': m.get('pandaid'), 'jeditaskid': m.get('jeditaskid'),
                'task_name': m.get('task_name'), 'error': reason})
            return
        self._log_doer_output("sync-epicprod-inventory", p)
        ok = p.returncode == 0
        reason = ''
        if not ok:
            reason = self._failure_reason(p)
            self.logger.error(f"PRODOPS sync_epicprod_inventory FAILED rc={p.returncode}")
        else:
            self.logger.info("PRODOPS sync_epicprod_inventory done")
        self.send_message('/topic/epictopic', {
            'msg_type': 'epicprod_inventory_ready', 'ok': ok,
            'pandaid': m.get('pandaid'), 'jeditaskid': m.get('jeditaskid'),
            'task_name': m.get('task_name'), 'error': reason})

    # ---- refresh_system_status --------------------------------------------

    def _handle_refresh_system_status(self, m):
        """Refresh cached system status rows through the shared doer."""
        self.run_in_background(
            self._do_refresh_system_status, m,
            dedup_key="refresh_system_status",
            label="refresh_system_status")

    def _do_refresh_system_status(self, m):
        """Run the system-status doer and publish ``system_status_ready``.

        ``m['source']`` (default ``ops_agent``) is passed as ``--source``;
        ``m['selected']`` or ``m['only']`` (a string or an iterable of names)
        becomes one ``--only`` argument per name.
        """
        cmd = [
            sys.executable, str(SYSTEM_STATUS_SCRIPT),
            "--source", str(m.get("source") or "ops_agent"),
        ]
        selected = m.get("selected") or m.get("only") or []
        if isinstance(selected, str):
            # A bare string is a single name, not a sequence of characters.
            selected = [selected]
        for name in selected:
            cmd += ["--only", str(name)]
        self.logger.info(f"PRODOPS refresh_system_status: {' '.join(cmd)}")
        try:
            p = self._run_doer(cmd, SYSTEM_STATUS_TIMEOUT)
        except subprocess.TimeoutExpired:
            reason = f"timed out after {SYSTEM_STATUS_TIMEOUT}s"
            self.logger.error(f"PRODOPS refresh_system_status TIMEOUT: {reason}")
            self.send_message('/topic/epictopic', {
                'msg_type': 'system_status_ready', 'ok': False, 'error': reason})
            return
        self._log_doer_output("refresh-system-status", p)
        ok = p.returncode == 0
        reason = ''
        if not ok:
            reason = self._failure_reason(p)
            self.logger.error(f"PRODOPS refresh_system_status FAILED rc={p.returncode}")
        else:
            self.logger.info("PRODOPS refresh_system_status done")
        self.send_message('/topic/epictopic', {
            'msg_type': 'system_status_ready', 'ok': ok, 'error': reason})

    def _system_status_periodic_loop(self):
        """Keep cached system knowledge fresh without page-load probes.

        Runs in the daemon thread started by ``__init__``: waits
        SYSTEM_STATUS_INITIAL_DELAY seconds, then queues a refresh every
        SYSTEM_STATUS_INTERVAL seconds. Returns at once when the interval is
        <= 0. A failure to queue one refresh is logged and the loop carries
        on, so a single error cannot end periodic refreshing for good.
        """
        if SYSTEM_STATUS_INTERVAL <= 0:
            self.logger.info("PRODOPS periodic system status refresh disabled")
            return
        time.sleep(max(SYSTEM_STATUS_INITIAL_DELAY, 0))
        while True:
            try:
                self.run_in_background(
                    self._do_refresh_system_status,
                    {'source': 'ops_agent_periodic'},
                    dedup_key="refresh_system_status",
                    label="refresh_system_status periodic")
            except Exception as e:
                self.logger.error(
                    f"PRODOPS periodic system status refresh not queued: {e}")
            time.sleep(SYSTEM_STATUS_INTERVAL)

    # ---- rucio_snapshot_update --------------------------------------------

    def _handle_rucio_snapshot_update(self, m):
        """Refresh the JLab Rucio snapshot + rematch outputs, off the receiver
        thread — the doer makes a live JLab fetch and matches every task, far
        too slow for the web request (which is why it moved here). Deduped to one
        refresh at a time; it scans the whole current/last campaign."""
        self.run_in_background(
            self._do_rucio_snapshot_update, m,
            dedup_key="rucio_snapshot_update", label="rucio_snapshot_update")

    def _do_rucio_snapshot_update(self, m):
        """Run the snapshot-refresh doer; push completion on success AND failure
        so the catalog never hangs on 'Updating…'. See docs/SSE_PUSH.md."""
        cmd = [sys.executable, str(RUCIO_SNAPSHOT_SCRIPT)]
        self.logger.info("PRODOPS rucio_snapshot_update: refreshing current/last snapshot")
        try:
            p = self._run_doer(cmd, RUCIO_SNAPSHOT_TIMEOUT)
        except subprocess.TimeoutExpired:
            self.logger.error(
                f"PRODOPS rucio_snapshot_update TIMEOUT after {RUCIO_SNAPSHOT_TIMEOUT}s")
            self.send_message('/topic/epictopic', {
                'msg_type': 'rucio_snapshot_ready', 'ok': False,
                'error': f'timed out after {RUCIO_SNAPSHOT_TIMEOUT}s'})
            return
        self._log_doer_output("rucio-snapshot-update", p)
        if p.returncode != 0:
            reason = self._failure_reason(p)
            self.logger.error(f"PRODOPS rucio_snapshot_update FAILED rc={p.returncode}")
            self.send_message('/topic/epictopic', {
                'msg_type': 'rucio_snapshot_ready', 'ok': False, 'error': reason})
        else:
            self.logger.info("PRODOPS rucio_snapshot_update done")
            self.send_message('/topic/epictopic', {
                'msg_type': 'rucio_snapshot_ready', 'ok': True})

    # ---- evgen_rucio_update -----------------------------------------------

    def _handle_evgen_rucio_update(self, m):
        """Assimilate the JLab Rucio EVGEN inventory off the receiver thread — a
        live JLab fetch + per-dataset match, too slow for the web request.
        Deduped to one assimilation at a time."""
        self.run_in_background(
            self._do_evgen_rucio_update, m,
            dedup_key="evgen_rucio_update", label="evgen_rucio_update")

    def _do_evgen_rucio_update(self, m):
        """Run the EVGEN assimilation doer (--apply); push completion on success
        AND failure so the catalog never hangs on 'Updating…'. See docs/SSE_PUSH.md."""
        cmd = [sys.executable, str(EVGEN_RUCIO_SCRIPT), "--apply"]
        self.logger.info("PRODOPS evgen_rucio_update: assimilating JLab Rucio EVGEN")
        try:
            p = self._run_doer(cmd, EVGEN_RUCIO_TIMEOUT)
        except subprocess.TimeoutExpired:
            self.logger.error(
                f"PRODOPS evgen_rucio_update TIMEOUT after {EVGEN_RUCIO_TIMEOUT}s")
            self.send_message('/topic/epictopic', {
                'msg_type': 'evgen_rucio_ready', 'ok': False,
                'error': f'timed out after {EVGEN_RUCIO_TIMEOUT}s'})
            return
        self._log_doer_output("import-evgen-rucio", p)
        if p.returncode != 0:
            reason = self._failure_reason(p)
            self.logger.error(f"PRODOPS evgen_rucio_update FAILED rc={p.returncode}")
            self.send_message('/topic/epictopic', {
                'msg_type': 'evgen_rucio_ready', 'ok': False, 'error': reason})
        else:
            self.logger.info("PRODOPS evgen_rucio_update done")
            self.send_message('/topic/epictopic', {
                'msg_type': 'evgen_rucio_ready', 'ok': True})

    # ---- catalog_import ---------------------------------------------------

    def _handle_catalog_import(self, m):
        """Run a PCS catalog import (csv / epic-prod) off the receiver thread —
        the epic-prod walk of ~4900 datasets is far too slow for the web request
        (which is why it moved here). Deduped per source."""
        source = str(m.get('source') or '')
        self.run_in_background(
            self._do_catalog_import, m,
            dedup_key=f"catalog_import:{source}", label=f"catalog_import:{source}")

    def _do_catalog_import(self, m):
        """Run the catalog-import doer; push completion on success AND failure so
        the catalog never hangs on 'Updating…'. See docs/SSE_PUSH.md.

        On success the last line of the doer's stdout is sent as ``summary``.
        """
        # Coerced to str: a non-string argv entry makes subprocess raise
        # TypeError before the doer starts, and then nothing is pushed.
        source = str(m.get('source') or '')
        cmd = [sys.executable, str(CATALOG_IMPORT_SCRIPT), source]
        self.logger.info(f"PRODOPS catalog_import: importing {source}")
        try:
            p = self._run_doer(cmd, CATALOG_IMPORT_TIMEOUT)
        except subprocess.TimeoutExpired:
            self.logger.error(
                f"PRODOPS catalog_import {source} TIMEOUT after {CATALOG_IMPORT_TIMEOUT}s")
            self.send_message('/topic/epictopic', {
                'msg_type': 'catalog_import_ready', 'source': source, 'ok': False,
                'error': f'timed out after {CATALOG_IMPORT_TIMEOUT}s'})
            return
        self._log_doer_output("pcs-catalog-import", p)
        if p.returncode != 0:
            reason = self._failure_reason(p)
            self.logger.error(f"PRODOPS catalog_import {source} FAILED rc={p.returncode}")
            self.send_message('/topic/epictopic', {
                'msg_type': 'catalog_import_ready', 'source': source, 'ok': False,
                'error': reason})
        else:
            summary = (p.stdout or "").strip().splitlines()
            self.logger.info(f"PRODOPS catalog_import {source} done")
            self.send_message('/topic/epictopic', {
                'msg_type': 'catalog_import_ready', 'source': source, 'ok': True,
                'summary': summary[-1] if summary else ''})

    # ---- fetch failure bookkeeping ----------------------------------------

    def _mark_error(self, jobdir, reason):
        """Record a failed fetch in the cache dir (attempt count + reason) so the
        web view can surface it and stop auto-retrying past the cap.

        Writes ``<jobdir>/.error`` as JSON with ``attempts`` (previous count
        + 1), ``last_error`` and a UTC ``ts``. An unreadable or malformed
        existing marker restarts the count at 0. Filesystem errors are logged,
        never raised.
        """
        try:
            os.makedirs(jobdir, exist_ok=True)
            path = os.path.join(jobdir, ERROR_MARKER)
            attempts = 0
            if os.path.isfile(path):
                try:
                    with open(path, encoding="utf-8") as f:
                        attempts = int(json.load(f).get("attempts", 0))
                except (ValueError, OSError, AttributeError, TypeError):
                    # ValueError: bad JSON or a non-numeric count.
                    # AttributeError: the JSON is not an object.
                    # TypeError: the count is null or a container.
                    attempts = 0
            with open(path, "w", encoding="utf-8") as f:
                json.dump({"attempts": attempts + 1, "last_error": reason,
                           "ts": datetime.now(timezone.utc).isoformat()}, f)
        except OSError as e:
            self.logger.error(f"PRODOPS could not write {ERROR_MARKER} in {jobdir}: {e}")
0620
0621
def main():
    """Run the agent, then exit with a status that reflects how it stopped.

    The exit status is EXIT_DELIBERATE when the agent's ``_deliberate`` flag
    was set (a ``shutdown`` message was handled), otherwise 0.
    """
    agent = EpicProdOpsAgent()
    agent.run()
    # Reached only once run() has returned.
    status = EXIT_DELIBERATE if agent._deliberate else 0
    sys.exit(status)


if __name__ == "__main__":
    main()