from contextlib import suppress
from datetime import date, datetime, timedelta
from typing import Any

from constance import config
from django.db.models import Count, F, QuerySet, Sum
from django.db.models.functions import Coalesce, TruncDate
from django.urls import reverse
from django.utils.timezone import now

from engine.core.models import Category, Order, OrderProduct, Product


def _period_bounds(period: timedelta) -> tuple[datetime, datetime]:
    """Return ``(start, end)`` of the window of length *period* that ends now."""
    end = now()
    return end - period, end


def _line_total_sum() -> Coalesce:
    """Build the ``SUM(buy_price * quantity)`` aggregate, falling back to 0.0."""
    return Coalesce(Sum(F("buy_price") * F("quantity")), 0.0)


def _to_float(value: Any) -> float:
    """Coerce an aggregate value to ``float``; falsy or unconvertible input gives 0.0."""
    try:
        return float(value or 0.0)
    except (TypeError, ValueError):
        return 0.0


def get_period_order_products(
    period: timedelta = timedelta(days=30), statuses: list[str] | None = None
) -> QuerySet[OrderProduct]:
    """Return order products belonging to FINISHED orders bought within *period*.

    Args:
        period: Length of the look-back window ending at the current time.
        statuses: OrderProduct statuses to keep; defaults to ``["FINISHED"]``.

    Returns:
        A lazy queryset of matching ``OrderProduct`` rows.

    Note:
        The parent order is always required to be FINISHED; *statuses* only
        filters the order-product rows themselves.
    """
    if statuses is None:
        statuses = ["FINISHED"]
    period_start, current = _period_bounds(period)
    orders = Order.objects.filter(
        status="FINISHED", buy_time__lte=current, buy_time__gte=period_start
    )
    return OrderProduct.objects.filter(status__in=statuses, order__in=orders)


def get_revenue(clear: bool = True, period: timedelta = timedelta(days=30)) -> float:
    """Return revenue from FINISHED order products within *period*.

    Args:
        clear: When false, return the gross total. When true, strip tax from the
            total if ``config.TAX_INCLUDED`` is truthy and ``config.TAX_RATE``
            is a positive percentage.
        period: Length of the look-back window ending at the current time.

    Returns:
        The amount rounded to two decimals.
    """
    aggregated = get_period_order_products(period).aggregate(total=_line_total_sum())
    total = _to_float(aggregated.get("total"))
    if not clear:
        return round(total, 2)

    try:
        tax_rate = float(config.TAX_RATE or 0)
    except (TypeError, ValueError):
        tax_rate = 0.0
    # A non-positive (or NaN) rate means there is no tax to strip.
    if not tax_rate > 0:
        return round(total, 2)

    tax_included = False
    # Best effort: a missing or unreadable setting is treated as "not included".
    with suppress(Exception):
        tax_included = bool(getattr(config, "TAX_INCLUDED", False))
    if not tax_included:
        return round(total, 2)

    # tax_rate > 0 here, so the divisor is always greater than 1.
    return round(total / (1.0 + tax_rate / 100.0), 2)


def get_returns(period: timedelta = timedelta(days=30)) -> float:
    """Get total value of returned order products within the period.

    Returns are counted regardless of order status - a RETURNED OrderProduct
    counts as a return whether the order is FINISHED or FAILED.
    """
    period_start, current = _period_bounds(period)
    aggregated = OrderProduct.objects.filter(
        status="RETURNED",
        order__buy_time__lte=current,
        order__buy_time__gte=period_start,
    ).aggregate(total=_line_total_sum())
    return round(_to_float(aggregated.get("total")), 2)


def get_total_processed_orders(period: timedelta = timedelta(days=30)) -> int:
    """Count RETURNED and FINISHED order products of FINISHED orders in *period*.

    Note:
        Despite the name, the count is over order-product rows, not orders.
    """
    return get_period_order_products(period, ["RETURNED", "FINISHED"]).count()


def get_daily_finished_orders_count(
    period: timedelta = timedelta(days=30),
) -> dict[date, int]:
    """Return the number of FINISHED orders per purchase day within *period*.

    Days are inserted in ascending order; days without orders are absent.
    """
    period_start, current = _period_bounds(period)
    daily_rows = (
        Order.objects.filter(
            status="FINISHED", buy_time__lte=current, buy_time__gte=period_start
        )
        .annotate(day=TruncDate("buy_time"))
        .values("day")
        .annotate(cnt=Count("pk"))
        .order_by("day")
    )
    return {
        row["day"]: int(row.get("cnt", 0) or 0)
        for row in daily_rows
        if row.get("day")
    }


def get_daily_gross_revenue(
    period: timedelta = timedelta(days=30),
) -> dict[date, float]:
    """Return gross revenue per purchase day for FINISHED order products.

    Days are inserted in ascending order; each amount is rounded to two
    decimals. Days without sales are absent.
    """
    daily_rows = (
        get_period_order_products(period, ["FINISHED"])
        .annotate(day=TruncDate("order__buy_time"))
        .values("day")
        .annotate(total=_line_total_sum())
        .order_by("day")
    )
    return {
        row["day"]: round(_to_float(row.get("total")), 2)
        for row in daily_rows
        if row.get("day")
    }


def get_top_returned_products(
    period: timedelta = timedelta(days=30), limit: int = 10
) -> list[dict[str, Any]]:
    """Get top returned products within the period.

    Returns are counted regardless of order status - a RETURNED OrderProduct
    counts as a return whether the order is FINISHED or FAILED.

    Args:
        period: Length of the look-back window ending at the current time.
        limit: Maximum number of products to report.

    Returns:
        Dicts with ``name``, ``image``, ``admin_url``, ``count`` and ``amount``
        keys, ordered by returned quantity, highest first.
    """
    period_start, current = _period_bounds(period)
    # Materialise once: the rows are walked twice below.
    rows = list(
        OrderProduct.objects.filter(
            status="RETURNED",
            order__buy_time__lte=current,
            order__buy_time__gte=period_start,
            product__isnull=False,
        )
        .values("product")
        .annotate(
            returned_qty=Coalesce(Sum("quantity"), 0),
            returned_amount=_line_total_sum(),
        )
        .order_by("-returned_qty")[:limit]
    )

    product_ids = [row["product"] for row in rows if row.get("product")]
    product_by_id = {p.pk: p for p in Product.objects.filter(pk__in=product_ids)}

    result: list[dict[str, Any]] = []
    for row in rows:
        product = product_by_id.get(row.get("product"))
        if product is None:
            continue
        image_url = ""
        # Best effort: a broken image must not break the whole report.
        with suppress(Exception):
            # One query instead of exists() followed by first().
            first_image = product.images.first()
            if first_image is not None:
                image_url = first_image.image_url
        result.append(
            {
                "name": product.name,
                "image": image_url,
                "admin_url": reverse("admin:core_product_change", args=[product.pk]),
                "count": int(row.get("returned_qty", 0) or 0),
                "amount": round(_to_float(row.get("returned_amount")), 2),
            }
        )
    return result


def get_customer_mix(period: timedelta = timedelta(days=30)) -> dict[str, int]:
    """Split customers who bought within *period* into new and returning.

    A customer with at most one FINISHED order overall is "new"; one with more
    is "returning". Orders without a user are ignored.
    """
    period_start, current = _period_bounds(period)
    period_users = (
        Order.objects.filter(
            status="FINISHED",
            buy_time__lte=current,
            buy_time__gte=period_start,
            user__isnull=False,
        )
        .values_list("user_id", flat=True)
        .distinct()
    )
    # No separate emptiness check: with no buyers in the period the query
    # below yields no rows and both counters stay at zero.
    lifetime_counts = (
        Order.objects.filter(status="FINISHED", user_id__in=period_users)
        .values("user_id")
        .annotate(c=Count("pk"))
    )

    new_cnt = 0
    ret_cnt = 0
    for row in lifetime_counts:
        if int(row.get("c", 0) or 0) <= 1:
            new_cnt += 1
        else:
            ret_cnt += 1
    return {"new": new_cnt, "returning": ret_cnt}


def get_top_categories_by_qty(
    period: timedelta = timedelta(days=30), limit: int = 10
) -> list[dict[str, Any]]:
    """Return the best-selling categories within *period*.

    Only FINISHED order products of FINISHED orders are considered. Categories
    are ranked by quantity sold, then by gross amount.

    Args:
        period: Length of the look-back window ending at the current time.
        limit: Maximum number of categories to report.

    Returns:
        Dicts with ``name``, ``admin_url``, ``qty`` and ``gross`` keys.
    """
    period_start, current = _period_bounds(period)
    # Materialise once: the rows are walked twice below.
    rows = list(
        OrderProduct.objects.filter(
            status="FINISHED",
            order__status="FINISHED",
            order__buy_time__lte=current,
            order__buy_time__gte=period_start,
            product__isnull=False,
            product__category__isnull=False,
        )
        .values("product__category")
        .annotate(
            qty=Coalesce(Sum("quantity"), 0),
            gross=_line_total_sum(),
        )
        .order_by("-qty", "-gross")[:limit]
    )

    category_ids = [
        row["product__category"] for row in rows if row.get("product__category")
    ]
    category_by_id = {c.pk: c for c in Category.objects.filter(pk__in=category_ids)}

    result: list[dict[str, Any]] = []
    for row in rows:
        category = category_by_id.get(row.get("product__category"))
        if category is None:
            continue
        result.append(
            {
                "name": category.name,
                "admin_url": reverse("admin:core_category_change", args=[category.pk]),
                "qty": int(row.get("qty", 0) or 0),
                "gross": round(_to_float(row.get("gross")), 2),
            }
        )
    return result


def get_shipped_vs_digital_mix(
    period: timedelta = timedelta(days=30),
) -> dict[str, float | int]:
    """Return quantity and gross totals for digital versus shipped products.

    Only FINISHED order products of FINISHED orders within *period* count. A
    product whose ``is_digital`` flag is falsy is treated as shipped.

    Returns:
        A dict with ``digital_qty``, ``shipped_qty``, ``digital_gross`` and
        ``shipped_gross``; gross amounts are rounded to two decimals.
    """
    period_start, current = _period_bounds(period)
    rows = (
        OrderProduct.objects.filter(
            status="FINISHED",
            order__status="FINISHED",
            order__buy_time__lte=current,
            order__buy_time__gte=period_start,
            product__isnull=False,
        )
        .values("product__is_digital")
        .annotate(
            qty=Coalesce(Sum("quantity"), 0),
            gross=_line_total_sum(),
        )
    )

    # Totals keyed by the is_digital flag.
    qty_by_kind = {True: 0, False: 0}
    gross_by_kind = {True: 0.0, False: 0.0}
    for row in rows:
        is_digital = bool(row.get("product__is_digital"))
        qty_by_kind[is_digital] += int(row.get("qty", 0) or 0)
        gross_by_kind[is_digital] += _to_float(row.get("gross"))

    return {
        "digital_qty": qty_by_kind[True],
        "shipped_qty": qty_by_kind[False],
        "digital_gross": round(gross_by_kind[True], 2),
        "shipped_gross": round(gross_by_kind[False], 2),
    }