schon/core/elasticsearch/__init__.py

401 lines
13 KiB
Python

from django.conf import settings
from django.http import Http404
from django.utils.text import slugify
from django.utils.translation import gettext_lazy as _
from django_elasticsearch_dsl import fields
from django_elasticsearch_dsl.registries import registry
from elasticsearch import NotFoundError
from elasticsearch.dsl import Q, Search
from rest_framework.request import Request
from core.models import Brand, Category, Product
# Fields passed to the multi_match clauses built in process_query().
# A trailing "^N" is the per-field boost; entries without one use the default.
SMART_FIELDS = [
    # --- name ---
    "name^8",
    "name.ngram^8",
    "name.phonetic^6",
    # --- title ---
    "title^5",
    "title.ngram^4",
    "title.phonetic^2",
    # --- description ---
    "description^2",
    "description.ngram",
    "description.phonetic",
    # --- brand name ---
    "brand_name^5",
    "brand_name.ngram^3",
    "brand_name.auto^4",
    # --- category name ---
    "category_name^3",
    "category_name.ngram^2",
    "category_name.auto^2",
]
# function_score functions used by the rescore query in process_query().
# Every entry has the same shape: it applies only to hits from one index and
# feeds one numeric field through log1p, scaled by `factor` and `weight`;
# documents lacking the field are treated as 0.
functions = [
    {
        "filter": Q("term", **{"_index": index_name}),
        "field_value_factor": {
            "field": field_name,
            "modifier": "log1p",
            "factor": factor,
            "missing": 0,
        },
        "weight": weight,
    }
    for index_name, field_name, factor, weight in (
        # (index, field, factor, weight)
        ("products", "brand_priority", 0.2, 0.6),
        ("products", "rating", 0.15, 0.5),
        ("products", "total_orders", 0.25, 0.7),
        ("products", "category_priority", 0.2, 0.6),
        ("categories", "priority", 0.25, 0.8),
        ("brands", "priority", 0.25, 0.8),
    )
]
def process_query(query: str = "", request: Request | None = None) -> dict[str, list[dict]] | None:
    """
    Search products, categories, brands and posts and return the hits grouped by index.

    Args:
        query: Free-text search term; surrounding whitespace is ignored.
        request: Optional DRF request. Its ``LANGUAGE_CODE`` (when set) tunes the
            field list and fuzziness, and it is required to build absolute image
            URLs; without it every ``image`` value is ``None``.

    Returns:
        A dict keyed by index name (``products``, ``categories``, ``brands``,
        ``posts``; any other index name reported by a hit is added on demand)
        whose values are lists of ``{"uuid", "name", "slug", "image"}`` dicts.
        When ``settings.DEBUG`` is on, ranking inputs are added under
        ``*_debug`` keys.

    Raises:
        ValueError: If ``query`` is empty or whitespace-only.
        Http404: If Elasticsearch raises ``NotFoundError``.
    """
    # Strip *before* validating so a whitespace-only term is rejected instead of
    # being sent to Elasticsearch as an empty string.
    query = (query or "").strip()
    if not query:
        raise ValueError(_("no search term provided."))

    exact_shoulds = [
        Q("term", **{"name.raw": {"value": query, "boost": 3.0}}),
        Q("term", **{"slug": {"value": slugify(query), "boost": 2.0}}),
    ]

    lang = ""
    if request and hasattr(request, "LANGUAGE_CODE") and request.LANGUAGE_CODE:
        lang = request.LANGUAGE_CODE.lower()
    base = lang.split("-")[0] if lang else ""
    is_cjk = base in {"ja", "zh"}
    is_rtl_or_indic = base in {"ar", "hi"}

    fields_all = SMART_FIELDS[:]
    if is_cjk or is_rtl_or_indic:
        # For these languages: drop the phonetic sub-fields, raise the n-gram
        # boosts and disable fuzziness.
        # NOTE(review): the "name.ngram^6" replacement only has an effect if the
        # field list contains that exact entry — confirm against SMART_FIELDS.
        fields_all = [
            f.replace("name.ngram^6", "name.ngram^8").replace("title.ngram^4", "title.ngram^6")
            for f in fields_all
            if ".phonetic" not in f
        ]
        fuzzy = None
    else:
        fuzzy = "AUTO:5,8"

    text_shoulds = [
        Q(
            "multi_match",
            query=query,
            fields=fields_all,
            operator="and",
            **({"fuzziness": fuzzy} if fuzzy else {}),
        ),
    ]
    # Entries carry a "^boost" suffix, so compare the field name without it;
    # a plain endswith(".auto") never matches "brand_name.auto^4".
    auto_fields = [f for f in fields_all if f.split("^", 1)[0].endswith(".auto")]
    if auto_fields:
        # Skipped when empty: a multi_match without fields would fall back to
        # the index's default fields instead of the autocomplete sub-fields.
        text_shoulds.append(
            Q(
                "multi_match",
                query=query,
                fields=auto_fields,
                type="bool_prefix",
            )
        )

    query_base = Q(
        "bool",
        should=exact_shoulds + text_shoulds,
        minimum_should_match=1,
    )
    search = (
        Search(index=["products", "categories", "brands", "posts"])
        .query(query_base)
        .extra(
            rescore={
                "window_size": 200,
                "query": {
                    "rescore_query": Q(
                        "function_score",
                        query=Q("match_all"),
                        functions=functions,
                        boost_mode="sum",
                        score_mode="sum",
                        max_boost=2.0,
                    ).to_dict(),
                    "query_weight": 1.0,
                    "rescore_query_weight": 1.0,
                },
            }
        )
        .extra(size=100)
    )

    # Only the execute() call talks to Elasticsearch, so only it is guarded.
    try:
        response = search.execute()
    except NotFoundError as nfe:
        raise Http404 from nfe

    results: dict = {"products": [], "categories": [], "brands": [], "posts": []}
    uuids_by_index: dict[str, list] = {"products": [], "categories": [], "brands": []}
    hit_cache: list = list(response.hits)
    for hit in hit_cache:
        if getattr(hit, "uuid", None):
            uuids_by_index.setdefault(hit.meta.index, []).append(str(hit.uuid))

    # Model instances are only needed to resolve image URLs, which in turn
    # requires a request; fetch them in one query per index.
    products_by_uuid = {}
    brands_by_uuid = {}
    cats_by_uuid = {}
    if request:
        if uuids_by_index.get("products"):
            products_by_uuid = {
                str(p.uuid): p
                for p in Product.objects.filter(uuid__in=uuids_by_index["products"])
                .select_related("brand", "category")
                .prefetch_related("images")
            }
        if uuids_by_index.get("brands"):
            brands_by_uuid = {str(b.uuid): b for b in Brand.objects.filter(uuid__in=uuids_by_index["brands"])}
        if uuids_by_index.get("categories"):
            cats_by_uuid = {str(c.uuid): c for c in Category.objects.filter(uuid__in=uuids_by_index["categories"])}

    for hit in hit_cache:
        idx = hit.meta.index
        obj_uuid = getattr(hit, "uuid", None) or hit.meta.id
        obj_name = getattr(hit, "name", None) or getattr(hit, "title", None) or "N/A"
        obj_slug = getattr(hit, "slug", "") or (slugify(obj_name) if idx in {"brands", "categories"} else "")

        image_url = None
        if idx == "products" and request:
            prod = products_by_uuid.get(str(obj_uuid))
            if prod:
                # NOTE(review): order_by() on the related manager builds a new
                # queryset, so this likely runs one query per product despite
                # prefetch_related("images") above — consider a Prefetch with
                # an ordered queryset.
                first = prod.images.order_by("priority").first()
                if first and first.image:
                    image_url = request.build_absolute_uri(first.image.url)
        elif idx == "brands" and request:
            brand = brands_by_uuid.get(str(obj_uuid))
            if brand and brand.small_logo:
                image_url = request.build_absolute_uri(brand.small_logo.url)
        elif idx == "categories" and request:
            cat = cats_by_uuid.get(str(obj_uuid))
            if cat and cat.image:
                image_url = request.build_absolute_uri(cat.image.url)

        hit_result = {
            "uuid": str(obj_uuid),
            "name": obj_name,
            "slug": obj_slug,
            "image": image_url,
        }
        if settings.DEBUG:
            # Expose the ranking inputs to make score tuning easier.
            if idx == "products":
                hit_result["rating_debug"] = getattr(hit, "rating", 0)
                hit_result["total_orders_debug"] = getattr(hit, "total_orders", 0)
                hit_result["brand_priority_debug"] = getattr(hit, "brand_priority", 0)
                hit_result["category_priority_debug"] = getattr(hit, "category_priority", 0)
            if idx in ("brands", "categories"):
                hit_result["priority_debug"] = getattr(hit, "priority", 0)

        # setdefault mirrors uuids_by_index: a hit reporting an index name
        # outside the four preset keys must not raise KeyError.
        results.setdefault(idx, []).append(hit_result)

    return results
# Base language code -> analyzer name used for the per-language text fields.
# Codes that are not listed here are resolved to "icu_query" by _lang_analyzer().
LANGUAGE_ANALYZER_MAP = {
    "cs": "czech",
    "da": "danish",
    "de": "german",
    "en": "english",
    "es": "spanish",
    "fr": "french",
    "it": "italian",
    "nl": "dutch",
    "pt": "portuguese",
    "ro": "romanian",
    # Custom analyzers declared in COMMON_ANALYSIS.
    "ja": "cjk_search",
    "zh": "cjk_search",
    "ar": "arabic_search",
    "hi": "indic_search",
    "ru": "russian",
    # No language-specific analyzer is mapped for these; they use "standard".
    "pl": "standard",
    "kk": "standard",
}
def _lang_analyzer(lang_code: str) -> str:
    """
    Return the analyzer name for ``lang_code``.

    Only the part before the first ``-`` is used (lower-cased), so ``"en-GB"``
    is looked up as ``"en"``. Unmapped codes fall back to ``"icu_query"``.
    """
    primary_subtag, _sep, _rest = lang_code.partition("-")
    return LANGUAGE_ANALYZER_MAP.get(primary_subtag.lower(), "icu_query")
class ActiveOnlyMixin:
    """Mixin that limits indexing to objects whose ``is_active`` flag is set."""

    def get_queryset(self):
        """Return the parent class's queryset filtered to ``is_active=True``."""
        parent_qs = super().get_queryset()
        return parent_qs.filter(is_active=True)

    def should_index_object(self, obj):
        """Return ``obj.is_active``; objects without that attribute yield ``False``."""
        return getattr(obj, "is_active", False)
# Analysis configuration (char filters, token filters, analyzers) referenced by
# the analyzer names used elsewhere in this module.
COMMON_ANALYSIS = {
    "char_filter": {
        "icu_nfkc_cf": {"type": "icu_normalizer", "name": "nfkc_cf"},
    },
    "filter": {
        "edge_ngram_filter": {"type": "edge_ngram", "min_gram": 1, "max_gram": 20},
        "ngram_filter": {"type": "ngram", "min_gram": 2, "max_gram": 20},
        "cjk_bigram": {"type": "cjk_bigram"},
        "icu_folding": {"type": "icu_folding"},
        "double_metaphone": {"type": "phonetic", "encoder": "double_metaphone", "replace": False},
        "arabic_norm": {"type": "arabic_normalization"},
        "indic_norm": {"type": "indic_normalization"},
    },
    # Every analyzer shares the same char filter, tokenizer and leading
    # lowercase/icu_folding filters; they differ only in the trailing filters.
    "analyzer": {
        analyzer_name: {
            "type": "custom",
            "char_filter": ["icu_nfkc_cf"],
            "tokenizer": "icu_tokenizer",
            "filter": ["lowercase", "icu_folding", *trailing_filters],
        }
        for analyzer_name, trailing_filters in (
            ("icu_query", ()),
            ("autocomplete", ("edge_ngram_filter",)),
            ("autocomplete_search", ()),
            ("name_ngram", ("ngram_filter",)),
            ("name_phonetic", ("double_metaphone",)),
            ("cjk_search", ("cjk_bigram",)),
            ("arabic_search", ("arabic_norm",)),
            ("indic_search", ("indic_norm",)),
        )
    },
}
def add_multilang_fields(cls):
    """
    Attach per-language ``name_<lang>`` and ``description_<lang>`` fields to ``cls``.

    For every entry in ``settings.LANGUAGES`` two text fields are set on the
    class, each analysed with the analyzer chosen by ``_lang_analyzer`` and
    copied into the shared ``name`` / ``description`` field. A matching
    ``prepare_<field>`` method is added as well; it returns ``""`` whenever the
    instance attribute is missing or falsy, so ``None`` is never indexed.
    """

    def _none_safe_prepare(attr_name):
        # A factory is used so each generated method keeps its own attr_name
        # instead of sharing the loop variable.
        def _prepare(self, instance):
            return getattr(instance, attr_name, "") or ""

        return _prepare

    for language_code, _language_label in settings.LANGUAGES:
        suffix = language_code.replace("-", "_").lower()
        # The name and description fields are configured identically apart
        # from the attribute they read and the field they copy into.
        for base_name in ("name", "description"):
            field_name = f"{base_name}_{suffix}"
            localized_field = fields.TextField(
                attr=field_name,
                analyzer=_lang_analyzer(language_code),
                copy_to=base_name,
                fields={
                    "raw": fields.KeywordField(ignore_above=256),
                    "ngram": fields.TextField(analyzer="name_ngram", search_analyzer="icu_query"),
                    "phonetic": fields.TextField(analyzer="name_phonetic"),
                },
            )
            setattr(cls, field_name, localized_field)
            setattr(cls, f"prepare_{field_name}", _none_safe_prepare(field_name))
def populate_index():
    """Re-index every registered document from its indexing queryset, refreshing after each update."""
    registered_models = set(registry.get_models())
    for document_cls in registry.get_documents(registered_models):
        indexing_qs = document_cls().get_indexing_queryset()
        document_cls().update(indexing_qs, parallel=True, refresh=True)