schon/engine/core/views.py
Egor fureunoir Gorbunov b914b5fc91 feat(graphene): replace Float with Decimal for price and discount_price
Switched Float fields to Decimal in Graphene for increased precision in monetary values. Updated related queries and resolvers accordingly.

Additionally:
- Added custom DynamicMail health check for runtime-based email configuration.
- Enhanced admin login flow to handle unconfigured email backends by logging users in directly.
2026-03-05 14:00:49 +03:00

911 lines
32 KiB
Python

import logging
import mimetypes
import os
import traceback
from contextlib import suppress
from datetime import date, timedelta
from decimal import Decimal
from os import getenv
import orjson
import requests
from constance import config
from django.conf import settings
from django.contrib.sitemaps.views import index as _sitemap_index_view
from django.contrib.sitemaps.views import sitemap as _sitemap_detail_view
from django.core.cache import cache
from django.core.exceptions import BadRequest
from django.db.models import Count, F, Sum
from django.http import (
FileResponse,
HttpRequest,
HttpResponse,
HttpResponseRedirect,
JsonResponse,
)
from django.shortcuts import redirect
from django.template import Context
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.http import urlsafe_base64_decode
from django.utils.timezone import now as tz_now
from django.utils.translation import gettext_lazy as _
from django.views.decorators.cache import cache_page
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.vary import vary_on_headers
from django.views.generic import TemplateView
from django_ratelimit.decorators import ratelimit
from drf_spectacular.utils import extend_schema_view
from drf_spectacular.views import SpectacularAPIView
from graphene_file_upload.django import FileUploadGraphQLView
from graphql.validation import NoSchemaIntrospectionCustomRule
from rest_framework import status
from rest_framework.parsers import MultiPartParser
from rest_framework.permissions import AllowAny, IsAdminUser
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from sentry_sdk import capture_exception
from engine.core.docs.drf.views import (
BUY_AS_BUSINESS_SCHEMA,
CACHE_SCHEMA,
CONTACT_US_SCHEMA,
CUSTOM_OPENAPI_SCHEMA,
DOWNLOAD_DIGITAL_ASSET_SCHEMA,
LANGUAGE_SCHEMA,
PARAMETERS_SCHEMA,
REQUEST_CURSED_URL_SCHEMA,
SEARCH_SCHEMA,
)
from engine.core.elasticsearch import process_query
from engine.core.models import (
DigitalAssetDownload,
Order,
OrderProduct,
PastedImage,
Product,
Wishlist,
)
from engine.core.serializers import (
BuyAsBusinessOrderSerializer,
CacheOperatorSerializer,
ContactUsSerializer,
LanguageSerializer,
)
from engine.core.utils import get_project_parameters, is_url_safe
from engine.core.utils.caching import web_cache
from engine.core.utils.commerce import (
get_customer_mix,
get_daily_finished_orders_count,
get_daily_gross_revenue,
get_returns,
get_revenue,
get_shipped_vs_digital_mix,
get_top_categories_by_qty,
get_top_returned_products,
get_total_processed_orders,
)
from engine.core.utils.emailing import contact_us_email
from engine.core.utils.languages import get_flag_by_language
from engine.payments.serializers import TransactionProcessSerializer
from schon.graphql_validators import QueryDepthLimitRule
from schon.utils.renderers import camelize
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
@cache_page(60 * 60 * 12)
@vary_on_headers("Host")
def sitemap_index(request, *args, **kwargs):
    # Delegates to Django's stock sitemap index view and forces an XML content
    # type on whatever it returns. The decorators cache the response for
    # 12 hours and vary it on the Host header. The function's __doc__ is
    # assigned right below the definition.
    xml_response = _sitemap_index_view(request, *args, **kwargs)
    xml_response.headers["Content-Type"] = "application/xml; charset=utf-8"
    return xml_response


# noinspection PyTypeChecker
sitemap_index.__doc__ = _(  # ty:ignore[invalid-assignment]
    "Handles the request for the sitemap index and returns an XML response. "
    "It ensures the response includes the appropriate content type header for XML."
)
@cache_page(60 * 60 * 24)
@vary_on_headers("Host")
def sitemap_detail(request, *args, **kwargs):
    # Delegates to Django's stock sitemap detail view and forces an XML content
    # type on whatever it returns. The decorators cache the response for
    # 24 hours and vary it on the Host header. The function's __doc__ is
    # assigned right below the definition.
    xml_response = _sitemap_detail_view(request, *args, **kwargs)
    xml_response.headers["Content-Type"] = "application/xml; charset=utf-8"
    return xml_response


# noinspection PyTypeChecker
sitemap_detail.__doc__ = _(  # ty:ignore[invalid-assignment]
    "Handles the detailed view response for a sitemap. "
    "This function processes the request, fetches the appropriate "
    "sitemap detail response, and sets the Content-Type header for XML."
)
# Validation rules applied to every GraphQL query. The depth limit is always
# on; schema introspection is rejected unless the GRAPHQL_INTROSPECTION
# environment variable is "1", "true" or "yes" (compared case-insensitively).
_graphql_validation_rules = [QueryDepthLimitRule]
if os.environ.get("GRAPHQL_INTROSPECTION", "").lower() not in {"1", "true", "yes"}:
    _graphql_validation_rules += [NoSchemaIntrospectionCustomRule]
class CustomGraphQLView(FileUploadGraphQLView):
    # GraphQL view with the module's validation rules and an orjson-based
    # response encoder.

    # Frozen snapshot of the rule list assembled at import time.
    validation_rules = tuple(_graphql_validation_rules)

    def get_context(self, request):
        # The request object itself is handed out as the execution context.
        return request

    def json_encode(self, request, d, pretty=False):
        # Serialize `d` to a JSON string with orjson. Decimal values are emitted
        # as floats; any other type orjson cannot handle raises TypeError.
        # Output is indented and key-sorted when `pretty` is truthy or the
        # request carries a non-empty `pretty` query parameter.
        def _fallback(value):
            if not isinstance(value, Decimal):
                raise TypeError(
                    f"Object of type {type(value).__name__} is not JSON serializable"
                )
            return float(value)

        want_pretty = pretty or request.GET.get("pretty")
        flags = orjson.OPT_NON_STR_KEYS
        if want_pretty:
            flags = flags | orjson.OPT_INDENT_2 | orjson.OPT_SORT_KEYS
        encoded = orjson.dumps(d, default=_fallback, option=flags)
        return encoded.decode("utf-8")
@extend_schema_view(**CUSTOM_OPENAPI_SCHEMA)
class CustomSpectacularAPIView(SpectacularAPIView):
    # Schema endpoint decorated with the project's CUSTOM_OPENAPI_SCHEMA.
    # `get` is overridden explicitly but only forwards to the parent view.
    def get(self, request, *args, **kwargs):
        schema_response = super().get(request, *args, **kwargs)
        return schema_response
class RapiDocView(TemplateView):
    # Renders the "rapidoc.html" template with the API title and the absolute
    # URL of the schema endpoint.
    template_name = "rapidoc.html"

    def get_context_data(self, **kwargs):
        # Extend the base template context with `title` (taken from
        # SPECTACULAR_SETTINGS, defaulting to "API") and `schema_url`.
        context = super().get_context_data(**kwargs)
        api_title = settings.SPECTACULAR_SETTINGS.get("TITLE", "API")
        schema_location = self.request.build_absolute_uri("/docs/schema/")
        context.update(title=api_title, schema_url=schema_location)
        return context
@extend_schema_view(**LANGUAGE_SCHEMA)
class SupportedLanguagesView(APIView):
    __doc__ = _(  # pyright: ignore[reportUnknownVariableType]
        "Returns a list of supported languages and their corresponding information."
    )
    serializer_class = LanguageSerializer
    permission_classes = [AllowAny]

    def get(self, request: Request, *args, **kwargs) -> Response:
        # Turn every entry of settings.LANGUAGES into a code/name/flag record
        # and return the serialized list.
        languages = []
        for lang in settings.LANGUAGES:
            code = lang[0]
            languages.append(
                {
                    "code": code,
                    "name": lang[1],
                    "flag": get_flag_by_language(code),
                }
            )
        serializer = self.serializer_class(languages, many=True)
        return Response(data=serializer.data, status=status.HTTP_200_OK)
@extend_schema_view(**PARAMETERS_SCHEMA)
class WebsiteParametersView(APIView):
    __doc__ = _("Returns the parameters of the website as a JSON object.")
    serializer_class = None
    permission_classes = [AllowAny]

    def get(self, request: Request, *args, **kwargs) -> Response:
        # Respond with the project parameters after passing them through
        # camelize().
        parameters = camelize(get_project_parameters())
        return Response(data=parameters, status=status.HTTP_200_OK)
@extend_schema_view(**CACHE_SCHEMA)
class CacheOperatorView(APIView):
    __doc__ = _(
        "Handles cache operations such as reading and setting cache data with a specified key and timeout."
    )
    serializer_class = CacheOperatorSerializer
    permission_classes = [AllowAny]

    def post(self, request: Request, *args, **kwargs) -> Response:
        # Forward the request plus its `key`, `data` and `timeout` body fields
        # to web_cache() and return whatever it produces. A missing or empty
        # key is passed as "", missing data as {}, missing timeout as None.
        body = request.data
        cache_key = body.get("key") or ""
        cache_payload = body.get("data", {})
        cache_timeout = body.get("timeout")
        outcome = web_cache(request, cache_key, cache_payload, cache_timeout)
        return Response(data=outcome, status=status.HTTP_200_OK)
@extend_schema_view(**CONTACT_US_SCHEMA)
class ContactUsView(APIView):
    __doc__ = _("Handles `contact us` form submissions.")
    serializer_class = ContactUsSerializer

    @method_decorator(ratelimit(key="ip", rate="2/h", method="POST", block=True))
    def post(self, request: Request, *args, **kwargs) -> Response:
        # Validate the submitted form (invalid input raises through DRF's
        # exception handling), hand the validated data to
        # contact_us_email.delay(), and echo the serialized form back.
        # Limited to 2 POSTs per hour per IP by the decorator.
        form = self.serializer_class(data=request.data)
        form.is_valid(raise_exception=True)
        contact_us_email.delay(form.validated_data)
        return Response(data=form.data, status=status.HTTP_200_OK)
@extend_schema_view(**REQUEST_CURSED_URL_SCHEMA)
class RequestCursedURLView(APIView):
    __doc__ = _(
        "Handles requests for processing and validating URLs from incoming POST requests."
    )
    permission_classes = [
        AllowAny,
    ]
    # (connect, read) timeout in seconds for the upstream fetch. Without a
    # timeout, `requests` waits indefinitely and a stalled upstream would pin
    # a worker.
    upstream_timeout = (5, 15)

    @method_decorator(ratelimit(key="ip", rate="10/h"))
    def post(self, request: Request, *args, **kwargs) -> Response:
        # Fetch the JSON document behind the posted `url`, camelize it, cache
        # it for 24 hours under the URL itself, and return it.
        # Responds 400 when the URL is rejected by is_url_safe() or when
        # anything in the fetch/parse/cache path raises.
        url = request.data.get("url")
        if not is_url_safe(str(url)):
            return Response(
                data={"error": _("only URLs starting with http(s):// are allowed")},
                status=status.HTTP_400_BAD_REQUEST,
            )
        try:
            data = cache.get(url, None)
            # Only a real cache miss triggers a fetch; cached falsy payloads
            # such as [] or {} are served from the cache.
            if data is None:
                # NOTE(review): requests follows redirects by default and the
                # redirect target is not re-checked with is_url_safe() here.
                response = requests.get(
                    str(url),
                    headers={"content-type": "application/json"},
                    timeout=self.upstream_timeout,
                )
                response.raise_for_status()
                data = camelize(response.json())
                cache.set(url, data, 86400)
            return Response(
                data=data,
                status=status.HTTP_200_OK,
            )
        except Exception as e:
            return Response(
                data={"error": str(e)},
                status=status.HTTP_400_BAD_REQUEST,
            )
@extend_schema_view(**SEARCH_SCHEMA)
class GlobalSearchView(APIView):
    __doc__ = _("Handles global search queries.")

    def get(self, request: Request, *args, **kwargs) -> Response:
        # Run the whitespace-trimmed `q` query parameter (default "") through
        # process_query() and return the hits under a "results" key.
        search_term = request.GET.get("q", "").strip()
        hits = process_query(query=search_term, request=request)
        return Response(camelize({"results": hits}))
@extend_schema_view(**BUY_AS_BUSINESS_SCHEMA)
class BuyAsBusinessView(APIView):
    __doc__ = _("Handles the logic of buying as a business without registration.")

    # noinspection PyUnusedLocal
    @method_decorator(
        ratelimit(key="ip", rate="888/h" if settings.DEBUG else "10/h")
    )
    def post(self, request: Request, *args, **kwargs) -> Response:
        # Validate the payload, create a "MOMENTAL" order and attempt the
        # purchase on it. Success returns 201 with the serialized transaction;
        # any exception deactivates the order and returns 400 with its message.
        serializer = BuyAsBusinessOrderSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        payload = serializer.validated_data

        order = Order.objects.create(status="MOMENTAL")
        product_uuids = [
            entry.get("product_uuid") for entry in payload.get("products")
        ]

        purchase_kwargs = {
            "products": product_uuids,
            "promocode_uuid": payload.get("promocode_uuid"),
            "customer_name": payload.get("business_identificator"),
            "customer_email": payload.get("business_email"),
            "customer_phone_number": payload.get("business_phone_number"),
            "billing_customer_address": payload.get("billing_business_address_uuid"),
            "shipping_customer_address": payload.get(
                "shipping_business_address_uuid"
            ),
            "payment_method": payload.get("payment_method"),
            "is_business": True,
        }
        try:
            transaction = order.buy_without_registration(**purchase_kwargs)
            # Serialization stays inside the try so that a failure here also
            # deactivates the order.
            return Response(
                status=status.HTTP_201_CREATED,
                data=TransactionProcessSerializer(transaction).data,
            )
        except Exception as e:
            order.is_active = False
            order.save()
            return Response(
                status=status.HTTP_400_BAD_REQUEST,
                data={"detail": str(e)},
            )
@extend_schema_view(**DOWNLOAD_DIGITAL_ASSET_SCHEMA)
class DownloadDigitalAssetView(APIView):
    __doc__ = _(
        "Handles the downloading of a digital asset associated with an order.\n"
        "This function attempts to serve the digital asset file located in the "
        "storage directory of the project. If the file is not found, an HTTP 404 "
        "error is raised to indicate the resource is unavailable."
    )

    @staticmethod
    def _asset_not_found() -> Response:
        # Shared 404 payload for every "there is nothing to serve" outcome.
        return Response(
            data=camelize({"error": "Digital asset not found"}),
            status=status.HTTP_404_NOT_FOUND,
        )

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse | FileResponse:
        # Serve the digital asset of the order product identified by the
        # base64-encoded `order_product_uuid` URL kwarg, at most once.
        #
        # Responses:
        #   200 - the file, sent as an attachment
        #   400 - missing/malformed identifier, unknown order product, asset
        #         already downloaded, order product not FINISHED, or no product
        #   404 - no download record, no stock, or the file is not on disk
        #   503 - any other failure (reported through capture_exception)
        #
        # Local import: the module's import block does not bring this name in.
        from django.core.exceptions import ValidationError

        try:
            # Check the raw value: str(None) is the truthy string "None", which
            # would slip past a check made after the conversion.
            raw_uuid = kwargs.get("order_product_uuid")
            if not raw_uuid:
                raise BadRequest(_("order_product_uuid is required"))

            # Malformed base64 / non-UTF-8 payloads are client errors.
            try:
                uuid = urlsafe_base64_decode(str(raw_uuid)).decode("utf-8")
            except ValueError as exc:
                raise BadRequest(_("order product does not exist")) from exc

            # ValidationError covers a decoded value the uuid field rejects.
            try:
                order_product = OrderProduct.objects.get(uuid=uuid)
            except (OrderProduct.DoesNotExist, ValidationError) as dne:
                raise BadRequest(_("order product does not exist")) from dne

            download = order_product.download
            if download.num_downloads >= 1:
                raise BadRequest(_("you can only download the digital asset once"))

            purchased = download.order_product
            if purchased.status != "FINISHED":
                raise BadRequest(
                    _("the order must be paid before downloading the digital asset")
                )
            if not purchased.product:
                raise BadRequest(_("the order product does not have a product"))

            stock = purchased.product.stocks.first()
            if stock is None:
                return self._asset_not_found()

            file_path = stock.digital_asset.path
            guessed_type = mimetypes.guess_type(file_path)[0]
            try:
                asset_file = open(file_path, "rb")
            except FileNotFoundError:
                return self._asset_not_found()

            # Count the download only once the file is actually open, so a
            # missing product/stock/file does not burn the single attempt.
            # NOTE(review): the check above and this increment are not atomic;
            # two concurrent requests could both pass the check.
            try:
                download.num_downloads += 1
                download.save()
            except Exception:
                asset_file.close()
                raise

            # FileResponse takes ownership of the handle and builds a correctly
            # escaped Content-Disposition header from `filename`.
            return FileResponse(
                asset_file,
                as_attachment=True,
                filename=os.path.basename(file_path),
                content_type=guessed_type or "application/octet-stream",
            )
        except BadRequest as e:
            return Response(
                data=camelize({"error": str(e)}), status=status.HTTP_400_BAD_REQUEST
            )
        except DigitalAssetDownload.DoesNotExist:
            return self._asset_not_found()
        except Exception as e:
            capture_exception(e)
            return Response(
                data=camelize(
                    {
                        "error": "An error occurred while trying to download the digital asset",
                        "detail": traceback.format_exc() if settings.DEBUG else None,
                    }
                ),
                status=status.HTTP_503_SERVICE_UNAVAILABLE,
            )
@csrf_exempt
def favicon_view(request: HttpRequest) -> HttpResponse | FileResponse:
    # Serve <BASE_DIR>/static/favicon.png, or a 404 response when the file is
    # absent. The file is a PNG, so it is sent as image/png rather than
    # image/x-icon. The function's __doc__ is assigned right below.
    favicon_path = os.path.join(settings.BASE_DIR, "static/favicon.png")
    try:
        # FileResponse closes the handle once the response has been sent.
        return FileResponse(open(favicon_path, "rb"), content_type="image/png")
    except FileNotFoundError:
        return HttpResponse(status=404, content=_("favicon not found"))


# noinspection PyTypeChecker
favicon_view.__doc__ = str(
    _(
        "Handles requests for the favicon of a website.\n"
        "This function attempts to serve the favicon file located in the static directory of the project. "
        "If the favicon file is not found, an HTTP 404 error is raised to indicate the resource is unavailable."
    )
)
def index(request: HttpRequest, *args, **kwargs) -> HttpResponse | HttpResponseRedirect:
    # Every request to this view is redirected to the URL named "admin:index".
    # The function's __doc__ is assigned right below the definition.
    return redirect(to="admin:index")


# noinspection PyTypeChecker
index.__doc__ = str(
    _(
        "Redirects the request to the admin index page. "
        "The function handles incoming HTTP requests and redirects them to the Django "
        "admin interface index page. It uses Django's `redirect` function for handling "
        "the HTTP redirection."
    )
)
def version(request: HttpRequest, *args, **kwargs) -> HttpResponse:
    # Report settings.SCHON_VERSION as a JSON object with a single "version"
    # entry (passed through camelize()). The __doc__ is assigned right below.
    payload = camelize({"version": settings.SCHON_VERSION})
    return JsonResponse(payload, status=200)


# noinspection PyTypeChecker
version.__doc__ = str(_("Returns current version of the Schon. "))
class PastedImageUploadView(APIView):
    # Admin-only multipart endpoint that stores an uploaded image as a
    # PastedImage and answers with its absolute URL.
    permission_classes = [IsAdminUser]
    parser_classes = [MultiPartParser]

    def post(self, request: Request, *args, **kwargs) -> Response:
        # Expects the file under the "image" form field. Returns 201 with
        # {"data": {"filePath": <absolute url>}} on success and 400 when no
        # file was sent. An upload without a name is stored as "pasted".
        upload = request.FILES.get("image")
        if upload:
            stored = PastedImage.objects.create(
                name=upload.name or "pasted",
                image=upload,
            )
            absolute_url = request.build_absolute_uri(stored.image.url)
            return Response(
                {"data": {"filePath": absolute_url}},
                status=status.HTTP_201_CREATED,
            )
        return Response(
            {"error": _("no image file provided")},
            status=status.HTTP_400_BAD_REQUEST,
        )
def _dashboard_image_url(product: Product) -> str:
    """Return the URL of the product's first image, or "" when it has none.

    A single ``first()`` call replaces an ``exists()`` check followed by
    ``first()``.
    """
    first_image = product.images.first()
    return first_image.image_url if first_image is not None else ""


def _dashboard_line_total(order_products, start, end) -> float:
    """Sum ``buy_price * quantity`` over rows bought in ``[start, end)``.

    ``order_products`` is an already-filtered OrderProduct queryset; the window
    is applied to the related order's ``buy_time``. The result is rounded to
    two decimal places and is 0.0 when nothing matches.
    """
    aggregated = order_products.filter(
        order__buy_time__lt=end, order__buy_time__gte=start
    ).aggregate(total=Sum(F("buy_price") * F("quantity")))
    return round(float(aggregated.get("total") or 0.0), 2)


def _dashboard_pct_delta(cur: float | int, prev: float | int) -> float:
    """Return the percentage change from ``prev`` to ``cur``, rounded to 0.1.

    With no previous value the change is reported as 0.0 when the current value
    is also zero and as a flat 100.0 otherwise.
    """
    cur_f = float(cur or 0)
    prev_f = float(prev or 0)
    if prev_f == 0:
        return 0.0 if cur_f == 0 else 100.0
    return round(((cur_f - prev_f) / prev_f) * 100.0, 1)


def _dashboard_kpi(cur: float | int, prev: float | int) -> dict[str, float | int]:
    """Build one KPI entry: current value, previous value and their delta."""
    return {
        "value": cur,
        "prev": prev,
        "delta_pct": _dashboard_pct_delta(cur, prev),
    }


def _dashboard_date_axis(days: int) -> list[date]:
    """Return the last ``days`` calendar dates, oldest first, ending today."""
    today = tz_now().date()
    return [today - timedelta(days=offset) for offset in range(days - 1, -1, -1)]


def _dashboard_ranking(
    rows, id_key: str, count_key: str, limit: int = 10
) -> tuple[
    dict[str, str | int | float | None] | None,
    list[dict[str, str | int | float | None]],
]:
    """Resolve the top ``limit`` rows of a ranked ``values()`` queryset.

    ``rows`` must already be ordered best-first; every row carries a product id
    under ``id_key`` and its score under ``count_key``. Returns
    ``(leader, ranked)``: ``ranked`` holds name/image/admin_url/count entries
    for rows whose product exists, and ``leader`` is the count-less entry for
    the very first row (``None`` when that row has no resolvable product).
    One ranking query and one product query are issued in total.
    """
    top_rows = list(rows[:limit])
    product_ids = [row[id_key] for row in top_rows if row.get(id_key)]
    if not product_ids:
        return None, []
    products_by_id = {p.pk: p for p in Product.objects.filter(pk__in=product_ids)}

    leader = None
    ranked = []
    for position, row in enumerate(top_rows):
        product = products_by_id.get(row.get(id_key))
        if product is None:
            continue
        summary = {
            "name": product.name,
            "image": _dashboard_image_url(product),
            "admin_url": reverse("admin:core_product_change", args=[product.pk]),
        }
        if position == 0:
            leader = summary
        ranked.append({**summary, "count": int(row.get(count_key, 0) or 0)})
    return leader, ranked


def dashboard_callback(request: HttpRequest, context: Context) -> Context:
    # Fill the admin dashboard context: KPIs for the selected timeframe against
    # the preceding one, daily series, quick links, product rankings, customer
    # and fulfilment mixes. Most sections are best-effort: a failure is logged
    # and the section keeps its empty default. The function's __doc__ is
    # assigned right below the definition.

    # --- timeframe: ?tf= must be one of the whitelisted values, else 30 days
    tf_map: dict[str, int] = {"7": 7, "30": 30, "90": 90, "360": 360}
    tf_param = str(request.GET.get("tf", "30") or "30")
    period_days: int = tf_map.get(tf_param, 30)
    period = timedelta(days=period_days)
    now_dt = tz_now()
    cur_start = now_dt - period
    prev_start = now_dt - timedelta(days=period_days * 2)
    prev_end = cur_start

    # --- current period
    revenue_gross_cur: float = get_revenue(clear=False, period=period)
    returns_cur: float = get_returns(period=period)
    revenue_net_before_returns: float = get_revenue(clear=True, period=period)
    revenue_net_cur: float = max(revenue_net_before_returns - returns_cur, 0.0)
    processed_orders_cur: int = get_total_processed_orders(period=period)
    orders_finished_cur: int = Order.objects.filter(
        status="FINISHED", buy_time__lte=now_dt, buy_time__gte=cur_start
    ).count()

    # --- previous period (the window immediately before the current one)
    revenue_gross_prev = _dashboard_line_total(
        OrderProduct.objects.filter(
            status__in=["FINISHED"], order__status="FINISHED"
        ),
        prev_start,
        prev_end,
    )
    returns_prev = _dashboard_line_total(
        OrderProduct.objects.filter(status__in=["RETURNED"]),  # returned items
        prev_start,
        prev_end,
    )
    orders_finished_prev: int = Order.objects.filter(
        status="FINISHED", buy_time__lt=prev_end, buy_time__gte=prev_start
    ).count()

    # Previous-period net: tax is stripped only when a positive rate is
    # configured and prices include it; otherwise net equals gross.
    tax_rate = float(getattr(config, "TAX_RATE", 0.0) or 0.0)
    tax_included = bool(getattr(config, "TAX_INCLUDED", False))
    revenue_net_before_returns_prev = revenue_gross_prev
    if tax_rate > 0 and tax_included:
        revenue_net_before_returns_prev = revenue_gross_prev / (
            1.0 + (tax_rate / 100.0)
        )
    revenue_net_prev = max(
        round(float(revenue_net_before_returns_prev or 0.0), 2) - returns_prev, 0.0
    )

    # --- derived ratios (0.0 whenever the denominator is not positive)
    aov_cur: float = (
        round((revenue_gross_cur / orders_finished_cur), 2)
        if orders_finished_cur > 0
        else 0.0
    )
    refund_rate_cur: float = (
        round(((returns_cur / revenue_gross_cur) * 100.0), 1)
        if revenue_gross_cur > 0
        else 0.0
    )
    aov_prev: float = (
        round((revenue_gross_prev / orders_finished_prev), 2)
        if orders_finished_prev > 0
        else 0.0
    )
    refund_rate_prev: float = (
        round(((returns_prev / revenue_gross_prev) * 100.0), 1)
        if revenue_gross_prev > 0
        else 0.0
    )

    kpi = {
        "gmv": _dashboard_kpi(revenue_gross_cur, revenue_gross_prev),
        "orders": _dashboard_kpi(orders_finished_cur, orders_finished_prev),
        "aov": _dashboard_kpi(aov_cur, aov_prev),
        "net": _dashboard_kpi(revenue_net_cur, revenue_net_prev),
        "refund_rate": _dashboard_kpi(refund_rate_cur, refund_rate_prev),
    }

    # --- currency symbol
    currency_symbol: str = ""
    try:
        currency_symbol = dict(getattr(settings, "CURRENCIES_WITH_SYMBOLS", ())).get(
            getattr(settings, "CURRENCY_CODE", ""), ""
        )
    except Exception as exc:
        logger.error("Failed to get currency symbol: %s", exc)

    # --- quick links: items of the second entry in the UNFOLD sidebar navigation
    quick_links: list[dict[str, str]] = []
    try:
        quick_links_section = settings.UNFOLD.get("SIDEBAR", {}).get("navigation", [])[
            1
        ]
        for item in quick_links_section.get("items", []):
            title = item.get("title")
            link = item.get("link")
            if not title or not link:
                continue
            quick_links.append(
                {
                    "title": str(title),
                    "link": str(link),
                    **({"icon": item.get("icon")} if item.get("icon") else {}),
                }
            )
    except Exception as exc:
        logger.error("Failed to build quick links: %s", exc)

    # --- most wished products (wishlists of active, non-staff users)
    most_wished: dict[str, str | int | float | None] | None = None
    most_wished_list: list[dict[str, str | int | float | None]] = []
    try:
        wished_rows = (
            Wishlist.objects.filter(user__is_active=True, user__is_staff=False)
            .values("products")
            .exclude(products__isnull=True)
            .annotate(cnt=Count("products"))
            .order_by("-cnt")
        )
        most_wished, most_wished_list = _dashboard_ranking(
            wished_rows, "products", "cnt"
        )
    except Exception as exc:
        logger.error("Failed to build most wished list: %s", exc)

    # --- daily series; on failure fall back to an all-zero series
    try:
        date_axis = _dashboard_date_axis(period_days)
        orders_map = get_daily_finished_orders_count(period)
        gross_map = get_daily_gross_revenue(period)
        context["daily_labels"] = [d.strftime("%d %b") for d in date_axis]
        context["daily_orders"] = [int(orders_map.get(d, 0) or 0) for d in date_axis]
        context["daily_gross"] = [
            float(gross_map.get(d, 0.0) or 0.0) for d in date_axis
        ]
        context["daily_title"] = _("Revenue & Orders (last %(days)d)") % {
            "days": period_days
        }
    except Exception as e:
        logger.warning("Failed to build daily stats: %s", e)
        context["daily_labels"] = []
        context["daily_orders"] = []
        context["daily_gross"] = []
        try:
            date_axis = _dashboard_date_axis(period_days)
            context["daily_labels"] = [d.strftime("%d %b") for d in date_axis]
            context["daily_orders"] = [0 for _i in date_axis]
            context["daily_gross"] = [0.0 for _j in date_axis]
            context["daily_title"] = _("Revenue & Orders (last %(days)d)") % {
                "days": period_days
            }
        except Exception as exc:
            logger.error("Failed to build daily stats: %s", exc)

    # --- low stock: the five products with the smallest summed stock quantity
    low_stock_list: list[dict[str, str | int | None]] = []
    try:
        products = (
            Product.objects.annotate(total_qty=Sum("stocks__quantity"))
            .prefetch_related("images")
            .order_by("total_qty")[:5]
        )
        for p in products:
            qty = int(p.total_qty or 0)  # ty: ignore[possibly-missing-attribute]
            img = ""
            # A broken image must not drop the product from the list.
            with suppress(Exception):
                img = _dashboard_image_url(p)
            low_stock_list.append(
                {
                    "name": p.name,
                    "sku": p.sku or "",
                    "qty": qty,
                    "image": img,
                    "admin_url": reverse("admin:core_product_change", args=[p.pk]),
                }
            )
    except Exception as exc:
        logger.error("Error fetching low stock products: %s", exc)

    # NOTE(review): this pack is stored for 10 minutes but the cached copy is
    # never read back in this function - everything above is recomputed on
    # every request. Kept as is in case the key is consumed elsewhere; confirm.
    cache_key = f"dashboard_cb:{period_days}"
    cached_pack = cache.get(cache_key, None)
    if cached_pack is None:
        cached_pack = {
            "kpi": kpi,
            "revenue_gross": revenue_gross_cur,
            "revenue_net": revenue_net_cur,
            "returns_amount": returns_cur,
            "orders_finished": orders_finished_cur,
            "aov": aov_cur,
            "refund_rate": refund_rate_cur,
            "low_stock_products": low_stock_list,
        }
        cache.set(cache_key, cached_pack, 600)

    # --- most popular products (by quantity in finished orders)
    most_popular: dict[str, str | int | float | None] | None = None
    most_popular_list: list[dict[str, str | int | float | None]] = []
    try:
        popular_rows = (
            OrderProduct.objects.filter(
                status="FINISHED", order__status="FINISHED", product__isnull=False
            )
            .values("product")
            .annotate(total_qty=Sum("quantity"))
            .order_by("-total_qty")
        )
        most_popular, most_popular_list = _dashboard_ranking(
            popular_rows, "product", "total_qty"
        )
    except Exception as exc:
        logger.error("Failed to build most popular list: %s", exc)

    # --- new vs returning customers
    customers_mix: dict[str, int | float] = {
        "new": 0,
        "returning": 0,
        "new_pct": 0.0,
        "returning_pct": 0.0,
    }
    try:
        mix = get_customer_mix()
        n = int(mix.get("new", 0))
        r = int(mix.get("returning", 0))
        t = max(n + r, 0)
        new_pct = round((n / t * 100.0), 1) if t > 0 else 0.0
        ret_pct = round((r / t * 100.0), 1) if t > 0 else 0.0
        customers_mix = {
            "new": n,
            "returning": r,
            "new_pct": new_pct,
            "returning_pct": ret_pct,
            "total": t,
        }
    except Exception as exc:
        logger.error("Failed to build customer mix: %s", exc)

    # --- shipped vs digital split (percentages are by quantity)
    shipped_vs_digital: dict[str, int | float] = {
        "digital_qty": 0,
        "shipped_qty": 0,
        "digital_gross": 0.0,
        "shipped_gross": 0.0,
        "digital_pct": 0.0,
        "shipped_pct": 0.0,
    }
    try:
        svd = get_shipped_vs_digital_mix()
        dq = int(svd.get("digital_qty", 0))
        sq = int(svd.get("shipped_qty", 0))
        total_q = dq + sq
        digital_pct = round((dq / total_q * 100.0), 1) if total_q > 0 else 0.0
        shipped_pct = round((sq / total_q * 100.0), 1) if total_q > 0 else 0.0
        shipped_vs_digital.update(
            {
                "digital_qty": dq,
                "shipped_qty": sq,
                "digital_gross": float(svd.get("digital_gross", 0.0) or 0.0),
                "shipped_gross": float(svd.get("shipped_gross", 0.0) or 0.0),
                "digital_pct": digital_pct,
                "shipped_pct": shipped_pct,
            }
        )
    except Exception as exc:
        logger.error("Failed to build shipped vs digital mix: %s", exc)

    # NOTE(review): unlike the sections above, these two calls are not guarded;
    # an exception here fails the whole dashboard.
    most_returned_products = get_top_returned_products()
    top_categories = get_top_categories_by_qty()

    # The *_30 keys carry the values of the selected timeframe, whatever its
    # length.
    context.update(
        {
            "custom_variable": "value",
            "timeframe_days": period_days,
            "tf": period_days,
            "kpi": kpi,
            "revenue_gross": revenue_gross_cur,
            "revenue_net": revenue_net_cur,
            "returns_amount": returns_cur,
            "orders_finished": orders_finished_cur,
            "aov": aov_cur,
            "refund_rate": refund_rate_cur,
            "revenue_gross_30": revenue_gross_cur,
            "revenue_net_30": revenue_net_cur,
            "returns_30": returns_cur,
            "processed_orders_30": processed_orders_cur,
            "schon_version": settings.SCHON_VERSION,
            "quick_links": quick_links,
            "most_wished_product": most_wished,
            "most_popular_product": most_popular,
            "most_wished_products": most_wished_list,
            "most_popular_products": most_popular_list,
            "currency_symbol": currency_symbol,
            "customers_mix": customers_mix,
            "shipped_vs_digital": shipped_vs_digital,
            "most_returned_products": most_returned_products,
            "top_categories": top_categories,
            "low_stock_products": low_stock_list,
        }
    )
    return context


dashboard_callback.__doc__ = str(_("Returns custom variables for Dashboard. "))