schon/core/models.py
Egor fureunoir Gorbunov 8c906a2880 Features: 1) Add support for auto-generating slugs for Brand, Category, and Product models; 2) Extend Elasticsearch documents with slug indexing and response processing; 3) Introduce image fetching in search results.
Fixes: 1) Update slug population logic in management commands.

Extra: Refactor renderer class formatting, query processing, and formatting for readability across multiple files.
2025-06-18 22:23:14 +03:00

1470 lines
47 KiB
Python

import datetime
import json
import logging
from decimal import Decimal
from typing import Optional, Self
from constance import config
from django.contrib.gis.db.models import PointField
from django.contrib.postgres.indexes import GinIndex
from django.core.cache import cache
from django.core.exceptions import BadRequest, ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db.models import (
CASCADE,
PROTECT,
Avg,
BooleanField,
CharField,
DateTimeField,
DecimalField,
FileField,
FloatField,
ForeignKey,
ImageField,
IntegerField,
JSONField,
ManyToManyField,
Max,
OneToOneField,
PositiveIntegerField,
TextField,
)
from django.db.models.indexes import Index
from django.http import Http404
from django.utils import timezone
from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode
from django.utils.translation import gettext_lazy as _
from django_extensions.db.fields import AutoSlugField
from django_prometheus.models import ExportModelOperationsMixin
from mptt.fields import TreeForeignKey
from mptt.models import MPTTModel
from core.abstract import NiceModel
from core.choices import ORDER_PRODUCT_STATUS_CHOICES, ORDER_STATUS_CHOICES
from core.errors import DisabledCommerceError, NotEnoughMoneyError
from core.managers import AddressManager
from core.utils import (
generate_human_readable_id,
get_product_uuid_as_path,
get_random_code,
)
from core.utils.lists import FAILED_STATUSES
from core.validators import validate_category_image_dimensions
from evibes.settings import CURRENCY_CODE
from payments.models import Transaction
logger = logging.getLogger(__name__)
class AttributeGroup(ExportModelOperationsMixin("attribute_group"), NiceModel):
is_publicly_visible = True
parent: Self = ForeignKey( # type: ignore
"self",
on_delete=CASCADE,
null=True,
blank=True,
related_name="children",
help_text=_("parent of this group"),
verbose_name=_("parent attribute group"),
)
name: str = CharField( # type: ignore
max_length=255,
verbose_name=_("attribute group's name"),
help_text=_("attribute group's name"),
unique=True,
)
def __str__(self):
return self.name
class Meta:
verbose_name = _("attribute group")
verbose_name_plural = _("attribute groups")
class Vendor(ExportModelOperationsMixin("vendor"), NiceModel):
is_publicly_visible = False
authentication: dict = JSONField( # type: ignore
blank=True,
null=True,
help_text=_(
"stores credentials and endpoints required for vendor communication"
),
verbose_name=_("authentication info"),
)
markup_percent: int = IntegerField( # type: ignore
default=0,
validators=[MinValueValidator(0), MaxValueValidator(100)],
help_text=_("define the markup for products retrieved from this vendor"),
verbose_name=_("vendor markup percentage"),
)
name: str = CharField( # type: ignore
max_length=255,
help_text=_("name of this vendor"),
verbose_name=_("vendor name"),
blank=False,
null=False,
unique=True,
)
def __str__(self) -> str:
return self.name
class Meta:
verbose_name = _("vendor")
verbose_name_plural = _("vendors")
indexes = [
GinIndex(fields=["authentication"]),
]
class ProductTag(ExportModelOperationsMixin("product_tag"), NiceModel):
is_publicly_visible = True
tag_name: str = CharField( # type: ignore
blank=False,
null=False,
max_length=255,
help_text=_("internal tag identifier for the product tag"),
verbose_name=_("tag name"),
)
name: str = CharField( # type: ignore
max_length=255,
help_text=_("user-friendly name for the product tag"),
verbose_name=_("tag display name"),
unique=True,
)
def __str__(self):
return self.tag_name
class Meta:
verbose_name = _("product tag")
verbose_name_plural = _("product tags")
class CategoryTag(ExportModelOperationsMixin("category_tag"), NiceModel):
is_publicly_visible = True
tag_name: str = CharField( # type: ignore
blank=False,
null=False,
max_length=255,
help_text=_("internal tag identifier for the product tag"),
verbose_name=_("tag name"),
)
name: str = CharField( # type: ignore
max_length=255,
help_text=_("user-friendly name for the product tag"),
verbose_name=_("tag display name"),
unique=True,
)
def __str__(self):
return self.tag_name
class Meta:
verbose_name = _("category tag")
verbose_name_plural = _("category tags")
class Category(ExportModelOperationsMixin("category"), NiceModel, MPTTModel):
is_publicly_visible = True
image = ImageField( # type: ignore
blank=True,
null=True,
help_text=_("upload an image representing this category"),
upload_to="categories/",
validators=[validate_category_image_dimensions],
verbose_name=_("category image"),
)
markup_percent: int = IntegerField( # type: ignore
default=0,
validators=[MinValueValidator(0), MaxValueValidator(100)],
help_text=_("define a markup percentage for products in this category"),
verbose_name=_("markup percentage"),
)
parent: Self = TreeForeignKey(
"self",
on_delete=CASCADE,
blank=True,
null=True,
related_name="children",
help_text=_("parent of this category to form a hierarchical structure"),
verbose_name=_("parent category"),
)
name: str = CharField( # type: ignore
max_length=255,
verbose_name=_("category name"),
help_text=_("provide a name for this category"),
unique=True,
)
description: str = TextField( # type: ignore
blank=True,
null=True,
help_text=_("add a detailed description for this category"),
verbose_name=_("category description"),
)
slug: str = AutoSlugField( # type: ignore
populate_from=("name",),
allow_unicode=True,
unique=True,
editable=False,
null=True,
)
tags: CategoryTag = ManyToManyField( # type: ignore
"core.CategoryTag",
blank=True,
help_text=_("tags that help describe or group this category"),
verbose_name=_("category tags"),
)
def __str__(self):
return self.name
def get_tree_depth(self):
if self.is_leaf_node():
return 0
return (
self.get_descendants().aggregate(max_depth=Max("level"))["max_depth"]
- self.get_level()
)
class Meta:
verbose_name = _("category")
verbose_name_plural = _("categories")
ordering = ["tree_id", "lft"]
class Brand(ExportModelOperationsMixin("brand"), NiceModel):
is_publicly_visible = True
name: str = CharField( # type: ignore
max_length=255,
help_text=_("name of this brand"),
verbose_name=_("brand name"),
unique=True,
)
small_logo = ImageField( # type: ignore
upload_to="brands/",
blank=True,
null=True,
help_text=_("upload a logo representing this brand"),
validators=[validate_category_image_dimensions],
verbose_name=_("brand small image"),
)
big_logo = ImageField( # type: ignore
upload_to="brands/",
blank=True,
null=True,
help_text=_("upload a big logo representing this brand"),
validators=[validate_category_image_dimensions],
verbose_name=_("brand big image"),
)
description: str = TextField( # type: ignore
blank=True,
null=True,
help_text=_("add a detailed description of the brand"),
verbose_name=_("brand description"),
)
categories: Category = ManyToManyField( # type: ignore
"core.Category",
blank=True,
help_text=_("optional categories that this brand is associated with"),
verbose_name=_("associated categories"),
)
slug: str = AutoSlugField( # type: ignore
populate_from=("name",),
allow_unicode=True,
unique=True,
editable=False,
null=True,
)
def __str__(self):
return self.name
class Meta:
verbose_name = _("brand")
verbose_name_plural = _("brands")
class Product(ExportModelOperationsMixin("product"), NiceModel):
is_publicly_visible = True
category: Category = ForeignKey(
"core.Category",
on_delete=CASCADE,
help_text=_("category this product belongs to"),
verbose_name=_("category"),
related_name="products",
)
brand: Brand = ForeignKey(
"core.Brand",
on_delete=CASCADE,
blank=True,
null=True,
help_text=_("optionally associate this product with a brand"),
verbose_name=_("brand"),
)
tags: ProductTag = ManyToManyField( # type: ignore
"core.ProductTag",
blank=True,
help_text=_("tags that help describe or group this product"),
verbose_name=_("product tags"),
)
is_digital: bool = BooleanField( # type: ignore
default=False,
help_text=_("indicates whether this product is digitally delivered"),
verbose_name=_("is product digital"),
blank=False,
null=False,
)
name: str = CharField( # type: ignore
max_length=255,
help_text=_("provide a clear identifying name for the product"),
verbose_name=_("product name"),
)
description: str = TextField( # type: ignore
blank=True,
null=True,
help_text=_("add a detailed description of the product"),
verbose_name=_("product description"),
)
partnumber: str = CharField( # type: ignore
unique=True,
default=None,
blank=False,
null=True,
help_text=_("part number for this product"),
verbose_name=_("part number"),
)
slug: str | None = AutoSlugField( # type: ignore
populate_from=("category__slug", "brand__slug", "name", "uuid"),
allow_unicode=True,
unique=True,
editable=False,
null=True,
)
class Meta:
verbose_name = _("product")
verbose_name_plural = _("products")
def __str__(self):
return self.name
@property
def rating(self):
cache_key = f"product_rating_{self.pk}"
rating = cache.get(cache_key)
if rating is None:
feedbacks = Feedback.objects.filter(order_product__product_id=self.pk)
rating = feedbacks.aggregate(Avg("rating"))["rating__avg"] or 0
cache.set(cache_key, rating, 604800)
return round(rating, 2)
@rating.setter
def rating(self, value):
self.__dict__["rating"] = value
@property
def feedbacks_count(self):
cache_key = f"product_feedbacks_count_{self.pk}"
feedbacks_count = cache.get(cache_key)
if feedbacks_count is None:
feedbacks_count = Feedback.objects.filter(
order_product__product_id=self.pk
).count()
cache.set(cache_key, feedbacks_count, 604800)
return feedbacks_count
@property
def price(self) -> float:
stock = self.stocks.order_by("price").only("price").first()
price = stock.price if stock else 0.0
return round(price, 2)
@property
def quantity(self) -> int:
cache_key = f"product_quantity_{self.pk}"
quantity = cache.get(cache_key, 0)
if not quantity:
stocks = self.stocks.only("quantity")
for stock in stocks:
quantity += stock.quantity
cache.set(cache_key, quantity, 3600)
return quantity
class Attribute(ExportModelOperationsMixin("attribute"), NiceModel):
is_publicly_visible = True
categories: Category = ManyToManyField( # type: ignore
"core.Category",
related_name="attributes",
help_text=_("category of this attribute"),
verbose_name=_("categories"),
)
group: AttributeGroup = ForeignKey( # type: ignore
"core.AttributeGroup",
on_delete=CASCADE,
related_name="attributes",
help_text=_("group of this attribute"),
verbose_name=_("attribute group"),
)
value_type: str = CharField( # type: ignore
max_length=50,
choices=[
("string", _("string")),
("integer", _("integer")),
("float", _("float")),
("boolean", _("boolean")),
("array", _("array")),
("object", _("object")),
],
help_text=_("type of the attribute's value"),
verbose_name=_("value type"),
)
name: str = CharField( # type: ignore
max_length=255,
help_text=_("name of this attribute"),
verbose_name=_("attribute's name"),
unique=True,
)
def __str__(self):
return self.name
class Meta:
verbose_name = _("attribute")
verbose_name_plural = _("attributes")
class AttributeValue(ExportModelOperationsMixin("attribute_value"), NiceModel):
is_publicly_visible = True
attribute: Attribute = ForeignKey(
"core.Attribute",
on_delete=CASCADE,
related_name="values",
help_text=_("attribute of this value"),
verbose_name=_("attribute"),
)
product: Product = ForeignKey(
"core.Product",
on_delete=CASCADE,
blank=False,
null=True,
help_text=_("the specific product associated with this attribute's value"),
verbose_name=_("associated product"),
related_name="attributes",
)
value: str = TextField( # type: ignore
verbose_name=_("attribute value"),
help_text=_("the specific value for this attribute"),
)
def __str__(self):
return f"{self.attribute!s}: {self.value}"
class Meta:
verbose_name = _("attribute value")
verbose_name_plural = _("attribute values")
class ProductImage(ExportModelOperationsMixin("product_image"), NiceModel):
is_publicly_visible = True
alt: str = CharField( # type: ignore
max_length=255,
help_text=_("provide alternative text for the image for accessibility"),
verbose_name=_("image alt text"),
)
image = ImageField(
help_text=_("upload the image file for this product"),
verbose_name=_("product image"),
upload_to=get_product_uuid_as_path,
)
priority: int = IntegerField( # type: ignore
default=1,
validators=[MinValueValidator(1)],
help_text=_("determines the order in which images are displayed"),
verbose_name=_("display priority"),
)
product: ForeignKey = ForeignKey(
"core.Product",
on_delete=CASCADE,
help_text=_("the product that this image represents"),
verbose_name=_("associated product"),
related_name="images",
)
def get_product_uuid_as_path(self, *args):
return str(self.product.uuid) + "/" + args[0]
def __str__(self) -> str:
return self.alt
class Meta:
ordering = ("priority",)
verbose_name = _("product image")
verbose_name_plural = _("product images")
class Promotion(ExportModelOperationsMixin("promotion"), NiceModel):
is_publicly_visible = True
discount_percent: int = IntegerField( # type: ignore
validators=[MinValueValidator(1), MaxValueValidator(100)],
help_text=_("percentage discount for the selected products"),
verbose_name=_("discount percentage"),
)
name: str = CharField( # type: ignore
max_length=256,
unique=True,
help_text=_("provide a unique name for this promotion"),
verbose_name=_("promotion name"),
)
description: str = TextField( # type: ignore
blank=True,
null=True,
help_text=_("add a detailed description of the product"),
verbose_name=_("promotion description"),
)
products: ManyToManyField = ManyToManyField( # type: ignore
"core.Product",
blank=True,
help_text=_("select which products are included in this promotion"),
verbose_name=_("included products"),
)
class Meta:
verbose_name = _("promotion")
verbose_name_plural = _("promotions")
def __str__(self) -> str:
if self.name:
return self.name
return str(self.id)
class Stock(ExportModelOperationsMixin("stock"), NiceModel):
is_publicly_visible = False
vendor: ForeignKey = ForeignKey(
"core.Vendor",
on_delete=CASCADE,
help_text=_("the vendor supplying this product stock"),
verbose_name=_("associated vendor"),
)
price: float = FloatField( # type: ignore
default=0.0,
help_text=_("final price to the customer after markups"),
verbose_name=_("selling price"),
)
product: ForeignKey = ForeignKey( # type: ignore
"core.Product",
on_delete=CASCADE,
help_text=_("the product associated with this stock entry"),
verbose_name=_("associated product"),
related_name="stocks",
blank=True,
null=True,
)
purchase_price: float = FloatField( # type: ignore
default=0.0,
help_text=_("the price paid to the vendor for this product"),
verbose_name=_("vendor purchase price"),
)
quantity: int = IntegerField( # type: ignore
default=0,
help_text=_("available quantity of the product in stock"),
verbose_name=_("quantity in stock"),
)
sku: str = CharField( # type: ignore
max_length=255,
help_text=_("vendor-assigned SKU for identifying the product"),
verbose_name=_("vendor sku"),
)
digital_asset = FileField(
default=None,
blank=True,
null=True,
help_text=_("digital file associated with this stock if applicable"),
verbose_name=_("digital file"),
upload_to="downloadables/",
)
def __str__(self) -> str:
return f"{self.vendor.name} - {self.product!s}" # type: ignore
class Meta:
verbose_name = _("stock")
verbose_name_plural = _("stock entries")
class Wishlist(ExportModelOperationsMixin("wishlist"), NiceModel):
is_publicly_visible = False
products: ManyToManyField = ManyToManyField( # type: ignore
"core.Product",
blank=True,
help_text=_("products that the user has marked as wanted"),
verbose_name=_("wishlisted products"),
)
user: OneToOneField = OneToOneField( # type: ignore
"vibes_auth.User",
on_delete=CASCADE,
blank=True,
null=True,
help_text=_("user who owns this wishlist"),
verbose_name=_("wishlist owner"),
related_name="user_related_wishlist",
)
def __str__(self):
return f"{self.user.email}'s wishlist"
class Meta:
verbose_name = _("wishlist")
verbose_name_plural = _("wishlists")
def add_product(self, product_uuid):
try:
product = Product.objects.get(uuid=product_uuid)
if product in self.products.all():
return self
self.products.add(product)
except Product.DoesNotExist:
name = "Product"
raise Http404(_(f"{name} does not exist: {product_uuid}"))
return self
def remove_product(self, product_uuid):
try:
product = Product.objects.get(uuid=product_uuid)
if product not in self.products.all():
return self
self.products.remove(product)
except Product.DoesNotExist:
name = "Product"
raise Http404(_(f"{name} does not exist: {product_uuid}"))
return self
def bulk_add_products(self, product_uuids):
self.products.add(*Product.objects.filter(uuid__in=product_uuids))
return self
def bulk_remove_products(self, product_uuids):
self.products.remove(*Product.objects.filter(uuid__in=product_uuids))
return self
class Documentary(ExportModelOperationsMixin("attribute_group"), NiceModel):
is_publicly_visible = True
product: ForeignKey = ForeignKey(
to=Product, on_delete=CASCADE, related_name="documentaries"
)
document = FileField(upload_to=get_product_uuid_as_path)
class Meta:
verbose_name = _("documentary")
verbose_name_plural = _("documentaries")
def __str__(self):
return f"{self.product.name} - {self.document.name}"
def get_product_uuid_as_path(self, *args):
return str(self.product.uuid) + "/" + args[0]
@property
def file_type(self):
return self.document.name.split(".")[-1] or _("unresolved")
class Address(ExportModelOperationsMixin("address"), NiceModel):
is_publicly_visible = False
address_line: str = TextField( # type: ignore
blank=True,
null=True,
help_text=_("address line for the customer"),
verbose_name=_("address line"),
)
street: str = CharField(_("street"), max_length=255, null=True) # type: ignore
district: str = CharField(_("district"), max_length=255, null=True) # type: ignore
city: str = CharField(_("city"), max_length=100, null=True) # type: ignore
region: str = CharField(_("region"), max_length=100, null=True) # type: ignore
postal_code: str = CharField(_("postal code"), max_length=20, null=True) # type: ignore
country: str = CharField(_("country"), max_length=40, null=True) # type: ignore
location: PointField = PointField( # type: ignore
geography=True,
srid=4326,
null=True,
blank=True,
help_text=_("geolocation point: (longitude, latitude)"),
)
raw_data: dict = JSONField(blank=True, null=True, help_text=_("full JSON response from geocoder for this address")) # type: ignore
api_response: dict = JSONField( # type: ignore
blank=True,
null=True,
help_text=_("stored JSON response from the geocoding service"),
)
user: ForeignKey = ForeignKey(to="vibes_auth.User", on_delete=CASCADE, blank=True, null=True) # type: ignore
objects = AddressManager()
class Meta:
verbose_name = _("address")
verbose_name_plural = _("addresses")
indexes = [
Index(fields=["location"]),
]
def __str__(self):
base = f"{self.street}, {self.city}, {self.country}"
return f"{base} for {self.user.email}" if self.user else base
class PromoCode(ExportModelOperationsMixin("promocode"), NiceModel):
is_publicly_visible = False
code: str = CharField( # type: ignore
max_length=20,
unique=True,
default=get_random_code,
help_text=_("unique code used by a user to redeem a discount"),
verbose_name=_("promo code identifier"),
)
discount_amount: Decimal = DecimalField( # type: ignore
max_digits=10,
decimal_places=2,
blank=True,
null=True,
help_text=_("fixed discount amount applied if percent is not used"),
verbose_name=_("fixed discount amount"),
)
discount_percent: int = IntegerField( # type: ignore
validators=[MinValueValidator(1), MaxValueValidator(100)],
blank=True,
null=True,
help_text=_("percentage discount applied if fixed amount is not used"),
verbose_name=_("percentage discount"),
)
end_time: datetime = DateTimeField( # type: ignore
blank=True,
null=True,
help_text=_("timestamp when the promocode expires"),
verbose_name=_("end validity time"),
)
start_time: datetime = DateTimeField( # type: ignore
blank=True,
null=True,
help_text=_("timestamp from which this promocode is valid"),
verbose_name=_("start validity time"),
)
used_on: datetime = DateTimeField( # type: ignore
blank=True,
null=True,
help_text=_("timestamp when the promocode was used, blank if not used yet"),
verbose_name=_("usage timestamp"),
)
user: ForeignKey = ForeignKey( # type: ignore
"vibes_auth.User",
on_delete=CASCADE,
help_text=_("user assigned to this promocode if applicable"),
verbose_name=_("assigned user"),
null=True,
blank=True,
related_name="promocodes",
)
class Meta:
verbose_name = _("promo code")
verbose_name_plural = _("promo codes")
def save(self, **kwargs):
if (self.discount_amount is not None and self.discount_percent is not None) or (
self.discount_amount is None and self.discount_percent is None
):
raise ValidationError(
_(
"only one type of discount should be defined (amount or percent), but not both or neither."
)
)
super().save(**kwargs)
def __str__(self) -> str:
return self.code
@property
def discount_type(self):
if self.discount_amount is not None:
return "amount"
return "percent"
def use(self, order: "Order") -> float:
if self.used_on:
raise ValueError(_("promocode already used"))
amount = order.total_price
if self.discount_type == "percent":
amount -= round(amount * (self.discount_percent / 100), 2)
order.attributes.update(
{"promocode": str(self.uuid), "final_price": amount}
)
order.save()
elif self.discount_type == "amount":
amount -= round(float(self.discount_amount), 2)
order.attributes.update(
{"promocode": str(self.uuid), "final_price": amount}
)
order.save()
else:
raise ValueError(_(f"invalid discount type for promocode {self.uuid}"))
self.used_on = datetime.datetime.now()
self.save()
return amount
class Order(ExportModelOperationsMixin("order"), NiceModel):
is_publicly_visible = False
billing_address: Address = ForeignKey(
"core.Address",
on_delete=CASCADE,
blank=True,
null=True,
related_name="billing_address_order",
help_text=_("the billing address used for this order"),
verbose_name=_("billing address"),
)
promo_code: PromoCode = ForeignKey(
"core.PromoCode",
on_delete=PROTECT,
blank=True,
null=True,
help_text=_("optional promo code applied to this order"),
verbose_name=_("applied promo code"),
)
shipping_address: Address = ForeignKey(
"core.Address",
on_delete=CASCADE,
blank=True,
null=True,
related_name="shipping_address_order",
help_text=_("the shipping address used for this order"),
verbose_name=_("shipping address"),
)
status: str = CharField( # type: ignore
default="PENDING",
max_length=64,
choices=ORDER_STATUS_CHOICES,
help_text=_("current status of the order in its lifecycle"),
verbose_name=_("order status"),
)
notifications: dict = JSONField( # type: ignore
blank=True,
null=True,
help_text=_("json structure of notifications to display to users"),
verbose_name=_("notifications"),
)
attributes: dict = JSONField( # type: ignore
blank=True,
null=True,
help_text=_("json representation of order attributes for this order"),
verbose_name=_("attributes"),
)
user = ForeignKey( # type: ignore
"vibes_auth.User",
on_delete=CASCADE,
help_text=_("the user who placed the order"),
verbose_name=_("user"),
related_name="orders",
blank=True,
null=True,
)
buy_time: datetime = DateTimeField( # type: ignore
help_text=_("the timestamp when the order was finalized"),
verbose_name=_("buy time"),
default=None,
null=True,
blank=True,
)
human_readable_id: str = CharField( # type: ignore
max_length=8,
help_text=_("a human-readable identifier for the order"),
verbose_name=_("human readable id"),
unique=True,
default=generate_human_readable_id,
)
class Meta:
verbose_name = _("order")
verbose_name_plural = _("orders")
def __str__(self) -> str:
return f"#{self.pk} for {self.user.email if self.user else 'unregistered user'}" # type: ignore
@property
def is_business(self) -> bool:
return self.attributes.get("is_business", False) if self.attributes else False
def save(self, **kwargs):
pending_orders = 0
if self.user:
pending_orders = self.user.orders.filter(status="PENDING").count()
if self.status == "PENDING" and pending_orders > 1:
raise ValueError(_("a user must have only one pending order at a time"))
return super().save(**kwargs)
@property
def total_price(self) -> float:
return (
round(
sum(
(
order_product.buy_price * order_product.quantity
if order_product.status not in FAILED_STATUSES
and order_product.buy_price is not None
else 0.0
)
for order_product in self.order_products.all()
),
2,
)
or 0.0
)
@property
def total_quantity(self) -> int:
return sum([op.quantity for op in self.order_products.all()])
def add_product(
self,
product_uuid: str | None = None,
attributes: Optional[list] = None,
update_quantity: bool = True,
):
if attributes is None:
attributes = []
if self.status not in ["PENDING", "MOMENTAL"]:
raise ValueError(
_("you cannot add products to an order that is not a pending one")
)
try:
product = Product.objects.get(uuid=product_uuid)
if not product.is_active:
raise BadRequest(_("you cannot add inactive products to order"))
buy_price = product.price
promotions = Promotion.objects.filter(
is_active=True, products__in=[product]
).order_by("discount_percent")
if promotions.exists():
buy_price -= round(product.price * (promotions.first().discount_percent / 100), 2) # type: ignore
order_product, is_created = OrderProduct.objects.get_or_create(
product=product,
order=self,
attributes=json.dumps(attributes),
defaults={"quantity": 1, "buy_price": product.price},
)
if not is_created and update_quantity:
if product.quantity < order_product.quantity + 1:
raise BadRequest(
_("you cannot add more products than available in stock")
)
order_product.quantity += 1
order_product.buy_price = product.price
order_product.save()
return self
except Product.DoesNotExist:
name = "Product"
raise Http404(_(f"{name} does not exist: {product_uuid}"))
def remove_product(
self,
product_uuid: str | None = None,
attributes: dict | None = None,
zero_quantity: bool = False,
):
if attributes is None:
attributes = {}
if self.status not in ["PENDING", "MOMENTAL"]:
raise ValueError(
_("you cannot remove products from an order that is not a pending one")
)
try:
product = Product.objects.get(uuid=product_uuid)
order_product = self.order_products.get(product=product, order=self)
if zero_quantity:
order_product.delete()
return self
if order_product.quantity == 1:
self.order_products.remove(order_product)
order_product.delete()
else:
order_product.quantity -= 1
order_product.save()
return self
except Product.DoesNotExist:
name = "Product"
raise Http404(_(f"{name} does not exist: {product_uuid}"))
except OrderProduct.DoesNotExist:
name = "OrderProduct"
query = (
f"product: {product_uuid}, order: {self.uuid}, attributes: {attributes}"
)
raise Http404(_(f"{name} does not exist with query <{query}>"))
def remove_all_products(self):
if self.status not in ["PENDING", "MOMENTAL"]:
raise ValueError(
_("you cannot remove products from an order that is not a pending one")
)
for order_product in self.order_products.all():
self.order_products.remove(order_product)
order_product.delete()
return self
def remove_products_of_a_kind(self, product_uuid: str):
if self.status not in ["PENDING", "MOMENTAL"]:
raise ValueError(
_("you cannot remove products from an order that is not a pending one")
)
try:
product = Product.objects.get(uuid=product_uuid)
order_product = self.order_products.get(product=product, order=self)
self.order_products.remove(order_product)
order_product.delete()
except Product.DoesNotExist:
name = "Product"
raise Http404(_(f"{name} does not exist: {product_uuid}"))
return self
@property
def is_whole_digital(self):
return (
self.order_products.count()
== self.order_products.filter(product__is_digital=True).count()
)
def apply_promocode(self, promocode_uuid: str):
try:
promocode: PromoCode = PromoCode.objects.get(uuid=promocode_uuid)
except PromoCode.DoesNotExist:
raise Http404(_("promocode does not exist"))
return promocode.use(self)
def apply_addresses(self, billing_address_uuid, shipping_address_uuid):
try:
if not any([shipping_address_uuid, billing_address_uuid]):
if self.is_whole_digital:
return
else:
raise ValueError(
_(
"you can only buy physical products with shipping address specified"
)
)
if billing_address_uuid and not shipping_address_uuid:
shipping_address = Address.objects.get(uuid=billing_address_uuid)
billing_address = shipping_address
elif shipping_address_uuid and not billing_address_uuid:
billing_address = Address.objects.get(uuid=shipping_address_uuid)
shipping_address = billing_address
else:
billing_address = Address.objects.get(uuid=billing_address_uuid)
shipping_address = Address.objects.get(uuid=shipping_address_uuid)
self.billing_address = billing_address
self.shipping_address = shipping_address
self.save()
except Address.DoesNotExist:
raise Http404(_("address does not exist"))
def buy(
self,
force_balance: bool = False,
force_payment: bool = False,
promocode_uuid: str | None = None,
billing_address: str | None = None,
shipping_address: str | None = None,
) -> Self | Transaction | None:
if config.DISABLED_COMMERCE:
raise DisabledCommerceError(
_("you can not buy at this moment, please try again in a few minutes")
)
if (not force_balance and not force_payment) or (
force_balance and force_payment
):
raise ValueError(_("invalid force value"))
self.apply_addresses(billing_address, shipping_address)
if self.total_quantity < 1:
raise ValueError(_("you cannot purchase an empty order!"))
force = None
if force_balance:
force = "balance"
if force_payment:
force = "payment"
amount = (
self.apply_promocode(promocode_uuid) if promocode_uuid else self.total_price
)
match force:
case "balance":
if self.user.payments_balance.amount < amount: # type: ignore
raise NotEnoughMoneyError(
_("insufficient funds to complete the order")
)
self.status = "CREATED"
self.buy_time = timezone.now()
self.order_products.all().update(status="DELIVERING")
self.save()
return self
case "payment":
self.status = "PAYMENT"
self.save()
return Transaction.objects.create(
balance=self.user.payments_balance, # type: ignore
amount=amount,
currency=CURRENCY_CODE,
order=self,
)
return self
def buy_without_registration(
self, products: list, promocode_uuid: str, **kwargs
) -> Transaction | None:
if config.DISABLED_COMMERCE:
raise DisabledCommerceError(
_("you can not buy at this moment, please try again in a few minutes")
)
if len(products) < 1:
raise ValueError(_("you cannot purchase an empty order!"))
customer_name = kwargs.get("customer_name")
customer_email = kwargs.get("customer_email")
customer_phone_number = kwargs.get("customer_phone_number")
if not all([customer_name, customer_email, customer_phone_number]):
raise ValueError(
_(
"you cannot buy without registration, please provide the following information:"
" customer name, customer email, customer phone number"
)
)
payment_method = kwargs.get("payment_method")
available_payment_methods = cache.get("payment_methods").get("payment_methods")
if payment_method not in available_payment_methods:
raise ValueError(
_(
f"invalid payment method: {payment_method} from {available_payment_methods}"
)
)
billing_customer_address_uuid = kwargs.get("billing_customer_address")
shipping_customer_address_uuid = kwargs.get("shipping_customer_address")
self.apply_addresses(
billing_customer_address_uuid, shipping_customer_address_uuid
)
for product_uuid in products:
self.add_product(product_uuid)
amount = (
self.apply_promocode(promocode_uuid) if promocode_uuid else self.total_price
)
self.status = "CREATED"
if self.attributes is None:
self.attributes = {}
self.attributes.update(
{
"customer_name": customer_name,
"customer_email": customer_email,
"customer_phone_number": customer_phone_number,
"is_business": kwargs.get("is_business", False),
}
)
self.save()
return Transaction.objects.create(
amount=amount,
currency=CURRENCY_CODE,
order=self,
payment_method=kwargs.get("payment_method"),
)
def finalize(self):
if (
self.order_products.filter(
status__in=[
"ACCEPTED",
"FAILED",
"RETURNED",
"CANCELED",
"FINISHED",
]
).count()
== self.order_products.count()
):
self.status = "FINISHED"
self.save()
def bulk_add_products(self, products: list):
for product in products:
self.add_product(
product.get("uuid"),
attributes=product.get("attributes"),
update_quantity=False,
)
return self
def bulk_remove_products(self, products: list):
for product in products:
self.remove_product(
product.get("uuid"),
attributes=product.get("attributes"),
zero_quantity=True,
)
return self
class OrderProduct(ExportModelOperationsMixin("order_product"), NiceModel):
is_publicly_visible = False
buy_price: float = FloatField( # type: ignore
blank=True,
null=True,
help_text=_("the price paid by the customer for this product at purchase time"),
verbose_name=_("purchase price at order time"),
)
comments: str = TextField( # type: ignore
blank=True,
null=True,
help_text=_("internal comments for admins about this ordered product"),
verbose_name=_("internal comments"),
)
notifications: dict = JSONField( # type: ignore
blank=True,
null=True,
help_text=_("json structure of notifications to display to users"),
verbose_name=_("user notifications"),
)
attributes: dict = JSONField( # type: ignore
blank=True,
null=True,
help_text=_("json representation of this item's attributes"),
verbose_name=_("ordered product attributes"),
)
order: Order = ForeignKey( # type: ignore
"core.Order",
on_delete=CASCADE,
help_text=_("reference to the parent order that contains this product"),
verbose_name=_("parent order"),
related_name="order_products",
null=True,
)
product: Product = ForeignKey( # type: ignore
"core.Product",
on_delete=PROTECT,
blank=True,
null=True,
help_text=_("the specific product associated with this order line"),
verbose_name=_("associated product"),
)
quantity: int = PositiveIntegerField( # type: ignore
blank=False,
null=False,
default=1,
help_text=_("quantity of this specific product in the order"),
verbose_name=_("product quantity"),
)
status: str = CharField( # type: ignore
max_length=128,
blank=False,
null=False,
choices=ORDER_PRODUCT_STATUS_CHOICES,
help_text=_("current status of this product in the order"),
verbose_name=_("product line status"),
default="PENDING",
)
def __str__(self) -> str:
return f"{self.product.name} for ({self.order.user.email if self.order.user else 'unregistered user'})" # type: ignore
class Meta:
verbose_name = _("order product")
verbose_name_plural = _("order products")
indexes = [
GinIndex(fields=["notifications", "attributes"]),
]
def return_balance_back(self):
self.status = "RETURNED"
self.save()
self.order.user.payments_balance.amount += self.buy_price
self.order.user.payments_balance.save()
def add_error(self, error=None):
if self.notifications is not None:
order_product_errors = self.notifications.get("errors", [])
if not order_product_errors:
self.notifications.update(
{
"errors": [
{
"detail": (
error
if error
else f"Something went wrong with {self.uuid} for some reason..."
)
},
]
}
)
else:
order_product_errors.append({"detail": error})
self.notifications.update({"errors": order_product_errors})
else:
self.notifications = {"errors": [{"detail": error}]}
self.status = "FAILED"
self.save()
return self
@property
def total_price(self) -> float:
return round(self.buy_price * self.quantity, 2)
@property
def download_url(self) -> str:
if self.product.is_digital and self.product.stocks.first().digital_asset: # type: ignore
return self.download.url
return ""
def do_feedback(
self, rating: int = 10, comment: str = "", action: str = "add"
) -> Optional["Feedback"]:
if action not in ["add", "remove"]:
raise ValueError(_(f"wrong action specified for feedback: {action}"))
if action == "remove" and self.feedback:
self.feedback.delete()
return None
if action == "add" and not self.feedback:
if self.order.status not in ["MOMENTAL", "PENDING"]: # type: ignore
return Feedback.objects.create(
rating=rating, comment=comment, order_product=self
)
else:
raise ValueError(
_("you cannot feedback an order which is not received")
)
return None
class DigitalAssetDownload(ExportModelOperationsMixin("attribute_group"), NiceModel):
is_publicly_visible = False
order_product: OneToOneField = OneToOneField(to=OrderProduct, on_delete=CASCADE, related_name="download") # type: ignore
num_downloads: int = IntegerField(default=0) # type: ignore
class Meta:
verbose_name = _("download")
verbose_name_plural = _("downloads")
def __str__(self):
return f"{self.order_product} - {self.num_downloads}"
@property
def url(self):
if self.order_product.status != "FINISHED":
raise ValueError(
_("you can not download a digital asset for a non-finished order")
)
return f"https://api.{config.BASE_DOMAIN}/download/{urlsafe_base64_encode(force_bytes(self.order_product.uuid))}"
class Feedback(ExportModelOperationsMixin("feedback"), NiceModel):
is_publicly_visible = True
comment: str = TextField( # type: ignore
blank=True,
null=True,
help_text=_("user-provided comments about their experience with the product"),
verbose_name=_("feedback comments"),
)
order_product: OrderProduct = OneToOneField( # type: ignore
"core.OrderProduct",
on_delete=CASCADE,
blank=False,
null=False,
help_text=_(
"references the specific product in an order that this feedback is about"
),
verbose_name=_("related order product"),
)
rating: float = FloatField( # type: ignore
blank=True,
null=True,
help_text=_("user-assigned rating for the product"),
verbose_name=_("product rating"),
validators=[MinValueValidator(0), MaxValueValidator(10)],
)
def __str__(self) -> str:
return f"{self.rating} by {self.order_product.order.user.email}"
class Meta:
verbose_name = _("feedback")
verbose_name_plural = _("feedbacks")