Features: 1) Introduced strict parameter for zip function in widgets.py; 2) Added EXTENSIONS_MAX_UNIQUE_QUERY_ATTEMPTS setting;

Fixes: 1) Resolved redundant lines and formatting inconsistencies across multiple files; 2) Corrected Collection typing imports and Optional replacements with union types (e.g., `str | None`);

Extra: Improved formatting and readability by consolidating single-line code sections and simplifying expressions.
This commit is contained in:
Egor Pavlovich Gorbunov 2025-06-21 20:38:37 +03:00
parent 67794a7997
commit fdd92dbf8b
24 changed files with 193 additions and 447 deletions

View file

@ -424,9 +424,7 @@ class ConstanceAdmin(BaseConstanceAdmin):
self.admin_site.admin_view(self.changelist_view),
name=f"{info}_changelist",
),
path(
"", self.admin_site.admin_view(self.changelist_view), name=f"{info}_add"
),
path("", self.admin_site.admin_view(self.changelist_view), name=f"{info}_add"),
]

View file

@ -33,9 +33,7 @@ SMART_FIELDS = [
]
def process_query(
query: str = "", request: Request | None = None
) -> dict[str, list[dict]] | None:
def process_query(query: str = "", request: Request | None = None) -> dict[str, list[dict]] | None:
"""
Perform a lenient, typotolerant, multiindex search.
@ -84,19 +82,13 @@ def process_query(
boost_mode="sum",
)
search = (
Search(index=["products", "categories", "brands", "posts"])
.query(function_score_query)
.extra(size=100)
)
search = Search(index=["products", "categories", "brands", "posts"]).query(function_score_query).extra(size=100)
response = search.execute()
results: dict = {"products": [], "categories": [], "brands": [], "posts": []}
for hit in response.hits:
obj_uuid = getattr(hit, "uuid", None) or hit.meta.id
obj_name = (
getattr(hit, "name", None) or getattr(hit, "title", None) or "N/A"
)
obj_name = getattr(hit, "name", None) or getattr(hit, "title", None) or "N/A"
obj_slug = ""
raw_slug = getattr(hit, "slug", None)
if raw_slug:
@ -226,9 +218,7 @@ def _add_multilang_fields(cls):
copy_to="name",
fields={
"raw": fields.KeywordField(ignore_above=256),
"ngram": fields.TextField(
analyzer="name_ngram", search_analyzer="query_lc"
),
"ngram": fields.TextField(analyzer="name_ngram", search_analyzer="query_lc"),
"phonetic": fields.TextField(analyzer="name_phonetic"),
},
),
@ -251,9 +241,7 @@ def _add_multilang_fields(cls):
copy_to="description",
fields={
"raw": fields.KeywordField(ignore_above=256),
"ngram": fields.TextField(
analyzer="name_ngram", search_analyzer="query_lc"
),
"ngram": fields.TextField(analyzer="name_ngram", search_analyzer="query_lc"),
"phonetic": fields.TextField(analyzer="name_phonetic"),
},
),

View file

@ -11,13 +11,9 @@ class _BaseDoc(ActiveOnlyMixin, Document):
analyzer="standard",
fields={
"raw": fields.KeywordField(ignore_above=256),
"ngram": fields.TextField(
analyzer="name_ngram", search_analyzer="query_lc"
),
"ngram": fields.TextField(analyzer="name_ngram", search_analyzer="query_lc"),
"phonetic": fields.TextField(analyzer="name_phonetic"),
"auto": fields.TextField(
analyzer="autocomplete", search_analyzer="autocomplete_search"
),
"auto": fields.TextField(analyzer="autocomplete", search_analyzer="autocomplete_search"),
},
)
description = fields.TextField(
@ -25,13 +21,9 @@ class _BaseDoc(ActiveOnlyMixin, Document):
analyzer="standard",
fields={
"raw": fields.KeywordField(ignore_above=256),
"ngram": fields.TextField(
analyzer="name_ngram", search_analyzer="query_lc"
),
"ngram": fields.TextField(analyzer="name_ngram", search_analyzer="query_lc"),
"phonetic": fields.TextField(analyzer="name_phonetic"),
"auto": fields.TextField(
analyzer="autocomplete", search_analyzer="autocomplete_search"
),
"auto": fields.TextField(analyzer="autocomplete", search_analyzer="autocomplete_search"),
},
)
slug = fields.KeywordField(attr="slug", index=False)
@ -90,9 +82,7 @@ class BrandDocument(ActiveOnlyMixin, Document):
analyzer="standard",
fields={
"raw": fields.KeywordField(ignore_above=256),
"ngram": fields.TextField(
analyzer="name_ngram", search_analyzer="query_lc"
),
"ngram": fields.TextField(analyzer="name_ngram", search_analyzer="query_lc"),
"phonetic": fields.TextField(analyzer="name_phonetic"),
},
)

View file

@ -60,31 +60,19 @@ class CaseInsensitiveListFilter(BaseInFilter, CharFilter):
class ProductFilter(FilterSet):
uuid = UUIDFilter(field_name="uuid", lookup_expr="exact", label=_("UUID"))
name = CharFilter(method="filter_name", label=_("Name"))
categories = CaseInsensitiveListFilter(
field_name="category__name", label=_("Categories")
)
categories = CaseInsensitiveListFilter(field_name="category__name", label=_("Categories"))
category_uuid = CharFilter(method="filter_category", label="Category (UUID)")
categories_slugs = CaseInsensitiveListFilter(
field_name="category__slug", label=_("Categories Slugs")
)
categories_slugs = CaseInsensitiveListFilter(field_name="category__slug", label=_("Categories Slugs"))
tags = CaseInsensitiveListFilter(field_name="tags__tag_name", label=_("Tags"))
min_price = NumberFilter(
field_name="stocks__price", lookup_expr="gte", label=_("Min Price")
)
max_price = NumberFilter(
field_name="stocks__price", lookup_expr="lte", label=_("Max Price")
)
min_price = NumberFilter(field_name="stocks__price", lookup_expr="gte", label=_("Min Price"))
max_price = NumberFilter(field_name="stocks__price", lookup_expr="lte", label=_("Max Price"))
is_active = BooleanFilter(field_name="is_active", label=_("Is Active"))
brand = CharFilter(field_name="brand__name", lookup_expr="iexact", label=_("Brand"))
attributes = CharFilter(method="filter_attributes", label=_("Attributes"))
quantity = NumberFilter(
field_name="stocks__quantity", lookup_expr="gt", label=_("Quantity")
)
quantity = NumberFilter(field_name="stocks__quantity", lookup_expr="gt", label=_("Quantity"))
slug = CharFilter(field_name="slug", lookup_expr="exact", label=_("Slug"))
is_digital = BooleanFilter(field_name="is_digital", label=_("Is Digital"))
include_subcategories = BooleanFilter(
method="filter_include_flag", label=_("Include sub-categories")
)
include_subcategories = BooleanFilter(method="filter_include_flag", label=_("Include sub-categories"))
order_by = OrderingFilter(
fields=(
@ -152,15 +140,11 @@ class ProductFilter(FilterSet):
output_field=IntegerField(),
)
return (
queryset.filter(uuid__in=uuids).annotate(_order=ordering).order_by("_order")
)
return queryset.filter(uuid__in=uuids).annotate(_order=ordering).order_by("_order")
def filter_include_flag(self, queryset, **_kwargs):
if not self.data.get("category_uuid"):
raise BadRequest(
_("there must be a category_uuid to use include_subcategories flag")
)
raise BadRequest(_("there must be a category_uuid to use include_subcategories flag"))
return queryset
def filter_attributes(self, queryset, _name, value):
@ -247,7 +231,7 @@ class ProductFilter(FilterSet):
def _infer_type(value):
try:
parsed_value = json.loads(value)
if isinstance(parsed_value, (list, dict)):
if isinstance(parsed_value, list | dict):
return parsed_value
except (json.JSONDecodeError, TypeError):
pass
@ -296,20 +280,12 @@ class OrderFilter(FilterSet):
label=_("Search (ID, product name or part number)"),
)
min_buy_time = DateTimeFilter(
field_name="buy_time", lookup_expr="gte", label=_("Bought after (inclusive)")
)
max_buy_time = DateTimeFilter(
field_name="buy_time", lookup_expr="lte", label=_("Bought before (inclusive)")
)
min_buy_time = DateTimeFilter(field_name="buy_time", lookup_expr="gte", label=_("Bought after (inclusive)"))
max_buy_time = DateTimeFilter(field_name="buy_time", lookup_expr="lte", label=_("Bought before (inclusive)"))
uuid = UUIDFilter(field_name="uuid", lookup_expr="exact")
user_email = CharFilter(
field_name="user__email", lookup_expr="iexact", label=_("User email")
)
user = UUIDFilter(
field_name="user__uuid", lookup_expr="exact", label=_("User UUID")
)
user_email = CharFilter(field_name="user__email", lookup_expr="iexact", label=_("User email"))
user = UUIDFilter(field_name="user__uuid", lookup_expr="exact", label=_("User UUID"))
status = CharFilter(field_name="status", lookup_expr="icontains", label=_("Status"))
human_readable_id = CharFilter(
field_name="human_readable_id",
@ -355,12 +331,8 @@ class OrderFilter(FilterSet):
class WishlistFilter(FilterSet):
uuid = UUIDFilter(field_name="uuid", lookup_expr="exact")
user_email = CharFilter(
field_name="user__email", lookup_expr="iexact", label=_("User email")
)
user = UUIDFilter(
field_name="user__uuid", lookup_expr="exact", label=_("User UUID")
)
user_email = CharFilter(field_name="user__email", lookup_expr="iexact", label=_("User email"))
user = UUIDFilter(field_name="user__uuid", lookup_expr="exact", label=_("User UUID"))
order_by = OrderingFilter(
fields=(
@ -422,9 +394,7 @@ class CategoryFilter(FilterSet):
output_field=IntegerField(),
)
return (
queryset.filter(uuid__in=uuids).annotate(_order=ordering).order_by("_order")
)
return queryset.filter(uuid__in=uuids).annotate(_order=ordering).order_by("_order")
def filter_whole_categories(self, queryset, _name, value):
has_own_products = Exists(Product.objects.filter(category=OuterRef("pk")))
@ -456,9 +426,7 @@ class CategoryFilter(FilterSet):
class BrandFilter(FilterSet):
uuid = UUIDFilter(field_name="uuid", lookup_expr="exact")
name = CharFilter(method="filter_name", label=_("Name"))
categories = CaseInsensitiveListFilter(
field_name="categories__uuid", lookup_expr="exact", label=_("Categories")
)
categories = CaseInsensitiveListFilter(field_name="categories__uuid", lookup_expr="exact", label=_("Categories"))
order_by = OrderingFilter(
fields=(
@ -484,9 +452,7 @@ class BrandFilter(FilterSet):
output_field=IntegerField(),
)
return (
queryset.filter(uuid__in=uuids).annotate(_order=ordering).order_by("_order")
)
return queryset.filter(uuid__in=uuids).annotate(_order=ordering).order_by("_order")
class FeedbackFilter(FilterSet):
@ -520,12 +486,8 @@ class FeedbackFilter(FilterSet):
class AddressFilter(FilterSet):
uuid = UUIDFilter(field_name="uuid", lookup_expr="exact", label=_("UUID"))
user_uuid = UUIDFilter(
field_name="user__uuid", lookup_expr="exact", label=_("User UUID")
)
user_email = CharFilter(
field_name="user__email", lookup_expr="iexact", label=_("User email")
)
user_uuid = UUIDFilter(field_name="user__uuid", lookup_expr="exact", label=_("User UUID"))
user_email = CharFilter(field_name="user__email", lookup_expr="iexact", label=_("User email"))
order_by = OrderingFilter(
fields=(

View file

@ -35,9 +35,7 @@ class CacheOperator(BaseMutation):
description = _("cache I/O")
class Arguments:
key = String(
required=True, description=_("key to look for in or set into the cache")
)
key = String(required=True, description=_("key to look for in or set into the cache"))
data = GenericScalar(required=False, description=_("data to store in cache"))
timeout = Int(
required=False,
@ -67,9 +65,7 @@ class RequestCursedURL(BaseMutation):
try:
data = cache.get(url, None)
if not data:
response = requests.get(
url, headers={"content-type": "application/json"}
)
response = requests.get(url, headers={"content-type": "application/json"})
response.raise_for_status()
data = camelize(response.json())
cache.set(url, data, 86400)
@ -97,9 +93,7 @@ class AddOrderProduct(BaseMutation):
if not (user.has_perm("core.add_orderproduct") or user == order.user):
raise PermissionDenied(permission_denied_message)
order = order.add_product(
product_uuid=product_uuid, attributes=format_attributes(attributes)
)
order = order.add_product(product_uuid=product_uuid, attributes=format_attributes(attributes))
return AddOrderProduct(order=order)
except Order.DoesNotExist:
@ -125,9 +119,7 @@ class RemoveOrderProduct(BaseMutation):
if not (user.has_perm("core.change_orderproduct") or user == order.user):
raise PermissionDenied(permission_denied_message)
order = order.remove_product(
product_uuid=product_uuid, attributes=format_attributes(attributes)
)
order = order.remove_product(product_uuid=product_uuid, attributes=format_attributes(attributes))
return AddOrderProduct(order=order)
except Order.DoesNotExist:
@ -206,11 +198,7 @@ class BuyOrder(BaseMutation):
billing_address=None,
):
if not any([order_uuid, order_hr_id]) or all([order_uuid, order_hr_id]):
raise BadRequest(
_(
"please provide either order_uuid or order_hr_id - mutually exclusive"
)
)
raise BadRequest(_("please provide either order_uuid or order_hr_id - mutually exclusive"))
user = info.context.user
try:
order = None
@ -234,11 +222,7 @@ class BuyOrder(BaseMutation):
case "<class 'core.models.Order'>":
return BuyOrder(order=instance)
case _:
raise TypeError(
_(
f"wrong type came from order.buy() method: {type(instance)!s}"
)
)
raise TypeError(_(f"wrong type came from order.buy() method: {type(instance)!s}"))
except Order.DoesNotExist:
raise Http404(_(f"order {order_uuid} not found"))
@ -266,11 +250,7 @@ class BulkOrderAction(BaseMutation):
order_hr_id=None,
):
if not any([order_uuid, order_hr_id]) or all([order_uuid, order_hr_id]):
raise BadRequest(
_(
"please provide either order_uuid or order_hr_id - mutually exclusive"
)
)
raise BadRequest(_("please provide either order_uuid or order_hr_id - mutually exclusive"))
user = info.context.user
try:
order = None
@ -452,20 +432,14 @@ class BuyWishlist(BaseMutation):
):
order.add_product(product_uuid=product.pk)
instance = order.buy(
force_balance=force_balance, force_payment=force_payment
)
instance = order.buy(force_balance=force_balance, force_payment=force_payment)
match str(type(instance)):
case "<class 'payments.models.Transaction'>":
return BuyWishlist(transaction=instance)
case "<class 'core.models.Order'>":
return BuyWishlist(order=instance)
case _:
raise TypeError(
_(
f"wrong type came from order.buy() method: {type(instance)!s}"
)
)
raise TypeError(_(f"wrong type came from order.buy() method: {type(instance)!s}"))
except Wishlist.DoesNotExist:
raise Http404(_(f"wishlist {wishlist_uuid} not found"))
@ -479,9 +453,7 @@ class BuyProduct(BaseMutation):
product_uuid = UUID(required=True)
attributes = String(
required=False,
description=_(
"please send the attributes as the string formatted like attr1=value1,attr2=value2"
),
description=_("please send the attributes as the string formatted like attr1=value1,attr2=value2"),
)
force_balance = Boolean(required=False)
force_payment = Boolean(required=False)
@ -500,9 +472,7 @@ class BuyProduct(BaseMutation):
):
user = info.context.user
order = Order.objects.create(user=user, status="MOMENTAL")
order.add_product(
product_uuid=product_uuid, attributes=format_attributes(attributes)
)
order.add_product(product_uuid=product_uuid, attributes=format_attributes(attributes))
instance = order.buy(force_balance=force_balance, force_payment=force_payment)
match str(type(instance)):
case "<class 'payments.models.Transaction'>":
@ -510,9 +480,7 @@ class BuyProduct(BaseMutation):
case "<class 'core.models.Order'>":
return BuyProduct(order=instance)
case _:
raise TypeError(
_(f"wrong type came from order.buy() method: {type(instance)!s}")
)
raise TypeError(_(f"wrong type came from order.buy() method: {type(instance)!s}"))
class CreateProduct(BaseMutation):
@ -528,9 +496,7 @@ class CreateProduct(BaseMutation):
if not info.context.user.has_perm("core.add_product"):
raise PermissionDenied(permission_denied_message)
category = Category.objects.get(uuid=category_uuid)
product = Product.objects.create(
name=name, description=description, category=category
)
product = Product.objects.create(name=name, description=description, category=category)
return CreateProduct(product=product)
@ -577,9 +543,7 @@ class DeleteProduct(BaseMutation):
class CreateAddress(BaseMutation):
class Arguments:
raw_data = String(
required=True, description=_("original address string provided by the user")
)
raw_data = String(required=True, description=_("original address string provided by the user"))
address = Field(AddressType)

View file

@ -101,16 +101,10 @@ class BrandType(DjangoObjectType):
return self.categories.filter(is_active=True)
def resolve_big_logo(self: Brand, info):
return (
info.context.build_absolute_uri(self.big_logo.url) if self.big_logo else ""
)
return info.context.build_absolute_uri(self.big_logo.url) if self.big_logo else ""
def resolve_small_logo(self: Brand, info):
return (
info.context.build_absolute_uri(self.small_logo.url)
if self.small_logo
else ""
)
return info.context.build_absolute_uri(self.small_logo.url) if self.small_logo else ""
class FilterableAttributeType(ObjectType):
@ -132,22 +126,14 @@ class CategoryType(DjangoObjectType):
markup_percent = Float(required=False, description=_("markup percentage"))
filterable_attributes = List(
NonNull(FilterableAttributeType),
description=_(
"which attributes and values can be used for filtering this category."
),
description=_("which attributes and values can be used for filtering this category."),
)
min_max_prices = Field(
NonNull(MinMaxPriceType),
description=_(
"minimum and maximum prices for products in this category, if available."
),
)
tags = DjangoFilterConnectionField(
lambda: CategoryTagType, description=_("tags for this category")
)
products = DjangoFilterConnectionField(
lambda: ProductType, description=_("products in this category")
description=_("minimum and maximum prices for products in this category, if available."),
)
tags = DjangoFilterConnectionField(lambda: CategoryTagType, description=_("tags for this category"))
products = DjangoFilterConnectionField(lambda: ProductType, description=_("products in this category"))
class Meta:
model = Category
@ -223,9 +209,7 @@ class CategoryType(DjangoObjectType):
)
min_max_prices["min_price"] = price_aggregation.get("min_price", 0.0)
min_max_prices["max_price"] = price_aggregation.get("max_price", 0.0)
cache.set(
key=f"{self.name}_min_max_prices", value=min_max_prices, timeout=86400
)
cache.set(key=f"{self.name}_min_max_prices", value=min_max_prices, timeout=86400)
return {
"min_price": min_max_prices["min_price"],
@ -275,9 +259,7 @@ class AddressType(DjangoObjectType):
class FeedbackType(DjangoObjectType):
comment = String(description=_("comment"))
rating = Int(
description=_("rating value from 1 to 10, inclusive, or 0 if not set.")
)
rating = Int(description=_("rating value from 1 to 10, inclusive, or 0 if not set."))
class Meta:
model = Feedback
@ -290,9 +272,7 @@ class FeedbackType(DjangoObjectType):
class OrderProductType(DjangoObjectType):
attributes = GenericScalar(description=_("attributes"))
notifications = GenericScalar(description=_("notifications"))
download_url = String(
description=_("download url for this order product if applicable")
)
download_url = String(description=_("download url for this order product if applicable"))
class Meta:
model = OrderProduct
@ -326,9 +306,7 @@ class OrderType(DjangoObjectType):
billing_address = Field(AddressType, description=_("billing address"))
shipping_address = Field(
AddressType,
description=_(
"shipping address for this order, leave blank if same as billing address or if not applicable"
),
description=_("shipping address for this order, leave blank if same as billing address or if not applicable"),
)
total_price = Float(description=_("total price of this order"))
total_quantity = Int(description=_("total quantity of products in order"))
@ -386,9 +364,7 @@ class ProductType(DjangoObjectType):
images = DjangoFilterConnectionField(ProductImageType, description=_("images"))
feedbacks = DjangoFilterConnectionField(FeedbackType, description=_("feedbacks"))
brand = Field(BrandType, description=_("brand"))
attribute_groups = DjangoFilterConnectionField(
AttributeGroupType, description=_("attribute groups")
)
attribute_groups = DjangoFilterConnectionField(AttributeGroupType, description=_("attribute groups"))
price = Float(description=_("price"))
quantity = Float(description=_("quantity"))
feedbacks_count = Int(description=_("number of feedbacks"))
@ -425,9 +401,7 @@ class ProductType(DjangoObjectType):
def resolve_attribute_groups(self: Product, info):
info.context._product_uuid = self.uuid
return AttributeGroup.objects.filter(
attributes__values__product=self
).distinct()
return AttributeGroup.objects.filter(attributes__values__product=self).distinct()
def resolve_quantity(self, _info) -> int:
return self.quantity or 0
@ -462,20 +436,14 @@ class PromoCodeType(DjangoObjectType):
description = _("promocodes")
def resolve_discount(self: PromoCode, _info) -> float:
return (
float(self.discount_percent)
if self.discount_percent
else float(self.discount_amount)
)
return float(self.discount_percent) if self.discount_percent else float(self.discount_amount)
def resolve_discount_type(self: PromoCode, _info) -> str:
return "percent" if self.discount_percent else "amount"
class PromotionType(DjangoObjectType):
products = DjangoFilterConnectionField(
ProductType, description=_("products on sale")
)
products = DjangoFilterConnectionField(ProductType, description=_("products on sale"))
class Meta:
model = Promotion
@ -498,9 +466,7 @@ class StockType(DjangoObjectType):
class WishlistType(DjangoObjectType):
products = DjangoFilterConnectionField(
ProductType, description=_("wishlisted products")
)
products = DjangoFilterConnectionField(ProductType, description=_("wishlisted products"))
class Meta:
model = Wishlist
@ -510,9 +476,7 @@ class WishlistType(DjangoObjectType):
class ProductTagType(DjangoObjectType):
product_set = DjangoFilterConnectionField(
ProductType, description=_("tagged products")
)
product_set = DjangoFilterConnectionField(ProductType, description=_("tagged products"))
class Meta:
model = ProductTag
@ -523,9 +487,7 @@ class ProductTagType(DjangoObjectType):
class CategoryTagType(DjangoObjectType):
category_set = DjangoFilterConnectionField(
CategoryType, description=_("tagged categories")
)
category_set = DjangoFilterConnectionField(CategoryType, description=_("tagged categories"))
class Meta:
model = CategoryTag
@ -541,11 +503,7 @@ class ConfigType(ObjectType):
company_name = String(description=_("company name"))
company_address = String(description=_("company address"))
company_phone_number = String(description=_("company phone number"))
email_from = String(
description=_(
"email from, sometimes it must be used instead of host user value"
)
)
email_from = String(description=_("email from, sometimes it must be used instead of host user value"))
email_host_user = String(description=_("email host user"))
payment_gateway_maximum = Float(description=_("maximum amount for payment"))
payment_gateway_minimum = Float(description=_("minimum amount for payment"))
@ -593,15 +551,9 @@ class SearchPostsResultsType(ObjectType):
class SearchResultsType(ObjectType):
products = List(
description=_("products search results"), of_type=SearchProductsResultsType
)
categories = List(
description=_("products search results"), of_type=SearchCategoriesResultsType
)
brands = List(
description=_("products search results"), of_type=SearchBrandsResultsType
)
products = List(description=_("products search results"), of_type=SearchProductsResultsType)
categories = List(description=_("products search results"), of_type=SearchCategoriesResultsType)
brands = List(description=_("products search results"), of_type=SearchBrandsResultsType)
posts = List(description=_("posts search results"), of_type=SearchPostsResultsType)

View file

@ -79,7 +79,9 @@ def load_po_sanitized(path: str) -> polib.POFile | None:
rest = parts[1] if len(parts) > 1 else ""
rest_clean = re.sub(r"^msgid \"\"\s*\nmsgstr \"\"\s*\n?", "", rest, flags=re.MULTILINE)
sanitized = header + "\n\n" + rest_clean
tmp = NamedTemporaryFile(mode="w+", delete=False, suffix=".po", encoding="utf-8") # noqa: SIM115
tmp = NamedTemporaryFile( # noqa: SIM115
mode="w+", delete=False, suffix=".po", encoding="utf-8"
)
try:
tmp.write(sanitized)
tmp.flush()
@ -163,7 +165,12 @@ class Command(BaseCommand):
entries = [e for e in en_po if e.msgid and not e.obsolete]
source_map = {e.msgid: e.msgstr for e in entries}
tgt_dir = os.path.join(app_conf.path, "locale", target_lang.replace("-", "_"), "LC_MESSAGES")
tgt_dir = os.path.join(
app_conf.path,
"locale",
target_lang.replace("-", "_"),
"LC_MESSAGES",
)
os.makedirs(tgt_dir, exist_ok=True)
tgt_path = os.path.join(tgt_dir, "django.po")
@ -221,7 +228,7 @@ class Command(BaseCommand):
if len(trans) != len(to_trans):
raise CommandError(f"Got {len(trans)} translations, expected {len(to_trans)}")
for e, obj, pmap in zip(to_trans, trans, maps):
for e, obj, pmap in zip(to_trans, trans, maps, strict=True):
e.msgstr = deplaceholderize(obj["text"], pmap)
new_po.save(tgt_path)

View file

@ -11,17 +11,10 @@ class Command(BaseCommand):
def reset_em(self, queryset):
total = queryset.count()
self.stdout.write(
f"Starting slug rebuilding for {total} {queryset.model._meta.verbose_name_plural}"
)
self.stdout.write(f"Starting slug rebuilding for {total} {queryset.model._meta.verbose_name_plural}")
for idx, instance in enumerate(queryset.iterator(), start=1):
try:
while (
queryset.filter(name=instance.name)
.exclude(uuid=instance.uuid)
.count()
>= 1
):
while queryset.filter(name=instance.name).exclude(uuid=instance.uuid).count() >= 1:
instance.name = f"{instance.name} - {get_random_string(length=3, allowed_chars='0123456789')}"
instance.save()
instance.slug = None
@ -43,7 +36,6 @@ class Command(BaseCommand):
)
def handle(self, *args, **options):
for queryset in [
Brand.objects.all(),
Category.objects.all(),

View file

@ -50,6 +50,7 @@ from core.utils import (
get_product_uuid_as_path,
get_random_code,
)
from core.utils.db import TweakedAutoSlugField
from core.utils.lists import FAILED_STATUSES
from core.validators import validate_category_image_dimensions
from evibes.settings import CURRENCY_CODE
@ -91,9 +92,7 @@ class Vendor(ExportModelOperationsMixin("vendor"), NiceModel):
authentication: dict = JSONField( # type: ignore
blank=True,
null=True,
help_text=_(
"stores credentials and endpoints required for vendor communication"
),
help_text=_("stores credentials and endpoints required for vendor communication"),
verbose_name=_("authentication info"),
)
markup_percent: int = IntegerField( # type: ignore
@ -213,9 +212,9 @@ class Category(ExportModelOperationsMixin("category"), NiceModel, MPTTModel):
verbose_name=_("category description"),
)
slug: str = AutoSlugField( # type: ignore
slug: str = TweakedAutoSlugField( # type: ignore
populate_from=(
"uuid",
"parent__name",
"name",
),
allow_unicode=True,
@ -238,10 +237,7 @@ class Category(ExportModelOperationsMixin("category"), NiceModel, MPTTModel):
def get_tree_depth(self):
if self.is_leaf_node():
return 0
return (
self.get_descendants().aggregate(max_depth=Max("level"))["max_depth"]
- self.get_level()
)
return self.get_descendants().aggregate(max_depth=Max("level"))["max_depth"] - self.get_level()
class Meta:
verbose_name = _("category")
@ -402,9 +398,7 @@ class Product(ExportModelOperationsMixin("product"), NiceModel):
cache_key = f"product_feedbacks_count_{self.pk}"
feedbacks_count = cache.get(cache_key)
if feedbacks_count is None:
feedbacks_count = Feedback.objects.filter(
order_product__product_id=self.pk
).count()
feedbacks_count = Feedback.objects.filter(order_product__product_id=self.pk).count()
cache.set(cache_key, feedbacks_count, 604800)
return feedbacks_count
@ -703,9 +697,7 @@ class Wishlist(ExportModelOperationsMixin("wishlist"), NiceModel):
class Documentary(ExportModelOperationsMixin("attribute_group"), NiceModel):
is_publicly_visible = True
product: ForeignKey = ForeignKey(
to=Product, on_delete=CASCADE, related_name="documentaries"
)
product: ForeignKey = ForeignKey(to=Product, on_delete=CASCADE, related_name="documentaries")
document = FileField(upload_to=get_product_uuid_as_path)
class Meta:
@ -833,9 +825,7 @@ class PromoCode(ExportModelOperationsMixin("promocode"), NiceModel):
self.discount_amount is None and self.discount_percent is None
):
raise ValidationError(
_(
"only one type of discount should be defined (amount or percent), but not both or neither."
)
_("only one type of discount should be defined (amount or percent), but not both or neither.")
)
super().save(**kwargs)
@ -856,15 +846,11 @@ class PromoCode(ExportModelOperationsMixin("promocode"), NiceModel):
if self.discount_type == "percent":
amount -= round(amount * (self.discount_percent / 100), 2)
order.attributes.update(
{"promocode": str(self.uuid), "final_price": amount}
)
order.attributes.update({"promocode": str(self.uuid), "final_price": amount})
order.save()
elif self.discount_type == "amount":
amount -= round(float(self.discount_amount), 2)
order.attributes.update(
{"promocode": str(self.uuid), "final_price": amount}
)
order.attributes.update({"promocode": str(self.uuid), "final_price": amount})
order.save()
else:
raise ValueError(_(f"invalid discount type for promocode {self.uuid}"))
@ -972,8 +958,7 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
sum(
(
order_product.buy_price * order_product.quantity
if order_product.status not in FAILED_STATUSES
and order_product.buy_price is not None
if order_product.status not in FAILED_STATUSES and order_product.buy_price is not None
else 0.0
)
for order_product in self.order_products.all()
@ -990,16 +975,14 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
def add_product(
self,
product_uuid: str | None = None,
attributes: Optional[list] = None,
attributes: list | None = None,
update_quantity: bool = True,
):
if attributes is None:
attributes = []
if self.status not in ["PENDING", "MOMENTAL"]:
raise ValueError(
_("you cannot add products to an order that is not a pending one")
)
raise ValueError(_("you cannot add products to an order that is not a pending one"))
try:
product = Product.objects.get(uuid=product_uuid)
@ -1008,9 +991,7 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
buy_price = product.price
promotions = Promotion.objects.filter(
is_active=True, products__in=[product]
).order_by("discount_percent")
promotions = Promotion.objects.filter(is_active=True, products__in=[product]).order_by("discount_percent")
if promotions.exists():
buy_price -= round(product.price * (promotions.first().discount_percent / 100), 2) # type: ignore
@ -1023,9 +1004,7 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
)
if not is_created and update_quantity:
if product.quantity < order_product.quantity + 1:
raise BadRequest(
_("you cannot add more products than available in stock")
)
raise BadRequest(_("you cannot add more products than available in stock"))
order_product.quantity += 1
order_product.buy_price = product.price
order_product.save()
@ -1046,9 +1025,7 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
attributes = {}
if self.status not in ["PENDING", "MOMENTAL"]:
raise ValueError(
_("you cannot remove products from an order that is not a pending one")
)
raise ValueError(_("you cannot remove products from an order that is not a pending one"))
try:
product = Product.objects.get(uuid=product_uuid)
order_product = self.order_products.get(product=product, order=self)
@ -1067,16 +1044,12 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
raise Http404(_(f"{name} does not exist: {product_uuid}"))
except OrderProduct.DoesNotExist:
name = "OrderProduct"
query = (
f"product: {product_uuid}, order: {self.uuid}, attributes: {attributes}"
)
query = f"product: {product_uuid}, order: {self.uuid}, attributes: {attributes}"
raise Http404(_(f"{name} does not exist with query <{query}>"))
def remove_all_products(self):
if self.status not in ["PENDING", "MOMENTAL"]:
raise ValueError(
_("you cannot remove products from an order that is not a pending one")
)
raise ValueError(_("you cannot remove products from an order that is not a pending one"))
for order_product in self.order_products.all():
self.order_products.remove(order_product)
order_product.delete()
@ -1084,9 +1057,7 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
def remove_products_of_a_kind(self, product_uuid: str):
if self.status not in ["PENDING", "MOMENTAL"]:
raise ValueError(
_("you cannot remove products from an order that is not a pending one")
)
raise ValueError(_("you cannot remove products from an order that is not a pending one"))
try:
product = Product.objects.get(uuid=product_uuid)
order_product = self.order_products.get(product=product, order=self)
@ -1099,10 +1070,7 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
@property
def is_whole_digital(self):
return (
self.order_products.count()
== self.order_products.filter(product__is_digital=True).count()
)
return self.order_products.count() == self.order_products.filter(product__is_digital=True).count()
def apply_promocode(self, promocode_uuid: str):
try:
@ -1117,11 +1085,7 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
if self.is_whole_digital:
return
else:
raise ValueError(
_(
"you can only buy physical products with shipping address specified"
)
)
raise ValueError(_("you can only buy physical products with shipping address specified"))
if billing_address_uuid and not shipping_address_uuid:
shipping_address = Address.objects.get(uuid=billing_address_uuid)
@ -1151,13 +1115,9 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
shipping_address: str | None = None,
) -> Self | Transaction | None:
if config.DISABLED_COMMERCE:
raise DisabledCommerceError(
_("you can not buy at this moment, please try again in a few minutes")
)
raise DisabledCommerceError(_("you can not buy at this moment, please try again in a few minutes"))
if (not force_balance and not force_payment) or (
force_balance and force_payment
):
if (not force_balance and not force_payment) or (force_balance and force_payment):
raise ValueError(_("invalid force value"))
self.apply_addresses(billing_address, shipping_address)
@ -1173,16 +1133,12 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
if force_payment:
force = "payment"
amount = (
self.apply_promocode(promocode_uuid) if promocode_uuid else self.total_price
)
amount = self.apply_promocode(promocode_uuid) if promocode_uuid else self.total_price
match force:
case "balance":
if self.user.payments_balance.amount < amount: # type: ignore
raise NotEnoughMoneyError(
_("insufficient funds to complete the order")
)
raise NotEnoughMoneyError(_("insufficient funds to complete the order"))
self.status = "CREATED"
self.buy_time = timezone.now()
self.order_products.all().update(status="DELIVERING")
@ -1200,13 +1156,9 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
return self
def buy_without_registration(
self, products: list, promocode_uuid: str, **kwargs
) -> Transaction | None:
def buy_without_registration(self, products: list, promocode_uuid: str, **kwargs) -> Transaction | None:
if config.DISABLED_COMMERCE:
raise DisabledCommerceError(
_("you can not buy at this moment, please try again in a few minutes")
)
raise DisabledCommerceError(_("you can not buy at this moment, please try again in a few minutes"))
if len(products) < 1:
raise ValueError(_("you cannot purchase an empty order!"))
@ -1227,25 +1179,17 @@ class Order(ExportModelOperationsMixin("order"), NiceModel):
available_payment_methods = cache.get("payment_methods").get("payment_methods")
if payment_method not in available_payment_methods:
raise ValueError(
_(
f"invalid payment method: {payment_method} from {available_payment_methods}"
)
)
raise ValueError(_(f"invalid payment method: {payment_method} from {available_payment_methods}"))
billing_customer_address_uuid = kwargs.get("billing_customer_address")
shipping_customer_address_uuid = kwargs.get("shipping_customer_address")
self.apply_addresses(
billing_customer_address_uuid, shipping_customer_address_uuid
)
self.apply_addresses(billing_customer_address_uuid, shipping_customer_address_uuid)
for product_uuid in products:
self.add_product(product_uuid)
amount = (
self.apply_promocode(promocode_uuid) if promocode_uuid else self.total_price
)
amount = self.apply_promocode(promocode_uuid) if promocode_uuid else self.total_price
self.status = "CREATED"
@ -1390,9 +1334,7 @@ class OrderProduct(ExportModelOperationsMixin("order_product"), NiceModel):
"errors": [
{
"detail": (
error
if error
else f"Something went wrong with {self.uuid} for some reason..."
error if error else f"Something went wrong with {self.uuid} for some reason..."
)
},
]
@ -1417,9 +1359,7 @@ class OrderProduct(ExportModelOperationsMixin("order_product"), NiceModel):
return self.download.url
return ""
def do_feedback(
self, rating: int = 10, comment: str = "", action: str = "add"
) -> Optional["Feedback"]:
def do_feedback(self, rating: int = 10, comment: str = "", action: str = "add") -> Optional["Feedback"]:
if action not in ["add", "remove"]:
raise ValueError(_(f"wrong action specified for feedback: {action}"))
if action == "remove" and self.feedback:
@ -1427,13 +1367,9 @@ class OrderProduct(ExportModelOperationsMixin("order_product"), NiceModel):
return None
if action == "add" and not self.feedback:
if self.order.status not in ["MOMENTAL", "PENDING"]: # type: ignore
return Feedback.objects.create(
rating=rating, comment=comment, order_product=self
)
return Feedback.objects.create(rating=rating, comment=comment, order_product=self)
else:
raise ValueError(
_("you cannot feedback an order which is not received")
)
raise ValueError(_("you cannot feedback an order which is not received"))
return None
@ -1453,11 +1389,11 @@ class DigitalAssetDownload(ExportModelOperationsMixin("attribute_group"), NiceMo
@property
def url(self):
if self.order_product.status != "FINISHED":
raise ValueError(
_("you can not download a digital asset for a non-finished order")
)
raise ValueError(_("you can not download a digital asset for a non-finished order"))
return f"https://api.{config.BASE_DOMAIN}/download/{urlsafe_base64_encode(force_bytes(self.order_product.uuid))}"
return (
f"https://api.{config.BASE_DOMAIN}/download/{urlsafe_base64_encode(force_bytes(self.order_product.uuid))}"
)
class Feedback(ExportModelOperationsMixin("feedback"), NiceModel):
@ -1474,9 +1410,7 @@ class Feedback(ExportModelOperationsMixin("feedback"), NiceModel):
on_delete=CASCADE,
blank=False,
null=False,
help_text=_(
"references the specific product in an order that this feedback is about"
),
help_text=_("references the specific product in an order that this feedback is about"),
verbose_name=_("related order product"),
)
rating: float = FloatField( # type: ignore

View file

@ -1,6 +1,7 @@
import logging
from collections.abc import Collection
from contextlib import suppress
from typing import Any, Collection, Optional
from typing import Any
from django.contrib.auth.models import AnonymousUser
from django.core.cache import cache
@ -69,7 +70,7 @@ class CategoryDetailSerializer(ModelSerializer):
"modified",
]
def get_image(self, obj: Category) -> Optional[str]:
def get_image(self, obj: Category) -> str | None:
with suppress(ValueError):
return obj.image.url
return None
@ -148,12 +149,12 @@ class BrandDetailSerializer(ModelSerializer):
"small_logo",
]
def get_small_logo(self, obj: Brand) -> Optional[str]:
def get_small_logo(self, obj: Brand) -> str | None:
with suppress(ValueError):
return obj.small_logo.url
return None
def get_big_logo(self, obj: Brand) -> Optional[str]:
def get_big_logo(self, obj: Brand) -> str | None:
with suppress(ValueError):
return obj.big_logo.url
return None
@ -174,12 +175,12 @@ class BrandProductDetailSerializer(ModelSerializer):
"small_logo",
]
def get_small_logo(self, obj: Brand) -> Optional[str]:
def get_small_logo(self, obj: Brand) -> str | None:
with suppress(ValueError):
return obj.small_logo.url
return None
def get_big_logo(self, obj: Brand) -> Optional[str]:
def get_big_logo(self, obj: Brand) -> str | None:
with suppress(ValueError):
return obj.big_logo.url
return None

View file

@ -1,5 +1,5 @@
from collections.abc import Collection
from contextlib import suppress
from typing import Collection, Optional
from rest_framework.fields import JSONField, SerializerMethodField
from rest_framework.relations import PrimaryKeyRelatedField
@ -54,7 +54,7 @@ class CategorySimpleSerializer(ModelSerializer):
"children",
]
def get_image(self, obj: Category) -> Optional[str]:
def get_image(self, obj: Category) -> str | None:
with suppress(ValueError):
return obj.image.url
return None
@ -89,12 +89,12 @@ class BrandSimpleSerializer(ModelSerializer):
"big_logo",
]
def get_small_logo(self, obj: Brand) -> Optional[str]:
def get_small_logo(self, obj: Brand) -> str | None:
with suppress(ValueError):
return obj.small_logo.url
return None
def get_big_logo(self, obj: Brand) -> Optional[str]:
def get_big_logo(self, obj: Brand) -> str | None:
with suppress(ValueError):
return obj.big_logo.url
return None

View file

@ -33,9 +33,7 @@ def create_order_on_user_creation_signal(instance, created, **_kwargs):
if Order.objects.filter(human_readable_id=human_readable_id).exists():
human_readable_id = generate_human_readable_id()
continue
Order.objects.create(
user=instance, status="PENDING", human_readable_id=human_readable_id
)
Order.objects.create(user=instance, status="PENDING", human_readable_id=human_readable_id)
break
@ -49,9 +47,7 @@ def create_wishlist_on_user_creation_signal(instance, created, **_kwargs):
def create_promocode_on_user_referring(instance, created, **_kwargs):
try:
if created and instance.attributes.get("referrer", ""):
referrer_uuid = urlsafe_base64_decode(
instance.attributes.get("referrer", "")
)
referrer_uuid = urlsafe_base64_decode(instance.attributes.get("referrer", ""))
referrer = User.objects.get(uuid=referrer_uuid)
code = f"WELCOME-{get_random_string(6)}"
PromoCode.objects.create(
@ -78,9 +74,7 @@ def process_order_changes(instance, created, **_kwargs):
except IntegrityError:
human_readable_id = generate_human_readable_id()
while True:
if Order.objects.filter(
human_readable_id=human_readable_id
).exists():
if Order.objects.filter(human_readable_id=human_readable_id).exists():
human_readable_id = generate_human_readable_id()
continue
Order.objects.create(
@ -100,31 +94,20 @@ def process_order_changes(instance, created, **_kwargs):
try:
vendor_name = (
order_product.product.stocks.filter(
price=order_product.buy_price
)
.first()
.vendor.name.lower()
order_product.product.stocks.filter(price=order_product.buy_price).first().vendor.name.lower()
)
vendor = create_object(
f"core.vendors.{vendor_name}", f"{vendor_name.title()}Vendor"
)
vendor = create_object(f"core.vendors.{vendor_name}", f"{vendor_name.title()}Vendor")
vendor.buy_order_product(order_product)
except Exception as e:
order_product.add_error(
f"Failed to buy {order_product.uuid}. Reason: {e}..."
)
order_product.add_error(f"Failed to buy {order_product.uuid}. Reason: {e}...")
else:
instance.finalize()
if (
instance.order_products.filter(status="FAILED").count()
== instance.order_products.count()
):
if instance.order_products.filter(status="FAILED").count() == instance.order_products.count():
instance.status = "FAILED"
instance.save()

View file

@ -1,5 +1,7 @@
from django.db.models import Model
from django.db.models.constants import LOOKUP_SEP
from django.utils.translation import gettext_lazy as _
from django_extensions.db.fields import AutoSlugField
def list_to_queryset(model: Model, data: list):
@ -10,3 +12,21 @@ def list_to_queryset(model: Model, data: list):
pk_list = [obj.pk for obj in data]
return model.objects.filter(pk__in=pk_list)
class TweakedAutoSlugField(AutoSlugField):
def get_slug_fields(self, model_instance, lookup_value):
if callable(lookup_value):
return f"{lookup_value(model_instance)}"
lookup_value_path = lookup_value.split(LOOKUP_SEP)
attr = model_instance
for elem in lookup_value_path:
try:
attr = getattr(attr, elem)
except AttributeError:
attr = ""
if callable(attr):
return f"{attr()}"
return attr

View file

@ -1,11 +1,9 @@
from typing import Dict, List
import requests
from constance import config
from django.utils.translation import gettext as _
def fetch_address_suggestions(query: str, limit: int = 5) -> List[Dict]:
def fetch_address_suggestions(query: str, limit: int = 5) -> list[dict]:
if not config.NOMINATIM_URL:
raise ValueError(_("NOMINATIM_URL must be configured."))

View file

@ -145,23 +145,17 @@ class AbstractVendor:
return value, "string"
@staticmethod
def auto_resolver_helper(
model: Brand | Category, resolving_name: str
) -> Brand | Category | None:
def auto_resolver_helper(model: Brand | Category, resolving_name: str) -> Brand | Category | None:
queryset = model.objects.filter(name=resolving_name)
if not queryset.exists():
return model.objects.get_or_create(
name=resolving_name, defaults={"is_active": False}
)[0]
return model.objects.get_or_create(name=resolving_name, defaults={"is_active": False})[0]
elif queryset.filter(is_active=True).count() > 1:
queryset = queryset.filter(is_active=True)
elif queryset.filter(is_active=False).count() > 1:
queryset = queryset.filter(is_active=False)
chosen = queryset.first()
if not chosen:
raise VendorError(
f"No matching {model.__name__} found with name {resolving_name!r}..."
)
raise VendorError(f"No matching {model.__name__} found with name {resolving_name!r}...")
queryset = queryset.exclude(uuid=chosen.uuid)
queryset.delete()
return chosen
@ -227,9 +221,7 @@ class AbstractVendor:
rate = rates.get(currency or self.currency)
if not rate:
raise RatesError(
f"No rate found for {currency or self.currency} in {rates} with probider {provider}..."
)
raise RatesError(f"No rate found for {currency or self.currency} in {rates} with probider {provider}...")
return round(price / rate, 2) if rate else round(price, 2)
@ -273,22 +265,16 @@ class AbstractVendor:
return vendor
raise VendorError(f"Vendor {self.vendor_name!r} is inactive...")
except Vendor.DoesNotExist:
raise Exception(
f"No matching vendor found with name {self.vendor_name!r}..."
)
raise Exception(f"No matching vendor found with name {self.vendor_name!r}...")
def get_products(self):
pass
def get_products_queryset(self):
return Product.objects.filter(
stocks__vendor=self.get_vendor_instance(), orderproduct__isnull=True
)
return Product.objects.filter(stocks__vendor=self.get_vendor_instance(), orderproduct__isnull=True)
def get_stocks_queryset(self):
return Stock.objects.filter(
product__in=self.get_products_queryset(), product__orderproduct__isnull=True
)
return Stock.objects.filter(product__in=self.get_products_queryset(), product__orderproduct__isnull=True)
def get_attribute_values_queryset(self):
return AttributeValue.objects.filter(
@ -306,9 +292,7 @@ class AbstractVendor:
self.get_stocks_queryset().delete()
self.get_attribute_values_queryset().delete()
def process_attribute(
self, key: str, value, product: Product, attr_group: AttributeGroup
):
def process_attribute(self, key: str, value, product: Product, attr_group: AttributeGroup):
if not value:
return

View file

@ -129,9 +129,7 @@ class WebsiteParametersView(APIView):
]
def get(self, request):
return Response(
data=camelize(get_project_parameters()), status=status.HTTP_200_OK
)
return Response(data=camelize(get_project_parameters()), status=status.HTTP_200_OK)
@extend_schema_view(**CACHE_SCHEMA)
@ -201,9 +199,7 @@ class RequestCursedURLView(APIView):
try:
data = cache.get(url, None)
if not data:
response = requests.get(
url, headers={"content-type": "application/json"}
)
response = requests.get(url, headers={"content-type": "application/json"})
response.raise_for_status()
data = camelize(response.json())
cache.set(url, data, 86400)
@ -233,15 +229,7 @@ class GlobalSearchView(APIView):
]
def get(self, request, *args, **kwargs):
return Response(
camelize(
{
"results": process_query(
query=request.GET.get("q", "").strip(), request=request
)
}
)
)
return Response(camelize({"results": process_query(query=request.GET.get("q", "").strip(), request=request)}))
@extend_schema_view(**BUY_AS_BUSINESS_SCHEMA)
@ -251,22 +239,15 @@ class BuyAsBusinessView(APIView):
serializer = BuyAsBusinessOrderSerializer(data=request.data)
serializer.is_valid(raise_exception=True)
order = Order.objects.create(status="MOMENTAL")
products = [
product.get("product_uuid")
for product in serializer.validated_data.get("products")
]
products = [product.get("product_uuid") for product in serializer.validated_data.get("products")]
transaction = order.buy_without_registration(
products=products,
promocode_uuid=serializer.validated_data.get("promocode_uuid"),
customer_name=serializer.validated_data.get("customer_name"),
customer_email=serializer.validated_data.get("customer_email"),
customer_phone=serializer.validated_data.get("customer_phone"),
customer_billing_address=serializer.validated_data.get(
"customer_billing_address_uuid"
),
customer_shipping_address=serializer.validated_data.get(
"customer_shipping_address_uuid"
),
customer_billing_address=serializer.validated_data.get("customer_billing_address_uuid"),
customer_shipping_address=serializer.validated_data.get("customer_shipping_address_uuid"),
payment_method=serializer.validated_data.get("payment_method"),
is_business=True,
)
@ -287,9 +268,7 @@ def download_digital_asset_view(request, *args, **kwargs):
download.num_downloads += 1
download.save()
file_path = (
download.order_product.product.stocks.first().digital_asset.file.path
)
file_path = download.order_product.product.stocks.first().digital_asset.file.path
content_type, encoding = mimetypes.guess_type(file_path)
if not content_type:

View file

@ -27,7 +27,7 @@ class JSONTableWidget(forms.Widget):
try:
keys = data.getlist(f"{name}_key")
values = data.getlist(f"{name}_value")
for key, value in zip(keys, values):
for key, value in zip(keys, values, strict=True):
if key.strip():
try:
json_data[key] = json.loads(value)

View file

@ -25,9 +25,7 @@ urlpatterns = [
),
path(
r"docs/",
SpectacularAPIView.as_view(
urlconf="evibes.api_urls", custom_settings=SPECTACULAR_PLATFORM_SETTINGS
),
SpectacularAPIView.as_view(urlconf="evibes.api_urls", custom_settings=SPECTACULAR_PLATFORM_SETTINGS),
name="schema-platform",
),
path(

View file

@ -88,10 +88,7 @@ class BlockInvalidHostMiddleware:
allowed_hosts += getenv("ALLOWED_HOSTS").split(" ")
if not hasattr(request, "META"):
return BadRequest("Invalid Request")
if (
request.META.get("HTTP_HOST") not in allowed_hosts
and "*" not in allowed_hosts
):
if request.META.get("HTTP_HOST") not in allowed_hosts and "*" not in allowed_hosts:
return HttpResponseForbidden("Invalid Host Header")
return self.get_response(request)

View file

@ -2,3 +2,5 @@ GRAPH_MODELS = {
"all_applications": True,
"group_models": True,
}
EXTENSIONS_MAX_UNIQUE_QUERY_ATTEMPTS = 500

View file

@ -20,8 +20,9 @@ def balance_email(user_pk: str) -> tuple[bool, str]:
email = EmailMessage(
"eVibes | Successful Order",
render_to_string("balance_deposit_email.html",
{"user": user, "current_year": timezone.now().year, "config": config}),
render_to_string(
"balance_deposit_email.html", {"user": user, "current_year": timezone.now().year, "config": config}
),
to=[user.email],
from_email=f"{config.PROJECT_NAME} <{config.EMAIL_FROM}>",
)

View file

@ -105,7 +105,7 @@ exclude = ["*/migrations/*", "./evibes/settings/drf.py"]
[tool.ruff]
line-length = 120
target-version = "py38"
target-version = "py312"
exclude = ["migrations", "media", "static", "storefront"]
[tool.ruff.lint]

View file

@ -1,6 +1,7 @@
import logging
from collections.abc import Collection
from contextlib import suppress
from typing import Any, Collection, Dict, Optional, Type
from typing import Any
from constance import config
from django.contrib.auth import authenticate
@ -95,15 +96,16 @@ class UserSerializer(ModelSerializer):
Returns a list of serialized ProductSimpleSerializer representations
for the UUIDs in obj.recently_viewed.
"""
# noinspection PyTypeChecker
# noinspection PytypeChecker
return ProductSimpleSerializer(
Product.objects.filter(uuid__in=obj.recently_viewed, is_active=True), many=True
Product.objects.filter(uuid__in=obj.recently_viewed, is_active=True),
many=True,
).data
class TokenObtainSerializer(Serializer):
username_field = User.USERNAME_FIELD
token_class: Optional[Type[Token]] = None
token_class: type[Token] | None = None
default_error_messages = {"no_active_account": _("no active account")}
@ -114,7 +116,7 @@ class TokenObtainSerializer(Serializer):
self.fields[self.username_field] = CharField(write_only=True)
self.fields["password"] = PasswordField()
def validate(self, attrs: Dict[str, Any]) -> Dict[Any, Any]:
def validate(self, attrs: dict[str, Any]) -> dict[Any, Any]:
authenticate_kwargs = {
self.username_field: attrs[self.username_field],
"password": attrs["password"],
@ -140,7 +142,7 @@ class TokenObtainSerializer(Serializer):
class TokenObtainPairSerializer(TokenObtainSerializer):
token_class = RefreshToken
def validate(self, attrs: Dict[str, Any]) -> Dict[str, str]:
def validate(self, attrs: dict[str, Any]) -> dict[str, str]:
data = super().validate(attrs)
logger.debug("Data validated")
@ -165,7 +167,7 @@ class TokenRefreshSerializer(Serializer):
access = CharField(read_only=True)
token_class = RefreshToken
def validate(self, attrs: Dict[str, Any]) -> Dict[str, str]:
def validate(self, attrs: dict[str, Any]) -> dict[str, str]:
refresh = self.token_class(attrs["refresh"])
data = {"access": str(refresh.access_token)}
@ -189,7 +191,7 @@ class TokenRefreshSerializer(Serializer):
class TokenVerifySerializer(Serializer):
token = CharField(write_only=True)
def validate(self, attrs: Dict[str, None]) -> Dict[Any, Any]:
def validate(self, attrs: dict[str, None]) -> dict[Any, Any]:
token = UntypedToken(attrs["token"])
if (

View file

@ -46,9 +46,7 @@ class AuthTests(TestCase):
def test_obtain_token_view(self):
url = reverse("token_obtain_pair")
response = self.api_client.post(
url, {"email": self.user.email, "password": "testpassword"}
)
response = self.api_client.post(url, {"email": self.user.email, "password": "testpassword"})
self.assertEqual(response.status_code, 200)
self.assertIn("access", response.data)
self.assertIn("refresh", response.data)
@ -58,9 +56,7 @@ class AuthTests(TestCase):
refresh_url = reverse("token_refresh")
# Obtain tokens
obtain_response = self.api_client.post(
obtain_url, {"email": self.user.email, "password": "testpassword"}
)
obtain_response = self.api_client.post(obtain_url, {"email": self.user.email, "password": "testpassword"})
refresh_token = obtain_response.data["refresh"]
# Refresh tokens
@ -73,9 +69,7 @@ class AuthTests(TestCase):
verify_url = reverse("token_verify")
# Obtain tokens
obtain_response = self.api_client.post(
obtain_url, {"email": self.user.email, "password": "testpassword"}
)
obtain_response = self.api_client.post(obtain_url, {"email": self.user.email, "password": "testpassword"})
access_token = obtain_response.data["access"]
# Verify token