import logging
import mimetypes
import os
import traceback
from contextlib import suppress
from datetime import date, timedelta
from typing import Any

import requests
from constance import config
from django.conf import settings
from django.contrib.sitemaps.views import index as _sitemap_index_view
from django.contrib.sitemaps.views import sitemap as _sitemap_detail_view
from django.core.cache import cache
from django.core.exceptions import BadRequest
from django.db.models import Count, F, Sum
from django.http import (
    FileResponse,
    Http404,
    HttpRequest,
    HttpResponse,
    HttpResponseRedirect,
    JsonResponse,
)
from django.shortcuts import redirect
from django.template import Context
from django.urls import reverse
from django.utils.decorators import method_decorator
from django.utils.http import urlsafe_base64_decode
from django.utils.timezone import now as tz_now
from django.utils.translation import gettext_lazy as _
from django.views.decorators.cache import cache_page
from django.views.decorators.vary import vary_on_headers
from django_ratelimit.decorators import ratelimit
from drf_spectacular.utils import extend_schema_view
from drf_spectacular.views import (
    SpectacularAPIView,
    SpectacularRedocView,
    SpectacularSwaggerView,
)
from graphene_file_upload.django import FileUploadGraphQLView
from rest_framework import status
from rest_framework.permissions import AllowAny
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView
from sentry_sdk import capture_exception

from engine.core.docs.drf.views import (
    BUY_AS_BUSINESS_SCHEMA,
    CACHE_SCHEMA,
    CONTACT_US_SCHEMA,
    CUSTOM_OPENAPI_SCHEMA,
    DOWNLOAD_DIGITAL_ASSET_SCHEMA,
    LANGUAGE_SCHEMA,
    PARAMETERS_SCHEMA,
    REQUEST_CURSED_URL_SCHEMA,
    SEARCH_SCHEMA,
)
from engine.core.elasticsearch import process_query
from engine.core.models import (
    DigitalAssetDownload,
    Order,
    OrderProduct,
    Product,
    Wishlist,
)
from engine.core.serializers import (
    BuyAsBusinessOrderSerializer,
    CacheOperatorSerializer,
    ContactUsSerializer,
    LanguageSerializer,
)
from engine.core.utils import get_project_parameters, is_url_safe
from engine.core.utils.caching import web_cache
from engine.core.utils.commerce import (
    get_customer_mix,
    get_daily_finished_orders_count,
    get_daily_gross_revenue,
    get_returns,
    get_revenue,
    get_shipped_vs_digital_mix,
    get_top_categories_by_qty,
    get_top_returned_products,
    get_total_processed_orders,
)
from engine.core.utils.emailing import contact_us_email
from engine.core.utils.languages import get_flag_by_language
from engine.payments.serializers import TransactionProcessSerializer
from evibes.utils.renderers import camelize

logger = logging.getLogger(__name__)


# Sitemap index: delegates to Django's sitemap index view, forces an XML
# content type, and caches the response for 12 hours per Host header.
@cache_page(60 * 60 * 12)
@vary_on_headers("Host")
def sitemap_index(request, *args, **kwargs):
    response = _sitemap_index_view(request, *args, **kwargs)
    response["Content-Type"] = "application/xml; charset=utf-8"
    return response


# noinspection PyTypeChecker
sitemap_index.__doc__ = _(  # pyright: ignore[reportUnknownVariableType]
    "Handles the request for the sitemap index and returns an XML response. "
    "It ensures the response includes the appropriate content type header for XML."
)


# Sitemap section: same as above for a single sitemap, cached for 24 hours.
@cache_page(60 * 60 * 24)
@vary_on_headers("Host")
def sitemap_detail(request, *args, **kwargs):
    response = _sitemap_detail_view(request, *args, **kwargs)
    response["Content-Type"] = "application/xml; charset=utf-8"
    return response


# noinspection PyTypeChecker
sitemap_detail.__doc__ = _(  # pyright: ignore[reportUnknownVariableType]
    "Handles the detailed view response for a sitemap. "
    "This function processes the request, fetches the appropriate "
    "sitemap detail response, and sets the Content-Type header for XML."
)


class CustomGraphQLView(FileUploadGraphQLView):
    # The raw Django request object is used as the GraphQL execution context.
    def get_context(self, request):
        return request


@extend_schema_view(**CUSTOM_OPENAPI_SCHEMA)
class CustomSpectacularAPIView(SpectacularAPIView):
    # Explicit passthrough; the class exists to attach CUSTOM_OPENAPI_SCHEMA.
    def get(self, request, *args, **kwargs):
        return super().get(request, *args, **kwargs)


class CustomSwaggerView(SpectacularSwaggerView):
    # Overrides "script_url" in the template context with the absolute URI of
    # the current request.
    def get_context_data(self, **kwargs):
        # noinspection PyUnresolvedReferences
        context = super().get_context_data(**kwargs)
        context["script_url"] = self.request.build_absolute_uri()
        return context


class CustomRedocView(SpectacularRedocView):
    # Overrides "script_url" in the template context with the absolute URI of
    # the current request.
    def get_context_data(self, **kwargs):
        # noinspection PyUnresolvedReferences
        context = super().get_context_data(**kwargs)
        context["script_url"] = self.request.build_absolute_uri()
        return context


@extend_schema_view(**LANGUAGE_SCHEMA)
class SupportedLanguagesView(APIView):
    __doc__ = _(  # pyright: ignore[reportUnknownVariableType]
        "Returns a list of supported languages and their corresponding information."
    )

    serializer_class = LanguageSerializer
    permission_classes = [
        AllowAny,
    ]

    def get(self, request: Request, *args, **kwargs) -> Response:
        # One entry per (code, name) pair configured in settings.LANGUAGES.
        languages = [
            {
                "code": lang[0],
                "name": lang[1],
                "flag": get_flag_by_language(lang[0]),
            }
            for lang in settings.LANGUAGES
        ]
        return Response(
            data=self.serializer_class(languages, many=True).data,
            status=status.HTTP_200_OK,
        )


@extend_schema_view(**PARAMETERS_SCHEMA)
class WebsiteParametersView(APIView):
    __doc__ = _("Returns the parameters of the website as a JSON object.")

    serializer_class = None
    permission_classes = [
        AllowAny,
    ]

    def get(self, request: Request, *args, **kwargs) -> Response:
        return Response(
            data=camelize(get_project_parameters()), status=status.HTTP_200_OK
        )


@extend_schema_view(**CACHE_SCHEMA)
class CacheOperatorView(APIView):
    __doc__ = _(
        "Handles cache operations such as reading and setting cache data with a specified key and timeout."
    )

    serializer_class = CacheOperatorSerializer
    permission_classes = [
        AllowAny,
    ]

    def post(self, request: Request, *args, **kwargs) -> Response:
        # The payload's "key", "data" (default {}) and "timeout" are handed to
        # web_cache as-is; any validation happens inside that helper.
        return Response(
            data=web_cache(
                request,
                request.data.get("key"),
                request.data.get("data", {}),
                request.data.get("timeout"),
            ),
            status=status.HTTP_200_OK,
        )


@extend_schema_view(**CONTACT_US_SCHEMA)
class ContactUsView(APIView):
    __doc__ = _("Handles `contact us` form submissions.")

    serializer_class = ContactUsSerializer

    @method_decorator(ratelimit(key="ip", rate="2/h", method="POST", block=True))
    def post(self, request: Request, *args, **kwargs) -> Response:
        serializer = self.serializer_class(data=request.data)
        serializer.is_valid(raise_exception=True)
        # The e-mail is dispatched through the task's .delay() so the request
        # does not wait for delivery.
        contact_us_email.delay(serializer.validated_data)
        return Response(data=serializer.data, status=status.HTTP_200_OK)


@extend_schema_view(**REQUEST_CURSED_URL_SCHEMA)
class RequestCursedURLView(APIView):
    __doc__ = _(
        "Handles requests for processing and validating URLs from incoming POST requests."
    )

    permission_classes = [
        AllowAny,
    ]

    # Seconds to wait for the upstream server. Without a timeout `requests`
    # blocks indefinitely, letting one slow host tie up a worker.
    upstream_timeout = 10

    @method_decorator(ratelimit(key="ip", rate="10/h"))
    def post(self, request: Request, *args, **kwargs) -> Response:
        url = request.data.get("url")
        if not is_url_safe(str(url)):
            return Response(
                data={"error": _("only URLs starting with http(s):// are allowed")},
                status=status.HTTP_400_BAD_REQUEST,
            )
        try:
            data = cache.get(url, None)
            if not data:
                response = requests.get(
                    str(url),
                    headers={"content-type": "application/json"},
                    timeout=self.upstream_timeout,
                )
                response.raise_for_status()
                data = camelize(response.json())
                # Fetched payloads are cached for 24 hours, keyed by the URL.
                cache.set(url, data, 86400)
            return Response(
                data=data,
                status=status.HTTP_200_OK,
            )
        except Exception as e:
            # Any failure (network error, timeout, non-2xx status, invalid
            # JSON) is reported to the caller as a 400.
            return Response(
                data={"error": str(e)},
                status=status.HTTP_400_BAD_REQUEST,
            )


@extend_schema_view(**SEARCH_SCHEMA)
class GlobalSearchView(APIView):
    __doc__ = _("Handles global search queries.")

    def get(self, request: Request, *args, **kwargs) -> Response:
        query = request.GET.get("q", "").strip()
        return Response(
            camelize({"results": process_query(query=query, request=request)})
        )


@extend_schema_view(**BUY_AS_BUSINESS_SCHEMA)
class BuyAsBusinessView(APIView):
    __doc__ = _("Handles the logic of buying as a business without registration.")

    # noinspection PyUnusedLocal
    @method_decorator(
        ratelimit(key="ip", rate="10/h" if not settings.DEBUG else "888/h")
    )
    def post(self, request: Request, *args, **kwargs) -> Response:
        serializer = BuyAsBusinessOrderSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        payload = serializer.validated_data

        order = Order.objects.create(status="MOMENTAL")
        products = [product.get("product_uuid") for product in payload.get("products")]
        try:
            transaction = order.buy_without_registration(
                products=products,
                promocode_uuid=payload.get("promocode_uuid"),
                customer_name=payload.get("business_identificator"),
                customer_email=payload.get("business_email"),
                customer_phone_number=payload.get("business_phone_number"),
                billing_customer_address=payload.get("billing_business_address_uuid"),
                shipping_customer_address=payload.get(
                    "shipping_business_address_uuid"
                ),
                payment_method=payload.get("payment_method"),
                is_business=True,
            )
            return Response(
                status=status.HTTP_201_CREATED,
                data=TransactionProcessSerializer(transaction).data,
            )
        except Exception as e:
            # The order row was created before the purchase attempt; mark it
            # inactive so a failed purchase does not leave a live order.
            order.is_active = False
            order.save()
            return Response(
                status=status.HTTP_400_BAD_REQUEST,
                data={"detail": str(e)},
            )


@extend_schema_view(**DOWNLOAD_DIGITAL_ASSET_SCHEMA)
class DownloadDigitalAssetView(APIView):
    __doc__ = _(
        "Handles the downloading of a digital asset associated with an order.\n"
        "This function attempts to serve the digital asset file located in the "
        "storage directory of the project. If the file is not found, an HTTP 404 "
        "error is raised to indicate the resource is unavailable."
    )

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse | FileResponse:
        try:
            # Validate the raw kwarg: str(None) is the truthy string "None",
            # which would slip past the emptiness check.
            raw_uuid = kwargs.get("order_product_uuid")
            if not raw_uuid:
                raise BadRequest(_("order_product_uuid is required"))

            try:
                uuid = urlsafe_base64_decode(str(raw_uuid)).decode("utf-8")
            except ValueError as decode_error:
                # An undecodable identifier cannot match any order product;
                # answer 400 instead of falling through to the generic 503.
                raise BadRequest(_("order product does not exist")) from decode_error

            try:
                order_product = OrderProduct.objects.get(uuid=uuid)
            except OrderProduct.DoesNotExist as dne:
                raise BadRequest(_("order product does not exist")) from dne

            # NOTE(review): a missing related download presumably raises a
            # subclass of DigitalAssetDownload.DoesNotExist (handled below);
            # confirm against the model definition.
            download = order_product.download

            if download.num_downloads >= 1:
                raise BadRequest(_("you can only download the digital asset once"))

            if download.order_product.status != "FINISHED":
                raise BadRequest(
                    _("the order must be paid before downloading the digital asset")
                )

            product = download.order_product.product
            if not product:
                raise BadRequest(_("the order product does not have a product"))

            file_path = product.stocks.first().digital_asset.path

            content_type, _encoding = mimetypes.guess_type(file_path)
            if not content_type:
                content_type = "application/octet-stream"

            # Open the file BEFORE consuming the single allowed download, so
            # a missing/unreadable asset does not burn the customer's attempt.
            asset_file = open(file_path, "rb")
            try:
                download.num_downloads += 1
                download.save()
            except Exception:
                asset_file.close()
                raise

            # FileResponse takes ownership of the handle and closes it when
            # the response is finished.
            response = FileResponse(asset_file, content_type=content_type)
            filename = os.path.basename(file_path)
            response["Content-Disposition"] = f'attachment; filename="{filename}"'
            return response

        except BadRequest as e:
            return Response(
                data=camelize({"error": str(e)}), status=status.HTTP_400_BAD_REQUEST
            )

        except DigitalAssetDownload.DoesNotExist:
            return Response(
                data=camelize({"error": "Digital asset not found"}),
                status=status.HTTP_404_NOT_FOUND,
            )

        except Exception as e:
            capture_exception(e)
            return Response(
                data=camelize(
                    {
                        "error": "An error occurred while trying to download the digital asset",
                        # The traceback is exposed only in DEBUG mode.
                        "detail": traceback.format_exc() if settings.DEBUG else None,
                    }
                ),
                status=status.HTTP_503_SERVICE_UNAVAILABLE,
            )


def favicon_view(
    request: HttpRequest, *args: list[Any], **kwargs: dict[str, Any]
) -> HttpResponse | FileResponse | None:
    try:
        favicon_path = os.path.join(settings.BASE_DIR, "static/favicon.png")
        # NOTE(review): the file is a PNG but is served as "image/x-icon";
        # kept as-is for compatibility — confirm whether "image/png" is wanted.
        return FileResponse(open(favicon_path, "rb"), content_type="image/x-icon")
    except FileNotFoundError as fnfe:
        raise Http404(_("favicon not found")) from fnfe


# noinspection PyTypeChecker
favicon_view.__doc__ = _(
    "Handles requests for the favicon of a website.\n"
    "This function attempts to serve the favicon file located in the static directory of the project. "
    "If the favicon file is not found, an HTTP 404 error is raised to indicate the resource is unavailable."
)


def index(request: HttpRequest, *args, **kwargs) -> HttpResponse | HttpResponseRedirect:
    return redirect("admin:index")


# noinspection PyTypeChecker
index.__doc__ = _(
    "Redirects the request to the admin index page. "
    "The function handles incoming HTTP requests and redirects them to the Django "
    "admin interface index page. It uses Django's `redirect` function for handling "
    "the HTTP redirection."
)


def version(request: HttpRequest, *args, **kwargs) -> HttpResponse:
    return JsonResponse(camelize({"version": settings.EVIBES_VERSION}), status=200)


# noinspection PyTypeChecker
version.__doc__ = _("Returns current version of the eVibes. ")


def _product_admin_card(product: Product) -> dict[str, Any]:
    """Return the name, first image URL ("" if none) and admin URL of *product*."""
    # A single .first() replaces the exists()+first() pair (one query, not two).
    first_image = product.images.first()
    return {
        "name": product.name,
        "image": first_image.image_url if first_image else "",
        "admin_url": reverse("admin:core_product_change", args=[product.pk]),
    }


def _append_ranked_product_cards(
    target: list[dict[str, Any]],
    rows: list[dict[str, Any]],
    id_key: str,
    count_key: str,
) -> None:
    """Append one card (with a "count") to *target* per row whose product exists.

    *rows* are aggregated ``.values()`` rows already in ranking order;
    ``row[id_key]`` is the product primary key and ``row[count_key]`` the
    ranking metric. Rows without an id or without a matching product are
    skipped. Appending in place keeps already-built cards if a later row fails.
    """
    if not rows:
        return
    product_ids = [row[id_key] for row in rows if row.get(id_key)]
    product_by_id = {p.pk: p for p in Product.objects.filter(pk__in=product_ids)}
    for row in rows:
        pid = row.get(id_key)
        if not pid or pid not in product_by_id:
            continue
        target.append(
            {
                **_product_admin_card(product_by_id[pid]),
                "count": int(row.get(count_key, 0) or 0),
            }
        )


def dashboard_callback(request: HttpRequest, context: Context) -> Context:
    # Builds the admin dashboard context: KPIs for the selected timeframe
    # ("tf" query parameter: 7/30/90/360 days, default 30) compared with the
    # preceding period of equal length, plus rankings, daily series and mixes.
    # Most sections are best-effort: a failure leaves that section at its
    # default instead of breaking the whole dashboard.
    tf_map: dict[str, int] = {"7": 7, "30": 30, "90": 90, "360": 360}
    tf_param = str(request.GET.get("tf", "30") or "30")
    period_days: int = tf_map.get(tf_param, 30)
    period = timedelta(days=period_days)

    now_dt = tz_now()
    cur_start = now_dt - period
    prev_start = now_dt - timedelta(days=period_days * 2)
    prev_end = cur_start

    # --- current period ---------------------------------------------------
    revenue_gross_cur: float = get_revenue(clear=False, period=period)
    revenue_net_cur: float = get_revenue(clear=True, period=period)
    returns_cur: float = get_returns(period=period)
    processed_orders_cur: int = get_total_processed_orders(period=period)

    orders_finished_cur: int = 0
    with suppress(Exception):
        orders_finished_cur = Order.objects.filter(
            status="FINISHED", buy_time__lte=now_dt, buy_time__gte=cur_start
        ).count()

    # --- previous period --------------------------------------------------
    def sum_line_totals(queryset, start: date | None, end: date | None) -> float:
        """Sum buy_price * quantity over *queryset* for orders bought in [start, end)."""
        with suppress(Exception):
            aggregated = queryset.filter(
                order__buy_time__lt=end, order__buy_time__gte=start
            ).aggregate(total=Sum(F("buy_price") * F("quantity")))
            return round(float(aggregated.get("total") or 0.0), 2)
        return 0.0

    def sum_gross_between(start: date | None, end: date | None) -> float:
        """Gross value of finished items in finished orders within [start, end)."""
        return sum_line_totals(
            OrderProduct.objects.filter(
                status__in=["FINISHED"], order__status="FINISHED"
            ),
            start,
            end,
        )

    def sum_returns_between(start: date | None, end: date | None) -> float:
        """Value of returned items for orders bought within [start, end)."""
        return sum_line_totals(
            OrderProduct.objects.filter(status__in=["RETURNED"]), start, end
        )

    def count_finished_orders_between(start: date | None, end: date | None) -> int:
        """Number of finished orders bought within [start, end)."""
        with suppress(Exception):
            return Order.objects.filter(
                status="FINISHED", buy_time__lt=end, buy_time__gte=start
            ).count()
        return 0

    revenue_gross_prev = sum_gross_between(prev_start, prev_end)
    returns_prev = sum_returns_between(prev_start, prev_end)
    orders_finished_prev = count_finished_orders_between(prev_start, prev_end)

    tax_rate = 0.0
    tax_included = False
    with suppress(Exception):
        tax_rate = float(getattr(config, "TAX_RATE", 0.0) or 0.0)
        tax_included = bool(getattr(config, "TAX_INCLUDED", False))

    # Net equals gross unless a positive tax is included in the prices, in
    # which case the tax share is divided out.
    revenue_net_prev = revenue_gross_prev
    if tax_rate > 0 and tax_included:
        divisor = 1.0 + (tax_rate / 100.0)
        if divisor > 0:
            revenue_net_prev = revenue_gross_prev / divisor
    revenue_net_prev = round(float(revenue_net_prev or 0.0), 2)

    def pct_delta(cur: float | int, prev: float | int) -> float:
        """Percentage change from *prev* to *cur*, rounded to one decimal.

        With a zero baseline the change is 0.0 if *cur* is also zero and
        100.0 otherwise; any conversion error yields 0.0.
        """
        with suppress(Exception):
            cur_f = float(cur or 0)
            prev_f = float(prev or 0)
            if prev_f == 0:
                # Return early rather than dividing by zero and relying on
                # the suppressed ZeroDivisionError to keep this value.
                return 0.0 if cur_f == 0 else 100.0
            return round(((cur_f - prev_f) / prev_f) * 100.0, 1)
        return 0.0

    aov_cur: float = (
        round((revenue_gross_cur / orders_finished_cur), 2)
        if orders_finished_cur > 0
        else 0.0
    )
    refund_rate_cur: float = (
        round(((returns_cur / revenue_gross_cur) * 100.0), 1)
        if revenue_gross_cur > 0
        else 0.0
    )
    aov_prev: float = (
        round((revenue_gross_prev / orders_finished_prev), 2)
        if orders_finished_prev > 0
        else 0.0
    )
    refund_rate_prev: float = (
        round(((returns_prev / revenue_gross_prev) * 100.0), 1)
        if revenue_gross_prev > 0
        else 0.0
    )

    kpi = {
        "gmv": {
            "value": revenue_gross_cur,
            "prev": revenue_gross_prev,
            "delta_pct": pct_delta(revenue_gross_cur, revenue_gross_prev),
        },
        "orders": {
            "value": orders_finished_cur,
            "prev": orders_finished_prev,
            "delta_pct": pct_delta(orders_finished_cur, orders_finished_prev),
        },
        "aov": {
            "value": aov_cur,
            "prev": aov_prev,
            "delta_pct": pct_delta(aov_cur, aov_prev),
        },
        "net": {
            "value": revenue_net_cur,
            "prev": revenue_net_prev,
            "delta_pct": pct_delta(revenue_net_cur, revenue_net_prev),
        },
        "refund_rate": {
            "value": refund_rate_cur,
            "prev": refund_rate_prev,
            "delta_pct": pct_delta(refund_rate_cur, refund_rate_prev),
        },
    }

    currency_symbol: str = ""
    with suppress(Exception):
        currency_symbol = dict(getattr(settings, "CURRENCIES_WITH_SYMBOLS", ())).get(
            getattr(settings, "CURRENCY_CODE", ""), ""
        )

    # --- quick links --------------------------------------------------------
    # Taken from the second entry (index 1) of the Unfold sidebar navigation.
    quick_links: list[dict[str, str]] = []
    with suppress(Exception):
        quick_links_section = settings.UNFOLD.get("SIDEBAR", {}).get("navigation", [])[
            1
        ]
        for item in quick_links_section.get("items", []):
            title = item.get("title")
            link = item.get("link")
            if not title or not link:
                continue
            quick_links.append(
                {
                    "title": str(title),
                    "link": str(link),
                    **({"icon": item.get("icon")} if item.get("icon") else {}),
                }
            )

    # --- most wished products -----------------------------------------------
    most_wished: dict[str, str | int | float | None] | None = None
    most_wished_list: list[dict[str, str | int | float | None]] = []
    with suppress(Exception):
        wished_qs = (
            Wishlist.objects.filter(user__is_active=True, user__is_staff=False)
            .values("products")
            .exclude(products__isnull=True)
            .annotate(cnt=Count("products"))
            .order_by("-cnt")
        )
        wished_first = wished_qs.first()
        if wished_first and wished_first.get("products"):
            product = Product.objects.filter(pk=wished_first["products"]).first()
            if product:
                most_wished = _product_admin_card(product)

        _append_ranked_product_cards(
            most_wished_list, list(wished_qs[:10]), "products", "cnt"
        )

    # --- daily revenue & orders series --------------------------------------
    try:
        today = tz_now().date()
        days = period_days
        date_axis = [today - timedelta(days=i) for i in range(days - 1, -1, -1)]

        orders_map = get_daily_finished_orders_count(timedelta(days=days))
        gross_map = get_daily_gross_revenue(timedelta(days=days))

        labels = [d.strftime("%d %b") for d in date_axis]
        orders_series = [int(orders_map.get(d, 0) or 0) for d in date_axis]
        gross_series = [float(gross_map.get(d, 0.0) or 0.0) for d in date_axis]

        context["daily_labels"] = labels
        context["daily_orders"] = orders_series
        context["daily_gross"] = gross_series
        context["daily_title"] = _("Revenue & Orders (last %(days)d)") % {
            "days": period_days
        }
    except Exception as e:
        logger.warning("Failed to build daily stats: %s", e)
        # Fall back to empty series first, then try to provide a zero-filled
        # axis so the chart still renders.
        context["daily_labels"] = []
        context["daily_orders"] = []
        context["daily_gross"] = []
        with suppress(Exception):
            today = tz_now().date()
            days = period_days
            date_axis = [today - timedelta(days=i) for i in range(days - 1, -1, -1)]
            context["daily_labels"] = [d.strftime("%d %b") for d in date_axis]
            context["daily_orders"] = [0 for _i in date_axis]
            context["daily_gross"] = [0.0 for _j in date_axis]
            context["daily_title"] = _("Revenue & Orders (last %(days)d)") % {
                "days": period_days
            }

    # --- low stock ----------------------------------------------------------
    # The five products with the smallest summed stock quantity.
    low_stock_list: list[dict[str, str | int]] = []
    with suppress(Exception):
        products = (
            Product.objects.annotate(total_qty=Sum("stocks__quantity"))
            .values("id", "name", "sku", "total_qty")
            .order_by("total_qty")[:5]
        )
        for p in products:
            qty = int(p.get("total_qty") or 0)
            low_stock_list.append(
                {
                    "name": str(p.get("name") or ""),
                    "sku": str(p.get("sku") or ""),
                    "qty": qty,
                    "admin_url": reverse(
                        "admin:core_product_change", args=[p.get("id")]
                    ),
                }
            )

    # NOTE(review): this pack is written to the cache for 10 minutes but is
    # never read back in this function — all values are already computed
    # above. Kept in case another consumer reads this key; confirm and remove
    # or use it to short-circuit the computation.
    cache_key = f"dashboard_cb:{period_days}"
    cached_pack = None
    with suppress(Exception):
        cached_pack = cache.get(cache_key)

    if cached_pack is None:
        cached_pack = {
            "kpi": kpi,
            "revenue_gross": revenue_gross_cur,
            "revenue_net": revenue_net_cur,
            "returns_amount": returns_cur,
            "orders_finished": orders_finished_cur,
            "aov": aov_cur,
            "refund_rate": refund_rate_cur,
            "low_stock_products": low_stock_list,
        }
        with suppress(Exception):
            cache.set(cache_key, cached_pack, 600)

    # --- most popular products ----------------------------------------------
    most_popular: dict[str, str | int | float | None] | None = None
    most_popular_list: list[dict[str, str | int | float | None]] = []
    with suppress(Exception):
        popular_qs = (
            OrderProduct.objects.filter(
                status="FINISHED", order__status="FINISHED", product__isnull=False
            )
            .values("product")
            .annotate(total_qty=Sum("quantity"))
            .order_by("-total_qty")
        )
        popular_first = popular_qs.first()
        if popular_first and popular_first.get("product"):
            product = Product.objects.filter(pk=popular_first["product"]).first()
            if product:
                most_popular = _product_admin_card(product)

        _append_ranked_product_cards(
            most_popular_list, list(popular_qs[:10]), "product", "total_qty"
        )

    # --- customer mix -------------------------------------------------------
    customers_mix: dict[str, int | float] = {
        "new": 0,
        "returning": 0,
        "new_pct": 0.0,
        "returning_pct": 0.0,
    }
    with suppress(Exception):
        mix = get_customer_mix()
        n = int(mix.get("new", 0))
        r = int(mix.get("returning", 0))
        t = max(n + r, 0)
        new_pct = round((n / t * 100.0), 1) if t > 0 else 0.0
        ret_pct = round((r / t * 100.0), 1) if t > 0 else 0.0
        customers_mix = {
            "new": n,
            "returning": r,
            "new_pct": new_pct,
            "returning_pct": ret_pct,
            "total": t,
        }

    # --- shipped vs digital -------------------------------------------------
    shipped_vs_digital: dict[str, int | float] = {
        "digital_qty": 0,
        "shipped_qty": 0,
        "digital_gross": 0.0,
        "shipped_gross": 0.0,
        "digital_pct": 0.0,
        "shipped_pct": 0.0,
    }
    with suppress(Exception):
        svd = get_shipped_vs_digital_mix()
        dq = int(svd.get("digital_qty", 0))
        sq = int(svd.get("shipped_qty", 0))
        total_q = dq + sq
        digital_pct = round((dq / total_q * 100.0), 1) if total_q > 0 else 0.0
        shipped_pct = round((sq / total_q * 100.0), 1) if total_q > 0 else 0.0
        shipped_vs_digital.update(
            {
                "digital_qty": dq,
                "shipped_qty": sq,
                "digital_gross": float(svd.get("digital_gross", 0.0) or 0.0),
                "shipped_gross": float(svd.get("shipped_gross", 0.0) or 0.0),
                "digital_pct": digital_pct,
                "shipped_pct": shipped_pct,
            }
        )

    most_returned_products: list[dict[str, str | int | float]] = []
    with suppress(Exception):
        most_returned_products = get_top_returned_products()

    top_categories: list[dict[str, str | int | float]] = []
    with suppress(Exception):
        top_categories = get_top_categories_by_qty()

    context.update(
        {
            "custom_variable": "value",
            "timeframe_days": period_days,
            "tf": period_days,
            "kpi": kpi,
            "revenue_gross": revenue_gross_cur,
            "revenue_net": revenue_net_cur,
            "returns_amount": returns_cur,
            "orders_finished": orders_finished_cur,
            "aov": aov_cur,
            "refund_rate": refund_rate_cur,
            # The "*_30" keys carry the values of the selected timeframe,
            # whatever its length.
            "revenue_gross_30": revenue_gross_cur,
            "revenue_net_30": revenue_net_cur,
            "returns_30": returns_cur,
            "processed_orders_30": processed_orders_cur,
            "evibes_version": settings.EVIBES_VERSION,
            "quick_links": quick_links,
            "most_wished_product": most_wished,
            "most_popular_product": most_popular,
            "most_wished_products": most_wished_list,
            "most_popular_products": most_popular_list,
            "currency_symbol": currency_symbol,
            "customers_mix": customers_mix,
            "shipped_vs_digital": shipped_vs_digital,
            "most_returned_products": most_returned_products,
            "top_categories": top_categories,
            "low_stock_products": low_stock_list,
        }
    )

    return context


dashboard_callback.__doc__ = _("Returns custom variables for Dashboard. ")