import threading
import time
from collections.abc import Callable
from os import getenv
from typing import Any

import psycopg
import redis
from django.core.management.base import BaseCommand, CommandError
from django.db.utils import OperationalError
from redis.exceptions import ConnectionError  # noqa: A004

# Pause between two connection attempts to the same service, in seconds.
RETRY_DELAY_SECONDS = 1


class Command(BaseCommand):
    """Block until both PostgreSQL and Redis accept connections.

    The two services are probed concurrently, each in its own thread, and
    every probe retries once per second until it succeeds.
    """

    help = "Wait until the PostgreSQL database and Redis are reachable."

    def handle(self, *args: Any, **options: Any) -> None:
        """Run both availability probes in parallel and wait for them.

        Raises:
            CommandError: if a probe stops because of an unexpected error
                (anything other than "service not reachable yet"), so the
                command never reports success for a service it did not
                actually reach.
        """
        self.stdout.write("Waiting for services...")

        # (service name, exception) pairs collected from the worker threads.
        # Exceptions raised inside a thread are not propagated by join(), so
        # they have to be carried back to the main thread explicitly.
        failures: list[tuple[str, Exception]] = []

        def guarded(name: str, probe: Callable[[], None]) -> None:
            try:
                probe()
            except Exception as exc:  # noqa: BLE001 - re-raised after join()
                failures.append((name, exc))

        workers = [
            threading.Thread(
                target=guarded,
                args=("database", self._wait_for_db),
                name="wait-for-db",
            ),
            threading.Thread(
                target=guarded,
                args=("redis", self._wait_for_redis),
                name="wait-for-redis",
            ),
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        if failures:
            name, exc = failures[0]
            raise CommandError(
                f"Waiting for {name} failed: {exc!r}",
            ) from exc

        self.stdout.write(self.style.SUCCESS("All services are available!"))

    def _wait_for_db(self) -> None:
        """Retry connecting to PostgreSQL until a connection can be opened.

        Connection settings are read from the POSTGRES_DB, POSTGRES_USER,
        POSTGRES_PASSWORD and DB_HOST environment variables.
        """
        while True:
            try:
                conn = psycopg.connect(
                    dbname=getenv("POSTGRES_DB"),
                    user=getenv("POSTGRES_USER"),
                    password=getenv("POSTGRES_PASSWORD"),
                    host=getenv("DB_HOST", "database"),
                )
            except (OperationalError, psycopg.OperationalError):
                self.stdout.write("Database unavailable, waiting 1 second...")
                time.sleep(RETRY_DELAY_SECONDS)
            else:
                # Only reachability matters; release the connection at once.
                conn.close()
                break

        self.stdout.write(self.style.SUCCESS("Database available!"))

    def _wait_for_redis(self) -> None:
        """Retry pinging Redis until the server answers.

        Connection settings are read from the REDIS_HOST, REDIS_PORT,
        REDIS_DB and REDIS_PASSWORD environment variables.
        """
        # Constructing the client does not open a socket, so it is built once
        # and reused; each ping() attempts a fresh connection after a failure.
        client = redis.StrictRedis(
            host=getenv("REDIS_HOST", "redis"),
            port=int(getenv("REDIS_PORT", "6379")),
            db=int(getenv("REDIS_DB", "0")),
            password=getenv("REDIS_PASSWORD"),
            socket_timeout=5,
            retry_on_timeout=True,
        )
        while True:
            try:
                client.ping()
            except (ConnectionError, redis.exceptions.TimeoutError):
                # socket_timeout is set, so a slow or half-started server
                # surfaces as TimeoutError, which is not a ConnectionError
                # subclass. Treat it as "not ready yet" as well.
                self.stdout.write("Redis unavailable, waiting 1 second...")
                time.sleep(RETRY_DELAY_SECONDS)
            else:
                break

        self.stdout.write(self.style.SUCCESS("Redis available!"))