import gzip import json import logging import time from contextlib import suppress from datetime import datetime from decimal import Decimal from io import BytesIO from math import ceil, log10 from typing import Any from celery import current_task from celery.utils.log import get_task_logger from constance import config from django.conf import settings from django.core.files.base import ContentFile from django.db import IntegrityError, transaction from django.db.models import QuerySet from django.db.utils import OperationalError from core.elasticsearch import process_system_query from core.models import ( Attribute, AttributeGroup, AttributeValue, Brand, Category, Product, ProductImage, Stock, Vendor, ) from evibes.utils.misc import LogLevel from payments.errors import RatesError from payments.utils import get_rates async_logger = get_task_logger(__name__) logger = logging.getLogger("django") class NotEnoughBalanceError(Exception): """ Custom exception raised when a financial operation exceeds the available balance. This exception is designed to enforce balance constraints on operations such as withdrawals or payments, ensuring that transactions do not cause the balance to go below the allowed limit. """ pass class WrongUserAttributesError(Exception): """ Exception class representing an error for incorrect user attributes. This exception is raised when invalid or inconsistent attributes are provided for a user during an operation. It can be used to signal issues related to user data validation or parameter checks. """ pass class VendorError(Exception): """ Exception class representing an error for vendor-related operations. This exception is raised when unexpected output is received from Vendor API. """ pass class VendorDebuggingError(VendorError): """ Custom exception raised when a debugging operation fails """ pass class VendorInactiveError(VendorError): pass class AbstractVendor: """ Abstract class defining vendor-related operations and handling. This class provides methods to manage and manipulate data related to a vendor and its associated products, stocks, and attributes. These include utility methods for chunking data, resolving prices based on vendor's or category's specific markup percentages, retrieving vendor instances, fetching queryset data for products and stocks, and performing bulk operations like updates or deletions on inactive objects. Attributes: vendor_name (str | None): Name of the vendor associated with this class instance. """ def __init__(self, vendor_name: str | None = None, currency: str = "USD") -> None: self.vendor_name = vendor_name self.currency = currency self.blocked_attributes: list[Any] = [] self.log(LogLevel.INFO, f"Initializing {self}...") def __str__(self) -> str: return self.get_vendor_instance(safe=True).name if self.get_vendor_instance() else self.vendor_name def log(self, level: LogLevel, message: str) -> None: is_celery_runtime = False with suppress(Exception): is_celery_runtime = bool(getattr(current_task, "request", None)) current_logger = async_logger if is_celery_runtime else logger match level: case LogLevel.DEBUG: if settings.DEBUG: current_logger.debug(message) case LogLevel.TRACE: if settings.DEBUG: current_logger.debug(f"[TRACE] {message}") case LogLevel.INFO: current_logger.info(message) case LogLevel.WARNING: current_logger.warning(message) case LogLevel.ERROR: current_logger.error(message) case LogLevel.CRITICAL: current_logger.critical(message) case _: current_logger.info(message) def save_response(self, data: dict[Any, Any] | list[Any]) -> None: with suppress(Exception): if settings.DEBUG or config.SAVE_VENDORS_RESPONSES: vendor_instance = self.get_vendor_instance() if not vendor_instance: return if vendor_instance.last_processing_response: with suppress(Exception): vendor_instance.last_processing_response.delete(save=False) json_data = json.dumps(data, indent=2, ensure_ascii=False, default=str) json_bytes = json_data.encode("utf-8") size_threshold = 1024 * 1024 # 1MB timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") if len(json_bytes) > size_threshold: buffer = BytesIO() with gzip.GzipFile(fileobj=buffer, mode="wb", compresslevel=9) as gz_file: gz_file.write(json_bytes) compressed_data = buffer.getvalue() filename = f"response_{timestamp}.json.gz" content = ContentFile(compressed_data) else: filename = f"response_{timestamp}.json" content = ContentFile(json_bytes) vendor_instance.last_processing_response.save(filename, content, save=True) return raise VendorDebuggingError("Could not save response") @staticmethod def chunk_data(data: list[Any] | None = None, num_chunks: int = 20) -> list[list[Any]] | list[Any]: if not data: return [] total = len(data) if total == 0: return [] chunk_size = max(1, (total + num_chunks - 1) // num_chunks) return [data[i : i + chunk_size] for i in range(0, total, chunk_size)] @staticmethod def auto_convert_value(value: Any) -> tuple[Any, str]: """ Attempts to convert a value to a more specific type. Handles booleans, numbers, objects (dicts), and arrays (lists), even when they are provided as strings. Returns a tuple of (converted_value, type_label). """ # First, handle native types if isinstance(value, bool): return value, "boolean" if isinstance(value, int): return value, "integer" if isinstance(value, float): return value, "float" if isinstance(value, dict): # Convert dict to a JSON string for consistency in storage return json.dumps(value), "object" if isinstance(value, list): # Similarly, convert a list to JSON string return json.dumps(value), "array" # Now, if it's a string, try to parse it further if isinstance(value, str): lower_val = value.lower().strip() # Handle booleans in string form. if lower_val == "true": return True, "boolean" if lower_val == "false": return False, "boolean" # Try integer conversion. with suppress(ValueError): int_val = int(value) # Check that converting back to string gives the same value (avoid "100.0" issues). if str(int_val) == value: return int_val, "integer" # Try float conversion. with suppress(ValueError): float_val = float(value) return float_val, "float" # Try to detect a JSON object or array. stripped_value = value.strip() if (stripped_value.startswith("{") and stripped_value.endswith("}")) or ( stripped_value.startswith("[") and stripped_value.endswith("]") ): with suppress(Exception): parsed = json.loads(value) if isinstance(parsed, dict): # Store as a JSON string for consistency. return json.dumps(parsed), "object" elif isinstance(parsed, list): return json.dumps(parsed), "array" # Default case: treat as a plain string. return value, "string" @staticmethod def auto_resolver_helper(model: type[Brand] | type[Category], resolving_name: str) -> Brand | Category | None: queryset = model.objects.filter(name=resolving_name) if not queryset.exists(): if len(resolving_name) > 255: resolving_name = resolving_name[:255] return model.objects.get_or_create(name=resolving_name, defaults={"is_active": False})[0] elif queryset.filter(is_active=True).count() > 1: queryset = queryset.filter(is_active=True) elif queryset.filter(is_active=False).count() > 1: queryset = queryset.filter(is_active=False) chosen = queryset.first() if not chosen: raise VendorError(f"No matching {model.__name__} found with name {resolving_name!r}...") queryset = queryset.exclude(uuid=chosen.uuid) queryset.delete() return chosen def auto_resolve_category(self, category_name: str = "") -> Category | None: if category_name: try: search = process_system_query(query=category_name, indexes=("categories",)) uuid = search["categories"][0]["uuid"] if search else None if uuid: return Category.objects.get(uuid=uuid) except KeyError: pass except IndexError: pass except Category.MultipleObjectsReturned: pass except Category.DoesNotExist: pass return self.auto_resolver_helper(Category, category_name) def auto_resolve_brand(self, brand_name: str = "") -> Brand | None: if brand_name: try: search = process_system_query(query=brand_name, indexes=("brands",)) uuid = search["brands"][0]["uuid"] if search else None if uuid: return Brand.objects.get(uuid=uuid) except KeyError: pass except IndexError: pass except Brand.MultipleObjectsReturned: pass except Brand.DoesNotExist: pass return self.auto_resolver_helper(Brand, brand_name) def resolve_price( self, original_price: int | float, vendor: Vendor | None = None, category: Category | None = None, ) -> float: if not vendor: vendor = self.get_vendor_instance() if not category and not vendor: raise ValueError("Either category or vendor must be provided.") price = float(original_price) if category and category.markup_percent: price *= 1 + float(category.markup_percent) / 100.0 elif vendor and vendor.markup_percent: price *= 1 + vendor.markup_percent / 100.0 return round(price, 2) def resolve_price_with_currency(self, price: float | int | Decimal, provider: str, currency: str = "") -> float: if currency == self.currency or currency == settings.CURRENCY_CODE: return float(price) rates = get_rates(provider) rate = rates.get(currency or self.currency) if rates else 1 if not rate: raise RatesError(f"No rate found for {currency or self.currency} in {rates} with probider {provider}...") return float(round(price / rate, 2)) if rate else float(round(price, 2)) # type: ignore [arg-type, operator] @staticmethod def round_price_marketologically(price: float) -> float: """ Marketological rounding with no cents: - Prices < 1: leave exactly as-is. - Prices ≥ 1: drop any fractional part, then bump to the next 'psychological' threshold at the correct order of magnitude and subtract 1. E.g., 2.34 → 2 → 3 – 1 = 2 12.34 → 12 → 13 – 1 = 12 123.45 → 123 → 130 – 1 = 129 """ if price < 1: # sub-currency prices stay as they are return round(price, 2) # strip off any cents whole = int(price) # figure out the size: # 10**0 = 1 for [1–9], 10**1 = 10 for [10–99], 10**2 = 100 for [100–999], etc. size = 10 ** max(int(log10(whole)) - 1, 0) # next multiple of that size next_threshold = ceil(whole / size) * size # step back 1 to land on a “9” ending psychological = next_threshold - 1 return float(psychological) def get_vendor_instance(self, safe: bool = False) -> Vendor | None: try: vendor = Vendor.objects.get(name=self.vendor_name) if vendor.is_active: return vendor if safe: return None raise VendorInactiveError(f"Vendor {self.vendor_name!r} is inactive...") except Vendor.DoesNotExist as dne: if safe: return None raise Exception(f"No matching vendor found with name {self.vendor_name!r}...") from dne def get_products(self) -> None: pass def get_products_queryset(self) -> QuerySet[Product]: return Product.objects.filter(stocks__vendor=self.get_vendor_instance(), orderproduct__isnull=True) def get_stocks_queryset(self) -> QuerySet[Stock]: return Stock.objects.filter(product__in=self.get_products_queryset(), product__orderproduct__isnull=True) def get_attribute_values_queryset(self) -> QuerySet[AttributeValue]: return AttributeValue.objects.filter( product__in=self.get_products_queryset(), product__orderproduct__isnull=True ) def prepare_for_stock_update(self, method: str = "deactivate") -> None: products = self.get_products_queryset() if products is None: return # noinspection PyUnreachableCode match method: case "deactivate": products.update(is_active=False) case "delete": products.delete() case "description": products.update(description="EVIBES_DELETED_PRODUCT") case _: raise ValueError(f"Invalid method {method!r} for products update...") def delete_inactives(self, inactivation_method: str = "deactivate", size: int = 5000) -> None: filter_kwargs: dict[str, Any] = dict() match inactivation_method: case "deactivate": filter_kwargs: dict[str, Any] = {"is_active": False} case "description": filter_kwargs: dict[str, Any] = {"description__exact": "EVIBES_DELETED_PRODUCT"} case _: raise ValueError(f"Invalid method {inactivation_method!r} for products cleaner...") if filter_kwargs == {}: raise ValueError("Invalid filter kwargs...") while True: products = self.get_products_queryset() if products is None: return batch_ids = list(products.filter(**filter_kwargs).values_list("pk", flat=True)[:size]) if not batch_ids: break with suppress(Exception): AttributeValue.objects.filter(product_id__in=batch_ids).delete() ProductImage.objects.filter(product_id__in=batch_ids).delete() Product.objects.filter(pk__in=batch_ids).delete() def delete_belongings(self) -> None: self.get_products_queryset().delete() self.get_stocks_queryset().delete() self.get_attribute_values_queryset().delete() def get_or_create_attribute_safe(self, *, name: str, attr_group: AttributeGroup) -> Attribute: key = name[:255] try: attr = Attribute.objects.get(name=key) except Attribute.DoesNotExist: try: with transaction.atomic(): attr = Attribute.objects.create( name=key, group=attr_group, is_active=True, value_type="string", ) except IntegrityError: attr = Attribute.objects.get(name=key) if not attr.is_active: attr.is_active = True attr.save(update_fields=["is_active"]) return attr def process_attribute(self, key: str, value: Any, product: Product, attr_group: AttributeGroup) -> None: if not value: self.log(LogLevel.WARNING, f"No value for attribute {key!r} at {product.name!r}...") return if not attr_group: self.log(LogLevel.WARNING, f"No group for attribute {key!r} at {product.name!r}...") return if key in self.blocked_attributes: return value, attr_value_type = self.auto_convert_value(value) is_created = False if len(key) > 255: key = key[:255] try: attribute, is_created = Attribute.objects.get_or_create( name=key, group=attr_group, value_type=attr_value_type, defaults={"is_active": True}, ) except Attribute.MultipleObjectsReturned: attribute = Attribute.objects.filter(name=key, group=attr_group).order_by("uuid").first() # type: ignore [assignment] fields_to_update: list[str] = [] if not attribute.is_active: attribute.is_active = True fields_to_update.append("is_active") if attribute.value_type != attr_value_type: attribute.value_type = attr_value_type fields_to_update.append("value_type") if fields_to_update: for attempt in range(5): try: attribute.save(update_fields=fields_to_update) break except OperationalError as e: if "deadlock detected" in str(e): time.sleep(0.1 * (2**attempt)) continue raise except IntegrityError: self.log(LogLevel.WARNING, f"IntegrityError while processing attribute {key!r}...") return if not is_created: return AttributeValue.objects.get_or_create( attribute=attribute, value=value, product=product, defaults={"is_active": True}, ) def update_stock(self) -> None: pass def update_order_products_statuses(self) -> None: pass def delete_stale() -> None: Product.objects.filter(stocks__isnull=True, orderproduct__isnull=True).delete()