Fixes: 1) Remove unused imports in `celery.py`; Extra: Refactor formatting and improve code readability in PowerShell scripts.
28 lines
789 B
Python
28 lines
789 B
Python
import os
|
|
|
|
from celery import Celery
|
|
|
|
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "evibes.settings")
|
|
|
|
app = Celery("evibes")
|
|
|
|
app.conf.update(
|
|
worker_hijack_root_logger=False,
|
|
worker_log_format="[%(asctime)s: %(levelname)s/%(processName)s] %(name)s: %(message)s",
|
|
worker_task_log_format="[%(asctime)s: %(levelname)s/%(processName)s] %(name)s: %(message)s",
|
|
broker_connection_retry_on_startup=True,
|
|
task_serializer="json",
|
|
result_serializer="json",
|
|
result_compression="zlib",
|
|
accept_content=["json"],
|
|
task_acks_late=True,
|
|
task_reject_on_worker_lost=True,
|
|
)
|
|
|
|
app.conf.task_routes = {
|
|
"core.tasks.update_products_task": {"queue": "stock_updater"},
|
|
}
|
|
|
|
app.config_from_object("django.conf:settings", namespace="CELERY")
|
|
|
|
app.autodiscover_tasks()
|