Fix: FastAPI BackgroundTasks Not Working — Task Not Running or Dependency Errors

FixDevs

Quick Answer

How to fix FastAPI BackgroundTasks — task not executing, dependency injection in tasks, error handling, Celery for heavy tasks, and lifespan-managed background workers.

The Problem

A FastAPI BackgroundTasks function never runs:

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def send_email(email: str):
    print(f"Sending email to {email}")  # Never prints

@app.post("/register")
async def register(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_email, email)
    return {"message": "Registered"}

Or the task runs but raises an unhandled exception that silently disappears:

def process_data(data: dict):
    result = data["key"]  # KeyError — but the endpoint returns 200 anyway

Or you need to use a database session inside a background task, but get:

sqlalchemy.exc.InvalidRequestError: Instance <User> is not bound to a Session

Or a task takes too long and blocks the server response.

Why This Happens

FastAPI BackgroundTasks runs after the response is sent to the client. Common failures:

  • Task function isn’t actually being added: background_tasks.add_task() must be called before the endpoint returns. If an early return or exception happens first, the task is never queued.
  • Async vs sync task confusion: BackgroundTasks accepts both async def and def functions, but they execute differently. An async def task is awaited on the event loop, while a def task runs in a thread pool.
  • Database session closed before task runs: the database session created per-request is closed when the request ends. By the time BackgroundTasks runs, the session is gone. Passing ORM objects to tasks causes “detached instance” errors.
  • Exceptions swallowed silently: if a background task raises an exception, it is logged on the server but the server keeps running, and the client never sees it because the response has already been sent.
  • BackgroundTasks is not for heavy work: it runs in the same process as the server. CPU-bound work inside an async def task blocks the event loop, and long-running tasks tie up a thread or worker. For production workloads that need durability or heavy compute, use Celery or similar.

The dangerous failure mode is the silent one. The client sees 200 OK, your metrics show a healthy 200 rate, and yet the email never goes out, the webhook never fires, the analytics event never lands. From the outside, your service looks perfectly healthy. From the user’s perspective, “the welcome email is broken.” This is invisible to typical HTTP monitoring and only surfaces when a customer complains or a downstream system reports missing data.

The blast radius is “everything that happens after the response is sent.” For most apps, that means transactional emails, webhook deliveries, cache warming, audit log writes, and analytics events. None of these are critical to the immediate response, but each one is critical to the business. A queue of silently failing background tasks is one of the worst incident classes because it accumulates damage without any single error spike to alert on. Recovery often involves replaying days of missed work from raw logs.

Fix 1: Verify the Task Is Registered Before Return

add_task() must be called before the function returns:

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def send_welcome_email(email: str, name: str):
    # Simulate sending email
    import time
    time.sleep(2)
    print(f"Email sent to {email}")

@app.post("/register")
async def register(
    email: str,
    name: str,
    background_tasks: BackgroundTasks,
):
    # PITFALL: this early return exits before add_task() is reached,
    # so no task is queued for requests that take this branch
    if not email:
        return {"error": "Email required"}

    # CORRECT: add the task, then return
    background_tasks.add_task(send_welcome_email, email, name)
    return {"message": "Registered successfully"}


# Multiple background tasks: all run after the response, in the order added
# (send_confirmation_email, update_inventory, notify_warehouse are your own functions)
@app.post("/order")
async def create_order(order_id: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_confirmation_email, order_id)
    background_tasks.add_task(update_inventory, order_id)
    background_tasks.add_task(notify_warehouse, order_id)
    return {"order_id": order_id, "status": "processing"}

Pass arguments correctly:

# Positional args after the function
background_tasks.add_task(my_function, arg1, arg2)

# Keyword args
background_tasks.add_task(my_function, email=email, subject="Welcome")

# Mixed
background_tasks.add_task(my_function, email, subject="Welcome", delay=5)
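
One mistake that produces a task that "never runs later" is writing add_task(send_email(email)): that calls the function immediately during the request and registers its return value instead of the function. The sketch below uses a tiny stand-in class to illustrate the store-now, call-later contract. TinyBackgroundTasks and run_all are hypothetical names for illustration only, not FastAPI's implementation.

```python
from typing import Any, Callable


class TinyBackgroundTasks:
    """Hypothetical stand-in that mimics the add_task calling convention."""

    def __init__(self) -> None:
        self._tasks = []

    def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # Store the callable and its arguments; nothing runs yet.
        self._tasks.append((func, args, kwargs))

    def run_all(self) -> None:
        # Stand-in for "after the response has been sent".
        for func, args, kwargs in self._tasks:
            func(*args, **kwargs)


events = []


def send_email(email: str, subject: str = "Hello") -> None:
    events.append(f"sent {subject!r} to {email}")


tasks = TinyBackgroundTasks()
# Pass the function itself plus its arguments; do not call it here.
tasks.add_task(send_email, "a@example.com", subject="Welcome")
events.append("response returned")
tasks.run_all()

print(events)  # ['response returned', "sent 'Welcome' to a@example.com"]
```

The ordering in events is the point: the task body runs only after the "response" step, and only because a callable was registered rather than a result.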

Fix 2: Use Async Tasks for I/O-Bound Work

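BackgroundTasks supports both async def and regular def functions. An async def task is awaited on the event loop; a def task runs in a thread pool. For I/O such as HTTP requests, prefer async def with an async client so that no thread is held while waiting: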

import httpx
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

# AVOID: sync I/O occupies a thread-pool thread for the whole call
def sync_send_webhook(url: str, data: dict):
    import requests
    requests.post(url, json=data, timeout=10)  # Blocks its worker thread until done

# CORRECT: async function for I/O-bound work
async def async_send_webhook(url: str, data: dict):
    async with httpx.AsyncClient() as client:
        await client.post(url, json=data)

# OK: sync function for short CPU-bound work (runs in the thread pool)
def sync_process_image(image_path: str):
    # Runs in a worker thread, so the event loop is not blocked directly;
    # heavy CPU work still competes for the GIL (see Fix 6)
    from PIL import Image
    img = Image.open(image_path)
    img.thumbnail((200, 200))
    img.save(image_path.replace('.jpg', '_thumb.jpg'))

@app.post("/process")
async def process(background_tasks: BackgroundTasks):
    background_tasks.add_task(async_send_webhook, "https://webhook.site/...", {"event": "processed"})
    background_tasks.add_task(sync_process_image, "/tmp/upload.jpg")
    return {"status": "processing"}
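
The case that really does stall the server is a blocking call placed inside an async def task, because that task runs on the event loop itself. The following standard-library sketch makes the effect measurable: a ticker coroutine counts how often the loop gets to run while a task sleeps for 0.2 seconds, first with a blocking time.sleep and then with the same call moved to a thread via asyncio.to_thread (Python 3.9+). All function names here are illustrative.

```python
import asyncio
import time


async def count_ticks(stop: asyncio.Event) -> int:
    """Count how many times the event loop gets to schedule this coroutine."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def blocking_task() -> None:
    # Anti-pattern: a blocking call inside `async def` stalls the event loop.
    time.sleep(0.2)


async def offloaded_task() -> None:
    # Same blocking call, moved to a worker thread; the loop stays free.
    await asyncio.to_thread(time.sleep, 0.2)


async def measure(task_fn) -> int:
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # give the ticker a chance to start
    await task_fn()
    stop.set()
    return await ticker


async def main():
    blocked = await measure(blocking_task)
    offloaded = await measure(offloaded_task)
    return blocked, offloaded


blocked_ticks, offloaded_ticks = asyncio.run(main())
print(f"ticks while blocked: {blocked_ticks}, ticks while offloaded: {offloaded_ticks}")
```

While the loop is blocked the ticker barely advances; once the sleep is offloaded it keeps ticking. Every other request handled by the same worker behaves like that ticker.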

Fix 3: Create a New Database Session Inside the Task

Never reuse the request’s database session inside a background task. Create a new session:

from datetime import datetime

from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException
from sqlalchemy.orm import Session
from database import SessionLocal, User, get_db  # your own module (assumed layout)

app = FastAPI()

# WRONG: passing the request session to the background task
def update_last_login_wrong(user: User, db: Session):
    # The request-scoped session may already be closed by the time this runs
    user.last_login = datetime.utcnow()
    db.commit()  # sqlalchemy.exc.InvalidRequestError

# CORRECT — create a fresh session inside the task
def update_last_login(user_id: int):
    db = SessionLocal()  # New session
    try:
        user = db.query(User).filter(User.id == user_id).first()
        if user:
            user.last_login = datetime.utcnow()
            db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()  # Always close

@app.post("/login")
async def login(
    user_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
):
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Pass the ID (a plain value), not the ORM object
    background_tasks.add_task(update_last_login, user.id)

    return {"message": "Logged in"}

With SQLAlchemy async sessions:

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker

# engine is assumed to be created elsewhere with create_async_engine(...)
async_session_factory = async_sessionmaker(engine, expire_on_commit=False)

async def async_update_stats(user_id: int, event: str):
    async with async_session_factory() as session:
        async with session.begin():
            user = await session.get(User, user_id)
            if user:
                user.event_count += 1
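
The same rule can be tried without a database server. The sketch below swaps SQLAlchemy for the standard-library sqlite3 module purely for illustration: the "request" connection is closed before the task runs, so reusing it fails, while a task that receives only the user ID and opens its own connection succeeds. The table layout and function names are assumptions made for this demo.

```python
import os
import sqlite3
import tempfile
from contextlib import closing
from datetime import datetime, timezone

DB_PATH = os.path.join(tempfile.mkdtemp(), "demo.db")


def init_db() -> None:
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, last_login TEXT)")
        conn.execute("INSERT INTO users (id, last_login) VALUES (1, NULL)")
        conn.commit()


def update_last_login_wrong(conn: sqlite3.Connection, user_id: int) -> None:
    # Reuses the request's connection, which is closed by the time this runs.
    conn.execute("UPDATE users SET last_login = 'now' WHERE id = ?", (user_id,))


def update_last_login(user_id: int) -> None:
    # Opens (and always closes) its own connection inside the task.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        now = datetime.now(timezone.utc).isoformat()
        conn.execute("UPDATE users SET last_login = ? WHERE id = ?", (now, user_id))
        conn.commit()


init_db()

# "Request" phase: look up the user, then the request-scoped connection closes.
request_conn = sqlite3.connect(DB_PATH)
user_id = request_conn.execute("SELECT id FROM users WHERE id = 1").fetchone()[0]
request_conn.close()

# "Background task" phase.
try:
    update_last_login_wrong(request_conn, user_id)
    reused_connection_worked = True
except sqlite3.ProgrammingError:
    reused_connection_worked = False

update_last_login(user_id)

with closing(sqlite3.connect(DB_PATH)) as conn:
    last_login = conn.execute(
        "SELECT last_login FROM users WHERE id = ?", (user_id,)
    ).fetchone()[0]

print(reused_connection_worked, last_login is not None)
```

Passing a plain ID keeps the task independent of the request's lifetime, which is the same reason the SQLAlchemy version above takes user_id rather than a User object.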

Fix 4: Add Error Handling to Background Tasks

Exceptions in background tasks are silently swallowed from the client’s perspective. Add explicit error handling and logging:

import logging
import smtplib
from fastapi import FastAPI, BackgroundTasks

logger = logging.getLogger(__name__)
app = FastAPI()

def send_email(to: str, subject: str, body: str):
    try:
        # ... send email via smtplib ...
        logger.info("Email sent to %s", to)
    except smtplib.SMTPException as e:
        logger.error("Failed to send email to %s: %s", to, e)
        # Optionally: retry logic, dead letter queue, alert
    except Exception:
        logger.exception("Unexpected error sending email to %s", to)
        raise  # Re-raise if you want the server to log the traceback too

@app.post("/notify")
async def notify(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(
        send_email,
        to=email,
        subject="Notification",
        body="You have a new message",
    )
    return {"message": "Notification queued"}

Wrap tasks in an error handler:

import functools
import inspect
import logging

logger = logging.getLogger(__name__)

def with_error_handling(func):
    """Log any exception raised by a sync or async background task."""
    @functools.wraps(func)
    async def async_wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception:
            logger.exception("Background task %s failed", func.__name__)

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            logger.exception("Background task %s failed", func.__name__)

    if inspect.iscoroutinefunction(func):
        return async_wrapper
    return sync_wrapper


@with_error_handling
def send_email(to: str, body: str):
    # Any exception here is caught and logged
    ...
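
Logging a failure still leaves the side effect undone. For transient errors, one option is the retry logic hinted at in the comments above. This is a minimal sketch of a retry decorator with exponential backoff for sync tasks; with_retries and its parameters are illustrative names, and the blocking time.sleep is only acceptable because def tasks run in a thread rather than on the event loop.

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)


def with_retries(max_attempts: int = 3, base_delay: float = 1.0):
    """Retry a sync task with exponential backoff; log and re-raise on the last failure."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        logger.exception(
                            "Task %s failed after %d attempts", func.__name__, attempt
                        )
                        raise
                    delay = base_delay * 2 ** (attempt - 1)
                    logger.warning(
                        "Task %s failed (attempt %d), retrying in %.1fs",
                        func.__name__, attempt, delay,
                    )
                    time.sleep(delay)
        return wrapper
    return decorator


attempts = {"count": 0}


@with_retries(max_attempts=3, base_delay=0.0)  # zero delay keeps the demo fast
def flaky_send(to: str) -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return f"sent to {to}"


result = flaky_send("a@example.com")
print(result, attempts["count"])
```

In-process retries like this are lost if the worker restarts mid-retry, so they suit short transient failures rather than work that must survive a deploy.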

Fix 5: Use Lifespan for Long-Running Background Workers

BackgroundTasks is per-request. For tasks that need to run continuously (polling, periodic cleanup), use the lifespan context manager:

import asyncio
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI

logger = logging.getLogger(__name__)

async def periodic_cleanup():
    """Runs in the background for the lifetime of the application."""
    while True:
        try:
            # Clean up expired sessions, temp files, etc.
            # (cleanup_expired_sessions is your own coroutine)
            await cleanup_expired_sessions()
            logger.info("Cleanup completed")
        except Exception as e:
            logger.error(f"Cleanup failed: {e}")
        await asyncio.sleep(300)  # Run every 5 minutes

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: launch background task
    task = asyncio.create_task(periodic_cleanup())
    yield
    # Shutdown: cancel the task
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

app = FastAPI(lifespan=lifespan)

@app.get("/health")
async def health():
    return {"status": "ok"}
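
Two properties of this pattern are worth checking in isolation: an error in one iteration should not kill the loop, and cancellation on shutdown should still get through. On Python 3.8+ asyncio.CancelledError derives from BaseException, so an `except Exception` inside the loop does not swallow it. The sketch below reproduces the lifespan shape with the standard library only; worker_lifespan and flaky_cleanup are hypothetical stand-ins, and the short sleep stands in for the app serving requests.

```python
import asyncio
from contextlib import asynccontextmanager

attempts = 0
errors = []


async def flaky_cleanup() -> None:
    """Stand-in for real cleanup work; fails on the first call only."""
    global attempts
    attempts += 1
    if attempts == 1:
        raise RuntimeError("transient failure")


async def periodic_job(interval: float) -> None:
    while True:
        try:
            await flaky_cleanup()
        except Exception as exc:
            # Catches ordinary errors but not asyncio.CancelledError.
            errors.append(str(exc))
        await asyncio.sleep(interval)


@asynccontextmanager
async def worker_lifespan(interval: float):
    task = asyncio.create_task(periodic_job(interval))
    try:
        yield task
    finally:
        # Shutdown: cancel the worker and wait for it to finish unwinding.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass


async def main() -> bool:
    async with worker_lifespan(interval=0.01) as task:
        await asyncio.sleep(0.1)  # stand-in for the application running
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(attempts, errors, was_cancelled)
```

The worker keeps iterating after the first failure and stops promptly when the context exits, which is the behavior the lifespan handler above relies on.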

Fix 6: Use Celery for Heavy or Reliable Tasks

BackgroundTasks is fire-and-forget: if the server crashes, tasks are lost. For reliability or CPU-heavy work, use Celery:

# celery_app.py
# (send_email, charge_payment, and PaymentError are assumed to come from your own code)
from celery import Celery

celery_app = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

@celery_app.task
def send_email_task(to: str, subject: str, body: str):
    # This runs in a separate Celery worker process
    send_email(to, subject, body)

@celery_app.task(bind=True, max_retries=3)
def process_payment(self, order_id: str):
    try:
        charge_payment(order_id)
    except PaymentError as e:
        # Retry after 60 seconds, up to 3 times
        raise self.retry(exc=e, countdown=60)

# main.py
from fastapi import FastAPI
from celery_app import send_email_task, process_payment

app = FastAPI()

@app.post("/order")
async def create_order(order_id: str, email: str):
    # Dispatch to Celery — survives server restarts
    process_payment.delay(order_id)
    send_email_task.delay(email, "Order Confirmation", f"Order {order_id} received")
    return {"order_id": order_id}

When to use BackgroundTasks vs Celery:

Aspect        BackgroundTasks          Celery
Setup         Zero config              Requires broker (Redis/RabbitMQ)
Durability    Lost on crash            Persisted in broker
Retries       Manual                   Built-in
Monitoring    None                     Flower dashboard
Use case      Quick fire-and-forget    Reliable, retriable, scheduled

Fix 7: Add Observability for Background Tasks

The most dangerous property of BackgroundTasks is that failures are invisible. Add metrics so silent failures become loud:

import functools
import logging
from prometheus_client import Counter, Histogram
from fastapi import FastAPI, BackgroundTasks

logger = logging.getLogger(__name__)

task_started = Counter('background_task_started_total', 'Tasks started', ['task'])
task_succeeded = Counter('background_task_succeeded_total', 'Tasks succeeded', ['task'])
task_failed = Counter('background_task_failed_total', 'Tasks failed', ['task'])
task_duration = Histogram('background_task_duration_seconds', 'Task duration', ['task'])

# Note: this wrapper handles sync (def) tasks only
def instrumented(name: str):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            task_started.labels(task=name).inc()
            with task_duration.labels(task=name).time():
                try:
                    result = func(*args, **kwargs)
                    task_succeeded.labels(task=name).inc()
                    return result
                except Exception:
                    task_failed.labels(task=name).inc()
                    logger.exception(f"Task {name} failed")
                    raise
        return wrapper
    return decorator

@instrumented("send_welcome_email")
def send_welcome_email(email: str):
    ...  # send the email here

Alert when background_task_started_total - background_task_succeeded_total - background_task_failed_total stays above 0 for more than 5 minutes. That gap is the number of in-flight tasks; if it stays high, tasks are getting stuck. Also alert whenever background_task_failed_total increases, since every failure represents a missed side effect.

Pro Tip: A “completed tasks” counter alone is not enough. You also need a “started” counter to detect tasks that start but never finish (e.g., a task that hangs on a network call). The gap between started and finished is the silent failure detector.
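
The started/finished gap can be tried out without a metrics server. In the sketch below a collections.Counter stands in for the Prometheus counters (an assumption made so the example has no dependencies), and the decorator covers both def and async def tasks. The names tracked and in_flight are illustrative.

```python
import asyncio
import functools
import inspect
from collections import Counter

metrics = Counter()  # stand-in for Prometheus counters


def tracked(name: str):
    """Count started, succeeded, and failed runs for sync or async tasks."""
    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                metrics[f"{name}.started"] += 1
                try:
                    result = await func(*args, **kwargs)
                except Exception:
                    metrics[f"{name}.failed"] += 1
                    raise
                metrics[f"{name}.succeeded"] += 1
                return result
            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            metrics[f"{name}.started"] += 1
            try:
                result = func(*args, **kwargs)
            except Exception:
                metrics[f"{name}.failed"] += 1
                raise
            metrics[f"{name}.succeeded"] += 1
            return result
        return sync_wrapper
    return decorator


def in_flight(name: str) -> int:
    # Started but neither succeeded nor failed: running, hung, or lost.
    return (
        metrics[f"{name}.started"]
        - metrics[f"{name}.succeeded"]
        - metrics[f"{name}.failed"]
    )


@tracked("send_webhook")
async def send_webhook(ok: bool) -> str:
    if not ok:
        raise RuntimeError("webhook endpoint returned 500")
    return "delivered"


@tracked("resize_image")
def resize_image(path: str) -> str:
    return path + ".thumb"


async def main() -> None:
    await send_webhook(True)
    try:
        await send_webhook(False)
    except RuntimeError:
        pass


asyncio.run(main())
resize_image("/tmp/a.jpg")
print(dict(metrics), in_flight("send_webhook"))
```

A task that is cancelled or hangs increments only the started count, so in_flight stays above zero for it. That is the signal the alert is meant to catch.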

Production Incident Playbook: Tasks Silently Not Executing

Scenario: Support reports that “users are not receiving welcome emails.” Your API returns 200 for /register, your logs show no errors, your dashboards are green. But somewhere between the response and the user’s inbox, the work is being dropped.

Blast radius: Every side effect after the response — emails, webhooks, audit logs, cache invalidations, analytics events. The user-facing API works, so traditional uptime monitors stay green. Damage accumulates silently.

Detection: With Fix 7 in place, your alert fires on the gap between task_started and task_succeeded + task_failed. Without it, the first signal is a customer complaint, which can be hours or days late.

Diagnosis checklist:

  1. Check application logs for any exception traces from background task names. FastAPI logs unhandled exceptions in BackgroundTasks but does not surface them to clients.
  2. Check the process lifecycle. If your process restarted (deploy, OOM, crash) while tasks were in flight, those tasks are lost. BackgroundTasks has no persistence.
  3. Verify background_tasks.add_task() is actually being called. Add a log line immediately before and after the call.
  4. Check for early returns or raised exceptions in the endpoint function that occur before add_task() is called.
  5. If using a database, verify the task is creating a new session (Fix 3), not reusing the request’s session.
  6. If running multiple workers (e.g., uvicorn --workers 4), tasks run only in the worker that handled the request — confirm the worker process is still alive when the task should fire.

Recovery: For lost tasks, you need a replay mechanism. Pull the request data from access logs (or an audit log table) for the affected time window and re-run the work. For one-off recovery, a script that walks the user table and triggers the missing side effects works. For repeated incidents, the right answer is to migrate to a durable queue (Celery, RQ, or a managed service).

Prevention: For anything where loss is unacceptable (payments, transactional emails, billing events), do not use BackgroundTasks. Use Celery with a durable broker. Reserve BackgroundTasks for truly fire-and-forget work where loss is acceptable (debug logging, non-critical metrics). See Fix: FastAPI Dependency Injection Error for related patterns on getting state into tasks safely.
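
Where running a broker is not an option, the outbox pattern (mentioned under "Still Not Working?") is one way to make side effects durable: the pending work is written to a table in the same transaction as the business data, and a separate loop drains it. The sketch below uses an in-memory sqlite3 database and made-up table and function names purely for illustration; a real deployment would use the application's own database and a long-running drain loop.

```python
import sqlite3


def init(conn: sqlite3.Connection) -> None:
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")
    conn.execute(
        "CREATE TABLE outbox ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "kind TEXT NOT NULL, payload TEXT NOT NULL, "
        "done INTEGER NOT NULL DEFAULT 0)"
    )


def register_user(conn: sqlite3.Connection, email: str) -> None:
    # One transaction: the user row and the pending side effect commit together.
    with conn:
        conn.execute("INSERT INTO users (email) VALUES (?)", (email,))
        conn.execute(
            "INSERT INTO outbox (kind, payload) VALUES (?, ?)",
            ("welcome_email", email),
        )


def drain_outbox(conn: sqlite3.Connection, handler) -> int:
    """Process pending rows in order; a row is marked done only after its handler returns."""
    rows = conn.execute(
        "SELECT id, kind, payload FROM outbox WHERE done = 0 ORDER BY id"
    ).fetchall()
    processed = 0
    for row_id, kind, payload in rows:
        handler(kind, payload)  # if this raises, the row stays pending for the next run
        with conn:
            conn.execute("UPDATE outbox SET done = 1 WHERE id = ?", (row_id,))
        processed += 1
    return processed


sent = []
conn = sqlite3.connect(":memory:")
init(conn)
register_user(conn, "a@example.com")
register_user(conn, "b@example.com")

processed = drain_outbox(conn, lambda kind, payload: sent.append((kind, payload)))
pending = conn.execute("SELECT COUNT(*) FROM outbox WHERE done = 0").fetchone()[0]
conn.close()

print(processed, pending, sent)
```

Because a crash between the handler call and the UPDATE would cause the row to be handled again, this gives at-least-once delivery, and handlers should be written to tolerate duplicates.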

Still Not Working?

Task runs but changes aren’t visible: if the task writes to a database and you query immediately after the endpoint returns, the task may not have finished yet. BackgroundTasks runs after the response is sent, but there’s no guarantee of when it completes.

BackgroundTasks in test mode: by default, FastAPI’s TestClient (Starlette’s TestClient) runs background tasks synchronously after the response, so they do execute during tests. With an async test client (httpx.AsyncClient), behavior depends on how the client is wired to the app, so TestClient is the simpler choice for testing background tasks.

BackgroundTasks doesn’t receive dependency-injected values: you can’t use Depends() inside the task function itself (it’s not an endpoint). Pass the values you need as arguments when calling add_task().

CPU-bound tasks freeze the server: BackgroundTasks runs in the same process. Computationally heavy work (image processing, data analysis) competes with request handling, and inside an async def task it blocks the event loop outright. Because of the GIL, a thread pool does not give CPU-bound Python code real parallelism; use a process pool (loop.run_in_executor with a ProcessPoolExecutor) or Celery workers instead.

Tasks lost after deploy or restart: BackgroundTasks has zero persistence. When uvicorn or gunicorn restarts the worker (during a deploy, after OOM, or on signal), every in-flight task is dropped. If you ship deploys frequently, this drops more tasks than you think. Move critical work to Celery or implement an outbox pattern.

Tasks run on the wrong worker: if you scale horizontally with multiple processes (uvicorn --workers N) or multiple containers, the task runs in the same process that handled the request. There is no load balancing across workers. If that worker is under load or pinned to a slow request, your task waits. See Fix: Uvicorn Not Working for related worker behavior.

HTTPS callbacks hang the task: outbound HTTP calls from a background task can hang for a long time without a timeout. requests has no default timeout, and httpx defaults to 5 seconds unless overridden. Set timeouts explicitly: httpx.AsyncClient(timeout=10.0) or requests.post(url, timeout=10). A single hung task can tie up a worker for hours. See Fix: Python Requests Timeout for safe defaults.
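
Client-level timeouts cover the HTTP call itself. As a second line of defense, an async task can be given an overall time budget with asyncio.wait_for. The sketch below is illustrative only: run_with_timeout is a hypothetical helper, the callbacks simulate a remote endpoint with asyncio.sleep, and the URL is a placeholder. It does not help with sync tasks, since a thread blocked in a call cannot be cancelled this way.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def run_with_timeout(coro_fn, *args, timeout: float, **kwargs):
    """Run an async task with an upper time bound; log and return None on timeout."""
    try:
        return await asyncio.wait_for(coro_fn(*args, **kwargs), timeout=timeout)
    except asyncio.TimeoutError:
        logger.error(
            "Task %s exceeded %.2fs and was cancelled", coro_fn.__name__, timeout
        )
        return None


async def slow_callback(url: str) -> str:
    await asyncio.sleep(10)  # stand-in for an endpoint that never answers
    return "ok"


async def fast_callback(url: str) -> str:
    await asyncio.sleep(0)  # stand-in for a prompt response
    return "ok"


slow_result = asyncio.run(
    run_with_timeout(slow_callback, "https://example.invalid/hook", timeout=0.05)
)
fast_result = asyncio.run(
    run_with_timeout(fast_callback, "https://example.invalid/hook", timeout=0.05)
)
print(slow_result, fast_result)
```

Registered as add_task(run_with_timeout, slow_callback, url, timeout=10.0), a wrapper like this bounds how long any single task can occupy the worker.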

For related FastAPI issues, see Fix: FastAPI 422 Unprocessable Entity and Fix: FastAPI Dependency Injection Error.

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.