Features: 1) Improved support for nested multi-line typing hints across numerous functions; 2) Refactored variable assignments for enhanced readability within bulk_add_products and bulk_remove_products; 3) Updated token-related test methods to ensure consistent styling;

Fixes: 1) Removed unused imports including `Order` from `payments/models.py` and `health_check.contrib.redis` from settings; 2) Fixed inconsistent formatting and parameter alignment in method definitions; 3) Corrected type annotations and adjusted verbose text handling;

Extra: Cleaned up formatting in docstrings, comments, and JSON field help text across multiple files for better code style compliance and readability.
This commit is contained in:
Egor Pavlovich Gorbunov 2025-06-18 16:55:49 +03:00
parent 8fb4ca3362
commit d616d2e18b
6 changed files with 200 additions and 64 deletions

View file

@ -45,12 +45,15 @@ from core.abstract import NiceModel
from core.choices import ORDER_PRODUCT_STATUS_CHOICES, ORDER_STATUS_CHOICES
from core.errors import DisabledCommerceError, NotEnoughMoneyError
from core.managers import AddressManager
from core.utils import generate_human_readable_id, get_product_uuid_as_path, get_random_code
from core.utils import (
generate_human_readable_id,
get_product_uuid_as_path,
get_random_code,
)
from core.utils.lists import FAILED_STATUSES
from core.validators import validate_category_image_dimensions
from evibes.settings import CURRENCY_CODE
from payments.models import Transaction
from vibes_auth.models import User
logger = logging.getLogger(__name__)
@ -88,7 +91,9 @@ class Vendor(ExportModelOperationsMixin("vendor"), NiceModel):
authentication: dict = JSONField( # type: ignore
blank=True,
null=True,
help_text=_("stores credentials and endpoints required for vendor communication"),
help_text=_(
"stores credentials and endpoints required for vendor communication"
),
verbose_name=_("authentication info"),
)
markup_percent: int = IntegerField( # type: ignore
@ -228,7 +233,10 @@ class Category(ExportModelOperationsMixin("category"), NiceModel, MPTTModel):
def get_tree_depth(self):
if self.is_leaf_node():
return 0
return self.get_descendants().aggregate(max_depth=Max("level"))["max_depth"] - self.get_level()
return (
self.get_descendants().aggregate(max_depth=Max("level"))["max_depth"]
- self.get_level()
)
class Meta:
verbose_name = _("category")
@ -366,7 +374,9 @@ class Product(ExportModelOperationsMixin("product"), NiceModel):
cache_key = f"product_feedbacks_count_{self.pk}"
feedbacks_count = cache.get(cache_key)
if feedbacks_count is None:
feedbacks_count = Feedback.objects.filter(order_product__product_id=self.pk).count()
feedbacks_count = Feedback.objects.filter(
order_product__product_id=self.pk
).count()
cache.set(cache_key, feedbacks_count, 604800)
return feedbacks_count
@ -661,7 +671,9 @@ class Wishlist(ExportModelOperationsMixin("wishlist"), NiceModel):
class Documentary(ExportModelOperationsMixin("attribute_group"), NiceModel):
is_publicly_visible = True
product: ForeignKey = ForeignKey(to=Product, on_delete=CASCADE, related_name="documentaries")
product: ForeignKey = ForeignKey(
to=Product, on_delete=CASCADE, related_name="documentaries"
)
document = FileField(upload_to=get_product_uuid_as_path)
class Meta:
@ -696,13 +708,19 @@ class Address(ExportModelOperationsMixin("address"), NiceModel):
country: str = CharField(_("country"), max_length=40, null=True) # type: ignore
location: PointField = PointField( # type: ignore
geography=True, srid=4326, null=True, blank=True, help_text=_("geolocation point: (longitude, latitude)")
geography=True,
srid=4326,
null=True,
blank=True,
help_text=_("geolocation point: (longitude, latitude)"),
)
raw_data: dict = JSONField(blank=True, null=True, help_text=_("full JSON response from geocoder for this address")) # type: ignore
api_response: dict = JSONField( # type: ignore
blank=True, null=True, help_text=_("stored JSON response from the geocoding service")
blank=True,
null=True,
help_text=_("stored JSON response from the geocoding service"),
)
user: ForeignKey = ForeignKey(to="vibes_auth.User", on_delete=CASCADE, blank=True, null=True) # type: ignore
@ -783,7 +801,9 @@ class PromoCode(ExportModelOperationsMixin("promocode"), NiceModel):
self.discount_amount is None and self.discount_percent is None
):
raise ValidationError(
_("only one type of discount should be defined (amount or percent), but not both or neither.")
_(
"only one type of discount should be defined (amount or percent), but not both or neither."
)
)
super().save(**kwargs)
@ -804,11 +824,15 @@ class PromoCode(ExportModelOperationsMixin("promocode"), NiceModel):
if self.discount_type == "percent":
amount -= round(amount * (self.discount_percent / 100), 2)
order.attributes.update({"promocode": str(self.uuid), "final_price": amount})
order.attributes.update(
{"promocode": str(self.uuid), "final_price": amount}
)
order.save()
elif self.discount_type == "amount":
amount -= round(float(self.discount_amount), 2)
order.attributes.update({"promocode": str(self.uuid), "final_price": amount})
order.attributes.update(
{"promocode": str(self.uuid), "final_price": amount}
)
order.save()
else:
raise ValueError(_(f"invalid discount type for promocode {self.uuid}"))
@ -866,7 +890,7 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
help_text=_("json representation of order attributes for this order"),
verbose_name=_("attributes"),
)
user: User = ForeignKey( # type: ignore
user = ForeignKey( # type: ignore
"vibes_auth.User",
on_delete=CASCADE,
help_text=_("the user who placed the order"),
@ -914,9 +938,12 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
return (
round(
sum(
(
order_product.buy_price * order_product.quantity
if order_product.status not in FAILED_STATUSES and order_product.buy_price is not None
if order_product.status not in FAILED_STATUSES
and order_product.buy_price is not None
else 0.0
)
for order_product in self.order_products.all()
),
2,
@ -929,13 +956,18 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
return sum([op.quantity for op in self.order_products.all()])
def add_product(
self, product_uuid: str | None = None, attributes: Optional[list] = None, update_quantity: bool = True
self,
product_uuid: str | None = None,
attributes: Optional[list] = None,
update_quantity: bool = True,
):
if attributes is None:
attributes = []
if self.status not in ["PENDING", "MOMENTAL"]:
raise ValueError(_("you cannot add products to an order that is not a pending one"))
raise ValueError(
_("you cannot add products to an order that is not a pending one")
)
try:
product = Product.objects.get(uuid=product_uuid)
@ -944,7 +976,9 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
buy_price = product.price
promotions = Promotion.objects.filter(is_active=True, products__in=[product]).order_by("discount_percent")
promotions = Promotion.objects.filter(
is_active=True, products__in=[product]
).order_by("discount_percent")
if promotions.exists():
buy_price -= round(product.price * (promotions.first().discount_percent / 100), 2) # type: ignore
@ -957,7 +991,9 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
)
if not is_created and update_quantity:
if product.quantity < order_product.quantity + 1:
raise BadRequest(_("you cannot add more products than available in stock"))
raise BadRequest(
_("you cannot add more products than available in stock")
)
order_product.quantity += 1
order_product.buy_price = product.price
order_product.save()
@ -969,13 +1005,18 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
raise Http404(_(f"{name} does not exist: {product_uuid}"))
def remove_product(
self, product_uuid: str | None = None, attributes: dict | None = None, zero_quantity: bool = False
self,
product_uuid: str | None = None,
attributes: dict | None = None,
zero_quantity: bool = False,
):
if attributes is None:
attributes = {}
if self.status not in ["PENDING", "MOMENTAL"]:
raise ValueError(_("you cannot remove products from an order that is not a pending one"))
raise ValueError(
_("you cannot remove products from an order that is not a pending one")
)
try:
product = Product.objects.get(uuid=product_uuid)
order_product = self.order_products.get(product=product, order=self)
@ -994,12 +1035,16 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
raise Http404(_(f"{name} does not exist: {product_uuid}"))
except OrderProduct.DoesNotExist:
name = "OrderProduct"
query = f"product: {product_uuid}, order: {self.uuid}, attributes: {attributes}"
query = (
f"product: {product_uuid}, order: {self.uuid}, attributes: {attributes}"
)
raise Http404(_(f"{name} does not exist with query <{query}>"))
def remove_all_products(self):
if self.status not in ["PENDING", "MOMENTAL"]:
raise ValueError(_("you cannot remove products from an order that is not a pending one"))
raise ValueError(
_("you cannot remove products from an order that is not a pending one")
)
for order_product in self.order_products.all():
self.order_products.remove(order_product)
order_product.delete()
@ -1007,7 +1052,9 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
def remove_products_of_a_kind(self, product_uuid: str):
if self.status not in ["PENDING", "MOMENTAL"]:
raise ValueError(_("you cannot remove products from an order that is not a pending one"))
raise ValueError(
_("you cannot remove products from an order that is not a pending one")
)
try:
product = Product.objects.get(uuid=product_uuid)
order_product = self.order_products.get(product=product, order=self)
@ -1020,7 +1067,10 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
@property
def is_whole_digital(self):
return self.order_products.count() == self.order_products.filter(product__is_digital=True).count()
return (
self.order_products.count()
== self.order_products.filter(product__is_digital=True).count()
)
def apply_promocode(self, promocode_uuid: str):
try:
@ -1035,7 +1085,11 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
if self.is_whole_digital:
return
else:
raise ValueError(_("you can only buy physical products with shipping address specified"))
raise ValueError(
_(
"you can only buy physical products with shipping address specified"
)
)
if billing_address_uuid and not shipping_address_uuid:
shipping_address = Address.objects.get(uuid=billing_address_uuid)
@ -1065,9 +1119,13 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
shipping_address: str | None = None,
) -> Self | Transaction | None:
if config.DISABLED_COMMERCE:
raise DisabledCommerceError(_("you can not buy at this moment, please try again in a few minutes"))
raise DisabledCommerceError(
_("you can not buy at this moment, please try again in a few minutes")
)
if (not force_balance and not force_payment) or (force_balance and force_payment):
if (not force_balance and not force_payment) or (
force_balance and force_payment
):
raise ValueError(_("invalid force value"))
self.apply_addresses(billing_address, shipping_address)
@ -1083,12 +1141,16 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
if force_payment:
force = "payment"
amount = self.apply_promocode(promocode_uuid) if promocode_uuid else self.total_price
amount = (
self.apply_promocode(promocode_uuid) if promocode_uuid else self.total_price
)
match force:
case "balance":
if self.user.payments_balance.amount < amount: # type: ignore
raise NotEnoughMoneyError(_("insufficient funds to complete the order"))
raise NotEnoughMoneyError(
_("insufficient funds to complete the order")
)
self.status = "CREATED"
self.buy_time = timezone.now()
self.order_products.all().update(status="DELIVERING")
@ -1106,9 +1168,13 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
return self
def buy_without_registration(self, products: list, promocode_uuid: str, **kwargs) -> Transaction | None:
def buy_without_registration(
self, products: list, promocode_uuid: str, **kwargs
) -> Transaction | None:
if config.DISABLED_COMMERCE:
raise DisabledCommerceError(_("you can not buy at this moment, please try again in a few minutes"))
raise DisabledCommerceError(
_("you can not buy at this moment, please try again in a few minutes")
)
if len(products) < 1:
raise ValueError(_("you cannot purchase an empty order!"))
@ -1129,17 +1195,25 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
available_payment_methods = cache.get("payment_methods").get("payment_methods")
if payment_method not in available_payment_methods:
raise ValueError(_(f"invalid payment method: {payment_method} from {available_payment_methods}"))
raise ValueError(
_(
f"invalid payment method: {payment_method} from {available_payment_methods}"
)
)
billing_customer_address_uuid = kwargs.get("billing_customer_address")
shipping_customer_address_uuid = kwargs.get("shipping_customer_address")
self.apply_addresses(billing_customer_address_uuid, shipping_customer_address_uuid)
self.apply_addresses(
billing_customer_address_uuid, shipping_customer_address_uuid
)
for product_uuid in products:
self.add_product(product_uuid)
amount = self.apply_promocode(promocode_uuid) if promocode_uuid else self.total_price
amount = (
self.apply_promocode(promocode_uuid) if promocode_uuid else self.total_price
)
self.status = "CREATED"
@ -1182,12 +1256,20 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
def bulk_add_products(self, products: list):
for product in products:
self.add_product(product.get("uuid"), attributes=product.get("attributes"), update_quantity=False)
self.add_product(
product.get("uuid"),
attributes=product.get("attributes"),
update_quantity=False,
)
return self
def bulk_remove_products(self, products: list):
for product in products:
self.remove_product(product.get("uuid"), attributes=product.get("attributes"), zero_quantity=True)
self.remove_product(
product.get("uuid"),
attributes=product.get("attributes"),
zero_quantity=True,
)
return self
@ -1274,7 +1356,13 @@ class OrderProduct(ExportModelOperationsMixin("order_product"), NiceModel):
self.notifications.update(
{
"errors": [
{"detail": error if error else f"Something went wrong with {self.uuid} for some reason..."},
{
"detail": (
error
if error
else f"Something went wrong with {self.uuid} for some reason..."
)
},
]
}
)
@ -1297,7 +1385,9 @@ class OrderProduct(ExportModelOperationsMixin("order_product"), NiceModel):
return self.download.url
return ""
def do_feedback(self, rating: int = 10, comment: str = "", action: str = "add") -> Optional["Feedback"]:
def do_feedback(
self, rating: int = 10, comment: str = "", action: str = "add"
) -> Optional["Feedback"]:
if action not in ["add", "remove"]:
raise ValueError(_(f"wrong action specified for feedback: {action}"))
if action == "remove" and self.feedback:
@ -1305,9 +1395,13 @@ class OrderProduct(ExportModelOperationsMixin("order_product"), NiceModel):
return None
if action == "add" and not self.feedback:
if self.order.status not in ["MOMENTAL", "PENDING"]: # type: ignore
return Feedback.objects.create(rating=rating, comment=comment, order_product=self)
return Feedback.objects.create(
rating=rating, comment=comment, order_product=self
)
else:
raise ValueError(_("you cannot feedback an order which is not received"))
raise ValueError(
_("you cannot feedback an order which is not received")
)
return None
@ -1327,12 +1421,12 @@ class DigitalAssetDownload(ExportModelOperationsMixin("attribute_group"), NiceMo
@property
def url(self):
if self.order_product.status != "FINISHED":
raise ValueError(_("you can not download a digital asset for a non-finished order"))
return (
f"https://api.{config.BASE_DOMAIN}/download/{urlsafe_base64_encode(force_bytes(self.order_product.uuid))}"
raise ValueError(
_("you can not download a digital asset for a non-finished order")
)
return f"https://api.{config.BASE_DOMAIN}/download/{urlsafe_base64_encode(force_bytes(self.order_product.uuid))}"
class Feedback(ExportModelOperationsMixin("feedback"), NiceModel):
is_publicly_visible = True
@ -1348,7 +1442,9 @@ class Feedback(ExportModelOperationsMixin("feedback"), NiceModel):
on_delete=CASCADE,
blank=False,
null=False,
help_text=_("references the specific product in an order that this feedback is about"),
help_text=_(
"references the specific product in an order that this feedback is about"
),
verbose_name=_("related order product"),
)
rating: float = FloatField( # type: ignore

View file

@ -6,7 +6,16 @@ from typing import Any
from django.db import IntegrityError
from core.elasticsearch import process_query
from core.models import Attribute, AttributeGroup, AttributeValue, Brand, Category, Product, Stock, Vendor
from core.models import (
Attribute,
AttributeGroup,
AttributeValue,
Brand,
Category,
Product,
Stock,
Vendor,
)
from payments.errors import RatesError
from payments.utils import get_rates
@ -136,17 +145,23 @@ class AbstractVendor:
return value, "string"
@staticmethod
def auto_resolver_helper(model: Brand | Category, resolving_name: str) -> Brand | Category | None:
def auto_resolver_helper(
model: Brand | Category, resolving_name: str
) -> Brand | Category | None:
queryset = model.objects.filter(name=resolving_name)
if not queryset.exists():
return model.objects.get_or_create(name=resolving_name, defaults={"is_active": False})[0]
return model.objects.get_or_create(
name=resolving_name, defaults={"is_active": False}
)[0]
elif queryset.filter(is_active=True).count() > 1:
queryset = queryset.filter(is_active=True)
elif queryset.filter(is_active=False).count() > 1:
queryset = queryset.filter(is_active=False)
chosen = queryset.first()
if not chosen:
raise VendorError(f"No matching {model.__name__} found with name {resolving_name!r}...")
raise VendorError(
f"No matching {model.__name__} found with name {resolving_name!r}..."
)
queryset = queryset.exclude(uuid=chosen.uuid)
queryset.delete()
return chosen
@ -186,7 +201,10 @@ class AbstractVendor:
return self.auto_resolver_helper(Brand, brand_name) # type: ignore
def resolve_price(
self, original_price: int | float, vendor: Vendor | None = None, category: Category | None = None
self,
original_price: int | float,
vendor: Vendor | None = None,
category: Category | None = None,
) -> float:
if not vendor:
vendor = self.get_vendor_instance()
@ -209,7 +227,9 @@ class AbstractVendor:
rate = rates.get(currency or self.currency)
if not rate:
raise RatesError(f"No rate found for {currency or self.currency} in {rates} with probider {provider}...")
raise RatesError(
f"No rate found for {currency or self.currency} in {rates} with probider {provider}..."
)
return round(price / rate, 2) if rate else round(price, 2)
@ -253,16 +273,22 @@ class AbstractVendor:
return vendor
raise VendorError(f"Vendor {self.vendor_name!r} is inactive...")
except Vendor.DoesNotExist:
raise Exception(f"No matching vendor found with name {self.vendor_name!r}...")
raise Exception(
f"No matching vendor found with name {self.vendor_name!r}..."
)
def get_products(self):
pass
def get_products_queryset(self):
return Product.objects.filter(stocks__vendor=self.get_vendor_instance(), orderproduct__isnull=True)
return Product.objects.filter(
stocks__vendor=self.get_vendor_instance(), orderproduct__isnull=True
)
def get_stocks_queryset(self):
return Stock.objects.filter(product__in=self.get_products_queryset(), product__orderproduct__isnull=True)
return Stock.objects.filter(
product__in=self.get_products_queryset(), product__orderproduct__isnull=True
)
def get_attribute_values_queryset(self):
return AttributeValue.objects.filter(
@ -280,7 +306,9 @@ class AbstractVendor:
self.get_stocks_queryset().delete()
self.get_attribute_values_queryset().delete()
def process_attribute(self, key: str, value, product: Product, attr_group: AttributeGroup):
def process_attribute(
self, key: str, value, product: Product, attr_group: AttributeGroup
):
if not value:
return

View file

@ -88,7 +88,10 @@ class BlockInvalidHostMiddleware:
allowed_hosts += getenv("ALLOWED_HOSTS").split(" ")
if not hasattr(request, "META"):
return BadRequest("Invalid Request")
if request.META.get("HTTP_HOST") not in allowed_hosts and "*" not in allowed_hosts:
if (
request.META.get("HTTP_HOST") not in allowed_hosts
and "*" not in allowed_hosts
):
return HttpResponseForbidden("Invalid Host Header")
return self.get_response(request)

View file

@ -115,7 +115,6 @@ INSTALLED_APPS: list[str] = [
"health_check.contrib.celery",
"health_check.contrib.celery_ping",
"health_check.contrib.psutil",
"health_check.contrib.redis",
"health_check.contrib.db_heartbeat",
"cacheops",
"django_hosts",
@ -356,7 +355,12 @@ if getenv("SENTRY_DSN"):
]
if DEBUG:
ignore_errors.extend(["billiard.exceptions.WorkerLostError", "billiard.exceptions.TimeLimitExceeded"])
ignore_errors.extend(
[
"billiard.exceptions.WorkerLostError",
"billiard.exceptions.TimeLimitExceeded",
]
)
sentry_sdk.init(
dsn=getenv("SENTRY_DSN"),

View file

@ -4,7 +4,6 @@ from django.db.models import CASCADE, CharField, FloatField, ForeignKey, JSONFie
from django.utils.translation import gettext_lazy as _
from core.abstract import NiceModel
from core.models import Order
class Transaction(NiceModel):
@ -14,7 +13,7 @@ class Transaction(NiceModel):
) # type: ignore
currency: str = CharField(max_length=3, null=False, blank=False) # type: ignore
payment_method: str = CharField(max_length=20, null=True, blank=True) # type: ignore
order: Order = ForeignKey( # type: ignore
order = ForeignKey( # type: ignore
"core.Order",
on_delete=CASCADE,
blank=True,

View file

@ -46,7 +46,9 @@ class AuthTests(TestCase):
def test_obtain_token_view(self):
url = reverse("token_obtain_pair")
response = self.api_client.post(url, {"email": self.user.email, "password": "testpassword"})
response = self.api_client.post(
url, {"email": self.user.email, "password": "testpassword"}
)
self.assertEqual(response.status_code, 200)
self.assertIn("access", response.data)
self.assertIn("refresh", response.data)
@ -56,7 +58,9 @@ class AuthTests(TestCase):
refresh_url = reverse("token_refresh")
# Obtain tokens
obtain_response = self.api_client.post(obtain_url, {"email": self.user.email, "password": "testpassword"})
obtain_response = self.api_client.post(
obtain_url, {"email": self.user.email, "password": "testpassword"}
)
refresh_token = obtain_response.data["refresh"]
# Refresh tokens
@ -69,7 +73,9 @@ class AuthTests(TestCase):
verify_url = reverse("token_verify")
# Obtain tokens
obtain_response = self.api_client.post(obtain_url, {"email": self.user.email, "password": "testpassword"})
obtain_response = self.api_client.post(
obtain_url, {"email": self.user.email, "password": "testpassword"}
)
access_token = obtain_response.data["access"]
# Verify token