schon/engine/vibes_auth/emailing/tasks.py
2026-01-26 03:23:41 +03:00

316 lines
11 KiB
Python

import logging
import re
from html import unescape
from celery import shared_task
from constance import config
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from django.template import Context, Template
from django.utils import timezone
from django.utils.html import strip_tags
from django.utils.translation import activate
from engine.core.utils import get_dynamic_email_connection
from engine.vibes_auth.emailing.choices import CampaignStatus, RecipientStatus
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Maximum number of pending recipients that one send_campaign_email_batch
# run slices off and processes.
BATCH_SIZE = 50
def render_template_string(template_str: str, context: dict) -> str:
    """Compile ``template_str`` as a Django template and render it.

    Args:
        template_str: Django template source text.
        context: Variables made available to the template.

    Returns:
        The rendered output.
    """
    compiled = Template(template_str)
    render_context = Context(context)
    return compiled.render(render_context)
def html_to_plain_text(html: str) -> str:
    """Produce a plain-text rendition of an HTML fragment.

    Line-break and closing block tags are turned into newlines first so the
    text keeps some structure, then the remaining markup is stripped,
    entities are decoded, and runs of blank lines / spaces are collapsed.

    Args:
        html: The HTML markup to convert.

    Returns:
        The plain text, with leading and trailing whitespace removed.
    """
    # (pattern, replacement) pairs applied in order, all case-insensitively.
    line_break_rules = (
        (r"<br\s*/?>", "\n"),
        (r"</p>", "\n\n"),
        (r"</div>", "\n"),
        (r"</li>", "\n"),
    )
    text = html
    for pattern, replacement in line_break_rules:
        text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)

    # Drop whatever markup is left, then decode entities such as &amp;.
    text = unescape(strip_tags(text))

    # At most one blank line in a row, and no runs of spaces.
    collapsed = re.sub(r"\n{3,}", "\n\n", text)
    collapsed = re.sub(r" {2,}", " ", collapsed)
    return collapsed.strip()
def build_unsubscribe_url(user) -> str:
    """Return the storefront unsubscribe link for ``user``.

    The link is built from ``settings.STOREFRONT_DOMAIN``, the user's
    ``language`` (as the first path segment) and the user's
    ``unsubscribe_token`` (as the ``token`` query parameter).

    Args:
        user: Object exposing ``language`` and ``unsubscribe_token``.

    Returns:
        The absolute ``https://`` unsubscribe URL.
    """
    domain = settings.STOREFRONT_DOMAIN
    language = user.language
    token = user.unsubscribe_token
    return f"https://{domain}/{language}/unsubscribe?token={token}"
@shared_task(queue="default")
def prepare_campaign_recipients(campaign_pk: str) -> tuple[bool, str]:
    """
    Attach every active, subscribed user to a draft campaign as a recipient.

    On success the campaign's ``total_recipients`` is refreshed from the
    database and its status is moved to SCHEDULED.

    Args:
        campaign_pk: The primary key of the EmailCampaign.

    Returns:
        Tuple of (success, message).
    """
    from engine.vibes_auth.emailing.models import CampaignRecipient, EmailCampaign
    from engine.vibes_auth.models import User

    try:
        campaign = EmailCampaign.objects.get(pk=campaign_pk)
        if campaign.status != CampaignStatus.DRAFT:
            return False, f"Campaign is not in draft status: {campaign.status}"

        # Active subscribers that are not already recipients of this campaign.
        eligible_users = User.objects.filter(is_subscribed=True, is_active=True)
        eligible_users = eligible_users.exclude(campaign_emails__campaign=campaign)

        # One PENDING recipient row per eligible user, inserted in bulk.
        new_recipients = [
            CampaignRecipient(
                campaign=campaign,
                user=subscriber,
                status=RecipientStatus.PENDING,
            )
            for subscriber in eligible_users
        ]
        CampaignRecipient.objects.bulk_create(new_recipients, ignore_conflicts=True)

        # Re-count from the database rather than trusting the list length.
        recipient_total = campaign.recipients.count()
        campaign.total_recipients = recipient_total
        campaign.status = CampaignStatus.SCHEDULED
        campaign.save(update_fields=["total_recipients", "status", "modified"])
        return True, f"Prepared {recipient_total} recipients for campaign {campaign.name}"
    except EmailCampaign.DoesNotExist:
        return False, f"Campaign not found: {campaign_pk}"
    except Exception as e:
        logger.exception("Error preparing campaign recipients")
        return False, f"Error: {e!s}"
@shared_task(queue="default", bind=True, max_retries=3)
def send_campaign_email_batch(
    self, campaign_pk: str, offset: int = 0
) -> tuple[bool, str]:
    """
    Send one batch of campaign emails and chain the next batch if needed.

    Up to ``BATCH_SIZE`` pending recipients are processed per run. Each
    recipient is marked SENT or FAILED individually, so a single failing
    address does not abort the batch. If pending recipients remain afterwards
    the task re-enqueues itself; if none remain the campaign is marked SENT
    in this same run.

    Args:
        campaign_pk: The primary key of the EmailCampaign.
        offset: Starting offset into the pending recipients for this batch.
            Follow-up batches are always queued with 0 because processed
            recipients drop out of the PENDING filter.

    Returns:
        Tuple of (success, message).

    Raises:
        Whatever ``self.retry`` raises: any unexpected error outside the
        per-recipient handling triggers a retry after 60 seconds.
    """
    from engine.vibes_auth.emailing.models import EmailCampaign

    try:
        campaign = EmailCampaign.objects.select_related("template").get(pk=campaign_pk)

        if campaign.status not in (CampaignStatus.SENDING, CampaignStatus.SCHEDULED):
            return False, f"Campaign status is {campaign.status}, cannot send"

        if not campaign.template:
            campaign.status = CampaignStatus.CANCELLED
            campaign.save(update_fields=["status", "modified"])
            return False, "Campaign has no template"

        # Mark as sending if not already
        if campaign.status != CampaignStatus.SENDING:
            campaign.status = CampaignStatus.SENDING
            campaign.save(update_fields=["status", "modified"])

        # Get pending recipients for this batch
        pending_recipients = campaign.recipients.filter(
            status=RecipientStatus.PENDING
        ).select_related("user")[offset : offset + BATCH_SIZE]

        if not pending_recipients:
            # No more recipients, mark campaign as sent
            campaign.status = CampaignStatus.SENT
            campaign.sent_at = timezone.now()
            campaign.save(update_fields=["status", "sent_at", "modified"])
            return True, f"Campaign {campaign.name} completed"

        # Get email connection
        connection = get_dynamic_email_connection()

        sent_count = 0
        failed_count = 0
        for recipient in pending_recipients:
            try:
                send_single_campaign_email(campaign, recipient, connection)
                recipient.status = RecipientStatus.SENT
                recipient.sent_at = timezone.now()
                recipient.save(update_fields=["status", "sent_at", "modified"])
                sent_count += 1
            except Exception as e:
                # Record the failure on the recipient and keep going.
                logger.exception("Failed to send email to %s", recipient.user.email)
                recipient.status = RecipientStatus.FAILED
                recipient.error_message = str(e)[:500]
                recipient.save(update_fields=["status", "error_message", "modified"])
                failed_count += 1

        # Update campaign statistics from the database; refresh first so a
        # status change made while this batch was running is visible.
        campaign.refresh_from_db()
        campaign.sent_count = campaign.recipients.filter(
            status=RecipientStatus.SENT
        ).count()
        campaign.failed_count = campaign.recipients.filter(
            status=RecipientStatus.FAILED
        ).count()
        remaining = campaign.recipients.filter(status=RecipientStatus.PENDING).count()

        update_fields = ["sent_count", "failed_count", "modified"]
        # Finalize here when this batch consumed the last pending recipients.
        # No follow-up task is queued in that case, so without this the
        # campaign would stay in SENDING forever. The status guard avoids
        # overwriting a status that was changed during the batch.
        if remaining == 0 and campaign.status == CampaignStatus.SENDING:
            campaign.status = CampaignStatus.SENT
            campaign.sent_at = timezone.now()
            update_fields += ["status", "sent_at"]
        campaign.save(update_fields=update_fields)

        # Schedule next batch if there are more pending
        if remaining > 0:
            send_campaign_email_batch.apply_async(
                args=(campaign_pk, 0),  # Always start from 0 since we filter by PENDING
                countdown=2,  # Small delay between batches
            )

        return True, f"Sent {sent_count}, failed {failed_count}, remaining {remaining}"
    except EmailCampaign.DoesNotExist:
        return False, f"Campaign not found: {campaign_pk}"
    except Exception as e:
        logger.exception("Error sending campaign batch")
        # Retry on transient errors
        raise self.retry(exc=e, countdown=60) from e
def send_single_campaign_email(campaign, recipient, connection) -> None:
    """
    Render and send a single campaign email to a recipient.

    The subject, HTML body and plain-text body are rendered from the
    campaign's template with the recipient's language active. An unsubscribe
    footer is added to the HTML part, an unsubscribe line to the plain-text
    part, and ``List-Unsubscribe`` headers to the message.

    Args:
        campaign: The EmailCampaign instance.
        recipient: The CampaignRecipient instance.
        connection: The email connection to use.
    """
    # Imported locally; the module itself only imports ``activate``.
    from django.utils.translation import override

    user = recipient.user
    template = campaign.template

    # Build context for template rendering
    unsubscribe_url = build_unsubscribe_url(user)
    context = {
        "user": user,
        "project_name": settings.PROJECT_NAME,
        "unsubscribe_url": unsubscribe_url,
        "tracking_id": str(recipient.tracking_id),
    }

    # ``override`` restores the previously active language on exit, so one
    # recipient's language does not leak into later work on the same worker
    # thread (a bare ``activate`` would leave it set).
    with override(user.language):
        # The subject is not HTML: render it without autoescaping so values
        # such as "O'Brien" or "A & B" are not turned into HTML entities.
        subject = Template(template.subject).render(
            Context(context, autoescape=False)
        )
        html_content = render_template_string(template.html_content, context)

        # Add unsubscribe footer to HTML
        html_content = add_unsubscribe_footer(html_content, unsubscribe_url)

        # Generate plain text version
        if template.plain_content:
            # Plain text is not HTML either, so autoescaping is off here too.
            plain_content = Template(template.plain_content).render(
                Context(context, autoescape=False)
            )
        else:
            plain_content = html_to_plain_text(html_content)

    # A header value must be a single line; a stray newline left by the
    # template would otherwise make the message fail to send.
    subject = "".join(subject.splitlines()).strip()

    # Add unsubscribe info to plain text
    plain_content += f"\n\n---\nTo unsubscribe: {unsubscribe_url}"

    # Create and send email
    email = EmailMultiAlternatives(
        subject=subject,
        body=plain_content,
        from_email=f"{settings.PROJECT_NAME} <{config.EMAIL_FROM}>",
        to=[user.email],
        connection=connection,
        headers={
            "List-Unsubscribe": f"<{unsubscribe_url}>",
            "List-Unsubscribe-Post": "List-Unsubscribe=One-Click",
        },
    )
    email.attach_alternative(html_content, "text/html")
    email.send()
def add_unsubscribe_footer(html_content: str, unsubscribe_url: str) -> str:
    """
    Add an unsubscribe footer to HTML email content.

    The footer is inserted exactly once, immediately before the last closing
    ``</body>`` tag (matched case-insensitively). If the content has no such
    tag, the footer is appended to the end.

    Args:
        html_content: The HTML content.
        unsubscribe_url: The unsubscribe URL. It is HTML-escaped before being
            placed in the link's ``href`` attribute.

    Returns:
        HTML content with unsubscribe footer.
    """
    from html import escape

    # Escape for an attribute value so "&" or quotes in the URL cannot break
    # the markup.
    safe_url = escape(unsubscribe_url, quote=True)
    footer = f"""
<div style="margin-top: 30px; padding-top: 20px; border-top: 1px solid #eee;
font-size: 12px; color: #666; text-align: center;">
<p>You received this email because you subscribed to our newsletter.</p>
<p><a href="{safe_url}" style="color: #666;">Unsubscribe</a></p>
</div>
"""

    closing_tags = list(re.finditer(r"</body\s*>", html_content, flags=re.IGNORECASE))
    if not closing_tags:
        return html_content + footer

    # Splice by position instead of using the footer as an ``re.sub``
    # replacement template: backslash sequences in the URL would otherwise be
    # interpreted as group references, and every match would get a footer.
    insert_at = closing_tags[-1].start()
    return html_content[:insert_at] + footer + html_content[insert_at:]
@shared_task(queue="default")
def send_scheduled_campaigns() -> tuple[bool, str]:
    """
    Start sending every scheduled campaign whose send time has arrived.

    Each active campaign in SCHEDULED status with ``scheduled_at`` at or
    before the current time is switched to SENDING and handed to
    ``send_campaign_email_batch``.

    This task should be run periodically via Celery Beat.

    Returns:
        Tuple of (success, message) reporting how many campaigns were started.
    """
    from engine.vibes_auth.emailing.models import EmailCampaign

    due_campaigns = EmailCampaign.objects.filter(
        status=CampaignStatus.SCHEDULED,
        scheduled_at__lte=timezone.now(),
        is_active=True,
    )

    # ``started`` keeps the index of the last campaign handled, or 0 if none.
    started = 0
    for started, campaign in enumerate(due_campaigns, start=1):
        campaign.status = CampaignStatus.SENDING
        campaign.save(update_fields=["status", "modified"])
        send_campaign_email_batch.delay(str(campaign.uuid))

    return True, f"Started {started} scheduled campaign(s)"