From 0db5b9b712d698d20412ea8f033771b945719f6a Mon Sep 17 00:00:00 2001 From: Egor fureunoir Gorbunov Date: Wed, 28 Jan 2026 13:41:38 +0300 Subject: [PATCH] feat(vendor-management): introduce custom querysets and managers for streamlined vendor integration Add tailored querysets and managers for `Product`, `Stock`, and `AttributeValue` models to simplify vendor-specific operations. Abstract methods were added to `AbstractVendor` for handling essential vendor processes, ensuring clear API expectations. This change standardizes vendor-related queries, improves clarity, and ensures efficient bulk operations. --- engine/core/managers.py | 163 +++++++++++++++++- engine/core/models.py | 11 +- engine/core/vendors/__init__.py | 281 ++++++++++++++++++++++++++------ 3 files changed, 396 insertions(+), 59 deletions(-) diff --git a/engine/core/managers.py b/engine/core/managers.py index 6dae05a1..68562b8f 100644 --- a/engine/core/managers.py +++ b/engine/core/managers.py @@ -1,10 +1,15 @@ import logging +from typing import TYPE_CHECKING, Self import requests from constance import config from django.contrib.gis.geos import Point from django.db import models -from modeltranslation.manager import MultilingualManager +from django.db.models import QuerySet +from modeltranslation.manager import MultilingualManager, MultilingualQuerySet + +if TYPE_CHECKING: + from engine.core.models import AttributeValue, Product, Stock, Vendor # noqa: F401 logger = logging.getLogger(__name__) @@ -67,8 +72,11 @@ class AddressManager(models.Manager): )[0] -class ProductManager(MultilingualManager): - def available(self): +class ProductQuerySet(MultilingualQuerySet["Product"]): + """Custom QuerySet for Product with vendor-related operations.""" + + def available(self) -> Self: + """Filter to only available products.""" return self.filter( is_active=True, brand__is_active=True, @@ -77,7 +85,8 @@ class ProductManager(MultilingualManager): stocks__vendor__is_active=True, ) - def available_in_stock(self): + def available_in_stock(self) -> Self: + """Filter to available products with stock quantity > 0.""" return self.filter( is_active=True, brand__is_active=True, @@ -87,7 +96,151 @@ class ProductManager(MultilingualManager): stocks__quantity__gt=0, ) - def with_related(self): + def with_related(self) -> Self: + """Prefetch related objects for performance.""" return self.select_related("category", "brand").prefetch_related( "tags", "stocks", "images", "attributes__attribute__group" ) + + def stale(self) -> Self: + """Filter products with no orders and no stocks.""" + return self.filter(orderproduct__isnull=True, stocks__isnull=True) + + def for_vendor(self, vendor: "Vendor") -> Self: + """Filter products that have stocks from a specific vendor.""" + return self.filter(stocks__vendor=vendor) + + def not_in_orders(self) -> Self: + """Filter products that are not part of any order.""" + return self.filter(orderproduct__isnull=True) + + def for_vendor_not_in_orders(self, vendor: "Vendor") -> Self: + """Filter products for a vendor that are not in any orders.""" + return self.for_vendor(vendor).filter(orderproduct__isnull=True) + + def mark_inactive(self) -> int: + """Bulk update products to inactive status. Returns count updated.""" + return self.update(is_active=False) + + def mark_for_deletion(self, marker: str = "SCHON_DELETED_PRODUCT") -> int: + """Mark products for deletion by setting description. Returns count updated.""" + return self.update(description=marker) + + def marked_for_deletion(self, marker: str = "SCHON_DELETED_PRODUCT") -> Self: + """Get products marked for deletion.""" + return self.filter(description__exact=marker) + + def delete_with_related(self, batch_size: int = 5000) -> int: + """ + Delete products in batches along with their related objects. + + Returns the total number of products deleted. + """ + from engine.core.models import AttributeValue, ProductImage + + total_deleted = 0 + while True: + batch_ids = list(self.values_list("pk", flat=True)[:batch_size]) + if not batch_ids: + break + + AttributeValue.objects.filter(product_id__in=batch_ids).delete() + ProductImage.objects.filter(product_id__in=batch_ids).delete() + deleted_count, _ = self.model.objects.filter(pk__in=batch_ids).delete() + total_deleted += deleted_count + + return total_deleted + + +class ProductManager(MultilingualManager["Product"]): + """Manager for Product model with custom queryset methods.""" + + _queryset_class = ProductQuerySet + + def get_queryset(self) -> ProductQuerySet: + return ProductQuerySet(self.model, using=self._db) + + def available(self) -> ProductQuerySet: + return self.get_queryset().available() + + def available_in_stock(self) -> ProductQuerySet: + return self.get_queryset().available_in_stock() + + def with_related(self) -> ProductQuerySet: + return self.get_queryset().with_related() + + def stale(self) -> ProductQuerySet: + return self.get_queryset().stale() + + def for_vendor(self, vendor: "Vendor") -> ProductQuerySet: + return self.get_queryset().for_vendor(vendor) + + def not_in_orders(self) -> ProductQuerySet: + return self.get_queryset().not_in_orders() + + def for_vendor_not_in_orders(self, vendor: "Vendor") -> ProductQuerySet: + return self.get_queryset().for_vendor_not_in_orders(vendor) + + +class StockQuerySet(QuerySet["Stock"]): + """Custom QuerySet for Stock with vendor-related operations.""" + + def for_vendor(self, vendor: "Vendor") -> Self: + """Filter stocks for a specific vendor.""" + return self.filter(vendor=vendor) + + def not_in_orders(self) -> Self: + """Filter stocks whose products are not in any orders.""" + return self.filter(product__orderproduct__isnull=True) + + def for_vendor_not_in_orders(self, vendor: "Vendor") -> Self: + """Filter stocks for a vendor whose products are not in orders.""" + return self.for_vendor(vendor).filter(product__orderproduct__isnull=True) + + +class StockManager(models.Manager["Stock"]): + """Manager for Stock model with custom queryset methods.""" + + def get_queryset(self) -> StockQuerySet: + return StockQuerySet(self.model, using=self._db) + + def for_vendor(self, vendor: "Vendor") -> StockQuerySet: + return self.get_queryset().for_vendor(vendor) + + def not_in_orders(self) -> StockQuerySet: + return self.get_queryset().not_in_orders() + + def for_vendor_not_in_orders(self, vendor: "Vendor") -> StockQuerySet: + return self.get_queryset().for_vendor_not_in_orders(vendor) + + +class AttributeValueQuerySet(QuerySet["AttributeValue"]): + """Custom QuerySet for AttributeValue with vendor-related operations.""" + + def for_vendor(self, vendor: "Vendor") -> Self: + """Filter attribute values for products from a specific vendor.""" + return self.filter(product__stocks__vendor=vendor) + + def not_in_orders(self) -> Self: + """Filter attribute values whose products are not in any orders.""" + return self.filter(product__orderproduct__isnull=True) + + def for_vendor_not_in_orders(self, vendor: "Vendor") -> Self: + """Filter attribute values for a vendor whose products are not in orders.""" + return self.for_vendor(vendor).filter(product__orderproduct__isnull=True) + + +class AttributeValueManager(models.Manager["AttributeValue"]): + """Manager for AttributeValue model with custom queryset methods.""" + + def get_queryset(self) -> AttributeValueQuerySet: + return AttributeValueQuerySet(self.model, using=self._db) + + def for_vendor(self, vendor: "Vendor") -> AttributeValueQuerySet: + return self.get_queryset().for_vendor(vendor) + + def not_in_orders(self) -> AttributeValueQuerySet: + return self.get_queryset().not_in_orders() + + def for_vendor_not_in_orders(self, vendor: "Vendor") -> AttributeValueQuerySet: + return self.get_queryset().for_vendor_not_in_orders(vendor) diff --git a/engine/core/models.py b/engine/core/models.py index 15a54e30..20f905b0 100644 --- a/engine/core/models.py +++ b/engine/core/models.py @@ -52,7 +52,12 @@ from mptt.models import MPTTModel from engine.core.abstract import NiceModel from engine.core.choices import ORDER_PRODUCT_STATUS_CHOICES, ORDER_STATUS_CHOICES from engine.core.errors import DisabledCommerceError, NotEnoughMoneyError -from engine.core.managers import AddressManager, ProductManager +from engine.core.managers import ( + AddressManager, + AttributeValueManager, + ProductManager, + StockManager, +) from engine.core.typing import FilterableAttribute from engine.core.utils import ( generate_human_readable_id, @@ -569,6 +574,8 @@ class Stock(ExportModelOperationsMixin("stock"), NiceModel): default=dict, verbose_name=_("system attributes"), blank=True ) + objects = StockManager() + def __str__(self) -> str: return f"{self.vendor.name} - {self.product!s}" @@ -833,6 +840,8 @@ class AttributeValue(ExportModelOperationsMixin("attribute_value"), NiceModel): help_text=_("the specific value for this attribute"), ) + objects = AttributeValueManager() + def __str__(self): return f"{self.attribute!s}: {self.value}" diff --git a/engine/core/vendors/__init__.py b/engine/core/vendors/__init__.py index 585df0fd..876bbd11 100644 --- a/engine/core/vendors/__init__.py +++ b/engine/core/vendors/__init__.py @@ -2,21 +2,26 @@ import gzip import json import logging import time +from abc import ABC, abstractmethod from contextlib import suppress from datetime import datetime from decimal import Decimal from io import BytesIO from math import ceil, log10 -from typing import Any +from typing import TYPE_CHECKING, Any from constance import config from django.conf import settings from django.core.files.base import ContentFile from django.db import IntegrityError, transaction -from django.db.models import QuerySet from django.db.utils import OperationalError from engine.core.elasticsearch import process_system_query +from engine.core.managers import ( + AttributeValueQuerySet, + ProductQuerySet, + StockQuerySet, +) from engine.core.models import ( Attribute, AttributeGroup, @@ -24,7 +29,6 @@ from engine.core.models import ( Brand, Category, Product, - ProductImage, Stock, Vendor, ) @@ -32,6 +36,9 @@ from engine.payments.errors import RatesError from engine.payments.utils import get_rates from schon.utils.misc import LoggingError, LogLevel +if TYPE_CHECKING: + from engine.core.models import OrderProduct + logger = logging.getLogger(__name__) @@ -87,9 +94,9 @@ class ProductUnapdatableError(VendorError): pass -class AbstractVendor: +class AbstractVendor(ABC): """ - Abstract class defining vendor-related operations and handling. + Abstract base class defining vendor-related operations and handling. This class provides methods to manage and manipulate data related to a vendor and its associated products, stocks, and attributes. These include utility @@ -97,12 +104,48 @@ class AbstractVendor: specific markup percentages, retrieving vendor instances, fetching queryset data for products and stocks, and performing bulk operations like updates or deletions on inactive objects. + + Subclasses must implement the following abstract methods: + - get_products(): Fetch products from vendor's API + - update_stock(): Synchronize product stock with vendor + - update_order_products_statuses(): Update order product statuses from vendor + - buy_order_product(): Process purchase of a digital product from vendor + + Example usage: + class MyVendorIntegration(AbstractVendor): + def __init__(self): + super().__init__(vendor_name="MyVendor", currency="EUR") + + def get_products(self) -> list[dict]: + # Fetch products from vendor API + return api_client.get_products() + + def update_stock(self) -> None: + products = self.get_products() + self.prepare_for_stock_update() + # Process and save products... + self.delete_inactives() + + def update_order_products_statuses(self) -> None: + # Check and update order statuses from vendor + pass + + def buy_order_product(self, order_product: OrderProduct) -> None: + # Purchase digital product from vendor + pass """ + #: Name of the vendor as stored in the database + vendor_name: str + #: Default currency for price operations + currency: str + #: List of attribute names to skip during processing + blocked_attributes: list[Any] + def __init__(self, vendor_name: str = "", currency: str = "USD") -> None: self.vendor_name = vendor_name self.currency = currency - self.blocked_attributes: list[Any] = [] + self.blocked_attributes = [] def __str__(self) -> str: vendor = self.get_vendor_instance(safe=True) @@ -383,6 +426,19 @@ class AbstractVendor: return float(psychological) def get_vendor_instance(self, safe: bool = False) -> Vendor | None: + """ + Retrieve the Vendor model instance for this integration. + + Args: + safe: If True, return None instead of raising exceptions. + + Returns: + The Vendor instance if found and active, None if safe=True and not found. + + Raises: + VendorInactiveError: If vendor exists but is inactive (when safe=False). + Exception: If vendor does not exist (when safe=False). + """ try: vendor = Vendor.objects.get(name=self.vendor_name) if vendor.is_active: @@ -397,80 +453,127 @@ class AbstractVendor: f"No matching vendor found with name {self.vendor_name!r}..." ) from dne + @abstractmethod def get_products(self) -> Any: - pass + """ + Fetch products from the vendor's external API or data source. - def get_products_queryset(self) -> QuerySet[Product]: - return Product.objects.filter( - stocks__vendor=self.get_vendor_instance(), orderproduct__isnull=True - ) + This method should be implemented to retrieve product data from + the vendor's system. The format of returned data is vendor-specific. - def get_stocks_queryset(self) -> QuerySet[Stock]: - return Stock.objects.filter( - product__in=self.get_products_queryset(), product__orderproduct__isnull=True - ) + Returns: + Product data in vendor-specific format (list, dict, etc.) + """ + ... - def get_attribute_values_queryset(self) -> QuerySet[AttributeValue]: - return AttributeValue.objects.filter( - product__in=self.get_products_queryset(), product__orderproduct__isnull=True - ) + def get_products_queryset(self) -> ProductQuerySet: + """ + Get a queryset of products associated with this vendor. + + Returns products that: + - Have stocks from this vendor + - Are not part of any orders (safe to modify/delete) + """ + vendor = self.get_vendor_instance() + if not vendor: + return Product.objects.none() # type: ignore[return-value] + return Product.objects.for_vendor_not_in_orders(vendor) + + def get_stocks_queryset(self) -> StockQuerySet: + """ + Get a queryset of stocks associated with this vendor. + + Returns stocks that: + - Belong to this vendor + - Are for products not in any orders + """ + vendor = self.get_vendor_instance() + if not vendor: + return Stock.objects.none() # type: ignore[return-value] + return Stock.objects.for_vendor_not_in_orders(vendor) + + def get_attribute_values_queryset(self) -> AttributeValueQuerySet: + """ + Get a queryset of attribute values for this vendor's products. + + Returns attribute values for products that: + - Have stocks from this vendor + - Are not part of any orders + """ + vendor = self.get_vendor_instance() + if not vendor: + return AttributeValue.objects.none() # type: ignore[return-value] + return AttributeValue.objects.for_vendor_not_in_orders(vendor) def prepare_for_stock_update(self, method: str = "deactivate") -> None: + """ + Prepare products for stock update by marking them for potential deletion. + + This should be called before update_stock() to mark existing products. + Products that are re-activated during update_stock() will be kept; + those that remain marked will be cleaned up by delete_inactives(). + + Args: + method: How to mark products: + - "deactivate": Set is_active=False (default) + - "delete": Delete immediately (use with caution) + - "description": Mark with special description marker + """ products = self.get_products_queryset() - if products is None: + if not products.exists(): return - # noinspection PyUnreachableCode match method: case "deactivate": - products.update(is_active=False) + products.mark_inactive() case "delete": products.delete() case "description": - products.update(description="SCHON_DELETED_PRODUCT") + products.mark_for_deletion() case _: raise ValueError(f"Invalid method {method!r} for products update...") def delete_inactives( - self, inactivation_method: str = "deactivate", size: int = 5000 - ) -> None: - filter_kwargs: dict[str, Any] = dict() + self, inactivation_method: str = "deactivate", batch_size: int = 5000 + ) -> int: + """ + Delete products that were marked during prepare_for_stock_update(). + + This should be called after update_stock() completes to clean up + products that no longer exist in the vendor's catalog. + + Args: + inactivation_method: The method used in prepare_for_stock_update(): + - "deactivate": Delete products with is_active=False + - "description": Delete products with deletion marker + batch_size: Number of products to delete per batch. + + Returns: + Total number of products deleted. + """ + products_qs = self.get_products_queryset() match inactivation_method: case "deactivate": - filter_kwargs: dict[str, Any] = {"is_active": False} + products_qs = products_qs.filter(is_active=False) case "description": - filter_kwargs: dict[str, Any] = { - "description__exact": "SCHON_DELETED_PRODUCT" - } + products_qs = products_qs.marked_for_deletion() case _: raise ValueError( f"Invalid method {inactivation_method!r} for products cleaner..." ) - if filter_kwargs == {}: - raise ValueError("Invalid filter kwargs...") - - while True: - products = self.get_products_queryset() - - if products is None: - return - - batch_ids = list( - products.filter(**filter_kwargs).values_list("pk", flat=True)[:size] - ) - if not batch_ids: - break - with suppress(Exception): - AttributeValue.objects.filter(product_id__in=batch_ids).delete() - ProductImage.objects.filter(product_id__in=batch_ids).delete() - Product.objects.filter(pk__in=batch_ids).delete() + return products_qs.delete_with_related(batch_size=batch_size) def delete_belongings(self) -> None: - self.get_products_queryset().delete() - self.get_stocks_queryset().delete() + """ + Delete all products, stocks, and attribute values for this vendor. + + Warning: This is a destructive operation. Use with caution. + """ self.get_attribute_values_queryset().delete() + self.get_stocks_queryset().delete() + self.get_products_queryset().delete() def get_or_create_attribute_safe( self, *, name: str, attr_group: AttributeGroup @@ -578,15 +681,87 @@ class AbstractVendor: return av def check_updatable(self, product: Product) -> None: + """ + Check if a product can be updated by vendor sync. + + Args: + product: The product to check. + + Raises: + ProductUnapdatableError: If the product is marked as non-updatable. + """ if not product.is_updatable: raise ProductUnapdatableError("Product %s is not updatable", product.sku) + @abstractmethod def update_stock(self) -> None: - pass + """ + Synchronize product stock data from the vendor's system. + This method should: + 1. Call prepare_for_stock_update() to mark existing products + 2. Fetch current product data via get_products() + 3. Create/update products, stocks, and attributes + 4. Call delete_inactives() to remove stale products + + Example implementation: + def update_stock(self) -> None: + self.prepare_for_stock_update(method="deactivate") + products_data = self.get_products() + for item in products_data: + # Process and save product... + pass + self.delete_inactives(inactivation_method="deactivate") + """ + ... + + @abstractmethod def update_order_products_statuses(self) -> None: - pass + """ + Update the statuses of order products from the vendor's system. + + This method should check the vendor's API for status updates + on pending orders and update the corresponding OrderProduct + records in the database. + + Example implementation: + def update_order_products_statuses(self) -> None: + pending_orders = OrderProduct.objects.filter( + status="DELIVERING", + product__stocks__vendor__name=self.vendor_name + ) + for order_product in pending_orders: + status = self.api.check_status(order_product.external_id) + if status == "delivered": + order_product.status = "FINISHED" + order_product.save() + """ + ... + + @abstractmethod + def buy_order_product(self, order_product: "OrderProduct") -> None: + """ + Process the purchase of a product from the vendor. + + This method is called when a customer orders a digital product + that needs to be fulfilled through the vendor's API. + + Args: + order_product: The OrderProduct instance to fulfill. + + Example implementation: + def buy_order_product(self, order_product: OrderProduct) -> None: + result = self.api.purchase( + sku=order_product.product.sku, + quantity=order_product.quantity + ) + order_product.external_id = result["order_id"] + order_product.status = "DELIVERING" + order_product.save() + """ + ... def delete_stale() -> None: - Product.objects.filter(stocks__isnull=True, orderproduct__isnull=True).delete() + """Delete all stale products (products with no stocks and not in orders).""" + Product.objects.stale().delete()