Fix: APScheduler Not Working — Jobs Not Running, Gunicorn Duplicates, and Timezone Issues
Quick Answer
How to fix APScheduler — BackgroundScheduler exits when script ends, jobs run multiple times under Gunicorn, AsyncIOScheduler not firing, misfire_grace_time skips, and timezone-aware cron triggers.
The Error
You schedule a job and nothing happens, or the scheduler exits the moment you start it:
from apscheduler.schedulers.background import BackgroundScheduler
def job():
    print("running")
scheduler = BackgroundScheduler()
scheduler.add_job(job, "interval", seconds=5)
scheduler.start()
# Script exits. Job never runs.
Or your interval job fires twice (or three times, or four) every interval under Gunicorn:
[2026-05-18 12:00:00] running
[2026-05-18 12:00:00] running
[2026-05-18 12:00:00] running
[2026-05-18 12:00:00] running
Or you see this log line and the job is silently dropped:
Run time of job "job (trigger: cron[hour='3'])" was missed by 0:15:23
Or with AsyncIOScheduler:
RuntimeError: There is no current event loop in thread 'MainThread'.
Why This Happens
APScheduler ships several scheduler classes, and picking the wrong one is the most common failure mode. Each scheduler is tied to a runtime model:
- BlockingScheduler: blocks the calling thread. Use it in standalone scripts.
- BackgroundScheduler: runs in a daemon thread. Use it inside a long-lived process (Flask dev server, manual app).
- AsyncIOScheduler: runs on an asyncio event loop. Use it inside FastAPI, aiohttp, or any asyncio app.
- GeventScheduler / TornadoScheduler / TwistedScheduler: for those specific frameworks.
If you call scheduler.start() on a BackgroundScheduler and your script exits, the daemon thread dies with it — that’s the “nothing happens” case.
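The daemon-thread behaviour can be reproduced without APScheduler at all. The sketch below is only an illustration using the standard library: a plain daemon thread stands in for BackgroundScheduler's worker thread, and the child script and helper names are made up for the demo.

```python
import subprocess
import sys
import textwrap

# Hypothetical child script: a daemon thread plays the role of the scheduler thread.
CHILD = textwrap.dedent("""
    import threading, time

    def job():
        time.sleep(0.5)
        print("running")

    threading.Thread(target=job, daemon=True).start()
    # The main thread ends here, so the daemon thread is killed with it.
""")

def run_child(code):
    """Run the code in a fresh interpreter and return whatever it printed."""
    result = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        timeout=10,
    )
    return result.stdout

# Main thread exits immediately: the job never gets to print.
daemon_output = run_child(CHILD)

# Main thread stays alive for a second: the job has time to run.
kept_alive_output = run_child(CHILD + "time.sleep(1)\n")
```

The only difference between the two runs is whether the main thread stays alive, which is the same difference as between a script that ends after scheduler.start() and a long-lived server process.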
The Gunicorn duplicate problem is different: Gunicorn forks N workers, and each worker initializes the scheduler independently. With --workers 4, every job fires four times. The fix isn’t “make APScheduler smarter” — it’s “stop running the scheduler in every worker.”
Misfires happen when the scheduler can’t run a job at its scheduled time (process was down, event loop blocked, job took too long) and the default misfire_grace_time of 1 second has passed. APScheduler then skips that run entirely.
Fix 1: Pick the Right Scheduler
For a standalone script, use BlockingScheduler — it blocks main() so the process stays alive:
from apscheduler.schedulers.blocking import BlockingScheduler
def job():
    print("running")
scheduler = BlockingScheduler()
scheduler.add_job(job, "interval", seconds=5)
scheduler.start()  # Blocks forever.
For Flask (sync), use BackgroundScheduler but ensure the process stays alive. Flask’s dev server does, so this works:
from flask import Flask
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
app = Flask(__name__)
scheduler = BackgroundScheduler()
scheduler.add_job(lambda: print("running"), "interval", seconds=5)
scheduler.start()
atexit.register(lambda: scheduler.shutdown())
For FastAPI, use AsyncIOScheduler and start it from the lifespan handler so it attaches to the running loop:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler.add_job(lambda: print("running"), "interval", seconds=5)
    scheduler.start()
    yield
    scheduler.shutdown()
app = FastAPI(lifespan=lifespan)
Note: AsyncIOScheduler can call both sync and async functions. Sync functions run in a default executor; async functions run as tasks on the loop.
Fix 2: Stop Duplicate Jobs Under Gunicorn / Uvicorn Workers
The cleanest fix is to run the scheduler in a separate process — not in your web workers at all. Create a small scheduler.py and run it as its own service:
# scheduler.py
from apscheduler.schedulers.blocking import BlockingScheduler
def my_job():
    # Same business logic your API can also call.
    ...
scheduler = BlockingScheduler()
scheduler.add_job(my_job, "cron", hour=3)
scheduler.start()
Run it with python scheduler.py next to your gunicorn process. One scheduler, one process, no duplicates.
If you need the scheduler inside the web process, gate it to a single worker. With Gunicorn’s preload_app = True you can use a file-lock pattern:
import fcntl
import sys
from apscheduler.schedulers.background import BackgroundScheduler
def start_scheduler_once():
    f = open("/tmp/scheduler.lock", "w")
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        f.close()  # Do not leak the handle in workers that lose the race.
        return  # Another worker holds the lock.
    scheduler = BackgroundScheduler()
    scheduler.add_job(my_job, "cron", hour=3)  # my_job is defined elsewhere in your app.
    scheduler.start()
    # Keep a reference to f so the lock is held for the life of the process.
    sys.modules["__scheduler_lock__"] = f
Pro Tip: A separate process is simpler and survives gunicorn --reload, worker restarts, and --max-requests recycling. The lock pattern is fragile under those conditions.
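The non-blocking lock that start_scheduler_once relies on can be exercised on its own. This is a minimal sketch, assuming a Unix system (fcntl is not available on Windows); the try_lock helper and the temporary lock path are hypothetical, and two file handles in one process stand in for two Gunicorn workers.

```python
import fcntl
import os
import tempfile

def try_lock(path):
    """Return the open file if the exclusive lock was acquired, else None."""
    f = open(path, "w")
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        f.close()
        return None
    return f

lock_path = os.path.join(tempfile.mkdtemp(), "scheduler.lock")

first = try_lock(lock_path)    # Plays the worker that wins the lock.
second = try_lock(lock_path)   # Plays a worker that arrives later.

first_acquired = first is not None
second_acquired = second is not None

# Closing the file releases the lock, which is why the winner must keep it open.
first.close()
third = try_lock(lock_path)
third_acquired = third is not None
third.close()
```

The last step is the reason the original code parks the file object somewhere long-lived: once the handle is closed or garbage-collected, any other worker can take the lock and start a second scheduler.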
For Kubernetes, run the scheduler as its own Deployment with replicas: 1 (or a CronJob if the jobs are infrequent).
Fix 3: Use a Persistent Job Store
By default APScheduler keeps jobs in memory. Restart the process and every job is gone, including ones you added via scheduler.add_job at runtime. Use SQLAlchemyJobStore for persistence:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
jobstores = {
    "default": SQLAlchemyJobStore(url="postgresql://user:pass@localhost/db")
}
scheduler = BackgroundScheduler(jobstores=jobstores)
scheduler.start()
With a persistent store, use replace_existing=True and a stable id so re-adding the same job on startup doesn’t error:
scheduler.add_job(
    my_job,
    "cron",
    hour=3,
    id="daily_report",
    replace_existing=True,
)
Without replace_existing=True you get ConflictingIdError on the second startup.
Fix 4: Set misfire_grace_time and coalesce
If your app sleeps, GC stalls, or a previous run overran, the scheduled time can pass before APScheduler gets to it. The default misfire_grace_time=1 second is too tight for most real workloads.
Bump it on the job:
scheduler.add_job(
    my_job,
    "cron",
    hour=3,
    misfire_grace_time=300,  # 5 minutes
    coalesce=True,  # If multiple runs were missed, only fire once.
)
Or set defaults for all jobs:
scheduler = BackgroundScheduler(
    job_defaults={
        "misfire_grace_time": 300,
        "coalesce": True,
        "max_instances": 1,
    }
)
max_instances=1 (already the default in APScheduler 3, made explicit here) prevents the same job from running concurrently if a previous run hasn’t finished. With a higher value, a slow job can overlap with itself and corrupt state.
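How misfire_grace_time and coalesce interact is easier to see with concrete timestamps. The function below is a simplified model written for this article, not APScheduler's actual code: runs_to_execute and its arguments are hypothetical names, and it only captures the rule that a run is skipped once it is later than the grace period, with coalesce collapsing the remaining backlog into the latest run.

```python
from datetime import datetime, timedelta

def runs_to_execute(due_times, now, misfire_grace_time, coalesce):
    """Simplified model: which overdue runs would still be executed at `now`."""
    grace = timedelta(seconds=misfire_grace_time)
    # A run is a misfire once it is later than the grace period allows.
    runnable = [t for t in due_times if now - t <= grace]
    if coalesce and runnable:
        # Collapse a backlog of missed runs into the most recent one.
        return [runnable[-1]]
    return runnable

# A 03:00 job, but the process only gets to it at 03:02.
due = [datetime(2026, 5, 18, 3, 0)]
woke_up = datetime(2026, 5, 18, 3, 2)
with_default_grace = runs_to_execute(due, woke_up, misfire_grace_time=1, coalesce=False)
with_five_minutes = runs_to_execute(due, woke_up, misfire_grace_time=300, coalesce=False)

# A one-minute interval job that fell three runs behind.
backlog = [datetime(2026, 5, 18, 3, m) for m in (0, 1, 2)]
late = datetime(2026, 5, 18, 3, 2, 30)
without_coalesce = runs_to_execute(backlog, late, misfire_grace_time=300, coalesce=False)
with_coalesce = runs_to_execute(backlog, late, misfire_grace_time=300, coalesce=True)
```

In this model the two-minute delay drops the run entirely with a one-second grace period, while a 300-second grace period keeps it; the backlog of three runs fires three times without coalesce and once with it.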
Fix 5: Use Timezone-Aware Cron Triggers
cron triggers default to local time, which silently changes meaning across DST boundaries and breaks when you deploy from a laptop in JST to a server in UTC. Always pass an explicit timezone:
from zoneinfo import ZoneInfo
scheduler.add_job(
    my_job,
    "cron",
    hour=3,
    timezone=ZoneInfo("Asia/Tokyo"),
)
Or set the timezone for the whole scheduler:
scheduler = BackgroundScheduler(timezone=ZoneInfo("UTC"))
Common Mistake: Passing a pytz timezone to APScheduler 4+. APScheduler 4 uses zoneinfo exclusively. If you still rely on pytz you’ll see TypeError: tzinfo argument must be None or of a tzinfo subclass. Use zoneinfo.ZoneInfo instead.
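To see what "hour=3" actually means in different zones, it helps to convert the same wall-clock time to UTC. This is a small standard-library sketch, assuming the system has the IANA timezone database available; as_utc is a hypothetical helper, and America/New_York is used only because it observes DST.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def as_utc(year, month, day, hour, tz_name):
    """Interpret a wall-clock hour in tz_name and convert it to UTC."""
    local = datetime(year, month, day, hour, tzinfo=ZoneInfo(tz_name))
    return local.astimezone(timezone.utc)

# 03:00 in Tokyo is the previous evening in UTC.
tokyo = as_utc(2026, 1, 15, 3, "Asia/Tokyo")

# 03:00 in New York maps to a different UTC hour in winter and in summer.
ny_winter = as_utc(2026, 1, 15, 3, "America/New_York")
ny_summer = as_utc(2026, 7, 15, 3, "America/New_York")
```

A job written on a JST laptop as "run at 3 AM" lands nine hours away from where it was intended once the server interprets the same hour in UTC, and in a DST zone the UTC equivalent moves by an hour twice a year.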
Fix 6: AsyncIOScheduler Inside an Existing Event Loop
If you call AsyncIOScheduler().start() outside an async context, you get RuntimeError: There is no current event loop. The scheduler needs a running loop.
Either start it from an async function:
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def main():
    scheduler = AsyncIOScheduler()
    scheduler.add_job(my_async_job, "interval", seconds=5)
    scheduler.start()
    while True:
        await asyncio.sleep(3600)
asyncio.run(main())
Or pass the loop explicitly if you’re starting it from sync code that owns a loop:
loop = asyncio.new_event_loop()
scheduler = AsyncIOScheduler(event_loop=loop)
For FastAPI, the lifespan pattern in Fix 1 is the safest, because FastAPI’s loop is already running when lifespan enters.
Fix 7: Async Jobs Run Synchronously
If you pass an async def function to BackgroundScheduler or BlockingScheduler, APScheduler 3 will call it like a sync function and you’ll get a coroutine object that never executes:
async def my_job():
    await asyncio.sleep(1)
    print("done")
# Wrong: returns a coroutine that's discarded.
scheduler.add_job(my_job, "interval", seconds=5)
Use AsyncIOScheduler for async functions, or wrap the coroutine:
import asyncio
def run_async_job():
    asyncio.run(my_job())
scheduler.add_job(run_async_job, "interval", seconds=5)
APScheduler 4 added native async support across scheduler types. If you can upgrade, do so and check the migration notes.
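The "coroutine that never executes" behaviour is plain Python and can be checked without a scheduler. In this sketch the direct call to my_job() stands in for what a thread-based scheduler does with the function; the calls list is only there to record whether the body ran.

```python
import asyncio

calls = []

async def my_job():
    await asyncio.sleep(0)
    calls.append("done")

# Calling an async function like a normal one only builds a coroutine object.
result = my_job()
is_coroutine = asyncio.iscoroutine(result)
calls_after_plain_call = list(calls)  # The body has not run yet.
result.close()  # Discard it cleanly to avoid a "never awaited" warning.

def run_async_job():
    # Sync wrapper: creates an event loop, runs the coroutine to completion.
    asyncio.run(my_job())

run_async_job()
calls_after_wrapper = list(calls)
```

The wrapper is what makes the job body run; each call to asyncio.run creates and tears down its own event loop, so this suits occasional jobs better than high-frequency ones.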
Fix 8: Logging Reveals What’s Actually Happening
When jobs silently fail, turn on APScheduler’s logger to see misfires, exceptions, and trigger calculations:
import logging
logging.basicConfig(level=logging.INFO)
logging.getLogger("apscheduler").setLevel(logging.DEBUG)
You’ll start seeing Added job, Scheduler started, Running job, and crucially Job ... raised an exception. APScheduler swallows job exceptions by default and only logs them.
Add an event listener to surface failures in your own logging or alerting:
from apscheduler.events import EVENT_JOB_ERROR
def on_error(event):
    print(f"Job {event.job_id} failed: {event.exception}")
scheduler.add_listener(on_error, EVENT_JOB_ERROR)
Still Not Working?
A few less-obvious failures:
- SchedulerAlreadyRunningError on hot reload. Flask or FastAPI with --reload re-imports your module, and scheduler.start() runs again. Guard with if not scheduler.running: scheduler.start().
- Cron hour="*/2" not firing as expected. The wildcard interpretation is “every 2 hours starting at 0,” not “every 2 hours from now.” Use interval if you want offset-from-start semantics.
- SQLAlchemyJobStore pickle errors after deploy. Pickled jobs reference module paths. Renaming a module or moving a function breaks all stored jobs. Clear the jobs table or migrate carefully.
- Job runs at wrong time after server reboot. Server clock is wrong, or the scheduler is using UTC while you assumed local. Run timedatectl and check scheduler.timezone.
- max_instances reached, dropping run. A previous job is still running. Either shorten the job, raise max_instances, or use a cron trigger with misfire_grace_time so the next slot picks it up cleanly.
- Jobs disappear after Docker container restart. You’re using the default MemoryJobStore. Switch to SQLAlchemyJobStore and mount the database, or use Redis with RedisJobStore.
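The difference between cron hour="*/2" and a two-hour interval, mentioned in the list above, can be made concrete with a little date arithmetic. This is an illustrative model only, not APScheduler's trigger code: both helper functions are hypothetical, and the cron helper assumes minute and second are left at their default of 0.

```python
from datetime import datetime, timedelta

def next_cron_every_two_hours(now):
    """Next even wall-clock hour (00:00, 02:00, ...) strictly after now."""
    candidate = now.replace(minute=0, second=0, microsecond=0)
    while candidate <= now or candidate.hour % 2 != 0:
        candidate += timedelta(hours=1)
    return candidate

def next_interval_two_hours(added_at):
    """First run of a two-hour interval job, counted from when it was added."""
    return added_at + timedelta(hours=2)

added_at = datetime(2026, 5, 18, 13, 37)
cron_next = next_cron_every_two_hours(added_at)    # Snaps to the clock: 14:00.
interval_next = next_interval_two_hours(added_at)  # Offset from the start: 15:37.
```

A job added at 13:37 therefore first fires 23 minutes later under the cron reading and two full hours later under the interval reading, which is usually the surprise behind "not firing as expected."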
For related Python scheduling and async issues, see Celery beat not working, FastAPI background tasks not working, Python asyncio not running, and Linux cron job not running.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.