Celery with Redis takes 20 minutes to set up in development. The production configuration — worker concurrency, task retries, idempotency, error handling, and scheduled tasks — takes experience. This guide covers both, with the production gotchas that most tutorials skip.
Installation
pip install celery redis django-celery-beat django-celery-results
Add to INSTALLED_APPS:
INSTALLED_APPS = [
    # ...
    "django_celery_beat",
    "django_celery_results",
]
Run migrations for the beat scheduler and results backend:
python manage.py migrate
Basic Celery Configuration
Create celery.py in your Django project directory (next to settings.py):
# myproject/celery.py
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
Update __init__.py to load Celery on Django startup:
# myproject/__init__.py
from .celery import app as celery_app
__all__ = ("celery_app",)
Configure Redis as the broker and the database (through django_celery_results) as the results backend in settings:
# settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "django-db" # Uses django_celery_results
CELERY_CACHE_BACKEND = "default"
CELERY_TIMEZONE = "Asia/Kolkata"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 300 # 5 minutes hard limit per task
CELERY_WORKER_CONCURRENCY = 2 # Adjust based on CPU cores
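Rather than hard-coding the concurrency, a settings module can derive it from the machine it runs on. This is a minimal sketch: Celery's own default for worker concurrency is the number of CPU cores, so the code only makes that explicit, and the CELERY_CONCURRENCY environment variable used as a per-host override is a hypothetical name, not something Celery reads itself.

```python
# settings.py (sketch)
import os

# Default to the number of CPU cores, which is also Celery's built-in default.
# CELERY_CONCURRENCY is a hypothetical override variable for hosts that need more
# (I/O-bound work) or fewer processes. os.cpu_count() may return None, hence "or 1".
CELERY_WORKER_CONCURRENCY = int(os.environ.get("CELERY_CONCURRENCY", os.cpu_count() or 1))
```

A --concurrency flag passed to celery worker on the command line takes precedence over this setting.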
Writing Your First Task
# myapp/tasks.py
from celery import shared_task
from django.core.mail import send_mail

@shared_task(bind=True, max_retries=3)
def send_welcome_email(self, user_id: int):
    # Assumes myapp defines its own User model with name and email fields
    from .models import User
    try:
        user = User.objects.get(id=user_id)
        send_mail(
            subject="Welcome!",
            message=f"Hi {user.name}, welcome to our platform.",
            from_email=None,  # None falls back to settings.DEFAULT_FROM_EMAIL
            recipient_list=[user.email],
        )
    except User.DoesNotExist:
        # Don't retry if the user doesn't exist
        return
    except Exception as exc:
        # Exponential backoff: wait 60s, 120s, then 240s before the next attempt
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
Key patterns in this task:
- bind=True gives access to self (the task instance) for retries
- max_retries=3 caps the retry count
- countdown=60 * (2 ** self.request.retries) implements exponential backoff: 60s, 120s, 240s
- The User.DoesNotExist branch returns without retrying — retrying a deleted object is pointless
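The backoff arithmetic is easy to check in isolation. This plain-Python sketch (retry_countdowns is a hypothetical helper, not part of Celery) reproduces the countdown formula from the task for each value of self.request.retries, which is 0 during the first run:

```python
def retry_countdowns(max_retries: int = 3, base: int = 60) -> list[int]:
    """Delays in seconds from countdown=base * (2 ** self.request.retries).

    self.request.retries is 0 during the first run, so the first retry waits
    `base` seconds and each later retry doubles the wait.
    """
    return [base * (2 ** retries) for retries in range(max_retries)]


delays = retry_countdowns()
print(delays)       # [60, 120, 240]
print(sum(delays))  # 420: total wait across the three retries
```

Celery can also express this declaratively with the autoretry_for, retry_backoff and retry_backoff_max task options. There, retry_jitter defaults to True, so the actual delays are randomized, which keeps many failed tasks from retrying in lockstep; the manual formula above has no jitter.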
Call it from a view:
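from rest_framework.decorators import api_view
from rest_framework.response import Response
from .tasks import send_welcome_email

@api_view(["POST"])
def register(request):
    user = create_user(request.data)  # your own user-creation helper
    send_welcome_email.delay(user.id)  # Enqueues the task and returns immediately
    return Response({"status": "registered"})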
.delay() enqueues the task and returns immediately. The response goes back to the client before the email is sent.
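One related gotcha: if create_user runs inside a database transaction (for example with ATOMIC_REQUESTS = True), a fast worker can query the user before the row is committed, hit User.DoesNotExist, and silently skip the email. Django's transaction.on_commit() defers a callback until the transaction commits. The toy model below (a hypothetical FakeTransaction class, not Django's implementation) shows the semantics: callbacks registered inside the transaction run only after a successful commit and are dropped on rollback.

```python
from typing import Callable


class FakeTransaction:
    """Toy stand-in for a database transaction with on_commit hooks."""

    def __init__(self) -> None:
        self._callbacks: list[Callable[[], None]] = []

    def on_commit(self, func: Callable[[], None]) -> None:
        # Defer the side effect instead of running it now.
        self._callbacks.append(func)

    def commit(self) -> None:
        # The row is now visible to other connections (such as a Celery worker),
        # so the deferred callbacks can run safely.
        callbacks, self._callbacks = self._callbacks, []
        for func in callbacks:
            func()

    def rollback(self) -> None:
        # The row never became visible: drop the callbacks without running them.
        self._callbacks.clear()


enqueued: list[int] = []

txn = FakeTransaction()
txn.on_commit(lambda: enqueued.append(42))  # stands in for send_welcome_email.delay(user.id)
print(enqueued)  # [] while the transaction is still open
txn.commit()
print(enqueued)  # [42]
```

In the real view, the equivalent Django call is transaction.on_commit(lambda: send_welcome_email.delay(user.id)), with `from django.db import transaction`. Outside a transaction, on_commit runs the callback immediately.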
Production Worker Configuration
Start the worker with appropriate concurrency:
# For I/O-bound tasks (API calls, email, file processing)
celery -A myproject worker --loglevel=info --concurrency=4
# For CPU-bound tasks on a 2-core VPS
celery -A myproject worker --loglevel=info --concurrency=2
# Prefork (default) suits CPU-bound tasks; gevent suits I/O-bound ones (requires pip install gevent)
celery -A myproject worker --loglevel=info --concurrency=10 --pool=gevent
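Run as a systemd service in production (save as /etc/systemd/system/celery.service). Because celery worker stays in the foreground, the unit uses Type=simple (Type=forking suits celery multi, which daemonizes itself), and systemd creates the log and PID directories for the service user:
[Unit]
Description=Celery Worker
After=network.target

[Service]
Type=simple
User=www-data
Group=www-data
WorkingDirectory=/srv/myproject
# Creates /run/celery and /var/log/celery, owned by User=
RuntimeDirectory=celery
LogsDirectory=celery
ExecStart=/srv/myproject/.venv/bin/celery -A myproject worker \
    --loglevel=info \
    --concurrency=2 \
    --logfile=/var/log/celery/worker.log \
    --pidfile=/run/celery/worker.pid
Restart=always

[Install]
WantedBy=multi-user.target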
Task Idempotency
Idempotency means running a task twice has the same result as running it once. It is critical for tasks that modify external state (sending emails, charging cards, updating external APIs).
Why duplicate runs happen even when you don't expect them: by default, Celery acknowledges a message just before the task function runs (task_acks_late is False). If the worker crashes (OOM kill, server restart, power outage) after that acknowledgement but before the task finishes, the message is gone and the task is silently lost. acks_late=True moves the acknowledgement to after the task function finishes (by default, even when it raises). If the worker dies mid-task, no acknowledgement is sent; once the Redis transport's visibility timeout expires, the message becomes visible again and the next worker picks it up, so the task runs a second time. Explicit retries via self.retry re-run the task body too. Without idempotency, acks_late trades lost tasks for double execution; with idempotency, double execution is safe.
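To opt in project-wide, a sketch of the relevant settings, assuming the CELERY_ namespace configured in celery.py (for a single task, the equivalent is @shared_task(acks_late=True)). The 3600-second value is only an example; it matches the Redis transport's default.

```python
# settings.py (sketch)

# Acknowledge after the task finishes, so a crash mid-task leads to redelivery.
CELERY_TASK_ACKS_LATE = True

# With the prefork pool, a child process killed mid-task (for example by the OOM
# killer) is still acknowledged and marked failed unless this is True, in which
# case the message is re-queued instead.
CELERY_TASK_REJECT_ON_WORKER_LOST = True

# Redis redelivers unacknowledged messages after this many seconds. It must exceed
# your longest task runtime and any countdown/ETA, or tasks can run twice.
CELERY_BROKER_TRANSPORT_OPTIONS = {"visibility_timeout": 3600}  # 1 hour
```

Celery's documentation warns that task_reject_on_worker_lost can cause message loops: a task that reliably crashes its worker will be redelivered again and again, so pair it with memory limits and monitoring.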
Bad — not idempotent:
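import stripe
@shared_task
def charge_user(user_id, amount, currency):
    user = User.objects.get(id=user_id)  # your User model with a stripe_id field
    stripe.Charge.create(customer=user.stripe_id, amount=amount, currency=currency)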
If this task runs twice (a retry, or a redelivery after a worker crash with acks_late), the user is charged twice.
Good — idempotent:
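import stripe

@shared_task
def charge_user(user_id, amount, currency, idempotency_key):
    user = User.objects.get(id=user_id)  # your User model with a stripe_id field
    # Stripe returns the original result for a repeated key instead of charging again
    stripe.Charge.create(
        customer=user.stripe_id,
        amount=amount,
        currency=currency,
        idempotency_key=idempotency_key,
    )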
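The key only prevents double charges if it stays the same across every attempt, so the caller should generate it once, before enqueueing, and pass it to the task. The sketch below models this with a hypothetical FakePaymentProvider (not Stripe's client) that, like Stripe, returns the stored result when it sees a key it has already processed. Using uuid4 for the key is an assumption; a key derived from your own order ID works as well.

```python
import uuid


class FakePaymentProvider:
    """Hypothetical provider that honours idempotency keys the way Stripe does."""

    def __init__(self) -> None:
        self.charges: list[dict] = []
        self._by_key: dict[str, dict] = {}

    def create_charge(self, customer: str, amount: int, idempotency_key: str) -> dict:
        # A repeated key returns the stored result instead of charging again.
        if idempotency_key in self._by_key:
            return self._by_key[idempotency_key]
        charge = {"id": f"ch_{len(self.charges) + 1}", "customer": customer, "amount": amount}
        self.charges.append(charge)
        self._by_key[idempotency_key] = charge
        return charge


provider = FakePaymentProvider()


def charge_user(customer: str, amount: int, idempotency_key: str) -> dict:
    # Stand-in for the task body; a retry re-runs this with the SAME key.
    return provider.create_charge(customer, amount, idempotency_key)


# Caller side: create the key once, before enqueueing the task.
key = str(uuid.uuid4())
first = charge_user("cus_123", 5000, key)
retry = charge_user("cus_123", 5000, key)  # e.g. redelivered after a worker crash
print(first == retry, len(provider.charges))  # True 1
```

Generating the key inside the task body would defeat the purpose: every redelivery would get a fresh key and charge again.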
Or use a processed-flag approach:
from .models import Order

@shared_task
def send_invoice(order_id):
    order = Order.objects.get(id=order_id)
    if order.invoice_sent:
        return  # Already done
    send_invoice_email(order)  # your own email helper
    Order.objects.filter(id=order_id).update(invoice_sent=True)
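The check-then-update approach has a race: two runs of the same task executing concurrently can both read invoice_sent=False and both send. It also sends again if the worker dies between sending and setting the flag. One alternative is to claim the order atomically with a conditional UPDATE and send only if this run won the claim; in the Django ORM that is Order.objects.filter(id=order_id, invoice_sent=False).update(invoice_sent=True), which returns the number of rows matched. The sketch below shows the same idea with the standard library's sqlite3, using a hypothetical orders table as a stand-in.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, invoice_sent INTEGER NOT NULL DEFAULT 0)")
conn.execute("INSERT INTO orders (id) VALUES (1)")
conn.commit()

sent: list[int] = []


def send_invoice(order_id: int) -> bool:
    """Send at most once: only the run that flips the flag sends."""
    # The WHERE clause makes check-and-set a single atomic statement.
    cur = conn.execute(
        "UPDATE orders SET invoice_sent = 1 WHERE id = ? AND invoice_sent = 0",
        (order_id,),
    )
    conn.commit()
    if cur.rowcount == 0:
        return False  # another run already claimed this order
    sent.append(order_id)  # stands in for send_invoice_email(order)
    return True


print(send_invoice(1), send_invoice(1), sent)  # True False [1]
```

The trade-off: claiming first makes sending at-most-once, so if the email fails after the claim, you need to reset the flag (or record the failure) to try again.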
Scheduled Tasks with Celery Beat
django-celery-beat stores schedules in the database, manageable from Django admin:
# settings.py
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
Define periodic tasks programmatically (useful for initial setup):
# In a management command (in a data migration, load these models via apps.get_model instead)
from django_celery_beat.models import IntervalSchedule, PeriodicTask

schedule, _ = IntervalSchedule.objects.get_or_create(
    every=60,
    period=IntervalSchedule.SECONDS,
)
PeriodicTask.objects.get_or_create(
    name="Flush Redis counters to DB",
    defaults={
        "task": "myapp.tasks.flush_counters",
        "interval": schedule,
    },
)
Start the beat scheduler as a separate process from the worker. Run exactly one beat instance; a second one would enqueue every scheduled task twice:
celery -A myproject beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
Monitoring Tasks
Use Flower for a web-based task monitor:
pip install flower
celery -A myproject flower --port=5555
Access at http://localhost:5555. Shows active workers, task history, retry counts, and failure rates. In production, put Flower behind Nginx with HTTP basic auth — never expose it publicly.
Check task results programmatically:
from celery.result import AsyncResult
result = AsyncResult("task-id-here")
print(result.status)  # PENDING (also returned for unknown IDs), STARTED, SUCCESS, FAILURE, RETRY, REVOKED
print(result.result) # Return value or exception
Common Production Mistakes
Not setting CELERY_TASK_TIME_LIMIT: Without a hard time limit, a hung task occupies a worker forever. Set CELERY_TASK_TIME_LIMIT = 300 (5 minutes) as a sane default.
Using the database as broker: a database-backed broker has to poll tables for new messages and creates lock contention under load. Use Redis as the broker and django-db only as the results backend.
Not handling SoftTimeLimitExceeded: when a task hits its soft time limit, Celery raises SoftTimeLimitExceeded inside the task. Catch it to clean up, then re-raise so the task is still recorded as failed:
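from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded

@shared_task(soft_time_limit=240, time_limit=300)
def long_running_task():
    try:
        do_work()  # your task logic
    except SoftTimeLimitExceeded:
        cleanup()  # e.g. delete temp files, release locks
        raise  # still report the task as failed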
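The decorator above sets the limits for one task. To apply the same pair project-wide, a minimal settings sketch (the values mirror the decorator; any task can still override them with soft_time_limit= and time_limit=):

```python
# settings.py (sketch)
CELERY_TASK_SOFT_TIME_LIMIT = 240  # raises SoftTimeLimitExceeded inside the task
CELERY_TASK_TIME_LIMIT = 300       # the pool process running the task is killed and replaced

# The soft limit has to fire first, otherwise the cleanup branch never gets to run.
assert CELERY_TASK_SOFT_TIME_LIMIT < CELERY_TASK_TIME_LIMIT
```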
If you need background tasks, scheduled jobs, or async processing built into your Django application — done correctly with retries, idempotency, and monitoring — hire me as a Django developer or as a backend developer.