02. Tasks, queues, beat — the developer's daily surface
~11 min read. Defining tasks, routing to queues, composing tasks together, scheduling periodics. The day-to-day Celery surface every team writes.
Builds on: 01-broker-worker-protocol-internals.md.
The previous chapter explained the protocol. This chapter is what you actually write — task definitions, queue routing, composition patterns, and the beat scheduler for periodic work.
1) Defining tasks — the basics
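```python
# myapp/tasks.py
from celery import shared_task
from django.contrib.auth import get_user_model

from myapp.emails import render_welcome, send_email  # hypothetical helper module


@shared_task
def send_welcome_email(user_id):
    user = get_user_model().objects.get(pk=user_id)
    send_email(user.email, "Welcome", render_welcome(user))
```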
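shared_task decorates a function so it can be enqueued and executed by Celery. Unlike @app.task, @shared_task doesn't need a reference to a specific Celery app instance, which is why reusable Django apps use it. The function body is normal Python: ORM queries, side effects, return values.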
To enqueue:
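```python
from datetime import datetime, timezone

send_welcome_email.delay(42)  # shortcut for apply_async(args=(42,))
send_welcome_email.apply_async(args=[42], countdown=60)  # run no earlier than 60 s from now
send_welcome_email.apply_async(
    args=[42], eta=datetime(2026, 6, 1, 10, 0, tzinfo=timezone.utc)  # timezone-aware ETA
)
```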
Differences:
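- delay(*args, **kwargs): the shortcut; enqueues immediately with default options and returns an AsyncResult.
- apply_async(args, kwargs, **options): the same, plus explicit execution options (queue, countdown, eta, expires, headers).
- .s(*args, **kwargs): builds a signature without enqueuing anything; the building block for composition (section 4).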
2) Task options — what to set per task
```python
from smtplib import SMTPException

from celery import shared_task


@shared_task(
    bind=True,
    name='myapp.send_email',
    autoretry_for=(SMTPException, ConnectionError),
    retry_kwargs={'max_retries': 5},
    retry_backoff=True,
    retry_backoff_max=300,
    retry_jitter=True,
    acks_late=True,
    soft_time_limit=30,
    time_limit=60,
    rate_limit='10/s',
)
def send_email(self, to, subject, body):
    ...
```
Key options:
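- bind=True: the first argument is the task instance, so you can access self.request.id, self.retry(), etc.
- name=...: explicit task name. The default is derived from the module path and function name (e.g. myapp.tasks.send_email), so it changes when you move the function; an explicit name survives refactors.
- autoretry_for=(...): retry automatically when one of these exception classes is raised. Avoids manual self.retry() calls.
- retry_kwargs={'max_retries': 5}: options for the automatic retry; here, give up after 5 retries.
- retry_backoff=True: exponential backoff between retries (1s, 2s, 4s, 8s, ... before jitter).
- retry_backoff_max=300: cap each backoff delay at 5 minutes.
- retry_jitter=True: randomise each delay (between 0 and the computed backoff) so retries from many workers don't synchronise.
- acks_late=True: acknowledge the message after the task finishes instead of just before it starts; a per-task override of the global task_acks_late setting.
- soft_time_limit=30: raises SoftTimeLimitExceeded inside the task after 30 s, giving it a chance to clean up.
- time_limit=60: hard limit; after 60 s the pool process running the task is killed and replaced.
- rate_limit='10/s': at most 10 of this task per second per worker instance, not across the whole cluster.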
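To make the retry_backoff, retry_backoff_max and retry_jitter trio concrete, here is a small pure-Python sketch of the delay schedule they produce, based on Celery's documented behaviour: delay = factor × 2^retries, capped at the maximum, then "full jitter" picks a random value between 0 and that cap. backoff_countdown is a hypothetical helper, not a Celery API; Celery computes this internally.

```python
import random


def backoff_countdown(retries, factor=1, maximum=300, jitter=True, rng=random):
    """Approximate delay (seconds) before retry number `retries` (0 = first retry).

    factor=1 corresponds to retry_backoff=True; an integer retry_backoff is the factor.
    """
    countdown = min(maximum, factor * (2 ** retries))  # exponential, capped
    if jitter:
        # Full jitter: any whole number of seconds from 0 up to the capped value.
        countdown = rng.randint(0, countdown)
    return countdown


print([backoff_countdown(n, jitter=False) for n in range(10)])
# [1, 2, 4, 8, 16, 32, 64, 128, 256, 300]
```

With retry_kwargs={'max_retries': 5}, the task above waits at most 1, 2, 4, 8 and 16 seconds across its five retries, so the 300 s cap only starts to matter for tasks allowed ten or more retries.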
3) Queues — routing tasks by class
A single queue forces all tasks to compete for the same workers. With diverse task profiles, you want separate queues:
```python
# myapp/celery.py
app.conf.task_routes = {
    'myapp.send_email': {'queue': 'emails'},
    'myapp.generate_report': {'queue': 'reports'},
    'myapp.charge_payment': {'queue': 'payments'},
}
```
Now send_email.delay(...) goes to the emails queue. Workers can be started per queue:
```bash
# Worker for fast email tasks
celery -A myapp worker --queues=emails --concurrency=8 --prefetch-multiplier=4
# Worker for slow reports
celery -A myapp worker --queues=reports --concurrency=2 --prefetch-multiplier=1
# Worker for critical payments
celery -A myapp worker --queues=payments --concurrency=4 --prefetch-multiplier=1
```
The benefits:
- No head-of-line blocking. A slow report doesn't delay an email.
- Independent scaling. Add report workers without touching email workers.
- Isolation. A bug in report code can't starve email processing.
- Priority. Critical queues get more workers; low-priority queues get fewer.
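Routing can also be dynamic: task_routes also accepts router functions that choose a queue per call, and apply_async(queue=...) sets the queue for a single call.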
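A minimal sketch of a dynamic router, assuming a hypothetical tenant_tier keyword argument and a hypothetical reports_enterprise queue. Celery calls each router as (name, args, kwargs, options, task=None, **kw) and uses the first route that comes back; returning None hands the decision to the next router. The function is plain Python, so it can be unit-tested without a broker.

```python
def route_task(name, args, kwargs, options, task=None, **kw):
    """Pick a queue per call from the task name and its arguments."""
    if name == 'myapp.generate_report':
        # Hypothetical rule: enterprise tenants get their own report queue.
        if kwargs.get('tenant_tier') == 'enterprise':
            return {'queue': 'reports_enterprise'}
        return {'queue': 'reports'}
    return None  # no opinion: let the next router (or the default queue) decide


# Routers and static mappings can be combined; Celery tries them in order:
# app.conf.task_routes = (route_task, {'myapp.send_email': {'queue': 'emails'}})
#
# An explicit per-call option overrides what the routers return
# (report_id is hypothetical):
# generate_report.apply_async(args=[report_id], queue='reports')
```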
4) Composition — chain, group, chord
For workflows of multiple tasks:
Chain — sequential pipeline.
```python
from celery import chain

workflow = chain(
    fetch_data.s(user_id=42),
    transform_data.s(),
    save_result.s(),
)
workflow.apply_async()
```
Each task's return value is passed as the first arg to the next. Useful for ETL-style pipelines.
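The data flow of a chain can be modelled in a few lines of plain Python. This is an in-process illustration, not how Celery executes a chain (Celery sends each step as a separate message to a worker); fetch_data, transform_data and save_result below are hypothetical stand-ins.

```python
def run_chain(steps):
    """In-process model of chain(): steps is a list of (func, args, kwargs).

    The first step runs with its own arguments; every later step gets the
    previous step's return value prepended to its frozen args.
    """
    result = None
    for index, (func, args, kwargs) in enumerate(steps):
        call_args = tuple(args) if index == 0 else (result, *args)
        result = func(*call_args, **kwargs)
    return result


def fetch_data(user_id):  # hypothetical
    return {'user_id': user_id, 'rows': [3, 1, 2]}


def transform_data(data):  # hypothetical
    return {**data, 'rows': sorted(data['rows'])}


def save_result(data):  # hypothetical
    return f"saved {len(data['rows'])} rows for user {data['user_id']}"


print(run_chain([
    (fetch_data, (), {'user_id': 42}),
    (transform_data, (), {}),
    (save_result, (), {}),
]))  # saved 3 rows for user 42
```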
Group — parallel fan-out.
```python
from celery import group

job = group(
    process_chunk.s(chunk_id=i) for i in range(100)
).apply_async()
```
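All 100 tasks are enqueued at once and run in parallel, up to the available worker concurrency. group(...) itself is a signature; apply_async() returns a GroupResult, and job.ready() returns True once every task has finished (this needs a result backend).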
Chord — fan-out then aggregate.
```python
from celery import chord

job = chord(
    (process_chunk.s(chunk_id=i) for i in range(100)),
    aggregate_results.s(),
)
job.apply_async()
```
100 chunks run in parallel; when all done, aggregate_results is called with the list of results. Useful for map-reduce patterns. Requires a result backend.
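A chord's semantics, sketched in-process: run the header concurrently, then call the body once with the results in header order. This sketch uses concurrent.futures threads in place of Celery workers and an in-memory wait in place of the result backend; process_chunk is a hypothetical stand-in. It also shows why one slow header task delays the whole chord.

```python
from concurrent.futures import ThreadPoolExecutor


def run_chord(header, body, max_workers=8):
    """In-process model of chord(header, body).

    header: list of (func, args) pairs that may run in parallel.
    body: called once with the list of header results, in header order.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, *args) for func, args in header]
        # Blocks until every header task is done: the slowest one gates the body.
        results = [future.result() for future in futures]
    return body(results)


def process_chunk(chunk_id):  # hypothetical per-chunk work
    return chunk_id * chunk_id


total = run_chord([(process_chunk, (i,)) for i in range(100)], sum)
print(total)  # 328350
```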
Caveat: composition adds complexity. For simple two-step flows, just have the first task enqueue the second, for example by ending the first task with a .delay() call on the second task. Explicit is often clearer than chord/chain magic.
5) Periodic tasks via Celery Beat
Beat is the scheduler. It enqueues tasks at configured intervals.
```python
# myapp/celery.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    'send-daily-digest': {
        'task': 'myapp.send_digest',
        # every day at 08:00 in app.conf.timezone (UTC unless configured)
        'schedule': crontab(hour=8, minute=0),
    },
    'refresh-cache-every-5-min': {
        'task': 'myapp.refresh_cache',
        'schedule': 300.0,  # every 300 seconds
    },
    'cleanup-stale-sessions': {
        'task': 'myapp.cleanup_sessions',
        'schedule': crontab(hour=2, minute=0, day_of_week='sunday'),  # Sundays at 02:00
    },
}
```
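To reason about when a crontab entry fires, here is a small sketch that computes the next due time for the simple shapes used above (one hour and minute, optionally one weekday). It is not Celery's crontab implementation, which also handles ranges, lists and steps, and it assumes now is already expressed in the scheduler's timezone. next_run is a hypothetical helper.

```python
from datetime import datetime, timedelta


def next_run(now, hour, minute, weekday=None):
    """Next time an entry with one fixed hour/minute (and optionally one weekday)
    becomes due after `now`.

    weekday uses Python's convention (Monday=0 ... Sunday=6), not cron's Sunday=0.
    """
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has already passed
    if weekday is not None:
        candidate += timedelta(days=(weekday - candidate.weekday()) % 7)
    return candidate


monday = datetime(2026, 6, 1, 10, 0)      # Monday, 10:00
print(next_run(monday, 8, 0))             # 2026-06-02 08:00:00 (daily digest)
print(next_run(monday, 2, 0, weekday=6))  # 2026-06-07 02:00:00 (Sunday cleanup)
```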
Beat runs as a separate process, started with celery -A myapp beat.
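Critical: run only one beat instance. Two beats produce duplicate enqueues, and so does starting workers with the -B flag, which embeds a beat in each of them. For failover, use a scheduler that holds a lock, such as redbeat, which stores schedule state in Redis and lets only the lock holder publish. django-celery-beat stores the schedule in the database (editable via Django admin), but on its own it does not stop a second beat from running, so keep it to a single replica.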
```bash
# Beat with django-celery-beat: schedule editable via Django admin
celery -A myapp beat --scheduler=django_celery_beat.schedulers:DatabaseScheduler
```
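The single-active-scheduler idea can be sketched with an expiring lease: only the instance holding an unexpired lock publishes due tasks, and a standby takes over once the holder stops renewing. redbeat relies on a Redis lock for this; the LeaseLock and beat_tick names below are hypothetical, and the in-memory lock is a simplified model, not redbeat's implementation.

```python
class LeaseLock:
    """In-memory stand-in for an expiring lock key (e.g. one kept in Redis)."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.holder = None
        self.expires_at = float('-inf')

    def acquire_or_renew(self, instance_id, now):
        # Renew if we already hold it; take over only once the old lease expired.
        if self.holder == instance_id or now >= self.expires_at:
            self.holder = instance_id
            self.expires_at = now + self.ttl
            return True
        return False


def beat_tick(lock, instance_id, now, due_tasks, published):
    """One scheduler loop iteration: publish due tasks only while holding the lease."""
    if lock.acquire_or_renew(instance_id, now):
        published.extend((instance_id, name) for name in due_tasks)


lock, published = LeaseLock(ttl_seconds=30), []
beat_tick(lock, 'beat-a', now=0, due_tasks=['myapp.send_digest'], published=published)
beat_tick(lock, 'beat-b', now=1, due_tasks=['myapp.send_digest'], published=published)
print(published)  # [('beat-a', 'myapp.send_digest')]: beat-b stayed passive
beat_tick(lock, 'beat-b', now=45, due_tasks=['myapp.refresh_cache'], published=published)
print(lock.holder)  # beat-b: beat-a stopped renewing and its lease expired at t=30
```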
6) Task discovery — where workers find tasks
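```python
# myapp/celery.py
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')  # assumed settings module

app = Celery('myapp')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
```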
autodiscover_tasks() walks INSTALLED_APPS and imports each app's tasks.py. Conventional location:
```text
myapp/
└── tasks.py      # tasks for this app
orders/
└── tasks.py      # tasks for orders
payments/
└── tasks.py      # tasks for payments
```
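If tasks live in modules other than tasks.py, list those modules explicitly, for example with app.conf.imports = ['myapp.jobs.cleanup'] or Celery('myapp', include=['myapp.jobs.cleanup']) (the module path here is hypothetical).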
7) Task signatures and partial application
A signature is a frozen task call:
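```python
sig = send_email.s('ana@example.com', 'Welcome', 'Hello!')  # all arguments frozen
sig.delay()  # enqueues send_email('ana@example.com', 'Welcome', 'Hello!')

partial = send_email.s(subject='Welcome', body='Hello!')  # 'to' left open
partial.apply_async(args=['ana@example.com'])  # call-time args are prepended to the frozen ones
```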
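Signatures are how chain/group/chord are constructed. A signature is a plain dict underneath (task name, args, kwargs, options), so it serialises into the message: a chain travels with its remaining steps attached to the task message instead of relying on the caller to hold the workflow in memory.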
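How frozen and call-time arguments combine, sketched in plain Python. Per Celery's documented signature semantics, call-time positional args are prepended to the frozen ones and call-time kwargs are merged over the frozen kwargs; merge_partial is a hypothetical helper modelling that rule, and add in the comment is a hypothetical task.

```python
def merge_partial(sig_args, sig_kwargs, call_args=(), call_kwargs=None):
    """Model of signature partial application: call-time positional args go
    first, and call-time kwargs override frozen kwargs with the same key."""
    args = tuple(call_args) + tuple(sig_args)
    kwargs = {**sig_kwargs, **(call_kwargs or {})}
    return args, kwargs


# partial = send_email.s(subject='Welcome', body='Hello!'); partial.delay('ana@example.com')
print(merge_partial((), {'subject': 'Welcome', 'body': 'Hello!'}, ('ana@example.com',)))
# (('ana@example.com',), {'subject': 'Welcome', 'body': 'Hello!'})

# In a chain, the parent's return value is the prepended argument:
# a hypothetical add.s(2) following a step that returned 16 runs add(16, 2).
print(merge_partial((2,), {}, (16,)))  # ((16, 2), {})
```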
8) Error handling within a task
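```python
import httpx
from celery import shared_task


class TransientHTTPError(Exception):
    """Raised for 5xx responses so that autoretry_for can catch it."""


@shared_task(bind=True, autoretry_for=(httpx.TransportError, TransientHTTPError),
             retry_backoff=True, max_retries=5)
def fetch_from_api(self, url):
    try:
        resp = httpx.get(url, timeout=10)  # connection errors/timeouts raise TransportError
        resp.raise_for_status()
        return resp.json()
    except httpx.HTTPStatusError as exc:
        if exc.response.status_code >= 500:
            # Server error: wrap it in an exception listed in autoretry_for.
            raise TransientHTTPError(str(exc)) from exc
        raise  # 4xx (or unfollowed 3xx): not in autoretry_for, fails permanently
```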
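The pattern: distinguish retryable (transient) errors from non-retryable (permanent) ones. Let autoretry_for handle the transient ones, which only works if the exceptions actually raised are listed there (httpx's errors are not subclasses of the built-in ConnectionError, hence httpx.TransportError above); let permanent ones propagate. Once max_retries is exhausted, the task is marked as failed with the last exception; it reaches a dead-letter queue only if you have configured one (dead letters are covered in the next chapter).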
For tasks needing explicit retry logic:
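```python
import httpx
from celery import shared_task


@shared_task(bind=True, max_retries=5)
def fetch(self, url):
    try:
        return httpx.get(url, timeout=10).json()
    except httpx.TransportError as exc:  # connection failures, timeouts
        # countdown grows 1, 2, 4, 8, 16 s as self.request.retries goes 0..4
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
```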
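self.retry() re-enqueues the task with the same args and an incremented retry counter. It raises a Retry exception itself (the raise keyword just makes the control flow obvious), and once max_retries is exceeded it re-raises exc instead.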
9) Task return values — what to return, what not to
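```python
from celery import shared_task

from myapp.models import Order  # hypothetical model with a DecimalField 'amount'


# OK: return a primitive that serialises cleanly
@shared_task
def calculate_total(order_id):
    order = Order.objects.get(pk=order_id)
    return str(order.amount)  # str keeps the exact decimal value; float() may not


# BAD: a model instance is not serialisable
@shared_task
def get_order(order_id):
    return Order.objects.get(pk=order_id)  # JSON serialisation will fail
```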
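Return values are JSON-serialised (or whatever serializer is configured). Model instances and arbitrary custom classes need explicit conversion. How datetimes, Decimals and bytes are handled depends on the serializer and kombu version, so converting them explicitly (for example with datetime.isoformat()) is the portable choice.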
For tasks called by chain/chord, the return value becomes the next task's input. Plan accordingly.
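A quick way to catch non-serialisable return values in unit tests is a JSON round trip. This sketch uses the standard library's json module as a conservative stand-in: Celery's JSON serializer (via kombu) may accept a few extra types depending on version, but anything that passes here is safe. survives_json is a hypothetical helper. The tuple case matters for chains: a step returning a tuple hands a list to the next step.

```python
import json
from datetime import datetime
from decimal import Decimal


def survives_json(value):
    """True if `value` comes back equal after a json.dumps/json.loads round trip."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):  # e.g. Decimal, datetime, bytes, model instances
        return False


print(survives_json({'order_id': 7, 'total': '19.99'}))  # True
print(survives_json(Decimal('19.99')))                   # False: not JSON-encodable
print(survives_json(datetime(2026, 6, 1, 10, 0)))        # False: not JSON-encodable
print(survives_json((7, '19.99')))                       # False: tuple comes back as a list
print(survives_json({7: 'x'}))                           # False: int key comes back as '7'
```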
10) The threaded example — designing the daily digest
A SaaS company sends a daily digest email at 08:00. Naive design: one task that iterates all users, sends each an email.
```python
@shared_task
def send_daily_digest():
    for user in User.objects.filter(daily_digest=True):
        send_digest_for_user(user.id)  # synchronous send
```
Problem: this is one task. If it fails halfway, the second half never runs. If it takes longer than the next 08:00, the next digest enqueues before this one finishes.
Better design: a fan-out pattern.
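```python
from smtplib import SMTPException

from celery import group, shared_task
from django.contrib.auth import get_user_model
from django.utils import timezone

from myapp.emails import render_digest, send_email  # hypothetical helper module


@shared_task(name='myapp.send_digest')  # matches the beat_schedule entry
def send_daily_digest():
    user_ids = get_user_model().objects.filter(daily_digest=True).values_list('id', flat=True)
    job = group(send_digest_for_user.s(uid) for uid in user_ids).apply_async()
    return job.id


@shared_task(bind=True, autoretry_for=(SMTPException,), retry_backoff=True, max_retries=3)
def send_digest_for_user(self, user_id):
    user = get_user_model().objects.get(pk=user_id)
    today = timezone.localdate()  # assumes USE_TZ = True
    if user.last_digest_at and timezone.localtime(user.last_digest_at).date() == today:
        return  # already sent today; idempotent
    send_email(user.email, 'Your Daily Digest', render_digest(user))
    user.last_digest_at = timezone.now()
    user.save(update_fields=['last_digest_at'])
```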
Now one orchestrator task fans out one subtask per opted-in user (100K of them, say); each subtask is independent, retries on transient failures, and is idempotent. The digests go out promptly after 08:00 because the subtasks run in parallel across all workers, not sequentially inside one task.
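The idempotency guard in send_digest_for_user can be exercised without Django using an in-memory model. DigestLedger is hypothetical: a dict plays the role of the last_digest_at column, and a list plays the role of the mail server.

```python
from datetime import date


class DigestLedger:
    """In-memory model of the once-per-day guard in send_digest_for_user."""

    def __init__(self):
        self.last_sent_on = {}  # user_id -> date of the last digest actually sent
        self.outbox = []        # (user_id, date) for every email "sent"

    def send_digest_for_user(self, user_id, today):
        if self.last_sent_on.get(user_id) == today:
            return False  # duplicate delivery of today's message: skip
        self.outbox.append((user_id, today))
        self.last_sent_on[user_id] = today
        return True


ledger = DigestLedger()
day1, day2 = date(2026, 6, 1), date(2026, 6, 2)
for delivery_day in (day1, day1, day2):  # the day-1 message is delivered twice
    ledger.send_digest_for_user(42, delivery_day)
print(ledger.outbox)  # [(42, datetime.date(2026, 6, 1)), (42, datetime.date(2026, 6, 2))]
```

The check and the write are separate steps, so two copies running at the same moment, or a crash between sending and saving, can still produce a duplicate: the guard turns ordinary redeliveries into no-ops rather than guaranteeing exactly-once delivery.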
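Route the subtasks to a dedicated digest queue so they don't block other email traffic: add 'myapp.tasks.send_digest_for_user': {'queue': 'digest'} to task_routes (the default task name, assuming the task lives in myapp/tasks.py) and start a worker with --queues=digest.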
Operational signals
Healthy. Tasks routed to appropriate queues; queue depths near zero or oscillating with traffic; task latency p99 within target; periodic tasks fire on schedule.
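First degrading metric. Beat schedule drift: periodic tasks starting later than scheduled. Either the beat process is starved on an overloaded host, or the tasks it publishes on time are waiting behind a backlog in their queue; beat's log of sent entries tells you which.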
Misleading metric. Total task throughput — masks per-queue latency.
Expert graph. Per-queue depth × per-task latency; the cell that lights up is the bottleneck.
Where this appears in production
- Instagram (Cinder): heavy fan-out patterns for media processing; per-feature queues.
- Pinterest: chord patterns for image pipeline aggregation.
- Eventbrite: beat for daily reminder emails; one orchestrator + many subtasks.
- Dropbox (internal tools): Celery with django-celery-beat for HA scheduling.
- A Mumbai e-commerce company: separate queues for orders, emails, reports, analytics.
- A Pune SaaS: orchestrator pattern for nightly billing; each tenant is a subtask.
- A Bengaluru fintech: autoretry_for with exponential backoff on every external API call.
- A Goa-based startup: chord pattern for end-of-day reconciliation; fan out across tenants, aggregate at end.
Recall / checkpoint
- What does @shared_task do?
- What is the difference between delay(), apply_async(), and s()?
- When should you split tasks across multiple queues?
- What is the difference between chain, group, and chord?
- Why is running multiple beat instances a bug?
- What is self.retry() and when do you call it explicitly?
- Why must task return values be JSON-serialisable?
Interview Q&A
Q1. The team's single-queue Celery setup has slow tasks blocking fast ones. Walk through the fix.
Split queues by task profile. Fast tasks (emails, notifications) on fast queue; slow tasks (reports, exports) on slow queue; critical tasks (payments) on critical queue. Workers per queue, sized appropriately. Routing via task_routes in config or per-call queue= arg. After split, each queue scales independently; head-of-line blocking is structurally eliminated. Common wrong answer to avoid: "scale workers" — same blocking pattern, just more workers.
Q2. The daily digest task is sometimes missing users. Walk through the diagnosis.
Likely the digest is one big task that fails mid-iteration; retry resumes from the start (or from nowhere); some users miss. Fix: refactor to orchestrator + per-user subtasks. The orchestrator enqueues N subtasks; each is idempotent (checks last_digest_at) and retryable. If one subtask fails, only that user's digest retries. Validate by injecting a transient failure and verifying re-delivery. Common wrong answer to avoid: "raise max_retries" — doesn't help if the task design is monolithic.
Q3. The team has beat running on every worker. Walk through the bug and the fix.
Multiple beat instances enqueue duplicate periodic tasks: every scheduled task fires N times. Fix: run exactly one beat process, and drop the -B flag from workers. For HA, use a scheduler that holds a lock, such as redbeat (a Redis lock lets only one instance publish); django-celery-beat persists schedules in the database but does not by itself prevent a second beat, so run it as a single replica. Standard pattern: one beat deployment with replicas=1, plus a lock-based scheduler if you need failover. Common wrong answer to avoid: "deduplicate at task level", which works but is expensive; the structural fix is a single beat.
Q4. A task signature includes a Django model instance. Why is this wrong?
Celery messages are JSON-serialised (default). A model instance is not serialisable directly. Two options: pass the primary key (user.id) and re-fetch in the task; or serialise explicitly. PK-passing is the standard pattern — keeps the message small and avoids stale-data issues (the worker reads the current state, not the state at submission time). Common wrong answer to avoid: "use pickle serialiser" — works but unsafe; primitives are the right discipline.
Q5. Walk through choosing between autoretry_for and explicit self.retry().
autoretry_for is for the common case — known transient exceptions, exponential backoff, max retries. Set the option and forget. Explicit self.retry() is for cases that need conditional logic — e.g., retry only on certain status codes, custom backoff, switching arguments on retry. Use autoretry_for by default; reach for self.retry() when the retry policy is data-dependent. Common wrong answer to avoid: "always use self.retry()" — boilerplate; autoretry_for covers the common case cleanly.
Q6. A chord pattern is taking longer than expected to complete. Walk through diagnosis.
A chord waits for all subtasks to finish before running the aggregator. If one subtask is slow, the chord stalls. Diagnosis: log subtask completion timestamps; find the slowest. Common causes: an unfair distribution (some subtasks have more work than others), a subtask waiting on a single hot resource, a subtask retrying repeatedly. Fix depends on cause — re-shard the work, add caching, fail-fast on retries. Common wrong answer to avoid: "scale workers" — doesn't help if one specific subtask is the constraint.
Operational memory
This chapter explained the day-to-day Celery developer surface: defining tasks, setting options, routing to queues, composing with chain/group/chord, scheduling with beat. The important idea is that Celery is convenient but not opinionated — every team must decide queue layout, retry policy, task granularity. Good defaults compound; bad ones surface only in production.
You learned to write tasks with proper retry/backoff/idempotency, separate queues per profile, use composition where it clarifies, schedule periodics safely, and pass serialisable arguments. That solves the daily surface; production gotchas come next.
Carry this diagnostic forward: when a Celery task misbehaves, ask which design surface is wrong — task granularity, queue routing, retry policy, or composition. Each has a structural fix.
Remember:
- Separate queues per task profile; no single-queue at scale.
- autoretry_for + retry_backoff for the common retry case.
- Pass primary keys, not model instances.
- One beat process; use a lock-based scheduler (e.g. redbeat) for failover.
- Orchestrator + per-item subtask beats monolithic tasks.
Bridge. Tasks and queues are designed. Production has its own surface: retries that storm, queues that grow, dead letters, monitoring. The next chapter is that surface. → 03-retries-monitoring-prod-gotchas.md