schon/engine/core/management/commands/demo_data.py
Egor fureunoir Gorbunov 72a54de707 fix(demo_data): set default passwords for staff and superuser in demo data
Ensure default passwords are assigned to demo users for consistent behavior during testing and development.
2026-02-15 02:17:34 +03:00

670 lines
24 KiB
Python

import json
import random
import shutil
from datetime import timedelta
from pathlib import Path
from typing import Any
from django.conf import settings
from django.core.files.base import ContentFile
from django.core.management.base import BaseCommand
from django.db import transaction
from django.utils import timezone
from engine.blog.models import Post, PostTag
from engine.core.models import (
Address,
Attribute,
AttributeGroup,
Brand,
Category,
CategoryTag,
Order,
OrderProduct,
Product,
ProductImage,
ProductTag,
Stock,
Vendor,
Wishlist,
)
from engine.payments.models import Balance
from engine.vibes_auth.models import Group, User
DEMO_EMAIL_DOMAIN = "wiseless.xyz"
DEMO_VENDOR_NAME = "GemDemo Global"
DEMO_IMAGES_DIR = Path(settings.BASE_DIR) / "engine/core/fixtures/demo_products_images"
DEMO_BLOG_DIR = Path(settings.BASE_DIR) / "engine/core/fixtures/demo_blog_posts"
class Command(BaseCommand):
help = "Install or remove demo fixtures for Schon"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.demo_data: dict[str, Any] = {}
def add_arguments(self, parser):
subparsers = parser.add_subparsers(dest="action", help="Action to perform")
install_parser = subparsers.add_parser("install", help="Install demo fixtures")
install_parser.add_argument(
"--users",
type=int,
default=50,
help="Number of demo users to create (default: 50)",
)
install_parser.add_argument(
"--orders",
type=int,
default=100,
help="Number of demo orders to create (default: 100)",
)
install_parser.add_argument(
"--days",
type=int,
default=30,
help="Number of days to spread orders over (default: 30)",
)
install_parser.add_argument(
"--skip-products",
action="store_true",
help="Skip creating gem products (use if already created)",
)
subparsers.add_parser("remove", help="Remove all demo fixtures")
def handle(self, *args: list[Any], **options: dict[str, Any]) -> None:
action = options.get("action")
if not action:
self.stdout.write(
self.style.ERROR("Please specify an action: install or remove")
)
self.stdout.write("Usage: python manage.py demo_data install|remove")
return
self._load_demo_data()
if action == "install":
self._install(options)
elif action == "remove":
self._remove()
else:
self.stdout.write(self.style.ERROR(f"Unknown action: {action}"))
@property
def staff_user(self):
user, _ = User.objects.get_or_create(
email=f"staff@{DEMO_EMAIL_DOMAIN}",
password="Staff!Demo888",
first_name="Alice",
last_name="Schon",
is_staff=True,
is_active=True,
is_verified=True,
)
if _:
user.set_password("Staff!Demo888")
if not user.groups.filter(name="E-Commerce Admin").exists():
user.groups.add(Group.objects.get(name="E-Commerce Admin"))
return user
@property
def super_user(self):
user, _ = User.objects.get_or_create(
email=f"super@{DEMO_EMAIL_DOMAIN}",
password="Super!Demo888",
first_name="Bob",
last_name="Schon",
is_superuser=True,
is_staff=True,
is_active=True,
is_verified=True,
)
if _:
user.set_password("Super!Demo888")
return user
def _load_demo_data(self) -> None:
fixture_path = Path(settings.BASE_DIR) / "engine/core/fixtures/demo.json"
with open(fixture_path, encoding="utf-8") as f:
self.demo_data = json.load(f)
def _install(self, options: dict[str, Any]) -> None:
num_users = options["users"]
num_orders = options["orders"]
num_days = options["days"]
skip_products = options.get("skip_products", False)
self.stdout.write(self.style.NOTICE("Starting demo fixture installation..."))
if not skip_products:
self.stdout.write("Creating gem products with translations...")
self._create_gem_products()
self.stdout.write(self.style.SUCCESS("Gem products created!"))
self.stdout.write(f"Creating {num_users} demo users...")
users = self._create_demo_users(num_users)
self.stdout.write(self.style.SUCCESS(f"Created {len(users)} users"))
products = list(Product.objects.filter(is_active=True))
if not products:
self.stdout.write(self.style.ERROR("No products found!"))
return
self.stdout.write(f"Creating {num_orders} orders over {num_days} days...")
orders, refunded_count = self._create_demo_orders(
users, products, num_orders, num_days
)
self.stdout.write(
self.style.SUCCESS(
f"Created {len(orders)} orders ({refunded_count} refunded)"
)
)
self.stdout.write("Creating wishlists for demo users...")
wishlist_count = self._create_demo_wishlists(users, products)
self.stdout.write(self.style.SUCCESS(f"Created {wishlist_count} wishlists"))
self.stdout.write("Creating blog posts...")
blog_count = self._create_blog_posts()
self.stdout.write(self.style.SUCCESS(f"Created {blog_count} blog posts"))
self.stdout.write(
self.style.SUCCESS(f"Created staff {self.staff_user.email} user")
)
self.stdout.write(
self.style.SUCCESS(f"Created super {self.super_user.email} user")
)
self._print_summary(users, orders, refunded_count, num_days)
def _remove(self) -> None:
self.stdout.write(self.style.WARNING("Removing demo fixtures..."))
with transaction.atomic():
demo_users = User.objects.filter(email__endswith=f"@{DEMO_EMAIL_DOMAIN}")
user_count = demo_users.count()
orders = Order.objects.filter(user__in=demo_users)
order_count = orders.count()
OrderProduct.objects.filter(order__in=orders).delete()
orders.delete()
Address.objects.filter(user__in=demo_users).delete()
Balance.objects.filter(user__in=demo_users).delete()
Wishlist.objects.filter(user__in=demo_users).delete()
post_titles = [p["title"] for p in self.demo_data.get("blog_posts", [])]
blog_count = Post.objects.filter(title__in=post_titles).delete()[0]
self.stdout.write(f" Removed blog posts: {blog_count}")
post_tag_names = [
t["tag_name"] for t in self.demo_data.get("post_tags", [])
]
PostTag.objects.filter(tag_name__in=post_tag_names).delete()
demo_users.delete()
try:
vendor = Vendor.objects.get(name=DEMO_VENDOR_NAME)
Stock.objects.filter(vendor=vendor).delete()
vendor.delete()
self.stdout.write(f" Removed vendor: {DEMO_VENDOR_NAME}")
except Vendor.DoesNotExist:
pass
partnumbers = [p["partnumber"] for p in self.demo_data["products"]]
products = Product.objects.filter(partnumber__in=partnumbers)
product_count = products.count()
for product in products:
for image in product.images.all():
if image.image:
image.image.delete(save=False)
image.delete()
product_dir: Path = settings.MEDIA_ROOT / "products" / str(product.uuid)
if product_dir.exists() and not any(product_dir.iterdir()):
shutil.rmtree(product_dir, ignore_errors=True)
products.delete()
brand_names = [b["name"] for b in self.demo_data["brands"]]
Brand.objects.filter(name__in=brand_names).delete()
for cat_data in reversed(self.demo_data["categories"]):
Category.objects.filter(name=cat_data["name"]).delete()
tag_names = [t["tag_name"] for t in self.demo_data["category_tags"]]
CategoryTag.objects.filter(tag_name__in=tag_names).delete()
tag_names = [t["tag_name"] for t in self.demo_data["product_tags"]]
ProductTag.objects.filter(tag_name__in=tag_names).delete()
group_names = [g["name"] for g in self.demo_data["attribute_groups"]]
Attribute.objects.filter(group__name__in=group_names).delete()
AttributeGroup.objects.filter(name__in=group_names).delete()
self.staff_user.delete()
self.super_user.delete()
self.stdout.write("")
self.stdout.write(self.style.SUCCESS("=" * 50))
self.stdout.write(self.style.SUCCESS("Demo fixtures removed successfully!"))
self.stdout.write(self.style.SUCCESS("=" * 50))
self.stdout.write(f" Users removed: {user_count}")
self.stdout.write(f" Orders removed: {order_count}")
self.stdout.write(f" Products removed: {product_count}")
def _print_summary(
self, users: list, orders: list, refunded_count: int, num_days: int
) -> None:
password = self.demo_data["demo_users"]["password"]
self.stdout.write("")
self.stdout.write(self.style.SUCCESS("=" * 50))
self.stdout.write(self.style.SUCCESS("Demo fixtures installed successfully!"))
self.stdout.write(self.style.SUCCESS("=" * 50))
self.stdout.write(f" Products: {Product.objects.count()}")
self.stdout.write(f" Categories: {Category.objects.count()}")
self.stdout.write(f" Brands: {Brand.objects.count()}")
self.stdout.write(f" Blog posts: {Post.objects.count()}")
self.stdout.write(f" Users created: {len(users)}")
self.stdout.write(f" Orders created: {len(orders)}")
self.stdout.write(f" Refunded orders: {refunded_count}")
self.stdout.write(f" Date range: {num_days} days")
self.stdout.write(f" Demo password: {password}")
self.stdout.write("")
self.stdout.write("Sample demo accounts:")
for user in users[:5]:
self.stdout.write(f" - {user.email}")
@transaction.atomic
def _create_gem_products(self) -> None:
data = self.demo_data
vendor, _ = Vendor.objects.get_or_create(
name=data["vendor"]["name"],
defaults={"markup_percent": data["vendor"]["markup_percent"]},
)
for tag_data in data["category_tags"]:
tag, created = CategoryTag.objects.get_or_create(
tag_name=tag_data["tag_name"],
defaults={"name": tag_data["name"]},
)
if created and "name_ru" in tag_data:
tag.name_ru_ru = tag_data["name_ru"]
tag.save()
for tag_data in data["product_tags"]:
tag, created = ProductTag.objects.get_or_create(
tag_name=tag_data["tag_name"],
defaults={"name": tag_data["name"]},
)
if created and "name_ru" in tag_data:
tag.name_ru_ru = tag_data["name_ru"]
tag.save()
attr_groups = {}
for group_data in data["attribute_groups"]:
group, created = AttributeGroup.objects.get_or_create(
name=group_data["name"]
)
if created and "name_ru" in group_data:
group.name_ru_ru = group_data["name_ru"]
group.save()
attr_groups[group_data["name"]] = group
for attr_data in data["attributes"]:
group = attr_groups.get(attr_data["group"])
if group:
attr, created = Attribute.objects.get_or_create(
group=group,
name=attr_data["name"],
defaults={
"value_type": attr_data["value_type"],
"is_filterable": attr_data["is_filterable"],
},
)
if created and "name_ru" in attr_data:
attr.name_ru_ru = attr_data["name_ru"]
attr.save()
brands = {}
for brand_data in data["brands"]:
brand, created = Brand.objects.get_or_create(
name=brand_data["name"],
defaults={"description": brand_data["description"]},
)
if created and "description_ru" in brand_data:
brand.description_ru_ru = brand_data["description_ru"]
brand.save()
brands[brand_data["name"]] = brand
categories = {}
for cat_data in data["categories"]:
parent = categories.get(cat_data["parent"]) if cat_data["parent"] else None
category, created = Category.objects.get_or_create(
name=cat_data["name"],
defaults={
"description": cat_data["description"],
"parent": parent,
"markup_percent": cat_data["markup_percent"],
},
)
if created:
if "name_ru" in cat_data:
category.name_ru_ru = cat_data["name_ru"]
if "description_ru" in cat_data:
category.description_ru_ru = cat_data["description_ru"]
category.save()
categories[cat_data["name"]] = category
for prod_data in data["products"]:
category = categories.get(prod_data["category"])
brand = brands.get(prod_data["brand"])
if not category:
continue
product, created = Product.objects.get_or_create(
partnumber=prod_data["partnumber"],
defaults={
"name": prod_data["name"],
"description": prod_data["description"],
"category": category,
"brand": brand,
"is_digital": False,
},
)
if created:
if "name_ru" in prod_data:
product.name_ru_ru = prod_data["name_ru"]
if "description_ru" in prod_data:
product.description_ru_ru = prod_data["description_ru"]
product.save()
Stock.objects.create(
vendor=vendor,
product=product,
sku=f"GS-{prod_data['partnumber']}",
price=prod_data["price"],
purchase_price=prod_data["purchase_price"],
quantity=prod_data["quantity"],
)
# Add product image
self._add_product_image(product, prod_data["partnumber"])
def _add_product_image(self, product: Product, partnumber: str) -> None:
image_path = DEMO_IMAGES_DIR / f"{partnumber}.jpg"
if not image_path.exists():
image_path = DEMO_IMAGES_DIR / "placeholder.png"
if not image_path.exists():
self.stdout.write(
self.style.WARNING(f" No image found for {partnumber}, skipping...")
)
return
with open(image_path, "rb") as f:
image_content = f.read()
filename = image_path.name
product_image = ProductImage(
product=product,
alt=product.name,
priority=1,
)
product_image.image.save(filename, ContentFile(image_content), save=True)
@transaction.atomic
def _create_demo_users(self, count: int) -> list:
users = []
user_data = self.demo_data["demo_users"]
existing_emails = set(User.objects.values_list("email", flat=True))
first_names = user_data["first_names"]
last_names = user_data["last_names"]
cities = user_data["cities"]
streets = user_data["streets"]
password = user_data["password"]
email_domain = user_data["email_domain"]
for _ in range(count):
first_name = random.choice(first_names)
last_name = random.choice(last_names)
base_email = f"{first_name.lower()}.{last_name.lower()}@{email_domain}"
email = base_email
counter = 1
while email in existing_emails:
email = (
f"{first_name.lower()}.{last_name.lower()}{counter}@{email_domain}"
)
counter += 1
existing_emails.add(email)
# Create user
user = User(
email=email,
first_name=first_name,
last_name=last_name,
is_active=True,
is_verified=True,
)
user.set_password(password)
user.save()
Balance.objects.get_or_create(
user=user,
defaults={"amount": round(random.uniform(100, 5000), 2)},
)
city_data = random.choice(cities)
street_num = random.randint(1, 999)
street = random.choice(streets)
address_line = (
f"{street_num} {street}, {city_data['city']}, "
f"{city_data['region']} {city_data['postal_code']}, {city_data['country']}"
)
address = Address(
user=user,
street=f"{street_num} {street}",
city=city_data["city"],
region=city_data["region"],
postal_code=city_data["postal_code"],
country=city_data["country"],
address_line=address_line,
raw_data=address_line,
)
address.save()
users.append(user)
return users
@transaction.atomic
def _create_demo_orders(
self,
users: list,
products: list,
count: int,
days: int,
) -> tuple[list, int]:
orders = []
refunded_count = 0
now = timezone.now()
refund_target = int(count * 0.08)
refund_indices = set(random.sample(range(count), refund_target))
day_weights = [1 + (i / days) for i in range(days)]
total_weight = sum(day_weights)
day_probabilities = [w / total_weight for w in day_weights]
for i in range(count):
user = random.choice(users)
is_refunded = i in refund_indices
day_offset = random.choices(range(days), weights=day_probabilities)[0]
order_date = now - timedelta(
days=days - 1 - day_offset,
hours=random.randint(0, 23),
minutes=random.randint(0, 59),
)
if is_refunded:
status = "FAILED"
refunded_count += 1
elif day_offset > days * 0.7: # Recent orders
status = random.choice(["CREATED", "DELIVERING", "FINISHED"])
else:
status = "FINISHED"
address = Address.objects.filter(user=user).first()
order = Order.objects.create(
user=user,
status=status,
buy_time=order_date,
billing_address=address,
shipping_address=address,
)
Order.objects.filter(pk=order.pk).update(created=order_date)
num_products = random.randint(1, 4)
order_products = random.sample(products, min(num_products, len(products)))
for product in order_products:
quantity = random.randint(1, 3)
price = product.price if product.price else random.uniform(100, 5000)
if is_refunded:
op_status = "RETURNED"
elif status == "FINISHED":
op_status = "FINISHED"
elif status == "DELIVERING":
op_status = random.choice(["DELIVERING", "DELIVERED"])
else:
op_status = random.choice(["ACCEPTED", "PENDING"])
OrderProduct.objects.create(
order=order,
product=product,
quantity=quantity,
buy_price=round(price, 2),
status=op_status,
)
orders.append(order)
return orders, refunded_count
@transaction.atomic
def _create_demo_wishlists(self, users: list, products: list) -> int:
"""
Ensure exactly 5 products are wishlisted, each by 2-4 random demo users.
"""
if len(products) < 5:
self.stdout.write(
self.style.WARNING(
f"Not enough products ({len(products)}) to create 5 wishlisted items"
)
)
wishlisted_products = products
else:
wishlisted_products = random.sample(products, 5)
if len(users) < 2:
self.stdout.write(
self.style.WARNING("Not enough users to create wishlists")
)
return 0
users_with_wishlists = set()
for product in wishlisted_products:
num_users = random.randint(2, min(4, len(users)))
selected_users = random.sample(users, num_users)
for user in selected_users:
wishlist, _ = Wishlist.objects.get_or_create(user=user)
wishlist.products.add(product)
users_with_wishlists.add(user.id)
return len(users_with_wishlists)
@transaction.atomic
def _create_blog_posts(self) -> int:
data = self.demo_data
author = self.staff_user
count = 0
for tag_data in data.get("post_tags", []):
tag, created = PostTag.objects.get_or_create(
tag_name=tag_data["tag_name"],
defaults={"name": tag_data["name"]},
)
if created and "name_ru" in tag_data:
tag.name_ru_ru = tag_data["name_ru"]
tag.save()
for post_data in data.get("blog_posts", []):
if Post.objects.filter(title=post_data["title"]).exists():
continue
content_en = self._load_blog_content(post_data["content_file"], "en")
content_ru = self._load_blog_content(post_data["content_file"], "ru")
if not content_en:
self.stdout.write(
self.style.WARNING(
f" No content found for {post_data['content_file']}, skipping..."
)
)
continue
post = Post(
author=author,
title=post_data["title"],
content=content_en,
meta_description=post_data.get("meta_description", ""),
is_static_page=post_data.get("is_static_page", False),
)
if "title_ru" in post_data:
post.title_ru_ru = post_data["title_ru"]
if content_ru:
post.content_ru_ru = content_ru
if "meta_description_ru" in post_data:
post.meta_description_ru_ru = post_data["meta_description_ru"]
post.save()
for tag_name in post_data.get("tags", []):
try:
tag = PostTag.objects.get(tag_name=tag_name)
post.tags.add(tag)
except PostTag.DoesNotExist:
pass
count += 1
return count
def _load_blog_content(self, content_file: str, lang: str) -> str | None:
file_path = DEMO_BLOG_DIR / f"{content_file}.{lang}.md"
if not file_path.exists():
return None
with open(file_path, encoding="utf-8") as f:
return f.read()