Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-27 07:41:42

0001 """
0002 SWF Testbed Mattermost bot — assists testbed developers via DMs and a shared channel.
0003 
0004 Connects to Mattermost via WebSocket, listens for:
0005   - Direct messages from authorized users
0006   - Messages in the testbed channel
0007 
0008 Access-controlled: only users in the TESTBED_USER_MAP can interact.
0009 On each message, loads recent dialog from the database — all users, all
0010 contexts — giving the bot full awareness. One soft privacy rule: don't
0011 reveal DM content to others in the channel.
0012 
0013 MCP transport: HTTP POST (JSON-RPC) — same as panda_bot. No SSE, no GET streams.
0014 """
0015 
0016 import asyncio
0017 import json
0018 import logging
0019 import os
0020 from datetime import datetime, timezone
0021 
0022 import anthropic
0023 import httpx
0024 from mattermostdriver import Driver
0025 
logger = logging.getLogger('testbed_bot')

# --- Tunables ---------------------------------------------------------------
MAX_TOOL_ROUNDS = 10            # Claude <-> MCP tool round trips per message
MAX_RESULT_LEN = 10000          # chars of one tool result forwarded to Claude
MM_POST_LIMIT = 16383           # replies longer than this are truncated
MEMORY_TURNS = 30               # 'turns' requested from swf_get_ai_memory
MEMORY_USERNAME = 'testbedbot'  # single memory bucket shared by all users

# MCP endpoint; override with the MCP_URL environment variable.
MCP_URL = os.environ.get('MCP_URL', 'https://pandaserver02.sdcc.bnl.gov/swf-monitor/mcp/')

# Only MCP tools whose names begin with one of these are offered to Claude.
BOT_TOOL_PREFIXES = ('swf_', 'panda_', 'pcs_')

# Mattermost username -> testbed username.
# Used when TESTBED_USER_MAP (a JSON object) is unset or unusable.
DEFAULT_USER_MAP = dict(
    wenaus='wenauseic',
    rahmans='srahman1',
)

# System prompt template.  Every {testbed_username} placeholder is filled in
# per message, so any other literal brace here must be doubled.
SYSTEM_PREAMBLE = """\
You are the SWF Testbed bot for the ePIC experiment at the Electron Ion Collider. \
You assist testbed developers with workflow operations, monitoring, and PanDA production \
using MCP tools.

You communicate via both direct messages and a shared Mattermost channel. \
Each message you receive is tagged with the sender's username and context \
(e.g. [wenaus in #swf-testbed-bot] or [wenaus in DM]). Your conversation \
history includes recent dialog across all users and contexts — refer back to \
earlier questions and answers naturally.

The current user's testbed username is: {testbed_username}

Privacy: a user's DM exchanges are their own business. Don't volunteer DM content \
to others in the channel.

CRITICAL: ALWAYS call a tool to answer questions. NEVER answer from memory or from \
examples in these instructions. Data changes constantly — query it live.

Active commands (start/stop testbed, start/stop workflow, kill agent, etc.) require the user \
to have a pandaserver02 account. Only execute active commands for the current user using \
their testbed username '{testbed_username}'. NEVER execute active commands on behalf of \
another user or with a different username. Read-only queries (status, list, logs, etc.) \
are fine for any mapped user.

Guidelines:
- Be concise. Use markdown tables for structured data.
- When a tool needs a username parameter, use '{testbed_username}'.
- For testbed operations (start, stop, status), always use the user's testbed username.
- Keep responses focused — extract and present the key information.

When a query returns no results, try broader queries before reporting nothing found.
"""
0078 
0079 
def load_user_map():
    """Return the Mattermost-username -> testbed-username mapping.

    Reads the ``TESTBED_USER_MAP`` environment variable, which must hold a
    JSON object such as ``{"mmuser": "testbeduser"}``.  Falls back to a copy
    of ``DEFAULT_USER_MAP`` when the variable is unset or blank, is not valid
    JSON, or is valid JSON of the wrong shape (a list, a number, non-string
    values).  Callers can therefore always treat the result as a
    ``dict[str, str]``.
    """
    raw = os.environ.get('TESTBED_USER_MAP', '').strip()
    if not raw:
        return DEFAULT_USER_MAP.copy()
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        logger.error("Invalid TESTBED_USER_MAP JSON, using defaults")
        return DEFAULT_USER_MAP.copy()
    # The bot indexes this with user_map[name] and calls .keys() on it.
    # Reject other JSON shapes here rather than crashing much later.
    if not isinstance(parsed, dict) or not all(
        isinstance(k, str) and isinstance(v, str) for k, v in parsed.items()
    ):
        logger.error(
            "TESTBED_USER_MAP must be a JSON object of strings, using defaults"
        )
        return DEFAULT_USER_MAP.copy()
    return parsed
0089 
0090 
0091 class MCPClient:
0092     """Minimal MCP client using HTTP POST only — no SSE, no GET streams."""
0093 
0094     def __init__(self, url: str, client_name: str = "testbed-bot"):
0095         self.url = url
0096         self.session_id = None
0097         self._request_id = 0
0098         self._http = httpx.AsyncClient(timeout=60)
0099         self._client_name = client_name
0100 
0101     async def _post(self, method: str, params: dict | None = None):
0102         self._request_id += 1
0103         body = {"jsonrpc": "2.0", "id": self._request_id, "method": method}
0104         if params:
0105             body["params"] = params
0106         headers = {"Content-Type": "application/json", "Accept": "application/json"}
0107         if self.session_id:
0108             headers["Mcp-Session-Id"] = self.session_id
0109         resp = await self._http.post(self.url, json=body, headers=headers)
0110         resp.raise_for_status()
0111         if "Mcp-Session-Id" in resp.headers:
0112             self.session_id = resp.headers["Mcp-Session-Id"]
0113         return resp.json()
0114 
0115     async def initialize(self):
0116         resp = await self._post("initialize", {
0117             "protocolVersion": "2025-03-26",
0118             "capabilities": {},
0119             "clientInfo": {"name": self._client_name, "version": "1.0"},
0120         })
0121         self.server_instructions = (
0122             resp.get("result", {}).get("instructions", "")
0123         )
0124         return resp
0125 
0126     async def list_tools(self):
0127         result = await self._post("tools/list")
0128         return result.get("result", {}).get("tools", [])
0129 
0130     async def call_tool(self, name: str, arguments: dict):
0131         result = await self._post("tools/call", {
0132             "name": name, "arguments": arguments,
0133         })
0134         return result.get("result", {})
0135 
0136     async def close(self):
0137         await self._http.aclose()
0138 
0139 
def mcp_tool_to_anthropic(tool):
    """Convert an MCP tool definition to the Anthropic Messages API format.

    MCP describes a tool as ``{"name", "description"?, "inputSchema"}``.
    Anthropic expects ``{"name", "description", "input_schema"}``.

    A missing or null description becomes ``""``, and a missing or null
    schema becomes an empty object schema.  Callers convert whole tool lists
    in one comprehension, so one sloppy definition would otherwise abort
    discovery of every tool.
    """
    schema = tool.get("inputSchema") or {"type": "object", "properties": {}}
    return {
        "name": tool["name"],
        "description": tool.get("description") or "",
        "input_schema": schema,
    }
0147 
0148 
0149 class TestbedBot:
0150     """Mattermost bot for testbed developers.
0151 
0152     On each message, loads recent dialog from the database — all users,
0153     all contexts — so the bot has full awareness. Access-controlled:
0154     only users in the user map can interact.
0155     """
0156 
0157     def __init__(self):
0158         self.mm_url = os.environ.get('MATTERMOST_URL', 'chat.epic-eic.org')
0159         self.mm_token = os.environ['TESTBED_BOT_TOKEN']
0160         self.mm_team = os.environ.get('MATTERMOST_TEAM', 'main')
0161         self.mm_channel_name = os.environ.get(
0162             'TESTBED_BOT_CHANNEL', 'swf-testbed-bot'
0163         )
0164         self.mcp_url = MCP_URL
0165 
0166         self.user_map = load_user_map()
0167         logger.info(f"User map: {self.user_map}")
0168 
0169         self.claude = anthropic.AsyncAnthropic()
0170 
0171         self.driver = Driver({
0172             'url': self.mm_url,
0173             'token': self.mm_token,
0174             'scheme': 'https',
0175             'port': 443,
0176         })
0177 
0178         self.bot_user_id = None
0179         self.channel_id = None
0180         self.system_prompt_base = SYSTEM_PREAMBLE
0181         self.anthropic_tools = []
0182         self.server_instructions = ""
0183         self._respond_lock = asyncio.Lock()
0184         self._mm_user_cache: dict[str, str] = {}
0185 
0186     async def _resolve_mm_username(self, mm_user_id):
0187         """Look up Mattermost username from user ID, with caching."""
0188         if mm_user_id in self._mm_user_cache:
0189             return self._mm_user_cache[mm_user_id]
0190         try:
0191             user = await asyncio.to_thread(
0192                 self.driver.users.get_user, mm_user_id
0193             )
0194             username = user.get('username', '')
0195             self._mm_user_cache[mm_user_id] = username
0196             return username
0197         except Exception:
0198             logger.exception(f"Failed to resolve user {mm_user_id}")
0199             return ''
0200 
0201     def _build_system_prompt(self, testbed_username):
0202         """System prompt personalized for the current user."""
0203         now = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
0204         preamble = self.system_prompt_base.format(
0205             testbed_username=testbed_username,
0206         )
0207         prompt = f"Current date and time: {now}\n\n{preamble}"
0208         if self.server_instructions:
0209             prompt += "\n" + self.server_instructions
0210         return prompt
0211 
0212     async def _setup_mcp(self):
0213         """Discover tools and server instructions via MCP."""
0214         mcp = MCPClient(self.mcp_url)
0215         try:
0216             await mcp.initialize()
0217             tools = await mcp.list_tools()
0218             self.anthropic_tools = [
0219                 mcp_tool_to_anthropic(t) for t in tools
0220                 if t["name"].startswith(BOT_TOOL_PREFIXES)
0221             ]
0222             logger.info(f"Discovered {len(self.anthropic_tools)} tools via MCP")
0223             self.server_instructions = mcp.server_instructions or ""
0224             if self.server_instructions:
0225                 self.system_prompt_base = (
0226                     SYSTEM_PREAMBLE + "\n" + self.server_instructions
0227                 )
0228         except Exception:
0229             logger.exception("Failed MCP setup — will retry on first message")
0230         finally:
0231             await mcp.close()
0232 
0233     async def _load_recent_dialog(self):
0234         """Load recent dialog from the database — all users, all contexts."""
0235         mcp = MCPClient(self.mcp_url)
0236         messages = []
0237         try:
0238             await mcp.initialize()
0239             result = await mcp.call_tool('swf_get_ai_memory', {
0240                 'username': MEMORY_USERNAME,
0241                 'turns': MEMORY_TURNS,
0242             })
0243             content = result.get('content', [])
0244             text = ''
0245             for item in content:
0246                 if isinstance(item, dict) and 'text' in item:
0247                     text += item['text']
0248             if text:
0249                 data = json.loads(text)
0250                 for item in data.get('items', []):
0251                     messages.append({
0252                         "role": item['role'],
0253                         "content": item['content'],
0254                     })
0255                 logger.info(f"Loaded {len(messages)} memory items")
0256         except Exception:
0257             logger.exception("Failed to load recent dialog")
0258         finally:
0259             await mcp.close()
0260         return messages
0261 
0262     async def _record_exchange(self, question, answer, post_id='', root_id=''):
0263         """Record a Q&A exchange to the unified memory."""
0264         mcp = MCPClient(self.mcp_url)
0265         try:
0266             await mcp.initialize()
0267             for role, content in [('user', question), ('assistant', answer)]:
0268                 await mcp.call_tool('swf_record_ai_memory', {
0269                     'username': MEMORY_USERNAME,
0270                     'session_id': 'mattermost',
0271                     'role': role,
0272                     'content': content,
0273                     'namespace': post_id,
0274                     'project_path': root_id,
0275                 })
0276         except Exception:
0277             logger.exception("Failed to record exchange")
0278         finally:
0279             await mcp.close()
0280 
0281     async def _build_thread_context(self, root_id):
0282         """Fetch full Mattermost thread and format as context."""
0283         try:
0284             thread = await asyncio.to_thread(
0285                 self.driver.posts.get_thread, root_id
0286             )
0287             posts = thread.get('posts', {})
0288             order = thread.get('order', [])
0289 
0290             lines = []
0291             for pid in order:
0292                 p = posts.get(pid)
0293                 if not p or not p.get('message', '').strip():
0294                     continue
0295                 speaker = "Bot" if p['user_id'] == self.bot_user_id else "User"
0296                 lines.append(f"{speaker}: {p['message'].strip()}")
0297 
0298             return "\n".join(lines) if lines else None
0299         except Exception:
0300             logger.exception("Failed to fetch thread")
0301             return None
0302 
0303     def start(self):
0304         """Connect to Mattermost and start listening."""
0305         logger.info(f"Connecting to {self.mm_url}...")
0306         self.driver.login()
0307         self.bot_user_id = self.driver.client.userid
0308         logger.info(f"Logged in as user {self.bot_user_id}")
0309 
0310         team = self.driver.teams.get_team_by_name(self.mm_team)
0311         channel = self.driver.channels.get_channel_by_name(
0312             team['id'], self.mm_channel_name
0313         )
0314         self.channel_id = channel['id']
0315 
0316         try:
0317             self.driver.channels.add_user(self.channel_id, options={
0318                 'user_id': self.bot_user_id,
0319             })
0320             logger.info(f"Joined #{self.mm_channel_name}")
0321         except Exception:
0322             logger.info(f"Already a member of #{self.mm_channel_name}")
0323 
0324         logger.info(
0325             f"Listening on #{self.mm_channel_name} + DMs "
0326             f"(MCP: {self.mcp_url}, users: {list(self.user_map.keys())})"
0327         )
0328 
0329         loop = asyncio.get_event_loop()
0330         loop.run_until_complete(self._setup_mcp())
0331         self.driver.init_websocket(self._handle_event)
0332 
0333     async def _handle_event(self, raw):
0334         """WebSocket event handler — accepts channel messages and DMs."""
0335         try:
0336             event = json.loads(raw)
0337         except (json.JSONDecodeError, TypeError):
0338             return
0339 
0340         event_type = event.get('event', '')
0341         if event_type and event_type != 'typing':
0342             logger.debug(f"WS event: {event_type}")
0343 
0344         if event_type != 'posted':
0345             return
0346 
0347         data = event.get('data', {})
0348         post_str = data.get('post')
0349         if not post_str:
0350             return
0351 
0352         try:
0353             post = json.loads(post_str)
0354         except (json.JSONDecodeError, TypeError):
0355             return
0356 
0357         post_channel = post.get('channel_id')
0358         post_user = post.get('user_id')
0359         post_type = post.get('type', '')
0360         channel_type = data.get('channel_type', '')
0361 
0362         if post_user == self.bot_user_id:
0363             return
0364 
0365         if post_type:
0366             return
0367 
0368         # Accept: our channel OR a DM
0369         is_our_channel = (post_channel == self.channel_id)
0370         is_dm = (channel_type == 'D')
0371 
0372         if not is_our_channel and not is_dm:
0373             return
0374 
0375         message_text = post.get('message', '').strip()
0376         if not message_text:
0377             return
0378 
0379         # Resolve Mattermost username and check authorization
0380         mm_username = await self._resolve_mm_username(post_user)
0381         if mm_username not in self.user_map:
0382             logger.info(f"Unauthorized user: {mm_username} ({post_user})")
0383             if is_dm:
0384                 await asyncio.to_thread(
0385                     self.driver.posts.create_post,
0386                     options={
0387                         'channel_id': post_channel,
0388                         'message': (
0389                             "Sorry, I'm only available to authorized "
0390                             "testbed developers."
0391                         ),
0392                     },
0393                 )
0394             return
0395 
0396         testbed_username = self.user_map[mm_username]
0397         post_id = post.get('id')
0398         root_id = post.get('root_id')
0399 
0400         context_tag = 'DM' if is_dm else f'#{self.mm_channel_name}'
0401         tagged_message = f"[{mm_username} in {context_tag}] {message_text}"
0402         logger.info(
0403             f"Message from {mm_username} "
0404             f"({'DM' if is_dm else 'channel'}): {message_text[:100]}"
0405         )
0406 
0407         asyncio.create_task(
0408             self._respond(tagged_message, testbed_username, post_channel, post_id, root_id)
0409         )
0410 
0411     async def _respond(self, tagged_message, testbed_username, reply_channel, post_id, root_id):
0412         """Process any message — channel or DM.
0413 
0414         Loads recent dialog from DB, runs Claude, records the exchange.
0415         Serialized via lock so recordings don't interleave.
0416         """
0417         async with self._respond_lock:
0418             messages = await self._load_recent_dialog()
0419             reply = await self._process_message(
0420                 messages, tagged_message, testbed_username, root_id
0421             )
0422             # Record inside lock so the next load sees this exchange
0423             await self._record_exchange(tagged_message, reply, post_id, root_id)
0424 
0425         if len(reply) > MM_POST_LIMIT:
0426             reply = reply[:MM_POST_LIMIT - 20] + '\n\n... (truncated)'
0427 
0428         try:
0429             logger.info("Posting reply to Mattermost...")
0430             await asyncio.to_thread(
0431                 self.driver.posts.create_post,
0432                 options={
0433                     'channel_id': reply_channel,
0434                     'message': reply,
0435                     'root_id': root_id or post_id,
0436                 },
0437             )
0438             logger.info("Reply posted successfully")
0439         except Exception:
0440             logger.exception("Failed to post reply")
0441 
0442     async def _process_message(self, messages, message_text, testbed_username, root_id):
0443         """Run the Claude conversation loop for one user message.
0444 
0445         Returns the final reply text. The messages list is ephemeral —
0446         loaded from DB for this request and discarded after.
0447         """
0448         # Build user message with full thread context if it's a reply
0449         user_content = message_text
0450         if root_id:
0451             thread_context = await self._build_thread_context(root_id)
0452             if thread_context:
0453                 user_content = (
0454                     f"[Thread conversation so far:\n{thread_context}\n]\n"
0455                     f"New reply: {message_text}"
0456                 )
0457 
0458         messages.append({"role": "user", "content": user_content})
0459 
0460         reply = "Sorry, I encountered an error processing your question."
0461 
0462         mcp = MCPClient(self.mcp_url)
0463         try:
0464             await mcp.initialize()
0465 
0466             if not self.anthropic_tools:
0467                 tools = await mcp.list_tools()
0468                 self.anthropic_tools = [
0469                     mcp_tool_to_anthropic(t) for t in tools
0470                     if t["name"].startswith(BOT_TOOL_PREFIXES)
0471                 ]
0472 
0473             system = self._build_system_prompt(testbed_username)
0474 
0475             for _round in range(MAX_TOOL_ROUNDS):
0476                 response = await self.claude.beta.messages.create(
0477                     model="claude-haiku-4-5-20251001",
0478                     max_tokens=4096,
0479                     cache_control={"type": "ephemeral"},
0480                     system=system,
0481                     tools=self.anthropic_tools,
0482                     messages=messages,
0483                     betas=["context-management-2025-06-27"],
0484                     context_management={
0485                         "edits": [{
0486                             "type": "clear_tool_uses_20250919",
0487                             "trigger": {
0488                                 "type": "input_tokens",
0489                                 "value": 80000,
0490                             },
0491                             "keep": {"type": "tool_uses", "value": 3},
0492                         }]
0493                     },
0494                 )
0495                 logger.info(
0496                     f"Claude response: stop_reason={response.stop_reason}"
0497                 )
0498 
0499                 if response.stop_reason != "tool_use":
0500                     text_parts = [
0501                         b.text for b in response.content if b.type == "text"
0502                     ]
0503                     reply = "\n".join(text_parts)
0504                     break
0505 
0506                 messages.append(
0507                     {"role": "assistant", "content": response.content}
0508                 )
0509                 tool_results = []
0510                 for block in response.content:
0511                     if block.type != "tool_use":
0512                         continue
0513                     logger.info(f"Tool call: {block.name}({block.input})")
0514                     try:
0515                         result = await mcp.call_tool(block.name, block.input)
0516                         content = result.get("content", [])
0517                         result_text = ""
0518                         for item in content:
0519                             if isinstance(item, dict) and "text" in item:
0520                                 result_text += item["text"]
0521                         if len(result_text) > MAX_RESULT_LEN:
0522                             result_text = (
0523                                 result_text[:MAX_RESULT_LEN]
0524                                 + '\n... (truncated)'
0525                             )
0526                     except Exception as e:
0527                         logger.exception(f"MCP tool {block.name} failed")
0528                         result_text = json.dumps({"error": str(e)})
0529 
0530                     tool_results.append({
0531                         "type": "tool_result",
0532                         "tool_use_id": block.id,
0533                         "content": result_text,
0534                     })
0535                 messages.append(
0536                     {"role": "user", "content": tool_results}
0537                 )
0538             else:
0539                 reply = (
0540                     "I hit the maximum number of tool calls. "
0541                     "Please try a more specific question."
0542                 )
0543 
0544             logger.info(f"Got reply: {len(reply)} chars")
0545 
0546         except Exception:
0547             logger.exception("ask_claude failed")
0548         finally:
0549             await mcp.close()
0550 
0551         return reply