schon/engine/vibes_auth/graphene/mutations.py
Egor fureunoir Gorbunov 3228a89d4b Features:
1) Add new `test_graphene` test module for expanded testing coverage;
2) Introduce `test_drf` module in `engine/blog` for improved API testing;

Fixes:
1) Remove unnecessary `--extra testing` flag from Dockerfile to streamline dependencies;
2) Update `uv.lock` with newer versions of dependencies (`certifi`, `coverage`, `django-constance`) for enhanced security and functionality;

Extra:
1) Remove deprecated packages (`bandit`, `cfgv`, `distlib`) from `uv.lock` for cleanup;
2) Adjust `uv.lock` content and formatting to be consistent with updated dependencies.
2025-11-13 15:24:44 +03:00

359 lines
12 KiB
Python

import logging
import traceback
from contextlib import suppress
from hmac import compare_digest
from django.conf import settings
from django.contrib.auth.password_validation import validate_password
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from django.core.exceptions import BadRequest, PermissionDenied, ValidationError
from django.db import IntegrityError
from django.http import Http404
from django.utils.http import urlsafe_base64_decode
from django.utils.translation import gettext_lazy as _
from graphene import UUID, Boolean, Field, List, String
from graphene.types.generic import GenericScalar
from graphene_file_upload.scalars import Upload
from engine.core.graphene import BaseMutation
from engine.core.utils.messages import permission_denied_message
from engine.vibes_auth.graphene.object_types import UserType
from engine.vibes_auth.models import User
from engine.vibes_auth.serializers import (
TokenObtainPairSerializer,
TokenRefreshSerializer,
TokenVerifySerializer,
)
from engine.vibes_auth.utils.emailing import send_reset_password_email_task
from engine.vibes_auth.validators import is_valid_email, is_valid_phone_number
logger = logging.getLogger(__name__)
class CreateUser(BaseMutation):
class Arguments:
email = String(required=True)
password = String(required=True)
confirm_password = String(required=True)
last_name = String()
first_name = String()
phone_number = String()
is_subscribed = Boolean()
language = String()
referrer = String(required=False, description=_("the user's b64-encoded uuid who referred the new user to us."))
success = Boolean()
def mutate(
self,
info,
email,
password,
confirm_password,
last_name=None,
first_name=None,
phone_number=None,
is_subscribed=None,
language=None,
**kwargs,
):
try:
validate_password(password)
if compare_digest(password.lower(), email.lower()):
raise BadRequest(_("password too weak"))
if compare_digest(password, confirm_password):
User.objects.create_user(
email=email,
password=password,
last_name=last_name,
first_name=first_name,
phone_number=phone_number,
is_subscribed=is_subscribed if is_subscribed else False,
language=language if language else settings.LANGUAGE_CODE,
attributes={"referrer": kwargs.get("referrer", "")} if kwargs.get("referrer", "") else {},
)
# noinspection PyTypeChecker
return CreateUser(success=True)
else:
# noinspection PyTypeChecker
return CreateUser(success=False)
except IntegrityError:
# noinspection PyTypeChecker
return CreateUser(success=True)
except Exception as e:
raise BadRequest(str(e)) from e
class UpdateUser(BaseMutation):
class Arguments:
uuid = UUID(required=True)
email = String(required=False)
phone_number = String(required=False)
password = String(required=False)
confirm_password = String(required=False)
is_verified = Boolean(required=False)
first_name = String(required=False)
last_name = String(required=False)
language = String(required=False)
is_active = Boolean(required=False)
is_staff = Boolean(required=False)
user_permissions = List(String)
groups = List(String)
attributes = GenericScalar(required=False)
user = Field(UserType)
def mutate(self, info, uuid, **kwargs):
try:
user = User.objects.get(uuid=uuid)
except User.DoesNotExist as dne:
name = "User"
raise Http404(_(f"{name} does not exist: {uuid}")) from dne
if not (info.context.user.has_perm("vibes_auth.change_user") or info.context.user == user):
raise PermissionDenied(permission_denied_message)
email = kwargs.get("email")
if (email is not None and not is_valid_email(email)) or User.objects.filter(email=email).exclude(
uuid=uuid
).exists():
raise BadRequest(_("malformed email"))
phone_number = kwargs.get("phone_number")
if (phone_number is not None and not is_valid_phone_number(phone_number)) or (
User.objects.filter(phone_number=phone_number).exclude(uuid=uuid).exists() and phone_number is not None
):
raise BadRequest(_(f"malformed phone number: {phone_number}"))
password = kwargs.get("password", "")
confirm_password = kwargs.get("confirm_password", "")
if password:
validate_password(password=password, user=user)
if not compare_digest(password, "") and compare_digest(password, confirm_password):
user.set_password(password)
user.save()
attribute_pairs = kwargs.pop("attributes", "")
if attribute_pairs:
for attribute_pair in attribute_pairs.split(";"):
if "-" in attribute_pair:
attr, value = attribute_pair.split("-", 1)
if not user.attributes:
user.attributes = {}
user.attributes.update({attr: value})
else:
raise BadRequest(_(f"Invalid attribute format: {attribute_pair}"))
for attr, value in kwargs.items():
if attr == "password" or attr == "confirm_password":
continue
if attr not in [
"groups",
"user_permissions",
"is_verified",
"is_staff",
"is_active",
"is_superuser",
] or info.context.user.has_perm("vibes_auth.change_user"):
setattr(user, attr, value)
user.save()
return UpdateUser(user=user)
class DeleteUser(BaseMutation):
class Arguments:
email = String()
uuid = UUID()
success = Boolean()
def mutate(self, info, uuid=None, email=None):
if info.context.user.has_perm("vibes_auth.delete_user"):
try:
if uuid is not None:
User.objects.get(uuid=uuid).delete()
elif email is not None:
User.objects.get(email=email).delete()
else:
raise BadRequest("uuid or email must be specified")
# noinspection PyTypeChecker
return DeleteUser(success=True)
except User.DoesNotExist as dne:
raise Http404(f"User with the given uuid: {uuid} or email: {email} does not exist.") from dne
raise PermissionDenied(permission_denied_message)
class ObtainJSONWebToken(BaseMutation):
class Arguments:
email = String(required=True)
password = String(required=True)
user = Field(UserType)
refresh_token = String(required=True)
access_token = String(required=True)
def mutate(self, info, email, password):
serializer = TokenObtainPairSerializer(data={"email": email, "password": password})
try:
serializer.is_valid(raise_exception=True)
user = User.objects.get(email=email)
return ObtainJSONWebToken(
user=user,
refresh_token=serializer.validated_data["refresh"],
access_token=serializer.validated_data["access"],
)
except Exception as e:
raise PermissionDenied(f"invalid credentials provided: {e!s}") from e
class RefreshJSONWebToken(BaseMutation):
class Arguments:
refresh_token = String(required=True)
access_token = String()
user = Field(UserType)
refresh_token = String()
def mutate(self, info, refresh_token):
serializer = TokenRefreshSerializer(data={"refresh": refresh_token})
try:
serializer.is_valid(raise_exception=True)
return RefreshJSONWebToken(
access_token=serializer.validated_data["access"],
refresh_token=serializer.validated_data["refresh"],
user=User.objects.get(uuid=serializer.validated_data["user"]["uuid"]),
)
except Exception as e:
raise PermissionDenied(f"invalid refresh token provided: {e!s}") from e
class VerifyJSONWebToken(BaseMutation):
class Arguments:
token = String(required=True)
token_is_valid = Boolean()
user = Field(UserType)
detail = String()
def mutate(self, info, token):
serializer = TokenVerifySerializer(data={"token": token})
with suppress(Exception):
serializer.is_valid(raise_exception=True)
user_uuid = serializer.validated_data["user"]["uuid"]
user = User.objects.get(pk=user_uuid)
# noinspection PyTypeChecker
return VerifyJSONWebToken(token_is_valid=True, user=user)
detail = traceback.format_exc() if settings.DEBUG else ""
# noinspection PyTypeChecker
return VerifyJSONWebToken(token_is_valid=False, user=None, detail=detail)
class ActivateUser(BaseMutation):
class Arguments:
uid = String(required=True)
token = String(required=True)
success = Boolean()
def mutate(self, info, uid, token):
try:
token = urlsafe_base64_decode(token).decode()
uuid = urlsafe_base64_decode(uid).decode()
user = User.objects.get(pk=uuid)
if not user.check_token(token):
raise BadRequest(_("activation link is invalid!"))
if user.is_active:
raise BadRequest(_("account already activated..."))
user.is_active = True
user.is_verified = True
user.save()
except (TypeError, ValueError, OverflowError, User.DoesNotExist) as e:
raise BadRequest(_(f"something went wrong: {e!s}")) from e
# noinspection PyTypeChecker
return ActivateUser(success=True)
class ResetPassword(BaseMutation):
class Arguments:
email = String(required=True)
success = Boolean()
def mutate(self, info, email):
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
# noinspection PyTypeChecker
return ResetPassword(success=False)
send_reset_password_email_task.delay(user_pk=user.uuid)
# noinspection PyTypeChecker
return ResetPassword(success=True)
class ConfirmResetPassword(BaseMutation):
class Arguments:
uid = String(required=True)
token = String(required=True)
password = String(required=True)
confirm_password = String(required=True)
success = Boolean()
def mutate(self, info, uid, token, password, confirm_password):
try:
if not compare_digest(password, confirm_password):
raise BadRequest(_("passwords do not match"))
user = User.objects.get(pk=urlsafe_base64_decode(uid).decode())
password_reset_token = PasswordResetTokenGenerator()
if not password_reset_token.check_token(user, token):
raise BadRequest(_("token is invalid!"))
validate_password(password=password, user=user)
user.set_password(password)
user.save()
# noinspection PyTypeChecker
return ConfirmResetPassword(success=True)
except (TypeError, ValueError, OverflowError, ValidationError, User.DoesNotExist) as e:
raise BadRequest(_(f"something went wrong: {e!s}")) from e
class UploadAvatar(BaseMutation):
class Arguments:
file = Upload(required=True)
user = Field(UserType)
def mutate(self, info, file):
if not info.context.user.is_authenticated:
raise PermissionDenied(permission_denied_message)
try:
info.context.user.avatar = file
info.context.user.save()
info.context.user.refresh_from_db()
except Exception as e:
raise BadRequest(str(e)) from e
return UploadAvatar(user=info.context.user)