Skip to content

CredentialsProvider

Tool-level PAT / API key collection via a self-hosted HTML form.

CredentialsProvider

CredentialsProvider(
    name: str,
    variables: dict[str, VariableDef],
    user_context: ContextVar[dict | None],
    server_base_url: str,
    creds_store: TokenStore | None = None,
    pending_store: PendingStore | None = None,
    doc: str | None = None,
    token_timeout: float = 300.0,
)

MCP elicitation-based credential provider for PATs and API keys.

Serves an internal HTML page (auto-generated from variables) where users enter their credentials. The page optionally renders a Markdown how-to guide above the form.

Unlike OAuthProvider, there is no external redirect: the MCP server itself is the destination of the URL mode elicitation.

Parameters:

Name Type Description Default
name str

Short identifier used in route paths and log messages (e.g. "confluence", "jira").

required
variables dict[str, VariableDef]

Ordered dict of credential field definitions, for example:

{
    "pat": {
        "label": "Personal Access Token",
        "type": "password",      # string | password | url | textarea
        "hint": "e.g. ATBBxyz...",
        "required": True,
    }
}
required
user_context ContextVar[dict | None]

ContextVar[Optional[dict]] set by your auth middleware (same one used by OAuthProvider).

required
server_base_url str

Full base URL of the MCP server, e.g. "http://localhost:8005". Used to build the entry URL sent via elicitation.

required
creds_store TokenStore | None

Persistent store for credentials keyed by OIDC sub. Defaults to the store built by create_stores() from current env vars.

None
pending_store PendingStore | None

Ephemeral store for in-flight form sessions. Defaults to the store built by create_stores() from current env vars.

None
doc str | None

Optional path to a Markdown file rendered above the credential form.

None
token_timeout float

Seconds to wait for the user to submit credentials. Default: 300.

300.0

Initialise the provider; see class docstring for parameter descriptions.

Source code in mcpauthkit/providers/credentials_provider.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def __init__(
    self,
    name: str,
    variables: dict[str, VariableDef],
    user_context: ContextVar[dict | None],
    server_base_url: str,
    creds_store: TokenStore | None = None,
    pending_store: PendingStore | None = None,
    doc: str | None = None,
    token_timeout: float = 300.0,
) -> None:
    """Configure routes, settings and backing stores for this provider.

    Parameter meanings are documented on the class itself.
    """
    self.name = name
    # The two form routes served by this provider.
    self.open_paths = [
        f"/credentials/{name}/entry",
        f"/credentials/{name}/submit",
    ]

    self._variables = variables
    self._user_context = user_context
    # Trailing slashes are dropped so route paths can be appended directly.
    self._server_base_url = server_base_url.rstrip("/")
    self._token_timeout = token_timeout
    self._doc_md: str | None = _load_doc(doc)

    # Build default stores only when at least one was not injected; the
    # factory import stays local so it is skipped when both are supplied.
    if creds_store is None or pending_store is None:
        from ..store.factory import create_stores

        default_creds, default_pending = create_stores(namespace=name)
        if creds_store is None:
            creds_store = default_creds
        if pending_store is None:
            pending_store = default_pending
    self._creds_store: TokenStore = creds_store
    self._pending_store: PendingStore = pending_store

    # Process-local map: entry token -> {session, elicitation_id}
    self._sessions: dict[str, dict[str, Any]] = {}

    # Credentials for the tool call in progress; populated by the wrapper
    # that require_credentials() builds.
    self._current_creds: ContextVar[dict[str, str] | None] = ContextVar(
        f"credentials_{name}", default=None
    )

get_credentials

get_credentials() -> dict[str, str] | None

Return the credentials dict for the current tool invocation. Only meaningful inside a @require_credentials()-decorated function.

Source code in mcpauthkit/providers/credentials_provider.py
194
195
196
197
def get_credentials(self) -> dict[str, str] | None:
    """Return the credentials dict for the current tool invocation.
    Only meaningful inside a @require_credentials()-decorated function."""
    return self._current_creds.get()

invalidate_credentials async

invalidate_credentials(sub: str) -> None

Force re-collection on the user's next tool invocation.

Source code in mcpauthkit/providers/credentials_provider.py
199
200
201
202
async def invalidate_credentials(self, sub: str) -> None:
    """Discard the stored credentials for ``sub`` to force re-collection.

    Deletes the entry keyed by ``sub`` from the credentials store, then logs
    the invalidation.
    """
    store = self._creds_store
    await store.delete(sub)
    logger.info("%s credentials invalidated for sub='%s'", self.name, sub)

require_credentials

require_credentials(*, fail_fast: bool = False) -> Callable

Decorator factory that gates an async MCP tool behind credential collection.

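Place it directly below @mcp.tool() (Python applies the lower decorator first, so the tool that gets registered is the gated wrapper):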

@mcp.tool(description="...")
@provider.require_credentials()
async def my_tool(ctx: Context, ...) -> str:
    creds = provider.get_credentials()
    pat = creds["pat"]
    ...

Parameters:

Name Type Description Default
fail_fast bool

False (default): tool call stays open during the credential form flow. True: raises UrlElicitationRequiredError; client must retry.

False
Source code in mcpauthkit/providers/credentials_provider.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def require_credentials(self, *, fail_fast: bool = False) -> Callable:
    """
    Decorator factory that gates an async MCP tool behind credential
    collection.

    Place it directly beneath ``@mcp.tool()`` so that the tool which gets
    registered is the gated wrapper::

        @mcp.tool(description="...")
        @provider.require_credentials()
        async def my_tool(ctx: Context, ...) -> str:
            creds = provider.get_credentials()
            pat = creds["pat"]
            ...

    Instead of calling the tool, the wrapper returns an error string when
    there is no user context, when the user context carries no ``sub``
    claim, or when no credentials could be obtained.

    Parameters
    ----------
    fail_fast
        False (default): tool call stays open during the credential form flow.
        True: raises UrlElicitationRequiredError; client must retry.
    """

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(ctx: Context, *args: Any, **kwargs: Any) -> Any:
            user = self._user_context.get()
            if user is None:
                return "Error: no authenticated user context."

            # Credentials are stored and looked up by ``sub``. An empty key
            # would be shared by every user lacking the claim, so refuse
            # here rather than read or write another user's secrets.
            sub = user.get("sub") or ""
            if not sub:
                return "Error: authenticated user context has no 'sub' claim."

            username = user.get("preferred_username", sub)
            logger.debug(
                "%s require_credentials: sub=%r fail_fast=%s", self.name, sub, fail_fast
            )

            if fail_fast:
                creds = await self._ensure_credentials_fail_fast(ctx, sub, username)
            else:
                creds = await self._ensure_credentials_blocking(ctx, sub, username)

            if creds is None:
                return (
                    f"{self.name.capitalize()} credentials were not provided "
                    "or the request timed out. Please try again."
                )

            # Expose the credentials to get_credentials() for the duration
            # of this call only; always restore the previous value.
            reset = self._current_creds.set(creds)
            try:
                return await func(ctx, *args, **kwargs)
            finally:
                self._current_creds.reset(reset)

        return wrapper

    return decorator

register

register(app: FastAPI) -> None

Register the credential entry + submit routes on a FastAPI app.

Call this before mounting the MCP sub-app. Add provider.open_paths to your open_paths tuple so the auth middleware skips these routes.

Source code in mcpauthkit/providers/credentials_provider.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
def register(self, app: FastAPI) -> None:
    """
    Register the credential entry + submit routes on a FastAPI app.

    Call this before mounting the MCP sub-app.  Add ``provider.open_paths``
    to your ``open_paths`` tuple so the auth middleware skips these routes.

    Routes
    ------
    ``GET /credentials/{name}/entry?t=<token>``
        Renders the credential form for a pending request token.
    ``POST /credentials/{name}/submit?t=<token>``
        Validates the submitted form, stores the values under the ``sub``
        recorded for the token, sends ``elicit_complete`` when this process
        holds the matching session, and records the result on the pending
        store.

    Both routes answer with the HTML error page and status 400 when the
    token is missing or not found in the pending store; the submit route
    does the same when a required field is empty.  A failed validation does
    not consume the token, so the user can correct the form and resubmit.
    """
    provider = self
    name_cap = provider.name.capitalize()

    def _error_page(message: str) -> HTMLResponse:
        """Render the shared error template as a 400 response."""
        return HTMLResponse(
            _jinja.get_template("credentials_error.html").render(
                provider_name=name_cap,
                message=message,
            ),
            status_code=400,
        )

    @app.get(f"/credentials/{provider.name}/entry", include_in_schema=False)
    async def _entry_page(t: str | None = None):
        if not t:
            return _error_page("Invalid or missing credential request token.")

        # Read-only lookup: showing the form must not consume the token.
        meta = await provider._pending_store.get(t)
        if meta is None:
            return _error_page("Invalid or expired credential request.")

        submit_url = f"{provider._server_base_url}/credentials/{provider.name}/submit?t={t}"
        return HTMLResponse(
            _jinja.get_template("credentials_entry.html").render(
                provider_name=name_cap,
                fields=_fields_for_template(provider._variables),
                submit_url=submit_url,
                doc_md=provider._doc_md,
            )
        )

    @app.post(f"/credentials/{provider.name}/submit", include_in_schema=False)
    async def _submit_credentials(request: Request, t: str | None = None):
        if not t:
            return _error_page("Invalid or missing credential request token.")

        # Check the token without consuming it, so that a validation error
        # below leaves it usable for a corrected resubmission.
        if await provider._pending_store.get(t) is None:
            return _error_page("Invalid or expired credential request.")

        form_data = await request.form()
        collected: dict[str, str] = {}
        for var_name, var_def in provider._variables.items():
            value = str(form_data.get(var_name, "")).strip()
            # Fields are required unless their definition says otherwise.
            if var_def.get("required", True) and not value:
                return _error_page(f"'{var_def.get('label', var_name)}' is required.")
            collected[var_name] = value

        # Pop atomically so a second submit for the same token is rejected
        meta = await provider._pending_store.pop(t)
        if meta is None:
            return _error_page("Invalid or expired credential request.")

        sub: str = meta["sub"]
        local = provider._sessions.pop(t, None)  # {session, elicitation_id} or None

        await provider._creds_store.set(sub, collected)
        # Only field names are logged, never the submitted values.
        logger.info(
            "%s credentials stored for sub='%s' fields=%s",
            provider.name,
            sub,
            list(collected.keys()),
        )

        elicit_sent = False
        if local:
            # Best effort: the credentials are already stored, so a failed
            # notification is logged and reported via ``_elicit_sent``
            # instead of failing the submit.
            try:
                await local["session"].send_elicit_complete(local["elicitation_id"])
                elicit_sent = True
                logger.info(
                    "%s sent elicit_complete (submit) elicitation_id='%s'",
                    provider.name,
                    local["elicitation_id"],
                )
            except Exception as exc:
                logger.warning(
                    "%s send_elicit_complete failed for sub='%s': %s",
                    provider.name,
                    sub,
                    exc,
                )

        await provider._pending_store.set_result(t, {"sub": sub, "_elicit_sent": elicit_sent})

        return HTMLResponse(
            _jinja.get_template("credentials_success.html").render(
                provider_name=name_cap,
            )
        )