from __future__ import annotations import asyncio import logging from contextlib import suppress from typing import Optional from aiogram import Bot, Dispatcher, Router, types from aiogram.enums import ParseMode from aiogram.filters import Command from aiogram.webhook.aiohttp_server import SimpleRequestHandler from django.conf import settings from django.contrib.auth import get_user_model from django.db.models import Q from engine.vibes_auth.choices import SenderType from engine.vibes_auth.models import ChatThread logger = logging.getLogger(__name__) USER_SUPPORT_GROUP_NAME = "User Support" def is_telegram_enabled() -> bool: return bool(settings.TELEGRAM_TOKEN) def _get_bot() -> Optional["Bot"]: if not is_telegram_enabled(): logger.warning("Telegram forwarder disabled: missing aiogram or TELEGRAM_TOKEN") return None return Bot(token=settings.TELEGRAM_TOKEN, parse_mode=ParseMode.HTML) # type: ignore[arg-type] def build_router() -> Optional["Router"]: if not is_telegram_enabled(): return None User = get_user_model() router: Router = Router() @router.message(Command("start")) async def cmd_start(message: types.Message): # type: ignore[valid-type] parts = (message.text or "").split(maxsplit=1) if len(parts) < 2: await message.answer( "Welcome! To link your account, send: /start ACTIVATION_TOKEN\n" "You can find your activation token in your profile." ) return token = parts[1].strip() def _link(): try: retrieved_user = User.objects.get(activation_token=token) except User.DoesNotExist: # type: ignore[attr-defined] return None attrs = dict(retrieved_user.attributes or {}) attrs["telegram_id"] = message.from_user.id if message.from_user else None # type: ignore[union-attr] retrieved_user.attributes = attrs retrieved_user.save(update_fields=["attributes", "modified"]) # type: ignore[attr-defined] return retrieved_user user = await asyncio.to_thread(_link) if not user: await message.answer("Invalid activation token.") return await message.answer("Your Telegram account has been linked successfully.") @router.message(Command("unlink")) async def cmd_unlink(message: types.Message): # type: ignore[valid-type] tid = message.from_user.id if message.from_user else None # type: ignore[union-attr] if not tid: await message.answer("Cannot unlink: no Telegram user id.") return def _unlink(): q = Q(attributes__telegram_id=tid) updated_query = User.objects.filter(q).update(attributes={}) return updated_query updated = await asyncio.to_thread(_unlink) if updated: await message.answer("Unlinked successfully.") else: await message.answer("No linked account found.") @router.message(Command("help")) async def cmd_help(message: types.Message): # type: ignore[valid-type] await message.answer( "Commands:\n" "/start — link your account\n" "/unlink — unlink your account\n" "As staff, you may use: reply THREAD_UUID your message to respond." ) @router.message() async def any_message(message: types.Message): # type: ignore[valid-type] from engine.vibes_auth.messaging.services import send_message as svc_send_message if not message.from_user or not message.text: return tid = message.from_user.id def _resolve_staff_and_command(): try: staff_user = User.objects.get(attributes__telegram_id=tid, is_staff=True, is_active=True) except User.DoesNotExist: # type: ignore[attr-defined] return None, None, None # group check if not staff_user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists(): return None, None, None text = message.text.strip() if text.lower().startswith("reply "): parts = text.split(maxsplit=2) if len(parts) < 3: return staff_user, None, "Usage: reply " thread_id, message_body = parts[1], parts[2] try: thread = ChatThread.objects.get(uuid=thread_id) except ChatThread.DoesNotExist: return staff_user, None, "Thread not found." return staff_user, (thread, message_body), None return staff_user, None, "Unknown command. Send /help" staff, payload, error = await asyncio.to_thread(_resolve_staff_and_command) if not staff: return if error: await message.answer(error) return if payload: t, body = payload def _send(): return svc_send_message(t, sender_user=staff, sender_type=SenderType.STAFF, text=body) await asyncio.to_thread(_send) await message.answer("Sent.") return router async def setup_webhook(webhook_base_url: str) -> None: bot = _get_bot() if not bot: return url = webhook_base_url.rstrip("/") + "/telegram/webhook/" + settings.TELEGRAM_TOKEN with suppress(Exception): await bot.delete_webhook(drop_pending_updates=True) await bot.set_webhook(url=url, drop_pending_updates=True) logger.info("Telegram webhook set to %s", url) async def forward_thread_message_to_assigned_staff(thread_uuid: str, text: str) -> None: """Forward a thread message to assigned staff via Telegram if possible. This is a best-effort; failures are logged and never raised. """ if not is_telegram_enabled(): return def _resolve_chat_and_chat_id() -> tuple[Optional[int], Optional[str]]: try: t = ChatThread.objects.select_related("assigned_to").get(uuid=thread_uuid) except ChatThread.DoesNotExist: return None, None staff = t.assigned_to if not staff: return None, None attrs = staff.attributes or {} chat_telegram_id = attrs.get("telegram_id") if isinstance(attrs, dict) else None return int(chat_telegram_id) if chat_telegram_id else None, str(t.uuid) chat_id, _tid = await asyncio.to_thread(_resolve_chat_and_chat_id) if not chat_id: return bot = _get_bot() if not bot: return try: await bot.send_message(chat_id=chat_id, text=text) except Exception as exc: # noqa: BLE001 logger.warning("Failed to forward Telegram message for thread %s: %s", _tid, exc) def install_aiohttp_webhook(app) -> None: # pragma: no cover - integration helper if not is_telegram_enabled(): logger.warning("Telegram forwarder not installed: disabled") return dp = Dispatcher() # type: ignore[call-arg] router = build_router() if router: dp.include_router(router) bot = _get_bot() if not bot: return SimpleRequestHandler(dispatcher=dp, bot=bot).register(app, path="/telegram/webhook/" + settings.TELEGRAM_TOKEN) # type: ignore[arg-type] logger.info("Telegram webhook handler installed on aiohttp app.")