diff --git a/engine/vibes_auth/messaging/consumers.py b/engine/vibes_auth/messaging/consumers.py
index 8d4c4488..9021ad32 100644
--- a/engine/vibes_auth/messaging/consumers.py
+++ b/engine/vibes_auth/messaging/consumers.py
@@ -2,16 +2,77 @@ from __future__ import annotations
from typing import Any
+from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from drf_spectacular_websocket.decorators import extend_ws_schema
+from django.conf import settings
+from django.core.cache import cache
+from django.utils import timezone
from engine.vibes_auth.docs.drf.messaging import (
STAFF_INBOX_CONSUMER_SCHEMA,
THREAD_CONSUMER_SCHEMA,
USER_MESSAGE_CONSUMER_SCHEMA,
)
+from engine.vibes_auth.messaging.services import (
+ STAFF_INBOX_GROUP,
+ THREAD_GROUP_PREFIX,
+ claim_thread,
+ close_thread,
+ get_or_create_user_thread,
+ send_message,
+)
+from engine.vibes_auth.models import ChatThread, User
+from engine.vibes_auth.choices import SenderType, ThreadStatus
MAX_MESSAGE_LENGTH = 1028
+USER_SUPPORT_GROUP_NAME = "User Support"
+ANTISPAM_LIMIT_PER_MIN = getattr(settings, "MESSAGING_ANTISPAM_LIMIT_PER_MIN", 20)
+
+
+def _get_ip(scope) -> str:
+ client = scope.get("client")
+ if client and isinstance(client, (list, tuple)) and client:
+ return str(client[0])
+ return "0.0.0.0"
+
+
+async def _is_user_support(user: Any) -> bool:
+ if not getattr(user, "is_authenticated", False) or not getattr(user, "is_staff", False):
+ return False
+ return await sync_to_async(user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists)()
+
+
+async def _get_or_create_ip_thread(ip: str) -> ChatThread:
+ def _inner() -> ChatThread:
+ thread = ChatThread.objects.filter(attributes__ip=ip, status=ThreadStatus.OPEN).order_by("-modified").first()
+ if thread:
+ return thread
+ return ChatThread.objects.create(email="", attributes={"ip": ip})
+
+ return await sync_to_async(_inner)()
+
+
+async def _get_or_create_active_thread_for(user: User | None, ip: str) -> ChatThread:
+ if user and getattr(user, "is_authenticated", False) and not getattr(user, "is_staff", False):
+
+ def _inner_user() -> ChatThread:
+ t = ChatThread.objects.filter(user=user, status=ThreadStatus.OPEN).order_by("-modified").first()
+ if t:
+ return t
+ return get_or_create_user_thread(user)
+
+ return await sync_to_async(_inner_user)()
+ return await _get_or_create_ip_thread(ip)
+
+
+async def _antispam_check(ip: str) -> bool:
+ key = f"msg_rate:{ip}:{timezone.now().strftime('%Y%m%d%H%M')}"
+ cnt = cache.get(key, 0)
+ if cnt >= ANTISPAM_LIMIT_PER_MIN:
+ return False
+ cache.set(key, cnt + 1, timeout=70)
+ return True
class UserMessageConsumer(AsyncJsonWebsocketConsumer):
@@ -23,25 +84,139 @@ class UserMessageConsumer(AsyncJsonWebsocketConsumer):
action = content.get("action")
if action == "ping":
await self.send_json({"type": "pong"})
- else:
- text = content.get("text", "")
- if isinstance(text, str) and len(text) <= MAX_MESSAGE_LENGTH:
- await self.send_json({"echo": text})
- else:
- await self.send_json({"error": "invalid_payload"})
+ return
+
+ text = content.get("text", "")
+ if not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH):
+ await self.send_json({"error": "invalid_payload"})
+ return
+
+ ip = _get_ip(self.scope)
+ if not await _antispam_check(ip):
+ await self.send_json({"error": "rate_limited"})
+ return
+
+ user: User | None = self.scope.get("user")
+ thread = await _get_or_create_active_thread_for(user if user and user.is_authenticated else None, ip)
+
+ msg = await sync_to_async(send_message)(
+ thread,
+ sender_user=user if user and user.is_authenticated and not user.is_staff else None,
+ sender_type=SenderType.USER,
+ text=text,
+ )
+
+ await self.send_json(
+ {
+ "ok": True,
+ "thread_id": str(thread.uuid),
+ "message_id": str(msg.uuid),
+ }
+ )
class StaffInboxConsumer(AsyncJsonWebsocketConsumer):
async def connect(self) -> None:
user = self.scope.get("user")
- if not getattr(user, "is_staff", False):
+ if not await _is_user_support(user):
await self.close(code=4403)
return
+ await self.channel_layer.group_add(STAFF_INBOX_GROUP, self.channel_name)
await self.accept()
+ async def disconnect(self, code: int) -> None:
+ await self.channel_layer.group_discard(STAFF_INBOX_GROUP, self.channel_name)
+
@extend_ws_schema(**STAFF_INBOX_CONSUMER_SCHEMA)
async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
- await self.send_json({"ok": True})
+ action = content.get("action")
+ user: User = self.scope.get("user")
+
+ if action == "ping":
+ await self.send_json({"type": "pong"})
+ return
+
+ if action == "list_open":
+
+ def _list():
+ qs = (
+ ChatThread.objects.filter(status=ThreadStatus.OPEN)
+ .values("uuid", "user_id", "email", "assigned_to_id", "last_message_at")
+ .order_by("-modified")
+ )
+ return list(qs)
+
+ data = await sync_to_async(_list)()
+ await self.send_json({"type": "inbox.list", "threads": data})
+ return
+
+ if action == "assign":
+ thread_id = content.get("thread_id")
+ if not thread_id:
+ await self.send_json({"error": "thread_id_required"})
+ return
+
+ def _assign():
+ thread = ChatThread.objects.get(uuid=thread_id)
+ return claim_thread(thread, user)
+
+ try:
+ t = await sync_to_async(_assign)()
+ await self.send_json({"type": "assigned", "thread_id": str(t.uuid), "user": str(user.uuid)})
+ except Exception as e: # noqa: BLE001
+ await self.send_json({"error": "assign_failed", "detail": str(e)})
+ return
+
+ if action == "reply":
+ thread_id = content.get("thread_id")
+ text = content.get("text", "")
+ if not thread_id or not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH):
+ await self.send_json({"error": "invalid_payload"})
+ return
+
+ def _can_reply_and_send():
+ thread = ChatThread.objects.get(uuid=thread_id)
+ if thread.assigned_to_id and thread.assigned_to_id != user.id and not user.is_superuser:
+ raise PermissionError("not_assigned")
+ return send_message(thread, sender_user=user, sender_type=SenderType.STAFF, text=text)
+
+ try:
+ msg = await sync_to_async(_can_reply_and_send)()
+ await self.send_json({"type": "replied", "message_id": str(msg.uuid)})
+ except Exception as e: # noqa: BLE001
+ await self.send_json({"error": "reply_failed", "detail": str(e)})
+ return
+
+ if action == "close":
+ thread_id = content.get("thread_id")
+ if not thread_id:
+ await self.send_json({"error": "thread_id_required"})
+ return
+
+ def _close():
+ thread = ChatThread.objects.get(uuid=thread_id)
+ return close_thread(thread, user)
+
+ try:
+ t = await sync_to_async(_close)()
+ await self.send_json({"type": "closed", "thread_id": str(t.uuid)})
+ except Exception as e: # noqa: BLE001
+ await self.send_json({"error": "close_failed", "detail": str(e)})
+ return
+
+ await self.send_json({"error": "unknown_action"})
+
+ async def staff_thread_created(self, event):
+ await self.send_json({"type": "staff.thread.created", **{k: v for k, v in event.items() if k != "type"}})
+
+ async def staff_thread_assigned(self, event):
+ await self.send_json({"type": "staff.thread.assigned", **{k: v for k, v in event.items() if k != "type"}})
+
+ async def staff_thread_reassigned(self, event):
+ await self.send_json({"type": "staff.thread.reassigned", **{k: v for k, v in event.items() if k != "type"}})
+
+ async def staff_thread_closed(self, event):
+ await self.send_json({"type": "staff.thread.closed", **{k: v for k, v in event.items() if k != "type"}})
class ThreadConsumer(AsyncJsonWebsocketConsumer):
@@ -49,12 +224,66 @@ class ThreadConsumer(AsyncJsonWebsocketConsumer):
async def connect(self) -> None:
user = self.scope.get("user")
- if not getattr(user, "is_staff", False):
+ if not await _is_user_support(user):
await self.close(code=4403)
return
self.thread_id = self.scope["url_route"]["kwargs"].get("thread_id")
+ await self.channel_layer.group_add(f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name)
await self.accept()
+ async def disconnect(self, code: int) -> None:
+ if self.thread_id:
+ await self.channel_layer.group_discard(f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name)
+
@extend_ws_schema(**THREAD_CONSUMER_SCHEMA)
async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
+ action = content.get("action")
+ user: User = self.scope.get("user")
+
+ if action == "ping":
+ await self.send_json({"type": "pong", "thread": getattr(self, "thread_id", None)})
+ return
+
+ if action == "reply":
+ text = content.get("text", "")
+ if not isinstance(text, str) or not (0 < len(text) <= MAX_MESSAGE_LENGTH):
+ await self.send_json({"error": "invalid_payload"})
+ return
+
+ def _reply():
+ thread = ChatThread.objects.get(uuid=self.thread_id)
+ if thread.assigned_to_id and thread.assigned_to_id != user.id and not user.is_superuser:
+ raise PermissionError("not_assigned")
+ return send_message(thread, sender_user=user, sender_type=SenderType.STAFF, text=text)
+
+ try:
+ msg = await sync_to_async(_reply)()
+ await self.send_json({"type": "replied", "message_id": str(msg.uuid)})
+ except Exception as e: # noqa: BLE001
+ await self.send_json({"error": "reply_failed", "detail": str(e)})
+ return
+
+ if action == "close":
+
+ def _close():
+ thread = ChatThread.objects.get(uuid=self.thread_id)
+ return close_thread(thread, user)
+
+ try:
+ t = await sync_to_async(_close)()
+ await self.send_json({"type": "closed", "thread_id": str(t.uuid)})
+ except Exception as e: # noqa: BLE001
+ await self.send_json({"error": "close_failed", "detail": str(e)})
+ return
+
await self.send_json({"thread": getattr(self, "thread_id", None), "ok": True})
+
+ async def thread_message(self, event):
+ await self.send_json({"type": "thread.message", **{k: v for k, v in event.items() if k != "type"}})
+
+ async def thread_closed(self, event):
+ await self.send_json({"type": "thread.closed", **{k: v for k, v in event.items() if k != "type"}})
+
+
+# TODO: Add functionality so non-staff users may audio call staff-user. The call must fall into the queue where
+# staff users may take advantage of answering the call. Many non-staff users may call simultaneously, that's why we need the queue
diff --git a/engine/vibes_auth/messaging/forwarders/__init__.py b/engine/vibes_auth/messaging/forwarders/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/engine/vibes_auth/messaging/forwarders/telegram.py b/engine/vibes_auth/messaging/forwarders/telegram.py
new file mode 100644
index 00000000..a24d26e2
--- /dev/null
+++ b/engine/vibes_auth/messaging/forwarders/telegram.py
@@ -0,0 +1,198 @@
+from __future__ import annotations
+
+import asyncio
+import logging
+from contextlib import suppress
+from typing import Optional
+
+from aiogram import Bot, Dispatcher, Router, types
+from aiogram.enums import ParseMode
+from aiogram.filters import Command
+from aiogram.webhook.aiohttp_server import SimpleRequestHandler
+from django.conf import settings
+from django.contrib.auth import get_user_model
+from django.db.models import Q
+
+from engine.vibes_auth.choices import SenderType
+from engine.vibes_auth.messaging.services import send_message as svc_send_message
+from engine.vibes_auth.models import ChatThread
+
+logger = logging.getLogger(__name__)
+
+USER_SUPPORT_GROUP_NAME = "User Support"
+
+
+def is_telegram_enabled() -> bool:
+ return bool(settings.TELEGRAM_TOKEN)
+
+
+def _get_bot() -> Optional["Bot"]:
+ if not is_telegram_enabled():
+ logger.warning("Telegram forwarder disabled: missing aiogram or TELEGRAM_TOKEN")
+ return None
+ return Bot(token=settings.TELEGRAM_TOKEN, parse_mode=ParseMode.HTML) # type: ignore[arg-type]
+
+
+def build_router() -> Optional["Router"]:
+ if not is_telegram_enabled():
+ return None
+
+ User = get_user_model()
+ router: Router = Router()
+
+ @router.message(Command("start"))
+ async def cmd_start(message: types.Message): # type: ignore[valid-type]
+ parts = (message.text or "").split(maxsplit=1)
+ if len(parts) < 2:
+ await message.answer(
+ "Welcome! To link your account, send: /start ACTIVATION_TOKEN\n"
+ "You can find your activation token in your profile."
+ )
+ return
+ token = parts[1].strip()
+
+ def _link():
+ try:
+ retrieved_user = User.objects.get(activation_token=token)
+ except User.DoesNotExist: # type: ignore[attr-defined]
+ return None
+ attrs = dict(retrieved_user.attributes or {})
+ attrs["telegram_id"] = message.from_user.id if message.from_user else None # type: ignore[union-attr]
+ retrieved_user.attributes = attrs
+ retrieved_user.save(update_fields=["attributes", "modified"]) # type: ignore[attr-defined]
+ return retrieved_user
+
+ user = await asyncio.to_thread(_link)
+ if not user:
+ await message.answer("Invalid activation token.")
+ return
+ await message.answer("Your Telegram account has been linked successfully.")
+
+ @router.message(Command("unlink"))
+ async def cmd_unlink(message: types.Message): # type: ignore[valid-type]
+ tid = message.from_user.id if message.from_user else None # type: ignore[union-attr]
+ if not tid:
+ await message.answer("Cannot unlink: no Telegram user id.")
+ return
+
+ def _unlink():
+ q = Q(attributes__telegram_id=tid)
+ updated_query = User.objects.filter(q).update(attributes={})
+ return updated_query
+
+ updated = await asyncio.to_thread(_unlink)
+ if updated:
+ await message.answer("Unlinked successfully.")
+ else:
+ await message.answer("No linked account found.")
+
+ @router.message(Command("help"))
+ async def cmd_help(message: types.Message): # type: ignore[valid-type]
+ await message.answer(
+ "Commands:\n"
+ "/start — link your account\n"
+ "/unlink — unlink your account\n"
+ "As staff, you may use: reply THREAD_UUID your message to respond."
+ )
+
+ @router.message()
+ async def any_message(message: types.Message): # type: ignore[valid-type]
+ if not message.from_user or not message.text:
+ return
+ tid = message.from_user.id
+
+ def _resolve_staff_and_command():
+ try:
+ staff_user = User.objects.get(attributes__telegram_id=tid, is_staff=True, is_active=True)
+ except User.DoesNotExist: # type: ignore[attr-defined]
+ return None, None, None
+ # group check
+ if not staff_user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists():
+ return None, None, None
+ text = message.text.strip()
+ if text.lower().startswith("reply "):
+ parts = text.split(maxsplit=2)
+ if len(parts) < 3:
+ return staff_user, None, "Usage: reply "
+ thread_id, message_body = parts[1], parts[2]
+ try:
+ thread = ChatThread.objects.get(uuid=thread_id)
+ except ChatThread.DoesNotExist:
+ return staff_user, None, "Thread not found."
+ return staff_user, (thread, message_body), None
+ return staff_user, None, "Unknown command. Send /help"
+
+ staff, payload, error = await asyncio.to_thread(_resolve_staff_and_command)
+ if not staff:
+ return
+ if error:
+ await message.answer(error)
+ return
+ if payload:
+ t, body = payload
+
+ def _send():
+ return svc_send_message(t, sender_user=staff, sender_type=SenderType.STAFF, text=body)
+
+ await asyncio.to_thread(_send)
+ await message.answer("Sent.")
+
+ return router
+
+
+async def setup_webhook(webhook_base_url: str) -> None:
+ bot = _get_bot()
+ if not bot:
+ return
+ url = webhook_base_url.rstrip("/") + "/telegram/webhook/" + settings.TELEGRAM_TOKEN
+ with suppress(Exception):
+ await bot.delete_webhook(drop_pending_updates=True)
+ await bot.set_webhook(url=url, drop_pending_updates=True)
+ logger.info("Telegram webhook set to %s", url)
+
+
+async def forward_thread_message_to_assigned_staff(thread_uuid: str, text: str) -> None:
+ """Forward a thread message to assigned staff via Telegram if possible.
+
+ This is a best-effort; failures are logged and never raised.
+ """
+ if not is_telegram_enabled():
+ return
+
+ def _resolve_chat_and_chat_id() -> tuple[Optional[int], Optional[str]]:
+ try:
+ t = ChatThread.objects.select_related("assigned_to").get(uuid=thread_uuid)
+ except ChatThread.DoesNotExist:
+ return None, None
+ staff = t.assigned_to
+ if not staff:
+ return None, None
+ attrs = staff.attributes or {}
+ chat_telegram_id = attrs.get("telegram_id") if isinstance(attrs, dict) else None
+ return int(chat_telegram_id) if chat_telegram_id else None, str(t.uuid)
+
+ chat_id, _tid = await asyncio.to_thread(_resolve_chat_and_chat_id)
+ if not chat_id:
+ return
+ bot = _get_bot()
+ if not bot:
+ return
+ try:
+ await bot.send_message(chat_id=chat_id, text=text)
+ except Exception as exc: # noqa: BLE001
+ logger.warning("Failed to forward Telegram message for thread %s: %s", _tid, exc)
+
+
+def install_aiohttp_webhook(app) -> None: # pragma: no cover - integration helper
+ if not is_telegram_enabled():
+ logger.warning("Telegram forwarder not installed: disabled")
+ return
+ dp = Dispatcher() # type: ignore[call-arg]
+ router = build_router()
+ if router:
+ dp.include_router(router)
+ bot = _get_bot()
+ if not bot:
+ return
+ SimpleRequestHandler(dispatcher=dp, bot=bot).register(app, path="/telegram/webhook/" + settings.TELEGRAM_TOKEN) # type: ignore[arg-type]
+ logger.info("Telegram webhook handler installed on aiohttp app.")
diff --git a/engine/vibes_auth/messaging/services.py b/engine/vibes_auth/messaging/services.py
index e4926448..18a9dfc4 100644
--- a/engine/vibes_auth/messaging/services.py
+++ b/engine/vibes_auth/messaging/services.py
@@ -11,8 +11,11 @@ from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from engine.vibes_auth.choices import SenderType, ThreadStatus
-from engine.vibes_auth.models import ChatMessage, ChatThread
-from engine.vibes_auth.models import User
+from engine.vibes_auth.messaging.forwarders.telegram import (
+ forward_thread_message_to_assigned_staff,
+ is_telegram_enabled,
+)
+from engine.vibes_auth.models import ChatMessage, ChatThread, User
THREAD_GROUP_PREFIX = "thread:"
STAFF_INBOX_GROUP = "staff:inbox"
@@ -82,6 +85,8 @@ def send_message(thread: ChatThread, *, sender_user: User | None, sender_type: S
},
)
if sender_type != SenderType.STAFF:
+ if is_telegram_enabled():
+ async_to_sync(forward_thread_message_to_assigned_staff)(str(thread.uuid), text)
auto_reply(thread)
return msg
diff --git a/evibes/settings/constance.py b/evibes/settings/constance.py
index 2c52f41b..5181f7ea 100644
--- a/evibes/settings/constance.py
+++ b/evibes/settings/constance.py
@@ -35,7 +35,6 @@ CONSTANCE_CONFIG = OrderedDict(
("EMAIL_HOST_PASSWORD", (getenv("EMAIL_HOST_PASSWORD", "SUPERsecretPASSWORD"), _("SMTP password"))),
("EMAIL_FROM", (getenv("EMAIL_FROM", "eVibes"), _("Mail from option"))),
### Features Options ###
- ("TELEGRAM_API_TOKEN", ("", _("Use Telegram-bot functionality"))),
("DAYS_TO_STORE_ANON_MSGS", (1, _("How many days we store messages from anonymous users"))),
("DAYS_TO_STORE_AUTH_MSGS", (365, _("How many days we store messages from authenticated users"))),
("DISABLED_COMMERCE", (getenv("DISABLED_COMMERCE", False), _("Disable buy functionality"))),
@@ -71,7 +70,6 @@ CONSTANCE_CONFIG_FIELDSETS = OrderedDict(
"EMAIL_FROM",
),
gettext_noop("Features Options"): (
- "TELEGRAM_API_TOKEN",
"DAYS_TO_STORE_ANON_MSGS",
"DAYS_TO_STORE_AUTH_MSGS",
"DISABLED_COMMERCE",
diff --git a/evibes/settings/extensions.py b/evibes/settings/extensions.py
index eed9c571..c3f74482 100644
--- a/evibes/settings/extensions.py
+++ b/evibes/settings/extensions.py
@@ -10,3 +10,5 @@ EXTENSIONS_MAX_UNIQUE_QUERY_ATTEMPTS = 500
HEALTHCHECK_CELERY_RESULT_TIMEOUT = 5
HEALTHCHECK_CELERY_QUEUE_TIMEOUT = 5
+
+TELEGRAM_TOKEN = getenv("TELEGRAM_TOKEN", "") # noqa: F405