import json from contextlib import suppress from decimal import Decimal from math import ceil, log10 from typing import Any from django.db import IntegrityError, transaction from core.elasticsearch import process_system_query from core.models import ( Attribute, AttributeGroup, AttributeValue, Brand, Category, Product, ProductImage, Stock, Vendor, ) from payments.errors import RatesError from payments.utils import get_rates class NotEnoughBalanceError(Exception): """ Custom exception raised when a financial operation exceeds the available balance. This exception is designed to enforce balance constraints on operations such as withdrawals or payments, ensuring that transactions do not cause the balance to go below the allowed limit. """ pass class WrongUserAttributesError(Exception): """ Exception class representing an error for incorrect user attributes. This exception is raised when invalid or inconsistent attributes are provided for a user during an operation. It can be used to signal issues related to user data validation or parameter checks. """ pass class VendorError(Exception): """ Exception class representing an error for vendor-related operations. This exception is raised when unexpected output is received from Vendor API. """ pass class VendorInactiveError(VendorError): pass class AbstractVendor: """ Abstract class defining vendor-related operations and handling. This class provides methods to manage and manipulate data related to a vendor and its associated products, stocks, and attributes. These include utility methods for chunking data, resolving prices based on vendor's or category's specific markup percentages, retrieving vendor instances, fetching queryset data for products and stocks, and performing bulk operations like updates or deletions on inactive objects. Attributes: vendor_name (str | None): Name of the vendor associated with this class instance. """ def __init__(self, vendor_name: str | None = None, currency: str = "USD") -> None: self.vendor_name = vendor_name self.currency = currency self.blocked_attributes: list[Any] = [] @staticmethod def chunk_data(data: list[Any] | None = None, num_chunks: int = 20) -> list[list[Any]] | list[Any]: if not data: return [] total = len(data) if total == 0: return [] chunk_size = max(1, (total + num_chunks - 1) // num_chunks) return [data[i : i + chunk_size] for i in range(0, total, chunk_size)] @staticmethod def auto_convert_value(value: Any) -> tuple[Any, str]: """ Attempts to convert a value to a more specific type. Handles booleans, numbers, objects (dicts), and arrays (lists), even when they are provided as strings. Returns a tuple of (converted_value, type_label). """ # First, handle native types if isinstance(value, bool): return value, "boolean" if isinstance(value, int): return value, "integer" if isinstance(value, float): return value, "float" if isinstance(value, dict): # Convert dict to a JSON string for consistency in storage return json.dumps(value), "object" if isinstance(value, list): # Similarly, convert a list to JSON string return json.dumps(value), "array" # Now, if it's a string, try to parse it further if isinstance(value, str): lower_val = value.lower().strip() # Handle booleans in string form. if lower_val == "true": return True, "boolean" if lower_val == "false": return False, "boolean" # Try integer conversion. with suppress(ValueError): int_val = int(value) # Check that converting back to string gives the same value (avoid "100.0" issues). if str(int_val) == value: return int_val, "integer" # Try float conversion. with suppress(ValueError): float_val = float(value) return float_val, "float" # Try to detect a JSON object or array. stripped_value = value.strip() if (stripped_value.startswith("{") and stripped_value.endswith("}")) or ( stripped_value.startswith("[") and stripped_value.endswith("]") ): with suppress(Exception): parsed = json.loads(value) if isinstance(parsed, dict): # Store as a JSON string for consistency. return json.dumps(parsed), "object" elif isinstance(parsed, list): return json.dumps(parsed), "array" # Default case: treat as a plain string. return value, "string" @staticmethod def auto_resolver_helper(model: type[Brand] | type[Category], resolving_name: str) -> Brand | Category | None: queryset = model.objects.filter(name=resolving_name) if not queryset.exists(): if len(resolving_name) > 255: resolving_name = resolving_name[:255] return model.objects.get_or_create(name=resolving_name, defaults={"is_active": False})[0] elif queryset.filter(is_active=True).count() > 1: queryset = queryset.filter(is_active=True) elif queryset.filter(is_active=False).count() > 1: queryset = queryset.filter(is_active=False) chosen = queryset.first() if not chosen: raise VendorError(f"No matching {model.__name__} found with name {resolving_name!r}...") queryset = queryset.exclude(uuid=chosen.uuid) queryset.delete() return chosen def auto_resolve_category(self, category_name: str = "") -> Category | None: if category_name: try: search = process_system_query(query=category_name, indexes=("categories",)) uuid = search["categories"][0]["uuid"] if search else None if uuid: return Category.objects.get(uuid=uuid) except KeyError: pass except IndexError: pass except Category.MultipleObjectsReturned: pass except Category.DoesNotExist: pass return self.auto_resolver_helper(Category, category_name) def auto_resolve_brand(self, brand_name: str = "") -> Brand | None: if brand_name: try: search = process_system_query(query=brand_name, indexes=("brands",)) uuid = search["brands"][0]["uuid"] if search else None if uuid: return Brand.objects.get(uuid=uuid) except KeyError: pass except IndexError: pass except Brand.MultipleObjectsReturned: pass except Brand.DoesNotExist: pass return self.auto_resolver_helper(Brand, brand_name) def resolve_price( self, original_price: int | float, vendor: Vendor | None = None, category: Category | None = None, ) -> float: if not vendor: vendor = self.get_vendor_instance() if not category and not vendor: raise ValueError("Either category or vendor must be provided.") price = float(original_price) if category and category.markup_percent: price *= 1 + float(category.markup_percent) / 100.0 elif vendor and vendor.markup_percent: price *= 1 + vendor.markup_percent / 100.0 return round(price, 2) def resolve_price_with_currency(self, price: float | int | Decimal, provider: str, currency: str = "") -> float: rates = get_rates(provider) rate = rates.get(currency or self.currency) if not rate: raise RatesError(f"No rate found for {currency or self.currency} in {rates} with probider {provider}...") return float(round(price / rate, 2)) if rate else float(round(price, 2)) @staticmethod def round_price_marketologically(price: float) -> float: """ Marketological rounding with no cents: - Prices < 1: leave exactly as-is. - Prices ≥ 1: drop any fractional part, then bump to the next 'psychological' threshold at the correct order of magnitude and subtract 1. E.g., 2.34 → 2 → 3 – 1 = 2 12.34 → 12 → 13 – 1 = 12 123.45 → 123 → 130 – 1 = 129 """ if price < 1: # sub-currency prices stay as they are return round(price, 2) # strip off any cents whole = int(price) # figure out the size: # 10**0 = 1 for [1–9], 10**1 = 10 for [10–99], 10**2 = 100 for [100–999], etc. size = 10 ** max(int(log10(whole)) - 1, 0) # next multiple of that size next_threshold = ceil(whole / size) * size # step back 1 to land on a “9” ending psychological = next_threshold - 1 return float(psychological) def get_vendor_instance(self) -> Vendor | None: try: vendor = Vendor.objects.get(name=self.vendor_name) if vendor.is_active: return vendor raise VendorInactiveError(f"Vendor {self.vendor_name!r} is inactive...") except Vendor.DoesNotExist as dne: raise Exception(f"No matching vendor found with name {self.vendor_name!r}...") from dne def get_products(self) -> None: pass def get_products_queryset(self): return Product.objects.filter(stocks__vendor=self.get_vendor_instance(), orderproduct__isnull=True) def get_stocks_queryset(self): return Stock.objects.filter(product__in=self.get_products_queryset(), product__orderproduct__isnull=True) def get_attribute_values_queryset(self): return AttributeValue.objects.filter( product__in=self.get_products_queryset(), product__orderproduct__isnull=True ) def prepare_for_stock_update(self, method: str = "deactivate") -> None: products = self.get_products_queryset() if products is None: return # noinspection PyUnreachableCode match method: case "deactivate": products.update(is_active=False) case "delete": products.delete() case "description": products.update(description="EVIBES_DELETED_PRODUCT") case _: raise ValueError(f"Invalid method {method!r} for products update...") def delete_inactives(self, inactivation_method: str = "deactivate", size: int = 5000) -> None: filter_kwargs: dict[str, Any] = dict() match inactivation_method: case "deactivate": filter_kwargs: dict[str, Any] = {"is_active": False} case "description": filter_kwargs: dict[str, Any] = {"description__exact": "EVIBES_DELETED_PRODUCT"} case _: raise ValueError(f"Invalid method {inactivation_method!r} for products cleaner...") if filter_kwargs == {}: raise ValueError("Invalid filter kwargs...") while True: products = self.get_products_queryset() if products is None: return batch_ids = list(products.filter(**filter_kwargs).values_list("pk", flat=True)[:size]) if not batch_ids: break with suppress(Exception): AttributeValue.objects.filter(product_id__in=batch_ids).delete() ProductImage.objects.filter(product_id__in=batch_ids).delete() Product.objects.filter(pk__in=batch_ids).delete() def delete_belongings(self) -> None: self.get_products_queryset().delete() # type: ignore [union-attr] self.get_stocks_queryset().delete() # type: ignore [union-attr] self.get_attribute_values_queryset().delete() # type: ignore [union-attr] def get_or_create_attribute_safe(self, *, name: str, attr_group: AttributeGroup) -> Attribute: key = name[:255] try: attr = Attribute.objects.get(name=key) except Attribute.DoesNotExist: try: with transaction.atomic(): attr = Attribute.objects.create( name=key, group=attr_group, is_active=True, value_type="string", ) except IntegrityError: attr = Attribute.objects.get(name=key) if not attr.is_active: attr.is_active = True attr.save(update_fields=["is_active"]) return attr def process_attribute(self, key: str, value: Any, product: Product, attr_group: AttributeGroup) -> None: if not value: return if not attr_group: return if key in self.blocked_attributes: return value, attr_value_type = self.auto_convert_value(value) is_created = False if len(key) > 255: key = key[:255] try: attribute, is_created = Attribute.objects.get_or_create( name=key, group=attr_group, value_type=attr_value_type, defaults={"is_active": True}, ) except Attribute.MultipleObjectsReturned: attribute = Attribute.objects.filter(name=key, group=attr_group).order_by("uuid").first() # type: ignore [assignment] attribute.is_active = True attribute.value_type = attr_value_type attribute.save() # type: ignore [no-untyped-call] except IntegrityError: return attribute.categories.add(product.category) attribute.save() # type: ignore [no-untyped-call] if not is_created: return AttributeValue.objects.get_or_create( attribute=attribute, value=value, product=product, defaults={"is_active": True}, ) def update_stock(self) -> None: pass def update_order_products_statuses(self) -> None: pass def delete_stale() -> None: Product.objects.filter(stocks__isnull=True, orderproduct__isnull=True).delete()