Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:44

0001 """Postgres access to swf-remote's `entry` / `entry_context` tables.
0002 
0003 The alarm engine runs standalone (no Django), but the state it writes lives
0004 in swf-remote's own Postgres, not a side store. Schema is owned by Django
0005 migrations in remote_app/models.py. Everything here is raw SQL against:
0006 
0007     entry             — generic document rows (kind='alarm' configs,
0008                         kind='event' firings, kind='engine_run' ticks, …)
0009     entry_context     — named project/topic groupings
0010 
0011 Conventions used by the alarm system:
0012     context.name       = 'swf-alarms'
0013     kind='alarm'       — one per configured alarm, data.entry_id like
0014                          'alarm_panda_failure_rate_sakib'.
0015                          data = {kind, enabled, severity, recipients,
0016                                  params, ...}
0017                          content = human-readable description (used as
0018                          the top of alarm emails; editable in the UI).
0019     kind='event'       — one per firing instance. Shares
0020                          data.entry_id = 'event_<alarm_name>' with all
0021                          other firings of that alarm (non-unique).
0022                          data = {fire_time, clear_time (null=active),
0023                                  dedupe_key, subject, severity, recipients,
0024                                  alarm_config_id, ...context...}
0025                          content = body text used in the email.
0026     kind='engine_run'  — one per engine tick. data = summary counters.
0027 
0028 Archive: status='archive' is filtered out of live dashboard queries.
0029 """
0030 from __future__ import annotations
0031 
0032 import json
0033 import logging
0034 import time
0035 import uuid
0036 from contextlib import contextmanager
0037 from datetime import datetime, timezone
0038 
0039 import psycopg
0040 from psycopg.rows import dict_row
0041 
0042 
0043 log = logging.getLogger(__name__)
0044 
0045 CONTEXT_NAME = 'swf-alarms'
0046 
0047 
def now_utc() -> datetime:
    """Return the current wall-clock time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)
0050 
0051 
def now_ts() -> float:
    """Return the current Unix timestamp (seconds since epoch, float)."""
    # Event fire/clear/last_seen times are stored as raw epoch floats in
    # the entry JSON, so this is the module's canonical "now".
    return time.time()
0054 
0055 
def new_uuid() -> str:
    """Return a fresh random UUID4 in canonical string form."""
    return f"{uuid.uuid4()}"
0058 
0059 
def connect(dsn: str):
    """Open a psycopg connection to *dsn*: autocommit, rows as dicts.

    Autocommit means each statement commits on its own; wrap multi-statement
    work in `transaction(conn)` when atomicity is needed.
    """
    return psycopg.connect(dsn, row_factory=dict_row, autocommit=True)
0062 
0063 
def init_schema(conn) -> None:
    """Intentionally a no-op: Django migrations own the schema. This hook
    is kept only for API symmetry with other store backends."""
0067 
0068 
@contextmanager
def transaction(conn):
    """Run the enclosed statements in one database transaction.

    Delegates to psycopg's connection-scoped transaction block (commits on
    clean exit of the `with` body, rolls back on exception). Needed because
    connections from `connect()` are opened with autocommit=True.
    """
    with conn.transaction():
        yield
0073 
0074 
0075 # ── alarm configs ──────────────────────────────────────────────────────────
0076 
def list_alarm_configs(conn, *, enabled_only: bool = True) -> list[dict]:
    """Load all non-archived alarm config entries from the swf-alarms context.

    Rows come back ordered by data.entry_id ascending so engine runs are
    deterministic. With enabled_only (the default), configs whose
    data.enabled is false are dropped client-side; a missing `enabled` key
    counts as enabled.
    """
    # NOTE(review): the join compares entry.context_id against
    # entry_context.name — context_id evidently stores the context's *name*
    # (the other queries in this module filter context_id = CONTEXT_NAME
    # directly); confirm against the Django model if this looks off.
    q = """
        SELECT e.*
        FROM entry e
        JOIN entry_context c ON c.name = e.context_id
        WHERE c.name = %s
          AND e.kind = 'alarm'
          AND e.archived = FALSE
          AND e.deleted_at IS NULL
        ORDER BY e.data->>'entry_id'
    """
    with conn.cursor() as cur:
        cur.execute(q, (CONTEXT_NAME,))
        rows = cur.fetchall()
    if not enabled_only:
        return rows
    return [row for row in rows if (row.get('data') or {}).get('enabled', True)]
0098 
0099 
0100 # ── events (firings) ───────────────────────────────────────────────────────
0101 
def _event_entry_id(alarm_entry_id: str) -> str:
    """Derive the shared event entry_id for an alarm.

    'alarm_<name>' -> 'event_<name>'; an id without the 'alarm_' prefix is
    used verbatim: '<name>' -> 'event_<name>'. All firings of one alarm
    share this (non-unique) entry_id.
    """
    # Previously this expression was duplicated inline in both
    # active_events_for_alarm() and create_event(); keep them in sync here.
    if alarm_entry_id.startswith('alarm_'):
        alarm_entry_id = alarm_entry_id[len('alarm_'):]
    return f"event_{alarm_entry_id}"


def active_events_for_alarm(conn, alarm_entry_id: str) -> list[dict]:
    """All currently-active (fire_time set, clear_time null) events for this
    alarm. Archived events are excluded."""
    q = """
        SELECT * FROM entry
        WHERE kind = 'event'
          AND context_id = %s
          AND data->>'entry_id' = %s
          AND (data->>'clear_time') IS NULL
          AND archived = FALSE
          AND deleted_at IS NULL
    """
    with conn.cursor() as cur:
        cur.execute(q, (CONTEXT_NAME, _event_entry_id(alarm_entry_id)))
        return cur.fetchall()


def create_event(conn, *, alarm_entry_id: str, dedupe_key: str,
                 subject: str, body: str,
                 recipients: list[str], extra_data: dict,
                 alarm_config_uuid: str) -> str:
    """Insert a new kind='event' entry with fire_time=now, clear_time=null.

    The subject is truncated to 255 chars for the title column; `extra_data`
    is merged last and may therefore override the standard keys.
    Returns the new Entry UUID.
    """
    now = now_ts()
    data = {
        'entry_id': _event_entry_id(alarm_entry_id),
        'fire_time': now,
        'clear_time': None,
        'last_seen': now,
        'dedupe_key': dedupe_key,
        'subject': subject,
        'recipients': list(recipients),
        'alarm_config_id': alarm_config_uuid,
        **extra_data,
    }
    # `last_notified` is NOT set here. The engine bundles detections into
    # one per-alarm email per tick; the bundle sender stamps
    # `last_notified` on each included event only if the bundle send
    # succeeds. Events created while the alarm is emails-off therefore
    # have no `last_notified`, which means the next bundle (once emails
    # are turned on) naturally sweeps them up.
    new_id = new_uuid()
    with conn.cursor() as cur:
        cur.execute(
            """INSERT INTO entry
               (id, title, content, kind, context_id, data, status,
                archived, timestamp_created, timestamp_modified)
               VALUES (%s, %s, %s, 'event', %s, %s::jsonb, 'active',
                       FALSE, %s, %s)""",
            (new_id, subject[:255], body, CONTEXT_NAME,
             json.dumps(data, default=str), now, now),
        )
    return new_id
0159 
0160 
def touch_event_last_seen(conn, event_uuid: str) -> None:
    """Bump data.last_seen (and timestamp_modified) on an active event."""
    ts = now_ts()
    sql = """UPDATE entry
               SET data = jsonb_set(data, '{last_seen}', to_jsonb(%s::float8), true),
                   timestamp_modified = %s
               WHERE id = %s"""
    with conn.cursor() as cur:
        cur.execute(sql, (ts, ts, event_uuid))
0172 
0173 
def mark_event_notified(conn, event_uuid: str) -> None:
    """Stamp data.last_notified = now on an event.

    Called (and re-called) every time an email covering this event goes out.
    """
    ts = now_ts()
    sql = """UPDATE entry
               SET data = jsonb_set(data, '{last_notified}', to_jsonb(%s::float8), true),
                   timestamp_modified = %s
               WHERE id = %s"""
    with conn.cursor() as cur:
        cur.execute(sql, (ts, ts, event_uuid))
0185 
0186 
0187 def resolve_recipients(conn, tokens: list[str] | None) -> tuple[list[str], list[str]]:
0188     """Expand @<team> tokens into their member emails using the DB.
0189 
0190     Same contract as remote_app.alarms_data.expand_recipients(): returns
0191     (emails, unresolved). Tokens may be emails or @<team>; comma/whitespace
0192     separators already split by the caller.
0193     """
0194     if not tokens:
0195         return [], []
0196     emails: list[str] = []
0197     unresolved: list[str] = []
0198     for tok in tokens:
0199         t = (tok or '').strip()
0200         if not t:
0201             continue
0202         if t.startswith('@'):
0203             with conn.cursor() as cur:
0204                 cur.execute(
0205                     "SELECT content FROM entry WHERE context_id='teams' "
0206                     "AND kind='team' AND name=%s AND archived=FALSE "
0207                     "AND deleted_at IS NULL",
0208                     (t,),
0209                 )
0210                 row = cur.fetchone()
0211             if row is None or not (row.get('content') or '').strip():
0212                 unresolved.append(t)
0213                 continue
0214             for part in _split_tokens(row['content']):
0215                 emails.append(part)
0216         else:
0217             emails.append(t)
0218     # Dedup case-insensitively on emails while preserving order.
0219     seen: set[str] = set()
0220     final: list[str] = []
0221     for e in emails:
0222         k = e.lower()
0223         if k in seen:
0224             continue
0225         seen.add(k)
0226         final.append(e)
0227     return final, unresolved
0228 
0229 
def _split_tokens(s: str) -> list[str]:
    """Split *s* on commas, semicolons, tabs and newlines; drop empties."""
    for sep in (',', ';', '\n', '\r', '\t'):
        s = s.replace(sep, ' ')
    stripped = (piece.strip() for piece in s.split(' '))
    return [tok for tok in stripped if tok]
0234 
0235 
def clear_event(conn, event_uuid: str) -> None:
    """Set data.clear_time = now on an event (condition has resolved)."""
    ts = now_ts()
    sql = """UPDATE entry
               SET data = jsonb_set(data, '{clear_time}', to_jsonb(%s::float8), true),
                   timestamp_modified = %s
               WHERE id = %s"""
    with conn.cursor() as cur:
        cur.execute(sql, (ts, ts, event_uuid))
0247 
0248 
0249 # ── engine runs ────────────────────────────────────────────────────────────
0250 
def start_engine_run(conn) -> str:
    """Create a kind='engine_run' entry with started_at; return its UUID."""
    ts = now_ts()
    run_id = new_uuid()
    stamp = datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y%m%d %H:%M:%S UTC')
    payload = {'entry_id': f'run_{int(ts)}', 'started_at': ts}
    with conn.cursor() as cur:
        cur.execute(
            """INSERT INTO entry
               (id, title, content, kind, context_id, data, status,
                archived, timestamp_created, timestamp_modified)
               VALUES (%s, %s, '', 'engine_run', %s, %s::jsonb, 'active',
                       FALSE, %s, %s)""",
            (run_id, f"Engine run {stamp}", CONTEXT_NAME, json.dumps(payload), ts, ts),
        )
    return run_id
0267 
0268 
def finish_engine_run(conn, run_uuid: str, *, alarms_run: int,
                      alarms_seen: int, notifications_sent: int,
                      errors: int, error_details: str = '',
                      per_alarm: dict | None = None) -> None:
    """Merge the tick's summary counters into the engine_run entry's data
    (jsonb ||) and flip its status to 'done'."""
    ts = now_ts()
    summary = dict(
        finished_at=ts,
        alarms_run=alarms_run,
        alarms_seen=alarms_seen,
        notifications_sent=notifications_sent,
        errors=errors,
        error_details=error_details,
        per_alarm=per_alarm or {},
    )
    with conn.cursor() as cur:
        cur.execute(
            """UPDATE entry
               SET data = data || %s::jsonb,
                   status = 'done',
                   timestamp_modified = %s
               WHERE id = %s""",
            (json.dumps(summary, default=str), ts, run_uuid),
        )