schon/engine/core/management/commands/demo_data.py
Egor fureunoir Gorbunov 79be6ed4e4 feat(demo_data): use translation override to ensure consistent locale
wraps actions in a `with override("en")` block to enforce the use of the English locale during execution. This ensures consistent behavior and message formatting regardless of the server's default language settings.
2026-02-27 19:02:55 +03:00

727 lines
26 KiB
Python

import json
import random
import shutil
from datetime import timedelta
from pathlib import Path
from typing import Any
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.management.base import BaseCommand
from django.db import transaction
from django.utils import timezone
from django.utils.translation import override
from engine.blog.models import Post, PostTag
from engine.core.models import (
Address,
Attribute,
AttributeGroup,
AttributeValue,
Brand,
Category,
CategoryTag,
Order,
OrderProduct,
Product,
ProductImage,
ProductTag,
Stock,
Vendor,
Wishlist,
)
from engine.payments.models import Balance
from engine.vibes_auth.models import Group, User
DEMO_EMAIL_DOMAIN = "wiseless.xyz"
DEMO_IMAGES_DIR = Path(settings.BASE_DIR) / "engine/core/fixtures/demo_products_images"
DEMO_BLOG_DIR = Path(settings.BASE_DIR) / "engine/core/fixtures/demo_blog_posts"
class Command(BaseCommand):
help = "Install or remove demo fixtures for Schon"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.demo_data: dict[str, Any] = {}
def add_arguments(self, parser):
subparsers = parser.add_subparsers(dest="action", help="Action to perform")
install_parser = subparsers.add_parser("install", help="Install demo fixtures")
install_parser.add_argument(
"--users",
type=int,
default=50,
help="Number of demo users to create (default: 50)",
)
install_parser.add_argument(
"--orders",
type=int,
default=100,
help="Number of demo orders to create (default: 100)",
)
install_parser.add_argument(
"--days",
type=int,
default=30,
help="Number of days to spread orders over (default: 30)",
)
install_parser.add_argument(
"--skip-products",
action="store_true",
help="Skip creating gem products (use if already created)",
)
subparsers.add_parser("remove", help="Remove all demo fixtures")
def handle(self, *args: list[Any], **options: dict[str, Any]) -> None:
action = options.get("action")
if not action:
self.stdout.write(
self.style.ERROR("Please specify an action: install or remove")
)
self.stdout.write("Usage: python manage.py demo_data install|remove")
return
self._load_demo_data()
with override("en"):
if action == "install":
self._install(options)
elif action == "remove":
self._remove()
else:
self.stdout.write(self.style.ERROR(f"Unknown action: {action}"))
@property
def staff_user(self):
user, _ = User.objects.get_or_create(
email=f"staff@{DEMO_EMAIL_DOMAIN}",
first_name="Alice",
last_name="Schon",
is_staff=True,
is_active=True,
is_verified=True,
)
if _:
user.set_password("Staff!Demo888")
user.save()
if not user.groups.filter(name="E-Commerce Admin").exists():
user.groups.add(Group.objects.get(name="E-Commerce Admin"))
return user
@property
def super_user(self):
user, _ = User.objects.get_or_create(
email=f"super@{DEMO_EMAIL_DOMAIN}",
first_name="Bob",
last_name="Schon",
is_superuser=True,
is_staff=True,
is_active=True,
is_verified=True,
)
if _:
user.set_password("Super!Demo888")
user.save()
return user
def _load_demo_data(self) -> None:
fixture_path = Path(settings.BASE_DIR) / "engine/core/fixtures/demo.json"
with open(fixture_path, encoding="utf-8") as f:
self.demo_data = json.load(f)
def _install(self, options: dict[str, Any]) -> None:
num_users = options["users"]
num_orders = options["orders"]
num_days = options["days"]
skip_products = options.get("skip_products", False)
self.stdout.write(self.style.NOTICE("Starting demo fixture installation..."))
if not skip_products:
self.stdout.write("Creating gem products with translations...")
self._create_gem_products()
self.stdout.write(self.style.SUCCESS("Gem products created!"))
self.stdout.write(f"Creating {num_users} demo users...")
users = self._create_demo_users(num_users)
self.stdout.write(self.style.SUCCESS(f"Created {len(users)} users"))
products = list(Product.objects.filter(is_active=True))
if not products:
self.stdout.write(self.style.ERROR("No products found!"))
return
self.stdout.write(f"Creating {num_orders} orders over {num_days} days...")
orders, refunded_count = self._create_demo_orders(
users, products, num_orders, num_days
)
self.stdout.write(
self.style.SUCCESS(
f"Created {len(orders)} orders ({refunded_count} refunded)"
)
)
self.stdout.write("Creating wishlists for demo users...")
wishlist_count = self._create_demo_wishlists(users, products)
self.stdout.write(self.style.SUCCESS(f"Created {wishlist_count} wishlists"))
self.stdout.write("Creating blog posts...")
blog_count = self._create_blog_posts()
self.stdout.write(self.style.SUCCESS(f"Created {blog_count} blog posts"))
self.stdout.write(
self.style.SUCCESS(f"Created staff {self.staff_user.email} user")
)
self.stdout.write(
self.style.SUCCESS(f"Created super {self.super_user.email} user")
)
self._print_summary(users, orders, refunded_count, num_days)
def _remove(self) -> None:
self.stdout.write(self.style.WARNING("Removing demo fixtures..."))
with transaction.atomic():
demo_users = User.objects.filter(email__endswith=f"@{DEMO_EMAIL_DOMAIN}")
user_count = demo_users.count()
orders = Order.objects.filter(user__in=demo_users)
order_count = orders.count()
OrderProduct.objects.filter(order__in=orders).delete()
orders.delete()
Address.objects.filter(user__in=demo_users).delete()
Balance.objects.filter(user__in=demo_users).delete()
Wishlist.objects.filter(user__in=demo_users).delete()
post_titles = [p["title"] for p in self.demo_data.get("blog_posts", [])]
blog_count = Post.objects.filter(title__in=post_titles).delete()[0]
self.stdout.write(f" Removed blog posts: {blog_count}")
post_tag_names = [
t["tag_name"] for t in self.demo_data.get("post_tags", [])
]
PostTag.objects.filter(tag_name__in=post_tag_names).delete()
demo_users.delete()
demo_vendor_name = self.demo_data["vendor"]["name"]
try:
vendor = Vendor.objects.get(name=demo_vendor_name)
Stock.objects.filter(vendor=vendor).delete()
vendor.delete()
self.stdout.write(f" Removed vendor: {demo_vendor_name}")
except Vendor.DoesNotExist:
pass
partnumbers = [p["partnumber"] for p in self.demo_data["products"]]
products = Product.objects.filter(partnumber__in=partnumbers)
product_count = products.count()
for product in products:
for image in product.images.all():
if image.image:
image.image.delete(save=False)
image.delete()
product_dir: Path = settings.MEDIA_ROOT / "products" / str(product.uuid)
if product_dir.exists() and not any(product_dir.iterdir()):
shutil.rmtree(product_dir, ignore_errors=True)
# Delete OrderProducts referencing demo products (from non-demo users)
# to avoid ProtectedError since OrderProduct.product uses PROTECT
OrderProduct.objects.filter(product__in=products).delete()
products.delete()
brand_names = [b["name"] for b in self.demo_data["brands"]]
Brand.objects.filter(name__in=brand_names).delete()
for cat_data in reversed(self.demo_data["categories"]):
Category.objects.filter(name=cat_data["name"]).delete()
tag_names = [t["tag_name"] for t in self.demo_data["category_tags"]]
CategoryTag.objects.filter(tag_name__in=tag_names).delete()
tag_names = [t["tag_name"] for t in self.demo_data["product_tags"]]
ProductTag.objects.filter(tag_name__in=tag_names).delete()
group_names = [g["name"] for g in self.demo_data["attribute_groups"]]
Attribute.objects.filter(group__name__in=group_names).delete()
AttributeGroup.objects.filter(name__in=group_names).delete()
self.staff_user.delete()
self.super_user.delete()
self.stdout.write("")
self.stdout.write(self.style.SUCCESS("=" * 50))
self.stdout.write(self.style.SUCCESS("Demo fixtures removed successfully!"))
self.stdout.write(self.style.SUCCESS("=" * 50))
self.stdout.write(f" Users removed: {user_count}")
self.stdout.write(f" Orders removed: {order_count}")
self.stdout.write(f" Products removed: {product_count}")
def _print_summary(
self, users: list, orders: list, refunded_count: int, num_days: int
) -> None:
password = self.demo_data["demo_users"]["password"]
self.stdout.write("")
self.stdout.write(self.style.SUCCESS("=" * 50))
self.stdout.write(self.style.SUCCESS("Demo fixtures installed successfully!"))
self.stdout.write(self.style.SUCCESS("=" * 50))
self.stdout.write(f" Products: {Product.objects.count()}")
self.stdout.write(f" Categories: {Category.objects.count()}")
self.stdout.write(f" Brands: {Brand.objects.count()}")
self.stdout.write(f" Blog posts: {Post.objects.count()}")
self.stdout.write(f" Users created: {len(users)}")
self.stdout.write(f" Orders created: {len(orders)}")
self.stdout.write(f" Refunded orders: {refunded_count}")
self.stdout.write(f" Date range: {num_days} days")
self.stdout.write(f" Demo password: {password}")
self.stdout.write("")
self.stdout.write("Sample demo accounts:")
for user in users[:5]:
self.stdout.write(f" - {user.email}")
@transaction.atomic
def _create_gem_products(self) -> None:
data = self.demo_data
vendor, _ = Vendor.objects.get_or_create(
name=data["vendor"]["name"],
defaults={"markup_percent": data["vendor"]["markup_percent"]},
)
for tag_data in data["category_tags"]:
tag, created = CategoryTag.objects.get_or_create(
tag_name=tag_data["tag_name"],
defaults={"name": tag_data["name"]},
)
if created and "name_ru" in tag_data:
tag.name_ru_ru = tag_data["name_ru"]
tag.save()
for tag_data in data["product_tags"]:
tag, created = ProductTag.objects.get_or_create(
tag_name=tag_data["tag_name"],
defaults={"name": tag_data["name"]},
)
if created and "name_ru" in tag_data:
tag.name_ru_ru = tag_data["name_ru"]
tag.save()
attr_groups = {}
for group_data in data["attribute_groups"]:
group, created = AttributeGroup.objects.get_or_create(
name=group_data["name"]
)
if created and "name_ru" in group_data:
group.name_ru_ru = group_data["name_ru"]
group.save()
attr_groups[group_data["name"]] = group
for attr_data in data["attributes"]:
group = attr_groups.get(attr_data["group"])
if group:
attr, created = Attribute.objects.get_or_create(
group=group,
name=attr_data["name"],
defaults={
"value_type": attr_data["value_type"],
"is_filterable": attr_data["is_filterable"],
},
)
if created and "name_ru" in attr_data:
attr.name_ru_ru = attr_data["name_ru"]
attr.save()
attr_lookup = {}
for attr_data in data["attributes"]:
group = attr_groups.get(attr_data["group"])
if group:
try:
attr_lookup[attr_data["name"]] = Attribute.objects.get(
group=group, name=attr_data["name"]
)
except Attribute.DoesNotExist:
pass
brands = {}
for brand_data in data["brands"]:
brand, created = Brand.objects.get_or_create(
name=brand_data["name"],
defaults={"description": brand_data["description"]},
)
if created and "description_ru" in brand_data:
brand.description_ru_ru = brand_data["description_ru"]
brand.save()
brands[brand_data["name"]] = brand
categories = {}
for cat_data in data["categories"]:
parent = categories.get(cat_data["parent"]) if cat_data["parent"] else None
category, created = Category.objects.get_or_create(
name=cat_data["name"],
defaults={
"description": cat_data["description"],
"parent": parent,
"markup_percent": cat_data["markup_percent"],
},
)
if created:
if "name_ru" in cat_data:
category.name_ru_ru = cat_data["name_ru"]
if "description_ru" in cat_data:
category.description_ru_ru = cat_data["description_ru"]
category.save()
categories[cat_data["name"]] = category
for prod_data in data["products"]:
category = categories.get(prod_data["category"])
brand = brands.get(prod_data["brand"])
if not category:
continue
product, created = Product.objects.get_or_create(
partnumber=prod_data["partnumber"],
defaults={
"name": prod_data["name"],
"description": prod_data["description"],
"category": category,
"brand": brand,
"is_digital": prod_data.get("is_digital", False),
},
)
if created:
if "name_ru" in prod_data:
product.name_ru_ru = prod_data["name_ru"] # ty: ignore[invalid-assignment]
if "description_ru" in prod_data:
product.description_ru_ru = prod_data["description_ru"] # ty: ignore[invalid-assignment]
product.save()
Stock.objects.create(
vendor=vendor,
product=product,
sku=f"GS-{prod_data['partnumber']}",
price=prod_data["price"],
purchase_price=prod_data["purchase_price"],
quantity=prod_data["quantity"],
)
# Add product image
self._add_product_image(product, prod_data["partnumber"])
# Add attribute values
for attr_name, av_data in prod_data.get("attribute_values", {}).items():
attr = attr_lookup.get(attr_name)
if attr:
if isinstance(av_data, dict):
value = str(av_data["en"])
value_ru = av_data.get("ru")
else:
value = str(av_data)
value_ru = None
av, created = AttributeValue.objects.get_or_create(
product=product,
attribute=attr,
defaults={"value": value},
)
if created and value_ru:
av.value_ru_ru = value_ru # ty:ignore[invalid-assignment]
av.save()
def _find_image(self, partnumber: str, suffix: str = "") -> Path | None:
extensions = (".jpg", ".jpeg", ".png", ".webp")
for ext in extensions:
candidate = DEMO_IMAGES_DIR / f"{partnumber}{suffix}{ext}"
if candidate.exists():
return candidate
return None
def _add_product_image(self, product: Product, partnumber: str) -> None:
primary = self._find_image(partnumber)
if not primary:
primary = DEMO_IMAGES_DIR / "placeholder.png"
if not primary.exists():
self.stdout.write(
self.style.WARNING(f" No image found for {partnumber}, skipping...")
)
return
self._save_product_image(product, primary, priority=1)
n = 2
while True:
variant = self._find_image(partnumber, f" ({n})")
if not variant:
break
self._save_product_image(product, variant, priority=n)
n += 1
def _save_product_image(
self, product: Product, image_path: Path, priority: int
) -> None:
with open(image_path, "rb") as f:
image_content = f.read()
product_image = ProductImage(
product=product,
alt=product.name,
priority=priority,
)
product_image.image.save(image_path.name, ContentFile(image_content), save=True)
@transaction.atomic
def _create_demo_users(self, count: int) -> list:
users = []
user_data = self.demo_data["demo_users"]
existing_emails = set(User.objects.values_list("email", flat=True))
first_names = user_data["first_names"]
last_names = user_data["last_names"]
cities = user_data["cities"]
streets = user_data["streets"]
password = user_data["password"]
email_domain = user_data["email_domain"]
for _ in range(count):
first_name = random.choice(first_names)
last_name = random.choice(last_names)
base_email = f"{first_name.lower()}.{last_name.lower()}@{email_domain}"
email = base_email
counter = 1
while email in existing_emails:
email = (
f"{first_name.lower()}.{last_name.lower()}{counter}@{email_domain}"
)
counter += 1
existing_emails.add(email)
# Create user
user = User(
email=email,
first_name=first_name,
last_name=last_name,
is_active=True,
is_verified=True,
)
user.set_password(password)
user.save()
Balance.objects.get_or_create(
user=user,
defaults={"amount": round(random.uniform(100, 5000), 2)},
)
city_data = random.choice(cities)
street_num = random.randint(1, 999)
street = random.choice(streets)
address_line = (
f"{street_num} {street}, {city_data['city']}, "
f"{city_data['region']} {city_data['postal_code']}, {city_data['country']}"
)
address = Address(
user=user,
street=f"{street_num} {street}",
city=city_data["city"],
region=city_data["region"],
postal_code=city_data["postal_code"],
country=city_data["country"],
address_line=address_line,
raw_data=address_line,
)
address.save()
users.append(user)
return users
@transaction.atomic
def _create_demo_orders(
self,
users: list,
products: list,
count: int,
days: int,
) -> tuple[list, int]:
orders = []
refunded_count = 0
now = timezone.now()
refund_target = int(count * 0.08)
refund_indices = set(random.sample(range(count), refund_target))
day_weights = [1 + (i / days) for i in range(days)]
total_weight = sum(day_weights)
day_probabilities = [w / total_weight for w in day_weights]
for i in range(count):
user = random.choice(users)
is_refunded = i in refund_indices
day_offset = random.choices(range(days), weights=day_probabilities)[0]
order_date = now - timedelta(
days=days - 1 - day_offset,
hours=random.randint(0, 23),
minutes=random.randint(0, 59),
)
if is_refunded:
status = "FAILED"
refunded_count += 1
elif day_offset > days * 0.7: # Recent orders
status = random.choice(["CREATED", "DELIVERING", "FINISHED"])
else:
status = "FINISHED"
address = Address.objects.filter(user=user).first()
order = Order.objects.create(
user=user,
status=status,
buy_time=order_date,
billing_address=address,
shipping_address=address,
)
Order.objects.filter(pk=order.pk).update(created=order_date)
num_products = random.randint(1, 4)
order_products = random.sample(products, min(num_products, len(products)))
for product in order_products:
quantity = random.randint(1, 3)
price = product.price if product.price else random.uniform(100, 5000)
if is_refunded:
op_status = "RETURNED"
elif status == "FINISHED":
op_status = "FINISHED"
elif status == "DELIVERING":
op_status = random.choice(["DELIVERING", "DELIVERED"])
else:
op_status = random.choice(["ACCEPTED", "PENDING"])
OrderProduct.objects.create(
order=order,
product=product,
quantity=quantity,
buy_price=round(price, 2),
status=op_status,
)
orders.append(order)
return orders, refunded_count
@transaction.atomic
def _create_demo_wishlists(self, users: list, products: list) -> int:
"""
Ensure exactly 5 products are wishlisted, each by 2-4 random demo users.
"""
if len(products) < 5:
self.stdout.write(
self.style.WARNING(
f"Not enough products ({len(products)}) to create 5 wishlisted items"
)
)
wishlisted_products = products
else:
wishlisted_products = random.sample(products, 5)
if len(users) < 2:
self.stdout.write(
self.style.WARNING("Not enough users to create wishlists")
)
return 0
users_with_wishlists = set()
for product in wishlisted_products:
num_users = random.randint(2, min(4, len(users)))
selected_users = random.sample(users, num_users)
for user in selected_users:
wishlist, _ = Wishlist.objects.get_or_create(user=user)
wishlist.products.add(product)
users_with_wishlists.add(user.id)
return len(users_with_wishlists)
@transaction.atomic
def _create_blog_posts(self) -> int:
data = self.demo_data
author = self.staff_user
count = 0
for tag_data in data.get("post_tags", []):
tag, created = PostTag.objects.get_or_create(
tag_name=tag_data["tag_name"],
defaults={"name": tag_data["name"]},
)
if created and "name_ru" in tag_data:
tag.name_ru_ru = tag_data["name_ru"]
tag.save()
for post_data in data.get("blog_posts", []):
if Post.objects.filter(title=post_data["title"]).exists():
continue
content_en = self._load_blog_content(post_data["content_file"], "en")
content_ru = self._load_blog_content(post_data["content_file"], "ru")
if not content_en:
self.stdout.write(
self.style.WARNING(
f" No content found for {post_data['content_file']}, skipping..."
)
)
continue
post = Post(
author=author,
title=post_data["title"],
content=content_en,
meta_description=post_data.get("meta_description", ""),
is_static_page=post_data.get("is_static_page", False),
)
if "title_ru" in post_data:
post.title_ru_ru = post_data["title_ru"] # ty:ignore[unresolved-attribute]
if content_ru:
post.content_ru_ru = content_ru # ty:ignore[unresolved-attribute]
if "meta_description_ru" in post_data:
post.meta_description_ru_ru = post_data["meta_description_ru"] # ty:ignore[unresolved-attribute]
post.save()
for tag_name in post_data.get("tags", []):
try:
tag = PostTag.objects.get(tag_name=tag_name)
post.tags.add(tag)
except PostTag.DoesNotExist:
pass
count += 1
return count
def _load_blog_content(self, content_file: str, lang: str) -> str | None:
file_path = DEMO_BLOG_DIR / f"{content_file}.{lang}.md"
if not file_path.exists():
return None
with open(file_path, encoding="utf-8") as f:
return f.read()