Token-Bucket Rate Limiting for Celery Tasks

This guide builds a true fleet-wide token-bucket limiter for Celery. It is the implementation referenced by "Rate limiting and throttling async jobs" and part of the wider "Queue Fundamentals & Architecture" collection. It is the concrete answer to a specific, common failure.

The symptom: you set @app.task(rate_limit="100/m") on a task that calls a partner API capped at 100 requests per minute, deploy ten workers, and immediately start getting 429 responses and threatening emails from the vendor. The reason is that Celery's rate_limit is enforced per worker instance, not globally, so ten workers permit up to 1000 requests per minute in aggregate, ten times the contract. You need a limit that holds across the whole fleet, which means a shared counter in Redis with atomic check-and-decrement.

Prerequisites

  • A running Celery application with a Redis-backed broker (pip install celery redis). If you are still setting that up, see setting up Celery with a Redis broker.
  • A Redis instance reachable from every worker (the broker's Redis is fine; use a separate logical DB to isolate limiter keys).
  • Familiarity with how the token-bucket algorithm refills and consumes — recapped in the parent guide.
  • A target rate and burst from the downstream contract (e.g. "100 requests/minute, burst 20").

Step 1: Write the atomic Lua script

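The limiter's correctness rests on one rule: reading the bucket, computing the refill, and consuming a token must be a single atomic operation. Redis runs a Lua script as an indivisible unit, which gives us exactly that. Critically, we read the current time inside the script with redis.call('TIME') so every worker measures elapsed time against the single Redis clock, which eliminates clock skew between worker hosts. On Redis versions before 5.0, a script that calls a non-deterministic command such as TIME must call redis.replicate_commands() before it writes; from 5.0 on, effect-based replication is the default and no extra call is needed.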

-- celery_token_bucket.lua
-- KEYS[1] = bucket key
-- ARGV[1] = capacity (burst, max tokens)
-- ARGV[2] = refill rate (tokens per second)
-- ARGV[3] = requested tokens (usually 1)
local capacity  = tonumber(ARGV[1])
local refill    = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])

-- single source of truth for "now": the Redis server clock
local t = redis.call('TIME')
local now = tonumber(t[1]) + tonumber(t[2]) / 1000000

local data   = redis.call('HMGET', KEYS[1], 'tokens', 'ts')
local tokens = tonumber(data[1])
local ts     = tonumber(data[2])

if tokens == nil then
  tokens = capacity      -- first observation: start full
  ts = now
end

-- lazy refill: add tokens accrued since last call, capped at capacity
tokens = math.min(capacity, tokens + math.max(0, now - ts) * refill)

local allowed = 0
if tokens >= requested then
  tokens = tokens - requested
  allowed = 1
end

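redis.call('HMSET', KEYS[1], 'tokens', tokens, 'ts', now)
-- evict idle buckets so per-key memory does not leak; a zero refill rate
-- would make capacity / refill infinite and EXPIRE would reject it, so
-- fall back to a fixed TTL (1 hour, an arbitrary choice)
local ttl = 3600
if refill > 0 then
  ttl = math.max(1, math.ceil(capacity / refill) * 2)
end
redis.call('EXPIRE', KEYS[1], ttl)

-- return whether allowed, plus a hint of how long to wait for the requested tokens
local retry_after = 0
if allowed == 0 and refill > 0 then
  retry_after = (requested - tokens) / refill
end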
return {allowed, tostring(retry_after)}

The script returns both the decision and a retry_after hint, which we'll use to requeue intelligently rather than busy-wait.
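
To make the refill arithmetic easier to follow, here is a minimal in-memory sketch of the same logic in Python. It is only a model: BucketModel is a hypothetical name, the clock is passed in by the caller rather than read from Redis, the state is not shared between processes, and it assumes a refill rate greater than zero.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class BucketModel:
    """In-memory model of the Lua script; NOT shared across processes."""
    capacity: float
    refill_per_sec: float            # assumed > 0 in this sketch
    tokens: Optional[float] = None   # None mirrors a missing Redis key
    ts: float = 0.0

    def consume(self, now: float, requested: float = 1) -> Tuple[bool, float]:
        if self.tokens is None:
            # first observation: start full
            self.tokens, self.ts = self.capacity, now
        # lazy refill: add tokens accrued since the last call, capped at capacity
        elapsed = max(0.0, now - self.ts)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        self.ts = now
        if self.tokens >= requested:
            self.tokens -= requested
            return True, 0.0
        # denied: report how long until enough tokens have accrued
        return False, (requested - self.tokens) / self.refill_per_sec


bucket = BucketModel(capacity=2, refill_per_sec=0.5)
print(bucket.consume(now=0.0))   # (True, 0.0)
print(bucket.consume(now=0.0))   # (True, 0.0)
print(bucket.consume(now=0.0))   # (False, 2.0): one token takes 1 / 0.5 seconds
print(bucket.consume(now=2.0))   # (True, 0.0): the wait refilled exactly one token
```

A denied call still updates the timestamp, as in the script, so the fractional tokens accrued so far are kept rather than lost.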

Step 2: Wrap the script in a Python limiter

Register the script once at import so redis-py sends it by SHA (EVALSHA) on every call instead of shipping the source each time; if Redis has not cached the script yet, the client loads it and retries.

# limiter.py
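from pathlib import Path

import redis

_redis = redis.Redis(host="localhost", port=6379, db=1)  # db=1 isolates limiter keys

# resolve the script next to this module so the worker's working directory does not matter
_LUA_PATH = Path(__file__).with_name("celery_token_bucket.lua")
_consume = _redis.register_script(_LUA_PATH.read_text())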

def try_consume(bucket: str, capacity: int, refill_per_sec: float, n: int = 1):
    """Returns (allowed: bool, retry_after_seconds: float)."""
    allowed, retry_after = _consume(keys=[bucket], args=[capacity, refill_per_sec, n])
    return bool(allowed), float(retry_after)

Step 3: Apply it through a task base class

The cleanest way to attach the limiter to many tasks is a custom task base class. Tasks declare their rate via attributes; the base class checks the bucket before the body runs, and retries the task with a delay if no token is available — freeing the worker thread instead of sleeping. The retry delay comes straight from the script's retry_after hint.

# tasks.py
from celery import Celery, Task
from limiter import try_consume

app = Celery("tasks", broker="redis://localhost:6379/0")

class RateLimitedTask(Task):
    # Throttled tasks must be able to requeue indefinitely. Passing
    # max_retries=None to retry() means "use the task default" (3), so the
    # limit is lifted on the class instead. Note that this also removes the
    # cap for retries raised from the task body.
    max_retries = None

    # subclasses/decorated tasks override these
    rl_bucket = None          # shared key: same for all workers
    rl_capacity = 100         # burst
    rl_refill_per_sec = 100 / 60.0   # 100 per minute

    def __call__(self, *args, **kwargs):
        if self.rl_bucket:
            allowed, retry_after = try_consume(
                self.rl_bucket, self.rl_capacity, self.rl_refill_per_sec
            )
            if not allowed:
                # requeue with a delay; do NOT block the worker
                raise self.retry(countdown=max(retry_after, 0.1))
        return super().__call__(*args, **kwargs)

@app.task(
    base=RateLimitedTask,
    bind=True,
    rl_bucket="ratelimit:partner-api",
    rl_capacity=20,                 # allow short bursts of 20
    rl_refill_per_sec=100 / 60.0,   # sustained 100/min, fleet-wide
)
def call_partner_api(self, account_id):
    # ... make the rate-limited downstream call ...
    return do_http_call(account_id)

Because rl_bucket is the same string on every worker, all of them decrement one shared bucket, so the limit now holds across the entire fleet, not per worker. Using self.retry(countdown=...) puts the job back on the queue as a delayed job so the worker is immediately free to pick up other work.

Step 4: Combine with Celery's built-in rate_limit

The Redis limiter is the source of truth for the fleet-wide rate, but Celery's per-worker rate_limit is still useful as a cheap local pre-throttle: it avoids hammering Redis when a single worker is clearly over its fair share, reducing limiter contention. Set it generously — to roughly the fleet limit divided by worker count, with headroom — so it never becomes the binding constraint.

@app.task(
    base=RateLimitedTask,
    bind=True,
    rate_limit="40/m",              # per-worker guard: cheap local smoothing
    rl_bucket="ratelimit:partner-api",
    rl_capacity=20,
    rl_refill_per_sec=100 / 60.0,   # the real, fleet-wide ceiling
)
def call_partner_api(self, account_id):
    return do_http_call(account_id)

The two layers compose: rate_limit smooths each worker locally and reduces Redis traffic; the Lua bucket enforces the hard aggregate limit no matter how many workers exist.
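
As a rough illustration of "fleet limit divided by worker count, with headroom", the sketch below derives a per-worker rate_limit string. The helper name per_worker_rate_limit and the default headroom factor of 4 are assumptions made for this example, not Celery settings; the factor was chosen only because it reproduces the "40/m" value above for ten workers.

```python
import math


def per_worker_rate_limit(fleet_per_minute: int, workers: int, headroom: float = 4.0) -> str:
    """Build a Celery rate_limit string such as "40/m" (hypothetical helper)."""
    if workers < 1:
        raise ValueError("workers must be >= 1")
    fair_share = fleet_per_minute / workers
    per_worker = max(1, math.ceil(fair_share * headroom))
    # a local guard looser than the fleet limit adds nothing, so clamp to it
    return f"{min(per_worker, fleet_per_minute)}/m"


print(per_worker_rate_limit(100, 10))  # 40/m
```

If the worker count changes often, a looser guard is the safer choice, since the Redis bucket remains the binding limit either way.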

Step 5: Per-tenant fairness (optional)

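To make the limit fair per customer rather than global, key the bucket by tenant. The same code now maintains an independent bucket per account, and the EXPIRE line keeps idle tenants from accumulating keys. Independent buckets cap each tenant separately; if the downstream contract is a single shared quota, the sum across tenants can exceed it unless a global bucket is checked as well.

class PerTenantTask(RateLimitedTask):
    def __call__(self, account_id, *args, **kwargs):
        # The task instance is shared within a worker process, so this
        # assignment is only safe when each process runs one task at a time
        # (prefork pool); under the threads, gevent, or eventlet pools
        # concurrent calls can overwrite each other's bucket key.
        self.rl_bucket = f"ratelimit:partner-api:{account_id}"
        return super().__call__(account_id, *args, **kwargs)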

Verification

Unit-level: confirm the bucket throttles. Drain the bucket in a tight loop and assert it stops granting at the configured capacity.

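from limiter import _redis, try_consume

def test_bucket_caps_at_capacity():
    bucket = "test:bucket"
    _redis.delete(bucket)   # start from a fresh (full) bucket on every run
    # a tiny positive refill avoids the zero-rate edge case in the script
    # while adding only a negligible fraction of a token during the loop
    granted = sum(try_consume(bucket, capacity=10, refill_per_sec=0.001)[0] for _ in range(50))
    assert granted == 10   # exactly capacity tokens granted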

Under load: confirm the fleet-wide rate holds. Start several workers, flood the queue, and count actual downstream calls per minute. Instrument the task and watch the rate converge on the configured ceiling regardless of worker count.

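import time
import uuid

import redis

# shared store: an in-process counter would only see one worker process's calls
_metrics = redis.Redis(host="localhost", port=6379, db=1)
_KEY = "metrics:partner-api:calls"

def record_call():
    now = time.time()
    _metrics.zadd(_KEY, {uuid.uuid4().hex: now})     # one member per call, scored by time
    _metrics.zremrangebyscore(_KEY, 0, now - 60)     # drop calls older than 60 s
    # ~100 at steady state; a window that starts with a full bucket can reach capacity + 100
    print(f"calls in last 60s: {_metrics.zcard(_KEY)}")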

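Run with, say, 8 worker processes and 5000 queued jobs. The command below starts one worker instance with 8 pool processes; start additional instances with distinct -n names, on this or other hosts, to exercise the multi-worker case:

celery -A tasks worker --concurrency=8 --loglevel=info

The observed rate should hold at ~100/min with one worker instance and still ~100/min with several; that invariance is the proof the limit is fleet-wide and not per worker.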
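
When reading the measurements, it helps to know the worst case a token bucket allows: a window that begins with a full bucket can grant the whole burst plus everything refilled during the window. The short sketch below computes that bound; max_calls_in_window is a hypothetical helper name, and the numbers are the ones used in this guide.

```python
def max_calls_in_window(capacity: float, refill_per_sec: float, window_sec: float) -> float:
    """Upper bound on grants in any window: a full bucket plus all tokens refilled."""
    return capacity + refill_per_sec * window_sec


# rl_capacity=20, rl_refill_per_sec=100/60, measured over 60 seconds
print(round(max_calls_in_window(20, 100 / 60.0, 60)))  # 120
```

So the first minute after an idle period may show up to about 120 calls, while the sustained rate settles at about 100 per minute. If the vendor counts strictly per minute, lower the capacity or the refill rate accordingly.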

Gotchas and edge cases

Retry storms when the limit is far below demand. If thousands of jobs are all over the limit, they will all retry. Because each retry is a delayed requeue rather than a sleep, workers stay free, but the retry traffic itself can be heavy. Add jitter to the retry countdown (retry_after + random.uniform(0, 1)) to desynchronize the herd. Celery holds countdown/ETA tasks in worker memory until they are due, so a large backlog of throttled retries also costs memory; routing rate-limited tasks to a dedicated queue keeps their retries from crowding out fresh work.
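
A minimal sketch of the jittered countdown follows. The helper name jittered_countdown, the 1-second spread, and the 0.1-second floor are illustrative assumptions; the result would be passed as countdown= to self.retry.

```python
import random
from typing import Optional


def jittered_countdown(
    retry_after: float,
    spread: float = 1.0,
    floor: float = 0.1,
    rng: Optional[random.Random] = None,
) -> float:
    """Return retry_after (at least `floor`) plus uniform jitter in [0, spread]."""
    source = rng if rng is not None else random
    return max(retry_after, floor) + source.uniform(0.0, spread)


# a seeded generator makes the example repeatable; production code can omit rng
rng = random.Random(42)
delays = [jittered_countdown(2.0, rng=rng) for _ in range(3)]
print(all(2.0 <= d <= 3.0 for d in delays))  # True
```

The jitter is only ever added, never subtracted, so a task does not come back before the bucket has had time to refill.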

Fail-open vs fail-closed on Redis outage. Wrap try_consume in a try/except. For a limiter that prevents vendor bans or cost overruns, fail closed (retry the task) so an outage never bypasses the limit. For a soft fairness limit, fail open with an alert. Never let an unhandled Redis exception crash the task silently.

Burst capacity vs sustained rate. rl_capacity is the burst and rl_refill_per_sec is the sustained rate. Setting capacity equal to the per-minute count means an idle minute lets a full minute's worth fire instantly — fine if the downstream tolerates it, dangerous if not. Set capacity to the largest burst the contract actually allows.

Time unit mistakes. refill_per_sec is per second. A "100 per minute" limit is 100/60, not 100. Getting this wrong by a factor of 60 is the most common bug here; assert the effective rate in a test.
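
A small conversion helper can remove the factor-of-60 risk by deriving the per-second rate from a "count/unit" string. The function name rate_to_refill_per_sec is an assumption for this sketch; the /s, /m, and /h suffixes follow the notation Celery uses for rate_limit.

```python
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def rate_to_refill_per_sec(rate: str) -> float:
    """Convert a string such as "100/m" into tokens per second."""
    count, sep, unit = rate.partition("/")
    if not sep or unit not in _UNIT_SECONDS:
        raise ValueError(f"expected '<count>/s', '<count>/m' or '<count>/h', got {rate!r}")
    return float(count) / _UNIT_SECONDS[unit]


refill = rate_to_refill_per_sec("100/m")
# the effective per-minute rate must round-trip to the contract value
assert abs(refill * 60 - 100) < 1e-9
print(round(refill, 4))  # 1.6667
```

Asserting the round-trip, as above, is a cheap way to catch a per-minute value being passed where a per-second one is expected.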

FAQ

Why retry with a countdown instead of time.sleep? Sleeping holds the worker thread idle for the whole wait, shrinking effective concurrency and risking redelivery if the worker dies mid-sleep. self.retry(countdown=...) returns the job to the queue as a delayed job and frees the worker to process other tasks immediately.

Do I still need Celery's rate_limit at all? It is optional. The Redis bucket alone is sufficient for correctness. The built-in rate_limit is a cheap local optimization that reduces how often each worker has to talk to Redis; set it loose enough that the Redis bucket remains the binding limit.

Can the same bucket serve multiple task types? Yes — point several tasks at the same rl_bucket string and they share one rate budget, which is what you want when they all hit the same downstream dependency. Give them distinct keys when their downstreams are independent.
