schon/engine/vibes_auth/messaging/forwarders/telegram.py
Egor fureunoir Gorbunov a96aab33cb Features: 1) Add initialization timestamp tracking to prevent redundant instance initialization; 2) Include .initialized file in .gitignore for production environments;
Fixes: 1) Remove unnecessary pragma from `install_aiohttp_webhook` definition;

Extra: 1) Add logging for `.initialized` read/write failures; 2) General cleanup and formatting improvements in initialization logic;
2025-12-03 13:33:04 +03:00

196 lines
7.2 KiB
Python

import asyncio
import logging
from contextlib import suppress
from aiogram import Bot, Dispatcher, Router, types
from aiogram.enums import ParseMode
from aiogram.filters import Command
from aiogram.webhook.aiohttp_server import SimpleRequestHandler
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db.models import Q
from engine.vibes_auth.choices import SenderType
from engine.vibes_auth.models import ChatThread
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Name of the auth group a staff user must belong to before their Telegram
# messages are treated as support commands (checked in the catch-all handler).
USER_SUPPORT_GROUP_NAME = "User Support"
def is_telegram_enabled() -> bool:
    """Return True when a non-empty ``TELEGRAM_TOKEN`` is configured.

    The setting is read with ``getattr`` so that a deployment which does not
    declare ``TELEGRAM_TOKEN`` at all is treated as "disabled" instead of
    raising ``AttributeError`` from every caller.
    """
    return bool(getattr(settings, "TELEGRAM_TOKEN", None))
def _get_bot() -> Bot | None:
    """Create a ``Bot`` that sends messages in HTML parse mode.

    Returns:
        A new ``Bot`` instance, or ``None`` (after logging a warning) when the
        Telegram integration is disabled.

    Note:
        Every call builds a fresh ``Bot``; callers that do not hand the bot to
        a long-lived owner are responsible for closing ``bot.session``.
    """
    if not is_telegram_enabled():
        logger.warning("Telegram forwarder disabled: missing aiogram or TELEGRAM_TOKEN")
        return None

    # Newer aiogram releases reject ``parse_mode`` passed straight to ``Bot``
    # and require ``default=DefaultBotProperties(...)``. Older releases do not
    # ship ``DefaultBotProperties``, so fall back to the legacy keyword there.
    try:
        from aiogram.client.default import DefaultBotProperties
    except ImportError:
        return Bot(token=settings.TELEGRAM_TOKEN, parse_mode=ParseMode.HTML)  # type: ignore[arg-type]

    return Bot(
        token=settings.TELEGRAM_TOKEN,  # type: ignore[arg-type]
        default=DefaultBotProperties(parse_mode=ParseMode.HTML),
    )
def build_router() -> Router | None:
    """Build the aiogram router with the bot's command handlers.

    Handlers registered:
        * ``/start <token>`` - link the sender's Telegram id to the user whose
          ``activation_token`` matches.
        * ``/unlink`` - remove the sender's Telegram id from any linked user.
        * ``/help`` - show the command overview.
        * any other text - for active staff in the "User Support" group,
          ``reply THREAD_UUID text`` posts a staff message into a chat thread.

    Returns:
        The configured ``Router``, or ``None`` when Telegram is disabled.
    """
    if not is_telegram_enabled():
        return None

    # Local import: lookups on typed fields (e.g. UUIDs) can raise
    # ValidationError for malformed user input, which must not crash a handler.
    from django.core.exceptions import ValidationError

    User = get_user_model()
    router: Router = Router()

    @router.message(Command("start"))
    async def cmd_start(message: types.Message):  # type: ignore[valid-type]
        parts = (message.text or "").split(maxsplit=1)
        if len(parts) < 2:
            await message.answer(
                "Welcome! To link your account, send: <code>/start ACTIVATION_TOKEN</code>\n"
                "You can find your activation token in your profile."
            )
            return

        sender = message.from_user
        if sender is None:
            # Without a sender there is nothing to link; never store a null id.
            await message.answer("Cannot link: no Telegram user id.")
            return

        token = parts[1].strip()
        telegram_id = sender.id

        def _link():
            try:
                retrieved_user = User.objects.get(activation_token=token)
            except (User.DoesNotExist, ValueError, ValidationError):  # type: ignore[attr-defined]
                # Unknown or malformed token: both are reported as "invalid".
                return None
            attrs = dict(retrieved_user.attributes or {})
            attrs["telegram_id"] = telegram_id
            retrieved_user.attributes = attrs
            retrieved_user.save(update_fields=["attributes", "modified"])  # type: ignore[attr-defined]
            return retrieved_user

        # ORM calls are blocking, so run them off the event loop.
        user = await asyncio.to_thread(_link)
        if not user:
            await message.answer("Invalid activation token.")
            return
        await message.answer("Your Telegram account has been linked successfully.")

    @router.message(Command("unlink"))
    async def cmd_unlink(message: types.Message):  # type: ignore[valid-type]
        tid = message.from_user.id if message.from_user else None  # type: ignore[union-attr]
        if not tid:
            await message.answer("Cannot unlink: no Telegram user id.")
            return

        def _unlink() -> int:
            # Remove only the "telegram_id" key. A bulk update(attributes={})
            # would erase every other attribute stored on the user.
            unlinked = 0
            for linked_user in User.objects.filter(Q(attributes__telegram_id=tid)):
                attrs = dict(linked_user.attributes or {})
                attrs.pop("telegram_id", None)
                linked_user.attributes = attrs
                linked_user.save(update_fields=["attributes", "modified"])  # type: ignore[attr-defined]
                unlinked += 1
            return unlinked

        updated = await asyncio.to_thread(_unlink)
        if updated:
            await message.answer("Unlinked successfully.")
        else:
            await message.answer("No linked account found.")

    @router.message(Command("help"))
    async def cmd_help(message: types.Message):  # type: ignore[valid-type]
        # The bot sends with HTML parse mode, so literal angle brackets must be
        # escaped or Telegram rejects the message as containing unknown tags.
        await message.answer(
            "Commands:\n"
            "/start &lt;token&gt; — link your account\n"
            "/unlink — unlink your account\n"
            "As staff, you may use: <code>reply THREAD_UUID your message</code> to respond."
        )

    @router.message()
    async def any_message(message: types.Message):  # type: ignore[valid-type]
        from engine.vibes_auth.messaging.services import send_message as svc_send_message

        if not message.from_user or not message.text:
            return
        tid = message.from_user.id
        raw_text = message.text

        def _resolve_staff_and_command():
            """Return ``(staff_user, (thread, body) | None, error | None)``."""
            try:
                staff_user = User.objects.get(attributes__telegram_id=tid, is_staff=True, is_active=True)
            except User.DoesNotExist:  # type: ignore[attr-defined]
                return None, None, None
            # Only members of the support group may drive threads from Telegram.
            if not staff_user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists():
                return None, None, None

            text = raw_text.strip()
            if text.lower().startswith("reply "):
                parts = text.split(maxsplit=2)
                if len(parts) < 3:
                    # Angle brackets escaped for HTML parse mode.
                    return staff_user, None, "Usage: reply &lt;THREAD_UUID&gt; &lt;message&gt;"
                thread_id, message_body = parts[1], parts[2]
                try:
                    thread = ChatThread.objects.get(uuid=thread_id)
                except (ChatThread.DoesNotExist, ValueError, ValidationError):
                    # A malformed id is treated the same as an unknown thread.
                    return staff_user, None, "Thread not found."
                return staff_user, (thread, message_body), None
            return staff_user, None, "Unknown command. Send /help"

        staff, payload, error = await asyncio.to_thread(_resolve_staff_and_command)
        if not staff:
            # Non-staff senders are ignored silently.
            return
        if error:
            await message.answer(error)
            return
        if payload:
            t, body = payload

            def _send():
                return svc_send_message(t, sender_user=staff, sender_type=SenderType.STAFF, text=body)

            await asyncio.to_thread(_send)
            await message.answer("Sent.")

    return router
async def setup_webhook(webhook_base_url: str) -> None:
    """Point the Telegram webhook at ``<base>/telegram/webhook/<token>``.

    Any previously registered webhook is dropped first (best effort), then the
    new one is set with pending updates discarded. Does nothing when Telegram
    is disabled.

    Args:
        webhook_base_url: Public base URL of this service; a trailing slash is
            ignored.

    Raises:
        Whatever ``Bot.set_webhook`` raises if registration fails.
    """
    bot = _get_bot()
    if not bot:
        return

    base = webhook_base_url.rstrip("/")
    url = base + "/telegram/webhook/" + settings.TELEGRAM_TOKEN
    try:
        # Removing a stale webhook is optional; failure here must not block setup.
        with suppress(Exception):
            await bot.delete_webhook(drop_pending_updates=True)
        await bot.set_webhook(url=url, drop_pending_updates=True)
        # The real URL embeds the bot token, so never write it to the logs.
        logger.info("Telegram webhook set to %s", base + "/telegram/webhook/<redacted>")
    finally:
        # This bot is used only here; release its HTTP session.
        with suppress(Exception):
            await bot.session.close()
async def forward_thread_message_to_assigned_staff(thread_uuid: str, text: str) -> None:
    """Forward a thread message to the assigned staff member via Telegram.

    This is best-effort: lookup, bot construction and delivery failures are
    logged and never raised to the caller.

    Args:
        thread_uuid: Identifier of the chat thread whose assignee is notified.
        text: Message text to send to the assignee's linked Telegram chat.
    """
    if not is_telegram_enabled():
        return

    def _resolve_chat_id() -> int | None:
        """Return the assignee's Telegram id, or None if it cannot be found."""
        try:
            thread = ChatThread.objects.select_related("assigned_to").get(uuid=thread_uuid)
        except ChatThread.DoesNotExist:
            return None
        staff = thread.assigned_to
        if not staff:
            return None
        attrs = staff.attributes or {}
        raw_id = attrs.get("telegram_id") if isinstance(attrs, dict) else None
        return int(raw_id) if raw_id else None

    try:
        # Blocking ORM access runs in a worker thread.
        chat_id = await asyncio.to_thread(_resolve_chat_id)
    except Exception as exc:  # noqa: BLE001
        # Covers database errors and a stored id that is not numeric.
        logger.warning("Failed to resolve Telegram recipient for thread %s: %s", thread_uuid, exc)
        return
    if not chat_id:
        return

    bot = None
    try:
        bot = _get_bot()
        if not bot:
            return
        await bot.send_message(chat_id=chat_id, text=text)
    except Exception as exc:  # noqa: BLE001
        logger.warning("Failed to forward Telegram message for thread %s: %s", thread_uuid, exc)
    finally:
        # A new bot is created per call; close its HTTP session so that
        # connections do not accumulate with every forwarded message.
        if bot is not None:
            with suppress(Exception):
                await bot.session.close()
def install_aiohttp_webhook(app) -> None:
    """Attach the Telegram webhook request handler to an aiohttp application.

    A dispatcher is created, the command router (if any) is included, and a
    ``SimpleRequestHandler`` is registered under
    ``/telegram/webhook/<TELEGRAM_TOKEN>``. When Telegram is disabled a warning
    is logged and nothing is installed.

    Args:
        app: The aiohttp application on which the route is registered.
    """
    if not is_telegram_enabled():
        logger.warning("Telegram forwarder not installed: disabled")
        return

    dispatcher = Dispatcher()  # type: ignore[call-arg]
    telegram_router = build_router()
    if telegram_router:
        dispatcher.include_router(telegram_router)

    telegram_bot = _get_bot()
    if not telegram_bot:
        return

    # The handler keeps using this bot for incoming updates, so it stays open.
    handler = SimpleRequestHandler(dispatcher=dispatcher, bot=telegram_bot)
    webhook_path = "/telegram/webhook/" + settings.TELEGRAM_TOKEN
    handler.register(app, path=webhook_path)  # type: ignore[arg-type]
    logger.info("Telegram webhook handler installed on aiohttp app.")