Back to home page

EIC code displayed by LXR

 
 

    


File indexing completed on 2026-04-25 08:29:10

0001 """Authentication middleware and DRF backends for MCP OAuth 2.1 and tunnel proxy."""
0002 
0003 import logging
0004 
0005 from django.conf import settings
0006 from django.contrib.auth import get_user_model
0007 from django.http import JsonResponse
0008 from rest_framework.authentication import BaseAuthentication
0009 
0010 from .auth0 import get_bearer_token, validate_token
0011 
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# REMOTE_ADDR values that _is_localhost() treats as "request came from this
# host": the IPv4 and IPv6 loopback addresses. Matching is by exact string.
LOCALHOST_IPS = {'127.0.0.1', '::1'}
0015 
0016 
def _is_localhost(request):
    """Return True when the request's REMOTE_ADDR is one of LOCALHOST_IPS.

    A missing REMOTE_ADDR is treated as the empty string, which is never a
    member of LOCALHOST_IPS, so such requests are reported as non-local.
    """
    remote_addr = request.META.get('REMOTE_ADDR', '')
    return remote_addr in LOCALHOST_IPS
0019 
0020 
class TunnelAuthentication(BaseAuthentication):
    """DRF authentication backend for requests arriving over the SSH tunnel.

    A request counts as a tunnel request when ``_is_localhost`` says its
    REMOTE_ADDR is a loopback address. For those requests the user is taken
    from the ``X-Remote-User`` header; when the header is absent or blank the
    shared ``swf-remote-proxy`` account is used instead. Users that do not
    exist yet are created on the fly.

    List this class BEFORE SessionAuthentication in ``authentication_classes``
    so that DRF resolves tunnel requests here first and never reaches the
    CSRF checks.

    Non-localhost requests are skipped (``None``), letting the next backend try.
    """

    def authenticate(self, request):
        """Return ``(user, None)`` for localhost requests, ``None`` otherwise."""
        if not _is_localhost(request):
            # Not a tunnel request: defer to the next authentication class.
            return None

        user_model = get_user_model()
        remote_user = request.META.get('HTTP_X_REMOTE_USER', '').strip()

        if not remote_user:
            # No identity forwarded by the proxy: use the generic proxy account.
            proxy_account, _ = user_model.objects.get_or_create(
                username='swf-remote-proxy',
                defaults={'is_active': True},
            )
            return (proxy_account, None)

        account, was_created = user_model.objects.get_or_create(
            username=remote_user,
            defaults={'is_active': True},
        )
        if was_created:
            logger.info(f"Auto-created user '{remote_user}' from tunnel proxy")
        return (account, None)
0050 
0051 
class TunnelAuthMiddleware:
    """Log in localhost (SSH tunnel proxy) requests from ``X-Remote-User``.

    Reads ``request.user``, so it must come after AuthenticationMiddleware
    in MIDDLEWARE.
    """

    def __init__(self, get_response):
        """Store the next callable in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request):
        """Attach a user to unauthenticated localhost requests, then continue.

        Requests that are already authenticated, or that do not come from
        localhost, pass through untouched. A localhost request without an
        ``X-Remote-User`` header stays anonymous (the proxy user is not
        logged in).
        """
        # Same evaluation order as "not authenticated AND localhost":
        # the localhost check only runs for unauthenticated requests.
        if request.user.is_authenticated or not _is_localhost(request):
            return self.get_response(request)

        remote_user = request.META.get('HTTP_X_REMOTE_USER', '').strip()
        if remote_user:
            user_model = get_user_model()
            account, was_created = user_model.objects.get_or_create(
                username=remote_user,
                defaults={'is_active': True},
            )
            if was_created:
                logger.info(f"Auto-created user '{remote_user}' from tunnel proxy")
            request.user = account

        return self.get_response(request)
0075 
0076 
def tunnel_context(request):
    """Template context processor exposing ``is_tunnel``.

    ``is_tunnel`` is True when ``_is_localhost`` reports the request as
    coming from a loopback address, False otherwise.
    """
    is_tunnel = _is_localhost(request)
    return dict(is_tunnel=is_tunnel)
0080 
0081 
class MCPAuthMiddleware:
    """
    Middleware for MCP endpoint authentication.

    Bearer token present: validate it with ``validate_token``; reject with
    401 if validation returns nothing.
    No token: allow through (Claude Code, local clients).
    Non-MCP paths: pass through.

    The MCP endpoint is ``<FORCE_SCRIPT_NAME>/mcp`` and everything below it.
    A trailing slash on FORCE_SCRIPT_NAME is ignored, so ``"/app"`` and
    ``"/app/"`` both protect ``/app/mcp``.
    """

    def __init__(self, get_response):
        """Store the next callable in the middleware chain."""
        self.get_response = get_response

    def __call__(self, request):
        """Authenticate MCP requests that carry a bearer token.

        On success the decoded payload is stored on ``request.auth0_payload``
        and its ``sub`` claim on ``request.auth0_user`` before the request is
        passed on. Returns a 401 JSON response when a token is present but
        does not validate.
        """
        if not self._is_mcp_path(request.path, self._script_prefix()):
            return self.get_response(request)

        token = get_bearer_token(request)
        if not token:
            # No token — allow through (Claude Code, local clients)
            return self.get_response(request)

        payload = validate_token(token)
        if not payload:
            return self._unauthorized_response(request, "Invalid or expired token")

        request.auth0_payload = payload
        request.auth0_user = payload.get("sub")
        return self.get_response(request)

    @staticmethod
    def _script_prefix() -> str:
        """Return FORCE_SCRIPT_NAME without a trailing slash ('' if unset)."""
        script_name = getattr(settings, 'FORCE_SCRIPT_NAME', None) or ""
        # Django drops the trailing slash of the script name when it builds
        # request.path; do the same so "/app/" does not become "/app//mcp".
        return script_name.rstrip("/")

    @staticmethod
    def _is_mcp_path(path: str, script_prefix: str) -> bool:
        """Return True if *path* is the MCP endpoint or lies beneath it.

        *script_prefix* may be given with or without a trailing slash.
        Only whole path segments match: ``/mcpx`` is not an MCP path.
        """
        mcp_path = script_prefix.rstrip("/") + "/mcp"
        return path == mcp_path or path.startswith(mcp_path + "/")

    def _unauthorized_response(self, request, message: str):
        """Return 401 for invalid token."""
        response = JsonResponse({"error": "unauthorized", "message": message}, status=401)
        response["WWW-Authenticate"] = self._www_authenticate_header(request)
        return response

    def _www_authenticate_header(self, request) -> str:
        """Build the WWW-Authenticate header value for a 401 response.

        The value names ``settings.AUTH0_API_IDENTIFIER`` as the realm and
        points ``resource_metadata`` at this deployment's
        ``/.well-known/oauth-protected-resource`` URL, built from the
        request's scheme and host plus the script prefix.
        """
        scheme = "https" if request.is_secure() else "http"
        host = request.get_host()
        script_prefix = self._script_prefix()
        resource_metadata_url = (
            f"{scheme}://{host}{script_prefix}/.well-known/oauth-protected-resource"
        )

        return (
            f'Bearer realm="{settings.AUTH0_API_IDENTIFIER}", '
            f'resource_metadata="{resource_metadata_url}"'
        )