schon/engine/vibes_auth/messaging/consumers.py
Egor fureunoir Gorbunov aa8d40c781 Features: 1) Add mutations for product management (CreateProduct, UpdateProduct, DeleteProduct) with improved attribute and tag resolution; 2) Introduce enhanced GraphQL inputs for better product handling; 3) Add translation support to payment views and callback handling; 4) Refactor Telegram forwarder to use modern typing annotations (| syntax);
Fixes: 1) Remove redundant `from __future__ import annotations` in multiple files; 2) Correct callback integration to handle missing gateway scenarios gracefully; 3) Fix typos and update class references in tests and views;

Extra: Refactor deprecated mutation definitions and cleanup legacy product mutation references in GraphQL schema.
2025-11-14 17:07:40 +03:00

287 lines
11 KiB
Python

from typing import Any
from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django.conf import settings
from django.core.cache import cache
from django.utils import timezone
from drf_spectacular_websocket.decorators import extend_ws_schema
from engine.vibes_auth.choices import SenderType, ThreadStatus
from engine.vibes_auth.docs.drf.messaging import (
STAFF_INBOX_CONSUMER_SCHEMA,
THREAD_CONSUMER_SCHEMA,
USER_MESSAGE_CONSUMER_SCHEMA,
)
from engine.vibes_auth.messaging.services import (
STAFF_INBOX_GROUP,
THREAD_GROUP_PREFIX,
claim_thread,
close_thread,
get_or_create_user_thread,
send_message,
)
from engine.vibes_auth.models import ChatThread, User
# Upper bound, in characters, on the text of a single chat message accepted by the consumers below.
# NOTE(review): 1028 may have been meant as 1024 - confirm before changing.
MAX_MESSAGE_LENGTH = 1028
# Name of the auth group a staff user must belong to in order to pass the support-staff check.
USER_SUPPORT_GROUP_NAME = "User Support"
# Messages allowed per IP per minute bucket; read from the MESSAGING_ANTISPAM_LIMIT_PER_MIN
# Django setting when present, otherwise 20.
ANTISPAM_LIMIT_PER_MIN = getattr(settings, "MESSAGING_ANTISPAM_LIMIT_PER_MIN", 20)
def _get_ip(scope) -> str:
client = scope.get("client")
if client and isinstance(client, (list, tuple)) and client:
return str(client[0])
return "0.0.0.0"
async def _is_user_support(user: Any) -> bool:
if not getattr(user, "is_authenticated", False) or not getattr(user, "is_staff", False):
return False
return await sync_to_async(user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists)()
async def _get_or_create_ip_thread(ip: str) -> ChatThread:
    """Return the open thread tagged with ``ip``, creating one when none exists.

    Among open threads whose ``attributes`` carry this IP, the first one in
    descending ``modified`` order is reused. Otherwise a new thread with an
    empty email and ``{"ip": ip}`` attributes is created.
    """

    def _lookup_or_create() -> ChatThread:
        open_for_ip = ChatThread.objects.filter(attributes__ip=ip, status=ThreadStatus.OPEN)
        existing = open_for_ip.order_by("-modified").first()
        return existing or ChatThread.objects.create(email="", attributes={"ip": ip})

    # ORM access is synchronous, so it is pushed through sync_to_async.
    return await sync_to_async(_lookup_or_create)()
async def _get_or_create_active_thread_for(user: User | None, ip: str) -> ChatThread:
    """Pick the thread an incoming customer message should be filed under.

    Authenticated non-staff users get their first open thread in descending
    ``modified`` order, or whatever ``get_or_create_user_thread`` returns when
    they have none. Everyone else (no user, unauthenticated, or staff) is
    keyed by IP address instead.
    """
    # Guard clause: anything that is not an authenticated non-staff user goes by IP.
    if not user or not getattr(user, "is_authenticated", False) or getattr(user, "is_staff", False):
        return await _get_or_create_ip_thread(ip)

    def _latest_open_or_new() -> ChatThread:
        open_threads = ChatThread.objects.filter(user=user, status=ThreadStatus.OPEN)
        existing = open_threads.order_by("-modified").first()
        return existing or get_or_create_user_thread(user)

    return await sync_to_async(_latest_open_or_new)()
async def _antispam_check(ip: str) -> bool:
    """Count one message for ``ip`` and report whether it is within the limit.

    Messages are counted per IP in a cache bucket keyed by the current minute
    (as formatted from ``timezone.now()``); each bucket lives for 70 seconds.

    The counter is bumped with ``cache.add`` + ``cache.incr`` instead of a
    ``get`` / ``set`` pair, so concurrent sockets sharing an IP do not
    overwrite each other's increments (atomic on backends whose ``incr`` is
    atomic). The cache calls run through ``sync_to_async`` so a slow or
    database-backed cache never blocks the event loop.

    Args:
        ip: Client address used as the rate-limit key.

    Returns:
        ``True`` when the message is allowed, ``False`` once more than
        ``ANTISPAM_LIMIT_PER_MIN`` messages were seen in the current bucket.
        Rejected attempts are still counted; the bucket expires on its own.
    """
    key = f"msg_rate:{ip}:{timezone.now().strftime('%Y%m%d%H%M')}"

    def _bump() -> int:
        # add() only writes when the key is absent, so the bucket and its TTL
        # are created exactly once per minute.
        cache.add(key, 0, timeout=70)
        try:
            return cache.incr(key)
        except ValueError:
            # The key expired or was evicted between add() and incr();
            # restart the bucket with this message as the first one.
            cache.set(key, 1, timeout=70)
            return 1

    count = await sync_to_async(_bump)()
    return count <= ANTISPAM_LIMIT_PER_MIN
class UserMessageConsumer(AsyncJsonWebsocketConsumer):
    """Customer-facing websocket that files incoming texts into a support thread."""

    async def connect(self) -> None:  # noqa: D401
        """Accept every incoming connection without any access check."""
        await self.accept()

    @extend_ws_schema(**USER_MESSAGE_CONSUMER_SCHEMA)
    async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
        """Answer pings, or validate, rate-limit and store one customer message.

        Replies with ``{"error": "invalid_payload"}`` when ``text`` is not a
        string of 1..MAX_MESSAGE_LENGTH characters, with
        ``{"error": "rate_limited"}`` when the per-IP limit is hit, and
        otherwise with the ids of the thread and the stored message.
        """
        if content.get("action") == "ping":
            await self.send_json({"type": "pong"})
            return

        text = content.get("text", "")
        text_is_valid = isinstance(text, str) and 0 < len(text) <= MAX_MESSAGE_LENGTH
        if not text_is_valid:
            await self.send_json({"error": "invalid_payload"})
            return

        ip = _get_ip(self.scope)
        within_limit = await _antispam_check(ip)
        if not within_limit:
            await self.send_json({"error": "rate_limited"})
            return

        user: User | None = self.scope.get("user")
        # Only an authenticated user is used to look up the thread...
        thread_owner = user if user and user.is_authenticated else None
        thread = await _get_or_create_active_thread_for(thread_owner, ip)
        # ...and only an authenticated non-staff user is recorded as the sender.
        sender = user if user and user.is_authenticated and not user.is_staff else None
        store_message = sync_to_async(send_message)
        msg = await store_message(
            thread,
            sender_user=sender,
            sender_type=SenderType.USER,
            text=text,
        )

        receipt = {
            "ok": True,
            "thread_id": str(thread.uuid),
            "message_id": str(msg.uuid),
        }
        await self.send_json(receipt)
class StaffInboxConsumer(AsyncJsonWebsocketConsumer):
    """Websocket inbox through which support staff triage chat threads.

    Only users passing ``_is_user_support`` may connect; anyone else is
    closed with code 4403. Connected sockets join ``STAFF_INBOX_GROUP`` and
    get the ``staff.thread.*`` events delivered to that group forwarded to
    them.
    """

    async def connect(self) -> None:
        """Admit support staff into the inbox group; reject everyone else."""
        user = self.scope.get("user")
        if not await _is_user_support(user):
            await self.close(code=4403)
            return
        await self.channel_layer.group_add(STAFF_INBOX_GROUP, self.channel_name)
        await self.accept()

    async def disconnect(self, code: int) -> None:
        """Leave the inbox group when the socket goes away."""
        await self.channel_layer.group_discard(STAFF_INBOX_GROUP, self.channel_name)

    @staticmethod
    def _json_safe(value: Any) -> Any:
        """Convert one ORM value into something ``json.dumps`` can encode.

        JSON primitives pass through untouched, objects exposing
        ``isoformat()`` (dates, datetimes) are rendered with it, and anything
        else (e.g. UUIDs) is stringified.
        """
        if value is None or isinstance(value, (str, int, float, bool)):
            return value
        to_iso = getattr(value, "isoformat", None)
        if callable(to_iso):
            return to_iso()
        return str(value)

    @extend_ws_schema(**STAFF_INBOX_CONSUMER_SCHEMA)
    async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
        """Dispatch one staff command.

        Supported ``action`` values: ``ping``, ``list_open``, ``assign``,
        ``reply`` and ``close``. Anything else is answered with
        ``{"error": "unknown_action"}``. Failures inside ``assign`` /
        ``reply`` / ``close`` are reported to the client as
        ``{"error": "<action>_failed", "detail": ...}`` rather than raised.
        """
        action = content.get("action")
        user: User = self.scope.get("user")

        if action == "ping":
            await self.send_json({"type": "pong"})
            return

        if action == "list_open":
            json_safe = self._json_safe

            def _list() -> list[dict[str, Any]]:
                rows = (
                    ChatThread.objects.filter(status=ThreadStatus.OPEN)
                    .values("uuid", "user_id", "email", "assigned_to_id", "last_message_at")
                    .order_by("-modified")
                )
                # send_json() encodes with plain json.dumps, which cannot
                # handle UUID or datetime objects, so make each row safe here.
                return [{field: json_safe(value) for field, value in row.items()} for row in rows]

            data = await sync_to_async(_list)()
            await self.send_json({"type": "inbox.list", "threads": data})
            return

        if action == "assign":
            thread_id = content.get("thread_id")
            if not thread_id:
                await self.send_json({"error": "thread_id_required"})
                return

            def _assign():
                thread = ChatThread.objects.get(uuid=thread_id)
                return claim_thread(thread, user)

            try:
                t = await sync_to_async(_assign)()
                await self.send_json({"type": "assigned", "thread_id": str(t.uuid), "user": str(user.uuid)})
            except Exception as e:  # noqa: BLE001
                await self.send_json({"error": "assign_failed", "detail": str(e)})
            return

        if action == "reply":
            thread_id = content.get("thread_id")
            text = content.get("text", "")
            if not thread_id or not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH):
                await self.send_json({"error": "invalid_payload"})
                return

            def _can_reply_and_send():
                thread = ChatThread.objects.get(uuid=thread_id)
                # A thread already assigned to someone else may only be
                # answered by that assignee or by a superuser.
                if thread.assigned_to_id and thread.assigned_to_id != user.id and not user.is_superuser:
                    raise PermissionError("not_assigned")
                return send_message(thread, sender_user=user, sender_type=SenderType.STAFF, text=text)

            try:
                msg = await sync_to_async(_can_reply_and_send)()
                await self.send_json({"type": "replied", "message_id": str(msg.uuid)})
            except Exception as e:  # noqa: BLE001
                await self.send_json({"error": "reply_failed", "detail": str(e)})
            return

        if action == "close":
            thread_id = content.get("thread_id")
            if not thread_id:
                await self.send_json({"error": "thread_id_required"})
                return

            def _close():
                thread = ChatThread.objects.get(uuid=thread_id)
                return close_thread(thread, user)

            try:
                t = await sync_to_async(_close)()
                await self.send_json({"type": "closed", "thread_id": str(t.uuid)})
            except Exception as e:  # noqa: BLE001
                await self.send_json({"error": "close_failed", "detail": str(e)})
            return

        await self.send_json({"error": "unknown_action"})

    async def _forward(self, outgoing_type: str, event: dict[str, Any]) -> None:
        """Relay a group event to the client under ``outgoing_type``."""
        payload = {k: v for k, v in event.items() if k != "type"}
        await self.send_json({"type": outgoing_type, **payload})

    async def staff_thread_created(self, event):
        """Forward a thread-created group event to the client."""
        await self._forward("staff.thread.created", event)

    async def staff_thread_assigned(self, event):
        """Forward a thread-assigned group event to the client."""
        await self._forward("staff.thread.assigned", event)

    async def staff_thread_reassigned(self, event):
        """Forward a thread-reassigned group event to the client."""
        await self._forward("staff.thread.reassigned", event)

    async def staff_thread_closed(self, event):
        """Forward a thread-closed group event to the client."""
        await self._forward("staff.thread.closed", event)
class ThreadConsumer(AsyncJsonWebsocketConsumer):
    """Staff-side websocket bound to the single chat thread named in the URL route."""

    # Route kwarg identifying the thread; stays None until connect() succeeds.
    thread_id: Any = None

    def _group_name(self) -> str:
        """Build the channel-layer group name for the bound thread."""
        return f"{THREAD_GROUP_PREFIX}{self.thread_id}"

    async def connect(self) -> None:
        """Admit support staff and subscribe them to the thread's group."""
        user = self.scope.get("user")
        allowed = await _is_user_support(user)
        if not allowed:
            await self.close(code=4403)
            return
        self.thread_id = self.scope["url_route"]["kwargs"].get("thread_id")
        await self.channel_layer.group_add(self._group_name(), self.channel_name)
        await self.accept()

    async def disconnect(self, code: int) -> None:
        """Unsubscribe from the thread's group if one was ever joined."""
        if not self.thread_id:
            return
        await self.channel_layer.group_discard(self._group_name(), self.channel_name)

    @extend_ws_schema(**THREAD_CONSUMER_SCHEMA)
    async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
        """Handle ``ping``, ``reply`` and ``close``; acknowledge anything else."""
        staff: User = self.scope.get("user")
        action = content.get("action")
        if action == "ping":
            await self.send_json({"type": "pong", "thread": self.thread_id})
        elif action == "reply":
            await self._handle_reply(staff, content.get("text", ""))
        elif action == "close":
            await self._handle_close(staff)
        else:
            # Unrecognised actions get a plain acknowledgement, not an error.
            await self.send_json({"thread": self.thread_id, "ok": True})

    async def _handle_reply(self, staff: Any, text: Any) -> None:
        """Validate ``text`` and post it to the bound thread as a staff message."""
        acceptable = isinstance(text, str) and 0 < len(text) <= MAX_MESSAGE_LENGTH
        if not acceptable:
            await self.send_json({"error": "invalid_payload"})
            return

        def _send_as_staff():
            thread = ChatThread.objects.get(uuid=self.thread_id)
            # Threads assigned to someone else are off limits unless superuser.
            held_by_other = thread.assigned_to_id and thread.assigned_to_id != staff.id
            if held_by_other and not staff.is_superuser:
                raise PermissionError("not_assigned")
            return send_message(thread, sender_user=staff, sender_type=SenderType.STAFF, text=text)

        try:
            message = await sync_to_async(_send_as_staff)()
            await self.send_json({"type": "replied", "message_id": str(message.uuid)})
        except Exception as exc:  # noqa: BLE001
            await self.send_json({"error": "reply_failed", "detail": str(exc)})

    async def _handle_close(self, staff: Any) -> None:
        """Close the bound thread on behalf of ``staff``."""

        def _close_bound_thread():
            thread = ChatThread.objects.get(uuid=self.thread_id)
            return close_thread(thread, staff)

        try:
            closed = await sync_to_async(_close_bound_thread)()
            await self.send_json({"type": "closed", "thread_id": str(closed.uuid)})
        except Exception as exc:  # noqa: BLE001
            await self.send_json({"error": "close_failed", "detail": str(exc)})

    async def thread_message(self, event):
        """Forward a thread-message group event to the client."""
        payload = {key: value for key, value in event.items() if key != "type"}
        await self.send_json({"type": "thread.message", **payload})

    async def thread_closed(self, event):
        """Forward a thread-closed group event to the client."""
        payload = {key: value for key, value in event.items() if key != "type"}
        await self.send_json({"type": "thread.closed", **payload})
# TODO: Let non-staff users place audio calls to staff users. Incoming calls must enter a queue from which
# staff users can pick them up; the queue is needed because many non-staff users may call at the same time.