Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:42

0001 """
0002 PanDA Mattermost bot — answers production monitoring questions using Claude with tool use.
0003 
0004 Connects to Mattermost via WebSocket, listens for messages in a target channel,
0005 and responds using Claude with PanDA monitoring tools discovered via MCP.
0006 
0007 On each question, loads recent dialog from the database — all users, all
0008 contexts — giving the bot full awareness of recent conversations. One soft
0009 privacy rule: don't reveal DM content to others in the channel.
0010 
0011 MCP transport: HTTP POST (JSON-RPC) to the Django MCP endpoint — the same
0012 transport Claude Code uses. No SSE, no GET streams, no subprocesses.
0013 """
0014 
0015 import asyncio
0016 import hashlib
0017 import json
0018 import logging
0019 import os
0020 import re
0021 import subprocess
0022 import sys
0023 import tempfile
0024 import time
0025 from datetime import datetime, timezone
0026 
0027 # ChromaDB requires sqlite3 >= 3.35; RHEL8 ships 3.26.
0028 # pysqlite3-binary bundles a modern sqlite3 — swap BEFORE any chromadb import.
0029 try:
0030     __import__("pysqlite3")
0031     sys.modules["sqlite3"] = sys.modules.pop("pysqlite3")
0032 except ImportError:
0033     pass
0034 
0035 import anthropic
0036 import httpx
0037 import numpy as np
0038 from mattermostdriver import Driver
0039 from sentence_transformers import SentenceTransformer
0040 
# Module-wide logger for the bot.
logger = logging.getLogger('panda_bot')

# Tunables referenced by the bot logic (their consumers are outside this section).
MAX_TOOL_ROUNDS = 10
MAX_RESULT_LEN = 10_000
MM_POST_LIMIT = 16_383  # presumably the Mattermost per-post size cap — TODO confirm
MEMORY_TURNS = 30
MEMORY_USERNAME = 'pandabot'

# HTTP MCP endpoint; overridable through the MCP_URL environment variable.
_DEFAULT_MCP_URL = 'https://pandaserver02.sdcc.bnl.gov/swf-monitor/mcp/'
MCP_URL = os.environ.get('MCP_URL', _DEFAULT_MCP_URL)

# Name prefixes of the tools the bot works with.
BOT_TOOL_PREFIXES = ('panda_', 'pcs_', 'epic_')
0052 
# Stdio MCP servers — launched as subprocesses at startup.
# Each entry is a dict with: name, optional prefix (prepended to the server's
# tool names), source, command (argv list), env (extra environment variables),
# repo_dir (git checkout) and update_commands (shell commands).
# update_commands: if present, the server can be updated via bot_manage_servers.
STDIO_MCP_SERVERS = []

# Locations shared by the definitions below.
_GITHUB_ROOT = '/data/wenauseic/github'
_NODE_BIN_DIR = '/eic/u/wenauseic/.nvm/versions/node/v22.17.0/bin'
_VENV_BIN = os.path.join(
    os.environ.get('SWF_HOME', _GITHUB_ROOT), 'swf-testbed/.venv/bin'
)
_VENV_PYTHON = os.path.join(_VENV_BIN, 'python')
_VENV_PIP = os.path.join(_VENV_BIN, 'pip')
# NOTE(review): NODE_PATH is read here as the path of the node *executable*.
# Node.js itself interprets NODE_PATH as a module search path, so the two
# meanings can collide — consider a dedicated variable name.
_NODE_EXE = os.environ.get('NODE_PATH', f'{_NODE_BIN_DIR}/node')


def _repo(name):
    """Return the checkout directory of repository *name* under _GITHUB_ROOT."""
    return f'{_GITHUB_ROOT}/{name}'


def _node_update_command(repo_name):
    """Return the shell command that pulls and rebuilds a Node-based server."""
    return (
        f'export PATH={_NODE_BIN_DIR}:$PATH && cd {_repo(repo_name)}'
        ' && git pull && npm install && npm run build'
    )


def _pip_update_command(repo_name, target):
    """Return the shell command that pulls a repo and pip-installs *target* editable."""
    return f'cd {_repo(repo_name)} && git pull && ' + _VENV_PIP + f' install -e {target}'


def _rucio_server(name, prefix, env):
    """Build a server entry for one Rucio instance (both share the same code)."""
    repo_name = 'rucio-eic-mcp-server'
    return {
        'name': name,
        'prefix': prefix,
        'source': 'github.com/BNLNPPS/rucio-eic-mcp-server',
        'command': [_VENV_PYTHON, f'{_repo(repo_name)}/rucio_eic_mcp_server.py'],
        'env': env,
        'repo_dir': _repo(repo_name),
        'update_commands': [_pip_update_command(repo_name, '.')],
    }


# xrootd — only registered when XROOTD_MCP_SERVER is set.
_xrootd_server = os.environ.get('XROOTD_MCP_SERVER')
if _xrootd_server:
    STDIO_MCP_SERVERS.append({
        'name': 'xrootd',
        'source': 'github.com/eic/xrootd-mcp-server',
        'command': [_NODE_EXE, f"{_repo('xrootd-mcp-server')}/build/src/index.js"],
        'env': {
            'XROOTD_SERVER': os.environ.get('XROOTD_SERVER', 'root://dtn-eic.jlab.org'),
            'XROOTD_BASE_DIR': os.environ.get('XROOTD_BASE_DIR', '/volatile/eic/EPIC'),
        },
        'repo_dir': _repo('xrootd-mcp-server'),
        'update_commands': [_node_update_command('xrootd-mcp-server')],
    })

# github — only registered when a personal access token is available.
_github_token = os.environ.get('GITHUB_PERSONAL_ACCESS_TOKEN')
if _github_token:
    STDIO_MCP_SERVERS.append({
        'name': 'github',
        'source': 'github.com/github/github-mcp-server',
        'command': [
            f"{_repo('github-mcp-server')}/github-mcp-server",
            'stdio',
            '--toolsets=issues,pull_requests,actions,code_security,discussions',
        ],
        'env': {'GITHUB_PERSONAL_ACCESS_TOKEN': _github_token},
        'repo_dir': _repo('github-mcp-server'),
        'update_commands': [
            f"cd {_repo('github-mcp-server')} && git pull && "
            'PATH=$PATH:/usr/local/go/bin go build -o github-mcp-server ./cmd/github-mcp-server',
        ],
    })

# zenodo — only registered when an API key is available.
_zenodo_key = os.environ.get('ZENODO_API_KEY')
if _zenodo_key:
    STDIO_MCP_SERVERS.append({
        'name': 'zenodo',
        'source': 'github.com/eic/zenodo-mcp-server',
        'command': [_NODE_EXE, f"{_repo('zenodo-mcp-server')}/build/src/index.js"],
        'env': {'ZENODO_API_KEY': _zenodo_key},
        'repo_dir': _repo('zenodo-mcp-server'),
        'update_commands': [_node_update_command('zenodo-mcp-server')],
    })

# The remaining servers are always registered.
STDIO_MCP_SERVERS.append({
    'name': 'lxr',
    'source': 'github.com/BNLNPPS/lxr-mcp-server',
    'command': [_VENV_PYTHON, f"{_repo('lxr-mcp-server')}/lxr_mcp_server.py"],
    'env': {},
    'repo_dir': _repo('lxr-mcp-server'),
    'update_commands': [f"cd {_repo('lxr-mcp-server')} && git pull"],
})

STDIO_MCP_SERVERS.append({
    'name': 'uproot',
    'source': 'github.com/eic/uproot-mcp-server',
    'command': [os.path.join(_VENV_BIN, 'uproot-mcp-server')],
    'env': {},
    'repo_dir': _repo('uproot-mcp-server'),
    'update_commands': [_pip_update_command('uproot-mcp-server', '".[xrootd]"')],
})

STDIO_MCP_SERVERS.append(_rucio_server(
    'jlab-rucio',
    'jlab_rucio_',
    {
        'RUCIO_URL': 'https://rucio-server.jlab.org:443',
        'RUCIO_AUTH_TYPE': 'userpass',
        'RUCIO_ACCOUNT': 'eicread',
        'RUCIO_USERNAME': 'eicread',
        'RUCIO_PASSWORD': 'eicread',
        'RUCIO_CA_BUNDLE': 'false',
        'TOKEN_FILE_PATH': '/tmp/rucio_eic_jlab_token.txt',
    },
))

STDIO_MCP_SERVERS.append(_rucio_server(
    'bnl-rucio',
    'bnl_rucio_',
    {
        'RUCIO_URL': 'https://nprucio01.sdcc.bnl.gov:443',
        'RUCIO_AUTH_TYPE': 'x509',
        'RUCIO_ACCOUNT': 'panda',
        'RUCIO_VO': 'eic',
        'X509_USER_PROXY': '/data/wenauseic/longproxy-for-rucio',
        'TOKEN_FILE_PATH': '/tmp/rucio_eic_bnl_token.txt',
    },
))
0196 
0197 # Virtual tool definition for server management
# Anthropic-format definition of the in-process "bot_manage_servers" tool.
BOT_MANAGE_SERVERS_TOOL = {
    "name": "bot_manage_servers",
    "description": " ".join([
        "List or update the bot's MCP servers.",
        "action='list' shows all servers and which are updatable.",
        "action='update', server_name='xrootd' pulls latest code, rebuilds, and restarts that server.",
    ]),
    "input_schema": {
        "type": "object",
        "properties": {
            # Which operation to run.
            "action": {
                "type": "string",
                "enum": ["list", "update"],
                "description": "Action to perform.",
            },
            # Target server; only meaningful for action='update'.
            "server_name": {
                "type": "string",
                "description": "Server to update (required for action='update').",
            },
        },
        "required": ["action"],
    },
}
0221 
0222 # ── ePIC Doc Search (virtual tools — handled in-process) ───────────────────
0223 
# Location and collection name of the ChromaDB store used for doc search.
CHROMA_PATH = "/data/wenauseic/github/swf-monitor/chroma_db"
CHROMA_COLLECTION = "bamboo_docs"

# Anthropic-format definition of the semantic documentation search tool.
EPIC_DOC_SEARCH_TOOL = {
    "name": "epic_doc_search",
    "description": " ".join([
        "Search ePIC documentation by natural-language query (semantic vector search).",
        "Covers SWF testbed, SWF monitor, Bamboo/PanDA, EICrecon, containers, ePIC production,",
        "EIC master docs, afterburner, eic-shell, and more.",
        "Use for conceptual 'how does X work?' questions about the software and experiment.",
    ]),
    "input_schema": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Natural-language question (e.g. 'how does fast processing work?').",
            },
            "top_k": {
                "type": "integer",
                "description": "Number of results (default 5, max 20).",
                "default": 5,
            },
        },
        "required": ["query"],
    },
}

# Anthropic-format definition of the documentation table-of-contents tool
# (takes no arguments).
EPIC_DOC_CONTENTS_TOOL = {
    "name": "epic_doc_contents",
    "description": " ".join([
        "Show what's in epicdoc — table of contents of all indexed ePIC documentation.",
        "Lists every source and document with chunk counts. Use to discover what documentation",
        "is searchable via epic_doc_search.",
    ]),
    "input_schema": {"type": "object", "properties": {}},
}
0264 
0265 
class DocSearchHandler:
    """Handles epic_doc_search and epic_doc_contents using a ChromaDB vector store.

    Lazy-loads ChromaDB on first call and caches the collection handle.
    Runs in the long-lived bot process so the embedding model loads once.
    An initialisation failure is cached as well: once `_init_error` is set,
    later calls return that error without retrying.
    """

    # Bounds applied to the caller-supplied ``top_k`` of a search.
    DEFAULT_TOP_K = 5
    MAX_TOP_K = 20
    # Maximum number of characters of each matching chunk returned to the caller.
    EXCERPT_CHARS = 1500

    def __init__(self):
        self._collection = None   # ChromaDB collection handle once loaded
        self._init_error = None   # cached error message of a failed load

    def _ensure_collection(self):
        """Lazy-load the ChromaDB collection.

        Returns:
            None when the collection is available, otherwise an error string
            describing why it could not be loaded.
        """
        if self._collection is not None:
            return None
        if self._init_error is not None:
            return self._init_error

        try:
            import chromadb
            from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
        except ImportError:
            self._init_error = "chromadb not installed"
            return self._init_error

        if not os.path.exists(CHROMA_PATH):
            self._init_error = f"ChromaDB path not found: {CHROMA_PATH}"
            return self._init_error

        try:
            # Only sets HF_HOME when the environment does not already define it.
            os.environ.setdefault("HF_HOME", "/opt/swf-monitor/shared/hf_cache")
            ef = SentenceTransformerEmbeddingFunction("all-MiniLM-L6-v2")
            client = chromadb.PersistentClient(path=CHROMA_PATH)
            self._collection = client.get_collection(CHROMA_COLLECTION, embedding_function=ef)
            logger.info(
                f"DocSearch: loaded collection '{CHROMA_COLLECTION}' "
                f"({self._collection.count()} chunks)"
            )
        except Exception as e:
            self._init_error = f"ChromaDB init failed: {e}"
            return self._init_error
        return None

    @classmethod
    def _coerce_top_k(cls, value) -> int:
        """Turn a caller-supplied ``top_k`` into an int within [1, MAX_TOP_K].

        Values that cannot be converted to an integer (None, non-numeric
        strings, infinities) fall back to DEFAULT_TOP_K instead of raising.
        """
        try:
            requested = int(value)
        except (TypeError, ValueError, OverflowError):
            requested = cls.DEFAULT_TOP_K
        return max(1, min(requested, cls.MAX_TOP_K))

    async def search(self, arguments: dict) -> str:
        """Handle epic_doc_search.

        Args:
            arguments: tool input; reads ``query`` (required) and ``top_k``.

        Returns:
            A JSON string: ``{"query", "results"}`` on success, where each
            result has ``score``, ``source``, ``file`` and ``excerpt``; or
            ``{"error": ...}`` when the query is empty, the collection cannot
            be loaded, or the ChromaDB query fails.
        """
        query = str(arguments.get("query", "")).strip()
        if not query:
            return json.dumps({"error": "query is required"})

        top_k = self._coerce_top_k(arguments.get("top_k", self.DEFAULT_TOP_K))

        # ChromaDB calls are blocking; keep them off the event loop.
        err = await asyncio.to_thread(self._ensure_collection)
        if err:
            return json.dumps({"error": err})

        try:
            raw = await asyncio.to_thread(
                self._collection.query,
                query_texts=[query], n_results=top_k,
                include=["documents", "metadatas", "distances"],
            )
        except Exception as e:
            return json.dumps({"error": f"ChromaDB query failed: {e}"})

        # Each field is a list with one inner list per query text; we sent one.
        docs = (raw.get("documents") or [[]])[0]
        metas = (raw.get("metadatas") or [[]])[0]
        dists = (raw.get("distances") or [[]])[0]

        results = []
        for doc, meta, dist in zip(docs, metas, dists):
            meta = meta or {}  # an entry stored without metadata comes back as None
            # Map distance to a 0-100 score, clamped at 0.
            score = max(0, (1 - dist) * 100)
            results.append({
                "score": round(score),
                "source": meta.get("source", "?"),
                "file": meta.get("rel_path", "?"),
                "excerpt": (doc or "")[:self.EXCERPT_CHARS],
            })
        return json.dumps({"query": query, "results": results})

    async def contents(self, arguments: dict) -> str:
        """Handle epic_doc_contents.

        Args:
            arguments: tool input; not read.

        Returns:
            A JSON string with a ``summary`` line and ``sources``, mapping each
            source to its sorted list of ``{"file", "chunks"}``; or
            ``{"error": ...}`` when the collection cannot be loaded or read.
        """
        err = await asyncio.to_thread(self._ensure_collection)
        if err:
            return json.dumps({"error": err})

        try:
            all_meta = await asyncio.to_thread(
                self._collection.get, include=["metadatas"],
            )
        except Exception as e:
            return json.dumps({"error": f"ChromaDB get failed: {e}"})

        # source -> {relative path -> chunk count}. There is one metadata
        # record per chunk, so a file appears many times; the last value wins.
        sources = {}
        for meta in all_meta.get("metadatas") or []:
            meta = meta or {}
            src = meta.get("source", "unknown")
            rel = meta.get("rel_path", "?")
            sources.setdefault(src, {})[rel] = meta.get("total_chunks", 1)

        toc = {}
        total_chunks = 0
        total_files = 0
        for src, files in sorted(sources.items()):
            toc[src] = [
                {"file": rel, "chunks": chunks}
                for rel, chunks in sorted(files.items())
            ]
            total_files += len(files)
            total_chunks += sum(files.values())

        return json.dumps({
            "summary": f"{total_files} documents, {total_chunks} chunks across {len(toc)} sources",
            "sources": toc,
        })
0381 
0382 
# Path of the system prompt file; overridable via PANDABOT_SYSTEM_PROMPT,
# defaulting to system_prompt.txt next to this module.
SYSTEM_PROMPT_FILE = os.getenv(
    'PANDABOT_SYSTEM_PROMPT',
    os.path.join(os.path.dirname(__file__), 'system_prompt.txt'),
)


def _load_system_preamble():
    """Read the system prompt from SYSTEM_PROMPT_FILE, fresh on every call.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the locale of the process running the bot.

    Returns:
        The file contents, or a one-line built-in prompt when the file does
        not exist.
    """
    try:
        with open(SYSTEM_PROMPT_FILE, encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        return "You are the PanDA bot for the ePIC experiment."
0396 
0397 
0398 class MCPClient:
0399     """Minimal MCP client using HTTP POST only — no SSE, no GET streams."""
0400 
0401     def __init__(self, url: str):
0402         self.url = url
0403         self.session_id = None
0404         self._request_id = 0
0405         self._http = httpx.AsyncClient(timeout=60)
0406 
0407     async def _post(self, method: str, params: dict | None = None):
0408         self._request_id += 1
0409         body = {"jsonrpc": "2.0", "id": self._request_id, "method": method}
0410         if params:
0411             body["params"] = params
0412         headers = {"Content-Type": "application/json", "Accept": "application/json"}
0413         if self.session_id:
0414             headers["Mcp-Session-Id"] = self.session_id
0415         resp = await self._http.post(self.url, json=body, headers=headers)
0416         resp.raise_for_status()
0417         if "Mcp-Session-Id" in resp.headers:
0418             self.session_id = resp.headers["Mcp-Session-Id"]
0419         return resp.json()
0420 
0421     async def initialize(self):
0422         resp = await self._post("initialize", {
0423             "protocolVersion": "2025-03-26",
0424             "capabilities": {},
0425             "clientInfo": {"name": "panda-bot", "version": "1.0"},
0426         })
0427         self.server_instructions = (
0428             resp.get("result", {}).get("instructions", "")
0429         )
0430         return resp
0431 
0432     async def list_tools(self):
0433         result = await self._post("tools/list")
0434         return result.get("result", {}).get("tools", [])
0435 
0436     async def call_tool(self, name: str, arguments: dict):
0437         result = await self._post("tools/call", {
0438             "name": name, "arguments": arguments,
0439         })
0440         return result.get("result", {})
0441 
0442     async def close(self):
0443         await self._http.aclose()
0444 
0445 
0446 class StdioMCPClient:
0447     """MCP client for subprocess-based servers using stdio (stdin/stdout JSON-RPC)."""
0448 
0449     def __init__(self, name: str, command: list, env: dict = None, args: list = None):
0450         self.name = name
0451         self.command = command + (args or [])
0452         self.env = {**os.environ, **(env or {})}
0453         self._request_id = 0
0454         self._process = None
0455         self.server_instructions = ""
0456 
0457     async def start(self):
0458         """Launch the subprocess."""
0459         self._process = await asyncio.create_subprocess_exec(
0460             *self.command,
0461             stdin=asyncio.subprocess.PIPE,
0462             stdout=asyncio.subprocess.PIPE,
0463             stderr=asyncio.subprocess.PIPE,
0464             env=self.env,
0465             limit=1024 * 1024,  # 1 MB readline buffer (default 64 KB too small for large tool lists)
0466         )
0467         logger.info(f"Stdio MCP '{self.name}' started (pid {self._process.pid})")
0468 
0469     async def _request(self, method: str, params: dict | None = None):
0470         if not self._process or self._process.returncode is not None:
0471             raise RuntimeError(f"Stdio MCP '{self.name}' not running")
0472         self._request_id += 1
0473         req_id = self._request_id
0474         body = {"jsonrpc": "2.0", "id": req_id, "method": method}
0475         if params:
0476             body["params"] = params
0477         line = json.dumps(body) + "\n"
0478         self._process.stdin.write(line.encode())
0479         await self._process.stdin.drain()
0480 
0481         # Read lines until we get the response matching our request ID,
0482         # skipping any server notifications (no "id" field)
0483         while True:
0484             resp_line = await asyncio.wait_for(
0485                 self._process.stdout.readline(), timeout=60
0486             )
0487             if not resp_line:
0488                 raise RuntimeError(f"Stdio MCP '{self.name}' closed stdout")
0489             msg = json.loads(resp_line)
0490             if "id" in msg and msg["id"] == req_id:
0491                 return msg
0492             # Skip notifications and other non-response messages
0493             logger.debug(f"Stdio MCP '{self.name}' skipped: {msg.get('method', '?')}")
0494 
0495     async def initialize(self):
0496         resp = await self._request("initialize", {
0497             "protocolVersion": "2025-03-26",
0498             "capabilities": {},
0499             "clientInfo": {"name": "panda-bot", "version": "1.0"},
0500         })
0501         self.server_instructions = (
0502             resp.get("result", {}).get("instructions", "")
0503         )
0504         # Send initialized notification
0505         notif = {"jsonrpc": "2.0", "method": "notifications/initialized"}
0506         self._process.stdin.write((json.dumps(notif) + "\n").encode())
0507         await self._process.stdin.drain()
0508         return resp
0509 
0510     async def list_tools(self):
0511         result = await self._request("tools/list")
0512         return result.get("result", {}).get("tools", [])
0513 
0514     async def call_tool(self, name: str, arguments: dict):
0515         result = await self._request("tools/call", {
0516             "name": name, "arguments": arguments,
0517         })
0518         return result.get("result", {})
0519 
0520     async def close(self):
0521         if self._process and self._process.returncode is None:
0522             self._process.stdin.close()
0523             try:
0524                 await asyncio.wait_for(self._process.wait(), timeout=5)
0525             except asyncio.TimeoutError:
0526                 self._process.kill()
0527             logger.info(f"Stdio MCP '{self.name}' stopped")
0528 
0529 
def mcp_tool_to_anthropic(tool):
    """Translate an MCP tool definition into the Anthropic Messages API shape.

    Copies ``name``, maps ``inputSchema`` to ``input_schema``, and uses an
    empty string when ``description`` is missing. ``name`` and ``inputSchema``
    are required (KeyError otherwise).
    """
    converted = {"name": tool["name"]}
    converted["description"] = tool.get("description", "")
    converted["input_schema"] = tool["inputSchema"]
    return converted
0537 
0538 
# Anthropic-format definition of the "select_tools" tool, which lets the model
# request extra tools from the catalog by name.
SELECT_TOOLS_TOOL = {
    "name": "select_tools",
    "description": " ".join([
        "Load additional tools by name from the tool catalog.",
        "Call this when the pre-loaded tools don't cover what you need.",
        "The tool catalog is listed in your system prompt.",
    ]),
    "input_schema": {
        "type": "object",
        "properties": {
            "names": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Tool names to load from the catalog.",
            },
        },
        "required": ["names"],
    },
}

# Number of tools to pre-load via semantic matching
TOP_K_TOOLS = 8
0561 
0562 
0563 class ToolSelector:
0564     """Selects relevant tools for a user message via semantic similarity.
0565 
0566     At startup, embeds all tool descriptions into vectors. Tool names are
0567     prefixed with their MCP server name for embedding (e.g. "github:get_job_logs")
0568     so the model can distinguish tools from different domains. Returned names
0569     are unprefixed (the actual tool name for dispatch).
0570     """
0571 
0572     def __init__(self):
0573         self._model = SentenceTransformer('all-MiniLM-L6-v2')
0574         self._tool_names: list[str] = []
0575         self._tool_embeddings: np.ndarray | None = None
0576 
0577     def build_index(self, tool_registry: dict[str, dict], server_map: dict[str, str]):
0578         """Embed all tool descriptions with server-prefixed names.
0579 
0580         Args:
0581             tool_registry: tool_name → Anthropic tool dict
0582             server_map: tool_name → server name (e.g. 'github', 'xrootd', 'swf-monitor')
0583         """
0584         self._tool_names = []
0585         texts = []
0586         for name, tool in tool_registry.items():
0587             self._tool_names.append(name)
0588             server = server_map.get(name, 'unknown')
0589             texts.append(f"{server}:{name}: {tool['description']}")
0590         self._tool_embeddings = self._model.encode(texts, normalize_embeddings=True)
0591         logger.info(f"ToolSelector: indexed {len(self._tool_names)} tools")
0592 
0593     def select(self, message: str, top_k: int = TOP_K_TOOLS) -> list[tuple[str, float]]:
0594         """Return top-K tool names with scores, ranked by relevance."""
0595         if self._tool_embeddings is None or len(self._tool_names) == 0:
0596             return []
0597         msg_embedding = self._model.encode(message, normalize_embeddings=True)
0598         scores = self._tool_embeddings @ msg_embedding
0599         top_indices = np.argsort(scores)[-top_k:][::-1]
0600         return [(self._tool_names[i], float(scores[i])) for i in top_indices]
0601 
0602 
0603 class PandaBot:
0604     """Mattermost bot that answers PanDA production questions via Claude.
0605 
0606     On each message, loads recent dialog from the database — all users,
0607     all contexts — so the bot has full awareness of the community.
0608     """
0609 
0610     def __init__(self):
0611         self.mm_url = os.environ.get('MATTERMOST_URL', 'chat.epic-eic.org')
0612         self.mm_token = os.environ['MATTERMOST_TOKEN']
0613         self.mm_team = os.environ.get('MATTERMOST_TEAM', 'main')
0614         self.mm_channel_name = os.environ.get('MATTERMOST_CHANNEL', 'pandabot')
0615         self.mcp_url = MCP_URL
0616 
0617         self.claude = anthropic.AsyncAnthropic()
0618 
0619         self.driver = Driver({
0620             'url': self.mm_url,
0621             'token': self.mm_token,
0622             'scheme': 'https',
0623             'port': 443,
0624         })
0625 
0626         self.bot_user_id = None
0627         self.channel_id = None
0628         self.system_prompt = _load_system_preamble()
0629         self.anthropic_tools = []
0630         self._tool_registry: dict[str, dict] = {}  # tool_name → Anthropic tool dict
0631         self._tool_server_map: dict[str, str] = {}  # tool_name → server name
0632         self._tool_router: dict[str, object] = {}  # tool_name → MCP client
0633         self._tool_original_name: dict[str, str] = {}  # prefixed_name → original MCP tool name
0634         self._stdio_clients: list[StdioMCPClient] = []
0635         self._tool_selector = ToolSelector()
0636         self._doc_handler = DocSearchHandler()
0637         self._respond_lock = asyncio.Lock()
0638         self._active_threads = set()
0639         self._mm_user_cache: dict[str, str] = {}
0640 
0641     async def _resolve_mm_username(self, mm_user_id):
0642         """Look up Mattermost username from user ID, with caching."""
0643         if mm_user_id in self._mm_user_cache:
0644             return self._mm_user_cache[mm_user_id]
0645         try:
0646             user = await asyncio.to_thread(
0647                 self.driver.users.get_user, mm_user_id
0648             )
0649             username = user.get('username', '')
0650             self._mm_user_cache[mm_user_id] = username
0651             return username
0652         except Exception:
0653             logger.exception(f"Failed to resolve user {mm_user_id}")
0654             return ''
0655 
0656     def _build_system_prompt(self):
0657         """System prompt — re-read from file on every message so edits take effect live."""
0658         now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
0659         preamble = _load_system_preamble()
0660         instructions = getattr(self, '_server_instructions', '')
0661         prompt = preamble
0662         if instructions:
0663             prompt += "\n" + instructions
0664         return f"Current date and time: {now}\n\n{prompt}"
0665 
0666     @staticmethod
0667     async def _git_version(repo_dir):
0668         """Get short version string from a git repo: hash + date + tag if any."""
0669         try:
0670             proc = await asyncio.create_subprocess_exec(
0671                 'git', 'log', '-1', '--format=%h %ci',
0672                 cwd=repo_dir,
0673                 stdout=asyncio.subprocess.PIPE,
0674                 stderr=asyncio.subprocess.PIPE,
0675             )
0676             stdout, _ = await proc.communicate()
0677             parts = stdout.decode().strip().split(' ', 1)
0678             commit_hash = parts[0] if parts else '?'
0679             commit_date = parts[1][:10] if len(parts) > 1 else ''
0680             # Try to get a tag
0681             proc2 = await asyncio.create_subprocess_exec(
0682                 'git', 'describe', '--tags', '--always',
0683                 cwd=repo_dir,
0684                 stdout=asyncio.subprocess.PIPE,
0685                 stderr=asyncio.subprocess.PIPE,
0686             )
0687             stdout2, _ = await proc2.communicate()
0688             tag = stdout2.decode().strip()
0689             return f"{tag} ({commit_date})" if tag != commit_hash else f"{commit_hash} ({commit_date})"
0690         except Exception:
0691             return 'unknown'
0692 
0693     async def _handle_manage_servers(self, arguments):
0694         """Handle the bot_manage_servers virtual tool."""
0695         action = arguments.get('action', 'list')
0696 
0697         if action == 'list':
0698             lines = [
0699                 "POST THIS TABLE EXACTLY AS-IS — do not reformat or omit columns:",
0700                 "",
0701                 "| Server | Type | Version | Updatable |",
0702                 "| --- | --- | --- | --- |",
0703             ]
0704             swf_ver = await self._git_version('/data/wenauseic/github/swf-monitor')
0705             lines.append(f"| swf-monitor | HTTP (PanDA, PCS, memory) | {swf_ver} | no |")
0706             for cfg in STDIO_MCP_SERVERS:
0707                 ver = await self._git_version(cfg['repo_dir']) if cfg.get('repo_dir') else '?'
0708                 upd = 'yes' if cfg.get('update_commands') else 'no'
0709                 lines.append(f"| {cfg['name']} | stdio ({cfg.get('source', '')}) | {ver} | {upd} |")
0710             return '\n'.join(lines)
0711 
0712         if action == 'update':
0713             name = arguments.get('server_name', '')
0714             cfg = next((s for s in STDIO_MCP_SERVERS if s['name'] == name), None)
0715             if not cfg:
0716                 return json.dumps({'error': f"Unknown server '{name}'"})
0717             if not cfg.get('update_commands'):
0718                 return json.dumps({'error': f"Server '{name}' is not updatable"})
0719 
0720             # Run update commands
0721             output_lines = []
0722             for cmd in cfg['update_commands']:
0723                 logger.info(f"Updating '{name}': {cmd}")
0724                 proc = await asyncio.create_subprocess_shell(
0725                     cmd,
0726                     stdout=asyncio.subprocess.PIPE,
0727                     stderr=asyncio.subprocess.STDOUT,
0728                 )
0729                 stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=120)
0730                 output_lines.append(stdout.decode().strip())
0731                 if proc.returncode != 0:
0732                     return json.dumps({
0733                         'error': f"Update command failed (exit {proc.returncode})",
0734                         'output': '\n'.join(output_lines),
0735                     })
0736 
0737             # Restart the stdio server
0738             old_client = next((c for c in self._stdio_clients if c.name == name), None)
0739             if old_client:
0740                 # Collect old tool names before removing routes
0741                 old_tool_names = {
0742                     t for t, c in self._tool_router.items() if c is old_client
0743                 }
0744                 for t in old_tool_names:
0745                     del self._tool_router[t]
0746                 self.anthropic_tools = [
0747                     t for t in self.anthropic_tools if t['name'] not in old_tool_names
0748                 ]
0749                 self._stdio_clients.remove(old_client)
0750                 await old_client.close()
0751 
0752             # Start fresh
0753             try:
0754                 client = StdioMCPClient(
0755                     name=cfg['name'],
0756                     command=cfg['command'],
0757                     env=cfg.get('env'),
0758                 )
0759                 await client.start()
0760                 await client.initialize()
0761                 tools = await client.list_tools()
0762                 prefix = cfg.get('prefix', '')
0763                 for t in tools:
0764                     at = mcp_tool_to_anthropic(t)
0765                     original_name = at["name"]
0766                     if prefix:
0767                         at["name"] = f"{prefix}{original_name}"
0768                     self.anthropic_tools.append(at)
0769                     self._tool_router[at["name"]] = client
0770                     self._tool_original_name[at["name"]] = original_name
0771                 self._stdio_clients.append(client)
0772                 return json.dumps({
0773                     'success': True,
0774                     'server': name,
0775                     'tools_count': len(tools),
0776                     'update_output': '\n'.join(output_lines),
0777                 })
0778             except Exception as e:
0779                 return json.dumps({'error': f"Restart failed: {e}"})
0780 
0781         return json.dumps({'error': f"Unknown action '{action}'"})
0782 
0783     @staticmethod
0784     def _generate_dpid():
0785         """Generate a short unique Data Provenance ID."""
0786         raw = f"{time.time():.6f}-{os.getpid()}"
0787         return hashlib.sha256(raw.encode()).hexdigest()[:8].upper()
0788 
0789     async def _record_dpid(self, dpid, tool_name, tool_args):
0790         """Record a DPID to the database."""
0791         from monitor_app.models import DataProvenance
0792         try:
0793             await asyncio.to_thread(
0794                 DataProvenance.objects.create,
0795                 dpid=dpid, tool_name=tool_name, tool_args=tool_args,
0796             )
0797         except Exception:
0798             logger.exception(f"Failed to record DPID:{dpid}")
0799 
0800     async def _setup_mcp(self):
0801         """Discover tools from all MCP servers (HTTP + stdio).
0802 
0803         Builds a tool registry (full schemas) and a semantic index for
0804         progressive tool loading. Tools are selected per-message based
0805         on relevance rather than loaded all at once.
0806         """
0807         # 1. HTTP MCP server (Django — PanDA, PCS, memory tools)
0808         mcp = MCPClient(self.mcp_url)
0809         try:
0810             await mcp.initialize()
0811             tools = await mcp.list_tools()
0812             for t in tools:
0813                 if t["name"].startswith(BOT_TOOL_PREFIXES):
0814                     at = mcp_tool_to_anthropic(t)
0815                     self._tool_registry[at["name"]] = at
0816                     self._tool_server_map[at["name"]] = "swf-monitor"
0817             logger.info(f"HTTP MCP: {len(self._tool_registry)} tools")
0818             if mcp.server_instructions:
0819                 self._server_instructions = mcp.server_instructions
0820         except Exception:
0821             logger.exception("Failed HTTP MCP setup — will retry on first message")
0822         finally:
0823             await mcp.close()
0824 
0825         # 2. Stdio MCP servers (xrootd, github, etc.)
0826         for server_cfg in STDIO_MCP_SERVERS:
0827             try:
0828                 client = StdioMCPClient(
0829                     name=server_cfg['name'],
0830                     command=server_cfg['command'],
0831                     env=server_cfg.get('env'),
0832                     args=server_cfg.get('args'),
0833                 )
0834                 await client.start()
0835                 await client.initialize()
0836                 tools = await client.list_tools()
0837                 prefix = server_cfg.get('prefix', '')
0838                 for t in tools:
0839                     at = mcp_tool_to_anthropic(t)
0840                     original_name = at["name"]
0841                     if prefix:
0842                         at["name"] = f"{prefix}{original_name}"
0843                     self._tool_registry[at["name"]] = at
0844                     self._tool_server_map[at["name"]] = server_cfg['name']
0845                     self._tool_router[at["name"]] = client
0846                     self._tool_original_name[at["name"]] = original_name
0847                 self._stdio_clients.append(client)
0848                 logger.info(
0849                     f"Stdio MCP '{client.name}': {len(tools)} tools"
0850                 )
0851             except Exception:
0852                 logger.exception(
0853                     f"Failed to start stdio MCP '{server_cfg['name']}'"
0854                 )
0855 
0856         # 3. Virtual tools (handled by the bot itself)
0857         self._tool_registry['bot_manage_servers'] = BOT_MANAGE_SERVERS_TOOL
0858         self._tool_registry['epic_doc_search'] = EPIC_DOC_SEARCH_TOOL
0859         self._tool_registry['epic_doc_contents'] = EPIC_DOC_CONTENTS_TOOL
0860         self._tool_server_map['epic_doc_search'] = 'epicdoc'
0861         self._tool_server_map['epic_doc_contents'] = 'epicdoc'
0862 
0863         # 3b. Pre-load ChromaDB so first doc query is instant
0864         err = self._doc_handler._ensure_collection()
0865         if err:
0866             logger.warning(f"DocSearch init deferred: {err}")
0867 
0868         # 4. Build semantic index with server-prefixed names for domain separation.
0869         # "github:get_job_logs" vs "panda:panda_list_jobs" gives the embedding
0870         # model semantic context to distinguish CI jobs from PanDA jobs.
0871         self._tool_selector.build_index(self._tool_registry, self._tool_server_map)
0872 
0873         logger.info(f"Total tools in registry: {len(self._tool_registry)}")
0874 
0875     def _build_tool_catalog(self):
0876         """One-liner catalog of all tools for the system prompt."""
0877         lines = [
0878             "TOOL AWARENESS — three tiers:",
0879             "1. CATALOG: All tools are in your system prompt as one-liners — full awareness at minimal token cost.",
0880             "2. PRE-LOADED: Tools you can call directly — pre-loaded based on relevance to this query.",
0881             "3. select_tools: Call this to load any catalog tool that isn't pre-loaded. You are never limited to pre-loaded tools.",
0882             "",
0883             "Full tool catalog:",
0884         ]
0885         for name, tool in sorted(self._tool_registry.items()):
0886             desc = tool["description"].split('\n')[0][:120]
0887             lines.append(f"- {name}: {desc}")
0888         return "\n".join(lines)
0889 
    # Stdio servers only included when the user's message mentions the domain.
    # Maps server name → keywords that trigger inclusion.
    #
    # _select_tools_for_message tests each keyword with a plain substring
    # check against the lower-cased message, so keywords must be lower-case
    # and a short keyword also matches inside longer words ('did', 'rse').
    # The trailing space in 'pr ' is part of the keyword.
    # Servers used earlier in a thread are admitted independently of this
    # table (see _extract_thread_tool_history).
    _SERVER_KEYWORDS = {
        'github': ('github', 'repo', 'pr ', 'pull request', 'issue', 'commit', 'branch', 'discussion'),
        'xrootd': ('xrootd', 'file', 'storage', 'directory', 'volatile'),
        'zenodo': ('zenodo', 'record', 'doi', 'deposit'),
        'lxr': ('lxr', 'code browser', 'cross-reference', 'source code', 'identifier',
                'class definition', 'where is', 'defined', 'header file', 'algorithm'),
        'uproot': ('uproot', 'root file', '.root', 'ttree', 'branch', 'histogram',
                   'root data', 'hepdata', 'ntuple'),
        'jlab-rucio': ('rucio', 'did', 'dids', 'dataset', 'datasets', 'container',
                       'replica', 'replicas', 'rse', 'rses', 'scope', 'replication',
                       'pwg', 'jlab', 'jefferson'),
        # More specific phrases than the 'jlab-rucio' entry above.
        'bnl-rucio': ('bnl rucio', 'brookhaven rucio', 'nprucio', 'panda rucio',
                      'bnl did', 'bnl dataset', 'bnl replica'),
    }
0906 
0907     def _extract_thread_tool_history(self, thread_context: str | None) -> tuple[set[str], set[str]]:
0908         """Extract tool history from prior bot replies in a thread.
0909 
0910         Parses (tools suggested: ...) and (tools used: ...) metadata lines
0911         from bot messages in the thread.
0912 
0913         Returns (prior_servers, prior_tools) where:
0914         - prior_servers: servers of tools actually used in prior turns
0915         - prior_tools: top 3 suggested tool names from each prior turn
0916         """
0917         prior_servers = set()
0918         prior_tools = set()
0919         if not thread_context:
0920             return prior_servers, prior_tools
0921 
0922         for line in thread_context.split('\n'):
0923             if not line.startswith('Bot:'):
0924                 continue
0925             # Extract used tools → their servers
0926             used_match = re.search(r'\(tools used:\s*([^)]+)\)', line)
0927             if used_match and used_match.group(1).strip() != 'none':
0928                 for tool_name in used_match.group(1).split(','):
0929                     tool_name = tool_name.strip()
0930                     server = self._tool_server_map.get(tool_name)
0931                     if server:
0932                         prior_servers.add(server)
0933             # Extract top 3 suggested tools (name:score format)
0934             sugg_match = re.search(r'\(tools suggested:\s*([^)]+)\)', line)
0935             if sugg_match:
0936                 entries = sugg_match.group(1).split(',')
0937                 for entry in entries[:3]:
0938                     name = entry.strip().split(':')[0]
0939                     if name and name != 'none':
0940                         prior_tools.add(name)
0941 
0942         return prior_servers, prior_tools
0943 
0944     def _select_tools_for_message(self, message: str, thread_context: str | None = None) -> tuple[list[dict], list[tuple[str, float]]]:
0945         """Pick tools for this message: semantic top-K + thread history + always-on tools.
0946 
0947         Tool set is built from three sources:
0948         1. All tools from servers used in prior thread turns
0949         2. Top 3 suggested tools from each prior thread turn
0950         3. Top-K from vector search on the current message
0951 
0952         Strips the [username in #channel] tag before embedding. Excludes
0953         stdio server tools unless activated by keyword or thread history.
0954 
0955         Returns (active_tools, scored_names) where scored_names is
0956         [(name, score), ...] in ranked order.
0957         """
0958         # Strip context tag before embedding
0959         clean_message = re.sub(r'^\[.*?\]\s*', '', message)
0960         msg_lower = clean_message.lower()
0961 
0962         # Thread history: servers used + top suggestions from prior turns
0963         prior_servers, prior_tools = self._extract_thread_tool_history(thread_context)
0964 
0965         # Determine which servers are relevant
0966         allowed_servers = {'swf-monitor', 'epicdoc'} | prior_servers
0967         for server, keywords in self._SERVER_KEYWORDS.items():
0968             if any(kw in msg_lower for kw in keywords):
0969                 allowed_servers.add(server)
0970 
0971         # Start with prior suggested tools (carry forward from thread)
0972         tools = []
0973         scored = []
0974         seen = set()
0975         for name in prior_tools:
0976             if name in self._tool_registry and name not in seen:
0977                 server = self._tool_server_map.get(name, 'unknown')
0978                 if server in allowed_servers:
0979                     tools.append(self._tool_registry[name])
0980                     seen.add(name)
0981 
0982         # Add vector search results for current message
0983         all_scored = self._tool_selector.select(clean_message, top_k=TOP_K_TOOLS)
0984         for name, score in all_scored:
0985             server = self._tool_server_map.get(name, 'unknown')
0986             if server not in allowed_servers:
0987                 continue
0988             if name in self._tool_registry and name not in seen:
0989                 tools.append(self._tool_registry[name])
0990                 scored.append((name, score))
0991                 seen.add(name)
0992         # Always include virtual tools
0993         for name in ('bot_manage_servers',):
0994             if name in self._tool_registry and name not in seen:
0995                 tools.append(self._tool_registry[name])
0996                 seen.add(name)
0997         # Always include select_tools for fallback
0998         tools.append(SELECT_TOOLS_TOOL)
0999         return tools, scored
1000 
1001     async def _load_recent_dialog(self):
1002         """Load recent dialog from the database — all users, all contexts."""
1003         mcp = MCPClient(self.mcp_url)
1004         messages = []
1005         try:
1006             await mcp.initialize()
1007             result = await mcp.call_tool('swf_get_ai_memory', {
1008                 'username': MEMORY_USERNAME,
1009                 'turns': MEMORY_TURNS,
1010             })
1011             content = result.get('content', [])
1012             text = ''
1013             for item in content:
1014                 if isinstance(item, dict) and 'text' in item:
1015                     text += item['text']
1016             if text:
1017                 data = json.loads(text)
1018                 for item in data.get('items', []):
1019                     messages.append({
1020                         "role": item['role'],
1021                         "content": item['content'],
1022                     })
1023                 logger.info(f"Loaded {len(messages)} memory items")
1024         except Exception:
1025             logger.exception("Failed to load recent dialog")
1026         finally:
1027             await mcp.close()
1028         return messages
1029 
1030     async def _record_exchange(self, question, answer, post_id='', root_id=''):
1031         """Record a Q&A exchange to the unified memory."""
1032         mcp = MCPClient(self.mcp_url)
1033         try:
1034             await mcp.initialize()
1035             for role, content in [('user', question), ('assistant', answer)]:
1036                 await mcp.call_tool('swf_record_ai_memory', {
1037                     'username': MEMORY_USERNAME,
1038                     'session_id': 'mattermost',
1039                     'role': role,
1040                     'content': content,
1041                     'namespace': post_id,
1042                     'project_path': root_id,
1043                 })
1044         except Exception:
1045             logger.exception("Failed to record exchange")
1046         finally:
1047             await mcp.close()
1048 
1049     async def _build_thread_context(self, root_id):
1050         """Fetch full Mattermost thread and format as context.
1051 
1052         Thread replies are not visible in the main channel, so Claude
1053         has no record of them in the session conversation. This provides
1054         the full thread history for thread replies.
1055         """
1056         try:
1057             thread = await asyncio.to_thread(
1058                 self.driver.posts.get_thread, root_id
1059             )
1060             posts = thread.get('posts', {})
1061             order = thread.get('order', [])
1062 
1063             lines = []
1064             for pid in order:
1065                 p = posts.get(pid)
1066                 if not p or not p.get('message', '').strip():
1067                     continue
1068                 speaker = "Bot" if p['user_id'] == self.bot_user_id else "User"
1069                 lines.append(f"{speaker}: {p['message'].strip()}")
1070 
1071             return "\n".join(lines) if lines else None
1072         except Exception:
1073             logger.exception("Failed to fetch thread")
1074             return None
1075 
1076     def start(self):
1077         """Connect to Mattermost and start listening."""
1078         logger.info(f"Connecting to {self.mm_url}...")
1079         self.driver.login()
1080         self.bot_user_id = self.driver.client.userid
1081         logger.info(f"Logged in as user {self.bot_user_id}")
1082 
1083         team = self.driver.teams.get_team_by_name(self.mm_team)
1084         channel = self.driver.channels.get_channel_by_name(
1085             team['id'], self.mm_channel_name
1086         )
1087         self.channel_id = channel['id']
1088 
1089         try:
1090             self.driver.channels.add_user(self.channel_id, options={
1091                 'user_id': self.bot_user_id,
1092             })
1093             logger.info(f"Joined #{self.mm_channel_name}")
1094         except Exception:
1095             logger.info(f"Already a member of #{self.mm_channel_name}")
1096 
1097         logger.info(
1098             f"Listening on #{self.mm_channel_name} "
1099             f"(channel {self.channel_id}) in team {self.mm_team} "
1100             f"(MCP: {self.mcp_url})"
1101         )
1102 
1103         loop = asyncio.get_event_loop()
1104         loop.run_until_complete(self._setup_mcp())
1105         self._load_active_threads()
1106         self.driver.init_websocket(self._handle_event)
1107 
    # Key inside PersistentState.state_data under which the list of active
    # thread root IDs is stored (read by _load_active_threads, written by
    # _save_active_threads).
    THREADS_STATE_KEY = 'pandabot_active_threads'
1109 
1110     def _load_active_threads(self):
1111         """Load active thread IDs from PersistentState."""
1112         from monitor_app.models import PersistentState
1113         try:
1114             obj = PersistentState.objects.get(id=1)
1115             threads = obj.state_data.get(self.THREADS_STATE_KEY, [])
1116             self._active_threads = set(threads)
1117             logger.info(f"Loaded {len(self._active_threads)} active threads")
1118         except PersistentState.DoesNotExist:
1119             self._active_threads = set()
1120 
1121     def _save_active_threads(self):
1122         """Persist active thread IDs. Keep only the most recent 200."""
1123         from monitor_app.models import PersistentState
1124         threads = list(self._active_threads)[-200:]
1125         self._active_threads = set(threads)
1126         obj, _ = PersistentState.objects.get_or_create(id=1, defaults={'state_data': {}})
1127         obj.state_data[self.THREADS_STATE_KEY] = threads
1128         obj.save()
1129 
1130     async def _handle_event(self, raw):
1131         """WebSocket event handler."""
1132         try:
1133             event = json.loads(raw)
1134         except (json.JSONDecodeError, TypeError):
1135             return
1136 
1137         event_type = event.get('event', '')
1138         if event_type and event_type != 'typing':
1139             logger.debug(f"WS event: {event_type}")
1140 
1141         if event_type != 'posted':
1142             return
1143 
1144         data = event.get('data', {})
1145         post_str = data.get('post')
1146         if not post_str:
1147             return
1148 
1149         try:
1150             post = json.loads(post_str)
1151         except (json.JSONDecodeError, TypeError):
1152             return
1153 
1154         post_channel = post.get('channel_id')
1155         post_user = post.get('user_id')
1156         post_type = post.get('type', '')
1157         logger.debug(
1158             f"Posted: channel={post_channel} user={post_user} "
1159             f"type={post_type} root_id={post.get('root_id', '')}"
1160         )
1161 
1162         channel_type = data.get('channel_type', '')
1163 
1164         if post_user == self.bot_user_id:
1165             logger.debug("Skipping own message")
1166             return
1167 
1168         if post_type:
1169             logger.debug(f"Skipping system message type={post_type}")
1170             return
1171 
1172         # Accept: our channel, a DM, an @mention, or a thread we're in
1173         is_our_channel = (post_channel == self.channel_id)
1174         is_dm = (channel_type == 'D')
1175         mentions_str = data.get('mentions', '')
1176         is_mention = self.bot_user_id and self.bot_user_id in mentions_str
1177         root_id = post.get('root_id', '')
1178         is_active_thread = root_id in self._active_threads
1179         if not is_our_channel and not is_dm and not is_mention and not is_active_thread:
1180             return
1181 
1182         message_text = post.get('message', '').strip()
1183         if not message_text:
1184             return
1185 
1186         post_id = post.get('id')
1187         mm_username = await self._resolve_mm_username(post_user)
1188 
1189         if is_dm:
1190             context_tag = 'DM'
1191         elif is_our_channel:
1192             context_tag = f'#{self.mm_channel_name}'
1193         else:
1194             channel_name = data.get('channel_name', 'unknown')
1195             context_tag = f'#{channel_name}'
1196 
1197         tagged_message = f"[{mm_username} in {context_tag}] {message_text}"
1198         source = 'DM' if is_dm else ('mention' if is_mention and not is_our_channel else 'channel')
1199         logger.info(f"Message from {mm_username} ({source}): {message_text[:100]}")
1200 
1201         asyncio.create_task(self._respond(tagged_message, post_channel, post_id, root_id))
1202 
1203     async def _respond(self, tagged_message, reply_channel, post_id, root_id):
1204         """Process any message — channel, DM, or mention.
1205 
1206         Loads recent dialog from DB, runs Claude, records the exchange.
1207         Serialized via lock so recordings don't interleave.
1208         """
1209         async with self._respond_lock:
1210             messages = await self._load_recent_dialog()
1211             reply, dpid_verified, tool_meta = await self._process_message(messages, tagged_message, root_id)
1212             # Strip any tool metadata Haiku echoed from conversation history
1213             reply = re.sub(r'\n*\*?\(tools (?:suggested|used):[^)]*\)\*?', '', reply)
1214             no_query_warn = ":warning: *This response was not based on a live data query.*"
1215             no_cite_warn = ":warning: *Tool was called live but the Data Provenance ID was not cited in the reply.*"
1216             reply = reply.replace(no_query_warn, "").replace(no_cite_warn, "").rstrip()
1217             if not dpid_verified and not reply.startswith("Sorry,"):
1218                 reply += "\n\n" + (no_cite_warn if tool_meta['used'] else no_query_warn)
1219             # Append tool selection metadata only when tools were used
1220             if tool_meta['used']:
1221                 suggested = ', '.join(tool_meta['suggested']) or 'none'
1222                 used = ', '.join(tool_meta['used'])
1223                 reply += f"\n\n*(tools suggested: {suggested})*\n*(tools used: {used})*"
1224             # Record inside lock so the next load sees this exchange
1225             await self._record_exchange(tagged_message, reply, post_id, root_id)
1226 
1227         await self._post_reply(reply, reply_channel, post_id, root_id)
1228 
1229     async def _render_plot(self, code):
1230         """Execute matplotlib code and return the PNG path, or None on failure."""
1231         with tempfile.TemporaryDirectory() as tmpdir:
1232             plot_path = os.path.join(tmpdir, 'plot.png')
1233             # Rewrite savefig path to our temp location
1234             code = re.sub(
1235                 r"plt\.savefig\([^)]+\)",
1236                 f"plt.savefig('{plot_path}', dpi=150, bbox_inches='tight')",
1237                 code,
1238             )
1239             # Ensure no plt.show()
1240             code = code.replace('plt.show()', '')
1241             # Prepend headless backend
1242             code = "import matplotlib\nmatplotlib.use('Agg')\n" + code
1243 
1244             script_path = os.path.join(tmpdir, 'plot.py')
1245             with open(script_path, 'w') as f:
1246                 f.write(code)
1247 
1248             try:
1249                 # Use sys.executable so the subprocess has the same venv
1250                 import sys
1251                 result = await asyncio.to_thread(
1252                     subprocess.run,
1253                     [sys.executable, script_path],
1254                     capture_output=True, text=True, timeout=30,
1255                     cwd=tmpdir,
1256                 )
1257                 if result.returncode != 0:
1258                     logger.warning(f"Plot script failed: {result.stderr[:500]}")
1259                     return None
1260                 if not os.path.exists(plot_path):
1261                     logger.warning("Plot script ran but produced no image")
1262                     return None
1263                 # Copy out of tmpdir before it's cleaned up
1264                 import shutil
1265                 fd, final_path = tempfile.mkstemp(suffix='.png')
1266                 os.close(fd)
1267                 shutil.copy2(plot_path, final_path)
1268                 return final_path
1269             except subprocess.TimeoutExpired:
1270                 logger.warning("Plot script timed out")
1271                 return None
1272             except Exception:
1273                 logger.exception("Plot execution failed")
1274                 return None
1275 
1276     async def _post_reply(self, reply, reply_channel, post_id, root_id):
1277         # Extract and render any plot code blocks (python-plot tag, or python with savefig)
1278         file_ids = []
1279         plot_match = re.search(r'```python-plot\s*\n(.*?)```', reply, re.DOTALL)
1280         if not plot_match:
1281             # Fallback: detect plain python blocks that contain plt.savefig
1282             m = re.search(r'```python\s*\n(.*?)```', reply, re.DOTALL)
1283             if m and 'plt.savefig' in m.group(1):
1284                 plot_match = m
1285         if plot_match:
1286             plot_code = plot_match.group(1)
1287             logger.info("Detected plot code, rendering...")
1288             plot_path = await self._render_plot(plot_code)
1289             if plot_path:
1290                 try:
1291                     uploaded = await asyncio.to_thread(
1292                         self.driver.files.upload_file,
1293                         reply_channel,
1294                         {'files': ('plot.png', open(plot_path, 'rb'))},
1295                     )
1296                     fid = uploaded['file_infos'][0]['id']
1297                     file_ids.append(fid)
1298                     logger.info(f"Plot uploaded: {fid}")
1299                     # Remove the code block from the message since we have the image
1300                     reply = reply.replace(plot_match.group(0), '*(plot attached)*')
1301                 except Exception:
1302                     logger.exception("Failed to upload plot")
1303                 finally:
1304                     try:
1305                         os.unlink(plot_path)
1306                     except OSError:
1307                         pass
1308 
1309         if len(reply) > MM_POST_LIMIT:
1310             reply = reply[:MM_POST_LIMIT - 20] + '\n\n... (truncated)'
1311 
1312         thread_root = root_id or post_id
1313         try:
1314             logger.info("Posting reply to Mattermost...")
1315             post_options = {
1316                 'channel_id': reply_channel,
1317                 'message': reply,
1318                 'root_id': thread_root,
1319             }
1320             if file_ids:
1321                 post_options['file_ids'] = file_ids
1322             await asyncio.to_thread(
1323                 self.driver.posts.create_post,
1324                 options=post_options,
1325             )
1326             # Track this thread so we respond to follow-ups (persisted)
1327             if thread_root not in self._active_threads:
1328                 self._active_threads.add(thread_root)
1329                 await asyncio.to_thread(self._save_active_threads)
1330             logger.info("Reply posted successfully")
1331         except Exception:
1332             logger.exception("Failed to post reply")
1333 
1334     async def _process_message(self, messages, message_text, root_id):
1335         """Run the Claude conversation loop for one user message.
1336 
1337         Returns (reply_text, dpid_verified).  dpid_verified is True only when
1338         a tool was called AND the LLM cited a matching DPID in its final reply.
1339         """
1340         # Build user message with full thread context if it's a reply
1341         user_content = message_text
1342         thread_context = None
1343         if root_id:
1344             thread_context = await self._build_thread_context(root_id)
1345             if thread_context:
1346                 user_content = (
1347                     f"[Thread conversation so far:\n{thread_context}\n]\n"
1348                     f"New reply: {message_text}"
1349                 )
1350 
1351         messages.append({"role": "user", "content": user_content})
1352 
1353         reply = "Sorry, I encountered an error processing your question."
1354         exchange_dpids = []  # DPIDs generated in this exchange
1355 
1356         mcp = MCPClient(self.mcp_url)
1357         try:
1358             await mcp.initialize()
1359 
1360             # Fallback: if registry is empty (setup failed), load eagerly
1361             if not self._tool_registry:
1362                 tools = await mcp.list_tools()
1363                 for t in tools:
1364                     if t["name"].startswith(BOT_TOOL_PREFIXES):
1365                         at = mcp_tool_to_anthropic(t)
1366                         self._tool_registry[at["name"]] = at
1367 
1368             # Select tools relevant to this message + thread history
1369             active_tools, scored = self._select_tools_for_message(message_text, thread_context)
1370             active_tool_names = {t['name'] for t in active_tools}
1371             suggested_names = [
1372                 f"{name}:{score:.2f}" for name, score in scored
1373             ]
1374             tools_used = []
1375             logger.info(f"Selected {len(active_tools)} tools: {suggested_names}")
1376 
1377             system = self._build_system_prompt()
1378             tool_catalog = self._build_tool_catalog()
1379             system_with_catalog = f"{system}\n\n{tool_catalog}"
1380 
1381             for _round in range(MAX_TOOL_ROUNDS):
1382                 response = await self.claude.beta.messages.create(
1383                     # DO NOT change model without user approval
1384                     model="claude-haiku-4-5-20251001",
1385                     max_tokens=4096,
1386                     cache_control={"type": "ephemeral"},
1387                     system=system_with_catalog,
1388                     tools=active_tools,
1389                     messages=messages,
1390                     betas=["context-management-2025-06-27"],
1391                     context_management={
1392                         "edits": [{
1393                             "type": "clear_tool_uses_20250919",
1394                             "trigger": {
1395                                 "type": "input_tokens",
1396                                 "value": 80000,
1397                             },
1398                             "keep": {"type": "tool_uses", "value": 3},
1399                         }]
1400                     },
1401                 )
1402                 logger.info(
1403                     f"Claude response: stop_reason={response.stop_reason}"
1404                 )
1405 
1406                 if response.stop_reason != "tool_use":
1407                     text_parts = [
1408                         b.text for b in response.content if b.type == "text"
1409                     ]
1410                     reply = "\n".join(text_parts)
1411                     break
1412 
1413                 # Tool use — append intermediate messages for this round
1414                 messages.append(
1415                     {"role": "assistant", "content": response.content}
1416                 )
1417                 tool_results = []
1418                 for block in response.content:
1419                     if block.type != "tool_use":
1420                         continue
1421                     logger.info(f"Tool call: {block.name}({block.input})")
1422                     if block.name not in ('select_tools', 'bot_manage_servers'):
1423                         tools_used.append(block.name)
1424                     try:
1425                         # Virtual tools handled by the bot itself
1426                         if block.name == 'select_tools':
1427                             loaded = []
1428                             for tname in block.input.get('names', []):
1429                                 if tname in self._tool_registry and tname not in active_tool_names:
1430                                     active_tools.append(self._tool_registry[tname])
1431                                     active_tool_names.add(tname)
1432                                     loaded.append(tname)
1433                             result_text = json.dumps({
1434                                 'loaded': loaded,
1435                                 'message': f"Loaded {len(loaded)} tools. They are now available for use.",
1436                             })
1437                             logger.info(f"select_tools loaded: {loaded}")
1438                         elif block.name == 'bot_manage_servers':
1439                             result_text = await self._handle_manage_servers(block.input)
1440                         elif block.name == 'epic_doc_search':
1441                             result_text = await self._doc_handler.search(block.input)
1442                         elif block.name == 'epic_doc_contents':
1443                             result_text = await self._doc_handler.contents(block.input)
1444                         else:
1445                             # Route to the correct MCP server
1446                             if block.name in self._tool_router:
1447                                 mcp_name = self._tool_original_name.get(block.name, block.name)
1448                                 result = await self._tool_router[block.name].call_tool(
1449                                     mcp_name, block.input
1450                                 )
1451                             else:
1452                                 result = await mcp.call_tool(block.name, block.input)
1453                             content = result.get("content", [])
1454                             result_text = ""
1455                             for item in content:
1456                                 if isinstance(item, dict) and "text" in item:
1457                                     result_text += item["text"]
1458                         # Assign DPID and stamp the result
1459                         dpid = self._generate_dpid()
1460                         exchange_dpids.append(dpid)
1461                         await self._record_dpid(dpid, block.name, block.input)
1462                         result_text = f"[DPID:{dpid}]\n{result_text}"
1463                         if len(result_text) > MAX_RESULT_LEN:
1464                             result_text = (
1465                                 result_text[:MAX_RESULT_LEN]
1466                                 + '\n... (truncated)'
1467                             )
1468                     except Exception as e:
1469                         logger.exception(f"MCP tool {block.name} failed")
1470                         result_text = json.dumps({"error": str(e)})
1471 
1472                     tool_results.append({
1473                         "type": "tool_result",
1474                         "tool_use_id": block.id,
1475                         "content": result_text,
1476                     })
1477                 messages.append(
1478                     {"role": "user", "content": tool_results}
1479                 )
1480             else:
1481                 reply = (
1482                     "I hit the maximum number of tool calls. "
1483                     "Please try a more specific question."
1484                 )
1485 
1486             logger.info(f"Got reply: {len(reply)} chars")
1487 
1488         except Exception:
1489             logger.exception("ask_claude failed")
1490         finally:
1491             await mcp.close()
1492 
1493         # Verify: did the LLM cite a DPID that was actually generated?
1494         dpid_verified = False
1495         if exchange_dpids:
1496             # Find cited DPIDs: line must mention trigger word AND contain a known DPID
1497             generated = set(exchange_dpids)
1498             generated_lower = {d.lower() for d in generated}
1499             cited = set()
1500             for line in reply.split('\n'):
1501                 if re.search(r'(?i)dpid|provenance', line):
1502                     for h in re.findall(r'\b([A-Fa-f0-9]{8})\b', line):
1503                         if h.upper() in generated or h.lower() in generated_lower:
1504                             cited.add(h.upper())
1505             matched = cited & set(exchange_dpids)
1506             if matched:
1507                 dpid_verified = True
1508                 logger.info(f"DPID verified: {matched}")
1509             else:
1510                 logger.warning(
1511                     f"Tool called but no valid DPID cited. "
1512                     f"Generated: {exchange_dpids}, cited: {cited}"
1513                 )
1514 
1515         # Strip DPID citations from reply — user doesn't need to see them
1516         # Remove lines that are DPID citations (trigger word + known DPID on same line)
1517         generated_upper = {d.upper() for d in exchange_dpids} if exchange_dpids else set()
1518         def _is_dpid_citation(line):
1519             if not re.search(r'(?i)dpid|provenance', line):
1520                 return False
1521             for h in re.findall(r'\b([A-Fa-f0-9]{8})\b', line):
1522                 if h.upper() in generated_upper:
1523                     return True
1524             return False
1525         reply = '\n'.join(
1526             line for line in reply.split('\n')
1527             if not _is_dpid_citation(line)
1528         ).strip()
1529 
1530         # Deduplicate tools_used preserving order
1531         seen = set()
1532         tools_used = [t for t in tools_used if not (t in seen or seen.add(t))]
1533 
1534         tool_meta = {
1535             'suggested': suggested_names,
1536             'used': tools_used,
1537         }
1538         return reply, dpid_verified, tool_meta