"""GraphQL mutations for user accounts.

Covers registration, profile updates, deletion, JWT obtain/refresh/verify,
account activation, password reset and avatar upload.
"""

import logging
import traceback
from contextlib import suppress
from hmac import compare_digest

from django.conf import settings
from django.contrib.auth.password_validation import validate_password
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from django.core.exceptions import BadRequest, PermissionDenied, ValidationError
from django.db import IntegrityError
from django.http import Http404
from django.utils.http import urlsafe_base64_decode
from django.utils.translation import gettext_lazy as _
from graphene import UUID, Boolean, Field, List, Mutation, String
from graphene.types.generic import GenericScalar
from graphene_file_upload.scalars import Upload

from engine.core.utils.messages import permission_denied_message
from engine.core.utils.security import is_safe_key
from engine.vibes_auth.graphene.object_types import UserType
from engine.vibes_auth.models import User
from engine.vibes_auth.serializers import (
    TokenObtainPairSerializer,
    TokenRefreshSerializer,
    TokenVerifySerializer,
)
from engine.vibes_auth.utils.emailing import send_reset_password_email_task
from engine.vibes_auth.validators import is_valid_email, is_valid_phone_number
from schon.utils.ratelimit import graphql_ratelimit

logger = logging.getLogger(__name__)

# NOTE: the mutation classes below are documented with ``#`` comments rather than
# class docstrings on purpose: graphene uses a type's docstring as its schema
# description, so adding docstrings would change the published schema.


def _constant_time_equals(left: str, right: str) -> bool:
    """Return True when both strings are equal, comparing in constant time.

    ``hmac.compare_digest`` raises ``TypeError`` for ``str`` arguments that
    contain non-ASCII characters, so both values are compared as UTF-8 bytes.
    """
    return compare_digest(
        left.encode("utf-8", "surrogatepass"),
        right.encode("utf-8", "surrogatepass"),
    )


# Registers a new user from an email/password pair plus optional profile data.
class CreateUser(Mutation):
    class Arguments:
        email = String(required=True)
        password = String(required=True)
        confirm_password = String(required=True)
        last_name = String()
        first_name = String()
        phone_number = String()
        is_subscribed = Boolean()
        language = String()
        referrer = String(
            required=False,
            description=_(
                "the user's b64-encoded uuid who referred the new user to us."
            ),
        )

    success = Boolean()

    @graphql_ratelimit(rate="5/h")
    def mutate(
        self,
        info,
        email,
        password,
        confirm_password,
        last_name=None,
        first_name=None,
        phone_number=None,
        is_subscribed=None,
        language=None,
        **kwargs,
    ):
        """Create the user and report whether the request was accepted.

        Returns ``success=False`` when the two passwords differ and
        ``success=True`` both on creation and on ``IntegrityError``.

        Raises:
            BadRequest: the password fails validation, equals the email
                (case-insensitively), or any other error occurs.
        """
        try:
            validate_password(password)
            if _constant_time_equals(password.lower(), email.lower()):
                raise BadRequest(_("password too weak"))
            if not _constant_time_equals(password, confirm_password):
                # noinspection PyTypeChecker
                return CreateUser(success=False)  # ty: ignore[unknown-argument]
            referrer = kwargs.get("referrer", "")
            User.objects.create_user(
                email=email,
                password=password,
                last_name=last_name,
                first_name=first_name,
                phone_number=phone_number,
                is_subscribed=is_subscribed or False,
                language=language or settings.LANGUAGE_CODE,
                attributes={"referrer": referrer} if referrer else {},
            )
            # noinspection PyTypeChecker
            return CreateUser(success=True)  # ty: ignore[unknown-argument]
        except IntegrityError:
            # An IntegrityError is reported exactly like a successful creation.
            # noinspection PyTypeChecker
            return CreateUser(success=True)  # ty: ignore[unknown-argument]
        except Exception as e:
            raise BadRequest(str(e)) from e


# Updates an existing user; allowed for the user themselves or for holders of
# the "vibes_auth.change_user" permission.
class UpdateUser(Mutation):
    class Arguments:
        uuid = UUID(required=True)
        email = String(required=False)
        phone_number = String(required=False)
        password = String(required=False)
        confirm_password = String(required=False)
        is_verified = Boolean(required=False)
        first_name = String(required=False)
        last_name = String(required=False)
        language = String(required=False)
        is_active = Boolean(required=False)
        is_staff = Boolean(required=False)
        user_permissions = List(String)
        groups = List(String)
        attributes = GenericScalar(required=False)

    user = Field(UserType)

    def mutate(self, info, uuid, **kwargs):
        """Apply the supplied field changes to the user identified by ``uuid``.

        ``attributes`` is parsed as ``key-value`` pairs separated by ``;``.
        Remaining keyword arguments are assigned with ``setattr`` when
        ``is_safe_key`` accepts the name or the caller has the
        "vibes_auth.change_user" permission.

        Raises:
            Http404: no user with this ``uuid`` exists.
            BadRequest: any other failure (including the permission check,
                which is re-wrapped by the generic handler below).
        """
        try:
            user = User.objects.get(uuid=uuid)
            requester = info.context.user
            if not (requester.has_perm("vibes_auth.change_user") or requester == user):
                raise PermissionDenied(permission_denied_message)

            # Only validate the email when one was actually supplied, mirroring
            # the phone number check below.
            email = kwargs.get("email")
            if email is not None and (
                not is_valid_email(email)
                or User.objects.filter(email=email).exclude(uuid=uuid).exists()
            ):
                raise BadRequest(_("malformed email"))

            phone_number = kwargs.get("phone_number")
            if phone_number is not None and (
                not is_valid_phone_number(phone_number)
                or User.objects.filter(phone_number=phone_number)
                .exclude(uuid=uuid)
                .exists()
            ):
                # The template is translated first and interpolated afterwards;
                # an f-string would be formatted before gettext ever sees it.
                raise BadRequest(
                    _("malformed phone number: {phone_number}").format(
                        phone_number=phone_number
                    )
                )

            # ``or ""`` also covers an explicit null sent by the client.
            password = kwargs.get("password") or ""
            confirm_password = kwargs.get("confirm_password") or ""
            if password:
                validate_password(password=password, user=user)
                # NOTE(review): a mismatching confirmation is silently ignored
                # here (no error, password unchanged) - confirm this is intended.
                if _constant_time_equals(password, confirm_password):
                    user.set_password(password)
                    user.save()

            attribute_pairs = kwargs.pop("attributes", "")
            if attribute_pairs:
                for attribute_pair in attribute_pairs.split(";"):
                    if "-" not in attribute_pair:
                        raise BadRequest(
                            _("Invalid attribute format: {attribute_pair}").format(
                                attribute_pair=attribute_pair
                            )
                        )
                    attr, value = attribute_pair.split("-", 1)
                    if not user.attributes:
                        user.attributes = {}
                    user.attributes.update({attr: value})

            can_change_any = None  # resolved lazily, only for non-safe keys
            for attr, value in kwargs.items():
                if attr in ("password", "confirm_password"):
                    continue
                if is_safe_key(attr):
                    setattr(user, attr, value)
                    continue
                if can_change_any is None:
                    can_change_any = requester.has_perm("vibes_auth.change_user")
                if can_change_any:
                    setattr(user, attr, value)

            user.save()

            return UpdateUser(user=user)  # ty: ignore[unknown-argument]

        except User.DoesNotExist as dne:
            raise Http404(
                _("{name} does not exist: {uuid}").format(name="User", uuid=uuid)
            ) from dne

        except Exception as e:
            logger.warning("Could not update user: %s", str(e))
            logger.debug(traceback.format_exc())
            raise BadRequest(str(e)) from e


# Deletes a user selected by uuid or, failing that, by email.
class DeleteUser(Mutation):
    class Arguments:
        email = String()
        uuid = UUID()

    success = Boolean()

    def mutate(self, info, uuid=None, email=None):
        """Delete the matching user; requires "vibes_auth.delete_user".

        Raises:
            PermissionDenied: the caller lacks the delete permission.
            BadRequest: neither ``uuid`` nor ``email`` was given.
            Http404: no matching user exists.
        """
        if not info.context.user.has_perm("vibes_auth.delete_user"):
            raise PermissionDenied(permission_denied_message)
        try:
            if uuid is not None:
                User.objects.get(uuid=uuid).delete()
            elif email is not None:
                User.objects.get(email=email).delete()
            else:
                raise BadRequest("uuid or email must be specified")
            # noinspection PyTypeChecker
            return DeleteUser(success=True)  # ty: ignore[unknown-argument]
        except User.DoesNotExist as dne:
            raise Http404(
                f"User with the given uuid: {uuid} or email: {email} does not exist."
            ) from dne


# Exchanges email/password credentials for an access/refresh token pair.
class ObtainJSONWebToken(Mutation):
    class Arguments:
        email = String(required=True)
        password = String(required=True)

    user = Field(UserType)
    refresh_token = String(required=True)
    access_token = String(required=True)

    @graphql_ratelimit(rate="10/h")
    def mutate(self, info, email, password):
        """Validate the credentials and return the user with both tokens.

        Raises:
            PermissionDenied: validation or the user lookup failed.
        """
        serializer = TokenObtainPairSerializer(
            data={"email": email, "password": password}, retrieve_user=False
        )
        try:
            serializer.is_valid(raise_exception=True)
            obtained_user = User.objects.get(uuid=serializer.validated_data["user"])
            return ObtainJSONWebToken(
                user=obtained_user,  # ty: ignore[unknown-argument]
                refresh_token=serializer.validated_data["refresh"],  # ty: ignore[unknown-argument]
                access_token=serializer.validated_data["access"],  # ty: ignore[unknown-argument]
            )
        except Exception as e:
            raise PermissionDenied(f"invalid credentials provided: {e!s}") from e


# Exchanges a refresh token for a new token pair.
class RefreshJSONWebToken(Mutation):
    class Arguments:
        refresh_token = String(required=True)

    access_token = String()
    user = Field(UserType)
    refresh_token = String()

    @graphql_ratelimit(rate="10/h")
    def mutate(self, info, refresh_token):
        """Validate the refresh token and return the user with the new tokens.

        Raises:
            PermissionDenied: validation or the user lookup failed.
        """
        serializer = TokenRefreshSerializer(
            data={"refresh": refresh_token}, retrieve_user=False
        )
        try:
            serializer.is_valid(raise_exception=True)
            refreshed_user = User.objects.get(uuid=serializer.validated_data["user"])
            return RefreshJSONWebToken(
                user=refreshed_user,  # ty: ignore[unknown-argument]
                access_token=serializer.validated_data["access"],  # ty: ignore[unknown-argument]
                refresh_token=serializer.validated_data["refresh"],  # ty: ignore[unknown-argument]
            )
        except Exception as e:
            raise PermissionDenied(f"invalid refresh token provided: {e!s}") from e


# Reports whether a token is valid; never raises on an invalid token.
class VerifyJSONWebToken(Mutation):
    class Arguments:
        token = String(required=True)

    token_is_valid = Boolean()
    user = Field(UserType)
    detail = String()

    def mutate(self, info, token):
        """Verify ``token`` and return the owning user when it is valid.

        On any failure the result has ``token_is_valid=False`` and, when
        ``settings.DEBUG`` is on, ``detail`` carries the traceback.
        """
        serializer = TokenVerifySerializer(data={"token": token}, retrieve_user=False)
        try:
            serializer.is_valid(raise_exception=True)
            verified_user = User.objects.get(uuid=serializer.validated_data["user"])
        except Exception:
            # The traceback must be captured while the exception is still being
            # handled; outside the handler format_exc() has nothing to report.
            detail = traceback.format_exc() if settings.DEBUG else ""
            # noinspection PyTypeChecker
            return VerifyJSONWebToken(
                token_is_valid=False,
                user=None,
                detail=detail,
            )
        # noinspection PyTypeChecker
        return VerifyJSONWebToken(
            token_is_valid=True,  # ty:ignore[unknown-argument]
            user=verified_user,  # ty:ignore[unknown-argument]
        )


# Activates an account from the uid/token pair of an activation link.
class ActivateUser(Mutation):
    class Arguments:
        uid = String(required=True)
        token = String(required=True)

    success = Boolean()

    @graphql_ratelimit(rate="5/h")
    def mutate(self, info, uid, token):
        """Mark the user as active and verified when the token checks out.

        Both ``uid`` and ``token`` are expected url-safe base64 encoded.

        Raises:
            BadRequest: the link is malformed or invalid, the user does not
                exist, or the account is already active.
        """
        try:
            token = urlsafe_base64_decode(token).decode()
            uuid = urlsafe_base64_decode(uid).decode()
            user = User.objects.get(pk=uuid)
            if not user.check_token(token):
                raise BadRequest(_("activation link is invalid!"))
            if user.is_active:
                raise BadRequest(_("account already activated..."))
            user.is_active = True
            user.is_verified = True
            user.save()
        except (
            TypeError,
            ValueError,
            OverflowError,
            # Caught for parity with ConfirmResetPassword, which performs the
            # same pk lookup on a client-supplied identifier.
            ValidationError,
            User.DoesNotExist,
        ) as e:
            raise BadRequest(_("something went wrong: {e!s}").format(e=e)) from e
        # noinspection PyTypeChecker
        return ActivateUser(success=True)  # ty: ignore[unknown-argument]


# Starts the password reset flow for the account with the given email.
class ResetPassword(Mutation):
    class Arguments:
        email = String(required=True)

    success = Boolean()

    @graphql_ratelimit(rate="4/h")
    def mutate(self, info, email):
        """Queue the reset-password email task for the matching user.

        Returns ``success=False`` when no user has this email.
        """
        try:
            user = User.objects.get(email=email)
        except User.DoesNotExist:
            # NOTE(review): this response reveals whether an account exists for
            # the email, whereas CreateUser answers success=True on
            # IntegrityError. Consider returning success=True here as well.
            # noinspection PyTypeChecker
            return ResetPassword(success=False)  # ty: ignore[unknown-argument]
        send_reset_password_email_task.delay(user_pk=user.uuid)
        # noinspection PyTypeChecker
        return ResetPassword(success=True)  # ty: ignore[unknown-argument]


# Completes the password reset flow with the uid/token pair and a new password.
class ConfirmResetPassword(Mutation):
    class Arguments:
        uid = String(required=True)
        token = String(required=True)
        password = String(required=True)
        confirm_password = String(required=True)

    success = Boolean()

    @graphql_ratelimit(rate="5/h")
    def mutate(self, info, uid, token, password, confirm_password):
        """Set a new password after checking the reset token.

        Raises:
            BadRequest: the passwords differ, the token is invalid, the new
                password fails validation, or the user cannot be resolved.
        """
        try:
            if not _constant_time_equals(password, confirm_password):
                raise BadRequest(_("passwords do not match"))

            user = User.objects.get(pk=urlsafe_base64_decode(uid).decode())

            token_generator = PasswordResetTokenGenerator()
            if not token_generator.check_token(user, token):
                raise BadRequest(_("token is invalid!"))

            validate_password(password=password, user=user)

            user.set_password(password)
            user.save()
            # noinspection PyTypeChecker
            return ConfirmResetPassword(success=True)  # ty: ignore[unknown-argument]

        except (
            TypeError,
            ValueError,
            OverflowError,
            ValidationError,
            User.DoesNotExist,
        ) as e:
            raise BadRequest(_("something went wrong: {e!s}").format(e=e)) from e


ALLOWED_IMAGE_EXTENSIONS = {"jpg", "jpeg", "png", "gif", "webp"}
MAX_AVATAR_SIZE = 5 * 1024 * 1024  # 5 MB


# Stores an uploaded image as the authenticated user's avatar.
class UploadAvatar(Mutation):
    class Arguments:
        file = Upload(required=True)

    user = Field(UserType)

    @graphql_ratelimit(rate="3/h")
    def mutate(self, info, file):
        """Validate the upload by extension and size, then save it as the avatar.

        Raises:
            PermissionDenied: the caller is not authenticated.
            BadRequest: the extension is not allowed, the file exceeds
                ``MAX_AVATAR_SIZE``, or saving fails.
        """
        current_user = info.context.user
        if not current_user.is_authenticated:
            raise PermissionDenied(permission_denied_message)

        # A name without a dot has no extension and is rejected below.
        ext = file.name.rsplit(".", 1)[-1].lower() if "." in file.name else ""
        if ext not in ALLOWED_IMAGE_EXTENSIONS:
            raise BadRequest(_("only image files are allowed (jpg, png, gif, webp)"))

        if file.size > MAX_AVATAR_SIZE:
            raise BadRequest(_("file size must not exceed 5 MB"))

        try:
            current_user.avatar = file
            current_user.save()
            current_user.refresh_from_db()
        except Exception as e:
            raise BadRequest(str(e)) from e

        return UploadAvatar(user=info.context.user)  # ty: ignore[unknown-argument]