Skip to content

Auth Strategies

OAuth2

oauth2

OAuth2 strategy with support for Google, GitHub, and custom providers.

OAuth2Strategy

OAuth2Strategy(
    provider_name: str, config: OAuth2ProviderConfig
)

Handles the OAuth2 authorization code flow for external identity providers.

Source code in libs/ninja-auth/src/ninja_auth/strategies/oauth2.py
def __init__(self, provider_name: str, config: OAuth2ProviderConfig) -> None:
    """Bind the strategy to one named provider and its OAuth2 settings.

    Args:
        provider_name: Label of the provider; kept as ``self.provider_name``.
        config: Provider settings object; kept as ``self.config``.
    """
    self.config = config
    self.provider_name = provider_name

get_authorization_url

get_authorization_url(state: str = '') -> str

Build the URL to redirect the user to for OAuth2 authorization.

Source code in libs/ninja-auth/src/ninja_auth/strategies/oauth2.py
def get_authorization_url(self, state: str = "") -> str:
    """Build the URL to redirect the user to for OAuth2 authorization.

    The query string carries ``client_id``, ``redirect_uri``,
    ``response_type=code`` and the space-joined scopes from ``self.config``.

    Args:
        state: Opaque value to include as the ``state`` parameter; left out
            of the URL when empty.

    Returns:
        ``self.config.authorize_url`` with the URL-encoded parameters appended.
    """
    params: dict[str, str] = {
        "client_id": self.config.client_id,
        "redirect_uri": self.config.redirect_uri,
        "response_type": "code",
        "scope": " ".join(self.config.scopes),
    }
    if state:
        params["state"] = state

    base_url = self.config.authorize_url
    # A configured authorize URL may already carry a query string; adding a
    # second "?" would corrupt it, so extend the existing query instead.
    if "?" not in base_url:
        separator = "?"
    elif base_url.endswith(("?", "&")):
        separator = ""
    else:
        separator = "&"
    return f"{base_url}{separator}{urlencode(params)}"

exchange_code async

exchange_code(code: str) -> dict[str, Any]

Exchange an authorization code for tokens.

Source code in libs/ninja-auth/src/ninja_auth/strategies/oauth2.py
async def exchange_code(self, code: str) -> dict[str, Any]:
    """Trade an authorization code for the provider's token response.

    POSTs an ``authorization_code`` grant to ``self.config.token_url`` and
    asks for a JSON reply via the ``Accept`` header.

    Args:
        code: Authorization code to exchange.

    Returns:
        The decoded JSON body of the token response.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with an error status.
    """
    form = {
        "grant_type": "authorization_code",
        "client_id": self.config.client_id,
        "client_secret": self.config.client_secret,
        "code": code,
        "redirect_uri": self.config.redirect_uri,
    }
    accept_json = {"Accept": "application/json"}
    async with httpx.AsyncClient() as http:
        response = await http.post(
            self.config.token_url,
            data=form,
            headers=accept_json,
        )
        response.raise_for_status()
        return response.json()

get_userinfo async

get_userinfo(access_token: str) -> dict[str, Any]

Fetch user profile from the provider's userinfo endpoint.

Source code in libs/ninja-auth/src/ninja_auth/strategies/oauth2.py
async def get_userinfo(self, access_token: str) -> dict[str, Any]:
    """Retrieve the user's profile from ``self.config.userinfo_url``.

    Args:
        access_token: Token sent as a ``Bearer`` credential in the
            ``Authorization`` header.

    Returns:
        The decoded JSON body of the userinfo response.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with an error status.
    """
    bearer_header = {"Authorization": f"Bearer {access_token}"}
    async with httpx.AsyncClient() as http:
        response = await http.get(self.config.userinfo_url, headers=bearer_header)
        response.raise_for_status()
        return response.json()

authenticate_with_code async

authenticate_with_code(code: str) -> UserContext

Full OAuth2 flow: exchange code -> fetch userinfo -> return context.

Source code in libs/ninja-auth/src/ninja_auth/strategies/oauth2.py
async def authenticate_with_code(self, code: str) -> UserContext:
    """Full OAuth2 flow: exchange code -> fetch userinfo -> return context.

    Args:
        code: Authorization code to exchange for tokens.

    Returns:
        A ``UserContext`` whose ``user_id`` is the profile's ``sub`` (or
        ``id`` as a fallback), with the raw userinfo and access token kept
        in ``metadata``.
    """
    tokens = await self.exchange_code(code)
    # NOTE(review): a token response without "access_token" still proceeds
    # with an empty bearer token; the userinfo call is then left to fail.
    access_token = tokens.get("access_token", "")
    userinfo = await self.get_userinfo(access_token)

    # Normalize across providers: prefer "sub", fall back to "id".
    raw_id = userinfo.get("sub") or userinfo.get("id")
    # An explicit null identifier must not become the literal string "None",
    # which would give unrelated users the same user_id.
    user_id = "" if raw_id is None else str(raw_id)
    email = userinfo.get("email")

    return UserContext(
        user_id=user_id,
        email=email,
        roles=[],
        provider=f"oauth2:{self.provider_name}",
        metadata={"userinfo": userinfo, "access_token": access_token},
    )

Bearer (JWT)

bearer

JWT bearer token validation strategy.

BearerStrategy

BearerStrategy(config: BearerConfig)

Validates JWT bearer tokens and extracts user context.

Source code in libs/ninja-auth/src/ninja_auth/strategies/bearer.py
def __init__(self, config: BearerConfig) -> None:
    """Keep the bearer-token settings on the instance as ``self.config``.

    Args:
        config: Settings object read later when tokens are validated.
    """
    self.config = config

authenticate async

authenticate(request: Request) -> UserContext | None

Extract and validate a JWT from the Authorization header.

Source code in libs/ninja-auth/src/ninja_auth/strategies/bearer.py
async def authenticate(self, request: Request) -> UserContext | None:
    """Extract and validate a JWT from the Authorization header.

    Args:
        request: Incoming request; only its ``authorization`` header is read.

    Returns:
        Whatever ``self.validate_token`` returns for the extracted token, or
        ``None`` when the header is missing, uses another scheme, or carries
        no token.
    """
    auth_header = request.headers.get("authorization", "")
    scheme, _, credentials = auth_header.partition(" ")
    # HTTP authentication scheme names are case-insensitive, so "bearer" and
    # "BEARER" must be accepted alongside "Bearer".
    if scheme.lower() != "bearer":
        return None

    # Tolerate extra padding around the token; an empty token can never validate.
    token = credentials.strip()
    if not token:
        return None
    return self.validate_token(token)

validate_token

validate_token(token: str) -> UserContext | None

Decode and validate a JWT, returning UserContext on success.

Source code in libs/ninja-auth/src/ninja_auth/strategies/bearer.py
def validate_token(self, token: str) -> UserContext | None:
    """Decode a JWT and turn its claims into a ``UserContext``.

    Issuer and audience are only passed to the decoder when they are set on
    ``self.config``.

    Args:
        token: Encoded JWT string.

    Returns:
        A ``UserContext`` built from the token's claims, or ``None`` when
        decoding raises ``jwt.PyJWTError``.
    """
    try:
        options: dict[str, Any] = {"algorithms": self._get_algorithms()}
        issuer = self.config.issuer
        if issuer:
            options["issuer"] = issuer
        audience = self.config.audience
        if audience:
            options["audience"] = audience

        signing_key = self._get_signing_key()
        claims = jwt.decode(token, signing_key, **options)

        # Missing claims fall back to empty values rather than failing.
        return UserContext(
            user_id=claims.get("sub", ""),
            email=claims.get("email"),
            roles=claims.get("roles", []),
            permissions=claims.get("permissions", []),
            provider="bearer",
            metadata={"claims": claims},
        )
    except jwt.PyJWTError:
        return None

API Key

apikey

API key validation strategy for service-to-service auth.

ApiKeyStrategy

ApiKeyStrategy(config: ApiKeyConfig)

Validates API keys from a configured header (or the `api_key` query parameter as a fallback) against known keys.

Source code in libs/ninja-auth/src/ninja_auth/strategies/apikey.py
def __init__(self, config: ApiKeyConfig) -> None:
    """Keep the API-key settings on the instance as ``self.config``.

    Args:
        config: Settings object supplying the header name and known keys.
    """
    self.config = config

authenticate async

authenticate(request: Request) -> UserContext | None

Check the configured request header, then the `api_key` query parameter, for a valid API key.

Source code in libs/ninja-auth/src/ninja_auth/strategies/apikey.py
async def authenticate(self, request: Request) -> UserContext | None:
    """Look for an API key on the request and validate it.

    The configured header (lower-cased) is consulted first; the ``api_key``
    query parameter is used only when the header is absent or empty.

    Args:
        request: Incoming request providing ``headers`` and ``query_params``.

    Returns:
        The result of ``self.validate_key`` for the key that was found, or
        ``None`` when the request carries no key at all.
    """
    header_key = self.config.header_name.lower()
    # Header wins; the query parameter is only a fallback.
    candidate = request.headers.get(header_key, "") or request.query_params.get(
        "api_key", ""
    )
    return self.validate_key(candidate) if candidate else None

validate_key

validate_key(api_key: str) -> UserContext | None

Validate an API key against configured keys using constant-time comparison.

Source code in libs/ninja-auth/src/ninja_auth/strategies/apikey.py
def validate_key(self, api_key: str) -> UserContext | None:
    """Validate an API key against configured keys using constant-time comparison.

    Args:
        api_key: Key presented by the caller.

    Returns:
        A ``UserContext`` with ``user_id`` ``apikey:<name>`` and the
        ``service`` role for the first configured key that matches, or
        ``None`` when no key matches.
    """
    # The candidate digest does not depend on the loop variable; hash it once
    # instead of once per configured key.
    candidate_digest = hashlib.sha256(api_key.encode()).hexdigest()
    for name, known_key in self.config.keys.items():
        known_digest = hashlib.sha256(known_key.encode()).hexdigest()
        # Both operands are SHA-256 hex digests, so compare_digest always
        # receives equal-length ASCII strings.
        if hmac.compare_digest(candidate_digest, known_digest):
            return UserContext(
                user_id=f"apikey:{name}",
                roles=["service"],
                provider="apikey",
                metadata={"key_name": name},
            )
    return None

Built-in Identity

identity

Built-in identity strategy: user registration, login, password hashing.

IdentityStrategy

IdentityStrategy(config: IdentityConfig)

Manages local user accounts with password hashing and JWT session tokens.

Source code in libs/ninja-auth/src/ninja_auth/strategies/identity.py
def __init__(self, config: IdentityConfig) -> None:
    """Set up the strategy with its settings and an empty user store.

    Args:
        config: Settings object; kept as ``self.config``.
    """
    # In-memory store; production would use ninja-persistence
    self._users: dict[str, dict[str, Any]] = {}
    self.config = config

hash_password

hash_password(password: str) -> str

Hash a plaintext password using bcrypt.

Source code in libs/ninja-auth/src/ninja_auth/strategies/identity.py
def hash_password(self, password: str) -> str:
    """Produce a bcrypt hash of ``password`` with a freshly generated salt.

    Args:
        password: Plaintext password.

    Returns:
        The bcrypt hash decoded to ``str``.
    """
    secret = password.encode()
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(secret, salt)
    return hashed.decode()

verify_password

verify_password(plain: str, hashed: str) -> bool

Verify a password against its bcrypt hash.

Source code in libs/ninja-auth/src/ninja_auth/strategies/identity.py
def verify_password(self, plain: str, hashed: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        plain: Password supplied by the user.
        hashed: Stored bcrypt hash, as text.

    Returns:
        The result of ``bcrypt.checkpw`` for the two encoded values.
    """
    candidate = plain.encode()
    stored = hashed.encode()
    return bcrypt.checkpw(candidate, stored)

register

register(
    email: str,
    password: str,
    roles: list[str] | None = None,
) -> UserContext

Register a new user account.

Source code in libs/ninja-auth/src/ninja_auth/strategies/identity.py
def register(self, email: str, password: str, roles: list[str] | None = None) -> UserContext:
    """Register a new user account.

    Args:
        email: Key under which the account is stored.
        password: Plaintext password; only its hash is stored.
        roles: Optional roles for the account; defaults to no roles.

    Returns:
        A ``UserContext`` for the newly created account.

    Raises:
        ValueError: If an account with this email already exists.
    """
    if email in self._users:
        raise ValueError(f"User already exists: {email}")

    user_id = secrets.token_hex(16)
    # Copy the roles so later mutation of the caller's list (or of the
    # returned context) cannot silently change the stored account.
    stored_roles = list(roles) if roles else []
    self._users[email] = {
        "user_id": user_id,
        "email": email,
        "password_hash": self.hash_password(password),
        "roles": stored_roles,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

    return UserContext(
        user_id=user_id,
        email=email,
        roles=list(stored_roles),
        provider="identity",
    )

login

login(email: str, password: str) -> UserContext | None

Authenticate a user by email and password.

Source code in libs/ninja-auth/src/ninja_auth/strategies/identity.py
def login(self, email: str, password: str) -> UserContext | None:
    """Check an email/password pair against the stored accounts.

    Args:
        email: Key of the account to look up.
        password: Plaintext password to verify against the stored hash.

    Returns:
        A ``UserContext`` for the account, or ``None`` when the email is
        unknown or the password does not verify.
    """
    record = self._users.get(email)
    # Short-circuit: the password is only checked when the account exists.
    if not record or not self.verify_password(password, record["password_hash"]):
        return None

    return UserContext(
        user_id=record["user_id"],
        email=record["email"],
        roles=record.get("roles", []),
        provider="identity",
    )

issue_token

issue_token(user_ctx: UserContext) -> str

Issue a JWT session token for an authenticated user.

Source code in libs/ninja-auth/src/ninja_auth/strategies/identity.py
def issue_token(self, user_ctx: UserContext) -> str:
    """Create an HS256-signed JWT session token for ``user_ctx``.

    The token carries the user's id (``sub``), email and roles, plus
    ``iat``/``exp`` claims; expiry is ``self.config.token_expiry_minutes``
    after the issue time.

    Args:
        user_ctx: Context of the already authenticated user.

    Returns:
        The encoded token, signed with ``self.config.token_secret``.
    """
    issued_at = datetime.now(timezone.utc)
    lifetime = timedelta(minutes=self.config.token_expiry_minutes)
    claims = {
        "sub": user_ctx.user_id,
        "email": user_ctx.email,
        "roles": user_ctx.roles,
        "iat": issued_at,
        "exp": issued_at + lifetime,
    }
    return jwt.encode(claims, self.config.token_secret, algorithm="HS256")

validate_token

validate_token(token: str) -> UserContext | None

Validate a session token and return user context.

Source code in libs/ninja-auth/src/ninja_auth/strategies/identity.py
def validate_token(self, token: str) -> UserContext | None:
    """Validate a session token and return user context.

    Args:
        token: Encoded JWT, expected to be HS256-signed with
            ``self.config.token_secret``.

    Returns:
        A ``UserContext`` built from the token's claims, or ``None`` when
        the token fails to decode or carries no ``sub`` claim.
    """
    try:
        payload = jwt.decode(token, self.config.token_secret, algorithms=["HS256"])
    except jwt.PyJWTError:
        return None

    # A correctly signed token without "sub" cannot identify a user. Treat it
    # as invalid instead of letting a KeyError escape to the caller.
    if "sub" not in payload:
        return None

    return UserContext(
        user_id=payload["sub"],
        email=payload.get("email"),
        roles=payload.get("roles", []),
        provider="identity",
    )