Features: 1) Introduced Telegram forwarder with bot functionality for message forwarding and user support; 2) Added new commands (/start, /unlink, /help) for Telegram bot; 3) Enabled webhook integration and message linking via Telegram.

Fixes: 1) Replaced legacy `TELEGRAM_API_TOKEN` configuration with `TELEGRAM_TOKEN`; 2) Incorporated anti-spam checks for user messages to prevent abuse.

Extra: Refactored websocket consumers by integrating Telegram support and enhancing thread-assignment workflows; improved logging and API consistency; minor cleanup and deprecations.
This commit is contained in:
Egor Pavlovich Gorbunov 2025-11-12 11:23:44 +03:00
parent 73162635be
commit 0464c1b11b
6 changed files with 445 additions and 13 deletions

View file

@ -2,16 +2,77 @@ from __future__ import annotations
from typing import Any from typing import Any
from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer from channels.generic.websocket import AsyncJsonWebsocketConsumer
from drf_spectacular_websocket.decorators import extend_ws_schema from drf_spectacular_websocket.decorators import extend_ws_schema
from django.conf import settings
from django.core.cache import cache
from django.utils import timezone
from engine.vibes_auth.docs.drf.messaging import ( from engine.vibes_auth.docs.drf.messaging import (
STAFF_INBOX_CONSUMER_SCHEMA, STAFF_INBOX_CONSUMER_SCHEMA,
THREAD_CONSUMER_SCHEMA, THREAD_CONSUMER_SCHEMA,
USER_MESSAGE_CONSUMER_SCHEMA, USER_MESSAGE_CONSUMER_SCHEMA,
) )
from engine.vibes_auth.messaging.services import (
STAFF_INBOX_GROUP,
THREAD_GROUP_PREFIX,
claim_thread,
close_thread,
get_or_create_user_thread,
send_message,
)
from engine.vibes_auth.models import ChatThread, User
from engine.vibes_auth.choices import SenderType, ThreadStatus
MAX_MESSAGE_LENGTH = 1028 MAX_MESSAGE_LENGTH = 1028
USER_SUPPORT_GROUP_NAME = "User Support"
ANTISPAM_LIMIT_PER_MIN = getattr(settings, "MESSAGING_ANTISPAM_LIMIT_PER_MIN", 20)
def _get_ip(scope) -> str:
client = scope.get("client")
if client and isinstance(client, (list, tuple)) and client:
return str(client[0])
return "0.0.0.0"
async def _is_user_support(user: Any) -> bool:
if not getattr(user, "is_authenticated", False) or not getattr(user, "is_staff", False):
return False
return await sync_to_async(user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists)()
async def _get_or_create_ip_thread(ip: str) -> ChatThread:
def _inner() -> ChatThread:
thread = ChatThread.objects.filter(attributes__ip=ip, status=ThreadStatus.OPEN).order_by("-modified").first()
if thread:
return thread
return ChatThread.objects.create(email="", attributes={"ip": ip})
return await sync_to_async(_inner)()
async def _get_or_create_active_thread_for(user: User | None, ip: str) -> ChatThread:
if user and getattr(user, "is_authenticated", False) and not getattr(user, "is_staff", False):
def _inner_user() -> ChatThread:
t = ChatThread.objects.filter(user=user, status=ThreadStatus.OPEN).order_by("-modified").first()
if t:
return t
return get_or_create_user_thread(user)
return await sync_to_async(_inner_user)()
return await _get_or_create_ip_thread(ip)
async def _antispam_check(ip: str) -> bool:
key = f"msg_rate:{ip}:{timezone.now().strftime('%Y%m%d%H%M')}"
cnt = cache.get(key, 0)
if cnt >= ANTISPAM_LIMIT_PER_MIN:
return False
cache.set(key, cnt + 1, timeout=70)
return True
class UserMessageConsumer(AsyncJsonWebsocketConsumer): class UserMessageConsumer(AsyncJsonWebsocketConsumer):
@ -23,25 +84,139 @@ class UserMessageConsumer(AsyncJsonWebsocketConsumer):
action = content.get("action") action = content.get("action")
if action == "ping": if action == "ping":
await self.send_json({"type": "pong"}) await self.send_json({"type": "pong"})
else: return
text = content.get("text", "") text = content.get("text", "")
if isinstance(text, str) and len(text) <= MAX_MESSAGE_LENGTH: if not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH):
await self.send_json({"echo": text})
else:
await self.send_json({"error": "invalid_payload"}) await self.send_json({"error": "invalid_payload"})
return
ip = _get_ip(self.scope)
if not await _antispam_check(ip):
await self.send_json({"error": "rate_limited"})
return
user: User | None = self.scope.get("user")
thread = await _get_or_create_active_thread_for(user if user and user.is_authenticated else None, ip)
msg = await sync_to_async(send_message)(
thread,
sender_user=user if user and user.is_authenticated and not user.is_staff else None,
sender_type=SenderType.USER,
text=text,
)
await self.send_json(
{
"ok": True,
"thread_id": str(thread.uuid),
"message_id": str(msg.uuid),
}
)
class StaffInboxConsumer(AsyncJsonWebsocketConsumer): class StaffInboxConsumer(AsyncJsonWebsocketConsumer):
async def connect(self) -> None: async def connect(self) -> None:
user = self.scope.get("user") user = self.scope.get("user")
if not getattr(user, "is_staff", False): if not await _is_user_support(user):
await self.close(code=4403) await self.close(code=4403)
return return
await self.channel_layer.group_add(STAFF_INBOX_GROUP, self.channel_name)
await self.accept() await self.accept()
async def disconnect(self, code: int) -> None:
await self.channel_layer.group_discard(STAFF_INBOX_GROUP, self.channel_name)
@extend_ws_schema(**STAFF_INBOX_CONSUMER_SCHEMA) @extend_ws_schema(**STAFF_INBOX_CONSUMER_SCHEMA)
async def receive_json(self, content: dict[str, Any], **kwargs) -> None: async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
await self.send_json({"ok": True}) action = content.get("action")
user: User = self.scope.get("user")
if action == "ping":
await self.send_json({"type": "pong"})
return
if action == "list_open":
def _list():
qs = (
ChatThread.objects.filter(status=ThreadStatus.OPEN)
.values("uuid", "user_id", "email", "assigned_to_id", "last_message_at")
.order_by("-modified")
)
return list(qs)
data = await sync_to_async(_list)()
await self.send_json({"type": "inbox.list", "threads": data})
return
if action == "assign":
thread_id = content.get("thread_id")
if not thread_id:
await self.send_json({"error": "thread_id_required"})
return
def _assign():
thread = ChatThread.objects.get(uuid=thread_id)
return claim_thread(thread, user)
try:
t = await sync_to_async(_assign)()
await self.send_json({"type": "assigned", "thread_id": str(t.uuid), "user": str(user.uuid)})
except Exception as e: # noqa: BLE001
await self.send_json({"error": "assign_failed", "detail": str(e)})
return
if action == "reply":
thread_id = content.get("thread_id")
text = content.get("text", "")
if not thread_id or not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH):
await self.send_json({"error": "invalid_payload"})
return
def _can_reply_and_send():
thread = ChatThread.objects.get(uuid=thread_id)
if thread.assigned_to_id and thread.assigned_to_id != user.id and not user.is_superuser:
raise PermissionError("not_assigned")
return send_message(thread, sender_user=user, sender_type=SenderType.STAFF, text=text)
try:
msg = await sync_to_async(_can_reply_and_send)()
await self.send_json({"type": "replied", "message_id": str(msg.uuid)})
except Exception as e: # noqa: BLE001
await self.send_json({"error": "reply_failed", "detail": str(e)})
return
if action == "close":
thread_id = content.get("thread_id")
if not thread_id:
await self.send_json({"error": "thread_id_required"})
return
def _close():
thread = ChatThread.objects.get(uuid=thread_id)
return close_thread(thread, user)
try:
t = await sync_to_async(_close)()
await self.send_json({"type": "closed", "thread_id": str(t.uuid)})
except Exception as e: # noqa: BLE001
await self.send_json({"error": "close_failed", "detail": str(e)})
return
await self.send_json({"error": "unknown_action"})
async def staff_thread_created(self, event):
await self.send_json({"type": "staff.thread.created", **{k: v for k, v in event.items() if k != "type"}})
async def staff_thread_assigned(self, event):
await self.send_json({"type": "staff.thread.assigned", **{k: v for k, v in event.items() if k != "type"}})
async def staff_thread_reassigned(self, event):
await self.send_json({"type": "staff.thread.reassigned", **{k: v for k, v in event.items() if k != "type"}})
async def staff_thread_closed(self, event):
await self.send_json({"type": "staff.thread.closed", **{k: v for k, v in event.items() if k != "type"}})
class ThreadConsumer(AsyncJsonWebsocketConsumer): class ThreadConsumer(AsyncJsonWebsocketConsumer):
@ -49,12 +224,66 @@ class ThreadConsumer(AsyncJsonWebsocketConsumer):
async def connect(self) -> None: async def connect(self) -> None:
user = self.scope.get("user") user = self.scope.get("user")
if not getattr(user, "is_staff", False): if not await _is_user_support(user):
await self.close(code=4403) await self.close(code=4403)
return return
self.thread_id = self.scope["url_route"]["kwargs"].get("thread_id") self.thread_id = self.scope["url_route"]["kwargs"].get("thread_id")
await self.channel_layer.group_add(f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name)
await self.accept() await self.accept()
async def disconnect(self, code: int) -> None:
if self.thread_id:
await self.channel_layer.group_discard(f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name)
@extend_ws_schema(**THREAD_CONSUMER_SCHEMA) @extend_ws_schema(**THREAD_CONSUMER_SCHEMA)
async def receive_json(self, content: dict[str, Any], **kwargs) -> None: async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
action = content.get("action")
user: User = self.scope.get("user")
if action == "ping":
await self.send_json({"type": "pong", "thread": getattr(self, "thread_id", None)})
return
if action == "reply":
text = content.get("text", "")
if not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH):
await self.send_json({"error": "invalid_payload"})
return
def _reply():
thread = ChatThread.objects.get(uuid=self.thread_id)
if thread.assigned_to_id and thread.assigned_to_id != user.id and not user.is_superuser:
raise PermissionError("not_assigned")
return send_message(thread, sender_user=user, sender_type=SenderType.STAFF, text=text)
try:
msg = await sync_to_async(_reply)()
await self.send_json({"type": "replied", "message_id": str(msg.uuid)})
except Exception as e: # noqa: BLE001
await self.send_json({"error": "reply_failed", "detail": str(e)})
return
if action == "close":
def _close():
thread = ChatThread.objects.get(uuid=self.thread_id)
return close_thread(thread, user)
try:
t = await sync_to_async(_close)()
await self.send_json({"type": "closed", "thread_id": str(t.uuid)})
except Exception as e: # noqa: BLE001
await self.send_json({"error": "close_failed", "detail": str(e)})
return
await self.send_json({"thread": getattr(self, "thread_id", None), "ok": True}) await self.send_json({"thread": getattr(self, "thread_id", None), "ok": True})
async def thread_message(self, event):
await self.send_json({"type": "thread.message", **{k: v for k, v in event.items() if k != "type"}})
async def thread_closed(self, event):
await self.send_json({"type": "thread.closed", **{k: v for k, v in event.items() if k != "type"}})
# TODO: Add functionality so non-staff users may audio call staff-user. The call must fall into the queue where
# staff users may take advantage of answering the call. Many non-staff users may call simultaneously, that's why we need the queue

View file

@ -0,0 +1,198 @@
from __future__ import annotations
import asyncio
import logging
from contextlib import suppress
from typing import Optional
from aiogram import Bot, Dispatcher, Router, types
from aiogram.enums import ParseMode
from aiogram.filters import Command
from aiogram.webhook.aiohttp_server import SimpleRequestHandler
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db.models import Q
from engine.vibes_auth.choices import SenderType
from engine.vibes_auth.messaging.services import send_message as svc_send_message
from engine.vibes_auth.models import ChatThread
logger = logging.getLogger(__name__)
USER_SUPPORT_GROUP_NAME = "User Support"
def is_telegram_enabled() -> bool:
return bool(settings.TELEGRAM_TOKEN)
def _get_bot() -> Optional["Bot"]:
if not is_telegram_enabled():
logger.warning("Telegram forwarder disabled: missing aiogram or TELEGRAM_TOKEN")
return None
return Bot(token=settings.TELEGRAM_TOKEN, parse_mode=ParseMode.HTML) # type: ignore[arg-type]
def build_router() -> Optional["Router"]:
if not is_telegram_enabled():
return None
User = get_user_model()
router: Router = Router()
@router.message(Command("start"))
async def cmd_start(message: types.Message): # type: ignore[valid-type]
parts = (message.text or "").split(maxsplit=1)
if len(parts) < 2:
await message.answer(
"Welcome! To link your account, send: <code>/start ACTIVATION_TOKEN</code>\n"
"You can find your activation token in your profile."
)
return
token = parts[1].strip()
def _link():
try:
retrieved_user = User.objects.get(activation_token=token)
except User.DoesNotExist: # type: ignore[attr-defined]
return None
attrs = dict(retrieved_user.attributes or {})
attrs["telegram_id"] = message.from_user.id if message.from_user else None # type: ignore[union-attr]
retrieved_user.attributes = attrs
retrieved_user.save(update_fields=["attributes", "modified"]) # type: ignore[attr-defined]
return retrieved_user
user = await asyncio.to_thread(_link)
if not user:
await message.answer("Invalid activation token.")
return
await message.answer("Your Telegram account has been linked successfully.")
@router.message(Command("unlink"))
async def cmd_unlink(message: types.Message): # type: ignore[valid-type]
tid = message.from_user.id if message.from_user else None # type: ignore[union-attr]
if not tid:
await message.answer("Cannot unlink: no Telegram user id.")
return
def _unlink():
q = Q(attributes__telegram_id=tid)
updated_query = User.objects.filter(q).update(attributes={})
return updated_query
updated = await asyncio.to_thread(_unlink)
if updated:
await message.answer("Unlinked successfully.")
else:
await message.answer("No linked account found.")
@router.message(Command("help"))
async def cmd_help(message: types.Message): # type: ignore[valid-type]
await message.answer(
"Commands:\n"
"/start <token> — link your account\n"
"/unlink — unlink your account\n"
"As staff, you may use: <code>reply THREAD_UUID your message</code> to respond."
)
@router.message()
async def any_message(message: types.Message): # type: ignore[valid-type]
if not message.from_user or not message.text:
return
tid = message.from_user.id
def _resolve_staff_and_command():
try:
staff_user = User.objects.get(attributes__telegram_id=tid, is_staff=True, is_active=True)
except User.DoesNotExist: # type: ignore[attr-defined]
return None, None, None
# group check
if not staff_user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists():
return None, None, None
text = message.text.strip()
if text.lower().startswith("reply "):
parts = text.split(maxsplit=2)
if len(parts) < 3:
return staff_user, None, "Usage: reply <THREAD_UUID> <message>"
thread_id, message_body = parts[1], parts[2]
try:
thread = ChatThread.objects.get(uuid=thread_id)
except ChatThread.DoesNotExist:
return staff_user, None, "Thread not found."
return staff_user, (thread, message_body), None
return staff_user, None, "Unknown command. Send /help"
staff, payload, error = await asyncio.to_thread(_resolve_staff_and_command)
if not staff:
return
if error:
await message.answer(error)
return
if payload:
t, body = payload
def _send():
return svc_send_message(t, sender_user=staff, sender_type=SenderType.STAFF, text=body)
await asyncio.to_thread(_send)
await message.answer("Sent.")
return router
async def setup_webhook(webhook_base_url: str) -> None:
bot = _get_bot()
if not bot:
return
url = webhook_base_url.rstrip("/") + "/telegram/webhook/" + settings.TELEGRAM_TOKEN
with suppress(Exception):
await bot.delete_webhook(drop_pending_updates=True)
await bot.set_webhook(url=url, drop_pending_updates=True)
logger.info("Telegram webhook set to %s", url)
async def forward_thread_message_to_assigned_staff(thread_uuid: str, text: str) -> None:
"""Forward a thread message to assigned staff via Telegram if possible.
This is a best-effort; failures are logged and never raised.
"""
if not is_telegram_enabled():
return
def _resolve_chat_and_chat_id() -> tuple[Optional[int], Optional[str]]:
try:
t = ChatThread.objects.select_related("assigned_to").get(uuid=thread_uuid)
except ChatThread.DoesNotExist:
return None, None
staff = t.assigned_to
if not staff:
return None, None
attrs = staff.attributes or {}
chat_telegram_id = attrs.get("telegram_id") if isinstance(attrs, dict) else None
return int(chat_telegram_id) if chat_telegram_id else None, str(t.uuid)
chat_id, _tid = await asyncio.to_thread(_resolve_chat_and_chat_id)
if not chat_id:
return
bot = _get_bot()
if not bot:
return
try:
await bot.send_message(chat_id=chat_id, text=text)
except Exception as exc: # noqa: BLE001
logger.warning("Failed to forward Telegram message for thread %s: %s", _tid, exc)
def install_aiohttp_webhook(app) -> None: # pragma: no cover - integration helper
if not is_telegram_enabled():
logger.warning("Telegram forwarder not installed: disabled")
return
dp = Dispatcher() # type: ignore[call-arg]
router = build_router()
if router:
dp.include_router(router)
bot = _get_bot()
if not bot:
return
SimpleRequestHandler(dispatcher=dp, bot=bot).register(app, path="/telegram/webhook/" + settings.TELEGRAM_TOKEN) # type: ignore[arg-type]
logger.info("Telegram webhook handler installed on aiohttp app.")

View file

@ -11,8 +11,11 @@ from django.utils import timezone
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from engine.vibes_auth.choices import SenderType, ThreadStatus from engine.vibes_auth.choices import SenderType, ThreadStatus
from engine.vibes_auth.models import ChatMessage, ChatThread from engine.vibes_auth.messaging.forwarders.telegram import (
from engine.vibes_auth.models import User forward_thread_message_to_assigned_staff,
is_telegram_enabled,
)
from engine.vibes_auth.models import ChatMessage, ChatThread, User
THREAD_GROUP_PREFIX = "thread:" THREAD_GROUP_PREFIX = "thread:"
STAFF_INBOX_GROUP = "staff:inbox" STAFF_INBOX_GROUP = "staff:inbox"
@ -82,6 +85,8 @@ def send_message(thread: ChatThread, *, sender_user: User | None, sender_type: S
}, },
) )
if sender_type != SenderType.STAFF: if sender_type != SenderType.STAFF:
if is_telegram_enabled():
async_to_sync(forward_thread_message_to_assigned_staff)(str(thread.uuid), text)
auto_reply(thread) auto_reply(thread)
return msg return msg

View file

@ -35,7 +35,6 @@ CONSTANCE_CONFIG = OrderedDict(
("EMAIL_HOST_PASSWORD", (getenv("EMAIL_HOST_PASSWORD", "SUPERsecretPASSWORD"), _("SMTP password"))), ("EMAIL_HOST_PASSWORD", (getenv("EMAIL_HOST_PASSWORD", "SUPERsecretPASSWORD"), _("SMTP password"))),
("EMAIL_FROM", (getenv("EMAIL_FROM", "eVibes"), _("Mail from option"))), ("EMAIL_FROM", (getenv("EMAIL_FROM", "eVibes"), _("Mail from option"))),
### Features Options ### ### Features Options ###
("TELEGRAM_API_TOKEN", ("", _("Use Telegram-bot functionality"))),
("DAYS_TO_STORE_ANON_MSGS", (1, _("How many days we store messages from anonymous users"))), ("DAYS_TO_STORE_ANON_MSGS", (1, _("How many days we store messages from anonymous users"))),
("DAYS_TO_STORE_AUTH_MSGS", (365, _("How many days we store messages from authenticated users"))), ("DAYS_TO_STORE_AUTH_MSGS", (365, _("How many days we store messages from authenticated users"))),
("DISABLED_COMMERCE", (getenv("DISABLED_COMMERCE", False), _("Disable buy functionality"))), ("DISABLED_COMMERCE", (getenv("DISABLED_COMMERCE", False), _("Disable buy functionality"))),
@ -71,7 +70,6 @@ CONSTANCE_CONFIG_FIELDSETS = OrderedDict(
"EMAIL_FROM", "EMAIL_FROM",
), ),
gettext_noop("Features Options"): ( gettext_noop("Features Options"): (
"TELEGRAM_API_TOKEN",
"DAYS_TO_STORE_ANON_MSGS", "DAYS_TO_STORE_ANON_MSGS",
"DAYS_TO_STORE_AUTH_MSGS", "DAYS_TO_STORE_AUTH_MSGS",
"DISABLED_COMMERCE", "DISABLED_COMMERCE",

View file

@ -10,3 +10,5 @@ EXTENSIONS_MAX_UNIQUE_QUERY_ATTEMPTS = 500
HEALTHCHECK_CELERY_RESULT_TIMEOUT = 5 HEALTHCHECK_CELERY_RESULT_TIMEOUT = 5
HEALTHCHECK_CELERY_QUEUE_TIMEOUT = 5 HEALTHCHECK_CELERY_QUEUE_TIMEOUT = 5
TELEGRAM_TOKEN = getenv("TELEGRAM_TOKEN", "") # noqa: F405