Fix: arq Not Working — Worker Not Picking Jobs, WorkerSettings, Cron, Retries, and Result Expiry

FixDevs ·

Quick Answer

How to fix Python arq errors — worker can't find tasks, WorkerSettings class structure, cron syntax differences, msgpack serialization errors, job_id deduplication, result expiration, and Redis connection pooling.

The Error

You enqueue a job from FastAPI and the worker logs it as unknown:

WARNING - task my_app.tasks.send_email is not in the registered tasks

Or your worker starts but does nothing:

$ arq tasks.WorkerSettings
14:32:01: Starting worker for 0 functions
14:32:01: redis_version=7.2.4 mem_usage=2.34M clients_connected=1 db_keys=0
# Sits idle. Never runs anything.

Or you enqueue_job and get this:

arq.jobs.SerializationError: unable to serialize job "process_payment"

Or cron jobs aren’t firing at the times you expect:

cron_jobs = [
    cron("send_report", hour=3, minute=0),
]
# Configured. Job never runs at 3:00 AM.

Why This Happens

arq is small and opinionated. Most failures map to one of:

  • WorkerSettings must list every task. Unlike Celery’s auto-discovery, arq requires you to explicitly enumerate task functions in WorkerSettings.functions. Missing one → worker doesn’t register it → “not in the registered tasks” when enqueued.
  • Job arguments must survive serialization. arq uses pickle unless you set job_serializer. Once a stricter serializer such as msgpack is configured, only a smaller set of types is supported: Decimal, datetime (in some cases), custom classes, and other Python objects fail.
  • Cron jobs use a custom DSL, not standard cron strings. cron(send_report, hour=3) is a function call that takes the coroutine plus keyword args, not cron("0 3 * * *"). Times are matched in UTC unless the worker is configured with another timezone, so a schedule written with local time in mind fires at the wrong hour.
  • Worker discovery is by import path. arq tasks.WorkerSettings must resolve in Python’s import path. Run from the wrong directory, get module-not-found.

Fix 1: Register Tasks in WorkerSettings.functions

Every task the worker should run must be in functions:

# tasks.py
from arq import create_pool
from arq.connections import RedisSettings

async def send_email(ctx, to: str, subject: str):
    print(f"Sending to {to}: {subject}")
    # ... actual send ...
    return {"ok": True}

async def process_payment(ctx, payment_id: int):
    # ...
    pass

class WorkerSettings:
    functions = [send_email, process_payment]
    redis_settings = RedisSettings(host="localhost", port=6379)

# enqueue.py (from your web app)
import asyncio

from arq import create_pool
from arq.connections import RedisSettings

async def main():
    redis = await create_pool(RedisSettings(host="localhost"))
    # Jobs are enqueued by function name, not by import path.
    await redis.enqueue_job("send_email", "user@example.com", "Welcome")

asyncio.run(main())

Run the worker:

arq tasks.WorkerSettings

The first log line should list the registered functions:

Starting worker for 2 functions: send_email, process_payment

If you see 0 functions, your WorkerSettings class wasn’t loaded — usually a typo or wrong import path.
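A quick way to catch a missing registration before deploying is to compare the names you enqueue against what WorkerSettings.functions would register. The sketch below assumes plain coroutine functions registered under their __qualname__ (functions wrapped with a custom name would need different handling); the stub tasks and the missing_tasks helper are hypothetical, not part of arq.

```python
# Hypothetical pre-deploy check; the stub tasks stand in for your real ones.
async def send_email(ctx, to, subject): ...
async def process_payment(ctx, payment_id): ...

class WorkerSettings:
    functions = [send_email]  # process_payment was forgotten

def missing_tasks(settings, enqueued_names):
    """Return the enqueued names that the worker would not recognise."""
    # Assumption: each task is registered under the function's __qualname__.
    registered = {f.__qualname__ for f in settings.functions}
    return sorted(set(enqueued_names) - registered)

print(missing_tasks(WorkerSettings, ["send_email", "process_payment"]))
```

Running a check like this in CI turns a runtime warning on the worker into a failure you see before release.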

Pro Tip: Put all your task functions in one tasks.py (or a tasks/ package with __init__.py re-exporting them). Easier to keep WorkerSettings.functions in sync.

Fix 2: Task Function Signature Must Accept ctx

Every arq task takes ctx as the first positional argument:

async def send_email(ctx, to: str, subject: str):
    # ctx is a dict-like with: job_id, job_try, enqueue_time, etc.
    print(f"Job {ctx['job_id']} attempt {ctx['job_try']}")
    ...

The ctx dict carries runtime metadata. Don’t omit it — arq calls the function with ctx as the first arg unconditionally:

# Wrong:
async def send_email(to: str, subject: str): ...
# arq calls: send_email(ctx, "user@example.com", "Welcome")
# TypeError: send_email() takes 2 positional arguments but 3 were given

Inside the task, you can also pull shared resources from ctx. Set them up in on_startup:

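import httpx

async def fetch_url(ctx, url: str):
    # Reuse the client created once in on_startup.
    return (await ctx["http"].get(url)).text

class WorkerSettings:
    functions = [fetch_url]

    # Defined without self: arq reads these from the class and calls them with ctx only.
    async def on_startup(ctx):
        ctx["http"] = httpx.AsyncClient()

    async def on_shutdown(ctx):
        await ctx["http"].aclose()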

on_startup runs once when the worker boots. The ctx it populates is shared across all jobs that worker processes.

Fix 3: Serialize Non-msgpack Types Explicitly

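arq pickles job arguments by default, so Decimal and datetime work out of the box. Serialization errors appear once job_serializer is set to something stricter, most commonly msgpack. msgpack supports int, float, str, bytes, bool, None, list, and dict. Other types need conversion before enqueue: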

from decimal import Decimal
from datetime import datetime

# Wrong — fails with serialization error:
await redis.enqueue_job("process_payment", amount=Decimal("9.99"))

# Right — convert at the call site:
await redis.enqueue_job("process_payment", amount=str(Decimal("9.99")))

For complex objects, prefer IDs:

# Wrong — Pydantic model serialization is fragile:
await redis.enqueue_job("send_invoice", invoice=invoice_model)

# Right — pass the ID, fetch in the task:
await redis.enqueue_job("send_invoice", invoice_id=invoice.id)

async def send_invoice(ctx, invoice_id: int):
    invoice = await fetch_invoice(invoice_id)
    ...
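When many call sites pass rich types, a small conversion helper keeps the rule in one place. The following is a minimal sketch, not an arq feature: to_wire is a hypothetical name, and the choice of string and ISO 8601 encodings is an assumption you may want to change.

```python
from datetime import date, datetime
from decimal import Decimal
from uuid import UUID

def to_wire(value):
    """Recursively convert common rich types into msgpack-friendly ones."""
    if isinstance(value, Decimal):
        return str(value)             # keep exact precision as text
    if isinstance(value, (datetime, date)):
        return value.isoformat()      # ISO 8601 string
    if isinstance(value, UUID):
        return str(value)
    if isinstance(value, dict):
        return {k: to_wire(v) for k, v in value.items()}
    if isinstance(value, (list, tuple, set)):
        return [to_wire(v) for v in value]
    return value                      # already a plain type

payload = to_wire({"amount": Decimal("9.99"), "due": date(2024, 1, 31), "tags": ("a", "b")})
print(payload)
```

The task then parses the strings back, for example with Decimal(amount) or date.fromisoformat(due).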

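pickle is arq's default serializer. It accepts almost any Python object, but it is Python-only, fragile across code versions, and unsafe to load from untrusted sources. To return to it after using a custom serializer, remove the job_serializer / job_deserializer settings or set them explicitly: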

import pickle

class WorkerSettings:
    functions = [...]
    job_serializer = pickle.dumps
    job_deserializer = pickle.loads

Common Mistake: Mixing serializers between producer and consumer. The web app's arq pool and the worker must use the same serializer. Pass the same job_serializer and job_deserializer to create_pool(...) that you set on WorkerSettings.
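The mismatch is easy to reproduce with the standard library alone. In this sketch json stands in for any non-default serializer on the consumer side, and can_decode is a hypothetical helper, not an arq function.

```python
import json
import pickle

def can_decode(dumps, loads, payload):
    """Return True if data written with `dumps` can be read back with `loads`."""
    try:
        return loads(dumps(payload)) == payload
    except Exception:
        return False

job = {"f": "send_email", "a": ["user@example.com"], "k": {}}

matching = can_decode(pickle.dumps, pickle.loads, job)    # same pair on both sides
mismatched = can_decode(pickle.dumps, json.loads, job)    # producer and consumer disagree
print(matching, mismatched)
```

A round-trip test of this shape, using your real serializer pair, is a cheap guard against the two sides drifting apart.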

Fix 4: Cron Jobs With cron()

arq’s cron uses keyword args, not a cron string:

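from arq.cron import cron

# cron() takes the coroutine itself, not its name as a string
async def send_report(ctx): ...
async def morning_digest(ctx): ...
async def health_check(ctx): ...

cron_jobs = [
    # Every day at 03:00 UTC
    cron(send_report, hour={3}, minute={0}),

    # Every weekday at 09:30 (weekday is an int: 0 = Monday ... 6 = Sunday)
    cron(morning_digest, weekday={0, 1, 2, 3, 4}, hour={9}, minute={30}),

    # Every 15 minutes
    cron(health_check, minute={0, 15, 30, 45}),
]

class WorkerSettings:
    functions = [...]
    cron_jobs = cron_jobs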

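The keyword args (month, day, weekday, hour, minute, second) accept a single int or a set of ints, and weekday counts from 0 for Monday. arq fires the job when the current time matches all specified fields. Fields you leave out match any value, except second, which defaults to 0.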
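The "matches all specified fields" rule can be modelled in a few lines. This is a simplified illustration of the idea, not arq's implementation, and the matches helper is hypothetical.

```python
from datetime import datetime

def matches(dt, **fields):
    """True when every specified field matches dt; unspecified fields match anything."""
    for name, wanted in fields.items():
        # weekday comes from a method (0 = Monday); the rest are attributes.
        actual = dt.weekday() if name == "weekday" else getattr(dt, name)
        allowed = {wanted} if isinstance(wanted, int) else set(wanted)
        if actual not in allowed:
            return False
    return True

monday_0930 = datetime(2024, 1, 1, 9, 30)    # 2024-01-01 is a Monday
saturday_0930 = datetime(2024, 1, 6, 9, 30)  # 2024-01-06 is a Saturday
print(matches(monday_0930, weekday={0, 1, 2, 3, 4}, hour=9, minute=30))
print(matches(saturday_0930, weekday={0, 1, 2, 3, 4}, hour=9, minute=30))
```

Because every field must match, adding a field only ever narrows a schedule; it never widens it.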

Pro Tip: Test cron schedules in the REPL before deploying:

from datetime import datetime, timezone
from arq.cron import next_cron

# next_cron takes the same schedule keyword args as cron()
print(next_cron(datetime.now(timezone.utc), hour=3, minute=0))
# datetime of the next scheduled firing

Fix 5: Job Deduplication With _job_id

If you enqueue the same job multiple times for the same external event, use _job_id for at-most-once semantics:

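from datetime import date

# At-most-once: only one job runs per user per day
job_id = f"daily_email:{user_id}:{date.today().isoformat()}"
job = await redis.enqueue_job("send_daily_email", user_id=user_id, _job_id=job_id)
if job is None:
    ...  # a job with this ID is already queued, running, or has a stored result

arq treats the _job_id as a unique key in Redis. A second enqueue_job with the same _job_id is a no-op and returns None instead of a Job. The guarantee only lasts while the job or its result still exists in Redis, so keep_result must cover the window in which duplicates can arrive.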
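The behaviour is easier to reason about with a toy model. The sketch below uses an in-memory stand-in rather than arq or Redis; FakeQueue and daily_job_id are hypothetical names, and the only behaviour modelled is that an already-known ID is skipped and None is returned.

```python
from datetime import date

def daily_job_id(task, user_id, day):
    """Build a deterministic ID so repeat enqueues for the same day collide."""
    return f"{task}:{user_id}:{day.isoformat()}"

class FakeQueue:
    """In-memory stand-in that mimics 'same ID is skipped' behaviour."""
    def __init__(self):
        self.jobs = {}

    def enqueue(self, task, job_id, **kwargs):
        if job_id in self.jobs:
            return None               # duplicate: nothing is enqueued
        self.jobs[job_id] = (task, kwargs)
        return job_id

queue = FakeQueue()
jid = daily_job_id("daily_email", 42, date(2024, 1, 31))
first = queue.enqueue("send_daily_email", jid, user_id=42)
second = queue.enqueue("send_daily_email", jid, user_id=42)
print(first, second)
```

The key design choice is what goes into the ID: everything that defines "the same work" (task, user, day) and nothing that varies between duplicate triggers.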

For one-shot delayed scheduling:

from datetime import datetime, timedelta, timezone

await redis.enqueue_job(
    "reminder",
    user_id=42,
    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
    _defer_until=datetime.now(timezone.utc) + timedelta(hours=2),
)

_defer_until (or _defer_by for a timedelta) schedules the job for later execution.

Fix 6: Retries and max_tries

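By default, arq allows each job up to 5 attempts (max_tries = 5), but it only re-runs a job when the task raises Retry or the job is cancelled, for example during worker shutdown. Any other exception fails the job immediately, and there is no built-in exponential backoff. Tune per task or globally: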

from arq import Retry

async def call_flaky_api(ctx, payload):
    try:
        return await external_call(payload)
    except TransientError:
        # Tell arq to retry with custom delay.
        raise Retry(defer=60)

class WorkerSettings:
    functions = [...]
    max_tries = 3  # Globally cap retries

Retry(defer=N) schedules the next attempt N seconds out. For permanent failures, raise a regular exception: the job fails at once and the exception is stored as its result. If a task keeps raising Retry, arq gives up once max_tries is exhausted.
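If you want exponential backoff, you can compute the delay from the attempt number yourself. A minimal sketch, assuming the attempt counter in ctx["job_try"] starts at 1; backoff_delay and its base and cap values are hypothetical choices, not arq defaults.

```python
def backoff_delay(job_try, base=5, cap=300):
    """Seconds to wait before the next attempt: base * 2^(try - 1), capped."""
    return min(cap, base * 2 ** (job_try - 1))

# Inside a task this could be used as:
#     raise Retry(defer=backoff_delay(ctx["job_try"]))
delays = [backoff_delay(n) for n in range(1, 8)]
print(delays)
```

The cap matters: without it, a job with a high max_tries can end up deferred for hours.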

To inspect failures:

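from arq import create_pool
from arq.connections import RedisSettings

redis = await create_pool(RedisSettings(...))
# Results are stored per job; all_job_results() collects them.
failed = [r for r in await redis.all_job_results() if not r.success]
# Failed job results stay around until keep_result expires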

Fix 7: Result Expiry and Job Status

By default, job results are kept for 3600 seconds (one hour); check your installed version's defaults. Configure explicitly:

class WorkerSettings:
    functions = [...]
    keep_result = 86400  # 24 hours, in seconds

To wait for and read a result:

job = await redis.enqueue_job("compute", input_data)
result = await job.result(timeout=60)  # Blocks up to 60s
print(result)

To check status without blocking:

status = await job.status()  # JobStatus.queued / .deferred / .in_progress / .complete / .not_found

Note: .result() blocks the calling coroutine. In a web request handler, this turns your async queue into a sync RPC — usually wrong. Return the job ID, let the client poll.

Fix 8: Connection Pooling and Web Integration

For FastAPI / Starlette apps that enqueue jobs, share one Redis pool across requests:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from arq import create_pool
from arq.connections import RedisSettings

redis_pool = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_pool
    redis_pool = await create_pool(RedisSettings(host="localhost"))
    yield
    await redis_pool.close()

app = FastAPI(lifespan=lifespan)

@app.post("/email")
async def trigger_email(to: str):
    job = await redis_pool.enqueue_job("send_email", to, "Welcome")
    return {"job_id": job.job_id}

Don’t create_pool per request — opening a Redis connection per request adds 1-10ms of latency and exhausts the connection limit fast under load.

Common Mistake: Forgetting await redis_pool.close() in shutdown. Without it, you leak connections that may interfere with redeploys (Redis maxclients reached).

Still Not Working?

A few less-obvious failures:

  • Worker shows 0 functions even with functions=[...]. Wrong import — arq tasks.WorkerSettings needs tasks.py in the working dir. Try python -c "from tasks import WorkerSettings; print(WorkerSettings.functions)" to confirm.
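  • Cron jobs run twice when you have two workers. cron() defaults to unique=True, which gives each scheduled run a deterministic job ID so that only one worker executes it. If you still see duplicates, check that unique=False was not passed and that every worker uses the same queue, the same Redis database, and the same cron job names.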
  • max_jobs doesn’t limit concurrency the way you think. It caps concurrent jobs on a single worker. To limit across workers, scale workers down or use Redis-based semaphores.
  • enqueue_job returns None instead of a job. That happens when _job_id already exists (deduplication). Check for None if you’re using _job_id.
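  • Health checks fail silently. The worker writes a health-check key every health_check_interval seconds; by default the key is named after the queue (arq:queue:health-check). Run arq --check tasks.WorkerSettings to read it, or point your monitoring at that key.
  • on_job_start / on_job_end not firing. They must be attributes of WorkerSettings; a module-level function that the class never references is ignored. Check indentation.
  • Worker memory grows over time. A task leaks (open file, lingering HTTP connection). Use on_shutdown to close shared resources, and consider running workers in burst mode under a process supervisor so each process is recycled regularly.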
  • Multiple queues, jobs land on the wrong one. Specify _queue_name on enqueue and pass queue_name in WorkerSettings. Default queue is arq:queue.

For related Python async task queue issues, see Dramatiq not working, Celery task not received, APScheduler not working, and Redis connection refused.

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
