schon/core/elasticsearch/__init__.py
Egor fureunoir Gorbunov 97829d23a6 Features: 1) Add suggest, price, and quantity fields to ProductDocument; 2) Introduce prepare_suggest method for enhancing autocomplete functionality; 3) Enhance search with filters, function scoring, and result aggregation; 4) Add new analyzers and token filters for synonyms and stopwords.
Fixes: 1) Remove unused imports in `core/elasticsearch/__init__.py`.

Extra: Refactor `process_query` for improved readability and functionality; update aggregation and result processing logic; reformat and clean up code.
2025-06-20 03:55:56 +03:00

279 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from django.conf import settings
from django.shortcuts import get_object_or_404
from django.utils.text import slugify
from django_elasticsearch_dsl import fields
from django_elasticsearch_dsl.registries import registry
from elasticsearch.dsl import SF, Q, Search
from rest_framework.request import Request
from core.models import Brand, Category, Product
# Fields targeted by the smart multi-index search, with query-time relevance
# boosts ("^N" — higher number = stronger influence on the score).
# Sub-field conventions (defined in the index analysis settings):
#   .ngram    — partial / infix matching
#   .phonetic — sound-alike matching (double metaphone)
#   .auto     — edge-ngram autocomplete; also used for bool_prefix matching
# The double-underscore names (brand__name, category__name) address nested
# object fields as flattened by django_elasticsearch_dsl.
SMART_FIELDS = [
"name^4",
"name.ngram^3",
"name.phonetic",
"description^2",
"description.ngram",
"description.phonetic",
"name.auto^4",
"description.auto^2",
"brand__name^2",
"brand__name.ngram",
"brand__name.auto",
"category__name^2",
"category__name.ngram",
"category__name.auto",
"title^4",
"title.ngram^3",
"title.phonetic",
"title.auto^4",
]
def _hit_image_url(idx: str, obj_uuid, request: Request | None) -> str | None:
    """Resolve an absolute image URL for a search hit, or None.

    Looks the object up in the database by uuid (404s if it vanished
    between indexing and querying) and builds an absolute URI from the
    request. Returns None when no request is available or the object has
    no image.
    """
    if request is None:
        return None
    if idx == "products":
        product = get_object_or_404(Product, uuid=obj_uuid)
        # Lowest-priority image is the primary/cover image.
        first = product.images.order_by("priority").first()
        if first and first.image:
            return request.build_absolute_uri(first.image.url)
    elif idx == "brands":
        brand = get_object_or_404(Brand, uuid=obj_uuid)
        if brand.small_logo:
            return request.build_absolute_uri(brand.small_logo.url)
    elif idx == "categories":
        category = get_object_or_404(Category, uuid=obj_uuid)
        if category.image:
            return request.build_absolute_uri(category.image.url)
    return None


def process_query(
    query: str = "", request: Request | None = None, filters: dict | None = None
):
    """Run a fuzzy, boosted search across all indices and aggregate results.

    Args:
        query: Non-empty user search term.
        request: Optional DRF request, used to build absolute image URLs.
        filters: Optional mapping accepting "category" (slug), "brand"
            (slug), "price_min" and/or "price_max".

    Returns:
        dict with:
            "results" — hits grouped per index name
                        ({uuid, name, slug, image} each);
            "facets"  — category/brand term buckets and price stats.

    Raises:
        ValueError: If ``query`` is empty.
    """
    if not query:
        raise ValueError("no search term provided.")
    filters = filters or {}

    # Relevance query: fuzzy full match OR prefix match on autocomplete fields.
    base_q = Q(
        "bool",
        should=[
            Q(
                "multi_match",
                query=query,
                fields=SMART_FIELDS,
                fuzziness="AUTO",
                operator="and",
            ),
            Q(
                "multi_match",
                query=query,
                fields=[f for f in SMART_FIELDS if f.endswith(".auto")],
                type="bool_prefix",
            ),
        ],
        minimum_should_match=1,
    )
    functions = [
        SF("field_value_factor", field="quantity", modifier="log1p", missing=0),
        SF("field_value_factor", field="rating", modifier="sqrt", missing=0),
        SF("exp", field="created_at", origin="now", scale="30d"),  # newness boost
    ]
    # BUG FIX: the function_score query used to be appended to the filter
    # clauses. Filter context discards scores, so the stock/rating/newness
    # boosts never influenced ranking (and base_q was evaluated twice).
    # The function_score must be the main (scoring) query.
    scored_q = Q("function_score", query=base_q, functions=functions, boost_mode="sum")

    # Non-scoring constraints belong in filter context (cacheable, unscored).
    filter_clauses = []
    if "category" in filters:
        filter_clauses.append(Q("term", category__slug=filters["category"]))
    if "brand" in filters:
        filter_clauses.append(Q("term", brand__slug=filters["brand"]))
    if "price_min" in filters or "price_max" in filters:
        price_range = {}
        if "price_min" in filters:
            price_range["gte"] = filters["price_min"]
        if "price_max" in filters:
            price_range["lte"] = filters["price_max"]
        filter_clauses.append(Q("range", price=price_range))

    search = Search(index=["products", "categories", "brands", "posts"]).query(scored_q)
    if filter_clauses:
        search = search.filter(*filter_clauses)
    search = (
        search.extra(size=20)
        .highlight("description", fragment_size=150)
        .extra(
            aggs={
                "by_category": {"terms": {"field": "category.keyword"}},
                "by_brand": {"terms": {"field": "brand.keyword"}},
                "price_stats": {"stats": {"field": "price"}},
            }
        )
    )
    response = search.execute()

    results: dict = {"products": [], "categories": [], "brands": [], "posts": []}
    for hit in response.hits:
        obj_uuid = getattr(hit, "uuid", None) or hit.meta.id
        obj_name = getattr(hit, "name", None) or getattr(hit, "title", None) or "N/A"
        idx = hit.meta.index
        obj_slug = getattr(hit, "slug", None) or ""
        if not obj_slug and idx in ("brands", "categories"):
            # Brand/category docs may predate the slug field; derive one.
            obj_slug = slugify(obj_name)
        # setdefault guards against hits from an index (or alias) not listed
        # above — previously this raised KeyError.
        results.setdefault(idx, []).append(
            {
                "uuid": str(obj_uuid),
                "name": obj_name,
                "slug": obj_slug,
                "image": _hit_image_url(idx, obj_uuid, request),
            }
        )

    facets = {
        "categories": [
            (b.key, b.doc_count) for b in response.aggregations.by_category.buckets
        ],
        "brands": [
            (b.key, b.doc_count) for b in response.aggregations.by_brand.buckets
        ],
        "price": {
            "min": response.aggregations.price_stats.min,
            "max": response.aggregations.price_stats.max,
            "avg": response.aggregations.price_stats.avg,
        },
    }
    return {"results": results, "facets": facets}
# ISO 639-1 base language code -> built-in Elasticsearch language analyzer.
# Languages without a suitable built-in analyzer here (ja, kk, pl, zh) fall
# back to "standard"; unknown codes get "standard" via _lang_analyzer().
LANGUAGE_ANALYZER_MAP = {
"ar": "arabic",
"cs": "czech",
"da": "danish",
"de": "german",
"en": "english",
"es": "spanish",
"fr": "french",
"hi": "hindi",
"it": "italian",
"ja": "standard",
"kk": "standard",
"nl": "dutch",
"pl": "standard",
"pt": "portuguese",
"ro": "romanian",
"ru": "russian",
"zh": "standard",
}
def _lang_analyzer(lang_code: str) -> str:
    """Map an ISO language code to the closest built-in ES analyzer.

    Regional variants ("en-US") are reduced to their base code; codes with
    no mapping fall back to the "standard" analyzer.
    """
    base, _, _ = lang_code.lower().partition("-")
    return LANGUAGE_ANALYZER_MAP.get(base, "standard")
class ActiveOnlyMixin:
    """Restrict indexing to objects flagged ``is_active``.

    Mix into a django_elasticsearch_dsl ``Document``: narrows the bulk
    indexing queryset and vetoes individual inactive objects on signals.
    """

    def get_queryset(self):
        """Only active rows enter the indexing queryset."""
        queryset = super().get_queryset()
        return queryset.filter(is_active=True)

    def should_index_object(self, obj):
        """Index *obj* only when flagged active; a missing flag means no."""
        try:
            return obj.is_active
        except AttributeError:
            return False
# Shared index "analysis" settings block for all documents.
# NOTE(review): "double_metaphone" requires the analysis-phonetic ES plugin,
# and "synonym_filter" expects analysis/synonyms.txt inside the ES config
# directory — both must be present on the cluster or index creation fails.
COMMON_ANALYSIS = {
"filter": {
# Prefix ngrams for autocomplete (a, ab, abc, ...).
"edge_ngram_filter": {"type": "edge_ngram", "min_gram": 1, "max_gram": 20},
# Infix ngrams for partial/substring matching.
"ngram_filter": {"type": "ngram", "min_gram": 2, "max_gram": 20},
"double_metaphone": {
"type": "phonetic",
"encoder": "double_metaphone",
"replace": False,  # keep the original token alongside its phonetic form
},
"synonym_filter": {"type": "synonym", "synonyms_path": "analysis/synonyms.txt"},
"english_stop": {"type": "stop", "stopwords": "_english_"},
},
"analyzer": {
# Index-time autocomplete: edge ngrams over folded lowercase tokens.
"autocomplete": {
"tokenizer": "standard",
"filter": ["lowercase", "asciifolding", "edge_ngram_filter"],
},
# Search-time counterpart: no ngrams, so the query matches the prefixes.
"autocomplete_search": {
"tokenizer": "standard",
"filter": ["lowercase", "asciifolding"],
},
# Infix matching on name-like fields (the ".ngram" sub-fields).
"name_ngram": {
"tokenizer": "standard",
"filter": ["lowercase", "asciifolding", "ngram_filter"],
},
"synonym_analyzer": {
"tokenizer": "standard",
"filter": ["lowercase", "asciifolding", "synonym_filter"],
},
# Sound-alike matching (the ".phonetic" sub-fields).
"name_phonetic": {
"tokenizer": "standard",
"filter": ["lowercase", "asciifolding", "double_metaphone"],
},
# Plain lowercase+fold analyzer used as search_analyzer for ngram fields.
"query_lc": {"tokenizer": "standard", "filter": ["lowercase", "asciifolding"]},
},
}
def _add_multilang_fields(cls):
    """Attach per-language ``name_*`` and ``description_*`` fields to *cls*.

    For each entry in ``settings.LANGUAGES`` this adds a TextField analyzed
    with the language's analyzer (plus raw/ngram/phonetic sub-fields, copied
    into the language-agnostic "name"/"description" fields) and a matching
    ``prepare_*`` hook that coerces missing/None model values to "".
    """

    def _safe_prepare(attr):
        # Bind ``attr`` as a closure argument now — avoids the classic
        # late-binding-in-a-loop pitfall.
        return lambda self, instance: getattr(instance, attr, "") or ""

    for code, _label in settings.LANGUAGES:
        suffix = code.replace("-", "_").lower()
        # name_{suffix} and description_{suffix} share the same layout;
        # only the attribute name and copy_to target differ.
        for prefix in ("name", "description"):
            attr = f"{prefix}_{suffix}"
            setattr(
                cls,
                attr,
                fields.TextField(
                    attr=attr,
                    analyzer=_lang_analyzer(code),
                    copy_to=prefix,
                    fields={
                        "raw": fields.KeywordField(ignore_above=256),
                        "ngram": fields.TextField(
                            analyzer="name_ngram", search_analyzer="query_lc"
                        ),
                        "phonetic": fields.TextField(analyzer="name_phonetic"),
                    },
                ),
            )
            setattr(cls, f"prepare_{attr}", _safe_prepare(attr))
def populate_index():
    """(Re)build every registered Elasticsearch index from the database.

    Iterates all documents registered for the known models and bulk-indexes
    each document's full indexing queryset in parallel, refreshing the index
    so the data is immediately searchable.
    """
    for doc_cls in registry.get_documents(set(registry.get_models())):
        # Instantiate the document once — the original created two separate
        # instances per iteration (one for the queryset, one for update()).
        document = doc_cls()
        document.update(document.get_indexing_queryset(), parallel=True, refresh=True)