"""Admin site that adds an e-mailed one-time code as a second login step."""

import logging

from django import forms
from django.contrib import messages
from django.contrib.auth import authenticate, login
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
from django.shortcuts import render
from django.urls import path, reverse
from django.utils.translation import gettext_lazy as _
from unfold.sites import UnfoldAdminSite

from engine.vibes_auth.utils.otp import generate_otp_code, send_admin_otp_email_task

logger = logging.getLogger(__name__)

# Session keys that carry the half-finished login from the password step to
# the OTP step. They are removed as soon as the flow succeeds or is aborted.
_SESSION_USER_KEY = "_2fa_user_id"
_SESSION_BACKEND_KEY = "_2fa_backend"
_SESSION_ATTEMPTS_KEY = "_2fa_attempts"


class OTPVerifyForm(forms.Form):
    """Single-field form that collects the 6-character verification code."""

    code = forms.CharField(
        max_length=6,
        min_length=6,
        label=_("Verification code"),
        widget=forms.TextInput(
            attrs={
                "autofocus": True,
                "autocomplete": "one-time-code",
                "inputmode": "numeric",
                # Client-side hint only; the server checks the code against
                # stored AdminOTPCode rows.
                "pattern": "[0-9]*",
            }
        ),
    )


class SchonAdminSite(UnfoldAdminSite):
    """Admin site whose login requires a password and then an e-mailed code."""

    # Wrong codes tolerated per pending login before the user has to start
    # over from the password step.
    otp_max_attempts = 5

    def login(self, request: HttpRequest, extra_context=None) -> HttpResponse:
        """Handle the password step and hand staff users over to the OTP step.

        On a POST with credentials that authenticate a staff user, a code is
        generated, its delivery is queued, the pending login is stored in the
        session and the browser is redirected to the verification view. The
        user is *not* logged in at this point. Every other request is
        delegated unchanged to the parent implementation.
        """
        if request.method == "POST":
            email = request.POST.get("username", "")
            password = request.POST.get("password", "")
            user = authenticate(request, username=email, password=password)
            if user is not None and user.is_staff:  # ty: ignore[unresolved-attribute]
                code = generate_otp_code(user)
                send_admin_otp_email_task.delay(user_pk=str(user.pk), code=code)
                request.session[_SESSION_USER_KEY] = str(user.pk)
                # authenticate() annotates the user with the backend that
                # accepted the credentials; keep it so the OTP step can pass
                # it to login() for a user re-fetched from the database.
                request.session[_SESSION_BACKEND_KEY] = getattr(user, "backend", None)
                request.session[_SESSION_ATTEMPTS_KEY] = 0
                messages.info(
                    request, _("A verification code has been sent to your email.")
                )
                return HttpResponseRedirect(
                    reverse("admin:verify-otp", current_app=self.name)
                )
        return super().login(request, extra_context)

    def extra_urls(self) -> list:
        """Return the URL pattern for the OTP verification view.

        The view is registered directly (not wrapped in ``admin_view``), so it
        is reachable before the user is logged in.
        NOTE(review): presumably merged into the site's URLs by
        ``UnfoldAdminSite.get_urls`` - confirm against the installed version.
        """
        return [
            path(
                "verify-otp/",
                self.verify_otp_view,
                name="verify-otp",
            ),
        ]

    @staticmethod
    def _clear_pending_otp(request: HttpRequest) -> None:
        """Remove every pending-login key from the session, if present."""
        for key in (_SESSION_USER_KEY, _SESSION_BACKEND_KEY, _SESSION_ATTEMPTS_KEY):
            request.session.pop(key, None)

    def verify_otp_view(self, request: HttpRequest) -> HttpResponse:
        """Check the submitted code and complete the pending admin login.

        Redirects to the login page when there is no pending login, when the
        pending user no longer exists or is no longer an active staff member,
        or when ``otp_max_attempts`` wrong codes have been submitted. On a
        valid code the code is marked used, the user is logged in and the
        browser is redirected to the admin index. Otherwise the form is
        rendered again, with an error message after a wrong code.
        """
        # Imported here rather than at module level, as in the original code.
        from engine.vibes_auth.models import AdminOTPCode, User

        login_url = reverse("admin:login", current_app=self.name)

        user_pk = request.session.get(_SESSION_USER_KEY)
        if not user_pk:
            return HttpResponseRedirect(login_url)

        try:
            user = User.objects.get(pk=user_pk)
        except User.DoesNotExist:
            self._clear_pending_otp(request)
            return HttpResponseRedirect(login_url)

        # The account may have been disabled or demoted between the password
        # step and now; do not let a stale pending login through.
        if not (user.is_active and user.is_staff):
            self._clear_pending_otp(request)
            return HttpResponseRedirect(login_url)

        form = OTPVerifyForm()
        error = None

        if request.method == "POST":
            form = OTPVerifyForm(request.POST)
            if form.is_valid():
                code = form.cleaned_data["code"]
                otp = (
                    AdminOTPCode.objects.filter(user=user, code=code, is_used=False)
                    .order_by("-created")
                    .first()
                )
                if otp and otp.is_valid():
                    otp.is_used = True
                    otp.save(update_fields=["is_used", "modified"])
                    backend = request.session.get(_SESSION_BACKEND_KEY)
                    self._clear_pending_otp(request)
                    # ``user`` was loaded from the database and has no
                    # ``backend`` attribute, so it must be passed explicitly
                    # or login() fails when several backends are configured.
                    login(request, user, backend=backend)
                    return HttpResponseRedirect(
                        reverse("admin:index", current_app=self.name)
                    )

                failures = request.session.get(_SESSION_ATTEMPTS_KEY, 0) + 1
                if failures >= self.otp_max_attempts:
                    # Cap guesses per pending login; a short numeric code is
                    # otherwise open to brute force.
                    self._clear_pending_otp(request)
                    logger.warning(
                        "Admin OTP attempt limit reached for user %s", user.pk
                    )
                    messages.error(
                        request, _("Too many incorrect codes. Please sign in again.")
                    )
                    return HttpResponseRedirect(login_url)
                request.session[_SESSION_ATTEMPTS_KEY] = failures
                error = _("Invalid or expired code. Please try again.")

        context = {
            **self.each_context(request),
            "form": form,
            "error": error,
            "title": _("Two-factor authentication"),
            "site_title": self.site_title,
            "site_header": self.site_header,
        }
        return render(request, "admin/verify_otp.html", context)