File indexing completed on 2026-04-27 07:41:44
0001 """Postgres access to swf-remote's `entry` / `entry_context` tables.
0002
0003 The alarm engine runs standalone (no Django), but the state it writes lives
0004 in swf-remote's own Postgres, not a side store. Schema is owned by Django
0005 migrations in remote_app/models.py. Everything here is raw SQL against:
0006
0007 entry — generic document rows (kind='alarm' configs,
0008 kind='event' firings, kind='engine_run' ticks, …)
0009 entry_context — named project/topic groupings
0010
0011 Conventions used by the alarm system:
0012 context.name = 'swf-alarms'
0013 kind='alarm' — one per configured alarm, data.entry_id like
0014 'alarm_panda_failure_rate_sakib'.
0015 data = {kind, enabled, severity, recipients,
0016 params, ...}
0017 content = human-readable description (used as
0018 the top of alarm emails; editable in the UI).
0019 kind='event' — one per firing instance. Shares
0020 data.entry_id = 'event_<alarm_name>' with all
0021 other firings of that alarm (non-unique).
                         data = {fire_time, clear_time (null=active),
                                 last_seen, last_notified, dedupe_key,
                                 subject, severity, recipients,
                                 alarm_config_id, ...context...}
0025 content = body text used in the email.
0026 kind='engine_run' — one per engine tick. data = summary counters.
0027
Archive: the read queries in this module skip rows with archived = TRUE or a
non-null deleted_at; status='archive' is filtered out of live dashboard queries.
0029 """
0030 from __future__ import annotations
0031
0032 import json
0033 import logging
0034 import time
0035 import uuid
0036 from contextlib import contextmanager
0037 from datetime import datetime, timezone
0038
0039 import psycopg
0040 from psycopg.rows import dict_row
0041
0042
# entry_context.name under which every alarm-system row (alarm configs,
# events, engine runs) is stored.
CONTEXT_NAME: str = 'swf-alarms'

# Module-level logger, named after this module.
log: logging.Logger = logging.getLogger(__name__)
0046
0047
def now_utc() -> datetime:
    """Return the current time as a timezone-aware datetime in UTC."""
    return datetime.now(tz=timezone.utc)
0050
0051
def now_ts() -> float:
    """Return the current time as POSIX seconds, as given by ``time.time()``.

    This float is what the module stores in JSON fields such as
    ``fire_time``, ``last_seen`` and ``started_at``.
    """
    current = time.time()
    return current
0054
0055
def new_uuid() -> str:
    """Return a fresh random (version 4) UUID in its canonical text form."""
    return f'{uuid.uuid4()}'
0058
0059
def connect(dsn: str):
    """Open a psycopg connection to *dsn*.

    The connection is in autocommit mode, so statements commit immediately
    unless grouped with :func:`transaction`. It uses ``dict_row``, so every
    fetched row is a ``dict`` keyed by column name.
    """
    options = {'autocommit': True, 'row_factory': dict_row}
    return psycopg.connect(dsn, **options)
0062
0063
def init_schema(conn) -> None:
    """Do nothing; the schema is created and evolved by Django migrations.

    Retained so callers keep a uniform connect/init sequence (API symmetry).
    """
0067
0068
@contextmanager
def transaction(conn):
    """Run the ``with`` body inside ``conn.transaction()``.

    This is a thin wrapper around psycopg's ``Connection.transaction()``.
    Per psycopg, the block commits on normal exit and rolls back if the body
    raises; the exception still propagates. Yields ``None``.
    """
    tx = conn.transaction()
    with tx:
        yield
0073
0074
0075
0076
def list_alarm_configs(conn, *, enabled_only: bool = True) -> list[dict]:
    """Fetch the alarm configuration entries of the swf-alarms context.

    Selects kind='alarm' rows in CONTEXT_NAME that are neither archived nor
    soft-deleted (``deleted_at`` set). They are ordered by ``data.entry_id``
    ascending so engine runs visit alarms deterministically.

    When *enabled_only* is true (the default), rows whose ``data.enabled``
    is falsy are dropped. A missing ``enabled`` key counts as enabled.
    Rows come back as the cursor returns them: dicts when the connection
    was opened with :func:`connect`.
    """
    sql = """
        SELECT e.*
          FROM entry e
          JOIN entry_context c ON c.name = e.context_id
         WHERE c.name = %s
           AND e.kind = 'alarm'
           AND e.archived = FALSE
           AND e.deleted_at IS NULL
         ORDER BY e.data->>'entry_id'
    """
    with conn.cursor() as cur:
        cur.execute(sql, (CONTEXT_NAME,))
        configs = cur.fetchall()
    if not enabled_only:
        return configs
    return [cfg for cfg in configs
            if (cfg.get('data') or {}).get('enabled', True)]
0098
0099
0100
0101
def active_events_for_alarm(conn, alarm_entry_id: str) -> list[dict]:
    """Return the still-open event rows for one alarm.

    Events are matched on ``data.entry_id``. That id is derived from
    *alarm_entry_id* by replacing a leading ``alarm_`` with ``event_``, or
    by prefixing ``event_`` when there is no ``alarm_`` prefix.

    An event counts as open while ``data.clear_time`` is null or absent.
    Archived and soft-deleted (``deleted_at`` set) rows are excluded.
    """
    suffix = alarm_entry_id
    if suffix.startswith('alarm_'):
        suffix = suffix[len('alarm_'):]
    event_entry_id = 'event_' + suffix

    sql = """
        SELECT * FROM entry
         WHERE kind = 'event'
           AND context_id = %s
           AND data->>'entry_id' = %s
           AND (data->>'clear_time') IS NULL
           AND archived = FALSE
           AND deleted_at IS NULL
    """
    with conn.cursor() as cur:
        cur.execute(sql, (CONTEXT_NAME, event_entry_id))
        return cur.fetchall()
0118
0119
def create_event(conn, *, alarm_entry_id: str, dedupe_key: str,
                 subject: str, body: str,
                 recipients: list[str], extra_data: dict,
                 alarm_config_uuid: str) -> str:
    """Insert a new kind='event' entry with fire_time=now, clear_time=null.

    The event's ``data.entry_id`` is ``event_<suffix>``, where <suffix> is
    *alarm_entry_id* with any leading ``alarm_`` removed. Every firing of
    the same alarm shares that id, and it is the value
    ``active_events_for_alarm()`` matches on.

    *extra_data* holds arbitrary per-firing context and is merged into
    ``data`` FIRST. The bookkeeping keys written here therefore always take
    precedence: entry_id, fire_time, clear_time, last_seen, dedupe_key,
    subject, recipients and alarm_config_id. A colliding key in
    *extra_data* (e.g. an alarm config's own ``entry_id``) is ignored. It
    cannot make the event unfindable or mark it as already cleared.

    Args:
        conn: open psycopg connection.
        alarm_entry_id: the firing alarm's ``data.entry_id``.
        dedupe_key: stored as ``data.dedupe_key``.
        subject: stored as ``data.subject``; its first 255 characters
            become the entry title.
        body: stored as the entry's ``content``.
        recipients: stored as a list in ``data.recipients``; None is
            treated as empty.
        extra_data: extra context merged into ``data``; None is treated as
            empty.
        alarm_config_uuid: stored as ``data.alarm_config_id``.

    Returns:
        The new entry's UUID string.
    """
    if alarm_entry_id.startswith('alarm_'):
        event_entry_id = f"event_{alarm_entry_id[len('alarm_'):]}"
    else:
        event_entry_id = f"event_{alarm_entry_id}"

    # One timestamp is used for fire_time, last_seen and both row timestamps.
    now = now_ts()
    data = {
        # Caller context goes first so the reserved keys below override it.
        **(extra_data or {}),
        'entry_id': event_entry_id,
        'fire_time': now,
        'clear_time': None,
        'last_seen': now,
        'dedupe_key': dedupe_key,
        'subject': subject,
        'recipients': list(recipients or []),
        'alarm_config_id': alarm_config_uuid,
    }

    new_id = new_uuid()
    with conn.cursor() as cur:
        cur.execute(
            """INSERT INTO entry
                   (id, title, content, kind, context_id, data, status,
                    archived, timestamp_created, timestamp_modified)
               VALUES (%s, %s, %s, 'event', %s, %s::jsonb, 'active',
                       FALSE, %s, %s)""",
            # default=str stringifies values json cannot encode natively,
            # presumably non-JSON types in extra_data (e.g. datetimes).
            (new_id, subject[:255], body, CONTEXT_NAME,
             json.dumps(data, default=str), now, now),
        )
    return new_id
0159
0160
def touch_event_last_seen(conn, event_uuid: str) -> None:
    """Refresh an event's heartbeat.

    Writes the current POSIX time into ``data.last_seen`` (creating the key
    if absent) and into ``timestamp_modified`` of the entry whose id is
    *event_uuid*. The row's active/cleared state is not checked.
    """
    stamp = now_ts()
    sql = """UPDATE entry
                SET data = jsonb_set(data, '{last_seen}', to_jsonb(%s::float8), true),
                    timestamp_modified = %s
              WHERE id = %s"""
    with conn.cursor() as cur:
        cur.execute(sql, (stamp, stamp, event_uuid))
0172
0173
def mark_event_notified(conn, event_uuid: str) -> None:
    """Record that a notification was just sent for an event.

    Sets ``data.last_notified`` (creating the key if absent) and
    ``timestamp_modified`` to the current POSIX time on the entry whose id
    is *event_uuid*. It is intended to be called after every email.
    """
    stamp = now_ts()
    sql = """UPDATE entry
                SET data = jsonb_set(data, '{last_notified}', to_jsonb(%s::float8), true),
                    timestamp_modified = %s
              WHERE id = %s"""
    with conn.cursor() as cur:
        cur.execute(sql, (stamp, stamp, event_uuid))
0185
0186
0187 def resolve_recipients(conn, tokens: list[str] | None) -> tuple[list[str], list[str]]:
0188 """Expand @<team> tokens into their member emails using the DB.
0189
0190 Same contract as remote_app.alarms_data.expand_recipients(): returns
0191 (emails, unresolved). Tokens may be emails or @<team>; comma/whitespace
0192 separators already split by the caller.
0193 """
0194 if not tokens:
0195 return [], []
0196 emails: list[str] = []
0197 unresolved: list[str] = []
0198 for tok in tokens:
0199 t = (tok or '').strip()
0200 if not t:
0201 continue
0202 if t.startswith('@'):
0203 with conn.cursor() as cur:
0204 cur.execute(
0205 "SELECT content FROM entry WHERE context_id='teams' "
0206 "AND kind='team' AND name=%s AND archived=FALSE "
0207 "AND deleted_at IS NULL",
0208 (t,),
0209 )
0210 row = cur.fetchone()
0211 if row is None or not (row.get('content') or '').strip():
0212 unresolved.append(t)
0213 continue
0214 for part in _split_tokens(row['content']):
0215 emails.append(part)
0216 else:
0217 emails.append(t)
0218
0219 seen: set[str] = set()
0220 final: list[str] = []
0221 for e in emails:
0222 k = e.lower()
0223 if k in seen:
0224 continue
0225 seen.add(k)
0226 final.append(e)
0227 return final, unresolved
0228
0229
0230 def _split_tokens(s: str) -> list[str]:
0231 for sep in [',', ';', '\n', '\r', '\t']:
0232 s = s.replace(sep, ' ')
0233 return [t.strip() for t in s.split(' ') if t.strip()]
0234
0235
def clear_event(conn, event_uuid: str) -> None:
    """Mark an event as resolved.

    Sets ``data.clear_time`` (creating the key if absent) and
    ``timestamp_modified`` to the current POSIX time on the entry whose id
    is *event_uuid*. Once clear_time is non-null, active_events_for_alarm()
    no longer returns the event. An existing clear_time is overwritten.
    """
    stamp = now_ts()
    sql = """UPDATE entry
                SET data = jsonb_set(data, '{clear_time}', to_jsonb(%s::float8), true),
                    timestamp_modified = %s
              WHERE id = %s"""
    with conn.cursor() as cur:
        cur.execute(sql, (stamp, stamp, event_uuid))
0247
0248
0249
0250
def start_engine_run(conn) -> str:
    """Record the start of an engine tick and return the new entry's UUID.

    Inserts a kind='engine_run' entry with status 'active' and empty
    content. Its title carries the UTC start time. Its ``data`` holds
    ``entry_id`` (``run_<whole POSIX seconds>``) and ``started_at``.
    Pass the returned UUID to finish_engine_run() when the tick completes.
    """
    started = now_ts()
    run_id = new_uuid()
    started_dt = datetime.fromtimestamp(started, tz=timezone.utc)
    title = f"Engine run {started_dt.strftime('%Y%m%d %H:%M:%S UTC')}"
    payload = json.dumps({'entry_id': f'run_{int(started)}', 'started_at': started})
    with conn.cursor() as cur:
        cur.execute(
            """INSERT INTO entry
                   (id, title, content, kind, context_id, data, status,
                    archived, timestamp_created, timestamp_modified)
               VALUES (%s, %s, '', 'engine_run', %s, %s::jsonb, 'active',
                       FALSE, %s, %s)""",
            (run_id, title, CONTEXT_NAME, payload, started, started),
        )
    return run_id
0267
0268
def finish_engine_run(conn, run_uuid: str, *, alarms_run: int,
                      alarms_seen: int, notifications_sent: int,
                      errors: int, error_details: str = '',
                      per_alarm: dict | None = None) -> None:
    """Close out an engine-run entry created by start_engine_run().

    Merges ``finished_at`` (current POSIX time) and the summary counters
    into the row's JSON ``data``. The jsonb ``||`` merge is shallow, so
    these keys overwrite any existing ones. It also sets status to 'done'
    and bumps ``timestamp_modified``. *per_alarm* defaults to an empty
    object.

    NOTE: if ``data`` were SQL NULL, ``data || ...`` would yield NULL;
    start_engine_run() always writes a data object.
    """
    finished = now_ts()
    summary = dict(
        finished_at=finished,
        alarms_run=alarms_run,
        alarms_seen=alarms_seen,
        notifications_sent=notifications_sent,
        errors=errors,
        error_details=error_details,
        per_alarm=per_alarm or {},
    )
    sql = """UPDATE entry
                SET data = data || %s::jsonb,
                    status = 'done',
                    timestamp_modified = %s
              WHERE id = %s"""
    with conn.cursor() as cur:
        cur.execute(sql, (json.dumps(summary, default=str), finished, run_uuid))