schon/payments/signals.py
Egor fureunoir Gorbunov 3fe3d571bb Features: 1) Add GatewayManager and GatewayQuerySet to handle advanced gateway filtering and usage checks; 2) Integrate GatewayManager with Gateway model for automatic query filtering; 3) Introduce can_be_used query annotation for gateway availability.
Fixes: 1) Default to a usable `Gateway` when processing new transactions if none is specified.

Extra: 1) Add missing import for `GatewayManager` in `payments.models`; 2) Refactor gateway availability logic into the manager and query set for cleaner code organization.
2025-10-22 11:12:30 +03:00

39 lines
1.5 KiB
Python

import logging
import traceback
from typing import Any
from django.db.models.signals import post_save
from django.dispatch import receiver
from payments.models import Balance, Transaction, Gateway
from payments.utils.emailing import balance_deposit_email
from vibes_auth.models import User
logger = logging.getLogger("django")
# noinspection PyUnusedLocal
@receiver(post_save, sender=User)
def create_balance_on_user_creation_signal(instance: User, created: bool, **kwargs: dict[Any, Any]) -> None:
if created:
Balance.objects.create(user=instance)
# noinspection PyUnusedLocal
@receiver(post_save, sender=Transaction)
def process_transaction_changes(instance: Transaction, created: bool, **kwargs: dict[Any, Any]) -> None:
if created:
if not instance.gateway:
instance.gateway = Gateway.objects.can_be_used().first()
try:
gateway = instance.gateway.get_integration_class_object()
gateway.process_transaction(instance)
except Exception as e:
instance.process = {"status": "ERRORED", "error": str(e)}
logger.error(f"Error processing transaction {instance.uuid}: {e}\n{traceback.format_exc()}")
if not created:
status = str(instance.process.get("status", "")).lower()
success = instance.process.get("success", False)
if ("success" in status or success) and (instance.process.get("notify", False)):
balance_deposit_email.delay(instance.uuid) # type: ignore [attr-defined]