
02. Tasks, queues, beat — the developer's daily surface

~11 min read. Defining tasks, routing to queues, composing tasks together, scheduling periodics. The day-to-day Celery surface every team writes.

Builds on: 01-broker-worker-protocol-internals.md.

The previous chapter explained the protocol. This chapter is what you actually write — task definitions, queue routing, composition patterns, and the beat scheduler for periodic work.


1) Defining tasks — the basics

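# myapp/tasks.py
from celery import shared_task
from django.contrib.auth import get_user_model

User = get_user_model()
# send_email() and render_welcome() are app helpers defined elsewhere.

@shared_task
def send_welcome_email(user_id):
    user = User.objects.get(pk=user_id)
    send_email(user.email, "Welcome", render_welcome(user))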

shared_task decorates a function so it can be enqueued and executed by Celery, without tying it to one specific app instance (which suits reusable Django apps). The function body is normal Python: ORM queries, side effects, return values.

To enqueue:

from datetime import datetime, timezone

send_welcome_email.delay(42)                                # shortcut for apply_async(args=[42])
send_welcome_email.apply_async(args=[42], countdown=60)     # run no earlier than 60 s from now
send_welcome_email.apply_async(args=[42], eta=datetime(2026, 6, 1, 10, 0, tzinfo=timezone.utc))

Differences:

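  • delay(*args, **kwargs) — shortcut for apply_async(args, kwargs) with no extra options; enqueues immediately and returns an AsyncResult.
  • apply_async() — takes explicit execution options (queue, countdown, eta, expires, headers).
  • s() — builds a signature (task plus arguments) without enqueuing it; used for composition (next section).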

2) Task options — what to set per task

from smtplib import SMTPException

from celery import shared_task

@shared_task(
    bind=True,
    name='myapp.send_email',
    autoretry_for=(SMTPException, ConnectionError),
    retry_kwargs={'max_retries': 5},
    retry_backoff=True,
    retry_backoff_max=300,
    retry_jitter=True,
    acks_late=True,
    soft_time_limit=30,
    time_limit=60,
    rate_limit='10/s',
)
def send_email(self, to, subject, body):
    ...

Key options:

  • bind=True — first arg is the task instance; lets you access self.request.id, self.retry(), etc.
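  • name=... — explicit task name. The default is derived from the module path and function name (e.g. myapp.tasks.send_email), so moving or renaming the function changes it; an explicit name survives refactors.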
  • autoretry_for=(...) — auto-retry on these exception classes. Avoids manual self.retry() calls.
  • retry_backoff=True — exponential backoff between retries (1s, 2s, 4s, 8s, ...).
  • retry_backoff_max=300 — cap the backoff at 5 minutes.
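  • retry_jitter=True — randomises each delay between 0 and the computed backoff so retries don't synchronise across workers (True is already the default).
  • acks_late=True — acknowledge the message after the task finishes instead of just before it starts; a per-task override of the global task_acks_late setting.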
  • soft_time_limit — raises SoftTimeLimitExceeded in the task; gives it a chance to clean up.
  • time_limit — hard limit: the pool process running the task is killed and replaced after this time, with no chance to clean up.
  • rate_limit='10/s' — at most 10 of this task per second per worker.
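The backoff options combine roughly as follows. This is a minimal sketch assuming Celery's documented semantics (with retry_backoff=True the delay doubles from 1 second, retry_backoff_max caps it, and retry_jitter turns it into a random value between 0 and the capped delay); backoff_delay is a hypothetical helper, not a Celery API.

```python
import random


def backoff_delay(retries, factor=1, maximum=300, jitter=False, rng=random):
    """Delay in seconds before retry number `retries` (0-based).

    Illustrative model of retry_backoff / retry_backoff_max / retry_jitter,
    not Celery's own code.
    """
    delay = min(maximum, factor * (2 ** retries))  # 1, 2, 4, ... capped at maximum
    if jitter:
        # Full jitter: pick uniformly from 0..delay so workers spread out.
        delay = rng.randrange(delay + 1)
    return delay


schedule = [backoff_delay(n) for n in range(11)]
print(schedule)  # [1, 2, 4, 8, 16, 32, 64, 128, 256, 300, 300]
```

The cap matters more than it looks: without retry_backoff_max, the tenth retry would wait 512 seconds and the delays keep doubling from there.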

3) Queues — routing tasks by class

A single queue forces all tasks to compete for the same workers. With diverse task profiles, you want separate queues:

# myapp/celery.py
app.conf.task_routes = {
    'myapp.send_email': {'queue': 'emails'},
    'myapp.generate_report': {'queue': 'reports'},
    'myapp.charge_payment': {'queue': 'payments'},
}

Now send_email.delay(...) goes to the emails queue. Workers can be started per queue:

# Worker for fast email tasks
celery -A myapp worker --queues=emails --concurrency=8 --prefetch-multiplier=4

# Worker for slow reports
celery -A myapp worker --queues=reports --concurrency=2 --prefetch-multiplier=1

# Worker for critical payments
celery -A myapp worker --queues=payments --concurrency=4 --prefetch-multiplier=1

The benefits:

  • No head-of-line blocking. A slow report doesn't delay an email.
  • Independent scaling. Add report workers without touching email workers.
  • Isolation. A bug in report code can't starve email processing.
  • Priority. Critical queues get more workers; low-priority queues get fewer.

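Routing can also be chosen per call. A queue= argument to apply_async takes precedence over task_routes, and some worker must consume that queue (emails_urgent here), otherwise the messages just wait in it: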

send_email.apply_async(args=[42], queue='emails_urgent')
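The lookup order can be modelled in a few lines. A simplified sketch, assuming the precedence Celery documents (per-call options first, then task_routes with exact names before glob patterns, then the default queue named "celery"); it ignores queue attributes set on the task class, and resolve_queue is a hypothetical helper.

```python
from fnmatch import fnmatchcase

DEFAULT_QUEUE = "celery"  # Celery's default task_default_queue


def resolve_queue(task_name, task_routes, call_queue=None, default=DEFAULT_QUEUE):
    """Simplified model: per-call queue= wins, then task_routes, then the default."""
    if call_queue is not None:
        return call_queue
    if task_name in task_routes:                 # exact task-name match first
        return task_routes[task_name]["queue"]
    for pattern, route in task_routes.items():   # then glob patterns like 'myapp.*'
        if fnmatchcase(task_name, pattern):
            return route["queue"]
    return default


routes = {
    "myapp.send_email": {"queue": "emails"},
    "myapp.generate_report": {"queue": "reports"},
    "myapp.charge_payment": {"queue": "payments"},
}
print(resolve_queue("myapp.send_email", routes))                              # emails
print(resolve_queue("myapp.send_email", routes, call_queue="emails_urgent"))  # emails_urgent
print(resolve_queue("myapp.cleanup", routes))                                 # celery
```

The last case is the one that bites in practice: an unrouted task silently lands on the default queue, so some worker must consume "celery" too.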

4) Composition — chain, group, chord

For workflows of multiple tasks:

Chain — sequential pipeline.

from celery import chain

workflow = chain(
    fetch_data.s(user_id=42),
    transform_data.s(),
    save_result.s(),
)
workflow.apply_async()

Each task's return value is passed as the first positional arg to the next. Use .si() instead of .s() for a step that should ignore its parent's result. Useful for ETL-style pipelines.
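The data flow of a chain can be shown without a broker. A toy, in-process sketch: run_chain, fetch_data, transform_data and save_result here are hypothetical stand-ins, and real chains hand each result to the next step through a new message rather than a function call.

```python
def run_chain(first, *rest):
    """Toy model of chain semantics: each (func, args, kwargs) step gets the
    previous step's return value prepended to its positional args."""
    func, args, kwargs = first
    result = func(*args, **kwargs)
    for func, args, kwargs in rest:
        result = func(result, *args, **kwargs)  # parent result becomes first arg
    return result


# Hypothetical stand-ins for the fetch_data / transform_data / save_result tasks.
def fetch_data(user_id):
    return {"user_id": user_id, "rows": [3, 1, 2]}


def transform_data(data):
    return {**data, "rows": sorted(data["rows"])}


def save_result(data):
    return f"saved {len(data['rows'])} rows for user {data['user_id']}"


print(run_chain((fetch_data, (), {"user_id": 42}),
                (transform_data, (), {}),
                (save_result, (), {})))
# saved 3 rows for user 42
```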

Group — parallel fan-out.

from celery import group

job = group(
    process_chunk.s(chunk_id=i) for i in range(100)
).apply_async()

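All 100 tasks are enqueued at once and run in parallel as far as worker capacity allows. Calling apply_async() on a group returns a GroupResult; job.ready() returns True once every subtask has finished (this needs a result backend).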
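As an in-process analogy, threads can stand in for workers and futures for the per-task results. This is a sketch of the semantics only, not how Celery implements groups; process_chunk is a hypothetical stand-in.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def process_chunk(chunk_id):
    """Hypothetical stand-in for the process_chunk task."""
    return chunk_id * chunk_id


with ThreadPoolExecutor(max_workers=8) as pool:
    # Fan out: one independent unit of work per chunk, like group(...).
    futures = [pool.submit(process_chunk, i) for i in range(100)]
    wait(futures)
    # Like GroupResult.ready(): true only when every member has finished.
    ready = all(f.done() for f in futures)
    results = [f.result() for f in futures]  # same order as submitted

print(ready, len(results), results[:4])  # True 100 [0, 1, 4, 9]
```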

Chord — fan-out then aggregate.

from celery import chord

job = chord(
    (process_chunk.s(chunk_id=i) for i in range(100)),
    aggregate_results.s(),
)
job.apply_async()

100 chunks run in parallel; when all done, aggregate_results is called with the list of results. Useful for map-reduce patterns. Requires a result backend.
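The chord's extra step is the barrier before the body runs. A toy sketch under the same thread-for-worker analogy; run_chord, process_chunk and aggregate_results are hypothetical, and in Celery it is the result backend, not an executor, that tracks when the whole header has finished.

```python
from concurrent.futures import ThreadPoolExecutor


def process_chunk(chunk_id):
    """Hypothetical header task: work on one chunk."""
    return chunk_id * 2


def aggregate_results(results):
    """Hypothetical body task: receives the list of all header results."""
    return sum(results)


def run_chord(header_args, header_fn, body_fn, max_workers=8):
    """Toy model: run the header in parallel, then call the body once with all results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(header_fn, header_args))  # waits for every header task
    return body_fn(results)  # body sees results in header order


print(run_chord(range(100), process_chunk, aggregate_results))  # 9900
```

The barrier is also why one slow header task delays the whole chord: the body cannot start until the last result arrives.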

Caveat: composition adds complexity. For simple two-step flows, just have the first task enqueue the second:

@shared_task
def fetch_data(user_id):
    data = ...
    transform_data.delay(data)

Explicit is often clearer than chord/chain magic.


5) Periodic tasks via Celery Beat

Beat is the scheduler. It enqueues tasks at configured intervals.

# myapp/celery.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    'send-daily-digest': {
        'task': 'myapp.send_digest',
        'schedule': crontab(hour=8, minute=0),  # every day at 08:00 in the app timezone (UTC by default)
    },
    'refresh-cache-every-5-min': {
        'task': 'myapp.refresh_cache',
        'schedule': 300.0,  # every 300 seconds
    },
    'cleanup-stale-sessions': {
        'task': 'myapp.cleanup_sessions',
        'schedule': crontab(hour=2, minute=0, day_of_week='sunday'),
    },
}
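Beat's job is to work out when each entry is next due. A simplified sketch of the two schedule styles above, assuming a timezone-aware "now" already in the app's timezone and ignoring day_of_week, day_of_month and DST; next_daily_run and next_interval_run are hypothetical helpers, not Celery's scheduler.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now, hour, minute):
    """Next fire time for a crontab(hour=H, minute=M)-style daily entry."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:  # today's slot has already passed
        candidate += timedelta(days=1)
    return candidate


def next_interval_run(last_run, seconds):
    """Next fire time for a plain number-of-seconds entry such as 300.0."""
    return last_run + timedelta(seconds=seconds)


now = datetime(2026, 6, 1, 9, 30, tzinfo=timezone.utc)
print(next_daily_run(now, 8, 0))       # 2026-06-02 08:00:00+00:00
print(next_interval_run(now, 300.0))   # 2026-06-01 09:35:00+00:00
```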

Beat runs as a separate process:

celery -A myapp beat --loglevel=info

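Critical: run only one beat instance. Two beats produce duplicate enqueues. django-celery-beat stores the schedule in the database (editable via Django admin) but does not by itself stop a second beat from running. For failover, use a scheduler that holds a lock, such as RedBeat, which keeps schedule state in Redis and lets only the lock holder schedule; otherwise guarantee a single replica at the deployment level.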

# Beat with django-celery-beat — schedule editable via Django admin
celery -A myapp beat --scheduler=django_celery_beat.schedulers:DatabaseScheduler
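The lock idea behind a single active scheduler fits in a few lines. A toy sketch, assuming a shared store with an atomic set-if-absent-with-expiry (Redis SET with NX and PX provides this); FakeLockStore and tick are hypothetical names, the in-memory dict stands in for the shared store, and none of this is RedBeat's actual code.

```python
import time


class FakeLockStore:
    """In-memory stand-in for a shared store (e.g. Redis) with expiring keys."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._keys = {}  # key -> (owner, expires_at)

    def acquire(self, key, owner, ttl):
        """Take the key if it is free or expired; the current owner may renew it."""
        now = self._clock()
        current = self._keys.get(key)
        if current is None or current[1] <= now or current[0] == owner:
            self._keys[key] = (owner, now + ttl)  # acquire, or renew our own lease
            return True
        return False


def tick(store, beat_name, ttl=30):
    """One scheduler tick: only the lock holder enqueues due tasks."""
    if store.acquire("beat-lock", beat_name, ttl):
        return f"{beat_name} enqueues due tasks"
    return f"{beat_name} stays idle"


clock = [0.0]
store = FakeLockStore(clock=lambda: clock[0])
print(tick(store, "beat-a"))  # beat-a enqueues due tasks
print(tick(store, "beat-b"))  # beat-b stays idle
clock[0] = 31.0               # beat-a crashed and stopped renewing; its lease expired
print(tick(store, "beat-b"))  # beat-b enqueues due tasks
```

The TTL is the trade-off: shorter means faster failover, but the holder must renew more often than the TTL or a healthy scheduler can lose its lease.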

6) Task discovery — where workers find tasks

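# myapp/celery.py
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')  # assumed settings module

app = Celery('myapp')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()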

autodiscover_tasks() walks INSTALLED_APPS and imports each app's tasks.py. Conventional location:

myapp/
├── tasks.py          # tasks for this app
orders/
├── tasks.py          # tasks for orders
payments/
├── tasks.py          # tasks for payments

If tasks live elsewhere, register them explicitly:

app.autodiscover_tasks(['myapp', 'orders.async_tasks', 'payments.workers'])

7) Task signatures and partial application

A signature is a frozen task call:

sig = send_welcome_email.s(42)             # signature: task + frozen args
sig.delay()                                # enqueue send_welcome_email(42)

partial = send_email.s(body='Hi there')    # freeze only the body kwarg
partial.delay('a@example.com', 'Hello')    # call-time args are prepended; kwargs merge

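Signatures are how chain/group/chord are constructed. A signature is essentially a serialisable dict (task name, args, kwargs, options), so it can travel inside a message: the remaining steps of a chain ride along with each task's message instead of being held by the caller.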
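The partial-application rule is worth pinning down because argument order is easy to get wrong. A toy model, assuming the rule Celery documents for partials (call-time args are prepended to the frozen ones; call-time kwargs are merged over frozen kwargs); Sig, sub and greet are hypothetical and not celery.Signature.

```python
from dataclasses import dataclass, field


@dataclass
class Sig:
    """Toy model of a signature's partial-application rule (not celery.Signature)."""
    func: object
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)

    def call(self, *args, **kwargs):
        # Call-time args go first; call-time kwargs override frozen ones.
        return self.func(*args, *self.args, **{**self.kwargs, **kwargs})


def sub(x, y):
    return x - y


def greet(name, greeting="Hello", punct="."):
    return f"{greeting}, {name}{punct}"


partial = Sig(sub, (2,))           # like sub.s(2)
print(partial.call(10))            # sub(10, 2) -> 8

hello = Sig(greet, kwargs={"greeting": "Hi"})
print(hello.call("Ana", punct="!"))  # Hi, Ana!
```

This prepending is exactly what makes chains work: the parent's result arrives as a call-time arg and lands in the first position.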


8) Error handling within a task

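import httpx
from celery import shared_task

class RetryableHTTPError(Exception):
    """Raised for 5xx responses so autoretry_for can match them (illustrative name)."""

@shared_task(bind=True, autoretry_for=(httpx.TransportError, RetryableHTTPError),
             retry_backoff=True, max_retries=5)
def fetch_from_api(self, url):
    resp = httpx.get(url, timeout=10)   # network errors raise httpx.TransportError: retried
    if resp.status_code >= 500:
        raise RetryableHTTPError(f"{resp.status_code} from {url}")  # server error: retried
    resp.raise_for_status()             # other non-2xx (e.g. 4xx) raise HTTPStatusError: not listed, fails permanently
    return resp.json()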

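The pattern: distinguish retryable (transient) errors from non-retryable (permanent) ones. Let autoretry_for handle the transient ones and let the permanent ones propagate. After the last retry the task is marked FAILURE with the final exception. Celery has no built-in dead-letter queue: sending failed messages to one needs broker support (such as a RabbitMQ dead-letter exchange) together with acks_late and task_acks_on_failure_or_timeout=False.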
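The retry decision itself can be modelled as a loop. A toy, in-process sketch: each loop iteration stands in for a re-enqueued message, backoff delays are omitted, and TransientError, PermanentError, run_with_retries and make_flaky are all hypothetical names.

```python
class TransientError(Exception):
    """Hypothetical: a timeout or 5xx response; a later attempt may succeed."""


class PermanentError(Exception):
    """Hypothetical: a 4xx response; retrying cannot help."""


def run_with_retries(call, retryable=(TransientError,), max_retries=5):
    """Return (result, attempts). Retryable errors get up to max_retries
    retries; anything else, or the last retryable error, propagates."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return call(), attempts
        except retryable:
            if attempts > max_retries:  # first run + max_retries retries used up
                raise


def make_flaky(failures):
    """Build a call that fails `failures` times with TransientError, then succeeds."""
    state = {"calls": 0}

    def call():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise TransientError("503")
        return "ok"

    return call, state


call, state = make_flaky(2)
print(run_with_retries(call))  # ('ok', 3)
```

A PermanentError raised by call is not in retryable, so it escapes on the first attempt; that is the behaviour the autoretry_for tuple encodes.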

For tasks needing explicit retry logic:

import httpx
from celery import shared_task

@shared_task(bind=True, max_retries=5)
def fetch(self, url):
    try:
        return httpx.get(url, timeout=10).json()
    except httpx.TransportError as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)   # 1, 2, 4, 8, 16 s

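self.retry() sends a new message for the same task with the same args and request.retries incremented, then raises a Retry exception (hence the raise) so the current run stops. Once max_retries is exceeded, it re-raises exc instead.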


9) Task return values — what to return, what not to

# OK — primitive serialisable
@shared_task
def calculate_total(order_id):
    order = Order.objects.get(pk=order_id)
    return str(order.amount)   # Decimal -> str keeps exact cents; float would round

# BAD — model instance not serialisable
@shared_task
def get_order(order_id):
    return Order.objects.get(pk=order_id)   # JSON serialisation will fail

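Return values pass through the configured result serializer (JSON by default). Celery's JSON encoder can handle a few common types such as datetime, Decimal and UUID, but model instances and arbitrary custom classes need explicit conversion to primitives (dicts, lists, strings, numbers).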

For tasks called by chain/chord, the return value becomes the next task's input. Plan accordingly.
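The usual fix is a small conversion function at the task boundary. A sketch using the standard-library json module as a stand-in for the message serializer (Celery's encoder additionally handles a few types such as Decimal and datetime); the Order dataclass is a hypothetical stand-in for a Django model and order_to_message is a hypothetical helper.

```python
import json
from dataclasses import dataclass
from decimal import Decimal


@dataclass
class Order:
    """Hypothetical stand-in for a Django model instance."""
    pk: int
    amount: Decimal
    status: str


def order_to_message(order):
    """Reduce an order to JSON-safe primitives before returning it from a task."""
    return {"id": order.pk, "amount": str(order.amount), "status": order.status}


order = Order(pk=7, amount=Decimal("19.99"), status="paid")

try:
    json.dumps(order)  # the encoder has no idea what an Order is
except TypeError as exc:
    print("fails:", exc)

payload = json.dumps(order_to_message(order))
print(payload)  # {"id": 7, "amount": "19.99", "status": "paid"}
```

Keeping the dict small and explicit also makes it a stable contract for whichever task consumes it next in a chain or chord.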


10) The threaded example — designing the daily digest

A SaaS company sends a daily digest email at 08:00. Naive design: one task that iterates all users, sends each an email.

@shared_task
def send_daily_digest():
    for user in User.objects.filter(daily_digest=True):
        send_digest_for_user(user.id)   # synchronous send

Problem: this is one task. If it fails halfway, the second half never runs. If it takes longer than the next 08:00, the next digest enqueues before this one finishes.

Better design: a fan-out pattern.

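from smtplib import SMTPException

from celery import group, shared_task
from django.contrib.auth import get_user_model
from django.utils import timezone

User = get_user_model()

@shared_task
def send_daily_digest():
    user_ids = User.objects.filter(daily_digest=True).values_list('id', flat=True)
    job = group(send_digest_for_user.s(uid) for uid in user_ids).apply_async()
    return job.id

@shared_task(bind=True, autoretry_for=(SMTPException,), retry_backoff=True, max_retries=3)
def send_digest_for_user(self, user_id):
    user = User.objects.get(pk=user_id)
    # Compare calendar days in the same (current Django) timezone.
    if user.last_digest_at and timezone.localdate(user.last_digest_at) == timezone.localdate():
        return  # already sent today; makes retries and redeliveries safe
    body = render_digest(user)  # app helper, defined elsewhere
    send_email(user.email, 'Your Daily Digest', body)
    user.last_digest_at = timezone.now()
    user.save(update_fields=['last_digest_at'])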

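Now one orchestrator task fans out one subtask per subscribed user (e.g. 100K of them). Each subtask is independent, retries on transient SMTP failures, and skips users who already received today's digest, so retries and redeliveries don't double-send in the common case (the check is not atomic, so two copies running at the same instant could still both send). The digest finishes much sooner because subtasks run in parallel across all workers, not sequentially inside one task.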

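Route subtasks to a dedicated queue (digest) so they don't block other email traffic, either with a task_routes entry for the task or per signature inside the orchestrator:

job = group(send_digest_for_user.s(uid).set(queue='digest') for uid in user_ids).apply_async()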
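The idempotency guard is what makes redelivery harmless. A toy, in-memory sketch: the sent list and the last_digest_at dict are hypothetical stand-ins for the mail provider and the database column, and this send_digest_for_user is a plain function, not the Celery task.

```python
from datetime import datetime, timezone

sent = []            # stand-in for the outbound mail log
last_digest_at = {}  # user_id -> datetime of last digest (stand-in for the DB column)


def send_digest_for_user(user_id, now):
    """Toy version of the subtask's guard: send at most once per calendar day."""
    last = last_digest_at.get(user_id)
    if last is not None and last.date() == now.date():
        return "skipped"           # already sent today
    sent.append(user_id)           # "send" the email
    last_digest_at[user_id] = now  # record it
    return "sent"


morning = datetime(2026, 6, 1, 8, 0, tzinfo=timezone.utc)
print(send_digest_for_user(1, morning))  # sent
print(send_digest_for_user(1, morning))  # skipped (redelivered message)
print(sent)                              # [1]
```

The check-then-write here is not atomic; for a hard once-per-day guarantee, a database unique constraint or a row lock around the check is the usual next step.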

Operational signals

Healthy. Tasks routed to appropriate queues; queue depths near zero or oscillating with traffic; task latency p99 within target; periodic tasks fire on schedule.

First degrading metric. Beat schedule drift — periodic tasks firing late. Either beat is overloaded or it's running on a slow worker.

Misleading metric. Total task throughput — masks per-queue latency.

Expert graph. Per-queue depth × per-task latency; the cell that lights up is the bottleneck.


Where this appears in production

  • Instagram (Cinder) — heavy fan-out patterns for media processing; per-feature queues.
  • Pinterest — chord patterns for image pipeline aggregation.
  • Eventbrite — beat for daily reminder emails; one orchestrator + many subtasks.
  • Dropbox (internal tools) — Celery with django-celery-beat for database-backed scheduling.
  • A Mumbai e-commerce — separate queues for orders, emails, reports, analytics.
  • A Pune SaaS — orchestrator pattern for nightly billing; each tenant is a subtask.
  • A Bengaluru fintech — autoretry_for with exponential backoff on every external API call.
  • A Goa-based startup — chord pattern for end-of-day reconciliation; fan out across tenants, aggregate at end.

Recall / checkpoint

  1. What does @shared_task do?
  2. What is the difference between delay(), apply_async(), and s()?
  3. When should you split tasks across multiple queues?
  4. What is the difference between chain, group, and chord?
  5. Why is running multiple beat instances a bug?
  6. What is self.retry() and when do you call it explicitly?
  7. Why must task return values be JSON-serialisable?

Interview Q&A

Q1. The team's single-queue Celery setup has slow tasks blocking fast ones. Walk through the fix. Split queues by task profile. Fast tasks (emails, notifications) on fast queue; slow tasks (reports, exports) on slow queue; critical tasks (payments) on critical queue. Workers per queue, sized appropriately. Routing via task_routes in config or per-call queue= arg. After split, each queue scales independently; head-of-line blocking is structurally eliminated. Common wrong answer to avoid: "scale workers" — same blocking pattern, just more workers.

Q2. The daily digest task is sometimes missing users. Walk through the diagnosis. Likely the digest is one big task that fails mid-iteration; retry resumes from the start (or from nowhere); some users miss. Fix: refactor to orchestrator + per-user subtasks. The orchestrator enqueues N subtasks; each is idempotent (checks last_digest_at) and retryable. If one subtask fails, only that user's digest retries. Validate by injecting a transient failure and verifying re-delivery. Common wrong answer to avoid: "raise max_retries" — doesn't help if the task design is monolithic.

Q3. The team has beat running on every worker. Walk through the bug and the fix. Multiple beat instances enqueue duplicate periodic tasks: every scheduled task fires N times. Fix: run exactly one beat process, and if workers were started with the embedded -B/--beat flag, remove it. For HA, use a scheduler that holds a lock, such as RedBeat (a Redis lock ensures one active scheduler); django-celery-beat keeps the schedule in the database but does not by itself stop a second beat from enqueuing. Standard pattern: one beat deployment with replicas=1, plus a lock-based scheduler if you need failover. Common wrong answer to avoid: "deduplicate at task level" — works but expensive; structural fix is single beat.

Q4. A task signature includes a Django model instance. Why is this wrong? Celery messages are JSON-serialised (default). A model instance is not serialisable directly. Two options: pass the primary key (user.id) and re-fetch in the task; or serialise explicitly. PK-passing is the standard pattern — keeps the message small and avoids stale-data issues (the worker reads the current state, not the state at submission time). Common wrong answer to avoid: "use pickle serialiser" — works but unsafe; primitives are the right discipline.

Q5. Walk through choosing between autoretry_for and explicit self.retry(). autoretry_for is for the common case — known transient exceptions, exponential backoff, max retries. Set the option and forget. Explicit self.retry() is for cases that need conditional logic — e.g., retry only on certain status codes, custom backoff, switching arguments on retry. Use autoretry_for by default; reach for self.retry() when the retry policy is data-dependent. Common wrong answer to avoid: "always use self.retry()" — boilerplate; autoretry_for covers the common case cleanly.

Q6. A chord pattern is taking longer than expected to complete. Walk through diagnosis. A chord waits for all subtasks to finish before running the aggregator. If one subtask is slow, the chord stalls. Diagnosis: log subtask completion timestamps; find the slowest. Common causes: an unfair distribution (some subtasks have more work than others), a subtask waiting on a single hot resource, a subtask retrying repeatedly. Fix depends on cause — re-shard the work, add caching, fail-fast on retries. Common wrong answer to avoid: "scale workers" — doesn't help if one specific subtask is the constraint.


Operational memory

This chapter explained the day-to-day Celery developer surface: defining tasks, setting options, routing to queues, composing with chain/group/chord, scheduling with beat. The important idea is that Celery is convenient but not opinionated — every team must decide queue layout, retry policy, task granularity. Good defaults compound; bad ones surface only in production.

You learned to write tasks with proper retry/backoff/idempotency, separate queues per profile, use composition where it clarifies, schedule periodics safely, and pass serialisable arguments. That solves the daily surface; production gotchas come next.

Carry this diagnostic forward: when a Celery task misbehaves, ask which design surface is wrong — task granularity, queue routing, retry policy, or composition. Each has a structural fix.

Remember:

  • Separate queues per task profile; no single-queue at scale.
  • autoretry_for + retry_backoff for the common retry case.
  • Pass primary keys, not model instances.
  • One beat process; use HA scheduler for failover.
  • Prefer an orchestrator plus per-item subtasks over one monolithic task.

Bridge. Tasks and queues are designed. Production has its own surface: retries that storm, queues that grow, dead letters, monitoring. The next chapter is that surface. → 03-retries-monitoring-prod-gotchas.md