from datetime import datetime from typing import Any from xml.etree.ElementTree import Element, SubElement from constance import config from django.conf import settings from django.db.models import QuerySet from engine.core.feeds.base import BaseFeedGenerator from engine.core.models import Product class GoogleMerchantFeedGenerator(BaseFeedGenerator): """ Google Merchant Center feed generator. Generates product feeds in Atom/RSS format compatible with Google Shopping. Reference: https://support.google.com/merchants/answer/7052112 """ name: str = "google_merchant" supported_formats: tuple[str, ...] = ("xml", "json") default_format: str = "xml" GOOGLE_NS = "http://base.google.com/ns/1.0" def generate_feed_data(self, products: QuerySet[Product]) -> list[dict[str, Any]]: """Generate feed data as a list of product dictionaries.""" items = [] for product in products: item = self._build_product_item(product) if item: items.append(item) return items def _build_product_item(self, product: Product) -> dict[str, Any] | None: """Build a product item dictionary for the feed.""" if not product.price or product.price <= 0: return None images = self.get_product_images(product) primary_image = images[0] if images else "" additional_images = images[1:10] if len(images) > 1 else [] item = { "id": product.sku, "title": product.name[:150], "description": (product.description or "")[:5000], "link": self.get_product_url(product), "image_link": primary_image, "availability": self.get_availability(product), "price": f"{product.price:.2f} {self.get_currency()}", "brand": product.brand.name if product.brand else "", "condition": "new", "product_type": self._get_product_type(product), } if additional_images: item["additional_image_link"] = additional_images if product.partnumber: item["mpn"] = product.partnumber if product.discount_price: sale_price = product.price - product.discount_price if sale_price > 0: item["sale_price"] = f"{sale_price:.2f} {self.get_currency()}" gtin = self._get_gtin(product) if gtin: item["gtin"] = gtin else: item["identifier_exists"] = "no" return item def _get_product_type(self, product: Product) -> str: """Build the product type hierarchy from category.""" if not product.category: return "" ancestors = product.category.get_ancestors(include_self=True) return " > ".join([cat.name for cat in ancestors]) def _get_gtin(self, product: Product) -> str | None: """Extract GTIN/EAN/UPC from product attributes.""" gtin_names = ["gtin", "ean", "upc", "isbn", "barcode"] for attr_value in product.attributes.all(): if attr_value.attribute.name.lower() in gtin_names: return attr_value.value return None def to_xml(self, data: list[dict[str, Any]]) -> str: """Convert feed data to Google Merchant XML format.""" rss = Element("rss") rss.set("version", "2.0") rss.set("xmlns:g", self.GOOGLE_NS) channel = SubElement(rss, "channel") title = SubElement(channel, "title") title.text = config.COMPANY_NAME or settings.PROJECT_NAME link = SubElement(channel, "link") link.text = f"https://{settings.STOREFRONT_DOMAIN}" description = SubElement(channel, "description") description.text = ( f"Product feed for {config.COMPANY_NAME or settings.PROJECT_NAME}" ) for product_data in data: item = SubElement(channel, "item") self._add_product_to_xml(item, product_data) return self.prettify_xml(rss) def _add_product_to_xml(self, item: Element, product_data: dict[str, Any]) -> None: """Add a product's data to an XML item element.""" simple_fields = [ ("id", "g:id"), ("title", "g:title"), ("description", "g:description"), ("link", "g:link"), ("image_link", "g:image_link"), ("availability", "g:availability"), ("price", "g:price"), ("brand", "g:brand"), ("condition", "g:condition"), ("product_type", "g:product_type"), ("mpn", "g:mpn"), ("gtin", "g:gtin"), ("sale_price", "g:sale_price"), ("identifier_exists", "g:identifier_exists"), ] for data_key, xml_tag in simple_fields: if data_key in product_data and product_data[data_key]: elem = SubElement(item, xml_tag) elem.text = str(product_data[data_key]) additional_images = product_data.get("additional_image_link", []) for img_url in additional_images: elem = SubElement(item, "g:additional_image_link") elem.text = img_url def to_json(self, data: list[dict[str, Any]]) -> str: """Convert feed data to JSON format.""" feed = { "channel": { "title": config.COMPANY_NAME or settings.PROJECT_NAME, "link": f"https://{settings.STOREFRONT_DOMAIN}", "description": f"Product feed for {config.COMPANY_NAME or settings.PROJECT_NAME}", "generated_at": datetime.now().isoformat(), }, "items": data, } return super().to_json(feed)