Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:44

0001 """Shared task-failure-rate helper.
0002 
0003 Yields one Detection per PanDA task whose final-failure rate exceeds the
0004 configured threshold. Uses `computed_finalfailurerate` (nfinalfailed /
0005 (nfinalfailed + nfinished)) — nfinalfailed counts only jobs failed with
0006 attemptnr >= maxattempt (3), i.e. retry-exhausted true failures. Jobs
0007 that failed once or twice and succeeded on retry don't count.
0008 
0009 Rationale (Rahman, NPPS 2026-04-22): nfailed counts every failed job
0010 record including retries that later succeeded, which inflates the rate
0011 and pages on noise. Alarms should trigger only on true failures.
0012 
0013 Falls back to `computed_failurerate` (all-failures rate) with a stderr
0014 warning if the upstream swf-monitor doesn't yet expose the new field —
0015 covers the window while swf-monitor is redeployed.
0016 
0017 Alarm modules import and delegate to `task_failure_rate(client, params)`.
0018 No central registry.
0019 
0020 Params (read from the alarm config entry's data.params):
0021 
0022   threshold          float, required. e.g. 0.03 for 3%.
0023   since_days         int, default 1. How far back to look at PanDA data.
0024   username           str, optional (supports % LIKE wildcard upstream).
0025   processingtype     str, optional.
0026   min_terminal_jobs  int, default 5. Noise floor: tasks with fewer
0027                      terminal jobs (nfinalfailed + nfinished) are skipped.
0028   statuses           list[str], optional. Task statuses to consider;
0029                      defaults to ['running', 'failed', 'broken'].
0030 """
0031 from __future__ import annotations
0032 
0033 from . import Detection
0034 
0035 
DEFAULT_STATUSES = ["running", "failed", "broken"]

# Schema of the params understood by the shared helper; alarm modules are
# free to re-export it or build on it. Every entry maps a param name to a
# dict that may carry:
#   type          the python type of the param (float, int, str, list)
#   required      True when the param has no default and must be given
#   default       value used when the alarm config leaves the key out
#   description   short human-readable text for the editor help panel
PARAMS: dict[str, dict] = {
    "threshold": dict(
        type=float,
        required=True,
        description="failure-rate threshold (e.g. 0.03 = 3%)",
    ),
    "since_days": dict(
        type=int,
        default=1,
        description="look back this many days into PanDA",
    ),
    "username": dict(
        type=str,
        description="optional task-owner filter (supports % LIKE)",
    ),
    "processingtype": dict(
        type=str,
        description="optional PanDA processingtype filter",
    ),
    "min_terminal_jobs": dict(
        type=int,
        default=5,
        description="ignore tasks with fewer finished+failed jobs than this",
    ),
    "statuses": dict(
        type=list,
        description="task statuses to consider; default running/failed/broken",
    ),
}
0070 
0071 
def task_failure_rate(client, params: dict):
    """Yield a Detection for each PanDA task at or above the failure threshold.

    For every requested status, the task records returned by
    ``client.iter_all_tasks(days=..., status=..., username=...,
    processingtype=...)`` are scanned. A task is reported when its rate is
    not below ``threshold`` and it has at least ``min_terminal_jobs``
    terminal jobs (failed + finished).

    The rate is ``computed_finalfailurerate`` when the record carries it.
    Otherwise ``computed_failurerate`` is used and a warning is logged once
    per call. Records with neither field are skipped.

    Args:
        client: object exposing ``iter_all_tasks(days, status, username,
            processingtype)`` that yields dict-like task records.
        params: alarm parameters (see ``PARAMS``). ``threshold`` is
            required. ``since_days`` and ``min_terminal_jobs`` use their
            defaults (1 and 5) when absent or None. ``statuses`` may be a
            list of statuses or a single status string; when empty or
            absent, ``DEFAULT_STATUSES`` is used.

    Yields:
        Detection: one per offending task, with dedupe key
        ``task:<jeditaskid>``.

    Raises:
        KeyError: if ``params`` has no ``threshold``, or an offending task
            record has no ``jeditaskid``.
    """
    import logging
    _logger = logging.getLogger(__name__)
    # Local flag: the fallback warning is emitted at most once per call.
    _warned_fallback = False

    def _param(key, default):
        # Treat an explicit None like a missing key so int() is never
        # handed None.
        value = params.get(key)
        return default if value is None else value

    threshold = float(params["threshold"])
    since_days = int(_param("since_days", 1))
    username = params.get("username")
    processingtype = params.get("processingtype")
    min_terminal = int(_param("min_terminal_jobs", 5))
    statuses = params.get("statuses") or DEFAULT_STATUSES
    if isinstance(statuses, str):
        # A bare string would otherwise be iterated character by character,
        # issuing one query per letter.
        statuses = [statuses]

    for status in statuses:
        for t in client.iter_all_tasks(
            days=since_days, status=status,
            username=username,
            processingtype=processingtype,
        ):
            # Prefer retry-exhausted-failures rate (true failures).
            # Fall back to all-failures rate only if upstream doesn't
            # expose the new field (stale swf-monitor).
            cfr = t.get("computed_finalfailurerate")
            using_finalrate = cfr is not None
            if not using_finalrate:
                cfr = t.get("computed_failurerate")
                if cfr is None:
                    continue
                if not _warned_fallback:
                    _logger.warning(
                        "task_failure_rate: upstream swf-monitor lacks "
                        "computed_finalfailurerate; falling back to "
                        "computed_failurerate (all failures, not retry-"
                        "exhausted). Deploy swf-monitor to activate the "
                        "nfinalfailed-based trigger."
                    )
                    _warned_fallback = True
            # Coerce so a rate delivered as a string or Decimal compares
            # and formats like a float.
            cfr = float(cfr)

            nfinished = int(t.get("nfinished") or 0)
            nfailed_all = int(t.get("nfailed") or 0)
            # In fallback mode the all-failures count stands in for the
            # final-failure count, for both the noise floor and the report.
            nfinalfailed = (
                int(t.get("nfinalfailed") or 0) if using_finalrate
                else nfailed_all
            )
            if nfinalfailed + nfinished < min_terminal:
                continue
            if cfr < threshold:
                continue

            jeditaskid = t["jeditaskid"]
            taskname = t.get("taskname")
            task_status = t.get("status")
            nactive = int(t.get("nactive") or 0)
            native_failurerate = t.get("failurerate")
            rate_label = (
                "final-failure rate" if using_finalrate
                else "failure rate (fallback — swf-monitor stale)"
            )
            yield Detection(
                dedupe_key=f"task:{jeditaskid}",
                subject=(
                    f"task {jeditaskid} ({task_status or '?'}) "
                    f"{rate_label} {cfr*100:.1f}% — "
                    f"{taskname or '?'}"
                ),
                body_context=_body_detail(
                    jeditaskid=jeditaskid,
                    taskname=taskname or "?",
                    task_status=task_status or "?",
                    task_user=t.get("username") or "?",
                    site=t.get("site") or "?",
                    cfr=cfr, nfailed=nfinalfailed, nfinished=nfinished,
                    nactive=nactive,
                    threshold=threshold, since_days=since_days,
                    native_failurerate=native_failurerate,
                    rate_kind=rate_label,
                ),
                extra_data={
                    "metric": f"{cfr*100:.1f}%",
                    "rate_kind": (
                        "final-failure" if using_finalrate
                        else "all-failures-fallback"
                    ),
                    "jeditaskid": jeditaskid,
                    "taskname": taskname,
                    "status": task_status,
                    "username": t.get("username"),
                    "site": t.get("site"),
                    "computed_failurerate": t.get("computed_failurerate"),
                    "computed_finalfailurerate": t.get(
                        "computed_finalfailurerate"),
                    "native_failurerate": native_failurerate,
                    "nactive": nactive,
                    "nfinished": nfinished,
                    "nfailed": nfailed_all,
                    # NOTE: equals nfailed when rate_kind is
                    # "all-failures-fallback" (see above).
                    "nfinalfailed": nfinalfailed,
                    "nretries": int(t.get("nretries") or 0),
                    "threshold": threshold,
                    "since_days": since_days,
                },
            )
0167 
0168 
0169 def _body_detail(**k) -> str:
0170     native = k["native_failurerate"]
0171     native_line = (
0172         f"JEDI native failurerate: {native} (computed is the operative signal)"
0173         if native is not None
0174         else "JEDI native failurerate: NULL (expected — not populated for ePIC task types)"
0175     )
0176     rate_kind = k.get("rate_kind", "final-failure rate")
0177     return (
0178         f"PanDA task {k['jeditaskid']} — {k['taskname']}\n"
0179         f"Status:      {k['task_status']}\n"
0180         f"Owner:       {k['task_user']}\n"
0181         f"Site:        {k['site']}\n"
0182         f"\n"
0183         f"{rate_kind}: {k['cfr']*100:.1f}%  (threshold {k['threshold']*100:.1f}%)\n"
0184         f"Jobs: nfinalfailed={k['nfailed']}  nfinished={k['nfinished']}  nactive={k['nactive']}\n"
0185         f"(nfinalfailed = failed jobs with attemptnr >= maxattempt 3 — retry-exhausted true failures)\n"
0186         f"Since: last {k['since_days']} day(s)\n"
0187         f"{native_line}\n"
0188         f"\n"
0189         f"Task page: https://epic-devcloud.org/prod/panda/tasks/{k['jeditaskid']}/\n"
0190     )