schon/core/vendors/__init__.py
Egor fureunoir Gorbunov 2712ccdeb7 Features: 1) Add retry mechanism with exponential backoff for saving attributes to handle deadlocks.
Fixes: 1) Remove unused `traceback` import; 2) Add missing import for `OperationalError`; 3) Prevent redundant saves for unchanged attributes.

Extra: 1) Simplify attribute save logic and improve efficiency; 2) Code cleanup for better readability.
2025-10-19 01:44:28 +03:00

489 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import gzip
import json
import time
from contextlib import suppress
from datetime import datetime
from decimal import Decimal
from io import BytesIO
from math import ceil, log10
from typing import Any
from celery.utils.log import get_task_logger
from constance import config
from django.conf import settings
from django.core.files.base import ContentFile
from django.db import IntegrityError, transaction
from django.db.models import QuerySet
from django.db.utils import OperationalError
from core.elasticsearch import process_system_query
from core.models import (
Attribute,
AttributeGroup,
AttributeValue,
Brand,
Category,
Product,
ProductImage,
Stock,
Vendor,
)
from payments.errors import RatesError
from payments.utils import get_rates
# Celery task-aware module logger; log records emitted from vendor sync tasks
# carry the task context.
async_logger = get_task_logger(__name__)
class NotEnoughBalanceError(Exception):
    """Raised when a financial operation would exceed the available balance.

    Enforces balance constraints on operations such as withdrawals or
    payments, ensuring a transaction cannot push the balance below the
    allowed limit.
    """
class WrongUserAttributesError(Exception):
    """Raised when invalid or inconsistent user attributes are supplied.

    Signals problems found during user data validation or parameter checks.
    """
class VendorError(Exception):
    """Base error for vendor-related operations.

    Raised when unexpected output is received from a Vendor API.
    """
class VendorDebuggingError(VendorError):
    """Raised when a vendor debugging operation (such as persisting a raw API response) fails."""
class VendorInactiveError(VendorError):
    """Raised when the resolved vendor exists but is flagged inactive."""
class AbstractVendor:
"""
Abstract class defining vendor-related operations and handling.
This class provides methods to manage and manipulate data related to a vendor
and its associated products, stocks, and attributes. These include utility
methods for chunking data, resolving prices based on vendor's or category's
specific markup percentages, retrieving vendor instances, fetching queryset
data for products and stocks, and performing bulk operations like updates or
deletions on inactive objects.
Attributes:
vendor_name (str | None): Name of the vendor associated with this class
instance.
"""
def __init__(self, vendor_name: str | None = None, currency: str = "USD") -> None:
self.vendor_name = vendor_name
self.currency = currency
self.blocked_attributes: list[Any] = []
def __str__(self) -> str:
return self.vendor_name or self.get_vendor_instance().name
    def save_response(self, data: dict[Any, Any] | list[Any]) -> None:
        """Persist the vendor's raw API response into ``last_processing_response``.

        Active only when ``settings.DEBUG`` or the ``SAVE_VENDORS_RESPONSES``
        constance flag is set. Payloads over 1 MB are gzip-compressed before
        being saved.

        Raises:
            VendorDebuggingError: whenever the response was not saved — this
                includes the case where saving is disabled by configuration,
                because the outer ``suppress`` swallows every error and control
                falls through to the final ``raise``.
        """
        # Best-effort: any exception inside this context is swallowed; the
        # failure is surfaced uniformly as VendorDebuggingError below.
        with suppress(Exception):
            if settings.DEBUG or config.SAVE_VENDORS_RESPONSES:
                vendor_instance = self.get_vendor_instance()
                # Drop the previously stored response file first (ignore
                # storage-backend errors while deleting).
                if vendor_instance.last_processing_response:
                    with suppress(Exception):
                        vendor_instance.last_processing_response.delete(save=False)
                json_data = json.dumps(data, indent=2, ensure_ascii=False, default=str)
                json_bytes = json_data.encode("utf-8")
                size_threshold = 1024 * 1024  # 1MB
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                if len(json_bytes) > size_threshold:
                    # Large payload: gzip at maximum compression before saving.
                    buffer = BytesIO()
                    with gzip.GzipFile(fileobj=buffer, mode="wb", compresslevel=9) as gz_file:
                        gz_file.write(json_bytes)
                    compressed_data = buffer.getvalue()
                    filename = f"response_{timestamp}.json.gz"
                    content = ContentFile(compressed_data)
                else:
                    filename = f"response_{timestamp}.json"
                    content = ContentFile(json_bytes)
                vendor_instance.last_processing_response.save(filename, content, save=True)
                return
        # NOTE(review): reached both on real failures AND when saving is
        # disabled by config — callers appear expected to tolerate this raise.
        raise VendorDebuggingError("Could not save response")
@staticmethod
def chunk_data(data: list[Any] | None = None, num_chunks: int = 20) -> list[list[Any]] | list[Any]:
if not data:
return []
total = len(data)
if total == 0:
return []
chunk_size = max(1, (total + num_chunks - 1) // num_chunks)
return [data[i : i + chunk_size] for i in range(0, total, chunk_size)]
@staticmethod
def auto_convert_value(value: Any) -> tuple[Any, str]:
"""
Attempts to convert a value to a more specific type.
Handles booleans, numbers, objects (dicts), and arrays (lists),
even when they are provided as strings.
Returns a tuple of (converted_value, type_label).
"""
# First, handle native types
if isinstance(value, bool):
return value, "boolean"
if isinstance(value, int):
return value, "integer"
if isinstance(value, float):
return value, "float"
if isinstance(value, dict):
# Convert dict to a JSON string for consistency in storage
return json.dumps(value), "object"
if isinstance(value, list):
# Similarly, convert a list to JSON string
return json.dumps(value), "array"
# Now, if it's a string, try to parse it further
if isinstance(value, str):
lower_val = value.lower().strip()
# Handle booleans in string form.
if lower_val == "true":
return True, "boolean"
if lower_val == "false":
return False, "boolean"
# Try integer conversion.
with suppress(ValueError):
int_val = int(value)
# Check that converting back to string gives the same value (avoid "100.0" issues).
if str(int_val) == value:
return int_val, "integer"
# Try float conversion.
with suppress(ValueError):
float_val = float(value)
return float_val, "float"
# Try to detect a JSON object or array.
stripped_value = value.strip()
if (stripped_value.startswith("{") and stripped_value.endswith("}")) or (
stripped_value.startswith("[") and stripped_value.endswith("]")
):
with suppress(Exception):
parsed = json.loads(value)
if isinstance(parsed, dict):
# Store as a JSON string for consistency.
return json.dumps(parsed), "object"
elif isinstance(parsed, list):
return json.dumps(parsed), "array"
# Default case: treat as a plain string.
return value, "string"
    @staticmethod
    def auto_resolver_helper(model: type[Brand] | type[Category], resolving_name: str) -> Brand | Category | None:
        """Resolve (or lazily create) a Brand/Category by exact name, deduplicating rows.

        When no row matches, a new inactive row is created (name truncated to
        255 chars). When several rows match, active rows are preferred over
        inactive ones; the first survivor is kept and all remaining duplicates
        are deleted.

        Raises:
            VendorError: if a non-empty queryset unexpectedly yields no row.
        """
        queryset = model.objects.filter(name=resolving_name)
        if not queryset.exists():
            # NOTE(review): truncation happens only on this create path, so a
            # >255-char name can never match an existing (truncated) row in
            # the filter above and will funnel into get_or_create every time —
            # confirm this is intended.
            if len(resolving_name) > 255:
                resolving_name = resolving_name[:255]
            return model.objects.get_or_create(name=resolving_name, defaults={"is_active": False})[0]
        elif queryset.filter(is_active=True).count() > 1:
            # Several active duplicates: restrict the dedup set to active rows.
            queryset = queryset.filter(is_active=True)
        elif queryset.filter(is_active=False).count() > 1:
            # Only inactive duplicates: dedup among those.
            queryset = queryset.filter(is_active=False)
        chosen = queryset.first()
        if not chosen:
            raise VendorError(f"No matching {model.__name__} found with name {resolving_name!r}...")
        # Keep the chosen row; delete every other duplicate.
        queryset = queryset.exclude(uuid=chosen.uuid)
        queryset.delete()
        return chosen
def auto_resolve_category(self, category_name: str = "") -> Category | None:
if category_name:
try:
search = process_system_query(query=category_name, indexes=("categories",))
uuid = search["categories"][0]["uuid"] if search else None
if uuid:
return Category.objects.get(uuid=uuid)
except KeyError:
pass
except IndexError:
pass
except Category.MultipleObjectsReturned:
pass
except Category.DoesNotExist:
pass
return self.auto_resolver_helper(Category, category_name)
def auto_resolve_brand(self, brand_name: str = "") -> Brand | None:
if brand_name:
try:
search = process_system_query(query=brand_name, indexes=("brands",))
uuid = search["brands"][0]["uuid"] if search else None
if uuid:
return Brand.objects.get(uuid=uuid)
except KeyError:
pass
except IndexError:
pass
except Brand.MultipleObjectsReturned:
pass
except Brand.DoesNotExist:
pass
return self.auto_resolver_helper(Brand, brand_name)
def resolve_price(
self,
original_price: int | float,
vendor: Vendor | None = None,
category: Category | None = None,
) -> float:
if not vendor:
vendor = self.get_vendor_instance()
if not category and not vendor:
raise ValueError("Either category or vendor must be provided.")
price = float(original_price)
if category and category.markup_percent:
price *= 1 + float(category.markup_percent) / 100.0
elif vendor and vendor.markup_percent:
price *= 1 + vendor.markup_percent / 100.0
return round(price, 2)
def resolve_price_with_currency(self, price: float | int | Decimal, provider: str, currency: str = "") -> float:
rates = get_rates(provider)
rate = rates.get(currency or self.currency) if rates else 1
if not rate:
raise RatesError(f"No rate found for {currency or self.currency} in {rates} with probider {provider}...")
return float(round(price / rate, 2)) if rate else float(round(price, 2)) # type: ignore [arg-type, operator]
@staticmethod
def round_price_marketologically(price: float) -> float:
"""
Marketological rounding with no cents:
- Prices < 1: leave exactly as-is.
- Prices ≥ 1: drop any fractional part, then
bump to the next 'psychological' threshold
at the correct order of magnitude and subtract 1.
E.g., 2.34 → 2 → 3 1 = 2
12.34 → 12 → 13 1 = 12
123.45 → 123 → 130 1 = 129
"""
if price < 1:
# sub-currency prices stay as they are
return round(price, 2)
# strip off any cents
whole = int(price)
# figure out the size:
# 10**0 = 1 for [19], 10**1 = 10 for [1099], 10**2 = 100 for [100999], etc.
size = 10 ** max(int(log10(whole)) - 1, 0)
# next multiple of that size
next_threshold = ceil(whole / size) * size
# step back 1 to land on a “9” ending
psychological = next_threshold - 1
return float(psychological)
def get_vendor_instance(self) -> Vendor | None:
try:
vendor = Vendor.objects.get(name=self.vendor_name)
if vendor.is_active:
return vendor
raise VendorInactiveError(f"Vendor {self.vendor_name!r} is inactive...")
except Vendor.DoesNotExist as dne:
raise Exception(f"No matching vendor found with name {self.vendor_name!r}...") from dne
def get_products(self) -> None:
pass
def get_products_queryset(self) -> QuerySet[Product]:
return Product.objects.filter(stocks__vendor=self.get_vendor_instance(), orderproduct__isnull=True)
def get_stocks_queryset(self) -> QuerySet[Stock]:
return Stock.objects.filter(product__in=self.get_products_queryset(), product__orderproduct__isnull=True)
def get_attribute_values_queryset(self) -> QuerySet[AttributeValue]:
return AttributeValue.objects.filter(
product__in=self.get_products_queryset(), product__orderproduct__isnull=True
)
def prepare_for_stock_update(self, method: str = "deactivate") -> None:
products = self.get_products_queryset()
if products is None:
return
# noinspection PyUnreachableCode
match method:
case "deactivate":
products.update(is_active=False)
case "delete":
products.delete()
case "description":
products.update(description="EVIBES_DELETED_PRODUCT")
case _:
raise ValueError(f"Invalid method {method!r} for products update...")
def delete_inactives(self, inactivation_method: str = "deactivate", size: int = 5000) -> None:
filter_kwargs: dict[str, Any] = dict()
match inactivation_method:
case "deactivate":
filter_kwargs: dict[str, Any] = {"is_active": False}
case "description":
filter_kwargs: dict[str, Any] = {"description__exact": "EVIBES_DELETED_PRODUCT"}
case _:
raise ValueError(f"Invalid method {inactivation_method!r} for products cleaner...")
if filter_kwargs == {}:
raise ValueError("Invalid filter kwargs...")
while True:
products = self.get_products_queryset()
if products is None:
return
batch_ids = list(products.filter(**filter_kwargs).values_list("pk", flat=True)[:size])
if not batch_ids:
break
with suppress(Exception):
AttributeValue.objects.filter(product_id__in=batch_ids).delete()
ProductImage.objects.filter(product_id__in=batch_ids).delete()
Product.objects.filter(pk__in=batch_ids).delete()
def delete_belongings(self) -> None:
self.get_products_queryset().delete()
self.get_stocks_queryset().delete()
self.get_attribute_values_queryset().delete()
    def get_or_create_attribute_safe(self, *, name: str, attr_group: AttributeGroup) -> Attribute:
        """Fetch an Attribute by (truncated) name, creating it race-safely if absent.

        Handles the concurrent-create race: a simultaneous insert by another
        worker surfaces as IntegrityError, after which the winning row is
        fetched instead. The returned attribute is reactivated if inactive.
        """
        # Attribute.name is capped at 255 characters.
        key = name[:255]
        try:
            attr = Attribute.objects.get(name=key)
        except Attribute.DoesNotExist:
            try:
                # atomic() keeps a possible IntegrityError from poisoning an
                # enclosing transaction.
                with transaction.atomic():
                    attr = Attribute.objects.create(
                        name=key,
                        group=attr_group,
                        is_active=True,
                        value_type="string",
                    )
            except IntegrityError:
                # Lost the race: another process inserted the row — use theirs.
                attr = Attribute.objects.get(name=key)
        if not attr.is_active:
            attr.is_active = True
            attr.save(update_fields=["is_active"])
        return attr
    def process_attribute(self, key: str, value: Any, product: Product, attr_group: AttributeGroup) -> None:
        """Store one vendor-supplied attribute (and its value) for ``product``.

        Skips empty values, missing groups, and keys listed in
        ``self.blocked_attributes``. The Attribute row is get-or-created with
        the auto-detected value type; deadlocked saves are retried with
        exponential backoff.
        """
        if not value:
            async_logger.warning(f"No value for attribute {key!r} at {product.name!r}...")
            return
        if not attr_group:
            async_logger.warning(f"No group for attribute {key!r} at {product.name!r}...")
            return
        if key in self.blocked_attributes:
            return
        # Normalize the raw value and detect its storage type label.
        value, attr_value_type = self.auto_convert_value(value)
        is_created = False
        # Attribute.name is capped at 255 characters.
        if len(key) > 255:
            key = key[:255]
        try:
            attribute, is_created = Attribute.objects.get_or_create(
                name=key,
                group=attr_group,
                value_type=attr_value_type,
                defaults={"is_active": True},
            )
        except Attribute.MultipleObjectsReturned:
            # Duplicates exist: deterministically keep the first by uuid.
            attribute = Attribute.objects.filter(name=key, group=attr_group).order_by("uuid").first()  # type: ignore [assignment]
            fields_to_update: list[str] = []
            if not attribute.is_active:
                attribute.is_active = True
                fields_to_update.append("is_active")
            if attribute.value_type != attr_value_type:
                attribute.value_type = attr_value_type
                fields_to_update.append("value_type")
            if fields_to_update:
                # Retry deadlocked saves with exponential backoff
                # (0.1s, 0.2s, 0.4s, 0.8s, 1.6s).
                # NOTE(review): after 5 deadlocked attempts the loop exits
                # silently without saving — confirm that is acceptable.
                for attempt in range(5):
                    try:
                        attribute.save(update_fields=fields_to_update)
                        break
                    except OperationalError as e:
                        if "deadlock detected" in str(e):
                            time.sleep(0.1 * (2**attempt))
                            continue
                        raise
        except IntegrityError:
            async_logger.warning(f"IntegrityError while processing attribute {key!r}...")
            return
        if not is_created:
            # NOTE(review): values are written only for freshly created
            # attributes — existing attributes never gain new AttributeValues
            # here. Verify this "skip unchanged" optimization is intended.
            return
        AttributeValue.objects.get_or_create(
            attribute=attribute,
            value=value,
            product=product,
            defaults={"is_active": True},
        )
def update_stock(self) -> None:
pass
def update_order_products_statuses(self) -> None:
pass
def delete_stale() -> None:
Product.objects.filter(stocks__isnull=True, orderproduct__isnull=True).delete()