Features: 1) Add orjson serializer and integrate it into Celery configuration for enhanced performance and compatibility; 2) Introduce configurable task settings such as soft and hard time limits, task tracking, and event serialization for better control and observability.
Fixes: 1) Update worker entrypoints to adjust prefetch multiplier and memory/task limits for optimized resource usage. Extra: 1) Refactor Celery settings into a dedicated file for improved organization and maintainability; 2) Adjust Docker entrypoints to align with updated task configurations; 3) Register `orjson` serializer in a separate module for cleaner code structure.
This commit is contained in:
parent
4e89b2aeec
commit
4c7b40b899
5 changed files with 82 additions and 18 deletions
|
|
@ -2,24 +2,13 @@ import os
|
||||||
|
|
||||||
from celery import Celery
|
from celery import Celery
|
||||||
|
|
||||||
|
from evibes.celery_serializers import register_orjson
|
||||||
|
|
||||||
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "evibes.settings")
|
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "evibes.settings")
|
||||||
|
|
||||||
app = Celery("evibes")
|
app = Celery("evibes")
|
||||||
|
|
||||||
app.conf.update(
|
register_orjson()
|
||||||
worker_hijack_root_logger=False,
|
|
||||||
broker_connection_retry_on_startup=True,
|
|
||||||
task_serializer="json",
|
|
||||||
result_serializer="json",
|
|
||||||
result_compression="zlib",
|
|
||||||
accept_content=["json"],
|
|
||||||
task_acks_late=True,
|
|
||||||
task_reject_on_worker_lost=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
app.conf.task_routes = {
|
|
||||||
"core.tasks.update_products_task": {"queue": "stock_updater"},
|
|
||||||
}
|
|
||||||
|
|
||||||
app.config_from_object("django.conf:settings", namespace="CELERY")
|
app.config_from_object("django.conf:settings", namespace="CELERY")
|
||||||
|
|
||||||
|
|
|
||||||
22
evibes/celery_serializers.py
Normal file
22
evibes/celery_serializers.py
Normal file
|
|
@ -0,0 +1,22 @@
|
||||||
|
import orjson
|
||||||
|
from kombu.serialization import register
|
||||||
|
|
||||||
|
|
||||||
|
def orjson_dumps(obj):
|
||||||
|
return orjson.dumps(obj, option=orjson.OPT_NON_STR_KEYS).decode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
def orjson_loads(data):
|
||||||
|
if isinstance(data, str):
|
||||||
|
data = data.encode("utf-8")
|
||||||
|
return orjson.loads(data)
|
||||||
|
|
||||||
|
|
||||||
|
def register_orjson():
|
||||||
|
register(
|
||||||
|
"orjson",
|
||||||
|
orjson_dumps,
|
||||||
|
orjson_loads,
|
||||||
|
content_type="application/json",
|
||||||
|
content_encoding="utf-8",
|
||||||
|
)
|
||||||
|
|
@ -7,13 +7,18 @@ CELERY_TIMEZONE = TIME_ZONE
|
||||||
|
|
||||||
CELERY_BROKER_URL = f"redis://:{REDIS_PASSWORD}@redis:6379/0"
|
CELERY_BROKER_URL = f"redis://:{REDIS_PASSWORD}@redis:6379/0"
|
||||||
|
|
||||||
|
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
|
||||||
CELERY_BROKER_HEARTBEAT = 10
|
CELERY_BROKER_HEARTBEAT = 10
|
||||||
CELERY_BROKER_HEARTBEAT_CHECKRATE = 2
|
CELERY_BROKER_HEARTBEAT_CHECKRATE = 2
|
||||||
|
|
||||||
CELERY_BROKER_POOL_LIMIT = 10
|
CELERY_BROKER_POOL_LIMIT = 10
|
||||||
|
|
||||||
CELERY_BROKER_TRANSPORT_OPTIONS = {
|
CELERY_BROKER_TRANSPORT_OPTIONS = {
|
||||||
"visibility_timeout": 3600,
|
"visibility_timeout": 3600,
|
||||||
"retry_policy": {"interval_start": 0.1, "interval_step": 0.2, "max_retries": 5},
|
"retry_policy": {
|
||||||
|
"interval_start": 0.1,
|
||||||
|
"interval_step": 0.2,
|
||||||
|
"max_retries": 5,
|
||||||
|
},
|
||||||
"socket_keepalive": True,
|
"socket_keepalive": True,
|
||||||
"socket_timeout": 30,
|
"socket_timeout": 30,
|
||||||
"socket_connect_timeout": 30,
|
"socket_connect_timeout": 30,
|
||||||
|
|
@ -22,6 +27,54 @@ CELERY_BROKER_TRANSPORT_OPTIONS = {
|
||||||
|
|
||||||
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
|
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
|
||||||
|
|
||||||
|
CELERY_RESULT_EXTENDED = True
|
||||||
|
CELERY_RESULT_EXPIRES = timedelta(days=1)
|
||||||
|
CELERY_RESULT_COMPRESSION = "zlib"
|
||||||
|
|
||||||
|
CELERY_TASK_SERIALIZER = "orjson"
|
||||||
|
CELERY_RESULT_SERIALIZER = "orjson"
|
||||||
|
CELERY_ACCEPT_CONTENT = ["orjson", "json"]
|
||||||
|
CELERY_EVENT_SERIALIZER = "orjson"
|
||||||
|
|
||||||
|
CELERY_TASK_ACKS_LATE = True
|
||||||
|
CELERY_TASK_REJECT_ON_WORKER_LOST = True
|
||||||
|
|
||||||
|
CELERY_TASK_TRACK_STARTED = True
|
||||||
|
CELERY_TASK_SEND_SENT_EVENT = True
|
||||||
|
CELERY_WORKER_SEND_TASK_EVENTS = True
|
||||||
|
|
||||||
|
CELERY_TASK_SOFT_TIME_LIMIT = 3600
|
||||||
|
CELERY_TASK_TIME_LIMIT = 3900
|
||||||
|
|
||||||
|
CELERY_TASK_ANNOTATIONS = {
|
||||||
|
"engine.core.tasks.update_products_task": {
|
||||||
|
"rate_limit": "1/m",
|
||||||
|
"time_limit": 7200,
|
||||||
|
"soft_time_limit": 6900,
|
||||||
|
"acks_late": True,
|
||||||
|
"reject_on_worker_lost": True,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
|
||||||
|
|
||||||
|
CELERY_WORKER_PREFETCH_MULTIPLIER = 4
|
||||||
|
|
||||||
|
CELERY_WORKER_MAX_TASKS_PER_CHILD = (
|
||||||
|
1000
|
||||||
|
)
|
||||||
|
CELERY_WORKER_DISABLE_RATE_LIMITS = False
|
||||||
|
|
||||||
|
CELERY_TASK_ROUTES = {
|
||||||
|
"engine.core.tasks.update_products_task": {"queue": "stock_updater"},
|
||||||
|
}
|
||||||
|
|
||||||
|
CELERY_TASK_DEFAULT_QUEUE = "default"
|
||||||
|
CELERY_TASK_DEFAULT_EXCHANGE = "default"
|
||||||
|
CELERY_TASK_DEFAULT_ROUTING_KEY = "default"
|
||||||
|
|
||||||
|
CELERY_TASK_STORE_ERRORS_EVEN_IF_IGNORED = True
|
||||||
|
|
||||||
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler"
|
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler"
|
||||||
|
|
||||||
CELERY_BEAT_SCHEDULE = {
|
CELERY_BEAT_SCHEDULE = {
|
||||||
|
|
|
||||||
|
|
@ -3,4 +3,4 @@ set -e
|
||||||
|
|
||||||
uv run manage.py await_services
|
uv run manage.py await_services
|
||||||
|
|
||||||
uv run celery -A evibes worker --pool=prefork --concurrency=1 --queues=stock_updater --loglevel=info --max-tasks-per-child=1
|
uv run celery -A evibes worker --pool=prefork --concurrency=1 --queues=stock_updater --prefetch-multiplier=1 --max-tasks-per-child=5 --max-memory-per-child=1024000 -E
|
||||||
|
|
|
||||||
|
|
@ -3,4 +3,4 @@ set -e
|
||||||
|
|
||||||
uv run manage.py await_services
|
uv run manage.py await_services
|
||||||
|
|
||||||
uv run celery -A evibes worker --pool=prefork --concurrency=8 --loglevel=info -E --queues=default --prefetch-multiplier=1 --max-tasks-per-child=100 --max-memory-per-child=512000 --soft-time-limit=3600 --time-limit=7200 & /opt/evibes-python/bin/celery-prometheus-exporter
|
uv run celery -A evibes worker --pool=prefork --concurrency=8 --queues=default --prefetch-multiplier=2 --max-tasks-per-child=100 --max-memory-per-child=512000 -E & /opt/evibes-python/bin/celery-prometheus-exporter
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue