# Fixes: 1) Replace direct user access with a UUID-based lookup in the token mutations. Extra: 1) Updated all three token mutation methods to use User.objects.get by UUID; 2) Added consistent formatting and indentation.
358 lines
12 KiB
Python
358 lines
12 KiB
Python
import logging
|
|
import traceback
|
|
from contextlib import suppress
|
|
from hmac import compare_digest
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth.password_validation import validate_password
|
|
from django.contrib.auth.tokens import PasswordResetTokenGenerator
|
|
from django.core.exceptions import BadRequest, PermissionDenied, ValidationError
|
|
from django.db import IntegrityError
|
|
from django.http import Http404
|
|
from django.utils.http import urlsafe_base64_decode
|
|
from django.utils.translation import gettext_lazy as _
|
|
from graphene import UUID, Boolean, Field, List, String
|
|
from graphene.types.generic import GenericScalar
|
|
from graphene_file_upload.scalars import Upload
|
|
|
|
from engine.core.graphene import BaseMutation
|
|
from engine.core.utils.messages import permission_denied_message
|
|
from engine.vibes_auth.graphene.object_types import UserType
|
|
from engine.vibes_auth.models import User
|
|
from engine.vibes_auth.serializers import (
|
|
TokenObtainPairSerializer,
|
|
TokenRefreshSerializer,
|
|
TokenVerifySerializer,
|
|
)
|
|
from engine.vibes_auth.utils.emailing import send_reset_password_email_task
|
|
from engine.vibes_auth.validators import is_valid_email, is_valid_phone_number
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CreateUser(BaseMutation):
    # Mutation that registers a new user from an email/password pair plus
    # optional profile fields. Exposes a single ``success`` flag.

    class Arguments:
        email = String(required=True)
        password = String(required=True)
        confirm_password = String(required=True)
        last_name = String()
        first_name = String()
        phone_number = String()
        is_subscribed = Boolean()
        language = String()
        referrer = String(required=False, description=_("the user's b64-encoded uuid who referred the new user to us."))

    success = Boolean()

    def mutate(
        self,
        info,
        email,
        password,
        confirm_password,
        last_name=None,
        first_name=None,
        phone_number=None,
        is_subscribed=None,
        language=None,
        **kwargs,
    ):
        """Create a user after validating the supplied password.

        Args:
            info: GraphQL resolve info (unused here).
            email: Email address of the new account.
            password: Desired password; checked with ``validate_password``.
            confirm_password: Must equal ``password`` for the user to be created.
            last_name, first_name, phone_number: Optional profile fields passed
                straight to ``User.objects.create_user``.
            is_subscribed: Optional flag; falsy values are stored as ``False``.
            language: Optional language code; falls back to ``settings.LANGUAGE_CODE``.
            **kwargs: May contain ``referrer``, stored under ``attributes["referrer"]``.

        Returns:
            ``CreateUser(success=True)`` when the user was created or an
            ``IntegrityError`` occurred; ``CreateUser(success=False)`` when the
            two passwords differ.

        Raises:
            BadRequest: For any other failure (weak password, validation error, ...).
        """
        try:
            validate_password(password)

            # compare_digest() only accepts ASCII-only ``str``; comparing UTF-8
            # bytes keeps the constant-time comparison while allowing passwords
            # and emails that contain non-ASCII characters.
            if compare_digest(password.lower().encode("utf-8"), email.lower().encode("utf-8")):
                raise BadRequest(_("password too weak"))

            if not compare_digest(password.encode("utf-8"), confirm_password.encode("utf-8")):
                # noinspection PyTypeChecker
                return CreateUser(success=False)

            referrer = kwargs.get("referrer", "")
            User.objects.create_user(
                email=email,
                password=password,
                last_name=last_name,
                first_name=first_name,
                phone_number=phone_number,
                is_subscribed=is_subscribed or False,
                language=language or settings.LANGUAGE_CODE,
                attributes={"referrer": referrer} if referrer else {},
            )
            # noinspection PyTypeChecker
            return CreateUser(success=True)
        except IntegrityError:
            # NOTE(review): reported as success, presumably so the response does
            # not reveal that the account already exists - confirm intent.
            # noinspection PyTypeChecker
            return CreateUser(success=True)
        except Exception as e:
            raise BadRequest(str(e)) from e
|
|
|
|
|
|
class UpdateUser(BaseMutation):
    # Mutation that updates an existing user identified by ``uuid``.
    # Allowed for the user themselves or anyone holding ``vibes_auth.change_user``.

    class Arguments:
        uuid = UUID(required=True)
        email = String(required=False)
        phone_number = String(required=False)
        password = String(required=False)
        confirm_password = String(required=False)
        is_verified = Boolean(required=False)
        first_name = String(required=False)
        last_name = String(required=False)
        language = String(required=False)
        is_active = Boolean(required=False)
        is_staff = Boolean(required=False)
        user_permissions = List(String)
        groups = List(String)
        attributes = GenericScalar(required=False)

    user = Field(UserType)

    def mutate(self, info, uuid, **kwargs):
        """Apply the supplied field changes to the user with the given ``uuid``.

        Args:
            info: GraphQL resolve info; ``info.context.user`` is the requester.
            uuid: Identifier of the user to update.
            **kwargs: Any of the optional arguments declared in ``Arguments``.

        Returns:
            ``UpdateUser`` carrying the saved user.

        Raises:
            Http404: No user with this ``uuid`` exists.
            PermissionDenied: Requester is neither the user nor allowed to change users.
            BadRequest: Invalid or already-used email / phone number, or a
                malformed ``attributes`` entry.
        """
        try:
            user = User.objects.get(uuid=uuid)
        except User.DoesNotExist as dne:
            name = "User"
            raise Http404(_(f"{name} does not exist: {uuid}")) from dne

        requester = info.context.user
        can_change_users = requester.has_perm("vibes_auth.change_user")
        if not (can_change_users or requester == user):
            raise PermissionDenied(permission_denied_message)

        # Only validate / check uniqueness when a value was actually supplied;
        # an omitted email must not trigger a lookup for ``email=None``.
        email = kwargs.get("email")
        if email is not None and (
            not is_valid_email(email) or User.objects.filter(email=email).exclude(uuid=uuid).exists()
        ):
            raise BadRequest(_("malformed email"))

        phone_number = kwargs.get("phone_number")
        if phone_number is not None and (
            not is_valid_phone_number(phone_number)
            or User.objects.filter(phone_number=phone_number).exclude(uuid=uuid).exists()
        ):
            raise BadRequest(_(f"malformed phone number: {phone_number}"))

        # Treat an explicit null the same as "not provided".
        password = kwargs.get("password") or ""
        confirm_password = kwargs.get("confirm_password") or ""

        if password:
            validate_password(password=password, user=user)
            # compare_digest() rejects non-ASCII ``str``; compare UTF-8 bytes instead.
            if compare_digest(password.encode("utf-8"), confirm_password.encode("utf-8")):
                user.set_password(password)
                user.save()

        # ``attributes`` is parsed as "key-value" pairs separated by ";".
        attribute_pairs = kwargs.pop("attributes", "")
        if attribute_pairs:
            for attribute_pair in attribute_pairs.split(";"):
                if "-" not in attribute_pair:
                    raise BadRequest(_(f"Invalid attribute format: {attribute_pair}"))
                attr, value = attribute_pair.split("-", 1)
                if not user.attributes:
                    user.attributes = {}
                user.attributes.update({attr: value})

        # Fields that only a requester with ``vibes_auth.change_user`` may set;
        # for everyone else they are silently skipped.
        privileged_fields = {
            "groups",
            "user_permissions",
            "is_verified",
            "is_staff",
            "is_active",
            "is_superuser",
        }
        for attr, value in kwargs.items():
            if attr in ("password", "confirm_password"):
                continue
            if attr not in privileged_fields or can_change_users:
                # NOTE(review): if ``groups`` / ``user_permissions`` are
                # many-to-many relations, direct assignment is rejected by
                # Django and ``.set()`` would be required - confirm.
                setattr(user, attr, value)

        user.save()

        return UpdateUser(user=user)
|
|
|
|
|
|
class DeleteUser(BaseMutation):
    # Mutation that deletes a user selected by ``uuid`` or, failing that, ``email``.
    # Requires the ``vibes_auth.delete_user`` permission.

    class Arguments:
        email = String()
        uuid = UUID()

    success = Boolean()

    def mutate(self, info, uuid=None, email=None):
        """Delete the matching user and report success.

        ``uuid`` takes precedence over ``email`` when both are supplied.

        Raises:
            PermissionDenied: Requester lacks ``vibes_auth.delete_user``.
            BadRequest: Neither ``uuid`` nor ``email`` was given.
            Http404: No user matches the given identifier.
        """
        if not info.context.user.has_perm("vibes_auth.delete_user"):
            raise PermissionDenied(permission_denied_message)

        if uuid is not None:
            lookup = {"uuid": uuid}
        elif email is not None:
            lookup = {"email": email}
        else:
            raise BadRequest("uuid or email must be specified")

        try:
            User.objects.get(**lookup).delete()
        except User.DoesNotExist as dne:
            raise Http404(f"User with the given uuid: {uuid} or email: {email} does not exist.") from dne

        # noinspection PyTypeChecker
        return DeleteUser(success=True)
|
|
|
|
|
|
class ObtainJSONWebToken(BaseMutation):
    # Mutation that exchanges an email/password pair for a refresh/access
    # token pair and the matching user.

    class Arguments:
        email = String(required=True)
        password = String(required=True)

    user = Field(UserType)
    refresh_token = String(required=True)
    access_token = String(required=True)

    def mutate(self, info, email, password):
        """Validate the credentials and return tokens plus the user.

        Returns:
            ``ObtainJSONWebToken`` with ``user``, ``refresh_token`` and
            ``access_token`` taken from the serializer's validated data.

        Raises:
            PermissionDenied: Validation or the user lookup failed.
        """
        serializer = TokenObtainPairSerializer(data={"email": email, "password": password})
        try:
            serializer.is_valid(raise_exception=True)
            validated = serializer.validated_data
            return ObtainJSONWebToken(
                # The UUID must be passed as a keyword lookup: positional
                # arguments to QuerySet.get() are interpreted as Q objects.
                user=User.objects.get(uuid=validated["user"]["uuid"]),
                refresh_token=validated["refresh"],
                access_token=validated["access"],
            )
        except Exception as e:
            raise PermissionDenied(f"invalid credentials provided: {e!s}") from e
|
|
|
|
|
|
class RefreshJSONWebToken(BaseMutation):
    # Mutation that exchanges a refresh token for a new access/refresh pair
    # and the matching user.

    class Arguments:
        refresh_token = String(required=True)

    access_token = String()
    user = Field(UserType)
    refresh_token = String()

    def mutate(self, info, refresh_token):
        """Validate the refresh token and return the renewed tokens plus the user.

        Returns:
            ``RefreshJSONWebToken`` with ``user``, ``access_token`` and
            ``refresh_token`` taken from the serializer's validated data.

        Raises:
            PermissionDenied: Validation or the user lookup failed.
        """
        serializer = TokenRefreshSerializer(data={"refresh": refresh_token})
        try:
            serializer.is_valid(raise_exception=True)
            validated = serializer.validated_data
            return RefreshJSONWebToken(
                # The UUID must be passed as a keyword lookup: positional
                # arguments to QuerySet.get() are interpreted as Q objects.
                user=User.objects.get(uuid=validated["user"]["uuid"]),
                access_token=validated["access"],
                refresh_token=validated["refresh"],
            )
        except Exception as e:
            raise PermissionDenied(f"invalid refresh token provided: {e!s}") from e
|
|
|
|
|
|
class VerifyJSONWebToken(BaseMutation):
    # Mutation that checks a token and, when valid, returns the matching user.
    # Never raises for an invalid token; it reports ``token_is_valid=False``.

    class Arguments:
        token = String(required=True)

    token_is_valid = Boolean()
    user = Field(UserType)
    detail = String()

    def mutate(self, info, token):
        """Verify ``token`` and resolve the user it belongs to.

        Returns:
            ``VerifyJSONWebToken(token_is_valid=True, user=...)`` on success.
            On any failure, ``token_is_valid=False`` with ``user=None`` and
            ``detail`` holding the traceback when ``settings.DEBUG`` is on
            (empty string otherwise).
        """
        serializer = TokenVerifySerializer(data={"token": token})
        try:
            serializer.is_valid(raise_exception=True)
            # The UUID must be passed as a keyword lookup: positional
            # arguments to QuerySet.get() are interpreted as Q objects.
            user = User.objects.get(uuid=serializer.validated_data["user"]["uuid"])
        except Exception:
            # format_exc() must run while the exception is still being handled;
            # outside the handler there is no active exception to format.
            detail = traceback.format_exc() if settings.DEBUG else ""
            # noinspection PyTypeChecker
            return VerifyJSONWebToken(token_is_valid=False, user=None, detail=detail)

        # noinspection PyTypeChecker
        return VerifyJSONWebToken(token_is_valid=True, user=user)
|
|
|
|
|
|
class ActivateUser(BaseMutation):
    # Mutation that activates and verifies an account from the base64-encoded
    # ``uid`` / ``token`` pair of an activation link.

    class Arguments:
        uid = String(required=True)
        token = String(required=True)

    success = Boolean()

    def mutate(self, info, uid, token):
        """Activate the user referenced by ``uid`` if ``token`` checks out.

        Args:
            info: GraphQL resolve info (unused here).
            uid: URL-safe base64 encoding of the user's primary key.
            token: URL-safe base64 encoding of the activation token.

        Returns:
            ``ActivateUser(success=True)`` once the user is marked active and verified.

        Raises:
            BadRequest: The link is invalid, the account is already active, the
                inputs cannot be decoded, or no matching user exists.
        """
        try:
            token = urlsafe_base64_decode(token).decode()
            uuid = urlsafe_base64_decode(uid).decode()
            user = User.objects.get(pk=uuid)

            if not user.check_token(token):
                raise BadRequest(_("activation link is invalid!"))

            if user.is_active:
                raise BadRequest(_("account already activated..."))

            user.is_active = True
            user.is_verified = True
            user.save()

        # ValidationError is handled as well (as ConfirmResetPassword does) so a
        # uid that decodes to a malformed key yields BadRequest instead of an
        # unhandled error.
        except (TypeError, ValueError, OverflowError, ValidationError, User.DoesNotExist) as e:
            raise BadRequest(_(f"something went wrong: {e!s}")) from e

        # noinspection PyTypeChecker
        return ActivateUser(success=True)
|
|
|
|
|
|
class ResetPassword(BaseMutation):
    # Mutation that queues a password-reset email for the account with the
    # given email address.

    class Arguments:
        email = String(required=True)

    success = Boolean()

    def mutate(self, info, email):
        """Queue the reset email and report whether a matching user was found.

        Returns:
            ``ResetPassword(success=True)`` after the email task was queued,
            ``ResetPassword(success=False)`` when no user has this email.
        """
        try:
            account = User.objects.get(email=email)
        except User.DoesNotExist:
            queued = False
        else:
            send_reset_password_email_task.delay(user_pk=account.uuid)
            queued = True

        # noinspection PyTypeChecker
        return ResetPassword(success=queued)
|
|
|
|
|
|
class ConfirmResetPassword(BaseMutation):
    # Mutation that completes a password reset: checks the reset token for the
    # user encoded in ``uid`` and stores the new password.

    class Arguments:
        uid = String(required=True)
        token = String(required=True)
        password = String(required=True)
        confirm_password = String(required=True)

    success = Boolean()

    def mutate(self, info, uid, token, password, confirm_password):
        """Set a new password for the user referenced by ``uid``.

        Args:
            info: GraphQL resolve info (unused here).
            uid: URL-safe base64 encoding of the user's primary key.
            token: Token checked with ``PasswordResetTokenGenerator``.
            password: New password; checked with ``validate_password``.
            confirm_password: Must equal ``password``.

        Returns:
            ``ConfirmResetPassword(success=True)`` once the password is saved.

        Raises:
            BadRequest: Passwords differ, the token is invalid, the password
                fails validation, ``uid`` cannot be decoded, or no user matches.
        """
        try:
            # compare_digest() only accepts ASCII-only ``str``; comparing UTF-8
            # bytes lets passwords with non-ASCII characters through instead of
            # failing with a misleading "something went wrong" error.
            if not compare_digest(password.encode("utf-8"), confirm_password.encode("utf-8")):
                raise BadRequest(_("passwords do not match"))

            user = User.objects.get(pk=urlsafe_base64_decode(uid).decode())

            password_reset_token = PasswordResetTokenGenerator()

            if not password_reset_token.check_token(user, token):
                raise BadRequest(_("token is invalid!"))

            validate_password(password=password, user=user)

            user.set_password(password)
            user.save()

            # noinspection PyTypeChecker
            return ConfirmResetPassword(success=True)

        except (TypeError, ValueError, OverflowError, ValidationError, User.DoesNotExist) as e:
            raise BadRequest(_(f"something went wrong: {e!s}")) from e
|
|
|
|
|
|
class UploadAvatar(BaseMutation):
    # Mutation that stores an uploaded file as the requesting user's avatar.

    class Arguments:
        file = Upload(required=True)

    user = Field(UserType)

    def mutate(self, info, file):
        """Save ``file`` as the avatar of the authenticated requester.

        Returns:
            ``UploadAvatar`` carrying the requester after a reload from the database.

        Raises:
            PermissionDenied: The requester is not authenticated.
            BadRequest: Assigning, saving or reloading the user failed.
        """
        requester = info.context.user
        if not requester.is_authenticated:
            raise PermissionDenied(permission_denied_message)

        try:
            requester.avatar = file
            requester.save()
            # Reload so the returned object reflects what was actually stored.
            requester.refresh_from_db()
        except Exception as e:
            raise BadRequest(str(e)) from e

        return UploadAvatar(user=requester)
|