schon/engine/vibes_auth/emailing/tasks.py
Egor fureunoir Gorbunov eef774c3a3 feat(markdown): integrate markdown rendering and editor support
Replace WYSIWYG editor with Markdown editor across all relevant models and admin fields. Add utilities for rendering and stripping markdown. Adjust serializers, views, and templates to support markdown content. Introduce `PastedImage` model and upload endpoint for handling inline image uploads in markdown.

This change simplifies content formatting while enhancing flexibility with markdown support.
2026-02-27 23:36:51 +03:00

318 lines
11 KiB
Python

import logging
import re
from html import unescape
from celery import shared_task
from constance import config
from django.conf import settings
from django.core.mail import EmailMultiAlternatives
from django.template import Context, Template
from django.utils import timezone
from django.utils.html import strip_tags
from django.utils.translation import activate
from engine.core.utils import get_dynamic_email_connection
from engine.core.utils.markdown import render_markdown
from engine.vibes_auth.emailing.choices import CampaignStatus, RecipientStatus
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Maximum number of pending recipients handled by one send_campaign_email_batch run.
BATCH_SIZE = 50
def render_template_string(template_str: str, context: dict) -> str:
    """
    Compile a Django template from source text and render it.

    Args:
        template_str: Template source written in Django template syntax.
        context: Variables exposed to the template while rendering.

    Returns:
        The rendered text.
    """
    compiled = Template(template_str)
    render_context = Context(context)
    return compiled.render(render_context)
def html_to_plain_text(html: str) -> str:
    """
    Produce a plain-text rendition of an HTML fragment.

    Line-break and block-closing tags are turned into newlines first so the
    paragraph structure survives tag stripping. Afterwards HTML entities are
    decoded and runs of blank lines or spaces are collapsed.

    Args:
        html: The HTML markup to convert.

    Returns:
        The text content, stripped of leading and trailing whitespace.
    """
    # Tags that should leave a line break behind, paired with the text that
    # replaces each of them (matched case-insensitively).
    structural_breaks = (
        (r"<br\s*/?>", "\n"),
        (r"</p>", "\n\n"),
        (r"</div>", "\n"),
        (r"</li>", "\n"),
    )
    plain = html
    for tag_pattern, line_break in structural_breaks:
        plain = re.sub(tag_pattern, line_break, plain, flags=re.IGNORECASE)

    # Remove the markup that is left, then decode entities such as &amp;.
    plain = unescape(strip_tags(plain))

    # Allow at most one blank line in a row and no runs of spaces.
    plain = re.sub(r"\n{3,}", "\n\n", plain)
    plain = re.sub(r" {2,}", " ", plain)
    return plain.strip()
def build_unsubscribe_url(user) -> str:
    """
    Compose the storefront unsubscribe link for ``user``.

    The link is made of the configured storefront domain, the user's
    ``language`` as the first path segment, and the user's
    ``unsubscribe_token`` as the ``token`` query parameter.
    """
    storefront_root = f"https://{settings.STOREFRONT_DOMAIN}/{user.language}"
    return f"{storefront_root}/unsubscribe?token={user.unsubscribe_token}"
@shared_task(queue="default")
def prepare_campaign_recipients(campaign_pk: str) -> tuple[bool, str]:
    """
    Prepare recipients for a campaign by adding all subscribed users.

    Only campaigns in DRAFT status are processed. Every active, subscribed
    user that is not yet a recipient of the campaign gets a PENDING
    ``CampaignRecipient`` row; the campaign's ``total_recipients`` is then
    refreshed and its status moved to SCHEDULED.

    Args:
        campaign_pk: The primary key of the EmailCampaign.

    Returns:
        Tuple of (success, message). Failures (unknown campaign, wrong
        status, unexpected error) are reported through the tuple rather than
        raised; unexpected errors are also logged with their traceback.
    """
    from engine.vibes_auth.emailing.models import CampaignRecipient, EmailCampaign
    from engine.vibes_auth.models import User

    # Upper bound on rows per INSERT so a large audience is not sent to the
    # database as one enormous statement.
    insert_batch_size = 1000

    try:
        campaign = EmailCampaign.objects.get(pk=campaign_pk)
        if campaign.status != CampaignStatus.DRAFT:
            return False, f"Campaign is not in draft status: {campaign.status}"

        # Only the primary keys are needed to create recipient rows, so avoid
        # instantiating a full User object for every subscriber.
        subscribed_user_ids = (
            User.objects.filter(
                is_subscribed=True,
                is_active=True,
            )
            .exclude(
                # Exclude users already added as recipients
                campaign_emails__campaign=campaign
            )
            .values_list("pk", flat=True)
        )

        # Create recipient records in bulk
        recipients_to_create = [
            CampaignRecipient(
                campaign=campaign,
                user_id=user_id,
                status=RecipientStatus.PENDING,
            )
            for user_id in subscribed_user_ids
        ]
        CampaignRecipient.objects.bulk_create(
            recipients_to_create,
            ignore_conflicts=True,
            batch_size=insert_batch_size,
        )

        # Update campaign statistics
        total = campaign.recipients.count()
        campaign.total_recipients = total
        campaign.status = CampaignStatus.SCHEDULED
        campaign.save(update_fields=["total_recipients", "status", "modified"])
        return True, f"Prepared {total} recipients for campaign {campaign.name}"
    except EmailCampaign.DoesNotExist:
        return False, f"Campaign not found: {campaign_pk}"
    except Exception as e:
        logger.exception("Error preparing campaign recipients")
        return False, f"Error: {e!s}"
def _mark_campaign_sent(campaign) -> None:
    """Set ``campaign`` to SENT, stamp ``sent_at`` with the current time and save."""
    campaign.status = CampaignStatus.SENT
    campaign.sent_at = timezone.now()
    campaign.save(update_fields=["status", "sent_at", "modified"])


@shared_task(queue="default", bind=True, max_retries=3)
def send_campaign_email_batch(
    self, campaign_pk: str, offset: int = 0
) -> tuple[bool, str]:
    """
    Send a batch of campaign emails.

    This task processes emails in batches to avoid overwhelming the mail server.
    It automatically schedules the next batch while pending recipients remain
    and marks the campaign as SENT once the last pending recipient has been
    processed.

    Args:
        campaign_pk: The primary key of the EmailCampaign.
        offset: Starting offset into the PENDING recipients for this batch.
            Follow-up batches are always queued with 0 because processed
            recipients leave the PENDING set.

    Returns:
        Tuple of (success, message).

    Raises:
        celery.exceptions.Retry: On an unexpected error outside the
            per-recipient handling; the task is retried after 60 seconds
            (at most ``max_retries`` times).
    """
    from engine.vibes_auth.emailing.models import EmailCampaign

    try:
        campaign = EmailCampaign.objects.select_related("template").get(pk=campaign_pk)
        if campaign.status not in (CampaignStatus.SENDING, CampaignStatus.SCHEDULED):
            return False, f"Campaign status is {campaign.status}, cannot send"
        if not campaign.template:
            campaign.status = CampaignStatus.CANCELLED
            campaign.save(update_fields=["status", "modified"])
            return False, "Campaign has no template"

        # Mark as sending if not already
        if campaign.status != CampaignStatus.SENDING:
            campaign.status = CampaignStatus.SENDING
            campaign.save(update_fields=["status", "modified"])

        # Get pending recipients for this batch
        pending_recipients = campaign.recipients.filter(
            status=RecipientStatus.PENDING
        ).select_related("user")[offset : offset + BATCH_SIZE]
        if not pending_recipients:
            # No more recipients, mark campaign as sent
            _mark_campaign_sent(campaign)
            return True, f"Campaign {campaign.name} completed"

        # Get email connection
        connection = get_dynamic_email_connection()
        sent_count = 0
        failed_count = 0
        for recipient in pending_recipients:
            # A failure for one recipient is recorded on that recipient and
            # must not abort the rest of the batch.
            try:
                send_single_campaign_email(campaign, recipient, connection)
                recipient.status = RecipientStatus.SENT
                recipient.sent_at = timezone.now()
                recipient.save(update_fields=["status", "sent_at", "modified"])
                sent_count += 1
            except Exception as e:
                logger.exception("Failed to send email to %s", recipient.user.email)
                recipient.status = RecipientStatus.FAILED
                recipient.error_message = str(e)[:500]
                recipient.save(update_fields=["status", "error_message", "modified"])
                failed_count += 1

        # Update campaign statistics (reload first so changes made to the
        # campaign while this batch was running are not overwritten).
        campaign.refresh_from_db()
        campaign.sent_count = campaign.recipients.filter(
            status=RecipientStatus.SENT
        ).count()
        campaign.failed_count = campaign.recipients.filter(
            status=RecipientStatus.FAILED
        ).count()
        campaign.save(update_fields=["sent_count", "failed_count", "modified"])

        # Schedule next batch if there are more pending
        remaining = campaign.recipients.filter(status=RecipientStatus.PENDING).count()
        if remaining > 0:
            send_campaign_email_batch.apply_async(
                args=(campaign_pk, 0),  # Always start from 0 since we filter by PENDING
                countdown=2,  # Small delay between batches
            )
        elif campaign.status == CampaignStatus.SENDING:
            # This batch drained the queue. No further batch will be scheduled,
            # so the campaign has to be finalised here or it would stay in
            # SENDING forever. The status check leaves a campaign alone if its
            # status was changed while the batch was running.
            _mark_campaign_sent(campaign)
        return True, f"Sent {sent_count}, failed {failed_count}, remaining {remaining}"
    except EmailCampaign.DoesNotExist:
        return False, f"Campaign not found: {campaign_pk}"
    except Exception as e:
        logger.exception("Error sending campaign batch")
        # Retry on transient errors
        raise self.retry(exc=e, countdown=60) from e
def send_single_campaign_email(campaign, recipient, connection) -> None:
    """
    Send a single campaign email to a recipient.

    The subject, Markdown body and optional plain-text body of the campaign
    template are rendered as Django templates with ``user``,
    ``project_name``, ``unsubscribe_url`` and ``tracking_id`` in context.
    The Markdown body is converted to HTML and gets an unsubscribe footer;
    the plain-text part falls back to a text rendition of that HTML when the
    template has no ``plain_content``.

    Exceptions raised while rendering or sending are not caught here.

    Args:
        campaign: The EmailCampaign instance.
        recipient: The CampaignRecipient instance.
        connection: The email connection to use.
    """
    # Imported locally: ``override`` restores the previous language on exit,
    # whereas ``activate`` would leave the recipient's language active on the
    # worker thread for whatever runs next.
    from django.utils.translation import override

    user = recipient.user
    template = campaign.template

    # Build context for template rendering
    unsubscribe_url = build_unsubscribe_url(user)
    context = {
        "user": user,
        "project_name": settings.PROJECT_NAME,
        "unsubscribe_url": unsubscribe_url,
        "tracking_id": str(recipient.tracking_id),
    }

    # Render everything in the user's language for any translated content
    with override(user.language):
        # The subject is a mail header, not HTML: render it without HTML
        # autoescaping so characters like ' and & are not sent as entities.
        subject = Template(template.subject).render(
            Context(context, autoescape=False)
        )
        # Header values must not contain line breaks (Django raises
        # BadHeaderError), so fold a multi-line subject into one line.
        subject = " ".join(subject.splitlines()).strip()

        # The Markdown body ends up as HTML, so autoescaping stays on here.
        rendered_content = render_template_string(template.content, context)
        html_content = render_markdown(rendered_content)

        # Add unsubscribe footer to HTML
        html_content = add_unsubscribe_footer(html_content, unsubscribe_url)

        # Generate plain text version
        if template.plain_content:
            # Plain text is not HTML either; no entity escaping.
            plain_content = Template(template.plain_content).render(
                Context(context, autoescape=False)
            )
        else:
            plain_content = html_to_plain_text(html_content)

    # Add unsubscribe info to plain text
    plain_content += f"\n\n---\nTo unsubscribe: {unsubscribe_url}"

    # Create and send email
    email = EmailMultiAlternatives(
        subject=subject,
        body=plain_content,
        from_email=f"{settings.PROJECT_NAME} <{config.EMAIL_FROM}>",
        to=[user.email],
        connection=connection,
        headers={
            "List-Unsubscribe": f"<{unsubscribe_url}>",
            "List-Unsubscribe-Post": "List-Unsubscribe=One-Click",
        },
    )
    email.attach_alternative(html_content, "text/html")
    email.send()
def add_unsubscribe_footer(html_content: str, unsubscribe_url: str) -> str:
    """
    Add an unsubscribe footer to HTML email content.

    The footer is placed immediately before the last ``</body>`` tag
    (matched case-insensitively) or appended when the content has no such
    tag. It is inserted exactly once.

    Args:
        html_content: The HTML content.
        unsubscribe_url: The unsubscribe URL. It is HTML-escaped before being
            placed in the link's ``href`` attribute.

    Returns:
        HTML content with unsubscribe footer.
    """
    from html import escape

    # Escape the URL for use inside a double-quoted attribute so that "&" or
    # quote characters cannot produce invalid markup or break out of href.
    safe_url = escape(unsubscribe_url, quote=True)
    footer = f"""
<div style="margin-top: 30px; padding-top: 20px; border-top: 1px solid #eee;
font-size: 12px; color: #666; text-align: center;">
<p>You received this email because you subscribed to our newsletter.</p>
<p><a href="{safe_url}" style="color: #666;">Unsubscribe</a></p>
</div>
"""
    # Locate the closing tag on the original string and splice by position.
    # Passing the footer to re.sub as a replacement template would interpret
    # backslashes in the URL as regex escapes.
    closing_tags = list(re.finditer(r"</body>", html_content, flags=re.IGNORECASE))
    if not closing_tags:
        return html_content + footer
    cut = closing_tags[-1].start()
    return html_content[:cut] + footer + html_content[cut:]
@shared_task(queue="default")
def send_scheduled_campaigns() -> tuple[bool, str]:
    """
    Start sending every scheduled campaign whose send time has arrived.

    Selects active campaigns in SCHEDULED status with ``scheduled_at`` at or
    before the current time, switches each one to SENDING and queues its
    first email batch.

    This task should be run periodically via Celery Beat.

    Returns:
        Tuple of (success, message); the message reports how many campaigns
        were started.
    """
    from engine.vibes_auth.emailing.models import EmailCampaign

    due_campaigns = EmailCampaign.objects.filter(
        status=CampaignStatus.SCHEDULED,
        scheduled_at__lte=timezone.now(),
        is_active=True,
    )

    # ``started`` keeps the index of the last campaign handled (0 if none).
    started = 0
    for started, due in enumerate(due_campaigns, start=1):
        due.status = CampaignStatus.SENDING
        due.save(update_fields=["status", "modified"])
        send_campaign_email_batch.delay(str(due.uuid))

    return True, f"Started {started} scheduled campaign(s)"