schon/engine/vibes_auth/graphene/mutations.py
Egor fureunoir Gorbunov 43ece8c25c Features: 1) Add language validation and fallback for users without proper language settings in initialize.py; 2) Enhance user update mutation with improved error handling and dynamic attribute support; 3) Integrate is_safe_key validation in user attribute updates for better security.
Fixes: 1) Add missing imports for `settings`, `Q`, and `is_safe_key`; 2) Correct user permissions keys in `lists.py` utility.

Extra: 1) Refactor `UpdateUser` mutation for cleaner structure and unified error handling; 2) Format and tidy up list declarations and exception handling for clarity.
2025-11-25 14:26:14 +03:00

356 lines
12 KiB
Python

import logging
import traceback
from contextlib import suppress
from hmac import compare_digest
from django.conf import settings
from django.contrib.auth.password_validation import validate_password
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from django.core.exceptions import BadRequest, PermissionDenied, ValidationError
from django.db import IntegrityError
from django.http import Http404
from django.utils.http import urlsafe_base64_decode
from django.utils.translation import gettext_lazy as _
from graphene import UUID, Boolean, Field, List, String
from graphene.types.generic import GenericScalar
from graphene_file_upload.scalars import Upload
from engine.core.graphene import BaseMutation
from engine.core.utils.messages import permission_denied_message
from engine.core.utils.security import is_safe_key
from engine.vibes_auth.graphene.object_types import UserType
from engine.vibes_auth.models import User
from engine.vibes_auth.serializers import (
TokenObtainPairSerializer,
TokenRefreshSerializer,
TokenVerifySerializer,
)
from engine.vibes_auth.utils.emailing import send_reset_password_email_task
from engine.vibes_auth.validators import is_valid_email, is_valid_phone_number
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# GraphQL mutation that registers a new user account.
# Exposes a single output field, ``success``.
class CreateUser(BaseMutation):
    class Arguments:
        email = String(required=True)
        password = String(required=True)
        confirm_password = String(required=True)
        last_name = String()
        first_name = String()
        phone_number = String()
        is_subscribed = Boolean()
        language = String()
        referrer = String(required=False, description=_("the user's b64-encoded uuid who referred the new user to us."))

    success = Boolean()

    def mutate(
        self,
        info,
        email,
        password,
        confirm_password,
        last_name=None,
        first_name=None,
        phone_number=None,
        is_subscribed=None,
        language=None,
        **kwargs,
    ):
        """Validate the credentials and create the user.

        Args:
            info: GraphQL resolve info (unused here).
            email: Email address of the new account.
            password: Desired password; checked with Django's password validators.
            confirm_password: Must equal ``password`` for the account to be created.
            last_name: Optional last name.
            first_name: Optional first name.
            phone_number: Optional phone number.
            is_subscribed: Optional subscription flag; a missing/falsy value is stored as ``False``.
            language: Optional language code; falls back to ``settings.LANGUAGE_CODE``.
            **kwargs: May carry ``referrer``, stored under the ``"referrer"`` attribute key when non-empty.

        Returns:
            ``CreateUser(success=True)`` when the user was created or an ``IntegrityError`` occurred,
            ``CreateUser(success=False)`` when the two passwords differ.

        Raises:
            BadRequest: For any other failure (weak password, validator rejection, ...).
        """
        try:
            validate_password(password)
            # compare_digest() raises TypeError for str arguments containing non-ASCII
            # characters, so the comparison is done on the UTF-8 encoded bytes instead.
            if compare_digest(password.lower().encode(), email.lower().encode()):
                raise BadRequest(_("password too weak"))
            if not compare_digest(password.encode(), confirm_password.encode()):
                # noinspection PyTypeChecker
                return CreateUser(success=False)
            referrer = kwargs.get("referrer", "")
            User.objects.create_user(
                email=email,
                password=password,
                last_name=last_name,
                first_name=first_name,
                phone_number=phone_number,
                is_subscribed=bool(is_subscribed),
                language=language or settings.LANGUAGE_CODE,
                attributes={"referrer": referrer} if referrer else {},
            )
            # noinspection PyTypeChecker
            return CreateUser(success=True)
        except IntegrityError:
            # NOTE(review): success is reported even though nothing was created; presumably
            # this avoids revealing that the email is already registered - confirm.
            # noinspection PyTypeChecker
            return CreateUser(success=True)
        except Exception as e:
            raise BadRequest(str(e)) from e
# GraphQL mutation that updates an existing user identified by ``uuid``.
# Exposes the updated user through the ``user`` output field.
class UpdateUser(BaseMutation):
    class Arguments:
        uuid = UUID(required=True)
        email = String(required=False)
        phone_number = String(required=False)
        password = String(required=False)
        confirm_password = String(required=False)
        is_verified = Boolean(required=False)
        first_name = String(required=False)
        last_name = String(required=False)
        language = String(required=False)
        is_active = Boolean(required=False)
        is_staff = Boolean(required=False)
        user_permissions = List(String)
        groups = List(String)
        attributes = GenericScalar(required=False)

    user = Field(UserType)

    def mutate(self, info, uuid, **kwargs):
        """Apply the supplied changes to the user and persist them in a single save.

        Args:
            info: GraphQL resolve info; ``info.context.user`` is the requesting user.
            uuid: UUID of the user to update.
            **kwargs: Any of the optional arguments declared in ``Arguments``.
                ``attributes`` may be a ``"key-value;key-value"`` string or a mapping;
                its entries are merged into ``user.attributes``.

        Returns:
            ``UpdateUser`` carrying the updated user.

        Raises:
            PermissionDenied: The requester is neither the target user nor holds
                ``vibes_auth.change_user``.
            Http404: No user with the given ``uuid`` exists.
            BadRequest: Any validation or persistence failure.
        """
        try:
            user = User.objects.get(uuid=uuid)
            requester = info.context.user
            # Evaluated once and reused for the per-field check further down.
            can_change_any_user = requester.has_perm("vibes_auth.change_user")
            if not (can_change_any_user or requester == user):
                raise PermissionDenied(permission_denied_message)

            # Only validate / look up values that were actually supplied; querying with
            # ``email=None`` would match users whose email is NULL and reject the request.
            email = kwargs.get("email")
            if email is not None and (
                not is_valid_email(email) or User.objects.filter(email=email).exclude(uuid=uuid).exists()
            ):
                raise BadRequest(_("malformed email"))

            phone_number = kwargs.get("phone_number")
            if phone_number is not None and (
                not is_valid_phone_number(phone_number)
                or User.objects.filter(phone_number=phone_number).exclude(uuid=uuid).exists()
            ):
                raise BadRequest(_(f"malformed phone number: {phone_number}"))

            # ``or ""`` also covers an explicit null sent by the client.
            password = kwargs.get("password") or ""
            if password:
                confirm_password = kwargs.get("confirm_password") or ""
                validate_password(password=password, user=user)
                # Compare bytes: compare_digest() rejects non-ASCII str arguments.
                if not compare_digest(password.encode(), confirm_password.encode()):
                    # Fail loudly instead of silently keeping the old password.
                    raise BadRequest(_("passwords do not match"))
                user.set_password(password)

            # Popped so the generic setattr loop below does not overwrite ``user.attributes``.
            attribute_pairs = kwargs.pop("attributes", None)
            if attribute_pairs:
                if isinstance(attribute_pairs, dict):
                    parsed_attributes = dict(attribute_pairs)
                elif isinstance(attribute_pairs, str):
                    parsed_attributes = {}
                    for attribute_pair in attribute_pairs.split(";"):
                        if "-" not in attribute_pair:
                            raise BadRequest(_(f"Invalid attribute format: {attribute_pair}"))
                        key, value = attribute_pair.split("-", 1)
                        parsed_attributes[key] = value
                else:
                    raise BadRequest(_(f"Invalid attribute format: {attribute_pairs}"))
                if not user.attributes:
                    user.attributes = {}
                user.attributes.update(parsed_attributes)

            for field_name, value in kwargs.items():
                if field_name in ("password", "confirm_password"):
                    continue
                # Fields rejected by is_safe_key() are applied only for privileged requesters
                # and are silently skipped otherwise.
                # NOTE(review): assigning ``groups`` / ``user_permissions`` through setattr
                # will fail if these are many-to-many fields - confirm intended handling.
                if is_safe_key(field_name) or can_change_any_user:
                    setattr(user, field_name, value)

            # Single save: nothing (including a new password) is persisted if any step above failed.
            user.save()
            return UpdateUser(user=user)
        except User.DoesNotExist as dne:
            raise Http404(_(f"User does not exist: {uuid}")) from dne
        except PermissionDenied:
            # Keep authorization failures distinguishable from malformed requests.
            raise
        except Exception as e:
            logger.warning("Could not update user: %s", str(e))
            logger.debug(traceback.format_exc())
            raise BadRequest(str(e)) from e
# GraphQL mutation that deletes a user selected by ``uuid`` or, failing that, by ``email``.
class DeleteUser(BaseMutation):
    class Arguments:
        email = String()
        uuid = UUID()

    success = Boolean()

    def mutate(self, info, uuid=None, email=None):
        """Delete the matching user.

        Args:
            info: GraphQL resolve info; ``info.context.user`` must hold ``vibes_auth.delete_user``.
            uuid: UUID of the user to delete; takes precedence over ``email``.
            email: Email of the user to delete, used only when ``uuid`` is ``None``.

        Returns:
            ``DeleteUser(success=True)`` once the user has been deleted.

        Raises:
            PermissionDenied: The requester lacks the delete permission.
            BadRequest: Neither ``uuid`` nor ``email`` was given.
            Http404: No matching user exists.
        """
        if not info.context.user.has_perm("vibes_auth.delete_user"):
            raise PermissionDenied(permission_denied_message)

        if uuid is not None:
            lookup = {"uuid": uuid}
        elif email is not None:
            lookup = {"email": email}
        else:
            raise BadRequest("uuid or email must be specified")

        try:
            User.objects.get(**lookup).delete()
        except User.DoesNotExist as dne:
            raise Http404(f"User with the given uuid: {uuid} or email: {email} does not exist.") from dne
        # noinspection PyTypeChecker
        return DeleteUser(success=True)
# GraphQL mutation that exchanges an email/password pair for a refresh/access token pair.
class ObtainJSONWebToken(BaseMutation):
    class Arguments:
        email = String(required=True)
        password = String(required=True)

    user = Field(UserType)
    refresh_token = String(required=True)
    access_token = String(required=True)

    def mutate(self, info, email, password):
        """Validate the credentials through ``TokenObtainPairSerializer`` and return the tokens.

        Args:
            info: GraphQL resolve info (unused here).
            email: Login email.
            password: Login password.

        Returns:
            ``ObtainJSONWebToken`` with the user and both tokens.

        Raises:
            PermissionDenied: Any failure while validating or loading the user.
        """
        credentials = {"email": email, "password": password}
        serializer = TokenObtainPairSerializer(data=credentials, retrieve_user=False)
        try:
            serializer.is_valid(raise_exception=True)
            # The serializer's "user" entry is used as a uuid lookup value.
            authenticated_user = User.objects.get(uuid=serializer.validated_data["user"])
            refresh = serializer.validated_data["refresh"]
            access = serializer.validated_data["access"]
            return ObtainJSONWebToken(user=authenticated_user, refresh_token=refresh, access_token=access)
        except Exception as e:
            raise PermissionDenied(f"invalid credentials provided: {e!s}") from e
# GraphQL mutation that exchanges a refresh token for a new access/refresh token pair.
class RefreshJSONWebToken(BaseMutation):
    class Arguments:
        refresh_token = String(required=True)

    access_token = String()
    user = Field(UserType)
    refresh_token = String()

    def mutate(self, info, refresh_token):
        """Validate the refresh token through ``TokenRefreshSerializer`` and return the new tokens.

        Args:
            info: GraphQL resolve info (unused here).
            refresh_token: The refresh token to exchange.

        Returns:
            ``RefreshJSONWebToken`` with the user and both tokens from the serializer.

        Raises:
            PermissionDenied: Any failure while validating or loading the user.
        """
        serializer = TokenRefreshSerializer(data={"refresh": refresh_token}, retrieve_user=False)
        try:
            serializer.is_valid(raise_exception=True)
            # The serializer's "user" entry is used as a uuid lookup value.
            token_owner = User.objects.get(uuid=serializer.validated_data["user"])
            new_access = serializer.validated_data["access"]
            new_refresh = serializer.validated_data["refresh"]
            return RefreshJSONWebToken(user=token_owner, access_token=new_access, refresh_token=new_refresh)
        except Exception as e:
            raise PermissionDenied(f"invalid refresh token provided: {e!s}") from e
# GraphQL mutation that reports whether a token is valid and, if so, which user it belongs to.
class VerifyJSONWebToken(BaseMutation):
    class Arguments:
        token = String(required=True)

    token_is_valid = Boolean()
    user = Field(UserType)
    detail = String()

    def mutate(self, info, token):
        """Verify ``token`` through ``TokenVerifySerializer``.

        Never raises for an invalid token; failure is reported in the payload instead.

        Args:
            info: GraphQL resolve info (unused here).
            token: The token to verify.

        Returns:
            ``VerifyJSONWebToken(token_is_valid=True, user=...)`` on success, otherwise
            ``VerifyJSONWebToken(token_is_valid=False, user=None, detail=...)`` where ``detail``
            is the traceback of the failure when ``settings.DEBUG`` is on and ``""`` otherwise.
        """
        serializer = TokenVerifySerializer(data={"token": token}, retrieve_user=False)
        try:
            serializer.is_valid(raise_exception=True)
            verified_user = User.objects.get(uuid=serializer.validated_data["user"])
        except Exception:
            # format_exc() only has something to report while the exception is still being
            # handled, so it must be called inside the handler.
            detail = traceback.format_exc() if settings.DEBUG else ""
            # noinspection PyTypeChecker
            return VerifyJSONWebToken(token_is_valid=False, user=None, detail=detail)
        # noinspection PyTypeChecker
        return VerifyJSONWebToken(token_is_valid=True, user=verified_user)
# GraphQL mutation that activates and verifies an account from an activation link (uid + token).
class ActivateUser(BaseMutation):
    class Arguments:
        uid = String(required=True)
        token = String(required=True)

    success = Boolean()

    def mutate(self, info, uid, token):
        """Activate the user referenced by the url-safe base64 encoded ``uid``.

        Args:
            info: GraphQL resolve info (unused here).
            uid: Url-safe base64 encoded primary key of the user.
            token: Url-safe base64 encoded activation token, checked with ``user.check_token``.

        Returns:
            ``ActivateUser(success=True)`` after the user was marked active and verified.

        Raises:
            BadRequest: The link is invalid, the account is already active, the uid/token
                cannot be decoded, or no such user exists.
        """
        try:
            token = urlsafe_base64_decode(token).decode()
            uuid = urlsafe_base64_decode(uid).decode()
            user = User.objects.get(pk=uuid)
            if not user.check_token(token):
                raise BadRequest(_("activation link is invalid!"))
            if user.is_active:
                raise BadRequest(_("account already activated..."))
            user.is_active = True
            user.is_verified = True
            user.save()
        except (TypeError, ValueError, OverflowError, ValidationError, User.DoesNotExist) as e:
            # ValidationError covers a decoded uid that is not a valid primary-key value;
            # without it a malformed link escaped as an unhandled error.
            raise BadRequest(_(f"something went wrong: {e!s}")) from e
        # noinspection PyTypeChecker
        return ActivateUser(success=True)
# GraphQL mutation that queues a password-reset email for the account with the given email.
class ResetPassword(BaseMutation):
    class Arguments:
        email = String(required=True)

    success = Boolean()

    def mutate(self, info, email):
        """Queue the reset email via ``send_reset_password_email_task``.

        Args:
            info: GraphQL resolve info (unused here).
            email: Email of the account whose password should be reset.

        Returns:
            ``ResetPassword(success=True)`` when the task was queued,
            ``ResetPassword(success=False)`` when no user has that email.
        """
        try:
            account = User.objects.get(email=email)
        except User.DoesNotExist:
            # noinspection PyTypeChecker
            return ResetPassword(success=False)
        else:
            send_reset_password_email_task.delay(user_pk=account.uuid)
            # noinspection PyTypeChecker
            return ResetPassword(success=True)
# GraphQL mutation that sets a new password from a password-reset link (uid + token).
class ConfirmResetPassword(BaseMutation):
    class Arguments:
        uid = String(required=True)
        token = String(required=True)
        password = String(required=True)
        confirm_password = String(required=True)

    success = Boolean()

    def mutate(self, info, uid, token, password, confirm_password):
        """Validate the reset token and store the new password.

        Args:
            info: GraphQL resolve info (unused here).
            uid: Url-safe base64 encoded primary key of the user.
            token: Reset token, checked with ``PasswordResetTokenGenerator``.
            password: The new password; checked with Django's password validators.
            confirm_password: Must equal ``password``.

        Returns:
            ``ConfirmResetPassword(success=True)`` once the password was saved.

        Raises:
            BadRequest: The passwords differ, the token is invalid, the password fails
                validation, the uid cannot be decoded, or no such user exists.
        """
        try:
            # Compare bytes: compare_digest() raises TypeError for non-ASCII str arguments,
            # which previously rejected every password containing such characters.
            if not compare_digest(password.encode(), confirm_password.encode()):
                raise BadRequest(_("passwords do not match"))
            user = User.objects.get(pk=urlsafe_base64_decode(uid).decode())
            password_reset_token = PasswordResetTokenGenerator()
            if not password_reset_token.check_token(user, token):
                raise BadRequest(_("token is invalid!"))
            validate_password(password=password, user=user)
            user.set_password(password)
            user.save()
            # noinspection PyTypeChecker
            return ConfirmResetPassword(success=True)
        except (TypeError, ValueError, OverflowError, ValidationError, User.DoesNotExist) as e:
            raise BadRequest(_(f"something went wrong: {e!s}")) from e
# GraphQL mutation that stores an uploaded file as the requesting user's avatar.
class UploadAvatar(BaseMutation):
    class Arguments:
        file = Upload(required=True)

    user = Field(UserType)

    def mutate(self, info, file):
        """Assign ``file`` to the requester's ``avatar`` and persist it.

        Args:
            info: GraphQL resolve info; ``info.context.user`` is the requesting user.
            file: The uploaded avatar file.

        Returns:
            ``UploadAvatar`` carrying the requester, reloaded from the database.

        Raises:
            PermissionDenied: The requester is not authenticated.
            BadRequest: Saving or reloading the user failed.
        """
        current_user = info.context.user
        if not current_user.is_authenticated:
            raise PermissionDenied(permission_denied_message)
        try:
            current_user.avatar = file
            current_user.save()
            current_user.refresh_from_db()
        except Exception as e:
            raise BadRequest(str(e)) from e
        return UploadAvatar(user=current_user)