♻️ field.data.UUIDField as Models.id on auth models

This commit is contained in:
Brian Wiborg 2025-09-27 15:00:13 +02:00
parent 018587618e
commit 51037b615a
No known key found for this signature in database
3 changed files with 49 additions and 16 deletions

View file

@ -7,12 +7,12 @@ pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
class Group(Model): class Group(Model):
id = field.IntField(pk=True) id = field.data.UUIDField(pk=True)
name = field.CharField(max_length=42, index=True) name = field.CharField(max_length=42, index=True)
class User(Model): class User(Model):
id = field.IntField(pk=True) id = field.data.UUIDField(pk=True)
email = field.CharField(max_length=255, unique=True, index=True) email = field.CharField(max_length=255, unique=True, index=True)
username = field.CharField(max_length=150, unique=True) username = field.CharField(max_length=150, unique=True)
password_hash = field.CharField(max_length=128) password_hash = field.CharField(max_length=128)

View file

@ -1,4 +1,5 @@
from .routes import ( from .routes import (
get_token,
get_current_user, get_current_user,
require_authenticated, require_authenticated,
require_admin, require_admin,

View file

@ -1,12 +1,13 @@
import time import time
from typing import Dict from enum import Enum
from typing import Any, Dict, List
import jwt import jwt
from fastapi import APIRouter, Body, Depends, Header, HTTPException, status from fastapi import APIRouter, Body, Depends, Header, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel from pydantic import BaseModel
from ohmyapi.builtin.auth.models import User from ohmyapi.builtin.auth.models import User, Group
import settings import settings
@ -40,14 +41,39 @@ def decode_token(token: str) -> Dict:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
class TokenType(str, Enum):
"""
Helper for indicating the token type when generating claims.
"""
access = "access"
refresh = "refresh"
def claims(token_type: TokenType, user: User, groups: List[Group] = []) -> Dict[str, Any]:
return {
'type': token_type,
'sub': str(user.id),
'user': {
'username': user.username,
'email': user.email,
},
'roles': [g.name for g in groups]
}
async def get_token(token: str = Depends(oauth2_scheme)) -> Dict:
"""Dependency: token introspection"""
payload = decode_token(token)
return payload
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User: async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
"""Dependency: extract user from access token.""" """Dependency: extract user from access token."""
payload = decode_token(token) payload = decode_token(token)
username = payload.get("sub") user_id = payload.get("sub")
if username is None: if user_id is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload")
user = await User.filter(username=username).first() user = await User.filter(id=user_id).first()
if not user: if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
return user return user
@ -100,8 +126,8 @@ async def login(form_data: LoginRequest = Body(...)):
if not user: if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
access_token = create_token({"sub": user.username, "type": "access"}, ACCESS_TOKEN_EXPIRE_SECONDS) access_token = create_token(claims(TokenType.access, user), ACCESS_TOKEN_EXPIRE_SECONDS)
refresh_token = create_token({"sub": user.username, "type": "refresh"}, REFRESH_TOKEN_EXPIRE_SECONDS) refresh_token = create_token(claims(TokenType.refresh, user), REFRESH_TOKEN_EXPIRE_SECONDS)
return { return {
"access_token": access_token, "access_token": access_token,
@ -117,20 +143,26 @@ async def refresh_token(refresh_token: str):
if payload.get("type") != "refresh": if payload.get("type") != "refresh":
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token")
username = payload.get("sub") user_id = payload.get("sub")
user = await User.filter(username=username).first() user = await User.filter(id=user_id).first()
if not user: if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
new_access = create_token({"sub": user.username, "type": "access"}, ACCESS_TOKEN_EXPIRE_SECONDS) new_access = create_token(claims(TokenType.access, user), ACCESS_TOKEN_EXPIRE_SECONDS)
return {"access_token": new_access, "token_type": "bearer"} return {"access_token": new_access, "token_type": "bearer"}
@router.get("/me") @router.get("/me")
async def me(current_user: User = Depends(get_current_user)): async def me(user: User = Depends(get_current_user)):
"""Return the currently authenticated user.""" """Return the currently authenticated user."""
return { return {
"username": current_user.username, "email": user.email,
"is_admin": current_user.is_admin, "username": user.username,
"is_staff": current_user.is_staff, "is_admin": user.is_admin,
"is_staff": user.is_staff,
} }
@router.get("/introspect")
async def introspect(token: Dict = Depends(get_token)):
return token