Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:44

0001 """swf-alarms engine entry point.
0002 
0003 Per-tick behavior (active/clear semantics):
0004 
0005   1. Load kind='alarm' entries (not archived) from the DB. `data.enabled`
0006      is NOT consulted here — the algorithm runs for every non-archived
0007      alarm every tick. `enabled` gates only email delivery (see step 3).
0008      "Stop running this alarm" is expressed as archived=True.
0009   2. For each alarm config:
0010      a. Fetch existing active events for this alarm (one per entity still
0011         firing) — indexed by data.dedupe_key.
0012      b. Call the alarm's ``detect(client, params)`` and collect the
0013         dedupe_keys it yields this tick.
0014      c. For each detection:
0015           - If already active for same dedupe_key: touch last_seen. If
0016             the event has never been notified (e.g. created while
0017             emails were off) OR the per-alarm renotification window
0018             has elapsed, add to the RENOTIFY bundle. Candidacy is
0019             computed regardless of `enabled`; only the send is gated.
0020           - Else: create a new event entry (fire_time=now, clear_time=null),
0021             compose and store the single-detection body, add to the NEW
0022             bundle. Event rows are created unconditionally of `enabled`
0023             so the dashboard stays truthful.
0024      d. For each previously-active event NOT in the current detection set:
0025         set data.clear_time = now (auto-clear — unconditional of `enabled`).
0026   3. If the alarm's emails are on AND (NEW or RENOTIFY) is non-empty:
0027      compose ONE bundled email listing every detection in both sections
0028      and ship it via SES. On success, stamp `last_notified = now` on
0029      every included event. ``notifications_sent`` counts this as one,
0030      regardless of bundle size. Closes out the `engine_run` entry with
0031      aggregate counters + per-alarm detail (including `bundle_new`,
0032      `bundle_renotify`, `bundle_sent`).
0033 
0034 No cooldown flag: dedup is *state-based* — one active event per (alarm,
0035 entity). Re-firings only happen after a cleared→fire transition, plus
0036 the optional renotification window re-emails a still-active event (as
0037 part of the next bundle).
0038 """
0039 from __future__ import annotations
0040 
0041 import argparse
0042 import importlib
0043 import json
0044 import logging
0045 import sys
0046 import time
0047 import traceback
0048 
0049 from . import config as config_mod
0050 from . import db
0051 from .lib import Detection
0052 from .fetch import Client, FetchError
0053 from .notify import Alarm, send_email_ses
0054 
0055 
def _load_alarm_module(alarm_entry_id: str):
    """Import and return the detector module behind an alarm entry id.

    A leading ``alarm_`` is dropped from ``alarm_entry_id``; what remains
    is used as the module name under ``swf_alarms.alarms``. Ids without
    that prefix are used verbatim. Whatever ``importlib.import_module``
    raises (e.g. ``ImportError``) propagates to the caller.
    """
    prefix = 'alarm_'
    if alarm_entry_id.startswith(prefix):
        module_name = alarm_entry_id[len(prefix):]
    else:
        module_name = alarm_entry_id
    qualified = f"swf_alarms.alarms.{module_name}"
    return importlib.import_module(qualified)
0064 
0065 
# Module-wide logger named "swf_alarms". It installs no handlers of its
# own; `_configure_logging` sets up the root handlers via basicConfig.
log = logging.getLogger("swf_alarms")
0067 
0068 
def _configure_logging(log_path: str | None, verbose: bool) -> None:
    """(Re)configure root logging for one engine run.

    Records always go to stdout; when ``log_path`` is truthy they are
    additionally written to that file. ``verbose`` selects DEBUG, otherwise
    INFO. ``force=True`` replaces any handlers already attached to the
    root logger.
    """
    sinks: list[logging.Handler] = [logging.StreamHandler(sys.stdout)]
    if log_path:
        sinks.append(logging.FileHandler(log_path))

    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s %(message)s",
        handlers=sinks,
        force=True,
    )
0079 
0080 
def main(argv: list[str] | None = None) -> int:
    """Run one engine tick: detect, persist events, auto-clear, and email.

    For every non-archived alarm config the alarm's ``detect(client,
    params)`` is called; new detections create event rows, continuing
    ones are touched (and possibly queued for renotification), and active
    events that were not re-detected are cleared. At most one bundled
    email is sent per alarm per tick, and only when the alarm's
    ``enabled`` flag is on and ``--dry-run`` is not given. The run is
    closed out with ``db.finish_engine_run``.

    Args:
        argv: Command-line arguments without the program name. ``None``
            lets argparse fall back to ``sys.argv``.

    Returns:
        ``0`` if no error was counted during the tick, ``1`` otherwise.
    """
    ap = argparse.ArgumentParser(prog="swf-alarms-run")
    ap.add_argument("--config", required=True, help="path to engine TOML")
    ap.add_argument("--dry-run", action="store_true",
                    help="detect and persist, but suppress notifications")
    ap.add_argument("-v", "--verbose", action="store_true")
    args = ap.parse_args(argv)

    cfg = config_mod.load(args.config)
    _configure_logging(cfg.engine.log_path, args.verbose)

    log.info("run starting  config=%s  dry_run=%s", args.config, args.dry_run)
    conn = db.connect(cfg.db_dsn)
    run_uuid = db.start_engine_run(conn)

    client = Client(cfg.engine.swf_remote_base_url,
                    timeout=cfg.engine.request_timeout)

    # Load ALL non-archived alarm configs. Per-alarm `data.enabled` gates
    # ONLY the email side — detection, event creation, active/clear,
    # dashboard visibility keep working regardless. "Disabled" means
    # "silent": alarm reporting still happens, emails do not.
    alarm_configs = db.list_alarm_configs(conn, enabled_only=False)
    log.info("loaded %d alarm config(s) — algorithms always run; "
             "per-alarm `enabled` controls email only", len(alarm_configs))

    # Run-level counters reported through finish_engine_run().
    alarms_run = 0
    alarms_seen = 0
    notifications_sent = 0
    errors = 0
    error_traces: list[str] = []
    per_alarm: dict[str, dict] = {}

    for alarm in alarm_configs:
        data = alarm.get("data") or {}
        alarm_entry_id = data.get("entry_id", "")
        if not alarm_entry_id:
            msg = "missing data.entry_id"
            log.error("alarm %s missing data.entry_id — skipping", alarm["id"])
            errors += 1
            # No entry id to key a per_alarm record on, but the failure
            # must still show up in the run's error_details.
            error_traces.append(f"[{alarm['id']}] {msg}")
            continue
        try:
            alarm_mod = _load_alarm_module(alarm_entry_id)
            fn = alarm_mod.detect
        except (ImportError, AttributeError) as e:
            msg = f"no alarm module for {alarm_entry_id!r}: {e}"
            log.error("%s — skipping", msg)
            errors += 1
            error_traces.append(f"[{alarm_entry_id}] {msg}")
            per_alarm[alarm_entry_id] = {
                # Report the configured email gate, same as the normal path.
                "enabled": bool(data.get("enabled", True)),
                "alarms_seen": 0,
                "errors": 1, "error_message": msg,
            }
            continue

        alarms_run += 1
        params = dict(data.get("params") or {})
        raw_recipients = data.get("recipients") or []
        # Accept either string (user-typed, stored verbatim) or list (legacy).
        if isinstance(raw_recipients, str):
            raw_recipients = db._split_tokens(raw_recipients)
        else:
            raw_recipients = list(raw_recipients)
        # Resolve @<team> tokens to member emails via DB lookup.
        recipients, unresolved_teams = db.resolve_recipients(conn, raw_recipients)
        if unresolved_teams:
            log.warning("alarm %s: unresolved team(s) %s — skipped",
                        alarm_entry_id, unresolved_teams)
        # Per-alarm renotification window (hours). 0 / missing = no
        # re-notification — a still-firing entity stays quiet until clear.
        renotification_window_hours = float(
            data.get("renotification_window_hours") or 0)

        alarm_seen = 0
        alarm_err = 0
        alarm_err_msg = ""
        detected_keys: set[str] = set()

        # Per-alarm email gate. When False, event rows still fire and
        # active/clear still tick — we just don't ship mail.
        email_enabled = bool(data.get("enabled", True))
        send_mail = email_enabled and not args.dry_run

        # Bundle buckets: at most ONE email per alarm per tick, listing
        # every detection that would otherwise have triggered a send
        # this tick. Entries are (event_uuid, Detection).
        new_bundle: list[tuple[str, Detection]] = []
        renotify_bundle: list[tuple[str, Detection]] = []

        # Existing active events for this alarm — map dedupe_key → row
        try:
            active_rows = db.active_events_for_alarm(conn, alarm_entry_id)
        except Exception as e:  # noqa: BLE001
            # Without the active set we cannot tell new from continuing.
            # Carrying on with an empty map would create a duplicate event
            # row (and a spurious "new" email) for every entity that is
            # already firing, so count the failure and skip this alarm
            # for the tick instead.
            msg = f"active_events_for_alarm: {e}"
            log.error("active_events_for_alarm(%s) failed: %s",
                      alarm_entry_id, e)
            errors += 1
            error_traces.append(f"[{alarm_entry_id}] {msg}")
            per_alarm[alarm_entry_id] = {
                "enabled": email_enabled,
                "alarms_seen": 0,
                "errors": 1,
                "error_message": msg,
                "bundle_new": 0,
                "bundle_renotify": 0,
                "bundle_sent": False,
                "bundle_subject": "",
                "bundle_new_event_ids": [],
                "bundle_renotify_event_ids": [],
                "params": params,
                "recipients": recipients,
            }
            continue
        active_by_key = {
            (r.get("data") or {}).get("dedupe_key"): r for r in active_rows
        }

        try:
            for det in fn(client, params):  # type: Detection
                alarm_seen += 1
                alarms_seen += 1
                detected_keys.add(det.dedupe_key)
                existing = active_by_key.get(det.dedupe_key)

                if existing is None:
                    # New entity crosses threshold → create the event row
                    # unconditionally (dashboard must reflect truth even
                    # when emails are off). The event body is the fully
                    # composed single-detection body so the event-detail
                    # page reads naturally. The bundle for this tick adds
                    # it to `new_bundle`.
                    full_body = _compose_body(
                        alarm.get("content") or "", det.body_context)
                    event_uuid = db.create_event(
                        conn,
                        alarm_entry_id=alarm_entry_id,
                        dedupe_key=det.dedupe_key,
                        subject=det.subject,
                        body=full_body,
                        recipients=recipients,
                        extra_data=det.extra_data,
                        alarm_config_uuid=alarm["id"],
                    )
                    new_bundle.append((event_uuid, det))
                    continue

                # Same entity already firing — always bump last_seen.
                db.touch_event_last_seen(conn, existing["id"])

                # Renotification candidacy (bundled, not sent per-detection):
                # computed independently of send_mail so that the
                # per-run report can show what WOULD have been emailed
                # even when this alarm's emails are off. We gate the
                # actual SES call later on send_mail.
                #   - Event never notified (last_notified missing/0 —
                #     e.g. created while emails were off) → always bundle.
                #   - Otherwise: renotification window must be set and
                #     elapsed.
                last_notified = float(
                    (existing.get("data") or {}).get("last_notified") or 0)
                if last_notified == 0:
                    renotify_bundle.append((existing["id"], det))
                elif renotification_window_hours > 0 and (
                        time.time() - last_notified
                        >= renotification_window_hours * 3600):
                    renotify_bundle.append((existing["id"], det))
        except FetchError as e:
            log.error("alarm %s fetch failed: %s", alarm_entry_id, e)
            errors += 1
            alarm_err = 1
            alarm_err_msg = f"fetch: {e}"
            error_traces.append(f"[{alarm_entry_id}] fetch: {e}")
        except Exception:  # noqa: BLE001
            tb = traceback.format_exc()
            log.error("alarm %s raised:\n%s", alarm_entry_id, tb)
            errors += 1
            alarm_err = 1
            alarm_err_msg = tb
            error_traces.append(f"[{alarm_entry_id}]\n{tb}")

        # Auto-clear events whose dedupe_key wasn't seen this tick. Only
        # do so if the alarm ran without error — otherwise a transient
        # fetch failure would clear everything.
        if alarm_err == 0:
            for key, ev in active_by_key.items():
                if key not in detected_keys:
                    try:
                        db.clear_event(conn, ev["id"])
                    except Exception as e:  # noqa: BLE001
                        log.error("clear_event failed for %s: %s", ev["id"], e)

        # One email per alarm per tick. Compose the bundle whenever
        # there's anything to bundle — store its identifiers on the
        # engine_run row so the dashboard can show a per-run report
        # regardless of whether email went out. Send SES only when
        # this alarm's emails are on.
        bundle_sent = False
        bundle_new = len(new_bundle)
        bundle_renotify = len(renotify_bundle)
        bundle_subject = ""
        if new_bundle or renotify_bundle:
            bundle_subject, body = _compose_bundle(
                alarm_entry_id=alarm_entry_id,
                alarm_description=alarm.get("content") or "",
                new_bundle=new_bundle,
                renotify_bundle=renotify_bundle,
            )
            if send_mail:
                ok = send_email_ses(
                    Alarm(
                        alarm_name=alarm_entry_id,
                        dedupe_key=f"bundle:{int(time.time())}",
                        subject=bundle_subject,
                        body=body,
                        recipients=list(recipients),
                        data={
                            "bundle": True,
                            "new_count": bundle_new,
                            "renotify_count": bundle_renotify,
                        },
                    ),
                    region=cfg.email.region,
                    from_addr=cfg.email.from_addr,
                )
                if ok:
                    # Stamp last_notified only after a successful send so
                    # a failed send leaves the events eligible next tick.
                    for event_uuid, _ in new_bundle:
                        db.mark_event_notified(conn, event_uuid)
                    for event_uuid, _ in renotify_bundle:
                        db.mark_event_notified(conn, event_uuid)
                    notifications_sent += 1  # exactly one email, regardless of count
                    bundle_sent = True

        per_alarm[alarm_entry_id] = {
            "enabled": email_enabled,
            "alarms_seen": alarm_seen,
            "errors": alarm_err,
            "error_message": alarm_err_msg,
            "bundle_new": bundle_new,
            "bundle_renotify": bundle_renotify,
            "bundle_sent": bundle_sent,
            "bundle_subject": bundle_subject,
            "bundle_new_event_ids": [u for u, _ in new_bundle],
            "bundle_renotify_event_ids": [u for u, _ in renotify_bundle],
            "params": params,
            "recipients": recipients,
        }

    db.finish_engine_run(
        conn, run_uuid,
        alarms_run=alarms_run,
        alarms_seen=alarms_seen,
        notifications_sent=notifications_sent,
        errors=errors,
        error_details="\n\n".join(error_traces),
        per_alarm=per_alarm,
    )
    log.info("run done  alarms_run=%d  alarms_seen=%d  sent=%d  errors=%d",
             alarms_run, alarms_seen, notifications_sent, errors)
    return 0 if errors == 0 else 1
0322 
0323 
def _compose_body(alarm_description: str, detail: str) -> str:
    """Build the standalone body stored on a single event row.

    When the alarm description is empty or whitespace-only, the detail is
    returned unchanged. Otherwise the description (trailing whitespace
    trimmed) is placed above the detail, separated by a ``---`` rule.
    The per-tick email body is produced by `_compose_bundle`, not here.
    """
    if not alarm_description or not alarm_description.strip():
        return detail
    heading = alarm_description.rstrip()
    return f"{heading}\n\n---\n\n{detail}"
0331 
0332 
def _compose_bundle(*, alarm_entry_id: str, alarm_description: str,
                    new_bundle: list, renotify_bundle: list) -> tuple[str, str]:
    """Compose the single per-tick email for one alarm.

    Both bundles are lists of ``(event_uuid, detection)`` pairs; only the
    detection's ``subject`` and ``body_context`` are read here.

    Returns ``(subject, body)``:

    * subject — ``[<alarm_entry_id>] <N> detection(s)`` followed by
      ``, <X> new`` and/or ``, <Y> continuing`` for non-zero counts.
    * body — the alarm description (if any) above a ``---`` rule, then a
      ``NEW`` section and a ``CONTINUING`` section. Empty sections are
      omitted. Each item shows its subject and its body_context indented
      beneath it. The body always ends with exactly one newline.
    """
    new_count = len(new_bundle)
    continuing_count = len(renotify_bundle)

    summary = [f"{new_count + continuing_count} detection(s)"]
    if new_count:
        summary.append(f"{new_count} new")
    if continuing_count:
        summary.append(f"{continuing_count} continuing")
    subject = f"[{alarm_entry_id}] {', '.join(summary)}"

    def _render_section(title: str, entries: list) -> list[str]:
        # An empty bundle contributes nothing, not even its header.
        if not entries:
            return []
        rendered = [title, ""]
        for position, (_event_uuid, detection) in enumerate(entries, 1):
            rendered.append(f"  [{position}] {detection.subject}")
            if detection.body_context:
                rendered.extend(
                    f"      {row}"
                    for row in detection.body_context.splitlines()
                )
            rendered.append("")
        rendered.append("")
        return rendered

    description = (alarm_description or "").rstrip()
    body_lines: list[str] = [description, "", "---", ""] if description else []
    body_lines += _render_section(f"NEW ({new_count}):", new_bundle)
    body_lines += _render_section(
        f"CONTINUING — renotification ({continuing_count}):", renotify_bundle)

    # Trailing blank lines are trimmed so the body ends in one newline.
    return subject, "\n".join(body_lines).rstrip() + "\n"
0378 
0379 
# Script entry point: main()'s return value (0 = no errors, 1 = errors
# were counted) becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())