# schon/engine/vibes_auth/messaging/services.py
#
# Listing metadata from the original file viewer: 163 lines, 5.4 KiB, Python.

from __future__ import annotations
from dataclasses import dataclass
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.core.exceptions import PermissionDenied, ValidationError
from django.db import models
from django.db.models import Count
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from engine.vibes_auth.messaging.models import ChatMessage, ChatThread, SenderType, ThreadStatus
from engine.vibes_auth.models import User
# Prefix of the channel-layer group name for a single thread; the thread's uuid is appended to it.
THREAD_GROUP_PREFIX = "thread:"
# Channel-layer group name that staff inbox events are sent to.
STAFF_INBOX_GROUP = "staff:inbox"
@dataclass
class MessageDTO:
    """Plain-data description of a single chat message.

    The field names match the keys of the ``message`` payload that this
    module broadcasts for ``thread.message`` events.
    """

    # Identifier of the message.
    id: str
    # Identifier of the thread the message belongs to.
    thread: str
    # Message body.
    text: str
    # Kind of sender that produced the message.
    sender_type: str
    # Identifier of the sending user, or None when there is no user.
    sender_user: str | None
    # Timestamp of the message, as a string.
    sent_at: str
def _broadcast_thread_event(thread: ChatThread, event: dict) -> None:
    """Send ``event`` to the channel-layer group of ``thread``.

    The group name is ``THREAD_GROUP_PREFIX`` followed by the thread's uuid.
    The call is synchronous: the layer's async ``group_send`` is run through
    ``async_to_sync``.

    Args:
        thread: Thread whose group receives the event.
        event: Event dictionary handed to ``group_send`` unchanged.
    """
    layer = get_channel_layer()
    if layer is None:
        # get_channel_layer() returns None when no channel layer is configured.
        # Callers have already written to the database at this point, so skip the
        # real-time notification instead of failing with AttributeError.
        return
    group_name = f"{THREAD_GROUP_PREFIX}{thread.uuid}"
    async_to_sync(layer.group_send)(group_name, event)
def _broadcast_staff_inbox(event: dict) -> None:
    """Send ``event`` to the ``STAFF_INBOX_GROUP`` channel-layer group.

    The call is synchronous: the layer's async ``group_send`` is run through
    ``async_to_sync``.

    Args:
        event: Event dictionary handed to ``group_send`` unchanged.
    """
    layer = get_channel_layer()
    if layer is None:
        # get_channel_layer() returns None when no channel layer is configured.
        # Skip the notification rather than crash with AttributeError after the
        # caller's database changes have already been made.
        return
    async_to_sync(layer.group_send)(STAFF_INBOX_GROUP, event)
def get_or_create_user_thread(user: User) -> ChatThread:
    """Return the chat thread linked to ``user``, creating it when none exists.

    A newly created thread takes its ``email`` from ``user.email``, falling
    back to an empty string when that value is falsy.
    """
    initial_values = {"email": user.email or ""}
    # The "created" flag is not needed; a descriptive throwaway name avoids
    # rebinding ``_``, which is the translation alias at module level.
    user_thread, _was_created = ChatThread.objects.get_or_create(user=user, defaults=initial_values)
    return user_thread
def create_anon_thread(email: str) -> ChatThread:
    """Create a thread for an anonymous visitor and announce it to the staff inbox.

    Args:
        email: Contact address of the visitor; must be a syntactically valid email.

    Returns:
        The newly created thread.

    Raises:
        ValidationError: keyed by ``"email"`` when ``email`` is empty or not a
            valid email address.
    """
    # Local import keeps this block self-contained with respect to the module's imports.
    from django.core.validators import validate_email

    try:
        # A bare '"@" in email' test accepts values such as "@" or "a@";
        # validate_email rejects those as well as empty values.
        validate_email(email)
    except ValidationError as exc:
        # Re-raise in the field-keyed shape and with the message callers already receive.
        raise ValidationError({"email": _("Valid email is required for anonymous chats.")}) from exc

    thread = ChatThread.objects.create(email=email)
    _broadcast_staff_inbox({"type": "staff.thread.created", "thread_id": str(thread.uuid)})
    return thread
def send_message(thread: ChatThread, *, sender_user: User | None, sender_type: SenderType, text: str) -> ChatMessage:
    """Store a message in ``thread``, broadcast it, and trigger the auto-reply if needed.

    Args:
        thread: Thread the message is posted to.
        sender_user: User posting the message, or ``None``.
        sender_type: Kind of sender recorded on the message.
        text: Message body, 1 to 1028 characters long.

    Returns:
        The created message.

    Raises:
        ValidationError: keyed by ``"text"`` when ``text`` is empty or too long.
        PermissionDenied: when a non-staff ``sender_user`` is not the thread's user.
    """
    text_is_acceptable = bool(text) and len(text) <= 1028
    if not text_is_acceptable:
        raise ValidationError({"text": _("Message must be 1..1028 characters.")})

    # Non-staff users may only write to their own thread.
    if sender_user and not sender_user.is_staff and thread.user_id != sender_user.pk:
        raise PermissionDenied

    # Only an authenticated user is recorded as the author.
    author = None
    if sender_user and sender_user.is_authenticated:
        author = sender_user

    message = ChatMessage.objects.create(
        thread=thread,
        sender_type=sender_type,
        sender_user=author,
        text=text,
        sent_at=timezone.now(),
    )

    thread.last_message_at = message.sent_at
    thread.save(update_fields=["last_message_at", "modified"])

    payload = {
        "id": str(message.uuid),
        "thread": str(thread.uuid),
        "text": message.text,
        "sender_type": message.sender_type,
        "sender_user": str(message.sender_user.uuid) if message.sender_user else None,
        "sent_at": message.sent_at.isoformat(),
    }
    _broadcast_thread_event(thread, {"type": "thread.message", "message": payload})

    # Every sender type other than STAFF gets the automatic reply.
    # NOTE(review): that includes SenderType.SYSTEM if a caller ever passes it - confirm this is intended.
    if sender_type != SenderType.STAFF:
        auto_reply(thread)
    return message
def auto_reply(thread: ChatThread) -> None:
    """Post an automatic SYSTEM message to ``thread`` and broadcast it.

    The message has no sender user. The thread's ``last_message_at`` is set to
    the message's ``sent_at`` before a ``thread.message`` event is broadcast
    to the thread's group.

    Args:
        thread: Thread that receives the automatic reply.
    """
    # gettext_lazy returns a lazy proxy object, not a str. The model instance keeps
    # the object it was constructed with, so without str() the proxy itself would be
    # placed in the broadcast payload, where the channel layer backend or a JSON
    # encoder downstream may be unable to serialize it. Resolve it once, up front.
    text = str(_("We're searching for the operator to answer you already, hold by!"))
    msg = ChatMessage.objects.create(  # type: ignore [misc]
        thread=thread,
        sender_type=SenderType.SYSTEM,
        sender_user=None,
        text=text,
        sent_at=timezone.now(),
    )
    thread.last_message_at = msg.sent_at
    thread.save(update_fields=["last_message_at", "modified"])
    _broadcast_thread_event(
        thread,
        {
            "type": "thread.message",
            "message": {
                "id": str(msg.uuid),
                "thread": str(thread.uuid),
                "text": msg.text,
                "sender_type": msg.sender_type,
                "sender_user": None,
                "sent_at": msg.sent_at.isoformat(),
            },
        },
    )
def claim_thread(thread: ChatThread, staff_user: User) -> ChatThread:
    """Assign ``thread`` to ``staff_user`` and notify the staff inbox.

    Args:
        thread: Thread being claimed.
        staff_user: User claiming the thread; must be staff.

    Returns:
        The same thread, now assigned to ``staff_user``.

    Raises:
        PermissionDenied: when ``staff_user`` is not staff, or when the thread
            already has an assignee and ``staff_user`` is not a superuser.
    """
    # Only a superuser may take over a thread that already has an assignee.
    if not staff_user.is_staff or (thread.assigned_to_id and not staff_user.is_superuser):
        raise PermissionDenied

    thread.assigned_to = staff_user
    thread.save(update_fields=["assigned_to", "modified"])

    assigned_event = {
        "type": "staff.thread.assigned",
        "thread_id": str(thread.uuid),
        "user": str(staff_user.uuid),
    }
    _broadcast_staff_inbox(assigned_event)
    return thread
def reassign_thread(thread: ChatThread, superuser: User, new_staff: User) -> ChatThread:
    """Move ``thread`` to ``new_staff`` on behalf of ``superuser`` and notify the staff inbox.

    Args:
        thread: Thread being reassigned.
        superuser: User performing the reassignment; must be a superuser.
        new_staff: New assignee; must be staff.

    Returns:
        The same thread, now assigned to ``new_staff``.

    Raises:
        PermissionDenied: when ``superuser`` is not a superuser.
        ValidationError: keyed by ``"assigned_to"`` when ``new_staff`` is not staff.
    """
    if not superuser.is_superuser:
        raise PermissionDenied
    if not new_staff.is_staff:
        raise ValidationError({"assigned_to": _("Assignee must be a staff user.")})

    thread.assigned_to = new_staff
    thread.save(update_fields=["assigned_to", "modified"])

    reassigned_event = {
        "type": "staff.thread.reassigned",
        "thread_id": str(thread.uuid),
        "user": str(new_staff.uuid),
    }
    _broadcast_staff_inbox(reassigned_event)
    return thread
def close_thread(thread: ChatThread, actor: User | None) -> ChatThread:
    """Mark ``thread`` as closed and notify both the staff inbox and the thread's group.

    Args:
        thread: Thread to close.
        actor: User closing the thread. When it is ``None`` no permission
            check is applied.

    Returns:
        The same thread with its status set to ``ThreadStatus.CLOSED``.

    Raises:
        PermissionDenied: when ``actor`` is given and is not staff.
    """
    # Staff actors and a missing actor both pass; only a non-staff actor is rejected.
    if actor and not actor.is_staff:
        raise PermissionDenied

    thread.status = ThreadStatus.CLOSED
    thread.save(update_fields=["status", "modified"])

    thread_id = str(thread.uuid)
    _broadcast_staff_inbox({"type": "staff.thread.closed", "thread_id": thread_id})
    _broadcast_thread_event(thread, {"type": "thread.closed", "thread_id": thread_id})
    return thread
def find_least_load_staff() -> User | None:
    """Return the active staff user with the fewest OPEN assigned chat threads.

    Users with the same count are ordered by ``date_joined``. Returns
    ``None`` when no active staff user exists.
    """
    # Count only assigned threads whose status is OPEN.
    open_only = models.Q(assigned_chat_threads__status=ThreadStatus.OPEN)
    open_thread_count = Count("assigned_chat_threads", filter=open_only)

    active_staff = User.objects.filter(is_staff=True, is_active=True)
    ranked = active_staff.annotate(open_threads=open_thread_count).order_by("open_threads", "date_joined")
    return ranked.first()