diff --git a/evibes/celery.py b/evibes/celery.py index 0c28594b..b8129c57 100644 --- a/evibes/celery.py +++ b/evibes/celery.py @@ -2,24 +2,13 @@ import os from celery import Celery +from evibes.celery_serializers import register_orjson + os.environ.setdefault("DJANGO_SETTINGS_MODULE", "evibes.settings") app = Celery("evibes") -app.conf.update( - worker_hijack_root_logger=False, - broker_connection_retry_on_startup=True, - task_serializer="json", - result_serializer="json", - result_compression="zlib", - accept_content=["json"], - task_acks_late=True, - task_reject_on_worker_lost=True, -) - -app.conf.task_routes = { - "core.tasks.update_products_task": {"queue": "stock_updater"}, -} +register_orjson() app.config_from_object("django.conf:settings", namespace="CELERY") diff --git a/evibes/celery_serializers.py b/evibes/celery_serializers.py new file mode 100644 index 00000000..60506da8 --- /dev/null +++ b/evibes/celery_serializers.py @@ -0,0 +1,22 @@ +import orjson +from kombu.serialization import register + + +def orjson_dumps(obj): + return orjson.dumps(obj, option=orjson.OPT_NON_STR_KEYS).decode("utf-8") + + +def orjson_loads(data): + if isinstance(data, str): + data = data.encode("utf-8") + return orjson.loads(data) + + +def register_orjson(): + register( + "orjson", + orjson_dumps, + orjson_loads, + content_type="application/json", + content_encoding="utf-8", + ) diff --git a/evibes/settings/celery.py b/evibes/settings/celery.py index 9ff31529..17d3bd93 100644 --- a/evibes/settings/celery.py +++ b/evibes/settings/celery.py @@ -7,13 +7,18 @@ CELERY_TIMEZONE = TIME_ZONE CELERY_BROKER_URL = f"redis://:{REDIS_PASSWORD}@redis:6379/0" +CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True CELERY_BROKER_HEARTBEAT = 10 CELERY_BROKER_HEARTBEAT_CHECKRATE = 2 - CELERY_BROKER_POOL_LIMIT = 10 + CELERY_BROKER_TRANSPORT_OPTIONS = { "visibility_timeout": 3600, - "retry_policy": {"interval_start": 0.1, "interval_step": 0.2, "max_retries": 5}, + "retry_policy": { + "interval_start": 0.1, + "interval_step": 0.2, + "max_retries": 5, + }, "socket_keepalive": True, "socket_timeout": 30, "socket_connect_timeout": 30, @@ -22,6 +27,54 @@ CELERY_BROKER_TRANSPORT_OPTIONS = { CELERY_RESULT_BACKEND = CELERY_BROKER_URL +CELERY_RESULT_EXTENDED = True +CELERY_RESULT_EXPIRES = timedelta(days=1) +CELERY_RESULT_COMPRESSION = "zlib" + +CELERY_TASK_SERIALIZER = "orjson" +CELERY_RESULT_SERIALIZER = "orjson" +CELERY_ACCEPT_CONTENT = ["orjson", "json"] +CELERY_EVENT_SERIALIZER = "orjson" + +CELERY_TASK_ACKS_LATE = True +CELERY_TASK_REJECT_ON_WORKER_LOST = True + +CELERY_TASK_TRACK_STARTED = True +CELERY_TASK_SEND_SENT_EVENT = True +CELERY_WORKER_SEND_TASK_EVENTS = True + +CELERY_TASK_SOFT_TIME_LIMIT = 3600 +CELERY_TASK_TIME_LIMIT = 3900 + +CELERY_TASK_ANNOTATIONS = { + "engine.core.tasks.update_products_task": { + "rate_limit": "1/m", + "time_limit": 7200, + "soft_time_limit": 6900, + "acks_late": True, + "reject_on_worker_lost": True, + }, +} + +CELERY_WORKER_HIJACK_ROOT_LOGGER = False + +CELERY_WORKER_PREFETCH_MULTIPLIER = 4 + +CELERY_WORKER_MAX_TASKS_PER_CHILD = ( + 1000 +) +CELERY_WORKER_DISABLE_RATE_LIMITS = False + +CELERY_TASK_ROUTES = { + "engine.core.tasks.update_products_task": {"queue": "stock_updater"}, +} + +CELERY_TASK_DEFAULT_QUEUE = "default" +CELERY_TASK_DEFAULT_EXCHANGE = "default" +CELERY_TASK_DEFAULT_ROUTING_KEY = "default" + +CELERY_TASK_STORE_ERRORS_EVEN_IF_IGNORED = True + CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler" CELERY_BEAT_SCHEDULE = { diff --git a/scripts/Docker/stock-updater-entrypoint.sh b/scripts/Docker/stock-updater-entrypoint.sh index cc874d0e..942ccbf7 100644 --- a/scripts/Docker/stock-updater-entrypoint.sh +++ b/scripts/Docker/stock-updater-entrypoint.sh @@ -3,4 +3,4 @@ set -e uv run manage.py await_services -uv run celery -A evibes worker --pool=prefork --concurrency=1 --queues=stock_updater --loglevel=info --max-tasks-per-child=1 +uv run celery -A evibes worker --pool=prefork --concurrency=1 --queues=stock_updater --prefetch-multiplier=1 --max-tasks-per-child=5 --max-memory-per-child=1024000 -E diff --git a/scripts/Docker/worker-entrypoint.sh b/scripts/Docker/worker-entrypoint.sh index 7c69a80c..6edf6d5b 100644 --- a/scripts/Docker/worker-entrypoint.sh +++ b/scripts/Docker/worker-entrypoint.sh @@ -3,4 +3,4 @@ set -e uv run manage.py await_services -uv run celery -A evibes worker --pool=prefork --concurrency=8 --loglevel=info -E --queues=default --prefetch-multiplier=1 --max-tasks-per-child=100 --max-memory-per-child=512000 --soft-time-limit=3600 --time-limit=7200 & /opt/evibes-python/bin/celery-prometheus-exporter +uv run celery -A evibes worker --pool=prefork --concurrency=8 --queues=default --prefetch-multiplier=2 --max-tasks-per-child=100 --max-memory-per-child=512000 -E & /opt/evibes-python/bin/celery-prometheus-exporter