File indexing completed on 2026-04-27 07:41:42
0001 """
0002 SWF Testbed Mattermost bot — assists testbed developers via DMs and a shared channel.
0003
0004 Connects to Mattermost via WebSocket, listens for:
0005 - Direct messages from authorized users
0006 - Messages in the testbed channel
0007
0008 Access-controlled: only users in the TESTBED_USER_MAP can interact.
0009 On each message, loads recent dialog from the database — all users, all
0010 contexts — giving the bot full awareness. One soft privacy rule: don't
0011 reveal DM content to others in the channel.
0012
0013 MCP transport: HTTP POST (JSON-RPC) — same as panda_bot. No SSE, no GET streams.
0014 """
0015
0016 import asyncio
0017 import json
0018 import logging
0019 import os
0020 from datetime import datetime, timezone
0021
0022 import anthropic
0023 import httpx
0024 from mattermostdriver import Driver
0025
logger = logging.getLogger('testbed_bot')

# --- Conversation / posting limits -------------------------------------
MAX_TOOL_ROUNDS = 10       # upper bound on Claude -> MCP tool round-trips per message
MAX_RESULT_LEN = 10_000    # characters of one tool result handed back to Claude
MM_POST_LIMIT = 16_383     # replies longer than this are truncated before posting

# --- Shared dialog memory (one store for all users and contexts) -------
MEMORY_TURNS = 30
MEMORY_USERNAME = 'testbedbot'

# --- MCP endpoint and tool selection ------------------------------------
_DEFAULT_MCP_URL = 'https://pandaserver02.sdcc.bnl.gov/swf-monitor/mcp/'
MCP_URL = os.environ.get('MCP_URL', _DEFAULT_MCP_URL)
# Only MCP tools whose names start with one of these prefixes reach Claude.
BOT_TOOL_PREFIXES = ('swf_', 'panda_', 'pcs_')


# Mattermost username -> testbed username; used when TESTBED_USER_MAP
# is not provided.
DEFAULT_USER_MAP = dict(
    wenaus='wenauseic',
    rahmans='srahman1',
)
0044
# System prompt template for Claude. It is a str.format template: every
# '{testbed_username}' placeholder is filled in per request with the sender's
# testbed username. Any literal brace added to this text must be doubled
# ('{{' / '}}'), or formatting will fail.
# The trailing backslashes join source lines into one paragraph, so the
# rendered prompt keeps long sentences on a single line.
SYSTEM_PREAMBLE = """\
You are the SWF Testbed bot for the ePIC experiment at the Electron Ion Collider. \
You assist testbed developers with workflow operations, monitoring, and PanDA production \
using MCP tools.

You communicate via both direct messages and a shared Mattermost channel. \
Each message you receive is tagged with the sender's username and context \
(e.g. [wenaus in #swf-testbed-bot] or [wenaus in DM]). Your conversation \
history includes recent dialog across all users and contexts — refer back to \
earlier questions and answers naturally.

The current user's testbed username is: {testbed_username}

Privacy: a user's DM exchanges are their own business. Don't volunteer DM content \
to others in the channel.

CRITICAL: ALWAYS call a tool to answer questions. NEVER answer from memory or from \
examples in these instructions. Data changes constantly — query it live.

Active commands (start/stop testbed, start/stop workflow, kill agent, etc.) require the user \
to have a pandaserver02 account. Only execute active commands for the current user using \
their testbed username '{testbed_username}'. NEVER execute active commands on behalf of \
another user or with a different username. Read-only queries (status, list, logs, etc.) \
are fine for any mapped user.

Guidelines:
- Be concise. Use markdown tables for structured data.
- When a tool needs a username parameter, use '{testbed_username}'.
- For testbed operations (start, stop, status), always use the user's testbed username.
- Keep responses focused — extract and present the key information.

When a query returns no results, try broader queries before reporting nothing found.
"""
0078
0079
def load_user_map():
    """Load the Mattermost -> testbed username map.

    Reads the ``TESTBED_USER_MAP`` environment variable. It must hold a JSON
    object mapping Mattermost usernames to testbed usernames, for example
    ``{"wenaus": "wenauseic"}``.

    The function falls back to a copy of DEFAULT_USER_MAP, logging an error,
    when the variable holds invalid JSON. It does the same when the JSON is
    not an object of string keys to string values (a list or a number would
    otherwise break later ``user_map[name]`` lookups). An unset or blank
    variable also yields the defaults, without logging.

    Returns:
        dict[str, str]: a fresh dict that the caller may mutate.
    """
    raw = os.environ.get('TESTBED_USER_MAP', '').strip()
    if raw:
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            logger.error("Invalid TESTBED_USER_MAP JSON, using defaults")
        else:
            if isinstance(parsed, dict) and all(
                isinstance(key, str) and isinstance(value, str)
                for key, value in parsed.items()
            ):
                return parsed
            logger.error(
                "TESTBED_USER_MAP must be a JSON object mapping usernames "
                "to usernames (strings), using defaults"
            )
    return DEFAULT_USER_MAP.copy()
0089
0090
class MCPClient:
    """Minimal MCP client using HTTP POST only — no SSE, no GET streams.

    Each method sends a single JSON-RPC 2.0 request POSTed to ``url``. The
    ``Mcp-Session-Id`` header returned by the server is captured and echoed
    on later requests from the same instance.

    A JSON-RPC ``error`` response is raised as RuntimeError instead of being
    treated as an empty result. Callers therefore see the failure instead of,
    for example, an empty tool list or empty tool output.
    """

    def __init__(self, url: str, client_name: str = "testbed-bot"):
        self.url = url
        self.session_id = None
        self._request_id = 0
        self._http = httpx.AsyncClient(timeout=60)
        self._client_name = client_name
        # Filled in by initialize(). Defined here so the attribute exists even
        # when initialize() was never called or failed part-way.
        self.server_instructions = ""

    async def _post(self, method: str, params: dict | None = None):
        """Send one JSON-RPC request and return the decoded response body.

        Raises:
            httpx.HTTPStatusError: on a non-2xx HTTP status.
            RuntimeError: when the response carries a JSON-RPC ``error``.
        """
        self._request_id += 1
        body = {"jsonrpc": "2.0", "id": self._request_id, "method": method}
        if params:
            body["params"] = params
        headers = {"Content-Type": "application/json", "Accept": "application/json"}
        if self.session_id:
            headers["Mcp-Session-Id"] = self.session_id
        resp = await self._http.post(self.url, json=body, headers=headers)
        resp.raise_for_status()
        if "Mcp-Session-Id" in resp.headers:
            self.session_id = resp.headers["Mcp-Session-Id"]
        payload = resp.json()
        if isinstance(payload, dict) and "error" in payload:
            raise RuntimeError(f"MCP {method} failed: {payload['error']}")
        return payload

    async def initialize(self):
        """Perform the MCP ``initialize`` handshake.

        Stores the server's ``instructions`` text (or ``""``) in
        ``server_instructions`` and returns the raw response.
        """
        resp = await self._post("initialize", {
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {"name": self._client_name, "version": "1.0"},
        })
        self.server_instructions = (
            (resp.get("result") or {}).get("instructions") or ""
        )
        return resp

    async def list_tools(self):
        """Return the server's tool definitions (a list of dicts)."""
        result = await self._post("tools/list")
        return (result.get("result") or {}).get("tools") or []

    async def call_tool(self, name: str, arguments: dict):
        """Invoke tool ``name`` with ``arguments`` and return its ``result`` dict."""
        result = await self._post("tools/call", {
            "name": name, "arguments": arguments,
        })
        return result.get("result") or {}

    async def close(self):
        """Close the underlying HTTP client."""
        await self._http.aclose()
0138
0139
def mcp_tool_to_anthropic(tool):
    """Convert an MCP tool definition to Anthropic Messages API format.

    MCP's ``inputSchema`` becomes Anthropic's ``input_schema``. A missing or
    null ``description`` becomes ``""``. A missing ``inputSchema`` becomes an
    empty object schema, so one malformed tool definition cannot raise
    KeyError and abort conversion of the whole tool list.

    Args:
        tool: a tool dict from an MCP ``tools/list`` response; ``name`` is
            required.

    Returns:
        dict with ``name``, ``description`` and ``input_schema`` keys.
    """
    return {
        "name": tool["name"],
        "description": tool.get("description") or "",
        "input_schema": (
            tool.get("inputSchema") or {"type": "object", "properties": {}}
        ),
    }
0147
0148
class TestbedBot:
    """Mattermost bot for testbed developers.

    On each message, loads recent dialog from the database — all users,
    all contexts — so the bot has full awareness. Access-controlled:
    only users in the user map can interact.

    Lifecycle: ``start()`` logs in, joins the configured channel and
    discovers MCP tools. It then hands control to the Mattermost WebSocket
    loop, which calls ``_handle_event`` for every event.
    """

    # Used when Claude's final turn contains no text. An empty reply is
    # useless to the user, and if recorded it would become empty assistant
    # content in the shared memory.
    _EMPTY_REPLY = (
        "Sorry, I couldn't produce an answer for that. "
        "Please try rephrasing your question."
    )

    def __init__(self):
        self.mm_url = os.environ.get('MATTERMOST_URL', 'chat.epic-eic.org')
        self.mm_token = os.environ['TESTBED_BOT_TOKEN']
        self.mm_team = os.environ.get('MATTERMOST_TEAM', 'main')
        self.mm_channel_name = os.environ.get(
            'TESTBED_BOT_CHANNEL', 'swf-testbed-bot'
        )
        self.mcp_url = MCP_URL

        self.user_map = load_user_map()
        logger.info(f"User map: {self.user_map}")

        self.claude = anthropic.AsyncAnthropic()

        self.driver = Driver({
            'url': self.mm_url,
            'token': self.mm_token,
            'scheme': 'https',
            'port': 443,
        })

        self.bot_user_id = None
        self.channel_id = None
        # Static preamble with a {testbed_username} placeholder. MCP server
        # instructions are kept separately in server_instructions and are
        # appended exactly once by _build_system_prompt.
        self.system_prompt_base = SYSTEM_PREAMBLE
        self.anthropic_tools = []
        self.server_instructions = ""
        # Serializes load-dialog -> Claude -> record-exchange so that each
        # request sees the previous exchange in memory.
        self._respond_lock = asyncio.Lock()
        self._mm_user_cache: dict[str, str] = {}
        # Strong references to in-flight _respond tasks. The event loop keeps
        # only weak references, so an unreferenced task may be collected
        # before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _extract_text(result):
        """Concatenate the ``text`` items of an MCP ``tools/call`` result."""
        return ''.join(
            item['text'] for item in (result.get('content') or [])
            if isinstance(item, dict) and 'text' in item
        )

    @staticmethod
    def _select_tools(tools):
        """Convert the MCP tools whose names start with BOT_TOOL_PREFIXES."""
        return [
            mcp_tool_to_anthropic(t) for t in tools
            if (t.get("name") or "").startswith(BOT_TOOL_PREFIXES)
        ]

    @staticmethod
    def _memory_to_messages(items):
        """Convert stored memory items into Anthropic message dicts.

        Skips items whose role is not user/assistant and items with empty or
        whitespace-only content. The Messages API rejects empty text, and one
        bad stored row would otherwise break every request while it stays in
        the memory window. Leading assistant turns are also dropped, so the
        history opens with a user turn.
        """
        messages = []
        for item in items:
            if not isinstance(item, dict):
                continue
            role = item.get('role')
            content = item.get('content')
            if role not in ('user', 'assistant'):
                continue
            if not content or (isinstance(content, str) and not content.strip()):
                continue
            if not messages and role == 'assistant':
                continue
            messages.append({"role": role, "content": content})
        return messages

    @staticmethod
    def _format_thread(thread, bot_user_id, exclude_post_id=None):
        """Render a Mattermost thread payload as ``Speaker: message`` lines.

        Only posts listed in ``thread['order']`` are used. They are sorted
        oldest-first by ``create_at`` instead of relying on the direction of
        the ``order`` list; posts without ``create_at`` keep their ``order``
        position, because the sort is stable. ``exclude_post_id`` (the post
        being answered) is omitted, since its text is passed separately.

        Returns:
            The joined lines, or None when no post has non-blank text.
        """
        posts = thread.get('posts') or {}
        selected = [
            posts[pid] for pid in (thread.get('order') or [])
            if pid in posts and pid != exclude_post_id
        ]
        selected.sort(key=lambda p: p.get('create_at') or 0)

        lines = []
        for p in selected:
            text = (p.get('message') or '').strip()
            if not text:
                continue
            speaker = "Bot" if p.get('user_id') == bot_user_id else "User"
            lines.append(f"{speaker}: {text}")
        return "\n".join(lines) if lines else None

    # ------------------------------------------------------------------
    # Mattermost / MCP I/O
    # ------------------------------------------------------------------

    async def _resolve_mm_username(self, mm_user_id):
        """Look up a Mattermost username from a user ID, with caching.

        Returns '' on failure. Failures are not cached, so a later call
        retries the lookup.
        """
        if mm_user_id in self._mm_user_cache:
            return self._mm_user_cache[mm_user_id]
        try:
            user = await asyncio.to_thread(
                self.driver.users.get_user, mm_user_id
            )
            username = user.get('username', '')
            self._mm_user_cache[mm_user_id] = username
            return username
        except Exception:
            logger.exception(f"Failed to resolve user {mm_user_id}")
            return ''

    def _build_system_prompt(self, testbed_username):
        """Build the system prompt personalized for the current user.

        Only the preamble is passed through ``str.format``. Server
        instructions are appended afterwards, so braces in them (for example
        JSON examples) are kept literally and cannot raise.
        """
        now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
        preamble = self.system_prompt_base.format(
            testbed_username=testbed_username,
        )
        prompt = f"Current date and time: {now}\n\n{preamble}"
        if self.server_instructions:
            prompt += "\n" + self.server_instructions
        return prompt

    async def _setup_mcp(self):
        """Discover tools and server instructions via MCP.

        Failures are logged, not raised. _process_message retries discovery
        when no tools or instructions are loaded.
        """
        mcp = MCPClient(self.mcp_url)
        try:
            await mcp.initialize()
            # Stored on its own (not folded into system_prompt_base):
            # _build_system_prompt appends it once.
            self.server_instructions = mcp.server_instructions or ""
            tools = await mcp.list_tools()
            self.anthropic_tools = self._select_tools(tools)
            logger.info(f"Discovered {len(self.anthropic_tools)} tools via MCP")
        except Exception:
            logger.exception("Failed MCP setup — will retry on first message")
        finally:
            await mcp.close()

    async def _load_recent_dialog(self):
        """Load recent dialog from the database — all users, all contexts.

        Calls the ``swf_get_ai_memory`` MCP tool for MEMORY_USERNAME. It
        parses the returned text as JSON and converts its ``items`` with
        _memory_to_messages. Any failure is logged and yields [].
        """
        mcp = MCPClient(self.mcp_url)
        messages = []
        try:
            await mcp.initialize()
            result = await mcp.call_tool('swf_get_ai_memory', {
                'username': MEMORY_USERNAME,
                'turns': MEMORY_TURNS,
            })
            text = self._extract_text(result)
            if text:
                data = json.loads(text)
                messages = self._memory_to_messages(data.get('items') or [])
            logger.info(f"Loaded {len(messages)} memory items")
        except Exception:
            logger.exception("Failed to load recent dialog")
        finally:
            await mcp.close()
        return messages

    async def _record_exchange(self, question, answer, post_id='', root_id=''):
        """Record a Q&A exchange to the unified memory.

        Writes two ``swf_record_ai_memory`` entries (user, then assistant)
        under MEMORY_USERNAME. Failures are logged, not raised.
        """
        mcp = MCPClient(self.mcp_url)
        try:
            await mcp.initialize()
            for role, content in [('user', question), ('assistant', answer)]:
                await mcp.call_tool('swf_record_ai_memory', {
                    'username': MEMORY_USERNAME,
                    'session_id': 'mattermost',
                    'role': role,
                    'content': content,
                    'namespace': post_id,
                    'project_path': root_id,
                })
        except Exception:
            logger.exception("Failed to record exchange")
        finally:
            await mcp.close()

    async def _build_thread_context(self, root_id, exclude_post_id=None):
        """Fetch the full Mattermost thread and format it as context.

        Returns None when the thread has no usable text or when fetching or
        formatting fails (the failure is logged).
        """
        try:
            thread = await asyncio.to_thread(
                self.driver.posts.get_thread, root_id
            )
            return self._format_thread(thread, self.bot_user_id, exclude_post_id)
        except Exception:
            logger.exception("Failed to fetch thread")
            return None

    def start(self):
        """Connect to Mattermost and start listening (blocks)."""
        logger.info(f"Connecting to {self.mm_url}...")
        self.driver.login()
        self.bot_user_id = self.driver.client.userid
        logger.info(f"Logged in as user {self.bot_user_id}")

        team = self.driver.teams.get_team_by_name(self.mm_team)
        channel = self.driver.channels.get_channel_by_name(
            team['id'], self.mm_channel_name
        )
        self.channel_id = channel['id']

        try:
            self.driver.channels.add_user(self.channel_id, options={
                'user_id': self.bot_user_id,
            })
            logger.info(f"Joined #{self.mm_channel_name}")
        except Exception:
            logger.info(f"Already a member of #{self.mm_channel_name}")

        logger.info(
            f"Listening on #{self.mm_channel_name} + DMs "
            f"(MCP: {self.mcp_url}, users: {list(self.user_map.keys())})"
        )

        # Use the thread's default loop rather than asyncio.run(), which would
        # create and then close a separate loop. NOTE(review): this assumes
        # mattermostdriver's init_websocket runs on this same default loop;
        # confirm before changing.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self._setup_mcp())
        self.driver.init_websocket(self._handle_event)

    async def _handle_event(self, raw):
        """WebSocket event handler — accepts channel messages and DMs.

        Keeps only new, non-bot posts with an empty ``type`` that are in the
        bot's channel or in a DM. It checks the author against user_map, then
        schedules _respond as a background task so this handler returns
        quickly.
        """
        try:
            event = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return
        if not isinstance(event, dict):
            return

        event_type = event.get('event', '')
        if event_type and event_type != 'typing':
            logger.debug(f"WS event: {event_type}")

        if event_type != 'posted':
            return

        data = event.get('data', {})
        post_str = data.get('post')
        if not post_str:
            return

        try:
            post = json.loads(post_str)
        except (json.JSONDecodeError, TypeError):
            return
        if not isinstance(post, dict):
            return

        post_channel = post.get('channel_id')
        post_user = post.get('user_id')
        post_type = post.get('type', '')
        channel_type = data.get('channel_type', '')

        # Never answer our own posts.
        if post_user == self.bot_user_id:
            return

        # Regular user posts have an empty type; anything else is skipped.
        if post_type:
            return

        is_our_channel = (post_channel == self.channel_id)
        is_dm = (channel_type == 'D')

        if not is_our_channel and not is_dm:
            return

        message_text = (post.get('message') or '').strip()
        if not message_text:
            return

        mm_username = await self._resolve_mm_username(post_user)
        if mm_username not in self.user_map:
            logger.info(f"Unauthorized user: {mm_username} ({post_user})")
            if is_dm:
                # Contained here: an exception escaping this handler may take
                # down the WebSocket listener.
                try:
                    await asyncio.to_thread(
                        self.driver.posts.create_post,
                        options={
                            'channel_id': post_channel,
                            'message': (
                                "Sorry, I'm only available to authorized "
                                "testbed developers."
                            ),
                        },
                    )
                except Exception:
                    logger.exception("Failed to notify unauthorized user")
            return

        testbed_username = self.user_map[mm_username]
        post_id = post.get('id')
        root_id = post.get('root_id')

        context_tag = 'DM' if is_dm else f'#{self.mm_channel_name}'
        tagged_message = f"[{mm_username} in {context_tag}] {message_text}"
        logger.info(
            f"Message from {mm_username} "
            f"({'DM' if is_dm else 'channel'}): {message_text[:100]}"
        )

        task = asyncio.create_task(
            self._respond(tagged_message, testbed_username, post_channel, post_id, root_id)
        )
        # Keep a strong reference until the task finishes.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def _respond(self, tagged_message, testbed_username, reply_channel, post_id, root_id):
        """Process any message — channel or DM — and post the reply.

        Under _respond_lock it loads recent dialog from the DB, runs Claude
        and records the exchange. Recordings therefore don't interleave, and
        the next message's dialog load sees this exchange. Posting to
        Mattermost happens after the lock is released. The reply is threaded
        under the message's root post, or under the message itself.
        """
        async with self._respond_lock:
            messages = await self._load_recent_dialog()
            reply = await self._process_message(
                messages, tagged_message, testbed_username, root_id,
                post_id=post_id,
            )
            await self._record_exchange(tagged_message, reply, post_id, root_id)

        if len(reply) > MM_POST_LIMIT:
            reply = reply[:MM_POST_LIMIT - 20] + '\n\n... (truncated)'

        try:
            logger.info("Posting reply to Mattermost...")
            await asyncio.to_thread(
                self.driver.posts.create_post,
                options={
                    'channel_id': reply_channel,
                    'message': reply,
                    'root_id': root_id or post_id,
                },
            )
            logger.info("Reply posted successfully")
        except Exception:
            logger.exception("Failed to post reply")

    async def _process_message(self, messages, message_text, testbed_username,
                               root_id, post_id=None):
        """Run the Claude conversation loop for one user message.

        Args:
            messages: prior dialog as Anthropic message dicts. It is appended
                to in place; it is ephemeral, loaded from the DB for this
                request and discarded after.
            message_text: the tagged user message.
            testbed_username: substituted into the system prompt.
            root_id: Mattermost thread root. When set, the thread is
                prepended as context.
            post_id: the post being answered. It is excluded from the thread
                context because its text is already ``message_text``.

        Returns:
            The final, non-empty reply text. On failure, a generic error
            message is returned instead of raising.
        """
        user_content = message_text
        if root_id:
            thread_context = await self._build_thread_context(
                root_id, exclude_post_id=post_id
            )
            if thread_context:
                user_content = (
                    f"[Thread conversation so far:\n{thread_context}\n]\n"
                    f"New reply: {message_text}"
                )

        messages.append({"role": "user", "content": user_content})

        reply = "Sorry, I encountered an error processing your question."

        mcp = MCPClient(self.mcp_url)
        try:
            await mcp.initialize()

            # Retry discovery here in case _setup_mcp failed at startup.
            if not self.anthropic_tools:
                self.anthropic_tools = self._select_tools(await mcp.list_tools())
            if not self.server_instructions:
                self.server_instructions = mcp.server_instructions or ""

            system = self._build_system_prompt(testbed_username)

            for _round in range(MAX_TOOL_ROUNDS):
                response = await self.claude.beta.messages.create(
                    model="claude-haiku-4-5-20251001",
                    max_tokens=4096,
                    cache_control={"type": "ephemeral"},
                    system=system,
                    tools=self.anthropic_tools,
                    messages=messages,
                    betas=["context-management-2025-06-27"],
                    context_management={
                        "edits": [{
                            "type": "clear_tool_uses_20250919",
                            "trigger": {
                                "type": "input_tokens",
                                "value": 80000,
                            },
                            "keep": {"type": "tool_uses", "value": 3},
                        }]
                    },
                )
                logger.info(
                    f"Claude response: stop_reason={response.stop_reason}"
                )

                if response.stop_reason != "tool_use":
                    text = "\n".join(
                        b.text for b in response.content if b.type == "text"
                    )
                    reply = text if text.strip() else self._EMPTY_REPLY
                    break

                messages.append(
                    {"role": "assistant", "content": response.content}
                )
                tool_results = []
                for block in response.content:
                    if block.type != "tool_use":
                        continue
                    logger.info(f"Tool call: {block.name}({block.input})")
                    try:
                        result = await mcp.call_tool(block.name, block.input)
                        result_text = self._extract_text(result)
                        if len(result_text) > MAX_RESULT_LEN:
                            result_text = (
                                result_text[:MAX_RESULT_LEN]
                                + '\n... (truncated)'
                            )
                    except Exception as e:
                        logger.exception(f"MCP tool {block.name} failed")
                        result_text = json.dumps({"error": str(e)})

                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": result_text,
                    })
                messages.append(
                    {"role": "user", "content": tool_results}
                )
            else:
                reply = (
                    "I hit the maximum number of tool calls. "
                    "Please try a more specific question."
                )

            logger.info(f"Got reply: {len(reply)} chars")

        except Exception:
            logger.exception("ask_claude failed")
        finally:
            await mcp.close()

        return reply
0551 return reply