remove redundant cache lookups for `min_price` and `max_price` in the category model by leveraging cached properties. minimizes complexity and improves maintainability while ensuring consistent behavior.
308 lines
9.1 KiB
Python
308 lines
9.1 KiB
Python
from contextlib import suppress
|
|
from datetime import date, timedelta
|
|
from typing import Any
|
|
|
|
from constance import config
|
|
from django.db.models import Count, F, QuerySet, Sum
|
|
from django.db.models.functions import Coalesce, TruncDate
|
|
from django.urls import reverse
|
|
from django.utils.timezone import now
|
|
|
|
from engine.core.models import Category, Order, OrderProduct, Product
|
|
|
|
|
|
def get_period_order_products(
|
|
period: timedelta = timedelta(days=30), statuses: list[str] | None = None
|
|
) -> QuerySet[OrderProduct]:
|
|
if statuses is None:
|
|
statuses = ["FINISHED"]
|
|
current = now()
|
|
perioded = current - period
|
|
orders = Order.objects.filter(
|
|
status="FINISHED", buy_time__lte=current, buy_time__gte=perioded
|
|
)
|
|
return OrderProduct.objects.filter(status__in=statuses, order__in=orders)
|
|
|
|
|
|
def get_revenue(clear: bool = True, period: timedelta = timedelta(days=30)) -> float:
|
|
order_products = get_period_order_products(period)
|
|
total: float = (
|
|
order_products.aggregate(
|
|
total=Coalesce(Sum(F("buy_price") * F("quantity")), 0.0)
|
|
).get("total")
|
|
or 0.0
|
|
)
|
|
try:
|
|
total = float(total)
|
|
except (TypeError, ValueError):
|
|
total = 0.0
|
|
|
|
if not clear:
|
|
return round(float(total), 2)
|
|
|
|
try:
|
|
tax_rate = float(config.TAX_RATE or 0)
|
|
except (TypeError, ValueError):
|
|
tax_rate = 0.0
|
|
|
|
tax_included = False
|
|
with suppress(Exception):
|
|
tax_included = bool(getattr(config, "TAX_INCLUDED", False))
|
|
|
|
if tax_rate <= 0:
|
|
net = total
|
|
else:
|
|
if tax_included:
|
|
divisor = 1.0 + (tax_rate / 100.0)
|
|
if divisor <= 0:
|
|
net = total
|
|
else:
|
|
net = total / divisor
|
|
else:
|
|
net = total
|
|
|
|
return round(float(net or 0.0), 2)
|
|
|
|
|
|
def get_returns(period: timedelta = timedelta(days=30)) -> float:
|
|
"""Get total value of returned order products within the period.
|
|
|
|
Returns are counted regardless of order status - a RETURNED OrderProduct
|
|
counts as a return whether the order is FINISHED or FAILED.
|
|
"""
|
|
current = now()
|
|
period_start = current - period
|
|
total_returns: float = (
|
|
OrderProduct.objects.filter(
|
|
status="RETURNED",
|
|
order__buy_time__lte=current,
|
|
order__buy_time__gte=period_start,
|
|
)
|
|
.aggregate(total=Coalesce(Sum(F("buy_price") * F("quantity")), 0.0))
|
|
.get("total")
|
|
or 0.0
|
|
)
|
|
try:
|
|
return round(float(total_returns), 2)
|
|
except (TypeError, ValueError):
|
|
return 0.0
|
|
|
|
|
|
def get_total_processed_orders(period: timedelta = timedelta(days=30)) -> int:
|
|
return get_period_order_products(period, ["RETURNED", "FINISHED"]).count()
|
|
|
|
|
|
def get_daily_finished_orders_count(
|
|
period: timedelta = timedelta(days=30),
|
|
) -> dict[date, int]:
|
|
current = now()
|
|
period_start = current - period
|
|
qs = (
|
|
Order.objects.filter(
|
|
status="FINISHED", buy_time__lte=current, buy_time__gte=period_start
|
|
)
|
|
.annotate(day=TruncDate("buy_time"))
|
|
.values("day")
|
|
.annotate(cnt=Count("pk"))
|
|
.order_by("day")
|
|
)
|
|
result: dict[date, int] = {}
|
|
for row in qs:
|
|
d = row.get("day")
|
|
c = int(row.get("cnt", 0) or 0)
|
|
if d:
|
|
result[d] = c
|
|
return result
|
|
|
|
|
|
def get_daily_gross_revenue(
|
|
period: timedelta = timedelta(days=30),
|
|
) -> dict[date, float]:
|
|
qs = (
|
|
get_period_order_products(period, ["FINISHED"]) # OrderProduct queryset
|
|
.annotate(day=TruncDate("order__buy_time"))
|
|
.values("day")
|
|
.annotate(total=Coalesce(Sum(F("buy_price") * F("quantity")), 0.0))
|
|
.order_by("day")
|
|
)
|
|
result: dict[date, float] = {}
|
|
for row in qs:
|
|
d = row.get("day")
|
|
total = row.get("total") or 0.0
|
|
try:
|
|
total_f = round(float(total), 2)
|
|
except (TypeError, ValueError):
|
|
total_f = 0.0
|
|
if d:
|
|
result[d] = total_f
|
|
return result
|
|
|
|
|
|
def get_top_returned_products(
|
|
period: timedelta = timedelta(days=30), limit: int = 10
|
|
) -> list[dict[str, Any]]:
|
|
"""Get top returned products within the period.
|
|
|
|
Returns are counted regardless of order status - a RETURNED OrderProduct
|
|
counts as a return whether the order is FINISHED or FAILED.
|
|
"""
|
|
current = now()
|
|
period_start = current - period
|
|
qs = (
|
|
OrderProduct.objects.filter(
|
|
status="RETURNED",
|
|
order__buy_time__lte=current,
|
|
order__buy_time__gte=period_start,
|
|
product__isnull=False,
|
|
)
|
|
.values("product")
|
|
.annotate(
|
|
returned_qty=Coalesce(Sum("quantity"), 0),
|
|
returned_amount=Coalesce(Sum(F("buy_price") * F("quantity")), 0.0),
|
|
)
|
|
.order_by("-returned_qty")[:limit]
|
|
)
|
|
|
|
result: list[dict[str, Any]] = []
|
|
prod_ids = [row["product"] for row in qs if row.get("product")]
|
|
products = Product.objects.filter(pk__in=prod_ids)
|
|
product_by_id = {p.pk: p for p in products}
|
|
for row in qs:
|
|
pid = row.get("product")
|
|
if not pid or pid not in product_by_id:
|
|
continue
|
|
p = product_by_id[pid]
|
|
img = ""
|
|
with suppress(Exception):
|
|
img = p.images.first().image_url if p.images.exists() else "" # ty: ignore[possibly-missing-attribute]
|
|
result.append(
|
|
{
|
|
"name": p.name,
|
|
"image": img,
|
|
"admin_url": reverse("admin:core_product_change", args=[p.pk]),
|
|
"count": int(row.get("returned_qty", 0) or 0),
|
|
"amount": round(float(row.get("returned_amount", 0.0) or 0.0), 2),
|
|
}
|
|
)
|
|
return result
|
|
|
|
|
|
def get_customer_mix(period: timedelta = timedelta(days=30)) -> dict[str, int]:
|
|
current = now()
|
|
period_start = current - period
|
|
period_users = (
|
|
Order.objects.filter(
|
|
status="FINISHED",
|
|
buy_time__lte=current,
|
|
buy_time__gte=period_start,
|
|
user__isnull=False,
|
|
)
|
|
.values_list("user_id", flat=True)
|
|
.distinct()
|
|
)
|
|
if not period_users:
|
|
return {"new": 0, "returning": 0}
|
|
|
|
lifetime_counts = (
|
|
Order.objects.filter(status="FINISHED", user_id__in=period_users)
|
|
.values("user_id")
|
|
.annotate(c=Count("pk"))
|
|
)
|
|
new_cnt = 0
|
|
ret_cnt = 0
|
|
for row in lifetime_counts:
|
|
c = int(row.get("c", 0) or 0)
|
|
if c <= 1:
|
|
new_cnt += 1
|
|
else:
|
|
ret_cnt += 1
|
|
return {"new": new_cnt, "returning": ret_cnt}
|
|
|
|
|
|
def get_top_categories_by_qty(
|
|
period: timedelta = timedelta(days=30), limit: int = 10
|
|
) -> list[dict[str, Any]]:
|
|
current = now()
|
|
period_start = current - period
|
|
qs = (
|
|
OrderProduct.objects.filter(
|
|
status="FINISHED",
|
|
order__status="FINISHED",
|
|
order__buy_time__lte=current,
|
|
order__buy_time__gte=period_start,
|
|
product__isnull=False,
|
|
product__category__isnull=False,
|
|
)
|
|
.values("product__category")
|
|
.annotate(
|
|
qty=Coalesce(Sum("quantity"), 0),
|
|
gross=Coalesce(Sum(F("buy_price") * F("quantity")), 0.0),
|
|
)
|
|
.order_by("-qty", "-gross")[:limit]
|
|
)
|
|
|
|
cat_ids = [row["product__category"] for row in qs if row.get("product__category")]
|
|
cats = Category.objects.filter(pk__in=cat_ids)
|
|
cat_by_id = {c.pk: c for c in cats}
|
|
result: list[dict[str, Any]] = []
|
|
for row in qs:
|
|
cid = row.get("product__category")
|
|
if not cid or cid not in cat_by_id:
|
|
continue
|
|
c = cat_by_id[cid]
|
|
result.append(
|
|
{
|
|
"name": c.name,
|
|
"admin_url": reverse("admin:core_category_change", args=[c.pk]),
|
|
"qty": int(row.get("qty", 0) or 0),
|
|
"gross": round(float(row.get("gross", 0.0) or 0.0), 2),
|
|
}
|
|
)
|
|
return result
|
|
|
|
|
|
def get_shipped_vs_digital_mix(
|
|
period: timedelta = timedelta(days=30),
|
|
) -> dict[str, float | int]:
|
|
current = now()
|
|
period_start = current - period
|
|
qs = (
|
|
OrderProduct.objects.filter(
|
|
status="FINISHED",
|
|
order__status="FINISHED",
|
|
order__buy_time__lte=current,
|
|
order__buy_time__gte=period_start,
|
|
product__isnull=False,
|
|
)
|
|
.values("product__is_digital")
|
|
.annotate(
|
|
qty=Coalesce(Sum("quantity"), 0),
|
|
gross=Coalesce(Sum(F("buy_price") * F("quantity")), 0.0),
|
|
)
|
|
)
|
|
|
|
digital_qty = 0
|
|
shipped_qty = 0
|
|
digital_gross = 0.0
|
|
shipped_gross = 0.0
|
|
for row in qs:
|
|
is_digital = bool(row.get("product__is_digital"))
|
|
q = int(row.get("qty", 0) or 0)
|
|
g = row.get("gross", 0.0) or 0.0
|
|
try:
|
|
g = float(g)
|
|
except (TypeError, ValueError):
|
|
g = 0.0
|
|
if is_digital:
|
|
digital_qty += q
|
|
digital_gross += g
|
|
else:
|
|
shipped_qty += q
|
|
shipped_gross += g
|
|
|
|
return {
|
|
"digital_qty": int(digital_qty),
|
|
"shipped_qty": int(shipped_qty),
|
|
"digital_gross": round(float(digital_gross), 2),
|
|
"shipped_gross": round(float(shipped_gross), 2),
|
|
}
|