File indexing completed on 2026-04-25 08:29:10
0001 """Authentication middleware and DRF backends for MCP OAuth 2.1 and tunnel proxy."""
0002
0003 import logging
0004
0005 from django.conf import settings
0006 from django.contrib.auth import get_user_model
0007 from django.http import JsonResponse
0008 from rest_framework.authentication import BaseAuthentication
0009
0010 from .auth0 import get_bearer_token, validate_token
0011
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# REMOTE_ADDR values treated as local: the IPv4 and IPv6 loopback addresses.
LOCALHOST_IPS = {
    '127.0.0.1',
    '::1',
}
0015
0016
def _is_localhost(request):
    """Return True when the request's REMOTE_ADDR is in LOCALHOST_IPS.

    A request without REMOTE_ADDR is looked up as the empty string.
    """
    remote_addr = request.META.get('REMOTE_ADDR', '')
    return remote_addr in LOCALHOST_IPS
0019
0020
class TunnelAuthentication(BaseAuthentication):
    """DRF authentication backend for SSH tunnel (localhost) requests.

    Authenticates via X-Remote-User header on localhost requests, bypassing
    CSRF. Must be listed BEFORE SessionAuthentication in authentication_classes
    so DRF uses it first for tunnel requests and never reaches CSRF checks.

    Falls back to a generic 'swf-remote-proxy' user if no header is present.
    Returns None (skip) for non-localhost requests, letting the next backend try.
    Accounts whose ``is_active`` flag is false are rejected rather than
    authenticated.
    """

    def authenticate(self, request):
        """Resolve the tunnel user for a localhost request.

        Args:
            request: The incoming request; ``request.META`` supplies
                REMOTE_ADDR (via ``_is_localhost``) and HTTP_X_REMOTE_USER.

        Returns:
            ``None`` for non-localhost requests, otherwise ``(user, None)``.

        Raises:
            AuthenticationFailed: The resolved account is deactivated.
        """
        if not _is_localhost(request):
            return None

        # Function-scope import keeps this block self-contained.
        from rest_framework.exceptions import AuthenticationFailed

        header_user = request.META.get('HTTP_X_REMOTE_USER', '').strip()
        username = header_user or 'swf-remote-proxy'
        user, created = get_user_model().objects.get_or_create(
            username=username,
            defaults={'is_active': True},
        )
        # Only header-named accounts are logged on creation; creating the
        # generic fallback account stays silent.
        if created and header_user:
            logger.info("Auto-created user '%s' from tunnel proxy", username)

        # get_or_create can return an existing account that was deactivated
        # later; such an account must not authenticate through the tunnel.
        if not user.is_active:
            raise AuthenticationFailed('User inactive or deleted.')
        return (user, None)
0050
0051
class TunnelAuthMiddleware:
    """Auto-authenticate requests from localhost (SSH tunnel proxy).

    For a localhost request whose user is not yet authenticated, a non-empty
    X-Remote-User header names the account to attach as ``request.user``;
    the account is created on first use. Deactivated accounts are never
    attached, so such requests stay unauthenticated.

    Must be placed after AuthenticationMiddleware in MIDDLEWARE.
    """

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request):
        """Attach the tunnel user when applicable, then continue the chain."""
        # Check authentication first so already-authenticated requests skip
        # the localhost lookup entirely.
        if not request.user.is_authenticated and _is_localhost(request):
            tunnel_user = self._resolve_tunnel_user(request)
            if tunnel_user is not None:
                request.user = tunnel_user
        return self.get_response(request)

    def _resolve_tunnel_user(self, request):
        """Return the active account named by X-Remote-User, or None."""
        username = request.META.get('HTTP_X_REMOTE_USER', '').strip()
        if not username:
            return None

        user, created = get_user_model().objects.get_or_create(
            username=username,
            defaults={'is_active': True},
        )
        if created:
            logger.info("Auto-created user '%s' from tunnel proxy", username)

        # An existing account may have been deactivated after creation; do
        # not log such an account in through the tunnel.
        if not user.is_active:
            return None
        return user
0075
0076
def tunnel_context(request):
    """Template context processor exposing ``is_tunnel``.

    ``is_tunnel`` holds the result of ``_is_localhost(request)``.
    """
    is_tunnel = _is_localhost(request)
    return {'is_tunnel': is_tunnel}
0080
0081
class MCPAuthMiddleware:
    """
    Middleware for MCP endpoint authentication.

    Bearer token present: validate via Auth0, reject with 401 if invalid.
    No token: allow through (Claude Code, local clients).
    Non-MCP paths: pass through.
    """

    def __init__(self, get_response):
        """Store the next handler in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request):
        """Validate the bearer token, if any, on requests under ``/mcp``."""
        mcp_path = f"{self._script_prefix()}/mcp"
        on_mcp_path = (
            request.path == mcp_path
            or request.path.startswith(mcp_path + "/")
        )
        if not on_mcp_path:
            return self.get_response(request)

        token = get_bearer_token(request)
        if not token:
            # Token-less clients are deliberately let through.
            return self.get_response(request)

        payload = validate_token(token)
        if not payload:
            return self._unauthorized_response(request, "Invalid or expired token")

        # Expose the validated claims to downstream views.
        request.auth0_payload = payload
        request.auth0_user = payload.get("sub")
        return self.get_response(request)

    @staticmethod
    def _script_prefix() -> str:
        """Return FORCE_SCRIPT_NAME without a trailing slash ('' if unset).

        Django drops a trailing slash from the script name when it builds
        ``request.path``, so a setting such as '/app/' must be normalised the
        same way. Otherwise the MCP path would never match and token
        validation would be skipped.
        """
        return (getattr(settings, 'FORCE_SCRIPT_NAME', None) or "").rstrip("/")

    def _unauthorized_response(self, request, message: str):
        """Return a 401 JSON response carrying a WWW-Authenticate header."""
        response = JsonResponse({"error": "unauthorized", "message": message}, status=401)
        response["WWW-Authenticate"] = self._www_authenticate_header(request)
        return response

    def _www_authenticate_header(self, request) -> str:
        """Build the WWW-Authenticate header value.

        The value names ``settings.AUTH0_API_IDENTIFIER`` as the realm and
        points ``resource_metadata`` at this site's
        ``/.well-known/oauth-protected-resource`` URL.
        """
        scheme = "https" if request.is_secure() else "http"
        host = request.get_host()
        resource_metadata_url = (
            f"{scheme}://{host}{self._script_prefix()}"
            "/.well-known/oauth-protected-resource"
        )

        return (
            f'Bearer realm="{settings.AUTH0_API_IDENTIFIER}", '
            f'resource_metadata="{resource_metadata_url}"'
        )