Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-06-04 08:56:26

0001 #!/usr/bin/env python3
0002 """Parity smoke probe for the swf-monitor FastMCP migration.
0003 
0004 Compares two MCP endpoints (live django-mcp-server vs FastMCP candidate)
0005 and asserts the checks listed in docs/MCP_FASTMCP_MIGRATION_PLAN.md
0006 "Parity Checks (must all pass before Phase 2 cutover)".
0007 
0008 Example:
0009     scripts/mcp_migration_smoke.py \\
0010         --live-url http://127.0.0.1:8001/swf-monitor/mcp/ \\
0011         --candidate-url http://127.0.0.1:8013/swf-monitor/mcp/ \\
0012         --candidate-token "$MCP_BEARER_TOKEN" \\
0013         --check-against-settings
0014 
0015 Exits non-zero on any failure. Live and candidate take per-endpoint auth
0016 flags (--live-token / --candidate-token); pass an empty string for
0017 unauthenticated endpoints. The plan deliberately does NOT support
0018 disabling auth on the candidate during the parity window -- a temporary
0019 auth bypass is exactly the kind of code we don't want shipped.
0020 """
0021 
0022 from __future__ import annotations
0023 
0024 import argparse
0025 import json
0026 import sys
0027 import urllib.error
0028 import urllib.request
0029 from urllib.parse import urlparse
0030 
0031 
0032 def _request(url, method="POST", payload=None, token=None, timeout=10.0):
0033     headers = {
0034         "accept": "application/json, text/event-stream",
0035         "user-agent": "mcp-migration-smoke/1.0",
0036     }
0037     data = None
0038     if payload is not None:
0039         headers["content-type"] = "application/json"
0040         data = json.dumps(payload).encode("utf-8")
0041     if token:
0042         headers["authorization"] = f"Bearer {token}"
0043     req = urllib.request.Request(url, data=data, headers=headers, method=method)
0044     try:
0045         with urllib.request.urlopen(req, timeout=timeout) as response:
0046             body = response.read().decode("utf-8")
0047             try:
0048                 return response.status, json.loads(body) if body else None
0049             except json.JSONDecodeError:
0050                 return response.status, {"_raw": body}
0051     except urllib.error.HTTPError as exc:
0052         body = exc.read().decode("utf-8")
0053         try:
0054             return exc.code, json.loads(body)
0055         except json.JSONDecodeError:
0056             return exc.code, {"_raw": body}
0057 
0058 
def mcp_initialize(url, token, timeout):
    """Perform the MCP ``initialize`` handshake and return its ``result``.

    Args:
        url: MCP endpoint to contact.
        token: Bearer token forwarded to ``_request`` (falsy for none).
        timeout: Per-request timeout in seconds.

    Raises:
        RuntimeError: if the HTTP status is not 200, the body is empty, or
            the body carries no ``result`` member.
    """
    client_info = {"name": "mcp-migration-smoke", "version": "1.0"}
    handshake = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2025-06-18",
            "capabilities": {},
            "clientInfo": client_info,
        },
    }
    status, body = _request(url, payload=handshake, token=token, timeout=timeout)
    if status == 200 and body and "result" in body:
        return body["result"]
    raise RuntimeError(f"initialize {url}: HTTP {status} body={body!r}")
0071 
0072 
def mcp_tools_list(url, token, timeout):
    """Return every tool descriptor the endpoint advertises via ``tools/list``.

    The first request is sent without ``params``. If a page carries a
    ``nextCursor``, further pages are fetched with
    ``params={"cursor": ...}`` and their tools appended, so that tool-set
    comparisons are not made against a truncated first page.

    Args:
        url: MCP endpoint to contact.
        token: Bearer token forwarded to ``_request`` (falsy for none).
        timeout: Per-request timeout in seconds.

    Returns:
        A list of tool descriptor dicts, in the order the server sent them.

    Raises:
        RuntimeError: if any page has a non-200 status, an empty body, or
            no ``result`` member, or if the server repeats a cursor.
        KeyError: if a page's ``result`` has no ``tools`` member.
    """
    tools = []
    cursor = None
    seen_cursors = set()
    while True:
        payload = {"jsonrpc": "2.0", "id": 2, "method": "tools/list"}
        if cursor is not None:
            payload["params"] = {"cursor": cursor}
        status, body = _request(url, payload=payload, token=token, timeout=timeout)
        if status != 200 or not body or "result" not in body:
            raise RuntimeError(f"tools/list {url}: HTTP {status} body={body!r}")
        result = body["result"]
        tools.extend(result["tools"])
        cursor = result.get("nextCursor")
        if not cursor:
            return tools
        # A server that keeps handing back the same cursor would loop forever.
        if cursor in seen_cursors:
            raise RuntimeError(f"tools/list {url}: repeated nextCursor {cursor!r}")
        seen_cursors.add(cursor)
0080 
0081 
def mcp_call(url, token, name, args, timeout):
    """Invoke tool ``name`` via ``tools/call`` and return its content texts.

    Returns a list holding the ``text`` of each content block, in order;
    the texts are NOT joined. A block without a ``text`` member contributes
    ``""``, and a missing or empty ``content`` yields ``[]``. A tool that
    returns a str/dict arrives as a single block. Per the original note,
    FastMCP and django-mcp-server both split a list return into one block
    per element.

    Raises:
        RuntimeError: if the HTTP status is not 200, the body is empty, or
            the body carries no ``result`` member.
    """
    rpc = {
        "jsonrpc": "2.0",
        "id": 3,
        "method": "tools/call",
        "params": {"name": name, "arguments": args},
    }
    status, body = _request(url, payload=rpc, token=token, timeout=timeout)
    if not (status == 200 and body and "result" in body):
        raise RuntimeError(f"tools/call {name} {url}: HTTP {status} body={body!r}")
    blocks = body["result"].get("content") or []
    texts = []
    for block in blocks:
        texts.append(block.get("text", ""))
    return texts
0097 
0098 
def _load_settings_instructions():
    """Boot Django and return ``(MCP_SERVER_NAME, MCP_SERVER_INSTRUCTIONS)``.

    Prepends ``<directory above this script>/src`` to ``sys.path``, sets
    ``DJANGO_SETTINGS_MODULE`` to ``swf_monitor_project.settings`` unless
    it is already set, then runs ``django.setup()`` before reading the two
    settings.
    """
    import os

    script_dir = os.path.dirname(__file__)
    repo_root = os.path.abspath(os.path.join(script_dir, ".."))
    src_dir = os.path.join(repo_root, "src")
    sys.path.insert(0, src_dir)

    # Respect a settings module chosen by the environment.
    if "DJANGO_SETTINGS_MODULE" not in os.environ:
        os.environ["DJANGO_SETTINGS_MODULE"] = "swf_monitor_project.settings"

    # Django is imported only after sys.path and the environment are ready.
    import django
    django.setup()
    from django.conf import settings

    server_name = settings.MCP_SERVER_NAME
    server_instructions = settings.MCP_SERVER_INSTRUCTIONS
    return server_name, server_instructions
0109 
0110 
def main() -> int:
    """Run the parity checks and return the process exit status.

    Parses the command line, performs the ``initialize`` handshake against
    both endpoints, then runs checks #3, #4, #5, #1, #2, #7 and #6 in that
    order, printing an ``[OK]`` / ``[FAIL]`` / ``[SKIP]`` line for each.

    Returns:
        1 if any check recorded a failure, 0 otherwise.

    Exceptions raised by the request helpers (for example ``RuntimeError``
    from a failed handshake or tool call) are not caught here and abort
    the run.
    """
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--live-url", required=True)
    parser.add_argument("--candidate-url", required=True)
    parser.add_argument("--live-token", default="")
    parser.add_argument("--candidate-token", default="")
    parser.add_argument("--expected-name", default="swf-testbed")
    parser.add_argument("--timeout", type=float, default=10.0)
    parser.add_argument("--check-against-settings", action="store_true",
                        help="Load Django settings and require candidate "
                             "serverInfo to byte-match MCP_SERVER_NAME / "
                             "MCP_SERVER_INSTRUCTIONS.")
    args = parser.parse_args()

    # Every failed check is recorded here; the exit status is derived from
    # this list at the end so that later checks still run after a failure.
    failures: list[str] = []

    def fail(msg: str) -> None:
        # Record the failure and report it immediately.
        failures.append(msg)
        print(f"  [FAIL] {msg}")

    def ok(msg: str) -> None:
        # Report a passing check; nothing is recorded.
        print(f"  [OK]   {msg}")

    print(f"Live      : {args.live_url}")
    print(f"Candidate : {args.candidate_url}")
    print()

    print("== initialize handshake ==")
    live_init = mcp_initialize(args.live_url, args.live_token, args.timeout)
    cand_init = mcp_initialize(args.candidate_url, args.candidate_token, args.timeout)
    live_si = live_init.get("serverInfo", {})
    cand_si = cand_init.get("serverInfo", {})
    # A missing or null "instructions" member is treated as the empty string.
    live_instr = live_init.get("instructions") or ""
    cand_instr = cand_init.get("instructions") or ""
    print(f"  live  serverInfo={live_si} instructions={len(live_instr)} chars")
    print(f"  cand  serverInfo={cand_si} instructions={len(cand_instr)} chars")
    print()

    # Check 3: candidate serverInfo.name
    print("== #3 candidate serverInfo.name ==")
    if cand_si.get("name") != args.expected_name:
        fail(f"#3 candidate serverInfo.name={cand_si.get('name')!r} expected {args.expected_name!r}")
    else:
        ok(f"#3 candidate serverInfo.name == {args.expected_name!r}")
    print()

    # Check 4: candidate instructions vs settings
    # With --check-against-settings the reference is the Django settings
    # value; otherwise the live endpoint's instructions are the reference.
    print("== #4 candidate serverInfo.instructions ==")
    if args.check_against_settings:
        try:
            expected_name, expected_instr = _load_settings_instructions()
        except Exception as exc:
            fail(f"#4 could not load Django settings: {exc}")
        else:
            # The name mismatch is reported in addition to, not instead of,
            # the instructions comparison below.
            if expected_name != args.expected_name:
                fail(f"#4 settings.MCP_SERVER_NAME={expected_name!r} != --expected-name {args.expected_name!r}")
            if cand_instr != expected_instr:
                fail(f"#4 candidate instructions != settings.MCP_SERVER_INSTRUCTIONS "
                     f"(lens {len(cand_instr)} vs {len(expected_instr)})")
            else:
                ok(f"#4 candidate instructions byte-equal to settings.MCP_SERVER_INSTRUCTIONS "
                   f"({len(expected_instr)} chars)")
    else:
        if cand_instr != live_instr:
            fail(f"#4 candidate instructions != live instructions "
                 f"(lens {len(cand_instr)} vs {len(live_instr)}); "
                 f"pass --check-against-settings for canonical comparison")
        else:
            ok(f"#4 candidate instructions byte-equal to live instructions "
               f"({len(cand_instr)} chars)")
    print()

    # Check 5: candidate get_server_instructions
    print("== #5 candidate tools/call get_server_instructions ==")
    cand_gsi_blocks = mcp_call(args.candidate_url, args.candidate_token,
                              "get_server_instructions", {}, args.timeout)
    # Only the first content block is compared; no blocks counts as "".
    cand_gsi = cand_gsi_blocks[0] if cand_gsi_blocks else ""
    if cand_gsi != cand_instr:
        fail(f"#5 candidate get_server_instructions != serverInfo.instructions "
             f"(lens {len(cand_gsi)} vs {len(cand_instr)})")
    else:
        ok(f"#5 candidate get_server_instructions byte-equal to serverInfo.instructions")
    print()

    # Check 1: tool name set equality
    print("== #1 tool name set equality ==")
    live_tools = mcp_tools_list(args.live_url, args.live_token, args.timeout)
    cand_tools = mcp_tools_list(args.candidate_url, args.candidate_token, args.timeout)
    live_names = {t["name"] for t in live_tools}
    cand_names = {t["name"] for t in cand_tools}
    only_live = sorted(live_names - cand_names)
    only_cand = sorted(cand_names - live_names)
    if only_live or only_cand:
        fail(f"#1 tool name sets diverge — "
             f"live-only: {only_live}, candidate-only: {only_cand}")
    else:
        ok(f"#1 tool name sets equal ({len(live_names)} tools)")
    print()

    # Check 2: schema equivalence (parameter sets + required flags)
    # Only property names and the "required" list are compared, and only
    # for tools present on both endpoints; property types are not examined.
    print("== #2 inputSchema parameter-set and required equality ==")
    live_by_name = {t["name"]: t for t in live_tools}
    cand_by_name = {t["name"]: t for t in cand_tools}
    shared = sorted(live_names & cand_names)
    schema_diffs = []
    for name in shared:
        ls = live_by_name[name].get("inputSchema") or {}
        cs = cand_by_name[name].get("inputSchema") or {}
        l_props = set((ls.get("properties") or {}).keys())
        c_props = set((cs.get("properties") or {}).keys())
        l_req = set(ls.get("required") or [])
        c_req = set(cs.get("required") or [])
        if l_props != c_props or l_req != c_req:
            schema_diffs.append({
                "tool": name,
                "live_props_only": sorted(l_props - c_props),
                "cand_props_only": sorted(c_props - l_props),
                "live_required_only": sorted(l_req - c_req),
                "cand_required_only": sorted(c_req - l_req),
            })
    if schema_diffs:
        fail(f"#2 inputSchema diverges on {len(schema_diffs)} tools:")
        # Cap the detail output at ten tools to keep the report readable.
        for d in schema_diffs[:10]:
            print(f"      {d}")
        if len(schema_diffs) > 10:
            print(f"      ... and {len(schema_diffs) - 10} more")
    else:
        ok(f"#2 inputSchema parameter-set + required equal on {len(shared)} shared tools")
    print()

    # Check 7: swf_list_available_tools parity with tools/list (candidate).
    # Returns a list of dicts; FastMCP and django-mcp-server both split a
    # list return into one content block per element, so parse each block.
    print("== #7 swf_list_available_tools parity (candidate) ==")
    sla_blocks = mcp_call(args.candidate_url, args.candidate_token,
                          "swf_list_available_tools", {}, args.timeout)
    sla_names = set()
    parse_errors = 0
    for block in sla_blocks:
        try:
            obj = json.loads(block)
        except json.JSONDecodeError:
            parse_errors += 1
            continue
        # Accept both shapes: one dict per block, or a whole list in a block.
        if isinstance(obj, dict) and "name" in obj:
            sla_names.add(obj["name"])
        elif isinstance(obj, list):
            for item in obj:
                if isinstance(item, dict) and "name" in item:
                    sla_names.add(item["name"])
    if parse_errors:
        fail(f"#7 swf_list_available_tools: {parse_errors} content blocks "
             f"failed to parse as JSON")
    if not sla_names:
        fail(f"#7 swf_list_available_tools returned no recognizable names "
             f"({len(sla_blocks)} blocks)")
    else:
        only_sla = sorted(sla_names - cand_names)
        only_cn = sorted(cand_names - sla_names)
        if only_sla or only_cn:
            fail(f"#7 discoverer != tools/list — "
                 f"discoverer-only: {only_sla}, tools-only: {only_cn}")
        else:
            ok(f"#7 swf_list_available_tools set == tools/list set "
               f"({len(sla_names)} tools)")
    print()

    # Check 6: candidate auth matrix (only if a token is configured)
    print("== #6 candidate auth matrix ==")
    if not args.candidate_token:
        print(f"  [SKIP] #6 — no --candidate-token configured")
    else:
        # 6a: GET → 405
        get_status, _ = _request(args.candidate_url, method="GET",
                                 token=args.candidate_token,
                                 timeout=args.timeout)
        if get_status != 405:
            fail(f"#6a candidate GET status {get_status} expected 405")
        else:
            ok(f"#6a candidate GET returns 405")
        # 6b: POST no auth → 401
        # token=None makes _request omit the Authorization header entirely.
        s, _ = _request(args.candidate_url, payload={
            "jsonrpc": "2.0", "id": 99, "method": "tools/list",
        }, token=None, timeout=args.timeout)
        if s != 401:
            fail(f"#6b candidate POST no-auth status {s} expected 401")
        else:
            ok(f"#6b candidate POST no-auth returns 401")
        # 6c: POST bad token → 403
        s, _ = _request(args.candidate_url, payload={
            "jsonrpc": "2.0", "id": 99, "method": "tools/list",
        }, token="wrong-token-for-smoke", timeout=args.timeout)
        if s != 403:
            fail(f"#6c candidate POST wrong-token status {s} expected 403")
        else:
            ok(f"#6c candidate POST wrong-token returns 403")
        # 6d: /health no auth → 200
        # The health URL keeps only scheme and host[:port] of the candidate
        # URL; any path prefix (e.g. /swf-monitor/) is dropped.
        parsed = urlparse(args.candidate_url)
        health_url = f"{parsed.scheme}://{parsed.netloc}/health"
        s, body = _request(health_url, method="GET", timeout=args.timeout)
        if s != 200 or not isinstance(body, dict) or body.get("status") != "ok":
            fail(f"#6d candidate {health_url}: status {s} body {body!r}")
        else:
            ok(f"#6d candidate /health returns 200 {{'status':'ok'}}")
    print()

    if failures:
        print(f"\n{len(failures)} parity check(s) FAILED.")
        return 1
    print("All parity checks passed.")
    return 0
0322 
0323 
# Run only when executed as a script; main()'s return value (0 or 1)
# becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())