Skip to content

OAuthProvider

Tool-level OAuth 2.0 Authorization Code flow via MCP elicitation.

OAuthProvider

OAuthProvider(
    name: str,
    build_auth_url: Callable[[str, str], str],
    exchange_code: Callable[
        ..., Coroutine[Any, Any, ExchangeResult]
    ],
    redirect_uri: str,
    user_context: ContextVar[dict | None],
    token_store: TokenStore | None = None,
    pending_store: PendingStore | None = None,
    refresh_token_fn: Callable[
        ..., Coroutine[Any, Any, ExchangeResult]
    ]
    | None = None,
    token_timeout: float = 120.0,
)

Generic MCP OAuth elicitation provider (MCP spec 2025-11-25).

Gates MCP tools behind a third-party OAuth flow. Handles the complete token lifecycle: first-time elicitation, expiry checking, silent refresh, and reactive invalidation.

Token lifecycle
  1. First call — no token → URL mode elicitation → browser OAuth → callback → token stored → tool proceeds.
  2. Next calls — token valid → tool proceeds immediately.
  3. Expiry known (expires_in provided) — silent refresh via refresh_token_fn if available; else re-elicitation.
  4. Revocation — tool detects API 401, calls await provider.invalidate_token(sub) and returns an error string → next call triggers re-elicitation.
Elicitation modes

require_token(fail_fast=False) [default] Calls ctx.elicit_url(); the tool call stays open and waits for the OAuth callback via PendingStore. After the callback fires, send_elicit_complete notifies the client (spec §3.4).

require_token(fail_fast=True) Raises UrlElicitationRequiredError (JSON-RPC -32042, spec §3.5) immediately. The client retries the tool call after the OAuth flow.

Parameters:

Name Type Description Default
name str
required
build_auth_url Callable[[str, str], str]
required
exchange_code Callable[..., Coroutine[Any, Any, ExchangeResult]]
required
redirect_uri str
required
user_context ContextVar[dict | None]
required
token_store TokenStore | None
         built by ``create_stores()`` from current env vars.
None
pending_store PendingStore | None
         to the store built by ``create_stores()`` from env vars.
None
refresh_token_fn Callable[..., Coroutine[Any, Any, ExchangeResult]] | None
None
token_timeout float
120.0

Initialise the provider; see class docstring for parameter descriptions.

Source code in mcpauthkit/providers/oauth_provider.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def __init__(
    self,
    name: str,
    build_auth_url: Callable[[str, str], str],
    exchange_code: Callable[..., Coroutine[Any, Any, ExchangeResult]],
    redirect_uri: str,
    user_context: ContextVar[dict | None],
    token_store: TokenStore | None = None,
    pending_store: PendingStore | None = None,
    refresh_token_fn: Callable[..., Coroutine[Any, Any, ExchangeResult]] | None = None,
    token_timeout: float = 120.0,
) -> None:
    """Initialise the provider; see class docstring for parameter descriptions."""
    self.name = name
    self.callback_path = urlparse(redirect_uri).path

    self._build_auth_url = build_auth_url
    self._exchange_code = exchange_code
    self._redirect_uri = redirect_uri
    self._user_context = user_context
    self._refresh_token_fn = refresh_token_fn
    self._token_timeout = token_timeout

    # Stores — lazy-init from env vars if not injected
    if token_store is not None and pending_store is not None:
        self._token_store: TokenStore = token_store
        self._pending_store: PendingStore = pending_store
    else:
        from ..store.factory import create_stores

        ts, ps = create_stores(namespace=name)
        self._token_store = token_store if token_store is not None else ts
        self._pending_store = pending_store if pending_store is not None else ps

    # In-process only: state → {session, elicitation_id}
    # Never serialised — used to call send_elicit_complete on the right instance.
    self._sessions: dict[str, dict[str, Any]] = {}

    # Per-request token accessor (set by @require_token decorator)
    self._current_token: ContextVar[str | None] = ContextVar(
        f"elicit_oauth_{name}_token", default=None
    )

from_standard_oauth2 classmethod

from_standard_oauth2(
    *,
    name: str,
    authorization_url: str,
    token_url: str,
    client_id: str,
    client_secret: str,
    scope: str,
    redirect_uri: str,
    user_context: ContextVar[dict | None],
    token_store: TokenStore | None = None,
    pending_store: PendingStore | None = None,
    refresh_token_fn: Callable[
        ..., Coroutine[Any, Any, ExchangeResult]
    ]
    | None = None,
    token_timeout: float = 120.0,
    http_verify: bool | SSLContext | str = True,
) -> OAuthProvider

Convenience factory for any standard OAuth2 Authorization Code provider (GitHub, Google, Jira, Entra, etc.).

Builds build_auth_url and exchange_code internally from standard OAuth2 endpoints so the caller only needs to supply configuration::

github = OAuthProvider.from_standard_oauth2(
    name="github",
    authorization_url="https://github.com/login/oauth/authorize",
    token_url="https://github.com/login/oauth/access_token",
    client_id=settings.github_client_id,
    client_secret=settings.github_client_secret,
    scope="read:user repo",
    redirect_uri="http://localhost:8005/github/callback",
    user_context=current_user,
    http_verify=_SSL_CTX,
)

Parameters:

Name Type Description Default
authorization_url str

Full URL of the provider's authorization endpoint.

required
token_url str

Full URL of the provider's token endpoint.

required
client_id str

OAuth2 client ID.

required
client_secret str

OAuth2 client secret.

required
scope str

Space-separated scope string.

required
http_verify bool | SSLContext | str

Passed as verify= to httpx for the token exchange request.

True
token_store TokenStore | None

Optional persistent store override.

None
pending_store PendingStore | None

Optional pending store override.

None
name str

Same as OAuthProvider.__init__.

required
redirect_uri str

Same as OAuthProvider.__init__.

required
user_context str

Same as OAuthProvider.__init__.

required
refresh_token_fn str

Same as OAuthProvider.__init__.

required
token_timeout str

Same as OAuthProvider.__init__.

required
Source code in mcpauthkit/providers/oauth_provider.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
@classmethod
def from_standard_oauth2(
    cls,
    *,
    name: str,
    authorization_url: str,
    token_url: str,
    client_id: str,
    client_secret: str,
    scope: str,
    redirect_uri: str,
    user_context: ContextVar[dict | None],
    token_store: TokenStore | None = None,
    pending_store: PendingStore | None = None,
    refresh_token_fn: Callable[..., Coroutine[Any, Any, ExchangeResult]] | None = None,
    token_timeout: float = 120.0,
    http_verify: bool | ssl.SSLContext | str = True,
) -> OAuthProvider:
    """
    Convenience factory for any standard OAuth2 Authorization Code provider
    (GitHub, Google, Jira, Entra, etc.).

    Builds ``build_auth_url`` and ``exchange_code`` internally from standard
    OAuth2 endpoints so the caller only needs to supply configuration::

        github = OAuthProvider.from_standard_oauth2(
            name="github",
            authorization_url="https://github.com/login/oauth/authorize",
            token_url="https://github.com/login/oauth/access_token",
            client_id=settings.github_client_id,
            client_secret=settings.github_client_secret,
            scope="read:user repo",
            redirect_uri="http://localhost:8005/github/callback",
            user_context=current_user,
            http_verify=_SSL_CTX,
        )

    Parameters
    ----------
    authorization_url
        Full URL of the provider's authorization endpoint.
    token_url
        Full URL of the provider's token endpoint.
    client_id
        OAuth2 client ID.
    client_secret
        OAuth2 client secret.
    scope
        Space-separated scope string.
    http_verify
        Passed as ``verify=`` to httpx for the token exchange request.
    token_store
        Optional persistent store override.
    pending_store
        Optional pending store override.
    name, redirect_uri, user_context, refresh_token_fn, token_timeout
        Same as ``OAuthProvider.__init__``.
    """

    def _build_auth_url(state: str, redir: str) -> str:
        return (
            authorization_url
            + "?"
            + urlencode(
                {
                    "client_id": client_id,
                    "redirect_uri": redir,
                    "scope": scope,
                    "state": state,
                    "response_type": "code",
                }
            )
        )

    async def _exchange_code(code: str, state: str, redir: str) -> ExchangeResult:
        async with httpx.AsyncClient(
            timeout=15, follow_redirects=False, verify=http_verify
        ) as client:
            resp = await client.post(
                token_url,
                data={
                    "client_id": client_id,
                    "client_secret": client_secret,
                    "code": code,
                    "redirect_uri": redir,
                    "grant_type": "authorization_code",
                },
                headers={"Accept": "application/json"},
            )
        if resp.status_code != 200:
            logger.error(
                "%s token exchange failed HTTP %s: %s",
                name,
                resp.status_code,
                resp.text[:300],
            )
            return None
        data = resp.json()
        if not data.get("access_token"):
            logger.error("%s token exchange: no access_token in response: %s", name, data)
            return None
        return cast(dict[str, Any], data)

    return cls(
        name=name,
        build_auth_url=_build_auth_url,
        exchange_code=_exchange_code,
        redirect_uri=redirect_uri,
        user_context=user_context,
        token_store=token_store,
        pending_store=pending_store,
        refresh_token_fn=refresh_token_fn,
        token_timeout=token_timeout,
    )

get_token

get_token() -> str | None

Return the access token for the current tool invocation. Only meaningful inside a @require_token()-decorated function.

Source code in mcpauthkit/providers/oauth_provider.py
311
312
313
314
def get_token(self) -> str | None:
    """Return the access token for the current tool invocation.
    Only meaningful inside a @require_token()-decorated function."""
    return self._current_token.get()

invalidate_token async

invalidate_token(sub: str) -> None

Remove the cached token for a user, forcing re-elicitation on the next tool invocation.

Call this when the downstream API returns 401::

token = provider.get_token()
resp = await _api_get("/path", token)
if resp.status_code == 401:
    await provider.invalidate_token(current_user.get()["sub"])
    return "Authorization expired — please retry."
Source code in mcpauthkit/providers/oauth_provider.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
async def invalidate_token(self, sub: str) -> None:
    """
    Remove the cached token for a user, forcing re-elicitation on the
    next tool invocation.

    Call this when the downstream API returns 401::

        token = provider.get_token()
        resp = await _api_get("/path", token)
        if resp.status_code == 401:
            await provider.invalidate_token(current_user.get()["sub"])
            return "Authorization expired — please retry."
    """
    await self._token_store.delete(sub)
    logger.info("%s token invalidated for sub='%s'", self.name, sub)

require_token

require_token(*, fail_fast: bool = False) -> Callable

Decorator factory that gates an async MCP tool behind OAuth.

Apply AFTER @mcp.tool()::

@mcp.tool(description="...")
@provider.require_token()
async def my_tool(ctx: Context, arg: str) -> str:
    token = provider.get_token()   # guaranteed non-None here
    ...

Parameters:

Name Type Description Default
fail_fast bool

False (default): tool call stays open during the OAuth flow. True: raises UrlElicitationRequiredError; client must retry.

False
Source code in mcpauthkit/providers/oauth_provider.py
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
def require_token(self, *, fail_fast: bool = False) -> Callable:
    """
    Decorator factory that gates an async MCP tool behind OAuth.

    Apply AFTER @mcp.tool()::

        @mcp.tool(description="...")
        @provider.require_token()
        async def my_tool(ctx: Context, arg: str) -> str:
            token = provider.get_token()   # guaranteed non-None here
            ...

    Parameters
    ----------
    fail_fast
        False (default): tool call stays open during the OAuth flow.
        True: raises UrlElicitationRequiredError; client must retry.
    """

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(ctx: Context, *args: Any, **kwargs: Any) -> Any:
            user = self._user_context.get()
            if user is None:
                return "Error: no authenticated user context."

            sub = user.get("sub", "")
            username = user.get("preferred_username", sub)
            logger.debug("%s require_token: sub=%r fail_fast=%s", self.name, sub, fail_fast)

            if fail_fast:
                token = await self._ensure_token_fail_fast(ctx, sub, username)
            else:
                token = await self._ensure_token_blocking(ctx, sub, username)

            if token is None:
                return (
                    f"{self.name.capitalize()} authorization was cancelled "
                    "or timed out. Please try again."
                )

            reset = self._current_token.set(token)
            try:
                return await func(ctx, *args, **kwargs)
            finally:
                self._current_token.reset(reset)

        return wrapper

    return decorator

register

register(app: FastAPI) -> None

Register the OAuth callback GET route on a FastAPI app.

Call this before mounting the MCP sub-app (app.mount("/", ...)). The callback path must also be in open_paths so the auth middleware does not reject the unauthenticated redirect. Use provider.callback_path to reference the path dynamically.

Source code in mcpauthkit/providers/oauth_provider.py
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
def register(self, app: FastAPI) -> None:
    """
    Register the OAuth callback GET route on a FastAPI app.

    Call this before mounting the MCP sub-app (app.mount("/", ...)).
    The callback path must also be in open_paths so the auth middleware
    does not reject the unauthenticated redirect.  Use
    ``provider.callback_path`` to reference the path dynamically.
    """
    provider = self

    @app.get(provider.callback_path, include_in_schema=False)
    async def _oauth_callback(
        request: Request,
        code: str | None = None,
        state: str | None = None,
        error: str | None = None,
        error_description: str | None = None,
    ):
        name = provider.name.capitalize()

        if error:
            logger.warning("%s callback error: %s%s", name, error, error_description)
            await provider._fail_pending(state)
            return HTMLResponse(
                _jinja.get_template("oauth_error.html").render(
                    provider_name=name,
                    error=error,
                    error_description=error_description or "",
                ),
                status_code=400,
            )

        if not code or not state:
            return JSONResponse(status_code=400, content={"error": "invalid_request"})

        sub = await provider._handle_callback(code, state)
        if sub is None:
            return HTMLResponse(
                _jinja.get_template("oauth_error.html").render(
                    provider_name=name,
                    error="Code exchange failed",
                    error_description="Could not exchange the authorization code.",
                ),
                status_code=400,
            )

        return HTMLResponse(
            _jinja.get_template("oauth_success.html").render(
                provider_name=name,
                sub=sub,
            )
        )

Helper types

_parse_token_data

_parse_token_data(
    result: ExchangeResult, stored_at: float
) -> TokenData | None

Normalise an exchange_code / refresh_token_fn result into a TokenData entry.

Source code in mcpauthkit/providers/oauth_provider.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def _parse_token_data(result: ExchangeResult, stored_at: float) -> TokenData | None:
    """Normalise an exchange_code / refresh_token_fn result into a TokenData entry."""
    if result is None:
        return None
    if isinstance(result, str):
        return {"access_token": result, "stored_at": stored_at}
    if isinstance(result, dict):
        access = result.get("access_token")
        if not access:
            return None
        entry: TokenData = {"access_token": access, "stored_at": stored_at}
        if rt := result.get("refresh_token"):
            entry["refresh_token"] = rt
        if ei := result.get("expires_in"):
            entry["expires_at"] = stored_at + float(ei)
        return entry
    return None