Features: 1) Add language validation and fallback for users without proper language settings in initialize.py; 2) Enhance user update mutation with improved error handling and dynamic attribute support; 3) Integrate is_safe_key validation in user attribute updates for better security.

Fixes: 1) Add missing imports for `settings`, `Q`, and `is_safe_key`; 2) Correct user permissions keys in `lists.py` utility.

Extra: 1) Refactor `UpdateUser` mutation for cleaner structure and unified error handling; 2) Format and tidy up list declarations and exception handling for clarity.
This commit is contained in:
Egor Pavlovich Gorbunov 2025-11-25 14:26:14 +03:00
parent 476e9c73c5
commit 43ece8c25c
3 changed files with 63 additions and 58 deletions

View file

@ -1,11 +1,13 @@
import logging import logging
from typing import Any from typing import Any
from django.conf import settings
from django.contrib.auth.models import Permission from django.contrib.auth.models import Permission
from django.core.management.base import BaseCommand from django.core.management.base import BaseCommand
from django.db.models import Q
from engine.core.models import Vendor from engine.core.models import Vendor
from engine.vibes_auth.models import Group from engine.vibes_auth.models import Group, User
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -155,4 +157,7 @@ class Command(BaseCommand):
perms = Permission.objects.filter(codename__in=e_commerce_admin_permissions) perms = Permission.objects.filter(codename__in=e_commerce_admin_permissions)
e_commerce_admin.permissions.add(*perms) e_commerce_admin.permissions.add(*perms)
valid_codes = [code for code, _ in settings.LANGUAGES]
(User.objects.filter(Q(language="") | ~Q(language__in=valid_codes)).update(language=settings.LANGUAGE_CODE))
self.stdout.write(self.style.SUCCESS("Successfully initialized must-have instances!")) self.stdout.write(self.style.SUCCESS("Successfully initialized must-have instances!"))

View file

@ -16,5 +16,7 @@ BAD_KEYS_TO_LISTEN = [
"is_staff", "is_staff",
"is_superuser", "is_superuser",
"is_active", "is_active",
"active", "is_verified",
"groups",
"user_permissions",
] ]

View file

@ -17,6 +17,7 @@ from graphene_file_upload.scalars import Upload
from engine.core.graphene import BaseMutation from engine.core.graphene import BaseMutation
from engine.core.utils.messages import permission_denied_message from engine.core.utils.messages import permission_denied_message
from engine.core.utils.security import is_safe_key
from engine.vibes_auth.graphene.object_types import UserType from engine.vibes_auth.graphene.object_types import UserType
from engine.vibes_auth.models import User from engine.vibes_auth.models import User
from engine.vibes_auth.serializers import ( from engine.vibes_auth.serializers import (
@ -107,65 +108,62 @@ class UpdateUser(BaseMutation):
try: try:
user = User.objects.get(uuid=uuid) user = User.objects.get(uuid=uuid)
if not (info.context.user.has_perm("vibes_auth.change_user") or info.context.user == user):
raise PermissionDenied(permission_denied_message)
email = kwargs.get("email")
if (email is not None and not is_valid_email(email)) or User.objects.filter(email=email).exclude(
uuid=uuid
).exists():
raise BadRequest(_("malformed email"))
phone_number = kwargs.get("phone_number")
if (phone_number is not None and not is_valid_phone_number(phone_number)) or (
User.objects.filter(phone_number=phone_number).exclude(uuid=uuid).exists() and phone_number is not None
):
raise BadRequest(_(f"malformed phone number: {phone_number}"))
password = kwargs.get("password", "")
confirm_password = kwargs.get("confirm_password", "")
if password:
validate_password(password=password, user=user)
if not compare_digest(password, "") and compare_digest(password, confirm_password):
user.set_password(password)
user.save()
attribute_pairs = kwargs.pop("attributes", "")
if attribute_pairs:
for attribute_pair in attribute_pairs.split(";"):
if "-" in attribute_pair:
attr, value = attribute_pair.split("-", 1)
if not user.attributes:
user.attributes = {}
user.attributes.update({attr: value})
else:
raise BadRequest(_(f"Invalid attribute format: {attribute_pair}"))
for attr, value in kwargs.items():
if attr == "password" or attr == "confirm_password":
continue
if is_safe_key(attr) or info.context.user.has_perm("vibes_auth.change_user"):
setattr(user, attr, value)
user.save()
return UpdateUser(user=user)
except User.DoesNotExist as dne: except User.DoesNotExist as dne:
name = "User" name = "User"
raise Http404(_(f"{name} does not exist: {uuid}")) from dne raise Http404(_(f"{name} does not exist: {uuid}")) from dne
except Exception as e:
if not (info.context.user.has_perm("vibes_auth.change_user") or info.context.user == user): logger.warning("Could not update user: %s", str(e))
raise PermissionDenied(permission_denied_message) logger.debug(traceback.format_exc())
raise BadRequest(str(e)) from e
email = kwargs.get("email")
if (email is not None and not is_valid_email(email)) or User.objects.filter(email=email).exclude(
uuid=uuid
).exists():
raise BadRequest(_("malformed email"))
phone_number = kwargs.get("phone_number")
if (phone_number is not None and not is_valid_phone_number(phone_number)) or (
User.objects.filter(phone_number=phone_number).exclude(uuid=uuid).exists() and phone_number is not None
):
raise BadRequest(_(f"malformed phone number: {phone_number}"))
password = kwargs.get("password", "")
confirm_password = kwargs.get("confirm_password", "")
if password:
validate_password(password=password, user=user)
if not compare_digest(password, "") and compare_digest(password, confirm_password):
user.set_password(password)
user.save()
attribute_pairs = kwargs.pop("attributes", "")
if attribute_pairs:
for attribute_pair in attribute_pairs.split(";"):
if "-" in attribute_pair:
attr, value = attribute_pair.split("-", 1)
if not user.attributes:
user.attributes = {}
user.attributes.update({attr: value})
else:
raise BadRequest(_(f"Invalid attribute format: {attribute_pair}"))
for attr, value in kwargs.items():
if attr == "password" or attr == "confirm_password":
continue
if attr not in [
"groups",
"user_permissions",
"is_verified",
"is_staff",
"is_active",
"is_superuser",
] or info.context.user.has_perm("vibes_auth.change_user"):
setattr(user, attr, value)
user.save()
return UpdateUser(user=user)
class DeleteUser(BaseMutation): class DeleteUser(BaseMutation):