schon/engine/core/feeds/amazon_seller.py
Egor fureunoir Gorbunov adfa2f20dd feat(feeds): add marketplace-specific feed generators
Add feed generators for Google Merchant, Amazon Seller, Yandex Market, and Yandex Products. Includes a shared base class (`BaseFeedGenerator`) for common functionality, such as product retrieval and validation. Each generator supports format-specific output and handles platform-specific requirements.

This change simplifies the creation of product feeds for multiple marketplaces.
2026-01-26 17:44:21 +03:00

241 lines
8.8 KiB
Python

from datetime import datetime, timezone
from typing import Any
from xml.etree.ElementTree import Element, SubElement

from constance import config
from django.conf import settings
from django.db.models import QuerySet

from engine.core.feeds.base import BaseFeedGenerator
from engine.core.models import Product
class AmazonSellerFeedGenerator(BaseFeedGenerator):
    """
    Amazon Seller Central feed generator.

    Generates product feeds in Amazon's XML format for Seller Central.
    XML is the default output format; JSON and YAML are also declared as
    supported formats.

    Reference: https://developer-docs.amazon.com/sp-api/docs/feeds-api-v2021-06-30-reference
    """

    # Identifier of this generator.
    # NOTE(review): presumably the key the feed machinery uses to select this
    # generator -- confirm against BaseFeedGenerator.
    name: str = "amazon_seller"
    # Output formats this generator declares support for.
    supported_formats: tuple[str, ...] = ("xml", "json", "yaml")
    # Format used when the caller does not ask for a specific one.
    default_format: str = "xml"
    # XML namespace URI.
    # NOTE(review): not referenced by the XML-building methods reviewed here --
    # confirm whether it is still needed.
    AMAZON_NS = "http://www.amazon.com/schema/merchant/product/2024-01"
def generate_feed_data(self, products: QuerySet[Product]) -> dict[str, Any]:
    """Build the feed payload as a plain, serializable dictionary.

    Args:
        products: Products to include in the feed.

    Returns:
        A dictionary holding the envelope header, the message type, the
        purge flag, one message per product accepted by
        ``_build_message`` and a ``generated_at`` timestamp. The timestamp
        is timezone-aware (UTC) ISO 8601, so it stays unambiguous
        regardless of the server's local time zone.
    """
    # Message IDs follow the 1-based position in ``products``; products that
    # ``_build_message`` rejects therefore leave gaps in the numbering.
    candidates = (
        self._build_message(product, position)
        for position, product in enumerate(products, start=1)
    )
    messages = [message for message in candidates if message]
    return {
        "header": {
            "documentVersion": "1.0",
            "merchantIdentifier": config.COMPANY_NAME or settings.PROJECT_NAME,
        },
        "messageType": "Product",
        "purgeAndReplace": False,
        "messages": messages,
        "generated_at": datetime.now(tz=timezone.utc).isoformat(),
    }
def _build_message(
    self, product: Product, message_id: int
) -> dict[str, Any] | None:
    """Assemble the feed message for a single product.

    Args:
        product: Product to describe.
        message_id: Identifier stored under ``messageID``.

    Returns:
        The message dictionary, or ``None`` when the product has no
        positive price.
    """
    price = product.price
    if not price or price <= 0:
        return None

    images = self.get_product_images(product)
    brand = product.brand

    description_data: dict[str, Any] = {}
    description_data["title"] = product.name[:200]
    description_data["brand"] = brand.name if brand else ""
    description_data["description"] = (product.description or "")[:2000]
    description_data["bulletPoint"] = self._get_bullet_points(product)
    # Without a brand the company name (or an empty string) stands in.
    description_data["manufacturer"] = (
        brand.name if brand else (config.COMPANY_NAME or "")
    )
    description_data["itemType"] = self._get_item_type(product)

    if images:
        description_data["mainImage"] = {
            "imageType": "Main",
            "imageLocation": images[0],
        }
        # At most eight additional images, labelled PT1..PT8.
        extra_images = images[1:9]
        if extra_images:
            description_data["otherImages"] = [
                {"imageType": f"PT{slot}", "imageLocation": location}
                for slot, location in enumerate(extra_images, start=1)
            ]

    return {
        "messageID": message_id,
        "operationType": "Update",
        "product": {
            "sku": product.sku,
            "standardProductID": self._get_standard_product_id(product),
            "productTaxCode": "A_GEN_TAX",
            "descriptionData": description_data,
            "productData": self._get_product_data(product),
        },
    }
def _get_standard_product_id(self, product: Product) -> dict[str, str]:
    """Pick the standard product identifier (EAN/UPC/GTIN/ISBN) of a product.

    The product's attributes are scanned in the order
    ``product.attributes.all()`` yields them. The first attribute whose
    name (ignoring case and surrounding whitespace) is ``ean``, ``upc``,
    ``gtin`` or ``isbn`` and whose value is not blank wins.

    Args:
        product: Product whose attributes are inspected.

    Returns:
        ``{"type": ..., "value": ...}`` with string values. When no usable
        identifier attribute exists, the type is ``"PrivateLabel"`` and the
        value is the part number, or the SKU if there is no part number.
    """
    id_types = {"ean": "EAN", "upc": "UPC", "gtin": "GTIN", "isbn": "ISBN"}
    for attr_value in product.attributes.all():
        amazon_type = id_types.get(attr_value.attribute.name.strip().lower())
        if amazon_type is None:
            continue
        raw_value = attr_value.value
        value = "" if raw_value is None else str(raw_value).strip()
        # A blank identifier is useless downstream; keep looking instead of
        # emitting an empty EAN/UPC/GTIN/ISBN.
        if value:
            return {"type": amazon_type, "value": value}
    if product.partnumber:
        return {"type": "PrivateLabel", "value": product.partnumber}
    return {"type": "PrivateLabel", "value": product.sku}
def _get_bullet_points(self, product: Product) -> list[str]:
    """Turn product attributes into ``"name: value"`` bullet points.

    Only the first five entries of ``product.attributes.all()`` are used,
    in the order they are returned.
    """
    first_five = product.attributes.all()[:5]
    return [f"{entry.attribute.name}: {entry.value}" for entry in first_five]
def _get_item_type(self, product: Product) -> str:
    """Return the product's category name, or ``"General"`` without one."""
    category = product.category
    return category.name if category else "General"
def _get_product_data(self, product: Product) -> dict[str, Any]:
    """Collect price and inventory data for a product.

    Args:
        product: Product providing ``price``, ``quantity`` and
            ``discount_price``.

    Returns:
        A dictionary with a ``price`` section (always ``standardPrice``,
        plus ``salePrice`` when a positive discounted price exists) and an
        ``inventory`` section with the quantity and a fixed fulfillment
        latency of 3.
    """
    price_section: dict[str, Any] = {
        "standardPrice": {
            "value": product.price,
            "currency": self.get_currency(),
        },
    }
    inventory_section = {
        "quantity": product.quantity,
        "fulfillmentLatency": 3,
    }
    payload = {"price": price_section, "inventory": inventory_section}

    discount = product.discount_price
    if not discount:
        return payload

    # NOTE(review): ``discount_price`` is subtracted from the price, i.e. it is
    # treated as the amount taken off rather than the final price -- confirm
    # against the Product model.
    reduced_price = product.price - discount
    if reduced_price > 0:
        price_section["salePrice"] = {
            "value": reduced_price,
            "currency": self.get_currency(),
        }
    return payload
def to_xml(self, data: dict[str, Any]) -> str:
    """Render feed data as an ``AmazonEnvelope`` XML document.

    Args:
        data: Feed dictionary with ``header``, ``messageType``,
            ``purgeAndReplace`` and ``messages`` entries.

    Returns:
        The XML declaration followed by the output of
        ``self.prettify_xml`` for the envelope element.
    """
    root = Element(
        "AmazonEnvelope",
        {
            "xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
            "xsi:noNamespaceSchemaLocation": "amzn-envelope.xsd",
        },
    )

    header = SubElement(root, "Header")
    SubElement(header, "DocumentVersion").text = data["header"]["documentVersion"]
    SubElement(header, "MerchantIdentifier").text = data["header"][
        "merchantIdentifier"
    ]

    SubElement(root, "MessageType").text = data["messageType"]
    # The flag is written as a lowercase literal: "true" or "false".
    SubElement(root, "PurgeAndReplace").text = str(
        bool(data["purgeAndReplace"])
    ).lower()

    for entry in data["messages"]:
        self._add_message_to_xml(root, entry)

    body = self.prettify_xml(root)
    return '<?xml version="1.0" encoding="UTF-8"?>\n' + body
def _add_message_to_xml(self, envelope: Element, msg_data: dict[str, Any]) -> None:
    """Append one ``Message`` element to the envelope.

    Writes the message ID, the operation type and a ``Product`` element
    holding the SKU, the standard product ID, the tax code and the
    description data. Other entries of the product dictionary (such as
    ``productData``) are not written by this method.

    Args:
        envelope: Element that receives the new ``Message`` child.
        msg_data: Message dictionary with ``messageID``, ``operationType``
            and ``product`` entries.
    """

    def add_text(parent: Element, tag: str, text: Any) -> Element:
        """Create ``tag`` under ``parent`` and set its text."""
        node = SubElement(parent, tag)
        node.text = text
        return node

    message = SubElement(envelope, "Message")
    add_text(message, "MessageID", str(msg_data["messageID"]))
    add_text(message, "OperationType", msg_data["operationType"])

    product = SubElement(message, "Product")
    details = msg_data["product"]
    add_text(product, "SKU", details["sku"])

    identifier = SubElement(product, "StandardProductID")
    add_text(identifier, "Type", details["standardProductID"]["type"])
    add_text(identifier, "Value", details["standardProductID"]["value"])

    add_text(product, "ProductTaxCode", details["productTaxCode"])

    description = SubElement(product, "DescriptionData")
    self._add_description_data_to_xml(description, details["descriptionData"])
def _add_description_data_to_xml(
    self, parent: Element, desc_data: dict[str, Any]
) -> None:
    """Write the description fields as children of ``parent``.

    ``Title`` is always written. ``Brand``, ``Description``,
    ``Manufacturer``, ``ItemType`` and ``MainImage`` are written only when
    their value is truthy. Every bullet point and every additional image
    gets its own element.

    Args:
        parent: Element that receives the children.
        desc_data: Description dictionary; ``title`` is required.
    """

    def add_text(node: Element, tag: str, text: Any) -> None:
        """Create ``tag`` under ``node`` and set its text."""
        SubElement(node, tag).text = text

    def add_optional(key: str, tag: str) -> None:
        """Write ``desc_data[key]`` as ``tag`` unless it is missing or empty."""
        value = desc_data.get(key)
        if value:
            add_text(parent, tag, value)

    def add_image(tag: str, image: dict[str, Any]) -> None:
        """Write an image element with its type and location."""
        holder = SubElement(parent, tag)
        add_text(holder, "ImageType", image["imageType"])
        add_text(holder, "ImageLocation", image["imageLocation"])

    add_text(parent, "Title", desc_data["title"])
    add_optional("brand", "Brand")
    add_optional("description", "Description")

    for bullet in desc_data.get("bulletPoint", []):
        add_text(parent, "BulletPoint", bullet)

    add_optional("manufacturer", "Manufacturer")
    add_optional("itemType", "ItemType")

    main_image = desc_data.get("mainImage")
    if main_image:
        add_image("MainImage", main_image)

    for extra_image in desc_data.get("otherImages", []):
        add_image("OtherImage", extra_image)