- Refactored monetary fields across models to use `DecimalField` for improved precision. - Implemented two-factor authentication (2FA) for admin logins with OTP codes. - Added ability to generate admin OTP via management commands. - Updated Docker Compose override for dev-specific port bindings. - Included template for 2FA OTP verification to enhance security. Additional changes: - Upgraded and downgraded various dependencies (e.g., django-celery-beat and yarl). - Replaced float-based calculations with decimal for consistent rounding behavior. - Improved admin user management commands for activation and OTP generation.
234 lines
9.1 KiB
Python
234 lines
9.1 KiB
Python
import logging
|
|
import traceback
|
|
from contextlib import suppress
|
|
from secrets import compare_digest
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth.password_validation import validate_password
|
|
from django.contrib.auth.tokens import PasswordResetTokenGenerator
|
|
from django.core.exceptions import ValidationError
|
|
from django.utils.decorators import method_decorator
|
|
from django.utils.http import urlsafe_base64_decode
|
|
from django.utils.translation import gettext_lazy as _
|
|
from django_ratelimit.decorators import ratelimit
|
|
from drf_spectacular.utils import extend_schema_view
|
|
from rest_framework import mixins, status
|
|
from rest_framework.decorators import action
|
|
from rest_framework.permissions import AllowAny, IsAuthenticated
|
|
from rest_framework.request import Request
|
|
from rest_framework.response import Response
|
|
from rest_framework.viewsets import GenericViewSet
|
|
from rest_framework_simplejwt.tokens import RefreshToken
|
|
|
|
from engine.vibes_auth.docs.drf.viewsets import USER_SCHEMA
|
|
from engine.vibes_auth.models import User
|
|
from engine.vibes_auth.serializers import (
|
|
MergeRecentlyViewedSerializer,
|
|
UserSerializer,
|
|
)
|
|
from engine.vibes_auth.utils.emailing import (
|
|
send_reset_password_email_task,
|
|
send_verification_email_task,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# noinspection PyUnusedLocal
|
|
@extend_schema_view(**USER_SCHEMA)
|
|
class UserViewSet(
|
|
mixins.CreateModelMixin,
|
|
mixins.RetrieveModelMixin,
|
|
mixins.UpdateModelMixin,
|
|
mixins.DestroyModelMixin,
|
|
GenericViewSet,
|
|
):
|
|
__doc__ = _(
|
|
"User view set implementation.\n"
|
|
"Provides a set of actions that manage user-related data such as creation, "
|
|
"retrieval, updates, deletion, and custom actions including password reset, "
|
|
"avatar upload, account activation, and recently viewed items merging. "
|
|
"This class extends the mixins and GenericViewSet for robust API handling."
|
|
)
|
|
|
|
serializer_class = UserSerializer
|
|
queryset = User.objects.filter(is_active=True)
|
|
permission_classes = [AllowAny]
|
|
|
|
@action(detail=False, methods=("POST",))
|
|
@method_decorator(
|
|
ratelimit(key="ip", rate="4/h" if not settings.DEBUG else "888/h")
|
|
)
|
|
def reset_password(self, request: Request) -> Response:
|
|
user = None
|
|
with suppress(User.DoesNotExist):
|
|
user = User.objects.get(email=request.data.get("email"))
|
|
if user:
|
|
send_reset_password_email_task.delay(user_pk=str(user.uuid))
|
|
return Response(status=status.HTTP_200_OK)
|
|
|
|
@action(detail=True, methods=("PUT",), permission_classes=[IsAuthenticated])
|
|
@method_decorator(
|
|
ratelimit(key="ip", rate="3/h" if not settings.DEBUG else "888/h")
|
|
)
|
|
def upload_avatar(self, request: Request, *args, **kwargs) -> Response:
|
|
user = self.get_object()
|
|
if request.user != user:
|
|
return Response(status=status.HTTP_403_FORBIDDEN)
|
|
if "avatar" in request.FILES:
|
|
user.avatar = request.FILES["avatar"]
|
|
user.save()
|
|
return Response(
|
|
status=status.HTTP_200_OK, data=self.serializer_class(user).data
|
|
)
|
|
return Response(status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
@action(detail=False, methods=("POST",))
|
|
@method_decorator(
|
|
ratelimit(key="ip", rate="5/h" if not settings.DEBUG else "888/h")
|
|
)
|
|
def confirm_password_reset(self, request: Request, *args, **kwargs) -> Response:
|
|
try:
|
|
if not compare_digest(
|
|
str(request.data.get("password")),
|
|
str(request.data.get("confirm_password")),
|
|
):
|
|
return Response(
|
|
{"error": _("passwords do not match")},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
uuid = urlsafe_base64_decode(str(request.data.get("uidb_64"))).decode()
|
|
user = User.objects.get(pk=uuid)
|
|
|
|
validate_password(password=str(request.data.get("password")), user=user)
|
|
|
|
password_reset_token = PasswordResetTokenGenerator()
|
|
if not password_reset_token.check_token(user, request.data.get("token")):
|
|
return Response(
|
|
{"error": _("token is invalid!")},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
user.set_password(request.data.get("password"))
|
|
user.save()
|
|
return Response(
|
|
{"message": _("password reset successfully")}, status=status.HTTP_200_OK
|
|
)
|
|
|
|
except (
|
|
TypeError,
|
|
ValueError,
|
|
OverflowError,
|
|
ValidationError,
|
|
User.DoesNotExist,
|
|
) as e:
|
|
data = {"error": str(e)}
|
|
if settings.DEBUG:
|
|
data["detail"] = str(traceback.format_exc())
|
|
data["received"] = str(request.data)
|
|
return Response(data, status=status.HTTP_400_BAD_REQUEST)
|
|
|
|
@method_decorator(
|
|
ratelimit(key="ip", rate="5/h" if not settings.DEBUG else "888/h")
|
|
)
|
|
def create(self, request: Request, *args, **kwargs) -> Response:
|
|
email = request.data.get("email")
|
|
if email:
|
|
with suppress(User.DoesNotExist):
|
|
pending = User.objects.get(
|
|
email=email, is_active=False, is_verified=False
|
|
)
|
|
send_verification_email_task.delay(user_pk=str(pending.uuid))
|
|
return Response(
|
|
{
|
|
"detail": _(
|
|
"Account already registered but not yet activated. A new activation email has been sent."
|
|
)
|
|
},
|
|
status=status.HTTP_200_OK,
|
|
)
|
|
serializer = self.get_serializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
user = serializer.save()
|
|
user.save()
|
|
headers = self.get_success_headers(serializer.data)
|
|
return Response(
|
|
serializer.data, status=status.HTTP_201_CREATED, headers=headers
|
|
)
|
|
|
|
@action(detail=False, methods=("POST",))
|
|
@method_decorator(
|
|
ratelimit(key="ip", rate="5/h" if not settings.DEBUG else "888/h")
|
|
)
|
|
def activate(self, request: Request) -> Response:
|
|
detail = ""
|
|
activation_error: Exception | None = None
|
|
try:
|
|
uuid = urlsafe_base64_decode(str(request.data.get("uidb_64"))).decode()
|
|
user = User.objects.get(pk=uuid)
|
|
if not user.check_token(
|
|
urlsafe_base64_decode(str(request.data.get("token"))).decode()
|
|
):
|
|
return Response(
|
|
{"error": _("activation link is invalid!")},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
if user.is_active:
|
|
return Response(
|
|
{"error": _("account already activated!")},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
user.is_active = True
|
|
user.is_verified = True
|
|
user.save()
|
|
except (
|
|
TypeError,
|
|
ValueError,
|
|
OverflowError,
|
|
User.DoesNotExist,
|
|
) as activation_error:
|
|
user = None
|
|
activation_error = activation_error
|
|
detail = str(traceback.format_exc())
|
|
if user is None:
|
|
if settings.DEBUG:
|
|
raise Exception from activation_error # ty:ignore[possibly-unresolved-reference]
|
|
return Response(
|
|
{"error": _("activation link is invalid!"), "detail": detail},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
else:
|
|
tokens = RefreshToken.for_user(user)
|
|
response_data = self.serializer_class(user).data
|
|
response_data["refresh"] = str(tokens)
|
|
response_data["access"] = str(tokens.access_token)
|
|
return Response(response_data, status=status.HTTP_200_OK)
|
|
|
|
@action(detail=True, methods=("PUT",), permission_classes=[IsAuthenticated])
|
|
def merge_recently_viewed(self, request: Request, *args, **kwargs) -> Response:
|
|
user = self.get_object()
|
|
if request.user != user:
|
|
return Response(status=status.HTTP_403_FORBIDDEN)
|
|
serializer = MergeRecentlyViewedSerializer(
|
|
data=request.data, context=self.get_serializer_context()
|
|
)
|
|
serializer.is_valid(raise_exception=True)
|
|
for product_uuid in serializer.validated_data["product_uuids"]:
|
|
user.add_to_recently_viewed(product_uuid)
|
|
return Response(
|
|
status=status.HTTP_202_ACCEPTED, data=self.serializer_class(user).data
|
|
)
|
|
|
|
def retrieve(self, request: Request, *args, **kwargs) -> Response:
|
|
instance = self.get_object()
|
|
serializer = self.get_serializer(instance)
|
|
return Response(serializer.data)
|
|
|
|
def update(self, request: Request, *args, **kwargs) -> Response:
|
|
instance = self.get_object()
|
|
serializer = self.get_serializer(instance)
|
|
instance = serializer.update(
|
|
instance=self.get_object(), validated_data=request.data
|
|
)
|
|
return Response(self.get_serializer(instance).data)
|