feat(vendor-management): introduce custom querysets and managers for streamlined vendor integration

Add tailored querysets and managers for `Product`, `Stock`, and `AttributeValue` models to simplify vendor-specific operations. Abstract methods were added to `AbstractVendor` for handling essential vendor processes, ensuring clear API expectations.

This change standardizes vendor-related queries, improves clarity, and ensures efficient bulk operations.
This commit is contained in:
Egor Pavlovich Gorbunov 2026-01-28 13:41:38 +03:00
parent 8d8c281eab
commit 0db5b9b712
3 changed files with 396 additions and 59 deletions

View file

@ -1,10 +1,15 @@
import logging
from typing import TYPE_CHECKING, Self
import requests
from constance import config
from django.contrib.gis.geos import Point
from django.db import models
from modeltranslation.manager import MultilingualManager
from django.db.models import QuerySet
from modeltranslation.manager import MultilingualManager, MultilingualQuerySet
if TYPE_CHECKING:
from engine.core.models import AttributeValue, Product, Stock, Vendor # noqa: F401
logger = logging.getLogger(__name__)
@ -67,8 +72,11 @@ class AddressManager(models.Manager):
)[0]
class ProductManager(MultilingualManager):
def available(self):
class ProductQuerySet(MultilingualQuerySet["Product"]):
"""Custom QuerySet for Product with vendor-related operations."""
def available(self) -> Self:
"""Filter to only available products."""
return self.filter(
is_active=True,
brand__is_active=True,
@ -77,7 +85,8 @@ class ProductManager(MultilingualManager):
stocks__vendor__is_active=True,
)
def available_in_stock(self):
def available_in_stock(self) -> Self:
"""Filter to available products with stock quantity > 0."""
return self.filter(
is_active=True,
brand__is_active=True,
@ -87,7 +96,151 @@ class ProductManager(MultilingualManager):
stocks__quantity__gt=0,
)
def with_related(self):
def with_related(self) -> Self:
"""Prefetch related objects for performance."""
return self.select_related("category", "brand").prefetch_related(
"tags", "stocks", "images", "attributes__attribute__group"
)
def stale(self) -> Self:
"""Filter products with no orders and no stocks."""
return self.filter(orderproduct__isnull=True, stocks__isnull=True)
def for_vendor(self, vendor: "Vendor") -> Self:
"""Filter products that have stocks from a specific vendor."""
return self.filter(stocks__vendor=vendor)
def not_in_orders(self) -> Self:
"""Filter products that are not part of any order."""
return self.filter(orderproduct__isnull=True)
def for_vendor_not_in_orders(self, vendor: "Vendor") -> Self:
"""Filter products for a vendor that are not in any orders."""
return self.for_vendor(vendor).filter(orderproduct__isnull=True)
def mark_inactive(self) -> int:
"""Bulk update products to inactive status. Returns count updated."""
return self.update(is_active=False)
def mark_for_deletion(self, marker: str = "SCHON_DELETED_PRODUCT") -> int:
"""Mark products for deletion by setting description. Returns count updated."""
return self.update(description=marker)
def marked_for_deletion(self, marker: str = "SCHON_DELETED_PRODUCT") -> Self:
"""Get products marked for deletion."""
return self.filter(description__exact=marker)
def delete_with_related(self, batch_size: int = 5000) -> int:
"""
Delete products in batches along with their related objects.
Returns the total number of products deleted.
"""
from engine.core.models import AttributeValue, ProductImage
total_deleted = 0
while True:
batch_ids = list(self.values_list("pk", flat=True)[:batch_size])
if not batch_ids:
break
AttributeValue.objects.filter(product_id__in=batch_ids).delete()
ProductImage.objects.filter(product_id__in=batch_ids).delete()
deleted_count, _ = self.model.objects.filter(pk__in=batch_ids).delete()
total_deleted += deleted_count
return total_deleted
class ProductManager(MultilingualManager["Product"]):
"""Manager for Product model with custom queryset methods."""
_queryset_class = ProductQuerySet
def get_queryset(self) -> ProductQuerySet:
return ProductQuerySet(self.model, using=self._db)
def available(self) -> ProductQuerySet:
return self.get_queryset().available()
def available_in_stock(self) -> ProductQuerySet:
return self.get_queryset().available_in_stock()
def with_related(self) -> ProductQuerySet:
return self.get_queryset().with_related()
def stale(self) -> ProductQuerySet:
return self.get_queryset().stale()
def for_vendor(self, vendor: "Vendor") -> ProductQuerySet:
return self.get_queryset().for_vendor(vendor)
def not_in_orders(self) -> ProductQuerySet:
return self.get_queryset().not_in_orders()
def for_vendor_not_in_orders(self, vendor: "Vendor") -> ProductQuerySet:
return self.get_queryset().for_vendor_not_in_orders(vendor)
class StockQuerySet(QuerySet["Stock"]):
"""Custom QuerySet for Stock with vendor-related operations."""
def for_vendor(self, vendor: "Vendor") -> Self:
"""Filter stocks for a specific vendor."""
return self.filter(vendor=vendor)
def not_in_orders(self) -> Self:
"""Filter stocks whose products are not in any orders."""
return self.filter(product__orderproduct__isnull=True)
def for_vendor_not_in_orders(self, vendor: "Vendor") -> Self:
"""Filter stocks for a vendor whose products are not in orders."""
return self.for_vendor(vendor).filter(product__orderproduct__isnull=True)
class StockManager(models.Manager["Stock"]):
"""Manager for Stock model with custom queryset methods."""
def get_queryset(self) -> StockQuerySet:
return StockQuerySet(self.model, using=self._db)
def for_vendor(self, vendor: "Vendor") -> StockQuerySet:
return self.get_queryset().for_vendor(vendor)
def not_in_orders(self) -> StockQuerySet:
return self.get_queryset().not_in_orders()
def for_vendor_not_in_orders(self, vendor: "Vendor") -> StockQuerySet:
return self.get_queryset().for_vendor_not_in_orders(vendor)
class AttributeValueQuerySet(QuerySet["AttributeValue"]):
"""Custom QuerySet for AttributeValue with vendor-related operations."""
def for_vendor(self, vendor: "Vendor") -> Self:
"""Filter attribute values for products from a specific vendor."""
return self.filter(product__stocks__vendor=vendor)
def not_in_orders(self) -> Self:
"""Filter attribute values whose products are not in any orders."""
return self.filter(product__orderproduct__isnull=True)
def for_vendor_not_in_orders(self, vendor: "Vendor") -> Self:
"""Filter attribute values for a vendor whose products are not in orders."""
return self.for_vendor(vendor).filter(product__orderproduct__isnull=True)
class AttributeValueManager(models.Manager["AttributeValue"]):
"""Manager for AttributeValue model with custom queryset methods."""
def get_queryset(self) -> AttributeValueQuerySet:
return AttributeValueQuerySet(self.model, using=self._db)
def for_vendor(self, vendor: "Vendor") -> AttributeValueQuerySet:
return self.get_queryset().for_vendor(vendor)
def not_in_orders(self) -> AttributeValueQuerySet:
return self.get_queryset().not_in_orders()
def for_vendor_not_in_orders(self, vendor: "Vendor") -> AttributeValueQuerySet:
return self.get_queryset().for_vendor_not_in_orders(vendor)

View file

@ -52,7 +52,12 @@ from mptt.models import MPTTModel
from engine.core.abstract import NiceModel
from engine.core.choices import ORDER_PRODUCT_STATUS_CHOICES, ORDER_STATUS_CHOICES
from engine.core.errors import DisabledCommerceError, NotEnoughMoneyError
from engine.core.managers import AddressManager, ProductManager
from engine.core.managers import (
AddressManager,
AttributeValueManager,
ProductManager,
StockManager,
)
from engine.core.typing import FilterableAttribute
from engine.core.utils import (
generate_human_readable_id,
@ -569,6 +574,8 @@ class Stock(ExportModelOperationsMixin("stock"), NiceModel):
default=dict, verbose_name=_("system attributes"), blank=True
)
objects = StockManager()
def __str__(self) -> str:
return f"{self.vendor.name} - {self.product!s}"
@ -833,6 +840,8 @@ class AttributeValue(ExportModelOperationsMixin("attribute_value"), NiceModel):
help_text=_("the specific value for this attribute"),
)
objects = AttributeValueManager()
def __str__(self):
return f"{self.attribute!s}: {self.value}"

View file

@ -2,21 +2,26 @@ import gzip
import json
import logging
import time
from abc import ABC, abstractmethod
from contextlib import suppress
from datetime import datetime
from decimal import Decimal
from io import BytesIO
from math import ceil, log10
from typing import Any
from typing import TYPE_CHECKING, Any
from constance import config
from django.conf import settings
from django.core.files.base import ContentFile
from django.db import IntegrityError, transaction
from django.db.models import QuerySet
from django.db.utils import OperationalError
from engine.core.elasticsearch import process_system_query
from engine.core.managers import (
AttributeValueQuerySet,
ProductQuerySet,
StockQuerySet,
)
from engine.core.models import (
Attribute,
AttributeGroup,
@ -24,7 +29,6 @@ from engine.core.models import (
Brand,
Category,
Product,
ProductImage,
Stock,
Vendor,
)
@ -32,6 +36,9 @@ from engine.payments.errors import RatesError
from engine.payments.utils import get_rates
from schon.utils.misc import LoggingError, LogLevel
if TYPE_CHECKING:
from engine.core.models import OrderProduct
logger = logging.getLogger(__name__)
@ -87,9 +94,9 @@ class ProductUnapdatableError(VendorError):
pass
class AbstractVendor:
class AbstractVendor(ABC):
"""
Abstract class defining vendor-related operations and handling.
Abstract base class defining vendor-related operations and handling.
This class provides methods to manage and manipulate data related to a vendor
and its associated products, stocks, and attributes. These include utility
@ -97,12 +104,48 @@ class AbstractVendor:
specific markup percentages, retrieving vendor instances, fetching queryset
data for products and stocks, and performing bulk operations like updates or
deletions on inactive objects.
Subclasses must implement the following abstract methods:
- get_products(): Fetch products from vendor's API
- update_stock(): Synchronize product stock with vendor
- update_order_products_statuses(): Update order product statuses from vendor
- buy_order_product(): Process purchase of a digital product from vendor
Example usage:
class MyVendorIntegration(AbstractVendor):
def __init__(self):
super().__init__(vendor_name="MyVendor", currency="EUR")
def get_products(self) -> list[dict]:
# Fetch products from vendor API
return api_client.get_products()
def update_stock(self) -> None:
products = self.get_products()
self.prepare_for_stock_update()
# Process and save products...
self.delete_inactives()
def update_order_products_statuses(self) -> None:
# Check and update order statuses from vendor
pass
def buy_order_product(self, order_product: OrderProduct) -> None:
# Purchase digital product from vendor
pass
"""
#: Name of the vendor as stored in the database
vendor_name: str
#: Default currency for price operations
currency: str
#: List of attribute names to skip during processing
blocked_attributes: list[Any]
def __init__(self, vendor_name: str = "", currency: str = "USD") -> None:
self.vendor_name = vendor_name
self.currency = currency
self.blocked_attributes: list[Any] = []
self.blocked_attributes = []
def __str__(self) -> str:
vendor = self.get_vendor_instance(safe=True)
@ -383,6 +426,19 @@ class AbstractVendor:
return float(psychological)
def get_vendor_instance(self, safe: bool = False) -> Vendor | None:
"""
Retrieve the Vendor model instance for this integration.
Args:
safe: If True, return None instead of raising exceptions.
Returns:
The Vendor instance if found and active, None if safe=True and not found.
Raises:
VendorInactiveError: If vendor exists but is inactive (when safe=False).
Exception: If vendor does not exist (when safe=False).
"""
try:
vendor = Vendor.objects.get(name=self.vendor_name)
if vendor.is_active:
@ -397,80 +453,127 @@ class AbstractVendor:
f"No matching vendor found with name {self.vendor_name!r}..."
) from dne
@abstractmethod
def get_products(self) -> Any:
pass
"""
Fetch products from the vendor's external API or data source.
def get_products_queryset(self) -> QuerySet[Product]:
return Product.objects.filter(
stocks__vendor=self.get_vendor_instance(), orderproduct__isnull=True
)
This method should be implemented to retrieve product data from
the vendor's system. The format of returned data is vendor-specific.
def get_stocks_queryset(self) -> QuerySet[Stock]:
return Stock.objects.filter(
product__in=self.get_products_queryset(), product__orderproduct__isnull=True
)
Returns:
Product data in vendor-specific format (list, dict, etc.)
"""
...
def get_attribute_values_queryset(self) -> QuerySet[AttributeValue]:
return AttributeValue.objects.filter(
product__in=self.get_products_queryset(), product__orderproduct__isnull=True
)
def get_products_queryset(self) -> ProductQuerySet:
"""
Get a queryset of products associated with this vendor.
Returns products that:
- Have stocks from this vendor
- Are not part of any orders (safe to modify/delete)
"""
vendor = self.get_vendor_instance()
if not vendor:
return Product.objects.none() # type: ignore[return-value]
return Product.objects.for_vendor_not_in_orders(vendor)
def get_stocks_queryset(self) -> StockQuerySet:
"""
Get a queryset of stocks associated with this vendor.
Returns stocks that:
- Belong to this vendor
- Are for products not in any orders
"""
vendor = self.get_vendor_instance()
if not vendor:
return Stock.objects.none() # type: ignore[return-value]
return Stock.objects.for_vendor_not_in_orders(vendor)
def get_attribute_values_queryset(self) -> AttributeValueQuerySet:
"""
Get a queryset of attribute values for this vendor's products.
Returns attribute values for products that:
- Have stocks from this vendor
- Are not part of any orders
"""
vendor = self.get_vendor_instance()
if not vendor:
return AttributeValue.objects.none() # type: ignore[return-value]
return AttributeValue.objects.for_vendor_not_in_orders(vendor)
def prepare_for_stock_update(self, method: str = "deactivate") -> None:
"""
Prepare products for stock update by marking them for potential deletion.
This should be called before update_stock() to mark existing products.
Products that are re-activated during update_stock() will be kept;
those that remain marked will be cleaned up by delete_inactives().
Args:
method: How to mark products:
- "deactivate": Set is_active=False (default)
- "delete": Delete immediately (use with caution)
- "description": Mark with special description marker
"""
products = self.get_products_queryset()
if products is None:
if not products.exists():
return
# noinspection PyUnreachableCode
match method:
case "deactivate":
products.update(is_active=False)
products.mark_inactive()
case "delete":
products.delete()
case "description":
products.update(description="SCHON_DELETED_PRODUCT")
products.mark_for_deletion()
case _:
raise ValueError(f"Invalid method {method!r} for products update...")
def delete_inactives(
self, inactivation_method: str = "deactivate", size: int = 5000
) -> None:
filter_kwargs: dict[str, Any] = dict()
self, inactivation_method: str = "deactivate", batch_size: int = 5000
) -> int:
"""
Delete products that were marked during prepare_for_stock_update().
This should be called after update_stock() completes to clean up
products that no longer exist in the vendor's catalog.
Args:
inactivation_method: The method used in prepare_for_stock_update():
- "deactivate": Delete products with is_active=False
- "description": Delete products with deletion marker
batch_size: Number of products to delete per batch.
Returns:
Total number of products deleted.
"""
products_qs = self.get_products_queryset()
match inactivation_method:
case "deactivate":
filter_kwargs: dict[str, Any] = {"is_active": False}
products_qs = products_qs.filter(is_active=False)
case "description":
filter_kwargs: dict[str, Any] = {
"description__exact": "SCHON_DELETED_PRODUCT"
}
products_qs = products_qs.marked_for_deletion()
case _:
raise ValueError(
f"Invalid method {inactivation_method!r} for products cleaner..."
)
if filter_kwargs == {}:
raise ValueError("Invalid filter kwargs...")
while True:
products = self.get_products_queryset()
if products is None:
return
batch_ids = list(
products.filter(**filter_kwargs).values_list("pk", flat=True)[:size]
)
if not batch_ids:
break
with suppress(Exception):
AttributeValue.objects.filter(product_id__in=batch_ids).delete()
ProductImage.objects.filter(product_id__in=batch_ids).delete()
Product.objects.filter(pk__in=batch_ids).delete()
return products_qs.delete_with_related(batch_size=batch_size)
def delete_belongings(self) -> None:
self.get_products_queryset().delete()
self.get_stocks_queryset().delete()
"""
Delete all products, stocks, and attribute values for this vendor.
Warning: This is a destructive operation. Use with caution.
"""
self.get_attribute_values_queryset().delete()
self.get_stocks_queryset().delete()
self.get_products_queryset().delete()
def get_or_create_attribute_safe(
self, *, name: str, attr_group: AttributeGroup
@ -578,15 +681,87 @@ class AbstractVendor:
return av
def check_updatable(self, product: Product) -> None:
"""
Check if a product can be updated by vendor sync.
Args:
product: The product to check.
Raises:
ProductUnapdatableError: If the product is marked as non-updatable.
"""
if not product.is_updatable:
raise ProductUnapdatableError("Product %s is not updatable", product.sku)
@abstractmethod
def update_stock(self) -> None:
pass
"""
Synchronize product stock data from the vendor's system.
This method should:
1. Call prepare_for_stock_update() to mark existing products
2. Fetch current product data via get_products()
3. Create/update products, stocks, and attributes
4. Call delete_inactives() to remove stale products
Example implementation:
def update_stock(self) -> None:
self.prepare_for_stock_update(method="deactivate")
products_data = self.get_products()
for item in products_data:
# Process and save product...
pass
self.delete_inactives(inactivation_method="deactivate")
"""
...
@abstractmethod
def update_order_products_statuses(self) -> None:
pass
"""
Update the statuses of order products from the vendor's system.
This method should check the vendor's API for status updates
on pending orders and update the corresponding OrderProduct
records in the database.
Example implementation:
def update_order_products_statuses(self) -> None:
pending_orders = OrderProduct.objects.filter(
status="DELIVERING",
product__stocks__vendor__name=self.vendor_name
)
for order_product in pending_orders:
status = self.api.check_status(order_product.external_id)
if status == "delivered":
order_product.status = "FINISHED"
order_product.save()
"""
...
@abstractmethod
def buy_order_product(self, order_product: "OrderProduct") -> None:
"""
Process the purchase of a product from the vendor.
This method is called when a customer orders a digital product
that needs to be fulfilled through the vendor's API.
Args:
order_product: The OrderProduct instance to fulfill.
Example implementation:
def buy_order_product(self, order_product: OrderProduct) -> None:
result = self.api.purchase(
sku=order_product.product.sku,
quantity=order_product.quantity
)
order_product.external_id = result["order_id"]
order_product.status = "DELIVERING"
order_product.save()
"""
...
def delete_stale() -> None:
Product.objects.filter(stocks__isnull=True, orderproduct__isnull=True).delete()
"""Delete all stale products (products with no stocks and not in orders)."""
Product.objects.stale().delete()