schon/engine/vibes_auth/messaging/consumers.py
Egor fureunoir Gorbunov 0464c1b11b Features: 1) Introduced Telegram forwarder with bot functionality for message forwarding and user support; 2) Added new commands (/start, /unlink, /help) for Telegram bot; 3) Enabled webhook integration and message linking via Telegram.
Fixes: 1) Replaced legacy `TELEGRAM_API_TOKEN` configuration with `TELEGRAM_TOKEN`; 2) Incorporated anti-spam checks for user messages to prevent abuse.

Extra: Refactored websocket consumers by integrating Telegram support and enhancing thread-assignment workflows; improved logging and API consistency; minor cleanup and deprecations.
2025-11-12 11:23:44 +03:00

289 lines
11 KiB
Python

from __future__ import annotations
from typing import Any
from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from drf_spectacular_websocket.decorators import extend_ws_schema
from django.conf import settings
from django.core.cache import cache
from django.utils import timezone
from engine.vibes_auth.docs.drf.messaging import (
STAFF_INBOX_CONSUMER_SCHEMA,
THREAD_CONSUMER_SCHEMA,
USER_MESSAGE_CONSUMER_SCHEMA,
)
from engine.vibes_auth.messaging.services import (
STAFF_INBOX_GROUP,
THREAD_GROUP_PREFIX,
claim_thread,
close_thread,
get_or_create_user_thread,
send_message,
)
from engine.vibes_auth.models import ChatThread, User
from engine.vibes_auth.choices import SenderType, ThreadStatus
# Upper bound on len(text) for a single chat message accepted by the consumers in this module.
# NOTE(review): 1028 is an unusual limit — possibly meant to be 1024; confirm before changing.
MAX_MESSAGE_LENGTH = 1028
# Name of the auth group a staff user must belong to for _is_user_support() to return True.
USER_SUPPORT_GROUP_NAME = "User Support"
# Messages allowed per client IP per clock minute; overridable via settings.MESSAGING_ANTISPAM_LIMIT_PER_MIN.
ANTISPAM_LIMIT_PER_MIN = getattr(settings, "MESSAGING_ANTISPAM_LIMIT_PER_MIN", 20)
def _get_ip(scope) -> str:
    """Return the peer address stored under ``scope["client"]`` as a string.

    Falls back to ``"0.0.0.0"`` when the entry is missing, empty, or is not a
    list/tuple.
    """
    peer = scope.get("client")
    has_address = isinstance(peer, (list, tuple)) and len(peer) > 0
    return str(peer[0]) if has_address else "0.0.0.0"
async def _is_user_support(user: Any) -> bool:
    """Tell whether ``user`` may act as a support agent.

    The user must be authenticated, flagged as staff, and belong to the group
    named by ``USER_SUPPORT_GROUP_NAME``. Objects lacking the
    ``is_authenticated`` / ``is_staff`` attributes (including ``None``) are
    rejected without touching the database.
    """
    if not (getattr(user, "is_authenticated", False) and getattr(user, "is_staff", False)):
        return False
    # The queryset is lazy; the actual query runs inside the worker thread.
    membership_exists = user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists
    return await sync_to_async(membership_exists)()
async def _get_or_create_ip_thread(ip: str) -> ChatThread:
    """Return the most recently modified open thread tagged with ``ip``, creating one if none exists.

    New threads are created with an empty ``email`` and ``attributes={"ip": ip}``.
    The ORM work runs in a worker thread via ``sync_to_async``.
    """

    def _lookup_or_create() -> ChatThread:
        open_for_ip = ChatThread.objects.filter(attributes__ip=ip, status=ThreadStatus.OPEN)
        latest = open_for_ip.order_by("-modified").first()
        return latest or ChatThread.objects.create(email="", attributes={"ip": ip})

    return await sync_to_async(_lookup_or_create)()
async def _get_or_create_active_thread_for(user: User | None, ip: str) -> ChatThread:
    """Pick the thread an incoming visitor message should be appended to.

    Authenticated non-staff users get their most recently modified open thread,
    or whatever ``get_or_create_user_thread`` returns when they have none.
    Everyone else (no user, unauthenticated, or staff) is routed to the
    thread keyed by ``ip``.
    """
    if not user or not getattr(user, "is_authenticated", False) or getattr(user, "is_staff", False):
        return await _get_or_create_ip_thread(ip)

    def _latest_open_or_new() -> ChatThread:
        existing = ChatThread.objects.filter(user=user, status=ThreadStatus.OPEN).order_by("-modified").first()
        return existing or get_or_create_user_thread(user)

    return await sync_to_async(_latest_open_or_new)()
async def _antispam_check(ip: str) -> bool:
    """Count one message for ``ip`` and report whether it is within the rate limit.

    Messages are counted per IP per clock minute (the minute is part of the
    cache key). The first ``ANTISPAM_LIMIT_PER_MIN`` messages in a minute are
    allowed; further ones are rejected.

    Args:
        ip: Client address used as the rate-limit bucket.

    Returns:
        ``True`` if the message may be processed, ``False`` if rate-limited.
    """
    minute_bucket = timezone.now().strftime("%Y%m%d%H%M")
    key = f"msg_rate:{ip}:{minute_bucket}"

    def _register_hit() -> int:
        # add() only writes when the key is absent, so it seeds the counter
        # without clobbering hits already recorded by concurrent messages.
        cache.add(key, 0, timeout=70)
        try:
            # incr() replaces the previous get()+set() pair, which could lose
            # updates when several messages arrived at the same time.
            return cache.incr(key)
        except ValueError:
            # incr() raises ValueError when the key is missing, e.g. it was
            # evicted between the two calls; restart the counter.
            cache.set(key, 1, timeout=70)
            return 1

    # The cache API is synchronous; run it off the event loop.
    hits = await sync_to_async(_register_hit)()
    return hits <= ANTISPAM_LIMIT_PER_MIN
class UserMessageConsumer(AsyncJsonWebsocketConsumer):
    """Visitor-facing socket used to send messages to support.

    Every connection is accepted. Each inbound frame is either a ``ping`` or a
    message whose ``text`` is stored through ``send_message`` on the thread
    chosen by ``_get_or_create_active_thread_for``.
    """

    async def connect(self) -> None:  # noqa: D401
        """Accept the connection unconditionally."""
        await self.accept()

    @extend_ws_schema(**USER_MESSAGE_CONSUMER_SCHEMA)
    async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
        """Handle one decoded JSON frame from the visitor.

        Replies with ``{"type": "pong"}`` for pings, ``{"error": ...}`` for
        malformed or rate-limited input, and ``{"ok": True, "thread_id": ...,
        "message_id": ...}`` once the message has been stored.
        """
        # A client may send any JSON value (list, string, number); only
        # objects support ``.get``, so reject everything else explicitly.
        if not isinstance(content, dict):
            await self.send_json({"error": "invalid_payload"})
            return

        if content.get("action") == "ping":
            await self.send_json({"type": "pong"})
            return

        text = content.get("text", "")
        if not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH):
            await self.send_json({"error": "invalid_payload"})
            return

        ip = _get_ip(self.scope)
        if not await _antispam_check(ip):
            await self.send_json({"error": "rate_limited"})
            return

        user: User | None = self.scope.get("user")
        authenticated = bool(user and user.is_authenticated)
        thread = await _get_or_create_active_thread_for(user if authenticated else None, ip)
        msg = await sync_to_async(send_message)(
            thread,
            # Staff writing through the visitor socket are recorded without a sender user.
            sender_user=user if authenticated and not user.is_staff else None,
            sender_type=SenderType.USER,
            text=text,
        )
        await self.send_json(
            {
                "ok": True,
                "thread_id": str(thread.uuid),
                "message_id": str(msg.uuid),
            }
        )
class StaffInboxConsumer(AsyncJsonWebsocketConsumer):
    """Staff-side inbox socket: list, claim, answer and close support threads.

    Only users accepted by ``_is_user_support`` may connect; others are closed
    with code 4403. Connected staff join ``STAFF_INBOX_GROUP`` and receive the
    ``staff.thread.*`` events broadcast to that group.
    """

    async def connect(self) -> None:
        """Admit support staff and subscribe them to the shared inbox group."""
        if not await _is_user_support(self.scope.get("user")):
            await self.close(code=4403)
            return
        await self.channel_layer.group_add(STAFF_INBOX_GROUP, self.channel_name)
        await self.accept()

    async def disconnect(self, code: int) -> None:
        """Unsubscribe this channel from the shared inbox group."""
        await self.channel_layer.group_discard(STAFF_INBOX_GROUP, self.channel_name)

    @staticmethod
    def _jsonable(value: Any) -> Any:
        """Convert a database value into something ``json.dumps`` accepts.

        JSON-native scalars pass through; date/time values become ISO-8601
        strings; anything else (e.g. UUIDs) is stringified.
        """
        if value is None or isinstance(value, (str, int, float, bool)):
            return value
        if hasattr(value, "isoformat"):
            return value.isoformat()
        return str(value)

    @extend_ws_schema(**STAFF_INBOX_CONSUMER_SCHEMA)
    async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
        """Dispatch one staff command.

        Supported ``action`` values: ``ping``, ``list_open``, ``assign``,
        ``reply`` and ``close``. Failures are reported as ``{"error": ...}``
        frames rather than raised.
        """
        # Only JSON objects carry an ``action``; reject lists/strings/numbers.
        if not isinstance(content, dict):
            await self.send_json({"error": "invalid_payload"})
            return

        action = content.get("action")
        user: User = self.scope.get("user")

        if action == "ping":
            await self.send_json({"type": "pong"})
            return

        if action == "list_open":

            def _list() -> list[dict[str, Any]]:
                rows = (
                    ChatThread.objects.filter(status=ThreadStatus.OPEN)
                    .values("uuid", "user_id", "email", "assigned_to_id", "last_message_at")
                    .order_by("-modified")
                )
                # send_json() uses the plain JSON encoder, which cannot handle
                # UUID or datetime objects, so normalise every value first.
                return [{field: self._jsonable(value) for field, value in row.items()} for row in rows]

            data = await sync_to_async(_list)()
            await self.send_json({"type": "inbox.list", "threads": data})
            return

        if action == "assign":
            thread_id = content.get("thread_id")
            if not thread_id:
                await self.send_json({"error": "thread_id_required"})
                return

            def _assign():
                thread = ChatThread.objects.get(uuid=thread_id)
                return claim_thread(thread, user)

            try:
                t = await sync_to_async(_assign)()
                await self.send_json({"type": "assigned", "thread_id": str(t.uuid), "user": str(user.uuid)})
            except Exception as e:  # noqa: BLE001
                await self.send_json({"error": "assign_failed", "detail": str(e)})
            return

        if action == "reply":
            thread_id = content.get("thread_id")
            text = content.get("text", "")
            if not thread_id or not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH):
                await self.send_json({"error": "invalid_payload"})
                return

            def _can_reply_and_send():
                thread = ChatThread.objects.get(uuid=thread_id)
                # Compare against the primary key: that is the value a foreign
                # key to the user stores by default, whatever the pk field is named.
                if thread.assigned_to_id and thread.assigned_to_id != user.pk and not user.is_superuser:
                    raise PermissionError("not_assigned")
                return send_message(thread, sender_user=user, sender_type=SenderType.STAFF, text=text)

            try:
                msg = await sync_to_async(_can_reply_and_send)()
                await self.send_json({"type": "replied", "message_id": str(msg.uuid)})
            except Exception as e:  # noqa: BLE001
                await self.send_json({"error": "reply_failed", "detail": str(e)})
            return

        if action == "close":
            thread_id = content.get("thread_id")
            if not thread_id:
                await self.send_json({"error": "thread_id_required"})
                return

            def _close():
                thread = ChatThread.objects.get(uuid=thread_id)
                return close_thread(thread, user)

            try:
                t = await sync_to_async(_close)()
                await self.send_json({"type": "closed", "thread_id": str(t.uuid)})
            except Exception as e:  # noqa: BLE001
                await self.send_json({"error": "close_failed", "detail": str(e)})
            return

        await self.send_json({"error": "unknown_action"})

    async def _relay(self, event_type: str, event: dict[str, Any]) -> None:
        """Forward a group event to the client under the dotted ``event_type``."""
        payload = {k: v for k, v in event.items() if k != "type"}
        await self.send_json({"type": event_type, **payload})

    async def staff_thread_created(self, event):
        """Relay a ``staff.thread.created`` group event to the client."""
        await self._relay("staff.thread.created", event)

    async def staff_thread_assigned(self, event):
        """Relay a ``staff.thread.assigned`` group event to the client."""
        await self._relay("staff.thread.assigned", event)

    async def staff_thread_reassigned(self, event):
        """Relay a ``staff.thread.reassigned`` group event to the client."""
        await self._relay("staff.thread.reassigned", event)

    async def staff_thread_closed(self, event):
        """Relay a ``staff.thread.closed`` group event to the client."""
        await self._relay("staff.thread.closed", event)
class ThreadConsumer(AsyncJsonWebsocketConsumer):
    """Staff-side socket bound to a single thread taken from the URL route.

    Only users accepted by ``_is_user_support`` may connect; others are closed
    with code 4403. The channel joins the group
    ``f"{THREAD_GROUP_PREFIX}{thread_id}"`` and relays ``thread.message`` /
    ``thread.closed`` events from it.
    """

    # Route kwarg ``thread_id``; stays None until connect() succeeds.
    thread_id: Any = None

    async def connect(self) -> None:
        """Admit support staff and subscribe the channel to the thread's group."""
        if not await _is_user_support(self.scope.get("user")):
            await self.close(code=4403)
            return
        self.thread_id = self.scope["url_route"]["kwargs"].get("thread_id")
        await self.channel_layer.group_add(f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name)
        await self.accept()

    async def disconnect(self, code: int) -> None:
        """Leave the thread's group if the connection ever joined it."""
        if self.thread_id:
            await self.channel_layer.group_discard(f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name)

    def _thread_ref(self) -> str | None:
        """Return ``thread_id`` in a JSON-safe form (the route may yield a UUID object)."""
        return None if self.thread_id is None else str(self.thread_id)

    @extend_ws_schema(**THREAD_CONSUMER_SCHEMA)
    async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
        """Dispatch one staff command for this thread.

        Supported ``action`` values: ``ping``, ``reply`` and ``close``.
        Failures are reported as ``{"error": ...}`` frames rather than raised.
        """
        # Only JSON objects carry an ``action``; reject lists/strings/numbers.
        if not isinstance(content, dict):
            await self.send_json({"error": "invalid_payload"})
            return

        action = content.get("action")
        user: User = self.scope.get("user")

        if action == "ping":
            await self.send_json({"type": "pong", "thread": self._thread_ref()})
            return

        if action == "reply":
            text = content.get("text", "")
            if not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH):
                await self.send_json({"error": "invalid_payload"})
                return

            def _reply():
                thread = ChatThread.objects.get(uuid=self.thread_id)
                # Compare against the primary key: that is the value a foreign
                # key to the user stores by default, whatever the pk field is named.
                if thread.assigned_to_id and thread.assigned_to_id != user.pk and not user.is_superuser:
                    raise PermissionError("not_assigned")
                return send_message(thread, sender_user=user, sender_type=SenderType.STAFF, text=text)

            try:
                msg = await sync_to_async(_reply)()
                await self.send_json({"type": "replied", "message_id": str(msg.uuid)})
            except Exception as e:  # noqa: BLE001
                await self.send_json({"error": "reply_failed", "detail": str(e)})
            return

        if action == "close":

            def _close():
                thread = ChatThread.objects.get(uuid=self.thread_id)
                return close_thread(thread, user)

            try:
                t = await sync_to_async(_close)()
                await self.send_json({"type": "closed", "thread_id": str(t.uuid)})
            except Exception as e:  # noqa: BLE001
                await self.send_json({"error": "close_failed", "detail": str(e)})
            return

        # NOTE(review): unrecognised actions are acknowledged with ok=True here,
        # whereas the staff inbox answers "unknown_action"; kept for
        # compatibility with existing clients — confirm whether intended.
        await self.send_json({"thread": self._thread_ref(), "ok": True})

    async def thread_message(self, event):
        """Relay a ``thread.message`` group event to the client."""
        await self.send_json({"type": "thread.message", **{k: v for k, v in event.items() if k != "type"}})

    async def thread_closed(self, event):
        """Relay a ``thread.closed`` group event to the client."""
        await self.send_json({"type": "thread.closed", **{k: v for k, v in event.items() if k != "type"}})
# TODO: Let non-staff users place audio calls to staff users. Incoming calls must enter a queue from which
# staff users can pick them up; many non-staff users may call simultaneously, which is why a queue is needed.