"""WebSocket consumers: a public echo endpoint and two staff-only endpoints."""

from __future__ import annotations

from typing import Any

from channels.generic.websocket import AsyncJsonWebsocketConsumer

# Upper bound (in characters, as measured by ``len``) on echoed text.
# NOTE(review): 1028 may be a typo for 1024 -- left unchanged; confirm intent.
MAX_MESSAGE_LENGTH = 1028


class UserMessageConsumer(AsyncJsonWebsocketConsumer):
    """Answer ``ping`` actions and echo short text messages back to the sender.

    No check on the connecting user is performed in this class: every
    connection that reaches ``connect`` is accepted.
    """

    async def connect(self) -> None:  # noqa: D401
        """Accept the incoming connection unconditionally."""
        await self.accept()

    async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
        """Handle one decoded JSON frame.

        Replies with:

        * ``{"type": "pong"}`` when ``content["action"] == "ping"``;
        * ``{"echo": text}`` when ``content["text"]`` (default ``""``) is a
          ``str`` no longer than ``MAX_MESSAGE_LENGTH``;
        * ``{"error": "invalid_payload"}`` otherwise, including when the
          frame is not a JSON object at all.

        Args:
            content: The decoded JSON payload. Annotated as a dict, but the
                client controls the frame, so it is validated at runtime.
            **kwargs: Ignored; accepted for signature compatibility.
        """
        # A valid JSON frame can be a list, string, number or null; calling
        # ``.get`` on those would raise AttributeError instead of producing
        # the error reply this handler already defines for bad input.
        if not isinstance(content, dict):
            await self.send_json({"error": "invalid_payload"})
            return

        if content.get("action") == "ping":
            await self.send_json({"type": "pong"})
            return

        text = content.get("text", "")
        if isinstance(text, str) and len(text) <= MAX_MESSAGE_LENGTH:
            await self.send_json({"echo": text})
        else:
            await self.send_json({"error": "invalid_payload"})


class StaffInboxConsumer(AsyncJsonWebsocketConsumer):
    """Staff-only consumer that acknowledges every received frame."""

    async def connect(self) -> None:
        """Accept the connection only if ``scope["user"].is_staff`` is truthy.

        A missing user, or one without a truthy ``is_staff`` attribute, is
        closed with code 4403 and never accepted.
        """
        user = self.scope.get("user")
        if not getattr(user, "is_staff", False):
            await self.close(code=4403)
            return
        await self.accept()

    async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
        """Reply ``{"ok": True}`` regardless of the frame's content."""
        await self.send_json({"ok": True})


class ThreadConsumer(AsyncJsonWebsocketConsumer):
    """Staff-only consumer bound to the ``thread_id`` captured from the URL route."""

    # Set in ``connect`` from the URL route kwargs; stays ``None`` until then
    # (and when the route supplies no ``thread_id``).
    thread_id: Any = None

    async def connect(self) -> None:
        """Reject non-staff users with close code 4403, else record the thread and accept."""
        user = self.scope.get("user")
        if not getattr(user, "is_staff", False):
            await self.close(code=4403)
            return
        self.thread_id = self.scope["url_route"]["kwargs"].get("thread_id")
        await self.accept()

    async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
        """Reply with the bound thread id and ``"ok": True``; the frame content is not read."""
        # The class-level default guarantees the attribute exists, so plain
        # attribute access is equivalent to ``getattr(self, "thread_id", None)``.
        await self.send_json({"thread": self.thread_id, "ok": True})