"""Websocket consumers for the support-chat messaging feature.

Three consumers are defined:

* ``UserMessageConsumer`` - accepts messages from any client and stores them
  in an open ``ChatThread``.
* ``StaffInboxConsumer`` - inbox channel for support staff (list / assign /
  reply / close) that also relays ``staff.thread.*`` group events.
* ``ThreadConsumer`` - per-thread channel for support staff that relays
  ``thread.*`` group events.
"""

from datetime import date
from typing import Any
from uuid import UUID

from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django.conf import settings
from django.core.cache import cache
from django.utils import timezone
from drf_spectacular_websocket.decorators import extend_ws_schema

from engine.vibes_auth.choices import SenderType, ThreadStatus
from engine.vibes_auth.docs.drf.messaging import (
    STAFF_INBOX_CONSUMER_SCHEMA,
    THREAD_CONSUMER_SCHEMA,
    USER_MESSAGE_CONSUMER_SCHEMA,
)
from engine.vibes_auth.messaging.services import (
    STAFF_INBOX_GROUP,
    THREAD_GROUP_PREFIX,
    claim_thread,
    close_thread,
    get_or_create_user_thread,
    send_message,
)
from engine.vibes_auth.models import ChatThread, User

MAX_MESSAGE_LENGTH = 1028
USER_SUPPORT_GROUP_NAME = "User Support"
ANTISPAM_LIMIT_PER_MIN = getattr(settings, "MESSAGING_ANTISPAM_LIMIT_PER_MIN", 20)


def _get_ip(scope) -> str:
    """Return the client address from an ASGI ``scope``.

    Falls back to ``"0.0.0.0"`` when ``scope["client"]`` is missing, empty,
    or not a list/tuple.
    """
    client = scope.get("client")
    if isinstance(client, (list, tuple)) and client:
        return str(client[0])
    return "0.0.0.0"


def _is_valid_text(text: Any) -> bool:
    """Return True when ``text`` is a non-empty ``str`` within the length limit."""
    return isinstance(text, str) and 0 < len(text) <= MAX_MESSAGE_LENGTH


def _json_safe(value: Any) -> Any:
    """Convert values that ``json.dumps`` cannot encode into strings.

    UUIDs become their canonical string form and dates/datetimes become
    ISO-8601 strings; every other value is returned unchanged.
    """
    if isinstance(value, UUID):
        return str(value)
    # ``datetime`` is a subclass of ``date``, so this covers both.
    if isinstance(value, date):
        return value.isoformat()
    return value


def _event_payload(event_type: str, event: dict[str, Any]) -> dict[str, Any]:
    """Build the client-facing payload for a channel-layer event.

    The internal ``type`` key of ``event`` is replaced by ``event_type``;
    all other keys are forwarded untouched.
    """
    return {
        "type": event_type,
        **{k: v for k, v in event.items() if k != "type"},
    }


def _reply_as_staff(thread_id: Any, user: User, text: str):
    """Send a staff reply to the thread identified by ``thread_id`` (sync).

    Raises:
        PermissionError: the thread is assigned to somebody else and ``user``
            is not a superuser.
        ChatThread.DoesNotExist: no thread matches ``thread_id``.
    """
    thread = ChatThread.objects.get(uuid=thread_id)
    assigned_to_other = thread.assigned_to_id and thread.assigned_to_id != user.id
    if assigned_to_other and not user.is_superuser:
        raise PermissionError("not_assigned")
    return send_message(
        thread, sender_user=user, sender_type=SenderType.STAFF, text=text
    )


def _close_as_staff(thread_id: Any, user: User):
    """Close the thread identified by ``thread_id`` on behalf of ``user`` (sync)."""
    thread = ChatThread.objects.get(uuid=thread_id)
    return close_thread(thread, user)


async def _is_user_support(user: Any) -> bool:
    """Return True when ``user`` is authenticated staff in the support group."""
    if not getattr(user, "is_authenticated", False) or not getattr(
        user, "is_staff", False
    ):
        return False
    return await sync_to_async(
        user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists
    )()


async def _get_or_create_ip_thread(ip: str) -> ChatThread:
    """Return the most recently modified open thread for ``ip``, creating one if none exists."""

    def _inner() -> ChatThread:
        thread = (
            ChatThread.objects.filter(attributes__ip=ip, status=ThreadStatus.OPEN)
            .order_by("-modified")
            .first()
        )
        if thread:
            return thread
        return ChatThread.objects.create(email="", attributes={"ip": ip})

    return await sync_to_async(_inner)()


async def _get_or_create_active_thread_for(user: User | None, ip: str) -> ChatThread:
    """Return the open thread a new message should be appended to.

    Authenticated non-staff users get their own most recently modified open
    thread (or one from ``get_or_create_user_thread``); everybody else is
    keyed by ``ip``.
    """
    if (
        user
        and getattr(user, "is_authenticated", False)
        and not getattr(user, "is_staff", False)
    ):

        def _inner_user() -> ChatThread:
            t = (
                ChatThread.objects.filter(user=user, status=ThreadStatus.OPEN)
                .order_by("-modified")
                .first()
            )
            if t:
                return t
            return get_or_create_user_thread(user)

        return await sync_to_async(_inner_user)()
    return await _get_or_create_ip_thread(ip)


async def _antispam_check(ip: str) -> bool:
    """Count one message for ``ip`` in the current minute bucket.

    Returns:
        False when ``ANTISPAM_LIMIT_PER_MIN`` messages were already counted
        for this IP in the current minute, True otherwise.
    """
    key = f"msg_rate:{ip}:{timezone.now().strftime('%Y%m%d%H%M')}"

    def _inner() -> bool:
        cnt = cache.get(key, 0)
        if cnt >= ANTISPAM_LIMIT_PER_MIN:
            return False
        # The bucket key changes every minute; 70s lets stale buckets expire.
        cache.set(key, cnt + 1, timeout=70)
        return True

    # The cache API used here is synchronous; keep it off the event loop.
    # NOTE(review): get-then-set is not atomic across worker processes.
    return await sync_to_async(_inner)()


class UserMessageConsumer(AsyncJsonWebsocketConsumer):
    """Endpoint through which any client can send a message to support."""

    async def connect(self) -> None:  # noqa: D401
        await self.accept()

    @extend_ws_schema(**USER_MESSAGE_CONSUMER_SCHEMA)  # ty: ignore[invalid-argument-type]
    async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
        """Handle ``ping`` or store ``content["text"]`` as a user message.

        Replies with ``invalid_payload`` or ``rate_limited`` errors, or with
        the ids of the thread and the stored message.
        """
        action = content.get("action")
        if action == "ping":
            await self.send_json({"type": "pong"})
            return

        text = content.get("text", "")
        if not _is_valid_text(text):
            await self.send_json({"error": "invalid_payload"})
            return

        ip = _get_ip(self.scope)
        if not await _antispam_check(ip):
            await self.send_json({"error": "rate_limited"})
            return

        user: User | None = self.scope.get("user")
        thread = await _get_or_create_active_thread_for(
            user if user and user.is_authenticated else None, ip
        )
        msg = await sync_to_async(send_message)(
            thread,
            # Staff and anonymous senders are recorded without a sender user.
            sender_user=user
            if user and user.is_authenticated and not user.is_staff
            else None,
            sender_type=SenderType.USER,
            text=text,
        )
        await self.send_json(
            {
                "ok": True,
                "thread_id": str(thread.uuid),
                "message_id": str(msg.uuid),
            }
        )


class StaffInboxConsumer(AsyncJsonWebsocketConsumer):
    """Inbox channel for support staff; joined to ``STAFF_INBOX_GROUP``."""

    async def connect(self) -> None:
        """Accept support staff only; everyone else is closed with code 4403."""
        user = self.scope.get("user")
        if not await _is_user_support(user):
            await self.close(code=4403)
            return
        await self.channel_layer.group_add(STAFF_INBOX_GROUP, self.channel_name)
        await self.accept()

    async def disconnect(self, code: int) -> None:
        await self.channel_layer.group_discard(STAFF_INBOX_GROUP, self.channel_name)

    @extend_ws_schema(**STAFF_INBOX_CONSUMER_SCHEMA)  # ty: ignore[invalid-argument-type]
    async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
        """Dispatch ``ping``, ``list_open``, ``assign``, ``reply`` and ``close``.

        Any other action is answered with ``{"error": "unknown_action"}``.
        """
        action = content.get("action")
        user: User = self.scope.get("user")

        if action == "ping":
            await self.send_json({"type": "pong"})
            return

        if action == "list_open":

            def _list() -> list[dict[str, Any]]:
                rows = (
                    ChatThread.objects.filter(status=ThreadStatus.OPEN)
                    .values(
                        "uuid", "user_id", "email", "assigned_to_id", "last_message_at"
                    )
                    .order_by("-modified")
                )
                # send_json() encodes with plain json.dumps, which cannot
                # handle UUID or datetime values - stringify them first.
                return [
                    {field: _json_safe(value) for field, value in row.items()}
                    for row in rows
                ]

            data = await sync_to_async(_list)()
            await self.send_json({"type": "inbox.list", "threads": data})
            return

        if action == "assign":
            thread_id = content.get("thread_id")
            if not thread_id:
                await self.send_json({"error": "thread_id_required"})
                return

            def _assign():
                thread = ChatThread.objects.get(uuid=thread_id)
                return claim_thread(thread, user)

            try:
                t = await sync_to_async(_assign)()
                await self.send_json(
                    {
                        "type": "assigned",
                        "thread_id": str(t.uuid),
                        "user": str(user.uuid),
                    }
                )
            except Exception as e:  # noqa: BLE001
                await self.send_json({"error": "assign_failed", "detail": str(e)})
            return

        if action == "reply":
            thread_id = content.get("thread_id")
            text = content.get("text", "")
            if not thread_id or not _is_valid_text(text):
                await self.send_json({"error": "invalid_payload"})
                return
            try:
                msg = await sync_to_async(_reply_as_staff)(thread_id, user, text)
                await self.send_json({"type": "replied", "message_id": str(msg.uuid)})
            except Exception as e:  # noqa: BLE001
                await self.send_json({"error": "reply_failed", "detail": str(e)})
            return

        if action == "close":
            thread_id = content.get("thread_id")
            if not thread_id:
                await self.send_json({"error": "thread_id_required"})
                return
            try:
                t = await sync_to_async(_close_as_staff)(thread_id, user)
                await self.send_json({"type": "closed", "thread_id": str(t.uuid)})
            except Exception as e:  # noqa: BLE001
                await self.send_json({"error": "close_failed", "detail": str(e)})
            return

        await self.send_json({"error": "unknown_action"})

    async def staff_thread_created(self, event):
        await self.send_json(_event_payload("staff.thread.created", event))

    async def staff_thread_assigned(self, event):
        await self.send_json(_event_payload("staff.thread.assigned", event))

    async def staff_thread_reassigned(self, event):
        await self.send_json(_event_payload("staff.thread.reassigned", event))

    async def staff_thread_closed(self, event):
        await self.send_json(_event_payload("staff.thread.closed", event))


class ThreadConsumer(AsyncJsonWebsocketConsumer):
    """Per-thread channel for support staff.

    The thread is taken from the ``thread_id`` URL route kwarg and the
    connection joins the ``THREAD_GROUP_PREFIX + thread_id`` group.
    """

    # Set in connect(); stays None when the connection was rejected.
    thread_id: Any = None

    async def connect(self) -> None:
        """Accept support staff only; everyone else is closed with code 4403."""
        user = self.scope.get("user")
        if not await _is_user_support(user):
            await self.close(code=4403)
            return
        self.thread_id = self.scope["url_route"]["kwargs"].get("thread_id")
        await self.channel_layer.group_add(
            f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name
        )
        await self.accept()

    async def disconnect(self, code: int) -> None:
        if self.thread_id:
            await self.channel_layer.group_discard(
                f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name
            )

    @extend_ws_schema(**THREAD_CONSUMER_SCHEMA)  # ty: ignore[invalid-argument-type]
    async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
        """Dispatch ``ping``, ``reply`` and ``close`` for the bound thread.

        Any other action is acknowledged with ``{"thread": ..., "ok": True}``.
        """
        action = content.get("action")
        user: User = self.scope.get("user")

        if action == "ping":
            # NOTE(review): assumes the route yields a JSON-encodable
            # thread_id (e.g. a str) - confirm against the URL converter.
            await self.send_json({"type": "pong", "thread": self.thread_id})
            return

        if action == "reply":
            text = content.get("text", "")
            if not _is_valid_text(text):
                await self.send_json({"error": "invalid_payload"})
                return
            try:
                msg = await sync_to_async(_reply_as_staff)(self.thread_id, user, text)
                await self.send_json({"type": "replied", "message_id": str(msg.uuid)})
            except Exception as e:  # noqa: BLE001
                await self.send_json({"error": "reply_failed", "detail": str(e)})
            return

        if action == "close":
            try:
                t = await sync_to_async(_close_as_staff)(self.thread_id, user)
                await self.send_json({"type": "closed", "thread_id": str(t.uuid)})
            except Exception as e:  # noqa: BLE001
                await self.send_json({"error": "close_failed", "detail": str(e)})
            return

        await self.send_json({"thread": self.thread_id, "ok": True})

    async def thread_message(self, event):
        await self.send_json(_event_payload("thread.message", event))

    async def thread_closed(self, event):
        await self.send_json(_event_payload("thread.closed", event))


# TODO: Let non-staff users place audio calls to staff users. Incoming calls
# must go into a queue from which staff users pick them up, because many
# non-staff users may call at the same time.