Features: 1) Introduce AmoOrderGateway to handle interaction with AMO CRM; 2) Add models CustomerRelationshipManagementProvider and OrderCrmLink to support CRM integration; 3) Implement any_crm_integrations utility function to check CRM provider existence;

Fixes: None;

Extra: 1) Add `URLField` import in models.py; 2) Adjust `save` method signature in models.py to improve typing consistency; 3) Raise custom `CRMException` for CRM setup errors;
This commit is contained in:
Egor Pavlovich Gorbunov 2025-09-06 03:07:26 +03:00
parent 880f3f19b1
commit 752f96fcdd
5 changed files with 146 additions and 1 deletions

5
core/crm/__init__.py Normal file
View file

@ -0,0 +1,5 @@
from core.models import CustomerRelationshipManagementProvider
def any_crm_integrations():
return CustomerRelationshipManagementProvider.objects.exists()

0
core/crm/amo/__init__.py Normal file
View file

110
core/crm/amo/gateway.py Normal file
View file

@ -0,0 +1,110 @@
import logging
import time
from typing import Optional
import requests
from django.core.cache import cache
from django.db import transaction
from core.crm.exceptions import CRMException
from core.models import CustomerRelationshipManagementProvider, Order, OrderCrmLink
logger = logging.getLogger("django")
class AmoOrderGateway:
def __init__(self):
try:
self.instance = CustomerRelationshipManagementProvider.objects.get(name="amo")
except CustomerRelationshipManagementProvider.DoesNotExist:
logger.warning("AMO CRM provider not found")
raise CRMException("AMO CRM provider not found")
self.base = f"https://{self.instance.integration_url}"
self.client_id = self.instance.authentication.get("client_id")
self.client_secret = self.instance.authentication.get("client_secret")
self.refresh_token = cache.get("amo_refresh_token")
self.redirect_uri = self.instance.attributes.get("redirect_uri")
self.pipeline_id = self.instance.attributes.get("pipeline_id")
self.stage_map = self.instance.attributes.get("stage_map")
self.responsible_user_id = self.instance.attributes.get("responsible_user_id")
if not all(
[
self.base,
self.client_id,
self.client_secret,
self.redirect_uri,
self.pipeline_id,
self.stage_map,
]
):
raise CRMException("AMO CRM provider not configured")
def _token(self) -> str:
cached = getattr(self, "_cached_token", None)
expiry = getattr(self, "_cached_expiry", 0)
if cached and time.time() < expiry - 15:
return cached
payload = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"redirect_uri": self.redirect_uri,
}
r = requests.post(f"{self.base}/oauth2/access_token", json=payload, timeout=15)
r.raise_for_status()
data = r.json()
self._cached_token = data["access_token"]
self._cached_expiry = time.time() + int(data.get("expires_in", 900))
self.refresh_token = data.get("refresh_token", self.refresh_token)
return self._cached_token
def _headers(self) -> dict:
return {"Authorization": f"Bearer {self._token()}", "Content-Type": "application/json"}
def _build_lead_payload(self, order: Order) -> dict:
name = f"Order #{order.human_readable_id}"
price = int(round(order.total_price))
stage_id = self.stage_map.get(order.status)
payload = {"name": name, "price": price, "pipeline_id": self.pipeline_id}
if stage_id:
payload["status_id"] = stage_id
if self.responsible_user_id:
payload["responsible_user_id"] = self.responsible_user_id
return payload
def upsert_order(self, order: Order) -> str:
with transaction.atomic():
link: Optional[OrderCrmLink] = OrderCrmLink.objects.filter(order=order).first()
if link:
lead_id = link.crm_lead_id
payload = self._build_lead_payload(order)
r = requests.patch(
f"{self.base}/api/v4/leads/{lead_id}", json=payload, headers=self._headers(), timeout=15
)
if r.status_code not in (200, 204):
r.raise_for_status()
return lead_id
payload = self._build_lead_payload(order)
r = requests.post(f"{self.base}/api/v4/leads", json=[payload], headers=self._headers(), timeout=15)
r.raise_for_status()
body = r.json()
lead_id = str(body["_embedded"]["leads"][0]["id"])
OrderCrmLink.objects.create(order_uuid=order.uuid, crm_lead_id=lead_id)
return lead_id
def update_order_status(self, crm_lead_id: str, new_status_code: str) -> None:
link = OrderCrmLink.objects.filter(crm_lead_id=crm_lead_id).first()
if not link:
return
from core.models import Order
order = Order.objects.get(uuid=link.order_uuid)
if order.status == new_status_code:
return
order.status = new_status_code
order.save(update_fields=["status"])

2
core/crm/exceptions.py Normal file
View file

@ -0,0 +1,2 @@
class CRMException(Exception):
pass

View file

@ -30,6 +30,7 @@ from django.db.models import (
PositiveIntegerField, PositiveIntegerField,
QuerySet, QuerySet,
TextField, TextField,
URLField,
) )
from django.db.models.indexes import Index from django.db.models.indexes import Index
from django.http import Http404 from django.http import Http404
@ -1431,7 +1432,7 @@ class Order(ExportModelOperationsMixin("order"), NiceModel): # type: ignore [mi
self.user.attributes.get("is_business", False) if self.user else False self.user.attributes.get("is_business", False) if self.user else False
) )
def save(self, **kwargs: dict) -> Self: def save(self, **kwargs) -> Self:
pending_orders = 0 pending_orders = 0
if self.user: if self.user:
pending_orders = self.user.orders.filter(status="PENDING").count() pending_orders = self.user.orders.filter(status="PENDING").count()
@ -1929,6 +1930,33 @@ class OrderProduct(ExportModelOperationsMixin("order_product"), NiceModel): # t
return None return None
class CustomerRelationshipManagementProvider(ExportModelOperationsMixin("crm_provider"), NiceModel):
name = CharField(max_length=128, unique=True, verbose_name=_("name"))
integration_url = URLField(blank=True, null=True, help_text=_("URL of the integration"))
authentication = JSONField(blank=True, null=True, help_text=_("authentication credentials"))
attributes = JSONField(blank=True, null=True, help_name=_("attributes"))
def __str__(self) -> str:
return self.crm_lead_id
class Meta:
verbose_name = _("order CRM link")
verbose_name_plural = _("orders CRM links")
class OrderCrmLink(ExportModelOperationsMixin("order_crm_link"), NiceModel):
order_uuid = ForeignKey(to=Order, on_delete=PROTECT, related_name="crm_links")
crm = ForeignKey(to=CustomerRelationshipManagementProvider, on_delete=PROTECT, related_name="order_links")
crm_lead_id = CharField(max_length=30, unique=True, db_index=True)
def __str__(self) -> str:
return self.crm_lead_id
class Meta:
verbose_name = _("order CRM link")
verbose_name_plural = _("orders CRM links")
class DigitalAssetDownload(ExportModelOperationsMixin("attribute_group"), NiceModel): # type: ignore [misc, django-manager-missing] class DigitalAssetDownload(ExportModelOperationsMixin("attribute_group"), NiceModel): # type: ignore [misc, django-manager-missing]
""" """
Represents the downloading functionality for digital assets associated Represents the downloading functionality for digital assets associated