Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-17 07:46:15

0001 #!/usr/bin/env python3
0002 # /// script
0003 # requires-python = ">=3.12"
0004 # dependencies = [
0005 #     "typer",
0006 #     "rich",
0007 #     "pydantic",
0008 #     "pyyaml",
0009 # ]
0010 # ///
0011 """
0012 Run clang-tidy on source files and emit GitHub Actions annotations.
0013 
0014 By default, only files changed relative to a base git ref are analysed
0015 (PR mode).  Pass --all to analyse every file in compile_commands.json.
0016 
0017 Sub-commands:
0018   analyze   Collect targets, run clang-tidy, and optionally persist fixes.
0019   annotate  Emit GitHub Actions annotations from a fixes YAML.
0020 """
0021 
0022 import asyncio
0023 import json
0024 import os
0025 import re
0026 import shutil
0027 import subprocess
0028 import sys
0029 import tempfile
0030 from multiprocessing import cpu_count
0031 from pathlib import Path, PurePosixPath
0032 from typing import Annotated
0033 
0034 import typer
0035 import yaml
0036 from pydantic import BaseModel, Field
0037 from rich.console import Console, Group
0038 from rich.panel import Panel
0039 from rich.progress import (
0040     BarColumn,
0041     MofNCompleteColumn,
0042     Progress,
0043     TextColumn,
0044     TimeRemainingColumn,
0045     TaskID,
0046 )
0047 from rich.syntax import Syntax
0048 from rich.text import Text
0049 
0050 app = typer.Typer()
0051 console = Console(stderr=True, width=None if sys.stderr.isatty() else 120)
0052 
0053 SOURCE_SUFFIXES = {".cpp", ".cxx", ".cc", ".c"}
0054 HEADER_SUFFIXES = {".hpp", ".hxx", ".hh", ".h", ".ipp"}
0055 
0056 
0057 # ---------------------------------------------------------------------------
0058 #  Models
0059 # ---------------------------------------------------------------------------
0060 
0061 
class FilterConfig(BaseModel):
    """Filter settings loaded from filter.yml (see load_filter_config)."""

    # Regexes matched (re.search) against absolute file paths to drop diagnostics.
    exclude_path_regexes: list[str] = Field(default_factory=list)
    # Regexes matched against the clang-tidy check name.
    exclude_check_regexes: list[str] = Field(default_factory=list)
    # Regexes matched against the diagnostic message text.
    exclude_message_regexes: list[str] = Field(default_factory=list)
    # GH Actions annotation severity; when set, overrides the CLI value
    # (see emit_annotations).
    severity: str = "error"
0067 
0068 
class Replacement(BaseModel):
    """A single fix-it text replacement from clang-tidy's export-fixes YAML."""

    # Accept both the YAML aliases (FilePath, ...) and the pythonic names.
    model_config = {"populate_by_name": True}

    file_path: str = Field(default="", alias="FilePath")
    # Byte offset of the replaced range within the file.
    offset: int = Field(default=0, alias="Offset")
    # Length in bytes of the replaced range (0 for a pure insertion).
    length: int = Field(default=0, alias="Length")
    # Text to insert in place of the range.
    replacement_text: str = Field(default="", alias="ReplacementText")
0076 
0077 
class DiagMessage(BaseModel):
    """The DiagnosticMessage node of a clang-tidy export-fixes entry."""

    model_config = {"populate_by_name": True}

    message: str = Field(default="", alias="Message")
    file_path: str = Field(default="", alias="FilePath")
    # Byte offset of the diagnostic location; None when absent from the YAML.
    file_offset: int | None = Field(default=None, alias="FileOffset")
    # Proposed fixes; may target files other than file_path.
    replacements: list[Replacement] = Field(default_factory=list, alias="Replacements")
0085 
0086 
class Diagnostic(BaseModel):
    """One entry of the Diagnostics list in an export-fixes YAML."""

    model_config = {"populate_by_name": True}

    # clang-tidy check name, e.g. "readability-identifier-naming".
    name: str = Field(default="unknown", alias="DiagnosticName")
    message: DiagMessage = Field(default_factory=DiagMessage, alias="DiagnosticMessage")
    # Non-standard key added by this script (merge_fixes_yaml): every TU
    # whose analysis produced this diagnostic.
    analyzed_files: list[str] = Field(default_factory=list, alias="AnalyzedFiles")
0093 
0094 
class FixesFile(BaseModel):
    """Top-level structure of a clang-tidy export-fixes YAML file."""

    model_config = {"populate_by_name": True}

    diagnostics: list[Diagnostic] = Field(default_factory=list, alias="Diagnostics")
    # Non-standard key injected by run_clang_tidy_on_targets: path of the
    # TU that was analysed to produce this file.
    analyzed_file: str = Field(default="", alias="AnalyzedFile")
0100 
0101 
class ParsedDiagnostic(BaseModel):
    """A diagnostic after offset-to-line/col resolution and suggestion rendering."""

    # clang-tidy check name.
    check: str
    message: str
    # Path as reported by clang-tidy (absolute).
    path: Path
    # 1-based location; 0 when the byte offset could not be resolved.
    line: int = 0
    col: int = 0
    # Filled in by emit_annotations during path normalization.
    abs_path: str = ""
    rel_path: str = ""
    # Rendered caret/fix block (see build_suggestion); empty when no fix applies.
    suggestion: str = ""
    analyzed_files: list[str] = Field(default_factory=list)
0112 
0113 
0114 # ---------------------------------------------------------------------------
0115 #  Helpers
0116 # ---------------------------------------------------------------------------
0117 
0118 
def get_source_root() -> Path:
    """Return the repository root directory.

    This script lives at <repo>/CI/clang_tidy/run_clang_tidy_pr.py, i.e.
    two directory levels below the repository root.
    """
    return Path(__file__).resolve().parents[2]
0122 
0123 
def load_filter_config(filter_config: Path | None) -> FilterConfig:
    """Load a FilterConfig from a YAML file.

    A missing or unset path yields an all-defaults configuration, as does
    an empty YAML document.
    """
    if filter_config is None or not filter_config.exists():
        return FilterConfig()
    raw = yaml.safe_load(filter_config.read_text())
    return FilterConfig.model_validate(raw or {})
0129 
0130 
def load_compdb(build_dir: Path) -> set[Path]:
    """Return the absolute source paths listed in compile_commands.json.

    Entries under <build_dir>/_deps (third-party sources pulled in by
    FetchContent) are filtered out.
    """
    entries = json.loads((build_dir / "compile_commands.json").read_text())
    deps_dir = (build_dir / "_deps").resolve()

    resolved: set[Path] = set()
    for entry in entries:
        candidate = Path(entry.get("file", ""))
        # compdb entries may be relative to their compilation directory.
        if not candidate.is_absolute():
            candidate = Path(entry.get("directory", "")) / candidate
        candidate = candidate.resolve()
        if not candidate.is_relative_to(deps_dir):
            resolved.add(candidate)
    return resolved
0145 
0146 
0147 # ---------------------------------------------------------------------------
0148 #  Target collection
0149 # ---------------------------------------------------------------------------
0150 
0151 
0152 def _is_git_repo(source_root: Path) -> bool:
0153     return (source_root / ".git").exists()
0154 
0155 
def _get_changed_files_git(base_ref: str) -> list[str]:
    """List files added/copied/modified between *base_ref* and HEAD via git.

    Uses the three-dot range so only changes on the current branch count.
    Raises CalledProcessError when git fails.
    """
    proc = subprocess.run(
        ["git", "diff", "--name-only", "--diff-filter=ACM", f"{base_ref}...HEAD"],
        capture_output=True,
        text=True,
        check=True,
    )
    stripped = (line.strip() for line in proc.stdout.splitlines())
    return [name for name in stripped if name]
0164 
0165 
def _get_changed_files_jj(base_ref: str) -> list[str]:
    """List files changed since *base_ref* via jj (Jujutsu).

    Raises CalledProcessError when jj fails.
    """
    proc = subprocess.run(
        ["jj", "diff", "--from", base_ref, "--name-only"],
        capture_output=True,
        text=True,
        check=True,
    )
    stripped = (line.strip() for line in proc.stdout.splitlines())
    return [name for name in stripped if name]
0174 
0175 
def get_changed_files(base_ref: str, source_root: Path) -> list[str]:
    """Return files changed since *base_ref*, using git or jj as appropriate."""
    if not _is_git_repo(source_root):
        console.print("No .git found, using jj")
        return _get_changed_files_jj(base_ref)
    console.print("Detected git repository")
    return _get_changed_files_git(base_ref)
0182 
0183 
0184 def find_header_tu(
0185     header_abs_path: Path, source_root: Path, compdb_files: set[Path]
0186 ) -> Path | None:
0187     """Find the generated TU for a header by suffix-matching against the compdb.
0188 
0189     acts_compile_headers() generates TUs like:
0190       <build_dir>/<rel_path>/Foo.hpp.cpp
0191     We compute the header's path relative to the repo root and look for a
0192     compdb entry ending with that relative path + ``.cpp``.
0193     """
0194     rel = header_abs_path.relative_to(source_root, walk_up=True)
0195     suffix = "/" + str(rel) + ".cpp"
0196     for f in compdb_files:
0197         if str(f).endswith(suffix):
0198             return f
0199     return None
0200 
0201 
def is_path_excluded(path: str, exclude_path_regexes: list[str]) -> bool:
    """True when *path* matches any exclusion regex (re.search semantics)."""
    for pattern in exclude_path_regexes:
        if re.search(pattern, path):
            return True
    return False
0204 
0205 
def resolve_targets(
    repo_paths: list[str],
    source_root: Path,
    compdb_files: set[Path],
    exclude_path_regexes: list[str] | None = None,
) -> list[Path]:
    """Turn repo-relative paths into clang-tidy targets.

    Paths are classified by extension into sources and headers; sources must
    appear directly in compile_commands.json, headers are mapped to their
    generated TU (see find_header_tu).  Missing, excluded, or unmatched
    files are reported and skipped.  Returns a sorted list of targets.
    """
    patterns = exclude_path_regexes or []

    sources = [p for p in repo_paths if PurePosixPath(p).suffix.lower() in SOURCE_SUFFIXES]
    headers = [p for p in repo_paths if PurePosixPath(p).suffix.lower() in HEADER_SUFFIXES]

    console.print(f"  {len(sources)} source files, {len(headers)} headers")

    targets: set[Path] = set()

    for src in sources:
        candidate = (source_root / src).resolve()
        if not candidate.exists():
            console.print(f"  SKIP source {src} (file no longer exists)")
        elif is_path_excluded(str(candidate), patterns):
            console.print(f"  SKIP source {src} (excluded by filter)")
        elif candidate in compdb_files:
            targets.add(candidate)
        else:
            console.print(f"  SKIP source {src} (not in compile_commands.json)")

    for hdr in headers:
        candidate = (source_root / hdr).resolve()
        if not candidate.exists():
            console.print(f"  SKIP header {hdr} (file no longer exists)")
        elif is_path_excluded(str(candidate), patterns):
            console.print(f"  SKIP header {hdr} (excluded by filter)")
        else:
            tu = find_header_tu(candidate, source_root, compdb_files)
            if tu is None:
                console.print(f"  SKIP header {hdr} (no TU in compile_commands.json)")
            else:
                targets.add(tu)

    console.print(f"Total targets: {len(targets)}")
    return sorted(targets)
0260 
0261 
def collect_changed_targets(
    base_ref: str,
    source_root: Path,
    compdb_files: set[Path],
    exclude_path_regexes: list[str] | None = None,
) -> list[Path]:
    """Resolve the files changed since *base_ref* into clang-tidy targets."""
    changed_paths = get_changed_files(base_ref, source_root)
    console.print(f"Found {len(changed_paths)} changed files")
    return resolve_targets(
        changed_paths, source_root, compdb_files, exclude_path_regexes
    )
0272 
0273 
def collect_targets_from_fixes(
    fixes_file: Path,
    source_root: Path,
    compdb_files: set[Path],
    exclude_path_regexes: list[str] | None = None,
) -> list[Path]:
    """Extract file paths from a previous fixes YAML and resolve them as targets."""
    data = yaml.safe_load(fixes_file.read_text()) or {}
    fixes = FixesFile.model_validate(data)

    # Unique diagnostic file paths, skipping entries without one.
    paths = {d.message.file_path for d in fixes.diagnostics if d.message.file_path}

    console.print(f"Found {len(paths)} unique file(s) in {fixes_file}")

    # resolve_targets expects repo-relative paths.
    repo_paths = [
        str(Path(p).relative_to(source_root, walk_up=True)) for p in paths
    ]

    return resolve_targets(repo_paths, source_root, compdb_files, exclude_path_regexes)
0297 
0298 
def collect_all_targets(
    compdb_files: set[Path],
    exclude_path_regexes: list[str] | None = None,
) -> list[Path]:
    """Return every non-excluded source file in the compdb, sorted."""
    patterns = [] if exclude_path_regexes is None else exclude_path_regexes
    kept = [
        f
        for f in compdb_files
        if f.suffix in SOURCE_SUFFIXES
        and not is_path_excluded(str(f), patterns)
    ]
    targets = sorted(kept)
    console.print(f"Total targets: {len(targets)}")
    return targets
0313 
0314 
0315 def _macos_sysroot() -> str | None:
0316     """Return the macOS SDK path, or None if not on macOS / xcrun fails."""
0317     if sys.platform != "darwin":
0318         return None
0319     try:
0320         result = subprocess.run(
0321             ["xcrun", "--show-sdk-path"],
0322             capture_output=True,
0323             text=True,
0324             check=True,
0325         )
0326         sdk = result.stdout.strip()
0327         if sdk:
0328             console.print(f"Using macOS sysroot: {sdk}")
0329             return sdk
0330     except (subprocess.CalledProcessError, FileNotFoundError):
0331         pass
0332     return None
0333 
0334 
0335 # ---------------------------------------------------------------------------
0336 #  clang-tidy execution
0337 # ---------------------------------------------------------------------------
0338 
0339 
async def run_clang_tidy_on_targets(
    targets: list[Path],
    build_dir: Path,
    fixes_dir: Path,
    jobs: int,
    clang_tidy: str,
    verbose: bool = False,
    trace_includes: bool = False,
) -> None:
    """Run clang-tidy over *targets* with at most *jobs* concurrent processes.

    Each target's fixes are exported to ``<fixes_dir>/<idx>.yaml``, and the
    analyzed TU path is injected into the YAML (key ``AnalyzedFile``) so
    diagnostics can later be traced back to the TU that produced them.

    Args:
        targets: Absolute paths of translation units to analyse.
        build_dir: Build directory containing compile_commands.json.
        fixes_dir: Directory receiving per-TU export-fixes YAML files.
        jobs: Maximum number of concurrent clang-tidy processes.
        clang_tidy: Name or path of the clang-tidy executable.
        verbose: Stream clang-tidy output to the console per file.
        trace_includes: Add ``-H`` so clang prints the include hierarchy
            (implies capturing and printing output).
    """
    fixes_dir.mkdir(parents=True, exist_ok=True)
    sem = asyncio.Semaphore(jobs)

    sysroot = _macos_sysroot()
    extra_args: list[str] = []
    if sysroot:
        # On macOS the SDK must be passed explicitly or standard headers are
        # not found.  (Fix: the first element carried a spurious f-string
        # prefix with no placeholder, flake8/ruff F541.)
        extra_args += ["--extra-arg=-isysroot", f"--extra-arg={sysroot}"]
    if trace_includes:
        extra_args.append("--extra-arg=-H")

    progress = Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeRemainingColumn(),
        console=console,
    )

    async def analyse(file: Path, idx: int, task_id: TaskID) -> None:
        # One clang-tidy invocation per TU; fixes land in a per-index YAML.
        yaml_path = fixes_dir / f"{idx}.yaml"
        cmd = [
            clang_tidy,
            "-p",
            str(build_dir),
            str(file),
            "--quiet",
            f"--export-fixes={yaml_path}",
            "-header-filter=.*",  # export fixes for headers, even when analyzing a TU
            *extra_args,
        ]
        async with sem:
            if verbose or trace_includes:
                # Capture combined stdout/stderr and echo it per file.
                proc = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.STDOUT,
                )
                stdout, _ = await proc.communicate()
                output = stdout.decode(errors="replace").strip()
                if output:
                    console.print(f"\n[bold]{file}[/]")
                    console.print(output)
            else:
                # Quiet mode: diagnostics are only consumed via the fixes YAML.
                proc = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.DEVNULL,
                    stderr=asyncio.subprocess.DEVNULL,
                )
                await proc.wait()

        # Inject the analyzed file into the YAML so we can trace diagnostics
        # back to the TU that produced them.
        if yaml_path.exists():
            data = yaml.safe_load(yaml_path.read_text()) or {}
            data["AnalyzedFile"] = str(file)
            yaml_path.write_text(
                yaml.dump(data, default_flow_style=False, sort_keys=False)
            )

        console.log(file)
        progress.advance(task_id)

    with progress:
        task_id = progress.add_task("clang-tidy", total=len(targets))
        # TaskGroup waits for all analyses and propagates any failure.
        async with asyncio.TaskGroup() as tg:
            for idx, file in enumerate(targets):
                tg.create_task(analyse(file, idx, task_id))
0416 
0417 
0418 # ---------------------------------------------------------------------------
0419 #  Fixes merging
0420 # ---------------------------------------------------------------------------
0421 
0422 
def write_empty_fixes(output: Path) -> None:
    """Write an empty fixes YAML so downstream steps always find the file."""
    output.parent.mkdir(parents=True, exist_ok=True)
    empty_doc = {"Diagnostics": []}
    output.write_text(yaml.dump(empty_doc, default_flow_style=False))
    console.print(f"Wrote empty fixes to {output}")
0428 
0429 
def merge_fixes_yaml(fixes_dir: Path, output: Path) -> None:
    """Merge and deduplicate per-TU export-fixes YAMLs into a single file.

    Diagnostics reported identically by several TUs collapse into one entry
    whose ``AnalyzedFiles`` lists every originating TU.
    """
    # Gather all diagnostics, tagging each with the TU that produced it.
    collected: list[Diagnostic] = []
    for yaml_file in sorted(fixes_dir.glob("*.yaml")):
        fixes = FixesFile.model_validate(yaml.safe_load(yaml_file.read_text()) or {})
        for diag in fixes.diagnostics:
            if fixes.analyzed_file:
                diag.analyzed_files = [fixes.analyzed_file]
            collected.append(diag)

    # Deduplicate on (file, offset, check).  The first occurrence is kept;
    # later duplicates only contribute their analyzed_files entries.
    merged_by_key: dict[tuple, Diagnostic] = {}
    for diag in collected:
        key = (diag.message.file_path, diag.message.file_offset, diag.name)
        kept = merged_by_key.get(key)
        if kept is None:
            merged_by_key[key] = diag
        else:
            for af in diag.analyzed_files:
                if af not in kept.analyzed_files:
                    kept.analyzed_files.append(af)

    unique = list(merged_by_key.values())
    dropped = len(collected) - len(unique)
    if dropped:
        console.print(f"Deduplicated {dropped} diagnostic(s).")

    payload = {"Diagnostics": [d.model_dump(by_alias=True) for d in unique]}
    output.parent.mkdir(parents=True, exist_ok=True)
    output.write_text(yaml.dump(payload, default_flow_style=False, sort_keys=False))
    console.print(f"Wrote {len(unique)} diagnostic(s) to {output}")
0469 
0470 
0471 # ---------------------------------------------------------------------------
0472 #  Annotation
0473 # ---------------------------------------------------------------------------
0474 
0475 
def offset_to_line_col(raw: bytes, offset: int) -> tuple[int, int]:
    """Convert a byte offset into (1-based line, 1-based column)."""
    prefix = raw[:offset]
    line = 1 + prefix.count(b"\n")
    # Column is measured from the byte after the last newline before the
    # offset; rfind returning -1 (first line) still yields a 1-based column.
    col = offset - prefix.rfind(b"\n")
    return line, col
0482 
0483 
def build_suggestion(
    source: str,
    source_bytes: bytes,
    diag_file: str,
    replacements: list[Replacement],
) -> str:
    """Render fix-it replacements as a clang-tidy-style caret block.

    Output shape::

        42 |   ContextType(const T& value) : m_data{value} {}
           |   ^
           |   explicit

    Replacements that target files other than *diag_file* are ignored.
    Returns an empty string when nothing applies or no source is available.
    """
    relevant = [r for r in replacements if r.file_path == diag_file]
    if not relevant or not source_bytes:
        return ""

    source_lines = source.splitlines()
    rendered: list[str] = []

    for repl in sorted(relevant, key=lambda r: r.offset):
        line_no, col = offset_to_line_col(source_bytes, repl.offset)
        # Skip replacements whose offset does not map into the decoded text.
        if not (1 <= line_no <= len(source_lines)):
            continue

        gutter = " " * len(str(line_no))
        indent = " " * (col - 1)  # col is 1-based
        # A multi-byte range is underlined as ^~~~; a point gets a bare caret.
        caret = "^" + "~" * (repl.length - 1) if repl.length > 1 else "^"

        rendered.append(f"{line_no} | {source_lines[line_no - 1]}")
        rendered.append(f"{gutter} | {indent}{caret}")
        # The replacement text may span several lines; an empty replacement
        # (pure deletion) still produces one blank fix line.
        for fix_line in repl.replacement_text.splitlines() or [""]:
            rendered.append(f"{gutter} | {indent}{fix_line}")

    return "\n".join(rendered)
0534 
0535 
def parse_fixes_yaml(path: Path) -> list[ParsedDiagnostic]:
    """Parse an export-fixes YAML into ParsedDiagnostic records.

    Byte offsets are resolved to line/column against the on-disk file
    contents, and fix suggestions are rendered via build_suggestion.
    Diagnostics without a file path are dropped.
    """
    fixes = FixesFile.model_validate(yaml.safe_load(path.read_text()) or {})

    # Per-path cache of (decoded text, raw bytes) so each file is read once.
    cache: dict[Path, tuple[str, bytes]] = {}

    def contents_of(p: Path) -> tuple[str, bytes]:
        if p not in cache:
            try:
                raw = p.read_bytes()
                cache[p] = (raw.decode(errors="replace"), raw)
            except OSError:
                # Unreadable file: fall back to empty content.
                cache[p] = ("", b"")
        return cache[p]

    parsed: list[ParsedDiagnostic] = []
    for diag in fixes.diagnostics:
        msg = diag.message
        if not msg.file_path:
            continue

        filepath = Path(msg.file_path)
        source, source_bytes = "", b""
        line = col = 0
        if msg.file_offset is not None and msg.file_offset >= 0:
            source, source_bytes = contents_of(filepath)
            line, col = offset_to_line_col(source_bytes, msg.file_offset)

        parsed.append(
            ParsedDiagnostic(
                check=diag.name,
                message=msg.message,
                path=filepath,
                line=line,
                col=col,
                suggestion=build_suggestion(
                    source, source_bytes, msg.file_path, msg.replacements
                ),
                analyzed_files=list(diag.analyzed_files),
            )
        )
    return parsed
0579 
0580 
0581 def normalize_path(filepath: Path, source_root: Path) -> str:
0582     """Make an absolute path repo-relative.  Falls back to the original for
0583     paths outside source_root (e.g. system headers)."""
0584     return str(filepath.relative_to(source_root, walk_up=True))
0585 
0586 
0587 def is_excluded(diag: ParsedDiagnostic, config: FilterConfig) -> str | None:
0588     """Check whether a diagnostic should be excluded.
0589 
0590     Returns ``None`` if the diagnostic passes all filters, or a human-readable
0591     reason string describing why it was excluded.
0592     """
0593     for pattern in config.exclude_path_regexes:
0594         if re.search(pattern, diag.abs_path):
0595             return f"path matches exclude pattern '{pattern}'"
0596     for pattern in config.exclude_check_regexes:
0597         if re.search(pattern, diag.check):
0598             return f"check matches exclude pattern '{pattern}'"
0599     for pattern in config.exclude_message_regexes:
0600         if re.search(pattern, diag.message):
0601             return f"message matches exclude pattern '{pattern}'"
0602     return None
0603 
0604 
def emit_annotations(
    diagnostics: list[ParsedDiagnostic],
    source_root: Path,
    config: FilterConfig,
    severity: str,
    verbose: bool = False,
    github_annotate: bool = True,
) -> int:
    """Normalize paths, deduplicate, filter, and emit GH Actions annotations.
    Returns 1 if any diagnostics remain, 0 otherwise.

    Pipeline:
      1. Record absolute and repo-relative paths on each diagnostic.
      2. Deduplicate on (rel_path, line, col, check), merging analyzed_files.
      3. Drop diagnostics matching the FilterConfig exclusion regexes.
      4. For each survivor, print a ``::<severity> ...`` workflow command on
         stdout (when github_annotate) and a rich source panel on stderr.
    """
    # A severity configured in filter.yml takes precedence over the CLI value.
    severity = config.severity or severity

    for diag in diagnostics:
        diag.abs_path = str(diag.path)
        diag.rel_path = normalize_path(diag.path, source_root)

    # Deduplicate, merging analyzed_files lists
    seen_map: dict[tuple, ParsedDiagnostic] = {}
    for diag in diagnostics:
        key = (diag.rel_path, diag.line, diag.col, diag.check)
        if key in seen_map:
            # Duplicate: keep the first occurrence, absorb the TU origins.
            existing = seen_map[key]
            for af in diag.analyzed_files:
                if af not in existing.analyzed_files:
                    existing.analyzed_files.append(af)
            if verbose:
                console.print(
                    f"  [dim]DEDUP[/] {diag.rel_path}:{diag.line}:{diag.col}"
                    f" [{diag.check}] {diag.message}"
                )
        else:
            seen_map[key] = diag
    unique = list(seen_map.values())

    if verbose and len(diagnostics) != len(unique):
        console.print(f"Deduplicated {len(diagnostics) - len(unique)} diagnostic(s).")
    diagnostics = unique

    # Apply the exclusion filters, counting drops for the summary line.
    remaining: list[ParsedDiagnostic] = []
    excluded_count = 0
    for d in diagnostics:
        reason = is_excluded(d, config)
        if reason is not None:
            excluded_count += 1
            if verbose:
                console.print(
                    f"  [dim]EXCLUDE[/] {d.rel_path}:{d.line}:{d.col}"
                    f" [{d.check}] {d.message} -- {reason}"
                )
        else:
            remaining.append(d)

    console.print(
        f"After filtering: {len(remaining)} remaining, {excluded_count} excluded."
    )

    # Group by file for rich output
    by_file: dict[str, list[ParsedDiagnostic]] = {}
    for diag in remaining:
        by_file.setdefault(diag.rel_path, []).append(diag)

    # Cache file contents so each source file is read at most once.
    file_cache: dict[str, str] = {}

    for rel_path, file_diags in sorted(by_file.items()):
        for diag in sorted(file_diags, key=lambda d: d.line):
            # GH Actions annotation on stdout
            if github_annotate:
                body = diag.message
                if diag.suggestion:
                    body += "\n" + diag.suggestion
                # Workflow-command escaping: '%' must be replaced first so the
                # '%0D'/'%0A' sequences inserted below are not re-escaped.
                body = body.replace("%", "%25")
                body = body.replace("\r", "%0D")
                body = body.replace("\n", "%0A")
                print(
                    f"::{severity} file={diag.rel_path},line={diag.line},col={diag.col},title={diag.check}::{body}",
                )

            # Rich panel on stderr for the CI log
            if diag.line > 0 and diag.abs_path:
                if diag.abs_path not in file_cache:
                    try:
                        file_cache[diag.abs_path] = Path(diag.abs_path).read_text(
                            errors="replace"
                        )
                    except OSError:
                        # Unreadable file: fall through to the plain-text path.
                        file_cache[diag.abs_path] = ""

                source = file_cache[diag.abs_path]
                if source:
                    # Show +/-3 lines of context, highlighting the hit line.
                    ctx = 3
                    start = max(1, diag.line - ctx)
                    end = diag.line + ctx
                    syntax = Syntax(
                        source,
                        lexer="cpp",
                        line_numbers=True,
                        line_range=(start, end),
                        highlight_lines={diag.line},
                    )
                    title = Text(diag.check, style="bold red")
                    renderables: list = [
                        Text.assemble(
                            (f"{rel_path}:{diag.line}:{diag.col}", "dim"),
                        ),
                        syntax,
                    ]
                    renderables.append(
                        Panel(
                            Text(diag.message, style="yellow"),
                            border_style="dim",
                            title="message",
                            title_align="left",
                        )
                    )
                    if diag.suggestion:
                        renderables.append(
                            Panel(
                                Syntax(
                                    diag.suggestion,
                                    lexer="text",
                                    line_numbers=False,
                                    theme="ansi_dark",
                                ),
                                border_style="green",
                                title="suggestion",
                                title_align="left",
                            )
                        )
                    if diag.analyzed_files:
                        af_lines = "\n".join(diag.analyzed_files)
                        renderables.append(
                            Panel(
                                Text(af_lines, style="cyan"),
                                border_style="dim",
                                title=f"analyzed from ({len(diag.analyzed_files)})",
                                title_align="left",
                            )
                        )
                    panel = Panel(
                        Group(*renderables),
                        title=title,
                        title_align="left",
                        border_style="red",
                    )
                    console.print(panel)
                    continue

            # Fallback: no source available
            console.print(
                f"[bold red]{diag.check}[/] {diag.rel_path}:{diag.line}:{diag.col}"
            )
            console.print(f"  [yellow]{diag.message}[/]")
            if diag.analyzed_files:
                console.print(
                    f"  [dim]analyzed from:[/] [cyan]{', '.join(diag.analyzed_files)}[/]"
                )

    if remaining:
        console.print(
            f"\n[bold red]{len(remaining)} clang-tidy diagnostic(s) "
            f"across {len(by_file)} file(s).[/]"
        )
        return 1
    console.print("[bold green]No clang-tidy diagnostics (after filtering).[/]")
    return 0
0770 
0771 
def annotate_from_fixes_dir(
    fixes_dir: Path,
    source_root: Path,
    config: FilterConfig,
    severity: str,
    verbose: bool = False,
    github_annotate: bool = True,
) -> int:
    """Parse every export-fixes YAML in *fixes_dir* and emit annotations.

    Returns 0 when no YAML files exist or no diagnostics survive filtering,
    1 otherwise (mirroring emit_annotations).
    """
    yaml_files = sorted(fixes_dir.glob("*.yaml"))
    if not yaml_files:
        console.print("No export-fixes YAML files found.")
        return 0

    console.print(f"Parsing {len(yaml_files)} YAML file(s)...")

    diagnostics = [d for yf in yaml_files for d in parse_fixes_yaml(yf)]

    console.print(f"Found {len(diagnostics)} total diagnostic(s).")
    return emit_annotations(
        diagnostics,
        source_root,
        config,
        severity,
        verbose=verbose,
        github_annotate=github_annotate,
    )
0800 
0801 
def annotate_from_fixes_file(
    fixes_file: Path,
    source_root: Path,
    config: FilterConfig,
    severity: str,
    verbose: bool = False,
    github_annotate: bool = True,
) -> int:
    """Parse a single merged fixes YAML and emit annotations from it."""
    console.print(f"Parsing {fixes_file}...")
    diagnostics = parse_fixes_yaml(fixes_file)
    console.print(f"Found {len(diagnostics)} total diagnostic(s).")
    return emit_annotations(
        diagnostics,
        source_root,
        config,
        severity,
        verbose=verbose,
        github_annotate=github_annotate,
    )
0821 
0822 
0823 # ---------------------------------------------------------------------------
0824 #  Commands
0825 # ---------------------------------------------------------------------------
0826 
0827 
@app.command()
def analyze(
    build_dir: Annotated[Path, typer.Argument(help="Build directory")],
    output_fixes: Annotated[
        Path, typer.Argument(help="Output path for merged fixes YAML")
    ],
    files: Annotated[
        list[Path] | None,
        typer.Argument(
            help="Explicit file paths to analyse (bypasses --base-ref / --all)"
        ),
    ] = None,
    base_ref: Annotated[
        str | None,
        typer.Option(
            "--base-ref",
            "-b",
            help="Git ref to diff against (required unless --all or files are given)",
        ),
    ] = None,
    all: Annotated[
        bool,
        typer.Option("--all", help="Analyse all files instead of only changed ones"),
    ] = False,
    source_root: Annotated[
        Path | None,
        typer.Option(help="Source root (default: git toplevel)"),
    ] = None,
    fixes_dir: Annotated[
        Path | None,
        typer.Option(help="Directory for export-fixes YAML (default: temp dir)"),
    ] = None,
    from_fixes: Annotated[
        Path | None,
        typer.Option(help="Re-analyse files from a previous fixes YAML"),
    ] = None,
    filter_config: Annotated[
        Path | None, typer.Option(help="Path to filter.yml")
    ] = Path(__file__)
    .resolve()
    .parent
    / "filter.yml",
    jobs: Annotated[
        int, typer.Option("-j", "--jobs", help="Parallel jobs")
    ] = cpu_count(),
    clang_tidy: Annotated[str | None, typer.Option(help="clang-tidy binary")] = None,
    list_targets: Annotated[
        bool,
        typer.Option("--list-targets", help="Print resolved targets and exit"),
    ] = False,
    dry_run: Annotated[
        bool,
        typer.Option("--dry-run", help="Collect targets but skip clang-tidy execution"),
    ] = False,
    verbose: Annotated[
        bool,
        typer.Option("--verbose", "-v", help="Print clang-tidy output for each file"),
    ] = False,
    trace_includes: Annotated[
        bool,
        typer.Option(
            "--trace-includes", help="Pass -H to the compiler to dump the include tree"
        ),
    ] = False,
) -> None:
    """Collect targets, run clang-tidy, and optionally persist merged fixes."""
    # Resolve the clang-tidy binary only on the execution path.  --dry-run
    # and --list-targets never invoke clang-tidy, so they must not fail when
    # the binary is absent.  (The previous unconditional
    # `assert clang_tidy is not None` crashed exactly those modes, and an
    # assert is stripped under `python -O` anyway.)
    if not dry_run and not list_targets:
        if clang_tidy is None:
            clang_tidy = shutil.which("clang-tidy")
            if clang_tidy is None:
                raise typer.BadParameter("clang-tidy not found on PATH")

    if source_root is None:
        source_root = get_source_root()
    source_root = source_root.resolve()
    build_dir = build_dir.resolve()

    # Load filter config early so path excludes apply during target selection
    config = load_filter_config(filter_config)

    compdb_files = load_compdb(build_dir)
    console.print(f"Loaded {len(compdb_files)} entries from compile_commands.json")

    # 1) Collect targets.  Selection precedence: --from-fixes, then explicit
    #    file arguments, then --all, then the --base-ref git diff.
    if from_fixes is not None:
        targets = collect_targets_from_fixes(
            from_fixes,
            source_root,
            compdb_files,
            config.exclude_path_regexes,
        )
    elif files:
        file_list = [str(f) for f in files]
        console.print(f"{len(file_list)} file(s) specified as arguments")
        targets = resolve_targets(
            file_list,
            source_root,
            compdb_files,
            config.exclude_path_regexes,
        )
    elif all:
        targets = collect_all_targets(compdb_files, config.exclude_path_regexes)
    else:
        if base_ref is None:
            raise typer.BadParameter(
                "--base-ref is required unless --all, --from-fixes, or files are given"
            )
        targets = collect_changed_targets(
            base_ref, source_root, compdb_files, config.exclude_path_regexes
        )

    if list_targets:
        for t in targets:
            typer.echo(t)
        raise typer.Exit(0)

    if not targets:
        console.print("[bold green]No targets to analyse.[/]")
        # Still write an (empty) output so downstream steps can consume it.
        write_empty_fixes(output_fixes)
        raise typer.Exit(0)

    if dry_run:
        console.print(f"[bold]Dry run:[/] would analyse {len(targets)} file(s)")
        raise typer.Exit(0)

    # Past the dry-run/list-targets exits, the binary was resolved above.
    assert clang_tidy is not None

    # 2) Run clang-tidy, writing per-file export-fixes YAML into fixes_dir
    #    (a throwaway temp dir unless the caller asked to keep them).
    with tempfile.TemporaryDirectory(prefix="clang-tidy-fixes-") as tmp:
        if fixes_dir is None:
            fixes_dir = Path(tmp)

        asyncio.run(
            run_clang_tidy_on_targets(
                targets,
                build_dir,
                fixes_dir,
                jobs,
                clang_tidy,
                verbose=verbose,
                trace_includes=trace_includes,
            )
        )

        # 3) Persist merged fixes
        merge_fixes_yaml(fixes_dir, output_fixes)

    raise typer.Exit(0)
0975 
0976 
@app.command()
def annotate(
    fixes: Annotated[Path, typer.Argument(help="Merged fixes YAML file")],
    source_root: Annotated[
        Path | None,
        typer.Option(help="Source root (default: derived from script location)"),
    ] = None,
    filter_config: Annotated[
        Path | None, typer.Option(help="Path to filter.yml")
    ] = Path(__file__)
    .resolve()
    .parent
    / "filter.yml",
    exclude_path: Annotated[
        list[str] | None,
        typer.Option(help="Additional path regex to exclude (repeatable)"),
    ] = None,
    severity: Annotated[
        str, typer.Option(help="Annotation severity (error/warning)")
    ] = "error",
    verbose: Annotated[
        bool,
        typer.Option(
            "--verbose",
            "-v",
            help="Print each excluded and deduplicated diagnostic with the reason",
        ),
    ] = False,
    github_annotate: Annotated[
        bool,
        typer.Option(
            "--github-annotate/--no-github-annotate",
            help="Emit ::error/::warning workflow commands (default: auto-detect from GITHUB_ACTIONS env var)",
        ),
    ] = os.environ.get("GITHUB_ACTIONS", "").lower()
    == "true",
) -> None:
    """Emit GitHub Actions annotations from a previously saved fixes YAML."""
    # Fall back to the repository top level when no source root was given.
    root = source_root if source_root is not None else get_source_root()
    root = root.resolve()

    # Merge CLI-supplied path excludes into the file-based filter config.
    config = load_filter_config(filter_config)
    for pattern in exclude_path or []:
        config.exclude_path_regexes.append(pattern)

    exit_code = annotate_from_fixes_file(
        fixes,
        root,
        config,
        severity,
        verbose=verbose,
        github_annotate=github_annotate,
    )
    raise typer.Exit(exit_code)
1032 
1033 
# Script entry point: hand control to typer, which dispatches to the
# `analyze` / `annotate` sub-commands defined above.
if __name__ == "__main__":
    app()