schon/engine/core/utils/commerce.py
Egor fureunoir Gorbunov f664b088a4 refactor(category): replace cache usage with model property for min/max price
Remove redundant cache lookups for `min_price` and `max_price` in the category model by leveraging cached properties. This minimizes complexity and improves maintainability while ensuring consistent behavior.
2026-02-25 12:19:39 +03:00

308 lines
9.1 KiB
Python

from contextlib import suppress
from datetime import date, timedelta
from typing import Any
from constance import config
from django.db.models import Count, F, QuerySet, Sum
from django.db.models.functions import Coalesce, TruncDate
from django.urls import reverse
from django.utils.timezone import now
from engine.core.models import Category, Order, OrderProduct, Product
def get_period_order_products(
    period: timedelta = timedelta(days=30), statuses: list[str] | None = None
) -> QuerySet[OrderProduct]:
    """Return order products bought within the trailing ``period``.

    Only products that belong to orders with status ``"FINISHED"`` and a
    ``buy_time`` between ``now() - period`` and ``now()`` (both inclusive)
    are considered.

    Args:
        period: Length of the look-back window ending at the current time.
        statuses: Accepted statuses of the order products themselves;
            defaults to ``["FINISHED"]`` when ``None``.

    Returns:
        A lazy ``OrderProduct`` queryset.
    """
    wanted_statuses = ["FINISHED"] if statuses is None else statuses
    window_end = now()
    window_start = window_end - period
    finished_orders = Order.objects.filter(
        status="FINISHED",
        buy_time__lte=window_end,
        buy_time__gte=window_start,
    )
    return OrderProduct.objects.filter(
        status__in=wanted_statuses,
        order__in=finished_orders,
    )
def get_revenue(clear: bool = True, period: timedelta = timedelta(days=30)) -> float:
    """Return revenue from finished order products over the trailing ``period``.

    The gross figure is the sum of ``buy_price * quantity`` over the rows
    returned by ``get_period_order_products(period)``.

    Args:
        clear: When false, return the gross total. When true, strip tax from
            the total, but only if ``config.TAX_RATE`` is a positive number
            and ``config.TAX_INCLUDED`` is truthy; in every other case the
            gross total is returned unchanged.
        period: Length of the look-back window ending at the current time.

    Returns:
        The amount rounded to two decimal places; ``0.0`` when there is
        nothing to sum or the aggregate cannot be converted to ``float``.
    """
    gross_raw = (
        get_period_order_products(period)
        .aggregate(total=Coalesce(Sum(F("buy_price") * F("quantity")), 0.0))
        .get("total")
    )
    try:
        gross = float(gross_raw or 0.0)
    except (TypeError, ValueError):
        gross = 0.0
    if not clear:
        return round(gross, 2)

    try:
        tax_rate = float(config.TAX_RATE or 0)
    except (TypeError, ValueError):
        tax_rate = 0.0

    tax_included = False
    # Best effort: if the setting is missing or cannot be read, treat prices
    # as not including tax.
    with suppress(Exception):
        tax_included = bool(getattr(config, "TAX_INCLUDED", False))

    # ``tax_rate > 0`` guarantees a divisor greater than 1, so no separate
    # zero/negative-divisor check is needed. A NaN rate fails the comparison
    # and is therefore handled like "no tax".
    if tax_included and tax_rate > 0:
        return round(gross / (1.0 + tax_rate / 100.0), 2)
    return round(gross, 2)
def get_returns(period: timedelta = timedelta(days=30)) -> float:
    """Sum the value of order products returned during the trailing ``period``.

    Every ``RETURNED`` order product whose order has a ``buy_time`` inside
    the window is included; the status of the order itself is not filtered.

    Returns:
        ``buy_price * quantity`` summed over the matching rows, rounded to
        two decimals, or ``0.0`` if the aggregate cannot be converted.
    """
    window_end = now()
    window_start = window_end - period
    returned_items = OrderProduct.objects.filter(
        status="RETURNED",
        order__buy_time__lte=window_end,
        order__buy_time__gte=window_start,
    )
    aggregated = returned_items.aggregate(
        total=Coalesce(Sum(F("buy_price") * F("quantity")), 0.0)
    )
    amount = aggregated.get("total") or 0.0
    try:
        rounded = round(float(amount), 2)
    except (TypeError, ValueError):
        return 0.0
    return rounded
def get_total_processed_orders(period: timedelta = timedelta(days=30)) -> int:
    """Count ``RETURNED`` and ``FINISHED`` order products in the trailing ``period``.

    NOTE: despite the name, the count is taken over the order-product rows
    returned by ``get_period_order_products``, not over distinct orders.
    """
    processed_statuses = ["RETURNED", "FINISHED"]
    processed_items = get_period_order_products(period, processed_statuses)
    return processed_items.count()
def get_daily_finished_orders_count(
    period: timedelta = timedelta(days=30),
) -> dict[date, int]:
    """Map each day in the trailing ``period`` to its number of finished orders.

    Days are derived from ``buy_time`` with ``TruncDate`` and emitted in
    ascending order. Only days that have at least one finished order appear
    in the result.
    """
    window_end = now()
    window_start = window_end - period
    finished_orders = Order.objects.filter(
        status="FINISHED", buy_time__lte=window_end, buy_time__gte=window_start
    )
    per_day = (
        finished_orders.annotate(day=TruncDate("buy_time"))
        .values("day")
        .annotate(cnt=Count("pk"))
        .order_by("day")
    )
    # Rows with an empty day value are dropped.
    return {
        entry["day"]: int(entry.get("cnt", 0) or 0)
        for entry in per_day
        if entry.get("day")
    }
def get_daily_gross_revenue(
    period: timedelta = timedelta(days=30),
) -> dict[date, float]:
    """Map each day in the trailing ``period`` to its gross revenue.

    Revenue is ``buy_price * quantity`` summed over the ``FINISHED`` order
    products from ``get_period_order_products``, grouped by the date of the
    order's ``buy_time``. Each total is rounded to two decimals; a total
    that cannot be converted to ``float`` is reported as ``0.0``.
    """
    finished_items = get_period_order_products(period, ["FINISHED"])
    daily_totals = (
        finished_items.annotate(day=TruncDate("order__buy_time"))
        .values("day")
        .annotate(total=Coalesce(Sum(F("buy_price") * F("quantity")), 0.0))
        .order_by("day")
    )
    revenue_by_day: dict[date, float] = {}
    for entry in daily_totals:
        day = entry.get("day")
        if not day:
            # Rows with an empty day value are dropped.
            continue
        try:
            amount = round(float(entry.get("total") or 0.0), 2)
        except (TypeError, ValueError):
            amount = 0.0
        revenue_by_day[day] = amount
    return revenue_by_day
def get_top_returned_products(
    period: timedelta = timedelta(days=30), limit: int = 10
) -> list[dict[str, Any]]:
    """Return the most-returned products within the trailing ``period``.

    Returns are counted regardless of order status: any ``RETURNED`` order
    product whose order has a ``buy_time`` inside the window is included.

    Args:
        period: Length of the look-back window ending at the current time.
        limit: Maximum number of products to report.

    Returns:
        A list of dicts with the keys ``name``, ``image``, ``admin_url``,
        ``count`` (returned quantity) and ``amount`` (``buy_price * quantity``
        rounded to two decimals), ordered by returned quantity descending
        with ties broken by returned amount descending.
    """
    window_end = now()
    window_start = window_end - period
    rows = list(
        OrderProduct.objects.filter(
            status="RETURNED",
            order__buy_time__lte=window_end,
            order__buy_time__gte=window_start,
            product__isnull=False,
        )
        .values("product")
        .annotate(
            returned_qty=Coalesce(Sum("quantity"), 0),
            returned_amount=Coalesce(Sum(F("buy_price") * F("quantity")), 0.0),
        )
        # The secondary key keeps the top-N cut deterministic when several
        # products share the same returned quantity.
        .order_by("-returned_qty", "-returned_amount")[:limit]
    )
    product_ids = [row["product"] for row in rows if row.get("product")]
    product_by_id = {p.pk: p for p in Product.objects.filter(pk__in=product_ids)}

    result: list[dict[str, Any]] = []
    for row in rows:
        product = product_by_id.get(row.get("product"))
        if product is None:
            continue
        image_url = ""
        # Best effort: a product whose image cannot be resolved is still
        # listed, just without a picture. A single ``first()`` call replaces
        # the former ``exists()`` + ``first()`` pair (one query, not two).
        with suppress(Exception):
            first_image = product.images.first()
            if first_image is not None:
                image_url = first_image.image_url
        result.append(
            {
                "name": product.name,
                "image": image_url,
                "admin_url": reverse("admin:core_product_change", args=[product.pk]),
                "count": int(row.get("returned_qty", 0) or 0),
                "amount": round(float(row.get("returned_amount", 0.0) or 0.0), 2),
            }
        )
    return result
def get_customer_mix(period: timedelta = timedelta(days=30)) -> dict[str, int]:
    """Split customers active in the trailing ``period`` into new and returning.

    A customer is active when they have a ``FINISHED`` order with a
    ``buy_time`` inside the window; orders without a user are ignored.
    An active customer counts as "new" when they have at most one
    ``FINISHED`` order over all time, and as "returning" otherwise.

    Returns:
        ``{"new": <int>, "returning": <int>}``; both are ``0`` when no
        customer was active in the window.
    """
    window_end = now()
    window_start = window_end - period
    # Deliberately left unevaluated: Django embeds this queryset as a
    # subquery in the ``user_id__in`` lookup below. Testing it for truthiness
    # first would pull every id into Python and cost an extra query, while an
    # empty set already produces zero counts from the loop.
    period_user_ids = (
        Order.objects.filter(
            status="FINISHED",
            buy_time__lte=window_end,
            buy_time__gte=window_start,
            user__isnull=False,
        )
        .values_list("user_id", flat=True)
        .distinct()
    )
    lifetime_counts = (
        Order.objects.filter(status="FINISHED", user_id__in=period_user_ids)
        .values("user_id")
        .annotate(c=Count("pk"))
    )
    new_cnt = 0
    returning_cnt = 0
    for row in lifetime_counts:
        if int(row.get("c", 0) or 0) <= 1:
            new_cnt += 1
        else:
            returning_cnt += 1
    return {"new": new_cnt, "returning": returning_cnt}
def get_top_categories_by_qty(
    period: timedelta = timedelta(days=30), limit: int = 10
) -> list[dict[str, Any]]:
    """Rank categories by units sold within the trailing ``period``.

    Only ``FINISHED`` order products of ``FINISHED`` orders that reference a
    product with a category are counted. Categories are ordered by quantity
    descending, then by gross amount descending, and cut to ``limit``.

    Returns:
        A list of dicts with the keys ``name``, ``admin_url``, ``qty`` and
        ``gross`` (``buy_price * quantity`` rounded to two decimals).
    """
    window_end = now()
    window_start = window_end - period
    sold_items = OrderProduct.objects.filter(
        status="FINISHED",
        order__status="FINISHED",
        order__buy_time__lte=window_end,
        order__buy_time__gte=window_start,
        product__isnull=False,
        product__category__isnull=False,
    )
    ranked = (
        sold_items.values("product__category")
        .annotate(
            qty=Coalesce(Sum("quantity"), 0),
            gross=Coalesce(Sum(F("buy_price") * F("quantity")), 0.0),
        )
        .order_by("-qty", "-gross")[:limit]
    )
    category_ids = [
        entry["product__category"]
        for entry in ranked
        if entry.get("product__category")
    ]
    category_by_id = {
        category.pk: category
        for category in Category.objects.filter(pk__in=category_ids)
    }
    top_categories: list[dict[str, Any]] = []
    for entry in ranked:
        category_id = entry.get("product__category")
        if category_id and category_id in category_by_id:
            category = category_by_id[category_id]
            top_categories.append(
                {
                    "name": category.name,
                    "admin_url": reverse(
                        "admin:core_category_change", args=[category.pk]
                    ),
                    "qty": int(entry.get("qty", 0) or 0),
                    "gross": round(float(entry.get("gross", 0.0) or 0.0), 2),
                }
            )
    return top_categories
def get_shipped_vs_digital_mix(
    period: timedelta = timedelta(days=30),
) -> dict[str, float | int]:
    """Break down sales in the trailing ``period`` into digital and shipped goods.

    Only ``FINISHED`` order products of ``FINISHED`` orders that reference a
    product are counted. Rows are grouped by ``product__is_digital``; any
    falsy flag value is treated as shipped.

    Returns:
        A dict with ``digital_qty`` / ``shipped_qty`` (units) and
        ``digital_gross`` / ``shipped_gross`` (``buy_price * quantity``
        rounded to two decimals).
    """
    window_end = now()
    window_start = window_end - period
    sold_items = OrderProduct.objects.filter(
        status="FINISHED",
        order__status="FINISHED",
        order__buy_time__lte=window_end,
        order__buy_time__gte=window_start,
        product__isnull=False,
    )
    breakdown = sold_items.values("product__is_digital").annotate(
        qty=Coalesce(Sum("quantity"), 0),
        gross=Coalesce(Sum(F("buy_price") * F("quantity")), 0.0),
    )
    # Accumulators keyed by the "is digital" flag.
    qty_totals = {True: 0, False: 0}
    gross_totals = {True: 0.0, False: 0.0}
    for entry in breakdown:
        digital = bool(entry.get("product__is_digital"))
        units = int(entry.get("qty", 0) or 0)
        try:
            amount = float(entry.get("gross", 0.0) or 0.0)
        except (TypeError, ValueError):
            amount = 0.0
        qty_totals[digital] += units
        gross_totals[digital] += amount
    return {
        "digital_qty": int(qty_totals[True]),
        "shipped_qty": int(qty_totals[False]),
        "digital_gross": round(float(gross_totals[True]), 2),
        "shipped_gross": round(float(gross_totals[False]), 2),
    }