870 lines
31 KiB
Python
870 lines
31 KiB
Python
import logging
|
|
import mimetypes
|
|
import os
|
|
import traceback
|
|
from contextlib import suppress
|
|
from datetime import date, timedelta
|
|
|
|
import requests
|
|
from constance import config
|
|
from django.conf import settings
|
|
from django.contrib.sitemaps.views import index as _sitemap_index_view
|
|
from django.contrib.sitemaps.views import sitemap as _sitemap_detail_view
|
|
from django.core.cache import cache
|
|
from django.core.exceptions import BadRequest
|
|
from django.db.models import Count, F, Sum
|
|
from django.http import (
|
|
FileResponse,
|
|
HttpRequest,
|
|
HttpResponse,
|
|
HttpResponseRedirect,
|
|
JsonResponse,
|
|
)
|
|
from django.shortcuts import redirect
|
|
from django.template import Context
|
|
from django.urls import reverse
|
|
from django.utils.decorators import method_decorator
|
|
from django.utils.http import urlsafe_base64_decode
|
|
from django.utils.timezone import now as tz_now
|
|
from django.utils.translation import gettext_lazy as _
|
|
from django.views.decorators.cache import cache_page
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.views.decorators.vary import vary_on_headers
|
|
from django_ratelimit.decorators import ratelimit
|
|
from drf_spectacular.utils import extend_schema_view
|
|
from drf_spectacular.views import (
|
|
SpectacularAPIView,
|
|
SpectacularRedocView,
|
|
SpectacularSwaggerView,
|
|
)
|
|
from graphene_file_upload.django import FileUploadGraphQLView
|
|
from rest_framework import status
|
|
from rest_framework.permissions import AllowAny
|
|
from rest_framework.request import Request
|
|
from rest_framework.response import Response
|
|
from rest_framework.views import APIView
|
|
from sentry_sdk import capture_exception
|
|
|
|
from engine.core.docs.drf.views import (
|
|
BUY_AS_BUSINESS_SCHEMA,
|
|
CACHE_SCHEMA,
|
|
CONTACT_US_SCHEMA,
|
|
CUSTOM_OPENAPI_SCHEMA,
|
|
DOWNLOAD_DIGITAL_ASSET_SCHEMA,
|
|
LANGUAGE_SCHEMA,
|
|
PARAMETERS_SCHEMA,
|
|
REQUEST_CURSED_URL_SCHEMA,
|
|
SEARCH_SCHEMA,
|
|
)
|
|
from engine.core.elasticsearch import process_query
|
|
from engine.core.models import (
|
|
DigitalAssetDownload,
|
|
Order,
|
|
OrderProduct,
|
|
Product,
|
|
Wishlist,
|
|
)
|
|
from engine.core.serializers import (
|
|
BuyAsBusinessOrderSerializer,
|
|
CacheOperatorSerializer,
|
|
ContactUsSerializer,
|
|
LanguageSerializer,
|
|
)
|
|
from engine.core.utils import get_project_parameters, is_url_safe
|
|
from engine.core.utils.caching import web_cache
|
|
from engine.core.utils.commerce import (
|
|
get_customer_mix,
|
|
get_daily_finished_orders_count,
|
|
get_daily_gross_revenue,
|
|
get_returns,
|
|
get_revenue,
|
|
get_shipped_vs_digital_mix,
|
|
get_top_categories_by_qty,
|
|
get_top_returned_products,
|
|
get_total_processed_orders,
|
|
)
|
|
from engine.core.utils.emailing import contact_us_email
|
|
from engine.core.utils.languages import get_flag_by_language
|
|
from engine.payments.serializers import TransactionProcessSerializer
|
|
from schon.utils.renderers import camelize
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@cache_page(60 * 60 * 12)
|
|
@vary_on_headers("Host")
|
|
def sitemap_index(request, *args, **kwargs):
|
|
response = _sitemap_index_view(request, *args, **kwargs)
|
|
response["Content-Type"] = "application/xml; charset=utf-8"
|
|
return response
|
|
|
|
|
|
# noinspection PyTypeChecker
|
|
sitemap_index.__doc__ = _( # pyright: ignore[reportUnknownVariableType]
|
|
"Handles the request for the sitemap index and returns an XML response. "
|
|
"It ensures the response includes the appropriate content type header for XML."
|
|
)
|
|
|
|
|
|
@cache_page(60 * 60 * 24)
|
|
@vary_on_headers("Host")
|
|
def sitemap_detail(request, *args, **kwargs):
|
|
response = _sitemap_detail_view(request, *args, **kwargs)
|
|
response["Content-Type"] = "application/xml; charset=utf-8"
|
|
return response
|
|
|
|
|
|
# noinspection PyTypeChecker
|
|
sitemap_detail.__doc__ = _( # pyright: ignore[reportUnknownVariableType]
|
|
"Handles the detailed view response for a sitemap. "
|
|
"This function processes the request, fetches the appropriate "
|
|
"sitemap detail response, and sets the Content-Type header for XML."
|
|
)
|
|
|
|
|
|
class CustomGraphQLView(FileUploadGraphQLView):
|
|
def get_context(self, request):
|
|
return request
|
|
|
|
|
|
@extend_schema_view(**CUSTOM_OPENAPI_SCHEMA)
|
|
class CustomSpectacularAPIView(SpectacularAPIView):
|
|
def get(self, request, *args, **kwargs):
|
|
return super().get(request, *args, **kwargs)
|
|
|
|
|
|
class CustomSwaggerView(SpectacularSwaggerView):
|
|
def get_context_data(self, **kwargs):
|
|
# noinspection PyUnresolvedReferences
|
|
context = super().get_context_data(**kwargs) # ty: ignore[unresolved-attribute]
|
|
context["script_url"] = self.request.build_absolute_uri()
|
|
return context
|
|
|
|
|
|
class CustomRedocView(SpectacularRedocView):
|
|
def get_context_data(self, **kwargs):
|
|
# noinspection PyUnresolvedReferences
|
|
context = super().get_context_data(**kwargs) # ty: ignore[unresolved-attribute]
|
|
context["script_url"] = self.request.build_absolute_uri()
|
|
return context
|
|
|
|
|
|
@extend_schema_view(**LANGUAGE_SCHEMA)
|
|
class SupportedLanguagesView(APIView):
|
|
__doc__ = _( # pyright: ignore[reportUnknownVariableType]
|
|
"Returns a list of supported languages and their corresponding information."
|
|
)
|
|
|
|
serializer_class = LanguageSerializer
|
|
permission_classes = [
|
|
AllowAny,
|
|
]
|
|
|
|
def get(self, request: Request, *args, **kwargs) -> Response:
|
|
return Response(
|
|
data=self.serializer_class(
|
|
[
|
|
{
|
|
"code": lang[0],
|
|
"name": lang[1],
|
|
"flag": get_flag_by_language(lang[0]),
|
|
}
|
|
for lang in settings.LANGUAGES
|
|
],
|
|
many=True,
|
|
).data,
|
|
status=status.HTTP_200_OK,
|
|
)
|
|
|
|
|
|
@extend_schema_view(**PARAMETERS_SCHEMA)
|
|
class WebsiteParametersView(APIView):
|
|
__doc__ = _("Returns the parameters of the website as a JSON object.")
|
|
|
|
serializer_class = None
|
|
permission_classes = [
|
|
AllowAny,
|
|
]
|
|
|
|
def get(self, request: Request, *args, **kwargs) -> Response:
|
|
return Response(
|
|
data=camelize(get_project_parameters()), status=status.HTTP_200_OK
|
|
)
|
|
|
|
|
|
@extend_schema_view(**CACHE_SCHEMA)
|
|
class CacheOperatorView(APIView):
|
|
__doc__ = _(
|
|
"Handles cache operations such as reading and setting cache data with a specified key and timeout."
|
|
)
|
|
|
|
serializer_class = CacheOperatorSerializer
|
|
permission_classes = [
|
|
AllowAny,
|
|
]
|
|
|
|
def post(self, request: Request, *args, **kwargs) -> Response:
|
|
return Response(
|
|
data=web_cache(
|
|
request,
|
|
request.data.get("key") or "", # ty: ignore[invalid-argument-type]
|
|
request.data.get("data", {}),
|
|
request.data.get("timeout"),
|
|
),
|
|
status=status.HTTP_200_OK,
|
|
)
|
|
|
|
|
|
@extend_schema_view(**CONTACT_US_SCHEMA)
|
|
class ContactUsView(APIView):
|
|
__doc__ = _("Handles `contact us` form submissions.")
|
|
|
|
serializer_class = ContactUsSerializer
|
|
|
|
@method_decorator(ratelimit(key="ip", rate="2/h", method="POST", block=True))
|
|
def post(self, request: Request, *args, **kwargs) -> Response:
|
|
serializer = self.serializer_class(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
contact_us_email.delay(serializer.validated_data)
|
|
|
|
return Response(data=serializer.data, status=status.HTTP_200_OK)
|
|
|
|
|
|
@extend_schema_view(**REQUEST_CURSED_URL_SCHEMA)
|
|
class RequestCursedURLView(APIView):
|
|
__doc__ = _(
|
|
"Handles requests for processing and validating URLs from incoming POST requests."
|
|
)
|
|
|
|
permission_classes = [
|
|
AllowAny,
|
|
]
|
|
|
|
@method_decorator(ratelimit(key="ip", rate="10/h"))
|
|
def post(self, request: Request, *args, **kwargs) -> Response:
|
|
url = request.data.get("url")
|
|
if not is_url_safe(str(url)):
|
|
return Response(
|
|
data={"error": _("only URLs starting with http(s):// are allowed")},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
try:
|
|
data = cache.get(url, None)
|
|
if not data:
|
|
response = requests.get(
|
|
str(url), headers={"content-type": "application/json"}
|
|
)
|
|
response.raise_for_status()
|
|
data = camelize(response.json())
|
|
cache.set(url, data, 86400)
|
|
return Response(
|
|
data=data,
|
|
status=status.HTTP_200_OK,
|
|
)
|
|
except Exception as e:
|
|
return Response(
|
|
data={"error": str(e)},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
|
|
@extend_schema_view(**SEARCH_SCHEMA)
|
|
class GlobalSearchView(APIView):
|
|
__doc__ = _("Handles global search queries.")
|
|
|
|
def get(self, request: Request, *args, **kwargs) -> Response:
|
|
return Response(
|
|
camelize(
|
|
{
|
|
"results": process_query(
|
|
query=request.GET.get("q", "").strip(), request=request
|
|
)
|
|
}
|
|
)
|
|
)
|
|
|
|
|
|
@extend_schema_view(**BUY_AS_BUSINESS_SCHEMA)
|
|
class BuyAsBusinessView(APIView):
|
|
__doc__ = _("Handles the logic of buying as a business without registration.")
|
|
|
|
# noinspection PyUnusedLocal
|
|
@method_decorator(
|
|
ratelimit(key="ip", rate="10/h" if not settings.DEBUG else "888/h")
|
|
)
|
|
def post(self, request: Request, *args, **kwargs) -> Response:
|
|
serializer = BuyAsBusinessOrderSerializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
order = Order.objects.create(status="MOMENTAL")
|
|
products = [
|
|
product.get("product_uuid")
|
|
for product in serializer.validated_data.get("products")
|
|
]
|
|
try:
|
|
transaction = order.buy_without_registration(
|
|
products=products,
|
|
promocode_uuid=serializer.validated_data.get("promocode_uuid"),
|
|
customer_name=serializer.validated_data.get("business_identificator"),
|
|
customer_email=serializer.validated_data.get("business_email"),
|
|
customer_phone_number=serializer.validated_data.get(
|
|
"business_phone_number"
|
|
),
|
|
billing_customer_address=serializer.validated_data.get(
|
|
"billing_business_address_uuid"
|
|
),
|
|
shipping_customer_address=serializer.validated_data.get(
|
|
"shipping_business_address_uuid"
|
|
),
|
|
payment_method=serializer.validated_data.get("payment_method"),
|
|
is_business=True,
|
|
)
|
|
return Response(
|
|
status=status.HTTP_201_CREATED,
|
|
data=TransactionProcessSerializer(transaction).data,
|
|
)
|
|
except Exception as e:
|
|
order.is_active = False
|
|
order.save()
|
|
return Response(
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
data={"detail": str(e)},
|
|
)
|
|
|
|
|
|
@extend_schema_view(**DOWNLOAD_DIGITAL_ASSET_SCHEMA)
|
|
class DownloadDigitalAssetView(APIView):
|
|
__doc__ = _(
|
|
"Handles the downloading of a digital asset associated with an order.\n"
|
|
"This function attempts to serve the digital asset file located in the "
|
|
"storage directory of the project. If the file is not found, an HTTP 404 "
|
|
"error is raised to indicate the resource is unavailable."
|
|
)
|
|
|
|
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse | FileResponse:
|
|
try:
|
|
op_uuid = str(kwargs.get("order_product_uuid"))
|
|
|
|
if not op_uuid:
|
|
raise BadRequest(_("order_product_uuid is required"))
|
|
|
|
uuid = urlsafe_base64_decode(op_uuid).decode("utf-8")
|
|
|
|
try:
|
|
order_product = OrderProduct.objects.get(uuid=uuid)
|
|
except OrderProduct.DoesNotExist as dne:
|
|
raise BadRequest(_("order product does not exist")) from dne
|
|
|
|
if order_product.download.num_downloads >= 1:
|
|
raise BadRequest(_("you can only download the digital asset once"))
|
|
|
|
if order_product.download.order_product.status != "FINISHED":
|
|
raise BadRequest(
|
|
_("the order must be paid before downloading the digital asset")
|
|
)
|
|
|
|
order_product.download.num_downloads += 1
|
|
order_product.download.save()
|
|
|
|
if not order_product.download.order_product.product:
|
|
raise BadRequest(_("the order product does not have a product"))
|
|
|
|
file_path = order_product.download.order_product.product.stocks.first().digital_asset.path
|
|
|
|
content_type, encoding = mimetypes.guess_type(file_path)
|
|
if not content_type:
|
|
content_type = "application/octet-stream"
|
|
|
|
response = FileResponse(open(file_path, "rb"), content_type=content_type)
|
|
filename = os.path.basename(file_path)
|
|
response["Content-Disposition"] = f'attachment; filename="{filename}"'
|
|
return response
|
|
|
|
except BadRequest as e:
|
|
return Response(
|
|
data=camelize({"error": str(e)}), status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
except DigitalAssetDownload.DoesNotExist:
|
|
return Response(
|
|
data=camelize({"error": "Digital asset not found"}),
|
|
status=status.HTTP_404_NOT_FOUND,
|
|
)
|
|
|
|
except Exception as e:
|
|
capture_exception(e)
|
|
return Response(
|
|
data=camelize(
|
|
{
|
|
"error": "An error occurred while trying to download the digital asset",
|
|
"detail": traceback.format_exc() if settings.DEBUG else None,
|
|
}
|
|
),
|
|
status=status.HTTP_503_SERVICE_UNAVAILABLE,
|
|
)
|
|
|
|
|
|
@csrf_exempt
|
|
def favicon_view(request: HttpRequest) -> HttpResponse | FileResponse:
|
|
try:
|
|
favicon_path = os.path.join(settings.BASE_DIR, "static/favicon.png")
|
|
return FileResponse(open(favicon_path, "rb"), content_type="image/x-icon")
|
|
except FileNotFoundError:
|
|
return HttpResponse(status=404, content=_("favicon not found"))
|
|
|
|
|
|
# noinspection PyTypeChecker
|
|
favicon_view.__doc__ = str(
|
|
_( # ty: ignore[invalid-assignment]
|
|
"Handles requests for the favicon of a website.\n"
|
|
"This function attempts to serve the favicon file located in the static directory of the project. "
|
|
"If the favicon file is not found, an HTTP 404 error is raised to indicate the resource is unavailable."
|
|
)
|
|
)
|
|
|
|
|
|
def index(request: HttpRequest, *args, **kwargs) -> HttpResponse | HttpResponseRedirect:
|
|
return redirect("admin:index")
|
|
|
|
|
|
# noinspection PyTypeChecker
|
|
index.__doc__ = str(
|
|
_( # ty: ignore[invalid-assignment]
|
|
"Redirects the request to the admin index page. "
|
|
"The function handles incoming HTTP requests and redirects them to the Django "
|
|
"admin interface index page. It uses Django's `redirect` function for handling "
|
|
"the HTTP redirection."
|
|
)
|
|
)
|
|
|
|
|
|
def version(request: HttpRequest, *args, **kwargs) -> HttpResponse:
|
|
return JsonResponse(camelize({"version": settings.SCHON_VERSION}), status=200)
|
|
|
|
|
|
# noinspection PyTypeChecker
|
|
version.__doc__ = str(_("Returns current version of the Schon. ")) # ty: ignore[invalid-assignment]
|
|
|
|
|
|
def dashboard_callback(request: HttpRequest, context: Context) -> Context:
|
|
tf_map: dict[str, int] = {"7": 7, "30": 30, "90": 90, "360": 360}
|
|
tf_param = str(request.GET.get("tf", "30") or "30")
|
|
period_days: int = tf_map.get(tf_param, 30)
|
|
period = timedelta(days=period_days)
|
|
|
|
now_dt = tz_now()
|
|
cur_start = now_dt - period
|
|
prev_start = now_dt - timedelta(days=period_days * 2)
|
|
prev_end = cur_start
|
|
|
|
revenue_gross_cur: float = get_revenue(clear=False, period=period)
|
|
revenue_net_cur: float = get_revenue(clear=True, period=period)
|
|
returns_cur: float = get_returns(period=period)
|
|
processed_orders_cur: int = get_total_processed_orders(period=period)
|
|
|
|
orders_finished_cur = 0
|
|
|
|
with suppress(Exception):
|
|
orders_finished_cur: int = Order.objects.filter(
|
|
status="FINISHED", buy_time__lte=now_dt, buy_time__gte=cur_start
|
|
).count()
|
|
|
|
def sum_gross_between(start: date | None, end: date | None) -> float:
|
|
result = 0.0
|
|
with suppress(Exception):
|
|
qs = (
|
|
OrderProduct.objects.filter(
|
|
status__in=["FINISHED"], order__status="FINISHED"
|
|
)
|
|
.filter(order__buy_time__lt=end, order__buy_time__gte=start)
|
|
.aggregate(total=Sum(F("buy_price") * F("quantity")))
|
|
)
|
|
total = qs.get("total") or 0.0
|
|
result = round(float(total), 2)
|
|
return result
|
|
|
|
def sum_returns_between(start: date | None, end: date | None) -> float:
|
|
result = 0.0
|
|
with suppress(Exception):
|
|
qs = (
|
|
OrderProduct.objects.filter(status__in=["RETURNED"]) # returned items
|
|
.filter(order__buy_time__lt=end, order__buy_time__gte=start)
|
|
.aggregate(total=Sum(F("buy_price") * F("quantity")))
|
|
)
|
|
total = qs.get("total") or 0.0
|
|
result = round(float(total), 2)
|
|
return result
|
|
|
|
def count_finished_orders_between(start: date | None, end: date | None) -> int:
|
|
result = 0
|
|
with suppress(Exception):
|
|
result = Order.objects.filter(
|
|
status="FINISHED", buy_time__lt=end, buy_time__gte=start
|
|
).count()
|
|
return result
|
|
|
|
revenue_gross_prev = sum_gross_between(prev_start, prev_end)
|
|
returns_prev = sum_returns_between(prev_start, prev_end)
|
|
orders_finished_prev = count_finished_orders_between(prev_start, prev_end)
|
|
|
|
tax_rate = 0.0
|
|
tax_included = False
|
|
with suppress(Exception):
|
|
tax_rate = float(getattr(config, "TAX_RATE", 0.0) or 0.0)
|
|
tax_included = bool(getattr(config, "TAX_INCLUDED", False))
|
|
|
|
if tax_rate <= 0:
|
|
revenue_net_prev = revenue_gross_prev
|
|
else:
|
|
if tax_included:
|
|
divisor = 1.0 + (tax_rate / 100.0)
|
|
revenue_net_prev = (
|
|
revenue_gross_prev / divisor if divisor > 0 else revenue_gross_prev
|
|
)
|
|
else:
|
|
revenue_net_prev = revenue_gross_prev
|
|
revenue_net_prev = round(float(revenue_net_prev or 0.0), 2)
|
|
|
|
def pct_delta(cur: float | int, prev: float | int) -> float:
|
|
result = 0.0
|
|
with suppress(Exception):
|
|
cur_f = float(cur or 0)
|
|
prev_f = float(prev or 0)
|
|
if prev_f == 0:
|
|
result = 0.0 if cur_f == 0 else 100.0
|
|
result = round(((cur_f - prev_f) / prev_f) * 100.0, 1)
|
|
return result
|
|
|
|
aov_cur: float = (
|
|
round((revenue_gross_cur / orders_finished_cur), 2)
|
|
if orders_finished_cur > 0
|
|
else 0.0
|
|
)
|
|
refund_rate_cur: float = (
|
|
round(((returns_cur / revenue_gross_cur) * 100.0), 1)
|
|
if revenue_gross_cur > 0
|
|
else 0.0
|
|
)
|
|
|
|
aov_prev: float = (
|
|
round((revenue_gross_prev / orders_finished_prev), 2)
|
|
if orders_finished_prev > 0
|
|
else 0.0
|
|
)
|
|
refund_rate_prev: float = (
|
|
round(((returns_prev / revenue_gross_prev) * 100.0), 1)
|
|
if revenue_gross_prev > 0
|
|
else 0.0
|
|
)
|
|
|
|
kpi = {
|
|
"gmv": {
|
|
"value": revenue_gross_cur,
|
|
"prev": revenue_gross_prev,
|
|
"delta_pct": pct_delta(revenue_gross_cur, revenue_gross_prev),
|
|
},
|
|
"orders": {
|
|
"value": orders_finished_cur,
|
|
"prev": orders_finished_prev,
|
|
"delta_pct": pct_delta(orders_finished_cur, orders_finished_prev),
|
|
},
|
|
"aov": {
|
|
"value": aov_cur,
|
|
"prev": aov_prev,
|
|
"delta_pct": pct_delta(aov_cur, aov_prev),
|
|
},
|
|
"net": {
|
|
"value": revenue_net_cur,
|
|
"prev": revenue_net_prev,
|
|
"delta_pct": pct_delta(revenue_net_cur, revenue_net_prev),
|
|
},
|
|
"refund_rate": {
|
|
"value": refund_rate_cur,
|
|
"prev": refund_rate_prev,
|
|
"delta_pct": pct_delta(refund_rate_cur, refund_rate_prev),
|
|
},
|
|
}
|
|
currency_symbol: str = ""
|
|
with suppress(Exception):
|
|
currency_symbol = dict(getattr(settings, "CURRENCIES_WITH_SYMBOLS", ())).get(
|
|
getattr(settings, "CURRENCY_CODE", ""), ""
|
|
)
|
|
|
|
quick_links: list[dict[str, str]] = []
|
|
with suppress(Exception):
|
|
quick_links_section = settings.UNFOLD.get("SIDEBAR", {}).get("navigation", [])[
|
|
1
|
|
]
|
|
for item in quick_links_section.get("items", []):
|
|
title = item.get("title")
|
|
link = item.get("link")
|
|
if not title or not link:
|
|
continue
|
|
quick_links.append(
|
|
{
|
|
"title": str(title),
|
|
"link": str(link),
|
|
**({"icon": item.get("icon")} if item.get("icon") else {}),
|
|
}
|
|
)
|
|
|
|
most_wished: dict[str, str | int | float | None] | None = None
|
|
most_wished_list: list[dict[str, str | int | float | None]] = []
|
|
with suppress(Exception):
|
|
wished_qs = (
|
|
Wishlist.objects.filter(user__is_active=True, user__is_staff=False)
|
|
.values("products")
|
|
.exclude(products__isnull=True)
|
|
.annotate(cnt=Count("products"))
|
|
.order_by("-cnt")
|
|
)
|
|
wished_first = wished_qs.first()
|
|
if wished_first and wished_first.get("products"):
|
|
product = Product.objects.filter(pk=wished_first["products"]).first()
|
|
if product:
|
|
img = (
|
|
product.images.first().image_url if product.images.exists() else ""
|
|
)
|
|
most_wished = {
|
|
"name": product.name,
|
|
"image": img,
|
|
"admin_url": reverse(
|
|
"admin:core_product_change", args=[product.pk]
|
|
),
|
|
}
|
|
|
|
wished_top10 = list(wished_qs[:10])
|
|
if wished_top10:
|
|
counts_map = {
|
|
row["products"]: row["cnt"]
|
|
for row in wished_top10
|
|
if row.get("products")
|
|
}
|
|
products = Product.objects.filter(pk__in=counts_map.keys())
|
|
product_by_id = {p.pk: p for p in products}
|
|
for row in wished_top10:
|
|
pid = row.get("products")
|
|
if not pid or pid not in product_by_id:
|
|
continue
|
|
p = product_by_id[pid]
|
|
img = p.images.first().image_url if p.images.exists() else ""
|
|
most_wished_list.append(
|
|
{
|
|
"name": p.name,
|
|
"image": img,
|
|
"admin_url": reverse("admin:core_product_change", args=[p.pk]),
|
|
"count": int(row.get("cnt", 0)),
|
|
}
|
|
)
|
|
|
|
try:
|
|
today = tz_now().date()
|
|
days = period_days
|
|
date_axis = [today - timedelta(days=i) for i in range(days - 1, -1, -1)]
|
|
|
|
orders_map = get_daily_finished_orders_count(timedelta(days=days))
|
|
gross_map = get_daily_gross_revenue(timedelta(days=days))
|
|
|
|
labels = [d.strftime("%d %b") for d in date_axis]
|
|
orders_series = [int(orders_map.get(d, 0) or 0) for d in date_axis]
|
|
gross_series = [float(gross_map.get(d, 0.0) or 0.0) for d in date_axis]
|
|
|
|
context["daily_labels"] = labels
|
|
context["daily_orders"] = orders_series
|
|
context["daily_gross"] = gross_series
|
|
context["daily_title"] = _("Revenue & Orders (last %(days)d)") % {
|
|
"days": period_days
|
|
}
|
|
except Exception as e:
|
|
logger.warning("Failed to build daily stats: %s", e)
|
|
context["daily_labels"] = []
|
|
context["daily_orders"] = []
|
|
context["daily_gross"] = []
|
|
with suppress(Exception):
|
|
today = tz_now().date()
|
|
days = period_days
|
|
date_axis = [today - timedelta(days=i) for i in range(days - 1, -1, -1)]
|
|
context["daily_labels"] = [d.strftime("%d %b") for d in date_axis]
|
|
context["daily_orders"] = [0 for _i in date_axis]
|
|
context["daily_gross"] = [0.0 for _j in date_axis]
|
|
context["daily_title"] = _("Revenue & Orders (last %(days)d)") % {
|
|
"days": period_days
|
|
}
|
|
|
|
low_stock_list: list[dict[str, str | int]] = []
|
|
with suppress(Exception):
|
|
products = (
|
|
Product.objects.annotate(total_qty=Sum("stocks__quantity"))
|
|
.values("id", "name", "sku", "total_qty")
|
|
.order_by("total_qty")[:5]
|
|
)
|
|
for p in products:
|
|
qty = int(p.get("total_qty") or 0)
|
|
low_stock_list.append(
|
|
{
|
|
"name": str(p.get("name") or ""),
|
|
"sku": str(p.get("sku") or ""),
|
|
"qty": qty,
|
|
"admin_url": reverse(
|
|
"admin:core_product_change", args=[p.get("id")]
|
|
),
|
|
}
|
|
)
|
|
|
|
cache_key = f"dashboard_cb:{period_days}"
|
|
cached_pack = None
|
|
with suppress(Exception):
|
|
cached_pack = cache.get(cache_key)
|
|
if cached_pack is None:
|
|
cached_pack = {
|
|
"kpi": kpi,
|
|
"revenue_gross": revenue_gross_cur,
|
|
"revenue_net": revenue_net_cur,
|
|
"returns_amount": returns_cur,
|
|
"orders_finished": orders_finished_cur,
|
|
"aov": aov_cur,
|
|
"refund_rate": refund_rate_cur,
|
|
"low_stock_products": low_stock_list,
|
|
}
|
|
with suppress(Exception):
|
|
cache.set(cache_key, cached_pack, 600)
|
|
|
|
most_popular: dict[str, str | int | float | None] | None = None
|
|
most_popular_list: list[dict[str, str | int | float | None]] = []
|
|
with suppress(Exception):
|
|
popular_qs = (
|
|
OrderProduct.objects.filter(
|
|
status="FINISHED", order__status="FINISHED", product__isnull=False
|
|
)
|
|
.values("product")
|
|
.annotate(total_qty=Sum("quantity"))
|
|
.order_by("-total_qty")
|
|
)
|
|
popular_first = popular_qs.first()
|
|
if popular_first and popular_first.get("product"):
|
|
product = Product.objects.filter(pk=popular_first["product"]).first()
|
|
if product:
|
|
img = (
|
|
product.images.first().image_url if product.images.exists() else ""
|
|
)
|
|
most_popular = {
|
|
"name": product.name,
|
|
"image": img,
|
|
"admin_url": reverse(
|
|
"admin:core_product_change", args=[product.pk]
|
|
),
|
|
}
|
|
|
|
popular_top10 = list(popular_qs[:10])
|
|
if popular_top10:
|
|
qty_map = {
|
|
row["product"]: row["total_qty"]
|
|
for row in popular_top10
|
|
if row.get("product")
|
|
}
|
|
products = Product.objects.filter(pk__in=qty_map.keys())
|
|
product_by_id = {p.pk: p for p in products}
|
|
for row in popular_top10:
|
|
pid = row.get("product")
|
|
if not pid or pid not in product_by_id:
|
|
continue
|
|
p = product_by_id[pid]
|
|
img = p.images.first().image_url if p.images.exists() else ""
|
|
most_popular_list.append(
|
|
{
|
|
"name": p.name,
|
|
"image": img,
|
|
"admin_url": reverse("admin:core_product_change", args=[p.pk]),
|
|
"count": int(row.get("total_qty", 0) or 0),
|
|
}
|
|
)
|
|
|
|
customers_mix: dict[str, int | float] = {
|
|
"new": 0,
|
|
"returning": 0,
|
|
"new_pct": 0.0,
|
|
"returning_pct": 0.0,
|
|
}
|
|
with suppress(Exception):
|
|
mix = get_customer_mix()
|
|
n = int(mix.get("new", 0))
|
|
r = int(mix.get("returning", 0))
|
|
t = max(n + r, 0)
|
|
new_pct = round((n / t * 100.0), 1) if t > 0 else 0.0
|
|
ret_pct = round((r / t * 100.0), 1) if t > 0 else 0.0
|
|
customers_mix = {
|
|
"new": n,
|
|
"returning": r,
|
|
"new_pct": new_pct,
|
|
"returning_pct": ret_pct,
|
|
"total": t,
|
|
}
|
|
|
|
shipped_vs_digital: dict[str, int | float] = {
|
|
"digital_qty": 0,
|
|
"shipped_qty": 0,
|
|
"digital_gross": 0.0,
|
|
"shipped_gross": 0.0,
|
|
"digital_pct": 0.0,
|
|
"shipped_pct": 0.0,
|
|
}
|
|
with suppress(Exception):
|
|
svd = get_shipped_vs_digital_mix()
|
|
dq = int(svd.get("digital_qty", 0))
|
|
sq = int(svd.get("shipped_qty", 0))
|
|
total_q = dq + sq
|
|
digital_pct = round((dq / total_q * 100.0), 1) if total_q > 0 else 0.0
|
|
shipped_pct = round((sq / total_q * 100.0), 1) if total_q > 0 else 0.0
|
|
shipped_vs_digital.update(
|
|
{
|
|
"digital_qty": dq,
|
|
"shipped_qty": sq,
|
|
"digital_gross": float(svd.get("digital_gross", 0.0) or 0.0),
|
|
"shipped_gross": float(svd.get("shipped_gross", 0.0) or 0.0),
|
|
"digital_pct": digital_pct,
|
|
"shipped_pct": shipped_pct,
|
|
}
|
|
)
|
|
|
|
most_returned_products: list[dict[str, str | int | float]] = []
|
|
with suppress(Exception):
|
|
most_returned_products = get_top_returned_products()
|
|
|
|
top_categories: list[dict[str, str | int | float]] = []
|
|
with suppress(Exception):
|
|
top_categories = get_top_categories_by_qty()
|
|
|
|
context.update(
|
|
{
|
|
"custom_variable": "value",
|
|
"timeframe_days": period_days,
|
|
"tf": period_days,
|
|
"kpi": kpi,
|
|
"revenue_gross": revenue_gross_cur,
|
|
"revenue_net": revenue_net_cur,
|
|
"returns_amount": returns_cur,
|
|
"orders_finished": orders_finished_cur,
|
|
"aov": aov_cur,
|
|
"refund_rate": refund_rate_cur,
|
|
"revenue_gross_30": revenue_gross_cur,
|
|
"revenue_net_30": revenue_net_cur,
|
|
"returns_30": returns_cur,
|
|
"processed_orders_30": processed_orders_cur,
|
|
"schon_version": settings.SCHON_VERSION,
|
|
"quick_links": quick_links,
|
|
"most_wished_product": most_wished,
|
|
"most_popular_product": most_popular,
|
|
"most_wished_products": most_wished_list,
|
|
"most_popular_products": most_popular_list,
|
|
"currency_symbol": currency_symbol,
|
|
"customers_mix": customers_mix,
|
|
"shipped_vs_digital": shipped_vs_digital,
|
|
"most_returned_products": most_returned_products,
|
|
"top_categories": top_categories,
|
|
"low_stock_products": low_stock_list,
|
|
}
|
|
)
|
|
|
|
return context
|
|
|
|
|
|
dashboard_callback.__doc__ = str(_("Returns custom variables for Dashboard. ")) # ty: ignore[invalid-assignment]
|