from dataclasses import dataclass from asgiref.sync import async_to_sync from channels.layers import get_channel_layer from django.core.exceptions import PermissionDenied, ValidationError from django.db import models from django.db.models import Count from django.utils import timezone from django.utils.translation import gettext_lazy as _ from engine.vibes_auth.choices import SenderType, ThreadStatus from engine.vibes_auth.messaging.forwarders.telegram import ( forward_thread_message_to_assigned_staff, is_telegram_enabled, ) from engine.vibes_auth.models import ChatMessage, ChatThread, User THREAD_GROUP_PREFIX = "thread:" STAFF_INBOX_GROUP = "staff:inbox" @dataclass class MessageDTO: id: str thread: str text: str sender_type: str sender_user: str | None sent_at: str def _broadcast_thread_event(thread: ChatThread, event: dict) -> None: layer = get_channel_layer() async_to_sync(layer.group_send)(f"{THREAD_GROUP_PREFIX}{thread.uuid}", event) def _broadcast_staff_inbox(event: dict) -> None: layer = get_channel_layer() async_to_sync(layer.group_send)(STAFF_INBOX_GROUP, event) def get_or_create_user_thread(user: User) -> ChatThread: thread, _ = ChatThread.objects.get_or_create( user=user, defaults={"email": user.email or ""} ) return thread def create_anon_thread(email: str) -> ChatThread: if not email or "@" not in email: raise ValidationError( {"email": _("Valid email is required for anonymous chats.")} ) thread = ChatThread.objects.create(email=email) _broadcast_staff_inbox( {"type": "staff.thread.created", "thread_id": str(thread.uuid)} ) return thread def send_message( thread: ChatThread, *, sender_user: User | None, sender_type: SenderType, text: str ) -> ChatMessage: if not text or len(text) > 1028: raise ValidationError({"text": _("Message must be 1..1028 characters.")}) if sender_user and not sender_user.is_staff: if thread.user_id != sender_user.pk: # ty: ignore[unresolved-attribute] raise PermissionDenied msg = ChatMessage.objects.create( thread=thread, sender_type=sender_type, sender_user=sender_user if sender_user and sender_user.is_authenticated else None, text=text, sent_at=timezone.now(), ) thread.last_message_at = msg.sent_at thread.save(update_fields=["last_message_at", "modified"]) _broadcast_thread_event( thread, { "type": "thread.message", "message": { "id": str(msg.uuid), "thread": str(thread.uuid), "text": msg.text, "sender_type": msg.sender_type, "sender_user": str(msg.sender_user.uuid) if msg.sender_user else None, "sent_at": msg.sent_at.isoformat(), }, }, ) if sender_type != SenderType.STAFF: if is_telegram_enabled(): async_to_sync(forward_thread_message_to_assigned_staff)( str(thread.uuid), text ) auto_reply(thread) return msg def auto_reply(thread: ChatThread) -> None: text = _("We're searching for the operator to answer you already, hold by!") msg = ChatMessage.objects.create( thread=thread, sender_type=SenderType.SYSTEM, sender_user=None, text=text, sent_at=timezone.now(), ) thread.last_message_at = msg.sent_at thread.save(update_fields=["last_message_at", "modified"]) _broadcast_thread_event( thread, { "type": "thread.message", "message": { "id": str(msg.uuid), "thread": str(thread.uuid), "text": msg.text, "sender_type": msg.sender_type, "sender_user": None, "sent_at": msg.sent_at.isoformat(), }, }, ) def claim_thread(thread: ChatThread, staff_user: User) -> ChatThread: if not staff_user.is_staff: raise PermissionDenied if thread.assigned_to_id and not staff_user.is_superuser: # ty: ignore[unresolved-attribute] raise PermissionDenied thread.assigned_to = staff_user thread.save(update_fields=["assigned_to", "modified"]) _broadcast_staff_inbox( { "type": "staff.thread.assigned", "thread_id": str(thread.uuid), "user": str(staff_user.uuid), } ) return thread def reassign_thread(thread: ChatThread, superuser: User, new_staff: User) -> ChatThread: if not superuser.is_superuser: raise PermissionDenied if not new_staff.is_staff: raise ValidationError({"assigned_to": _("Assignee must be a staff user.")}) thread.assigned_to = new_staff thread.save(update_fields=["assigned_to", "modified"]) _broadcast_staff_inbox( { "type": "staff.thread.reassigned", "thread_id": str(thread.uuid), "user": str(new_staff.uuid), } ) return thread def close_thread(thread: ChatThread, actor: User | None) -> ChatThread: if actor and actor.is_staff: pass elif actor and not actor.is_staff: raise PermissionDenied thread.status = ThreadStatus.CLOSED thread.save(update_fields=["status", "modified"]) _broadcast_staff_inbox( {"type": "staff.thread.closed", "thread_id": str(thread.uuid)} ) _broadcast_thread_event( thread, {"type": "thread.closed", "thread_id": str(thread.uuid)} ) return thread def find_least_load_staff() -> User | None: qs = ( User.objects.filter(is_staff=True, is_active=True) .annotate( open_threads=Count( "assigned_chat_threads", filter=models.Q(assigned_chat_threads__status=ThreadStatus.OPEN), ) ) .order_by("open_threads", "date_joined") ) return qs.first()