316 lines
11 KiB
Python
316 lines
11 KiB
Python
import logging
|
|
import re
|
|
from html import unescape
|
|
|
|
from celery import shared_task
|
|
from constance import config
|
|
from django.conf import settings
|
|
from django.core.mail import EmailMultiAlternatives
|
|
from django.template import Context, Template
|
|
from django.utils import timezone
|
|
from django.utils.html import strip_tags
|
|
from django.utils.translation import activate
|
|
|
|
from engine.core.utils import get_dynamic_email_connection
|
|
from engine.vibes_auth.emailing.choices import CampaignStatus, RecipientStatus
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upper bound on how many recipients a single batch task slices off and sends.
BATCH_SIZE = 50
|
|
|
|
|
|
def render_template_string(template_str: str, context: dict) -> str:
    """Compile ``template_str`` as a Django template and render it.

    Args:
        template_str: Template source written in Django template syntax.
        context: Variables exposed to the template while rendering.

    Returns:
        The rendered text.
    """
    compiled = Template(template_str)
    render_context = Context(context)
    return compiled.render(render_context)
|
|
|
|
|
|
def html_to_plain_text(html: str) -> str:
    """Produce a plain-text rendition of an HTML fragment.

    Line-break and block-closing tags are turned into newlines first so the
    text keeps a rough paragraph structure; every other tag is then removed,
    HTML entities are decoded, and runs of blank lines / spaces are collapsed.

    Args:
        html: The HTML markup to convert.

    Returns:
        The plain text, with leading and trailing whitespace stripped.
    """
    # (pattern, replacement) pairs applied in order, all case-insensitively.
    tag_breaks = (
        (r"<br\s*/?>", "\n"),
        (r"</p>", "\n\n"),
        (r"</div>", "\n"),
        (r"</li>", "\n"),
    )
    plain = html
    for pattern, newline in tag_breaks:
        plain = re.sub(pattern, newline, plain, flags=re.IGNORECASE)

    # Drop whatever markup is left, then decode entities such as &amp;.
    plain = unescape(strip_tags(plain))

    # Collapse 3+ consecutive newlines to one blank line and 2+ spaces to one.
    plain = re.sub(r"\n{3,}", "\n\n", plain)
    plain = re.sub(r" {2,}", " ", plain)
    return plain.strip()
|
|
|
|
|
|
def build_unsubscribe_url(user) -> str:
    """Return the storefront unsubscribe link for ``user``.

    The link is composed from ``settings.STOREFRONT_DOMAIN``, the user's
    ``language`` (as the first path segment) and the user's
    ``unsubscribe_token`` (as the ``token`` query parameter).
    """
    localized_root = f"https://{settings.STOREFRONT_DOMAIN}/{user.language}"
    return f"{localized_root}/unsubscribe?token={user.unsubscribe_token}"
|
|
|
|
|
|
@shared_task(queue="default")
def prepare_campaign_recipients(campaign_pk: str) -> tuple[bool, str]:
    """
    Populate a draft campaign with recipient rows and mark it scheduled.

    Every active, subscribed user that is not yet a recipient of the campaign
    gets a ``CampaignRecipient`` row in ``PENDING`` status. Afterwards the
    campaign's ``total_recipients`` is refreshed and its status is set to
    ``SCHEDULED``.

    Args:
        campaign_pk: The primary key of the EmailCampaign.

    Returns:
        Tuple of (success, message). Failures (campaign missing, campaign not
        in draft status, or any unexpected error) are reported through the
        tuple rather than raised.
    """
    # Imported here rather than at module level, as in the rest of this file.
    from engine.vibes_auth.emailing.models import CampaignRecipient, EmailCampaign
    from engine.vibes_auth.models import User

    try:
        campaign = EmailCampaign.objects.get(pk=campaign_pk)

        if campaign.status != CampaignStatus.DRAFT:
            return False, f"Campaign is not in draft status: {campaign.status}"

        # Active subscribers, minus anyone already attached to this campaign.
        eligible_users = User.objects.filter(is_subscribed=True, is_active=True)
        eligible_users = eligible_users.exclude(campaign_emails__campaign=campaign)

        new_rows = [
            CampaignRecipient(
                campaign=campaign,
                user=subscriber,
                status=RecipientStatus.PENDING,
            )
            for subscriber in eligible_users
        ]
        # ignore_conflicts lets rows that collide with existing ones be skipped.
        CampaignRecipient.objects.bulk_create(new_rows, ignore_conflicts=True)

        # Refresh the recipient counter and move the campaign on to SCHEDULED.
        recipient_total = campaign.recipients.count()
        campaign.total_recipients = recipient_total
        campaign.status = CampaignStatus.SCHEDULED
        campaign.save(update_fields=["total_recipients", "status", "modified"])

        return (
            True,
            f"Prepared {recipient_total} recipients for campaign {campaign.name}",
        )

    except EmailCampaign.DoesNotExist:
        return False, f"Campaign not found: {campaign_pk}"
    except Exception as exc:
        logger.exception("Error preparing campaign recipients")
        return False, f"Error: {exc!s}"
|
|
|
|
|
|
@shared_task(queue="default", bind=True, max_retries=3)
def send_campaign_email_batch(
    self, campaign_pk: str, offset: int = 0
) -> tuple[bool, str]:
    """
    Send a batch of campaign emails.

    This task processes emails in batches of ``BATCH_SIZE`` to avoid
    overwhelming the mail server. When recipients are still pending after the
    batch, it schedules the next batch; when the batch drained the queue, it
    marks the campaign as sent.

    Args:
        campaign_pk: The primary key of the EmailCampaign.
        offset: Starting offset into the *pending* recipients for this batch.
            Follow-up batches are always scheduled with 0, because recipients
            that were processed no longer match the PENDING filter.

    Returns:
        Tuple of (success, message).

    Raises:
        celery.exceptions.Retry: via ``self.retry`` (60 second countdown) when
            an unexpected error escapes the per-recipient handling.
    """
    from engine.vibes_auth.emailing.models import EmailCampaign

    try:
        campaign = EmailCampaign.objects.select_related("template").get(pk=campaign_pk)

        if campaign.status not in (CampaignStatus.SENDING, CampaignStatus.SCHEDULED):
            return False, f"Campaign status is {campaign.status}, cannot send"

        if not campaign.template:
            # Nothing can be rendered without a template, so cancel the campaign.
            campaign.status = CampaignStatus.CANCELLED
            campaign.save(update_fields=["status", "modified"])
            return False, "Campaign has no template"

        # Mark as sending if not already
        if campaign.status != CampaignStatus.SENDING:
            campaign.status = CampaignStatus.SENDING
            campaign.save(update_fields=["status", "modified"])

        # Get pending recipients for this batch
        pending_recipients = campaign.recipients.filter(
            status=RecipientStatus.PENDING
        ).select_related("user")[offset : offset + BATCH_SIZE]

        if not pending_recipients:
            # No more recipients, mark campaign as sent
            campaign.status = CampaignStatus.SENT
            campaign.sent_at = timezone.now()
            campaign.save(update_fields=["status", "sent_at", "modified"])
            return True, f"Campaign {campaign.name} completed"

        # One connection is shared by every message in the batch.
        connection = get_dynamic_email_connection()

        sent_count = 0
        failed_count = 0

        for recipient in pending_recipients:
            try:
                send_single_campaign_email(campaign, recipient, connection)
                recipient.status = RecipientStatus.SENT
                recipient.sent_at = timezone.now()
                recipient.save(update_fields=["status", "sent_at", "modified"])
                sent_count += 1
            except Exception as e:
                # A failure for one recipient must not abort the whole batch:
                # record it on the recipient row and carry on.
                logger.exception("Failed to send email to %s", recipient.user.email)
                recipient.status = RecipientStatus.FAILED
                recipient.error_message = str(e)[:500]
                recipient.save(update_fields=["status", "error_message", "modified"])
                failed_count += 1

        # Update campaign statistics from the recipient rows. The campaign is
        # reloaded first so changes made while the batch was running (for
        # example a status change) are seen below.
        campaign.refresh_from_db()
        campaign.sent_count = campaign.recipients.filter(
            status=RecipientStatus.SENT
        ).count()
        campaign.failed_count = campaign.recipients.filter(
            status=RecipientStatus.FAILED
        ).count()
        campaign.save(update_fields=["sent_count", "failed_count", "modified"])

        remaining = campaign.recipients.filter(status=RecipientStatus.PENDING).count()
        if remaining > 0:
            # Schedule next batch since there are more pending
            send_campaign_email_batch.apply_async(
                args=(campaign_pk, 0),  # Always start from 0 since we filter by PENDING
                countdown=2,  # Small delay between batches
            )
        elif campaign.status == CampaignStatus.SENDING:
            # This batch drained the queue and no follow-up task will run, so
            # the campaign has to be completed here; otherwise it would stay
            # in SENDING indefinitely. The status guard avoids overwriting a
            # status that was changed while the batch was being sent.
            campaign.status = CampaignStatus.SENT
            campaign.sent_at = timezone.now()
            campaign.save(update_fields=["status", "sent_at", "modified"])

        return True, f"Sent {sent_count}, failed {failed_count}, remaining {remaining}"

    except EmailCampaign.DoesNotExist:
        return False, f"Campaign not found: {campaign_pk}"
    except Exception as e:
        logger.exception("Error sending campaign batch")
        # Retry on transient errors
        raise self.retry(exc=e, countdown=60) from e
|
|
|
|
|
|
def send_single_campaign_email(campaign, recipient, connection) -> None:
    """
    Send a single campaign email to a recipient.

    The subject and bodies are rendered, and the message is sent, with the
    recipient's language active. The language is switched through
    ``override`` so that the previously active language is restored when the
    function returns or raises, instead of staying active in the current
    thread for whatever runs next.

    Args:
        campaign: The EmailCampaign instance.
        recipient: The CampaignRecipient instance.
        connection: The email connection to use.

    Raises:
        Any exception raised while rendering or sending; nothing is caught
        here, the caller decides how to record the failure.
    """
    # Local import: the module-level imports only bring in ``activate``.
    from django.utils.translation import override

    user = recipient.user
    template = campaign.template

    # Use the user's language for any translated content, for this email only.
    with override(user.language):
        # Build context for template rendering
        unsubscribe_url = build_unsubscribe_url(user)
        context = {
            "user": user,
            "project_name": settings.PROJECT_NAME,
            "unsubscribe_url": unsubscribe_url,
            "tracking_id": str(recipient.tracking_id),
        }

        # Render subject and content
        subject = render_template_string(template.subject, context)
        html_content = render_template_string(template.html_content, context)

        # Add unsubscribe footer to HTML
        html_content = add_unsubscribe_footer(html_content, unsubscribe_url)

        # Use the template's own plain-text version when it has one,
        # otherwise derive it from the rendered HTML.
        if template.plain_content:
            plain_content = render_template_string(template.plain_content, context)
        else:
            plain_content = html_to_plain_text(html_content)

        # Add unsubscribe info to plain text
        plain_content += f"\n\n---\nTo unsubscribe: {unsubscribe_url}"

        # Create and send email
        email = EmailMultiAlternatives(
            subject=subject,
            body=plain_content,
            from_email=f"{settings.PROJECT_NAME} <{config.EMAIL_FROM}>",
            to=[user.email],
            connection=connection,
            headers={
                # Expose the unsubscribe link as mail headers as well.
                "List-Unsubscribe": f"<{unsubscribe_url}>",
                "List-Unsubscribe-Post": "List-Unsubscribe=One-Click",
            },
        )
        email.attach_alternative(html_content, "text/html")
        email.send()
|
|
|
|
|
|
def add_unsubscribe_footer(html_content: str, unsubscribe_url: str) -> str:
    """
    Add an unsubscribe footer to HTML email content.

    The footer is inserted once, immediately before the last closing
    ``</body>`` tag (matched case-insensitively). When the content has no
    ``</body>`` tag the footer is appended to the end.

    Args:
        html_content: The HTML content.
        unsubscribe_url: The unsubscribe URL. It is HTML-escaped before being
            placed in the link's ``href`` attribute.

    Returns:
        HTML content with unsubscribe footer.
    """
    from html import escape

    # Escape for use inside a double-quoted attribute so characters such as
    # '&' or '"' in the URL cannot break the markup.
    safe_url = escape(unsubscribe_url, quote=True)

    footer = f"""
    <div style="margin-top: 30px; padding-top: 20px; border-top: 1px solid #eee;
                font-size: 12px; color: #666; text-align: center;">
        <p>You received this email because you subscribed to our newsletter.</p>
        <p><a href="{safe_url}" style="color: #666;">Unsubscribe</a></p>
    </div>
    """

    # Locate the closing body tags. The footer is spliced in by slicing rather
    # than used as an re.sub replacement template, so backslash sequences in
    # the URL (e.g. "\1") are kept literally instead of being expanded.
    closing_tags = list(re.finditer(r"</body>", html_content, flags=re.IGNORECASE))
    if not closing_tags:
        return html_content + footer

    insert_at = closing_tags[-1].start()
    return html_content[:insert_at] + footer + html_content[insert_at:]
|
|
|
|
|
|
@shared_task(queue="default")
def send_scheduled_campaigns() -> tuple[bool, str]:
    """
    Start every scheduled campaign whose send time has arrived.

    Active campaigns in ``SCHEDULED`` status with ``scheduled_at`` at or before
    the current time are switched to ``SENDING`` and their first email batch
    is queued.

    This task should be run periodically via Celery Beat.

    Returns:
        Tuple of (success, message); the message reports how many campaigns
        were started.
    """
    from engine.vibes_auth.emailing.models import EmailCampaign

    due_campaigns = EmailCampaign.objects.filter(
        status=CampaignStatus.SCHEDULED,
        scheduled_at__lte=timezone.now(),
        is_active=True,
    )

    # enumerate(start=1) leaves the counter equal to the number of campaigns
    # handled; it stays 0 when nothing is due.
    launched = 0
    for launched, campaign in enumerate(due_campaigns, start=1):
        campaign.status = CampaignStatus.SENDING
        campaign.save(update_fields=["status", "modified"])
        send_campaign_email_batch.delay(str(campaign.uuid))

    return True, f"Started {launched} scheduled campaign(s)"
|