# File indexing completed on 2026-04-27 07:41:42
0001 """
0002 PanDA Mattermost bot — answers production monitoring questions using Claude with tool use.
0003
0004 Connects to Mattermost via WebSocket, listens for messages in a target channel,
0005 and responds using Claude with PanDA monitoring tools discovered via MCP.
0006
0007 On each question, loads recent dialog from the database — all users, all
0008 contexts — giving the bot full awareness of recent conversations. One soft
0009 privacy rule: don't reveal DM content to others in the channel.
0010
0011 MCP transport: HTTP POST (JSON-RPC) to the Django MCP endpoint — the same
0012 transport Claude Code uses. No SSE, no GET streams, no subprocesses.
0013 """
0014
0015 import asyncio
0016 import hashlib
0017 import json
0018 import logging
0019 import os
0020 import re
0021 import subprocess
0022 import sys
0023 import tempfile
0024 import time
0025 from datetime import datetime, timezone
0026
0027
0028
# Prefer pysqlite3 (bundles a modern SQLite) over the system sqlite3 when
# available — ChromaDB needs a newer SQLite than many distros ship.
# NOTE(review): assumes pysqlite3 is drop-in API-compatible with stdlib
# sqlite3 — confirm for the deployed version.
try:
    __import__("pysqlite3")
    sys.modules["sqlite3"] = sys.modules.pop("pysqlite3")
except ImportError:
    pass
0034
0035 import anthropic
0036 import httpx
0037 import numpy as np
0038 from mattermostdriver import Driver
0039 from sentence_transformers import SentenceTransformer
0040
0041 logger = logging.getLogger('panda_bot')
0042
# Tunables and endpoints.
MAX_TOOL_ROUNDS = 10   # max Claude tool-use round trips per question — TODO confirm against handler
MAX_RESULT_LEN = 10000  # presumably a cap on tool result text — verify at the usage site
MM_POST_LIMIT = 16383   # presumably the Mattermost maximum post length
MEMORY_TURNS = 30       # recent dialog turns loaded per question — TODO confirm
MEMORY_USERNAME = 'pandabot'  # identity the bot's own turns are stored under — TODO confirm
# Django MCP endpoint (HTTP POST JSON-RPC transport); override via MCP_URL.
MCP_URL = os.environ.get(
    'MCP_URL', 'https://pandaserver02.sdcc.bnl.gov/swf-monitor/mcp/'
)
# Only HTTP MCP tools whose names start with one of these prefixes are
# registered for the bot (see PandaBot._setup_mcp).
BOT_TOOL_PREFIXES = ('panda_', 'pcs_', 'epic_')
0052
0053
0054
# Subprocess-based MCP servers. Each entry describes how to launch the
# server ('command' + 'env'), where its git checkout lives ('repo_dir'),
# and how to rebuild it ('update_commands', run by bot_manage_servers).
# An optional 'prefix' is prepended to every tool name so two instances
# of the same server stay distinguishable. Entries guarded by an env
# variable are only registered when their credentials are present.
STDIO_MCP_SERVERS = []

# XRootD storage browser — enabled only when XROOTD_MCP_SERVER is set.
_xrootd_server = os.environ.get('XROOTD_MCP_SERVER')
if _xrootd_server:
    STDIO_MCP_SERVERS.append({
        'name': 'xrootd',
        'source': 'github.com/eic/xrootd-mcp-server',
        'command': [
            os.environ.get('NODE_PATH', '/eic/u/wenauseic/.nvm/versions/node/v22.17.0/bin/node'),
            '/data/wenauseic/github/xrootd-mcp-server/build/src/index.js',
        ],
        'env': {
            'XROOTD_SERVER': os.environ.get('XROOTD_SERVER', 'root://dtn-eic.jlab.org'),
            'XROOTD_BASE_DIR': os.environ.get('XROOTD_BASE_DIR', '/volatile/eic/EPIC'),
        },
        'repo_dir': '/data/wenauseic/github/xrootd-mcp-server',
        'update_commands': [
            'export PATH=/eic/u/wenauseic/.nvm/versions/node/v22.17.0/bin:$PATH && cd /data/wenauseic/github/xrootd-mcp-server && git pull && npm install && npm run build',
        ],
    })

# GitHub tools — enabled only when a personal access token is configured.
_github_token = os.environ.get('GITHUB_PERSONAL_ACCESS_TOKEN')
if _github_token:
    STDIO_MCP_SERVERS.append({
        'name': 'github',
        'source': 'github.com/github/github-mcp-server',
        'command': [
            '/data/wenauseic/github/github-mcp-server/github-mcp-server', 'stdio',
            '--toolsets=issues,pull_requests,actions,code_security,discussions',
        ],
        'env': {
            'GITHUB_PERSONAL_ACCESS_TOKEN': _github_token,
        },
        'repo_dir': '/data/wenauseic/github/github-mcp-server',
        'update_commands': [
            'cd /data/wenauseic/github/github-mcp-server && git pull && PATH=$PATH:/usr/local/go/bin go build -o github-mcp-server ./cmd/github-mcp-server',
        ],
    })

# Zenodo records — enabled only when an API key is configured.
_zenodo_key = os.environ.get('ZENODO_API_KEY')
if _zenodo_key:
    STDIO_MCP_SERVERS.append({
        'name': 'zenodo',
        'source': 'github.com/eic/zenodo-mcp-server',
        'command': [
            os.environ.get('NODE_PATH', '/eic/u/wenauseic/.nvm/versions/node/v22.17.0/bin/node'),
            '/data/wenauseic/github/zenodo-mcp-server/build/src/index.js',
        ],
        'env': {
            'ZENODO_API_KEY': _zenodo_key,
        },
        'repo_dir': '/data/wenauseic/github/zenodo-mcp-server',
        'update_commands': [
            'export PATH=/eic/u/wenauseic/.nvm/versions/node/v22.17.0/bin:$PATH && cd /data/wenauseic/github/zenodo-mcp-server && git pull && npm install && npm run build',
        ],
    })

# LXR source cross-reference browser (always enabled).
STDIO_MCP_SERVERS.append({
    'name': 'lxr',
    'source': 'github.com/BNLNPPS/lxr-mcp-server',
    'command': [
        os.path.join(os.environ.get('SWF_HOME', '/data/wenauseic/github'),
                     'swf-testbed/.venv/bin/python'),
        '/data/wenauseic/github/lxr-mcp-server/lxr_mcp_server.py',
    ],
    'env': {},
    'repo_dir': '/data/wenauseic/github/lxr-mcp-server',
    'update_commands': [
        'cd /data/wenauseic/github/lxr-mcp-server && git pull',
    ],
})

# ROOT-file inspection via uproot (always enabled).
STDIO_MCP_SERVERS.append({
    'name': 'uproot',
    'source': 'github.com/eic/uproot-mcp-server',
    'command': [
        os.path.join(os.environ.get('SWF_HOME', '/data/wenauseic/github'),
                     'swf-testbed/.venv/bin/uproot-mcp-server'),
    ],
    'env': {},
    'repo_dir': '/data/wenauseic/github/uproot-mcp-server',
    'update_commands': [
        'cd /data/wenauseic/github/uproot-mcp-server && git pull && '
        + os.path.join(os.environ.get('SWF_HOME', '/data/wenauseic/github'),
                       'swf-testbed/.venv/bin/pip')
        + ' install -e ".[xrootd]"',
    ],
})

# Rucio at JLab — same server code as bnl-rucio below, so its tools get a
# 'jlab_rucio_' prefix to keep the two instances apart.
# NOTE(review): shared read-only credentials embedded in source — confirm
# this is acceptable for this deployment.
STDIO_MCP_SERVERS.append({
    'name': 'jlab-rucio',
    'prefix': 'jlab_rucio_',
    'source': 'github.com/BNLNPPS/rucio-eic-mcp-server',
    'command': [
        os.path.join(os.environ.get('SWF_HOME', '/data/wenauseic/github'),
                     'swf-testbed/.venv/bin/python'),
        '/data/wenauseic/github/rucio-eic-mcp-server/rucio_eic_mcp_server.py',
    ],
    'env': {
        'RUCIO_URL': 'https://rucio-server.jlab.org:443',
        'RUCIO_AUTH_TYPE': 'userpass',
        'RUCIO_ACCOUNT': 'eicread',
        'RUCIO_USERNAME': 'eicread',
        'RUCIO_PASSWORD': 'eicread',
        'RUCIO_CA_BUNDLE': 'false',
        'TOKEN_FILE_PATH': '/tmp/rucio_eic_jlab_token.txt',
    },
    'repo_dir': '/data/wenauseic/github/rucio-eic-mcp-server',
    'update_commands': [
        'cd /data/wenauseic/github/rucio-eic-mcp-server && git pull && '
        + os.path.join(os.environ.get('SWF_HOME', '/data/wenauseic/github'),
                       'swf-testbed/.venv/bin/pip')
        + ' install -e .',
    ],
})

# Rucio at BNL — x509-authenticated instance of the same server.
STDIO_MCP_SERVERS.append({
    'name': 'bnl-rucio',
    'prefix': 'bnl_rucio_',
    'source': 'github.com/BNLNPPS/rucio-eic-mcp-server',
    'command': [
        os.path.join(os.environ.get('SWF_HOME', '/data/wenauseic/github'),
                     'swf-testbed/.venv/bin/python'),
        '/data/wenauseic/github/rucio-eic-mcp-server/rucio_eic_mcp_server.py',
    ],
    'env': {
        'RUCIO_URL': 'https://nprucio01.sdcc.bnl.gov:443',
        'RUCIO_AUTH_TYPE': 'x509',
        'RUCIO_ACCOUNT': 'panda',
        'RUCIO_VO': 'eic',
        'X509_USER_PROXY': '/data/wenauseic/longproxy-for-rucio',
        'TOKEN_FILE_PATH': '/tmp/rucio_eic_bnl_token.txt',
    },
    'repo_dir': '/data/wenauseic/github/rucio-eic-mcp-server',
    'update_commands': [
        'cd /data/wenauseic/github/rucio-eic-mcp-server && git pull && '
        + os.path.join(os.environ.get('SWF_HOME', '/data/wenauseic/github'),
                       'swf-testbed/.venv/bin/pip')
        + ' install -e .',
    ],
})
0196
0197
# Virtual tool, handled in-process by PandaBot._handle_manage_servers:
# lists the bot's MCP servers or pulls/rebuilds/restarts an updatable one.
BOT_MANAGE_SERVERS_TOOL = {
    "name": "bot_manage_servers",
    "description": (
        "List or update the bot's MCP servers. "
        "action='list' shows all servers and which are updatable. "
        "action='update', server_name='xrootd' pulls latest code, rebuilds, and restarts that server."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["list", "update"],
                "description": "Action to perform.",
            },
            "server_name": {
                "type": "string",
                "description": "Server to update (required for action='update').",
            },
        },
        "required": ["action"],
    },
}
0221
0222
0223
# Persistent ChromaDB vector store backing the epic_doc_* virtual tools.
CHROMA_PATH = "/data/wenauseic/github/swf-monitor/chroma_db"
CHROMA_COLLECTION = "bamboo_docs"

# Virtual tool, handled in-process by DocSearchHandler.search:
# semantic (vector) search over indexed ePIC documentation.
EPIC_DOC_SEARCH_TOOL = {
    "name": "epic_doc_search",
    "description": (
        "Search ePIC documentation by natural-language query (semantic vector search). "
        "Covers SWF testbed, SWF monitor, Bamboo/PanDA, EICrecon, containers, ePIC production, "
        "EIC master docs, afterburner, eic-shell, and more. "
        "Use for conceptual 'how does X work?' questions about the software and experiment."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Natural-language question (e.g. 'how does fast processing work?').",
            },
            "top_k": {
                "type": "integer",
                "description": "Number of results (default 5, max 20).",
                "default": 5,
            },
        },
        "required": ["query"],
    },
}

# Virtual tool, handled in-process by DocSearchHandler.contents:
# table of contents of everything indexed in the vector store.
EPIC_DOC_CONTENTS_TOOL = {
    "name": "epic_doc_contents",
    "description": (
        "Show what's in epicdoc — table of contents of all indexed ePIC documentation. "
        "Lists every source and document with chunk counts. Use to discover what documentation "
        "is searchable via epic_doc_search."
    ),
    "input_schema": {
        "type": "object",
        "properties": {},
    },
}
0264
0265
class DocSearchHandler:
    """Serves the epic_doc_search / epic_doc_contents virtual tools.

    Backed by a ChromaDB vector store. The collection is opened lazily on
    first use and cached for the lifetime of the bot process, so the
    embedding model loads exactly once. A failed initialization is cached
    too and re-reported on every later call.
    """

    def __init__(self):
        self._collection = None   # cached collection handle (None until init succeeds)
        self._init_error = None   # cached init failure message (None while unknown)

    def _ensure_collection(self):
        """Open the ChromaDB collection if needed; return error string or None."""
        if self._collection is not None:
            return None
        if self._init_error is not None:
            return self._init_error

        try:
            import chromadb
            from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
        except ImportError:
            self._init_error = "chromadb not installed"
            return self._init_error

        if not os.path.exists(CHROMA_PATH):
            self._init_error = f"ChromaDB path not found: {CHROMA_PATH}"
            return self._init_error

        try:
            # Shared HF cache so the embedding model isn't re-downloaded.
            os.environ.setdefault("HF_HOME", "/opt/swf-monitor/shared/hf_cache")
            embed_fn = SentenceTransformerEmbeddingFunction("all-MiniLM-L6-v2")
            store = chromadb.PersistentClient(path=CHROMA_PATH)
            self._collection = store.get_collection(CHROMA_COLLECTION, embedding_function=embed_fn)
            logger.info(
                f"DocSearch: loaded collection '{CHROMA_COLLECTION}' "
                f"({self._collection.count()} chunks)"
            )
        except Exception as e:
            self._init_error = f"ChromaDB init failed: {e}"
            return self._init_error
        return None

    async def search(self, arguments: dict) -> str:
        """Handle epic_doc_search: top-k semantic query, JSON string result."""
        query = str(arguments.get("query", "")).strip()
        if not query:
            return json.dumps({"error": "query is required"})

        top_k = max(1, min(int(arguments.get("top_k", 5)), 20))

        err = await asyncio.to_thread(self._ensure_collection)
        if err:
            return json.dumps({"error": err})

        try:
            raw = await asyncio.to_thread(
                self._collection.query,
                query_texts=[query], n_results=top_k,
                include=["documents", "metadatas", "distances"],
            )
        except Exception as e:
            return json.dumps({"error": f"ChromaDB query failed: {e}"})

        docs = (raw.get("documents") or [[]])[0]
        metas = (raw.get("metadatas") or [[]])[0]
        dists = (raw.get("distances") or [[]])[0]
        hits = [
            {
                # Distance mapped to a 0-100 "score", floored at zero.
                "score": round(max(0, (1 - dist) * 100)),
                "source": meta.get("source", "?"),
                "file": meta.get("rel_path", "?"),
                "excerpt": doc[:1500],
            }
            for doc, meta, dist in zip(docs, metas, dists)
        ]
        return json.dumps({"query": query, "results": hits})

    async def contents(self, arguments: dict) -> str:
        """Handle epic_doc_contents: table of contents of indexed docs."""
        err = await asyncio.to_thread(self._ensure_collection)
        if err:
            return json.dumps({"error": err})

        try:
            all_meta = await asyncio.to_thread(
                self._collection.get, include=["metadatas"],
            )
        except Exception as e:
            return json.dumps({"error": f"ChromaDB get failed: {e}"})

        # Group chunk metadata: source → {rel_path → total_chunks}.
        by_source = {}
        for meta in all_meta.get("metadatas", []):
            per_file = by_source.setdefault(meta.get("source", "unknown"), {})
            per_file[meta.get("rel_path", "?")] = meta.get("total_chunks", 1)

        toc = {}
        total_chunks = 0
        total_files = 0
        for src, per_file in sorted(by_source.items()):
            entries = [
                {"file": rel, "chunks": chunks}
                for rel, chunks in sorted(per_file.items())
            ]
            toc[src] = entries
            total_files += len(entries)
            total_chunks += sum(e["chunks"] for e in entries)

        return json.dumps({
            "summary": f"{total_files} documents, {total_chunks} chunks across {len(toc)} sources",
            "sources": toc,
        })
0381
0382
# Editable system prompt file, re-read on every message (see
# _load_system_preamble); override the path via PANDABOT_SYSTEM_PROMPT.
SYSTEM_PROMPT_FILE = os.getenv(
    'PANDABOT_SYSTEM_PROMPT',
    os.path.join(os.path.dirname(__file__), 'system_prompt.txt'),
)
0387
0388
def _load_system_preamble():
    """Read the system prompt from SYSTEM_PROMPT_FILE, fresh on every call.

    Re-reading each time lets prompt edits take effect without restarting
    the bot. Falls back to a minimal one-line prompt if the file cannot
    be read.

    Returns:
        str: the prompt text, or the fallback on any read failure.
    """
    try:
        with open(SYSTEM_PROMPT_FILE) as f:
            return f.read()
    except OSError:
        # Broadened from FileNotFoundError: a permission error or a
        # directory at this path should degrade gracefully to the
        # fallback instead of crashing message handling.
        return "You are the PanDA bot for the ePIC experiment."
0396
0397
class MCPClient:
    """Minimal MCP client speaking JSON-RPC over plain HTTP POST.

    No SSE, no GET streams — every request is a single POST to the Django
    MCP endpoint. The server-assigned Mcp-Session-Id header is captured
    from responses and echoed on all subsequent requests.
    """

    def __init__(self, url: str):
        self.url = url
        self.session_id = None
        self._request_id = 0
        self._http = httpx.AsyncClient(timeout=60)

    async def _post(self, method: str, params: dict | None = None):
        """POST one JSON-RPC request; return the decoded response body."""
        self._request_id += 1
        payload = {"jsonrpc": "2.0", "id": self._request_id, "method": method}
        if params:
            payload["params"] = params
        hdrs = {"Content-Type": "application/json", "Accept": "application/json"}
        if self.session_id:
            hdrs["Mcp-Session-Id"] = self.session_id
        resp = await self._http.post(self.url, json=payload, headers=hdrs)
        resp.raise_for_status()
        # Adopt the session id whenever the server sends (or rotates) one.
        if "Mcp-Session-Id" in resp.headers:
            self.session_id = resp.headers["Mcp-Session-Id"]
        return resp.json()

    async def initialize(self):
        """Run the MCP initialize handshake; stash server instructions."""
        resp = await self._post("initialize", {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {"name": "panda-bot", "version": "1.0"},
        })
        self.server_instructions = resp.get("result", {}).get("instructions", "")
        return resp

    async def list_tools(self):
        """Return the server's tool definitions (empty list if none)."""
        reply = await self._post("tools/list")
        return reply.get("result", {}).get("tools", [])

    async def call_tool(self, name: str, arguments: dict):
        """Invoke one tool by name; return the JSON-RPC result object."""
        reply = await self._post("tools/call", {"name": name, "arguments": arguments})
        return reply.get("result", {})

    async def close(self):
        """Dispose of the underlying HTTP client."""
        await self._http.aclose()
0444
0445
class StdioMCPClient:
    """MCP client for subprocess servers speaking JSON-RPC over stdio.

    One JSON object per line on stdin/stdout. Requests are matched to
    responses by id; any other messages (server notifications) are
    skipped with a debug log.
    """

    def __init__(self, name: str, command: list, env: dict = None, args: list = None):
        self.name = name
        self.command = command + (args or [])
        # Child inherits our environment plus server-specific overrides.
        self.env = {**os.environ, **(env or {})}
        self._request_id = 0
        self._process = None
        self.server_instructions = ""

    async def start(self):
        """Launch the subprocess.

        NOTE(review): stderr is piped but never drained — a very chatty
        server could fill the pipe and stall; confirm servers are quiet
        on stderr.
        """
        self._process = await asyncio.create_subprocess_exec(
            *self.command,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=self.env,
            limit=1024 * 1024,  # larger readline limit for big tool results
        )
        logger.info(f"Stdio MCP '{self.name}' started (pid {self._process.pid})")

    async def _request(self, method: str, params: dict | None = None):
        """Send one JSON-RPC request and wait (<= 60 s) for its response.

        Raises:
            RuntimeError: process not running, or it closed stdout.
            asyncio.TimeoutError: no matching response within 60 s.
        """
        if not self._process or self._process.returncode is not None:
            raise RuntimeError(f"Stdio MCP '{self.name}' not running")
        self._request_id += 1
        req_id = self._request_id
        body = {"jsonrpc": "2.0", "id": req_id, "method": method}
        if params:
            body["params"] = params
        line = json.dumps(body) + "\n"
        self._process.stdin.write(line.encode())
        await self._process.stdin.drain()

        # Read until the response carrying our id arrives; skip anything
        # else (notifications, stray messages) with a debug log.
        while True:
            resp_line = await asyncio.wait_for(
                self._process.stdout.readline(), timeout=60
            )
            if not resp_line:
                raise RuntimeError(f"Stdio MCP '{self.name}' closed stdout")
            msg = json.loads(resp_line)
            if "id" in msg and msg["id"] == req_id:
                return msg

            logger.debug(f"Stdio MCP '{self.name}' skipped: {msg.get('method', '?')}")

    async def initialize(self):
        """Run the MCP initialize handshake, then confirm with the
        notifications/initialized notification as the protocol requires."""
        resp = await self._request("initialize", {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {"name": "panda-bot", "version": "1.0"},
        })
        self.server_instructions = (
            resp.get("result", {}).get("instructions", "")
        )

        notif = {"jsonrpc": "2.0", "method": "notifications/initialized"}
        self._process.stdin.write((json.dumps(notif) + "\n").encode())
        await self._process.stdin.drain()
        return resp

    async def list_tools(self):
        """Return the server's tool definitions (empty list if none)."""
        result = await self._request("tools/list")
        return result.get("result", {}).get("tools", [])

    async def call_tool(self, name: str, arguments: dict):
        """Invoke one tool by name; return the JSON-RPC result object."""
        result = await self._request("tools/call", {
            "name": name, "arguments": arguments,
        })
        return result.get("result", {})

    async def close(self):
        """Close stdin so the server can exit; kill it if it doesn't.

        Fix: after kill() we now await wait() so the child is reaped
        instead of lingering as a zombie.
        """
        if self._process and self._process.returncode is None:
            self._process.stdin.close()
            try:
                await asyncio.wait_for(self._process.wait(), timeout=5)
            except asyncio.TimeoutError:
                self._process.kill()
                await self._process.wait()
        logger.info(f"Stdio MCP '{self.name}' stopped")
0528
0529
def mcp_tool_to_anthropic(tool):
    """Translate an MCP tool spec into the Anthropic Messages API shape.

    Only the field names differ: MCP's camelCase `inputSchema` becomes
    `input_schema`; a missing description maps to the empty string.
    """
    converted = {"name": tool["name"]}
    converted["description"] = tool.get("description", "")
    converted["input_schema"] = tool["inputSchema"]
    return converted
0537
0538
# Virtual tool: lets Claude pull additional tools out of the catalog at
# runtime (progressive tool loading — see ToolSelector and the catalog
# text built by PandaBot._build_tool_catalog).
SELECT_TOOLS_TOOL = {
    "name": "select_tools",
    "description": (
        "Load additional tools by name from the tool catalog. "
        "Call this when the pre-loaded tools don't cover what you need. "
        "The tool catalog is listed in your system prompt."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "names": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Tool names to load from the catalog.",
            },
        },
        "required": ["names"],
    },
}


# Default number of tools the semantic selector returns per message.
TOP_K_TOOLS = 8
0561
0562
class ToolSelector:
    """Ranks tools against a user message by embedding similarity.

    All tool descriptions are embedded once at startup. For embedding,
    each name is prefixed with its MCP server ("github:get_job_logs") so
    similarly named tools from different domains stay distinguishable;
    the names handed back by select() are the bare dispatch names.
    """

    def __init__(self):
        self._model = SentenceTransformer('all-MiniLM-L6-v2')
        self._tool_names: list[str] = []
        self._tool_embeddings: np.ndarray | None = None

    def build_index(self, tool_registry: dict[str, dict], server_map: dict[str, str]):
        """Embed every tool description under its server-prefixed name.

        Args:
            tool_registry: tool_name → Anthropic tool dict
            server_map: tool_name → server name (e.g. 'github', 'swf-monitor')
        """
        names = []
        corpus = []
        for tool_name, spec in tool_registry.items():
            owner = server_map.get(tool_name, 'unknown')
            names.append(tool_name)
            corpus.append(f"{owner}:{tool_name}: {spec['description']}")
        self._tool_names = names
        self._tool_embeddings = self._model.encode(corpus, normalize_embeddings=True)
        logger.info(f"ToolSelector: indexed {len(self._tool_names)} tools")

    def select(self, message: str, top_k: int = TOP_K_TOOLS) -> list[tuple[str, float]]:
        """Return up to top_k (name, score) pairs, best match first."""
        if self._tool_embeddings is None or len(self._tool_names) == 0:
            return []
        probe = self._model.encode(message, normalize_embeddings=True)
        similarity = self._tool_embeddings @ probe
        ranked = np.argsort(similarity)[::-1][:top_k]
        return [(self._tool_names[i], float(similarity[i])) for i in ranked]
0601
0602
0603 class PandaBot:
0604 """Mattermost bot that answers PanDA production questions via Claude.
0605
0606 On each message, loads recent dialog from the database — all users,
0607 all contexts — so the bot has full awareness of the community.
0608 """
0609
    def __init__(self):
        # Mattermost connection settings (env-driven, with defaults).
        self.mm_url = os.environ.get('MATTERMOST_URL', 'chat.epic-eic.org')
        # Required — raises KeyError at startup if the token is missing.
        self.mm_token = os.environ['MATTERMOST_TOKEN']
        self.mm_team = os.environ.get('MATTERMOST_TEAM', 'main')
        self.mm_channel_name = os.environ.get('MATTERMOST_CHANNEL', 'pandabot')
        self.mcp_url = MCP_URL

        # Anthropic client (reads its API key from the environment).
        self.claude = anthropic.AsyncAnthropic()

        self.driver = Driver({
            'url': self.mm_url,
            'token': self.mm_token,
            'scheme': 'https',
            'port': 443,
        })

        # Populated later (startup / _setup_mcp).
        self.bot_user_id = None
        self.channel_id = None
        self.system_prompt = _load_system_preamble()
        self.anthropic_tools = []
        self._tool_registry: dict[str, dict] = {}      # tool name → Anthropic tool dict
        self._tool_server_map: dict[str, str] = {}     # tool name → owning server name
        self._tool_router: dict[str, object] = {}      # tool name → stdio client handling it
        self._tool_original_name: dict[str, str] = {}  # prefixed name → server-side name
        self._stdio_clients: list[StdioMCPClient] = []
        self._tool_selector = ToolSelector()
        self._doc_handler = DocSearchHandler()
        self._respond_lock = asyncio.Lock()
        # NOTE(review): appears to track threads currently being answered —
        # confirm against the message handler (not visible here).
        self._active_threads = set()
        self._mm_user_cache: dict[str, str] = {}       # Mattermost user id → username
0640
0641 async def _resolve_mm_username(self, mm_user_id):
0642 """Look up Mattermost username from user ID, with caching."""
0643 if mm_user_id in self._mm_user_cache:
0644 return self._mm_user_cache[mm_user_id]
0645 try:
0646 user = await asyncio.to_thread(
0647 self.driver.users.get_user, mm_user_id
0648 )
0649 username = user.get('username', '')
0650 self._mm_user_cache[mm_user_id] = username
0651 return username
0652 except Exception:
0653 logger.exception(f"Failed to resolve user {mm_user_id}")
0654 return ''
0655
0656 def _build_system_prompt(self):
0657 """System prompt — re-read from file on every message so edits take effect live."""
0658 now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
0659 preamble = _load_system_preamble()
0660 instructions = getattr(self, '_server_instructions', '')
0661 prompt = preamble
0662 if instructions:
0663 prompt += "\n" + instructions
0664 return f"Current date and time: {now}\n\n{prompt}"
0665
0666 @staticmethod
0667 async def _git_version(repo_dir):
0668 """Get short version string from a git repo: hash + date + tag if any."""
0669 try:
0670 proc = await asyncio.create_subprocess_exec(
0671 'git', 'log', '-1', '--format=%h %ci',
0672 cwd=repo_dir,
0673 stdout=asyncio.subprocess.PIPE,
0674 stderr=asyncio.subprocess.PIPE,
0675 )
0676 stdout, _ = await proc.communicate()
0677 parts = stdout.decode().strip().split(' ', 1)
0678 commit_hash = parts[0] if parts else '?'
0679 commit_date = parts[1][:10] if len(parts) > 1 else ''
0680
0681 proc2 = await asyncio.create_subprocess_exec(
0682 'git', 'describe', '--tags', '--always',
0683 cwd=repo_dir,
0684 stdout=asyncio.subprocess.PIPE,
0685 stderr=asyncio.subprocess.PIPE,
0686 )
0687 stdout2, _ = await proc2.communicate()
0688 tag = stdout2.decode().strip()
0689 return f"{tag} ({commit_date})" if tag != commit_hash else f"{commit_hash} ({commit_date})"
0690 except Exception:
0691 return 'unknown'
0692
    async def _handle_manage_servers(self, arguments):
        """Handle the bot_manage_servers virtual tool.

        action='list': render a markdown table of all servers (HTTP +
        stdio) with git-derived versions and updatability.
        action='update': run the named server's update_commands, then
        restart its stdio client and re-register its tools.

        Returns a string: either the markdown table or a JSON document
        with 'success' or 'error'.
        """
        action = arguments.get('action', 'list')

        if action == 'list':
            lines = [
                # Instruction aimed at the model so the table reaches the
                # user intact.
                "POST THIS TABLE EXACTLY AS-IS — do not reformat or omit columns:",
                "",
                "| Server | Type | Version | Updatable |",
                "| --- | --- | --- | --- |",
            ]
            # The HTTP MCP server (swf-monitor) is managed outside the bot.
            swf_ver = await self._git_version('/data/wenauseic/github/swf-monitor')
            lines.append(f"| swf-monitor | HTTP (PanDA, PCS, memory) | {swf_ver} | no |")
            for cfg in STDIO_MCP_SERVERS:
                ver = await self._git_version(cfg['repo_dir']) if cfg.get('repo_dir') else '?'
                upd = 'yes' if cfg.get('update_commands') else 'no'
                lines.append(f"| {cfg['name']} | stdio ({cfg.get('source', '')}) | {ver} | {upd} |")
            return '\n'.join(lines)

        if action == 'update':
            name = arguments.get('server_name', '')
            cfg = next((s for s in STDIO_MCP_SERVERS if s['name'] == name), None)
            if not cfg:
                return json.dumps({'error': f"Unknown server '{name}'"})
            if not cfg.get('update_commands'):
                return json.dumps({'error': f"Server '{name}' is not updatable"})

            # Run update commands sequentially; abort on first failure.
            # NOTE(review): a command exceeding the 120 s timeout raises
            # TimeoutError out of this handler and leaves the process
            # running — confirm callers cope with that.
            output_lines = []
            for cmd in cfg['update_commands']:
                logger.info(f"Updating '{name}': {cmd}")
                proc = await asyncio.create_subprocess_shell(
                    cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.STDOUT,
                )
                stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=120)
                output_lines.append(stdout.decode().strip())
                if proc.returncode != 0:
                    return json.dumps({
                        'error': f"Update command failed (exit {proc.returncode})",
                        'output': '\n'.join(output_lines),
                    })

            # Tear down the old client and drop its tool registrations.
            old_client = next((c for c in self._stdio_clients if c.name == name), None)
            if old_client:
                old_tool_names = {
                    t for t, c in self._tool_router.items() if c is old_client
                }
                for t in old_tool_names:
                    del self._tool_router[t]
                self.anthropic_tools = [
                    t for t in self.anthropic_tools if t['name'] not in old_tool_names
                ]
                self._stdio_clients.remove(old_client)
                await old_client.close()

            # Start the rebuilt server and re-register its (possibly
            # prefixed) tools.
            try:
                client = StdioMCPClient(
                    name=cfg['name'],
                    command=cfg['command'],
                    env=cfg.get('env'),
                )
                await client.start()
                await client.initialize()
                tools = await client.list_tools()
                prefix = cfg.get('prefix', '')
                for t in tools:
                    at = mcp_tool_to_anthropic(t)
                    original_name = at["name"]
                    if prefix:
                        at["name"] = f"{prefix}{original_name}"
                    self.anthropic_tools.append(at)
                    self._tool_router[at["name"]] = client
                    self._tool_original_name[at["name"]] = original_name
                self._stdio_clients.append(client)
                return json.dumps({
                    'success': True,
                    'server': name,
                    'tools_count': len(tools),
                    'update_output': '\n'.join(output_lines),
                })
            except Exception as e:
                return json.dumps({'error': f"Restart failed: {e}"})

        return json.dumps({'error': f"Unknown action '{action}'"})
0782
0783 @staticmethod
0784 def _generate_dpid():
0785 """Generate a short unique Data Provenance ID."""
0786 raw = f"{time.time():.6f}-{os.getpid()}"
0787 return hashlib.sha256(raw.encode()).hexdigest()[:8].upper()
0788
0789 async def _record_dpid(self, dpid, tool_name, tool_args):
0790 """Record a DPID to the database."""
0791 from monitor_app.models import DataProvenance
0792 try:
0793 await asyncio.to_thread(
0794 DataProvenance.objects.create,
0795 dpid=dpid, tool_name=tool_name, tool_args=tool_args,
0796 )
0797 except Exception:
0798 logger.exception(f"Failed to record DPID:{dpid}")
0799
0800 async def _setup_mcp(self):
0801 """Discover tools from all MCP servers (HTTP + stdio).
0802
0803 Builds a tool registry (full schemas) and a semantic index for
0804 progressive tool loading. Tools are selected per-message based
0805 on relevance rather than loaded all at once.
0806 """
0807
0808 mcp = MCPClient(self.mcp_url)
0809 try:
0810 await mcp.initialize()
0811 tools = await mcp.list_tools()
0812 for t in tools:
0813 if t["name"].startswith(BOT_TOOL_PREFIXES):
0814 at = mcp_tool_to_anthropic(t)
0815 self._tool_registry[at["name"]] = at
0816 self._tool_server_map[at["name"]] = "swf-monitor"
0817 logger.info(f"HTTP MCP: {len(self._tool_registry)} tools")
0818 if mcp.server_instructions:
0819 self._server_instructions = mcp.server_instructions
0820 except Exception:
0821 logger.exception("Failed HTTP MCP setup — will retry on first message")
0822 finally:
0823 await mcp.close()
0824
0825
0826 for server_cfg in STDIO_MCP_SERVERS:
0827 try:
0828 client = StdioMCPClient(
0829 name=server_cfg['name'],
0830 command=server_cfg['command'],
0831 env=server_cfg.get('env'),
0832 args=server_cfg.get('args'),
0833 )
0834 await client.start()
0835 await client.initialize()
0836 tools = await client.list_tools()
0837 prefix = server_cfg.get('prefix', '')
0838 for t in tools:
0839 at = mcp_tool_to_anthropic(t)
0840 original_name = at["name"]
0841 if prefix:
0842 at["name"] = f"{prefix}{original_name}"
0843 self._tool_registry[at["name"]] = at
0844 self._tool_server_map[at["name"]] = server_cfg['name']
0845 self._tool_router[at["name"]] = client
0846 self._tool_original_name[at["name"]] = original_name
0847 self._stdio_clients.append(client)
0848 logger.info(
0849 f"Stdio MCP '{client.name}': {len(tools)} tools"
0850 )
0851 except Exception:
0852 logger.exception(
0853 f"Failed to start stdio MCP '{server_cfg['name']}'"
0854 )
0855
0856
0857 self._tool_registry['bot_manage_servers'] = BOT_MANAGE_SERVERS_TOOL
0858 self._tool_registry['epic_doc_search'] = EPIC_DOC_SEARCH_TOOL
0859 self._tool_registry['epic_doc_contents'] = EPIC_DOC_CONTENTS_TOOL
0860 self._tool_server_map['epic_doc_search'] = 'epicdoc'
0861 self._tool_server_map['epic_doc_contents'] = 'epicdoc'
0862
0863
0864 err = self._doc_handler._ensure_collection()
0865 if err:
0866 logger.warning(f"DocSearch init deferred: {err}")
0867
0868
0869
0870
0871 self._tool_selector.build_index(self._tool_registry, self._tool_server_map)
0872
0873 logger.info(f"Total tools in registry: {len(self._tool_registry)}")
0874
0875 def _build_tool_catalog(self):
0876 """One-liner catalog of all tools for the system prompt."""
0877 lines = [
0878 "TOOL AWARENESS — three tiers:",
0879 "1. CATALOG: All tools are in your system prompt as one-liners — full awareness at minimal token cost.",
0880 "2. PRE-LOADED: Tools you can call directly — pre-loaded based on relevance to this query.",
0881 "3. select_tools: Call this to load any catalog tool that isn't pre-loaded. You are never limited to pre-loaded tools.",
0882 "",
0883 "Full tool catalog:",
0884 ]
0885 for name, tool in sorted(self._tool_registry.items()):
0886 desc = tool["description"].split('\n')[0][:120]
0887 lines.append(f"- {name}: {desc}")
0888 return "\n".join(lines)
0889
0890
0891
    # Keyword triggers that unlock a stdio server's tools for a message.
    # Keys are server names (see STDIO_MCP_SERVERS); a lowercased message
    # containing any keyword (substring match) activates that server in
    # _select_tools_for_message.
    _SERVER_KEYWORDS = {
        'github': ('github', 'repo', 'pr ', 'pull request', 'issue', 'commit', 'branch', 'discussion'),
        'xrootd': ('xrootd', 'file', 'storage', 'directory', 'volatile'),
        'zenodo': ('zenodo', 'record', 'doi', 'deposit'),
        'lxr': ('lxr', 'code browser', 'cross-reference', 'source code', 'identifier',
                'class definition', 'where is', 'defined', 'header file', 'algorithm'),
        'uproot': ('uproot', 'root file', '.root', 'ttree', 'branch', 'histogram',
                   'root data', 'hepdata', 'ntuple'),
        'jlab-rucio': ('rucio', 'did', 'dids', 'dataset', 'datasets', 'container',
                       'replica', 'replicas', 'rse', 'rses', 'scope', 'replication',
                       'pwg', 'jlab', 'jefferson'),
        'bnl-rucio': ('bnl rucio', 'brookhaven rucio', 'nprucio', 'panda rucio',
                      'bnl did', 'bnl dataset', 'bnl replica'),
    }
0906
0907 def _extract_thread_tool_history(self, thread_context: str | None) -> tuple[set[str], set[str]]:
0908 """Extract tool history from prior bot replies in a thread.
0909
0910 Parses (tools suggested: ...) and (tools used: ...) metadata lines
0911 from bot messages in the thread.
0912
0913 Returns (prior_servers, prior_tools) where:
0914 - prior_servers: servers of tools actually used in prior turns
0915 - prior_tools: top 3 suggested tool names from each prior turn
0916 """
0917 prior_servers = set()
0918 prior_tools = set()
0919 if not thread_context:
0920 return prior_servers, prior_tools
0921
0922 for line in thread_context.split('\n'):
0923 if not line.startswith('Bot:'):
0924 continue
0925
0926 used_match = re.search(r'\(tools used:\s*([^)]+)\)', line)
0927 if used_match and used_match.group(1).strip() != 'none':
0928 for tool_name in used_match.group(1).split(','):
0929 tool_name = tool_name.strip()
0930 server = self._tool_server_map.get(tool_name)
0931 if server:
0932 prior_servers.add(server)
0933
0934 sugg_match = re.search(r'\(tools suggested:\s*([^)]+)\)', line)
0935 if sugg_match:
0936 entries = sugg_match.group(1).split(',')
0937 for entry in entries[:3]:
0938 name = entry.strip().split(':')[0]
0939 if name and name != 'none':
0940 prior_tools.add(name)
0941
0942 return prior_servers, prior_tools
0943
0944 def _select_tools_for_message(self, message: str, thread_context: str | None = None) -> tuple[list[dict], list[tuple[str, float]]]:
0945 """Pick tools for this message: semantic top-K + thread history + always-on tools.
0946
0947 Tool set is built from three sources:
0948 1. All tools from servers used in prior thread turns
0949 2. Top 3 suggested tools from each prior thread turn
0950 3. Top-K from vector search on the current message
0951
0952 Strips the [username in #channel] tag before embedding. Excludes
0953 stdio server tools unless activated by keyword or thread history.
0954
0955 Returns (active_tools, scored_names) where scored_names is
0956 [(name, score), ...] in ranked order.
0957 """
0958
0959 clean_message = re.sub(r'^\[.*?\]\s*', '', message)
0960 msg_lower = clean_message.lower()
0961
0962
0963 prior_servers, prior_tools = self._extract_thread_tool_history(thread_context)
0964
0965
0966 allowed_servers = {'swf-monitor', 'epicdoc'} | prior_servers
0967 for server, keywords in self._SERVER_KEYWORDS.items():
0968 if any(kw in msg_lower for kw in keywords):
0969 allowed_servers.add(server)
0970
0971
0972 tools = []
0973 scored = []
0974 seen = set()
0975 for name in prior_tools:
0976 if name in self._tool_registry and name not in seen:
0977 server = self._tool_server_map.get(name, 'unknown')
0978 if server in allowed_servers:
0979 tools.append(self._tool_registry[name])
0980 seen.add(name)
0981
0982
0983 all_scored = self._tool_selector.select(clean_message, top_k=TOP_K_TOOLS)
0984 for name, score in all_scored:
0985 server = self._tool_server_map.get(name, 'unknown')
0986 if server not in allowed_servers:
0987 continue
0988 if name in self._tool_registry and name not in seen:
0989 tools.append(self._tool_registry[name])
0990 scored.append((name, score))
0991 seen.add(name)
0992
0993 for name in ('bot_manage_servers',):
0994 if name in self._tool_registry and name not in seen:
0995 tools.append(self._tool_registry[name])
0996 seen.add(name)
0997
0998 tools.append(SELECT_TOOLS_TOOL)
0999 return tools, scored
1000
1001 async def _load_recent_dialog(self):
1002 """Load recent dialog from the database — all users, all contexts."""
1003 mcp = MCPClient(self.mcp_url)
1004 messages = []
1005 try:
1006 await mcp.initialize()
1007 result = await mcp.call_tool('swf_get_ai_memory', {
1008 'username': MEMORY_USERNAME,
1009 'turns': MEMORY_TURNS,
1010 })
1011 content = result.get('content', [])
1012 text = ''
1013 for item in content:
1014 if isinstance(item, dict) and 'text' in item:
1015 text += item['text']
1016 if text:
1017 data = json.loads(text)
1018 for item in data.get('items', []):
1019 messages.append({
1020 "role": item['role'],
1021 "content": item['content'],
1022 })
1023 logger.info(f"Loaded {len(messages)} memory items")
1024 except Exception:
1025 logger.exception("Failed to load recent dialog")
1026 finally:
1027 await mcp.close()
1028 return messages
1029
1030 async def _record_exchange(self, question, answer, post_id='', root_id=''):
1031 """Record a Q&A exchange to the unified memory."""
1032 mcp = MCPClient(self.mcp_url)
1033 try:
1034 await mcp.initialize()
1035 for role, content in [('user', question), ('assistant', answer)]:
1036 await mcp.call_tool('swf_record_ai_memory', {
1037 'username': MEMORY_USERNAME,
1038 'session_id': 'mattermost',
1039 'role': role,
1040 'content': content,
1041 'namespace': post_id,
1042 'project_path': root_id,
1043 })
1044 except Exception:
1045 logger.exception("Failed to record exchange")
1046 finally:
1047 await mcp.close()
1048
1049 async def _build_thread_context(self, root_id):
1050 """Fetch full Mattermost thread and format as context.
1051
1052 Thread replies are not visible in the main channel, so Claude
1053 has no record of them in the session conversation. This provides
1054 the full thread history for thread replies.
1055 """
1056 try:
1057 thread = await asyncio.to_thread(
1058 self.driver.posts.get_thread, root_id
1059 )
1060 posts = thread.get('posts', {})
1061 order = thread.get('order', [])
1062
1063 lines = []
1064 for pid in order:
1065 p = posts.get(pid)
1066 if not p or not p.get('message', '').strip():
1067 continue
1068 speaker = "Bot" if p['user_id'] == self.bot_user_id else "User"
1069 lines.append(f"{speaker}: {p['message'].strip()}")
1070
1071 return "\n".join(lines) if lines else None
1072 except Exception:
1073 logger.exception("Failed to fetch thread")
1074 return None
1075
1076 def start(self):
1077 """Connect to Mattermost and start listening."""
1078 logger.info(f"Connecting to {self.mm_url}...")
1079 self.driver.login()
1080 self.bot_user_id = self.driver.client.userid
1081 logger.info(f"Logged in as user {self.bot_user_id}")
1082
1083 team = self.driver.teams.get_team_by_name(self.mm_team)
1084 channel = self.driver.channels.get_channel_by_name(
1085 team['id'], self.mm_channel_name
1086 )
1087 self.channel_id = channel['id']
1088
1089 try:
1090 self.driver.channels.add_user(self.channel_id, options={
1091 'user_id': self.bot_user_id,
1092 })
1093 logger.info(f"Joined #{self.mm_channel_name}")
1094 except Exception:
1095 logger.info(f"Already a member of #{self.mm_channel_name}")
1096
1097 logger.info(
1098 f"Listening on #{self.mm_channel_name} "
1099 f"(channel {self.channel_id}) in team {self.mm_team} "
1100 f"(MCP: {self.mcp_url})"
1101 )
1102
1103 loop = asyncio.get_event_loop()
1104 loop.run_until_complete(self._setup_mcp())
1105 self._load_active_threads()
1106 self.driver.init_websocket(self._handle_event)
1107
    # PersistentState.state_data key under which active thread IDs are stored.
    THREADS_STATE_KEY = 'pandabot_active_threads'
1109
1110 def _load_active_threads(self):
1111 """Load active thread IDs from PersistentState."""
1112 from monitor_app.models import PersistentState
1113 try:
1114 obj = PersistentState.objects.get(id=1)
1115 threads = obj.state_data.get(self.THREADS_STATE_KEY, [])
1116 self._active_threads = set(threads)
1117 logger.info(f"Loaded {len(self._active_threads)} active threads")
1118 except PersistentState.DoesNotExist:
1119 self._active_threads = set()
1120
    def _save_active_threads(self):
        """Persist active thread IDs, capped at 200 entries.

        NOTE(review): ``self._active_threads`` is a set, so
        ``list(...)[-200:]`` keeps an *arbitrary* 200 entries, not
        necessarily the most recent — an ordered structure (e.g. a dict
        keyed by thread ID) would be needed for true recency trimming.
        """
        from monitor_app.models import PersistentState
        threads = list(self._active_threads)[-200:]
        self._active_threads = set(threads)
        obj, _ = PersistentState.objects.get_or_create(id=1, defaults={'state_data': {}})
        obj.state_data[self.THREADS_STATE_KEY] = threads
        obj.save()
1129
1130 async def _handle_event(self, raw):
1131 """WebSocket event handler."""
1132 try:
1133 event = json.loads(raw)
1134 except (json.JSONDecodeError, TypeError):
1135 return
1136
1137 event_type = event.get('event', '')
1138 if event_type and event_type != 'typing':
1139 logger.debug(f"WS event: {event_type}")
1140
1141 if event_type != 'posted':
1142 return
1143
1144 data = event.get('data', {})
1145 post_str = data.get('post')
1146 if not post_str:
1147 return
1148
1149 try:
1150 post = json.loads(post_str)
1151 except (json.JSONDecodeError, TypeError):
1152 return
1153
1154 post_channel = post.get('channel_id')
1155 post_user = post.get('user_id')
1156 post_type = post.get('type', '')
1157 logger.debug(
1158 f"Posted: channel={post_channel} user={post_user} "
1159 f"type={post_type} root_id={post.get('root_id', '')}"
1160 )
1161
1162 channel_type = data.get('channel_type', '')
1163
1164 if post_user == self.bot_user_id:
1165 logger.debug("Skipping own message")
1166 return
1167
1168 if post_type:
1169 logger.debug(f"Skipping system message type={post_type}")
1170 return
1171
1172
1173 is_our_channel = (post_channel == self.channel_id)
1174 is_dm = (channel_type == 'D')
1175 mentions_str = data.get('mentions', '')
1176 is_mention = self.bot_user_id and self.bot_user_id in mentions_str
1177 root_id = post.get('root_id', '')
1178 is_active_thread = root_id in self._active_threads
1179 if not is_our_channel and not is_dm and not is_mention and not is_active_thread:
1180 return
1181
1182 message_text = post.get('message', '').strip()
1183 if not message_text:
1184 return
1185
1186 post_id = post.get('id')
1187 mm_username = await self._resolve_mm_username(post_user)
1188
1189 if is_dm:
1190 context_tag = 'DM'
1191 elif is_our_channel:
1192 context_tag = f'#{self.mm_channel_name}'
1193 else:
1194 channel_name = data.get('channel_name', 'unknown')
1195 context_tag = f'#{channel_name}'
1196
1197 tagged_message = f"[{mm_username} in {context_tag}] {message_text}"
1198 source = 'DM' if is_dm else ('mention' if is_mention and not is_our_channel else 'channel')
1199 logger.info(f"Message from {mm_username} ({source}): {message_text[:100]}")
1200
1201 asyncio.create_task(self._respond(tagged_message, post_channel, post_id, root_id))
1202
1203 async def _respond(self, tagged_message, reply_channel, post_id, root_id):
1204 """Process any message — channel, DM, or mention.
1205
1206 Loads recent dialog from DB, runs Claude, records the exchange.
1207 Serialized via lock so recordings don't interleave.
1208 """
1209 async with self._respond_lock:
1210 messages = await self._load_recent_dialog()
1211 reply, dpid_verified, tool_meta = await self._process_message(messages, tagged_message, root_id)
1212
1213 reply = re.sub(r'\n*\*?\(tools (?:suggested|used):[^)]*\)\*?', '', reply)
1214 no_query_warn = ":warning: *This response was not based on a live data query.*"
1215 no_cite_warn = ":warning: *Tool was called live but the Data Provenance ID was not cited in the reply.*"
1216 reply = reply.replace(no_query_warn, "").replace(no_cite_warn, "").rstrip()
1217 if not dpid_verified and not reply.startswith("Sorry,"):
1218 reply += "\n\n" + (no_cite_warn if tool_meta['used'] else no_query_warn)
1219
1220 if tool_meta['used']:
1221 suggested = ', '.join(tool_meta['suggested']) or 'none'
1222 used = ', '.join(tool_meta['used'])
1223 reply += f"\n\n*(tools suggested: {suggested})*\n*(tools used: {used})*"
1224
1225 await self._record_exchange(tagged_message, reply, post_id, root_id)
1226
1227 await self._post_reply(reply, reply_channel, post_id, root_id)
1228
1229 async def _render_plot(self, code):
1230 """Execute matplotlib code and return the PNG path, or None on failure."""
1231 with tempfile.TemporaryDirectory() as tmpdir:
1232 plot_path = os.path.join(tmpdir, 'plot.png')
1233
1234 code = re.sub(
1235 r"plt\.savefig\([^)]+\)",
1236 f"plt.savefig('{plot_path}', dpi=150, bbox_inches='tight')",
1237 code,
1238 )
1239
1240 code = code.replace('plt.show()', '')
1241
1242 code = "import matplotlib\nmatplotlib.use('Agg')\n" + code
1243
1244 script_path = os.path.join(tmpdir, 'plot.py')
1245 with open(script_path, 'w') as f:
1246 f.write(code)
1247
1248 try:
1249
1250 import sys
1251 result = await asyncio.to_thread(
1252 subprocess.run,
1253 [sys.executable, script_path],
1254 capture_output=True, text=True, timeout=30,
1255 cwd=tmpdir,
1256 )
1257 if result.returncode != 0:
1258 logger.warning(f"Plot script failed: {result.stderr[:500]}")
1259 return None
1260 if not os.path.exists(plot_path):
1261 logger.warning("Plot script ran but produced no image")
1262 return None
1263
1264 import shutil
1265 fd, final_path = tempfile.mkstemp(suffix='.png')
1266 os.close(fd)
1267 shutil.copy2(plot_path, final_path)
1268 return final_path
1269 except subprocess.TimeoutExpired:
1270 logger.warning("Plot script timed out")
1271 return None
1272 except Exception:
1273 logger.exception("Plot execution failed")
1274 return None
1275
1276 async def _post_reply(self, reply, reply_channel, post_id, root_id):
1277
1278 file_ids = []
1279 plot_match = re.search(r'```python-plot\s*\n(.*?)```', reply, re.DOTALL)
1280 if not plot_match:
1281
1282 m = re.search(r'```python\s*\n(.*?)```', reply, re.DOTALL)
1283 if m and 'plt.savefig' in m.group(1):
1284 plot_match = m
1285 if plot_match:
1286 plot_code = plot_match.group(1)
1287 logger.info("Detected plot code, rendering...")
1288 plot_path = await self._render_plot(plot_code)
1289 if plot_path:
1290 try:
1291 uploaded = await asyncio.to_thread(
1292 self.driver.files.upload_file,
1293 reply_channel,
1294 {'files': ('plot.png', open(plot_path, 'rb'))},
1295 )
1296 fid = uploaded['file_infos'][0]['id']
1297 file_ids.append(fid)
1298 logger.info(f"Plot uploaded: {fid}")
1299
1300 reply = reply.replace(plot_match.group(0), '*(plot attached)*')
1301 except Exception:
1302 logger.exception("Failed to upload plot")
1303 finally:
1304 try:
1305 os.unlink(plot_path)
1306 except OSError:
1307 pass
1308
1309 if len(reply) > MM_POST_LIMIT:
1310 reply = reply[:MM_POST_LIMIT - 20] + '\n\n... (truncated)'
1311
1312 thread_root = root_id or post_id
1313 try:
1314 logger.info("Posting reply to Mattermost...")
1315 post_options = {
1316 'channel_id': reply_channel,
1317 'message': reply,
1318 'root_id': thread_root,
1319 }
1320 if file_ids:
1321 post_options['file_ids'] = file_ids
1322 await asyncio.to_thread(
1323 self.driver.posts.create_post,
1324 options=post_options,
1325 )
1326
1327 if thread_root not in self._active_threads:
1328 self._active_threads.add(thread_root)
1329 await asyncio.to_thread(self._save_active_threads)
1330 logger.info("Reply posted successfully")
1331 except Exception:
1332 logger.exception("Failed to post reply")
1333
1334 async def _process_message(self, messages, message_text, root_id):
1335 """Run the Claude conversation loop for one user message.
1336
1337 Returns (reply_text, dpid_verified). dpid_verified is True only when
1338 a tool was called AND the LLM cited a matching DPID in its final reply.
1339 """
1340
1341 user_content = message_text
1342 thread_context = None
1343 if root_id:
1344 thread_context = await self._build_thread_context(root_id)
1345 if thread_context:
1346 user_content = (
1347 f"[Thread conversation so far:\n{thread_context}\n]\n"
1348 f"New reply: {message_text}"
1349 )
1350
1351 messages.append({"role": "user", "content": user_content})
1352
1353 reply = "Sorry, I encountered an error processing your question."
1354 exchange_dpids = []
1355
1356 mcp = MCPClient(self.mcp_url)
1357 try:
1358 await mcp.initialize()
1359
1360
1361 if not self._tool_registry:
1362 tools = await mcp.list_tools()
1363 for t in tools:
1364 if t["name"].startswith(BOT_TOOL_PREFIXES):
1365 at = mcp_tool_to_anthropic(t)
1366 self._tool_registry[at["name"]] = at
1367
1368
1369 active_tools, scored = self._select_tools_for_message(message_text, thread_context)
1370 active_tool_names = {t['name'] for t in active_tools}
1371 suggested_names = [
1372 f"{name}:{score:.2f}" for name, score in scored
1373 ]
1374 tools_used = []
1375 logger.info(f"Selected {len(active_tools)} tools: {suggested_names}")
1376
1377 system = self._build_system_prompt()
1378 tool_catalog = self._build_tool_catalog()
1379 system_with_catalog = f"{system}\n\n{tool_catalog}"
1380
1381 for _round in range(MAX_TOOL_ROUNDS):
1382 response = await self.claude.beta.messages.create(
1383
1384 model="claude-haiku-4-5-20251001",
1385 max_tokens=4096,
1386 cache_control={"type": "ephemeral"},
1387 system=system_with_catalog,
1388 tools=active_tools,
1389 messages=messages,
1390 betas=["context-management-2025-06-27"],
1391 context_management={
1392 "edits": [{
1393 "type": "clear_tool_uses_20250919",
1394 "trigger": {
1395 "type": "input_tokens",
1396 "value": 80000,
1397 },
1398 "keep": {"type": "tool_uses", "value": 3},
1399 }]
1400 },
1401 )
1402 logger.info(
1403 f"Claude response: stop_reason={response.stop_reason}"
1404 )
1405
1406 if response.stop_reason != "tool_use":
1407 text_parts = [
1408 b.text for b in response.content if b.type == "text"
1409 ]
1410 reply = "\n".join(text_parts)
1411 break
1412
1413
1414 messages.append(
1415 {"role": "assistant", "content": response.content}
1416 )
1417 tool_results = []
1418 for block in response.content:
1419 if block.type != "tool_use":
1420 continue
1421 logger.info(f"Tool call: {block.name}({block.input})")
1422 if block.name not in ('select_tools', 'bot_manage_servers'):
1423 tools_used.append(block.name)
1424 try:
1425
1426 if block.name == 'select_tools':
1427 loaded = []
1428 for tname in block.input.get('names', []):
1429 if tname in self._tool_registry and tname not in active_tool_names:
1430 active_tools.append(self._tool_registry[tname])
1431 active_tool_names.add(tname)
1432 loaded.append(tname)
1433 result_text = json.dumps({
1434 'loaded': loaded,
1435 'message': f"Loaded {len(loaded)} tools. They are now available for use.",
1436 })
1437 logger.info(f"select_tools loaded: {loaded}")
1438 elif block.name == 'bot_manage_servers':
1439 result_text = await self._handle_manage_servers(block.input)
1440 elif block.name == 'epic_doc_search':
1441 result_text = await self._doc_handler.search(block.input)
1442 elif block.name == 'epic_doc_contents':
1443 result_text = await self._doc_handler.contents(block.input)
1444 else:
1445
1446 if block.name in self._tool_router:
1447 mcp_name = self._tool_original_name.get(block.name, block.name)
1448 result = await self._tool_router[block.name].call_tool(
1449 mcp_name, block.input
1450 )
1451 else:
1452 result = await mcp.call_tool(block.name, block.input)
1453 content = result.get("content", [])
1454 result_text = ""
1455 for item in content:
1456 if isinstance(item, dict) and "text" in item:
1457 result_text += item["text"]
1458
1459 dpid = self._generate_dpid()
1460 exchange_dpids.append(dpid)
1461 await self._record_dpid(dpid, block.name, block.input)
1462 result_text = f"[DPID:{dpid}]\n{result_text}"
1463 if len(result_text) > MAX_RESULT_LEN:
1464 result_text = (
1465 result_text[:MAX_RESULT_LEN]
1466 + '\n... (truncated)'
1467 )
1468 except Exception as e:
1469 logger.exception(f"MCP tool {block.name} failed")
1470 result_text = json.dumps({"error": str(e)})
1471
1472 tool_results.append({
1473 "type": "tool_result",
1474 "tool_use_id": block.id,
1475 "content": result_text,
1476 })
1477 messages.append(
1478 {"role": "user", "content": tool_results}
1479 )
1480 else:
1481 reply = (
1482 "I hit the maximum number of tool calls. "
1483 "Please try a more specific question."
1484 )
1485
1486 logger.info(f"Got reply: {len(reply)} chars")
1487
1488 except Exception:
1489 logger.exception("ask_claude failed")
1490 finally:
1491 await mcp.close()
1492
1493
1494 dpid_verified = False
1495 if exchange_dpids:
1496
1497 generated = set(exchange_dpids)
1498 generated_lower = {d.lower() for d in generated}
1499 cited = set()
1500 for line in reply.split('\n'):
1501 if re.search(r'(?i)dpid|provenance', line):
1502 for h in re.findall(r'\b([A-Fa-f0-9]{8})\b', line):
1503 if h.upper() in generated or h.lower() in generated_lower:
1504 cited.add(h.upper())
1505 matched = cited & set(exchange_dpids)
1506 if matched:
1507 dpid_verified = True
1508 logger.info(f"DPID verified: {matched}")
1509 else:
1510 logger.warning(
1511 f"Tool called but no valid DPID cited. "
1512 f"Generated: {exchange_dpids}, cited: {cited}"
1513 )
1514
1515
1516
1517 generated_upper = {d.upper() for d in exchange_dpids} if exchange_dpids else set()
1518 def _is_dpid_citation(line):
1519 if not re.search(r'(?i)dpid|provenance', line):
1520 return False
1521 for h in re.findall(r'\b([A-Fa-f0-9]{8})\b', line):
1522 if h.upper() in generated_upper:
1523 return True
1524 return False
1525 reply = '\n'.join(
1526 line for line in reply.split('\n')
1527 if not _is_dpid_citation(line)
1528 ).strip()
1529
1530
1531 seen = set()
1532 tools_used = [t for t in tools_used if not (t in seen or seen.add(t))]
1533
1534 tool_meta = {
1535 'suggested': suggested_names,
1536 'used': tools_used,
1537 }
1538 return reply, dpid_verified, tool_meta