from collections import defaultdict
from typing import Any

from django.core.management.base import BaseCommand
from django.db import transaction

from engine.core.models import Category, Product, Stock


class Command(BaseCommand):
    """Management command that clears duplicate and orphaned catalogue data.

    The command runs four independent clean-up steps, in order:

    1. Delete duplicate ``Stock`` rows, keeping one per (product, vendor).
    2. Merge ``Category`` rows whose names differ only by case, re-pointing
       their products to the surviving category.
    3. Set ``is_active = False`` on every ``Product`` that has no stocks.
    4. Delete every ``Stock`` that has no associated product.
    """

    help = (
        "Removes duplicate stocks and categories, deactivates products "
        "without stocks and deletes stocks without a product."
    )

    def handle(self, *args: Any, **options: Any) -> None:
        """Run all clean-up steps and report progress on stdout.

        Args:
            *args: Positional arguments passed by Django; unused.
            **options: Parsed command-line options passed by Django; unused.
        """
        self.stdout.write(self.style.SUCCESS("Starting clearing unwanted data..."))

        self._remove_duplicate_stocks()
        self._merge_duplicate_categories()
        self._deactivate_products_without_stocks()
        self._delete_orphan_stocks()

        self.stdout.write(self.style.SUCCESS("Finished clearing unwanted data."))

    @staticmethod
    def _select_record_to_keep(records: list[Any]) -> Any:
        """Return the record of a duplicate group that must survive.

        Records whose ``modified`` timestamp is later than ``created`` are
        treated as admin-edited and take priority; among the candidates the
        one with the latest ``modified`` wins. If no record was edited, the
        one with the latest ``modified`` of the whole group is kept.

        Args:
            records: Non-empty list of objects exposing ``created`` and
                ``modified`` attributes.
        """
        admin_edited = [r for r in records if r.modified > r.created]
        return max(admin_edited or records, key=lambda r: r.modified)

    def _remove_duplicate_stocks(self) -> None:
        """Delete all but one ``Stock`` per (product, vendor) pair."""
        # Group by the raw foreign-key values so no related objects are
        # fetched. ``<field>_id`` is the attribute Django provides for that.
        # NOTE(review): assumes Stock has a ``vendor`` foreign key, as the
        # original "per product and vendor" comments describe - confirm
        # against the model.
        stocks_by_group: defaultdict[tuple[Any, Any], list[Any]] = defaultdict(list)
        for stock in Stock.objects.all().order_by("modified"):
            stocks_by_group[(stock.product_id, stock.vendor_id)].append(stock)

        stock_deletions: list[str] = []
        for group in stocks_by_group.values():
            if len(group) <= 1:
                continue
            record_to_keep = self._select_record_to_keep(group)
            # Everything except the designated survivor is a duplicate.
            stock_deletions.extend(
                str(s.uuid) for s in group if s.uuid != record_to_keep.uuid
            )

        if stock_deletions:
            Stock.objects.filter(uuid__in=stock_deletions).delete()
            self.stdout.write(
                self.style.SUCCESS(
                    f"Deleted {len(stock_deletions)} duplicate stock entries."
                )
            )

    def _merge_duplicate_categories(self) -> None:
        """Merge categories whose names are equal ignoring case.

        Products of every duplicate category are re-assigned to the surviving
        category before the duplicates are deleted.
        """
        category_groups: defaultdict[str, list[Any]] = defaultdict(list)
        for cat in Category.objects.all().order_by("modified"):
            category_groups[cat.name.lower()].append(cat)

        categories_to_delete: list[str] = []
        total_product_updates = 0

        # Re-pointing products and deleting the duplicates belong together:
        # run them in one transaction so a failure leaves no half-merged state.
        with transaction.atomic():
            for cat_list in category_groups.values():
                if len(cat_list) <= 1:
                    continue
                keep_category = self._select_record_to_keep(cat_list)
                for duplicate in cat_list:
                    if duplicate.uuid == keep_category.uuid:
                        continue
                    # update() returns the number of rows it changed.
                    total_product_updates += Product.objects.filter(
                        category=duplicate
                    ).update(category=keep_category)
                    categories_to_delete.append(str(duplicate.uuid))

            if categories_to_delete:
                Category.objects.filter(uuid__in=categories_to_delete).delete()

        if categories_to_delete:
            self.stdout.write(
                self.style.SUCCESS(
                    f"Replaced category for {total_product_updates} product(s) "
                    f"and deleted {len(categories_to_delete)} duplicate categories."
                )
            )

    def _deactivate_products_without_stocks(self) -> None:
        """Set ``is_active = False`` on products that have no stocks."""
        # Use the row count returned by update() instead of a separate
        # count() query, so the reported number matches what was written.
        count_inactive = Product.objects.filter(stocks__isnull=True).update(
            is_active=False
        )
        if count_inactive:
            self.stdout.write(
                self.style.SUCCESS(
                    f"Set {count_inactive} product(s) as inactive due to missing stocks."
                )
            )

    def _delete_orphan_stocks(self) -> None:
        """Delete stocks that are not linked to any product."""
        orphan_stocks = Stock.objects.filter(product__isnull=True)
        orphan_count = orphan_stocks.count()
        if orphan_count:
            orphan_stocks.delete()
            self.stdout.write(
                self.style.SUCCESS(
                    f"Deleted {orphan_count} stock(s) without an associated product."
                )
            )