Fix: APScheduler Not Working — Jobs Not Running, Gunicorn Duplicates, and Timezone Issues

FixDevs ·

Quick Answer

How to fix APScheduler — BackgroundScheduler exits when script ends, jobs run multiple times under Gunicorn, AsyncIOScheduler not firing, misfire_grace_time skips, and timezone-aware cron triggers.

The Error

You schedule a job and nothing happens, or the scheduler exits the moment you start it:

from apscheduler.schedulers.background import BackgroundScheduler

def job():
    print("running")

scheduler = BackgroundScheduler()
scheduler.add_job(job, "interval", seconds=5)
scheduler.start()
# Script exits. Job never runs.

Or your interval job fires twice (or three times, or four) every interval under Gunicorn:

[2026-05-18 12:00:00] running
[2026-05-18 12:00:00] running
[2026-05-18 12:00:00] running
[2026-05-18 12:00:00] running

Or you see this log line and the job is silently dropped:

Run time of job "job (trigger: cron[hour='3'])" was missed by 0:15:23

Or with AsyncIOScheduler:

RuntimeError: There is no current event loop in thread 'MainThread'.

Why This Happens

APScheduler ships several scheduler classes, and picking the wrong one is the most common failure mode. Each scheduler is tied to a runtime model:

  • BlockingScheduler — blocks the calling thread. Use it in standalone scripts.
  • BackgroundScheduler — runs in a daemon thread. Use it inside a long-lived process (Flask dev server, manual app).
  • AsyncIOScheduler — runs on an asyncio event loop. Use it inside FastAPI, aiohttp, or any asyncio app.
  • GeventScheduler / TornadoScheduler / TwistedScheduler — for those specific frameworks.

If you call scheduler.start() on a BackgroundScheduler and your script exits, the daemon thread dies with it — that’s the “nothing happens” case.
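
The daemon-thread behaviour is easy to reproduce without APScheduler. The sketch below is a stdlib-only illustration, not APScheduler code. It launches a child interpreter whose only work lives in a daemon thread, then shows that the child prints nothing because its main thread finishes first. The CHILD script and the run_child helper are names made up for this example.

```python
import subprocess
import sys
import textwrap

# Hypothetical child script: the "job" lives in a daemon thread,
# the same way a BackgroundScheduler's worker thread does.
CHILD = textwrap.dedent("""
    import threading
    import time

    def job():
        time.sleep(0.5)
        print("running")

    threading.Thread(target=job, daemon=True).start()
    # The main thread ends here, so the interpreter exits
    # and the daemon thread is killed before it can print.
""")

def run_child():
    """Run the child script and return whatever it printed."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD],
        capture_output=True,
        text=True,
        timeout=30,
    )
    return result.stdout

child_output = run_child()
print(repr(child_output))  # prints '' because the job never got to run
```

Anything that keeps the main thread alive (a web server, a blocking loop, or BlockingScheduler itself) removes the problem.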

The Gunicorn duplicate problem is different: Gunicorn forks N workers, and each worker initializes the scheduler independently. With --workers 4, every job fires four times. The fix isn’t “make APScheduler smarter” — it’s “stop running the scheduler in every worker.”

Misfires happen when the scheduler can’t run a job at its scheduled time (process was down, event loop blocked, job took too long) and the default misfire_grace_time of 1 second has passed. APScheduler then skips that run entirely.
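
The misfire rule comes down to one comparison: how late the run is versus the grace period. The sketch below is a simplified model of that check, not APScheduler's source. The is_misfire helper is a hypothetical name, and the timestamps reuse the "missed by 0:15:23" log line from above.

```python
from datetime import datetime, timedelta

def is_misfire(scheduled, now, misfire_grace_time):
    """Return True when the run is late by more than the grace period (seconds)."""
    return (now - scheduled) > timedelta(seconds=misfire_grace_time)

scheduled = datetime(2026, 5, 18, 3, 0, 0)
now = scheduled + timedelta(minutes=15, seconds=23)  # "missed by 0:15:23"

print(is_misfire(scheduled, now, 1))     # True: the 1-second default drops the run
print(is_misfire(scheduled, now, 300))   # True: 923 seconds late still exceeds 5 minutes
print(is_misfire(scheduled, now, 3600))  # False: an hour of grace lets the run execute
```

The grace period has to cover the longest stall you expect, so size it from observed delays rather than picking a round number.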

Fix 1: Pick the Right Scheduler

For a standalone script, use BlockingScheduler. Its start() call blocks the main thread, so the process stays alive:

from apscheduler.schedulers.blocking import BlockingScheduler

def job():
    print("running")

scheduler = BlockingScheduler()
scheduler.add_job(job, "interval", seconds=5)
scheduler.start()  # Blocks forever.

For Flask (sync), use BackgroundScheduler but ensure the process stays alive. Flask’s dev server does, so this works:

from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
import atexit

app = Flask(__name__)
scheduler = BackgroundScheduler()
scheduler.add_job(lambda: print("running"), "interval", seconds=5)
scheduler.start()
atexit.register(lambda: scheduler.shutdown())

For FastAPI, use AsyncIOScheduler and start it from the lifespan handler so it attaches to the running loop:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()

@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler.add_job(lambda: print("running"), "interval", seconds=5)
    scheduler.start()
    yield
    scheduler.shutdown()

app = FastAPI(lifespan=lifespan)

Note: AsyncIOScheduler can call both sync and async functions. Sync functions run in a default executor; async functions run as tasks on the loop.
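
The sync-versus-async split described in the note can be illustrated with plain asyncio. This is a sketch of the dispatch rule only, assuming the behaviour stated above; it is not APScheduler's implementation, and dispatch, sync_job, and async_job are hypothetical names.

```python
import asyncio
import inspect
import threading

async def dispatch(func):
    """Run coroutine functions as tasks on the loop, plain functions in a thread pool."""
    loop = asyncio.get_running_loop()
    if inspect.iscoroutinefunction(func):
        return await loop.create_task(func())
    # None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, func)

def sync_job():
    return threading.get_ident()

async def async_job():
    return threading.get_ident()

async def main():
    loop_thread = threading.get_ident()
    sync_thread = await dispatch(sync_job)
    async_thread = await dispatch(async_job)
    # The sync job ran on a worker thread; the async job ran on the loop's thread.
    return sync_thread != loop_thread, async_thread == loop_thread

sync_off_loop, async_on_loop = asyncio.run(main())
print(sync_off_loop, async_on_loop)
```

The practical consequence: a blocking call inside an async job stalls the whole loop, while the same call inside a sync job only occupies one pool thread.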

Fix 2: Stop Duplicate Jobs Under Gunicorn / Uvicorn Workers

The cleanest fix is to run the scheduler in a separate process — not in your web workers at all. Create a small scheduler.py and run it as its own service:

# scheduler.py
from apscheduler.schedulers.blocking import BlockingScheduler

def my_job():
    # Same business logic your API can also call.
    ...

scheduler = BlockingScheduler()
scheduler.add_job(my_job, "cron", hour=3)
scheduler.start()

Run it with python scheduler.py next to your gunicorn process. One scheduler, one process, no duplicates.

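If you need the scheduler inside the web process, gate it to a single worker with a file lock. The function has to run in each worker after the fork (at app import when preload_app is off, or from Gunicorn’s post_fork hook); with preload_app = True, import-time code runs once in the master before the workers are forked. Only the first caller to take the lock starts the scheduler: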

import fcntl
from apscheduler.schedulers.background import BackgroundScheduler

# Module-level reference so the lock file stays open (and locked)
# for the lifetime of the worker process.
_lock_file = None

def my_job():
    # Placeholder for your business logic.
    ...

def start_scheduler_once():
    global _lock_file
    f = open("/tmp/scheduler.lock", "w")
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        f.close()
        return  # Another worker holds the lock.
    _lock_file = f
    scheduler = BackgroundScheduler()
    scheduler.add_job(my_job, "cron", hour=3)
    scheduler.start()
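
The locking behaviour can be checked in isolation before wiring it into a worker. The sketch below is stdlib-only and Unix-only (fcntl); the try_lock helper and the temporary lock path are hypothetical names for this example. It shows that a second open() of the same file cannot take the exclusive lock until the first holder closes its file.

```python
import fcntl
import os
import tempfile

def try_lock(path):
    """Return an open, exclusively locked file, or None if the lock is taken."""
    f = open(path, "w")
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        f.close()
        return None
    return f

lock_path = os.path.join(tempfile.mkdtemp(), "scheduler.lock")

first = try_lock(lock_path)   # plays the worker that wins the lock
second = try_lock(lock_path)  # plays a second worker: separate open(), so it is refused
first_won = first is not None
second_won = second is not None

first.close()                 # closing the file releases the lock
third = try_lock(lock_path)   # a later worker can now take over
third_won = third is not None
third.close()

print(first_won, second_won, third_won)  # True False True
```

The last step is also the weak point: when the lock-holding worker is recycled, no scheduler runs until some other worker calls the function again.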

Pro Tip: A separate process is simpler and survives gunicorn --reload, worker restarts, and --max-requests recycling. The lock pattern is fragile under those conditions.

For Kubernetes, run the scheduler as its own Deployment with replicas: 1 (or a CronJob if the jobs are infrequent).

Fix 3: Use a Persistent Job Store

By default APScheduler keeps jobs in memory. Restart the process and every job is gone, including ones you added via scheduler.add_job at runtime. Use SQLAlchemyJobStore for persistence:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

jobstores = {
    "default": SQLAlchemyJobStore(url="postgresql://user:pass@localhost/db")
}
scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.start()

With a persistent store, use replace_existing=True and a stable id so re-adding the same job on startup doesn’t error:

scheduler.add_job(
    my_job,
    "cron",
    hour=3,
    id="daily_report",
    replace_existing=True,
)

Without replace_existing=True you get ConflictingIdError on the second startup.

Fix 4: Set misfire_grace_time and coalesce

If your app sleeps, the garbage collector stalls, or a previous run overruns, the scheduled time can pass before APScheduler gets to it. The default misfire_grace_time of 1 second is too tight for most real workloads.

Bump it on the job:

scheduler.add_job(
    my_job,
    "cron",
    hour=3,
    misfire_grace_time=300,  # 5 minutes
    coalesce=True,  # If multiple runs were missed, only fire once.
)

Or set defaults for all jobs:

scheduler = BackgroundScheduler(
    job_defaults={
        "misfire_grace_time": 300,
        "coalesce": True,
        "max_instances": 1,
    }
)

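max_instances=1 prevents the same job from running concurrently if a previous run hasn’t finished. It is already APScheduler’s default, so setting it here mainly documents the intent. Raise it only if the job is safe to overlap with itself, because overlapping runs of a slow job can corrupt state.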
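
How coalesce and misfire_grace_time interact is easier to see with numbers. The following is a simplified model under stated assumptions, not APScheduler's code: runs_to_execute is a hypothetical helper, and the scenario is an interval job due every 5 minutes on a scheduler that was stalled until 12:15:10.

```python
from datetime import datetime, timedelta

def runs_to_execute(due_times, now, misfire_grace_time, coalesce):
    """Pick which overdue run times still execute (simplified model)."""
    if coalesce:
        due_times = due_times[-1:]  # keep only the most recent due run
    grace = timedelta(seconds=misfire_grace_time)
    return [t for t in due_times if now - t <= grace]

base = datetime(2026, 5, 18, 12, 0, 0)
due = [base + timedelta(minutes=m) for m in (5, 10, 15)]  # 12:05, 12:10, 12:15
now = base + timedelta(minutes=15, seconds=10)            # scheduler wakes at 12:15:10

without_coalesce = runs_to_execute(due, now, 600, coalesce=False)
with_coalesce = runs_to_execute(due, now, 600, coalesce=True)

print(len(without_coalesce))  # 2: 12:05 is 610 s late and dropped, 12:10 and 12:15 run
print(len(with_coalesce))     # 1: only the 12:15 run fires
```

With a generous grace period and no coalescing, a long stall turns into a burst of back-to-back runs, which is rarely what a report or cleanup job wants.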

Fix 5: Use Timezone-Aware Cron Triggers

cron triggers default to the scheduler’s timezone, which in turn defaults to the machine’s local time. That silently changes meaning across DST boundaries and breaks when you deploy from a laptop in JST to a server in UTC. Always pass an explicit timezone:

from zoneinfo import ZoneInfo

scheduler.add_job(
    my_job,
    "cron",
    hour=3,
    timezone=ZoneInfo("Asia/Tokyo"),
)

Or set the timezone for the whole scheduler:

scheduler = BackgroundScheduler(timezone=ZoneInfo("UTC"))

Common Mistake: Passing a pytz timezone to APScheduler 4+. APScheduler 4 uses zoneinfo exclusively. If you still rely on pytz you’ll see TypeError: tzinfo argument must be None or of a tzinfo subclass. Use zoneinfo.ZoneInfo instead.
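
To see why the explicit timezone matters, compare what hour=3 means on the two machines from the laptop-to-server example. This is a stdlib sketch with illustrative variable names. The fixed +09:00 offset stands in for Asia/Tokyo (which has no DST) so the example does not depend on an installed tz database.

```python
from datetime import datetime, timedelta, timezone

# Assumption: a fixed offset is used here as a stand-in for ZoneInfo("Asia/Tokyo").
JST = timezone(timedelta(hours=9), "JST")

three_am_jst = datetime(2026, 5, 18, 3, 0, tzinfo=JST)
three_am_utc = datetime(2026, 5, 18, 3, 0, tzinfo=timezone.utc)

# The same wall-clock "03:00" is two different instants.
as_utc = three_am_jst.astimezone(timezone.utc)
gap = three_am_utc - three_am_jst

print(as_utc.isoformat())  # 2026-05-17T18:00:00+00:00
print(gap)                 # 9:00:00
```

A job meant for 03:00 Tokyo time that ends up interpreted as 03:00 UTC runs nine hours late, at noon in Tokyo.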

Fix 6: AsyncIOScheduler Inside an Existing Event Loop

If you call AsyncIOScheduler().start() outside an async context, you get RuntimeError: There is no current event loop. The scheduler needs a running loop.

Either start it from an async function:

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def main():
    scheduler = AsyncIOScheduler()
    scheduler.add_job(my_async_job, "interval", seconds=5)
    scheduler.start()
    while True:
        await asyncio.sleep(3600)

asyncio.run(main())

Or pass the loop explicitly if you’re starting it from sync code that owns a loop:

loop = asyncio.new_event_loop()
scheduler = AsyncIOScheduler(event_loop=loop)
scheduler.start()
loop.run_forever()  # Jobs only fire while the loop is running.

For FastAPI, the lifespan pattern in Fix 1 is the safest — FastAPI’s loop is already running when lifespan enters.

Fix 7: Async Jobs Run Synchronously

If you pass an async def function to BackgroundScheduler or BlockingScheduler, APScheduler 3 will call it like a sync function and you’ll get a coroutine object that never executes:

async def my_job():
    await asyncio.sleep(1)
    print("done")

# Wrong: returns a coroutine that's discarded.
scheduler.add_job(my_job, "interval", seconds=5)

Use AsyncIOScheduler for async functions, or wrap the coroutine:

import asyncio

def run_async_job():
    asyncio.run(my_job())

scheduler.add_job(run_async_job, "interval", seconds=5)
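
Both halves of this fix can be verified with the standard library alone. The sketch below involves no scheduler. It calls the coroutine function the way a thread-pool executor would, then through the asyncio.run wrapper, and records which call does the work. The calls list is a hypothetical stand-in for the job's side effect.

```python
import asyncio
import inspect

calls = []  # records each time the job body actually runs

async def my_job():
    await asyncio.sleep(0)
    calls.append("done")

def run_async_job():
    # Sync wrapper: creates an event loop, runs the coroutine to completion.
    asyncio.run(my_job())

# What a sync scheduler effectively does with an async def job:
result = my_job()
was_coroutine = inspect.iscoroutine(result)
calls_after_plain_call = list(calls)
result.close()  # discard it explicitly to avoid a "never awaited" warning

# What the wrapper does:
run_async_job()

print(was_coroutine)           # True: calling it only built a coroutine object
print(calls_after_plain_call)  # []: the body never ran
print(calls)                   # ['done']: the wrapper executed it
```

Each wrapper call creates and tears down its own event loop, so anything bound to a different loop (for example a shared async client) should not be reused inside the wrapped job.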

APScheduler 4 added native async support across scheduler types — if you can upgrade, do so and check the migration notes.

Fix 8: Logging Reveals What’s Actually Happening

When jobs silently fail, turn on APScheduler’s logger to see misfires, exceptions, and trigger calculations:

import logging

logging.basicConfig(level=logging.INFO)
logging.getLogger("apscheduler").setLevel(logging.DEBUG)

You’ll start seeing Added job, Scheduler started, Running job, and crucially Job ... raised an exception — APScheduler swallows job exceptions by default and only logs them.

Add an event listener to surface failures in your own logging or alerting:

from apscheduler.events import EVENT_JOB_ERROR

def on_error(event):
    print(f"Job {event.job_id} failed: {event.exception}")

scheduler.add_listener(on_error, EVENT_JOB_ERROR)

Still Not Working?

A few less-obvious failures:

  • SchedulerAlreadyRunningError on hot reload. Flask or FastAPI --reload re-imports your module, and scheduler.start() runs again. Guard with if not scheduler.running: scheduler.start().
  • Cron hour="*/2" not firing as expected. The wildcard interpretation is “every 2 hours starting at 0,” not “every 2 hours from now.” Use interval if you want offset-from-start semantics.
  • SQLAlchemyJobStore pickle errors after deploy. Pickled jobs reference module paths. Renaming a module or moving a function breaks all stored jobs. Clear the jobs table or migrate carefully.
  • Job runs at wrong time after server reboot. Server clock is wrong, or the scheduler is using UTC while you assumed local. Run timedatectl and check scheduler.timezone.
  • Execution of job ... skipped: maximum number of running instances reached. A previous run of the same job is still in progress, so APScheduler skips the new one. Either shorten the job, raise max_instances, or space the runs further apart so they cannot overlap.
  • Jobs disappear after Docker container restart. You’re using the default MemoryJobStore. Switch to SQLAlchemyJobStore and mount the database, or use Redis with RedisJobStore.
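
The cron-versus-interval difference from the hour="*/2" bullet is worth seeing with concrete times. This is a stdlib sketch of the two semantics as described above, not APScheduler's trigger code; next_cron_every_2h and next_interval_2h are hypothetical helpers, and the sketch assumes the job is added at 13:20.

```python
from datetime import datetime, timedelta

def next_cron_every_2h(now):
    """Next fire time for hour='*/2': even hours (0, 2, 4, ...) at minute 0."""
    candidate = now.replace(minute=0, second=0, microsecond=0)
    while candidate <= now or candidate.hour % 2 != 0:
        candidate += timedelta(hours=1)
    return candidate

def next_interval_2h(added_at):
    """First fire time for an interval of 2 hours counted from when the job was added."""
    return added_at + timedelta(hours=2)

added_at = datetime(2026, 5, 18, 13, 20)

print(next_cron_every_2h(added_at))  # 2026-05-18 14:00:00
print(next_interval_2h(added_at))    # 2026-05-18 15:20:00
```

The cron form snaps to the clock, so its first run can arrive in minutes; the interval form keeps a fixed distance from when the job was added.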

For related Python scheduling and async issues, see Celery beat not working, FastAPI background tasks not working, Python asyncio not running, and Linux cron job not running.

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
