Middleware

JWT bearer middleware. Add to any FastAPI / Starlette app to gate every request behind OIDC token validation.

JwtAuthMiddleware

JwtAuthMiddleware(
    app,
    *,
    issuer_url: str,
    current_user: ContextVar[dict | None],
    server_base_url: str,
    open_paths: tuple[str, ...] = (),
)

Bases: BaseHTTPMiddleware

JWT bearer middleware compatible with app.add_middleware().

Validates the Authorization: Bearer <token> header using OIDC JWKS discovery on every request except OPTIONS requests and paths matched by open_paths. Works with any standard OIDC provider (Keycloak, Okta, Entra ID, Duende, Auth0, …).

Parameters:

issuer_url (str, required): Base URL of the OIDC issuer, e.g. "http://localhost:8889/realms/mcp-poc5" or "https://login.microsoftonline.com/{tenant}/v2.0".

current_user (ContextVar[dict | None], required): ContextVar set on each authenticated request to a dict of selected verified JWT claims: sub, preferred_username, email, name, iss, and exp.

server_base_url (str, required): Base URL of this server, used to build the WWW-Authenticate realm / resource-metadata URIs. Trailing slashes are stripped.

open_paths (tuple[str, ...], default ()): Tuple of path prefixes that bypass authentication (browser redirects, health checks, well-known endpoints, provider callbacks, etc.).

Initialise the middleware; see class docstring for parameter descriptions.
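The `_is_open` helper that consumes `open_paths` is not shown on this page. As a minimal sketch, assuming it is a plain `str.startswith` prefix check (the function name `is_open_path` and the example paths are hypothetical), the matching could look like this:

```python
def is_open_path(path: str, open_paths: tuple[str, ...]) -> bool:
    """Return True when `path` starts with any configured prefix.

    Hypothetical stand-in for the middleware's private `_is_open` check;
    the real helper may normalise or match paths differently.
    """
    # str.startswith accepts a tuple of prefixes and returns False for an empty one.
    return path.startswith(open_paths)


# Example prefixes, chosen for illustration only.
OPEN_PATHS = ("/health", "/.well-known/", "/oauth/callback")

print(is_open_path("/health", OPEN_PATHS))
print(is_open_path("/.well-known/openid-configuration", OPEN_PATHS))
print(is_open_path("/api/items", OPEN_PATHS))
print(is_open_path("/healthcheck-admin", OPEN_PATHS))
```

If the real check is prefix-based as assumed here, a prefix such as "/health" would also open "/healthcheck-admin"; ending a prefix with a slash narrows what it matches.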

Source code in mcpauthkit/auth_middleware.py
def __init__(
    self,
    app,
    *,
    issuer_url: str,
    current_user: ContextVar[dict | None],
    server_base_url: str,
    open_paths: tuple[str, ...] = (),
) -> None:
    """Initialise the middleware; see class docstring for parameter descriptions."""
    super().__init__(app)
    self._issuer_url = issuer_url
    self._current_user = current_user
    self._base = server_base_url.rstrip("/")
    self._open_paths = open_paths

dispatch async

dispatch(request: Request, call_next) -> Response

Process a single request: validate the Bearer token or pass through open paths.

Returns a 401 Unauthorized JSON response with a standards-compliant WWW-Authenticate: Bearer header when the Authorization header is missing or does not start with "Bearer ", or when the token fails validation (malformed, wrong issuer, or invalid signature). An expired token also produces a 401, but with error=invalid_token, which tells the client to use its refresh token and retry.

Parameters:

request (Request, required): The incoming Starlette / FastAPI request.

call_next (required): Callable that passes the request to the next handler in the middleware chain and returns its response.

Source code in mcpauthkit/auth_middleware.py
async def dispatch(self, request: Request, call_next) -> Response:
    """Process a single request: validate the Bearer token or pass through open paths.

    Returns a ``401 Unauthorized`` JSON response (with a standards-compliant
    ``WWW-Authenticate: Bearer`` header) when the token is absent, malformed,
    has the wrong issuer, or carries an invalid signature.  Returns a
    ``401`` with ``error=invalid_token`` when the token has expired, so
    the client can use its refresh token.

    Parameters
    ----------
    request
        The incoming Starlette / FastAPI request.
    call_next
        The next ASGI handler in the middleware chain.
    """
    logger.debug(
        "→ %s %s  auth=%s  open=%s",
        request.method,
        request.url.path,
        bool(request.headers.get("Authorization")),
        self._is_open(request.url.path),
    )

    if request.method == "OPTIONS" or self._is_open(request.url.path):
        return cast(Response, await call_next(request))

    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith("Bearer "):
        logger.debug("→ no/bad Bearer → 401")
        return self._unauthorized()

    token = auth_header[len("Bearer ") :]
    claims, fail_reason = await validate_jwt(token, self._issuer_url)
    if claims is None:
        logger.debug("→ JWT invalid (reason=%s) → 401", fail_reason)
        return (
            self._token_expired()
            if fail_reason is JwtFailReason.EXPIRED
            else self._unauthorized()
        )

    sub = claims.get("sub") or claims.get("preferred_username", "unknown")
    logger.info(
        "Authenticated: sub=%s preferred_username=%s",
        sub,
        claims.get("preferred_username"),
    )
    self._current_user.set(
        {
            "sub": sub,
            "preferred_username": claims.get("preferred_username"),
            "email": claims.get("email"),
            "name": claims.get("name"),
            "iss": claims.get("iss"),
            "exp": claims.get("exp"),
        }
    )
    return cast(Response, await call_next(request))