import secrets from celery.app import shared_task from constance import config from django.conf import settings from django.core.mail import EmailMessage from engine.core.utils import get_dynamic_email_connection def generate_otp_code(user) -> str: from engine.vibes_auth.models import AdminOTPCode AdminOTPCode.objects.filter(user=user, is_used=False).update(is_used=True) code = f"{secrets.randbelow(1000000):06d}" AdminOTPCode.objects.create(user=user, code=code) return code @shared_task(queue="default") def send_admin_otp_email_task(user_pk: str, code: str) -> tuple[bool, str]: from engine.vibes_auth.models import User try: user = User.objects.get(pk=user_pk) email = EmailMessage( subject=f"{settings.PROJECT_NAME} | Admin Login Code", body=f"Your admin login code: {code}\n\nValid for 5 minutes.", from_email=f"{settings.PROJECT_NAME} <{config.EMAIL_FROM}>", to=[user.email], connection=get_dynamic_email_connection(), ) email.send() except Exception as e: return False, str(e) return True, str(user.uuid)