import datetime import json import logging from decimal import Decimal from typing import Optional, Self from constance import config from django.contrib.gis.db.models import PointField from django.contrib.postgres.indexes import GinIndex from django.core.cache import cache from django.core.exceptions import BadRequest, ValidationError from django.core.validators import MaxValueValidator, MinValueValidator from django.db.models import ( CASCADE, PROTECT, Avg, BooleanField, CharField, DateTimeField, DecimalField, FileField, FloatField, ForeignKey, ImageField, IntegerField, JSONField, ManyToManyField, Max, OneToOneField, PositiveIntegerField, TextField, ) from django.db.models.indexes import Index from django.http import Http404 from django.utils import timezone from django.utils.encoding import force_bytes from django.utils.http import urlsafe_base64_encode from django.utils.translation import gettext_lazy as _ from django_extensions.db.fields import AutoSlugField from django_prometheus.models import ExportModelOperationsMixin from mptt.fields import TreeForeignKey from mptt.models import MPTTModel from core.abstract import NiceModel from core.choices import ORDER_PRODUCT_STATUS_CHOICES, ORDER_STATUS_CHOICES from core.errors import DisabledCommerceError, NotEnoughMoneyError from core.managers import AddressManager from core.utils import ( generate_human_readable_id, get_product_uuid_as_path, get_random_code, ) from core.utils.db import TweakedAutoSlugField, unicode_slugify_function from core.utils.lists import FAILED_STATUSES from core.validators import validate_category_image_dimensions from evibes.settings import CURRENCY_CODE from payments.models import Transaction logger = logging.getLogger(__name__) class AttributeGroup(ExportModelOperationsMixin("attribute_group"), NiceModel): is_publicly_visible = True parent: Self = ForeignKey( # type: ignore "self", on_delete=CASCADE, null=True, blank=True, related_name="children", help_text=_("parent of this group"), 
verbose_name=_("parent attribute group"), ) name: str = CharField( # type: ignore max_length=255, verbose_name=_("attribute group's name"), help_text=_("attribute group's name"), unique=True, ) def __str__(self): return self.name class Meta: verbose_name = _("attribute group") verbose_name_plural = _("attribute groups") class Vendor(ExportModelOperationsMixin("vendor"), NiceModel): is_publicly_visible = False authentication: dict = JSONField( # type: ignore blank=True, null=True, help_text=_( "stores credentials and endpoints required for vendor communication" ), verbose_name=_("authentication info"), ) markup_percent: int = IntegerField( # type: ignore default=0, validators=[MinValueValidator(0), MaxValueValidator(100)], help_text=_("define the markup for products retrieved from this vendor"), verbose_name=_("vendor markup percentage"), ) name: str = CharField( # type: ignore max_length=255, help_text=_("name of this vendor"), verbose_name=_("vendor name"), blank=False, null=False, unique=True, ) def __str__(self) -> str: return self.name class Meta: verbose_name = _("vendor") verbose_name_plural = _("vendors") indexes = [ GinIndex(fields=["authentication"]), ] class ProductTag(ExportModelOperationsMixin("product_tag"), NiceModel): is_publicly_visible = True tag_name: str = CharField( # type: ignore blank=False, null=False, max_length=255, help_text=_("internal tag identifier for the product tag"), verbose_name=_("tag name"), ) name: str = CharField( # type: ignore max_length=255, help_text=_("user-friendly name for the product tag"), verbose_name=_("tag display name"), unique=True, ) def __str__(self): return self.tag_name class Meta: verbose_name = _("product tag") verbose_name_plural = _("product tags") class CategoryTag(ExportModelOperationsMixin("category_tag"), NiceModel): is_publicly_visible = True tag_name: str = CharField( # type: ignore blank=False, null=False, max_length=255, help_text=_("internal tag identifier for the product tag"), 
verbose_name=_("tag name"), ) name: str = CharField( # type: ignore max_length=255, help_text=_("user-friendly name for the product tag"), verbose_name=_("tag display name"), unique=True, ) def __str__(self): return self.tag_name class Meta: verbose_name = _("category tag") verbose_name_plural = _("category tags") class Category(ExportModelOperationsMixin("category"), NiceModel, MPTTModel): is_publicly_visible = True image = ImageField( # type: ignore blank=True, null=True, help_text=_("upload an image representing this category"), upload_to="categories/", validators=[validate_category_image_dimensions], verbose_name=_("category image"), ) markup_percent: int = IntegerField( # type: ignore default=0, validators=[MinValueValidator(0), MaxValueValidator(100)], help_text=_("define a markup percentage for products in this category"), verbose_name=_("markup percentage"), ) parent: Self = TreeForeignKey( "self", on_delete=CASCADE, blank=True, null=True, related_name="children", help_text=_("parent of this category to form a hierarchical structure"), verbose_name=_("parent category"), ) name: str = CharField( # type: ignore max_length=255, verbose_name=_("category name"), help_text=_("provide a name for this category"), unique=True, ) description: str = TextField( # type: ignore blank=True, null=True, help_text=_("add a detailed description for this category"), verbose_name=_("category description"), ) slug: str = TweakedAutoSlugField( # type: ignore populate_from=( "parent__name", "name", ), slugify_function=unicode_slugify_function, allow_unicode=True, unique=True, editable=False, max_length=88, overwrite=True, null=True, verbose_name=_("Slug"), ) tags: CategoryTag = ManyToManyField( # type: ignore "core.CategoryTag", blank=True, help_text=_("tags that help describe or group this category"), verbose_name=_("category tags"), ) priority: int = PositiveIntegerField( # type: ignore default=0, null=False, blank=False, verbose_name=_("priority"), ) def __str__(self): return 
self.name def get_tree_depth(self): if self.is_leaf_node(): return 0 return ( self.get_descendants().aggregate(max_depth=Max("level"))["max_depth"] - self.get_level() ) class Meta: verbose_name = _("category") verbose_name_plural = _("categories") ordering = ["tree_id", "lft"] class Brand(ExportModelOperationsMixin("brand"), NiceModel): is_publicly_visible = True name: str = CharField( # type: ignore max_length=255, help_text=_("name of this brand"), verbose_name=_("brand name"), unique=True, ) small_logo = ImageField( # type: ignore upload_to="brands/", blank=True, null=True, help_text=_("upload a logo representing this brand"), validators=[validate_category_image_dimensions], verbose_name=_("brand small image"), ) big_logo = ImageField( # type: ignore upload_to="brands/", blank=True, null=True, help_text=_("upload a big logo representing this brand"), validators=[validate_category_image_dimensions], verbose_name=_("brand big image"), ) description: str = TextField( # type: ignore blank=True, null=True, help_text=_("add a detailed description of the brand"), verbose_name=_("brand description"), ) categories: Category = ManyToManyField( # type: ignore "core.Category", blank=True, help_text=_("optional categories that this brand is associated with"), verbose_name=_("associated categories"), ) slug: str = AutoSlugField( # type: ignore populate_from=("name",), allow_unicode=True, unique=True, editable=False, max_length=88, overwrite=True, null=True, slugify_function=unicode_slugify_function, verbose_name=_("Slug"), ) priority: int = PositiveIntegerField( # type: ignore default=0, null=False, blank=False, verbose_name=_("priority"), ) def __str__(self): return self.name class Meta: verbose_name = _("brand") verbose_name_plural = _("brands") class Product(ExportModelOperationsMixin("product"), NiceModel): is_publicly_visible = True category: Category = ForeignKey( # type: ignore "core.Category", on_delete=CASCADE, help_text=_("category this product belongs to"), 
verbose_name=_("category"), related_name="products", ) brand: Brand = ForeignKey( # type: ignore "core.Brand", on_delete=CASCADE, blank=True, null=True, help_text=_("optionally associate this product with a brand"), verbose_name=_("brand"), ) tags: ProductTag = ManyToManyField( # type: ignore "core.ProductTag", blank=True, help_text=_("tags that help describe or group this product"), verbose_name=_("product tags"), ) is_digital: bool = BooleanField( # type: ignore default=False, help_text=_("indicates whether this product is digitally delivered"), verbose_name=_("is product digital"), blank=False, null=False, ) name: str = CharField( # type: ignore max_length=255, help_text=_("provide a clear identifying name for the product"), verbose_name=_("product name"), ) description: str = TextField( # type: ignore blank=True, null=True, help_text=_("add a detailed description of the product"), verbose_name=_("product description"), ) partnumber: str = CharField( # type: ignore unique=True, default=None, blank=False, null=True, help_text=_("part number for this product"), verbose_name=_("part number"), ) slug: str | None = AutoSlugField( # type: ignore populate_from=( "name", "brand__slug", "category__slug", "uuid", ), max_length=88, overwrite=True, allow_unicode=True, unique=True, slugify_function=unicode_slugify_function, editable=False, null=True, verbose_name=_("Slug"), ) class Meta: verbose_name = _("product") verbose_name_plural = _("products") def __str__(self): return self.name @property def rating(self): cache_key = f"product_rating_{self.pk}" rating = cache.get(cache_key) if rating is None: feedbacks = Feedback.objects.filter(order_product__product_id=self.pk) rating = feedbacks.aggregate(Avg("rating"))["rating__avg"] or 0 cache.set(cache_key, rating, 604800) return round(rating, 2) @rating.setter def rating(self, value): self.__dict__["rating"] = value @property def feedbacks_count(self): cache_key = f"product_feedbacks_count_{self.pk}" feedbacks_count = 
cache.get(cache_key) if feedbacks_count is None: feedbacks_count = Feedback.objects.filter( order_product__product_id=self.pk ).count() cache.set(cache_key, feedbacks_count, 604800) return feedbacks_count @property def price(self) -> float: stock = self.stocks.order_by("price").only("price").first() price = stock.price if stock else 0.0 return round(price, 2) @property def quantity(self) -> int: cache_key = f"product_quantity_{self.pk}" quantity = cache.get(cache_key, 0) if not quantity: stocks = self.stocks.only("quantity") for stock in stocks: quantity += stock.quantity cache.set(cache_key, quantity, 3600) return quantity @property def total_orders(self): return OrderProduct.objects.filter( product__uuid=self.uuid, status__in=["FINISHED", "DELIVERING", "CREATED", "PAYMENT"], ).count() class Attribute(ExportModelOperationsMixin("attribute"), NiceModel): is_publicly_visible = True categories: Category = ManyToManyField( # type: ignore "core.Category", related_name="attributes", help_text=_("category of this attribute"), verbose_name=_("categories"), ) group: AttributeGroup = ForeignKey( # type: ignore "core.AttributeGroup", on_delete=CASCADE, related_name="attributes", help_text=_("group of this attribute"), verbose_name=_("attribute group"), ) value_type: str = CharField( # type: ignore max_length=50, choices=[ ("string", _("string")), ("integer", _("integer")), ("float", _("float")), ("boolean", _("boolean")), ("array", _("array")), ("object", _("object")), ], help_text=_("type of the attribute's value"), verbose_name=_("value type"), ) name: str = CharField( # type: ignore max_length=255, help_text=_("name of this attribute"), verbose_name=_("attribute's name"), unique=True, ) def __str__(self): return self.name class Meta: verbose_name = _("attribute") verbose_name_plural = _("attributes") class AttributeValue(ExportModelOperationsMixin("attribute_value"), NiceModel): is_publicly_visible = True attribute: Attribute = ForeignKey( # type: ignore "core.Attribute", 
on_delete=CASCADE, related_name="values", help_text=_("attribute of this value"), verbose_name=_("attribute"), ) product: Product = ForeignKey( # type: ignore "core.Product", on_delete=CASCADE, blank=False, null=True, help_text=_("the specific product associated with this attribute's value"), verbose_name=_("associated product"), related_name="attributes", ) value: str = TextField( # type: ignore verbose_name=_("attribute value"), help_text=_("the specific value for this attribute"), ) def __str__(self): return f"{self.attribute!s}: {self.value}" class Meta: verbose_name = _("attribute value") verbose_name_plural = _("attribute values") class ProductImage(ExportModelOperationsMixin("product_image"), NiceModel): is_publicly_visible = True alt: str = CharField( # type: ignore max_length=255, help_text=_("provide alternative text for the image for accessibility"), verbose_name=_("image alt text"), ) image = ImageField( help_text=_("upload the image file for this product"), verbose_name=_("product image"), upload_to=get_product_uuid_as_path, ) priority: int = IntegerField( # type: ignore default=1, validators=[MinValueValidator(1)], help_text=_("determines the order in which images are displayed"), verbose_name=_("display priority"), ) product: ForeignKey = ForeignKey( "core.Product", on_delete=CASCADE, help_text=_("the product that this image represents"), verbose_name=_("associated product"), related_name="images", ) def get_product_uuid_as_path(self, *args): return str(self.product.uuid) + "/" + args[0] def __str__(self) -> str: return self.alt class Meta: ordering = ("priority",) verbose_name = _("product image") verbose_name_plural = _("product images") class Promotion(ExportModelOperationsMixin("promotion"), NiceModel): is_publicly_visible = True discount_percent: int = IntegerField( # type: ignore validators=[MinValueValidator(1), MaxValueValidator(100)], help_text=_("percentage discount for the selected products"), verbose_name=_("discount percentage"), ) name: 
str = CharField( # type: ignore max_length=256, unique=True, help_text=_("provide a unique name for this promotion"), verbose_name=_("promotion name"), ) description: str = TextField( # type: ignore blank=True, null=True, help_text=_("add a detailed description of the product"), verbose_name=_("promotion description"), ) products: ManyToManyField = ManyToManyField( # type: ignore "core.Product", blank=True, help_text=_("select which products are included in this promotion"), verbose_name=_("included products"), ) class Meta: verbose_name = _("promotion") verbose_name_plural = _("promotions") def __str__(self) -> str: if self.name: return self.name return str(self.id) class Stock(ExportModelOperationsMixin("stock"), NiceModel): is_publicly_visible = False vendor: ForeignKey = ForeignKey( "core.Vendor", on_delete=CASCADE, help_text=_("the vendor supplying this product stock"), verbose_name=_("associated vendor"), ) price: float = FloatField( # type: ignore default=0.0, help_text=_("final price to the customer after markups"), verbose_name=_("selling price"), ) product: ForeignKey = ForeignKey( # type: ignore "core.Product", on_delete=CASCADE, help_text=_("the product associated with this stock entry"), verbose_name=_("associated product"), related_name="stocks", blank=True, null=True, ) purchase_price: float = FloatField( # type: ignore default=0.0, help_text=_("the price paid to the vendor for this product"), verbose_name=_("vendor purchase price"), ) quantity: int = IntegerField( # type: ignore default=0, help_text=_("available quantity of the product in stock"), verbose_name=_("quantity in stock"), ) sku: str = CharField( # type: ignore max_length=255, help_text=_("vendor-assigned SKU for identifying the product"), verbose_name=_("vendor sku"), ) digital_asset = FileField( default=None, blank=True, null=True, help_text=_("digital file associated with this stock if applicable"), verbose_name=_("digital file"), upload_to="downloadables/", ) def __str__(self) -> str: 
return f"{self.vendor.name} - {self.product!s}" # type: ignore class Meta: verbose_name = _("stock") verbose_name_plural = _("stock entries") class Wishlist(ExportModelOperationsMixin("wishlist"), NiceModel): is_publicly_visible = False products: ManyToManyField = ManyToManyField( # type: ignore "core.Product", blank=True, help_text=_("products that the user has marked as wanted"), verbose_name=_("wishlisted products"), ) user: OneToOneField = OneToOneField( # type: ignore "vibes_auth.User", on_delete=CASCADE, blank=True, null=True, help_text=_("user who owns this wishlist"), verbose_name=_("wishlist owner"), related_name="user_related_wishlist", ) def __str__(self): return f"{self.user.email}'s wishlist" class Meta: verbose_name = _("wishlist") verbose_name_plural = _("wishlists") def add_product(self, product_uuid): try: product = Product.objects.get(uuid=product_uuid) if product in self.products.all(): return self self.products.add(product) except Product.DoesNotExist: name = "Product" raise Http404(_(f"{name} does not exist: {product_uuid}")) return self def remove_product(self, product_uuid): try: product = Product.objects.get(uuid=product_uuid) if product not in self.products.all(): return self self.products.remove(product) except Product.DoesNotExist: name = "Product" raise Http404(_(f"{name} does not exist: {product_uuid}")) return self def bulk_add_products(self, product_uuids): self.products.add(*Product.objects.filter(uuid__in=product_uuids)) return self def bulk_remove_products(self, product_uuids): self.products.remove(*Product.objects.filter(uuid__in=product_uuids)) return self class Documentary(ExportModelOperationsMixin("attribute_group"), NiceModel): is_publicly_visible = True product: ForeignKey = ForeignKey( to=Product, on_delete=CASCADE, related_name="documentaries" ) document = FileField(upload_to=get_product_uuid_as_path) class Meta: verbose_name = _("documentary") verbose_name_plural = _("documentaries") def __str__(self): return 
f"{self.product.name} - {self.document.name}" def get_product_uuid_as_path(self, *args): return str(self.product.uuid) + "/" + args[0] @property def file_type(self): return self.document.name.split(".")[-1] or _("unresolved") class Address(ExportModelOperationsMixin("address"), NiceModel): is_publicly_visible = False address_line: str = TextField( # type: ignore blank=True, null=True, help_text=_("address line for the customer"), verbose_name=_("address line"), ) street: str = CharField(_("street"), max_length=255, null=True) # type: ignore district: str = CharField(_("district"), max_length=255, null=True) # type: ignore city: str = CharField(_("city"), max_length=100, null=True) # type: ignore region: str = CharField(_("region"), max_length=100, null=True) # type: ignore postal_code: str = CharField(_("postal code"), max_length=20, null=True) # type: ignore country: str = CharField(_("country"), max_length=40, null=True) # type: ignore location: PointField = PointField( # type: ignore geography=True, srid=4326, null=True, blank=True, help_text=_("geolocation point: (longitude, latitude)"), ) raw_data: dict = JSONField(blank=True, null=True, help_text=_("full JSON response from geocoder for this address")) # type: ignore api_response: dict = JSONField( # type: ignore blank=True, null=True, help_text=_("stored JSON response from the geocoding service"), ) user: ForeignKey = ForeignKey(to="vibes_auth.User", on_delete=CASCADE, blank=True, null=True) # type: ignore objects = AddressManager() class Meta: verbose_name = _("address") verbose_name_plural = _("addresses") indexes = [ Index(fields=["location"]), ] def __str__(self): base = f"{self.street}, {self.city}, {self.country}" return f"{base} for {self.user.email}" if self.user else base class PromoCode(ExportModelOperationsMixin("promocode"), NiceModel): is_publicly_visible = False code: str = CharField( # type: ignore max_length=20, unique=True, default=get_random_code, help_text=_("unique code used by a user to 
redeem a discount"), verbose_name=_("promo code identifier"), ) discount_amount: Decimal = DecimalField( # type: ignore max_digits=10, decimal_places=2, blank=True, null=True, help_text=_("fixed discount amount applied if percent is not used"), verbose_name=_("fixed discount amount"), ) discount_percent: int = IntegerField( # type: ignore validators=[MinValueValidator(1), MaxValueValidator(100)], blank=True, null=True, help_text=_("percentage discount applied if fixed amount is not used"), verbose_name=_("percentage discount"), ) end_time: datetime = DateTimeField( # type: ignore blank=True, null=True, help_text=_("timestamp when the promocode expires"), verbose_name=_("end validity time"), ) start_time: datetime = DateTimeField( # type: ignore blank=True, null=True, help_text=_("timestamp from which this promocode is valid"), verbose_name=_("start validity time"), ) used_on: datetime = DateTimeField( # type: ignore blank=True, null=True, help_text=_("timestamp when the promocode was used, blank if not used yet"), verbose_name=_("usage timestamp"), ) user: ForeignKey = ForeignKey( # type: ignore "vibes_auth.User", on_delete=CASCADE, help_text=_("user assigned to this promocode if applicable"), verbose_name=_("assigned user"), null=True, blank=True, related_name="promocodes", ) class Meta: verbose_name = _("promo code") verbose_name_plural = _("promo codes") def save(self, **kwargs): if (self.discount_amount is not None and self.discount_percent is not None) or ( self.discount_amount is None and self.discount_percent is None ): raise ValidationError( _( "only one type of discount should be defined (amount or percent), but not both or neither." 
) ) super().save(**kwargs) def __str__(self) -> str: return self.code @property def discount_type(self): if self.discount_amount is not None: return "amount" return "percent" def use(self, order: "Order") -> float: if self.used_on: raise ValueError(_("promocode already used")) amount = order.total_price if self.discount_type == "percent": amount -= round(amount * (self.discount_percent / 100), 2) order.attributes.update( {"promocode": str(self.uuid), "final_price": amount} ) order.save() elif self.discount_type == "amount": amount -= round(float(self.discount_amount), 2) order.attributes.update( {"promocode": str(self.uuid), "final_price": amount} ) order.save() else: raise ValueError(_(f"invalid discount type for promocode {self.uuid}")) self.used_on = datetime.datetime.now() self.save() return amount class Order(ExportModelOperationsMixin("order"), NiceModel): is_publicly_visible = False billing_address: Address = ForeignKey( # type: ignore "core.Address", on_delete=CASCADE, blank=True, null=True, related_name="billing_address_order", help_text=_("the billing address used for this order"), verbose_name=_("billing address"), ) promo_code: PromoCode = ForeignKey( # type: ignore "core.PromoCode", on_delete=PROTECT, blank=True, null=True, help_text=_("optional promo code applied to this order"), verbose_name=_("applied promo code"), ) shipping_address: Address = ForeignKey( # type: ignore "core.Address", on_delete=CASCADE, blank=True, null=True, related_name="shipping_address_order", help_text=_("the shipping address used for this order"), verbose_name=_("shipping address"), ) status: str = CharField( # type: ignore default="PENDING", max_length=64, choices=ORDER_STATUS_CHOICES, help_text=_("current status of the order in its lifecycle"), verbose_name=_("order status"), ) notifications: dict = JSONField( # type: ignore blank=True, null=True, help_text=_("json structure of notifications to display to users"), verbose_name=_("notifications"), ) attributes: dict = 
JSONField( # type: ignore blank=True, null=True, help_text=_("json representation of order attributes for this order"), verbose_name=_("attributes"), ) user = ForeignKey( # type: ignore "vibes_auth.User", on_delete=CASCADE, help_text=_("the user who placed the order"), verbose_name=_("user"), related_name="orders", blank=True, null=True, ) buy_time: datetime = DateTimeField( # type: ignore help_text=_("the timestamp when the order was finalized"), verbose_name=_("buy time"), default=None, null=True, blank=True, ) human_readable_id: str = CharField( # type: ignore max_length=8, help_text=_("a human-readable identifier for the order"), verbose_name=_("human readable id"), unique=True, default=generate_human_readable_id, ) class Meta: verbose_name = _("order") verbose_name_plural = _("orders") def __str__(self) -> str: return f"#{self.pk} for {self.user.email if self.user else 'unregistered user'}" # type: ignore @property def is_business(self) -> bool: return self.attributes.get("is_business", False) if self.attributes else False def save(self, **kwargs): pending_orders = 0 if self.user: pending_orders = self.user.orders.filter(status="PENDING").count() if self.status == "PENDING" and pending_orders > 1: raise ValueError(_("a user must have only one pending order at a time")) return super().save(**kwargs) @property def total_price(self) -> float: return ( round( sum( ( order_product.buy_price * order_product.quantity if order_product.status not in FAILED_STATUSES and order_product.buy_price is not None else 0.0 ) for order_product in self.order_products.all() ), 2, ) or 0.0 ) @property def total_quantity(self) -> int: return sum([op.quantity for op in self.order_products.all()]) def add_product( self, product_uuid: str | None = None, attributes: list | None = None, update_quantity: bool = True, ): if attributes is None: attributes = [] if self.status not in ["PENDING", "MOMENTAL"]: raise ValueError( _("you cannot add products to an order that is not a pending one") ) 
try: product = Product.objects.get(uuid=product_uuid) if not product.is_active: raise BadRequest(_("you cannot add inactive products to order")) buy_price = product.price promotions = Promotion.objects.filter( is_active=True, products__in=[product] ).order_by("discount_percent") if promotions.exists(): buy_price -= round(product.price * (promotions.first().discount_percent / 100), 2) # type: ignore order_product, is_created = OrderProduct.objects.get_or_create( product=product, order=self, attributes=json.dumps(attributes), defaults={"quantity": 1, "buy_price": product.price}, ) if not is_created and update_quantity: if product.quantity < order_product.quantity + 1: raise BadRequest( _("you cannot add more products than available in stock") ) order_product.quantity += 1 order_product.buy_price = product.price order_product.save() return self except Product.DoesNotExist: name = "Product" raise Http404(_(f"{name} does not exist: {product_uuid}")) def remove_product( self, product_uuid: str | None = None, attributes: dict | None = None, zero_quantity: bool = False, ): if attributes is None: attributes = {} if self.status not in ["PENDING", "MOMENTAL"]: raise ValueError( _("you cannot remove products from an order that is not a pending one") ) try: product = Product.objects.get(uuid=product_uuid) order_product = self.order_products.get(product=product, order=self) if zero_quantity: order_product.delete() return self if order_product.quantity == 1: self.order_products.remove(order_product) order_product.delete() else: order_product.quantity -= 1 order_product.save() return self except Product.DoesNotExist: name = "Product" raise Http404(_(f"{name} does not exist: {product_uuid}")) except OrderProduct.DoesNotExist: name = "OrderProduct" query = ( f"product: {product_uuid}, order: {self.uuid}, attributes: {attributes}" ) raise Http404(_(f"{name} does not exist with query <{query}>")) def remove_all_products(self): if self.status not in ["PENDING", "MOMENTAL"]: raise 
ValueError( _("you cannot remove products from an order that is not a pending one") ) for order_product in self.order_products.all(): self.order_products.remove(order_product) order_product.delete() return self def remove_products_of_a_kind(self, product_uuid: str): if self.status not in ["PENDING", "MOMENTAL"]: raise ValueError( _("you cannot remove products from an order that is not a pending one") ) try: product = Product.objects.get(uuid=product_uuid) order_product = self.order_products.get(product=product, order=self) self.order_products.remove(order_product) order_product.delete() except Product.DoesNotExist: name = "Product" raise Http404(_(f"{name} does not exist: {product_uuid}")) return self @property def is_whole_digital(self): return ( self.order_products.count() == self.order_products.filter(product__is_digital=True).count() ) def apply_promocode(self, promocode_uuid: str): try: promocode: PromoCode = PromoCode.objects.get(uuid=promocode_uuid) except PromoCode.DoesNotExist: raise Http404(_("promocode does not exist")) return promocode.use(self) def apply_addresses(self, billing_address_uuid, shipping_address_uuid): try: if not any([shipping_address_uuid, billing_address_uuid]): if self.is_whole_digital: return else: raise ValueError( _( "you can only buy physical products with shipping address specified" ) ) if billing_address_uuid and not shipping_address_uuid: shipping_address = Address.objects.get(uuid=billing_address_uuid) billing_address = shipping_address elif shipping_address_uuid and not billing_address_uuid: billing_address = Address.objects.get(uuid=shipping_address_uuid) shipping_address = billing_address else: billing_address = Address.objects.get(uuid=billing_address_uuid) shipping_address = Address.objects.get(uuid=shipping_address_uuid) self.billing_address = billing_address self.shipping_address = shipping_address self.save() except Address.DoesNotExist: raise Http404(_("address does not exist")) def buy( self, force_balance: bool = 
False, force_payment: bool = False, promocode_uuid: str | None = None, billing_address: str | None = None, shipping_address: str | None = None, ) -> Self | Transaction | None: if config.DISABLED_COMMERCE: raise DisabledCommerceError( _("you can not buy at this moment, please try again in a few minutes") ) if (not force_balance and not force_payment) or ( force_balance and force_payment ): raise ValueError(_("invalid force value")) self.apply_addresses(billing_address, shipping_address) if self.total_quantity < 1: raise ValueError(_("you cannot purchase an empty order!")) force = None if force_balance: force = "balance" if force_payment: force = "payment" amount = ( self.apply_promocode(promocode_uuid) if promocode_uuid else self.total_price ) match force: case "balance": if self.user.payments_balance.amount < amount: # type: ignore raise NotEnoughMoneyError( _("insufficient funds to complete the order") ) self.status = "CREATED" self.buy_time = timezone.now() self.order_products.all().update(status="DELIVERING") self.save() return self case "payment": self.status = "PAYMENT" self.save() return Transaction.objects.create( balance=self.user.payments_balance, # type: ignore amount=amount, currency=CURRENCY_CODE, order=self, ) return self def buy_without_registration( self, products: list, promocode_uuid: str, **kwargs ) -> Transaction | None: if config.DISABLED_COMMERCE: raise DisabledCommerceError( _("you can not buy at this moment, please try again in a few minutes") ) if len(products) < 1: raise ValueError(_("you cannot purchase an empty order!")) customer_name = kwargs.get("customer_name") customer_email = kwargs.get("customer_email") customer_phone_number = kwargs.get("customer_phone_number") if not all([customer_name, customer_email, customer_phone_number]): raise ValueError( _( "you cannot buy without registration, please provide the following information:" " customer name, customer email, customer phone number" ) ) payment_method = kwargs.get("payment_method") 
available_payment_methods = cache.get("payment_methods").get("payment_methods") if payment_method not in available_payment_methods: raise ValueError( _( f"invalid payment method: {payment_method} from {available_payment_methods}" ) ) billing_customer_address_uuid = kwargs.get("billing_customer_address") shipping_customer_address_uuid = kwargs.get("shipping_customer_address") self.apply_addresses( billing_customer_address_uuid, shipping_customer_address_uuid ) for product_uuid in products: self.add_product(product_uuid) amount = ( self.apply_promocode(promocode_uuid) if promocode_uuid else self.total_price ) self.status = "CREATED" if self.attributes is None: self.attributes = {} self.attributes.update( { "customer_name": customer_name, "customer_email": customer_email, "customer_phone_number": customer_phone_number, "is_business": kwargs.get("is_business", False), } ) self.save() return Transaction.objects.create( amount=amount, currency=CURRENCY_CODE, order=self, payment_method=kwargs.get("payment_method"), ) def finalize(self): if ( self.order_products.filter( status__in=[ "ACCEPTED", "FAILED", "RETURNED", "CANCELED", "FINISHED", ] ).count() == self.order_products.count() ): self.status = "FINISHED" self.save() def bulk_add_products(self, products: list): for product in products: self.add_product( product.get("uuid"), attributes=product.get("attributes"), update_quantity=False, ) return self def bulk_remove_products(self, products: list): for product in products: self.remove_product( product.get("uuid"), attributes=product.get("attributes"), zero_quantity=True, ) return self class OrderProduct(ExportModelOperationsMixin("order_product"), NiceModel): is_publicly_visible = False buy_price: float = FloatField( # type: ignore blank=True, null=True, help_text=_("the price paid by the customer for this product at purchase time"), verbose_name=_("purchase price at order time"), ) comments: str = TextField( # type: ignore blank=True, null=True, help_text=_("internal 
class OrderProduct(ExportModelOperationsMixin("order_product"), NiceModel):
    """One product line inside an order.

    Stores the price paid, the quantity, a status and two JSON blobs
    (``notifications`` and ``attributes``) for a product placed in an order.
    """

    is_publicly_visible = False

    # Price per unit at purchase time; nullable.
    buy_price: float = FloatField(  # type: ignore
        blank=True,
        null=True,
        help_text=_("the price paid by the customer for this product at purchase time"),
        verbose_name=_("purchase price at order time"),
    )
    comments: str = TextField(  # type: ignore
        blank=True,
        null=True,
        help_text=_("internal comments for admins about this ordered product"),
        verbose_name=_("internal comments"),
    )
    # ``add_error`` keeps a list of ``{"detail": ...}`` items under "errors".
    notifications: dict = JSONField(  # type: ignore
        blank=True,
        null=True,
        help_text=_("json structure of notifications to display to users"),
        verbose_name=_("user notifications"),
    )
    attributes: dict = JSONField(  # type: ignore
        blank=True,
        null=True,
        help_text=_("json representation of this item's attributes"),
        verbose_name=_("ordered product attributes"),
    )
    # Nullable at the database level, so code below must not assume it is set.
    order: Order = ForeignKey(  # type: ignore
        "core.Order",
        on_delete=CASCADE,
        help_text=_("reference to the parent order that contains this product"),
        verbose_name=_("parent order"),
        related_name="order_products",
        null=True,
    )
    # Nullable as well; PROTECT blocks deleting a product that has order lines.
    product: Product = ForeignKey(  # type: ignore
        "core.Product",
        on_delete=PROTECT,
        blank=True,
        null=True,
        help_text=_("the specific product associated with this order line"),
        verbose_name=_("associated product"),
    )
    quantity: int = PositiveIntegerField(  # type: ignore
        blank=False,
        null=False,
        default=1,
        help_text=_("quantity of this specific product in the order"),
        verbose_name=_("product quantity"),
    )
    status: str = CharField(  # type: ignore
        max_length=128,
        blank=False,
        null=False,
        choices=ORDER_PRODUCT_STATUS_CHOICES,
        help_text=_("current status of this product in the order"),
        verbose_name=_("product line status"),
        default="PENDING",
    )

    def __str__(self) -> str:
        """Describe the line as ``<product name> for (<buyer email>)``.

        Both ``product`` and ``order`` are nullable, so each gets a
        placeholder instead of raising ``AttributeError``.
        """
        product_name = self.product.name if self.product else "unknown product"
        user = self.order.user if self.order else None
        buyer = user.email if user else "unregistered user"
        return f"{product_name} for ({buyer})"  # type: ignore

    class Meta:
        verbose_name = _("order product")
        verbose_name_plural = _("order products")
        indexes = [
            GinIndex(fields=["notifications", "attributes"]),
        ]

    def return_balance_back(self):
        """Set the status to ``RETURNED`` and credit the buyer's balance.

        NOTE(review): the credited amount is ``buy_price`` (one unit), not
        ``buy_price * quantity`` -- confirm that this is intended. The order
        must have a user with a payments balance; otherwise this raises
        after the status has already been saved.
        """
        self.status = "RETURNED"
        self.save()
        balance = self.order.user.payments_balance
        balance.amount += self.buy_price
        balance.save()

    def add_error(self, error=None):
        """Record an error on this line and mark it ``FAILED``.

        Args:
            error: Message to store. When falsy, a generic message that
                mentions this line's ``uuid`` is stored instead. The
                fallback now applies on every path; previously it was used
                only when ``notifications`` existed without any errors.

        Returns:
            This order product, after saving.
        """
        detail = (
            error
            if error
            else f"Something went wrong with {self.uuid} for some reason..."
        )
        if self.notifications is None:
            self.notifications = {}
        # ``or []`` also covers an "errors" key that is present but empty/null.
        errors = self.notifications.get("errors") or []
        errors.append({"detail": detail})
        self.notifications["errors"] = errors
        self.status = "FAILED"
        self.save()
        return self

    @property
    def total_price(self) -> float:
        """Return ``buy_price * quantity`` rounded to two decimals.

        Raises ``TypeError`` while ``buy_price`` is still ``None``.
        """
        return round(self.buy_price * self.quantity, 2)

    @property
    def download_url(self) -> str:
        """Return the download URL of a digital product, or ``""``.

        An empty string is returned when there is no product, the product is
        not digital, it has no stock row, or the first stock row has no
        digital asset. Otherwise the value of ``self.download.url`` is
        returned.

        NOTE(review): ``self.download`` is the reverse accessor of
        ``DigitalAssetDownload.order_product``; presumably a download row
        exists by the time this is read -- confirm against the caller.
        """
        product = self.product
        if product is None or not product.is_digital:
            return ""
        first_stock = product.stocks.first()  # type: ignore
        if first_stock is None or not first_stock.digital_asset:
            return ""
        return self.download.url

    def do_feedback(
        self, rating: int = 10, comment: str = "", action: str = "add"
    ) -> Optional["Feedback"]:
        """Add or remove the feedback attached to this order line.

        Args:
            rating: Rating stored on a newly created feedback.
            comment: Comment stored on a newly created feedback.
            action: ``"add"`` or ``"remove"``.

        Returns:
            The created ``Feedback`` for a successful ``"add"``; ``None``
            after a ``"remove"`` or when feedback already exists.

        Raises:
            ValueError: If ``action`` is unknown, or feedback is added while
                the order status is ``MOMENTAL`` or ``PENDING``.
        """
        if action not in ("add", "remove"):
            # Constant msgid plus a named placeholder keeps the message
            # translatable; an f-string msgid never matches the catalog.
            raise ValueError(
                _("wrong action specified for feedback: %(action)s")
                % {"action": action}
            )

        # ``feedback`` is a reverse one-to-one accessor: reading it directly
        # raises when no row exists, which made the first "add" impossible.
        existing = getattr(self, "feedback", None)

        if action == "remove":
            if existing is not None:
                existing.delete()
            return None

        if existing is not None:
            return None

        if self.order.status in ("MOMENTAL", "PENDING"):  # type: ignore
            raise ValueError(_("you cannot feedback an order which is not received"))

        return Feedback.objects.create(
            rating=rating, comment=comment, order_product=self
        )
# The metrics label was "attribute_group", copied from another model, so this
# model's operations were reported under the wrong label.
class DigitalAssetDownload(
    ExportModelOperationsMixin("digital_asset_download"), NiceModel
):
    """Download record attached one-to-one to an order product."""

    is_publicly_visible = False

    # Reverse accessor on the order product is ``download``.
    order_product: OneToOneField = OneToOneField(  # type: ignore
        to=OrderProduct, on_delete=CASCADE, related_name="download"
    )
    num_downloads: int = IntegerField(default=0)  # type: ignore

    class Meta:
        verbose_name = _("download")
        verbose_name_plural = _("downloads")

    def __str__(self):
        return f"{self.order_product} - {self.num_downloads}"

    @property
    def url(self):
        """Return the download link for the related order product.

        The link embeds the order product's ``uuid``, URL-safe base64
        encoded, under ``https://api.<config.BASE_DOMAIN>/download/``.

        Raises:
            ValueError: If the order product's status is not ``FINISHED``.
        """
        if self.order_product.status != "FINISHED":
            raise ValueError(
                _("you can not download a digital asset for a non-finished order")
            )
        return f"https://api.{config.BASE_DOMAIN}/download/{urlsafe_base64_encode(force_bytes(self.order_product.uuid))}"
class Feedback(ExportModelOperationsMixin("feedback"), NiceModel):
    """A rating and optional comment left for one order product."""

    is_publicly_visible = True

    comment: str = TextField(  # type: ignore
        blank=True,
        null=True,
        help_text=_("user-provided comments about their experience with the product"),
        verbose_name=_("feedback comments"),
    )
    # One feedback per order line; deleted together with the line.
    order_product: OrderProduct = OneToOneField(  # type: ignore
        "core.OrderProduct",
        on_delete=CASCADE,
        blank=False,
        null=False,
        help_text=_(
            "references the specific product in an order that this feedback is about"
        ),
        verbose_name=_("related order product"),
    )
    # Validators limit the value to the range 0-10.
    rating: float = FloatField(  # type: ignore
        blank=True,
        null=True,
        help_text=_("user-assigned rating for the product"),
        verbose_name=_("product rating"),
        validators=[MinValueValidator(0), MaxValueValidator(10)],
    )

    def __str__(self) -> str:
        """Return ``<rating> by <author email>``.

        The order product's ``order`` is nullable and an order may have no
        user, so a placeholder is used instead of raising
        ``AttributeError``.
        """
        order = self.order_product.order
        user = order.user if order else None
        author = user.email if user else "unregistered user"
        return f"{self.rating} by {author}"

    class Meta:
        verbose_name = _("feedback")
        verbose_name_plural = _("feedbacks")