From 0464c1b11b085495e5c34faf168238b268f92a74 Mon Sep 17 00:00:00 2001 From: Egor fureunoir Gorbunov Date: Wed, 12 Nov 2025 11:23:44 +0300 Subject: [PATCH] Features: 1) Introduced Telegram forwarder with bot functionality for message forwarding and user support; 2) Added new commands (`/start`, `/unlink`, `/help`) for Telegram bot; 3) Enabled webhook integration and message linking via Telegram. Fixes: 1) Replaced legacy `TELEGRAM_API_TOKEN` configuration with `TELEGRAM_TOKEN`; 2) Incorporated anti-spam checks for user messages to prevent abuse. Extra: Refactored websocket consumers by integrating Telegram support and enhancing thread-assignment workflows; improved logging and API consistency; minor cleanup and deprecations. --- engine/vibes_auth/messaging/consumers.py | 247 +++++++++++++++++- .../messaging/forwarders/__init__.py | 0 .../messaging/forwarders/telegram.py | 198 ++++++++++++++ engine/vibes_auth/messaging/services.py | 9 +- evibes/settings/constance.py | 2 - evibes/settings/extensions.py | 2 + 6 files changed, 445 insertions(+), 13 deletions(-) create mode 100644 engine/vibes_auth/messaging/forwarders/__init__.py create mode 100644 engine/vibes_auth/messaging/forwarders/telegram.py diff --git a/engine/vibes_auth/messaging/consumers.py b/engine/vibes_auth/messaging/consumers.py index 8d4c4488..9021ad32 100644 --- a/engine/vibes_auth/messaging/consumers.py +++ b/engine/vibes_auth/messaging/consumers.py @@ -2,16 +2,77 @@ from __future__ import annotations from typing import Any +from asgiref.sync import sync_to_async from channels.generic.websocket import AsyncJsonWebsocketConsumer from drf_spectacular_websocket.decorators import extend_ws_schema +from django.conf import settings +from django.core.cache import cache +from django.utils import timezone from engine.vibes_auth.docs.drf.messaging import ( STAFF_INBOX_CONSUMER_SCHEMA, THREAD_CONSUMER_SCHEMA, USER_MESSAGE_CONSUMER_SCHEMA, ) +from engine.vibes_auth.messaging.services import ( + STAFF_INBOX_GROUP, + THREAD_GROUP_PREFIX, + claim_thread, + close_thread, + get_or_create_user_thread, + send_message, +) +from engine.vibes_auth.models import ChatThread, User +from engine.vibes_auth.choices import SenderType, ThreadStatus MAX_MESSAGE_LENGTH = 1028 +USER_SUPPORT_GROUP_NAME = "User Support" +ANTISPAM_LIMIT_PER_MIN = getattr(settings, "MESSAGING_ANTISPAM_LIMIT_PER_MIN", 20) + + +def _get_ip(scope) -> str: + client = scope.get("client") + if client and isinstance(client, (list, tuple)) and client: + return str(client[0]) + return "0.0.0.0" + + +async def _is_user_support(user: Any) -> bool: + if not getattr(user, "is_authenticated", False) or not getattr(user, "is_staff", False): + return False + return await sync_to_async(user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists)() + + +async def _get_or_create_ip_thread(ip: str) -> ChatThread: + def _inner() -> ChatThread: + thread = ChatThread.objects.filter(attributes__ip=ip, status=ThreadStatus.OPEN).order_by("-modified").first() + if thread: + return thread + return ChatThread.objects.create(email="", attributes={"ip": ip}) + + return await sync_to_async(_inner)() + + +async def _get_or_create_active_thread_for(user: User | None, ip: str) -> ChatThread: + if user and getattr(user, "is_authenticated", False) and not getattr(user, "is_staff", False): + + def _inner_user() -> ChatThread: + t = ChatThread.objects.filter(user=user, status=ThreadStatus.OPEN).order_by("-modified").first() + if t: + return t + return get_or_create_user_thread(user) + + return await sync_to_async(_inner_user)() + return await _get_or_create_ip_thread(ip) + + +async def _antispam_check(ip: str) -> bool: + key = f"msg_rate:{ip}:{timezone.now().strftime('%Y%m%d%H%M')}" + cnt = cache.get(key, 0) + if cnt >= ANTISPAM_LIMIT_PER_MIN: + return False + cache.set(key, cnt + 1, timeout=70) + return True class UserMessageConsumer(AsyncJsonWebsocketConsumer): @@ -23,25 +84,139 @@ class UserMessageConsumer(AsyncJsonWebsocketConsumer): action = content.get("action") if action == "ping": await self.send_json({"type": "pong"}) - else: - text = content.get("text", "") - if isinstance(text, str) and len(text) <= MAX_MESSAGE_LENGTH: - await self.send_json({"echo": text}) - else: - await self.send_json({"error": "invalid_payload"}) + return + + text = content.get("text", "") + if not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH): + await self.send_json({"error": "invalid_payload"}) + return + + ip = _get_ip(self.scope) + if not await _antispam_check(ip): + await self.send_json({"error": "rate_limited"}) + return + + user: User | None = self.scope.get("user") + thread = await _get_or_create_active_thread_for(user if user and user.is_authenticated else None, ip) + + msg = await sync_to_async(send_message)( + thread, + sender_user=user if user and user.is_authenticated and not user.is_staff else None, + sender_type=SenderType.USER, + text=text, + ) + + await self.send_json( + { + "ok": True, + "thread_id": str(thread.uuid), + "message_id": str(msg.uuid), + } + ) class StaffInboxConsumer(AsyncJsonWebsocketConsumer): async def connect(self) -> None: user = self.scope.get("user") - if not getattr(user, "is_staff", False): + if not await _is_user_support(user): await self.close(code=4403) return + await self.channel_layer.group_add(STAFF_INBOX_GROUP, self.channel_name) await self.accept() + async def disconnect(self, code: int) -> None: + await self.channel_layer.group_discard(STAFF_INBOX_GROUP, self.channel_name) + @extend_ws_schema(**STAFF_INBOX_CONSUMER_SCHEMA) async def receive_json(self, content: dict[str, Any], **kwargs) -> None: - await self.send_json({"ok": True}) + action = content.get("action") + user: User = self.scope.get("user") + + if action == "ping": + await self.send_json({"type": "pong"}) + return + + if action == "list_open": + + def _list(): + qs = ( + ChatThread.objects.filter(status=ThreadStatus.OPEN) + .values("uuid", "user_id", "email", "assigned_to_id", "last_message_at") + .order_by("-modified") + ) + return list(qs) + + data = await sync_to_async(_list)() + await self.send_json({"type": "inbox.list", "threads": data}) + return + + if action == "assign": + thread_id = content.get("thread_id") + if not thread_id: + await self.send_json({"error": "thread_id_required"}) + return + + def _assign(): + thread = ChatThread.objects.get(uuid=thread_id) + return claim_thread(thread, user) + + try: + t = await sync_to_async(_assign)() + await self.send_json({"type": "assigned", "thread_id": str(t.uuid), "user": str(user.uuid)}) + except Exception as e: # noqa: BLE001 + await self.send_json({"error": "assign_failed", "detail": str(e)}) + return + + if action == "reply": + thread_id = content.get("thread_id") + text = content.get("text", "") + if not thread_id or not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH): + await self.send_json({"error": "invalid_payload"}) + return + + def _can_reply_and_send(): + thread = ChatThread.objects.get(uuid=thread_id) + if thread.assigned_to_id and thread.assigned_to_id != user.id and not user.is_superuser: + raise PermissionError("not_assigned") + return send_message(thread, sender_user=user, sender_type=SenderType.STAFF, text=text) + + try: + msg = await sync_to_async(_can_reply_and_send)() + await self.send_json({"type": "replied", "message_id": str(msg.uuid)}) + except Exception as e: # noqa: BLE001 + await self.send_json({"error": "reply_failed", "detail": str(e)}) + return + + if action == "close": + thread_id = content.get("thread_id") + if not thread_id: + await self.send_json({"error": "thread_id_required"}) + return + + def _close(): + thread = ChatThread.objects.get(uuid=thread_id) + return close_thread(thread, user) + + try: + t = await sync_to_async(_close)() + await self.send_json({"type": "closed", "thread_id": str(t.uuid)}) + except Exception as e: # noqa: BLE001 + await self.send_json({"error": "close_failed", "detail": str(e)}) + return + + await self.send_json({"error": "unknown_action"}) + + async def staff_thread_created(self, event): + await self.send_json({"type": "staff.thread.created", **{k: v for k, v in event.items() if k != "type"}}) + + async def staff_thread_assigned(self, event): + await self.send_json({"type": "staff.thread.assigned", **{k: v for k, v in event.items() if k != "type"}}) + + async def staff_thread_reassigned(self, event): + await self.send_json({"type": "staff.thread.reassigned", **{k: v for k, v in event.items() if k != "type"}}) + + async def staff_thread_closed(self, event): + await self.send_json({"type": "staff.thread.closed", **{k: v for k, v in event.items() if k != "type"}}) class ThreadConsumer(AsyncJsonWebsocketConsumer): @@ -49,12 +224,66 @@ class ThreadConsumer(AsyncJsonWebsocketConsumer): async def connect(self) -> None: user = self.scope.get("user") - if not getattr(user, "is_staff", False): + if not await _is_user_support(user): await self.close(code=4403) return self.thread_id = self.scope["url_route"]["kwargs"].get("thread_id") + await self.channel_layer.group_add(f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name) await self.accept() + async def disconnect(self, code: int) -> None: + if self.thread_id: + await self.channel_layer.group_discard(f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name) + @extend_ws_schema(**THREAD_CONSUMER_SCHEMA) async def receive_json(self, content: dict[str, Any], **kwargs) -> None: + action = content.get("action") + user: User = self.scope.get("user") + + if action == "ping": + await self.send_json({"type": "pong", "thread": getattr(self, "thread_id", None)}) + return + + if action == "reply": + text = content.get("text", "") + if not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH): + await self.send_json({"error": "invalid_payload"}) + return + + def _reply(): + thread = ChatThread.objects.get(uuid=self.thread_id) + if thread.assigned_to_id and thread.assigned_to_id != user.id and not user.is_superuser: + raise PermissionError("not_assigned") + return send_message(thread, sender_user=user, sender_type=SenderType.STAFF, text=text) + + try: + msg = await sync_to_async(_reply)() + await self.send_json({"type": "replied", "message_id": str(msg.uuid)}) + except Exception as e: # noqa: BLE001 + await self.send_json({"error": "reply_failed", "detail": str(e)}) + return + + if action == "close": + + def _close(): + thread = ChatThread.objects.get(uuid=self.thread_id) + return close_thread(thread, user) + + try: + t = await sync_to_async(_close)() + await self.send_json({"type": "closed", "thread_id": str(t.uuid)}) + except Exception as e: # noqa: BLE001 + await self.send_json({"error": "close_failed", "detail": str(e)}) + return + await self.send_json({"thread": getattr(self, "thread_id", None), "ok": True}) + + async def thread_message(self, event): + await self.send_json({"type": "thread.message", **{k: v for k, v in event.items() if k != "type"}}) + + async def thread_closed(self, event): + await self.send_json({"type": "thread.closed", **{k: v for k, v in event.items() if k != "type"}}) + + +# TODO: Add functionality so non-staff users may audio call staff-user. The call must fall into the queue where +# staff users may take advantage of answering the call. Many non-staff users may call simultaneously, that's why we need the queue diff --git a/engine/vibes_auth/messaging/forwarders/__init__.py b/engine/vibes_auth/messaging/forwarders/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/engine/vibes_auth/messaging/forwarders/telegram.py b/engine/vibes_auth/messaging/forwarders/telegram.py new file mode 100644 index 00000000..a24d26e2 --- /dev/null +++ b/engine/vibes_auth/messaging/forwarders/telegram.py @@ -0,0 +1,198 @@ +from __future__ import annotations + +import asyncio +import logging +from contextlib import suppress +from typing import Optional + +from aiogram import Bot, Dispatcher, Router, types +from aiogram.enums import ParseMode +from aiogram.filters import Command +from aiogram.webhook.aiohttp_server import SimpleRequestHandler +from django.conf import settings +from django.contrib.auth import get_user_model +from django.db.models import Q + +from engine.vibes_auth.choices import SenderType +from engine.vibes_auth.messaging.services import send_message as svc_send_message +from engine.vibes_auth.models import ChatThread + +logger = logging.getLogger(__name__) + +USER_SUPPORT_GROUP_NAME = "User Support" + + +def is_telegram_enabled() -> bool: + return bool(settings.TELEGRAM_TOKEN) + + +def _get_bot() -> Optional["Bot"]: + if not is_telegram_enabled(): + logger.warning("Telegram forwarder disabled: missing aiogram or TELEGRAM_TOKEN") + return None + return Bot(token=settings.TELEGRAM_TOKEN, parse_mode=ParseMode.HTML) # type: ignore[arg-type] + + +def build_router() -> Optional["Router"]: + if not is_telegram_enabled(): + return None + + User = get_user_model() + router: Router = Router() + + @router.message(Command("start")) + async def cmd_start(message: types.Message): # type: ignore[valid-type] + parts = (message.text or "").split(maxsplit=1) + if len(parts) < 2: + await message.answer( + "Welcome! To link your account, send: /start ACTIVATION_TOKEN\n" + "You can find your activation token in your profile." + ) + return + token = parts[1].strip() + + def _link(): + try: + retrieved_user = User.objects.get(activation_token=token) + except User.DoesNotExist: # type: ignore[attr-defined] + return None + attrs = dict(retrieved_user.attributes or {}) + attrs["telegram_id"] = message.from_user.id if message.from_user else None # type: ignore[union-attr] + retrieved_user.attributes = attrs + retrieved_user.save(update_fields=["attributes", "modified"]) # type: ignore[attr-defined] + return retrieved_user + + user = await asyncio.to_thread(_link) + if not user: + await message.answer("Invalid activation token.") + return + await message.answer("Your Telegram account has been linked successfully.") + + @router.message(Command("unlink")) + async def cmd_unlink(message: types.Message): # type: ignore[valid-type] + tid = message.from_user.id if message.from_user else None # type: ignore[union-attr] + if not tid: + await message.answer("Cannot unlink: no Telegram user id.") + return + + def _unlink(): + q = Q(attributes__telegram_id=tid) + updated_query = User.objects.filter(q).update(attributes={}) + return updated_query + + updated = await asyncio.to_thread(_unlink) + if updated: + await message.answer("Unlinked successfully.") + else: + await message.answer("No linked account found.") + + @router.message(Command("help")) + async def cmd_help(message: types.Message): # type: ignore[valid-type] + await message.answer( + "Commands:\n" + "/start — link your account\n" + "/unlink — unlink your account\n" + "As staff, you may use: reply THREAD_UUID your message to respond." + ) + + @router.message() + async def any_message(message: types.Message): # type: ignore[valid-type] + if not message.from_user or not message.text: + return + tid = message.from_user.id + + def _resolve_staff_and_command(): + try: + staff_user = User.objects.get(attributes__telegram_id=tid, is_staff=True, is_active=True) + except User.DoesNotExist: # type: ignore[attr-defined] + return None, None, None + # group check + if not staff_user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists(): + return None, None, None + text = message.text.strip() + if text.lower().startswith("reply "): + parts = text.split(maxsplit=2) + if len(parts) < 3: + return staff_user, None, "Usage: reply " + thread_id, message_body = parts[1], parts[2] + try: + thread = ChatThread.objects.get(uuid=thread_id) + except ChatThread.DoesNotExist: + return staff_user, None, "Thread not found." + return staff_user, (thread, message_body), None + return staff_user, None, "Unknown command. Send /help" + + staff, payload, error = await asyncio.to_thread(_resolve_staff_and_command) + if not staff: + return + if error: + await message.answer(error) + return + if payload: + t, body = payload + + def _send(): + return svc_send_message(t, sender_user=staff, sender_type=SenderType.STAFF, text=body) + + await asyncio.to_thread(_send) + await message.answer("Sent.") + + return router + + +async def setup_webhook(webhook_base_url: str) -> None: + bot = _get_bot() + if not bot: + return + url = webhook_base_url.rstrip("/") + "/telegram/webhook/" + settings.TELEGRAM_TOKEN + with suppress(Exception): + await bot.delete_webhook(drop_pending_updates=True) + await bot.set_webhook(url=url, drop_pending_updates=True) + logger.info("Telegram webhook set to %s", url) + + +async def forward_thread_message_to_assigned_staff(thread_uuid: str, text: str) -> None: + """Forward a thread message to assigned staff via Telegram if possible. + + This is a best-effort; failures are logged and never raised. + """ + if not is_telegram_enabled(): + return + + def _resolve_chat_and_chat_id() -> tuple[Optional[int], Optional[str]]: + try: + t = ChatThread.objects.select_related("assigned_to").get(uuid=thread_uuid) + except ChatThread.DoesNotExist: + return None, None + staff = t.assigned_to + if not staff: + return None, None + attrs = staff.attributes or {} + chat_telegram_id = attrs.get("telegram_id") if isinstance(attrs, dict) else None + return int(chat_telegram_id) if chat_telegram_id else None, str(t.uuid) + + chat_id, _tid = await asyncio.to_thread(_resolve_chat_and_chat_id) + if not chat_id: + return + bot = _get_bot() + if not bot: + return + try: + await bot.send_message(chat_id=chat_id, text=text) + except Exception as exc: # noqa: BLE001 + logger.warning("Failed to forward Telegram message for thread %s: %s", _tid, exc) + + +def install_aiohttp_webhook(app) -> None: # pragma: no cover - integration helper + if not is_telegram_enabled(): + logger.warning("Telegram forwarder not installed: disabled") + return + dp = Dispatcher() # type: ignore[call-arg] + router = build_router() + if router: + dp.include_router(router) + bot = _get_bot() + if not bot: + return + SimpleRequestHandler(dispatcher=dp, bot=bot).register(app, path="/telegram/webhook/" + settings.TELEGRAM_TOKEN) # type: ignore[arg-type] + logger.info("Telegram webhook handler installed on aiohttp app.") diff --git a/engine/vibes_auth/messaging/services.py b/engine/vibes_auth/messaging/services.py index e4926448..18a9dfc4 100644 --- a/engine/vibes_auth/messaging/services.py +++ b/engine/vibes_auth/messaging/services.py @@ -11,8 +11,11 @@ from django.utils import timezone from django.utils.translation import gettext_lazy as _ from engine.vibes_auth.choices import SenderType, ThreadStatus -from engine.vibes_auth.models import ChatMessage, ChatThread -from engine.vibes_auth.models import User +from engine.vibes_auth.messaging.forwarders.telegram import ( + forward_thread_message_to_assigned_staff, + is_telegram_enabled, +) +from engine.vibes_auth.models import ChatMessage, ChatThread, User THREAD_GROUP_PREFIX = "thread:" STAFF_INBOX_GROUP = "staff:inbox" @@ -82,6 +85,8 @@ def send_message(thread: ChatThread, *, sender_user: User | None, sender_type: S }, ) if sender_type != SenderType.STAFF: + if is_telegram_enabled(): + async_to_sync(forward_thread_message_to_assigned_staff)(str(thread.uuid), text) auto_reply(thread) return msg diff --git a/evibes/settings/constance.py b/evibes/settings/constance.py index 2c52f41b..5181f7ea 100644 --- a/evibes/settings/constance.py +++ b/evibes/settings/constance.py @@ -35,7 +35,6 @@ CONSTANCE_CONFIG = OrderedDict( ("EMAIL_HOST_PASSWORD", (getenv("EMAIL_HOST_PASSWORD", "SUPERsecretPASSWORD"), _("SMTP password"))), ("EMAIL_FROM", (getenv("EMAIL_FROM", "eVibes"), _("Mail from option"))), ### Features Options ### - ("TELEGRAM_API_TOKEN", ("", _("Use Telegram-bot functionality"))), ("DAYS_TO_STORE_ANON_MSGS", (1, _("How many days we store messages from anonymous users"))), ("DAYS_TO_STORE_AUTH_MSGS", (365, _("How many days we store messages from authenticated users"))), ("DISABLED_COMMERCE", (getenv("DISABLED_COMMERCE", False), _("Disable buy functionality"))), @@ -71,7 +70,6 @@ CONSTANCE_CONFIG_FIELDSETS = OrderedDict( "EMAIL_FROM", ), gettext_noop("Features Options"): ( - "TELEGRAM_API_TOKEN", "DAYS_TO_STORE_ANON_MSGS", "DAYS_TO_STORE_AUTH_MSGS", "DISABLED_COMMERCE", diff --git a/evibes/settings/extensions.py b/evibes/settings/extensions.py index eed9c571..c3f74482 100644 --- a/evibes/settings/extensions.py +++ b/evibes/settings/extensions.py @@ -10,3 +10,5 @@ EXTENSIONS_MAX_UNIQUE_QUERY_ATTEMPTS = 500 HEALTHCHECK_CELERY_RESULT_TIMEOUT = 5 HEALTHCHECK_CELERY_QUEUE_TIMEOUT = 5 + +TELEGRAM_TOKEN = getenv("TELEGRAM_TOKEN", "") # noqa: F405