Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-26 08:40:21

0001 """Notification channels. Email (SES) first; Mattermost etc. can slot in here.
0002 
0003 Channels take an Alarm + EmailConfig (+ future per-channel config) and return
0004 True on successful send, False on any failure. Failures must be logged but
0005 must not raise — a stuck channel should not block other channels or future
0006 alarms.
0007 """
0008 from __future__ import annotations
0009 
0010 import logging
0011 import os
0012 from dataclasses import dataclass
0013 
0014 import boto3
0015 from botocore.exceptions import BotoCoreError, ClientError
0016 
0017 
0018 log = logging.getLogger(__name__)
0019 
0020 
0021 @dataclass
0022 class Alarm:
0023     alarm_name: str
0024     dedupe_key: str
0025     subject: str
0026     body: str
0027     recipients: list[str]
0028     data: dict
0029 
0030 
0031 def send_email_ses(alarm: Alarm, *, region: str, from_addr: str) -> bool:
0032     kwargs = {'region_name': region}
0033     access_key = os.getenv('AWS_ACCESS_KEY_ID') or os.getenv('AWS_ACCESS_KEY')
0034     secret_key = os.getenv('AWS_SECRET_ACCESS_KEY') or os.getenv('AWS_SECRET_KEY')
0035     if access_key and secret_key:
0036         kwargs['aws_access_key_id'] = access_key
0037         kwargs['aws_secret_access_key'] = secret_key
0038     ses = boto3.client("ses", **kwargs)
0039     try:
0040         resp = ses.send_email(
0041             Source=from_addr,
0042             Destination={"ToAddresses": alarm.recipients},
0043             Message={
0044                 "Subject": {"Data": alarm.subject, "Charset": "UTF-8"},
0045                 "Body": {"Text": {"Data": alarm.body, "Charset": "UTF-8"}},
0046             },
0047         )
0048         log.info("SES send OK: MessageId=%s to=%s", resp.get("MessageId"),
0049                  ",".join(alarm.recipients))
0050         return True
0051     except (BotoCoreError, ClientError) as e:
0052         log.error("SES send FAILED for %s to %s: %s",
0053                   alarm.dedupe_key, alarm.recipients, e)
0054         return False