import re

from django.conf import settings
from django.http import Http404
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
from django_elasticsearch_dsl import fields
from django_elasticsearch_dsl.registries import registry
from elasticsearch import NotFoundError
from elasticsearch.dsl import Q, Search
from rest_framework.request import Request

from core.models import Brand, Category, Product

# Fields searched by the main ``multi_match`` clause, in Elasticsearch
# ``field^boost`` notation.
SMART_FIELDS = [
    "name^8",
    "name.ngram^8",
    "name.phonetic^6",
    "title^5",
    "title.ngram^4",
    "title.phonetic^2",
    "description^2",
    "description.ngram",
    "description.phonetic",
    "brand_name^5",
    "brand_name.ngram^3",
    "brand_name.auto^4",
    "category_name^3",
    "category_name.ngram^2",
    "category_name.auto^2",
    "sku^9",
    "sku.ngram^6",
    "sku.auto^8",
    "partnumber^10",
    "partnumber.ngram^7",
    "partnumber.auto^9",
]

# Base language codes for which fuzziness and phonetic sub-fields are disabled
# and the name/title n-gram boosts are raised instead.
_NO_FUZZY_LANGUAGES = frozenset({"ja", "zh", "ar", "hi"})


def _field_value_function(index: str, field: str, factor: float, weight: float) -> dict:
    """Build one ``function_score`` entry scoped to a single index.

    The entry applies a ``log1p`` ``field_value_factor`` on *field* (documents
    missing the field count as 0) to documents whose ``_index`` equals *index*.
    """
    return {
        "filter": Q("term", **{"_index": index}),
        "field_value_factor": {
            "field": field,
            "modifier": "log1p",
            "factor": factor,
            "missing": 0,
        },
        "weight": weight,
    }


# Scoring functions used by the rescore phase of every search built below.
functions = [
    _field_value_function("products", "brand_priority", 0.2, 0.6),
    _field_value_function("products", "rating", 0.15, 0.5),
    _field_value_function("products", "total_orders", 0.25, 0.7),
    _field_value_function("products", "category_priority", 0.2, 0.6),
    _field_value_function("categories", "priority", 0.25, 0.8),
    _field_value_function("brands", "priority", 0.25, 0.8),
]


def _build_query(query: str, language_base: str):
    """Return the ``bool`` query combining exact, full-text and code-like clauses.

    *query* must already be stripped and non-empty. *language_base* is the
    lower-cased primary language subtag (``""`` when unknown).
    """
    query_lower = query.lower()

    exact_shoulds = [
        Q("term", **{"name.raw": {"value": query, "boost": 3.0}}),
        Q("term", **{"slug": {"value": slugify(query), "boost": 2.0}}),
        Q("term", **{"sku.raw": {"value": query_lower, "boost": 8.0}}),
        Q("term", **{"partnumber.raw": {"value": query_lower, "boost": 9.0}}),
    ]

    fields_all = list(SMART_FIELDS)
    fuzziness = {"fuzziness": "AUTO:5,8"}
    if language_base in _NO_FUZZY_LANGUAGES:
        # Drop phonetic sub-fields, lean harder on n-grams, and skip fuzziness.
        fields_all = [
            f.replace("name.ngram^8", "name.ngram^10").replace("title.ngram^4", "title.ngram^6")
            for f in fields_all
            if ".phonetic" not in f
        ]
        fuzziness = {}

    # Every ``.auto`` entry carries a ``^boost`` suffix, so the boost has to be
    # stripped before testing the field name; a plain ``endswith(".auto")``
    # never matches and leaves the prefix query without any explicit fields.
    auto_fields = [f for f in fields_all if f.partition("^")[0].endswith(".auto")]

    text_shoulds = [
        Q(
            "multi_match",
            query=query,
            fields=fields_all,
            operator="and",
            **fuzziness,
        ),
        Q(
            "multi_match",
            query=query,
            fields=auto_fields,
            type="bool_prefix",
        ),
    ]

    # A single token containing a digit is treated as a SKU / part number.
    is_code_like = bool(re.search(r"[0-9]", query)) and " " not in query
    if is_code_like:
        text_shoulds.extend(
            [
                Q("term", **{"sku.raw": {"value": query_lower, "boost": 12.0}}),
                Q("term", **{"partnumber.raw": {"value": query_lower, "boost": 14.0}}),
                Q("prefix", **{"sku.raw": {"value": query_lower, "boost": 6.0}}),
                Q("prefix", **{"partnumber.raw": {"value": query_lower, "boost": 7.0}}),
            ]
        )

    return Q(
        "bool",
        should=exact_shoulds + text_shoulds,
        minimum_should_match=1,
    )


def _build_search(indexes: list[str], size: int, query_base):
    """Return a ``Search`` over *indexes* with the shared rescore configuration."""
    rescore_query = Q(
        "function_score",
        query=Q("match_all"),
        functions=functions,
        boost_mode="sum",
        score_mode="sum",
        max_boost=2.0,
    )
    return (
        Search(index=indexes)
        .query(query_base)
        .extra(
            rescore={
                "window_size": 200,
                "query": {
                    "rescore_query": rescore_query.to_dict(),
                    "query_weight": 1.0,
                    "rescore_query_weight": 1.0,
                },
            }
        )
        .extra(size=size, track_total_hits=True)
    )


def process_query(query: str = "", request: Request | None = None) -> dict[str, list[dict]] | None:
    """Search categories, brands and products for *query*.

    Args:
        query: The user's search term. Leading/trailing whitespace is ignored.
        request: Optional request. Its ``LANGUAGE_CODE`` (when present) tunes
            the query, and it is used to build absolute image URLs. Without a
            request no database lookups are made and every ``image`` is None.

    Returns:
        A dict with the keys ``"products"``, ``"categories"``, ``"brands"`` and
        ``"posts"``; this function never adds entries to ``"posts"``. Each
        entry is a dict with ``uuid``, ``name``, ``slug`` and ``image``, plus
        ``*_debug`` scoring fields when ``settings.DEBUG`` is true.

    Raises:
        ValueError: If *query* is empty or contains only whitespace.
        Http404: If Elasticsearch raises ``NotFoundError``.
    """
    # Strip before validating so a whitespace-only term is rejected as well.
    query = (query or "").strip()
    if not query:
        raise ValueError(_("no search term provided."))

    language_code = (getattr(request, "LANGUAGE_CODE", "") or "") if request else ""
    language_base = language_code.lower().split("-")[0]

    query_base = _build_query(query, language_base)

    search_cats = _build_search(["categories"], 22, query_base)
    search_brands = _build_search(["brands"], 22, query_base)
    search_products = _build_search(["products"], 44, query_base)

    try:
        resp_cats = search_cats.execute()
        resp_brands = search_brands.execute()
        resp_products = search_products.execute()
    except NotFoundError as nfe:
        raise Http404 from nfe

    results: dict = {"products": [], "categories": [], "brands": [], "posts": []}
    uuids_by_index: dict[str, list] = {"products": [], "categories": [], "brands": []}

    # Only the leading hits of each response are returned to the caller.
    hit_cache: list = [
        *resp_cats.hits[:12],
        *resp_brands.hits[:12],
        *resp_products.hits[:26],
    ]
    for hit in hit_cache:
        if getattr(hit, "uuid", None):
            uuids_by_index.setdefault(hit.meta.index, []).append(str(hit.uuid))

    products_by_uuid: dict = {}
    brands_by_uuid: dict = {}
    cats_by_uuid: dict = {}

    # Model instances are only needed to resolve image URLs, which in turn
    # require a request to build absolute URIs.
    if request:
        if uuids_by_index.get("products"):
            products_by_uuid = {
                str(p.uuid): p
                for p in Product.objects.filter(uuid__in=uuids_by_index["products"])
                .select_related("brand", "category")
                .prefetch_related("images")
            }
        if uuids_by_index.get("brands"):
            brands_by_uuid = {
                str(b.uuid): b for b in Brand.objects.filter(uuid__in=uuids_by_index["brands"])
            }
        if uuids_by_index.get("categories"):
            cats_by_uuid = {
                str(c.uuid): c for c in Category.objects.filter(uuid__in=uuids_by_index["categories"])
            }

    for hit in hit_cache:
        idx = hit.meta.index
        obj_uuid = getattr(hit, "uuid", None) or hit.meta.id
        obj_name = getattr(hit, "name", None) or getattr(hit, "title", None) or "N/A"
        obj_slug = getattr(hit, "slug", "") or (
            slugify(obj_name) if idx in {"brands", "categories"} else ""
        )

        image_url = None
        if idx == "products" and request:
            prod = products_by_uuid.get(str(obj_uuid))
            if prod:
                # NOTE(review): order_by() issues a new query per product and
                # does not reuse the images prefetched above; an ordered
                # Prefetch would avoid the extra queries.
                first = prod.images.order_by("priority").first()
                if first and first.image:
                    image_url = request.build_absolute_uri(first.image.url)
        elif idx == "brands" and request:
            brand = brands_by_uuid.get(str(obj_uuid))
            if brand and brand.small_logo:
                image_url = request.build_absolute_uri(brand.small_logo.url)
        elif idx == "categories" and request:
            cat = cats_by_uuid.get(str(obj_uuid))
            if cat and cat.image:
                image_url = request.build_absolute_uri(cat.image.url)

        hit_result = {
            "uuid": str(obj_uuid),
            "name": obj_name,
            "slug": obj_slug,
            "image": image_url,
        }

        if settings.DEBUG:
            if idx == "products":
                hit_result["rating_debug"] = getattr(hit, "rating", 0)
                hit_result["total_orders_debug"] = getattr(hit, "total_orders", 0)
                hit_result["brand_priority_debug"] = getattr(hit, "brand_priority", 0)
                hit_result["category_priority_debug"] = getattr(hit, "category_priority", 0)
            if idx in ("brands", "categories"):
                hit_result["priority_debug"] = getattr(hit, "priority", 0)

        # setdefault mirrors uuids_by_index above, so an unexpected index name
        # yields an extra key instead of a KeyError.
        results.setdefault(idx, []).append(hit_result)

    return results


# Base language code -> analyzer name. "cjk_search", "arabic_search" and
# "indic_search" are defined in COMMON_ANALYSIS; the remaining names are not
# defined in this file and are expected to be Elasticsearch built-in analyzers.
LANGUAGE_ANALYZER_MAP = {
    "cs": "czech",
    "da": "danish",
    "de": "german",
    "en": "english",
    "es": "spanish",
    "fr": "french",
    "it": "italian",
    "nl": "dutch",
    "pt": "portuguese",
    "ro": "romanian",
    "ja": "cjk_search",
    "zh": "cjk_search",
    "ar": "arabic_search",
    "hi": "indic_search",
    "ru": "russian",
    "pl": "standard",
    "kk": "standard",
}


def _lang_analyzer(lang_code: str) -> str:
    """Return the analyzer name for *lang_code*, falling back to ``icu_query``.

    Only the primary subtag is considered, case-insensitively, so ``"en-GB"``
    and ``"EN"`` both resolve like ``"en"``.
    """
    base = lang_code.split("-")[0].lower()
    return LANGUAGE_ANALYZER_MAP.get(base, "icu_query")


class ActiveOnlyMixin:
    """QuerySet & indexing helpers, so only *active* objects are indexed."""

    def get_queryset(self):
        """Return the parent queryset restricted to ``is_active=True``."""
        return super().get_queryset().filter(is_active=True)

    def should_index_object(self, obj):
        """Return ``obj.is_active``, or False when the attribute is missing."""
        return getattr(obj, "is_active", False)


# Analysis settings (char filters, token filters, analyzers, normalizer).
# NOTE(review): the icu_* and phonetic components presumably require the
# matching Elasticsearch analysis plugins to be installed; confirm deployment.
COMMON_ANALYSIS = {
    "char_filter": {
        "icu_nfkc_cf": {"type": "icu_normalizer", "name": "nfkc_cf"},
    },
    "filter": {
        "edge_ngram_filter": {"type": "edge_ngram", "min_gram": 1, "max_gram": 20},
        "ngram_filter": {"type": "ngram", "min_gram": 2, "max_gram": 20},
        "cjk_bigram": {"type": "cjk_bigram"},
        "icu_folding": {"type": "icu_folding"},
        "double_metaphone": {"type": "phonetic", "encoder": "double_metaphone", "replace": False},
        "arabic_norm": {"type": "arabic_normalization"},
        "indic_norm": {"type": "indic_normalization"},
    },
    "analyzer": {
        "icu_query": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding"],
        },
        "autocomplete": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", "edge_ngram_filter"],
        },
        "autocomplete_search": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding"],
        },
        "name_ngram": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", "ngram_filter"],
        },
        "name_phonetic": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", "double_metaphone"],
        },
        "cjk_search": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", "cjk_bigram"],
        },
        "arabic_search": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", "arabic_norm"],
        },
        "indic_search": {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", "indic_norm"],
        },
    },
    "normalizer": {
        "lc_norm": {
            "type": "custom",
            "filter": ["lowercase", "icu_folding"],
        }
    },
}


def _none_safe_prepare(attr: str):
    """Return a ``prepare_<field>`` method that maps missing/None/empty to ``""``."""

    def prepare(self, instance):
        return getattr(instance, attr, "") or ""

    return prepare


def _localized_text_field(attr: str, lang_code: str, copy_to: str):
    """Build the text field for one localized attribute.

    The field is analyzed with the analyzer chosen for *lang_code*, copied into
    *copy_to*, and given ``raw``, ``ngram`` and ``phonetic`` sub-fields.
    """
    return fields.TextField(
        attr=attr,
        analyzer=_lang_analyzer(lang_code),
        copy_to=copy_to,
        fields={
            "raw": fields.KeywordField(ignore_above=256),
            "ngram": fields.TextField(analyzer="name_ngram", search_analyzer="icu_query"),
            "phonetic": fields.TextField(analyzer="name_phonetic"),
        },
    )


def add_multilang_fields(cls):
    """Attach per-language name/description fields and prepare methods to *cls*.

    For every entry of ``settings.LANGUAGES`` this sets ``name_<lc>`` and
    ``description_<lc>`` (``<lc>`` being the language code lower-cased with
    ``-`` replaced by ``_``), each with a matching ``prepare_<field>`` method
    that guards against None values. *cls* is modified in place; nothing is
    returned.
    """
    for code, _label in settings.LANGUAGES:
        lc = code.replace("-", "_").lower()
        for base in ("name", "description"):
            attr = f"{base}_{lc}"
            setattr(cls, attr, _localized_text_field(attr, code, base))
            setattr(cls, f"prepare_{attr}", _none_safe_prepare(attr))


def populate_index():
    """Re-index every registered document from its indexing queryset."""
    for doc in registry.get_documents(set(registry.get_models())):
        document = doc()
        document.update(document.get_indexing_queryset(), parallel=True, refresh=True)