# File listing header: 402 lines, 13 KiB, Python
from django.conf import settings
|
|
from django.http import Http404
|
|
from django.utils.text import slugify
|
|
from django.utils.translation import gettext_lazy as _
|
|
from django_elasticsearch_dsl import fields
|
|
from django_elasticsearch_dsl.registries import registry
|
|
from elasticsearch import NotFoundError
|
|
from elasticsearch.dsl import Q, Search
|
|
from rest_framework.request import Request
|
|
|
|
from core.models import Brand, Category, Product
|
|
|
|
# Fields searched by the main multi_match clause, written as "<field>^<boost>".
# Entries without a "^" suffix carry no explicit boost.
SMART_FIELDS = [
    # name and its sub-fields
    "name^8", "name.ngram^8", "name.phonetic^6",
    # title and its sub-fields
    "title^5", "title.ngram^4", "title.phonetic^2",
    # description and its sub-fields
    "description^2", "description.ngram", "description.phonetic",
    # brand name and its sub-fields
    "brand_name^5", "brand_name.ngram^3", "brand_name.auto^4",
    # category name and its sub-fields
    "category_name^3", "category_name.ngram^2", "category_name.auto^2",
]
|
|
|
|
# function_score functions used by the rescore query in process_query.
# Each entry applies a log1p field_value_factor to one numeric field and is
# restricted, via an `_index` term filter, to documents of a single index.
# Table columns: (index name, field, factor, weight).
functions = [
    {
        "filter": Q("term", _index=index_name),
        "field_value_factor": {
            "field": field_name,
            "modifier": "log1p",
            "factor": factor,
            "missing": 0,
        },
        "weight": weight,
    }
    for index_name, field_name, factor, weight in (
        ("products", "brand_priority", 0.2, 0.6),
        ("products", "rating", 0.15, 0.5),
        ("products", "total_orders", 0.25, 0.7),
        ("products", "category_priority", 0.2, 0.6),
        ("categories", "priority", 0.25, 0.8),
        ("brands", "priority", 0.25, 0.8),
    )
]
|
|
|
|
|
|
# Base language codes for which process_query drops the phonetic sub-fields
# and disables fuzziness.
_SCRIPT_SENSITIVE_LANGS = frozenset({"ja", "zh", "ar", "hi"})


def _request_base_language(request: Request | None) -> str:
    """Return the lower-cased part of ``request.LANGUAGE_CODE`` before the first "-", or ""."""
    code = getattr(request, "LANGUAGE_CODE", None) if request is not None else None
    return code.lower().split("-")[0] if code else ""


def _build_search_query(query: str, base_lang: str):
    """Build the bool query (exact term clauses + multi_match clauses) for *query*."""
    exact_clauses = [Q("term", **{"name.raw": {"value": query, "boost": 3.0}})]
    slug = slugify(query)
    if slug:
        # slugify() yields "" for input with no ASCII letters/digits; a term
        # query on an empty slug would not express the user's intent.
        exact_clauses.append(Q("term", **{"slug": {"value": slug, "boost": 2.0}}))

    search_fields = list(SMART_FIELDS)
    match_options = {"operator": "and"}
    if base_lang in _SCRIPT_SENSITIVE_LANGS:
        # No phonetic sub-fields and no fuzziness for these languages;
        # the title n-gram boost is raised from 4 to 6 instead.
        search_fields = [
            field.replace("title.ngram^4", "title.ngram^6")
            for field in search_fields
            if ".phonetic" not in field
        ]
    else:
        match_options["fuzziness"] = "AUTO:5,8"

    text_clauses = [Q("multi_match", query=query, fields=search_fields, **match_options)]

    # Entries look like "brand_name.auto^4", so the boost suffix has to be
    # removed before testing for the ".auto" sub-field.
    auto_fields = [field for field in search_fields if field.split("^", 1)[0].endswith(".auto")]
    if auto_fields:
        text_clauses.append(Q("multi_match", query=query, fields=auto_fields, type="bool_prefix"))

    return Q("bool", should=exact_clauses + text_clauses, minimum_should_match=1)


def _load_models_by_index(uuids_by_index: dict[str, list[str]]) -> dict[str, dict]:
    """Fetch the Product/Brand/Category rows behind the hits, keyed by index name and uuid string."""
    loaded: dict[str, dict] = {}
    product_ids = uuids_by_index.get("products")
    if product_ids:
        product_qs = (
            Product.objects.filter(uuid__in=product_ids)
            .select_related("brand", "category")
            .prefetch_related("images")
        )
        loaded["products"] = {str(product.uuid): product for product in product_qs}
    brand_ids = uuids_by_index.get("brands")
    if brand_ids:
        loaded["brands"] = {str(brand.uuid): brand for brand in Brand.objects.filter(uuid__in=brand_ids)}
    category_ids = uuids_by_index.get("categories")
    if category_ids:
        loaded["categories"] = {
            str(category.uuid): category for category in Category.objects.filter(uuid__in=category_ids)
        }
    return loaded


def _resolve_image_url(index_name: str, model_obj, request: Request) -> str | None:
    """Return the absolute URL of the image representing *model_obj*, or None if it has none."""
    image_file = None
    if index_name == "products":
        # Pick the lowest-priority image from the prefetched relation in memory:
        # calling .order_by() on the related manager would bypass the prefetch
        # cache and issue one query per product.
        # NOTE(review): images with a NULL priority sort last here — confirm
        # this matches the ordering the database used to produce.
        lead_image = min(
            model_obj.images.all(),
            key=lambda image: (image.priority is None, image.priority or 0),
            default=None,
        )
        image_file = lead_image.image if lead_image is not None else None
    elif index_name == "brands":
        image_file = model_obj.small_logo
    elif index_name == "categories":
        image_file = model_obj.image
    return request.build_absolute_uri(image_file.url) if image_file else None


def process_query(query: str = "", request: Request | None = None) -> dict[str, list[dict]] | None:
    """
    Search the products, categories, brands and posts indexes for *query*.

    Args:
        query: Search term; surrounding whitespace is ignored.
        request: Optional request. Its ``LANGUAGE_CODE`` (if any) selects the
            language-specific query tuning, and it is used to build absolute
            image URLs. Without a request no database lookup is made and every
            ``image`` value is None.

    Returns:
        A dict with the keys "products", "categories", "brands" and "posts",
        each holding a list of ``{"uuid", "name", "slug", "image"}`` dicts
        (plus ``*_debug`` values when ``settings.DEBUG`` is on). Hits coming
        from any other index name are grouped under that name.

    Raises:
        ValueError: If *query* is empty or contains only whitespace.
        Http404: If Elasticsearch raises ``NotFoundError`` while executing the search.
    """
    # Strip before validating so a whitespace-only term is rejected too.
    query = (query or "").strip()
    if not query:
        raise ValueError(_("no search term provided."))

    rescore_query = Q(
        "function_score",
        query=Q("match_all"),
        functions=functions,
        boost_mode="sum",
        score_mode="sum",
        max_boost=2.0,
    )
    search = (
        Search(index=["products", "categories", "brands", "posts"])
        .query(_build_search_query(query, _request_base_language(request)))
        .extra(
            rescore={
                "window_size": 200,
                "query": {
                    "rescore_query": rescore_query.to_dict(),
                    "query_weight": 1.0,
                    "rescore_query_weight": 1.0,
                },
            }
        )
        # track_scores is a top-level search option, not a rescore parameter.
        .extra(size=100, track_scores=True)
    )

    # Only the search call itself can raise NotFoundError.
    try:
        response = search.execute()
    except NotFoundError as nfe:
        raise Http404 from nfe

    hits = list(response.hits)

    uuids_by_index: dict[str, list] = {"products": [], "categories": [], "brands": []}
    for hit in hits:
        hit_uuid = getattr(hit, "uuid", None)
        if hit_uuid:
            uuids_by_index.setdefault(hit.meta.index, []).append(str(hit_uuid))

    # Database rows are only needed to build image URLs, which requires a request.
    models_by_index = _load_models_by_index(uuids_by_index) if request is not None else {}

    results: dict = {"products": [], "categories": [], "brands": [], "posts": []}
    for hit in hits:
        idx = hit.meta.index
        obj_uuid = getattr(hit, "uuid", None) or hit.meta.id
        obj_name = getattr(hit, "name", None) or getattr(hit, "title", None) or "N/A"
        obj_slug = getattr(hit, "slug", "") or (slugify(obj_name) if idx in {"brands", "categories"} else "")

        image_url = None
        model_obj = models_by_index.get(idx, {}).get(str(obj_uuid))
        if model_obj is not None:
            image_url = _resolve_image_url(idx, model_obj, request)

        hit_result = {
            "uuid": str(obj_uuid),
            "name": obj_name,
            "slug": obj_slug,
            "image": image_url,
        }

        if settings.DEBUG:
            # Expose the ranking inputs to make score tuning easier.
            if idx == "products":
                hit_result["rating_debug"] = getattr(hit, "rating", 0)
                hit_result["total_orders_debug"] = getattr(hit, "total_orders", 0)
                hit_result["brand_priority_debug"] = getattr(hit, "brand_priority", 0)
                hit_result["category_priority_debug"] = getattr(hit, "category_priority", 0)
            if idx in ("brands", "categories"):
                hit_result["priority_debug"] = getattr(hit, "priority", 0)

        # setdefault: a hit from an unexpected concrete index name gets its own
        # bucket instead of raising KeyError.
        results.setdefault(idx, []).append(hit_result)

    return results
|
|
|
|
|
|
# Base language code -> analyzer name used for that language's text fields.
# Codes missing from this table are handled by the fallback in _lang_analyzer.
LANGUAGE_ANALYZER_MAP = {
    # European languages mapped to an analyzer named after the language
    "cs": "czech",
    "da": "danish",
    "de": "german",
    "en": "english",
    "es": "spanish",
    "fr": "french",
    "it": "italian",
    "nl": "dutch",
    "pt": "portuguese",
    "ro": "romanian",
    # languages mapped to the custom analyzers declared in COMMON_ANALYSIS
    "ja": "cjk_search",
    "zh": "cjk_search",
    "ar": "arabic_search",
    "hi": "indic_search",
    "ru": "russian",
    # languages mapped to the generic "standard" analyzer
    "pl": "standard",
    "kk": "standard",
}
|
|
|
|
|
|
def _lang_analyzer(lang_code: str) -> str:
    """Return the analyzer name for *lang_code*, falling back to "icu_query".

    Only the part before the first "-" is looked up, case-insensitively, so
    "en-GB" and "EN" both resolve through the "en" entry.
    """
    primary_subtag, _sep, _rest = lang_code.partition("-")
    return LANGUAGE_ANALYZER_MAP.get(primary_subtag.lower(), "icu_query")
|
|
|
|
|
|
class ActiveOnlyMixin:
    """Mixin that limits indexing to objects whose ``is_active`` flag is set.

    Must be combined with a base class that provides ``get_queryset()``.
    """

    def get_queryset(self):
        """Return the parent queryset narrowed to ``is_active=True`` rows."""
        parent_queryset = super().get_queryset()
        return parent_queryset.filter(is_active=True)

    def should_index_object(self, obj):
        """Return ``obj.is_active``, or False when *obj* has no such attribute."""
        active_flag = getattr(obj, "is_active", False)
        return active_flag
|
|
|
|
|
|
# Shared analysis settings: one char filter, the token filters, and the custom
# analyzers built from them. Every analyzer uses the same char filter and
# tokenizer and starts its filter chain with "lowercase" + "icu_folding";
# only the trailing filters differ, so the analyzers are generated from a table.
# NOTE(review): "ngram_filter" spans 2-20 characters, which exceeds
# Elasticsearch's default index.max_ngram_diff of 1 — confirm the index
# settings that use this block raise that limit.
COMMON_ANALYSIS = {
    "char_filter": {
        "icu_nfkc_cf": {"type": "icu_normalizer", "name": "nfkc_cf"},
    },
    "filter": {
        "edge_ngram_filter": {"type": "edge_ngram", "min_gram": 1, "max_gram": 20},
        "ngram_filter": {"type": "ngram", "min_gram": 2, "max_gram": 20},
        "cjk_bigram": {"type": "cjk_bigram"},
        "icu_folding": {"type": "icu_folding"},
        "double_metaphone": {"type": "phonetic", "encoder": "double_metaphone", "replace": False},
        "arabic_norm": {"type": "arabic_normalization"},
        "indic_norm": {"type": "indic_normalization"},
    },
    "analyzer": {
        analyzer_name: {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", *extra_filters],
        }
        # (analyzer name, filters appended after the shared prefix)
        for analyzer_name, extra_filters in (
            ("icu_query", ()),
            ("autocomplete", ("edge_ngram_filter",)),
            ("autocomplete_search", ()),
            ("name_ngram", ("ngram_filter",)),
            ("name_phonetic", ("double_metaphone",)),
            ("cjk_search", ("cjk_bigram",)),
            ("arabic_search", ("arabic_norm",)),
            ("indic_search", ("indic_norm",)),
        )
    },
}
|
|
|
|
|
|
def add_multilang_fields(cls):
    """
    Dynamically add multilingual name/description fields and None-safe prepare methods to *cls*.

    For every language code in ``settings.LANGUAGES`` two text fields are set
    on the class, ``name_<lc>`` and ``description_<lc>``, where ``<lc>`` is the
    lower-cased code with "-" replaced by "_". Each field is analyzed with the
    analyzer chosen by ``_lang_analyzer``, is copied to the aggregate "name" /
    "description" field, and has "raw", "ngram" and "phonetic" sub-fields. A
    matching ``prepare_<field>`` method is set as well; it returns "" whenever
    the instance attribute is missing or falsy.

    Args:
        cls: The class to extend in place.

    Returns:
        The same *cls*, so the function can also be applied as a class decorator.
    """

    def make_prepare(attr):
        # Bind *attr* per call so every generated method reads its own attribute.
        return lambda self, instance: getattr(instance, attr, "") or ""

    for code, _lang in settings.LANGUAGES:
        lc = code.replace("-", "_").lower()
        analyzer = _lang_analyzer(code)
        # The name and description fields share one definition and differ only
        # in the attribute prefix and the copy_to target.
        for target in ("name", "description"):
            field_name = f"{target}_{lc}"
            setattr(
                cls,
                field_name,
                fields.TextField(
                    attr=field_name,
                    analyzer=analyzer,
                    copy_to=target,
                    fields={
                        "raw": fields.KeywordField(ignore_above=256),
                        "ngram": fields.TextField(analyzer="name_ngram", search_analyzer="icu_query"),
                        "phonetic": fields.TextField(analyzer="name_phonetic"),
                    },
                ),
            )
            setattr(cls, f"prepare_{field_name}", make_prepare(field_name))

    return cls
|
|
|
|
|
|
def populate_index():
    """Index every registered document's indexing queryset.

    Walks the documents the registry returns for all registered models and
    calls ``update`` on each with ``parallel=True`` and ``refresh=True``.
    """
    registered_models = set(registry.get_models())
    for document_cls in registry.get_documents(registered_models):
        indexing_queryset = document_cls().get_indexing_queryset()
        document_cls().update(indexing_queryset, parallel=True, refresh=True)
|