from django.conf import settings
from django.http import Http404
from django.shortcuts import get_object_or_404
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
from django_elasticsearch_dsl import fields
from django_elasticsearch_dsl.registries import registry
from elasticsearch import NotFoundError
from elasticsearch.dsl import Q, Search
from rest_framework.request import Request

from core.models import Brand, Category, Product

# Boosted field list shared by every search clause below.  ``^N`` is a
# per-field Elasticsearch boost; the ``.ngram`` / ``.phonetic`` / ``.auto`` /
# ``.sat`` suffixes refer to sub-fields declared in the document mappings
# (see COMMON_ANALYSIS and _add_multilang_fields).
SMART_FIELDS = [
    "name^4",
    "name.ngram^3",
    "name.phonetic",
    "description^2",
    "description.ngram",
    "description.phonetic",
    "name.auto^4",
    "description.auto^2",
    "brand__name^2",
    "brand__name.ngram",
    "brand__name.auto",
    "category__name^2",
    "category__name.ngram",
    "category__name.auto",
    "title^4",
    "title.ngram^3",
    "title.phonetic",
    "title.auto^4",
    "name.sat^6",
    "title.sat^4",
]


def _build_search(query: str) -> Search:
    """Assemble the boosted, multi-index ``Search`` object for *query*.

    Combines three should-clauses (search-as-you-type phrase prefix, fuzzy
    multi-match, bool-prefix autocomplete) and wraps them in a
    ``function_score`` that rewards exact name prefixes and popular items.
    """
    # Search-as-you-type style phrase-prefix match on the ``.sat`` sub-fields.
    sat_match = Q(
        "multi_match",
        query=query,
        type="phrase_prefix",
        fields=[f for f in SMART_FIELDS if ".sat" in f],
    )
    # Typo-tolerant match across every smart field; AND so all terms must hit.
    fuzzy_match = Q(
        "multi_match",
        query=query,
        fields=SMART_FIELDS,
        fuzziness="AUTO",
        operator="and",
    )
    # Autocomplete-style prefix match restricted to the ``.auto`` sub-fields.
    prefix_match = Q(
        "multi_match",
        query=query,
        fields=[f for f in SMART_FIELDS if f.endswith(".auto")],
        type="bool_prefix",
    )
    combined = Q(
        "bool",
        should=[sat_match, fuzzy_match, prefix_match],
        minimum_should_match=1,
    )
    functions = [
        # Strong boost when the lowercased query is an exact prefix of the
        # raw (keyword) name.
        {
            "filter": Q("prefix", **{"name.raw": query.lower()}),
            "weight": 5,
        },
        # Gaussian decay on sales_rank: documents near rank 100 score
        # highest, tapering off over a scale of 500.
        {
            "gauss": {
                "sales_rank": {
                    "origin": 100,
                    "scale": 500,
                    "offset": 0,
                    "decay": 0.3,
                }
            },
            "weight": 3,
        },
    ]
    boosted = Q(
        "function_score",
        query=combined,
        boost_mode="sum",
        score_mode="sum",
        functions=functions,
    )
    return (
        Search(index=["products", "categories", "brands", "posts"])
        .query(boosted)
        .extra(size=100)
    )


def _image_url_for_hit(hit, request: Request) -> str | None:
    """Resolve an absolute image URL for *hit*, or ``None`` if it has none.

    Looks the object up in the database by UUID, so a hit whose row has been
    deleted (stale index) raises ``Http404`` via ``get_object_or_404``.
    Hits from indexes other than products/brands/categories yield ``None``.
    """
    idx = hit.meta.index
    obj_uuid = getattr(hit, "uuid", None) or hit.meta.id
    if idx == "products":
        prod = get_object_or_404(Product, uuid=obj_uuid)
        first = prod.images.order_by("priority").first()
        return (
            request.build_absolute_uri(first.image.url)
            if first and first.image
            else None
        )
    if idx == "brands":
        brand = get_object_or_404(Brand, uuid=obj_uuid)
        return (
            request.build_absolute_uri(brand.small_logo.url)
            if brand.small_logo
            else None
        )
    if idx == "categories":
        cat = get_object_or_404(Category, uuid=obj_uuid)
        return request.build_absolute_uri(cat.image.url) if cat.image else None
    return None


def process_query(query: str = "", request: Request | None = None):
    """Run a smart multi-index search and group hits by index name.

    Args:
        query: Raw user search term; leading/trailing whitespace is ignored.
        request: Optional DRF request used to build absolute image URLs.
            When omitted, every result's ``image`` is ``None``.

    Returns:
        Dict with keys ``products``/``categories``/``brands``/``posts``, each
        a list of ``{"uuid", "name", "slug", "image"}`` dicts.

    Raises:
        ValueError: If *query* is empty after stripping.
        Http404: If an index is missing, or (via ``get_object_or_404``) a hit
            references an object no longer in the database.
    """
    if not (query := query.strip()):
        raise ValueError(_("no search term provided."))

    search = _build_search(query)
    try:
        response = search.execute()
    except NotFoundError:
        # One of the requested indexes does not exist.
        raise Http404

    results = {"products": [], "categories": [], "brands": [], "posts": []}
    for hit in response.hits:
        obj_uuid = getattr(hit, "uuid", None) or hit.meta.id
        obj_name = (
            getattr(hit, "name", None) or getattr(hit, "title", None) or "N/A"
        )
        raw_slug = getattr(hit, "slug", None) or ""
        idx = hit.meta.index
        # Brands and categories may be indexed without a slug; fall back to
        # slugifying the display name.  Other indexes keep whatever they have.
        obj_slug = (
            (raw_slug or slugify(obj_name))
            if idx in {"brands", "categories"}
            else raw_slug
        )
        image_url = _image_url_for_hit(hit, request) if request else None
        results[idx].append(
            {
                "uuid": str(obj_uuid),
                "name": obj_name,
                "slug": obj_slug,
                "image": image_url,
            }
        )
    return results


# ISO 639-1 base code -> built-in Elasticsearch language analyzer.  Languages
# without a suitable built-in analyzer fall back to "standard".
LANGUAGE_ANALYZER_MAP = {
    "ar": "arabic",
    "cs": "czech",
    "da": "danish",
    "de": "german",
    "en": "english",
    "es": "spanish",
    "fr": "french",
    "hi": "hindi",
    "it": "italian",
    "ja": "standard",
    "kk": "standard",
    "nl": "dutch",
    "pl": "standard",
    "pt": "portuguese",
    "ro": "romanian",
    "ru": "russian",
    "zh": "standard",
}


def _lang_analyzer(lang_code: str) -> str:
    """Return the best-guess ES analyzer for an ISO language code."""
    # "pt-br" -> "pt"; unknown codes fall back to the standard analyzer.
    base = lang_code.split("-")[0].lower()
    return LANGUAGE_ANALYZER_MAP.get(base, "standard")


class ActiveOnlyMixin:
    """QuerySet & indexing helpers, so only *active* objects are indexed."""

    def get_queryset(self):
        # Restrict the indexing queryset to active rows.
        return super().get_queryset().filter(is_active=True)

    def should_index_object(self, obj):
        # Objects without an is_active attribute are never indexed.
        return getattr(obj, "is_active", False)


# Shared index analysis settings: edge-ngram/ngram filters for partial
# matching, a double-metaphone filter for phonetic matching, and a lowercase
# keyword normalizer ("lc") for the raw sub-fields.
COMMON_ANALYSIS = {
    "filter": {
        "edge_ngram_filter": {
            "type": "edge_ngram",
            "min_gram": 1,
            "max_gram": 20,
        },
        "ngram_filter": {"type": "ngram", "min_gram": 2, "max_gram": 20},
        "double_metaphone": {
            "type": "phonetic",
            "encoder": "double_metaphone",
            "replace": False,
        },
    },
    "analyzer": {
        "autocomplete": {
            "tokenizer": "standard",
            "filter": ["lowercase", "asciifolding", "edge_ngram_filter"],
        },
        "autocomplete_search": {
            "tokenizer": "standard",
            "filter": ["lowercase", "asciifolding"],
        },
        "name_ngram": {
            "tokenizer": "standard",
            "filter": ["lowercase", "asciifolding", "ngram_filter"],
        },
        "name_phonetic": {
            "tokenizer": "standard",
            "filter": ["lowercase", "asciifolding", "double_metaphone"],
        },
        "query_lc": {
            "tokenizer": "standard",
            "filter": ["lowercase", "asciifolding"],
        },
    },
    "normalizer": {
        "lc": {"type": "custom", "filter": ["lowercase", "asciifolding"]}
    },
}


def _multilang_text_field(attr: str, code: str, copy_to: str) -> fields.TextField:
    """Build a per-language TextField with raw/ngram/phonetic sub-fields."""
    return fields.TextField(
        attr=attr,
        analyzer=_lang_analyzer(code),
        copy_to=copy_to,
        fields={
            "raw": fields.KeywordField(ignore_above=256),
            "ngram": fields.TextField(
                analyzer="name_ngram", search_analyzer="query_lc"
            ),
            "phonetic": fields.TextField(analyzer="name_phonetic"),
        },
    )


def _make_prepare(attr: str):
    """Return a ``prepare_<attr>`` method that coerces None/missing to ""."""
    return lambda self, instance: getattr(instance, attr, "") or ""


def _add_multilang_fields(cls):
    """
    Dynamically add multilingual name/description fields and prepare
    methods to guard against None.
    """
    for code, _lang in settings.LANGUAGES:
        # "pt-br" -> attribute suffix "pt_br".
        lc = code.replace("-", "_").lower()
        for base in ("name", "description"):
            field_name = f"{base}_{lc}"
            # e.g. name_pt_br, copied into the language-agnostic "name".
            setattr(cls, field_name, _multilang_text_field(field_name, code, base))
            # e.g. prepare_name_pt_br, so the index never receives None.
            setattr(cls, f"prepare_{field_name}", _make_prepare(field_name))


def populate_index():
    """Rebuild every registered document index from its indexing queryset."""
    for doc in registry.get_documents(set(registry.get_models())):
        qs = doc().get_indexing_queryset()
        # refresh=True makes the documents searchable immediately.
        doc().update(qs, parallel=True, refresh=True)