schon/engine/core/crm/amo/gateway.py
Egor fureunoir Gorbunov 5f9f07d8f4 Features: 1) Replace config.BASE_DOMAIN with settings.BASE_DOMAIN across the codebase; 2) Add support for dynamic STATIC_URL and MEDIA_URL based on initialization state; 3) Introduce INITIALIZED flag to determine application state;
Fixes: 1) Add missing import for `settings` in multiple modules;

Extra: 1) Remove unused `SerializerMethodField` from serializers; 2) Update `RELEASE_DATE` to align with new version; 3) General cleanup and consistency adjustments.
2025-11-11 15:08:44 +03:00

195 lines
7.8 KiB
Python

import logging
import requests
from django.conf import settings
from django.core.cache import cache
from django.db import transaction
from engine.core.crm.exceptions import CRMException
from engine.core.models import CustomerRelationshipManagementProvider, Order, OrderCrmLink
from engine.core.utils import is_status_code_success
logger = logging.getLogger(__name__)
class AmoCRM:
STATUS_MAP: dict[str, str] = {}
def __init__(self):
try:
self.instance = CustomerRelationshipManagementProvider.objects.get(name="AmoCRM")
except CustomerRelationshipManagementProvider.DoesNotExist as dne:
logger.warning("AMO CRM provider not found")
raise CRMException("AMO CRM provider not found") from dne
except CustomerRelationshipManagementProvider.MultipleObjectsReturned as mre:
logger.warning("Multiple AMO CRM providers found")
raise CRMException("Multiple AMO CRM providers found") from mre
self.base = f"{self.instance.integration_url}"
self.client_id = self.instance.authentication.get("client_id")
self.client_secret = self.instance.authentication.get("client_secret")
self.authorization_code = self.instance.authentication.get("authorization_code")
self.refresh_token = cache.get("amo_refresh_token")
self.responsible_user_id = self.instance.attributes.get("responsible_user_id")
self.fns_api_key = self.instance.attributes.get("fns_api_key")
if not all(
[
self.base,
self.client_id,
self.client_secret,
self.authorization_code,
self.fns_api_key,
]
):
raise CRMException("AMO CRM provider not configured")
def _token(self) -> str:
payload = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"redirect_uri": f"https://api.{settings.BASE_DOMAIN}/",
}
if self.refresh_token:
payload["grant_type"] = "refresh_token"
payload["refresh_token"] = self.refresh_token
else:
payload["grant_type"] = "authorization_code"
payload["code"] = self.authorization_code
r = requests.post(f"{self.base}/oauth2/access_token", json=payload, timeout=15)
if not is_status_code_success(r.status_code):
logger.error("Unable to get AMO access token: %s %s", r.status_code, r.text)
raise CRMException("Unable to get AMO access token")
data = r.json()
self.access_token = data["access_token"]
cache.set("amo_refresh_token", data["refresh_token"], 604800)
self.refresh_token = data["refresh_token"]
return self.access_token
def _headers(self) -> dict:
return {"Authorization": f"Bearer {self._token()}", "Content-Type": "application/json"}
def _build_lead_payload(self, order: Order) -> dict:
name = f"Заказ #{order.human_readable_id}"
price = int(round(order.total_price))
payload: dict = {"name": name, "price": price}
contact_id = self._create_contact(order)
if contact_id:
payload["_embedded"] = {"contacts": [{"id": contact_id}]}
if self.responsible_user_id:
payload["responsible_user_id"] = self.responsible_user_id
return payload
def _get_customer_name(self, order: Order) -> str:
if type(order.attributes) is not dict:
raise ValueError("order.attributes must be a dict")
if order.user:
if type(order.user.attributes) is not dict:
order.user.attributes = {}
order.user.save()
if not order.business_identificator:
return (
order.user.get_full_name()
if order.user
else None
or (
f"{order.attributes.get('customer_name')} | "
f"{order.attributes.get('customer_phone_number') or order.attributes.get('customer_email')}"
)
)
try:
r = requests.get(
f"https://api-fns.ru/api/egr?req={order.business_identificator}&key={self.fns_api_key}", timeout=15
)
r.raise_for_status()
body = r.json()
except requests.exceptions.RequestException as rex:
logger.error("Unable to get company info with FNS: %s", rex, exc_info=True)
return ""
ul = body.get("ЮЛ")
ip = body.get("ИП")
if ul and not ip:
return f"{ul.get('НаимСокрЮЛ')} | {order.business_identificator}"
if ip and not ul:
return f"ИП {ip.get('ФИОПолн')} | {order.business_identificator}"
return ""
def _create_contact(self, order: Order) -> int | None:
try:
customer_name = self._get_customer_name(order)
if customer_name:
r = requests.get(
f"{self.base}/api/v4/contacts",
headers=self._headers().update({"filter[name]": customer_name, "limit": 1}),
timeout=15,
)
if r.status_code == 200:
body = r.json()
return body.get("_embedded", {}).get("contacts", [{}])[0].get("id", None)
create_contact_payload = {"name": customer_name}
if self.responsible_user_id:
create_contact_payload["responsible_user_id"] = self.responsible_user_id
if order.user:
create_contact_payload["first_name"] = order.user.first_name or ""
create_contact_payload["last_name"] = order.user.last_name or ""
r = requests.post(
f"{self.base}/api/v4/contacts", json={"name": customer_name}, headers=self._headers(), timeout=15
)
if r.status_code == 200:
body = r.json()
return body.get("_embedded", {}).get("contacts", [{}])[0].get("id", None)
return None
else:
return None
except requests.exceptions.RequestException as rex:
logger.error("Unable to create a company in AmoCRM: %s", rex, exc_info=True)
raise CRMException("Unable to create a company in AmoCRM") from rex
def process_order_changes(self, order: Order) -> str:
with transaction.atomic():
try:
link: OrderCrmLink | None = OrderCrmLink.objects.get(order=order)
except OrderCrmLink.MultipleObjectsReturned:
link = OrderCrmLink.objects.filter(order=order).first()
except OrderCrmLink.DoesNotExist:
link = None
if link:
lead_id = link.crm_lead_id
payload = self._build_lead_payload(order)
r = requests.patch(
f"{self.base}/api/v4/leads/{lead_id}", json=payload, headers=self._headers(), timeout=15
)
if r.status_code not in (200, 204):
r.raise_for_status()
return lead_id
payload = self._build_lead_payload(order)
r = requests.post(f"{self.base}/api/v4/leads", json=[payload], headers=self._headers(), timeout=15)
r.raise_for_status()
body = r.json()
lead_id = str(body["_embedded"]["leads"][0]["id"])
OrderCrmLink.objects.create(order=order, crm_lead_id=lead_id, crm=self.instance)
return lead_id
def update_order_status(self, crm_lead_id: str, new_status: str) -> None:
link = OrderCrmLink.objects.get(crm_lead_id=crm_lead_id)
if link.order.status == new_status:
return
link.order.status = self.STATUS_MAP[new_status]
link.order.save(update_fields=["status"])