Fix: Python asyncio Blocking the Event Loop — Mixing Sync and Async Code

FixDevs

Quick Answer

How to fix Python asyncio event loop blocking — using run_in_executor for sync calls, asyncio.to_thread, avoiding blocking I/O in coroutines, and detecting event loop stalls.

The Problem

An async Python application becomes unresponsive under load:

async def handle_request(request):
    # This blocks the entire event loop for every request
    data = requests.get('https://api.example.com/data')  # Sync HTTP call
    return process(data.json())

Or an async route is slow despite using async def:

@app.get("/report")
async def generate_report():
    # Looks async, but calls a blocking CPU-bound function
    report = generate_pdf(data)  # Synchronous — blocks all other requests
    return {"report": report}

Or asyncio.run() is called inside an already-running event loop:

RuntimeError: asyncio.run() cannot be called from a running event loop

Or mixing time.sleep() with async code:

async def poll():
    while True:
        await check_status()
        time.sleep(5)  # Blocks the event loop — no other coroutines run during sleep

Why This Happens

Python’s asyncio runs coroutines on a single-threaded event loop. The event loop can only do one thing at a time — it switches between coroutines at await points. Any synchronous (blocking) code that runs inside a coroutine holds the event loop hostage for its entire duration.

This design is intentional. Async code achieves concurrency by cooperative multitasking: each coroutine voluntarily yields at await, letting others run. When a coroutine calls a blocking function — one that never yields — every other coroutine in the application stalls until that call finishes. In a web server handling hundreds of requests, a single 200ms database query through a synchronous driver freezes all other responses for that same 200ms.

The confusion deepens because async def does not make a function non-blocking. Declaring a function with async def simply means it can contain await statements and returns a coroutine object. A function with zero await expressions runs synchronously even with the async def prefix — the event loop never gets a chance to switch away from it.
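
A small timing experiment makes the difference visible. The sketch below uses hypothetical worker names and time.sleep() as a stand-in for any blocking call; it runs three copies of each worker with asyncio.gather() and compares the elapsed time.

```python
import asyncio
import time


async def blocking_worker() -> None:
    # Declared async, but contains no await: time.sleep() holds the loop.
    time.sleep(0.1)


async def cooperative_worker() -> None:
    # Yields to the event loop while waiting.
    await asyncio.sleep(0.1)


async def timed(worker, count: int = 3) -> float:
    """Run `count` copies of `worker` with gather and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(worker() for _ in range(count)))
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    blocking = await timed(blocking_worker)
    cooperative = await timed(cooperative_worker)
    return blocking, cooperative


if __name__ == "__main__":
    blocking, cooperative = asyncio.run(main())
    print(f"blocking: {blocking:.2f}s, cooperative: {cooperative:.2f}s")
```

The blocking workers run back to back, so their total is roughly the sum of the three sleeps, while the cooperative workers overlap and finish in roughly the time of one.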

Common blocking patterns:

  • Blocking I/O: requests.get(), open() with standard file I/O, and synchronous psycopg2 queries. The event loop can’t switch to other coroutines while the call waits.
  • CPU-bound operations: image processing, PDF generation, data transformation. The work never reaches an await, so the loop cannot switch away until it finishes. Because of Python’s GIL, only one thread runs Python bytecode at a time, so moving this work to a thread does little to help.
  • time.sleep(): blocks the thread, unlike await asyncio.sleep(), which yields control back to the event loop.
  • Synchronous ORM calls: SQLAlchemy’s standard Session is synchronous. Using it in an async route blocks the event loop.
  • asyncio.run() inside async code: asyncio.run() creates a new event loop. Calling it inside an already-running loop raises RuntimeError.

Diagnostic Timeline

When an async Python service slows down or hangs, the debugging path depends on which exact problem you have. Here is how to walk through it systematically.

Minute 0 — Reproduce and classify the symptom. You see one of two things: (a) a RuntimeError traceback, either asyncio.run() cannot be called from a running event loop or This event loop is already running, or (b) the application responds slowly or stops responding entirely with no traceback. For (a), skip to Fix 6. For (b), continue.

Minute 2 — Determine the direction of the mismatch. Are you calling sync code from inside an async function (the most common case), or calling async code from a sync context? Run PYTHONASYNCIODEBUG=1 python server.py and look for slow callback warnings. In debug mode asyncio logs any callback or task step that takes longer than loop.slow_callback_duration (100 ms by default), so a line reporting that something took 0.5 seconds means sync code blocked the loop for that long. If you see no warnings but performance is still poor, the bottleneck might be outside asyncio entirely.

Minute 5 — Identify the framework’s runner model. This step is critical and often missed. If you are inside Django ASGI, FastAPI, or Quart, there is already a running event loop. Calling asyncio.run() from within that context raises RuntimeError. Calling loop.run_until_complete() also fails. Your first instinct — “wrap it in asyncio.run()” — is wrong here. Instead, use asyncio.to_thread() for sync-in-async, or await directly for async-in-async.
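
If it is unclear whether a piece of code executes under a running loop, asyncio.get_running_loop() can answer the question: it raises RuntimeError when no loop is running in the current thread. The helper name loop_is_running below is hypothetical; this is a minimal sketch of the check, not part of any framework.

```python
import asyncio


def loop_is_running() -> bool:
    """Return True when the current thread is executing inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises RuntimeError when no loop runs in this thread
        return False
    return True


async def inside_coroutine() -> bool:
    return loop_is_running()


if __name__ == "__main__":
    print(loop_is_running())                # plain sync context
    print(asyncio.run(inside_coroutine()))  # inside asyncio.run()
```

When the helper returns True, asyncio.run() is off the table at that call site and the code should await instead.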

Minute 8 — Pinpoint the blocking call. Search your codebase for known synchronous patterns:

grep -rn "requests\.\(get\|post\|put\|delete\)\|time\.sleep\|open(" --include="*.py" src/

Check every hit. If it appears inside an async def function without asyncio.to_thread() or run_in_executor() wrapping it, that is your blocker.
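
grep cannot tell whether a hit sits inside an async def. As a rough complement, the sketch below uses Python's ast module to report known blocking calls that appear inside async functions. The BLOCKING_CALLS set and the function names are assumptions for illustration; extend the set to match your codebase.

```python
import ast

# Dotted call names treated as blocking; extend this set for your codebase.
BLOCKING_CALLS = {"time.sleep", "requests.get", "requests.post", "open"}


def dotted_name(node: ast.AST) -> str:
    """Return 'module.attr' for attribute calls or 'name' for bare calls."""
    if isinstance(node, ast.Attribute):
        prefix = dotted_name(node.value)
        return f"{prefix}.{node.attr}" if prefix else node.attr
    if isinstance(node, ast.Name):
        return node.id
    return ""


def find_blocking_calls(source: str) -> list[tuple[str, int, str]]:
    """Return (function name, line number, call) for blocking calls in async defs."""
    hits = []
    for func in ast.walk(ast.parse(source)):
        if not isinstance(func, ast.AsyncFunctionDef):
            continue
        for node in ast.walk(func):
            if isinstance(node, ast.Call) and dotted_name(node.func) in BLOCKING_CALLS:
                hits.append((func.name, node.lineno, dotted_name(node.func)))
    return hits


SAMPLE = '''
import time, asyncio

async def poll():
    await asyncio.sleep(1)
    time.sleep(5)

def sync_helper():
    time.sleep(1)
'''

if __name__ == "__main__":
    for name, lineno, call in find_blocking_calls(SAMPLE):
        print(f"{name}:{lineno} calls {call}")
```

The scan is deliberately simple. It matches names only as written (an aliased import slips through), and it also flags calls inside a lambda or nested function that is later handed to an executor, so treat each result as a lead to review rather than a verdict.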

Minute 12 — Apply the correct fix. For blocking I/O, use asyncio.to_thread() (Python 3.9+) or switch to an async library. For CPU-bound work, use ProcessPoolExecutor. For time.sleep(), replace with await asyncio.sleep(). For asyncio.run() inside an already-running loop, use await directly or run_coroutine_threadsafe().

Minute 15 — Verify the fix. Re-run with PYTHONASYNCIODEBUG=1. The slow callback warnings should disappear. Load test with a tool like wrk or locust and confirm that concurrent requests no longer block each other.

Fix 1: Use asyncio.to_thread for Blocking I/O

asyncio.to_thread() (Python 3.9+) runs a synchronous function in a separate thread, freeing the event loop:

import asyncio
import requests  # Synchronous HTTP library

# WRONG — blocks the event loop
async def fetch_data_wrong():
    response = requests.get('https://api.example.com/data')
    return response.json()

# CORRECT — run in a thread pool
async def fetch_data():
    response = await asyncio.to_thread(requests.get, 'https://api.example.com/data')
    return response.json()

# With keyword arguments
async def fetch_with_params():
    response = await asyncio.to_thread(
        requests.get,
        'https://api.example.com/data',
        timeout=30,
        headers={'Authorization': 'Bearer token'}
    )
    return response.json()
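
To check that the loop really stays free while the thread works, the following sketch runs a ticker coroutine alongside a blocking call. slow_sync_call is a hypothetical stand-in for requests.get(), so the example runs without network access.

```python
import asyncio
import time


def slow_sync_call(delay: float) -> str:
    # Stand-in for a blocking library call such as requests.get()
    time.sleep(delay)
    return "done"


async def count_ticks(stop: asyncio.Event) -> int:
    """Count how many times the loop gets to run while other work is in flight."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def main() -> tuple[str, int]:
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    # The blocking call runs in a worker thread; the ticker keeps running.
    result = await asyncio.to_thread(slow_sync_call, 0.2)
    stop.set()
    return result, await ticker


if __name__ == "__main__":
    result, ticks = asyncio.run(main())
    print(result, ticks)
```

Calling slow_sync_call(0.2) directly inside main() instead would leave the ticker with almost no chance to run, because the loop would be stuck inside time.sleep().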

Python 3.8 and earlier — use loop.run_in_executor():

import asyncio
from concurrent.futures import ThreadPoolExecutor

import requests

executor = ThreadPoolExecutor(max_workers=10)

async def fetch_data():
    # get_running_loop() (Python 3.7+) is the preferred call inside a coroutine
    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        executor,
        lambda: requests.get('https://api.example.com/data')
    )
    return response.json()

Better solution: Use an async HTTP library instead of running sync libraries in threads. httpx with async support or aiohttp are purpose-built for async code and don’t need thread pools.

Fix 2: Use Async Libraries Instead of Sync Ones

The best fix for blocking I/O is to use async-native libraries:

# SYNC (blocks event loop)       → ASYNC alternative
# requests                       → httpx (async) or aiohttp
# psycopg2 (PostgreSQL)          → asyncpg or psycopg3 (async)
# pymysql (MySQL)                → aiomysql
# redis-py (sync)                → redis.asyncio (included in redis-py v4+)
# pymongo (sync)                 → motor (async MongoDB)
# boto3 (sync)                   → aioboto3, or boto3 via run_in_executor
# SQLAlchemy sync                → SQLAlchemy async (1.4+) with asyncpg
# smtplib                        → aiosmtplib
# time.sleep()                   → await asyncio.sleep()

HTTP requests with httpx:

import httpx

# WRONG — sync requests in async function
async def get_user_wrong(user_id: int):
    response = requests.get(f'https://api.example.com/users/{user_id}')
    return response.json()

# CORRECT — async httpx client
async def get_user(user_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f'https://api.example.com/users/{user_id}')
        response.raise_for_status()
        return response.json()

# Reuse client across requests (more efficient)
client = httpx.AsyncClient(timeout=30.0)

async def get_user_efficient(user_id: int):
    response = await client.get(f'https://api.example.com/users/{user_id}')
    return response.json()

Database with SQLAlchemy async:

from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

engine = create_async_engine('postgresql+asyncpg://user:pass@localhost/db')
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

# WRONG — sync SQLAlchemy in async function
async def get_users_wrong():
    db = SessionLocal()          # Sync session
    users = db.query(User).all() # Blocks event loop
    return users

# CORRECT — async SQLAlchemy
async def get_users():
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(User))
        return result.scalars().all()

Fix 3: Offload CPU-Bound Work to ProcessPoolExecutor

Threading doesn’t help for CPU-bound work (Python’s GIL limits parallelism). Use ProcessPoolExecutor for CPU-intensive tasks:

import asyncio
from concurrent.futures import ProcessPoolExecutor

# CPU-bound function — runs in a separate process (no GIL restriction)
def generate_pdf_sync(data: dict) -> bytes:
    # Expensive CPU-bound work
    return pdf_library.generate(data)

# Process pool — creates separate Python processes
process_pool = ProcessPoolExecutor(max_workers=4)

async def generate_report(data: dict) -> bytes:
    loop = asyncio.get_running_loop()
    # Run in a separate process — doesn't block the event loop
    pdf_bytes = await loop.run_in_executor(process_pool, generate_pdf_sync, data)
    return pdf_bytes
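
The block above depends on a PDF library. For a version that runs as-is, the sketch below swaps in a hypothetical CPU-bound function, count_primes, and sends three jobs to a process pool.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit: int) -> int:
    """CPU-bound stand-in: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=2) as pool:
        # Each job runs in a worker process; the event loop only awaits results.
        jobs = [loop.run_in_executor(pool, count_primes, n) for n in (10, 100, 1000)]
        return await asyncio.gather(*jobs)


if __name__ == "__main__":
    # The guard matters: worker processes may re-import this module on start.
    print(asyncio.run(main()))
```

Functions and arguments sent to a process pool must be picklable, which is why count_primes is a plain module-level function rather than a lambda or a nested function.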

FastAPI with CPU-bound tasks:

from fastapi import FastAPI, BackgroundTasks
from concurrent.futures import ProcessPoolExecutor
import asyncio

app = FastAPI()
executor = ProcessPoolExecutor(max_workers=4)

@app.post("/reports")
async def create_report(data: ReportData, background_tasks: BackgroundTasks):
    # Don't block the request — generate report in background
    background_tasks.add_task(generate_and_store_report, data)
    return {"status": "Report generation started", "report_id": new_id}

async def generate_and_store_report(data: ReportData):
    loop = asyncio.get_running_loop()
    pdf = await loop.run_in_executor(executor, generate_pdf_sync, data.dict())
    await store_report(pdf)

Fix 4: Fix time.sleep() in Async Code

Replace all time.sleep() calls in coroutines with await asyncio.sleep():

import time
import asyncio

# WRONG — blocks the entire event loop
async def poll_status():
    while True:
        status = await check_status()
        time.sleep(5)  # All other coroutines paused for 5 seconds

# CORRECT — yields control back to event loop during sleep
async def poll_status():
    while True:
        status = await check_status()
        await asyncio.sleep(5)  # Other coroutines run during this wait

# WRONG in unit tests too
def test_async_function():
    asyncio.run(async_function())
    time.sleep(1)   # Testing with real time — slow and unreliable
    # Use pytest-asyncio and monkeypatching instead

Find all time.sleep() calls in your codebase:

grep -rn "time\.sleep(" --include="*.py" .
# Review each one — any inside an async function is a bug

Fix 5: Detect Event Loop Blocking

Use asyncio’s debug mode and slow callback monitoring to find blocking calls:

import asyncio

async def main():
    loop = asyncio.get_running_loop()

    # Warn when a callback takes longer than 100ms (this is also the default)
    loop.slow_callback_duration = 0.1  # seconds

    await your_application()

# debug=True enables asyncio debug mode, which logs slow callbacks
asyncio.run(main(), debug=True)

Or set via environment variable:

PYTHONASYNCIODEBUG=1 python server.py
# Logs:
# Executing <Task finished name='Task-1' coro=<slow_task() done>
# took 0.523 seconds  ← Blocked event loop for 523ms
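
Debug mode is usually too noisy to leave on in production. A lightweight alternative is a watchdog coroutine that measures how late asyncio.sleep() wakes up. The sketch below is one possible shape for such a monitor; monitor_loop_lag and misbehaving_task are hypothetical names, and the deliberate time.sleep() exists only to trigger the lag.

```python
import asyncio
import time


async def monitor_loop_lag(interval: float, samples: int) -> float:
    """Sleep `interval` seconds `samples` times; return the worst overshoot seen."""
    worst = 0.0
    for _ in range(samples):
        start = time.perf_counter()
        await asyncio.sleep(interval)
        lag = time.perf_counter() - start - interval
        worst = max(worst, lag)
    return worst


async def misbehaving_task() -> None:
    await asyncio.sleep(0.05)
    time.sleep(0.3)  # deliberate blocking call, for demonstration only


async def main() -> float:
    monitor = asyncio.create_task(monitor_loop_lag(interval=0.02, samples=10))
    await misbehaving_task()
    return await monitor


if __name__ == "__main__":
    print(f"worst loop lag: {asyncio.run(main()):.3f}s")
```

In a real service the monitor would loop indefinitely and log or export the lag instead of returning it. A lag that regularly exceeds a few tens of milliseconds suggests something is blocking the loop.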

Use aiomonitor for runtime profiling:

pip install aiomonitor

import asyncio
import aiomonitor

async def main():
    # Starts an aiomonitor server; connect with telnet on the port your
    # aiomonitor version uses (50101 in older releases)
    loop = asyncio.get_running_loop()
    with aiomonitor.start_monitor(loop):
        await your_app()

Fix 6: Fix asyncio.run() Called Inside Running Loop

asyncio.run() creates a new event loop — calling it from inside a running loop raises RuntimeError:

# WRONG — asyncio.run() inside async code
async def outer():
    result = asyncio.run(inner())  # RuntimeError: asyncio.run() cannot be called from a running event loop

# WRONG — in Jupyter notebooks (the kernel runs its own event loop)
asyncio.run(some_coroutine())  # RuntimeError in Jupyter

# CORRECT — use await inside async functions
async def outer():
    result = await inner()  # await the coroutine directly

# CORRECT: calling async code from sync code
import asyncio

def sync_function_needing_async(loop=None):
    if loop is None:
        # No event loop is running in this thread, so start one
        return asyncio.run(async_function())
    # `loop` must be running in ANOTHER thread (for example, this function
    # was started with asyncio.to_thread). Hand the coroutine to that loop
    # and wait for the result. Never do this from the loop's own thread:
    # future.result() would block the loop and deadlock until the timeout.
    future = asyncio.run_coroutine_threadsafe(async_function(), loop)
    return future.result(timeout=30)

# CORRECT — in Jupyter notebooks, use await directly (Jupyter supports it)
result = await some_coroutine()

# Or install nest_asyncio for Jupyter
import nest_asyncio
nest_asyncio.apply()
asyncio.run(some_coroutine())  # Now works in Jupyter
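
run_coroutine_threadsafe() is only safe when the caller runs in a different thread from the loop. The sketch below shows that arrangement end to end, assuming a hypothetical coroutine fetch_value and a sync function sync_worker that needs its result.

```python
import asyncio


async def fetch_value() -> int:
    # Hypothetical coroutine that must run on the main event loop
    await asyncio.sleep(0.01)
    return 42


def sync_worker(loop: asyncio.AbstractEventLoop) -> int:
    # Runs in a worker thread, so blocking on .result() is safe here.
    future = asyncio.run_coroutine_threadsafe(fetch_value(), loop)
    return future.result(timeout=5)


async def main() -> int:
    loop = asyncio.get_running_loop()
    # to_thread moves sync_worker off the loop thread; the loop stays free
    # to execute fetch_value() when the worker submits it.
    return await asyncio.to_thread(sync_worker, loop)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Calling sync_worker(loop) directly from main() instead would block the loop inside future.result(), so fetch_value() could never run and the call would fail with a timeout.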

Fix 7: Async Database Sessions in FastAPI

A complete pattern for async SQLAlchemy with FastAPI:

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import select

DATABASE_URL = "postgresql+asyncpg://user:password@localhost/mydb"

engine = create_async_engine(DATABASE_URL, echo=True, pool_size=20)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

app = FastAPI()

# Async database session dependency
async def get_db() -> AsyncSession:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# Routes use async session — no event loop blocking
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User))
    users = result.scalars().all()
    return users

@app.post("/users")
async def create_user(data: UserCreate, db: AsyncSession = Depends(get_db)):
    user = User(**data.model_dump())
    db.add(user)
    await db.flush()  # Get the generated ID without committing
    return user

Still Not Working?

Check if a third-party library is synchronous — many popular libraries (Stripe, Twilio SDKs, some ORMs) are synchronous. Check the library’s docs for async support or use asyncio.to_thread().

Async function doesn’t mean concurrent — declaring async def doesn’t make a function non-blocking. It only means the function can yield control at await points. A function with no await statements runs synchronously even if declared async.

Connection pool exhaustion — if every request creates its own engine, each request also creates its own connection pool, so connections are never reused and the database can run out of connection slots. Create the engine once and share it across requests:

# WRONG — new engine per request
@app.get("/data")
async def get_data():
    engine = create_async_engine(DATABASE_URL)  # Don't create per request
    ...

# CORRECT — shared engine at module level
engine = create_async_engine(DATABASE_URL, pool_size=20, max_overflow=10)

asyncio.gather() for concurrent tasks — if you need to make multiple independent async calls, run them concurrently:

# Sequential — each await waits for the previous
user = await get_user(user_id)
orders = await get_orders(user_id)
permissions = await get_permissions(user_id)
# Total time: sum of all three

# Concurrent: all three are in flight at the same time
user, orders, permissions = await asyncio.gather(
    get_user(user_id),
    get_orders(user_id),
    get_permissions(user_id),
)
# Total time: max of the three
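
The snippet above relies on application functions. A self-contained version with a hypothetical fake_call coroutine shows both properties of gather(): results come back in argument order, and the total time tracks the slowest call.

```python
import asyncio
import time


async def fake_call(name: str, delay: float) -> str:
    # Stand-in for an awaitable I/O call such as get_user() or get_orders()
    await asyncio.sleep(delay)
    return name


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    results = await asyncio.gather(
        fake_call("user", 0.10),
        fake_call("orders", 0.05),
        fake_call("permissions", 0.15),
    )
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results, f"{elapsed:.2f}s")
```

Note that gather() only helps when the awaited calls actually yield. If any of them blocks the loop, the calls run one after another again.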

Thread pool size limits: asyncio.to_thread() uses the default executor, which has a limited number of threads (by default min(32, os.cpu_count() + 4)). If you push hundreds of sync calls through to_thread(), they queue behind each other. Increase the pool size or, better, switch to an async library:

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def main():
    # Increase the default executor's thread count. Do this once at startup,
    # from inside the running loop.
    loop = asyncio.get_running_loop()
    loop.set_default_executor(ThreadPoolExecutor(max_workers=64))
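
The queueing effect is easy to measure. The sketch below runs the same six blocking calls on a pool of two threads and on a pool of six; run_batch and blocking_call are hypothetical names, and time.sleep() stands in for a blocking library call.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_call() -> None:
    time.sleep(0.1)  # stand-in for a blocking library call


async def run_batch(max_workers: int, jobs: int) -> float:
    """Run `jobs` blocking calls on a pool of `max_workers` threads; return seconds."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        start = time.perf_counter()
        await asyncio.gather(
            *(loop.run_in_executor(pool, blocking_call) for _ in range(jobs))
        )
        return time.perf_counter() - start


async def main() -> tuple[float, float]:
    small = await run_batch(max_workers=2, jobs=6)  # three waves of two
    large = await run_batch(max_workers=6, jobs=6)  # one wave of six
    return small, large


if __name__ == "__main__":
    small, large = asyncio.run(main())
    print(f"2 workers: {small:.2f}s, 6 workers: {large:.2f}s")
```

With two workers the calls complete in three waves, so the batch takes roughly three times as long as with six workers. The event loop itself stays responsive in both cases; only the queued calls wait.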

nest_asyncio in production is a code smell: nest_asyncio.apply() patches the event loop to allow nested asyncio.run() calls. It works in Jupyter and quick scripts, but in production it masks architectural problems. Refactor the code so you never need nested event loops.

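Django async views still calling sync ORM — Django 3.1+ supports async def views, but the classic ORM methods are still synchronous. Evaluating a queryset such as MyModel.objects.all() directly inside an async def view raises SynchronousOnlyOperation, because Django refuses to run blocking database code on the event loop thread. Wrap the call with sync_to_async from asgiref, or use the asynchronous ORM interface added in Django 4.1 (async for over querysets and a-prefixed methods such as aget() and acreate()):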

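from asgiref.sync import sync_to_async
from django.http import JsonResponse

async def my_view(request):
    # .values() returns dicts, which JsonResponse can serialize
    # (the field names here are illustrative)
    users = await sync_to_async(list)(User.objects.values("id", "username"))
    return JsonResponse({"users": users})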

For related issues, see Fix: FastAPI Dependency Injection Error, Fix: Celery Task Not Executing, Fix: Python asyncio RuntimeError No Running Event Loop, and Fix: uvicorn Not Working.

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
