oauth-oidc-expert

Expert OAuth 2.0 and OIDC guide: grant type selection (authorization code + PKCE, client credentials, device flow), PKCE mechanics, ID token vs access token, JWT validation, token storage security, refresh token rotation, scope design, OAuth security threats,

MoltbotDen
Security & Passwords

OAuth 2.0 & OIDC Expert

OAuth 2.0 and OpenID Connect are the foundation of modern authentication and authorization. Most developers understand the happy path but miss the security nuances: incorrect token storage, missing PKCE, incomplete JWT validation, or misunderstanding the difference between access tokens and ID tokens. These gaps are the source of auth bypass vulnerabilities in production systems.

Core Mental Model

OAuth 2.0 is an authorization protocol: it answers "can this application access this resource on behalf of this user?" OIDC is an identity layer on top of OAuth: it answers "who is this user?" An access token proves authorization; an ID token proves identity. The most critical rules: validate JWTs completely (signature plus every required claim), use PKCE for any public client, and never store sensitive tokens in localStorage.

Grant Type Selection

Authorization Code + PKCE
  → Web apps (SPA), mobile apps, CLIs where user interaction is possible
  → When: user is present to authorize; you need delegated access
  → Flow: browser redirect → auth code → token exchange (with PKCE code verifier)

Client Credentials
  → Machine-to-machine (M2M) — no user involved
  → When: background jobs, microservice auth, API-to-API calls
  → Flow: client_id + client_secret → access token

Device Authorization (Device Flow)
  → CLI tools, smart TVs, IoT devices with limited browser capability
  → When: device can't do browser redirects
  → Flow: device code → user visits URL on another device → polls for token

Refresh Token
  → Not a way to obtain initial authorization (it is sent with grant_type=refresh_token); it renews short-lived access tokens without re-auth
  → Use with authorization code flow; rotate on each use

AVOID:
  → Implicit flow (deprecated — access token in URL fragment, no PKCE)
  → Resource Owner Password Credentials (username/password sent to app — defeats OAuth)
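The device flow listed above is the only recommended grant without code elsewhere in this guide, so here is a minimal sketch of its polling step, loosely following RFC 8628. The `post_form` callable is a hypothetical stand-in for an HTTP POST to the token endpoint that returns `(status_code, json_body)`; the function name and defaults are illustrative assumptions, not part of any library.

```python
import time
from typing import Callable

# Hypothetical transport: takes form fields, returns (status_code, json_body).
PostForm = Callable[[dict], tuple]

def poll_for_device_token(
    post_form: PostForm,
    client_id: str,
    device_code: str,
    interval: float = 5.0,      # "interval" from the device authorization response
    expires_in: float = 600.0,  # "expires_in" from the device authorization response
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll the token endpoint until the user approves, denies, or the code expires."""
    waited = 0.0
    while waited < expires_in:
        sleep(interval)
        waited += interval
        status, body = post_form({
            "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
            "device_code": device_code,
            "client_id": client_id,
        })
        if status == 200:
            return body  # Contains access_token (and possibly refresh_token)
        error = body.get("error")
        if error == "authorization_pending":
            continue  # User has not finished on the other device yet
        if error == "slow_down":
            interval += 5  # RFC 8628: increase the polling interval by 5 seconds
            continue
        # access_denied, expired_token, or anything unexpected is terminal
        raise RuntimeError(f"Device authorization failed: {error}")
    raise TimeoutError("Device code expired before the user approved")
```

Injecting `post_form` and `sleep` keeps the loop testable without a network; in an application they would wrap the HTTP client already in use.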

Authorization Code + PKCE Flow

# PKCE: Proof Key for Code Exchange — prevents authorization code interception
import secrets
import hashlib
import base64
import urllib.parse
import httpx

def generate_pkce_pair() -> tuple[str, str]:
    """
    Returns (code_verifier, code_challenge).
    code_verifier: random string sent at token exchange
    code_challenge: BASE64URL(SHA256(code_verifier)), sent at authorization request
    """
    # code_verifier: 43-128 chars, URL-safe characters
    code_verifier = secrets.token_urlsafe(96)  # 128 chars of URL-safe base64
    
    # code_challenge: S256 method (REQUIRED — "plain" is insecure)
    digest = hashlib.sha256(code_verifier.encode()).digest()
    code_challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode()
    
    return code_verifier, code_challenge

def build_authorization_url(
    authorization_endpoint: str,
    client_id: str,
    redirect_uri: str,
    scopes: list[str],
    code_challenge: str,
    state: str,  # CSRF protection — random value, verify on callback
) -> str:
    params = {
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": " ".join(scopes),
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",  # Always S256, never plain
        "state": state,
    }
    return f"{authorization_endpoint}?{urllib.parse.urlencode(params)}"

async def exchange_code_for_tokens(
    token_endpoint: str,
    code: str,
    code_verifier: str,  # The original verifier — never the challenge
    client_id: str,
    redirect_uri: str,
    # client_secret: add for confidential clients (server-side web apps); public clients send none
) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            token_endpoint,
            data={
                "grant_type": "authorization_code",
                "code": code,
                "redirect_uri": redirect_uri,
                "client_id": client_id,
                "code_verifier": code_verifier,  # IdP verifies: SHA256(verifier) == challenge
            },
            headers={"Content-Type": "application/x-www-form-urlencoded"},
        )
        response.raise_for_status()
        return response.json()
        # Returns: access_token, id_token, refresh_token, expires_in, token_type
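To make the "IdP verifies" comment above concrete, this is a sketch of the check an authorization server might run at token exchange, assuming it stored the S256 challenge from the authorization request. The function names are illustrative; the character set and length limits are those of RFC 7636.

```python
import base64
import hashlib
import hmac
import re

# RFC 7636: 43-128 characters from the unreserved set [A-Za-z0-9-._~]
_VERIFIER_RE = re.compile(r"[A-Za-z0-9\-._~]{43,128}")

def s256_challenge(code_verifier: str) -> str:
    """BASE64URL(SHA256(verifier)) without padding."""
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    return base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")

def verify_pkce(code_verifier: str, stored_challenge: str) -> bool:
    """Return True only if the verifier is well-formed and hashes to the stored challenge."""
    if not _VERIFIER_RE.fullmatch(code_verifier):
        return False
    # Constant-time comparison avoids leaking how many characters matched
    return hmac.compare_digest(s256_challenge(code_verifier), stored_challenge)
```

An attacker who intercepts only the authorization code has seen the challenge at most, and cannot produce a verifier that hashes to it.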

# Full PKCE callback handler (FastAPI example)
import secrets
from fastapi import FastAPI, Query, Cookie, HTTPException
from fastapi.responses import RedirectResponse

app = FastAPI()

# Assumed to be defined elsewhere in the application:
#   get_session(), IDP_TOKEN_ENDPOINT, CLIENT_ID, REDIRECT_URI, JWKS_URI

@app.get("/auth/callback")
async def auth_callback(
    code: str = Query(...),
    state: str = Query(...),
    session_state: str = Cookie(None),  # Session ID stored in an httpOnly cookie
):
    # 1. Verify state (CSRF protection); compare in constant time
    session = await get_session(session_state)
    expected_state = session.get("oauth_state", "") if session else ""
    if not expected_state or not secrets.compare_digest(state.encode(), expected_state.encode()):
        raise HTTPException(400, "State mismatch: possible CSRF attack")

    # 2. Exchange code for tokens
    tokens = await exchange_code_for_tokens(
        token_endpoint=IDP_TOKEN_ENDPOINT,
        code=code,
        code_verifier=session["code_verifier"],  # Retrieved from session
        client_id=CLIENT_ID,
        redirect_uri=REDIRECT_URI,
    )

    # 3. Validate ID token (signature, issuer, audience, expiry)
    user_claims = validate_id_token(tokens["id_token"], CLIENT_ID, JWKS_URI)

    # 4. Store tokens securely (httpOnly cookies, never localStorage)
    response = RedirectResponse("/dashboard")
    set_auth_cookies(response, tokens["access_token"], tokens["refresh_token"])
    return response
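The callback reads `oauth_state` and `code_verifier` from the session, so something has to put them there before the redirect. A minimal sketch of that first step follows, using a plain dict as a stand-in for the server-side session; `begin_login` and the `oauth_nonce` key are assumed names, not from the original.

```python
import base64
import hashlib
import secrets
import urllib.parse

def begin_login(
    session: dict,
    authorization_endpoint: str,
    client_id: str,
    redirect_uri: str,
    scopes: list,
) -> str:
    """Generate state, nonce, and a PKCE pair; store them; return the redirect URL."""
    code_verifier = secrets.token_urlsafe(64)  # 86 URL-safe characters
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
    state = secrets.token_urlsafe(32)  # CSRF protection, checked in the callback
    nonce = secrets.token_urlsafe(32)  # Echoed in the ID token, checked on validation

    # Secrets stay server-side; only the challenge, state, and nonce leave in the URL
    session["code_verifier"] = code_verifier
    session["oauth_state"] = state
    session["oauth_nonce"] = nonce

    params = {
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": " ".join(scopes),
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
        "state": state,
        "nonce": nonce,
    }
    return f"{authorization_endpoint}?{urllib.parse.urlencode(params)}"
```

Generating fresh values per login attempt matters: reusing a state or verifier across attempts weakens the protection each one provides.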

Client Credentials Flow (M2M)

async def get_m2m_access_token(
    token_endpoint: str,
    client_id: str,
    client_secret: str,
    audience: str,
    scopes: list[str] | None = None,
) -> str:
    """Get access token for machine-to-machine communication."""
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        # "audience" is provider-specific (e.g. Auth0), not part of RFC 6749;
        # check what your IdP expects for naming the target API.
        "audience": audience,
    }
    if scopes:
        data["scope"] = " ".join(scopes)  # Omit the parameter rather than send ""
    async with httpx.AsyncClient() as client:
        response = await client.post(token_endpoint, data=data)
        response.raise_for_status()
        return response.json()["access_token"]

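# Token caching for M2M (avoid fetching new token on every request)
import time

class M2MTokenManager:
    def __init__(self, token_endpoint, client_id, client_secret, audience):
        # Store the form fields explicitly; locals() would also capture `self`
        self._token_endpoint = token_endpoint
        self._form = {
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
            "audience": audience,
        }
        self._token = None
        self._expires_at = 0

    async def get_token(self) -> str:
        # Refresh if expired or within 60 seconds of expiry
        if time.time() >= self._expires_at - 60:
            async with httpx.AsyncClient() as client:
                response = await client.post(self._token_endpoint, data=self._form)
            response.raise_for_status()
            body = response.json()
            self._token = body["access_token"]
            self._expires_at = time.time() + body["expires_in"]
        return self._token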
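One gap in a simple cache like the one above is concurrency: if several coroutines find the token expired at once, each may fetch its own. A sketch of one way to serialize the refresh with `asyncio.Lock` follows; `CachedToken` and its `fetch` callable (which returns the token response as a dict) are hypothetical names.

```python
import asyncio
import time
from typing import Awaitable, Callable

class CachedToken:
    """Caches an access token and lets only one caller refresh it at a time."""

    def __init__(self, fetch: Callable[[], Awaitable[dict]], skew: float = 60.0):
        self._fetch = fetch           # Coroutine function returning the token response
        self._skew = skew             # Refresh this many seconds before expiry
        self._lock = asyncio.Lock()
        self._token = None
        self._expires_at = 0.0

    async def get(self) -> str:
        async with self._lock:
            # Callers that waited on the lock re-check and reuse the fresh token
            if self._token is None or time.monotonic() >= self._expires_at - self._skew:
                body = await self._fetch()
                self._token = body["access_token"]
                self._expires_at = time.monotonic() + float(body.get("expires_in", 0))
            return self._token
```

`time.monotonic()` is used so that wall-clock adjustments do not extend or shorten the cached lifetime.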

OIDC: ID Token vs Access Token

# ID Token: proves WHO the user is (authentication)
# Access Token: proves the app can access resources on the user's behalf (authorization)

# NEVER use the access token to verify user identity — it's for resource servers
# ALWAYS use the ID token (or userinfo endpoint) to get user identity

# ID Token: JWT containing user claims
{
    "iss": "https://accounts.google.com",  # Issuer (must validate)
    "sub": "110169484474386276334",         # Subject (stable user ID)
    "aud": "client_id_here",               # Audience (must match your client_id)
    "exp": 1704067199,                     # Expiry (must validate)
    "iat": 1704063599,                     # Issued at
    "email": "alice@example.com",          # Standard claim (placeholder address)
    "email_verified": True,                # Email verification status
    "name": "Alice Smith",
    "picture": "https://...",
}

def validate_id_token(
    id_token: str,
    client_id: str,
    jwks_uri: str,
    expected_nonce: str | None = None,  # The nonce sent in the auth request, if any
) -> dict:
    """Complete ID token validation per OIDC spec."""
    import jwt
    from jwt import PyJWKClient

    # Fetch signing keys from IdP's JWKS endpoint
    # (in production, reuse one PyJWKClient instance so fetched keys can be cached)
    jwks_client = PyJWKClient(jwks_uri)
    signing_key = jwks_client.get_signing_key_from_jwt(id_token)

    payload = jwt.decode(
        id_token,
        signing_key.key,
        algorithms=["RS256", "ES256"],  # Explicit allow-list; never trust the header's alg
        audience=client_id,             # aud must match your client_id
        issuer=EXPECTED_ISSUER,         # iss must match the IdP you configured
        options={
            "require": ["exp", "iat", "sub", "iss", "aud"],
            "verify_exp": True,
            "verify_iat": True,
        }
    )

    # If a nonce was sent in the auth request, the token MUST echo it back.
    # Checking only when the claim is present would let a token without a nonce through.
    if expected_nonce is not None and payload.get("nonce") != expected_nonce:
        raise ValueError("Nonce mismatch: possible replay attack")

    return payload
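The key lookup in the function above is handled by the library, which can hide what is going on. Conceptually, the verifier reads the unverified JWT header, rejects algorithms outside its allow-list, and picks the JWKS entry whose `kid` matches. A stdlib-only sketch of that selection step follows; `select_jwk` is an illustrative name, and the sketch performs no signature verification itself.

```python
import base64
import json

ALLOWED_ALGS = {"RS256", "ES256"}

def _b64url_decode(segment: str) -> bytes:
    # JWT segments drop base64 padding; restore it before decoding
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))

def select_jwk(token: str, jwks: dict) -> dict:
    """Pick the JWKS entry named by the token header, enforcing the alg allow-list."""
    header = json.loads(_b64url_decode(token.split(".")[0]))
    if header.get("alg") not in ALLOWED_ALGS:
        # Rejects "none" and HS256 key-confusion attempts before any key is touched
        raise ValueError(f"Algorithm not allowed: {header.get('alg')!r}")
    kid = header.get("kid")
    if kid is None:
        raise ValueError("Token header has no kid")
    for key in jwks.get("keys", []):
        if key.get("kid") == kid:
            return key
    # An unknown kid often means the IdP rotated keys; refetch the JWKS and retry once
    raise LookupError(f"No key with kid {kid!r} in JWKS")
```

The header is attacker-controlled until the signature is verified, which is why the allow-list is fixed in code rather than taken from the token.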

Token Storage Security

# NEVER: localStorage or sessionStorage for sensitive tokens
# - Accessible to any JavaScript on the page
# - XSS can steal all tokens
# localStorage.setItem('access_token', token)  # NEVER DO THIS

# BEST: httpOnly + Secure + SameSite cookies (JavaScript can't access)
from fastapi import Response

def set_auth_cookies(response: Response, access_token: str, refresh_token: str):
    """Store tokens in httpOnly cookies — inaccessible to JavaScript."""
    
    # Access token: short-lived (15-60 min)
    response.set_cookie(
        key="access_token",
        value=access_token,
        httponly=True,    # JavaScript CANNOT read this
        secure=True,      # HTTPS only
        samesite="strict",  # Never sent on cross-site requests (CSRF defense); "lax" also allows top-level GET navigations
        max_age=3600,     # 1 hour
        path="/api",      # Scope to API paths only
    )
    
    # Refresh token: longer-lived (7-30 days)
    response.set_cookie(
        key="refresh_token",
        value=refresh_token,
        httponly=True,
        secure=True,
        samesite="strict",
        max_age=30 * 24 * 3600,  # 30 days
        path="/auth/refresh",    # Scope ONLY to refresh endpoint
    )

# SPA alternative (if the access token can't live in a cookie): in-memory storage
# Store access token in React state/context: lost on page refresh and never persisted,
# so harder to steal than localStorage (XSS can still misuse it while the page is open)
# Use a refresh token in httpOnly cookie to restore access token on refresh
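Path-scoped cookies have one consequence that is easy to miss at logout: a browser only replaces a cookie when the name and path match, so the clearing header must repeat the original path. The sketch below builds such a header with the standard library's `http.cookies`; `expired_cookie_header` is an illustrative helper, and the same attributes would be passed to whatever response object the framework provides.

```python
from http.cookies import SimpleCookie

def expired_cookie_header(name: str, path: str) -> str:
    """Build a Set-Cookie value that clears a cookie scoped to the given path."""
    cookie = SimpleCookie()
    cookie[name] = ""
    morsel = cookie[name]
    morsel["path"] = path          # Must equal the path used when the cookie was set
    morsel["max-age"] = 0          # Expire immediately
    morsel["httponly"] = True
    morsel["secure"] = True
    morsel["samesite"] = "Strict"
    return morsel.OutputString()

# Clear both cookies using the paths from set_auth_cookies above
logout_headers = [
    expired_cookie_header("access_token", "/api"),
    expired_cookie_header("refresh_token", "/auth/refresh"),
]
```

Clearing the cookies only removes them from the browser; the refresh token should also be revoked at the IdP if it offers a revocation endpoint.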

Refresh Token Rotation

from fastapi.responses import JSONResponse

@app.post("/auth/refresh")
async def refresh_tokens(
    refresh_token: str = Cookie(None, alias="refresh_token"),
):
    """Exchange refresh token for new access + refresh tokens."""
    if not refresh_token:
        raise HTTPException(401, "No refresh token")

    async with httpx.AsyncClient() as client:
        response = await client.post(
            TOKEN_ENDPOINT,
            data={
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
                "client_id": CLIENT_ID,
                "client_secret": CLIENT_SECRET,  # For confidential clients
            },
        )

    if response.status_code in (400, 401):
        # invalid_grant: refresh token is invalid, expired, or revoked; force re-login.
        # With rotation enabled, an IdP that sees an old token reused
        # typically revokes every token in the family.
        raise HTTPException(401, "Session expired")

    response.raise_for_status()
    new_tokens = response.json()

    # With rotation the IdP returns a new refresh token and invalidates the old one;
    # without rotation the field may be absent, so keep the current token.
    new_refresh_token = new_tokens.get("refresh_token", refresh_token)
    api_response = JSONResponse({"status": "refreshed"})
    set_auth_cookies(api_response, new_tokens["access_token"], new_refresh_token)
    return api_response

Scope Design

# Scope design principles:
# 1. Minimal: default to read-only; require explicit write scopes
# 2. Granular: prefer resource-specific scopes over broad ones
# 3. Consent-friendly: scope names should be understandable to end users

SCOPES = {
    # User-grantable (shown in consent screen)
    "profile": "Read your name and profile picture",
    "email": "Access your email address",
    "openid": "Verify your identity (required for OIDC)",
    
    # Resource scopes
    "orders:read": "View your orders",
    "orders:write": "Create and modify orders",
    "payment:read": "View payment methods",
    "payment:write": "Modify payment methods",
    
    # Admin scopes (NOT user-grantable — only admin-assigned)
    "admin:users": "Manage all users",
    "admin:impersonate": "Act as any user",
    
    # M2M scopes (service-to-service)
    "service:internal": "Internal service access",
}

# Scope validation middleware
from fastapi import Depends

def require_scope(required_scope: str):
    """FastAPI dependency that validates required scope."""
    # get_token_claims: assumed dependency that returns validated access token claims
    def check_scope(token_claims: dict = Depends(get_token_claims)):
        granted_scopes = set(token_claims.get("scope", "").split())
        if required_scope not in granted_scopes:
            raise HTTPException(
                403,
                detail=f"Required scope '{required_scope}' not granted",
                # RFC 6750: name the error and the scope the caller needs
                headers={"WWW-Authenticate": f'Bearer error="insufficient_scope", scope="{required_scope}"'},
            )
        return token_claims
    return check_scope

@app.post("/orders")
async def create_order(
    order: OrderRequest,  # OrderRequest: assumed Pydantic model defined elsewhere
    claims: dict = Depends(require_scope("orders:write")),  # Scope enforcement
):
    ...
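Enforcement at the API is one half of scope design; the other half is deciding which requested scopes get into a token at all. The sketch below filters a request against the user-grantable and admin-only groups from the `SCOPES` table above. The set names and `scopes_to_grant` are illustrative assumptions about how an authorization server might apply that policy.

```python
# Groups taken from the SCOPES table; the split itself is an assumed policy
USER_GRANTABLE = {
    "openid", "profile", "email",
    "orders:read", "orders:write",
    "payment:read", "payment:write",
}
ADMIN_ONLY = {"admin:users", "admin:impersonate"}

def scopes_to_grant(requested: str, is_admin: bool = False) -> str:
    """Return the space-delimited subset of requested scopes this principal may receive."""
    requested_set = set(requested.split())
    allowed = USER_GRANTABLE | (ADMIN_ONLY if is_admin else set())
    # Unknown or disallowed scopes are silently dropped, never granted by default
    return " ".join(sorted(requested_set & allowed))
```

Returning the granted set (rather than failing the whole request) matches how the token response's `scope` field can differ from what the client asked for; the client should read it back instead of assuming it got everything.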

IdP Comparison

Provider     | Best For                                         | Pricing                               | Self-Host?
Auth0        | SaaS apps, fast time-to-value                    | Free to 7,500 MAU; $240/mo+           | No
Keycloak     | On-prem, full control, complex enterprise flows  | Free (self-hosted, ops cost)          | Yes
AWS Cognito  | AWS-native, mobile apps                          | Free to 50K MAU; $0.0055/MAU after    | No
WorkOS       | B2B SaaS with enterprise SSO (SAML, SCIM)        | Free to 1M MAU; $125/SSO connection   | No
Clerk        | Modern developer experience, React-first         | Free to 10K MAU                       | No

Anti-Patterns

❌ Not implementing PKCE for SPAs
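Without PKCE, an intercepted authorization code can be redeemed by an attacker, for example a malicious app that registers the same custom URI scheme as the redirect_uri, or a browser extension that reads the redirect URL. All SPAs and mobile apps MUST use PKCE.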

❌ Storing tokens in localStorage
Any XSS vulnerability can steal localStorage. Use httpOnly cookies for refresh tokens; store access tokens in memory for SPAs.

❌ Not validating the aud claim
If Service B doesn't check the audience claim, it will accept an access token that was issued for Service A. A token stolen from, or legitimately held by, one service can then be replayed against another.

❌ Using the access token to identify users
Access tokens are opaque to clients. They may not contain user identity, and their format can change. Use the ID token or /userinfo endpoint for identity.

❌ Not rotating refresh tokens
A stolen refresh token with rotation disabled gives an attacker access for as long as the token stays valid, which can be weeks or indefinitely. Enable rotation and implement token family revocation (if reuse of an already-rotated token is detected, revoke all tokens in the family).
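Token family revocation is easier to reason about with a small model. The sketch below is an in-memory illustration of what an authorization server might track: every issued token maps to a family, and each family has exactly one currently valid token. `RefreshTokenStore` is a hypothetical class; a real server would persist this state and store token hashes rather than raw tokens.

```python
import secrets

class RefreshTokenStore:
    """In-memory illustration of rotation with reuse detection."""

    def __init__(self):
        self._family_of = {}  # token -> family id, for every token ever issued
        self._current = {}    # family id -> the single token still valid

    def issue(self, family_id=None) -> str:
        """Issue a token, starting a new family unless one is given."""
        family_id = family_id or secrets.token_urlsafe(16)
        token = secrets.token_urlsafe(32)
        self._family_of[token] = family_id
        self._current[family_id] = token
        return token

    def rotate(self, presented: str) -> str:
        """Exchange a valid token for a new one; revoke the family on reuse."""
        family_id = self._family_of.get(presented)
        if family_id is None or family_id not in self._current:
            raise PermissionError("Unknown or revoked refresh token")
        if self._current[family_id] != presented:
            # An already-rotated token came back: either the attacker or the
            # legitimate client holds a stale copy, so nobody keeps access
            del self._current[family_id]
            raise PermissionError("Refresh token reuse detected; family revoked")
        return self.issue(family_id)
```

The server cannot tell which party presented the stale token, which is why the whole family is revoked and the user is sent back through login.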

Quick Reference

Grant type selection:
  User + browser available  → Authorization Code + PKCE (always)
  M2M, no user              → Client Credentials
  CLI / TV / IoT            → Device Authorization Flow
  Legacy (migrate away)     → Resource Owner Password Credentials

PKCE requirements:
  code_verifier:  43-128 chars, cryptographically random, URL-safe base64
  code_challenge: BASE64URL(SHA256(code_verifier))  # S256 method only
  state:          Random value, verify on callback (CSRF protection)

Token validation checklist:
  ☐ Signature valid (correct algorithm + correct key)
  ☐ iss matches expected issuer
  ☐ aud matches your client_id (ID token) or API identifier (access token)
  ☐ exp not in the past
  ☐ nbf (if present) is not in the future
  ☐ nonce matches (if used in auth request)

Token storage:
  Access token:  httpOnly + Secure cookie (or in-memory for SPA)
  Refresh token: httpOnly + Secure cookie, path scoped to /auth/refresh
  NEVER:         localStorage, sessionStorage, URL parameters
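The claim checks in the token validation checklist above can be written out as plain comparisons, which helps when reviewing what a JWT library is configured to do. This sketch assumes the signature has already been verified and `claims` is the decoded payload; `check_claims` and the 30-second `leeway` default are illustrative choices.

```python
import time

def check_claims(claims, issuer, audience, nonce=None, now=None, leeway=30):
    """Apply the checklist's claim checks to an already signature-verified payload."""
    now = time.time() if now is None else now

    if claims.get("iss") != issuer:
        raise ValueError("iss does not match expected issuer")

    # aud may be a single string or a list of strings
    aud = claims.get("aud")
    audiences = aud if isinstance(aud, list) else [aud]
    if audience not in audiences:
        raise ValueError("aud does not include this client/API")

    if "exp" not in claims or now >= claims["exp"] + leeway:
        raise ValueError("token is expired or has no exp")

    if "nbf" in claims and now < claims["nbf"] - leeway:
        raise ValueError("token is not valid yet")

    # Only enforced when a nonce was sent in the auth request
    if nonce is not None and claims.get("nonce") != nonce:
        raise ValueError("nonce mismatch")

    return claims
```

A small leeway absorbs clock drift between the IdP and the validating server; keep it to seconds, not minutes.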
