Fix: Python asyncio Blocking the Event Loop — Mixing Sync and Async Code
Part of: Python Errors
Quick Answer
How to fix Python asyncio event loop blocking — using run_in_executor for sync calls, asyncio.to_thread, avoiding blocking I/O in coroutines, and detecting event loop stalls.
The Problem
An async Python application becomes unresponsive under load:
async def handle_request(request):
# This blocks the entire event loop for every request
data = requests.get('https://api.example.com/data') # Sync HTTP call
return process(data.json())Or an async route is slow despite using async def:
@app.get("/report")
async def generate_report():
# Looks async, but calls a blocking CPU-bound function
report = generate_pdf(data) # Synchronous — blocks all other requests
return {"report": report}Or asyncio.run() called inside an already-running event loop:
RuntimeError: This event loop is already running.Or mixing time.sleep() with async code:
async def poll():
while True:
await check_status()
time.sleep(5) # Blocks the event loop — no other coroutines run during sleepWhy This Happens
Python’s asyncio runs coroutines on a single-threaded event loop. The event loop can only do one thing at a time — it switches between coroutines at await points. Any synchronous (blocking) code that runs inside a coroutine holds the event loop hostage for its entire duration.
This design is intentional. Async code achieves concurrency by cooperative multitasking: each coroutine voluntarily yields at await, letting others run. When a coroutine calls a blocking function — one that never yields — every other coroutine in the application stalls until that call finishes. In a web server handling hundreds of requests, a single 200ms database query through a synchronous driver freezes all other responses for that same 200ms.
The confusion deepens because async def does not make a function non-blocking. Declaring a function with async def simply means it can contain await statements and returns a coroutine object. A function with zero await expressions runs synchronously even with the async def prefix — the event loop never gets a chance to switch away from it.
Common blocking patterns:
- Blocking I/O —
requests.get(),open()with standard file I/O,psycopg2queries run synchronously. The event loop can’t switch to other coroutines while waiting. - CPU-bound operations — image processing, PDF generation, data transformation. Python’s GIL means only one thread runs Python code at a time — CPU-bound work in a coroutine blocks all others.
time.sleep()— blocks the thread, unlikeawait asyncio.sleep()which yields control back to the event loop.- Synchronous ORM calls — SQLAlchemy’s standard session is synchronous. Using it in an async route blocks the event loop.
asyncio.run()inside async code —asyncio.run()creates a new event loop. Calling it inside an already-running loop raisesRuntimeError.
Diagnostic Timeline
When an async Python service slows down or hangs, the debugging path depends on which exact problem you have. Here is how to walk through it systematically.
Minute 0 — Reproduce and classify the symptom. You see one of two things: (a) a RuntimeError: This event loop is already running traceback, or (b) the application responds slowly or stops responding entirely with no traceback. For (a), skip to Fix 6. For (b), continue.
Minute 2 — Determine the direction of the mismatch. Are you calling sync code from inside an async function (the most common case), or calling async code from a sync context? Run PYTHONASYNCIODEBUG=1 python server.py and look for slow callback warnings. If the debug output says a callback took 0.5 seconds or more, you have sync code blocking the loop. If you see no warnings but performance is still poor, the bottleneck might be outside asyncio entirely.
Minute 5 — Identify the framework’s runner model. This step is critical and often missed. If you are inside Django ASGI, FastAPI, or Quart, there is already a running event loop. Calling asyncio.run() from within that context raises RuntimeError. Calling loop.run_until_complete() also fails. Your first instinct — “wrap it in asyncio.run()” — is wrong here. Instead, use asyncio.to_thread() for sync-in-async, or await directly for async-in-async.
Minute 8 — Pinpoint the blocking call. Search your codebase for known synchronous patterns:
grep -rn "requests\.\(get\|post\|put\|delete\)\|time\.sleep\|open(" --include="*.py" src/Check every hit. If it appears inside an async def function without asyncio.to_thread() or run_in_executor() wrapping it, that is your blocker.
Minute 12 — Apply the correct fix. For blocking I/O, use asyncio.to_thread() (Python 3.9+) or switch to an async library. For CPU-bound work, use ProcessPoolExecutor. For time.sleep(), replace with await asyncio.sleep(). For asyncio.run() inside an already-running loop, use await directly or run_coroutine_threadsafe().
Minute 15 — Verify the fix. Re-run with PYTHONASYNCIODEBUG=1. The slow callback warnings should disappear. Load test with a tool like wrk or locust and confirm that concurrent requests no longer block each other.
Fix 1: Use asyncio.to_thread for Blocking I/O
asyncio.to_thread() (Python 3.9+) runs a synchronous function in a separate thread, freeing the event loop:
import asyncio
import requests # Synchronous HTTP library
# WRONG — blocks the event loop
async def fetch_data_wrong():
response = requests.get('https://api.example.com/data')
return response.json()
# CORRECT — run in a thread pool
async def fetch_data():
response = await asyncio.to_thread(requests.get, 'https://api.example.com/data')
return response.json()
# With keyword arguments
async def fetch_with_params():
response = await asyncio.to_thread(
requests.get,
'https://api.example.com/data',
timeout=30,
headers={'Authorization': 'Bearer token'}
)
return response.json()Python 3.8 and earlier — use loop.run_in_executor():
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=10)
async def fetch_data():
loop = asyncio.get_event_loop()
response = await loop.run_in_executor(
executor,
lambda: requests.get('https://api.example.com/data')
)
return response.json()Better solution: Use an async HTTP library instead of running sync libraries in threads.
httpxwith async support oraiohttpare purpose-built for async code and don’t need thread pools.
Fix 2: Use Async Libraries Instead of Sync Ones
The best fix for blocking I/O is to use async-native libraries:
# SYNC (blocks event loop) → ASYNC alternative
# requests → httpx (async) or aiohttp
# psycopg2 (PostgreSQL) → asyncpg or psycopg3 (async)
# pymysql (MySQL) → aiomysql
# redis-py (sync) → redis.asyncio (included in redis-py v4+)
# pymongo (sync) → motor (async MongoDB)
# boto3 (sync) → aioboto3 or boto3 run_in_executor
# SQLAlchemy sync → SQLAlchemy async (1.4+) with asyncpg
# smtplib → aiosmtplib
# time.sleep() → await asyncio.sleep()HTTP requests with httpx:
import httpx
# WRONG — sync requests in async function
async def get_user_wrong(user_id: int):
response = requests.get(f'https://api.example.com/users/{user_id}')
return response.json()
# CORRECT — async httpx client
async def get_user(user_id: int):
async with httpx.AsyncClient() as client:
response = await client.get(f'https://api.example.com/users/{user_id}')
response.raise_for_status()
return response.json()
# Reuse client across requests (more efficient)
client = httpx.AsyncClient(timeout=30.0)
async def get_user_efficient(user_id: int):
response = await client.get(f'https://api.example.com/users/{user_id}')
return response.json()Database with SQLAlchemy async:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine('postgresql+asyncpg://user:pass@localhost/db')
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# WRONG — sync SQLAlchemy in async function
async def get_users_wrong():
db = SessionLocal() # Sync session
users = db.query(User).all() # Blocks event loop
return users
# CORRECT — async SQLAlchemy
async def get_users():
async with AsyncSessionLocal() as session:
result = await session.execute(select(User))
return result.scalars().all()Fix 3: Offload CPU-Bound Work to ProcessPoolExecutor
Threading doesn’t help for CPU-bound work (Python’s GIL limits parallelism). Use ProcessPoolExecutor for CPU-intensive tasks:
import asyncio
from concurrent.futures import ProcessPoolExecutor
# CPU-bound function — runs in a separate process (no GIL restriction)
def generate_pdf_sync(data: dict) -> bytes:
# Expensive CPU-bound work
return pdf_library.generate(data)
# Process pool — creates separate Python processes
process_pool = ProcessPoolExecutor(max_workers=4)
async def generate_report(data: dict) -> bytes:
loop = asyncio.get_event_loop()
# Run in a separate process — doesn't block the event loop
pdf_bytes = await loop.run_in_executor(process_pool, generate_pdf_sync, data)
return pdf_bytesFastAPI with CPU-bound tasks:
from fastapi import FastAPI, BackgroundTasks
from concurrent.futures import ProcessPoolExecutor
import asyncio
app = FastAPI()
executor = ProcessPoolExecutor(max_workers=4)
@app.post("/reports")
async def create_report(data: ReportData, background_tasks: BackgroundTasks):
# Don't block the request — generate report in background
background_tasks.add_task(generate_and_store_report, data)
return {"status": "Report generation started", "report_id": new_id}
async def generate_and_store_report(data: ReportData):
loop = asyncio.get_event_loop()
pdf = await loop.run_in_executor(executor, generate_pdf_sync, data.dict())
await store_report(pdf)Fix 4: Fix time.sleep() in Async Code
Replace all time.sleep() calls in coroutines with await asyncio.sleep():
import time
import asyncio
# WRONG — blocks the entire event loop
async def poll_status():
while True:
status = await check_status()
time.sleep(5) # All other coroutines paused for 5 seconds
# CORRECT — yields control back to event loop during sleep
async def poll_status():
while True:
status = await check_status()
await asyncio.sleep(5) # Other coroutines run during this wait
# WRONG in unit tests too
def test_async_function():
asyncio.run(async_function())
time.sleep(1) # Testing with real time — slow and unreliable
# Use pytest-asyncio and monkeypatching insteadFind all time.sleep() calls in your codebase:
grep -rn "time\.sleep\(" --include="*.py" .
# Review each one — any inside an async function is a bugFix 5: Detect Event Loop Blocking
Use asyncio’s debug mode and slow callback monitoring to find blocking calls:
import asyncio
import logging
# Enable asyncio debug mode
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
async def main():
loop = asyncio.get_event_loop()
# Warn when a callback takes longer than 100ms (blocks the loop)
loop.slow_callback_duration = 0.1 # seconds
# Enable debug mode — logs blocked callbacks
loop.set_debug(True)
await your_application()
asyncio.run(main())Or set via environment variable:
PYTHONASYNCIODEBUG=1 python server.py
# Logs:
# Executing <Task finished name='Task-1' coro=<slow_task() done>
# took 0.523 seconds ← Blocked event loop for 523msUse aiomonitor for runtime profiling:
pip install aiomonitor
import asyncio
import aiomonitor
async def main():
# Starts an aiomonitor server (telnet localhost 50101)
async with aiomonitor.start_monitor(asyncio.get_event_loop()):
await your_app()Fix 6: Fix asyncio.run() Called Inside Running Loop
asyncio.run() creates a new event loop — calling it from inside a running loop raises RuntimeError:
# WRONG — asyncio.run() inside async code
async def outer():
result = asyncio.run(inner()) # RuntimeError: This event loop is already running
# WRONG — in Jupyter notebooks (the kernel runs its own event loop)
asyncio.run(some_coroutine()) # RuntimeError in Jupyter# CORRECT — use await inside async functions
async def outer():
result = await inner() # await the coroutine directly
# CORRECT — for calling async from sync code in a running loop (e.g., Django sync view)
import asyncio
def sync_function_needing_async():
# Get or create an event loop
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# In a running loop — use run_coroutine_threadsafe
future = asyncio.run_coroutine_threadsafe(async_function(), loop)
return future.result(timeout=30)
else:
return loop.run_until_complete(async_function())
except RuntimeError:
# No current event loop — create one
return asyncio.run(async_function())
# CORRECT — in Jupyter notebooks, use await directly (Jupyter supports it)
result = await some_coroutine()
# Or install nest_asyncio for Jupyter
import nest_asyncio
nest_asyncio.apply()
asyncio.run(some_coroutine()) # Now works in JupyterFix 7: Async Database Sessions in FastAPI
A complete pattern for async SQLAlchemy with FastAPI:
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import select
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/mydb"
engine = create_async_engine(DATABASE_URL, echo=True, pool_size=20)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase):
pass
app = FastAPI()
# Async database session dependency
async def get_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# Routes use async session — no event loop blocking
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User))
users = result.scalars().all()
return users
@app.post("/users")
async def create_user(data: UserCreate, db: AsyncSession = Depends(get_db)):
user = User(**data.model_dump())
db.add(user)
await db.flush() # Get the generated ID without committing
return userStill Not Working?
Check if a third-party library is synchronous — many popular libraries (Stripe, Twilio SDKs, some ORMs) are synchronous. Check the library’s docs for async support or use asyncio.to_thread().
Async function doesn’t mean concurrent — declaring async def doesn’t make a function non-blocking. It only means the function can yield control at await points. A function with no await statements runs synchronously even if declared async.
Connection pool exhaustion — if every request opens a new async database connection, the pool fills up and new connections wait. Reuse connection pools across requests:
# WRONG — new engine per request
@app.get("/data")
async def get_data():
engine = create_async_engine(DATABASE_URL) # Don't create per request
...
# CORRECT — shared engine at module level
engine = create_async_engine(DATABASE_URL, pool_size=20, max_overflow=10)asyncio.gather() for concurrent tasks — if you need to make multiple independent async calls, run them concurrently:
# Sequential — each await waits for the previous
user = await get_user(user_id)
orders = await get_orders(user_id)
permissions = await get_permissions(user_id)
# Total time: sum of all three
# Concurrent — all three run in parallel
user, orders, permissions = await asyncio.gather(
get_user(user_id),
get_orders(user_id),
get_permissions(user_id),
)
# Total time: max of the threeThread pool size limits — asyncio.to_thread() uses the default executor, which has a limited number of threads (usually min(32, os.cpu_count() + 4)). If you push hundreds of sync calls through to_thread(), they queue behind each other. Increase the pool size or, better, switch to an async library:
import asyncio
from concurrent.futures import ThreadPoolExecutor
# Increase the default executor's thread count
loop = asyncio.get_event_loop()
loop.set_default_executor(ThreadPoolExecutor(max_workers=64))nest_asyncio in production is a code smell — nest_asyncio.apply() patches the event loop to allow nested asyncio.run() calls. It works in Jupyter and quick scripts, but in production it masks architectural problems. Refactor the code so you never need nested event loops.
Django async views still calling sync ORM — Django 4.1+ supports async def views, but the ORM is still synchronous. Calling MyModel.objects.all() inside an async def view blocks the event loop. Use sync_to_async from asgiref or await the Django async ORM wrapper (Django 5.0+):
from asgiref.sync import sync_to_async
async def my_view(request):
users = await sync_to_async(list)(User.objects.all())
return JsonResponse({"users": users})For related issues, see Fix: FastAPI Dependency Injection Error, Fix: Celery Task Not Executing, Fix: Python asyncio RuntimeError No Running Event Loop, and Fix: uvicorn Not Working.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
Was this article helpful?
Related Articles
Fix: FastAPI BackgroundTasks Not Working — Task Not Running or Dependency Errors
How to fix FastAPI BackgroundTasks — task not executing, dependency injection in tasks, error handling, Celery for heavy tasks, and lifespan-managed background workers.
Fix: Python asyncio.gather Not Handling Errors — Exceptions Swallowed or All Tasks Cancelled
How to fix asyncio.gather error handling — return_exceptions parameter, partial failures, task cancellation propagation, TaskGroup alternatives, and exception isolation patterns.
Fix: FastAPI Dependency Injection Errors — Dependencies Not Working
How to fix FastAPI dependency injection errors — async dependencies, database sessions, sub-dependencies, dependency overrides in tests, and common DI mistakes.
Fix: Pydantic ValidationError — Field Required / Value Not Valid
How to fix Pydantic ValidationError in Python — missing required fields, wrong types, custom validators, handling optional fields, v1 vs v2 API differences, and debugging complex nested models.