schon/engine/vibes_auth/viewsets.py
Egor fureunoir Gorbunov adec5503b2 feat(core/auth): enable encryption for sensitive fields and token handling
Add encryption for user PII fields (phone number, name, attributes) and address fields to enhance data security. Introduced timestamped activation tokens for improved validation. Included migrations to encrypt existing plaintext data.

Refactored GraphQL settings to limit query depth and optionally disable introspection for enhanced API defense. Implemented throttling to safeguard API rates.

Improved Dockerfiles for better user management and restored media migration tools for smooth instance upgrades.
2026-03-02 00:11:57 +03:00

236 lines
9.1 KiB
Python

import logging
import traceback
from contextlib import suppress
from secrets import compare_digest
from django.conf import settings
from django.contrib.auth.password_validation import validate_password
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from django.core.exceptions import ValidationError
from django.utils.decorators import method_decorator
from django.utils.http import urlsafe_base64_decode
from django.utils.translation import gettext_lazy as _
from django_ratelimit.decorators import ratelimit
from drf_spectacular.utils import extend_schema_view
from rest_framework import mixins, status
from rest_framework.decorators import action
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet
from rest_framework_simplejwt.tokens import RefreshToken
from engine.vibes_auth.docs.drf.viewsets import USER_SCHEMA
from engine.vibes_auth.models import User
from engine.vibes_auth.serializers import (
MergeRecentlyViewedSerializer,
UserSerializer,
)
from engine.vibes_auth.utils.emailing import (
send_reset_password_email_task,
send_verification_email_task,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# noinspection PyUnusedLocal
@extend_schema_view(**USER_SCHEMA)
class UserViewSet(
    mixins.CreateModelMixin,
    mixins.RetrieveModelMixin,
    mixins.UpdateModelMixin,
    mixins.DestroyModelMixin,
    GenericViewSet,
):
    __doc__ = _(
        "User view set implementation.\n"
        "Provides a set of actions that manage user-related data such as creation, "
        "retrieval, updates, deletion, and custom actions including password reset, "
        "avatar upload, account activation, and recently viewed items merging. "
        "This class extends the mixins and GenericViewSet for robust API handling."
    )

    # Methods below are documented with comments rather than docstrings so the
    # text of generated API documentation is not affected by this file's
    # internal notes.

    serializer_class = UserSerializer
    # Detail routes resolve their object from this queryset via get_object(),
    # so only active users are reachable there.
    queryset = User.objects.filter(is_active=True)
    # Default for every action; individual actions tighten it through
    # @action(permission_classes=...).
    # NOTE(review): retrieve/update and the inherited destroy run with AllowAny
    # and perform no ownership check in this class (unlike upload_avatar and
    # merge_recently_viewed) - confirm access control is enforced elsewhere.
    permission_classes = [AllowAny]

    # POST: request a password-reset email for the given "email".
    # Always answers 200, whether or not a matching user exists, so the
    # response does not reveal which emails are registered.
    # Rate limited per client IP: 4/hour (888/hour when DEBUG).
    @action(detail=False, methods=("POST",))
    @method_decorator(
        ratelimit(key="ip", rate="4/h" if not settings.DEBUG else "888/h")
    )
    def reset_password(self, request: Request) -> Response:
        user = None
        with suppress(User.DoesNotExist):
            user = User.objects.get(email=request.data.get("email"))
        if user:
            # The email is sent asynchronously by the task.
            send_reset_password_email_task.delay(user_pk=str(user.uuid))
        return Response(status=status.HTTP_200_OK)

    # PUT: replace the avatar of the addressed user with the uploaded "avatar"
    # file. Only the user themself may do this (403 otherwise); a request
    # without an "avatar" file yields 400.
    # Rate limited per client IP: 3/hour (888/hour when DEBUG).
    @action(detail=True, methods=("PUT",), permission_classes=[IsAuthenticated])
    @method_decorator(
        ratelimit(key="ip", rate="3/h" if not settings.DEBUG else "888/h")
    )
    def upload_avatar(self, request: Request, *args, **kwargs) -> Response:
        user = self.get_object()
        if request.user != user:
            return Response(status=status.HTTP_403_FORBIDDEN)
        if "avatar" in request.FILES:
            user.avatar = request.FILES["avatar"]
            user.save()
            return Response(
                status=status.HTTP_200_OK, data=self.serializer_class(user).data
            )
        return Response(status=status.HTTP_400_BAD_REQUEST)

    # POST: complete a password reset.
    # Expects "uidb_64" (urlsafe-base64 encoded user pk), "token", "password"
    # and "confirm_password" in the request body.
    # Returns 200 on success and 400 for any validation or lookup failure.
    # Rate limited per client IP: 5/hour (888/hour when DEBUG).
    @action(detail=False, methods=("POST",))
    @method_decorator(
        ratelimit(key="ip", rate="5/h" if not settings.DEBUG else "888/h")
    )
    def confirm_password_reset(self, request: Request, *args, **kwargs) -> Response:
        try:
            raw_password = request.data.get("password")
            if raw_password is None or raw_password == "":
                # Without this guard a missing password is stringified to
                # "None" on both sides, passes the match check, and can end up
                # as set_password(None).
                return Response(
                    {"error": _("password is required")},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            password = str(raw_password)
            confirmation = str(request.data.get("confirm_password"))
            # compare_digest() only accepts ASCII-only str and raises TypeError
            # otherwise; compare the UTF-8 bytes so non-ASCII passwords work.
            if not compare_digest(password.encode(), confirmation.encode()):
                return Response(
                    {"error": _("passwords do not match")},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            uuid = urlsafe_base64_decode(str(request.data.get("uidb_64"))).decode()
            user = User.objects.get(pk=uuid)
            # Check the token before validating the password so validator
            # feedback is only given to callers holding a valid reset token.
            password_reset_token = PasswordResetTokenGenerator()
            if not password_reset_token.check_token(user, request.data.get("token")):
                return Response(
                    {"error": _("token is invalid!")},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            validate_password(password=password, user=user)
            # Store exactly the value that was compared and validated.
            user.set_password(password)
            user.save()
            return Response(
                {"message": _("password reset successfully")}, status=status.HTTP_200_OK
            )
        except (
            TypeError,
            ValueError,
            OverflowError,
            ValidationError,
            User.DoesNotExist,
        ) as e:
            data = {"error": str(e)}
            if settings.DEBUG:
                # Extra diagnostics are exposed in DEBUG mode only.
                data["detail"] = str(traceback.format_exc())
                data["received"] = str(request.data)
            return Response(data, status=status.HTTP_400_BAD_REQUEST)

    # Register a new user.
    # If an inactive, unverified account already exists for the given email,
    # its activation token is refreshed, the verification email is re-sent
    # and 200 is returned instead of creating a duplicate. Otherwise the
    # payload is validated and a new user is created (201).
    # Rate limited per client IP: 5/hour (888/hour when DEBUG).
    @method_decorator(
        ratelimit(key="ip", rate="5/h" if not settings.DEBUG else "888/h")
    )
    def create(self, request: Request, *args, **kwargs) -> Response:
        email = request.data.get("email")
        if email:
            with suppress(User.DoesNotExist):
                pending = User.objects.get(
                    email=email, is_active=False, is_verified=False
                )
                pending.refresh_activation_token()
                pending.save()
                send_verification_email_task.delay(user_pk=str(pending.uuid))
                return Response(
                    {
                        "detail": _(
                            "Account already registered but not yet activated. A new activation email has been sent."
                        )
                    },
                    status=status.HTTP_200_OK,
                )
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        user = serializer.save()
        # NOTE(review): this second save looks redundant after
        # serializer.save(); kept as-is in case save-time side effects depend
        # on it - confirm.
        user.save()
        headers = self.get_success_headers(serializer.data)
        return Response(
            serializer.data, status=status.HTTP_201_CREATED, headers=headers
        )

    # POST: activate an account from an activation link.
    # Expects "uidb_64" (urlsafe-base64 encoded user pk) and "token"
    # (urlsafe-base64 encoded activation token). On success the user is marked
    # active and verified and the serialized user plus a fresh JWT
    # refresh/access pair is returned (200). Any failure yields 400; in DEBUG
    # mode decode/lookup failures are re-raised instead so they surface
    # loudly during development.
    # Rate limited per client IP: 5/hour (888/hour when DEBUG).
    @action(detail=False, methods=("POST",))
    @method_decorator(
        ratelimit(key="ip", rate="5/h" if not settings.DEBUG else "888/h")
    )
    def activate(self, request: Request) -> Response:
        user = None
        activation_error: Exception | None = None
        try:
            uuid = urlsafe_base64_decode(str(request.data.get("uidb_64"))).decode()
            user = User.objects.get(pk=uuid)
            if not user.check_token(
                urlsafe_base64_decode(str(request.data.get("token"))).decode()
            ):
                return Response(
                    {"error": _("activation link is invalid!")},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            if user.is_active:
                return Response(
                    {"error": _("account already activated!")},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            user.is_active = True
            user.is_verified = True
            user.save()
        except (
            TypeError,
            ValueError,
            OverflowError,
            # Raised by the pk lookup when the decoded value is not a valid
            # key (e.g. a malformed UUID); also handled this way in
            # confirm_password_reset.
            ValidationError,
            User.DoesNotExist,
        ) as exc:
            user = None
            # The name bound by "except ... as" is unbound when the handler
            # exits, so the exception is copied to a separate variable for
            # use below.
            activation_error = exc
            # Keep the traceback in the server log instead of sending it to
            # the client.
            logger.warning("Account activation failed: %s", exc, exc_info=True)
        if user is None:
            if settings.DEBUG:
                raise Exception from activation_error
            return Response(
                {"error": _("activation link is invalid!")},
                status=status.HTTP_400_BAD_REQUEST,
            )
        tokens = RefreshToken.for_user(user)
        response_data = self.serializer_class(user).data
        response_data["refresh"] = str(tokens)
        response_data["access"] = str(tokens.access_token)
        return Response(response_data, status=status.HTTP_200_OK)

    # PUT: add every product UUID from the validated "product_uuids" list to
    # the addressed user's recently viewed items. Only the user themself may
    # do this (403 otherwise). Returns the serialized user with 202.
    @action(detail=True, methods=("PUT",), permission_classes=[IsAuthenticated])
    def merge_recently_viewed(self, request: Request, *args, **kwargs) -> Response:
        user = self.get_object()
        if request.user != user:
            return Response(status=status.HTTP_403_FORBIDDEN)
        serializer = MergeRecentlyViewedSerializer(
            data=request.data, context=self.get_serializer_context()
        )
        serializer.is_valid(raise_exception=True)
        for product_uuid in serializer.validated_data["product_uuids"]:
            user.add_to_recently_viewed(product_uuid)
        return Response(
            status=status.HTTP_202_ACCEPTED, data=self.serializer_class(user).data
        )

    # Return the serialized representation of the addressed user.
    def retrieve(self, request: Request, *args, **kwargs) -> Response:
        instance = self.get_object()
        serializer = self.get_serializer(instance)
        return Response(serializer.data)

    # Apply the request payload to the addressed user through the serializer's
    # update() and return the refreshed representation.
    # NOTE(review): request.data is handed to serializer.update() as
    # "validated_data" without running is_valid(), and no ownership check is
    # done here - confirm the serializer filters and authorizes the fields it
    # writes.
    def update(self, request: Request, *args, **kwargs) -> Response:
        instance = self.get_object()
        serializer = self.get_serializer(instance)
        # Reuse the instance fetched above rather than querying it again.
        instance = serializer.update(instance=instance, validated_data=request.data)
        return Response(self.get_serializer(instance).data)