Replace WYSIWYG editor with Markdown editor across all relevant models and admin fields. Add utilities for rendering and stripping markdown. Adjust serializers, views, and templates to support markdown content. Introduce `PastedImage` model and upload endpoint for handling inline image uploads in markdown. This change simplifies content formatting while enhancing flexibility with markdown support.
885 lines
31 KiB
Python
885 lines
31 KiB
Python
import logging
|
|
import mimetypes
|
|
import os
|
|
import traceback
|
|
from contextlib import suppress
|
|
from datetime import date, timedelta
|
|
|
|
import requests
|
|
from constance import config
|
|
from django.conf import settings
|
|
from django.contrib.sitemaps.views import index as _sitemap_index_view
|
|
from django.contrib.sitemaps.views import sitemap as _sitemap_detail_view
|
|
from django.core.cache import cache
|
|
from django.core.exceptions import BadRequest
|
|
from django.db.models import Count, F, Sum
|
|
from django.http import (
|
|
FileResponse,
|
|
HttpRequest,
|
|
HttpResponse,
|
|
HttpResponseRedirect,
|
|
JsonResponse,
|
|
)
|
|
from django.shortcuts import redirect
|
|
from django.template import Context
|
|
from django.urls import reverse
|
|
from django.utils.decorators import method_decorator
|
|
from django.utils.http import urlsafe_base64_decode
|
|
from django.utils.timezone import now as tz_now
|
|
from django.utils.translation import gettext_lazy as _
|
|
from django.views.decorators.cache import cache_page
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.views.decorators.vary import vary_on_headers
|
|
from django.views.generic import TemplateView
|
|
from django_ratelimit.decorators import ratelimit
|
|
from drf_spectacular.utils import extend_schema_view
|
|
from drf_spectacular.views import SpectacularAPIView
|
|
from graphene_file_upload.django import FileUploadGraphQLView
|
|
from rest_framework import status
|
|
from rest_framework.parsers import MultiPartParser
|
|
from rest_framework.permissions import AllowAny, IsAdminUser
|
|
from rest_framework.request import Request
|
|
from rest_framework.response import Response
|
|
from rest_framework.views import APIView
|
|
from sentry_sdk import capture_exception
|
|
|
|
from engine.core.docs.drf.views import (
|
|
BUY_AS_BUSINESS_SCHEMA,
|
|
CACHE_SCHEMA,
|
|
CONTACT_US_SCHEMA,
|
|
CUSTOM_OPENAPI_SCHEMA,
|
|
DOWNLOAD_DIGITAL_ASSET_SCHEMA,
|
|
LANGUAGE_SCHEMA,
|
|
PARAMETERS_SCHEMA,
|
|
REQUEST_CURSED_URL_SCHEMA,
|
|
SEARCH_SCHEMA,
|
|
)
|
|
from engine.core.elasticsearch import process_query
|
|
from engine.core.models import (
|
|
DigitalAssetDownload,
|
|
Order,
|
|
OrderProduct,
|
|
PastedImage,
|
|
Product,
|
|
Wishlist,
|
|
)
|
|
from engine.core.serializers import (
|
|
BuyAsBusinessOrderSerializer,
|
|
CacheOperatorSerializer,
|
|
ContactUsSerializer,
|
|
LanguageSerializer,
|
|
)
|
|
from engine.core.utils import get_project_parameters, is_url_safe
|
|
from engine.core.utils.caching import web_cache
|
|
from engine.core.utils.commerce import (
|
|
get_customer_mix,
|
|
get_daily_finished_orders_count,
|
|
get_daily_gross_revenue,
|
|
get_returns,
|
|
get_revenue,
|
|
get_shipped_vs_digital_mix,
|
|
get_top_categories_by_qty,
|
|
get_top_returned_products,
|
|
get_total_processed_orders,
|
|
)
|
|
from engine.core.utils.emailing import contact_us_email
|
|
from engine.core.utils.languages import get_flag_by_language
|
|
from engine.payments.serializers import TransactionProcessSerializer
|
|
from schon.utils.renderers import camelize
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@cache_page(60 * 60 * 12)
|
|
@vary_on_headers("Host")
|
|
def sitemap_index(request, *args, **kwargs):
|
|
response = _sitemap_index_view(request, *args, **kwargs)
|
|
response["Content-Type"] = "application/xml; charset=utf-8"
|
|
return response
|
|
|
|
|
|
# noinspection PyTypeChecker
|
|
sitemap_index.__doc__ = _( # ty:ignore[invalid-assignment]
|
|
"Handles the request for the sitemap index and returns an XML response. "
|
|
"It ensures the response includes the appropriate content type header for XML."
|
|
)
|
|
|
|
|
|
@cache_page(60 * 60 * 24)
|
|
@vary_on_headers("Host")
|
|
def sitemap_detail(request, *args, **kwargs):
|
|
response = _sitemap_detail_view(request, *args, **kwargs)
|
|
response["Content-Type"] = "application/xml; charset=utf-8"
|
|
return response
|
|
|
|
|
|
# noinspection PyTypeChecker
|
|
sitemap_detail.__doc__ = _( # ty:ignore[invalid-assignment]
|
|
"Handles the detailed view response for a sitemap. "
|
|
"This function processes the request, fetches the appropriate "
|
|
"sitemap detail response, and sets the Content-Type header for XML."
|
|
)
|
|
|
|
|
|
class CustomGraphQLView(FileUploadGraphQLView):
|
|
def get_context(self, request):
|
|
return request
|
|
|
|
|
|
@extend_schema_view(**CUSTOM_OPENAPI_SCHEMA)
|
|
class CustomSpectacularAPIView(SpectacularAPIView):
|
|
def get(self, request, *args, **kwargs):
|
|
return super().get(request, *args, **kwargs)
|
|
|
|
|
|
class RapiDocView(TemplateView):
|
|
template_name = "rapidoc.html"
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
context["title"] = settings.SPECTACULAR_SETTINGS.get("TITLE", "API")
|
|
context["schema_url"] = self.request.build_absolute_uri("/docs/schema/")
|
|
return context
|
|
|
|
|
|
@extend_schema_view(**LANGUAGE_SCHEMA)
|
|
class SupportedLanguagesView(APIView):
|
|
__doc__ = _( # pyright: ignore[reportUnknownVariableType]
|
|
"Returns a list of supported languages and their corresponding information."
|
|
)
|
|
|
|
serializer_class = LanguageSerializer
|
|
permission_classes = [
|
|
AllowAny,
|
|
]
|
|
|
|
def get(self, request: Request, *args, **kwargs) -> Response:
|
|
return Response(
|
|
data=self.serializer_class(
|
|
[
|
|
{
|
|
"code": lang[0],
|
|
"name": lang[1],
|
|
"flag": get_flag_by_language(lang[0]),
|
|
}
|
|
for lang in settings.LANGUAGES
|
|
],
|
|
many=True,
|
|
).data,
|
|
status=status.HTTP_200_OK,
|
|
)
|
|
|
|
|
|
@extend_schema_view(**PARAMETERS_SCHEMA)
|
|
class WebsiteParametersView(APIView):
|
|
__doc__ = _("Returns the parameters of the website as a JSON object.")
|
|
|
|
serializer_class = None
|
|
permission_classes = [
|
|
AllowAny,
|
|
]
|
|
|
|
def get(self, request: Request, *args, **kwargs) -> Response:
|
|
return Response(
|
|
data=camelize(get_project_parameters()), status=status.HTTP_200_OK
|
|
)
|
|
|
|
|
|
@extend_schema_view(**CACHE_SCHEMA)
|
|
class CacheOperatorView(APIView):
|
|
__doc__ = _(
|
|
"Handles cache operations such as reading and setting cache data with a specified key and timeout."
|
|
)
|
|
|
|
serializer_class = CacheOperatorSerializer
|
|
permission_classes = [
|
|
AllowAny,
|
|
]
|
|
|
|
def post(self, request: Request, *args, **kwargs) -> Response:
|
|
return Response(
|
|
data=web_cache(
|
|
request,
|
|
request.data.get("key") or "",
|
|
request.data.get("data", {}),
|
|
request.data.get("timeout"),
|
|
),
|
|
status=status.HTTP_200_OK,
|
|
)
|
|
|
|
|
|
@extend_schema_view(**CONTACT_US_SCHEMA)
|
|
class ContactUsView(APIView):
|
|
__doc__ = _("Handles `contact us` form submissions.")
|
|
|
|
serializer_class = ContactUsSerializer
|
|
|
|
@method_decorator(ratelimit(key="ip", rate="2/h", method="POST", block=True))
|
|
def post(self, request: Request, *args, **kwargs) -> Response:
|
|
serializer = self.serializer_class(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
contact_us_email.delay(serializer.validated_data)
|
|
|
|
return Response(data=serializer.data, status=status.HTTP_200_OK)
|
|
|
|
|
|
@extend_schema_view(**REQUEST_CURSED_URL_SCHEMA)
|
|
class RequestCursedURLView(APIView):
|
|
__doc__ = _(
|
|
"Handles requests for processing and validating URLs from incoming POST requests."
|
|
)
|
|
|
|
permission_classes = [
|
|
AllowAny,
|
|
]
|
|
|
|
@method_decorator(ratelimit(key="ip", rate="10/h"))
|
|
def post(self, request: Request, *args, **kwargs) -> Response:
|
|
url = request.data.get("url")
|
|
if not is_url_safe(str(url)):
|
|
return Response(
|
|
data={"error": _("only URLs starting with http(s):// are allowed")},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
try:
|
|
data = cache.get(url, None)
|
|
if not data:
|
|
response = requests.get(
|
|
str(url), headers={"content-type": "application/json"}
|
|
)
|
|
response.raise_for_status()
|
|
data = camelize(response.json())
|
|
cache.set(url, data, 86400)
|
|
return Response(
|
|
data=data,
|
|
status=status.HTTP_200_OK,
|
|
)
|
|
except Exception as e:
|
|
return Response(
|
|
data={"error": str(e)},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
|
|
|
|
@extend_schema_view(**SEARCH_SCHEMA)
|
|
class GlobalSearchView(APIView):
|
|
__doc__ = _("Handles global search queries.")
|
|
|
|
def get(self, request: Request, *args, **kwargs) -> Response:
|
|
return Response(
|
|
camelize(
|
|
{
|
|
"results": process_query(
|
|
query=request.GET.get("q", "").strip(), request=request
|
|
)
|
|
}
|
|
)
|
|
)
|
|
|
|
|
|
@extend_schema_view(**BUY_AS_BUSINESS_SCHEMA)
|
|
class BuyAsBusinessView(APIView):
|
|
__doc__ = _("Handles the logic of buying as a business without registration.")
|
|
|
|
# noinspection PyUnusedLocal
|
|
@method_decorator(
|
|
ratelimit(key="ip", rate="10/h" if not settings.DEBUG else "888/h")
|
|
)
|
|
def post(self, request: Request, *args, **kwargs) -> Response:
|
|
serializer = BuyAsBusinessOrderSerializer(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
order = Order.objects.create(status="MOMENTAL")
|
|
products = [
|
|
product.get("product_uuid")
|
|
for product in serializer.validated_data.get("products")
|
|
]
|
|
try:
|
|
transaction = order.buy_without_registration(
|
|
products=products,
|
|
promocode_uuid=serializer.validated_data.get("promocode_uuid"),
|
|
customer_name=serializer.validated_data.get("business_identificator"),
|
|
customer_email=serializer.validated_data.get("business_email"),
|
|
customer_phone_number=serializer.validated_data.get(
|
|
"business_phone_number"
|
|
),
|
|
billing_customer_address=serializer.validated_data.get(
|
|
"billing_business_address_uuid"
|
|
),
|
|
shipping_customer_address=serializer.validated_data.get(
|
|
"shipping_business_address_uuid"
|
|
),
|
|
payment_method=serializer.validated_data.get("payment_method"),
|
|
is_business=True,
|
|
)
|
|
return Response(
|
|
status=status.HTTP_201_CREATED,
|
|
data=TransactionProcessSerializer(transaction).data,
|
|
)
|
|
except Exception as e:
|
|
order.is_active = False
|
|
order.save()
|
|
return Response(
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
data={"detail": str(e)},
|
|
)
|
|
|
|
|
|
@extend_schema_view(**DOWNLOAD_DIGITAL_ASSET_SCHEMA)
|
|
class DownloadDigitalAssetView(APIView):
|
|
__doc__ = _(
|
|
"Handles the downloading of a digital asset associated with an order.\n"
|
|
"This function attempts to serve the digital asset file located in the "
|
|
"storage directory of the project. If the file is not found, an HTTP 404 "
|
|
"error is raised to indicate the resource is unavailable."
|
|
)
|
|
|
|
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse | FileResponse:
|
|
try:
|
|
op_uuid = str(kwargs.get("order_product_uuid"))
|
|
|
|
if not op_uuid:
|
|
raise BadRequest(_("order_product_uuid is required"))
|
|
|
|
uuid = urlsafe_base64_decode(op_uuid).decode("utf-8")
|
|
|
|
try:
|
|
order_product = OrderProduct.objects.get(uuid=uuid)
|
|
except OrderProduct.DoesNotExist as dne:
|
|
raise BadRequest(_("order product does not exist")) from dne
|
|
|
|
if order_product.download.num_downloads >= 1:
|
|
raise BadRequest(_("you can only download the digital asset once"))
|
|
|
|
if order_product.download.order_product.status != "FINISHED":
|
|
raise BadRequest(
|
|
_("the order must be paid before downloading the digital asset")
|
|
)
|
|
|
|
order_product.download.num_downloads += 1
|
|
order_product.download.save()
|
|
|
|
if not order_product.download.order_product.product:
|
|
raise BadRequest(_("the order product does not have a product"))
|
|
|
|
file_path = order_product.download.order_product.product.stocks.first().digital_asset.path
|
|
|
|
content_type, encoding = mimetypes.guess_type(file_path)
|
|
if not content_type:
|
|
content_type = "application/octet-stream"
|
|
|
|
response = FileResponse(open(file_path, "rb"), content_type=content_type)
|
|
filename = os.path.basename(file_path)
|
|
response["Content-Disposition"] = f'attachment; filename="{filename}"'
|
|
return response
|
|
|
|
except BadRequest as e:
|
|
return Response(
|
|
data=camelize({"error": str(e)}), status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
|
|
except DigitalAssetDownload.DoesNotExist:
|
|
return Response(
|
|
data=camelize({"error": "Digital asset not found"}),
|
|
status=status.HTTP_404_NOT_FOUND,
|
|
)
|
|
|
|
except Exception as e:
|
|
capture_exception(e)
|
|
return Response(
|
|
data=camelize(
|
|
{
|
|
"error": "An error occurred while trying to download the digital asset",
|
|
"detail": traceback.format_exc() if settings.DEBUG else None,
|
|
}
|
|
),
|
|
status=status.HTTP_503_SERVICE_UNAVAILABLE,
|
|
)
|
|
|
|
|
|
@csrf_exempt
|
|
def favicon_view(request: HttpRequest) -> HttpResponse | FileResponse:
|
|
try:
|
|
favicon_path = os.path.join(settings.BASE_DIR, "static/favicon.png")
|
|
return FileResponse(open(favicon_path, "rb"), content_type="image/x-icon")
|
|
except FileNotFoundError:
|
|
return HttpResponse(status=404, content=_("favicon not found"))
|
|
|
|
|
|
# noinspection PyTypeChecker
|
|
favicon_view.__doc__ = str(
|
|
_(
|
|
"Handles requests for the favicon of a website.\n"
|
|
"This function attempts to serve the favicon file located in the static directory of the project. "
|
|
"If the favicon file is not found, an HTTP 404 error is raised to indicate the resource is unavailable."
|
|
)
|
|
)
|
|
|
|
|
|
def index(request: HttpRequest, *args, **kwargs) -> HttpResponse | HttpResponseRedirect:
|
|
return redirect("admin:index")
|
|
|
|
|
|
# noinspection PyTypeChecker
|
|
index.__doc__ = str(
|
|
_(
|
|
"Redirects the request to the admin index page. "
|
|
"The function handles incoming HTTP requests and redirects them to the Django "
|
|
"admin interface index page. It uses Django's `redirect` function for handling "
|
|
"the HTTP redirection."
|
|
)
|
|
)
|
|
|
|
|
|
def version(request: HttpRequest, *args, **kwargs) -> HttpResponse:
|
|
return JsonResponse(camelize({"version": settings.SCHON_VERSION}), status=200)
|
|
|
|
|
|
# noinspection PyTypeChecker
|
|
version.__doc__ = str(_("Returns current version of the Schon. "))
|
|
|
|
|
|
class PastedImageUploadView(APIView):
|
|
permission_classes = [IsAdminUser]
|
|
parser_classes = [MultiPartParser]
|
|
|
|
def post(self, request: Request, *args, **kwargs) -> Response:
|
|
image = request.FILES.get("image")
|
|
if not image:
|
|
return Response(
|
|
{"error": _("no image file provided")},
|
|
status=status.HTTP_400_BAD_REQUEST,
|
|
)
|
|
obj = PastedImage.objects.create(
|
|
name=image.name or "pasted",
|
|
image=image,
|
|
)
|
|
return Response(
|
|
{"data": {"filePath": request.build_absolute_uri(obj.image.url)}},
|
|
status=status.HTTP_201_CREATED,
|
|
)
|
|
|
|
|
|
def dashboard_callback(request: HttpRequest, context: Context) -> Context:
|
|
tf_map: dict[str, int] = {"7": 7, "30": 30, "90": 90, "360": 360}
|
|
tf_param = str(request.GET.get("tf", "30") or "30")
|
|
period_days: int = tf_map.get(tf_param, 30)
|
|
period = timedelta(days=period_days)
|
|
|
|
now_dt = tz_now()
|
|
cur_start = now_dt - period
|
|
prev_start = now_dt - timedelta(days=period_days * 2)
|
|
prev_end = cur_start
|
|
|
|
revenue_gross_cur: float = get_revenue(clear=False, period=period)
|
|
returns_cur: float = get_returns(period=period)
|
|
revenue_net_before_returns: float = get_revenue(clear=True, period=period)
|
|
revenue_net_cur: float = max(revenue_net_before_returns - returns_cur, 0.0)
|
|
processed_orders_cur: int = get_total_processed_orders(period=period)
|
|
|
|
orders_finished_cur: int = Order.objects.filter(
|
|
status="FINISHED", buy_time__lte=now_dt, buy_time__gte=cur_start
|
|
).count()
|
|
|
|
def sum_gross_between(start: date | None, end: date | None) -> float:
|
|
qs = (
|
|
OrderProduct.objects.filter(
|
|
status__in=["FINISHED"], order__status="FINISHED"
|
|
)
|
|
.filter(order__buy_time__lt=end, order__buy_time__gte=start)
|
|
.aggregate(total=Sum(F("buy_price") * F("quantity")))
|
|
)
|
|
total = qs.get("total") or 0.0
|
|
result = round(float(total), 2)
|
|
return result
|
|
|
|
def sum_returns_between(start: date | None, end: date | None) -> float:
|
|
qs = (
|
|
OrderProduct.objects.filter(status__in=["RETURNED"]) # returned items
|
|
.filter(order__buy_time__lt=end, order__buy_time__gte=start)
|
|
.aggregate(total=Sum(F("buy_price") * F("quantity")))
|
|
)
|
|
total = qs.get("total") or 0.0
|
|
result = round(float(total), 2)
|
|
return result
|
|
|
|
def count_finished_orders_between(start: date | None, end: date | None) -> int:
|
|
result = Order.objects.filter(
|
|
status="FINISHED", buy_time__lt=end, buy_time__gte=start
|
|
).count()
|
|
return result
|
|
|
|
revenue_gross_prev = sum_gross_between(prev_start, prev_end)
|
|
returns_prev = sum_returns_between(prev_start, prev_end)
|
|
orders_finished_prev = count_finished_orders_between(prev_start, prev_end)
|
|
|
|
tax_rate = float(getattr(config, "TAX_RATE", 0.0) or 0.0)
|
|
tax_included = bool(getattr(config, "TAX_INCLUDED", False))
|
|
|
|
if tax_rate <= 0:
|
|
revenue_net_before_returns_prev = revenue_gross_prev
|
|
else:
|
|
if tax_included:
|
|
divisor = 1.0 + (tax_rate / 100.0)
|
|
revenue_net_before_returns_prev = (
|
|
revenue_gross_prev / divisor if divisor > 0 else revenue_gross_prev
|
|
)
|
|
else:
|
|
revenue_net_before_returns_prev = revenue_gross_prev
|
|
revenue_net_prev = max(
|
|
round(float(revenue_net_before_returns_prev or 0.0), 2) - returns_prev, 0.0
|
|
)
|
|
|
|
def pct_delta(cur: float | int, prev: float | int) -> float:
|
|
cur_f = float(cur or 0)
|
|
prev_f = float(prev or 0)
|
|
if prev_f == 0:
|
|
result = 0.0 if cur_f == 0 else 100.0
|
|
else:
|
|
result = round(((cur_f - prev_f) / prev_f) * 100.0, 1)
|
|
return result
|
|
|
|
aov_cur: float = (
|
|
round((revenue_gross_cur / orders_finished_cur), 2)
|
|
if orders_finished_cur > 0
|
|
else 0.0
|
|
)
|
|
refund_rate_cur: float = (
|
|
round(((returns_cur / revenue_gross_cur) * 100.0), 1)
|
|
if revenue_gross_cur > 0
|
|
else 0.0
|
|
)
|
|
|
|
aov_prev: float = (
|
|
round((revenue_gross_prev / orders_finished_prev), 2)
|
|
if orders_finished_prev > 0
|
|
else 0.0
|
|
)
|
|
refund_rate_prev: float = (
|
|
round(((returns_prev / revenue_gross_prev) * 100.0), 1)
|
|
if revenue_gross_prev > 0
|
|
else 0.0
|
|
)
|
|
|
|
kpi = {
|
|
"gmv": {
|
|
"value": revenue_gross_cur,
|
|
"prev": revenue_gross_prev,
|
|
"delta_pct": pct_delta(revenue_gross_cur, revenue_gross_prev),
|
|
},
|
|
"orders": {
|
|
"value": orders_finished_cur,
|
|
"prev": orders_finished_prev,
|
|
"delta_pct": pct_delta(orders_finished_cur, orders_finished_prev),
|
|
},
|
|
"aov": {
|
|
"value": aov_cur,
|
|
"prev": aov_prev,
|
|
"delta_pct": pct_delta(aov_cur, aov_prev),
|
|
},
|
|
"net": {
|
|
"value": revenue_net_cur,
|
|
"prev": revenue_net_prev,
|
|
"delta_pct": pct_delta(revenue_net_cur, revenue_net_prev),
|
|
},
|
|
"refund_rate": {
|
|
"value": refund_rate_cur,
|
|
"prev": refund_rate_prev,
|
|
"delta_pct": pct_delta(refund_rate_cur, refund_rate_prev),
|
|
},
|
|
}
|
|
currency_symbol: str = ""
|
|
try:
|
|
currency_symbol = dict(getattr(settings, "CURRENCIES_WITH_SYMBOLS", ())).get(
|
|
getattr(settings, "CURRENCY_CODE", ""), ""
|
|
)
|
|
except Exception as exc:
|
|
logger.error("Failed to get currency symbol: %s", exc)
|
|
|
|
quick_links: list[dict[str, str]] = []
|
|
try:
|
|
quick_links_section = settings.UNFOLD.get("SIDEBAR", {}).get("navigation", [])[
|
|
1
|
|
]
|
|
for item in quick_links_section.get("items", []):
|
|
title = item.get("title")
|
|
link = item.get("link")
|
|
if not title or not link:
|
|
continue
|
|
quick_links.append(
|
|
{
|
|
"title": str(title),
|
|
"link": str(link),
|
|
**({"icon": item.get("icon")} if item.get("icon") else {}),
|
|
}
|
|
)
|
|
except Exception as exc:
|
|
logger.error("Failed to build quick links: %s", exc)
|
|
|
|
most_wished: dict[str, str | int | float | None] | None = None
|
|
most_wished_list: list[dict[str, str | int | float | None]] = []
|
|
try:
|
|
wished_qs = (
|
|
Wishlist.objects.filter(user__is_active=True, user__is_staff=False)
|
|
.values("products")
|
|
.exclude(products__isnull=True)
|
|
.annotate(cnt=Count("products"))
|
|
.order_by("-cnt")
|
|
)
|
|
wished_first = wished_qs.first()
|
|
if wished_first and wished_first.get("products"):
|
|
product = Product.objects.filter(pk=wished_first["products"]).first()
|
|
if product:
|
|
img = (
|
|
product.images.first().image_url if product.images.exists() else "" # ty: ignore[possibly-missing-attribute]
|
|
)
|
|
most_wished = {
|
|
"name": product.name,
|
|
"image": img,
|
|
"admin_url": reverse(
|
|
"admin:core_product_change", args=[product.pk]
|
|
),
|
|
}
|
|
|
|
wished_top10 = list(wished_qs[:10])
|
|
if wished_top10:
|
|
counts_map = {
|
|
row["products"]: row["cnt"]
|
|
for row in wished_top10
|
|
if row.get("products")
|
|
}
|
|
products = Product.objects.filter(pk__in=counts_map.keys())
|
|
product_by_id = {p.pk: p for p in products}
|
|
for row in wished_top10:
|
|
pid = row.get("products")
|
|
if not pid or pid not in product_by_id:
|
|
continue
|
|
p = product_by_id[pid]
|
|
img = p.images.first().image_url if p.images.exists() else "" # ty: ignore[possibly-missing-attribute]
|
|
most_wished_list.append(
|
|
{
|
|
"name": p.name,
|
|
"image": img,
|
|
"admin_url": reverse("admin:core_product_change", args=[p.pk]),
|
|
"count": int(row.get("cnt", 0)),
|
|
}
|
|
)
|
|
except Exception as exc:
|
|
logger.error("Failed to build most wished list: %s", exc)
|
|
|
|
try:
|
|
today = tz_now().date()
|
|
days = period_days
|
|
date_axis = [today - timedelta(days=i) for i in range(days - 1, -1, -1)]
|
|
|
|
orders_map = get_daily_finished_orders_count(timedelta(days=days))
|
|
gross_map = get_daily_gross_revenue(timedelta(days=days))
|
|
|
|
labels = [d.strftime("%d %b") for d in date_axis]
|
|
orders_series = [int(orders_map.get(d, 0) or 0) for d in date_axis]
|
|
gross_series = [float(gross_map.get(d, 0.0) or 0.0) for d in date_axis]
|
|
|
|
context["daily_labels"] = labels
|
|
context["daily_orders"] = orders_series
|
|
context["daily_gross"] = gross_series
|
|
context["daily_title"] = _("Revenue & Orders (last %(days)d)") % {
|
|
"days": period_days
|
|
}
|
|
except Exception as e:
|
|
logger.warning("Failed to build daily stats: %s", e)
|
|
context["daily_labels"] = []
|
|
context["daily_orders"] = []
|
|
context["daily_gross"] = []
|
|
try:
|
|
today = tz_now().date()
|
|
days = period_days
|
|
date_axis = [today - timedelta(days=i) for i in range(days - 1, -1, -1)]
|
|
context["daily_labels"] = [d.strftime("%d %b") for d in date_axis]
|
|
context["daily_orders"] = [0 for _i in date_axis]
|
|
context["daily_gross"] = [0.0 for _j in date_axis]
|
|
context["daily_title"] = _("Revenue & Orders (last %(days)d)") % {
|
|
"days": period_days
|
|
}
|
|
except Exception as exc:
|
|
logger.error("Failed to build daily stats: %s", exc)
|
|
|
|
low_stock_list: list[dict[str, str | int | None]] = []
|
|
try:
|
|
products = (
|
|
Product.objects.annotate(total_qty=Sum("stocks__quantity"))
|
|
.prefetch_related("images")
|
|
.order_by("total_qty")[:5]
|
|
)
|
|
for p in products:
|
|
qty = int(p.total_qty or 0) # ty: ignore[possibly-missing-attribute]
|
|
img = ""
|
|
with suppress(Exception):
|
|
img = p.images.first().image_url if p.images.exists() else "" # ty: ignore[possibly-missing-attribute]
|
|
low_stock_list.append(
|
|
{
|
|
"name": p.name,
|
|
"sku": p.sku or "",
|
|
"qty": qty,
|
|
"image": img,
|
|
"admin_url": reverse("admin:core_product_change", args=[p.pk]),
|
|
}
|
|
)
|
|
except Exception as exc:
|
|
logger.error(f"Error fetching low stock products: {exc}")
|
|
|
|
cache_key = f"dashboard_cb:{period_days}"
|
|
cached_pack = cache.get(cache_key, None)
|
|
if cached_pack is None:
|
|
cached_pack = {
|
|
"kpi": kpi,
|
|
"revenue_gross": revenue_gross_cur,
|
|
"revenue_net": revenue_net_cur,
|
|
"returns_amount": returns_cur,
|
|
"orders_finished": orders_finished_cur,
|
|
"aov": aov_cur,
|
|
"refund_rate": refund_rate_cur,
|
|
"low_stock_products": low_stock_list,
|
|
}
|
|
cache.set(cache_key, cached_pack, 600)
|
|
|
|
most_popular: dict[str, str | int | float | None] | None = None
|
|
most_popular_list: list[dict[str, str | int | float | None]] = []
|
|
try:
|
|
popular_qs = (
|
|
OrderProduct.objects.filter(
|
|
status="FINISHED", order__status="FINISHED", product__isnull=False
|
|
)
|
|
.values("product")
|
|
.annotate(total_qty=Sum("quantity"))
|
|
.order_by("-total_qty")
|
|
)
|
|
popular_first = popular_qs.first()
|
|
if popular_first and popular_first.get("product"):
|
|
product = Product.objects.filter(pk=popular_first["product"]).first()
|
|
if product:
|
|
img = (
|
|
product.images.first().image_url if product.images.exists() else "" # ty: ignore[possibly-missing-attribute]
|
|
)
|
|
most_popular = {
|
|
"name": product.name,
|
|
"image": img,
|
|
"admin_url": reverse(
|
|
"admin:core_product_change", args=[product.pk]
|
|
),
|
|
}
|
|
|
|
popular_top10 = list(popular_qs[:10])
|
|
if popular_top10:
|
|
qty_map = {
|
|
row["product"]: row["total_qty"]
|
|
for row in popular_top10
|
|
if row.get("product")
|
|
}
|
|
products = Product.objects.filter(pk__in=qty_map.keys())
|
|
product_by_id = {p.pk: p for p in products}
|
|
for row in popular_top10:
|
|
pid = row.get("product")
|
|
if not pid or pid not in product_by_id:
|
|
continue
|
|
p = product_by_id[pid]
|
|
img = p.images.first().image_url if p.images.exists() else "" # ty: ignore[possibly-missing-attribute]
|
|
most_popular_list.append(
|
|
{
|
|
"name": p.name,
|
|
"image": img,
|
|
"admin_url": reverse("admin:core_product_change", args=[p.pk]),
|
|
"count": int(row.get("total_qty", 0) or 0),
|
|
}
|
|
)
|
|
except Exception as exc:
|
|
logger.error("Failed to build most popular list: %s", exc)
|
|
|
|
customers_mix: dict[str, int | float] = {
|
|
"new": 0,
|
|
"returning": 0,
|
|
"new_pct": 0.0,
|
|
"returning_pct": 0.0,
|
|
}
|
|
try:
|
|
mix = get_customer_mix()
|
|
n = int(mix.get("new", 0))
|
|
r = int(mix.get("returning", 0))
|
|
t = max(n + r, 0)
|
|
new_pct = round((n / t * 100.0), 1) if t > 0 else 0.0
|
|
ret_pct = round((r / t * 100.0), 1) if t > 0 else 0.0
|
|
customers_mix = {
|
|
"new": n,
|
|
"returning": r,
|
|
"new_pct": new_pct,
|
|
"returning_pct": ret_pct,
|
|
"total": t,
|
|
}
|
|
except Exception as exc:
|
|
logger.error("Failed to build customer mix: %s", exc)
|
|
|
|
shipped_vs_digital: dict[str, int | float] = {
|
|
"digital_qty": 0,
|
|
"shipped_qty": 0,
|
|
"digital_gross": 0.0,
|
|
"shipped_gross": 0.0,
|
|
"digital_pct": 0.0,
|
|
"shipped_pct": 0.0,
|
|
}
|
|
try:
|
|
svd = get_shipped_vs_digital_mix()
|
|
dq = int(svd.get("digital_qty", 0))
|
|
sq = int(svd.get("shipped_qty", 0))
|
|
total_q = dq + sq
|
|
digital_pct = round((dq / total_q * 100.0), 1) if total_q > 0 else 0.0
|
|
shipped_pct = round((sq / total_q * 100.0), 1) if total_q > 0 else 0.0
|
|
shipped_vs_digital.update(
|
|
{
|
|
"digital_qty": dq,
|
|
"shipped_qty": sq,
|
|
"digital_gross": float(svd.get("digital_gross", 0.0) or 0.0),
|
|
"shipped_gross": float(svd.get("shipped_gross", 0.0) or 0.0),
|
|
"digital_pct": digital_pct,
|
|
"shipped_pct": shipped_pct,
|
|
}
|
|
)
|
|
except Exception as exc:
|
|
logger.error("Failed to build shipped vs digital mix: %s", exc)
|
|
|
|
most_returned_products = get_top_returned_products()
|
|
|
|
top_categories = get_top_categories_by_qty()
|
|
|
|
context.update(
|
|
{
|
|
"custom_variable": "value",
|
|
"timeframe_days": period_days,
|
|
"tf": period_days,
|
|
"kpi": kpi,
|
|
"revenue_gross": revenue_gross_cur,
|
|
"revenue_net": revenue_net_cur,
|
|
"returns_amount": returns_cur,
|
|
"orders_finished": orders_finished_cur,
|
|
"aov": aov_cur,
|
|
"refund_rate": refund_rate_cur,
|
|
"revenue_gross_30": revenue_gross_cur,
|
|
"revenue_net_30": revenue_net_cur,
|
|
"returns_30": returns_cur,
|
|
"processed_orders_30": processed_orders_cur,
|
|
"schon_version": settings.SCHON_VERSION,
|
|
"quick_links": quick_links,
|
|
"most_wished_product": most_wished,
|
|
"most_popular_product": most_popular,
|
|
"most_wished_products": most_wished_list,
|
|
"most_popular_products": most_popular_list,
|
|
"currency_symbol": currency_symbol,
|
|
"customers_mix": customers_mix,
|
|
"shipped_vs_digital": shipped_vs_digital,
|
|
"most_returned_products": most_returned_products,
|
|
"top_categories": top_categories,
|
|
"low_stock_products": low_stock_list,
|
|
}
|
|
)
|
|
|
|
return context
|
|
|
|
|
|
dashboard_callback.__doc__ = str(_("Returns custom variables for Dashboard. "))
|