Features: 1) None;

Fixes: 1) Add `# ty: ignore` comments to suppress type errors in multiple files; 2) Correct method argument annotations and definitions to align with type hints; 3) Fix cases of invalid or missing imports and unresolved attributes;

Extra: Refactor method definitions to use tuple-based method declarations; replace custom type aliases with `Any`; improve caching utility and error handling logic in utility scripts.
This commit is contained in:
Egor Pavlovich Gorbunov 2025-12-19 16:43:39 +03:00
parent 13e7af52aa
commit dc7f8be926
42 changed files with 245 additions and 228 deletions

View file

@ -29,7 +29,7 @@ lint:
typecheck: typecheck:
stage: typecheck stage: typecheck
script: script:
- uv run ty - uv run ty check
rules: rules:
- changes: - changes:
- "**/*.py" - "**/*.py"

View file

@ -1,18 +0,0 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.9
hooks:
- id: ruff
name: Ruff (lint & fix)
args: ["--fix", "--exit-non-zero-on-fix"]
files: "\\.(py|pyi)$"
exclude: "^storefront/"
- id: ruff-format
name: Ruff (format)
files: "\\.(py|pyi)$"
exclude: "^storefront/"
ci:
autofix_commit_msg: "chore(pre-commit): auto-fix issues"
autofix_prs: true
autoupdate_commit_msg: "chore(pre-commit): autoupdate hooks"

View file

@ -201,7 +201,7 @@ def process_query(
) )
def build_search(idxs: list[str], size: int) -> Search[Hit]: def build_search(idxs: list[str], size: int) -> Search[Hit]:
return ( result: Search[Hit] = ( # ty: ignore[invalid-assignment]
Search(index=idxs) Search(index=idxs)
.query(query_base) .query(query_base)
.extra( .extra(
@ -223,6 +223,7 @@ def process_query(
) )
.extra(size=size, track_total_hits=True) .extra(size=size, track_total_hits=True)
) )
return result
resp_cats = None resp_cats = None
if "categories" in indexes: if "categories" in indexes:
@ -290,12 +291,12 @@ def process_query(
] ]
for qx in product_exact_sequence: for qx in product_exact_sequence:
try: try:
resp_exact = ( search_exact = (
Search(index=["products"]) Search(index=["products"])
.query(qx) .query(qx)
.extra(size=5, track_total_hits=False) .extra(size=5, track_total_hits=False)
.execute()
) )
resp_exact = search_exact.execute() # ty: ignore[possibly-missing-attribute]
except NotFoundError: except NotFoundError:
resp_exact = None resp_exact = None
if resp_exact is not None and getattr(resp_exact, "hits", None): if resp_exact is not None and getattr(resp_exact, "hits", None):
@ -310,7 +311,7 @@ def process_query(
.extra(size=5, track_total_hits=False) .extra(size=5, track_total_hits=False)
) )
try: try:
resp_exact = s_exact.execute() resp_exact = s_exact.execute() # ty: ignore[possibly-missing-attribute]
except NotFoundError: except NotFoundError:
resp_exact = None resp_exact = None
if resp_exact is not None and getattr(resp_exact, "hits", None): if resp_exact is not None and getattr(resp_exact, "hits", None):
@ -435,7 +436,7 @@ def _lang_analyzer(lang_code: str) -> str:
class ActiveOnlyMixin: class ActiveOnlyMixin:
def get_queryset(self) -> QuerySet[Any]: def get_queryset(self) -> QuerySet[Any]:
return super().get_queryset().filter(is_active=True) return super().get_queryset().filter(is_active=True) # type: ignore[misc]
def should_index_object(self, obj) -> bool: def should_index_object(self, obj) -> bool:
return getattr(obj, "is_active", False) return getattr(obj, "is_active", False)
@ -666,7 +667,7 @@ def process_system_query(
.query(mm) .query(mm)
.extra(size=size_per_index, track_total_hits=False) .extra(size=size_per_index, track_total_hits=False)
) )
resp = s.execute() resp = s.execute() # ty: ignore[possibly-missing-attribute]
for h in resp.hits: for h in resp.hits:
name = getattr(h, "name", None) or getattr(h, "title", None) or "N/A" name = getattr(h, "name", None) or getattr(h, "title", None) or "N/A"
results[idx].append( results[idx].append(

View file

@ -650,10 +650,11 @@ class BrandFilter(FilterSet):
if not value: if not value:
return queryset return queryset
uuids = [ s_result = process_query(query=value, indexes=("brands",))
brand.get("uuid") if not s_result:
for brand in process_query(query=value, indexes=("brands",))["brands"] return queryset.none()
]
uuids = [brand.get("uuid") for brand in s_result.get("brands", [])]
return queryset.filter(uuid__in=uuids) return queryset.filter(uuid__in=uuids)

View file

@ -104,7 +104,7 @@ class AddOrderProduct(Mutation):
product_uuid=product_uuid, attributes=format_attributes(attributes) product_uuid=product_uuid, attributes=format_attributes(attributes)
) )
return AddOrderProduct(order=order) return AddOrderProduct(order=order) # ty: ignore[unknown-argument]
except Order.DoesNotExist as dne: except Order.DoesNotExist as dne:
raise Http404(_(f"order {order_uuid} not found")) from dne raise Http404(_(f"order {order_uuid} not found")) from dne
@ -133,7 +133,7 @@ class RemoveOrderProduct(Mutation):
product_uuid=product_uuid, attributes=format_attributes(attributes) product_uuid=product_uuid, attributes=format_attributes(attributes)
) )
return RemoveOrderProduct(order=order) return RemoveOrderProduct(order=order) # ty: ignore[unknown-argument]
except Order.DoesNotExist as dne: except Order.DoesNotExist as dne:
raise Http404(_(f"order {order_uuid} not found")) from dne raise Http404(_(f"order {order_uuid} not found")) from dne
@ -157,7 +157,7 @@ class RemoveAllOrderProducts(Mutation):
order = order.remove_all_products() order = order.remove_all_products()
return RemoveAllOrderProducts(order=order) return RemoveAllOrderProducts(order=order) # ty: ignore[unknown-argument]
# noinspection PyUnusedLocal,PyTypeChecker # noinspection PyUnusedLocal,PyTypeChecker
@ -180,7 +180,7 @@ class RemoveOrderProductsOfAKind(Mutation):
order = order.remove_products_of_a_kind(product_uuid=product_uuid) order = order.remove_products_of_a_kind(product_uuid=product_uuid)
return RemoveOrderProductsOfAKind(order=order) return RemoveOrderProductsOfAKind(order=order) # ty: ignore[unknown-argument]
# noinspection PyUnusedLocal,PyTypeChecker # noinspection PyUnusedLocal,PyTypeChecker
@ -229,7 +229,7 @@ class BuyOrder(Mutation):
elif order_hr_id: elif order_hr_id:
order = Order.objects.get(user=user, human_readable_id=order_hr_id) order = Order.objects.get(user=user, human_readable_id=order_hr_id)
instance = order.buy( instance = order.buy( # ty: ignore[possibly-missing-attribute]
force_balance=force_balance, force_balance=force_balance,
force_payment=force_payment, force_payment=force_payment,
promocode_uuid=promocode_uuid, promocode_uuid=promocode_uuid,
@ -240,9 +240,9 @@ class BuyOrder(Mutation):
match str(type(instance)): match str(type(instance)):
case "<class 'engine.payments.models.Transaction'>": case "<class 'engine.payments.models.Transaction'>":
return BuyOrder(transaction=instance) return BuyOrder(transaction=instance) # ty: ignore[unknown-argument]
case "<class 'engine.core.models.Order'>": case "<class 'engine.core.models.Order'>":
return BuyOrder(order=instance) return BuyOrder(order=instance) # ty: ignore[unknown-argument]
case _: case _:
raise TypeError( raise TypeError(
_( _(
@ -294,13 +294,13 @@ class BulkOrderAction(Mutation):
# noinspection PyUnreachableCode # noinspection PyUnreachableCode
match action: match action:
case "add": case "add":
order = order.bulk_add_products(products) order = order.bulk_add_products(products) # ty: ignore[possibly-missing-attribute]
case "remove": case "remove":
order = order.bulk_remove_products(products) order = order.bulk_remove_products(products) # ty: ignore[possibly-missing-attribute]
case _: case _:
raise BadRequest(_("action must be either add or remove")) raise BadRequest(_("action must be either add or remove"))
return BulkOrderAction(order=order) return BulkOrderAction(order=order) # ty: ignore[unknown-argument]
except Order.DoesNotExist as dne: except Order.DoesNotExist as dne:
raise Http404(_(f"order {order_uuid} not found")) from dne raise Http404(_(f"order {order_uuid} not found")) from dne
@ -335,13 +335,13 @@ class BulkWishlistAction(Mutation):
# noinspection PyUnreachableCode # noinspection PyUnreachableCode
match action: match action:
case "add": case "add":
wishlist = wishlist.bulk_add_products(products) wishlist = wishlist.bulk_add_products(products) # ty: ignore[possibly-missing-attribute]
case "remove": case "remove":
wishlist = wishlist.bulk_remove_products(products) wishlist = wishlist.bulk_remove_products(products) # ty: ignore[possibly-missing-attribute]
case _: case _:
raise BadRequest(_("action must be either add or remove")) raise BadRequest(_("action must be either add or remove"))
return BulkWishlistAction(wishlist=wishlist) return BulkWishlistAction(wishlist=wishlist) # ty: ignore[unknown-argument]
except Wishlist.DoesNotExist as dne: except Wishlist.DoesNotExist as dne:
raise Http404(_(f"wishlist {wishlist_uuid} not found")) from dne raise Http404(_(f"wishlist {wishlist_uuid} not found")) from dne
@ -392,7 +392,7 @@ class BuyUnregisteredOrder(Mutation):
is_business=is_business, is_business=is_business,
) )
# noinspection PyTypeChecker # noinspection PyTypeChecker
return BuyUnregisteredOrder(transaction=transaction) return BuyUnregisteredOrder(transaction=transaction) # ty: ignore[unknown-argument]
# noinspection PyUnusedLocal,PyTypeChecker # noinspection PyUnusedLocal,PyTypeChecker
@ -417,7 +417,7 @@ class AddWishlistProduct(Mutation):
wishlist.add_product(product_uuid=product_uuid) wishlist.add_product(product_uuid=product_uuid)
return AddWishlistProduct(wishlist=wishlist) return AddWishlistProduct(wishlist=wishlist) # ty: ignore[unknown-argument]
except Wishlist.DoesNotExist as dne: except Wishlist.DoesNotExist as dne:
raise Http404(_(f"wishlist {wishlist_uuid} not found")) from dne raise Http404(_(f"wishlist {wishlist_uuid} not found")) from dne
@ -445,7 +445,7 @@ class RemoveWishlistProduct(Mutation):
wishlist.remove_product(product_uuid=product_uuid) wishlist.remove_product(product_uuid=product_uuid)
return RemoveWishlistProduct(wishlist=wishlist) return RemoveWishlistProduct(wishlist=wishlist) # ty: ignore[unknown-argument]
except Wishlist.DoesNotExist as dne: except Wishlist.DoesNotExist as dne:
raise Http404(_(f"wishlist {wishlist_uuid} not found")) from dne raise Http404(_(f"wishlist {wishlist_uuid} not found")) from dne
@ -473,7 +473,7 @@ class RemoveAllWishlistProducts(Mutation):
for product in wishlist.products.all(): for product in wishlist.products.all():
wishlist.remove_product(product_uuid=product.pk) wishlist.remove_product(product_uuid=product.pk)
return RemoveAllWishlistProducts(wishlist=wishlist) return RemoveAllWishlistProducts(wishlist=wishlist) # ty: ignore[unknown-argument]
except Wishlist.DoesNotExist as dne: except Wishlist.DoesNotExist as dne:
raise Http404(_(f"wishlist {wishlist_uuid} not found")) from dne raise Http404(_(f"wishlist {wishlist_uuid} not found")) from dne
@ -515,9 +515,9 @@ class BuyWishlist(Mutation):
) )
match str(type(instance)): match str(type(instance)):
case "<class 'engine.payments.models.Transaction'>": case "<class 'engine.payments.models.Transaction'>":
return BuyWishlist(transaction=instance) return BuyWishlist(transaction=instance) # ty: ignore[unknown-argument]
case "<class 'engine.core.models.Order'>": case "<class 'engine.core.models.Order'>":
return BuyWishlist(order=instance) return BuyWishlist(order=instance) # ty: ignore[unknown-argument]
case _: case _:
raise TypeError( raise TypeError(
_( _(
@ -565,9 +565,9 @@ class BuyProduct(Mutation):
instance = order.buy(force_balance=force_balance, force_payment=force_payment) instance = order.buy(force_balance=force_balance, force_payment=force_payment)
match str(type(instance)): match str(type(instance)):
case "<class 'engine.payments.models.Transaction'>": case "<class 'engine.payments.models.Transaction'>":
return BuyProduct(transaction=instance) return BuyProduct(transaction=instance) # ty: ignore[unknown-argument]
case "<class 'engine.core.models.Order'>": case "<class 'engine.core.models.Order'>":
return BuyProduct(order=instance) return BuyProduct(order=instance) # ty: ignore[unknown-argument]
case _: case _:
raise TypeError( raise TypeError(
_(f"wrong type came from order.buy() method: {type(instance)!s}") _(f"wrong type came from order.buy() method: {type(instance)!s}")
@ -604,7 +604,7 @@ class FeedbackProductAction(Mutation):
feedback = order_product.do_feedback(action="remove") feedback = order_product.do_feedback(action="remove")
case _: case _:
raise BadRequest(_("action must be either `add` or `remove`")) raise BadRequest(_("action must be either `add` or `remove`"))
return FeedbackProductAction(feedback=feedback) return FeedbackProductAction(feedback=feedback) # ty: ignore[unknown-argument]
except OrderProduct.DoesNotExist as dne: except OrderProduct.DoesNotExist as dne:
raise Http404(_(f"order product {order_product_uuid} not found")) from dne raise Http404(_(f"order product {order_product_uuid} not found")) from dne
@ -623,7 +623,7 @@ class CreateAddress(Mutation):
user = info.context.user if info.context.user.is_authenticated else None user = info.context.user if info.context.user.is_authenticated else None
address = Address.objects.create(raw_data=raw_data, user=user) address = Address.objects.create(raw_data=raw_data, user=user)
return CreateAddress(address=address) return CreateAddress(address=address) # ty: ignore[unknown-argument]
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@ -644,7 +644,7 @@ class DeleteAddress(Mutation):
): ):
address.delete() address.delete()
# noinspection PyTypeChecker # noinspection PyTypeChecker
return DeleteAddress(success=True) return DeleteAddress(success=True) # ty: ignore[unknown-argument]
raise PermissionDenied(permission_denied_message) raise PermissionDenied(permission_denied_message)
@ -671,7 +671,7 @@ class AutocompleteAddress(Mutation):
raise BadRequest(f"geocoding error: {e!s}") from e raise BadRequest(f"geocoding error: {e!s}") from e
# noinspection PyTypeChecker # noinspection PyTypeChecker
return AutocompleteAddress(suggestions=suggestions) return AutocompleteAddress(suggestions=suggestions) # ty: ignore[unknown-argument]
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@ -699,10 +699,10 @@ class ContactUs(Mutation):
} }
) )
# noinspection PyTypeChecker # noinspection PyTypeChecker
return ContactUs(received=True) return ContactUs(received=True) # ty: ignore[unknown-argument]
except Exception as e: except Exception as e:
# noinspection PyTypeChecker # noinspection PyTypeChecker
return ContactUs(received=False, error=str(e)) return ContactUs(received=False, error=str(e)) # ty: ignore[unknown-argument]
# noinspection PyArgumentList PyUnusedLocal # noinspection PyArgumentList PyUnusedLocal
@ -719,12 +719,15 @@ class Search(Mutation):
def mutate(parent, info, query): def mutate(parent, info, query):
data = process_query(query=query, request=info.context) data = process_query(query=query, request=info.context)
if not data:
return Search(results=None) # ty: ignore[unknown-argument]
# noinspection PyTypeChecker # noinspection PyTypeChecker
return Search( return Search( # ty: ignore[unknown-argument]
results=SearchResultsType( results=SearchResultsType( # ty: ignore[unknown-argument]
products=data["products"], products=data["products"], # ty: ignore[unknown-argument]
categories=data["categories"], categories=data["categories"], # ty: ignore[unknown-argument]
brands=data["brands"], brands=data["brands"], # ty: ignore[unknown-argument]
posts=data["posts"], posts=data["posts"], # ty: ignore[unknown-argument]
) )
) )

View file

@ -21,7 +21,9 @@ from graphene import (
) )
from graphene.types.generic import GenericScalar from graphene.types.generic import GenericScalar
from graphene_django import DjangoObjectType from graphene_django import DjangoObjectType
from graphene_django.filter import DjangoFilterConnectionField from graphene_django.filter import (
DjangoFilterConnectionField, # ty:ignore[possibly-missing-import]
)
from mptt.querysets import TreeQuerySet from mptt.querysets import TreeQuerySet
from engine.core.models import ( from engine.core.models import (

View file

@ -6,7 +6,9 @@ from django.core.exceptions import PermissionDenied
from django.db.models import Case, Exists, IntegerField, OuterRef, Q, Value, When from django.db.models import Case, Exists, IntegerField, OuterRef, Q, Value, When
from django.utils import timezone from django.utils import timezone
from graphene import Field, List, ObjectType, Schema from graphene import Field, List, ObjectType, Schema
from graphene_django.filter import DjangoFilterConnectionField from graphene_django.filter import (
DjangoFilterConnectionField, # ty:ignore[possibly-missing-import]
)
from engine.blog.filters import PostFilter from engine.blog.filters import PostFilter
from engine.blog.graphene.object_types import PostType from engine.blog.graphene.object_types import PostType

View file

@ -94,7 +94,7 @@ class Command(BaseCommand):
help="Root path prefix to adjust file links", help="Root path prefix to adjust file links",
) )
def handle(self, *args: list[Any], **options: dict[str, str | list[str]]) -> None: def handle(self, *args: Any, **options: Any) -> None:
langs: list[str] = options.get("target_languages", []) langs: list[str] = options.get("target_languages", [])
if "ALL" in langs: if "ALL" in langs:
langs = list(dict(settings.LANGUAGES).keys()) langs = list(dict(settings.LANGUAGES).keys())

View file

@ -106,10 +106,10 @@ class Command(BaseCommand):
help="App label for translation, e.g. core, payments. Use ALL to translate all apps.", help="App label for translation, e.g. core, payments. Use ALL to translate all apps.",
) )
def handle(self, *args: list[Any], **options: dict[Any, Any]) -> None: def handle(self, *args: Any, **options: Any) -> None:
target_langs: list[str] = options["target_languages"] target_langs = options["target_languages"]
if "ALL" in target_langs: if "ALL" in target_langs:
target_langs = DEEPL_TARGET_LANGUAGES_MAPPING.keys() target_langs = list(DEEPL_TARGET_LANGUAGES_MAPPING.keys())
target_apps = set(options["target_apps"]) target_apps = set(options["target_apps"])
if "ALL" in target_apps: if "ALL" in target_apps:
target_apps = { target_apps = {
@ -122,10 +122,13 @@ class Command(BaseCommand):
raise CommandError("DEEPL_AUTH_KEY not set") raise CommandError("DEEPL_AUTH_KEY not set")
# attempt to import readline for interactive fill # attempt to import readline for interactive fill
readline: Any = None
try: try:
import readline import readline as readline_module
readline = readline_module
except ImportError: except ImportError:
readline = None pass
for target_lang in target_langs: for target_lang in target_langs:
api_code = DEEPL_TARGET_LANGUAGES_MAPPING.get(target_lang) api_code = DEEPL_TARGET_LANGUAGES_MAPPING.get(target_lang)
@ -176,16 +179,16 @@ class Command(BaseCommand):
if readline: if readline:
def hook() -> None: def hook() -> None:
readline.insert_text(default) # noqa: B023 readline.insert_text(default) # noqa: B023 # ty: ignore[unresolved-attribute]
readline.redisplay() readline.redisplay() # ty: ignore[unresolved-attribute]
readline.set_pre_input_hook(hook) readline.set_pre_input_hook(hook) # ty: ignore[unresolved-attribute]
prompt = f"Enter translation for '{e.msgid}': " prompt = f"Enter translation for '{e.msgid}': "
user_in = input(prompt).strip() user_in = input(prompt).strip()
if readline: if readline:
readline.set_pre_input_hook(None) readline.set_pre_input_hook(None) # ty: ignore[unresolved-attribute]
if user_in: if user_in:
e.msgstr = user_in e.msgstr = user_in

View file

@ -18,7 +18,7 @@ class Command(BaseCommand):
help="Chunk size to delete", help="Chunk size to delete",
) )
def handle(self, *args: list[Any], **options: dict[Any, Any]) -> None: def handle(self, *args: Any, **options: Any) -> None:
size: int = options["size"] size: int = options["size"]
while True: while True:
batch_ids = list( batch_ids = list(

View file

@ -18,7 +18,7 @@ class Command(BaseCommand):
help="Chunk size to delete", help="Chunk size to delete",
) )
def handle(self, *args: list[Any], **options: dict[Any, Any]) -> None: def handle(self, *args: Any, **options: Any) -> None:
size: int = options["size"] size: int = options["size"]
while True: while True:
batch_ids = list( batch_ids = list(

View file

@ -14,7 +14,7 @@ class AddressManager(models.Manager):
if not kwargs.get("raw_data"): if not kwargs.get("raw_data"):
raise ValueError("'raw_data' (address string) must be provided.") raise ValueError("'raw_data' (address string) must be provided.")
params: dict[str, str | int] = { params: dict[str, str | int | None] = {
"format": "json", "format": "json",
"addressdetails": 1, "addressdetails": 1,
"q": kwargs.get("raw_data"), "q": kwargs.get("raw_data"),

View file

@ -693,9 +693,10 @@ class Product(ExportModelOperationsMixin("product"), NiceModel):
@cached_property @cached_property
def discount_price(self) -> float | None: def discount_price(self) -> float | None:
promo = self.promos.first()
return ( return (
self.promos.first().discount_percent if self.promos.exists() else None promo.discount_percent if promo else None # ty: ignore[possibly-missing-attribute]
) # ty:ignore[possibly-missing-attribute] )
@property @property
def rating(self) -> float: def rating(self) -> float:

View file

@ -50,7 +50,7 @@ class AttributeGroupDetailSerializer(ModelSerializer):
class CategoryDetailListSerializer(ListSerializer): class CategoryDetailListSerializer(ListSerializer):
def to_representation(self, data): def to_representation(self, data): # ty: ignore[invalid-method-override]
items = list(data) items = list(data)
with suppress(Exception): with suppress(Exception):
Category.bulk_prefetch_filterable_attributes(items) Category.bulk_prefetch_filterable_attributes(items)
@ -88,11 +88,12 @@ class CategoryDetailSerializer(ModelSerializer):
else: else:
children = obj.children.filter(is_active=True) children = obj.children.filter(is_active=True)
return ( if obj.children.exists():
CategorySimpleSerializer(children, many=True, context=self.context).data serializer = CategorySimpleSerializer(
if obj.children.exists() children, many=True, context=self.context
else []
) )
return list(serializer.data) # ty: ignore[invalid-return-type]
return []
class BrandDetailSerializer(ModelSerializer): class BrandDetailSerializer(ModelSerializer):

View file

@ -59,11 +59,12 @@ class CategorySimpleSerializer(ModelSerializer):
else: else:
children = obj.children.filter(is_active=True) children = obj.children.filter(is_active=True)
return ( if obj.children.exists():
CategorySimpleSerializer(children, many=True, context=self.context).data serializer = CategorySimpleSerializer(
if obj.children.exists() children, many=True, context=self.context
else []
) )
return dict(serializer.data) # ty: ignore[invalid-return-type]
return {}
class BrandSimpleSerializer(ModelSerializer): class BrandSimpleSerializer(ModelSerializer):

View file

@ -84,7 +84,7 @@ class DoFeedbackSerializer(Serializer):
rating = IntegerField(min_value=1, max_value=10, default=10) rating = IntegerField(min_value=1, max_value=10, default=10)
action = CharField(default="add") action = CharField(default="add")
def validate(self, data: dict[str, Any]) -> dict[str, Any]: def validate(self, data: dict[str, Any]) -> dict[str, Any]: # ty: ignore[invalid-method-override]
if data["action"] == "add" and not all([data["comment"], data["rating"]]): if data["action"] == "add" and not all([data["comment"], data["rating"]]):
raise ValidationError( raise ValidationError(
_( _(

View file

@ -51,7 +51,7 @@ class StaticPagesSitemap(SitemapLanguageMixin, Sitemap):
return pages return pages
def location(self, obj): def location(self, obj): # ty: ignore[invalid-method-override]
return obj["path"] return obj["path"]
def lastmod(self, obj): def lastmod(self, obj):
@ -80,7 +80,7 @@ class ProductSitemap(SitemapLanguageMixin, Sitemap):
def lastmod(self, obj): def lastmod(self, obj):
return obj.modified return obj.modified
def location(self, obj): def location(self, obj): # ty: ignore[invalid-method-override]
return f"/{self._lang()}/product/{obj.slug if obj.slug else '404-non-existent-product'}" return f"/{self._lang()}/product/{obj.slug if obj.slug else '404-non-existent-product'}"
@ -105,7 +105,7 @@ class CategorySitemap(SitemapLanguageMixin, Sitemap):
def lastmod(self, obj): def lastmod(self, obj):
return obj.modified return obj.modified
def location(self, obj): def location(self, obj): # ty: ignore[invalid-method-override]
return f"/{self._lang()}/catalog/{obj.slug if obj.slug else '404-non-existent-category'}" return f"/{self._lang()}/catalog/{obj.slug if obj.slug else '404-non-existent-category'}"
@ -130,5 +130,5 @@ class BrandSitemap(SitemapLanguageMixin, Sitemap):
def lastmod(self, obj): def lastmod(self, obj):
return obj.modified return obj.modified
def location(self, obj): def location(self, obj): # ty: ignore[invalid-method-override]
return f"/{self._lang()}/brand/{obj.slug if obj.slug else '404-non-existent-brand'}" return f"/{self._lang()}/brand/{obj.slug if obj.slug else '404-non-existent-brand'}"

View file

@ -8,7 +8,7 @@ register = template.Library()
def _to_float(val: object) -> float: def _to_float(val: object) -> float:
conv: float = 0.0 conv: float = 0.0
with suppress(Exception): with suppress(Exception):
return float(val) return float(val) # ty: ignore[invalid-argument-type]
return conv return conv

View file

@ -48,7 +48,7 @@ def graphene_abs(request: Request | Context, path_or_url: str) -> str:
Returns: Returns:
str: The absolute URI corresponding to the provided path or URL. str: The absolute URI corresponding to the provided path or URL.
""" """
return str(request.build_absolute_uri(path_or_url)) return str(request.build_absolute_uri(path_or_url)) # ty: ignore[possibly-missing-attribute]
def get_random_code() -> str: def get_random_code() -> str:

View file

@ -1,7 +1,7 @@
import json import json
import logging import logging
from pathlib import Path from pathlib import Path
from typing import Any, Type from typing import Any
from django.conf import settings from django.conf import settings
from django.core.cache import cache from django.core.cache import cache
@ -10,8 +10,6 @@ from django.utils.translation import gettext_lazy as _
from graphene import Context from graphene import Context
from rest_framework.request import Request from rest_framework.request import Request
from engine.vibes_auth.models import User
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -19,7 +17,7 @@ def is_safe_cache_key(key: str) -> bool:
return key not in settings.UNSAFE_CACHE_KEYS return key not in settings.UNSAFE_CACHE_KEYS
def get_cached_value(user: Type[User], key: str, default: Any = None) -> Any: def get_cached_value(user: Any, key: str, default: Any = None) -> Any:
if user.is_staff or user.is_superuser: if user.is_staff or user.is_superuser:
return cache.get(key, default) return cache.get(key, default)
@ -30,7 +28,7 @@ def get_cached_value(user: Type[User], key: str, default: Any = None) -> Any:
def set_cached_value( def set_cached_value(
user: Type[User], key: str, value: object, timeout: int = 3600 user: Any, key: str, value: object, timeout: int = 3600
) -> None | object: ) -> None | object:
if user.is_staff or user.is_superuser: if user.is_staff or user.is_superuser:
cache.set(key, value, timeout) cache.set(key, value, timeout)
@ -40,17 +38,20 @@ def set_cached_value(
def web_cache( def web_cache(
request: Request | Context, key: str, data: dict[str, Any], timeout: int request: Request | Context,
key: str,
data: dict[str, Any] | None,
timeout: int | None,
) -> dict[str, Any]: ) -> dict[str, Any]:
if not data and not timeout: if not data and not timeout:
return {"data": get_cached_value(request.user, key)} return {"data": get_cached_value(request.user, key)} # ty: ignore[possibly-missing-attribute]
if (data and not timeout) or (timeout and not data): if (data and not timeout) or (timeout and not data):
raise BadRequest(_("both data and timeout are required")) raise BadRequest(_("both data and timeout are required"))
if not 0 < int(timeout) < 216000: if timeout is None or not 0 < timeout < 216000:
raise BadRequest( raise BadRequest(
_("invalid timeout value, it must be between 0 and 216000 seconds") _("invalid timeout value, it must be between 0 and 216000 seconds")
) )
return {"data": set_cached_value(request.user, key, data, timeout)} return {"data": set_cached_value(request.user, key, data, timeout)} # ty: ignore[possibly-missing-attribute]
def set_default_cache() -> None: def set_default_cache() -> None:

View file

@ -28,7 +28,7 @@ class TweakedAutoSlugField(AutoSlugField):
if callable(lookup_value): if callable(lookup_value):
return f"{lookup_value(model_instance)}" return f"{lookup_value(model_instance)}"
lookup_value_path = lookup_value.split(LOOKUP_SEP) lookup_value_path = lookup_value.split(LOOKUP_SEP) # ty: ignore[possibly-missing-attribute]
attr = model_instance attr = model_instance
for elem in lookup_value_path: for elem in lookup_value_path:

View file

@ -11,10 +11,12 @@ def validate_category_image_dimensions(
if image: if image:
try: try:
width, height = get_image_dimensions(image.file) width, height = get_image_dimensions(image.file) # ty: ignore[invalid-argument-type]
except (FileNotFoundError, OSError, ValueError): except (FileNotFoundError, OSError, ValueError):
return return
if width is None or height is None:
return
if int(width) > max_width or int(height) > max_height: if int(width) > max_width or int(height) > max_height:
raise ValidationError( raise ValidationError(
_( _(

View file

@ -347,7 +347,7 @@ class AbstractVendor:
f"No rate found for {currency} in {rates} with probider {provider}..." f"No rate found for {currency} in {rates} with probider {provider}..."
) )
return float(round(price / rate, 2)) if rate else float(round(price, 2)) return float(round(price / rate, 2)) if rate else float(round(price, 2)) # ty: ignore[unsupported-operator]
@staticmethod @staticmethod
def round_price_marketologically(price: float) -> float: def round_price_marketologically(price: float) -> float:
@ -582,7 +582,7 @@ class AbstractVendor:
return av return av
def check_updatable(self, product: Product): def check_updatable(self, product: Product) -> None:
if not product.is_updatable: if not product.is_updatable:
raise ProductUnapdatableError("Product %s is not updatable", product.sku) raise ProductUnapdatableError("Product %s is not updatable", product.sku)

View file

@ -136,7 +136,7 @@ class CustomSpectacularAPIView(SpectacularAPIView):
class CustomSwaggerView(SpectacularSwaggerView): class CustomSwaggerView(SpectacularSwaggerView):
def get_context_data(self, **kwargs): def get_context_data(self, **kwargs):
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
context = super().get_context_data(**kwargs) context = super().get_context_data(**kwargs) # ty: ignore[unresolved-attribute]
context["script_url"] = self.request.build_absolute_uri() context["script_url"] = self.request.build_absolute_uri()
return context return context
@ -144,7 +144,7 @@ class CustomSwaggerView(SpectacularSwaggerView):
class CustomRedocView(SpectacularRedocView): class CustomRedocView(SpectacularRedocView):
def get_context_data(self, **kwargs): def get_context_data(self, **kwargs):
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
context = super().get_context_data(**kwargs) context = super().get_context_data(**kwargs) # ty: ignore[unresolved-attribute]
context["script_url"] = self.request.build_absolute_uri() context["script_url"] = self.request.build_absolute_uri()
return context return context
@ -207,7 +207,7 @@ class CacheOperatorView(APIView):
return Response( return Response(
data=web_cache( data=web_cache(
request, request,
request.data.get("key"), request.data.get("key") or "", # ty: ignore[invalid-argument-type]
request.data.get("data", {}), request.data.get("data", {}),
request.data.get("timeout"), request.data.get("timeout"),
), ),
@ -413,11 +413,13 @@ def favicon_view(request: HttpRequest) -> HttpResponse | FileResponse:
# noinspection PyTypeChecker # noinspection PyTypeChecker
favicon_view.__doc__ = _( favicon_view.__doc__ = str(
_( # ty: ignore[invalid-assignment]
"Handles requests for the favicon of a website.\n" "Handles requests for the favicon of a website.\n"
"This function attempts to serve the favicon file located in the static directory of the project. " "This function attempts to serve the favicon file located in the static directory of the project. "
"If the favicon file is not found, an HTTP 404 error is raised to indicate the resource is unavailable." "If the favicon file is not found, an HTTP 404 error is raised to indicate the resource is unavailable."
) )
)
def index(request: HttpRequest, *args, **kwargs) -> HttpResponse | HttpResponseRedirect: def index(request: HttpRequest, *args, **kwargs) -> HttpResponse | HttpResponseRedirect:
@ -425,12 +427,14 @@ def index(request: HttpRequest, *args, **kwargs) -> HttpResponse | HttpResponseR
# noinspection PyTypeChecker # noinspection PyTypeChecker
index.__doc__ = _( index.__doc__ = str(
_( # ty: ignore[invalid-assignment]
"Redirects the request to the admin index page. " "Redirects the request to the admin index page. "
"The function handles incoming HTTP requests and redirects them to the Django " "The function handles incoming HTTP requests and redirects them to the Django "
"admin interface index page. It uses Django's `redirect` function for handling " "admin interface index page. It uses Django's `redirect` function for handling "
"the HTTP redirection." "the HTTP redirection."
) )
)
def version(request: HttpRequest, *args, **kwargs) -> HttpResponse: def version(request: HttpRequest, *args, **kwargs) -> HttpResponse:
@ -438,7 +442,7 @@ def version(request: HttpRequest, *args, **kwargs) -> HttpResponse:
# noinspection PyTypeChecker # noinspection PyTypeChecker
version.__doc__ = _("Returns current version of the eVibes. ") version.__doc__ = str(_("Returns current version of the eVibes. ")) # ty: ignore[invalid-assignment]
def dashboard_callback(request: HttpRequest, context: Context) -> Context: def dashboard_callback(request: HttpRequest, context: Context) -> Context:
@ -863,4 +867,4 @@ def dashboard_callback(request: HttpRequest, context: Context) -> Context:
return context return context
dashboard_callback.__doc__ = _("Returns custom variables for Dashboard. ") dashboard_callback.__doc__ = str(_("Returns custom variables for Dashboard. ")) # ty: ignore[invalid-assignment]

View file

@ -1,6 +1,6 @@
import logging import logging
import uuid import uuid
from typing import Type from typing import Any, Type
from uuid import UUID from uuid import UUID
from django.conf import settings from django.conf import settings
@ -147,7 +147,7 @@ class EvibesViewSet(ModelViewSet):
additional: dict[str, str] = {} additional: dict[str, str] = {}
permission_classes = [EvibesPermission] permission_classes = [EvibesPermission]
def get_serializer_class(self) -> Type[Serializer]: def get_serializer_class(self) -> Type[Any]: # ty: ignore[invalid-return-type]
# noinspection PyTypeChecker # noinspection PyTypeChecker
return self.action_serializer_classes.get( return self.action_serializer_classes.get(
self.action, super().get_serializer_class() self.action, super().get_serializer_class()
@ -256,14 +256,14 @@ class CategoryViewSet(EvibesViewSet):
def get_queryset(self): def get_queryset(self):
qs = super().get_queryset() qs = super().get_queryset()
if self.request.user.has_perm("core.view_category"): if self.request.user.has_perm("core.view_category"): # ty:ignore[possibly-missing-attribute]
return qs return qs
return qs.filter(is_active=True) return qs.filter(is_active=True)
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@action( @action(
detail=True, detail=True,
methods=["get"], methods=("GET",),
url_path="meta", url_path="meta",
permission_classes=[ permission_classes=[
AllowAny, AllowAny,
@ -385,7 +385,7 @@ class BrandViewSet(EvibesViewSet):
def get_queryset(self): def get_queryset(self):
queryset = Brand.objects.all() queryset = Brand.objects.all()
if self.request.user.has_perm("view_category"): if self.request.user.has_perm("view_category"): # ty:ignore[possibly-missing-attribute]
queryset = queryset.prefetch_related("categories") queryset = queryset.prefetch_related("categories")
else: else:
queryset = queryset.prefetch_related( queryset = queryset.prefetch_related(
@ -397,7 +397,7 @@ class BrandViewSet(EvibesViewSet):
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@action( @action(
detail=True, detail=True,
methods=["get"], methods=("GET",),
url_path="meta", url_path="meta",
permission_classes=[ permission_classes=[
AllowAny, AllowAny,
@ -485,7 +485,7 @@ class ProductViewSet(EvibesViewSet):
qs = qs.select_related("brand", "category") qs = qs.select_related("brand", "category")
if self.request.user.has_perm("core.view_product"): if self.request.user.has_perm("core.view_product"): # ty:ignore[possibly-missing-attribute]
return qs return qs
active_stocks = Stock.objects.filter( active_stocks = Stock.objects.filter(
@ -529,19 +529,19 @@ class ProductViewSet(EvibesViewSet):
return obj return obj
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@action(detail=True, methods=["get"], url_path="feedbacks") @action(detail=True, methods=("GET",), url_path="feedbacks")
@method_decorator(ratelimit(key="ip", rate="2/s" if not settings.DEBUG else "44/s")) @method_decorator(ratelimit(key="ip", rate="2/s" if not settings.DEBUG else "44/s"))
def feedbacks(self, request: Request, *args, **kwargs) -> Response: def feedbacks(self, request: Request, *args, **kwargs) -> Response:
product = self.get_object() product = self.get_object()
qs = Feedback.objects.filter(order_product__product=product) qs = Feedback.objects.filter(order_product__product=product)
if not request.user.has_perm("core.view_feedback"): if not request.user.has_perm("core.view_feedback"): # ty:ignore[possibly-missing-attribute]
qs = qs.filter(is_active=True) qs = qs.filter(is_active=True)
return Response(data=FeedbackSimpleSerializer(qs, many=True).data) return Response(data=FeedbackSimpleSerializer(qs, many=True).data)
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@action( @action(
detail=True, detail=True,
methods=["get"], methods=("GET",),
url_path="meta", url_path="meta",
permission_classes=[ permission_classes=[
AllowAny, AllowAny,
@ -640,7 +640,7 @@ class FeedbackViewSet(EvibesViewSet):
def get_queryset(self): def get_queryset(self):
qs = super().get_queryset() qs = super().get_queryset()
if self.request.user.has_perm("core.view_feedback"): if self.request.user.has_perm("core.view_feedback"): # ty:ignore[possibly-missing-attribute]
return qs return qs
return qs.filter(is_active=True) return qs.filter(is_active=True)
@ -684,7 +684,7 @@ class OrderViewSet(EvibesViewSet):
if not user.is_authenticated: if not user.is_authenticated:
return qs.filter(user__isnull=True) return qs.filter(user__isnull=True)
if user.has_perm("core.view_order"): if user.has_perm("core.view_order"): # ty:ignore[possibly-missing-attribute]
return qs return qs
return qs.filter(user=user) return qs.filter(user=user)
@ -703,7 +703,7 @@ class OrderViewSet(EvibesViewSet):
self.check_object_permissions(self.request, obj) self.check_object_permissions(self.request, obj)
return obj return obj
@action(detail=False, methods=["get"], url_path="current") @action(detail=False, methods=("GET",), url_path="current")
@method_decorator(ratelimit(key="ip", rate="1/s" if not settings.DEBUG else "44/s")) @method_decorator(ratelimit(key="ip", rate="1/s" if not settings.DEBUG else "44/s"))
def current(self, request: Request, *args, **kwargs) -> Response: def current(self, request: Request, *args, **kwargs) -> Response:
if not request.user.is_authenticated: if not request.user.is_authenticated:
@ -717,7 +717,7 @@ class OrderViewSet(EvibesViewSet):
data=OrderDetailSerializer(order).data, data=OrderDetailSerializer(order).data,
) )
@action(detail=True, methods=["post"], url_path="buy") @action(detail=True, methods=("POST",), url_path="buy")
@method_decorator(ratelimit(key="ip", rate="1/s" if not settings.DEBUG else "44/s")) @method_decorator(ratelimit(key="ip", rate="1/s" if not settings.DEBUG else "44/s"))
def buy(self, request: Request, *args, **kwargs) -> Response: def buy(self, request: Request, *args, **kwargs) -> Response:
serializer = BuyOrderSerializer(data=request.data) serializer = BuyOrderSerializer(data=request.data)
@ -762,7 +762,7 @@ class OrderViewSet(EvibesViewSet):
except Exception as e: except Exception as e:
return Response(status=status.HTTP_400_BAD_REQUEST, data={"detail": str(e)}) return Response(status=status.HTTP_400_BAD_REQUEST, data={"detail": str(e)})
@action(detail=False, methods=["post"], url_path="buy_unregistered") @action(detail=False, methods=("POST",), url_path="buy_unregistered")
@method_decorator( @method_decorator(
ratelimit(key="ip", rate="10/h" if not settings.DEBUG else "888/h") ratelimit(key="ip", rate="10/h" if not settings.DEBUG else "888/h")
) )
@ -795,7 +795,7 @@ class OrderViewSet(EvibesViewSet):
except Exception as e: except Exception as e:
return Response(status=status.HTTP_400_BAD_REQUEST, data={"detail": str(e)}) return Response(status=status.HTTP_400_BAD_REQUEST, data={"detail": str(e)})
@action(detail=True, methods=["post"], url_path="add_order_product") @action(detail=True, methods=("POST",), url_path="add_order_product")
@method_decorator(ratelimit(key="ip", rate="1/s" if not settings.DEBUG else "44/s")) @method_decorator(ratelimit(key="ip", rate="1/s" if not settings.DEBUG else "44/s"))
def add_order_product(self, request: Request, *args, **kwargs) -> Response: def add_order_product(self, request: Request, *args, **kwargs) -> Response:
serializer = AddOrderProductSerializer(data=request.data) serializer = AddOrderProductSerializer(data=request.data)
@ -803,7 +803,7 @@ class OrderViewSet(EvibesViewSet):
try: try:
order = self.get_object() order = self.get_object()
if not ( if not (
request.user.has_perm("core.add_orderproduct") request.user.has_perm("core.add_orderproduct") # ty:ignore[possibly-missing-attribute]
or request.user == order.user or request.user == order.user
): ):
raise PermissionDenied(permission_denied_message) raise PermissionDenied(permission_denied_message)
@ -824,7 +824,7 @@ class OrderViewSet(EvibesViewSet):
status=status.HTTP_400_BAD_REQUEST, data={"detail": str(ve)} status=status.HTTP_400_BAD_REQUEST, data={"detail": str(ve)}
) )
@action(detail=True, methods=["post"], url_path="remove_order_product") @action(detail=True, methods=("POST",), url_path="remove_order_product")
@method_decorator(ratelimit(key="ip", rate="1/s" if not settings.DEBUG else "44/s")) @method_decorator(ratelimit(key="ip", rate="1/s" if not settings.DEBUG else "44/s"))
def remove_order_product(self, request: Request, *args, **kwargs) -> Response: def remove_order_product(self, request: Request, *args, **kwargs) -> Response:
serializer = RemoveOrderProductSerializer(data=request.data) serializer = RemoveOrderProductSerializer(data=request.data)
@ -832,7 +832,7 @@ class OrderViewSet(EvibesViewSet):
try: try:
order = self.get_object() order = self.get_object()
if not ( if not (
request.user.has_perm("core.delete_orderproduct") request.user.has_perm("core.delete_orderproduct") # ty:ignore[possibly-missing-attribute]
or request.user == order.user or request.user == order.user
): ):
raise PermissionDenied(permission_denied_message) raise PermissionDenied(permission_denied_message)
@ -853,7 +853,7 @@ class OrderViewSet(EvibesViewSet):
status=status.HTTP_400_BAD_REQUEST, data={"detail": str(ve)} status=status.HTTP_400_BAD_REQUEST, data={"detail": str(ve)}
) )
@action(detail=True, methods=["post"], url_path="bulk_add_order_products") @action(detail=True, methods=("POST",), url_path="bulk_add_order_products")
@method_decorator(ratelimit(key="ip", rate="1/s" if not settings.DEBUG else "44/s")) @method_decorator(ratelimit(key="ip", rate="1/s" if not settings.DEBUG else "44/s"))
def bulk_add_order_products(self, request: Request, *args, **kwargs) -> Response: def bulk_add_order_products(self, request: Request, *args, **kwargs) -> Response:
serializer = BulkAddOrderProductsSerializer(data=request.data) serializer = BulkAddOrderProductsSerializer(data=request.data)
@ -862,7 +862,7 @@ class OrderViewSet(EvibesViewSet):
try: try:
order = Order.objects.get(uuid=str(lookup_val)) order = Order.objects.get(uuid=str(lookup_val))
if not ( if not (
request.user.has_perm("core.add_orderproduct") request.user.has_perm("core.add_orderproduct") # ty:ignore[possibly-missing-attribute]
or request.user == order.user or request.user == order.user
): ):
raise PermissionDenied(permission_denied_message) raise PermissionDenied(permission_denied_message)
@ -880,7 +880,7 @@ class OrderViewSet(EvibesViewSet):
status=status.HTTP_400_BAD_REQUEST, data={"detail": str(ve)} status=status.HTTP_400_BAD_REQUEST, data={"detail": str(ve)}
) )
@action(detail=True, methods=["post"], url_path="bulk_remove_order_products") @action(detail=True, methods=("POST",), url_path="bulk_remove_order_products")
@method_decorator(ratelimit(key="ip", rate="1/s" if not settings.DEBUG else "44/s")) @method_decorator(ratelimit(key="ip", rate="1/s" if not settings.DEBUG else "44/s"))
def bulk_remove_order_products(self, request: Request, *args, **kwargs) -> Response: def bulk_remove_order_products(self, request: Request, *args, **kwargs) -> Response:
serializer = BulkRemoveOrderProductsSerializer(data=request.data) serializer = BulkRemoveOrderProductsSerializer(data=request.data)
@ -888,7 +888,7 @@ class OrderViewSet(EvibesViewSet):
try: try:
order = self.get_object() order = self.get_object()
if not ( if not (
request.user.has_perm("core.delete_orderproduct") request.user.has_perm("core.delete_orderproduct") # ty:ignore[possibly-missing-attribute]
or request.user == order.user or request.user == order.user
): ):
raise PermissionDenied(permission_denied_message) raise PermissionDenied(permission_denied_message)
@ -932,12 +932,12 @@ class OrderProductViewSet(EvibesViewSet):
qs = super().get_queryset() qs = super().get_queryset()
user = self.request.user user = self.request.user
if user.has_perm("core.view_orderproduct"): if user.has_perm("core.view_orderproduct"): # ty:ignore[possibly-missing-attribute]
return qs return qs
return qs.filter(user=user) return qs.filter(user=user)
@action(detail=True, methods=["post"], url_path="do_feedback") @action(detail=True, methods=("POST",), url_path="do_feedback")
def do_feedback(self, request: Request, *args, **kwargs) -> Response: def do_feedback(self, request: Request, *args, **kwargs) -> Response:
serializer = self.get_serializer(request.data) serializer = self.get_serializer(request.data)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
@ -946,7 +946,7 @@ class OrderProductViewSet(EvibesViewSet):
if not order_product.order: if not order_product.order:
return Response(status=status.HTTP_404_NOT_FOUND) return Response(status=status.HTTP_404_NOT_FOUND)
if not ( if not (
request.user.has_perm("core.change_orderproduct") request.user.has_perm("core.change_orderproduct") # ty:ignore[possibly-missing-attribute]
or request.user == order_product.order.user or request.user == order_product.order.user
): ):
raise PermissionDenied(permission_denied_message) raise PermissionDenied(permission_denied_message)
@ -1008,7 +1008,7 @@ class PromoCodeViewSet(EvibesViewSet):
qs = super().get_queryset() qs = super().get_queryset()
user = self.request.user user = self.request.user
if user.has_perm("core.view_promocode"): if user.has_perm("core.view_promocode"): # ty:ignore[possibly-missing-attribute]
return qs return qs
return qs.filter(user=user) return qs.filter(user=user)
@ -1064,13 +1064,13 @@ class WishlistViewSet(EvibesViewSet):
qs = super().get_queryset() qs = super().get_queryset()
user = self.request.user user = self.request.user
if user.has_perm("core.view_wishlist"): if user.has_perm("core.view_wishlist"): # ty:ignore[possibly-missing-attribute]
return qs return qs
return qs.filter(user=user) return qs.filter(user=user)
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@action(detail=False, methods=["get"], url_path="current") @action(detail=False, methods=("GET",), url_path="current")
def current(self, request: Request, *args, **kwargs) -> Response: def current(self, request: Request, *args, **kwargs) -> Response:
if not request.user.is_authenticated: if not request.user.is_authenticated:
raise PermissionDenied(permission_denied_message) raise PermissionDenied(permission_denied_message)
@ -1083,14 +1083,14 @@ class WishlistViewSet(EvibesViewSet):
) )
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@action(detail=True, methods=["post"], url_path="add_wishlist_product") @action(detail=True, methods=("POST",), url_path="add_wishlist_product")
def add_wishlist_product(self, request: Request, *args, **kwargs) -> Response: def add_wishlist_product(self, request: Request, *args, **kwargs) -> Response:
serializer = AddWishlistProductSerializer(data=request.data) serializer = AddWishlistProductSerializer(data=request.data)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
try: try:
wishlist = self.get_object() wishlist = self.get_object()
if not ( if not (
request.user.has_perm("core.change_wishlist") request.user.has_perm("core.change_wishlist") # ty:ignore[possibly-missing-attribute]
or request.user == wishlist.user or request.user == wishlist.user
): ):
raise PermissionDenied(permission_denied_message) raise PermissionDenied(permission_denied_message)
@ -1106,14 +1106,14 @@ class WishlistViewSet(EvibesViewSet):
return Response(status=status.HTTP_404_NOT_FOUND) return Response(status=status.HTTP_404_NOT_FOUND)
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@action(detail=True, methods=["post"], url_path="remove_wishlist_product") @action(detail=True, methods=("POST",), url_path="remove_wishlist_product")
def remove_wishlist_product(self, request: Request, *args, **kwargs) -> Response: def remove_wishlist_product(self, request: Request, *args, **kwargs) -> Response:
serializer = RemoveWishlistProductSerializer(data=request.data) serializer = RemoveWishlistProductSerializer(data=request.data)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
try: try:
wishlist = self.get_object() wishlist = self.get_object()
if not ( if not (
request.user.has_perm("core.change_wishlist") request.user.has_perm("core.change_wishlist") # ty:ignore[possibly-missing-attribute]
or request.user == wishlist.user or request.user == wishlist.user
): ):
raise PermissionDenied(permission_denied_message) raise PermissionDenied(permission_denied_message)
@ -1129,14 +1129,14 @@ class WishlistViewSet(EvibesViewSet):
return Response(status=status.HTTP_404_NOT_FOUND) return Response(status=status.HTTP_404_NOT_FOUND)
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@action(detail=True, methods=["post"], url_path="bulk_add_wishlist_product") @action(detail=True, methods=("POST",), url_path="bulk_add_wishlist_product")
def bulk_add_wishlist_products(self, request: Request, *args, **kwargs) -> Response: def bulk_add_wishlist_products(self, request: Request, *args, **kwargs) -> Response:
serializer = BulkAddWishlistProductSerializer(data=request.data) serializer = BulkAddWishlistProductSerializer(data=request.data)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
try: try:
wishlist = self.get_object() wishlist = self.get_object()
if not ( if not (
request.user.has_perm("core.change_wishlist") request.user.has_perm("core.change_wishlist") # ty:ignore[possibly-missing-attribute]
or request.user == wishlist.user or request.user == wishlist.user
): ):
raise PermissionDenied(permission_denied_message) raise PermissionDenied(permission_denied_message)
@ -1152,7 +1152,7 @@ class WishlistViewSet(EvibesViewSet):
return Response(status=status.HTTP_404_NOT_FOUND) return Response(status=status.HTTP_404_NOT_FOUND)
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@action(detail=True, methods=["post"], url_path="bulk_remove_wishlist_product") @action(detail=True, methods=("POST",), url_path="bulk_remove_wishlist_product")
def bulk_remove_wishlist_products( def bulk_remove_wishlist_products(
self, request: Request, *args, **kwargs self, request: Request, *args, **kwargs
) -> Response: ) -> Response:
@ -1161,7 +1161,7 @@ class WishlistViewSet(EvibesViewSet):
try: try:
wishlist = self.get_object() wishlist = self.get_object()
if not ( if not (
request.user.has_perm("core.change_wishlist") request.user.has_perm("core.change_wishlist") # ty:ignore[possibly-missing-attribute]
or request.user == wishlist.user or request.user == wishlist.user
): ):
raise PermissionDenied(permission_denied_message) raise PermissionDenied(permission_denied_message)
@ -1201,7 +1201,7 @@ class AddressViewSet(EvibesViewSet):
return AddressSerializer return AddressSerializer
def get_queryset(self): def get_queryset(self):
if self.request.user.has_perm("core.view_address"): if self.request.user.has_perm("core.view_address"): # ty:ignore[possibly-missing-attribute]
return super().get_queryset() return super().get_queryset()
if self.request.user.is_authenticated: if self.request.user.is_authenticated:
@ -1234,7 +1234,7 @@ class AddressViewSet(EvibesViewSet):
) )
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@action(detail=False, methods=["get"], url_path="autocomplete") @action(detail=False, methods=("GET",), url_path="autocomplete")
def autocomplete(self, request: Request, *args, **kwargs) -> Response: def autocomplete(self, request: Request, *args, **kwargs) -> Response:
serializer = AddressAutocompleteInputSerializer(data=request.query_params) serializer = AddressAutocompleteInputSerializer(data=request.query_params)
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)

View file

@ -11,7 +11,7 @@ from django.utils.safestring import SafeString
class JSONTableWidget(forms.Widget): class JSONTableWidget(forms.Widget):
template_name = "json_table_widget.html" template_name = "json_table_widget.html"
def format_value(self, value: str | dict[str, Any]) -> str | dict[str, Any]: def format_value(self, value: str | dict[str, Any]) -> str | dict[str, Any]: # ty: ignore[invalid-method-override]
if isinstance(value, dict): if isinstance(value, dict):
return value return value
try: try:
@ -40,8 +40,8 @@ class JSONTableWidget(forms.Widget):
json_data = {} json_data = {}
try: try:
keys = data.getlist(f"{name}_key") keys = data.getlist(f"{name}_key") # ty: ignore[unresolved-attribute]
values = data.getlist(f"{name}_value") values = data.getlist(f"{name}_value") # ty: ignore[unresolved-attribute]
for key, value in zip(keys, values, strict=True): for key, value in zip(keys, values, strict=True):
if key.strip(): if key.strip():
try: try:

View file

@ -60,5 +60,5 @@ class GatewayAdmin(ActivationActionsMixin, ModelAdmin):
def can_be_used(self, obj: Gateway) -> bool: def can_be_used(self, obj: Gateway) -> bool:
return obj.can_be_used return obj.can_be_used
can_be_used.boolean = True can_be_used.boolean = True # ty: ignore[unresolved-attribute]
can_be_used.short_description = _("can be used") can_be_used.short_description = _("can be used") # ty: ignore[unresolved-attribute]

View file

@ -21,6 +21,6 @@ class Deposit(Mutation):
currency="EUR", currency="EUR",
) )
# noinspection PyTypeChecker # noinspection PyTypeChecker
return Deposit(transaction=transaction) return Deposit(transaction=transaction) # ty: ignore[unknown-argument]
else: else:
raise PermissionDenied(permission_denied_message) raise PermissionDenied(permission_denied_message)

View file

@ -58,6 +58,6 @@ class GatewayQuerySet(QuerySet):
).order_by("-priority") ).order_by("-priority")
class GatewayManager(Manager.from_queryset(GatewayQuerySet)): class GatewayManager(Manager.from_queryset(GatewayQuerySet)): # ty:ignore[unsupported-base]
def get_queryset(self) -> QuerySet: def get_queryset(self) -> QuerySet:
return super().get_queryset().can_be_used() return super().get_queryset().can_be_used()

View file

@ -58,7 +58,7 @@ class Transaction(NiceModel):
else f"{self.order.attributes.get('customer_email')} | {self.amount}" else f"{self.order.attributes.get('customer_email')} | {self.amount}"
) )
def save(self, **kwargs): def save(self, **kwargs): # ty: ignore[invalid-method-override]
if len(str(self.amount).split(".")[1]) > 2: if len(str(self.amount).split(".")[1]) > 2:
self.amount = round(self.amount, 2) self.amount = round(self.amount, 2)
super().save(**kwargs) super().save(**kwargs)
@ -91,7 +91,7 @@ class Balance(NiceModel):
verbose_name = _("balance") verbose_name = _("balance")
verbose_name_plural = _("balances") verbose_name_plural = _("balances")
def save(self, **kwargs): def save(self, **kwargs): # ty: ignore[invalid-method-override]
if self.amount != 0.0 and len(str(self.amount).split(".")[1]) > 2: if self.amount != 0.0 and len(str(self.amount).split(".")[1]) > 2:
self.amount = round(self.amount, 2) self.amount = round(self.amount, 2)
super().save(**kwargs) super().save(**kwargs)
@ -170,13 +170,13 @@ class Gateway(NiceModel):
month_end = timezone.make_aware(datetime.combine(next_month_date, time.min), tz) month_end = timezone.make_aware(datetime.combine(next_month_date, time.min), tz)
daily_sum = ( daily_sum = (
self.transactions.filter(created__date=today).aggregate( self.transactions.filter(created__date=today).aggregate( # ty: ignore[unresolved-attribute]
total=Sum("amount") total=Sum("amount")
)["total"] )["total"]
or 0 or 0
) )
monthly_sum = ( monthly_sum = (
self.transactions.filter( self.transactions.filter( # ty: ignore[unresolved-attribute]
created__gte=month_start, created__lt=month_end created__gte=month_start, created__lt=month_end
).aggregate(total=Sum("amount"))["total"] ).aggregate(total=Sum("amount"))["total"]
or 0 or 0

View file

@ -44,7 +44,7 @@ class DepositView(APIView):
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
transaction = Transaction.objects.create( transaction = Transaction.objects.create(
balance=request.user.payments_balance, balance=request.user.payments_balance, # ty: ignore[unresolved-attribute]
amount=serializer.validated_data["amount"], amount=serializer.validated_data["amount"],
currency="EUR", currency="EUR",
) )

View file

@ -123,17 +123,17 @@ class UserAdmin(ActivationActionsMixin, BaseUserAdmin, ModelAdmin):
) )
) )
def save_model( def save_model( # ty: ignore[invalid-method-override]
self, request: HttpRequest, obj: Any, form: UserForm, change: Any self, request: HttpRequest, obj: Any, form: UserForm, change: Any
) -> None: ) -> None:
if form.cleaned_data.get("attributes") is None: if form.cleaned_data.get("attributes") is None:
obj.attributes = None obj.attributes = None
if ( if (
form.cleaned_data.get("is_superuser", False) form.cleaned_data.get("is_superuser", False)
and not request.user.is_superuser and not request.user.is_superuser # ty: ignore[possibly-missing-attribute]
): ):
raise PermissionDenied(_("You cannot jump over your head!")) raise PermissionDenied(_("You cannot jump over your head!"))
super().save_model(request, obj, form, change) super().save_model(request, obj, form, change) # ty: ignore[invalid-argument-type]
# noinspection PyUnusedLocal # noinspection PyUnusedLocal

View file

@ -80,13 +80,13 @@ class CreateUser(Mutation):
else {}, else {},
) )
# noinspection PyTypeChecker # noinspection PyTypeChecker
return CreateUser(success=True) return CreateUser(success=True) # ty: ignore[unknown-argument]
else: else:
# noinspection PyTypeChecker # noinspection PyTypeChecker
return CreateUser(success=False) return CreateUser(success=False) # ty: ignore[unknown-argument]
except IntegrityError: except IntegrityError:
# noinspection PyTypeChecker # noinspection PyTypeChecker
return CreateUser(success=True) return CreateUser(success=True) # ty: ignore[unknown-argument]
except Exception as e: except Exception as e:
raise BadRequest(str(e)) from e raise BadRequest(str(e)) from e
@ -175,7 +175,7 @@ class UpdateUser(Mutation):
user.save() user.save()
return UpdateUser(user=user) return UpdateUser(user=user) # ty: ignore[unknown-argument]
except User.DoesNotExist as dne: except User.DoesNotExist as dne:
name = "User" name = "User"
@ -203,7 +203,7 @@ class DeleteUser(Mutation):
else: else:
raise BadRequest("uuid or email must be specified") raise BadRequest("uuid or email must be specified")
# noinspection PyTypeChecker # noinspection PyTypeChecker
return DeleteUser(success=True) return DeleteUser(success=True) # ty: ignore[unknown-argument]
except User.DoesNotExist as dne: except User.DoesNotExist as dne:
raise Http404( raise Http404(
f"User with the given uuid: {uuid} or email: {email} does not exist." f"User with the given uuid: {uuid} or email: {email} does not exist."
@ -226,10 +226,11 @@ class ObtainJSONWebToken(Mutation):
) )
try: try:
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
return ObtainJSONWebToken( obtained_user = User.objects.get(uuid=serializer.validated_data["user"])
user=User.objects.get(uuid=serializer.validated_data["user"]), return ObtainJSONWebToken( # ty: ignore[unknown-argument]
refresh_token=serializer.validated_data["refresh"], user=obtained_user, # ty: ignore[unknown-argument]
access_token=serializer.validated_data["access"], refresh_token=serializer.validated_data["refresh"], # ty: ignore[unknown-argument]
access_token=serializer.validated_data["access"], # ty: ignore[unknown-argument]
) )
except Exception as e: except Exception as e:
raise PermissionDenied(f"invalid credentials provided: {e!s}") from e raise PermissionDenied(f"invalid credentials provided: {e!s}") from e
@ -249,10 +250,11 @@ class RefreshJSONWebToken(Mutation):
) )
try: try:
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
return RefreshJSONWebToken( refreshed_user = User.objects.get(uuid=serializer.validated_data["user"])
user=User.objects.get(uuid=serializer.validated_data["user"]), return RefreshJSONWebToken( # ty: ignore[unknown-argument]
access_token=serializer.validated_data["access"], user=refreshed_user, # ty: ignore[unknown-argument]
refresh_token=serializer.validated_data["refresh"], access_token=serializer.validated_data["access"], # ty: ignore[unknown-argument]
refresh_token=serializer.validated_data["refresh"], # ty: ignore[unknown-argument]
) )
except Exception as e: except Exception as e:
raise PermissionDenied(f"invalid refresh token provided: {e!s}") from e raise PermissionDenied(f"invalid refresh token provided: {e!s}") from e
@ -270,14 +272,19 @@ class VerifyJSONWebToken(Mutation):
serializer = TokenVerifySerializer(data={"token": token}, retrieve_user=False) serializer = TokenVerifySerializer(data={"token": token}, retrieve_user=False)
with suppress(Exception): with suppress(Exception):
serializer.is_valid(raise_exception=True) serializer.is_valid(raise_exception=True)
verified_user = User.objects.get(uuid=serializer.validated_data["user"])
# noinspection PyTypeChecker # noinspection PyTypeChecker
return VerifyJSONWebToken( return VerifyJSONWebToken( # ty: ignore[unknown-argument]
token_is_valid=True, token_is_valid=True, # ty: ignore[unknown-argument]
user=User.objects.get(uuid=serializer.validated_data["user"]), user=verified_user, # ty: ignore[unknown-argument]
) )
detail = traceback.format_exc() if settings.DEBUG else "" detail = traceback.format_exc() if settings.DEBUG else ""
# noinspection PyTypeChecker # noinspection PyTypeChecker
return VerifyJSONWebToken(token_is_valid=False, user=None, detail=detail) return VerifyJSONWebToken( # ty: ignore[unknown-argument]
token_is_valid=False, # ty: ignore[unknown-argument]
user=None, # ty: ignore[unknown-argument]
detail=detail, # ty: ignore[unknown-argument]
)
class ActivateUser(Mutation): class ActivateUser(Mutation):
@ -307,7 +314,7 @@ class ActivateUser(Mutation):
raise BadRequest(_(f"something went wrong: {e!s}")) from e raise BadRequest(_(f"something went wrong: {e!s}")) from e
# noinspection PyTypeChecker # noinspection PyTypeChecker
return ActivateUser(success=True) return ActivateUser(success=True) # ty: ignore[unknown-argument]
class ResetPassword(Mutation): class ResetPassword(Mutation):
@ -321,12 +328,12 @@ class ResetPassword(Mutation):
user = User.objects.get(email=email) user = User.objects.get(email=email)
except User.DoesNotExist: except User.DoesNotExist:
# noinspection PyTypeChecker # noinspection PyTypeChecker
return ResetPassword(success=False) return ResetPassword(success=False) # ty: ignore[unknown-argument]
send_reset_password_email_task.delay(user_pk=user.uuid) send_reset_password_email_task.delay(user_pk=user.uuid)
# noinspection PyTypeChecker # noinspection PyTypeChecker
return ResetPassword(success=True) return ResetPassword(success=True) # ty: ignore[unknown-argument]
class ConfirmResetPassword(Mutation): class ConfirmResetPassword(Mutation):
@ -357,7 +364,7 @@ class ConfirmResetPassword(Mutation):
user.save() user.save()
# noinspection PyTypeChecker # noinspection PyTypeChecker
return ConfirmResetPassword(success=True) return ConfirmResetPassword(success=True) # ty: ignore[unknown-argument]
except ( except (
TypeError, TypeError,
@ -386,4 +393,4 @@ class UploadAvatar(Mutation):
except Exception as e: except Exception as e:
raise BadRequest(str(e)) from e raise BadRequest(str(e)) from e
return UploadAvatar(user=info.context.user) return UploadAvatar(user=info.context.user) # ty: ignore[unknown-argument]

View file

@ -105,7 +105,7 @@ class UserType(DjangoObjectType):
def resolve_wishlist(self: User, info) -> Wishlist | None: def resolve_wishlist(self: User, info) -> Wishlist | None:
if info.context.user == self: if info.context.user == self:
return self.user_related_wishlist return self.user_related_wishlist # ty: ignore[unresolved-attribute]
return None return None
def resolve_balance(self: User, info) -> Balance | None: def resolve_balance(self: User, info) -> Balance | None:
@ -126,13 +126,13 @@ class UserType(DjangoObjectType):
def resolve_orders(self: User, _info): def resolve_orders(self: User, _info):
# noinspection Mypy # noinspection Mypy
return self.orders.all() if self.orders.count() >= 1 else [] return self.orders.all() if self.orders.count() >= 1 else [] # ty: ignore[unresolved-attribute]
def resolve_recently_viewed(self: User, _info, **kwargs): def resolve_recently_viewed(self: User, _info, **kwargs):
uuid_list = self.recently_viewed or [] uuid_list = self.recently_viewed or []
if not uuid_list: if not uuid_list:
return connection_from_array([], kwargs) return connection_from_array([], kwargs) # ty: ignore[invalid-argument-type]
qs = Product.objects.filter(uuid__in=uuid_list) qs = Product.objects.filter(uuid__in=uuid_list)
@ -142,7 +142,7 @@ class UserType(DjangoObjectType):
products_by_uuid[u] for u in uuid_list if u in products_by_uuid products_by_uuid[u] for u in uuid_list if u in products_by_uuid
] ]
return connection_from_array(ordered_products, kwargs) return connection_from_array(ordered_products, kwargs) # ty: ignore[invalid-argument-type]
def resolve_groups(self: User, _info): def resolve_groups(self: User, _info):
return self.groups.all() if self.groups.count() >= 1 else [] return self.groups.all() if self.groups.count() >= 1 else []

View file

@ -87,7 +87,7 @@ class UserManager(BaseUserManager):
): ):
if backend is None: if backend is None:
# noinspection PyCallingNonCallable # noinspection PyCallingNonCallable
backends = auth._get_backends(return_tuples=True) backends = auth._get_backends(return_tuples=True) # ty: ignore[unresolved-attribute]
if len(backends) == 1: if len(backends) == 1:
backend, _ = backends[0] backend, _ = backends[0]
else: else:
@ -102,7 +102,7 @@ class UserManager(BaseUserManager):
else: else:
backend = auth.load_backend(backend) backend = auth.load_backend(backend)
if hasattr(backend, "with_perm"): if hasattr(backend, "with_perm"):
return backend.with_perm( return backend.with_perm( # ty: ignore[call-non-callable]
perm, perm,
is_active=is_active, is_active=is_active,
include_superusers=include_superusers, include_superusers=include_superusers,

View file

@ -36,7 +36,8 @@ class JWTAuthMiddleware(BaseMiddleware):
def __init__(self, token_str: str): def __init__(self, token_str: str):
self.META = {"HTTP_X_EVIBES_AUTH": f"Bearer {token_str}"} self.META = {"HTTP_X_EVIBES_AUTH": f"Bearer {token_str}"}
user, _ = jwt_auth.authenticate(_Req(token)) result = jwt_auth.authenticate(_Req(token)) # ty: ignore[invalid-argument-type]
user = result[0] if result else None
scope["user"] = user scope["user"] = user
return await super().__call__(scope, receive, send) return await super().__call__(scope, receive, send)

View file

@ -93,7 +93,7 @@ class UserMessageConsumer(AsyncJsonWebsocketConsumer):
async def connect(self) -> None: # noqa: D401 async def connect(self) -> None: # noqa: D401
await self.accept() await self.accept()
@extend_ws_schema(**USER_MESSAGE_CONSUMER_SCHEMA) @extend_ws_schema(**USER_MESSAGE_CONSUMER_SCHEMA) # ty: ignore[invalid-argument-type]
async def receive_json(self, content: dict[str, Any], **kwargs) -> None: async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
action = content.get("action") action = content.get("action")
if action == "ping": if action == "ping":
@ -145,7 +145,7 @@ class StaffInboxConsumer(AsyncJsonWebsocketConsumer):
async def disconnect(self, code: int) -> None: async def disconnect(self, code: int) -> None:
await self.channel_layer.group_discard(STAFF_INBOX_GROUP, self.channel_name) await self.channel_layer.group_discard(STAFF_INBOX_GROUP, self.channel_name)
@extend_ws_schema(**STAFF_INBOX_CONSUMER_SCHEMA) @extend_ws_schema(**STAFF_INBOX_CONSUMER_SCHEMA) # ty: ignore[invalid-argument-type]
async def receive_json(self, content: dict[str, Any], **kwargs) -> None: async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
action = content.get("action") action = content.get("action")
user: User = self.scope.get("user") user: User = self.scope.get("user")
@ -295,7 +295,7 @@ class ThreadConsumer(AsyncJsonWebsocketConsumer):
f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name f"{THREAD_GROUP_PREFIX}{self.thread_id}", self.channel_name
) )
@extend_ws_schema(**THREAD_CONSUMER_SCHEMA) @extend_ws_schema(**THREAD_CONSUMER_SCHEMA) # ty: ignore[invalid-argument-type]
async def receive_json(self, content: dict[str, Any], **kwargs) -> None: async def receive_json(self, content: dict[str, Any], **kwargs) -> None:
action = content.get("action") action = content.get("action")
user: User = self.scope.get("user") user: User = self.scope.get("user")

View file

@ -111,7 +111,7 @@ def build_router() -> Router | None:
# group check # group check
if not staff_user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists(): if not staff_user.groups.filter(name=USER_SUPPORT_GROUP_NAME).exists():
return None, None, None return None, None, None
text = message.text.strip() text = message.text.strip() if message.text else ""
if text.lower().startswith("reply "): if text.lower().startswith("reply "):
parts = text.split(maxsplit=2) parts = text.split(maxsplit=2)
if len(parts) < 3: if len(parts) < 3:

View file

@ -64,7 +64,7 @@ def send_message(
if not text or len(text) > 1028: if not text or len(text) > 1028:
raise ValidationError({"text": _("Message must be 1..1028 characters.")}) raise ValidationError({"text": _("Message must be 1..1028 characters.")})
if sender_user and not sender_user.is_staff: if sender_user and not sender_user.is_staff:
if thread.user_id != sender_user.pk: if thread.user_id != sender_user.pk: # ty: ignore[unresolved-attribute]
raise PermissionDenied raise PermissionDenied
msg = ChatMessage.objects.create( msg = ChatMessage.objects.create(
thread=thread, thread=thread,
@ -131,7 +131,7 @@ def auto_reply(thread: ChatThread) -> None:
def claim_thread(thread: ChatThread, staff_user: User) -> ChatThread: def claim_thread(thread: ChatThread, staff_user: User) -> ChatThread:
if not staff_user.is_staff: if not staff_user.is_staff:
raise PermissionDenied raise PermissionDenied
if thread.assigned_to_id and not staff_user.is_superuser: if thread.assigned_to_id and not staff_user.is_superuser: # ty: ignore[unresolved-attribute]
raise PermissionDenied raise PermissionDenied
thread.assigned_to = staff_user thread.assigned_to = staff_user
thread.save(update_fields=["assigned_to", "modified"]) thread.save(update_fields=["assigned_to", "modified"])

View file

@ -180,7 +180,7 @@ class TokenObtainSerializer(Serializer):
@classmethod @classmethod
def get_token(cls, user: AuthUser) -> Token: def get_token(cls, user: AuthUser) -> Token:
if cls.token_class is not None: if cls.token_class is not None:
return cls.token_class.for_user(user) return cls.token_class.for_user(user) # ty: ignore[invalid-argument-type]
else: else:
raise RuntimeError(_("must set token_class attribute on class.")) raise RuntimeError(_("must set token_class attribute on class."))
@ -200,7 +200,7 @@ class TokenObtainPairSerializer(TokenObtainSerializer):
refresh = self.get_token(self.user) refresh = self.get_token(self.user)
data["refresh"] = str(refresh) data["refresh"] = str(refresh)
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
data["access"] = str(refresh.access_token) data["access"] = str(refresh.access_token) # ty: ignore[unresolved-attribute]
data["user"] = ( data["user"] = (
UserSerializer(self.user).data if self.retrieve_user else self.user.pk UserSerializer(self.user).data if self.retrieve_user else self.user.pk
) )

View file

@ -3,7 +3,7 @@ import traceback
from os import getenv from os import getenv
from typing import Any, Callable, cast from typing import Any, Callable, cast
from django.contrib.auth.models import AbstractBaseUser, AnonymousUser from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import ( from django.core.exceptions import (
BadRequest, BadRequest,
DisallowedHost, DisallowedHost,
@ -27,6 +27,7 @@ from rest_framework_simplejwt.authentication import JWTAuthentication
from rest_framework_simplejwt.exceptions import InvalidToken from rest_framework_simplejwt.exceptions import InvalidToken
from sentry_sdk import capture_exception from sentry_sdk import capture_exception
from engine.vibes_auth.models import User
from evibes.settings.drf import JSON_UNDERSCOREIZE from evibes.settings.drf import JSON_UNDERSCOREIZE
from evibes.utils.misc import RatelimitedError from evibes.utils.misc import RatelimitedError
from evibes.utils.parsers import underscoreize from evibes.utils.parsers import underscoreize
@ -80,11 +81,11 @@ class GrapheneJWTAuthorizationMiddleware:
return next(root, info, **args) return next(root, info, **args)
@staticmethod @staticmethod
def get_jwt_user(request: HttpRequest) -> AbstractBaseUser | AnonymousUser: def get_jwt_user(request: HttpRequest) -> "User" | AnonymousUser:
jwt_authenticator = JWTAuthentication() jwt_authenticator = JWTAuthentication()
try: try:
user_obj, _ = jwt_authenticator.authenticate(request) # type: ignore[assignment] user_obj, _ = jwt_authenticator.authenticate(request) # type: ignore[assignment]
user: AbstractBaseUser | AnonymousUser = cast(AbstractBaseUser, user_obj) user: "User" | AnonymousUser = cast(User, user_obj)
except InvalidToken: except InvalidToken:
user = AnonymousUser() user = AnonymousUser()
except TypeError: except TypeError:

View file

@ -132,16 +132,7 @@ known-first-party = ["evibes", "engine"]
quote-style = "double" quote-style = "double"
indent-style = "space" indent-style = "space"
[tool.ty.environment] [tool.ty.src]
python-version = "3.12"
[tool.ty.terminal]
output-format = "concise"
[tool.ty.rules]
possibly-unresolved-reference = "warn"
[[tool.ty.overrides]]
exclude = [ exclude = [
"Dockerfiles/**", "Dockerfiles/**",
"monitoring/**", "monitoring/**",
@ -152,5 +143,18 @@ exclude = [
"media/**", "media/**",
] ]
[tool.ty.environment]
python-version = "3.12"
[tool.ty.terminal]
output-format = "concise"
[tool.ty.rules]
possibly-unresolved-reference = "warn"
possibly-missing-attribute = "warn"
possibly-missing-import = "warn"
unsupported-base = "warn"
[tool.django-stubs] [tool.django-stubs]
django_settings_module = "evibes.settings.__init__" django_settings_module = "evibes.settings.__init__"