import datetime
import logging

from django import forms
from django.contrib import messages
from django.contrib.auth import authenticate, login
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect
from django.shortcuts import render
from django.urls import path, reverse
from django.utils.translation import gettext_lazy as _
from health_check.exceptions import HealthCheckException
from unfold.sites import UnfoldAdminSite

from engine.vibes_auth.utils.otp import generate_otp_code, send_admin_otp_email_task
from schon.health_checks import DynamicMail

logger = logging.getLogger(__name__)

# Session keys that carry the half-authenticated user between the password
# step (``login``) and the code step (``verify_otp_view``).
_SESSION_2FA_USER_KEY = "_2fa_user_id"
_SESSION_2FA_BACKEND_KEY = "_2fa_backend"


class OTPVerifyForm(forms.Form):
    """Form that collects the verification code entered by the user.

    The server-side validation only enforces a length of exactly six
    characters; the ``inputmode``/``pattern`` attributes are browser hints.
    """

    code = forms.CharField(
        max_length=6,
        min_length=6,
        label=_("Verification code"),
        widget=forms.TextInput(
            attrs={
                "autofocus": True,
                "autocomplete": "one-time-code",
                "inputmode": "numeric",
                "pattern": "[0-9]*",
            }
        ),
    )


class SchonAdminSite(UnfoldAdminSite):
    """Admin site that inserts an emailed one-time code after password login.

    Staff users who pass the password check are redirected to
    ``admin:verify-otp`` and are only logged in once they submit a valid
    code. When the mail health check fails, the code step is skipped and the
    user is logged in directly.
    """

    @staticmethod
    def _email_backend_configured() -> bool:
        """Return ``True`` when the ``DynamicMail`` health check passes.

        The check is run synchronously with a 5 second timeout. Any failure
        is logged and reported as ``False`` so that login can fall back to
        password-only authentication instead of crashing.
        """
        from asgiref.sync import async_to_sync

        try:
            check = DynamicMail(timeout=datetime.timedelta(seconds=5))
            async_to_sync(check.run)()
        except HealthCheckException as exc:
            # Failure reported by the health check itself.
            logger.warning("Email backend health check failed: %s", exc)
            return False
        except Exception:
            # Best-effort probe: never let it break the login page, but keep
            # the traceback so a silent downgrade to no-OTP is diagnosable.
            logger.exception("Unexpected error while checking the email backend.")
            return False
        return True

    def login(self, request: HttpRequest, extra_context=None) -> HttpResponse:
        """Handle the admin login page, adding the OTP step for staff users.

        On POST with valid staff credentials, either starts the OTP flow
        (code generated, email task queued, redirect to ``admin:verify-otp``)
        or, if the email backend check fails, logs the user in immediately.
        Every other case is delegated to the parent implementation.
        """
        if request.method == "POST":
            email = request.POST.get("username", "")
            password = request.POST.get("password", "")
            user = authenticate(request, username=email, password=password)

            if user is not None and user.is_staff:  # ty: ignore[unresolved-attribute]
                if self._email_backend_configured():
                    code = generate_otp_code(user)
                    send_admin_otp_email_task.delay(user_pk=str(user.pk), code=code)
                    request.session[_SESSION_2FA_USER_KEY] = str(user.pk)
                    # ``authenticate`` annotates the user with the backend
                    # that accepted the credentials. Remember it so the
                    # second step can call ``login`` on a user re-fetched
                    # from the database, which has no such attribute.
                    backend = getattr(user, "backend", None)
                    if backend is not None:
                        request.session[_SESSION_2FA_BACKEND_KEY] = backend
                    messages.info(
                        request,
                        _("A verification code has been sent to your email."),
                    )
                    return HttpResponseRedirect(reverse("admin:verify-otp"))

                logger.warning(
                    "Email backend is not configured — "
                    "skipping OTP, logging in user %s directly.",
                    user.pk,
                )
                # ``login`` here is ``django.contrib.auth.login``; the method
                # name is not in scope inside the method body.
                login(request, user)
                return HttpResponseRedirect(reverse("admin:index"))

        return super().login(request, extra_context)

    def get_urls(self) -> list:
        """Return the admin URL patterns with the OTP view prepended."""
        custom_urls = [
            # The view is registered unwrapped: the user is not logged in
            # yet when it is reached.
            path(
                "verify-otp/",
                self.verify_otp_view,
                name="verify-otp",
            ),
        ]
        return custom_urls + super().get_urls()

    def verify_otp_view(self, request: HttpRequest) -> HttpResponse:
        """Render and process the verification-code form.

        Redirects to ``admin:login`` when no pending user is stored in the
        session or that user no longer exists. On a valid, unused code the
        code is marked used, the user is logged in and redirected to
        ``admin:index``; otherwise the form is re-rendered with an error.
        """
        from engine.vibes_auth.models import AdminOTPCode, User

        user_pk = request.session.get(_SESSION_2FA_USER_KEY)
        if not user_pk:
            return HttpResponseRedirect(reverse("admin:login"))

        try:
            user = User.objects.get(pk=user_pk)
        except User.DoesNotExist:
            # Drop the stale markers so the session does not keep pointing
            # at a user that is gone.
            request.session.pop(_SESSION_2FA_USER_KEY, None)
            request.session.pop(_SESSION_2FA_BACKEND_KEY, None)
            return HttpResponseRedirect(reverse("admin:login"))

        form = OTPVerifyForm()
        error = None

        # NOTE(review): no attempt limiting is visible in this view; confirm
        # that throttling of code guesses is enforced elsewhere.
        if request.method == "POST":
            form = OTPVerifyForm(request.POST)
            if form.is_valid():
                code = form.cleaned_data["code"]
                otp = (
                    AdminOTPCode.objects.filter(user=user, code=code, is_used=False)
                    .order_by("-created")
                    .first()
                )
                if otp and otp.is_valid():
                    otp.is_used = True
                    otp.save(update_fields=["is_used", "modified"])
                    request.session.pop(_SESSION_2FA_USER_KEY, None)
                    backend = request.session.pop(_SESSION_2FA_BACKEND_KEY, None)
                    # Passing the remembered backend avoids the ValueError
                    # Django raises when several authentication backends are
                    # configured and the user carries no ``backend``
                    # attribute. ``None`` keeps Django's default resolution.
                    login(request, user, backend=backend)
                    return HttpResponseRedirect(reverse("admin:index"))
                error = _("Invalid or expired code. Please try again.")

        context = {
            **self.each_context(request),
            "form": form,
            "error": error,
            "title": _("Two-factor authentication"),
            "site_title": self.site_title,
            "site_header": self.site_header,
        }
        return render(request, "admin/verify_otp.html", context)