schon/engine/vibes_auth/viewsets.py
Egor fureunoir Gorbunov ad320235d6 feat(payments, vibes_auth, core): introduce decimal fields, 2FA, and admin OTP
- Refactored monetary fields across models to use `DecimalField` for improved precision.
- Implemented two-factor authentication (2FA) for admin logins with OTP codes.
- Added ability to generate admin OTP via management commands.
- Updated Docker Compose override for dev-specific port bindings.
- Included template for 2FA OTP verification to enhance security.

Additional changes:
- Upgraded and downgraded various dependencies (e.g., django-celery-beat and yarl).
- Replaced float-based calculations with decimal for consistent rounding behavior.
- Improved admin user management commands for activation and OTP generation.
2026-03-03 00:42:21 +03:00

234 lines
9.1 KiB
Python

import logging
import traceback
from contextlib import suppress
from secrets import compare_digest
from django.conf import settings
from django.contrib.auth.password_validation import validate_password
from django.contrib.auth.tokens import PasswordResetTokenGenerator
from django.core.exceptions import ValidationError
from django.utils.decorators import method_decorator
from django.utils.http import urlsafe_base64_decode
from django.utils.translation import gettext_lazy as _
from django_ratelimit.decorators import ratelimit
from drf_spectacular.utils import extend_schema_view
from rest_framework import mixins, status
from rest_framework.decorators import action
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet
from rest_framework_simplejwt.tokens import RefreshToken
from engine.vibes_auth.docs.drf.viewsets import USER_SCHEMA
from engine.vibes_auth.models import User
from engine.vibes_auth.serializers import (
MergeRecentlyViewedSerializer,
UserSerializer,
)
from engine.vibes_auth.utils.emailing import (
send_reset_password_email_task,
send_verification_email_task,
)
logger = logging.getLogger(__name__)
# noinspection PyUnusedLocal
@extend_schema_view(**USER_SCHEMA)
class UserViewSet(
    mixins.CreateModelMixin,
    mixins.RetrieveModelMixin,
    mixins.UpdateModelMixin,
    mixins.DestroyModelMixin,
    GenericViewSet,
):
    # The class documentation is assigned explicitly so that it goes through
    # the translation machinery; do not add a regular docstring above it.
    __doc__ = _(
        "User view set implementation.\n"
        "Provides a set of actions that manage user-related data such as creation, "
        "retrieval, updates, deletion, and custom actions including password reset, "
        "avatar upload, account activation, and recently viewed items merging. "
        "This class extends the mixins and GenericViewSet for robust API handling."
    )

    serializer_class = UserSerializer
    # Only active accounts are reachable through get_object(); actions that
    # must see inactive users (e.g. activate) query User.objects directly.
    queryset = User.objects.filter(is_active=True)
    permission_classes = [AllowAny]

    @action(detail=False, methods=("POST",))
    @method_decorator(
        ratelimit(key="ip", rate="4/h" if not settings.DEBUG else "888/h")
    )
    def reset_password(self, request: Request) -> Response:
        """Queue a password-reset email for the account matching ``email``.

        The response is 200 whether or not a matching account exists.
        """
        user = None
        with suppress(User.DoesNotExist):
            user = User.objects.get(email=request.data.get("email"))
        if user:
            send_reset_password_email_task.delay(user_pk=str(user.uuid))
        return Response(status=status.HTTP_200_OK)

    @action(detail=True, methods=("PUT",), permission_classes=[IsAuthenticated])
    @method_decorator(
        ratelimit(key="ip", rate="3/h" if not settings.DEBUG else "888/h")
    )
    def upload_avatar(self, request: Request, *args, **kwargs) -> Response:
        """Replace the avatar of the addressed user.

        Returns 403 when the authenticated user is not the addressed user,
        400 when no ``avatar`` file was uploaded, and otherwise 200 with the
        serialized user.
        """
        user = self.get_object()
        if request.user != user:
            return Response(status=status.HTTP_403_FORBIDDEN)
        if "avatar" in request.FILES:
            user.avatar = request.FILES["avatar"]
            user.save()
            return Response(
                status=status.HTTP_200_OK, data=self.serializer_class(user).data
            )
        return Response(status=status.HTTP_400_BAD_REQUEST)

    @action(detail=False, methods=("POST",))
    @method_decorator(
        ratelimit(key="ip", rate="5/h" if not settings.DEBUG else "888/h")
    )
    def confirm_password_reset(self, request: Request, *args, **kwargs) -> Response:
        """Set a new password using the uid/token pair from a reset email.

        Reads ``password``, ``confirm_password``, ``uidb_64`` and ``token``
        from the request body. Every failure is reported as a 400 response
        with an ``error`` key; in DEBUG the traceback and the received
        payload are included as well.
        """
        password = request.data.get("password")
        confirmation = request.data.get("confirm_password")
        try:
            # A missing password must be rejected explicitly: coercing None to
            # a string would let the comparison pass, and set_password(None)
            # would then mark the account's password as unusable.
            if not isinstance(password, str) or not password:
                return Response(
                    {"error": _("password is required")},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            # compare_digest() only accepts ASCII-only str arguments, so
            # compare the UTF-8 bytes to support non-ASCII passwords.
            if not isinstance(confirmation, str) or not compare_digest(
                password.encode("utf-8"), confirmation.encode("utf-8")
            ):
                return Response(
                    {"error": _("passwords do not match")},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            uuid = urlsafe_base64_decode(str(request.data.get("uidb_64"))).decode()
            user = User.objects.get(pk=uuid)
            # Check the token before running the validators so that
            # user-specific validation feedback is only produced for callers
            # holding a valid reset token.
            password_reset_token = PasswordResetTokenGenerator()
            if not password_reset_token.check_token(user, request.data.get("token")):
                return Response(
                    {"error": _("token is invalid!")},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            validate_password(password=password, user=user)
            user.set_password(password)
            user.save()
            return Response(
                {"message": _("password reset successfully")}, status=status.HTTP_200_OK
            )
        except (
            TypeError,
            ValueError,
            OverflowError,
            ValidationError,
            User.DoesNotExist,
        ) as e:
            data = {"error": str(e)}
            if settings.DEBUG:
                data["detail"] = str(traceback.format_exc())
                data["received"] = str(request.data)
            return Response(data, status=status.HTTP_400_BAD_REQUEST)

    @method_decorator(
        ratelimit(key="ip", rate="5/h" if not settings.DEBUG else "888/h")
    )
    def create(self, request: Request, *args, **kwargs) -> Response:
        """Register a new user, or re-send the activation email.

        When an inactive, unverified account already exists for the submitted
        email, a new verification email is queued and 200 is returned instead
        of creating a duplicate. Otherwise the payload is validated and saved
        and the serialized user is returned with 201.
        """
        email = request.data.get("email")
        if email:
            with suppress(User.DoesNotExist):
                pending = User.objects.get(
                    email=email, is_active=False, is_verified=False
                )
                send_verification_email_task.delay(user_pk=str(pending.uuid))
                return Response(
                    {
                        "detail": _(
                            "Account already registered but not yet activated. A new activation email has been sent."
                        )
                    },
                    status=status.HTTP_200_OK,
                )
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        user = serializer.save()
        # NOTE(review): second save right after serializer.save(); presumably
        # kept for its side effects (e.g. signals) — confirm before removing.
        user.save()
        headers = self.get_success_headers(serializer.data)
        return Response(
            serializer.data, status=status.HTTP_201_CREATED, headers=headers
        )

    @action(detail=False, methods=("POST",))
    @method_decorator(
        ratelimit(key="ip", rate="5/h" if not settings.DEBUG else "888/h")
    )
    def activate(self, request: Request) -> Response:
        """Activate an account from the uid/token pair of an activation link.

        Reads base64-encoded ``uidb_64`` and ``token`` from the request body.
        On success the user is marked active and verified, and the serialized
        user is returned together with fresh ``refresh`` and ``access`` JWTs.
        An invalid link or an already active account yields 400.

        Raises:
            Exception: in DEBUG only, chained from the underlying decoding or
                lookup error so that it surfaces during development.
        """
        try:
            uuid = urlsafe_base64_decode(str(request.data.get("uidb_64"))).decode()
            user = User.objects.get(pk=uuid)
            if not user.check_token(
                urlsafe_base64_decode(str(request.data.get("token"))).decode()
            ):
                return Response(
                    {"error": _("activation link is invalid!")},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            if user.is_active:
                return Response(
                    {"error": _("account already activated!")},
                    status=status.HTTP_400_BAD_REQUEST,
                )
            user.is_active = True
            user.is_verified = True
            user.save()
        except (
            TypeError,
            ValueError,
            OverflowError,
            ValidationError,
            User.DoesNotExist,
        ) as exc:
            # The failure is handled inside the handler because the name bound
            # by ``except ... as`` is deleted as soon as the clause ends.
            if settings.DEBUG:
                raise Exception from exc
            # Keep the traceback in the server log; it is not sent to the
            # client.
            logger.warning("Account activation rejected", exc_info=exc)
            return Response(
                {"error": _("activation link is invalid!")},
                status=status.HTTP_400_BAD_REQUEST,
            )

        tokens = RefreshToken.for_user(user)
        response_data = self.serializer_class(user).data
        response_data["refresh"] = str(tokens)
        response_data["access"] = str(tokens.access_token)
        return Response(response_data, status=status.HTTP_200_OK)

    @action(detail=True, methods=("PUT",), permission_classes=[IsAuthenticated])
    def merge_recently_viewed(self, request: Request, *args, **kwargs) -> Response:
        """Add the submitted product UUIDs to the user's recently viewed items.

        Returns 403 when the authenticated user is not the addressed user,
        otherwise 202 with the serialized user.
        """
        user = self.get_object()
        if request.user != user:
            return Response(status=status.HTTP_403_FORBIDDEN)
        serializer = MergeRecentlyViewedSerializer(
            data=request.data, context=self.get_serializer_context()
        )
        serializer.is_valid(raise_exception=True)
        for product_uuid in serializer.validated_data["product_uuids"]:
            user.add_to_recently_viewed(product_uuid)
        return Response(
            status=status.HTTP_202_ACCEPTED, data=self.serializer_class(user).data
        )

    def retrieve(self, request: Request, *args, **kwargs) -> Response:
        """Return the serialized representation of the addressed user."""
        instance = self.get_object()
        serializer = self.get_serializer(instance)
        return Response(serializer.data)

    def update(self, request: Request, *args, **kwargs) -> Response:
        """Apply the request payload to the addressed user and return it.

        NOTE(review): ``request.data`` is passed to ``serializer.update()``
        as ``validated_data`` without calling ``is_valid()``, and this method
        performs no ownership check while the viewset default permission is
        ``AllowAny``. Presumably ``UserSerializer.update`` validates and
        authorizes the change itself — confirm.
        """
        # Fetch the object once and reuse it instead of issuing a second
        # identical lookup.
        instance = self.get_object()
        serializer = self.get_serializer(instance)
        instance = serializer.update(instance=instance, validated_data=request.data)
        return Response(self.get_serializer(instance).data)