from django.conf import settings
from django.http import Http404
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
from django_elasticsearch_dsl import fields
from django_elasticsearch_dsl.registries import registry
from elasticsearch import NotFoundError
from elasticsearch.dsl import Q, Search
from rest_framework.request import Request

from core.models import Brand, Category, Product

# Fields queried by the main multi_match clause, written as "<field>^<boost>".
SMART_FIELDS = [
    "name^8",
    "name.ngram^6",
    "name.phonetic^3",
    "title^5",
    "title.ngram^4",
    "title.phonetic^2",
    "description^2",
    "description.ngram",
    "description.phonetic",
    "brand_name^4",
    "brand_name.ngram^3",
    "brand_name.auto^3",
    "category_name^3",
    "category_name.ngram^2",
    "category_name.auto^2",
]

# Subset of SMART_FIELDS used by the prefix ("bool_prefix") clause.
# The boost suffix is split off before testing the field name: a plain
# ``endswith(".auto")`` on the raw entry never matches "brand_name.auto^3".
_AUTOCOMPLETE_FIELDS = [f for f in SMART_FIELDS if f.partition("^")[0].endswith(".auto")]

# Scoring functions passed to the function_score query. Each entry is limited
# to a single index by a term filter on ``_index`` and applies a log1p-modified
# field_value_factor to one numeric field (treated as 0 when missing).
functions = [
    {
        "filter": Q("term", **{"_index": "products"}),
        "field_value_factor": {
            "field": "brand_priority",
            "modifier": "log1p",
            "factor": 1.5,
            "missing": 0,
        },
    },
    {
        "filter": Q("term", **{"_index": "products"}),
        "field_value_factor": {
            "field": "rating",
            "modifier": "log1p",
            "factor": 2.0,
            "missing": 0,
        },
    },
    {
        "filter": Q("term", **{"_index": "products"}),
        "field_value_factor": {
            "field": "total_orders",
            "modifier": "log1p",
            "factor": 3.0,
            "missing": 0,
        },
    },
    {
        "filter": Q("term", **{"_index": "products"}),
        "field_value_factor": {
            "field": "category_priority",
            "modifier": "log1p",
            "factor": 1.2,
            "missing": 0,
        },
    },
    {
        "filter": Q("term", **{"_index": "categories"}),
        "field_value_factor": {
            "field": "priority",
            "modifier": "log1p",
            "factor": 2.0,
            "missing": 0,
        },
    },
    {
        "filter": Q("term", **{"_index": "brands"}),
        "field_value_factor": {
            "field": "priority",
            "modifier": "log1p",
            "factor": 2.0,
            "missing": 0,
        },
    },
]


def _build_search(query: str) -> Search:
    """Build the multi-index search request for an already-stripped *query*."""
    exact_shoulds = [
        Q("term", **{"name.raw": query}),
        Q("term", **{"slug": slugify(query)}),
    ]
    text_shoulds = [
        Q(
            "multi_match",
            query=query,
            fields=SMART_FIELDS,
            fuzziness="AUTO",
            operator="and",
        ),
        Q(
            "multi_match",
            query=query,
            fields=_AUTOCOMPLETE_FIELDS,
            type="bool_prefix",
        ),
    ]
    query_base = Q(
        "bool",
        should=exact_shoulds + text_shoulds,
        minimum_should_match=1,
    )
    function_score_query = Q(
        "function_score",
        query=query_base,
        functions=functions,
        boost_mode="multiply",
        score_mode="sum",
    )
    return Search(index=["products", "categories", "brands", "posts"]).query(function_score_query).extra(size=100)


def _load_related_objects(uuids_by_index: dict[str, list]) -> dict[str, dict]:
    """Fetch the model instances behind the hits, one query set per model.

    Returns a mapping ``index name -> {uuid string -> instance}`` for the
    "products", "brands" and "categories" indexes. A model is only queried
    when at least one UUID was collected for its index.
    """
    related: dict[str, dict] = {"products": {}, "brands": {}, "categories": {}}
    if uuids_by_index.get("products"):
        related["products"] = {
            str(p.uuid): p
            for p in Product.objects.filter(uuid__in=uuids_by_index["products"])
            .select_related("brand", "category")
            .prefetch_related("images")
        }
    if uuids_by_index.get("brands"):
        related["brands"] = {str(b.uuid): b for b in Brand.objects.filter(uuid__in=uuids_by_index["brands"])}
    if uuids_by_index.get("categories"):
        related["categories"] = {
            str(c.uuid): c for c in Category.objects.filter(uuid__in=uuids_by_index["categories"])
        }
    return related


def _image_url(idx: str, obj_uuid, request: Request, related: dict[str, dict]) -> str | None:
    """Return the absolute image URL for one hit, or ``None`` when it has none."""
    obj = related.get(idx, {}).get(str(obj_uuid))
    if not obj:
        return None

    if idx == "products":
        # Pick the lowest-priority image from the prefetched set in Python.
        # Calling ``.order_by()`` on the manager would ignore the prefetch
        # cache and issue one extra query per product hit.
        # NOTE(review): images with a NULL priority sort last here; confirm
        # that matches the database ordering if the field is nullable.
        first = min(
            obj.images.all(),
            key=lambda img: (img.priority is None, img.priority),
            default=None,
        )
        image = first.image if first else None
    elif idx == "brands":
        image = obj.small_logo
    elif idx == "categories":
        image = obj.image
    else:
        return None

    return request.build_absolute_uri(image.url) if image else None


def process_query(query: str = "", request: Request | None = None) -> dict[str, list[dict]] | None:
    """Search products, categories, brands and posts for *query*.

    Args:
        query: Free-text search term. Leading/trailing whitespace is ignored.
        request: Optional request used to build absolute image URLs. When it
            is omitted no database lookups are made and every ``image`` value
            is ``None``.

    Returns:
        A dict keyed by index name ("products", "categories", "brands",
        "posts"), each holding a list of ``{"uuid", "name", "slug", "image"}``
        dicts. With ``settings.DEBUG`` enabled, product, brand and category
        entries also carry ``*_debug`` values copied from the hit.

    Raises:
        ValueError: If *query* is empty or contains only whitespace.
        Http404: If Elasticsearch raises ``NotFoundError`` for the search.
    """
    # Strip before validating so a whitespace-only term is rejected as well.
    query = (query or "").strip()
    if not query:
        raise ValueError(_("no search term provided."))

    search = _build_search(query)
    try:
        response = search.execute()
    except NotFoundError as nfe:
        raise Http404 from nfe

    hits = list(response.hits)

    # Collect UUIDs per index first so related rows are loaded in batches
    # instead of once per hit.
    uuids_by_index: dict[str, list] = {"products": [], "categories": [], "brands": []}
    for hit in hits:
        hit_uuid = getattr(hit, "uuid", None)
        if hit_uuid:
            uuids_by_index.setdefault(hit.meta.index, []).append(str(hit_uuid))

    related = _load_related_objects(uuids_by_index) if request else {}

    results: dict = {"products": [], "categories": [], "brands": [], "posts": []}
    for hit in hits:
        idx = hit.meta.index
        obj_uuid = getattr(hit, "uuid", None) or hit.meta.id
        obj_name = getattr(hit, "name", None) or getattr(hit, "title", None) or "N/A"
        # Brands and categories fall back to a slug derived from their name.
        obj_slug = getattr(hit, "slug", "") or (slugify(obj_name) if idx in {"brands", "categories"} else "")

        hit_result = {
            "uuid": str(obj_uuid),
            "name": obj_name,
            "slug": obj_slug,
            "image": _image_url(idx, obj_uuid, request, related) if request else None,
        }

        if settings.DEBUG:
            if idx == "products":
                hit_result["rating_debug"] = getattr(hit, "rating", 0)
                hit_result["total_orders_debug"] = getattr(hit, "total_orders", 0)
                hit_result["brand_priority_debug"] = getattr(hit, "brand_priority", 0)
                hit_result["category_priority_debug"] = getattr(hit, "category_priority", 0)
            elif idx in ("brands", "categories"):
                hit_result["priority_debug"] = getattr(hit, "priority", 0)

        # setdefault (as for uuids_by_index above) so an unexpected index name
        # in a hit cannot raise KeyError.
        results.setdefault(idx, []).append(hit_result)

    return results


# Maps a base language code to the analyzer used for that language's fields.
# The "*_search" names refer to custom analyzers defined in COMMON_ANALYSIS.
LANGUAGE_ANALYZER_MAP = {
    "cs": "czech",
    "da": "danish",
    "de": "german",
    "en": "english",
    "es": "spanish",
    "fr": "french",
    "it": "italian",
    "nl": "dutch",
    "pt": "portuguese",
    "ro": "romanian",
    "ja": "cjk_search",
    "zh": "cjk_search",
    "ar": "arabic_search",
    "hi": "indic_search",
    "ru": "russian",
    "pl": "standard",
    "kk": "standard",
}


def _lang_analyzer(lang_code: str) -> str:
    """Return the analyzer name for *lang_code*.

    Only the part before the first "-" is considered (case-insensitively);
    languages missing from LANGUAGE_ANALYZER_MAP use "icu_query".
    """
    base = lang_code.split("-")[0].lower()
    return LANGUAGE_ANALYZER_MAP.get(base, "icu_query")


class ActiveOnlyMixin:
    """QuerySet & indexing helpers, so only *active* objects are indexed."""

    def get_queryset(self):
        """Return the parent queryset restricted to ``is_active=True``."""
        return super().get_queryset().filter(is_active=True)

    def should_index_object(self, obj):
        """Return the object's ``is_active`` value, or ``False`` if it has none."""
        return getattr(obj, "is_active", False)


COMMON_ANALYSIS = {
    "char_filter": {
        # ICU normalizer tidies up Unicode (compatibility forms etc.)
        "icu_nfkc_cf": {"type": "icu_normalizer", "name": "nfkc_cf"},
    },
    "filter": {
        "edge_ngram_filter": {"type": "edge_ngram", "min_gram": 1, "max_gram": 20},
        "ngram_filter": {"type": "ngram", "min_gram": 2, "max_gram": 20},
        # CJK bigramming helps ja/zh when no language plugin is present
        "cjk_bigram": {"type": "cjk_bigram"},
        # ICU casefolding/diacritics for *all* scripts
        "icu_folding": {"type": "icu_folding"},
        # Phonetic encoder (mainly helpful for Latin languages)
        "double_metaphone": {"type": "phonetic", "encoder": "double_metaphone", "replace": False},
        # Script-specific light normalizers
        "arabic_norm": {"type": "arabic_normalization"},
        "indic_norm": {"type": "indic_normalization"},
    },
    "analyzer": {
        # Generic query analyzer: ICU normalize+fold across scripts
        "icu_query": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding"],
        },
        # Autocomplete (works well for all scripts thanks to icu_tokenizer)
        "autocomplete": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", "edge_ngram_filter"],
        },
        "autocomplete_search": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding"],
        },
        # Content ngram for recall (again ICU-aware)
        "name_ngram": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", "ngram_filter"],
        },
        # Phonetic for Latin fallback
        "name_phonetic": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", "double_metaphone"],
        },
        # CJK search analyzer (no stemming; bigram + ICU)
        "cjk_search": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", "cjk_bigram"],
        },
        # Arabic & Indic light normalizations (no stemming; reliable & fast)
        "arabic_search": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", "arabic_norm"],
        },
        "indic_search": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", "indic_norm"],
        },
    },
}


def _make_prepare(attr):
    """Return a ``prepare_<attr>`` method that yields "" instead of a missing/falsy value."""
    return lambda self, instance: getattr(instance, attr, "") or ""


def _add_multilang_fields(cls):
    """
    Dynamically add multilingual name/description fields and prepare methods to guard against None.

    For every language code in ``settings.LANGUAGES`` this sets, on *cls*,
    ``name_<lc>`` and ``description_<lc>`` text fields (``lc`` being the code
    lower-cased with "-" replaced by "_") plus matching ``prepare_*`` methods.
    Each field is analyzed with the language's analyzer and copied into the
    shared ``name`` / ``description`` field.
    """
    for code, _lang in settings.LANGUAGES:
        lc = code.replace("-", "_").lower()
        for base in ("name", "description"):
            field_name = f"{base}_{lc}"
            setattr(
                cls,
                field_name,
                fields.TextField(
                    attr=field_name,
                    analyzer=_lang_analyzer(code),
                    copy_to=base,
                    fields={
                        "raw": fields.KeywordField(ignore_above=256),
                        "ngram": fields.TextField(analyzer="name_ngram", search_analyzer="icu_query"),
                        "phonetic": fields.TextField(analyzer="name_phonetic"),
                    },
                ),
            )
            # prepare_<field> ensures no None values reach the index
            setattr(cls, f"prepare_{field_name}", _make_prepare(field_name))


def populate_index():
    """Index every registered document's indexing queryset, refreshing after each update."""
    for doc in registry.get_documents(set(registry.get_models())):
        document = doc()
        document.update(document.get_indexing_queryset(), parallel=True, refresh=True)