schon/engine/core/management/commands/demo_data.py
Egor fureunoir Gorbunov eef774c3a3 feat(markdown): integrate markdown rendering and editor support
Replace WYSIWYG editor with Markdown editor across all relevant models and admin fields. Add utilities for rendering and stripping markdown. Adjust serializers, views, and templates to support markdown content. Introduce `PastedImage` model and upload endpoint for handling inline image uploads in markdown.

This change simplifies content formatting while enhancing flexibility with markdown support.
2026-02-27 23:36:51 +03:00

741 lines
27 KiB
Python

import json
import random
import shutil
from datetime import timedelta
from pathlib import Path
from typing import Any
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.management.base import BaseCommand
from django.db import transaction
from django.utils import timezone
from django.utils.translation import override
from engine.blog.models import Post, PostTag
from engine.core.models import (
Address,
Attribute,
AttributeGroup,
AttributeValue,
Brand,
Category,
CategoryTag,
Order,
OrderProduct,
Product,
ProductImage,
ProductTag,
Stock,
Vendor,
Wishlist,
)
from engine.payments.models import Balance
from engine.vibes_auth.models import Group, User
# Accounts whose e-mail ends with "@<this domain>" are what the ``remove``
# action selects as demo users.
DEMO_EMAIL_DOMAIN = "wiseless.xyz"

# Fixture directories, resolved relative to the project root.
_DEMO_FIXTURES_DIR = Path(settings.BASE_DIR) / "engine" / "core" / "fixtures"
DEMO_IMAGES_DIR = _DEMO_FIXTURES_DIR / "demo_products_images"
DEMO_BLOG_DIR = _DEMO_FIXTURES_DIR / "demo_blog_posts"
# Management command with two sub-actions, ``install`` and ``remove``, that
# create or delete the demo fixtures.
class Command(BaseCommand):
# Short description of the command (the ``help`` attribute of BaseCommand).
help = "Install or remove demo fixtures for Schon"
def __init__(self, *args, **kwargs):
    """Initialise the base command and start with an empty fixture cache.

    ``demo_data`` stays empty until ``_load_demo_data`` fills it.
    """
    super().__init__(*args, **kwargs)
    self.demo_data: dict[str, Any] = dict()
def add_arguments(self, parser):
    """Register the ``install`` and ``remove`` sub-commands on *parser*.

    ``install`` takes three integer options (``--users``, ``--orders``,
    ``--days``) and the ``--skip-products`` flag; ``remove`` takes none.
    The chosen sub-command is stored under ``action``.
    """
    actions = parser.add_subparsers(dest="action", help="Action to perform")

    install = actions.add_parser("install", help="Install demo fixtures")
    # (flag, default, help text) for each integer-valued install option.
    integer_options = (
        ("--users", 50, "Number of demo users to create (default: 50)"),
        ("--orders", 100, "Number of demo orders to create (default: 100)"),
        ("--days", 30, "Number of days to spread orders over (default: 30)"),
    )
    for flag, default, help_text in integer_options:
        install.add_argument(flag, type=int, default=default, help=help_text)
    install.add_argument(
        "--skip-products",
        action="store_true",
        help="Skip creating gem products (use if already created)",
    )

    actions.add_parser("remove", help="Remove all demo fixtures")
def handle(self, *args: list[Any], **options: dict[str, Any]) -> None:
action = options.get("action")
if not action:
self.stdout.write(
self.style.ERROR("Please specify an action: install or remove")
)
self.stdout.write("Usage: python manage.py demo_data install|remove")
return
self._load_demo_data()
with override("en-gb"):
if action == "install":
self._install(options)
elif action == "remove":
self._remove()
else:
self.stdout.write(self.style.ERROR(f"Unknown action: {action}"))
@property
def staff_user(self):
    """Return the demo staff account, creating it on first access.

    The account is looked up by e-mail only. Name and flag values are
    passed as ``defaults`` so that an already existing account whose
    other fields differ is returned as-is instead of triggering a second
    insert with the same e-mail. The password is set only when the
    account is newly created.

    Raises:
        Group.DoesNotExist: if the "E-Commerce Admin" group is missing
            and the user is not yet a member of it.
    """
    user, created = User.objects.get_or_create(
        email=f"staff@{DEMO_EMAIL_DOMAIN}",
        defaults={
            "first_name": "Alice",
            "last_name": "Schon",
            "is_staff": True,
            "is_active": True,
            "is_verified": True,
        },
    )
    if created:
        # set_password hashes the value; save() persists the hash.
        user.set_password("Staff!Demo888")
        user.save()
    if not user.groups.filter(name="E-Commerce Admin").exists():
        user.groups.add(Group.objects.get(name="E-Commerce Admin"))
    return user
@property
def super_user(self):
    """Return the demo superuser account, creating it on first access.

    The account is looked up by e-mail only. Name and flag values are
    passed as ``defaults`` so that an already existing account whose
    other fields differ is returned as-is instead of triggering a second
    insert with the same e-mail. The password is set only when the
    account is newly created.
    """
    user, created = User.objects.get_or_create(
        email=f"super@{DEMO_EMAIL_DOMAIN}",
        defaults={
            "first_name": "Bob",
            "last_name": "Schon",
            "is_superuser": True,
            "is_staff": True,
            "is_active": True,
            "is_verified": True,
        },
    )
    if created:
        # set_password hashes the value; save() persists the hash.
        user.set_password("Super!Demo888")
        user.save()
    return user
def _load_demo_data(self) -> None:
    """Parse ``engine/core/fixtures/demo.json`` into ``self.demo_data``."""
    fixture_file = Path(settings.BASE_DIR) / "engine/core/fixtures/demo.json"
    self.demo_data = json.loads(fixture_file.read_text(encoding="utf-8"))
def _install(self, options: dict[str, Any]) -> None:
    """Create the demo dataset and report progress on stdout.

    Steps, in order: gem products (unless ``skip_products`` is set),
    users, orders, wishlists, blog posts, then the staff and super
    accounts, followed by the summary. Stops with an error message after
    the user step when no active product exists.
    """

    def say(text: str) -> None:
        self.stdout.write(text)

    def done(text: str) -> None:
        self.stdout.write(self.style.SUCCESS(text))

    user_total = options["users"]
    order_total = options["orders"]
    day_span = options["days"]
    products_already_present = options.get("skip_products", False)

    say(self.style.NOTICE("Starting demo fixture installation..."))

    if not products_already_present:
        say("Creating gem products with translations...")
        self._create_gem_products()
        done("Gem products created!")

    say(f"Creating {user_total} demo users...")
    users = self._create_demo_users(user_total)
    done(f"Created {len(users)} users")

    active_products = list(Product.objects.filter(is_active=True))
    if not active_products:
        say(self.style.ERROR("No products found!"))
        return

    say(f"Creating {order_total} orders over {day_span} days...")
    orders, refunded_count = self._create_demo_orders(
        users, active_products, order_total, day_span
    )
    done(f"Created {len(orders)} orders ({refunded_count} refunded)")

    say("Creating wishlists for demo users...")
    wishlists_made = self._create_demo_wishlists(users, active_products)
    done(f"Created {wishlists_made} wishlists")

    say("Creating blog posts...")
    posts_made = self._create_blog_posts()
    done(f"Created {posts_made} blog posts")

    # Reading these properties creates the accounts when they are missing.
    done(f"Created staff {self.staff_user.email} user")
    done(f"Created super {self.super_user.email} user")

    self._print_summary(users, orders, refunded_count, day_span)
def _remove(self) -> None:
    """Delete the demo fixtures and print what was removed.

    All deletions run in one transaction. Demo users are every account
    whose e-mail ends with ``@DEMO_EMAIL_DOMAIN``; their orders,
    addresses, balances and wishlists go first. Blog posts, tags, the
    vendor, products, brands, categories and attribute groups are matched
    by the names and part numbers listed in ``self.demo_data``.

    Raises:
        KeyError: if ``self.demo_data`` lacks one of the required
            sections (``vendor``, ``products``, ``brands``, ...).
    """
    self.stdout.write(self.style.WARNING("Removing demo fixtures..."))

    with transaction.atomic():
        demo_users = User.objects.filter(email__endswith=f"@{DEMO_EMAIL_DOMAIN}")
        user_count = demo_users.count()

        orders = Order.objects.filter(user__in=demo_users)
        order_count = orders.count()
        OrderProduct.objects.filter(order__in=orders).delete()
        orders.delete()

        Address.objects.filter(user__in=demo_users).delete()
        Balance.objects.filter(user__in=demo_users).delete()
        Wishlist.objects.filter(user__in=demo_users).delete()

        post_titles = [p["title"] for p in self.demo_data.get("blog_posts", [])]
        posts = Post.objects.filter(title__in=post_titles)
        # Count before deleting: the total returned by delete() also
        # includes rows removed from related tables.
        blog_count = posts.count()
        posts.delete()
        self.stdout.write(f" Removed blog posts: {blog_count}")

        post_tag_names = [
            t["tag_name"] for t in self.demo_data.get("post_tags", [])
        ]
        PostTag.objects.filter(tag_name__in=post_tag_names).delete()

        demo_users.delete()

        demo_vendor_name = self.demo_data["vendor"]["name"]
        try:
            vendor = Vendor.objects.get(name=demo_vendor_name)
        except Vendor.DoesNotExist:
            # Nothing to clean up: the demo vendor was never installed.
            pass
        else:
            Stock.objects.filter(vendor=vendor).delete()
            vendor.delete()
            self.stdout.write(f" Removed vendor: {demo_vendor_name}")

        partnumbers = [p["partnumber"] for p in self.demo_data["products"]]
        products = Product.objects.filter(partnumber__in=partnumbers)
        product_count = products.count()

        # MEDIA_ROOT may be configured as a plain string, so normalise it
        # before joining path segments.
        media_root = Path(settings.MEDIA_ROOT)
        for product in products:
            for image in product.images.all():
                if image.image:
                    # Remove the stored file; the row is deleted just below.
                    image.image.delete(save=False)
                image.delete()
            product_dir = media_root / "products" / str(product.uuid)
            # Only remove the per-product directory once it is empty.
            if product_dir.exists() and not any(product_dir.iterdir()):
                shutil.rmtree(product_dir, ignore_errors=True)

        # Delete OrderProducts referencing demo products (from non-demo users)
        # to avoid ProtectedError since OrderProduct.product uses PROTECT
        OrderProduct.objects.filter(product__in=products).delete()
        products.delete()

        brand_names = [b["name"] for b in self.demo_data["brands"]]
        Brand.objects.filter(name__in=brand_names).delete()

        # Walk the fixture list backwards, one category at a time.
        for cat_data in reversed(self.demo_data["categories"]):
            Category.objects.filter(name=cat_data["name"]).delete()

        tag_names = [t["tag_name"] for t in self.demo_data["category_tags"]]
        CategoryTag.objects.filter(tag_name__in=tag_names).delete()

        tag_names = [t["tag_name"] for t in self.demo_data["product_tags"]]
        ProductTag.objects.filter(tag_name__in=tag_names).delete()

        group_names = [g["name"] for g in self.demo_data["attribute_groups"]]
        Attribute.objects.filter(group__name__in=group_names).delete()
        AttributeGroup.objects.filter(name__in=group_names).delete()

        # Both addresses also match the domain filter above; the explicit
        # deletes are kept as a safeguard.
        User.objects.filter(email=f"staff@{DEMO_EMAIL_DOMAIN}").delete()
        User.objects.filter(email=f"super@{DEMO_EMAIL_DOMAIN}").delete()

    self.stdout.write("")
    self.stdout.write(self.style.SUCCESS("=" * 50))
    self.stdout.write(self.style.SUCCESS("Demo fixtures removed successfully!"))
    self.stdout.write(self.style.SUCCESS("=" * 50))
    self.stdout.write(f" Users removed: {user_count}")
    self.stdout.write(f" Orders removed: {order_count}")
    self.stdout.write(f" Products removed: {product_count}")
def _print_summary(
    self, users: list, orders: list, refunded_count: int, num_days: int
) -> None:
    """Print the post-install report.

    Shows table-wide counts for products, categories, brands and blog
    posts, the figures for this run, the demo password from the fixture
    data, and the e-mails of the first five created users.
    """
    emit = self.stdout.write
    rule = "=" * 50
    demo_password = self.demo_data["demo_users"]["password"]

    emit("")
    emit(self.style.SUCCESS(rule))
    emit(self.style.SUCCESS("Demo fixtures installed successfully!"))
    emit(self.style.SUCCESS(rule))

    # Counts of whole tables, not just of rows created by this run.
    for label, model in (
        ("Products", Product),
        ("Categories", Category),
        ("Brands", Brand),
        ("Blog posts", Post),
    ):
        emit(f" {label}: {model.objects.count()}")

    emit(f" Users created: {len(users)}")
    emit(f" Orders created: {len(orders)}")
    emit(f" Refunded orders: {refunded_count}")
    emit(f" Date range: {num_days} days")
    emit(f" Demo password: {demo_password}")
    emit("")
    emit("Sample demo accounts:")
    for account in users[:5]:
        emit(f" - {account.email}")
@transaction.atomic
def _create_gem_products(self) -> None:
    """Create the demo catalogue described by ``self.demo_data``.

    Runs in one transaction and creates, in dependency order: the demo
    vendor, category and product tags, attribute groups and attributes,
    brands, categories, and finally each product with its stock record,
    images and attribute values. Every object is fetched with
    ``get_or_create``; Russian translations are written only on objects
    created by this call, so existing rows are left untouched.

    Raises:
        KeyError: if a required section or field is missing from the
            fixture data.
    """
    data = self.demo_data

    vendor, _ = Vendor.objects.get_or_create(
        name=data["vendor"]["name"],
        defaults={"markup_percent": data["vendor"]["markup_percent"]},
    )

    # Category tags and product tags share the same fixture shape.
    for tag_model, section in (
        (CategoryTag, "category_tags"),
        (ProductTag, "product_tags"),
    ):
        for tag_data in data[section]:
            tag, created = tag_model.objects.get_or_create(
                tag_name=tag_data["tag_name"],
                defaults={"name": tag_data["name"]},
            )
            if created and "name_ru" in tag_data:
                tag.name_ru_ru = tag_data["name_ru"]
                tag.save()

    attr_groups = {}
    for group_data in data["attribute_groups"]:
        group, created = AttributeGroup.objects.get_or_create(
            name=group_data["name"]
        )
        if created and "name_ru" in group_data:
            group.name_ru_ru = group_data["name_ru"]
            group.save()
        attr_groups[group_data["name"]] = group

    # Keyed by attribute name only; a later fixture entry with the same
    # name replaces an earlier one.
    attr_lookup = {}
    for attr_data in data["attributes"]:
        group = attr_groups.get(attr_data["group"])
        if not group:
            # Attribute refers to a group the fixture does not define.
            continue
        attr, created = Attribute.objects.get_or_create(
            group=group,
            name=attr_data["name"],
            defaults={
                "value_type": attr_data["value_type"],
                "is_filterable": attr_data["is_filterable"],
            },
        )
        if created and "name_ru" in attr_data:
            attr.name_ru_ru = attr_data["name_ru"]
            attr.save()
        # Reuse the instance just obtained instead of querying it again.
        attr_lookup[attr_data["name"]] = attr

    brands = {}
    for brand_data in data["brands"]:
        brand, created = Brand.objects.get_or_create(
            name=brand_data["name"],
            defaults={"description": brand_data["description"]},
        )
        if created and "description_ru" in brand_data:
            brand.description_ru_ru = brand_data["description_ru"]
            brand.save()
        brands[brand_data["name"]] = brand

    categories = {}
    for cat_data in data["categories"]:
        # A parent resolves only if it appears earlier in the fixture list.
        parent = categories.get(cat_data["parent"]) if cat_data["parent"] else None
        category, created = Category.objects.get_or_create(
            name=cat_data["name"],
            defaults={
                "description": cat_data["description"],
                "parent": parent,
                "markup_percent": cat_data["markup_percent"],
            },
        )
        if created:
            if "name_ru" in cat_data:
                category.name_ru_ru = cat_data["name_ru"]
            if "description_ru" in cat_data:
                category.description_ru_ru = cat_data["description_ru"]
            category.save()
        categories[cat_data["name"]] = category

    for prod_data in data["products"]:
        category = categories.get(prod_data["category"])
        brand = brands.get(prod_data["brand"])
        if not category:
            # Products whose category is unknown are skipped entirely.
            continue

        product, created = Product.objects.get_or_create(
            partnumber=prod_data["partnumber"],
            defaults={
                "name": prod_data["name"],
                "description": prod_data["description"],
                "category": category,
                "brand": brand,
                "is_digital": prod_data.get("is_digital", False),
            },
        )
        if created:
            if "name_ru" in prod_data:
                product.name_ru_ru = prod_data["name_ru"]  # ty: ignore[invalid-assignment]
            if "description_ru" in prod_data:
                product.description_ru_ru = prod_data["description_ru"]  # ty: ignore[invalid-assignment]
            product.save()

        Stock.objects.get_or_create(
            vendor=vendor,
            product=product,
            defaults={
                "sku": f"GS-{prod_data['partnumber']}",
                "price": prod_data["price"],
                "purchase_price": prod_data["purchase_price"],
                "quantity": prod_data["quantity"],
            },
        )

        # Add product image
        self._add_product_image(product, prod_data["partnumber"])

        # Add attribute values
        for attr_name, av_data in prod_data.get("attribute_values", {}).items():
            attr = attr_lookup.get(attr_name)
            if not attr:
                continue
            # A value is either {"en": ..., "ru": ...} or a bare scalar.
            if isinstance(av_data, dict):
                value = str(av_data["en"])
                value_ru = av_data.get("ru")
            else:
                value = str(av_data)
                value_ru = None
            av, av_created = AttributeValue.objects.get_or_create(
                product=product,
                attribute=attr,
                defaults={"value": value},
            )
            if av_created:
                # Fall back to the base value when no Russian text is given.
                av.value_ru_ru = value_ru or value  # ty:ignore[invalid-assignment]
                av.save()
def _find_image(self, partnumber: str, suffix: str = "") -> Path | None:
    """Return the first existing demo image for *partnumber*, or ``None``.

    Candidates are ``<partnumber><suffix><ext>`` inside
    ``DEMO_IMAGES_DIR``, tried in the order .jpg, .jpeg, .png, .webp.
    """
    candidates = (
        DEMO_IMAGES_DIR / f"{partnumber}{suffix}{extension}"
        for extension in (".jpg", ".jpeg", ".png", ".webp")
    )
    return next((path for path in candidates if path.exists()), None)
def _add_product_image(self, product: Product, partnumber: str) -> None:
    """Attach the primary image and any numbered variants to *product*.

    The primary image is the file named after *partnumber*, falling back
    to ``placeholder.png``; when neither exists a warning is printed and
    nothing is attached. Variants use the suffix `` (2)``, `` (3)``, ...
    and are collected until the first missing number.
    """
    primary = self._find_image(partnumber) or DEMO_IMAGES_DIR / "placeholder.png"
    if not primary.exists():
        self.stdout.write(
            self.style.WARNING(f" No image found for {partnumber}, skipping...")
        )
        return
    self._save_product_image(product, primary, priority=1)

    variant_no = 2
    while variant := self._find_image(partnumber, f" ({variant_no})"):
        self._save_product_image(product, variant, priority=variant_no)
        variant_no += 1
def _save_product_image(
    self, product: Product, image_path: Path, priority: int
) -> None:
    """Store *image_path* as an image of *product* at the given priority.

    Does nothing when the product already has an image with that
    priority. The product name is used as the alt text.
    """
    slot_taken = product.images.filter(priority=priority).exists()
    if slot_taken:
        return

    payload = ContentFile(image_path.read_bytes())
    new_image = ProductImage(product=product, alt=product.name, priority=priority)
    # save=True also saves the ProductImage row itself.
    new_image.image.save(image_path.name, payload, save=True)
# Create ``count`` demo users with random names taken from the ``demo_users``
# section of the fixture data and return them as a list. Runs inside a single
# transaction.
# NOTE(review): indentation was lost in this copy of the file, so which
# statements sit under ``if created:`` (password only, or also the balance
# and address steps) cannot be read off here — confirm against the repository
# before restructuring this method.
@transaction.atomic
def _create_demo_users(self, count: int) -> list:
users = []
user_data = self.demo_data["demo_users"]
# Snapshot of every user e-mail currently stored; used below to keep the
# generated addresses unique.
existing_emails = set(User.objects.values_list("email", flat=True))
first_names = user_data["first_names"]
last_names = user_data["last_names"]
cities = user_data["cities"]
streets = user_data["streets"]
password = user_data["password"]
# NOTE(review): the domain comes from the fixture file, while ``_remove``
# filters on DEMO_EMAIL_DOMAIN — presumably the two match; verify.
email_domain = user_data["email_domain"]
for _ in range(count):
first_name = random.choice(first_names)
last_name = random.choice(last_names)
base_email = f"{first_name.lower()}.{last_name.lower()}@{email_domain}"
email = base_email
counter = 1
# Append an increasing number to the local part until the address is unused.
while email in existing_emails:
email = (
f"{first_name.lower()}.{last_name.lower()}{counter}@{email_domain}"
)
counter += 1
# Reserve the address so later iterations cannot pick it again.
existing_emails.add(email)
user, created = User.objects.get_or_create(
email=email,
defaults={
"first_name": first_name,
"last_name": last_name,
"is_active": True,
"is_verified": True,
},
)
if created:
# set_password stores a hash; save() persists it.
user.set_password(password)
user.save()
# Starting balance: a random amount between 100 and 5000, rounded to 2 places.
Balance.objects.get_or_create(
user=user,
defaults={"amount": round(random.uniform(100, 5000), 2)},
)
city_data = random.choice(cities)
street_num = random.randint(1, 999)
street = random.choice(streets)
# One-line rendering of the address; stored in both ``address_line`` and
# ``raw_data`` below.
address_line = (
f"{street_num} {street}, {city_data['city']}, "
f"{city_data['region']} {city_data['postal_code']}, {city_data['country']}"
)
address = Address(
user=user,
street=f"{street_num} {street}",
city=city_data["city"],
region=city_data["region"],
postal_code=city_data["postal_code"],
country=city_data["country"],
address_line=address_line,
raw_data=address_line,
)
address.save()
users.append(user)
return users
@transaction.atomic
def _create_demo_orders(
    self,
    users: list,
    products: list,
    count: int,
    days: int,
) -> tuple[list, int]:
    """Create ``count`` demo orders spread over the last ``days`` days.

    Each order belongs to a random user, uses that user's first address
    for billing and shipping, and contains one to four distinct random
    products. About 8% of the orders are marked as refunded (order status
    "FAILED", item status "RETURNED"). Later days are weighted more
    heavily than earlier ones, and only orders from the most recent part
    of the range may still be open.

    Args:
        users: candidate owners; when empty, nothing is created.
        products: products to draw order items from.
        count: number of orders; values <= 0 create nothing.
        days: length of the date range; values below 1 are treated as 1.

    Returns:
        ``(orders, refunded_count)``.
    """
    if count <= 0:
        return [], 0
    if not users:
        # random.choice() would raise on an empty list.
        self.stdout.write(
            self.style.WARNING("Not enough users to create orders")
        )
        return [], 0
    # A non-positive span would leave random.choices() with no weights.
    days = max(days, 1)

    orders = []
    refunded_count = 0
    now = timezone.now()

    refund_target = int(count * 0.08)
    refund_indices = set(random.sample(range(count), refund_target))

    # Weight grows linearly from 1 on the oldest day towards 2 on the newest.
    day_weights = [1 + (i / days) for i in range(days)]
    total_weight = sum(day_weights)
    day_probabilities = [w / total_weight for w in day_weights]

    for i in range(count):
        user = random.choice(users)
        is_refunded = i in refund_indices

        # day_offset 0 is the oldest day, days - 1 is today.
        day_offset = random.choices(range(days), weights=day_probabilities)[0]
        order_date = now - timedelta(
            days=days - 1 - day_offset,
            hours=random.randint(0, 23),
            minutes=random.randint(0, 59),
        )

        if is_refunded:
            status = "FAILED"
            refunded_count += 1
        elif day_offset > days * 0.7:  # Recent orders
            status = random.choice(["CREATED", "DELIVERING", "FINISHED"])
        else:
            status = "FINISHED"

        address = Address.objects.filter(user=user).first()
        order, _ = Order.objects.get_or_create(
            user=user,
            status=status,
            buy_time=order_date,
            defaults={
                "billing_address": address,
                "shipping_address": address,
            },
        )
        # Written through the queryset rather than on the instance.
        Order.objects.filter(pk=order.pk).update(created=order_date)

        num_products = random.randint(1, 4)
        order_products = random.sample(products, min(num_products, len(products)))
        for product in order_products:
            quantity = random.randint(1, 3)
            # Read the price once; fall back to a random one when it is
            # missing or zero.
            price = product.price or random.uniform(100, 5000)

            if is_refunded:
                op_status = "RETURNED"
            elif status == "FINISHED":
                op_status = "FINISHED"
            elif status == "DELIVERING":
                op_status = random.choice(["DELIVERING", "DELIVERED"])
            else:
                op_status = random.choice(["ACCEPTED", "PENDING"])

            OrderProduct.objects.get_or_create(
                order=order,
                product=product,
                defaults={
                    "quantity": quantity,
                    "buy_price": round(price, 2),
                    "status": op_status,
                },
            )

        orders.append(order)

    return orders, refunded_count
@transaction.atomic
def _create_demo_wishlists(self, users: list, products: list) -> int:
    """
    Wishlist five random products (or all of them when fewer than five
    exist), each for 2-4 random demo users.

    Returns the number of distinct users that received a wishlist entry,
    or 0 when fewer than two users are available.
    """
    if len(products) >= 5:
        picked_products = random.sample(products, 5)
    else:
        self.stdout.write(
            self.style.WARNING(
                f"Not enough products ({len(products)}) to create 5 wishlisted items"
            )
        )
        picked_products = products

    if len(users) < 2:
        self.stdout.write(
            self.style.WARNING("Not enough users to create wishlists")
        )
        return 0

    owner_ids = set()
    for item in picked_products:
        fan_total = random.randint(2, min(4, len(users)))
        for fan in random.sample(users, fan_total):
            wishlist, _ = Wishlist.objects.get_or_create(user=fan)
            wishlist.products.add(item)
            owner_ids.add(fan.id)
    return len(owner_ids)
# Create the demo post tags and blog posts listed in the fixture data and
# return the ``count`` accumulator. Runs inside a single transaction.
# NOTE(review): indentation was lost in this copy of the file, so whether the
# tag-linking loop and ``count += 1`` sit under ``if created:`` cannot be
# read off here — confirm against the repository before restructuring.
@transaction.atomic
def _create_blog_posts(self) -> int:
data = self.demo_data
# Reading the property creates the staff account when it is missing; it is
# used as the author of newly created posts.
author = self.staff_user
count = 0
for tag_data in data.get("post_tags", []):
tag, created = PostTag.objects.get_or_create(
tag_name=tag_data["tag_name"],
defaults={"name": tag_data["name"]},
)
# The Russian name is written only on tags created by this call.
if created and "name_ru" in tag_data:
tag.name_ru_ru = tag_data["name_ru"]
tag.save()
for post_data in data.get("blog_posts", []):
# Bodies are read from ``<content_file>.<lang>.md``; either may be missing.
content_en = self._load_blog_content(post_data["content_file"], "en")
content_ru = self._load_blog_content(post_data["content_file"], "ru")
# English content is mandatory: without it the post is skipped.
if not content_en:
self.stdout.write(
self.style.WARNING(
f" No content found for {post_data['content_file']}, skipping..."
)
)
continue
# Posts are matched by title; the other fields apply only on creation.
post, created = Post.objects.get_or_create(
title=post_data["title"],
defaults={
"author": author,
"content": content_en,
"meta_description": post_data.get("meta_description", ""),
"is_static_page": post_data.get("is_static_page", False),
},
)
if created:
if "title_ru" in post_data:
post.title_ru_ru = post_data["title_ru"]
if content_ru:
post.content_ru_ru = content_ru
if "meta_description_ru" in post_data:
post.meta_description_ru_ru = post_data["meta_description_ru"]
post.save()
for tag_name in post_data.get("tags", []):
# Tag names without a matching PostTag row are silently ignored.
try:
tag = PostTag.objects.get(tag_name=tag_name)
post.tags.add(tag)
except PostTag.DoesNotExist:
pass
# NOTE(review): presumably counts only newly created posts — confirm nesting.
count += 1
return count
def _load_blog_content(self, content_file: str, lang: str) -> str | None:
    """Return the text of ``<content_file>.<lang>.md``, or ``None`` if absent.

    The file is looked up in ``DEMO_BLOG_DIR`` and decoded as UTF-8.
    """
    source = DEMO_BLOG_DIR / f"{content_file}.{lang}.md"
    return source.read_text(encoding="utf-8") if source.exists() else None