"""Elasticsearch search helpers.

Smart multi-index full-text query with typo tolerance, shared analyzer
configuration, and utilities for multilingual document fields.
"""
from django.conf import settings
from django.http import Http404
from django.shortcuts import get_object_or_404
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
from django_elasticsearch_dsl import fields
from django_elasticsearch_dsl.registries import registry
from elasticsearch import NotFoundError
from elasticsearch.dsl import Q, Search
from rest_framework.request import Request

from core.models import Brand, Category, Product

# Boosted field list shared by the multi-index search query.
# ``^N`` is an Elasticsearch relevance boost; ``.ngram`` / ``.phonetic`` /
# ``.auto`` are sub-fields used for typo tolerance and autocomplete.
SMART_FIELDS = [
    # Generic name/description fields (products, posts, ...)
    "name^4",
    "name.ngram^3",
    "name.phonetic",
    "description^2",
    "description.ngram",
    "description.phonetic",
    "name.auto^4",
    "description.auto^2",
    # Related brand fields
    "brand__name^2",
    "brand__name.ngram",
    "brand__name.auto",
    # Related category fields
    "category__name^2",
    "category__name.ngram",
    "category__name.auto",
    # Title fields (posts)
    "title^4",
    "title.ngram^3",
    "title.phonetic",
    "title.auto^4",
]

def process_query(query: str = "", request: Request | None = None):
    """
    Perform a lenient, typo-tolerant, multi-index search.

    * Full-text ``multi_match`` with fuzziness for spelling mistakes
    * ``bool_prefix`` on the ``.auto`` sub-fields for edge-ngram
      autocomplete / "icontains"-style matching

    Args:
        query: Raw search term entered by the user.
        request: Optional DRF request, used to build absolute image URLs;
            when ``None`` no image lookups are performed.

    Returns:
        dict: Hits grouped by index name, each entry a dict with
        ``uuid`` / ``name`` / ``slug`` / ``image`` keys.

    Raises:
        ValueError: If ``query`` is empty.
        Http404: If Elasticsearch reports a missing index, or a matched
            object no longer exists in the database.
    """
    if not query:
        raise ValueError(_("no search term provided."))

    query = query.strip()
    q = Q(
        "bool",
        should=[
            # Fuzzy full-text match for typo tolerance.
            Q(
                "multi_match",
                query=query,
                fields=SMART_FIELDS,
                fuzziness="AUTO",
                operator="and",
            ),
            # Edge-ngram autocomplete on the dedicated .auto sub-fields.
            Q(
                "multi_match",
                query=query,
                fields=[f for f in SMART_FIELDS if f.endswith(".auto")],
                type="bool_prefix",
            ),
        ],
        minimum_should_match=1,
    )

    search = (
        Search(index=["products", "categories", "brands", "posts"])
        .query(q)
        .extra(size=100)
    )
    # Keep the try block narrow: only the ES round-trip raises NotFoundError.
    # (The previous version wrapped the whole result loop, masking the
    # actual source of the error.)
    try:
        response = search.execute()
    except NotFoundError:
        raise Http404

    results: dict = {"products": [], "categories": [], "brands": [], "posts": []}
    for hit in response.hits:
        obj_uuid = getattr(hit, "uuid", None) or hit.meta.id
        obj_name = getattr(hit, "name", None) or getattr(hit, "title", None) or "N/A"
        idx = hit.meta.index

        obj_slug = getattr(hit, "slug", None) or ""
        if not obj_slug and idx in ("brands", "categories"):
            # Brands/categories may not index a slug; derive one from the name.
            obj_slug = slugify(obj_name)

        # setdefault guards against a hit from an unexpected index/alias.
        results.setdefault(idx, []).append(
            {
                "uuid": str(obj_uuid),
                "name": obj_name,
                "slug": obj_slug,
                "image": _resolve_image_url(idx, obj_uuid, request),
            }
        )

    return results


def _resolve_image_url(idx, obj_uuid, request):
    """Return an absolute image URL for the hit, or ``None`` if unavailable.

    Raises Http404 (via ``get_object_or_404``) when the indexed object has
    been deleted from the database — same behavior as before the refactor.
    """
    if request is None:
        return None
    if idx == "products":
        prod = get_object_or_404(Product, uuid=obj_uuid)
        first = prod.images.order_by("priority").first()
        if first and first.image:
            return request.build_absolute_uri(first.image.url)
    elif idx == "brands":
        brand = get_object_or_404(Brand, uuid=obj_uuid)
        if brand.small_logo:
            return request.build_absolute_uri(brand.small_logo.url)
    elif idx == "categories":
        cat = get_object_or_404(Category, uuid=obj_uuid)
        if cat.image:
            return request.build_absolute_uri(cat.image.url)
    return None

# Maps a bare ISO 639-1 language code to the built-in Elasticsearch
# language analyzer of the same name.  Languages with no dedicated ES
# analyzer (ja, kk, pl, zh) deliberately map to "standard".
LANGUAGE_ANALYZER_MAP = {
    "ar": "arabic",
    "cs": "czech",
    "da": "danish",
    "de": "german",
    "en": "english",
    "es": "spanish",
    "fr": "french",
    "hi": "hindi",
    "it": "italian",
    "ja": "standard",
    "kk": "standard",
    "nl": "dutch",
    "pl": "standard",
    "pt": "portuguese",
    "ro": "romanian",
    "ru": "russian",
    "zh": "standard",
}

def _lang_analyzer(lang_code: str) -> str:
    """Return the best-guess ES analyzer for an ISO language code."""
    # "pt-BR" -> "pt"; unmapped codes fall back to the "standard" analyzer.
    base, _sep, _region = lang_code.lower().partition("-")
    return LANGUAGE_ANALYZER_MAP.get(base, "standard")

class ActiveOnlyMixin:
    """Restrict both querying and indexing to *active* objects only."""

    def get_queryset(self):
        """Return the parent queryset narrowed to active rows."""
        qs = super().get_queryset()
        return qs.filter(is_active=True)

    def should_index_object(self, obj):
        """Index ``obj`` only when it is flagged active (missing flag: skip)."""
        return getattr(obj, "is_active", False)

# Shared index "analysis" settings applied to every document index.
# NOTE(review): the "phonetic" token-filter type requires the
# analysis-phonetic plugin on the cluster — confirm it is installed.
COMMON_ANALYSIS = {
    "filter": {
        # Edge n-grams (prefixes) power autocomplete matching.
        "edge_ngram_filter": {"type": "edge_ngram", "min_gram": 1, "max_gram": 20},
        # Interior n-grams power "icontains"-style substring matching.
        "ngram_filter": {"type": "ngram", "min_gram": 2, "max_gram": 20},
        # replace=False keeps the original token alongside its phonetic code.
        "double_metaphone": {
            "type": "phonetic",
            "encoder": "double_metaphone",
            "replace": False,
        },
    },
    "analyzer": {
        # Index-time analyzer for the .auto sub-fields.
        "autocomplete": {
            "tokenizer": "standard",
            "filter": ["lowercase", "asciifolding", "edge_ngram_filter"],
        },
        # Search-time counterpart: no ngrams, so queries are not exploded.
        "autocomplete_search": {
            "tokenizer": "standard",
            "filter": ["lowercase", "asciifolding"],
        },
        # Index-time analyzer for the .ngram sub-fields.
        "name_ngram": {
            "tokenizer": "standard",
            "filter": ["lowercase", "asciifolding", "ngram_filter"],
        },
        # Index-time analyzer for the .phonetic sub-fields.
        "name_phonetic": {
            "tokenizer": "standard",
            "filter": ["lowercase", "asciifolding", "double_metaphone"],
        },
        # Plain lowercase/asciifold analyzer used at search time for ngrams.
        "query_lc": {"tokenizer": "standard", "filter": ["lowercase", "asciifolding"]},
    },
    # Keyword normalizer for case-insensitive exact matching/sorting.
    "normalizer": {"lc": {"type": "custom", "filter": ["lowercase", "asciifolding"]}},
}

def _add_multilang_fields(cls):
    """
    Dynamically add multilingual ``name_*`` / ``description_*`` fields to a
    Document class, plus ``prepare_*`` methods that guard against ``None``.

    One field pair is created per language in ``settings.LANGUAGES``.  Each
    text field gets a language-specific analyzer, ``raw`` / ``ngram`` /
    ``phonetic`` sub-fields, and is copied into the language-agnostic
    ``name`` / ``description`` aggregate fields.
    """

    def make_prepare(attr):
        # Factory so each prepare method binds its own attribute name
        # (avoids the late-binding-closure pitfall); coerces None to "".
        return lambda self, instance: getattr(instance, attr, "") or ""

    def make_field(attr, copy_to, analyzer):
        # One analyzed TextField with the shared sub-field layout —
        # previously duplicated inline for name and description.
        return fields.TextField(
            attr=attr,
            analyzer=analyzer,
            copy_to=copy_to,
            fields={
                "raw": fields.KeywordField(ignore_above=256),
                "ngram": fields.TextField(
                    analyzer="name_ngram", search_analyzer="query_lc"
                ),
                "phonetic": fields.TextField(analyzer="name_phonetic"),
            },
        )

    for code, _label in settings.LANGUAGES:
        lc = code.replace("-", "_").lower()
        analyzer = _lang_analyzer(code)
        for base in ("name", "description"):
            field_name = f"{base}_{lc}"
            setattr(cls, field_name, make_field(field_name, base, analyzer))
            setattr(cls, f"prepare_{field_name}", make_prepare(field_name))

def populate_index():
    """(Re)build every registered Elasticsearch index from the database.

    Iterates all registered Document classes and bulk-indexes their
    queryset in parallel, refreshing the index after each update.
    """
    for doc in registry.get_documents(set(registry.get_models())):
        # Instantiate the document once instead of twice per iteration.
        instance = doc()
        qs = instance.get_indexing_queryset()
        instance.update(qs, parallel=True, refresh=True)