schon/engine/vibes_auth/messaging/services.py
Egor fureunoir Gorbunov dc7f8be926 Features: 1) None;
Fixes: 1) Add `# ty: ignore` comments to suppress type errors in multiple files; 2) Correct method argument annotations and definitions to align with type hints; 3) Fix cases of invalid or missing imports and unresolved attributes;

Extra: Refactor method definitions to use tuple-based method declarations; replace custom type aliases with `Any`; improve caching utility and error handling logic in utility scripts.
2025-12-19 16:43:39 +03:00

192 lines
5.9 KiB
Python

from dataclasses import dataclass
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.core.exceptions import PermissionDenied, ValidationError
from django.db import models
from django.db.models import Count
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from engine.vibes_auth.choices import SenderType, ThreadStatus
from engine.vibes_auth.messaging.forwarders.telegram import (
forward_thread_message_to_assigned_staff,
is_telegram_enabled,
)
from engine.vibes_auth.models import ChatMessage, ChatThread, User
THREAD_GROUP_PREFIX = "thread:"
STAFF_INBOX_GROUP = "staff:inbox"
@dataclass
class MessageDTO:
id: str
thread: str
text: str
sender_type: str
sender_user: str | None
sent_at: str
def _broadcast_thread_event(thread: ChatThread, event: dict) -> None:
layer = get_channel_layer()
async_to_sync(layer.group_send)(f"{THREAD_GROUP_PREFIX}{thread.uuid}", event)
def _broadcast_staff_inbox(event: dict) -> None:
layer = get_channel_layer()
async_to_sync(layer.group_send)(STAFF_INBOX_GROUP, event)
def get_or_create_user_thread(user: User) -> ChatThread:
thread, _ = ChatThread.objects.get_or_create(
user=user, defaults={"email": user.email or ""}
)
return thread
def create_anon_thread(email: str) -> ChatThread:
if not email or "@" not in email:
raise ValidationError(
{"email": _("Valid email is required for anonymous chats.")}
)
thread = ChatThread.objects.create(email=email)
_broadcast_staff_inbox(
{"type": "staff.thread.created", "thread_id": str(thread.uuid)}
)
return thread
def send_message(
thread: ChatThread, *, sender_user: User | None, sender_type: SenderType, text: str
) -> ChatMessage:
if not text or len(text) > 1028:
raise ValidationError({"text": _("Message must be 1..1028 characters.")})
if sender_user and not sender_user.is_staff:
if thread.user_id != sender_user.pk: # ty: ignore[unresolved-attribute]
raise PermissionDenied
msg = ChatMessage.objects.create(
thread=thread,
sender_type=sender_type,
sender_user=sender_user
if sender_user and sender_user.is_authenticated
else None,
text=text,
sent_at=timezone.now(),
)
thread.last_message_at = msg.sent_at
thread.save(update_fields=["last_message_at", "modified"])
_broadcast_thread_event(
thread,
{
"type": "thread.message",
"message": {
"id": str(msg.uuid),
"thread": str(thread.uuid),
"text": msg.text,
"sender_type": msg.sender_type,
"sender_user": str(msg.sender_user.uuid) if msg.sender_user else None,
"sent_at": msg.sent_at.isoformat(),
},
},
)
if sender_type != SenderType.STAFF:
if is_telegram_enabled():
async_to_sync(forward_thread_message_to_assigned_staff)(
str(thread.uuid), text
)
auto_reply(thread)
return msg
def auto_reply(thread: ChatThread) -> None:
text = _("We're searching for the operator to answer you already, hold by!")
msg = ChatMessage.objects.create(
thread=thread,
sender_type=SenderType.SYSTEM,
sender_user=None,
text=text,
sent_at=timezone.now(),
)
thread.last_message_at = msg.sent_at
thread.save(update_fields=["last_message_at", "modified"])
_broadcast_thread_event(
thread,
{
"type": "thread.message",
"message": {
"id": str(msg.uuid),
"thread": str(thread.uuid),
"text": msg.text,
"sender_type": msg.sender_type,
"sender_user": None,
"sent_at": msg.sent_at.isoformat(),
},
},
)
def claim_thread(thread: ChatThread, staff_user: User) -> ChatThread:
if not staff_user.is_staff:
raise PermissionDenied
if thread.assigned_to_id and not staff_user.is_superuser: # ty: ignore[unresolved-attribute]
raise PermissionDenied
thread.assigned_to = staff_user
thread.save(update_fields=["assigned_to", "modified"])
_broadcast_staff_inbox(
{
"type": "staff.thread.assigned",
"thread_id": str(thread.uuid),
"user": str(staff_user.uuid),
}
)
return thread
def reassign_thread(thread: ChatThread, superuser: User, new_staff: User) -> ChatThread:
if not superuser.is_superuser:
raise PermissionDenied
if not new_staff.is_staff:
raise ValidationError({"assigned_to": _("Assignee must be a staff user.")})
thread.assigned_to = new_staff
thread.save(update_fields=["assigned_to", "modified"])
_broadcast_staff_inbox(
{
"type": "staff.thread.reassigned",
"thread_id": str(thread.uuid),
"user": str(new_staff.uuid),
}
)
return thread
def close_thread(thread: ChatThread, actor: User | None) -> ChatThread:
if actor and actor.is_staff:
pass
elif actor and not actor.is_staff:
raise PermissionDenied
thread.status = ThreadStatus.CLOSED
thread.save(update_fields=["status", "modified"])
_broadcast_staff_inbox(
{"type": "staff.thread.closed", "thread_id": str(thread.uuid)}
)
_broadcast_thread_event(
thread, {"type": "thread.closed", "thread_id": str(thread.uuid)}
)
return thread
def find_least_load_staff() -> User | None:
qs = (
User.objects.filter(is_staff=True, is_active=True)
.annotate(
open_threads=Count(
"assigned_chat_threads",
filter=models.Q(assigned_chat_threads__status=ThreadStatus.OPEN),
)
)
.order_by("open_threads", "date_joined")
)
return qs.first()