Fixes: 1) Replaced legacy `TELEGRAM_API_TOKEN` configuration with `TELEGRAM_TOKEN`; 2) Incorporated anti-spam checks for user messages to prevent abuse. Extra: Refactored websocket consumers by integrating Telegram support and enhancing thread-assignment workflows; improved logging and API consistency; minor cleanup and deprecations.
289 lines
11 KiB
Python
289 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from asgiref.sync import sync_to_async
|
|
from channels.generic.websocket import AsyncJsonWebsocketConsumer
|
|
from drf_spectacular_websocket.decorators import extend_ws_schema
|
|
from django.conf import settings
|
|
from django.core.cache import cache
|
|
from django.utils import timezone
|
|
|
|
from engine.vibes_auth.docs.drf.messaging import (
|
|
STAFF_INBOX_CONSUMER_SCHEMA,
|
|
THREAD_CONSUMER_SCHEMA,
|
|
USER_MESSAGE_CONSUMER_SCHEMA,
|
|
)
|
|
from engine.vibes_auth.messaging.services import (
|
|
STAFF_INBOX_GROUP,
|
|
THREAD_GROUP_PREFIX,
|
|
claim_thread,
|
|
close_thread,
|
|
get_or_create_user_thread,
|
|
send_message,
|
|
)
|
|
from engine.vibes_auth.models import ChatThread, User
|
|
from engine.vibes_auth.choices import SenderType, ThreadStatus
|
|
|
|
MAX_MESSAGE_LENGTH = 1028
|
|
USER_SUPPORT_GROUP_NAME = "User Support"
|
|
ANTISPAM_LIMIT_PER_MIN = getattr(settings, "MESSAGING_ANTISPAM_LIMIT_PER_MIN", 20)
|
|
|
|
|
|
def _get_ip(scope) -> str:
|
|
client = scope.get("client")
|
|
if client and isinstance(client, (list, tuple)) and client:
|
|
return str(client[0])
|
|
return "0.0.0.0"
|
|
|
|
|
|
async def _is_user_support(user: Any) -> bool:
|
|
if not getattr(user, "is_authenticated", False) or not getattr(user, "is_staff", False):
|
|
return False
|
|
return await sync_to_async(user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists)()
|
|
|
|
|
|
async def _get_or_create_ip_thread(ip: str) -> ChatThread:
|
|
def _inner() -> ChatThread:
|
|
thread = ChatThread.objects.filter(attributes__ip=ip, status=ThreadStatus.OPEN).order_by("-modified").first()
|
|
if thread:
|
|
return thread
|
|
return ChatThread.objects.create(email="", attributes={"ip": ip})
|
|
|
|
return await sync_to_async(_inner)()
|
|
|
|
|
|
async def _get_or_create_active_thread_for(user: User | None, ip: str) -> ChatThread:
|
|
if user and getattr(user, "is_authenticated", False) and not getattr(user, "is_staff", False):
|
|
|
|
def _inner_user() -> ChatThread:
|
|
t = ChatThread.objects.filter(user=user, status=ThreadStatus.OPEN).order_by("-modified").first()
|
|
if t:
|
|
return t
|
|
return get_or_create_user_thread(user)
|
|
|
|
return await sync_to_async(_inner_user)()
|
|
return await _get_or_create_ip_thread(ip)
|
|
|
|
|
|
async def _antispam_check(ip: str) -> bool:
|
|
key = f"msg_rate:{ip}:{timezone.now().strftime('%Y%m%d%H%M')}"
|
|
cnt = cache.get(key, 0)
|
|
if cnt >= ANTISPAM_LIMIT_PER_MIN:
|
|
return False
|
|
cache.set(key, cnt + 1, timeout=70)
|
|
return True
|
|
|
|
|
|
class UserMessageConsumer(AsyncJsonWebsocketConsumer):
|
|
async def connect(self) -> None: # noqa: D401
|
|
await self.accept()
|
|
|
|
@extend_ws_schema(**USER_MESSAGE_CONSUMER_SCHEMA)
|
|
async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
|
|
action = content.get("action")
|
|
if action == "ping":
|
|
await self.send_json({"type": "pong"})
|
|
return
|
|
|
|
text = content.get("text", "")
|
|
if not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH):
|
|
await self.send_json({"error": "invalid_payload"})
|
|
return
|
|
|
|
ip = _get_ip(self.scope)
|
|
if not await _antispam_check(ip):
|
|
await self.send_json({"error": "rate_limited"})
|
|
return
|
|
|
|
user: User | None = self.scope.get("user")
|
|
thread = await _get_or_create_active_thread_for(user if user and user.is_authenticated else None, ip)
|
|
|
|
msg = await sync_to_async(send_message)(
|
|
thread,
|
|
sender_user=user if user and user.is_authenticated and not user.is_staff else None,
|
|
sender_type=SenderType.USER,
|
|
text=text,
|
|
)
|
|
|
|
await self.send_json(
|
|
{
|
|
"ok": True,
|
|
"thread_id": str(thread.uuid),
|
|
"message_id": str(msg.uuid),
|
|
}
|
|
)
|
|
|
|
|
|
class StaffInboxConsumer(AsyncJsonWebsocketConsumer):
|
|
async def connect(self) -> None:
|
|
user = self.scope.get("user")
|
|
if not await _is_user_support(user):
|
|
await self.close(code=4403)
|
|
return
|
|
await self.channel_layer.group_add(STAFF_INBOX_GROUP, self.channel_name)
|
|
await self.accept()
|
|
|
|
async def disconnect(self, code: int) -> None:
|
|
await self.channel_layer.group_discard(STAFF_INBOX_GROUP, self.channel_name)
|
|
|
|
@extend_ws_schema(**STAFF_INBOX_CONSUMER_SCHEMA)
|
|
async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
|
|
action = content.get("action")
|
|
user: User = self.scope.get("user")
|
|
|
|
if action == "ping":
|
|
await self.send_json({"type": "pong"})
|
|
return
|
|
|
|
if action == "list_open":
|
|
|
|
def _list():
|
|
qs = (
|
|
ChatThread.objects.filter(status=ThreadStatus.OPEN)
|
|
.values("uuid", "user_id", "email", "assigned_to_id", "last_message_at")
|
|
.order_by("-modified")
|
|
)
|
|
return list(qs)
|
|
|
|
data = await sync_to_async(_list)()
|
|
await self.send_json({"type": "inbox.list", "threads": data})
|
|
return
|
|
|
|
if action == "assign":
|
|
thread_id = content.get("thread_id")
|
|
if not thread_id:
|
|
await self.send_json({"error": "thread_id_required"})
|
|
return
|
|
|
|
def _assign():
|
|
thread = ChatThread.objects.get(uuid=thread_id)
|
|
return claim_thread(thread, user)
|
|
|
|
try:
|
|
t = await sync_to_async(_assign)()
|
|
await self.send_json({"type": "assigned", "thread_id": str(t.uuid), "user": str(user.uuid)})
|
|
except Exception as e: # noqa: BLE001
|
|
await self.send_json({"error": "assign_failed", "detail": str(e)})
|
|
return
|
|
|
|
if action == "reply":
|
|
thread_id = content.get("thread_id")
|
|
text = content.get("text", "")
|
|
if not thread_id or not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH):
|
|
await self.send_json({"error": "invalid_payload"})
|
|
return
|
|
|
|
def _can_reply_and_send():
|
|
thread = ChatThread.objects.get(uuid=thread_id)
|
|
if thread.assigned_to_id and thread.assigned_to_id != user.id and not user.is_superuser:
|
|
raise PermissionError("not_assigned")
|
|
return send_message(thread, sender_user=user, sender_type=SenderType.STAFF, text=text)
|
|
|
|
try:
|
|
msg = await sync_to_async(_can_reply_and_send)()
|
|
await self.send_json({"type": "replied", "message_id": str(msg.uuid)})
|
|
except Exception as e: # noqa: BLE001
|
|
await self.send_json({"error": "reply_failed", "detail": str(e)})
|
|
return
|
|
|
|
if action == "close":
|
|
thread_id = content.get("thread_id")
|
|
if not thread_id:
|
|
await self.send_json({"error": "thread_id_required"})
|
|
return
|
|
|
|
def _close():
|
|
thread = ChatThread.objects.get(uuid=thread_id)
|
|
return close_thread(thread, user)
|
|
|
|
try:
|
|
t = await sync_to_async(_close)()
|
|
await self.send_json({"type": "closed", "thread_id": str(t.uuid)})
|
|
except Exception as e: # noqa: BLE001
|
|
await self.send_json({"error": "close_failed", "detail": str(e)})
|
|
return
|
|
|
|
await self.send_json({"error": "unknown_action"})
|
|
|
|
async def staff_thread_created(self, event):
|
|
await self.send_json({"type": "staff.thread.created", **{k: v for k, v in event.items() if k != "type"}})
|
|
|
|
async def staff_thread_assigned(self, event):
|
|
await self.send_json({"type": "staff.thread.assigned", **{k: v for k, v in event.items() if k != "type"}})
|
|
|
|
async def staff_thread_reassigned(self, event):
|
|
await self.send_json({"type": "staff.thread.reassigned", **{k: v for k, v in event.items() if k != "type"}})
|
|
|
|
async def staff_thread_closed(self, event):
|
|
await self.send_json({"type": "staff.thread.closed", **{k: v for k, v in event.items() if k != "type"}})
|
|
|
|
|
|
class ThreadConsumer(AsyncJsonWebsocketConsumer):
|
|
thread_id: Any = None
|
|
|
|
async def connect(self) -> None:
|
|
user = self.scope.get("user")
|
|
if not await _is_user_support(user):
|
|
await self.close(code=4403)
|
|
return
|
|
self.thread_id = self.scope["url_route"]["kwargs"].get("thread_id")
|
|
await self.channel_layer.group_add(f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name)
|
|
await self.accept()
|
|
|
|
async def disconnect(self, code: int) -> None:
|
|
if self.thread_id:
|
|
await self.channel_layer.group_discard(f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name)
|
|
|
|
@extend_ws_schema(**THREAD_CONSUMER_SCHEMA)
|
|
async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
|
|
action = content.get("action")
|
|
user: User = self.scope.get("user")
|
|
|
|
if action == "ping":
|
|
await self.send_json({"type": "pong", "thread": getattr(self, "thread_id", None)})
|
|
return
|
|
|
|
if action == "reply":
|
|
text = content.get("text", "")
|
|
if not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH):
|
|
await self.send_json({"error": "invalid_payload"})
|
|
return
|
|
|
|
def _reply():
|
|
thread = ChatThread.objects.get(uuid=self.thread_id)
|
|
if thread.assigned_to_id and thread.assigned_to_id != user.id and not user.is_superuser:
|
|
raise PermissionError("not_assigned")
|
|
return send_message(thread, sender_user=user, sender_type=SenderType.STAFF, text=text)
|
|
|
|
try:
|
|
msg = await sync_to_async(_reply)()
|
|
await self.send_json({"type": "replied", "message_id": str(msg.uuid)})
|
|
except Exception as e: # noqa: BLE001
|
|
await self.send_json({"error": "reply_failed", "detail": str(e)})
|
|
return
|
|
|
|
if action == "close":
|
|
|
|
def _close():
|
|
thread = ChatThread.objects.get(uuid=self.thread_id)
|
|
return close_thread(thread, user)
|
|
|
|
try:
|
|
t = await sync_to_async(_close)()
|
|
await self.send_json({"type": "closed", "thread_id": str(t.uuid)})
|
|
except Exception as e: # noqa: BLE001
|
|
await self.send_json({"error": "close_failed", "detail": str(e)})
|
|
return
|
|
|
|
await self.send_json({"thread": getattr(self, "thread_id", None), "ok": True})
|
|
|
|
async def thread_message(self, event):
|
|
await self.send_json({"type": "thread.message", **{k: v for k, v in event.items() if k != "type"}})
|
|
|
|
async def thread_closed(self, event):
|
|
await self.send_json({"type": "thread.closed", **{k: v for k, v in event.items() if k != "type"}})
|
|
|
|
|
|
# TODO: Add functionality so non-staff users may audio call staff-user. The call must fall into the queue where
|
|
# staff users may take advantage of answering the call. Many non-staff users may call simultaneously, that's why we need the queue
|