♻️ Refactor ohmyapi_auth

- remove middlewares (should come from user)
- save symmetrically hased e-mail instead of raw
This commit is contained in:
Brian Wiborg 2025-10-06 21:27:16 +02:00
parent cc3872cf74
commit 33a9c94042
No known key found for this signature in database
2 changed files with 32 additions and 5 deletions

View file

@ -1 +1 @@
from . import middlewares, models, permissions, routes
from . import models, permissions, routes

View file

@ -1,25 +1,40 @@
import hmac
import hashlib
import base64
from functools import wraps
from secrets import token_bytes
from typing import List, Optional
from uuid import UUID
from passlib.context import CryptContext
from tortoise.contrib.pydantic import pydantic_queryset_creator
from ohmyapi.db import Model, field, pre_delete, pre_save
from ohmyapi.db import Model, field, Q
from ohmyapi.router import HTTPException
import settings
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
SECRET_KEY = getattr(settings, "SECRET_KEY", "OhMyAPI Secret Key")
def hmac_hash(data: str) -> str:
digest = hmac.new(SECRET_KEY.encode("UTF-8"), data.encode("utf-8"), hashlib.sha256).digest()
return base64.urlsafe_b64encode(digest).decode("utf-8")
class Group(Model):
id: UUID = field.data.UUIDField(pk=True)
name: str = field.CharField(max_length=42, index=True)
def __str__(self):
return self.name if self.name else ""
class User(Model):
id: UUID = field.data.UUIDField(pk=True)
email: str = field.CharField(max_length=255, unique=True, index=True)
username: str = field.CharField(max_length=150, unique=True)
email_hash: Optional[str] = field.CharField(max_length=255, unique=True, index=True)
password_hash: str = field.CharField(max_length=128)
is_admin: bool = field.BooleanField(default=False)
is_staff: bool = field.BooleanField(default=False)
@ -28,18 +43,30 @@ class User(Model):
)
class Schema:
exclude = ("password_hash",)
exclude = ["password_hash", "email_hash"]
def __str__(self):
fields = {
'username': self.username if self.username else "-",
'is_admin': 'y' if self.is_admin else 'n',
'is_staff': 'y' if self.is_staff else 'n',
}
return ' '.join([f"{k}:{v}" for k, v in fields.items()])
def set_password(self, raw_password: str) -> None:
"""Hash and store the password."""
self.password_hash = pwd_context.hash(raw_password)
def set_email(self, new_email: str) -> None:
"""Hash and set the e-mail address."""
self.email_hash = hmac_hash(email)
def verify_password(self, raw_password: str) -> bool:
"""Verify a plaintext password against the stored hash."""
return pwd_context.verify(raw_password, self.password_hash)
@classmethod
async def authenticate(cls, username: str, password: str) -> Optional["User"]:
async def authenticate_username(cls, username: str, password: str) -> Optional["User"]:
"""Authenticate a user by username and password."""
user = await cls.filter(username=username).first()
if user and user.verify_password(password):