schon/vibes_auth/serializers.py

217 lines
6.8 KiB
Python

import logging
from contextlib import suppress
from typing import Any, Dict, Optional, Type
from constance import config
from django.contrib.auth import authenticate
from django.contrib.auth.models import update_last_login
from django.contrib.auth.password_validation import validate_password
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import AuthenticationFailed, ValidationError
from rest_framework.fields import (
BooleanField,
CharField,
EmailField,
SerializerMethodField,
)
from rest_framework.serializers import ModelSerializer, Serializer
from rest_framework_simplejwt.exceptions import TokenError
from rest_framework_simplejwt.serializers import AuthUser, PasswordField
from rest_framework_simplejwt.settings import api_settings
from rest_framework_simplejwt.token_blacklist.models import BlacklistedToken
from rest_framework_simplejwt.tokens import RefreshToken, Token, UntypedToken
from core.utils.security import is_safe_key
from evibes import settings
from vibes_auth.models import User
logger = logging.getLogger(__name__)
class UserSerializer(ModelSerializer):
avatar_url = SerializerMethodField(required=False, read_only=True)
password = CharField(write_only=True, required=False)
is_staff = BooleanField(read_only=True)
@staticmethod
def get_avatar_url(obj) -> str:
if obj.avatar:
return f"https://api.{config.BASE_DOMAIN}/media/{obj.avatar!s}"
return f"https://api.{config.BASE_DOMAIN}/static/person.png"
class Meta:
model = User
fields = [
"uuid",
"email",
"avatar_url",
"is_staff",
"created",
"first_name",
"last_name",
"password",
"phone_number",
"is_subscribed",
"modified",
"created",
]
def create(self, validated_data):
user = User.objects.create(
email=validated_data["email"],
first_name=validated_data["first_name"],
last_name=validated_data["last_name"],
)
user.set_password(validated_data.pop("password"))
for attr, value in validated_data.items():
if is_safe_key(attr):
setattr(user, attr, value)
user.save()
return user
def update(self, instance, validated_data):
for attr, value in validated_data.items():
if is_safe_key(attr):
setattr(instance, attr, value)
if attr == "password":
instance.set_password(value)
instance.save()
return instance
def validate(self, attrs):
if "password" in attrs:
validate_password(attrs["password"])
return attrs
class TokenObtainSerializer(Serializer):
username_field = User.USERNAME_FIELD
token_class: Optional[Type[Token]] = None
default_error_messages = {"no_active_account": _("no active account")}
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.user = None
self.fields[self.username_field] = CharField(write_only=True)
self.fields["password"] = PasswordField()
def validate(self, attrs: Dict[str, Any]) -> Dict[Any, Any]:
authenticate_kwargs = {
self.username_field: attrs[self.username_field],
"password": attrs["password"],
}
with suppress(KeyError):
authenticate_kwargs["request"] = self.context["request"]
self.user = authenticate(**authenticate_kwargs)
if not api_settings.USER_AUTHENTICATION_RULE(self.user):
raise AuthenticationFailed(
self.error_messages["no_active_account"],
_("no active account"),
)
return {}
@classmethod
def get_token(cls, user: AuthUser) -> Token:
return cls.token_class.for_user(user) # type: ignore
class TokenObtainPairSerializer(TokenObtainSerializer):
token_class = RefreshToken
def validate(self, attrs: Dict[str, Any]) -> Dict[str, str]:
data = super().validate(attrs)
logger.debug("Data validated")
refresh = self.get_token(self.user)
data["refresh"] = str(refresh)
data["access"] = str(refresh.access_token)
data["user"] = UserSerializer(self.user).data
logger.debug("Data formed")
if api_settings.UPDATE_LAST_LOGIN:
update_last_login(self.user, self.user)
logger.debug("Updated last login")
logger.debug("Returning data")
return data
class TokenRefreshSerializer(Serializer):
refresh = CharField()
access = CharField(read_only=True)
token_class = RefreshToken
def validate(self, attrs: Dict[str, Any]) -> Dict[str, str]:
refresh = self.token_class(attrs["refresh"])
data = {"access": str(refresh.access_token)}
if api_settings.ROTATE_REFRESH_TOKENS:
if api_settings.BLACKLIST_AFTER_ROTATION:
with suppress(AttributeError):
refresh.blacklist()
refresh.set_jti()
refresh.set_exp()
refresh.set_iat()
data["refresh"] = str(refresh)
user = User.objects.get(uuid=refresh.payload["user_uuid"])
data["user"] = UserSerializer(user).data
return data
class TokenVerifySerializer(Serializer):
token = CharField(write_only=True)
def validate(self, attrs: Dict[str, None]) -> Dict[Any, Any]:
token = UntypedToken(attrs["token"])
if (
api_settings.BLACKLIST_AFTER_ROTATION
and "rest_framework_simplejwt.token_blacklist" in settings.INSTALLED_APPS
):
jti = token.get(api_settings.JTI_CLAIM)
if BlacklistedToken.objects.filter(token__jti=jti).exists():
raise ValidationError(_("token_blacklisted"))
try:
payload = UntypedToken(attrs["token"]).payload
except TokenError:
raise ValidationError(_("invalid token"))
try:
user_uuid = payload["user_uuid"]
user = User.objects.get(uuid=user_uuid)
except KeyError:
raise ValidationError(_("no user uuid claim present in token"))
except User.DoesNotExist:
raise ValidationError(_("user does not exist"))
attrs["user"] = UserSerializer(user).data
return attrs
class ConfirmPasswordResetSerializer(Serializer):
uidb64 = CharField(write_only=True, required=True)
token = CharField(write_only=True, required=True)
password = CharField(write_only=True, required=True)
confirm_password = CharField(write_only=True, required=True)
class ResetPasswordSerializer(Serializer):
email = EmailField(write_only=True, required=True)
class ActivateEmailSerializer(Serializer):
uidb64 = CharField(required=True)
token = CharField(required=True)