schon/core/utils/emailing.py
Egor fureunoir Gorbunov 0a2b4b65a0 Features: 1) Prevent duplicate "order finished" emails with added check on system_email_sent attribute; 2) Automatically update system_email_sent attribute after sending order email;
Fixes: 1) Remove unused `_` import in `drf.py`; 2) Trim redundant newline in `apply_addresses` method;

Extra: 1) Fix minor formatting in `deepl_translate.py` command; 2) Simplify `SPECTACULAR_PLATFORM_DESCRIPTION` string handling; 3) Minor refactoring and cleanup in email utility and related logic.
2025-07-16 14:54:33 +03:00

172 lines
5.1 KiB
Python

from datetime import datetime
from celery.app import shared_task
from constance import config
from django.core import mail
from django.core.mail import EmailMessage
from django.template.loader import render_to_string
from django.utils.translation import activate
from django.utils.translation import gettext_lazy as _
from core.models import Order, OrderProduct
from core.utils.constance import set_email_settings
@shared_task(queue="default")
def contact_us_email(contact_info):
set_email_settings()
connection = mail.get_connection()
email = EmailMessage(
_(f"{config.PROJECT_NAME} | contact us initiated"),
render_to_string(
"contact_us_email.html",
{
"email": contact_info.get("email"),
"name": contact_info.get("name"),
"subject": contact_info.get("subject", "Without subject"),
"phone_number": contact_info.get("phone_number", "Without phone number"),
"message": contact_info.get("message"),
"config": config,
},
),
to=[config.EMAIL_FROM],
from_email=f"{config.PROJECT_NAME} <{config.EMAIL_FROM}>",
connection=connection,
)
email.content_subtype = "html"
email.send()
return True, str(contact_info.get("email"))
@shared_task(queue="default")
def send_order_created_email(order_pk: str) -> tuple[bool, str]:
try:
order = Order.objects.get(pk=order_pk)
except Order.DoesNotExist:
return False, f"Order not found with the given pk: {order_pk}"
if not order.user:
return False, f"Order's user not found with the given pk: {order_pk}"
activate(order.user.language)
set_email_settings()
connection = mail.get_connection()
if not order.is_whole_digital:
email = EmailMessage(
_(f"{config.PROJECT_NAME} | order confirmation"),
render_to_string(
"digital_order_created_email.html" if order.is_whole_digital else "shipped_order_created_email.html",
{
"order": order,
"today": datetime.today(),
"config": config,
"total_price": order.total_price,
},
),
to=[order.user.email],
from_email=f"{config.PROJECT_NAME} <{config.EMAIL_FROM}>",
connection=connection,
)
email.content_subtype = "html"
email.send()
return True, str(order.uuid)
@shared_task(queue="default")
def send_order_finished_email(order_pk: str) -> tuple[bool, str]:
def send_digital_assets_email(ops: list[OrderProduct]):
if len(ops) <= 0:
return
if not order.user:
return
activate(order.user.language)
set_email_settings()
connection = mail.get_connection()
email = EmailMessage(
_(f"{config.PROJECT_NAME} | order delivered"),
render_to_string(
template_name="digital_order_delivered_email.html",
context={
"order_uuid": order.human_readable_id,
"user_first_name": "" or order.user.first_name,
"order_products": ops,
"project_name": config.PROJECT_NAME,
"contact_email": config.EMAIL_FROM,
"total_price": round(sum(0.0 or op.buy_price for op in ops), 2), # type: ignore [misc]
"display_system_attributes": order.user.has_perm("core.view_order"),
"today": datetime.today(),
},
),
to=[order.user.email],
from_email=f"{config.PROJECT_NAME} <{config.EMAIL_FROM}>",
connection=connection,
)
email.content_subtype = "html"
email.send()
def send_thank_you_email(ops: list[OrderProduct]):
if ops:
pass
if not order.user:
return
activate(order.user.language)
set_email_settings()
pass
try:
order = Order.objects.get(pk=order_pk)
except Order.DoesNotExist:
return False, f"Order not found with the given pk: {order_pk}"
if not order.user:
return False, f"Order's user not found with the given pk: {order_pk}"
if order.attributes.get("system_email_sent"):
return True, str(order.uuid)
digital_ops = []
for digital_op in order.order_products.filter(
product__is_digital=True,
status__in=[
"FINISHED",
"DELIVERED",
"ACCEPTED",
],
):
digital_ops.append(digital_op)
send_digital_assets_email(digital_ops)
shipped_ops = []
for shipped_op in order.order_products.filter(
product__is_digital=False,
status__in=[
"FINISHED",
"DELIVERED",
"ACCEPTED",
],
):
shipped_ops.append(shipped_op)
send_thank_you_email(shipped_ops)
if type(order.attributes) is not dict:
order.attributes = {}
order.attributes["system_email_sent"] = True
order.save()
return True, str(order.uuid)