import datetime import json import logging import traceback from contextlib import suppress from typing import Any, Optional, Self, Iterable from constance import config from django.contrib.gis.db.models import PointField from django.contrib.postgres.indexes import GinIndex from django.core.cache import cache from django.core.exceptions import BadRequest, ValidationError from django.core.validators import MaxValueValidator, MinValueValidator from django.db import transaction from django.db.models import ( CASCADE, PROTECT, Avg, BooleanField, CharField, DateTimeField, DecimalField, F, FileField, FloatField, ForeignKey, ImageField, IntegerField, JSONField, ManyToManyField, Max, OneToOneField, PositiveIntegerField, QuerySet, Sum, TextField, URLField, ) from django.db.models.indexes import Index from django.db.models.functions import Length from django.http import Http404 from django.utils import timezone from django.utils.encoding import force_bytes from django.utils.functional import cached_property from django.utils.http import urlsafe_base64_encode from django.utils.translation import gettext_lazy as _ from django_extensions.db.fields import AutoSlugField from django_prometheus.models import ExportModelOperationsMixin from mptt.fields import TreeForeignKey from mptt.models import MPTTModel from core.abstract import NiceModel from core.choices import ORDER_PRODUCT_STATUS_CHOICES, ORDER_STATUS_CHOICES from core.errors import DisabledCommerceError, NotEnoughMoneyError from core.managers import AddressManager, ProductManager from core.typing import FilterableAttribute from core.utils import ( generate_human_readable_id, generate_human_readable_token, get_product_uuid_as_path, get_random_code, get_vendor_name_as_path, ) from core.utils.db import TweakedAutoSlugField, unicode_slugify_function from core.utils.lists import FAILED_STATUSES from core.validators import validate_category_image_dimensions from evibes.settings import CURRENCY_CODE from evibes.utils.misc import create_object from payments.models import Transaction logger = logging.getLogger("django") class AttributeGroup(ExportModelOperationsMixin("attribute_group"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents a group of attributes, which can be hierarchical." " This class is used to manage and organize attribute groups." " An attribute group can have a parent group, forming a hierarchical structure." " This can be useful for categorizing and managing attributes more effectively in acomplex system." ) is_publicly_visible = True attributes: QuerySet["Attribute"] children: QuerySet["Self"] parent = ForeignKey( "self", on_delete=CASCADE, null=True, blank=True, related_name="children", help_text=_("parent of this group"), verbose_name=_("parent attribute group"), ) name = CharField( max_length=255, verbose_name=_("attribute group's name"), help_text=_("attribute group's name"), unique=True, ) def __str__(self) -> str: return self.name class Meta: verbose_name = _("attribute group") verbose_name_plural = _("attribute groups") class Vendor(ExportModelOperationsMixin("vendor"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents a vendor entity capable of storing information about external vendors and their interaction requirements." " The Vendor class is used to define and manage information related to an external vendor." " It stores the vendor's name, authentication details required for communication," " and the percentage markup applied to products retrieved from the vendor." " This model also maintains additional metadata and constraints," " making it suitable for use in systems that interact with third-party vendors." ) is_publicly_visible = False authentication = JSONField( blank=True, null=True, help_text=_("stores credentials and endpoints required for vendor communication"), verbose_name=_("authentication info"), ) markup_percent = IntegerField( default=0, validators=[MinValueValidator(0), MaxValueValidator(100)], help_text=_("define the markup for products retrieved from this vendor"), verbose_name=_("vendor markup percentage"), ) name = CharField( max_length=255, help_text=_("name of this vendor"), verbose_name=_("vendor name"), blank=False, null=False, unique=True, ) users = ManyToManyField(to="vibes_auth.User", related_name="vendors", blank=True) b2b_auth_token = CharField(default=generate_human_readable_token, max_length=20, null=True, blank=True) last_processing_response = FileField( upload_to=get_vendor_name_as_path, blank=True, null=True, verbose_name=_("response file"), help_text=_("vendor's last processing response"), ) integration_path = CharField( null=True, blank=True, max_length=255, help_text=_("vendor's integration file path"), verbose_name=_("integration path"), ) def __str__(self) -> str: return self.name def save( # type: ignore [override] self, *, force_insert: bool = False, force_update: bool = False, using: str | None = None, update_fields: list[str] | tuple[str, ...] | None = None, update_modified: bool = True, ) -> None: users = self.users.filter(is_active=True) users = users.exclude(attributes__icontains="is_business") if users.count() > 0: for user in users: if not user.attributes: user.attributes = {} user.attributes.update({"is_business": True}) user.save() return super().save( force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields, update_modified=update_modified, ) class Meta: verbose_name = _("vendor") verbose_name_plural = _("vendors") indexes = [ GinIndex(fields=["authentication"]), Index(fields=["name"]), ] class ProductTag(ExportModelOperationsMixin("product_tag"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents a product tag used for classifying or identifying products." " The ProductTag class is designed to uniquely identify and classify products through a combination" " of an internal tag identifier and a user-friendly display name." " It supports operations exported through mixins and provides metadata customization for administrative purposes." ) is_publicly_visible = True tag_name = CharField( blank=False, null=False, max_length=255, help_text=_("internal tag identifier for the product tag"), verbose_name=_("tag name"), ) name = CharField( max_length=255, help_text=_("user-friendly name for the product tag"), verbose_name=_("tag display name"), unique=True, ) def __str__(self): return self.tag_name class Meta: verbose_name = _("product tag") verbose_name_plural = _("product tags") class CategoryTag(ExportModelOperationsMixin("category_tag"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents a category tag used for products." " This class models a category tag that can be used to associate and classify products." " It includes attributes for an internal tag identifier and a user-friendly display name." ) is_publicly_visible = True tag_name = CharField( blank=False, null=False, max_length=255, help_text=_("internal tag identifier for the product tag"), verbose_name=_("tag name"), ) name = CharField( max_length=255, help_text=_("user-friendly name for the product tag"), verbose_name=_("tag display name"), unique=True, ) def __str__(self): return self.tag_name class Meta: verbose_name = _("category tag") verbose_name_plural = _("category tags") class Category(ExportModelOperationsMixin("category"), NiceModel, MPTTModel): # type: ignore [misc, django-manager-missing] __doc__ = _( # type: ignore "Represents a category entity to organize and group related items in a hierarchical structure." " Categories may have hierarchical relationships with other categories, supporting parent-child relationships." " The class includes fields for metadata and visual representation," " which serve as a foundation for category-related features." " This class is typically used to define and manage product categories or other similar groupings within an application," " allowing users or administrators to specify the name, description, and hierarchy of categories," " as well as assign attributes like images, tags, or priority." ) is_publicly_visible = True image = ImageField( blank=True, null=True, help_text=_("upload an image representing this category"), upload_to="categories/", validators=[validate_category_image_dimensions], verbose_name=_("category image"), ) markup_percent = IntegerField( default=0, validators=[MinValueValidator(0), MaxValueValidator(100)], help_text=_("define a markup percentage for products in this category"), verbose_name=_("markup percentage"), ) parent = TreeForeignKey( "self", on_delete=CASCADE, blank=True, null=True, related_name="children", help_text=_("parent of this category to form a hierarchical structure"), verbose_name=_("parent category"), ) name = CharField( max_length=255, verbose_name=_("category name"), help_text=_("provide a name for this category"), unique=True, ) description = TextField( blank=True, null=True, help_text=_("add a detailed description for this category"), verbose_name=_("category description"), ) slug = TweakedAutoSlugField( populate_from=( "parent__name", "name", ), slugify_function=unicode_slugify_function, allow_unicode=True, unique=True, editable=False, max_length=88, overwrite=True, null=True, verbose_name=_("Slug"), ) tags = ManyToManyField( "core.CategoryTag", blank=True, help_text=_("tags that help describe or group this category"), verbose_name=_("category tags"), ) priority = PositiveIntegerField( default=0, null=False, blank=False, verbose_name=_("priority"), ) def __str__(self): return self.name def get_tree_depth(self): if self.is_leaf_node(): return 0 return self.get_descendants().aggregate(max_depth=Max("level"))["max_depth"] - self.get_level() @classmethod def bulk_prefetch_filterable_attributes(cls, categories: Iterable["Category"]) -> None: cat_list = [c for c in categories] if not cat_list: return cat_ids = [c.id for c in cat_list if c.id] if not cat_ids: return rows = ( AttributeValue.objects.annotate(value_length=Length("value")) .filter( product__category_id__in=cat_ids, attribute__is_filterable=True, value_length__lte=30, ) .values_list( "product__category_id", "attribute_id", "attribute__name", "attribute__value_type", "value", ) .distinct() ) per_cat: dict[int, dict[int, dict]] = {} for cat_id, attr_id, attr_name, value_type, value in rows: cat_bucket = per_cat.get(cat_id) if cat_bucket is None: cat_bucket = {} per_cat[cat_id] = cat_bucket bucket = cat_bucket.get(attr_id) if bucket is None: bucket = { "attribute_name": attr_name, "possible_values": [], "value_type": value_type, } cat_bucket[attr_id] = bucket if len(bucket["possible_values"]) < 128 and value not in bucket["possible_values"]: bucket["possible_values"].append(value) for c in cat_list: data = list(per_cat.get(c.id, {}).values()) c.__dict__["filterable_attributes"] = data @cached_property def filterable_attributes(self) -> list[FilterableAttribute]: rows = ( AttributeValue.objects.annotate(value_length=Length("value")) .filter( product__category=self, attribute__is_filterable=True, value_length__lte=30, ) .values_list("attribute_id", "attribute__name", "attribute__value_type", "value") .distinct() ) by_attr: dict[int, dict] = {} for attr_id, attr_name, value_type, value in rows: bucket = by_attr.get(attr_id) if bucket is None: bucket = { "attribute_name": attr_name, "possible_values": [], "value_type": value_type, } by_attr[attr_id] = bucket if len(bucket["possible_values"]) < 128 and value not in bucket["possible_values"]: bucket["possible_values"].append(value) return list(by_attr.values()) # type: ignore [arg-type] class Meta: verbose_name = _("category") verbose_name_plural = _("categories") ordering = ["tree_id", "lft"] class Brand(ExportModelOperationsMixin("brand"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents a Brand object in the system. " "This class handles information and attributes related to a brand, including its name, logos, " "description, associated categories, a unique slug, and priority order. " "It allows for the organization and representation of brand-related data within the application." ) is_publicly_visible = True name = CharField( max_length=255, help_text=_("name of this brand"), verbose_name=_("brand name"), unique=True, ) small_logo = ImageField( upload_to="brands/", blank=True, null=True, help_text=_("upload a logo representing this brand"), validators=[validate_category_image_dimensions], verbose_name=_("brand small image"), ) big_logo = ImageField( upload_to="brands/", blank=True, null=True, help_text=_("upload a big logo representing this brand"), validators=[validate_category_image_dimensions], verbose_name=_("brand big image"), ) description = TextField( blank=True, null=True, help_text=_("add a detailed description of the brand"), verbose_name=_("brand description"), ) categories = ManyToManyField( "core.Category", blank=True, help_text=_("optional categories that this brand is associated with"), verbose_name=_("associated categories"), ) slug = AutoSlugField( populate_from=("name",), allow_unicode=True, unique=True, editable=False, max_length=88, overwrite=True, null=True, slugify_function=unicode_slugify_function, verbose_name=_("Slug"), ) priority = PositiveIntegerField( default=0, null=False, blank=False, verbose_name=_("priority"), ) def __str__(self): return self.name class Meta: verbose_name = _("brand") verbose_name_plural = _("brands") class Stock(ExportModelOperationsMixin("stock"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents the stock of a product managed in the system." " This class provides details about the relationship between vendors, products, and their stock information, " "as well as inventory-related properties like price, purchase price, quantity, SKU, and digital assets." " It is part of the inventory management system to allow tracking and evaluation of products available" " from various vendors." ) is_publicly_visible = False vendor = ForeignKey( "core.Vendor", on_delete=CASCADE, help_text=_("the vendor supplying this product stock"), verbose_name=_("associated vendor"), ) price = FloatField( default=0.0, help_text=_("final price to the customer after markups"), verbose_name=_("selling price"), ) product = ForeignKey( "core.Product", on_delete=CASCADE, help_text=_("the product associated with this stock entry"), verbose_name=_("associated product"), related_name="stocks", blank=True, null=True, ) purchase_price = FloatField( default=0.0, help_text=_("the price paid to the vendor for this product"), verbose_name=_("vendor purchase price"), ) quantity = IntegerField( default=0, help_text=_("available quantity of the product in stock"), verbose_name=_("quantity in stock"), ) sku = CharField( max_length=255, help_text=_("vendor-assigned SKU for identifying the product"), verbose_name=_("vendor sku"), ) digital_asset = FileField( default=None, blank=True, null=True, help_text=_("digital file associated with this stock if applicable"), verbose_name=_("digital file"), upload_to="downloadables/", ) def __str__(self) -> str: return f"{self.vendor.name} - {self.product!s}" class Meta: verbose_name = _("stock") verbose_name_plural = _("stock entries") class Product(ExportModelOperationsMixin("product"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents a product with attributes such as category, brand, tags, digital status, name, description, part number, and slug." " Provides related utility properties to retrieve ratings, feedback counts, price, quantity, and total orders." " Designed for use in a system that handles e-commerce or inventory management." " This class interacts with related models (such as Category, Brand, and ProductTag) and manages caching" " for frequently accessed properties to improve performance. It is used to define and manipulate product data and" " its associated information within an application." ) is_publicly_visible = True category = ForeignKey( "core.Category", on_delete=CASCADE, help_text=_("category this product belongs to"), verbose_name=_("category"), related_name="products", ) brand = ForeignKey( "core.Brand", on_delete=CASCADE, blank=True, null=True, help_text=_("optionally associate this product with a brand"), verbose_name=_("brand"), ) tags = ManyToManyField( "core.ProductTag", blank=True, help_text=_("tags that help describe or group this product"), verbose_name=_("product tags"), ) is_digital = BooleanField( default=False, help_text=_("indicates whether this product is digitally delivered"), verbose_name=_("is product digital"), blank=False, null=False, ) name = CharField( max_length=255, help_text=_("provide a clear identifying name for the product"), verbose_name=_("product name"), db_index=True, ) description = TextField( blank=True, null=True, help_text=_("add a detailed description of the product"), verbose_name=_("product description"), ) partnumber = CharField( unique=True, default=None, blank=False, null=True, help_text=_("part number for this product"), verbose_name=_("part number"), ) slug = AutoSlugField( populate_from=( "name", "brand__slug", "category__slug", "uuid", ), max_length=88, overwrite=True, allow_unicode=True, unique=True, slugify_function=unicode_slugify_function, editable=False, null=True, verbose_name=_("Slug"), ) sku = CharField( help_text=_("stock keeping unit for this product"), verbose_name=_("SKU"), max_length=8, unique=True, default=generate_human_readable_id, ) objects = ProductManager() class Meta: verbose_name = _("product") verbose_name_plural = _("products") indexes = [ Index(fields=["is_active", "brand", "category"]), Index(fields=["slug"]), Index(fields=["sku"]), ] def __str__(self): return self.name @property def rating(self) -> float: feedbacks = Feedback.objects.filter(order_product__product_id=self.pk) rating = feedbacks.aggregate(Avg("rating"))["rating__avg"] or 0 return float(round(rating, 2)) @rating.setter def rating(self, value: float): self.__dict__["rating"] = value @cached_property def feedbacks_count(self) -> int: return Feedback.objects.filter(order_product__product_id=self.pk).count() @property def price(self: Self) -> float: stock = self.stocks.only("price").order_by("-price").first() return round(stock.price, 2) if stock else 0.0 @cached_property def quantity(self) -> int: return self.stocks.aggregate(total=Sum("quantity"))["total"] or 0 @property def total_orders(self): return OrderProduct.objects.filter( product__uuid=self.uuid, status__in=["FINISHED", "DELIVERING", "CREATED", "PAYMENT"], ).count() @property def personal_orders_only(self) -> bool: return not (self.quantity > 0 and self.price > 0.0) @personal_orders_only.setter def personal_orders_only(self, value): self.__dict__["personal_orders_only"] = value class Attribute(ExportModelOperationsMixin("attribute"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents an attribute in the system." " This class is used to define and manage attributes," " which are customizable pieces of data that can be associated with other entities." " Attributes have associated categories, groups, value types, and names." " The model supports multiple types of values, including string, integer, float, boolean, array, and object." " This allows for dynamic and flexible data structuring." ) is_publicly_visible = True group = ForeignKey( "core.AttributeGroup", on_delete=CASCADE, related_name="attributes", help_text=_("group of this attribute"), verbose_name=_("attribute group"), ) value_type = CharField( max_length=50, choices=[ ("string", _("string")), ("integer", _("integer")), ("float", _("float")), ("boolean", _("boolean")), ("array", _("array")), ("object", _("object")), ], help_text=_("type of the attribute's value"), verbose_name=_("value type"), ) name = CharField( max_length=255, help_text=_("name of this attribute"), verbose_name=_("attribute's name"), ) is_filterable = BooleanField( default=True, verbose_name=_("is filterable"), help_text=_("designates whether this attribute can be used for filtering or not"), ) def __str__(self): return self.name class Meta: unique_together = ( "name", "group", "value_type", ) verbose_name = _("attribute") verbose_name_plural = _("attributes") class AttributeValue(ExportModelOperationsMixin("attribute_value"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents a specific value for an attribute that is linked to a product. " "It links the 'attribute' to a unique 'value', allowing " "better organization and dynamic representation of product characteristics." ) is_publicly_visible = True attribute = ForeignKey( "core.Attribute", on_delete=CASCADE, related_name="values", help_text=_("attribute of this value"), verbose_name=_("attribute"), ) product = ForeignKey( "core.Product", on_delete=CASCADE, blank=False, null=True, help_text=_("the specific product associated with this attribute's value"), verbose_name=_("associated product"), related_name="attributes", ) value = TextField( verbose_name=_("attribute value"), help_text=_("the specific value for this attribute"), ) def __str__(self): return f"{self.attribute!s}: {self.value}" class Meta: verbose_name = _("attribute value") verbose_name_plural = _("attribute values") class ProductImage(ExportModelOperationsMixin("product_image"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents a product image associated with a product in the system. " "This class is designed to manage images for products, including functionality " "for uploading image files, associating them with specific products, and " "determining their display order. It also includes an accessibility feature " "with alternative text for the images." ) is_publicly_visible = True alt = CharField( max_length=255, help_text=_("provide alternative text for the image for accessibility"), verbose_name=_("image alt text"), ) image = ImageField( help_text=_("upload the image file for this product"), verbose_name=_("product image"), upload_to=get_product_uuid_as_path, ) priority = PositiveIntegerField( default=1, help_text=_("determines the order in which images are displayed"), verbose_name=_("display priority"), ) product = ForeignKey( "core.Product", on_delete=CASCADE, help_text=_("the product that this image represents"), verbose_name=_("associated product"), related_name="images", ) def get_product_uuid_as_path(self, *args): return str(self.product.uuid) + "/" + args[0] def __str__(self) -> str: return self.alt class Meta: ordering = ("priority",) verbose_name = _("product image") verbose_name_plural = _("product images") class Promotion(ExportModelOperationsMixin("promotion"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents a promotional campaign for products with a discount. " "This class is used to define and manage promotional campaigns that offer a " "percentage-based discount for products. The class includes attributes for " "setting the discount rate, providing details about the promotion, and linking " "it to the applicable products. It integrates with the product catalog to " "determine the affected items in the campaign." ) is_publicly_visible = True discount_percent = IntegerField( validators=[MinValueValidator(1), MaxValueValidator(100)], help_text=_("percentage discount for the selected products"), verbose_name=_("discount percentage"), ) name = CharField( max_length=256, unique=True, help_text=_("provide a unique name for this promotion"), verbose_name=_("promotion name"), ) description = TextField( blank=True, null=True, help_text=_("add a detailed description of the product"), verbose_name=_("promotion description"), ) products = ManyToManyField( "core.Product", blank=True, help_text=_("select which products are included in this promotion"), verbose_name=_("included products"), ) class Meta: verbose_name = _("promotion") verbose_name_plural = _("promotions") def __str__(self) -> str: if self.name: return self.name return str(self.id) class Wishlist(ExportModelOperationsMixin("wishlist"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents a user's wishlist for storing and managing desired products. " "The class provides functionality to manage a collection of products, " "supporting operations such as adding and removing products, " "as well as supporting operations for adding and removing multiple " "products at once." ) is_publicly_visible = False products = ManyToManyField( "core.Product", blank=True, help_text=_("products that the user has marked as wanted"), verbose_name=_("wishlisted products"), ) user = OneToOneField( "vibes_auth.User", on_delete=CASCADE, blank=True, null=True, help_text=_("user who owns this wishlist"), verbose_name=_("wishlist owner"), related_name="user_related_wishlist", ) def __str__(self): return f"{self.user.email}'s wishlist" class Meta: verbose_name = _("wishlist") verbose_name_plural = _("wishlists") def add_product(self, product_uuid): try: product = Product.objects.get(uuid=product_uuid) if product in self.products.all(): return self self.products.add(product) except Product.DoesNotExist as dne: name = "Product" uuid = product_uuid raise Http404(_(f"{name} does not exist: {uuid}")) from dne return self def remove_product(self, product_uuid): try: product = Product.objects.get(uuid=product_uuid) if product not in self.products.all(): return self self.products.remove(product) except Product.DoesNotExist as dne: name = "Product" uuid = product_uuid raise Http404(_(f"{name} does not exist: {uuid}")) from dne return self def bulk_add_products(self, product_uuids): self.products.add(*Product.objects.filter(uuid__in=product_uuids)) return self def bulk_remove_products(self, product_uuids): self.products.remove(*Product.objects.filter(uuid__in=product_uuids)) return self class Documentary(ExportModelOperationsMixin("attribute_group"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents a documentary record tied to a product. " "This class is used to store information about documentaries related to specific " "products, including file uploads and their metadata. It contains methods and " "properties to handle the file type and storage path for the documentary files. " "It extends functionality from specific mixins and provides additional custom features." ) is_publicly_visible = True product = ForeignKey(to=Product, on_delete=CASCADE, related_name="documentaries") document = FileField(upload_to=get_product_uuid_as_path) class Meta: verbose_name = _("documentary") verbose_name_plural = _("documentaries") def __str__(self): return f"{self.product.name} - {self.document.name}" def get_product_uuid_as_path(self, *args): return str(self.product.uuid) + "/" + args[0] @property def file_type(self): return self.document.name.split(".")[-1] or _("unresolved") class Address(ExportModelOperationsMixin("address"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents an address entity that includes location details and associations with a user. " "Provides functionality for geographic and address data storage, as well " "as integration with geocoding services. " "This class is designed to store detailed address information including components " "like street, city, region, country, and geolocation (longitude and latitude). " "It supports integration with geocoding APIs, enabling the storage of raw API " "responses for further processing or inspection. The class also allows associating " "an address with a user, facilitating personalized data handling." ) is_publicly_visible = False address_line = TextField( blank=True, null=True, help_text=_("address line for the customer"), verbose_name=_("address line"), ) street = CharField(_("street"), max_length=255, null=True) district = CharField(_("district"), max_length=255, null=True) city = CharField(_("city"), max_length=100, null=True) region = CharField(_("region"), max_length=100, null=True) postal_code = CharField(_("postal code"), max_length=20, null=True) country = CharField(_("country"), max_length=40, null=True) location: PointField = PointField( geography=True, srid=4326, null=True, blank=True, help_text=_("geolocation point: (longitude, latitude)"), ) raw_data = JSONField(blank=True, null=True, help_text=_("full JSON response from geocoder for this address")) api_response = JSONField( blank=True, null=True, help_text=_("stored JSON response from the geocoding service"), ) user = ForeignKey(to="vibes_auth.User", on_delete=CASCADE, blank=True, null=True) objects = AddressManager() class Meta: verbose_name = _("address") verbose_name_plural = _("addresses") indexes = [ Index(fields=["location"]), ] def __str__(self): base = f"{self.street}, {self.city}, {self.country}" return f"{base} for {self.user.email}" if self.user else base class PromoCode(ExportModelOperationsMixin("promocode"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents a promotional code that can be used for discounts, managing its validity, " "type of discount, and application. " "The PromoCode class stores details about a promotional code, including its unique " "identifier, discount properties (amount or percentage), validity period, associated " "user (if any), and status of its usage. It includes functionality to validate and " "apply the promo code to an order while ensuring constraints are met." ) is_publicly_visible = False code = CharField( max_length=20, unique=True, default=get_random_code, help_text=_("unique code used by a user to redeem a discount"), verbose_name=_("promo code identifier"), ) discount_amount = DecimalField( max_digits=10, decimal_places=2, blank=True, null=True, help_text=_("fixed discount amount applied if percent is not used"), verbose_name=_("fixed discount amount"), ) discount_percent = IntegerField( validators=[MinValueValidator(1), MaxValueValidator(100)], blank=True, null=True, help_text=_("percentage discount applied if fixed amount is not used"), verbose_name=_("percentage discount"), ) end_time = DateTimeField( blank=True, null=True, help_text=_("timestamp when the promocode expires"), verbose_name=_("end validity time"), ) start_time = DateTimeField( blank=True, null=True, help_text=_("timestamp from which this promocode is valid"), verbose_name=_("start validity time"), ) used_on = DateTimeField( blank=True, null=True, help_text=_("timestamp when the promocode was used, blank if not used yet"), verbose_name=_("usage timestamp"), ) user = ForeignKey( "vibes_auth.User", on_delete=CASCADE, help_text=_("user assigned to this promocode if applicable"), verbose_name=_("assigned user"), null=True, blank=True, related_name="promocodes", ) class Meta: verbose_name = _("promo code") verbose_name_plural = _("promo codes") def save( self, *args, force_insert=False, force_update=False, using=None, update_fields=None, update_modified: bool = True, ) -> None: if (self.discount_amount is not None and self.discount_percent is not None) or ( self.discount_amount is None and self.discount_percent is None ): raise ValidationError( _("only one type of discount should be defined (amount or percent), but not both or neither.") ) return super().save( force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields, update_modified=update_modified, ) def __str__(self) -> str: return self.code @property def discount_type(self): if self.discount_amount is not None: return "amount" return "percent" def use(self, order) -> float: if self.used_on: raise ValueError(_("promocode already used")) if not order.attributes: order.attributes = {} promo_amount = order.total_price if self.discount_type == "percent": promo_amount -= round(promo_amount * (float(self.discount_percent) / 100), 2) # type: ignore [arg-type] order.attributes.update({"promocode_uuid": str(self.uuid), "final_price": promo_amount}) order.save() elif self.discount_type == "amount": promo_amount -= round(float(self.discount_amount), 2) # type: ignore [arg-type] order.attributes.update({"promocode_uuid": str(self.uuid), "final_price": promo_amount}) order.save() else: raise ValueError(_(f"invalid discount type for promocode {self.uuid}")) self.used_on = datetime.datetime.now() self.save() return promo_amount class Order(ExportModelOperationsMixin("order"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents an order placed by a user." " This class models an order within the application," " including its various attributes such as billing and shipping information," " status, associated user, notifications, and related operations." " Orders can have associated products, promotions can be applied, addresses set," " and shipping or billing details updated." " Equally, functionality supports managing the products in the order lifecycle." ) is_publicly_visible = False billing_address = ForeignKey( "core.Address", on_delete=CASCADE, blank=True, null=True, related_name="billing_address_order", help_text=_("the billing address used for this order"), verbose_name=_("billing address"), ) promo_code = ForeignKey( "core.PromoCode", on_delete=PROTECT, blank=True, null=True, help_text=_("optional promo code applied to this order"), verbose_name=_("applied promo code"), ) shipping_address = ForeignKey( "core.Address", on_delete=CASCADE, blank=True, null=True, related_name="shipping_address_order", help_text=_("the shipping address used for this order"), verbose_name=_("shipping address"), ) status = CharField( default="PENDING", max_length=64, choices=ORDER_STATUS_CHOICES, help_text=_("current status of the order in its lifecycle"), verbose_name=_("order status"), ) notifications = JSONField( blank=True, null=True, help_text=_("json structure of notifications to display to users"), verbose_name=_("notifications"), ) attributes = JSONField( blank=True, null=True, help_text=_("json representation of order attributes for this order"), verbose_name=_("attributes"), ) user = ForeignKey( "vibes_auth.User", on_delete=CASCADE, help_text=_("the user who placed the order"), verbose_name=_("user"), related_name="orders", blank=True, null=True, ) buy_time = DateTimeField( help_text=_("the timestamp when the order was finalized"), verbose_name=_("buy time"), default=None, null=True, blank=True, ) human_readable_id = CharField( max_length=8, help_text=_("a human-readable identifier for the order"), verbose_name=_("human readable id"), unique=True, default=generate_human_readable_id, ) class Meta: verbose_name = _("order") verbose_name_plural = _("orders") indexes = [ Index(fields=["user", "status"]), Index(fields=["status", "buy_time"]), ] def __str__(self) -> str: return f"#{self.human_readable_id} for {self.user.email if self.user else 'unregistered user'}" @property def is_business(self) -> bool: if type(self.attributes) is not dict: self.attributes = {} self.save() return False if self.user: if type(self.user.attributes) is not dict: self.user.attributes = {} self.user.save() return False with suppress(Exception): return (self.attributes.get("is_business", False) if self.attributes else False) or ( (self.user.attributes.get("is_business", False) and self.user.attributes.get("business_identificator")) # type: ignore [union-attr] if self.user else False ) return False def save( self, *args, force_insert=False, force_update=False, using=None, update_fields=None, update_modified: bool = True, ) -> None: pending_orders = 0 if self.user: pending_orders = self.user.orders.filter(status="PENDING").count() if self.status == "PENDING" and pending_orders > 1: raise ValueError(_("a user must have only one pending order at a time")) return super().save( force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields, update_modified=update_modified, ) @property def total_price(self) -> float: total = self.order_products.exclude(status__in=FAILED_STATUSES).aggregate( total=Sum(F("buy_price") * F("quantity"), output_field=FloatField()) )["total"] return round(total or 0.0, 2) @property def total_quantity(self) -> int: total = self.order_products.aggregate(total=Sum("quantity"))["total"] return total or 0 def add_product( self, product_uuid=None, attributes: list | dict | None = None, update_quantity=True, ): if attributes is None: attributes = [] if self.status not in ["PENDING", "MOMENTAL"]: raise ValueError(_("you cannot add products to an order that is not a pending one")) try: product = Product.objects.get(uuid=product_uuid) if not product.is_active: raise BadRequest(_("you cannot add inactive products to order")) buy_price = product.price promotions = Promotion.objects.filter(is_active=True, products__in=[product]).order_by("discount_percent") if promotions.exists(): buy_price -= round(product.price * (promotions.first().discount_percent / 100), 2) # type: ignore [union-attr] order_product, is_created = OrderProduct.objects.get_or_create( product=product, order=self, attributes=json.dumps(attributes), defaults={"quantity": 1, "buy_price": product.price}, ) if not is_created and update_quantity: if product.quantity < order_product.quantity + 1: raise BadRequest(_("you cannot add more products than available in stock")) order_product.quantity += 1 order_product.buy_price = product.price order_product.save() return self except Product.DoesNotExist as dne: name = "Product" uuid = product_uuid raise Http404(_(f"{name} does not exist: {uuid}")) from dne def remove_product( self, product_uuid=None, attributes=None, zero_quantity=False, ): if attributes is None: attributes = {} if self.status not in ["PENDING", "MOMENTAL"]: raise ValueError(_("you cannot remove products from an order that is not a pending one")) try: product = Product.objects.get(uuid=product_uuid) order_product = self.order_products.get(product=product, order=self) if zero_quantity: order_product.delete() return self if order_product.quantity == 1: self.order_products.remove(order_product) order_product.delete() else: order_product.quantity -= 1 order_product.save() return self except Product.DoesNotExist as dne: name = "Product" uuid = product_uuid raise Http404(_(f"{name} does not exist: {uuid}")) from dne except OrderProduct.DoesNotExist as dne: name = "OrderProduct" query = f"product: {product_uuid}, order: {self.uuid}, attributes: {attributes}" raise Http404(_(f"{name} does not exist with query <{query}>")) from dne def remove_all_products(self): if self.status not in ["PENDING", "MOMENTAL"]: raise ValueError(_("you cannot remove products from an order that is not a pending one")) for order_product in self.order_products.all(): self.order_products.remove(order_product) order_product.delete() return self def remove_products_of_a_kind(self, product_uuid): if self.status not in ["PENDING", "MOMENTAL"]: raise ValueError(_("you cannot remove products from an order that is not a pending one")) try: product = Product.objects.get(uuid=product_uuid) order_product = self.order_products.get(product=product, order=self) self.order_products.remove(order_product) order_product.delete() except Product.DoesNotExist as dne: name = "Product" uuid = product_uuid raise Http404(_(f"{name} does not exist: {uuid}")) from dne return self @property def is_whole_digital(self): return self.order_products.count() == self.order_products.filter(product__is_digital=True).count() def apply_promocode(self, promocode_uuid): try: promocode = PromoCode.objects.get(uuid=promocode_uuid) except PromoCode.DoesNotExist as dne: raise Http404(_("promocode does not exist")) from dne return promocode.use(self) def apply_addresses(self, billing_address_uuid: str | None = None, shipping_address_uuid: str | None = None): try: if not any([shipping_address_uuid, billing_address_uuid]) and not self.is_whole_digital: raise ValueError(_("you can only buy physical products with shipping address specified")) if billing_address_uuid and not shipping_address_uuid: shipping_address = Address.objects.get(uuid=billing_address_uuid) billing_address = shipping_address elif shipping_address_uuid and not billing_address_uuid: billing_address = Address.objects.get(uuid=shipping_address_uuid) shipping_address = billing_address else: billing_address = Address.objects.get(uuid=str(billing_address_uuid)) shipping_address = Address.objects.get(uuid=str(shipping_address_uuid)) self.billing_address = billing_address self.shipping_address = shipping_address self.save() except Address.DoesNotExist as dne: raise Http404(_("address does not exist")) from dne def buy( self, force_balance: bool = False, force_payment: bool = False, promocode_uuid: str | None = None, billing_address: str | None = None, shipping_address: str | None = None, chosen_products: list | None = None, ) -> Self | Transaction | None: order = self if not self.attributes or type(self.attributes) is not dict: self.attributes = {} if chosen_products: order = Order.objects.create(status="MOMENTAL", user=self.user) order.bulk_add_products(chosen_products, update_quantity=True) if config.DISABLED_COMMERCE: raise DisabledCommerceError(_("you can not buy at this moment, please try again in a few minutes")) if (not force_balance and not force_payment) or (force_balance and force_payment): raise ValueError(_("invalid force value")) if any([billing_address, shipping_address]): order.apply_addresses(billing_address, shipping_address) if order.total_quantity < 1: raise ValueError(_("you cannot purchase an empty order!")) force = None if force_balance: force = "balance" if force_payment: force = "payment" amount = self.attributes.get("final_amount") or order.total_price if self.attributes.get("promocode_uuid") and not self.attributes.get("final_amount"): amount = order.apply_promocode(self.attributes.get("promocode_uuid")) if promocode_uuid and not self.attributes.get("final_amount"): amount = order.apply_promocode(promocode_uuid) if not order.user: raise ValueError(_("you cannot buy an order without a user")) if type(order.user.attributes) is dict: if order.user.attributes.get("is_business", False) or order.user.attributes.get( "business_identificator", "" ): if type(order.attributes) is not dict: order.attributes = {} order.attributes.update({"is_business": True}) else: order.user.attributes = {} order.user.save() if not order.user.payments_balance: raise ValueError(_("a user without a balance cannot buy with balance")) match force: case "balance": if order.user.payments_balance.amount < amount: raise NotEnoughMoneyError(_("insufficient funds to complete the order")) with transaction.atomic(): order.status = "CREATED" order.buy_time = timezone.now() order.update_order_products_statuses("DELIVERING") order.save() return order case "payment": order.status = "PAYMENT" order.buy_time = timezone.now() order.save() return Transaction.objects.create( balance=order.user.payments_balance, amount=amount, currency=CURRENCY_CODE, order=order, ) case _: raise ValueError(_("invalid force value")) def buy_without_registration(self, products: list, promocode_uuid, **kwargs) -> Transaction | None: if config.DISABLED_COMMERCE: raise DisabledCommerceError(_("you can not buy at this moment, please try again in a few minutes")) if len(products) < 1: raise ValueError(_("you cannot purchase an empty order!")) customer_name = kwargs.get("customer_name") or kwargs.get("business_identificator") customer_email = kwargs.get("customer_email") customer_phone_number = kwargs.get("customer_phone_number") if not all([customer_name, customer_email, customer_phone_number]): raise ValueError( _( "you cannot buy without registration, please provide the following information:" " customer name, customer email, customer phone number" ) ) payment_method = kwargs.get("payment_method") available_payment_methods = cache.get("payment_methods").get("payment_methods") if payment_method not in available_payment_methods: raise ValueError(_(f"invalid payment method: {payment_method} from {available_payment_methods}")) billing_customer_address_uuid = kwargs.get("billing_customer_address") shipping_customer_address_uuid = kwargs.get("shipping_customer_address") self.apply_addresses(billing_customer_address_uuid, shipping_customer_address_uuid) for product_uuid in products: self.add_product(product_uuid) amount = self.apply_promocode(promocode_uuid) if promocode_uuid else self.total_price self.status = "CREATED" if self.attributes is None: self.attributes = {} self.attributes.update( { "customer_name": customer_name, "customer_email": customer_email, "customer_phone_number": customer_phone_number, "is_business": kwargs.get("is_business", False), } ) self.save() return Transaction.objects.create( amount=amount, currency=CURRENCY_CODE, order=self, payment_method=kwargs.get("payment_method"), ) def finalize(self): if ( self.order_products.filter( status__in=[ "ACCEPTED", "FAILED", "RETURNED", "CANCELED", "FINISHED", ] ).count() == self.order_products.count() ): self.status = "FINISHED" self.save() def update_order_products_statuses(self, status: str = "PENDING"): self.order_products.update(status=status) def bulk_add_products(self, products: list[dict[str, Any]], update_quantity: bool = False): for product in products: self.add_product( product.get("uuid") or product.get("product_uuid"), attributes=product.get("attributes"), update_quantity=update_quantity, ) return self def bulk_remove_products(self, products: list): for product in products: self.remove_product( product.get("uuid") or product.get("product_uuid"), attributes=product.get("attributes"), zero_quantity=True, ) return self def trigger_crm(self): crm_links = OrderCrmLink.objects.filter(order=self) if crm_links.exists(): crm_link = crm_links.first() crm_integration = create_object(crm_link.crm.integration_location, crm_link.crm.name) try: crm_integration.process_order_changes(self) return True except Exception as e: logger.error(f"failed to trigger CRM integration {crm_link.crm.name} for order {self.uuid}: {e}") logger.error(traceback.format_exc()) return False else: crm = CustomerRelationshipManagementProvider.objects.get(default=True) crm_integration = create_object(crm.integration_location, crm.name) try: crm_integration.process_order_changes(self) return True except Exception as e: logger.error(f"failed to trigger CRM integration {crm.name} for order {self.uuid}: {e}") logger.error(traceback.format_exc()) return False @property def business_identificator(self) -> str | None: if self.attributes: return self.attributes.get("business_identificator") or self.attributes.get("businessIdentificator") if self.user: if self.user.attributes: return self.user.attributes.get("business_identificator") or self.user.attributes.get( "businessIdentificator" ) return None class OrderProduct(ExportModelOperationsMixin("order_product"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents products associated with orders and their attributes. " "The OrderProduct model maintains information about a product that is part of an order, " "including details such as purchase price, quantity, product attributes, and status. It " "manages notifications for the user and administrators and handles operations such as " "returning the product balance or adding feedback. This model also provides methods and " "properties that support business logic, such as calculating the total price or generating " "a download URL for digital products. The model integrates with the Order and Product models " "and stores a reference to them." ) is_publicly_visible = False buy_price = FloatField( blank=True, null=True, help_text=_("the price paid by the customer for this product at purchase time"), verbose_name=_("purchase price at order time"), ) comments = TextField( blank=True, null=True, help_text=_("internal comments for admins about this ordered product"), verbose_name=_("internal comments"), ) notifications = JSONField( blank=True, null=True, help_text=_("json structure of notifications to display to users"), verbose_name=_("user notifications"), ) attributes = JSONField( blank=True, null=True, help_text=_("json representation of this item's attributes"), verbose_name=_("ordered product attributes"), ) order = ForeignKey( "core.Order", on_delete=CASCADE, help_text=_("reference to the parent order that contains this product"), verbose_name=_("parent order"), related_name="order_products", null=True, ) product = ForeignKey( "core.Product", on_delete=PROTECT, blank=True, null=True, help_text=_("the specific product associated with this order line"), verbose_name=_("associated product"), ) quantity = PositiveIntegerField( blank=False, null=False, default=1, help_text=_("quantity of this specific product in the order"), verbose_name=_("product quantity"), ) status = CharField( max_length=128, blank=False, null=False, choices=ORDER_PRODUCT_STATUS_CHOICES, help_text=_("current status of this product in the order"), verbose_name=_("product line status"), default="PENDING", ) def __str__(self) -> str: return ( f"{self.product.name if self.product else self.uuid}" f" for ({self.order.user.email if self.order and self.order.user else 'unregistered user'})" ) class Meta: verbose_name = _("order product") verbose_name_plural = _("order products") indexes = [ GinIndex(fields=["notifications", "attributes"]), Index(fields=["order", "status"]), Index(fields=["product", "status"]), ] def return_balance_back(self): self.status = "RETURNED" self.save() self.order.user.payments_balance.amount += self.buy_price self.order.user.payments_balance.save() def add_error(self, error=None): if self.notifications is not None: order_product_errors = self.notifications.get("errors", []) if not order_product_errors: self.notifications.update( { "errors": [ { "detail": ( error if error else f"Something went wrong with {self.uuid} for some reason..." ) }, ] } ) else: order_product_errors.append({"detail": error}) self.notifications.update({"errors": order_product_errors}) else: self.notifications = {"errors": [{"detail": error}]} self.status = "FAILED" self.save() return self @property def total_price(self: Self) -> float: return round(float(self.buy_price) * self.quantity, 2) # type: ignore [arg-type] @property def download_url(self: Self) -> str: if self.product and self.product.stocks: if self.product.is_digital and self.product.stocks.first().digital_asset: # type: ignore [union-attr] if hasattr(self, "download"): return self.download.url else: return DigitalAssetDownload.objects.create(order_product=self).url return "" def do_feedback(self, rating=10, comment="", action="add") -> Optional["Feedback"] | int: if not self.order: raise ValueError(_("order product must have an order")) if action not in ["add", "remove"]: raise ValueError(_(f"wrong action specified for feedback: {action}")) feedback_qs = Feedback.objects.filter(order_product=self) feedback_exists = feedback_qs.exists() if action == "remove": if feedback_exists: return feedback_qs.delete()[0] if action == "add": if not feedback_exists: if self.order.status == "FINISHED": return Feedback.objects.create(rating=rating, comment=comment, order_product=self) else: raise ValueError(_("you cannot feedback an order which is not received")) return None class CustomerRelationshipManagementProvider(ExportModelOperationsMixin("crm_provider"), NiceModel): # type: ignore [misc] name = CharField(max_length=128, unique=True, verbose_name=_("name")) integration_url = URLField(blank=True, null=True, help_text=_("URL of the integration")) authentication = JSONField(blank=True, null=True, help_text=_("authentication credentials")) attributes = JSONField(blank=True, null=True, verbose_name=_("attributes")) integration_location = CharField(max_length=128, blank=True, null=True) default = BooleanField(default=False) def __str__(self) -> str: return self.name def save( self, *args, force_insert=False, force_update=False, using=None, update_fields=None, update_modified: bool = True, ) -> None: if self.default: qs = type(self).objects.all() if self.pk: qs = qs.exclude(pk=self.pk) if qs.filter(default=True).exists(): raise ValueError(_("you can only have one default CRM provider")) super().save( force_insert=force_insert, force_update=force_update, using=using, update_fields=update_fields, update_modified=update_modified, ) class Meta: verbose_name = _("CRM") verbose_name_plural = _("CRMs") class OrderCrmLink(ExportModelOperationsMixin("order_crm_link"), NiceModel): # type: ignore order = ForeignKey(to=Order, on_delete=PROTECT, related_name="crm_links") crm = ForeignKey(to=CustomerRelationshipManagementProvider, on_delete=PROTECT, related_name="order_links") crm_lead_id = CharField(max_length=30, unique=True, db_index=True) def __str__(self) -> str: return self.crm_lead_id class Meta: verbose_name = _("order CRM link") verbose_name_plural = _("orders CRM links") class DigitalAssetDownload(ExportModelOperationsMixin("attribute_group"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Represents the downloading functionality for digital assets associated with orders. " "The DigitalAssetDownload class provides the ability to manage and access " "downloads related to order products. It maintains information about the " "associated order product, the number of downloads, and whether the asset " "is publicly visible. It includes a method to generate a URL for downloading " "the asset when the associated order is in a completed status." ) is_publicly_visible = False order_product = OneToOneField(to=OrderProduct, on_delete=CASCADE, related_name="download") num_downloads = IntegerField(default=0) class Meta: verbose_name = _("download") verbose_name_plural = _("downloads") def __str__(self): return f"{self.order_product} - {self.num_downloads}" @property def url(self): return ( f"https://api.{config.BASE_DOMAIN}/download/{urlsafe_base64_encode(force_bytes(self.order_product.uuid))}" ) class Feedback(ExportModelOperationsMixin("feedback"), NiceModel): # type: ignore [misc] __doc__ = _( # type: ignore "Manages user feedback for products. " "This class is designed to capture and store user feedback for specific products " "that they have purchased. It contains attributes to store user comments, " "a reference to the related product in the order, and a user-assigned rating. The " "class uses database fields to effectively model and manage feedback data." ) is_publicly_visible = True comment = TextField( blank=True, null=True, help_text=_("user-provided comments about their experience with the product"), verbose_name=_("feedback comments"), ) order_product = OneToOneField( "core.OrderProduct", on_delete=CASCADE, blank=False, null=False, help_text=_("references the specific product in an order that this feedback is about"), verbose_name=_("related order product"), ) rating = FloatField( blank=True, null=True, help_text=_("user-assigned rating for the product"), verbose_name=_("product rating"), validators=[MinValueValidator(0), MaxValueValidator(10)], ) def __str__(self) -> str: if self.order_product and self.order_product.order and self.order_product.order.user: return f"{self.rating} by {self.order_product.order.user.email}" return f"{self.rating} | {self.uuid}" class Meta: verbose_name = _("feedback") verbose_name_plural = _("feedbacks")