File indexing completed on 2026-04-27 07:41:44
0001 """swf-alarms engine entry point.
0002
0003 Per-tick behavior (active/clear semantics):
0004
0005 1. Load kind='alarm' entries (not archived) from the DB. `data.enabled`
0006 is NOT consulted here — the algorithm runs for every non-archived
0007 alarm every tick. `enabled` gates only email delivery (see step 3).
0008 "Stop running this alarm" is expressed as archived=True.
0009 2. For each alarm config:
0010 a. Fetch existing active events for this alarm (one per entity still
0011 firing) — indexed by data.dedupe_key.
0012 b. Call the alarm's ``detect(client, params)`` and collect the
0013 dedupe_keys it yields this tick.
0014 c. For each detection:
0015 - If already active for same dedupe_key: touch last_seen. If
0016 the alarm's emails are on AND the event has never been
0017 notified (created while emails-off) OR the per-alarm
0018 renotification window has elapsed, add to the RENOTIFY
0019 bundle.
0020 - Else: create a new event entry (fire_time=now, clear_time=null),
0021 compose and store the single-detection body, add to the NEW
0022 bundle. Event rows are created unconditionally of `enabled`
0023 so the dashboard stays truthful.
0024 d. For each previously-active event NOT in the current detection set:
0025 set data.clear_time = now (auto-clear — unconditional of `enabled`).
0026 3. If the alarm's emails are on AND (NEW or RENOTIFY) is non-empty:
0027 compose ONE bundled email listing every detection in both sections
0028 and ship it via SES. On success, stamp `last_notified = now` on
0029 every included event. ``notifications_sent`` counts this as one,
0030 regardless of bundle size. Closes out the `engine_run` entry with
0031 aggregate counters + per-alarm detail (including `bundle_new`,
0032 `bundle_renotify`, `bundle_sent`).
0033
0034 No cooldown flag: dedup is *state-based* — one active event per (alarm,
0035 entity). Re-firings only happen after a cleared→fire transition, plus
0036 the optional renotification window re-emails a still-active event (as
0037 part of the next bundle).
0038 """
0039 from __future__ import annotations
0040
0041 import argparse
0042 import importlib
0043 import json
0044 import logging
0045 import sys
0046 import time
0047 import traceback
0048
0049 from . import config as config_mod
0050 from . import db
0051 from .lib import Detection
0052 from .fetch import Client, FetchError
0053 from .notify import Alarm, send_email_ses
0054
0055
def _load_alarm_module(alarm_entry_id: str):
    """Import and return the module that implements one alarm.

    A single leading ``alarm_`` prefix is dropped from ``alarm_entry_id``
    and the remainder is used as the module name under
    ``swf_alarms.alarms``. An id without that prefix is used unchanged.
    Import failures propagate to the caller as raised by ``importlib``.
    """
    prefix = 'alarm_'
    name = alarm_entry_id
    if name.startswith(prefix):
        # Strip only the leading prefix; later occurrences stay intact.
        name = name[len(prefix):]
    return importlib.import_module(f"swf_alarms.alarms.{name}")
0064
0065
# Module-wide logger; `main` reports run progress and failures through it.
log = logging.getLogger("swf_alarms")
0067
0068
def _configure_logging(log_path: str | None, verbose: bool) -> None:
    """Install the root logging configuration for this run.

    Records always go to stdout. When ``log_path`` is truthy they are
    also written to that file. ``verbose`` selects DEBUG, otherwise INFO.
    ``force=True`` replaces any handlers already attached to the root
    logger.
    """
    sinks: list[logging.Handler] = [logging.StreamHandler(sys.stdout)]
    if log_path:
        sinks.append(logging.FileHandler(log_path))

    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
        handlers=sinks,
        force=True,
    )
0079
0080
def main(argv: list[str] | None = None) -> int:
    """Run one engine tick: detect, persist events, and send bundled mail.

    For every alarm config returned by ``db.list_alarm_configs`` the
    function loads the alarm's module, runs its ``detect(client, params)``,
    creates or touches event rows, auto-clears events that were not
    detected again, and sends at most one bundled email per alarm. The
    run is closed with ``db.finish_engine_run``.

    Failure handling per alarm:

    * Missing ``data.entry_id`` or an unloadable module: counted as an
      error and the alarm is skipped.
    * ``db.active_events_for_alarm`` fails: counted as an error and the
      alarm is skipped for this tick. Without the active set every
      still-firing entity would be indistinguishable from a new one and
      would produce duplicate events and "new" mail.
    * ``detect`` (or event persistence inside the loop) raises: counted
      as an error; auto-clear is skipped so a partial detection set does
      not clear events that may still be firing.
    * ``db.clear_event`` / ``db.mark_event_notified`` fail for one event:
      logged, and processing continues with the next event.

    Args:
        argv: Command-line arguments without the program name. ``None``
            lets argparse fall back to ``sys.argv``.

    Returns:
        ``0`` when no error was counted during the run, ``1`` otherwise.
    """
    ap = argparse.ArgumentParser(prog="swf-alarms-run")
    ap.add_argument("--config", required=True, help="path to engine TOML")
    ap.add_argument("--dry-run", action="store_true",
                    help="detect and persist, but suppress notifications")
    ap.add_argument("-v", "--verbose", action="store_true")
    args = ap.parse_args(argv)

    cfg = config_mod.load(args.config)
    _configure_logging(cfg.engine.log_path, args.verbose)

    log.info("run starting config=%s dry_run=%s", args.config, args.dry_run)
    conn = db.connect(cfg.db_dsn)
    run_uuid = db.start_engine_run(conn)

    client = Client(cfg.engine.swf_remote_base_url,
                    timeout=cfg.engine.request_timeout)

    # `enabled` is deliberately not used as a filter here: it only gates
    # email delivery further down, never whether the algorithm runs.
    alarm_configs = db.list_alarm_configs(conn, enabled_only=False)
    log.info("loaded %d alarm config(s) — algorithms always run; "
             "per-alarm `enabled` controls email only", len(alarm_configs))

    # Run-wide counters handed to finish_engine_run at the end.
    alarms_run = 0
    alarms_seen = 0
    notifications_sent = 0
    errors = 0
    error_traces: list[str] = []
    per_alarm: dict[str, dict] = {}

    for alarm in alarm_configs:
        data = alarm.get("data") or {}
        alarm_entry_id = data.get("entry_id", "")
        if not alarm_entry_id:
            log.error("alarm %s missing data.entry_id — skipping", alarm["id"])
            errors += 1
            continue

        # Resolved before the first failure path so every per-alarm
        # record reports the alarm's real email switch.
        email_enabled = bool(data.get("enabled", True))

        try:
            alarm_mod = _load_alarm_module(alarm_entry_id)
            fn = alarm_mod.detect
        except (ImportError, AttributeError) as e:
            msg = f"no alarm module for {alarm_entry_id!r}: {e}"
            log.error("%s — skipping", msg)
            errors += 1
            error_traces.append(f"[{alarm_entry_id}] {msg}")
            per_alarm[alarm_entry_id] = {
                "enabled": email_enabled, "alarms_seen": 0,
                "errors": 1, "error_message": msg,
            }
            continue

        alarms_run += 1
        params = dict(data.get("params") or {})

        # Recipients may be stored as one delimited string or as a list.
        raw_recipients = data.get("recipients") or []
        if isinstance(raw_recipients, str):
            raw_recipients = db._split_tokens(raw_recipients)
        else:
            raw_recipients = list(raw_recipients)

        recipients, unresolved_teams = db.resolve_recipients(conn, raw_recipients)
        if unresolved_teams:
            log.warning("alarm %s: unresolved team(s) %s — skipped",
                        alarm_entry_id, unresolved_teams)

        # 0 (or missing) disables time-based renotification.
        renotification_window_hours = float(
            data.get("renotification_window_hours") or 0)

        alarm_seen = 0
        alarm_err = 0
        alarm_err_msg = ""
        detected_keys: set[str] = set()

        # --dry-run still detects and persists; it only suppresses mail.
        send_mail = email_enabled and not args.dry_run

        # (event id, detection) pairs that go into this tick's email.
        new_bundle: list[tuple[str, Detection]] = []
        renotify_bundle: list[tuple[str, Detection]] = []

        try:
            active_rows = db.active_events_for_alarm(conn, alarm_entry_id)
        except Exception as e:
            # Without the active set we cannot tell new from continuing
            # detections; running anyway would duplicate events and mail.
            log.error("active_events_for_alarm(%s) failed: %s",
                      alarm_entry_id, e)
            msg = f"active_events_for_alarm failed: {e}"
            errors += 1
            error_traces.append(f"[{alarm_entry_id}] {msg}")
            per_alarm[alarm_entry_id] = {
                "enabled": email_enabled, "alarms_seen": 0,
                "errors": 1, "error_message": msg,
            }
            continue

        active_by_key = {
            (r.get("data") or {}).get("dedupe_key"): r for r in active_rows
        }

        try:
            for det in fn(client, params):
                alarm_seen += 1
                alarms_seen += 1
                detected_keys.add(det.dedupe_key)
                existing = active_by_key.get(det.dedupe_key)

                if existing is None:
                    # First sighting: persist a standalone event row,
                    # independent of the email switch.
                    full_body = _compose_body(
                        alarm.get("content") or "", det.body_context)
                    event_uuid = db.create_event(
                        conn,
                        alarm_entry_id=alarm_entry_id,
                        dedupe_key=det.dedupe_key,
                        subject=det.subject,
                        body=full_body,
                        recipients=recipients,
                        extra_data=det.extra_data,
                        alarm_config_uuid=alarm["id"],
                    )
                    new_bundle.append((event_uuid, det))
                    continue

                # Still firing: refresh last_seen, then decide whether
                # the event belongs in the renotification section.
                db.touch_event_last_seen(conn, existing["id"])

                last_notified = float(
                    (existing.get("data") or {}).get("last_notified") or 0)
                if last_notified == 0:
                    # Never mailed so far.
                    renotify_bundle.append((existing["id"], det))
                elif renotification_window_hours > 0 and (
                        time.time() - last_notified
                        >= renotification_window_hours * 3600):
                    renotify_bundle.append((existing["id"], det))
        except FetchError as e:
            log.error("alarm %s fetch failed: %s", alarm_entry_id, e)
            errors += 1
            alarm_err = 1
            alarm_err_msg = f"fetch: {e}"
            error_traces.append(f"[{alarm_entry_id}] fetch: {e}")
        except Exception:
            tb = traceback.format_exc()
            log.error("alarm %s raised:\n%s", alarm_entry_id, tb)
            errors += 1
            alarm_err = 1
            alarm_err_msg = tb
            error_traces.append(f"[{alarm_entry_id}]\n{tb}")

        # Auto-clear only after a complete detection pass; a failed pass
        # leaves detected_keys incomplete.
        if alarm_err == 0:
            for key, ev in active_by_key.items():
                if key not in detected_keys:
                    try:
                        db.clear_event(conn, ev["id"])
                    except Exception as e:
                        log.error("clear_event failed for %s: %s", ev["id"], e)

        bundle_sent = False
        bundle_new = len(new_bundle)
        bundle_renotify = len(renotify_bundle)
        bundle_subject = ""
        if new_bundle or renotify_bundle:
            # The subject is composed even when mail is off so it can be
            # recorded in the per-alarm detail.
            bundle_subject, body = _compose_bundle(
                alarm_entry_id=alarm_entry_id,
                alarm_description=alarm.get("content") or "",
                new_bundle=new_bundle,
                renotify_bundle=renotify_bundle,
            )
            if send_mail:
                ok = send_email_ses(
                    Alarm(
                        alarm_name=alarm_entry_id,
                        dedupe_key=f"bundle:{int(time.time())}",
                        subject=bundle_subject,
                        body=body,
                        recipients=list(recipients),
                        data={
                            "bundle": True,
                            "new_count": bundle_new,
                            "renotify_count": bundle_renotify,
                        },
                    ),
                    region=cfg.email.region,
                    from_addr=cfg.email.from_addr,
                )
                if ok:
                    # One bundle counts as one notification regardless
                    # of how many detections it lists.
                    notifications_sent += 1
                    bundle_sent = True
                    for event_uuid, _ in (*new_bundle, *renotify_bundle):
                        # Guarded per event, like clear_event above, so
                        # one failing row cannot abort the whole run.
                        try:
                            db.mark_event_notified(conn, event_uuid)
                        except Exception as e:
                            log.error("mark_event_notified failed for %s: %s",
                                      event_uuid, e)

        per_alarm[alarm_entry_id] = {
            "enabled": email_enabled,
            "alarms_seen": alarm_seen,
            "errors": alarm_err,
            "error_message": alarm_err_msg,
            "bundle_new": bundle_new,
            "bundle_renotify": bundle_renotify,
            "bundle_sent": bundle_sent,
            "bundle_subject": bundle_subject,
            "bundle_new_event_ids": [u for u, _ in new_bundle],
            "bundle_renotify_event_ids": [u for u, _ in renotify_bundle],
            "params": params,
            "recipients": recipients,
        }

    db.finish_engine_run(
        conn, run_uuid,
        alarms_run=alarms_run,
        alarms_seen=alarms_seen,
        notifications_sent=notifications_sent,
        errors=errors,
        error_details="\n\n".join(error_traces),
        per_alarm=per_alarm,
    )
    log.info("run done alarms_run=%d alarms_seen=%d sent=%d errors=%d",
             alarms_run, alarms_seen, notifications_sent, errors)
    return 0 if errors == 0 else 1
0322
0323
def _compose_body(alarm_description: str, detail: str) -> str:
    """Build the body stored on a single event row.

    When the alarm description is empty or whitespace-only, ``detail``
    is returned untouched. Otherwise the right-stripped description is
    placed above ``detail``, separated by a ``---`` rule with a blank
    line on either side. The per-tick email body is produced separately
    by ``_compose_bundle``.
    """
    if not alarm_description or not alarm_description.strip():
        return detail
    heading = alarm_description.rstrip()
    return "\n\n---\n\n".join((heading, detail))
0331
0332
def _compose_bundle(*, alarm_entry_id: str, alarm_description: str,
                    new_bundle: list, renotify_bundle: list) -> tuple[str, str]:
    """Compose the single per-tick email for one alarm.

    Both bundles are lists of ``(event id, detection)`` pairs. Only each
    detection's ``subject`` and ``body_context`` are read.

    Returns ``(subject, body)``:

    * subject — ``[{alarm_entry_id}]`` followed by a comma-separated
      summary: the total as ``N detection(s)``, then ``X new`` and
      ``Y continuing`` for whichever counts are non-zero.
    * body — the right-stripped alarm description and a ``---`` rule
      (only when the description is non-empty), then a ``NEW`` section
      and a ``CONTINUING`` section. Empty sections are omitted. Items are
      numbered from 1 within each section. The body always ends with
      exactly one newline.
    """
    new_count = len(new_bundle)
    continuing_count = len(renotify_bundle)

    summary = [f"{new_count + continuing_count} detection(s)"]
    if new_count:
        summary.append(f"{new_count} new")
    if continuing_count:
        summary.append(f"{continuing_count} continuing")
    subject = f"[{alarm_entry_id}] {', '.join(summary)}"

    def _render_section(header: str, items: list) -> list[str]:
        """Return the lines for one section, or nothing when it is empty."""
        if not items:
            return []
        rendered = [header, ""]
        for index, (_event_id, det) in enumerate(items, 1):
            # NOTE(review): both literals below carry a single leading
            # space in the available source — confirm the intended
            # indentation widths against the canonical file.
            rendered.append(f" [{index}] {det.subject}")
            if det.body_context:
                rendered.extend(
                    f" {row}" for row in det.body_context.splitlines())
            rendered.append("")
        rendered.append("")
        return rendered

    description = (alarm_description or "").rstrip()
    body_lines: list[str] = [description, "", "---", ""] if description else []
    body_lines += _render_section(f"NEW ({new_count}):", new_bundle)
    body_lines += _render_section(
        f"CONTINUING — renotification ({continuing_count}):", renotify_bundle)

    return subject, "\n".join(body_lines).rstrip() + "\n"
0378
0379
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())