Introduce `demo_data` management command for populating or removing demo fixtures in the system, facilitating testing and demonstration. Enhancements include low stock product list UI with product images and integration of value formatting for financial KPIs.
587 lines
21 KiB
Python
587 lines
21 KiB
Python
import json
|
|
import random
|
|
import shutil
|
|
from datetime import timedelta
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from django.conf import settings
|
|
from django.core.files.base import ContentFile
|
|
from django.core.management.base import BaseCommand
|
|
from django.db import transaction
|
|
from django.utils import timezone
|
|
|
|
from engine.core.models import (
|
|
Address,
|
|
Attribute,
|
|
AttributeGroup,
|
|
Brand,
|
|
Category,
|
|
CategoryTag,
|
|
Order,
|
|
OrderProduct,
|
|
Product,
|
|
ProductImage,
|
|
ProductTag,
|
|
Stock,
|
|
Vendor,
|
|
Wishlist,
|
|
)
|
|
from engine.payments.models import Balance
|
|
from engine.vibes_auth.models import Group, User
|
|
|
|
# Email domain of the built-in staff/super demo accounts; `_remove` deletes
# every user whose email ends with "@<this domain>".
DEMO_EMAIL_DOMAIN = "demo.schon.store"
# Name of the vendor whose stock records and row are deleted by `_remove`.
DEMO_VENDOR_NAME = "GemSource Global"
# Directory searched by `_add_product_image` for "<partnumber>.jpg" files and
# the "placeholder.png" fallback.
DEMO_IMAGES_DIR = Path(settings.BASE_DIR) / "engine/core/fixtures/demo_products_images"
|
|
|
|
|
|
class Command(BaseCommand):
    """Management command that installs or removes the Schon demo fixtures.

    Exposes two subcommands: ``install`` (creates demo products, users,
    orders and wishlists) and ``remove`` (deletes them again).
    """

    help = "Install or remove demo fixtures for Schon"
|
|
|
|
def __init__(self, *args, **kwargs):
    """Initialise the base command and start with an empty fixture payload."""
    super().__init__(*args, **kwargs)
    # Replaced with the parsed fixture JSON by `_load_demo_data`.
    self.demo_data: dict[str, Any] = {}
|
|
|
|
def add_arguments(self, parser) -> None:
    """Register the ``install`` and ``remove`` subcommands on ``parser``.

    ``install`` accepts ``--users``, ``--orders``, ``--days`` and
    ``--skip-products``. The numeric options are validated at parse time:
    ``--users`` and ``--orders`` must be >= 0 and ``--days`` must be >= 1,
    so invalid values produce a usage error instead of failing halfway
    through the installation.

    Args:
        parser: The argument parser supplied by the management framework.
    """
    # Imported locally because the module does not import argparse itself.
    from argparse import ArgumentTypeError

    def int_at_least(minimum: int):
        """Build an argparse ``type`` callable rejecting values below ``minimum``."""

        def convert(raw: str) -> int:
            try:
                value = int(raw)
            except ValueError:
                raise ArgumentTypeError(
                    f"expected an integer, got {raw!r}"
                ) from None
            if value < minimum:
                raise ArgumentTypeError(f"must be >= {minimum}, got {value}")
            return value

        return convert

    subparsers = parser.add_subparsers(dest="action", help="Action to perform")

    install_parser = subparsers.add_parser("install", help="Install demo fixtures")

    # (flag, default, smallest accepted value, help text)
    numeric_options = (
        ("--users", 50, 0, "Number of demo users to create (default: 50)"),
        ("--orders", 100, 0, "Number of demo orders to create (default: 100)"),
        ("--days", 30, 1, "Number of days to spread orders over (default: 30)"),
    )
    for flag, default, minimum, help_text in numeric_options:
        install_parser.add_argument(
            flag,
            type=int_at_least(minimum),
            default=default,
            help=help_text,
        )

    install_parser.add_argument(
        "--skip-products",
        action="store_true",
        help="Skip creating gem products (use if already created)",
    )

    subparsers.add_parser("remove", help="Remove all demo fixtures")
|
|
|
|
def handle(self, *args: Any, **options: Any) -> None:
    """Dispatch to the ``install`` or ``remove`` action.

    The action is validated before the fixture file is read, so a missing
    or unknown action only prints an error and never touches the disk.

    Args:
        *args: Unused positional arguments.
        **options: Parsed command options; ``action`` selects the
            subcommand and the full mapping is forwarded to ``_install``.
    """
    action = options.get("action")

    if not action:
        self.stdout.write(
            self.style.ERROR("Please specify an action: install or remove")
        )
        self.stdout.write("Usage: python manage.py demo_data install|remove")
        return

    # Map each supported action to the callable that performs it.
    runners = {
        "install": lambda: self._install(options),
        "remove": self._remove,
    }
    runner = runners.get(action)
    if runner is None:
        self.stdout.write(self.style.ERROR(f"Unknown action: {action}"))
        return

    # Both actions read names and settings from the fixture payload.
    self._load_demo_data()
    runner()
|
|
|
|
@property
def staff_user(self):
    """Return the demo staff account, creating it on first access.

    The account is looked up by email only; the remaining fields are
    applied through ``defaults`` when the row has to be created. The
    password is hashed with ``set_password`` instead of being stored as
    the raw string, so the account can actually log in.

    Returns:
        The ``staff@<demo domain>`` user, member of "E-Commerce Admin".

    Raises:
        Group.DoesNotExist: If the "E-Commerce Admin" group is missing.
    """
    user, created = User.objects.get_or_create(
        email=f"staff@{DEMO_EMAIL_DOMAIN}",
        defaults={
            "first_name": "Alice",
            "last_name": "Schon",
            "is_staff": True,
            "is_active": True,
            "is_verified": True,
        },
    )
    if created:
        # Hash the well-known demo password; never persist it in clear text.
        user.set_password("Staff!Demo888")
        user.save(update_fields=["password"])

    if not user.groups.filter(name="E-Commerce Admin").exists():
        user.groups.add(Group.objects.get(name="E-Commerce Admin"))
    return user
|
|
|
|
@property
def super_user(self):
    """Return the demo superuser account, creating it on first access.

    The account is looked up by email only; the remaining fields are
    applied through ``defaults`` when the row has to be created. The
    password is hashed with ``set_password`` instead of being stored as
    the raw string, so the account can actually log in.

    Returns:
        The ``super@<demo domain>`` user.
    """
    user, created = User.objects.get_or_create(
        email=f"super@{DEMO_EMAIL_DOMAIN}",
        defaults={
            "first_name": "Bob",
            "last_name": "Schon",
            "is_superuser": True,
            "is_staff": True,
            "is_active": True,
            "is_verified": True,
        },
    )
    if created:
        # Hash the well-known demo password; never persist it in clear text.
        user.set_password("Super!Demo888")
        user.save(update_fields=["password"])
    return user
|
|
|
|
def _load_demo_data(self) -> None:
    """Parse the demo fixture JSON and store it on ``self.demo_data``.

    The file is read as UTF-8 from ``engine/core/fixtures/demo.json``
    under ``settings.BASE_DIR``.
    """
    fixture_file = Path(settings.BASE_DIR) / "engine/core/fixtures/demo.json"
    self.demo_data = json.loads(fixture_file.read_text(encoding="utf-8"))
|
|
|
|
def _install(self, options: dict[str, Any]) -> None:
    """Create the demo catalogue, users, orders and wishlists, then report.

    Args:
        options: Parsed command options. ``users``, ``orders`` and
            ``days`` must be present; ``skip_products`` is optional and
            treated as ``False`` when absent.

    Stops after the user step, with an error message, when no active
    product exists: orders and wishlists both need products.
    """
    out = self.stdout
    paint = self.style

    user_total = options["users"]
    order_total = options["orders"]
    day_span = options["days"]
    skip_catalogue = options.get("skip_products", False)

    out.write(paint.NOTICE("Starting demo fixture installation..."))

    if not skip_catalogue:
        out.write("Creating gem products with translations...")
        self._create_gem_products()
        out.write(paint.SUCCESS("Gem products created!"))

    out.write(f"Creating {user_total} demo users...")
    demo_users = self._create_demo_users(user_total)
    out.write(paint.SUCCESS(f"Created {len(demo_users)} users"))

    catalogue = list(Product.objects.filter(is_active=True))
    if not catalogue:
        out.write(paint.ERROR("No products found!"))
        return

    out.write(f"Creating {order_total} orders over {day_span} days...")
    created_orders, refunded = self._create_demo_orders(
        demo_users, catalogue, order_total, day_span
    )
    out.write(
        paint.SUCCESS(f"Created {len(created_orders)} orders ({refunded} refunded)")
    )

    out.write("Creating wishlists for demo users...")
    wishlist_total = self._create_demo_wishlists(demo_users, catalogue)
    out.write(paint.SUCCESS(f"Created {wishlist_total} wishlists"))

    # Accessing the properties creates the accounts when they do not exist.
    staff_email = self.staff_user.email
    out.write(paint.SUCCESS(f"Created staff {staff_email} user"))
    super_email = self.super_user.email
    out.write(paint.SUCCESS(f"Created super {super_email} user"))

    self._print_summary(demo_users, created_orders, refunded, day_span)
|
|
|
|
def _remove(self) -> None:
    """Delete the demo fixtures and print how many rows were removed.

    All database deletions run in a single transaction. Image files are
    deleted from storage as part of the same pass; those file deletions
    are not undone if the transaction later rolls back.

    Users are matched by the ``@<DEMO_EMAIL_DOMAIN>`` email suffix, which
    also covers the built-in ``staff@`` and ``super@`` accounts, so they
    are not fetched (and implicitly re-created) through the
    ``staff_user`` / ``super_user`` properties.
    """
    self.stdout.write(self.style.WARNING("Removing demo fixtures..."))

    # MEDIA_ROOT may be configured as a plain string; normalise it once so
    # the path joins below work for both str and Path settings.
    media_root = Path(settings.MEDIA_ROOT)

    with transaction.atomic():
        demo_users = User.objects.filter(email__endswith=f"@{DEMO_EMAIL_DOMAIN}")
        user_count = demo_users.count()

        orders = Order.objects.filter(user__in=demo_users)
        order_count = orders.count()

        # Delete dependants first, then the users themselves.
        OrderProduct.objects.filter(order__in=orders).delete()
        orders.delete()
        Address.objects.filter(user__in=demo_users).delete()
        Balance.objects.filter(user__in=demo_users).delete()
        Wishlist.objects.filter(user__in=demo_users).delete()
        demo_users.delete()

        # A filter tolerates both a missing vendor and duplicate names.
        vendors = Vendor.objects.filter(name=DEMO_VENDOR_NAME)
        if vendors.exists():
            Stock.objects.filter(vendor__in=vendors).delete()
            vendors.delete()
            self.stdout.write(f" Removed vendor: {DEMO_VENDOR_NAME}")

        partnumbers = [p["partnumber"] for p in self.demo_data["products"]]
        products = Product.objects.filter(partnumber__in=partnumbers)
        product_count = products.count()

        for product in products:
            for image in product.images.all():
                if image.image:
                    # Remove the stored file without re-saving the row that
                    # is deleted on the next line.
                    image.image.delete(save=False)
                image.delete()

            product_dir = media_root / "products" / str(product.uuid)
            # Only clean up the per-product directory once it is empty.
            if product_dir.exists() and not any(product_dir.iterdir()):
                shutil.rmtree(product_dir, ignore_errors=True)

        products.delete()

        brand_names = [b["name"] for b in self.demo_data["brands"]]
        Brand.objects.filter(name__in=brand_names).delete()

        # Walk the fixture list backwards so later entries go before
        # the earlier entries they may reference as parent.
        for cat_data in reversed(self.demo_data["categories"]):
            Category.objects.filter(name=cat_data["name"]).delete()

        category_tag_names = [t["tag_name"] for t in self.demo_data["category_tags"]]
        CategoryTag.objects.filter(tag_name__in=category_tag_names).delete()

        product_tag_names = [t["tag_name"] for t in self.demo_data["product_tags"]]
        ProductTag.objects.filter(tag_name__in=product_tag_names).delete()

        group_names = [g["name"] for g in self.demo_data["attribute_groups"]]
        Attribute.objects.filter(group__name__in=group_names).delete()
        AttributeGroup.objects.filter(name__in=group_names).delete()

    self.stdout.write("")
    self.stdout.write(self.style.SUCCESS("=" * 50))
    self.stdout.write(self.style.SUCCESS("Demo fixtures removed successfully!"))
    self.stdout.write(self.style.SUCCESS("=" * 50))
    self.stdout.write(f" Users removed: {user_count}")
    self.stdout.write(f" Orders removed: {order_count}")
    self.stdout.write(f" Products removed: {product_count}")
|
|
|
|
def _print_summary(
    self, users: list, orders: list, refunded_count: int, num_days: int
) -> None:
    """Print the post-install report.

    Args:
        users: Users created by this run; the first five emails are listed.
        orders: Orders created by this run.
        refunded_count: How many of ``orders`` were created as refunded.
        num_days: Number of days the orders were spread over.

    The shared demo password is read from the fixture payload and printed
    as part of the report.
    """
    emit = self.stdout.write
    highlight = self.style.SUCCESS
    password = self.demo_data["demo_users"]["password"]
    rule = "=" * 50

    emit("")
    emit(highlight(rule))
    emit(highlight("Demo fixtures installed successfully!"))
    emit(highlight(rule))

    emit(f" Products: {Product.objects.count()}")
    emit(f" Categories: {Category.objects.count()}")
    emit(f" Brands: {Brand.objects.count()}")

    run_stats = (
        ("Users created", len(users)),
        ("Orders created", len(orders)),
        ("Refunded orders", refunded_count),
        ("Date range", f"{num_days} days"),
        ("Demo password", password),
    )
    for label, value in run_stats:
        emit(f" {label}: {value}")

    emit("")
    emit("Sample demo accounts:")
    for account in users[:5]:
        emit(f" - {account.email}")
|
|
|
|
@transaction.atomic
def _create_gem_products(self) -> None:
    """Create the demo catalogue described by the fixture payload.

    Creates, in order: the vendor, category tags, product tags, attribute
    groups, attributes, brands, categories and products. Every object is
    fetched with ``get_or_create``, so existing rows are left untouched;
    Russian translations, the stock record and the product image are only
    written for rows created by this call.

    Fixture entries that cannot be linked (an attribute whose group is
    unknown, a product whose category is unknown, a category whose parent
    has not been created yet) are reported with a warning instead of
    being dropped silently.
    """
    data = self.demo_data

    def warn(message: str) -> None:
        self.stdout.write(self.style.WARNING(message))

    def apply_ru(instance, payload, *fields) -> bool:
        """Copy ``<field>_ru`` values onto ``<field>_ru_ru``; True if any was set."""
        changed = False
        for field in fields:
            key = f"{field}_ru"
            if key in payload:
                setattr(instance, f"{field}_ru_ru", payload[key])
                changed = True
        return changed

    vendor, _ = Vendor.objects.get_or_create(
        name=data["vendor"]["name"],
        defaults={"markup_percent": data["vendor"]["markup_percent"]},
    )

    # Category tags and product tags share the same fixture shape.
    for tag_model, section in (
        (CategoryTag, "category_tags"),
        (ProductTag, "product_tags"),
    ):
        for tag_data in data[section]:
            tag, created = tag_model.objects.get_or_create(
                tag_name=tag_data["tag_name"],
                defaults={"name": tag_data["name"]},
            )
            if created and apply_ru(tag, tag_data, "name"):
                tag.save()

    attr_groups = {}
    for group_data in data["attribute_groups"]:
        group, created = AttributeGroup.objects.get_or_create(
            name=group_data["name"]
        )
        if created and apply_ru(group, group_data, "name"):
            group.save()
        attr_groups[group_data["name"]] = group

    for attr_data in data["attributes"]:
        group = attr_groups.get(attr_data["group"])
        if not group:
            warn(
                f" Unknown attribute group {attr_data['group']!r} "
                f"for attribute {attr_data['name']!r}, skipping..."
            )
            continue
        attr, created = Attribute.objects.get_or_create(
            group=group,
            name=attr_data["name"],
            defaults={
                "value_type": attr_data["value_type"],
                "is_filterable": attr_data["is_filterable"],
            },
        )
        if created and apply_ru(attr, attr_data, "name"):
            attr.save()

    brands = {}
    for brand_data in data["brands"]:
        brand, created = Brand.objects.get_or_create(
            name=brand_data["name"],
            defaults={"description": brand_data["description"]},
        )
        if created and apply_ru(brand, brand_data, "description"):
            brand.save()
        brands[brand_data["name"]] = brand

    categories = {}
    for cat_data in data["categories"]:
        # Parents are resolved from categories processed earlier in the list.
        parent_name = cat_data.get("parent")
        parent = categories.get(parent_name) if parent_name else None
        if parent_name and parent is None:
            warn(
                f" Parent category {parent_name!r} not found for "
                f"{cat_data['name']!r}, creating it without a parent..."
            )
        category, created = Category.objects.get_or_create(
            name=cat_data["name"],
            defaults={
                "description": cat_data["description"],
                "parent": parent,
                "markup_percent": cat_data["markup_percent"],
            },
        )
        if created:
            apply_ru(category, cat_data, "name", "description")
            category.save()
        categories[cat_data["name"]] = category

    for prod_data in data["products"]:
        category = categories.get(prod_data["category"])
        brand = brands.get(prod_data["brand"])

        if not category:
            warn(
                f" Unknown category {prod_data['category']!r} for product "
                f"{prod_data['partnumber']}, skipping..."
            )
            continue

        product, created = Product.objects.get_or_create(
            partnumber=prod_data["partnumber"],
            defaults={
                "name": prod_data["name"],
                "description": prod_data["description"],
                "category": category,
                "brand": brand,
                "is_digital": False,
            },
        )
        if not created:
            # Existing product: keep its stock and images as they are so a
            # repeated install does not duplicate them.
            continue

        apply_ru(product, prod_data, "name", "description")
        product.save()

        Stock.objects.create(
            vendor=vendor,
            product=product,
            sku=f"GS-{prod_data['partnumber']}",
            price=prod_data["price"],
            purchase_price=prod_data["purchase_price"],
            quantity=prod_data["quantity"],
        )

        self._add_product_image(product, prod_data["partnumber"])
|
|
|
|
def _add_product_image(self, product: Product, partnumber: str) -> None:
    """Attach a demo image to ``product``.

    Looks for ``<partnumber>.jpg`` in the demo image directory and falls
    back to ``placeholder.png``. When neither file exists a warning is
    written and nothing is attached.

    Args:
        product: Product that receives the image.
        partnumber: Part number used to locate the image file.
    """
    candidates = (
        DEMO_IMAGES_DIR / f"{partnumber}.jpg",
        DEMO_IMAGES_DIR / "placeholder.png",
    )
    source = next((path for path in candidates if path.exists()), None)

    if source is None:
        self.stdout.write(
            self.style.WARNING(f" No image found for {partnumber}, skipping...")
        )
        return

    payload = source.read_bytes()

    record = ProductImage(product=product, alt=product.name, priority=1)
    # save=True also persists the ProductImage row itself.
    record.image.save(source.name, ContentFile(payload), save=True)
|
|
|
|
@transaction.atomic
def _create_demo_users(self, count: int) -> list:
    """Create ``count`` demo users, each with a balance and one address.

    Names, cities, streets, the shared password and the email domain come
    from the ``demo_users`` section of the fixture payload. Emails follow
    ``first.last@domain``; a numeric suffix is appended when the address
    is already taken.

    Args:
        count: Number of users to create.

    Returns:
        The created users, in creation order.
    """
    created_users = []
    profile = self.demo_data["demo_users"]
    taken_emails = set(User.objects.values_list("email", flat=True))

    first_names = profile["first_names"]
    last_names = profile["last_names"]
    cities = profile["cities"]
    streets = profile["streets"]
    password = profile["password"]
    domain = profile["email_domain"]

    for _ in range(count):
        first_name = random.choice(first_names)
        last_name = random.choice(last_names)

        # Find the first free address: stem@, stem1@, stem2@, ...
        stem = f"{first_name.lower()}.{last_name.lower()}"
        email = f"{stem}@{domain}"
        suffix = 0
        while email in taken_emails:
            suffix += 1
            email = f"{stem}{suffix}@{domain}"
        taken_emails.add(email)

        user = User(
            email=email,
            first_name=first_name,
            last_name=last_name,
            is_active=True,
            is_verified=True,
        )
        user.set_password(password)
        user.save()

        # The random amount only applies when no balance row exists yet.
        Balance.objects.get_or_create(
            user=user,
            defaults={"amount": round(random.uniform(100, 5000), 2)},
        )

        place = random.choice(cities)
        house_number = random.randint(1, 999)
        street_name = random.choice(streets)
        street_line = f"{house_number} {street_name}"
        full_line = (
            f"{street_line}, {place['city']}, "
            f"{place['region']} {place['postal_code']}, {place['country']}"
        )

        # Built and saved directly on the model instance.
        Address(
            user=user,
            street=street_line,
            city=place["city"],
            region=place["region"],
            postal_code=place["postal_code"],
            country=place["country"],
            address_line=full_line,
            raw_data=full_line,
        ).save()

        created_users.append(user)

    return created_users
|
|
|
|
@transaction.atomic
def _create_demo_orders(
    self,
    users: list,
    products: list,
    count: int,
    days: int,
) -> tuple[list, int]:
    """Create ``count`` demo orders spread over the last ``days`` days.

    About 8% of the orders are created as refunded (order status
    ``FAILED`` with ``RETURNED`` items). Orders are weighted towards the
    most recent days; orders in the most recent ~30% of the range get a
    random in-progress or finished status, older ones are ``FINISHED``.

    Args:
        users: Candidate buyers; one is picked at random per order.
        products: Candidate products; 1-4 distinct ones per order.
        count: Number of orders to create. Values below 1 create nothing.
        days: Size of the date range in days; must be at least 1.

    Returns:
        ``(orders, refunded_count)``.

    Raises:
        ValueError: If orders were requested but ``days`` is below 1.
    """
    if count < 1:
        return [], 0
    if days < 1:
        raise ValueError(f"days must be a positive integer, got {days}")
    if not users or not products:
        self.stdout.write(
            self.style.WARNING(
                "No users or products available, skipping order creation"
            )
        )
        return [], 0

    orders = []
    refunded_count = 0
    now = timezone.now()

    refund_indices = set(random.sample(range(count), int(count * 0.08)))

    # Linearly increasing weights: later offsets (more recent days) are
    # picked more often. random.choices accepts relative weights as-is.
    day_weights = [1 + (i / days) for i in range(days)]
    day_offsets = range(days)
    recent_cutoff = days * 0.7

    # One address lookup per distinct buyer instead of one per order.
    address_by_user = {}

    for i in range(count):
        user = random.choice(users)
        is_refunded = i in refund_indices

        day_offset = random.choices(day_offsets, weights=day_weights)[0]
        order_date = now - timedelta(
            days=days - 1 - day_offset,
            hours=random.randint(0, 23),
            minutes=random.randint(0, 59),
        )

        if is_refunded:
            status = "FAILED"
            refunded_count += 1
        elif day_offset > recent_cutoff:
            status = random.choice(["CREATED", "DELIVERING", "FINISHED"])
        else:
            status = "FINISHED"

        if user.pk not in address_by_user:
            address_by_user[user.pk] = Address.objects.filter(user=user).first()
        address = address_by_user[user.pk]

        order = Order.objects.create(
            user=user,
            status=status,
            buy_time=order_date,
            billing_address=address,
            shipping_address=address,
        )

        # Set the creation timestamp with a queryset update after the row
        # exists, so the back-dated value is what ends up stored.
        Order.objects.filter(pk=order.pk).update(created=order_date)

        basket_size = min(random.randint(1, 4), len(products))
        for product in random.sample(products, basket_size):
            quantity = random.randint(1, 3)
            # Read the price once; fall back to a random one when unset.
            price = product.price or random.uniform(100, 5000)

            if is_refunded:
                op_status = "RETURNED"
            elif status == "FINISHED":
                op_status = "FINISHED"
            elif status == "DELIVERING":
                op_status = random.choice(["DELIVERING", "DELIVERED"])
            else:
                op_status = random.choice(["ACCEPTED", "PENDING"])

            OrderProduct.objects.create(
                order=order,
                product=product,
                quantity=quantity,
                buy_price=round(price, 2),
                status=op_status,
            )

        orders.append(order)

    return orders, refunded_count
|
|
|
|
@transaction.atomic
def _create_demo_wishlists(self, users: list, products: list) -> int:
    """Put up to five random products on the wishlists of random users.

    Five products are sampled (all of them, with a warning, when fewer
    than five exist) and each is added to the wishlists of 2-4 randomly
    chosen users. Nothing is created when fewer than two users exist.

    Args:
        users: Candidate wishlist owners.
        products: Candidate products.

    Returns:
        Number of distinct users that received at least one product.
    """
    if len(products) >= 5:
        picks = random.sample(products, 5)
    else:
        self.stdout.write(
            self.style.WARNING(
                f"Not enough products ({len(products)}) to create 5 wishlisted items"
            )
        )
        picks = products

    if len(users) < 2:
        self.stdout.write(
            self.style.WARNING("Not enough users to create wishlists")
        )
        return 0

    most_fans = min(4, len(users))
    owner_ids = set()

    for product in picks:
        fans = random.sample(users, random.randint(2, most_fans))
        for fan in fans:
            wishlist, _ = Wishlist.objects.get_or_create(user=fan)
            wishlist.products.add(product)
        owner_ids.update(fan.id for fan in fans)

    return len(owner_ids)
|