import logging import re from html import unescape from celery import shared_task from constance import config from django.conf import settings from django.core.mail import EmailMultiAlternatives from django.template import Context, Template from django.utils import timezone from django.utils.html import strip_tags from django.utils.translation import activate from engine.core.utils import get_dynamic_email_connection from engine.core.utils.markdown import render_markdown from engine.vibes_auth.emailing.choices import CampaignStatus, RecipientStatus logger = logging.getLogger(__name__) # Batch size for sending emails BATCH_SIZE = 50 def render_template_string(template_str: str, context: dict) -> str: """Render a Django template string with the given context.""" template = Template(template_str) return template.render(Context(context)) def html_to_plain_text(html: str) -> str: """Convert HTML to plain text, preserving some structure.""" # Replace
and

with newlines text = re.sub(r"", "\n", html, flags=re.IGNORECASE) text = re.sub(r"

", "\n\n", text, flags=re.IGNORECASE) text = re.sub(r"", "\n", text, flags=re.IGNORECASE) text = re.sub(r"", "\n", text, flags=re.IGNORECASE) # Strip remaining HTML tags text = strip_tags(text) # Unescape HTML entities text = unescape(text) # Clean up excessive whitespace text = re.sub(r"\n{3,}", "\n\n", text) text = re.sub(r" {2,}", " ", text) return text.strip() def build_unsubscribe_url(user) -> str: """Build the unsubscribe URL for a user.""" return ( f"https://{settings.STOREFRONT_DOMAIN}/{user.language}" f"/unsubscribe?token={user.unsubscribe_token}" ) @shared_task(queue="default") def prepare_campaign_recipients(campaign_pk: str) -> tuple[bool, str]: """ Prepare recipients for a campaign by adding all subscribed users. Args: campaign_pk: The primary key of the EmailCampaign. Returns: Tuple of (success, message). """ from engine.vibes_auth.emailing.models import CampaignRecipient, EmailCampaign from engine.vibes_auth.models import User try: campaign = EmailCampaign.objects.get(pk=campaign_pk) if campaign.status != CampaignStatus.DRAFT: return False, f"Campaign is not in draft status: {campaign.status}" # Get all subscribed and active users subscribed_users = User.objects.filter( is_subscribed=True, is_active=True, ).exclude( # Exclude users already added as recipients campaign_emails__campaign=campaign ) # Create recipient records in bulk recipients_to_create = [ CampaignRecipient( campaign=campaign, user=user, status=RecipientStatus.PENDING, ) for user in subscribed_users ] CampaignRecipient.objects.bulk_create( recipients_to_create, ignore_conflicts=True ) # Update campaign statistics total = campaign.recipients.count() campaign.total_recipients = total campaign.status = CampaignStatus.SCHEDULED campaign.save(update_fields=["total_recipients", "status", "modified"]) return True, f"Prepared {total} recipients for campaign {campaign.name}" except EmailCampaign.DoesNotExist: return False, f"Campaign not found: {campaign_pk}" except Exception as e: logger.exception("Error preparing campaign recipients") return False, f"Error: {e!s}" @shared_task(queue="default", bind=True, max_retries=3) def send_campaign_email_batch( self, campaign_pk: str, offset: int = 0 ) -> tuple[bool, str]: """ Send a batch of campaign emails. This task processes emails in batches to avoid overwhelming the mail server. It automatically schedules the next batch when done. Args: campaign_pk: The primary key of the EmailCampaign. offset: Starting offset for this batch. Returns: Tuple of (success, message). """ from engine.vibes_auth.emailing.models import EmailCampaign try: campaign = EmailCampaign.objects.select_related("template").get(pk=campaign_pk) if campaign.status not in (CampaignStatus.SENDING, CampaignStatus.SCHEDULED): return False, f"Campaign status is {campaign.status}, cannot send" if not campaign.template: campaign.status = CampaignStatus.CANCELLED campaign.save(update_fields=["status", "modified"]) return False, "Campaign has no template" # Mark as sending if not already if campaign.status != CampaignStatus.SENDING: campaign.status = CampaignStatus.SENDING campaign.save(update_fields=["status", "modified"]) # Get pending recipients for this batch pending_recipients = campaign.recipients.filter( status=RecipientStatus.PENDING ).select_related("user")[offset : offset + BATCH_SIZE] if not pending_recipients: # No more recipients, mark campaign as sent campaign.status = CampaignStatus.SENT campaign.sent_at = timezone.now() campaign.save(update_fields=["status", "sent_at", "modified"]) return True, f"Campaign {campaign.name} completed" # Get email connection connection = get_dynamic_email_connection() sent_count = 0 failed_count = 0 for recipient in pending_recipients: try: send_single_campaign_email(campaign, recipient, connection) recipient.status = RecipientStatus.SENT recipient.sent_at = timezone.now() recipient.save(update_fields=["status", "sent_at", "modified"]) sent_count += 1 except Exception as e: logger.exception("Failed to send email to %s", recipient.user.email) recipient.status = RecipientStatus.FAILED recipient.error_message = str(e)[:500] recipient.save(update_fields=["status", "error_message", "modified"]) failed_count += 1 # Update campaign statistics campaign.refresh_from_db() campaign.sent_count = campaign.recipients.filter( status=RecipientStatus.SENT ).count() campaign.failed_count = campaign.recipients.filter( status=RecipientStatus.FAILED ).count() campaign.save(update_fields=["sent_count", "failed_count", "modified"]) # Schedule next batch if there are more pending remaining = campaign.recipients.filter(status=RecipientStatus.PENDING).count() if remaining > 0: send_campaign_email_batch.apply_async( args=(campaign_pk, 0), # Always start from 0 since we filter by PENDING countdown=2, # Small delay between batches ) return True, f"Sent {sent_count}, failed {failed_count}, remaining {remaining}" except EmailCampaign.DoesNotExist: return False, f"Campaign not found: {campaign_pk}" except Exception as e: logger.exception("Error sending campaign batch") # Retry on transient errors raise self.retry(exc=e, countdown=60) from e def send_single_campaign_email(campaign, recipient, connection) -> None: """ Send a single campaign email to a recipient. Args: campaign: The EmailCampaign instance. recipient: The CampaignRecipient instance. connection: The email connection to use. """ user = recipient.user template = campaign.template # Activate user's language for any translated content activate(user.language) # Build context for template rendering unsubscribe_url = build_unsubscribe_url(user) context = { "user": user, "project_name": settings.PROJECT_NAME, "unsubscribe_url": unsubscribe_url, "tracking_id": str(recipient.tracking_id), } # Render subject and content subject = render_template_string(template.subject, context) rendered_content = render_template_string(template.content, context) html_content = render_markdown(rendered_content) # Add unsubscribe footer to HTML html_content = add_unsubscribe_footer(html_content, unsubscribe_url) # Generate plain text version if template.plain_content: plain_content = render_template_string(template.plain_content, context) else: plain_content = html_to_plain_text(html_content) # Add unsubscribe info to plain text plain_content += f"\n\n---\nTo unsubscribe: {unsubscribe_url}" # Create and send email email = EmailMultiAlternatives( subject=subject, body=plain_content, from_email=f"{settings.PROJECT_NAME} <{config.EMAIL_FROM}>", to=[user.email], connection=connection, headers={ "List-Unsubscribe": f"<{unsubscribe_url}>", "List-Unsubscribe-Post": "List-Unsubscribe=One-Click", }, ) email.attach_alternative(html_content, "text/html") email.send() def add_unsubscribe_footer(html_content: str, unsubscribe_url: str) -> str: """ Add an unsubscribe footer to HTML email content. Args: html_content: The HTML content. unsubscribe_url: The unsubscribe URL. Returns: HTML content with unsubscribe footer. """ footer = f"""

You received this email because you subscribed to our newsletter.

Unsubscribe

""" # Try to insert before if present, otherwise append if "" in html_content.lower(): return re.sub( r"()", f"{footer}\\1", html_content, flags=re.IGNORECASE, ) return html_content + footer @shared_task(queue="default") def send_scheduled_campaigns() -> tuple[bool, str]: """ Check for scheduled campaigns that are due and start sending them. This task should be run periodically via Celery Beat. """ from engine.vibes_auth.emailing.models import EmailCampaign now = timezone.now() due_campaigns = EmailCampaign.objects.filter( status=CampaignStatus.SCHEDULED, scheduled_at__lte=now, is_active=True, ) started = 0 for campaign in due_campaigns: campaign.status = CampaignStatus.SENDING campaign.save(update_fields=["status", "modified"]) send_campaign_email_batch.delay(str(campaign.uuid)) started += 1 return True, f"Started {started} scheduled campaign(s)"