schon/core/management/commands/clear_unwanted.py

90 lines
3.9 KiB
Python

from collections import defaultdict
from typing import Any
from django.core.management.base import BaseCommand
from core.models import Category, Product, Stock
class Command(BaseCommand):
def handle(self, *args: list[Any], **options: dict[Any, Any]) -> None:
self.stdout.write(self.style.SUCCESS("Starting clearing unwanted data..."))
# 1. Clean up duplicate Stock entries per product and vendor:
# Group stocks by (product, vendor)
stocks_by_group = defaultdict(list)
for stock in Stock.objects.all().order_by("modified"):
stocks_by_group[stock.product_pk].append(stock)
stock_deletions: list[str] = []
for group in stocks_by_group.values():
if len(group) <= 1:
continue
# Split the group into admin-edited and never-edited
admin_edited = [s for s in group if s.modified > s.created]
if admin_edited:
# Keep the admin-edited stock with the latest modified
record_to_keep = max(admin_edited, key=lambda s: s.modified)
else:
# None were admin-edited; keep the one with the latest modified field
record_to_keep = max(group, key=lambda s: s.modified)
# Mark all stocks (except the designated one) for deletion.
for s in group:
if s.uuid != record_to_keep.uuid:
stock_deletions.append(str(s.uuid))
if stock_deletions:
Stock.objects.filter(uuid__in=stock_deletions).delete()
self.stdout.write(self.style.SUCCESS(f"Deleted {len(stock_deletions)} duplicate stock entries."))
# 2. Clean up duplicate Category entries based on name (case-insensitive)
category_groups = defaultdict(list)
for cat in Category.objects.all().order_by("modified"):
key: str = cat.name.lower()
category_groups[key].append(cat)
categories_to_delete: list[str] = []
total_product_updates = 0
for cat_list in category_groups.values():
if len(cat_list) <= 1:
continue
# Check for admin-edited categories in this group.
admin_edited = [c for c in cat_list if c.modified > c.created]
if admin_edited:
keep_category = max(admin_edited, key=lambda c: c.modified)
else:
keep_category = max(cat_list, key=lambda c: c.modified)
for duplicate in cat_list:
if duplicate.uuid == keep_category.uuid:
continue
total_product_updates += Product.objects.filter(category=duplicate).update(category=keep_category)
categories_to_delete.append(str(duplicate.uuid))
if categories_to_delete:
Category.objects.filter(uuid__in=categories_to_delete).delete()
self.stdout.write(
self.style.SUCCESS(
f"Replaced category for {total_product_updates} product(s) "
f"and deleted {len(categories_to_delete)} duplicate categories."
)
)
# 3. For Products without stocks: set is_active = False.
inactive_products = Product.objects.filter(stocks__isnull=True)
count_inactive = inactive_products.count()
if count_inactive:
inactive_products.update(is_active=False)
self.stdout.write(self.style.SUCCESS(f"Set {count_inactive} product(s) as inactive due to missing stocks."))
# 4. Delete stocks without an associated product.
orphan_stocks = Stock.objects.filter(product__isnull=True)
orphan_count = orphan_stocks.count()
if orphan_count:
orphan_stocks.delete()
self.stdout.write(self.style.SUCCESS(f"Deleted {orphan_count} stock(s) without an associated product."))
self.stdout.write(self.style.SUCCESS("Started fetching products task in worker container without errors!"))