This commit is contained in:
Egor Pavlovich Gorbunov 2025-05-26 16:12:59 +03:00
parent 1219067721
commit d86a6ed3c8
40 changed files with 357 additions and 359 deletions

View file

@ -18,15 +18,19 @@ class PostAdmin(admin.ModelAdmin):
readonly_fields = ("preview_html",)
fieldsets = (
(None, {
(
None,
{
"fields": (
"author", "title",
"author",
"title",
"content",
"preview_html",
"file",
"tags",
)
}),
},
),
)
def preview_html(self, obj):

View file

@ -18,8 +18,12 @@ class PostDocument(ActiveOnlyMixin, Document):
class Index:
name = "posts"
settings = {"number_of_shards": 1, "number_of_replicas": 0,
"analysis": COMMON_ANALYSIS, "index": {"max_ngram_diff": 18}}
settings = {
"number_of_shards": 1,
"number_of_replicas": 0,
"analysis": COMMON_ANALYSIS,
"index": {"max_ngram_diff": 18},
}
class Django:
model = Post
@ -28,4 +32,5 @@ class PostDocument(ActiveOnlyMixin, Document):
def prepare_title(self, instance):
return getattr(instance, "title", "") or ""
registry.register_document(PostDocument)

View file

@ -10,12 +10,12 @@ from core.abstract import NiceModel
class Post(NiceModel):
is_publicly_visible = True
author = ForeignKey(
to="vibes_auth.User", on_delete=CASCADE, blank=False, null=False, related_name="posts"
author = ForeignKey(to="vibes_auth.User", on_delete=CASCADE, blank=False, null=False, related_name="posts")
title = CharField(
unique=True, max_length=128, blank=False, null=False, help_text=_("post title"), verbose_name=_("title")
)
title = CharField(unique=True, max_length=128, blank=False, null=False, help_text=_("post title"),
verbose_name=_("title"))
content = MarkdownField("content",
content = MarkdownField(
"content",
extensions=[
TocExtension(toc_depth=3),
"pymdownx.arithmatex",
@ -46,15 +46,13 @@ class Post(NiceModel):
"pymdownx.striphtml",
"pymdownx.superfences",
"pymdownx.tasklist",
"pymdownx.tilde"
], blank=True, null=True)
file = FileField(upload_to="posts/", blank=True, null=True)
slug = AutoSlugField(
populate_from='title',
allow_unicode=True,
unique=True,
editable=False
"pymdownx.tilde",
],
blank=True,
null=True,
)
file = FileField(upload_to="posts/", blank=True, null=True)
slug = AutoSlugField(populate_from="title", allow_unicode=True, unique=True, editable=False)
tags = ManyToManyField(to="blog.PostTag", blank=True, related_name="posts")
def __str__(self):

View file

@ -4,18 +4,12 @@ from django.utils.safestring import mark_safe
class MarkdownEditorWidget(forms.Textarea):
class Media:
css = {
'all': (
'https://cdnjs.cloudflare.com/ajax/libs/easymde/2.14.0/easymde.min.css',
)
}
js = (
'https://cdnjs.cloudflare.com/ajax/libs/easymde/2.14.0/easymde.min.js',
)
css = {"all": ("https://cdnjs.cloudflare.com/ajax/libs/easymde/2.14.0/easymde.min.css",)}
js = ("https://cdnjs.cloudflare.com/ajax/libs/easymde/2.14.0/easymde.min.js",)
def render(self, name, value, attrs=None, renderer=None):
textarea_html = super().render(name, value, attrs, renderer)
textarea_id = attrs.get('id', f'id_{name}')
textarea_id = attrs.get("id", f"id_{name}")
init_js = f"""
<script>
document.addEventListener('DOMContentLoaded', function() {{

View file

@ -65,7 +65,7 @@ class AttributeValueInline(TabularInline):
is_navtab = True
verbose_name = _("attribute value")
verbose_name_plural = _("attribute values")
autocomplete_fields = ['attribute']
autocomplete_fields = ["attribute"]
@admin.register(AttributeGroup)

View file

@ -43,15 +43,16 @@ core_router.register(r"promotions", PromotionViewSet, basename="promotions")
core_router.register(r"addresses", AddressViewSet, basename="addresses")
sitemaps = {
'products': ProductSitemap,
'categories': CategorySitemap,
'brands': BrandSitemap,
"products": ProductSitemap,
"categories": CategorySitemap,
"brands": BrandSitemap,
}
urlpatterns = [
path("core/", include(core_router.urls)),
path("sitemap.xml", sitemap_index, {"sitemaps": sitemaps, "sitemap_url_name": "sitemap-detail"},
name="sitemap-index"),
path(
"sitemap.xml", sitemap_index, {"sitemaps": sitemaps, "sitemap_url_name": "sitemap-detail"}, name="sitemap-index"
),
path("sitemap-<section>.xml", sitemap_detail, {"sitemaps": sitemaps}, name="sitemap-detail"),
path("sitemap-<section>-<int:page>.xml", sitemap_detail, {"sitemaps": sitemaps}, name="sitemap-detail"),
path("download/<str:order_product_uuid>/", download_digital_asset_view, name="download_digital_asset"),

View file

@ -5,10 +5,10 @@ from rest_framework.fields import CharField, DictField, JSONField, ListField
from core.docs.drf import error
from core.serializers import (
BuyAsBusinessOrderSerializer,
CacheOperatorSerializer,
ContactUsSerializer,
LanguageSerializer,
BuyAsBusinessOrderSerializer,
)
from payments.serializers import TransactionProcessSerializer
@ -16,7 +16,8 @@ CACHE_SCHEMA = {
"post": extend_schema(
summary=_("cache I/O"),
description=_(
"apply only a key to read permitted data from cache.\napply key, data and timeout with authentication to write data to cache." # noqa: E501
"apply only a key to read permitted data from cache.\n"
"apply key, data and timeout with authentication to write data to cache."
),
request=CacheOperatorSerializer,
responses={

View file

@ -183,9 +183,7 @@ ORDER_SCHEMA = {
),
"buy_unregistered": extend_schema(
summary=_("purchase an order without account creation"),
description=_(
"finalizes the order purchase for a non-registered user."
),
description=_("finalizes the order purchase for a non-registered user."),
request=BuyUnregisteredOrderSerializer,
responses={
status.HTTP_202_ACCEPTED: TransactionProcessSerializer,
@ -270,7 +268,7 @@ ATTRIBUTES_DESC = _(
"`true`/`false` for booleans, integers, floats; otherwise treated as string. \n"
"• **Base64**: prefix with `b64-` to URL-safe base64-encode the raw value. \n"
"Examples: \n"
"`color=exact-red`, `size=gt-10`, `features=in-[\"wifi\",\"bluetooth\"]`, \n"
'`color=exact-red`, `size=gt-10`, `features=in-["wifi","bluetooth"]`, \n'
"`b64-description=icontains-aGVhdC1jb2xk`"
)
@ -490,7 +488,8 @@ ADDRESS_SCHEMA = {
),
"autocomplete": extend_schema(
summary=_("autocomplete address suggestions"),
parameters=[OpenApiParameter(
parameters=[
OpenApiParameter(
name="q",
location=OpenApiParameter.QUERY,
description=_("raw data query string, please append with data from geo-IP endpoint"),
@ -501,7 +500,8 @@ ADDRESS_SCHEMA = {
location=OpenApiParameter.QUERY,
description=_("limit the results amount, 1 < limit < 10, default: 5"),
type=int,
)],
),
],
responses={
status.HTTP_200_OK: AddressSuggestionSerializer(many=True),
**BASE_ERRORS,

View file

@ -55,7 +55,7 @@ def process_query(query: str = ""):
Q(
"multi_match",
query=query,
fields=[f for f in SMART_FIELDS if f.endswith('.auto')],
fields=[f for f in SMART_FIELDS if f.endswith(".auto")],
type="bool_prefix",
),
],
@ -76,11 +76,13 @@ def process_query(query: str = ""):
idx = hit.meta.index
if idx in results:
results[idx].append({
results[idx].append(
{
"uuid": str(obj_uuid),
"name": obj_name,
"slug": obj_slug,
})
}
)
return results
except NotFoundError:
raise Http404

View file

@ -83,8 +83,12 @@ class BrandDocument(ActiveOnlyMixin, Document):
class Index:
name = "brands"
settings = {"number_of_shards": 1, "number_of_replicas": 0,
"analysis": COMMON_ANALYSIS, "index": {"max_ngram_diff": 18}}
settings = {
"number_of_shards": 1,
"number_of_replicas": 0,
"analysis": COMMON_ANALYSIS,
"index": {"max_ngram_diff": 18},
}
class Django:
model = Brand

View file

@ -179,13 +179,21 @@ class BuyOrder(BaseMutation):
transaction = Field(TransactionType, required=False)
@staticmethod
def mutate(_parent, info, order_uuid=None, order_hr_id=None, force_balance=False, force_payment=False,
promocode_uuid=None, shipping_address=None, billing_address=None):
def mutate(
_parent,
info,
order_uuid=None,
order_hr_id=None,
force_balance=False,
force_payment=False,
promocode_uuid=None,
shipping_address=None,
billing_address=None,
):
if not any([order_uuid, order_hr_id]) or all([order_uuid, order_hr_id]):
raise BadRequest(_("please provide either order_uuid or order_hr_id - mutually exclusive"))
user = info.context.user
try:
order = None
if order_uuid:
@ -194,8 +202,11 @@ class BuyOrder(BaseMutation):
order = Order.objects.get(user=user, human_readable_id=order_hr_id)
instance = order.buy(
force_balance=force_balance, force_payment=force_payment, promocode_uuid=promocode_uuid,
shipping_address=shipping_address, billing_address=billing_address
force_balance=force_balance,
force_payment=force_payment,
promocode_uuid=promocode_uuid,
shipping_address=shipping_address,
billing_address=billing_address,
)
match str(type(instance)):
@ -228,10 +239,22 @@ class BuyUnregisteredOrder(BaseMutation):
transaction = Field(TransactionType, required=False)
@staticmethod
def mutate(_parent, info, products, customer_name, customer_email, customer_phone, customer_billing_address,
payment_method, customer_shipping_address=None, promocode_uuid=None, is_business=False):
def mutate(
_parent,
info,
products,
customer_name,
customer_email,
customer_phone,
customer_billing_address,
payment_method,
customer_shipping_address=None,
promocode_uuid=None,
is_business=False,
):
order = Order.objects.create(status="MOMENTAL")
transaction = order.buy_without_registration(products=products,
transaction = order.buy_without_registration(
products=products,
promocode_uuid=promocode_uuid,
customer_name=customer_name,
customer_email=customer_email,
@ -239,7 +262,8 @@ class BuyUnregisteredOrder(BaseMutation):
billing_customer_address=customer_billing_address,
shipping_customer_address=customer_shipping_address,
payment_method=payment_method,
is_business=is_business)
is_business=is_business,
)
return BuyUnregisteredOrder(transaction=transaction)
@ -458,10 +482,7 @@ class DeleteProduct(BaseMutation):
class CreateAddress(BaseMutation):
class Arguments:
raw_data = String(
required=True,
description=_("original address string provided by the user")
)
raw_data = String(required=True, description=_("original address string provided by the user"))
address = Field(AddressType)
@ -469,10 +490,7 @@ class CreateAddress(BaseMutation):
def mutate(_parent, info, raw_data):
user = info.context.user if info.context.user.is_authenticated else None
address = Address.objects.create(
raw_data=raw_data,
user=user
)
address = Address.objects.create(raw_data=raw_data, user=user)
return CreateAddress(address=address)

View file

@ -131,9 +131,7 @@ class Query(ObjectType):
@staticmethod
def resolve_products(_parent, info, **kwargs):
if info.context.user.is_authenticated and kwargs.get("uuid"):
product = Product.objects.get(
uuid=kwargs["uuid"]
)
product = Product.objects.get(uuid=kwargs["uuid"])
if product.is_active and product.brand.is_active and product.category.is_active:
info.context.user.add_to_recently_viewed(product.uuid)
return (
@ -141,7 +139,9 @@ class Query(ObjectType):
if info.context.user.has_perm("core.view_product")
else Product.objects.filter(
is_active=True, brand__is_active=True, category__is_active=True, stocks__isnull=False
).select_related("brand", "category").prefetch_related("images", "stocks")
)
.select_related("brand", "category")
.prefetch_related("images", "stocks")
)
@staticmethod

View file

@ -12,7 +12,7 @@ class Command(BaseCommand):
# 1. Clean up duplicate Stock entries per product and vendor:
# Group stocks by (product, vendor)
stocks_by_group = defaultdict(list)
for stock in Stock.objects.all().order_by('modified'):
for stock in Stock.objects.all().order_by("modified"):
key = (stock.product_id, stock.vendor)
stocks_by_group[key].append(stock)
@ -37,13 +37,11 @@ class Command(BaseCommand):
if stock_deletions:
Stock.objects.filter(id__in=stock_deletions).delete()
self.stdout.write(
self.style.SUCCESS(f"Deleted {len(stock_deletions)} duplicate stock entries.")
)
self.stdout.write(self.style.SUCCESS(f"Deleted {len(stock_deletions)} duplicate stock entries."))
# 2. Clean up duplicate Category entries based on name (case-insensitive)
category_groups = defaultdict(list)
for cat in Category.objects.all().order_by('modified'):
for cat in Category.objects.all().order_by("modified"):
key = cat.name.lower()
category_groups[key].append(cat)
@ -80,19 +78,13 @@ class Command(BaseCommand):
count_inactive = inactive_products.count()
if count_inactive:
inactive_products.update(is_active=False)
self.stdout.write(self.style.SUCCESS(
f"Set {count_inactive} product(s) as inactive due to missing stocks."
))
self.stdout.write(self.style.SUCCESS(f"Set {count_inactive} product(s) as inactive due to missing stocks."))
# 4. Delete stocks without an associated product.
orphan_stocks = Stock.objects.filter(product__isnull=True)
orphan_count = orphan_stocks.count()
if orphan_count:
orphan_stocks.delete()
self.stdout.write(
self.style.SUCCESS(f"Deleted {orphan_count} stock(s) without an associated product.")
)
self.stdout.write(self.style.SUCCESS(f"Deleted {orphan_count} stock(s) without an associated product."))
self.stdout.write(self.style.SUCCESS(
"Started fetching products task in worker container without errors!"
))
self.stdout.write(self.style.SUCCESS("Started fetching products task in worker container without errors!"))

View file

@ -77,11 +77,7 @@ def load_po_sanitized(path: str) -> polib.POFile | None:
parts = text.split("\n\n", 1)
header = parts[0]
rest = parts[1] if len(parts) > 1 else ""
rest_clean = re.sub(
r"^msgid \"\"\s*\nmsgstr \"\"\s*\n?", "",
rest,
flags=re.MULTILINE
)
rest_clean = re.sub(r"^msgid \"\"\s*\nmsgstr \"\"\s*\n?", "", rest, flags=re.MULTILINE)
sanitized = header + "\n\n" + rest_clean
tmp = NamedTemporaryFile(mode="w+", delete=False, suffix=".po", encoding="utf-8") # noqa: SIM115
try:
@ -101,35 +97,37 @@ class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
"-l", "--language",
"-l",
"--language",
dest="target_languages",
action="append",
required=True,
metavar="LANG",
help="Locale code for translation, e.g. de-DE, fr-FR."
help="Locale code for translation, e.g. de-DE, fr-FR.",
)
parser.add_argument(
"-a", "--app",
"-a",
"--app",
dest="target_apps",
action="append",
required=True,
metavar="APP",
help="App label for translation, e.g. core, payments."
help="App label for translation, e.g. core, payments.",
)
def handle(self, *args, **options) -> None:
target_langs: list[str] = options['target_languages']
target_apps: set[str] = set(options['target_apps'])
auth_key = os.environ.get('DEEPL_AUTH_KEY')
target_langs: list[str] = options["target_languages"]
target_apps: set[str] = set(options["target_apps"])
auth_key = os.environ.get("DEEPL_AUTH_KEY")
if not auth_key:
raise CommandError('DEEPL_AUTH_KEY not set')
raise CommandError("DEEPL_AUTH_KEY not set")
for target_lang in target_langs:
api_code = DEEPL_TARGET_LANGUAGES_MAPPING.get(target_lang)
if not api_code:
self.stdout.write(self.style.WARNING(f"Unknown language '{target_lang}'"))
continue
if api_code == 'unsupported':
if api_code == "unsupported":
self.stdout.write(self.style.WARNING(f"Unsupported language '{target_lang}'"))
continue
@ -139,7 +137,7 @@ class Command(BaseCommand):
if app_conf.label not in target_apps:
continue
en_path = os.path.join(app_conf.path, 'locale', 'en_GB', 'LC_MESSAGES', 'django.po')
en_path = os.path.join(app_conf.path, "locale", "en_GB", "LC_MESSAGES", "django.po")
if not os.path.isfile(en_path):
self.stdout.write(self.style.WARNING(f"{app_conf.label}: no en_GB PO"))
continue
@ -162,9 +160,9 @@ class Command(BaseCommand):
entries = [e for e in en_po if e.msgid and not e.obsolete]
source_map = {e.msgid: e.msgstr for e in entries}
tgt_dir = os.path.join(app_conf.path, 'locale', target_lang.replace('-', '_'), 'LC_MESSAGES')
tgt_dir = os.path.join(app_conf.path, "locale", target_lang.replace("-", "_"), "LC_MESSAGES")
os.makedirs(tgt_dir, exist_ok=True)
tgt_path = os.path.join(tgt_dir, 'django.po')
tgt_path = os.path.join(tgt_dir, "django.po")
old_tgt = None
if os.path.exists(tgt_path):
@ -176,19 +174,21 @@ class Command(BaseCommand):
new_po = polib.POFile()
new_po.metadata = en_po.metadata.copy()
new_po.metadata['Language'] = target_lang
new_po.metadata["Language"] = target_lang
for e in entries:
prev = old_tgt.find(e.msgid) if old_tgt else None
new_po.append(polib.POEntry(
new_po.append(
polib.POEntry(
msgid=e.msgid,
msgstr=prev.msgstr if prev and prev.msgstr else '',
msgstr=prev.msgstr if prev and prev.msgstr else "",
msgctxt=e.msgctxt,
comment=e.comment,
tcomment=e.tcomment,
occurrences=e.occurrences,
flags=e.flags,
))
)
)
to_trans = [e for e in new_po if not e.msgstr]
if not to_trans:
@ -204,22 +204,22 @@ class Command(BaseCommand):
maps.append(p_map)
data = [
('auth_key', auth_key),
('target_lang', api_code),
] + [('text', t) for t in protected]
resp = requests.post('https://api.deepl.com/v2/translate', data=data)
("auth_key", auth_key),
("target_lang", api_code),
] + [("text", t) for t in protected]
resp = requests.post("https://api.deepl.com/v2/translate", data=data)
try:
resp.raise_for_status()
result = resp.json()
except Exception as exc:
raise CommandError(f"DeepL error: {exc} {resp.text}")
trans = result.get('translations', [])
trans = result.get("translations", [])
if len(trans) != len(to_trans):
raise CommandError(f"Got {len(trans)} translations, expected {len(to_trans)}")
for e, obj, pmap in zip(to_trans, trans, maps):
e.msgstr = deplaceholderize(obj['text'], pmap)
e.msgstr = deplaceholderize(obj["text"], pmap)
new_po.save(tgt_path)
self.stdout.write(self.style.SUCCESS(f"Saved {tgt_path}"))

View file

@ -2,6 +2,7 @@ import logging
from django.core.management.base import BaseCommand
from django.db import transaction
from core.models import Product
logger = logging.getLogger(__name__)

View file

@ -13,6 +13,7 @@ PLACEHOLDER_REGEXES = [
re.compile(r"%\([^)]+\)[sd]"), # %(verbose_name)s, %(count)d
]
def extract_placeholders(text: str) -> set[str]:
"""
Extract all placeholders from given text.
@ -33,7 +34,7 @@ def load_po_sanitized(path: str) -> polib.POFile:
except Exception:
# read raw text
try:
with open(path, encoding='utf-8') as f:
with open(path, encoding="utf-8") as f:
text = f.read()
except OSError as e:
raise CommandError(f"{path}: cannot read file ({e})")
@ -41,10 +42,10 @@ def load_po_sanitized(path: str) -> polib.POFile:
text = re.sub(r"^#,(?!\s)", "#, ", text, flags=re.MULTILINE)
parts = text.split("\n\n", 1)
header = parts[0]
rest = parts[1] if len(parts) > 1 else ''
rest = re.sub(r"^msgid \"\"\s*\nmsgstr \"\"\s*\n?", '', rest, flags=re.MULTILINE)
rest = parts[1] if len(parts) > 1 else ""
rest = re.sub(r"^msgid \"\"\s*\nmsgstr \"\"\s*\n?", "", rest, flags=re.MULTILINE)
sanitized = header + "\n\n" + rest
tmp = NamedTemporaryFile(mode='w+', delete=False, suffix='.po', encoding='utf-8') # noqa: SIM115
tmp = NamedTemporaryFile(mode="w+", delete=False, suffix=".po", encoding="utf-8") # noqa: SIM115
try:
tmp.write(sanitized)
tmp.flush()
@ -56,40 +57,42 @@ def load_po_sanitized(path: str) -> polib.POFile:
with contextlib.suppress(OSError):
os.unlink(tmp.name)
class Command(BaseCommand):
help = (
"Scan target-language .po files and report any placeholder mismatches, grouped by app."
)
help = "Scan target-language .po files and report any placeholder mismatches, grouped by app."
def add_arguments(self, parser):
parser.add_argument(
'-l', '--language',
dest='target_languages',
action='append',
"-l",
"--language",
dest="target_languages",
action="append",
required=True,
metavar='LANG',
help='Locale code(s) to scan, e.g. de-DE, fr-FR'
metavar="LANG",
help="Locale code(s) to scan, e.g. de-DE, fr-FR",
)
parser.add_argument(
'-a', '--app',
dest='target_apps',
action='append',
"-a",
"--app",
dest="target_apps",
action="append",
required=True,
metavar='APP',
help='App label(s) to scan, e.g. core, payments'
metavar="APP",
help="App label(s) to scan, e.g. core, payments",
)
parser.add_argument(
'-p', '--path',
dest='root_path',
"-p",
"--path",
dest="root_path",
required=False,
metavar='ROOT_PATH',
help='Root path prefix to adjust file links'
metavar="ROOT_PATH",
help="Root path prefix to adjust file links",
)
def handle(self, *args, **options) -> None:
langs: list[str] = options['target_languages']
apps_to_scan: set[str] = set(options['target_apps'])
root_path: str = options.get('root_path') or '/app/'
langs: list[str] = options["target_languages"]
apps_to_scan: set[str] = set(options["target_apps"])
root_path: str = options.get("root_path") or "/app/"
for app_conf in apps.get_app_configs():
if app_conf.label not in apps_to_scan:
@ -99,10 +102,8 @@ class Command(BaseCommand):
app_issues: list[str] = []
for lang in langs:
loc = lang.replace('-', '_')
po_path = os.path.join(
app_conf.path, 'locale', loc, 'LC_MESSAGES', 'django.po'
)
loc = lang.replace("-", "_")
po_path = os.path.join(app_conf.path, "locale", loc, "LC_MESSAGES", "django.po")
if not os.path.exists(po_path):
continue
@ -121,13 +122,11 @@ class Command(BaseCommand):
missing = src_ph - dst_ph
extra = dst_ph - src_ph
if missing or extra:
line_no = entry.linenum or '?'
display = po_path.replace('/app/', root_path)
if '\\' in root_path:
display = display.replace('/', '\\')
lang_issues.append(
f" {display}:{line_no}: missing={sorted(missing)} extra={sorted(extra)}"
)
line_no = entry.linenum or "?"
display = po_path.replace("/app/", root_path)
if "\\" in root_path:
display = display.replace("/", "\\")
lang_issues.append(f" {display}:{line_no}: missing={sorted(missing)} extra={sorted(extra)}")
if lang_issues:
# Header for language with issues
@ -142,9 +141,7 @@ class Command(BaseCommand):
self.stdout.write("")
else:
# No issues in any language for this app
self.stdout.write(
self.style.SUCCESS(f"App {app_conf.label} has no placeholder issues.")
)
self.stdout.write(self.style.SUCCESS(f"App {app_conf.label} has no placeholder issues."))
self.stdout.write("")
self.stdout.write(self.style.SUCCESS("Done scanning."))

View file

@ -37,18 +37,16 @@ class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
"-t", "--target", required=True,
help=(
"Dotted path to the field to translate, "
"e.g. core.models.Product.description"
),
"-t",
"--target",
required=True,
help=("Dotted path to the field to translate, e.g. core.models.Product.description"),
)
parser.add_argument(
"-l", "--language", required=True,
help=(
"Modeltranslation language code to translate into, "
"e.g. de-de, fr-fr, zh-hans"
),
"-l",
"--language",
required=True,
help=("Modeltranslation language code to translate into, e.g. de-de, fr-fr, zh-hans"),
)
def handle(self, *args, **options):
@ -87,8 +85,7 @@ class Command(BaseCommand):
if not auth_key:
raise CommandError("Environment variable DEEPL_AUTH_KEY is not set.")
qs = model.objects.exclude(**{f"{field_name}__isnull": True}) \
.exclude(**{f"{field_name}": ""})
qs = model.objects.exclude(**{f"{field_name}__isnull": True}).exclude(**{f"{field_name}": ""})
total = qs.count()
if total == 0:
self.stdout.write("No instances with non-empty source field found.")
@ -113,9 +110,7 @@ class Command(BaseCommand):
timeout=30,
)
if resp.status_code != 200:
self.stderr.write(
f"DeepL API error for {obj.pk}: {resp.status_code} {resp.text}"
)
self.stderr.write(f"DeepL API error for {obj.pk}: {resp.status_code} {resp.text}")
continue
data = resp.json()

View file

@ -18,9 +18,9 @@ class AddressManager(models.Manager):
# Query Nominatim
params = {
'format': 'json',
'addressdetails': 1,
'q': raw_data,
"format": "json",
"addressdetails": 1,
"q": raw_data,
}
resp = requests.get(config.NOMINATIM_URL, params=params)
resp.raise_for_status()
@ -30,18 +30,18 @@ class AddressManager(models.Manager):
data = results[0]
# Parse address components
addr = data.get('address', {})
street = addr.get('road') or addr.get('pedestrian') or ''
district = addr.get('city_district') or addr.get('suburb') or ''
city = addr.get('city') or addr.get('town') or addr.get('village') or ''
region = addr.get('state') or addr.get('region') or ''
postal_code = addr.get('postcode') or ''
country = addr.get('country') or ''
addr = data.get("address", {})
street = addr.get("road") or addr.get("pedestrian") or ""
district = addr.get("city_district") or addr.get("suburb") or ""
city = addr.get("city") or addr.get("town") or addr.get("village") or ""
region = addr.get("state") or addr.get("region") or ""
postal_code = addr.get("postcode") or ""
country = addr.get("country") or ""
# Parse location
try:
lat = float(data.get('lat'))
lon = float(data.get('lon'))
lat = float(data.get("lat"))
lon = float(data.get("lon"))
location = Point(lon, lat, srid=4326)
except (TypeError, ValueError):
location = None
@ -57,5 +57,5 @@ class AddressManager(models.Manager):
country=country,
location=location,
api_response=data,
**kwargs
**kwargs,
)

View file

@ -635,7 +635,6 @@ class Order(NiceModel):
return promocode.use(self)
def apply_addresses(self, billing_address_uuid, shipping_address_uuid):
try:
if not any([shipping_address_uuid, billing_address_uuid]):
if self.is_whole_digital:
@ -663,8 +662,13 @@ class Order(NiceModel):
raise Http404(_("address does not exist"))
def buy(
self, force_balance: bool = False, force_payment: bool = False, promocode_uuid: str | None = None,
billing_address: str | None = None, shipping_address: str | None = None, **kwargs
self,
force_balance: bool = False,
force_payment: bool = False,
promocode_uuid: str | None = None,
billing_address: str | None = None,
shipping_address: str | None = None,
**kwargs,
) -> Self | Transaction | None:
if config.DISABLED_COMMERCE:
raise DisabledCommerceError(_("you can not buy at this moment, please try again in a few minutes"))
@ -1022,7 +1026,6 @@ class PromoCode(NiceModel):
return "percent"
def use(self, order: Order) -> float:
if self.used_on:
raise ValueError(_("promocode already used"))
@ -1251,31 +1254,14 @@ class Address(NiceModel):
country = CharField(_("country"), max_length=40, null=True) # noqa: DJ001
location = PointField(
geography=True,
srid=4326,
null=True,
blank=True,
help_text=_("geolocation point: (longitude, latitude)")
geography=True, srid=4326, null=True, blank=True, help_text=_("geolocation point: (longitude, latitude)")
)
raw_data = JSONField(
blank=True,
null=True,
help_text=_("full JSON response from geocoder for this address")
)
raw_data = JSONField(blank=True, null=True, help_text=_("full JSON response from geocoder for this address"))
api_response = JSONField(
blank=True,
null=True,
help_text=_("stored JSON response from the geocoding service")
)
api_response = JSONField(blank=True, null=True, help_text=_("stored JSON response from the geocoding service"))
user = ForeignKey(
to="vibes_auth.User",
on_delete=CASCADE,
blank=True,
null=True
)
user = ForeignKey(to="vibes_auth.User", on_delete=CASCADE, blank=True, null=True)
objects = AddressManager()

View file

@ -25,29 +25,34 @@ class EvibesPermission(permissions.BasePermission):
"""
ACTION_PERM_MAP = {
'retrieve': 'view',
'list': 'view',
'create': 'add',
'update': 'change',
'partial_update': 'change',
'destroy': 'delete',
"retrieve": "view",
"list": "view",
"create": "add",
"update": "change",
"partial_update": "change",
"destroy": "delete",
}
USER_SCOPED_ACTIONS = {
'buy', 'buy_unregistered', 'current',
'add_order_product', 'remove_order_product',
'add_wishlist_product', 'remove_wishlist_product',
'bulk_add_wishlist_products', 'bulk_remove_wishlist_products',
'autocomplete',
"buy",
"buy_unregistered",
"current",
"add_order_product",
"remove_order_product",
"add_wishlist_product",
"remove_wishlist_product",
"bulk_add_wishlist_products",
"bulk_remove_wishlist_products",
"autocomplete",
}
def has_permission(self, request, view):
action = getattr(view, 'action', None)
action = getattr(view, "action", None)
model = view.queryset.model
app_label = model._meta.app_label
model_name = model._meta.model_name
if action == 'create' and view.additional.get('create') == 'ALLOW':
if action == "create" and view.additional.get("create") == "ALLOW":
return True
if action in self.USER_SCOPED_ACTIONS:
@ -59,7 +64,7 @@ class EvibesPermission(permissions.BasePermission):
if request.user.has_perm(f"{app_label}.{codename}"):
return True
return bool(action in ('list', 'retrieve') and getattr(model, 'is_publicly_visible', False))
return bool(action in ("list", "retrieve") and getattr(model, "is_publicly_visible", False))
def has_queryset_permission(self, request, view, queryset):
"""
@ -73,7 +78,7 @@ class EvibesPermission(permissions.BasePermission):
if view.action in self.USER_SCOPED_ACTIONS:
return queryset.filter(user=request.user)
if view.action in ('list', 'retrieve'):
if view.action in ("list", "retrieve"):
if request.user.has_perm(f"{app_label}.view_{model_name}"):
if request.user.is_staff:
return queryset
@ -81,10 +86,15 @@ class EvibesPermission(permissions.BasePermission):
return queryset.none()
base = queryset.filter(is_active=True)
if view.action in ('update', 'partial_update'):
match view.action:
case "update":
if request.user.has_perm(f"{app_label}.change_{model_name}"):
return base
if view.action == 'destroy':
case "partial_update":
if request.user.has_perm(f"{app_label}.change_{model_name}"):
return base
case "destroy":
if request.user.has_perm(f"{app_label}.delete_{model_name}"):
return base
return queryset.none()

View file

@ -106,15 +106,8 @@ class BuyAsBusinessOrderSerializer(Serializer):
class AddressAutocompleteInputSerializer(Serializer):
q = CharField(
required=True
)
limit = IntegerField(
required=False,
min_value=1,
max_value=10,
default=5
)
q = CharField(required=True)
limit = IntegerField(required=False, min_value=1, max_value=10, default=5)
class AddressSuggestionSerializer(Serializer):

View file

@ -12,11 +12,15 @@ class ProductSitemap(Sitemap):
limit = 40000
def items(self):
return Product.objects.filter(
return (
Product.objects.filter(
is_active=True,
brand__is_active=True,
category__is_active=True,
).only("uuid", "name", "modified", "slug").order_by("-modified")
)
.only("uuid", "name", "modified", "slug")
.order_by("-modified")
)
def lastmod(self, obj):
return obj.modified

View file

@ -209,8 +209,9 @@ def process_promotions() -> tuple[bool, str]:
product = eligible_products.order_by("?").first()
selected_products.append(product)
promotion = Promotion.objects.update_or_create(name=promotion_name,
defaults={"discount_percent": discount_percent, "is_active": True})
promotion = Promotion.objects.update_or_create(
name=promotion_name, defaults={"discount_percent": discount_percent, "is_active": True}
)
promotion.products.set(selected_products)

View file

@ -7,7 +7,7 @@ register = template.Library()
def attributes_length(value, arg):
"""Returns True if the value length is more than the argument."""
if isinstance(value, dict):
count = int()
count = 0
for attribute, _value in value.items():
if attribute.endswith("_system"):
continue

View file

@ -21,7 +21,7 @@ class AttributeValueOptions(TranslationOptions):
@register(Brand)
class BrandTranslationOptions(TranslationOptions):
fields = ("description", )
fields = ("description",)
@register(Category)

View file

@ -128,8 +128,10 @@ def resolve_translations_for_elasticsearch(instance, field_name):
if not field:
setattr(instance, f"{field_name}_{LANGUAGE_CODE}", filled_field)
CROCKFORD = "ABCDEFGHJKLMNPQRSTUVWXYZ23456789"
def generate_human_readable_id(length: int = 6) -> str:
"""
Generate a human-readable ID of `length` characters (from the Crockford set),

View file

@ -5,19 +5,16 @@ from constance import config
from django.utils.translation import gettext as _
def fetch_address_suggestions(
query: str,
limit: int = 5
) -> List[Dict]:
def fetch_address_suggestions(query: str, limit: int = 5) -> List[Dict]:
if not config.NOMINATIM_URL:
raise ValueError(_("NOMINATIM_URL must be configured."))
url = config.NOMINATIM_URL.rstrip('/') + '/search'
url = config.NOMINATIM_URL.rstrip("/") + "/search"
params = {
'format': 'json',
'addressdetails': 1,
'q': query,
'limit': limit,
"format": "json",
"addressdetails": 1,
"q": query,
"limit": limit,
}
response = requests.get(url, params=params)
response.raise_for_status()
@ -25,10 +22,12 @@ def fetch_address_suggestions(
suggestions = []
for item in results:
suggestions.append({
'display_name': item.get('display_name'),
'lat': item.get('lat'),
'lon': item.get('lon'),
'address': item.get('address', {}),
})
suggestions.append(
{
"display_name": item.get("display_name"),
"lat": item.get("lat"),
"lon": item.get("lon"),
"address": item.get("address", {}),
}
)
return suggestions

View file

@ -176,7 +176,7 @@ class AbstractVendor:
def round_price_marketologically(price: float) -> float:
up_int = ceil(price)
s = str(up_int)
s = (s[:-1] if len(s) > 1 else '0') + '9'
s = (s[:-1] if len(s) > 1 else "0") + "9"
return float(f"{int(s):.2f}")
def get_vendor_instance(self):
@ -214,7 +214,6 @@ class AbstractVendor:
self.get_attribute_values_queryset().delete()
def process_attribute(self, key: str, value, product: Product, attr_group: AttributeGroup):
if not value:
return

View file

@ -169,8 +169,8 @@ class ProductViewSet(EvibesViewSet):
action_serializer_classes = {
"list": ProductSimpleSerializer,
}
lookup_field = 'lookup'
lookup_url_kwarg = 'lookup'
lookup_field = "lookup"
lookup_url_kwarg = "lookup"
def get_object(self):
queryset = self.filter_queryset(self.get_queryset())
@ -476,9 +476,9 @@ class AddressViewSet(EvibesViewSet):
additional = {"create": "ALLOW"}
def get_serializer_class(self):
if self.action == 'create':
if self.action == "create":
return AddressCreateSerializer
if self.action == 'autocomplete':
if self.action == "autocomplete":
return AddressAutocompleteInputSerializer
return AddressSerializer

View file

@ -28,7 +28,7 @@ class CustomCommonMiddleware(CommonMiddleware):
class CustomLocaleMiddleware(LocaleMiddleware):
def process_request(self, request):
lang = translation.get_language_from_request(request)
parts = lang.replace('_', '-').split('-')
parts = lang.replace("_", "-").split("-")
if len(parts) == 2:
lang_code = parts[0].lower()
region = parts[1].upper()

View file

@ -304,17 +304,18 @@ if getenv("SENTRY_DSN"):
]
if DEBUG:
ignore_errors.extend(["billiard.exceptions.WorkerLostError",
"billiard.exceptions.TimeLimitExceeded"])
ignore_errors.extend(["billiard.exceptions.WorkerLostError", "billiard.exceptions.TimeLimitExceeded"])
sentry_sdk.init(
dsn=getenv("SENTRY_DSN"),
traces_sample_rate=1.0 if DEBUG else 0.2,
profiles_sample_rate=1.0 if DEBUG else 0.1,
integrations=[DjangoIntegration(), LoggingIntegration(
level=logging.INFO,
event_level=logging.ERROR
), CeleryIntegration(), RedisIntegration()],
integrations=[
DjangoIntegration(),
LoggingIntegration(level=logging.INFO, event_level=logging.ERROR),
CeleryIntegration(),
RedisIntegration(),
],
environment="development" if DEBUG else "production",
debug=DEBUG,
release=f"evibes@{EVIBES_VERSION}",
@ -327,7 +328,7 @@ LANGUAGE_COOKIE_HTTPONLY = True
DATA_UPLOAD_MAX_NUMBER_FIELDS = 8888
ADMINS = [('Egor Gorbunov', 'contact@fureunoir.com')]
ADMINS = [("Egor Gorbunov", "contact@fureunoir.com")]
STORAGES = {
"default": {
@ -338,5 +339,5 @@ STORAGES = {
},
"dbbackup": {
"BACKEND": "django.core.files.storage.FileSystemStorage",
}
},
}

View file

@ -41,5 +41,5 @@ EXPOSABLE_KEYS = [
"EMAIL_HOST_USER",
"EMAIL_FROM",
"PAYMENT_GATEWAY_MINIMUM",
"PAYMENT_GATEWAY_MAXIMUM"
"PAYMENT_GATEWAY_MAXIMUM",
]

View file

@ -1,10 +1,10 @@
from evibes.settings.base import getenv # noqa: I001
from evibes.settings.base import getenv
DBBACKUP_CONNECTORS = {
'default': {
'SINGLE_TRANSACTION': False,
'IF_EXISTS': True,
'RESTORE_SUFFIX': '--set ON_ERROR_STOP=off',
"default": {
"SINGLE_TRANSACTION": False,
"IF_EXISTS": True,
"RESTORE_SUFFIX": "--set ON_ERROR_STOP=off",
}
}

View file

@ -94,7 +94,7 @@ Current API version: {EVIBES_VERSION}
SPECTACULAR_PLATFORM_SETTINGS = {
"TITLE": f"{CONSTANCE_CONFIG.get('PROJECT_NAME')[0]} API",
"DESCRIPTION": SPECTACULAR_PLATFORM_DESCRIPTION,
"VERSION": EVIBES_VERSION,
"VERSION": EVIBES_VERSION, # noqa: F405
"TOS": "https://wiseless.xyz/evibes/terms-of-service",
"SWAGGER_UI_DIST": "SIDECAR",
"CAMELIZE_NAMES": True,
@ -145,7 +145,7 @@ SPECTACULAR_PLATFORM_SETTINGS = {
SPECTACULAR_B2B_SETTINGS = {
"TITLE": f"{CONSTANCE_CONFIG.get('PROJECT_NAME')[0]} API",
"DESCRIPTION": SPECTACULAR_B2B_DESCRIPTION,
"VERSION": EVIBES_VERSION,
"VERSION": EVIBES_VERSION, # noqa: F405
"TOS": "https://wiseless.xyz/evibes/terms-of-service",
"SWAGGER_UI_DIST": "SIDECAR",
"CAMELIZE_NAMES": True,

View file

@ -1,11 +1,11 @@
from evibes.settings.base import * # noqa: F403
ELASTICSEARCH_DSL = {
'default': {
'hosts': ['http://elasticsearch:9200'],
'basic_auth': ('elastic', getenv("ELASTIC_PASSWORD")), # noqa: F405
'verify_certs': False,
'ssl_show_warn': False,
"default": {
"hosts": ["http://elasticsearch:9200"],
"basic_auth": ("elastic", getenv("ELASTIC_PASSWORD")), # noqa: F405
"verify_certs": False,
"ssl_show_warn": False,
},
}

View file

@ -73,12 +73,12 @@ LOGGING = {
},
"celery.app.trace": {
"handlers": ["console_debug" if DEBUG else "console_production"], # noqa: F405
"level": "DEBUG" if DEBUG else "INFO",
"level": "DEBUG" if DEBUG else "INFO", # noqa: F405
"propagate": False,
},
"celery.worker.strategy": {
"handlers": ["console_debug" if DEBUG else "console_production"], # noqa: F405
"level": "DEBUG" if DEBUG else "INFO",
"level": "DEBUG" if DEBUG else "INFO", # noqa: F405
"propagate": False,
},
"elastic_transport.transport": {

View file

@ -31,10 +31,7 @@ USER_SCHEMA = {
"reset_password": extend_schema(
summary=_("reset a user's password by sending a reset password email"),
request=ResetPasswordSerializer,
responses={
status.HTTP_200_OK: {},
**BASE_ERRORS
},
responses={status.HTTP_200_OK: {}, **BASE_ERRORS},
),
"upload_avatar": extend_schema(
summary=_("handle avatar upload for a user"),
@ -42,7 +39,7 @@ USER_SCHEMA = {
status.HTTP_200_OK: UserSerializer,
status.HTTP_400_BAD_REQUEST: {"description": "Invalid Request"},
status.HTTP_403_FORBIDDEN: {"description": "Bad credentials"},
**BASE_ERRORS
**BASE_ERRORS,
},
),
"confirm_password_reset": extend_schema(
@ -51,7 +48,7 @@ USER_SCHEMA = {
responses={
status.HTTP_200_OK: {"description": "Password reset successfully"},
status.HTTP_400_BAD_REQUEST: {"description": _("passwords do not match")},
**BASE_ERRORS
**BASE_ERRORS,
},
),
"activate": extend_schema(
@ -60,7 +57,7 @@ USER_SCHEMA = {
responses={
status.HTTP_200_OK: UserSerializer,
status.HTTP_400_BAD_REQUEST: {"description": _("activation link is invalid or account already activated")},
**BASE_ERRORS
**BASE_ERRORS,
},
),
}

View file

@ -109,11 +109,7 @@ class UserType(DjangoObjectType):
products_by_uuid = {str(p.uuid): p for p in qs}
ordered_products = [
products_by_uuid[u]
for u in uuid_list
if u in products_by_uuid
]
ordered_products = [products_by_uuid[u] for u in uuid_list if u in products_by_uuid]
return connection_from_array(ordered_products, kwargs)

View file

@ -26,6 +26,4 @@ def send_user_verification_email(instance, **kwargs):
if old.email != instance.email:
instance.is_active = False
transaction.on_commit(
lambda: send_verification_email_task.delay(instance.pk)
)
transaction.on_commit(lambda: send_verification_email_task.delay(instance.pk))