Migrating from RQ to Celery
This walkthrough sits under RQ vs Celery for Python and the broader Backend Frameworks & Worker Scaling guide, and assumes you have already decided Celery's routing, workflow, and multi-broker capabilities justify leaving RQ behind.
Teams outgrow RQ when they need fan-out workflows, per-task rate limits, or a broker other than Redis. The migration risk is concrete: a big-bang switch leaves in-flight RQ jobs stranded in Redis lists that Celery workers will never read, so a payment retry never fires and scheduled jobs silently stop. The safe path is an incremental cutover: new work goes to Celery while RQ workers keep draining their existing queues until those hit zero, and only then do you decommission RQ. This guide maps every RQ concept to its Celery equivalent and gives you a reversible rollout.
Prerequisites
- A running RQ deployment with rq and redis pinned in requirements.txt, and a known list of queue names (default, high, etc.).
- An inventory of every queue.enqueue(...) call site and every @job-decorated or plain function used as a task.
- Celery 5.3+ installed alongside RQ: pip install "celery[redis]>=5.3".
- A Redis instance you can point Celery at, ideally a separate database index from RQ (e.g. RQ on db=0, Celery on db=2) so the two systems never collide on keys.
- Idempotent task bodies. During cutover a job may run once under RQ and, if redelivered, again under Celery. Review exactly-once vs at-least-once delivery before you start.
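The idempotency prerequisite can be sketched as a guard keyed on an idempotency key that the producer supplies. This is a minimal illustration rather than anything the guide prescribes: `InMemoryClaims` and `run_once` are hypothetical names, and the in-memory set only stands in for a shared atomic store (for example a Redis `SET key value NX` with an expiry) that both RQ and Celery workers could reach.

```python
import threading


class InMemoryClaims:
    """Stand-in for a shared atomic store; hypothetical helper for illustration."""

    def __init__(self):
        self._seen = set()
        self._lock = threading.Lock()

    def claim(self, key):
        """Return True only for the first caller that claims `key`."""
        with self._lock:
            if key in self._seen:
                return False
            self._seen.add(key)
            return True


def run_once(claims, key, fn, *args):
    """Run fn(*args) unless an earlier delivery already claimed `key`."""
    if not claims.claim(key):
        return None  # duplicate delivery: skip the side effect
    return fn(*args)


claims = InMemoryClaims()
sent = []
run_once(claims, "welcome-email:42", sent.append, 42)  # first delivery, e.g. under RQ
run_once(claims, "welcome-email:42", sent.append, 42)  # redelivery, e.g. under Celery
print(sent)  # [42]
```

Claiming before running trades duplicates for possible loss: if the task crashes after the claim, a retry would be skipped. A task that must retry would need to release the claim on failure, or record completion instead of the attempt.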
Step 1: Map RQ concepts to Celery
Build a translation table before touching code so every call site has a known target. The mapping is mostly mechanical.
| RQ | Celery | Notes |
|---|---|---|
| Queue('high', connection=r) | task_routes + named queue | Routing is config-driven, not an object you pass around |
| @job('default') / plain function | @shared_task | Decorator wraps the callable as a registered task |
| queue.enqueue(fn, arg) | fn.delay(arg) / fn.apply_async(args, queue='high') | delay is the shorthand; apply_async exposes routing/ETA |
| Retry(max=3, interval=[10,30,60]) | autoretry_for + retry_backoff | Celery computes backoff; you don't pass a literal list |
| enqueue_in(timedelta, fn) | apply_async(countdown=...) / eta=... | Delayed execution is a kwarg |
| rq-scheduler cron | Celery Beat | Periodic schedule lives in beat_schedule |
| rq worker high default | celery -A app worker -Q high,default | -Q selects queues |
| job.result / RQ result TTL | result_backend + AsyncResult | Results need an explicit backend |
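Write this down per call site. The dangerous rows are retries and scheduling: RQ's literal interval list has no direct equivalent among Celery's declarative retry options, which derive delays from a backoff policy. If a task truly needs the exact 10/30/60 spacing, call self.retry(countdown=...) from a bound task and choose the delay yourself.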
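The per-call-site inventory can be partly automated. The sketch below walks a module's syntax tree and lists every method call whose name looks like an RQ enqueue method. `find_enqueue_calls` and `ENQUEUE_METHODS` are hypothetical names introduced here, the match is purely by method name (so unrelated objects with an `enqueue` method will also show up), and `ast.unparse` assumes Python 3.9 or newer.

```python
import ast

# Method names treated as RQ enqueue calls; extend to match your codebase.
ENQUEUE_METHODS = {"enqueue", "enqueue_in", "enqueue_at", "enqueue_call"}


def find_enqueue_calls(source, filename="<string>"):
    """Return sorted (line, method, first_arg_source) for each X.enqueue*(...) call."""
    hits = []
    for node in ast.walk(ast.parse(source, filename=filename)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr in ENQUEUE_METHODS
        ):
            first = ast.unparse(node.args[0]) if node.args else ""
            hits.append((node.lineno, node.func.attr, first))
    return sorted(hits)


sample = '''
q.enqueue(send_email, user_id, "welcome")
q.enqueue_in(timedelta(minutes=5), reminder, uid)
'''
for line, method, first_arg in find_enqueue_calls(sample):
    print(line, method, first_arg)
```

Run it over each source file and paste the hits into the translation table; the first argument tells you whether the row is an immediate enqueue (a function) or a delayed one (a timedelta or datetime).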
Step 2: Stand up a Celery app next to RQ
Add a Celery app without removing any RQ code. Point it at a distinct Redis database so RQ keys and Celery keys cannot overwrite each other.
# app/celery_app.py
from celery import Celery

# include=["tasks"] makes the worker import tasks.py at startup so its tasks register
celery_app = Celery("myproject", include=["tasks"])
celery_app.conf.update(
    broker_url="redis://redis-primary:6379/2",      # separate db from RQ (db=0)
    result_backend="redis://redis-primary:6379/3",  # results kept in their own db
    task_serializer="json",                         # avoid pickle RCE risk
    accept_content=["json"],
    result_serializer="json",
    task_acks_late=True,               # ack after the task runs, not on receive
    task_reject_on_worker_lost=True,   # requeue if a worker dies mid-task
    broker_connection_retry_on_startup=True,
    # Route by name so producers stay simple:
    task_routes={
        "tasks.send_email": {"queue": "high"},
        "tasks.*": {"queue": "default"},
    },
)
This config keeps the two systems on the same Redis host but in separate logical databases, eliminating the namespace-collision failure mode where RQ and Celery fight over the same list keys.
Step 3: Translate task definitions
Convert RQ task functions to Celery tasks. Keep the function body identical; only the decorator and signature change.
# Before: RQ task
# tasks.py
from redis import Redis
from rq.decorators import job

redis_conn = Redis(host="redis-primary", port=6379, db=0)

@job("high", connection=redis_conn, timeout=300)
def send_email(user_id, template):
    deliver(user_id, template)  # unchanged business logic

# After: Celery task
# tasks.py
from app.celery_app import celery_app

# On an app instance the decorator is .task; shared_task is the
# module-level variant (from celery import shared_task), not an app attribute.
@celery_app.task(
    name="tasks.send_email",  # explicit name matches task_routes
    bind=True,
    soft_time_limit=300,      # RQ timeout -> soft_time_limit
    time_limit=330,           # hard kill 30s later
)
def send_email(self, user_id, template):
    deliver(user_id, template)  # business logic untouched
Note that soft_time_limit raises SoftTimeLimitExceeded inside the task so you can clean up, while time_limit is the hard boundary at which the worker process running the task is killed. RQ's single timeout maps to the soft limit.
Step 4: Translate retries and scheduling
RQ's Retry(max=3, interval=[10, 30, 60]) enumerates delays explicitly. Celery instead declares a backoff policy and computes the delays for you.
# Celery retry: equivalent intent to Retry(max=3, interval=[10,30,60])
@celery_app.task(  # app-bound decorator; shared_task is not an app attribute
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),  # which exceptions retry
    retry_backoff=10,      # base delay -> up to 10, 20, 40s (doubles each retry)
    retry_backoff_max=60,  # upper bound on any single delay (RQ's largest interval)
    retry_jitter=True,     # randomize each delay below its bound to avoid a thundering herd
    max_retries=3,
)
def charge_card(self, order_id):
    gateway.charge(order_id)
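To see how the backoff policy compares with RQ's literal list, the arithmetic can be worked through in plain Python. This is a sketch of the doubling rule described in the comments above, not Celery's own code: `backoff_delays` is a hypothetical helper, and it assumes each delay is the base multiplied by a power of two, capped at the maximum. With jitter enabled, the actual delay is expected to be a random value up to each of these bounds.

```python
def backoff_delays(base, cap, max_retries):
    """Upper bound of each retry delay: min(cap, base * 2**n) for n = 0, 1, ..."""
    return [min(cap, base * 2 ** n) for n in range(max_retries)]


rq_intervals = [10, 30, 60]                # RQ: delays listed explicitly
celery_bounds = backoff_delays(10, 60, 3)  # Celery: delays derived from the policy

print(celery_bounds)              # [10, 20, 40]
print(backoff_delays(10, 60, 5))  # [10, 20, 40, 60, 60]
```

The two schedules share a starting point but not a shape: with three retries the 60-second cap is never reached, so the last Celery retry fires sooner than RQ's. Raise the base or the retry count if downstream systems depended on the longer tail.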
For delayed and periodic jobs, translate the dispatch site rather than the task:
# RQ delayed: queue.enqueue_in(timedelta(minutes=5), reminder, uid)
reminder.apply_async(args=[uid], countdown=300) # Celery: 5 minutes later
# RQ scheduler cron -> Celery Beat
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
    "nightly-report": {
        "task": "tasks.nightly_report",
        "schedule": crontab(hour=2, minute=0),  # 02:00 daily
    },
}
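For the delayed-dispatch translation (enqueue_in to countdown, or an absolute time to eta), a small adapter keeps call sites mechanical. The helper below is a hypothetical sketch: `delay_kwargs` is not part of RQ or Celery, and rejecting naive datetimes is a design choice made here to avoid timezone ambiguity, not a Celery requirement.

```python
from datetime import datetime, timedelta, timezone


def delay_kwargs(when):
    """Map an RQ-style delay to apply_async keyword arguments.

    timedelta -> {"countdown": seconds}; aware datetime -> {"eta": datetime}.
    """
    if isinstance(when, timedelta):
        return {"countdown": int(when.total_seconds())}
    if isinstance(when, datetime):
        if when.tzinfo is None:
            raise ValueError("pass a timezone-aware datetime for eta")
        return {"eta": when}
    raise TypeError(f"unsupported delay type: {type(when).__name__}")


print(delay_kwargs(timedelta(minutes=5)))  # {'countdown': 300}
```

A call site would then read `reminder.apply_async(args=[uid], **delay_kwargs(timedelta(minutes=5)))`, which also makes it easy to re-issue future-dated RQ jobs with their original fire time.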
If you rely heavily on cron-style schedules, plan that side of the move with Celery Beat periodic task scheduling before cutover.
Step 5: Update producers and run both workers
Now flip enqueue call sites. Keep RQ workers running so anything already queued still drains.
# Before: RQ producer
from redis import Redis
from rq import Queue

q = Queue("high", connection=Redis(host="redis-primary", port=6379, db=0))
q.enqueue(send_email, user_id, "welcome", job_timeout=300)  # current RQ names this job_timeout

# After: Celery producer
from tasks import send_email

send_email.apply_async(args=[user_id, "welcome"], queue="high")
Start a Celery worker beside the existing rq worker process so both consume during the transition window:
# Existing RQ worker keeps draining old jobs already in db=0
rq worker high default
# New Celery worker handles all newly enqueued work in db=2
celery -A app.celery_app worker -Q high,default --concurrency=8 --loglevel=info
# Beat scheduler (run exactly one replica) for periodic tasks
celery -A app.celery_app beat --loglevel=info
Deploy the producer change. From this moment all new work goes to Celery; RQ only finishes its backlog.
Verification
Confirm RQ's queues drain to zero and Celery's are flowing before you remove anything.
# RQ backlog must trend to 0 — these lists should empty out
redis-cli -n 0 LLEN rq:queue:high
redis-cli -n 0 LLEN rq:queue:default
# Celery is receiving and finishing work
celery -A app.celery_app inspect active
celery -A app.celery_app inspect stats | grep -A2 total
# Assert a known task round-trips through Celery before cutover sign-off
from tasks import send_email
r = send_email.apply_async(args=[1, "smoke-test"], queue="high")
assert r.get(timeout=30) is None # raises on failure/timeout
print("celery path verified:", r.state) # -> SUCCESS
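Treat every RQ queue reaching and staying at 0 as the gate for decommissioning RQ workers. LLEN only covers the waiting lists; delayed and scheduled jobs live in sorted sets (for example rq:scheduler:scheduled_jobs), so check those with ZCARD as well.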
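The decommission gate can be expressed as one check that sums lists and sorted sets per queue. This is a sketch under assumptions: `rq_backlog`, `safe_to_decommission`, and `FakeRedis` are hypothetical names, and the `rq:scheduled:<queue>` and `rq:wip:<queue>` key names reflect how recent RQ releases are understood to store scheduled and in-progress jobs, so confirm them against your RQ version. The client only needs `llen` and `zcard`, which a redis-py client pointed at db=0 provides.

```python
def rq_backlog(client, queues):
    """Count RQ jobs still waiting, scheduled, or running, per queue.

    Key names are assumptions; verify them against your RQ version.
    """
    counts = {}
    for name in queues:
        counts[name] = (
            client.llen(f"rq:queue:{name}")         # waiting jobs (list)
            + client.zcard(f"rq:scheduled:{name}")  # enqueue_in / enqueue_at (sorted set)
            + client.zcard(f"rq:wip:{name}")        # jobs currently executing
        )
    # rq-scheduler keeps one sorted set shared across queues
    counts["rq-scheduler"] = client.zcard("rq:scheduler:scheduled_jobs")
    return counts


def safe_to_decommission(client, queues):
    """True only when every counted key is empty."""
    return all(n == 0 for n in rq_backlog(client, queues).values())


class FakeRedis:
    """Tiny in-memory stand-in so the sketch runs without a Redis server."""

    def __init__(self, lists=None, zsets=None):
        self.lists, self.zsets = lists or {}, zsets or {}

    def llen(self, key):
        return len(self.lists.get(key, []))

    def zcard(self, key):
        return len(self.zsets.get(key, {}))


busy = FakeRedis(zsets={"rq:scheduled:high": {"job-1": 1700000000}})
print(safe_to_decommission(busy, ["high", "default"]))         # False
print(safe_to_decommission(FakeRedis(), ["high", "default"]))  # True
```

A single zero reading is not enough for "staying at 0"; poll the check over a window longer than your longest delayed job before scaling RQ workers down.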
Gotchas & edge cases
- Stranded delayed jobs. enqueue_in and rq-scheduler entries live in Redis sorted sets, not the main list. They will not show up in LLEN and Celery cannot read them. Enumerate rq:scheduler:scheduled_jobs (and, for RQ's built-in scheduling, typically rq:scheduled:<queue>) and re-issue any future-dated work as Celery countdown/eta tasks, or wait for them to fire under a still-running RQ worker before shutdown.
- Pickle vs JSON payloads. RQ pickles arguments by default, so it happily enqueues objects JSON cannot represent (model instances, custom classes). Celery configured with task_serializer='json' fails to encode them at enqueue time, and types such as datetimes may not arrive as the same type, depending on the serializer version. Convert such arguments to plain types (IDs, ISO strings) at the call site, not inside the task.
- Result API mismatch. Code that reads job.result synchronously must move to AsyncResult(id).get() and requires a configured result_backend. If you only used RQ to fire-and-forget, skip the backend entirely to avoid the extra Redis write per task.
- Two Beat schedulers. Running more than one celery beat replica double-fires every periodic task. Pin Beat to a single replica (or use a leader-elected lock). This is the most common post-migration duplicate-execution bug.
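For the pickle-vs-JSON gotcha, converting arguments at the call site can look like the sketch below. `to_json_safe` is a hypothetical helper, not a Celery API, and the set of handled types is only a starting point; anything it does not recognise is checked with `json.dumps` so the failure surfaces in the producer rather than in the broker.

```python
import json
from datetime import date, datetime, timezone
from decimal import Decimal
from uuid import UUID


def to_json_safe(value):
    """Convert common non-JSON types to plain values before enqueueing."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, (Decimal, UUID)):
        return str(value)
    if isinstance(value, dict):
        return {str(k): to_json_safe(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [to_json_safe(v) for v in value]
    json.dumps(value)  # raises TypeError for anything still unsupported
    return value


raw_args = (42, datetime(2024, 1, 2, 3, 4, tzinfo=timezone.utc))
args = [to_json_safe(a) for a in raw_args]
print(json.dumps(args))  # [42, "2024-01-02T03:04:00+00:00"]
```

Model instances deliberately fall through to the `TypeError`: pass the primary key and reload the object inside the task, which also avoids acting on stale data.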
Rollback
Because producers are the only thing that changed atomically, rollback is a single deploy: revert the producer commit so enqueue calls go back to queue.enqueue(...) on RQ db=0. The RQ workers never stopped, so they immediately pick the work back up. Leave the Celery worker running until its in-flight tasks finish, then scale it to zero. Keep the separate Redis database indices in place during the bake period so a rollback never has to untangle mixed keys.
Related
- Comparing RQ and Celery for lightweight Python tasks — decide whether the migration is worth it before committing.
- RQ vs Celery for Django scheduled tasks — the scheduling half of the move, focused on Django.
- Celery Beat periodic task scheduling — translate rq-scheduler cron jobs to Beat correctly.
- Setting up Celery with Redis broker and RabbitMQ backend — broker/backend wiring for the new Celery app.
- Preventing duplicate job execution with idempotency — required because cutover can deliver a job to both systems.