Fix: arq Not Working — Worker Not Picking Jobs, WorkerSettings, Cron, Retries, and Result Expiry
Quick Answer
How to fix Python arq errors — worker can't find tasks, WorkerSettings class structure, cron syntax differences, msgpack serialization errors, job_id deduplication, result expiration, and Redis connection pooling.
The Error
You enqueue a job from FastAPI and the worker logs it as unknown:
WARNING - task my_app.tasks.send_email is not in the registered tasks
Or your worker starts but does nothing:
$ arq tasks.WorkerSettings
14:32:01: Starting worker for 0 functions
14:32:01: redis_version=7.2.4 mem_usage=2.34M clients_connected=1 db_keys=0
# Sits idle. Never runs anything.
Or you call enqueue_job and get this:
TypeError: Object of type Decimal is not msgpack serializable
Or cron jobs aren’t firing at the times you expect:
cron_jobs = [
    cron("send_report", hour=3, minute=0),
]
# Configured. Job never runs at 3:00 AM.
Why This Happens
arq is small and opinionated. Most failures map to one of:
- WorkerSettings must list every task. Unlike Celery’s auto-discovery, arq requires you to explicitly enumerate task functions in WorkerSettings.functions. Miss one and the worker never registers it, so you get “not in the registered tasks” when it is enqueued.
- Job arguments must survive serialization. arq pickles jobs by default, but many projects swap in msgpack through job_serializer, and msgpack supports a much smaller set of types than pickle. Decimal, datetime, custom classes, and other Python objects fail.
- Cron jobs use keyword arguments, not standard cron strings. cron(task, hour=3) is a function call with keyword args, not cron("0 3 * * *"). Mixing up the syntax produces wrong schedules or jobs that never fire.
- Worker discovery is by import path. arq tasks.WorkerSettings must resolve on Python’s import path. Run it from the wrong directory and you get a module-not-found error.
Fix 1: Register Tasks in WorkerSettings.functions
Every task the worker should run must be in functions:
# tasks.py
from arq.connections import RedisSettings

async def send_email(ctx, to: str, subject: str):
    print(f"Sending to {to}: {subject}")
    # ... actual send ...
    return {"ok": True}

async def process_payment(ctx, payment_id: int):
    # ...
    pass

class WorkerSettings:
    functions = [send_email, process_payment]
    redis_settings = RedisSettings(host="localhost", port=6379)

# enqueue.py (from your web app)
import asyncio
from arq import create_pool
from arq.connections import RedisSettings

async def main():
    redis = await create_pool(RedisSettings(host="localhost"))
    # The first argument is the registered task name
    await redis.enqueue_job("send_email", "user@example.com", "Welcome")

asyncio.run(main())
Run the worker:
arq tasks.WorkerSettings
The first log line should list the registered functions:
Starting worker for 2 functions: send_email, process_payment
If you see 0 functions, the worker did not load the WorkerSettings you expected, usually because of a typo or the wrong import path.
Pro Tip: Put all your task functions in one tasks.py (or a tasks/ package with __init__.py re-exporting them). That makes it easier to keep WorkerSettings.functions in sync.
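One way to catch a missing registration before deploying is to compare the names you enqueue against the functions list. The sketch below assumes tasks are registered under their `__qualname__`, which should hold when you pass plain functions as in the example above. `unregistered_names` and the `send_invoice` name are hypothetical, for illustration only, and are not part of arq.

```python
async def send_email(ctx, to: str, subject: str):
    return {"ok": True}

async def process_payment(ctx, payment_id: int):
    return None

class WorkerSettings:
    functions = [send_email, process_payment]

def unregistered_names(functions, enqueued_names):
    """Return the enqueued names that no function in `functions` would match."""
    # Assumption: each task is registered under its __qualname__.
    known = {fn.__qualname__ for fn in functions}
    return sorted(name for name in enqueued_names if name not in known)

# "send_invoice" is enqueued somewhere in the app but was never registered.
missing = unregistered_names(WorkerSettings.functions, ["send_email", "send_invoice"])
print(missing)
```

A check like this fits in a unit test, so a forgotten entry fails CI instead of showing up as an unknown job in production.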
Fix 2: Task Function Signature Must Accept ctx
Every arq task takes ctx as the first positional argument:
async def send_email(ctx, to: str, subject: str):
    # ctx is a dict with job_id, job_try, enqueue_time, etc.
    print(f"Job {ctx['job_id']} attempt {ctx['job_try']}")
    ...
The ctx dict carries runtime metadata. Don’t omit it: arq unconditionally passes ctx as the first argument:
# Wrong:
async def send_email(to: str, subject: str): ...
# arq calls: send_email(ctx, "user@example.com", "Welcome")
# TypeError: send_email() takes 2 positional arguments but 3 were given
Inside the task, you can also pull shared resources from ctx. Set them up in on_startup:
import httpx

async def fetch_url(ctx, url: str):
    # Reuse the client created once in on_startup
    return (await ctx["http"].get(url)).text

class WorkerSettings:
    functions = [fetch_url]
    async def on_startup(ctx):
        ctx["http"] = httpx.AsyncClient()
    async def on_shutdown(ctx):
        await ctx["http"].aclose()
on_startup runs once when the worker boots. The ctx it populates is shared across all jobs that worker processes.
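A signature check can flag tasks that forgot ctx before the worker ever runs them. This is a minimal sketch using only the standard library. It relies on the naming convention from this article (first parameter called `ctx`), which arq itself does not enforce. `accepts_ctx`, `good_task`, and `bad_task` are hypothetical names.

```python
import inspect

_POSITIONAL = (
    inspect.Parameter.POSITIONAL_ONLY,
    inspect.Parameter.POSITIONAL_OR_KEYWORD,
)

def accepts_ctx(fn) -> bool:
    """True if fn is a coroutine function whose first positional parameter is named ctx."""
    if not inspect.iscoroutinefunction(fn):
        return False
    params = list(inspect.signature(fn).parameters.values())
    if not params:
        return False
    first = params[0]
    # Convention check only: the name "ctx" is an assumption of this sketch.
    return first.kind in _POSITIONAL and first.name == "ctx"

async def good_task(ctx, url: str):
    return url

async def bad_task(to: str, subject: str):
    return to

print(accepts_ctx(good_task), accepts_ctx(bad_task))
```

Running this over WorkerSettings.functions in a test should surface the TypeError case above at build time rather than at job time.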
Fix 3: Serialize Non-msgpack Types Explicitly
msgpack supports: int, float, str, bytes, bool, None, list, dict. If your pool and worker use msgpack (arq itself defaults to pickle), other types need conversion before enqueue:
from decimal import Decimal
# Wrong: fails with a serialization error under msgpack:
await redis.enqueue_job("process_payment", amount=Decimal("9.99"))
# Right: convert at the call site, then parse it back with Decimal(amount) in the task:
await redis.enqueue_job("process_payment", amount=str(Decimal("9.99")))
For complex objects, prefer IDs:
# Wrong: Pydantic model serialization is fragile:
await redis.enqueue_job("send_invoice", invoice=invoice_model)
# Right: pass the ID, fetch in the task:
await redis.enqueue_job("send_invoice", invoice_id=invoice.id)

async def send_invoice(ctx, invoice_id: int):
    invoice = await fetch_invoice(invoice_id)
    ...
arq’s default serializer is pickle, which handles almost any Python object but is unsafe with untrusted data and brittle across code versions. To go back to it after configuring msgpack, drop the custom serializer or set pickle explicitly:
import pickle

class WorkerSettings:
    functions = [...]
    job_serializer = pickle.dumps
    job_deserializer = pickle.loads
Common Mistake: Mixing serializers between producer and consumer. The web app’s arq pool and the worker must use the same serializer. Configure both: pass the same job_serializer and job_deserializer to create_pool that you set on WorkerSettings.
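If many call sites need the same conversions, a small helper can normalize arguments before they reach enqueue_job. The following is a sketch under stated assumptions: `to_plain` is a hypothetical helper, not an arq or msgpack API, and the choice of string formats (ISO 8601 for dates, `str` for Decimal and UUID) is just one reasonable convention that the task must reverse on its side.

```python
from datetime import date, datetime
from decimal import Decimal
from uuid import UUID

def to_plain(value):
    """Recursively convert common non-msgpack types into plain, serializable ones."""
    if isinstance(value, Decimal):
        return str(value)            # parse back with Decimal(...) in the task
    if isinstance(value, (datetime, date)):
        return value.isoformat()     # parse back with fromisoformat(...)
    if isinstance(value, UUID):
        return str(value)
    if isinstance(value, dict):
        return {key: to_plain(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_plain(item) for item in value]
    return value                     # int, float, str, bytes, bool, None pass through

payload = to_plain({
    "amount": Decimal("9.99"),
    "when": datetime(2024, 1, 2, 3, 4, 5),
    "tags": ("a", "b"),
})
print(payload)
```

The converted dict can then be passed as keyword arguments, for example `enqueue_job("process_payment", **payload)`.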
Fix 4: Cron Jobs With cron()
arq’s cron uses keyword args, not a cron string, and it takes the task function itself (or a dotted import path), not a bare task name:
from arq.cron import cron

async def send_report(ctx): ...
async def morning_digest(ctx): ...
async def health_check(ctx): ...

cron_jobs = [
    # Every day at 03:00 UTC
    cron(send_report, hour={3}, minute={0}),
    # Every weekday at 09:30 (weekday is 0-6, Monday is 0)
    cron(morning_digest, weekday={0, 1, 2, 3, 4}, hour={9}, minute={30}),
    # Every 15 minutes
    cron(health_check, minute={0, 15, 30, 45}),
]

class WorkerSettings:
    functions = [...]
    cron_jobs = cron_jobs
The keyword args (month, day, weekday, hour, minute, second) accept sets, lists, or scalars. Fields you leave out match any value, except second, which defaults to 0. arq fires the job when the current time matches all specified fields.
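The matching rule in the last sentence can be sketched in plain Python. This is an illustration of the idea, not arq's implementation: `cron_matches` and `field_matches` are hypothetical names, seconds are ignored for brevity, and weekdays are assumed to follow Python's `datetime.weekday()` numbering (Monday is 0).

```python
from datetime import datetime

def field_matches(value, allowed):
    """A field matches if it is unspecified, equals a scalar, or is in a collection."""
    if allowed is None:
        return True
    if isinstance(allowed, int):
        return value == allowed
    return value in allowed

def cron_matches(dt, *, month=None, day=None, weekday=None, hour=None, minute=None):
    """True when every specified field matches the given datetime."""
    return all([
        field_matches(dt.month, month),
        field_matches(dt.day, day),
        field_matches(dt.weekday(), weekday),
        field_matches(dt.hour, hour),
        field_matches(dt.minute, minute),
    ])

weekdays = {0, 1, 2, 3, 4}
monday = datetime(2024, 1, 1, 9, 30)     # a Monday
saturday = datetime(2024, 1, 6, 9, 30)   # a Saturday
print(cron_matches(monday, weekday=weekdays, hour=9, minute=30))
print(cron_matches(saturday, weekday=weekdays, hour=9, minute=30))
```

Because every specified field must match, adding a field can only narrow a schedule, which is a quick sanity check when a job fires less often than expected.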
Pro Tip: Test cron schedules in the REPL before deploying:
from arq.cron import next_cron
from datetime import datetime, timezone
# next_cron takes the same keyword fields as cron()
print(next_cron(datetime.now(timezone.utc), hour={3}, minute={0}))
# datetime of the next scheduled firing
Fix 5: Job Deduplication With _job_id
If the same external event can trigger the same job more than once, pass _job_id so duplicates are dropped:
from datetime import date
# Only one job is enqueued per user per day
job_id = f"daily_email:{user_id}:{date.today().isoformat()}"
await redis.enqueue_job("send_daily_email", user_id=user_id, _job_id=job_id)
arq treats the _job_id as a unique key in Redis. A second enqueue_job with the same _job_id does nothing and returns None for as long as the first job, or its stored result, still exists.
For one-shot delayed scheduling:
from datetime import datetime, timedelta, timezone
await redis.enqueue_job(
    "reminder",
    user_id=42,
    # Use a timezone-aware datetime so the delay does not depend on the host's local time
    _defer_until=datetime.now(timezone.utc) + timedelta(hours=2),
)
_defer_until (or _defer_by for a timedelta) schedules the job for later execution.
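The deduplication pattern is easier to reason about with a deterministic ID builder and a caller that handles the duplicate case. The sketch below uses an in-memory stand-in rather than Redis. `daily_job_id` and `InMemoryQueue` are hypothetical, and the stand-in mimics only one rule, taken from the troubleshooting list in this article: a duplicate _job_id makes the enqueue call return None.

```python
from datetime import date

def daily_job_id(prefix: str, user_id: int, day: date) -> str:
    """Build the same ID for the same user and day, so repeats collide on purpose."""
    return f"{prefix}:{user_id}:{day.isoformat()}"

class InMemoryQueue:
    """Hypothetical stand-in for the queue; it is not arq's implementation."""

    def __init__(self):
        self._seen = set()

    def enqueue(self, function: str, *, job_id: str):
        if job_id in self._seen:
            return None          # duplicate: nothing is enqueued
        self._seen.add(job_id)
        return job_id

queue = InMemoryQueue()
job_id = daily_job_id("daily_email", 42, date(2024, 5, 1))
first = queue.enqueue("send_daily_email", job_id=job_id)
second = queue.enqueue("send_daily_email", job_id=job_id)
print(first, second)
```

In real code the same guard applies to the value returned by enqueue_job: check for None before reading attributes such as job_id from it.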
Fix 6: Retries and max_tries
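arq only retries a job when the task raises Retry or is cancelled mid-run (for example by a worker shutdown). max_tries caps the number of attempts and defaults to 5. There is no built-in backoff, so you choose the delay. Tune max_tries globally, or per task by wrapping the function with arq.worker.func(task, max_tries=...):
from arq import Retry

async def call_flaky_api(ctx, payload):
    try:
        return await external_call(payload)
    except TransientError:
        # Tell arq to retry with a custom delay.
        raise Retry(defer=60)

class WorkerSettings:
    functions = [...]
    max_tries = 3  # Globally cap attempts
Retry(defer=N) schedules the next attempt N seconds out. For permanent failures, raise a regular exception: the job fails immediately without a retry, and the exception is stored as its result. A job that keeps raising Retry fails once max_tries is exhausted.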
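If you want the delay to grow between attempts, one option is to compute the defer value from the attempt number in ctx["job_try"]. This is a minimal sketch: `backoff_seconds`, the 5-second base, and the 300-second cap are illustrative assumptions, not arq defaults.

```python
def backoff_seconds(job_try: int, base: float = 5, cap: float = 300) -> float:
    """Exponential delay: base, 2*base, 4*base, ... limited to cap. job_try starts at 1."""
    return min(cap, base * 2 ** (job_try - 1))

# Delays for the first five attempts.
delays = [backoff_seconds(attempt) for attempt in range(1, 6)]
print(delays)
```

Inside a task this would be used as `raise Retry(defer=backoff_seconds(ctx["job_try"]))`, which keeps the retry policy in one place instead of scattering constants across tasks.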
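To inspect failures:
from arq import create_pool
from arq.connections import RedisSettings
redis = await create_pool(RedisSettings(...))
# Each result is stored under its own arq:result:<job_id> key
results = await redis.all_job_results()
failed = [r for r in results if not r.success]
# Failed job results stay around until keep_result expires
Fix 7: Result Expiry and Job Status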
By default, job results are kept for 3600 seconds (one hour); check your installed version’s defaults. Configure explicitly:
class WorkerSettings:
    functions = [...]
    keep_result = 86400  # 24 hours, in seconds
To wait for and read a result:
job = await redis.enqueue_job("compute", input_data)
result = await job.result(timeout=60)  # Waits up to 60s
print(result)
To check status without blocking:
status = await job.status()  # JobStatus.queued / .deferred / .in_progress / .complete / .not_found
Note: .result() blocks the calling coroutine. In a web request handler, this turns your async queue into a sync RPC, which is usually wrong. Return the job ID and let the client poll.
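The polling side can be sketched as a small loop that checks status at an interval and gives up after a deadline. This is an illustration under assumptions: `poll_until_done` is a hypothetical helper, statuses are plain strings named after the values listed above, and `fake_status` stands in for whatever status lookup the client actually calls.

```python
import asyncio

async def poll_until_done(get_status, *, interval=0.5, timeout=30.0,
                          done=("complete", "not_found")):
    """Call get_status() repeatedly until it returns a terminal status or time runs out."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await get_status()
        if status in done:
            return status
        if loop.time() >= deadline:
            raise TimeoutError(f"job still {status} after {timeout}s")
        await asyncio.sleep(interval)

# Hypothetical status source that progresses through three states.
_statuses = iter(["queued", "in_progress", "complete"])

async def fake_status():
    return next(_statuses)

final_status = asyncio.run(poll_until_done(fake_status, interval=0))
print(final_status)
```

Treating not_found as terminal is a design choice: once a result has expired, further polling will not bring it back.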
Fix 8: Connection Pooling and Web Integration
For FastAPI / Starlette apps that enqueue jobs, share one Redis pool across requests:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from arq import create_pool
from arq.connections import RedisSettings

redis_pool = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_pool
    # Create the pool once at startup
    redis_pool = await create_pool(RedisSettings(host="localhost"))
    yield
    # Release connections at shutdown
    await redis_pool.close()

app = FastAPI(lifespan=lifespan)

@app.post("/email")
async def trigger_email(to: str):
    job = await redis_pool.enqueue_job("send_email", to, "Welcome")
    return {"job_id": job.job_id}
Don’t call create_pool per request: opening a Redis connection per request adds 1-10ms of latency and exhausts the connection limit fast under load.
Common Mistake: Forgetting await redis_pool.close() in shutdown. Without it, you leak connections, which can pile up across redeploys until Redis reaches maxclients.
Still Not Working?
A few less-obvious failures:
- Worker shows 0 functions even with functions=[...]. Wrong import: arq tasks.WorkerSettings needs tasks.py in the working dir. Try python -c "from tasks import WorkerSettings; print(WorkerSettings.functions)" to confirm.
- Cron jobs run twice when you have two workers. By default they should not: cron() defaults to unique=True, which gives each scheduled run a single job ID shared by all workers. Check that you have not passed unique=False and that the same task is not also enqueued some other way.
- max_jobs doesn’t limit concurrency the way you think. It caps concurrent jobs on a single worker. To limit across workers, scale workers down or use Redis-based semaphores.
- enqueue_job returns None instead of a job. That happens when _job_id already exists (deduplication). Check for None if you’re using _job_id.
- Health checks fail silently. Set health_check_interval and arq logs its own health periodically; for monitoring, read the health-check key from Redis (by default the queue name plus :health-check) or run arq --check tasks.WorkerSettings.
- on_job_start / on_job_end not firing. They’re class-level on WorkerSettings, not free functions. Check indentation.
- Worker memory grows over time. A task leaks (open file, lingering HTTP connection). Use on_shutdown to close shared resources, and consider recycling workers periodically, for example by running them in burst mode under a supervisor.
- Multiple queues, jobs land on the wrong one. Specify _queue_name on enqueue and pass queue_name in WorkerSettings. Default queue is arq:queue.
For related Python async task queue issues, see Dramatiq not working, Celery task not received, APScheduler not working, and Redis connection refused.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.