Fix: aiohttp Not Working — Session Leaks, ClientTimeout, and Connector Errors

FixDevs · (Updated: )

Quick Answer

How to fix aiohttp errors — RuntimeError session is closed, ClientConnectorError connection refused, SSL verify failure, Unclosed client session warning, server websocket disconnect, and connector pool exhausted.

The Error

You reuse a ClientSession after it’s closed and Python raises:

RuntimeError: Session is closed

Or your script finishes with a warning that haunts every aiohttp user:

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f...>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f...>, 1.234)]']

Or connections fail behind a proxy:

aiohttp.client_exceptions.ClientConnectorError:
Cannot connect to host api.example.com:443 ssl:default
[Connect call failed ('93.184.216.34', 443)]

Or your HTTP server hangs under load:

async def handle(request):
    # Server freezes at ~100 concurrent requests
    result = requests.get("https://...")   # Blocking call in async context!
    return web.json_response(result.json())

Or WebSocket clients disconnect immediately:

aiohttp.WSServerHandshakeError: Invalid response status

aiohttp predates httpx and has long been a standard async HTTP library for Python. It ships both a client (aiohttp.ClientSession) and a server (aiohttp.web) in one package. Session lifecycle, connector pooling, and error handling each have specific pitfalls that newer libraries such as httpx have tried to simplify. This guide covers them.

Why This Happens

ClientSession holds a connection pool. Creating a session per request defeats the pool’s purpose and triggers Unclosed session warnings if you forget to close them. The correct pattern is one session shared across many requests, closed when you’re done — but this is easy to get wrong in scripts that mix sync and async code.

Connection pool exhaustion happens when you open more concurrent requests than the connector allows (default 100). Requests queue until a slot opens, appearing as hangs.

Fix 1: Always Use Context Manager for Sessions

import aiohttp
import asyncio

# WRONG — session never closed
async def fetch_bad(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    return await response.text()
    # Session leaks

# CORRECT — context manager closes it
async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# Reuse one session across many requests
async def fetch_many(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

asyncio.run(fetch_many(["https://a.com", "https://b.com"]))

ClientSession() creates a TCP connection pool. Creating one per request means:

  • No connection reuse (slow)
  • Unclosed session warnings cluttering logs
  • Potential file descriptor exhaustion

Long-lived session pattern (for apps, not scripts):

import aiohttp

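# At app startup (inside the running event loop, not at import time)
session = None

async def startup():
    global session
    session = aiohttp.ClientSession()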

# During request handling
async def call_api(url):
    async with session.get(url) as response:
        return await response.json()

# At app shutdown
async def cleanup():
    await session.close()

For FastAPI, use the lifespan context:

from contextlib import asynccontextmanager
from fastapi import FastAPI
import aiohttp

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.http = aiohttp.ClientSession()
    yield
    await app.state.http.close()

app = FastAPI(lifespan=lifespan)

@app.get("/proxy")
async def proxy():   # An untyped `request` parameter would be parsed as a query parameter
    async with app.state.http.get("https://upstream.com/data") as response:
        return await response.json()

Common Mistake: Creating a new ClientSession() inside every request handler. Each session opens its own connection pool — the upstream server sees a flood of new TCP connections instead of reusing existing ones. Create the session once at startup and share it across requests.

Fix 2: ClientTimeout Configuration

The default is a 300-second (5-minute) total timeout, which is too generous for most API calls. Configure it explicitly:

import aiohttp

# Simple timeout
timeout = aiohttp.ClientTimeout(total=30)   # 30s total

# Fine-grained
timeout = aiohttp.ClientTimeout(
    total=60,               # Total timeout for the request
    connect=5,              # Connection establishment
    sock_connect=5,         # Individual socket connection attempt
    sock_read=30,            # Socket read (between bytes)
)

async with aiohttp.ClientSession(timeout=timeout) as session:
    async with session.get("https://slow.example.com") as response:
        data = await response.json()

Per-request override:

async with session.get(
    "https://fast.example.com",
    timeout=aiohttp.ClientTimeout(total=5),
) as response:
    ...

Disable timeout (rarely appropriate):

timeout = aiohttp.ClientTimeout(total=None)   # No timeout

Catch specific timeout exceptions:

import aiohttp
import asyncio

try:
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
        data = await resp.json()
except asyncio.TimeoutError:
    print("Request timed out")
except aiohttp.ClientError as e:
    print(f"Client error: {e}")

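Catching asyncio.TimeoutError covers every aiohttp timeout. An expired total budget raises asyncio.TimeoutError directly, while socket-level timeouts (sock_connect, sock_read) raise aiohttp.ServerTimeoutError, which subclasses asyncio.TimeoutError. Keep the asyncio.TimeoutError handler ahead of aiohttp.ClientError, as above, because ServerTimeoutError is also a ClientError.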
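
Since every timeout surfaces as asyncio.TimeoutError, a retry wrapper only needs to catch that one type. The following is a minimal sketch, not an aiohttp feature: call_with_retry and make_call are hypothetical names, and asyncio.wait_for stands in for ClientTimeout so the example runs without a network. With aiohttp you would pass a function that performs the session.get and reads the body.

```python
import asyncio


async def call_with_retry(make_call, *, attempts=3, timeout=1.0, base_delay=0.1):
    """Await make_call() with a per-attempt timeout, retrying on asyncio.TimeoutError.

    make_call is a zero-argument function returning a fresh coroutine per attempt.
    """
    for attempt in range(1, attempts + 1):
        try:
            return await asyncio.wait_for(make_call(), timeout)
        except asyncio.TimeoutError:
            if attempt == attempts:
                raise  # Out of attempts: let the caller see the timeout
            # Exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
```

Retrying is only safe for idempotent requests (GET, PUT, DELETE); a timed-out POST may already have been processed by the server.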

Fix 3: Connection Pool Tuning

import aiohttp

# Defaults: limit=100 total, limit_per_host=0 (no per-host cap)
connector = aiohttp.TCPConnector(
    limit=200,                # Total connections across all hosts
    limit_per_host=50,        # Connections to a single host
    ttl_dns_cache=300,         # DNS cache TTL (5 minutes)
    enable_cleanup_closed=True,  # Clean up closed connections aggressively
    force_close=False,         # Keep-alive (default)
)

async with aiohttp.ClientSession(connector=connector) as session:
    # All requests share the pool
    ...

Hit the per-host limit — requests queue silently:

# If limit_per_host=30 and you launch 100 requests to the same host,
# 30 go out immediately, 70 queue.
urls = ["https://api.example.com/item/1", "https://api.example.com/item/2", ...]

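async def fetch_one(session, url):
    async with session.get(url) as resp:   # Connection returns to the pool on exit
        return await resp.text()

# Appears as a slow request, not an error
results = await asyncio.gather(*(fetch_one(session, u) for u in urls))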

Raise the limit for legitimate high-concurrency workloads:

connector = aiohttp.TCPConnector(limit=1000, limit_per_host=200)

Or throttle explicitly with a semaphore:

import asyncio

sem = asyncio.Semaphore(50)   # Max 50 concurrent

async def fetch_with_limit(session, url):
    async with sem:
        async with session.get(url) as resp:
            return await resp.text()

Pro Tip: Use an asyncio Semaphore instead of just raising connector limits. A semaphore expresses your intent (“at most 50 concurrent requests”) rather than relying on a pool limit as a side effect, which makes rate-limiting, timing, and debugging much clearer.
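
To see the semaphore actually cap concurrency, you can count in-flight calls. This is an illustrative sketch: run_throttled and fake_request are hypothetical names, and asyncio.sleep stands in for session.get so it runs without a network.

```python
import asyncio


async def run_throttled(n_requests, max_concurrent, delay=0.01):
    """Run n_requests fake requests and report the peak number in flight."""
    sem = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_request(i):
        nonlocal in_flight, peak
        async with sem:                  # At most max_concurrent bodies run at once
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(delay)   # Stand-in for `async with session.get(url)`
            in_flight -= 1
            return i

    results = await asyncio.gather(*(fake_request(i) for i in range(n_requests)))
    return results, peak
```

The peak never exceeds max_concurrent regardless of how many tasks gather launches, and the results still come back in submission order.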

Fix 4: SSL and Certificates

aiohttp.client_exceptions.ClientConnectorCertificateError:
[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed

Disable SSL verification (development only):

import aiohttp
import ssl

# Disable verification
connector = aiohttp.TCPConnector(ssl=False)

# Or with custom SSL context
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
connector = aiohttp.TCPConnector(ssl=ssl_ctx)

async with aiohttp.ClientSession(connector=connector) as session:
    async with session.get("https://self-signed.example.com") as resp:
        ...

Use a custom CA bundle (corporate networks with MITM proxy):

import ssl

ssl_ctx = ssl.create_default_context(cafile="/path/to/corporate-ca.pem")
connector = aiohttp.TCPConnector(ssl=ssl_ctx)

async with aiohttp.ClientSession(connector=connector) as session:
    ...

SSL error by host — disable for specific calls only:

async with session.get("https://self-signed.example.com", ssl=False) as resp:
    ...

For general Python SSL certificate issues, see Python SSL certificate verify failed.
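
One detail of the corporate CA case: passing cafile to ssl.create_default_context makes the context trust only that file, so public sites signed by regular CAs may start failing. A small sketch, assuming you want the system trust store plus one extra CA (build_ssl_context and extra_ca_file are hypothetical names):

```python
import ssl


def build_ssl_context(extra_ca_file=None):
    """Return a verifying SSL context: system CAs, plus an optional extra CA file."""
    ctx = ssl.create_default_context()   # No cafile given, so system CAs are loaded
    if extra_ca_file is not None:
        # Add the extra CA on top of the system store instead of replacing it
        ctx.load_verify_locations(cafile=extra_ca_file)
    return ctx
```

The result can be passed as aiohttp.TCPConnector(ssl=ctx), the same way as the contexts above. Verification and hostname checking stay enabled.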

Fix 5: Avoid Blocking Calls in Async Context

from aiohttp import web
import requests   # SYNC library

async def handle(request):
    # WRONG — requests.get blocks the entire event loop
    resp = requests.get("https://api.example.com/data")
    return web.json_response(resp.json())

One blocking call in an async handler stops all other requests from being processed. Always use async libraries inside async code.

CORRECT — use aiohttp or httpx:

from aiohttp import web, ClientSession

async def handle(request):
    async with request.app["http"].get("https://api.example.com/data") as resp:
        data = await resp.json()
    return web.json_response(data)

async def http_session_ctx(app):
    # Setup: runs at startup, inside the event loop
    app["http"] = ClientSession()
    yield
    # Teardown: runs at cleanup
    await app["http"].close()

def init_app():
    app = web.Application()
    app.cleanup_ctx.append(http_session_ctx)   # Pattern for setup/teardown
    app.router.add_get("/", handle)
    return app

If you must use a blocking library (DB client, crypto operation), offload to an executor:

import asyncio

async def handle(request):
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,   # Default ThreadPoolExecutor
        slow_blocking_call,
        arg1, arg2,
    )
    return web.json_response(result)

Common Mistake: Using requests inside an async function and wondering why the server grinds to a halt under load. Every sync call blocks the event loop for its entire duration. Use aiohttp or httpx for HTTP, asyncpg for Postgres, motor for MongoDB: anything that does I/O needs an async version inside an async handler.
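
You can measure the difference between a blocking call and an offloaded one by counting how often a background heartbeat task gets to run. This is a rough sketch with hypothetical names (count_ticks, blocking_version, offloaded_version); time.sleep stands in for any sync library call.

```python
import asyncio
import time


def slow_blocking_call(seconds):
    time.sleep(seconds)   # Stand-in for requests.get, a sync DB driver, etc.
    return "done"


async def count_ticks(run_call, interval=0.01):
    """Run run_call() and count heartbeat ticks the event loop managed meanwhile."""
    ticks = 0

    async def heartbeat():
        nonlocal ticks
        while True:
            await asyncio.sleep(interval)
            ticks += 1

    task = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)   # Let the heartbeat start before the call begins
    await run_call()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticks


async def blocking_version():
    slow_blocking_call(0.2)   # Runs on the event loop thread: nothing else can run


async def offloaded_version():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, slow_blocking_call, 0.2)   # Runs in a worker thread


async def compare():
    blocked = await count_ticks(blocking_version)
    offloaded = await count_ticks(offloaded_version)
    return blocked, offloaded
```

Running asyncio.run(compare()) should show the heartbeat starved during the blocking version and ticking normally during the offloaded one. In a server, each missed tick corresponds to other requests that could not make progress.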

Fix 6: Basic aiohttp Server

from aiohttp import web

async def hello(request):
    name = request.match_info.get("name", "world")
    return web.Response(text=f"Hello, {name}!")

async def api_handler(request):
    data = await request.json()   # Parse JSON body
    return web.json_response({"received": data})

app = web.Application()
app.router.add_get("/", hello)
app.router.add_get("/hello/{name}", hello)
app.router.add_post("/api", api_handler)

if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8080)

Middleware:

from aiohttp import web
import time

@web.middleware
async def logging_middleware(request, handler):
    start = time.perf_counter()
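    try:
        response = await handler(request)
    except web.HTTPException as ex:
        # aiohttp signals HTTP errors and redirects by raising. Log, then
        # re-raise so headers such as Location are preserved.
        print(f"{request.method} {request.path} -> {ex.status}")
        raise
    elapsed = time.perf_counter() - start
    print(f"{request.method} {request.path} -> {response.status} in {elapsed:.3f}s")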
    return response

app = web.Application(middlewares=[logging_middleware])

Application lifecycle hooks:

async def on_startup(app):
    app["db"] = await create_db_pool()

async def on_cleanup(app):
    await app["db"].close()

app.on_startup.append(on_startup)
app.on_cleanup.append(on_cleanup)

Request parsing:

async def handler(request):
    # Path params
    user_id = request.match_info["user_id"]

    # Query params
    limit = int(request.query.get("limit", 10))

    # The body readers below are alternatives: use one per request.
    # JSON body
    body = await request.json()

    # Form data
    form = await request.post()

    # Raw bytes
    data = await request.read()

    # Streaming body
    async for chunk in request.content.iter_chunked(1024):
        process(chunk)

Fix 7: WebSocket Client and Server

Client:

import aiohttp
import asyncio

async def ws_client():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect("wss://echo.websocket.org") as ws:
            await ws.send_str("hello")
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print("Received:", msg.data)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break

asyncio.run(ws_client())

Server:

from aiohttp import web, WSMsgType

async def ws_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == WSMsgType.TEXT:
            if msg.data == "close":
                await ws.close()
            else:
                await ws.send_str(f"Echo: {msg.data}")
        elif msg.type == WSMsgType.ERROR:
            print(f"WS error: {ws.exception()}")

    return ws

app = web.Application()
app.router.add_get("/ws", ws_handler)
web.run_app(app)

Broadcasting to multiple clients:

from aiohttp import web, WSMsgType
from weakref import WeakSet

async def broadcast_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    request.app["websockets"].add(ws)
    try:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                for ws_client in list(request.app["websockets"]):   # Copy: the set can change during awaits
                    if not ws_client.closed:
                        await ws_client.send_str(msg.data)
    finally:
        request.app["websockets"].discard(ws)
    return ws

async def init():
    app = web.Application()
    app["websockets"] = WeakSet()
    app.router.add_get("/ws", broadcast_handler)
    return app

Fix 8: Proxies and Authentication

HTTP proxy:

import aiohttp

async with aiohttp.ClientSession() as session:
    async with session.get(
        "https://example.com",
        proxy="http://proxy.example.com:8080",
        proxy_auth=aiohttp.BasicAuth("user", "pass"),
    ) as resp:
        ...

Authentication:

# Basic auth
auth = aiohttp.BasicAuth("user", "pass")

async with aiohttp.ClientSession(auth=auth) as session:
    async with session.get("https://api.example.com") as resp:
        ...

# Bearer token via headers
headers = {"Authorization": "Bearer YOUR_TOKEN"}
async with aiohttp.ClientSession(headers=headers) as session:
    ...

Cookies persist within a session:

async with aiohttp.ClientSession() as session:
    # Login — server sets cookies
    async with session.post("https://example.com/login", data={"user": "x", "pw": "y"}) as resp:
        resp.raise_for_status()   # Leaving the block releases the connection

    # Subsequent requests automatically include cookies
    async with session.get("https://example.com/profile") as resp:
        print(await resp.text())

Still Not Working?

aiohttp vs Other Async HTTP Frameworks

aiohttp is rarely the only option for an async HTTP problem. The decision usually comes down to whether you need a server too, which async library powers the rest of your app, and how much migration cost you’re willing to absorb.

aiohttp vs httpx async. Both target async client work. aiohttp’s ClientSession and httpx’s AsyncClient have similar APIs: context-manager-driven, connection-pooled, timeout-configurable. Differences that matter: httpx has a sync API too (so you can share code between sync scripts and async services), httpx supports HTTP/2 as an opt-in (install the http2 extra and pass http2=True), and httpx is AnyIO-compatible (trio backend). aiohttp is purely async and asyncio-only, but its server (aiohttp.web) is mature and there’s no equivalent in httpx. Pick aiohttp when you need server + client in one package, httpx for client-only modern apps. For httpx-specific patterns, see httpx not working.

aiohttp vs Trio + asks. Trio is an alternative async framework with a structured-concurrency model (nurseries). The asks library is its HTTP client. If you’re using Trio for reasoning-about-concurrency reasons (no orphan tasks, deterministic shutdown), asks is the matching HTTP client. aiohttp doesn’t run on Trio without a bridge. For most projects asyncio + aiohttp wins on ecosystem depth; pick Trio only if structured concurrency is a hard requirement.

aiohttp vs FastAPI’s TestClient. FastAPI’s TestClient wraps httpx and runs against a FastAPI app in-process. If you only need to test FastAPI endpoints, TestClient is the right tool — it handles app startup, dependency overrides, and async test orchestration. aiohttp’s AioHTTPTestCase is the equivalent for aiohttp servers. Don’t try to test a FastAPI app with aiohttp’s test utilities or vice versa — the lifecycle hooks differ.

aiohttp vs Tornado. Tornado is older than both aiohttp and asyncio. It has its own IOLoop, its own HTTP client (AsyncHTTPClient), and its own web framework. The reason to use Tornado today is legacy code or a need for granular IOLoop control; WebSocket compression, sometimes cited as a Tornado advantage, is also available in aiohttp. For new code, aiohttp or FastAPI is simpler.

Server + Client framework comparison:

Framework | Async client | Async server | Backend | Best for
aiohttp | Yes | Yes (web) | asyncio | One-package full stack
httpx + FastAPI | Yes (httpx) | Yes (FastAPI) | asyncio, AnyIO | Modern API services
Trio + asks + Hypercorn | Yes (asks) | Yes (Hypercorn) | trio | Structured concurrency
Tornado | Yes (AsyncHTTPClient) | Yes | own IOLoop | Legacy / WebSocket-heavy

Pro Tip: If you’re picking from scratch in 2025+, FastAPI for the server + httpx for the client is the modern default. Choose aiohttp specifically when (a) you don’t want the FastAPI dependency tree, (b) you’re already in the aiohttp ecosystem, or (c) you need fine-grained control over the WebSocket server.

Testing aiohttp Servers

from aiohttp.test_utils import AioHTTPTestCase
from aiohttp import web

class MyTest(AioHTTPTestCase):
    async def get_application(self):
        app = web.Application()
        app.router.add_get("/", self.handler)
        return app

    async def handler(self, request):
        return web.Response(text="Hello")

    async def test_get(self):
        async with self.client.get("/") as resp:
            assert resp.status == 200
            assert await resp.text() == "Hello"

Or with pytest:

import pytest
from aiohttp.test_utils import TestClient, TestServer

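# Async fixtures and tests need a plugin such as pytest-aiohttp or pytest-asyncio.
# `app` is assumed to be your own fixture returning a web.Application.
@pytest.fixture
async def client(app):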
    async with TestClient(TestServer(app)) as client:
        yield client

async def test_index(client):
    resp = await client.get("/")
    assert resp.status == 200

For pytest async fixture patterns, see pytest fixture not found.

Streaming Large Responses

Don’t load large responses into memory. Stream instead:

async with session.get("https://example.com/huge-file.zip") as response:
    with open("out.zip", "wb") as f:
        async for chunk in response.content.iter_chunked(8192):
            f.write(chunk)

For server-side streaming responses:

from aiohttp import web

async def stream_handler(request):
    response = web.StreamResponse()
    response.content_type = "text/plain"
    await response.prepare(request)

    for i in range(1000):
        await response.write(f"chunk {i}\n".encode())

    await response.write_eof()
    return response

Server-Sent Events (SSE)

from aiohttp import web
import asyncio

async def sse_handler(request):
    response = web.StreamResponse()
    response.headers["Content-Type"] = "text/event-stream"
    response.headers["Cache-Control"] = "no-cache"
    await response.prepare(request)

    for i in range(100):
        await response.write(f"data: event {i}\n\n".encode())
        await asyncio.sleep(1)

    return response

Useful for LLM streaming responses, progress updates, and long-poll alternatives — simpler than WebSockets when you only need server-to-client messaging.
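
The handler above writes single-line events by hand. Payloads containing newlines need one data: line per line of text, so a small formatter helps. A sketch, assuming a hypothetical helper named format_sse:

```python
def format_sse(data, event=None, event_id=None):
    """Encode one Server-Sent Events message as bytes.

    Each line of data gets its own `data:` field; a blank line ends the message.
    """
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    for line in str(data).splitlines() or [""]:
        lines.append(f"data: {line}")
    return ("\n".join(lines) + "\n\n").encode("utf-8")
```

Inside the handler loop this would be used as await response.write(format_sse(f"event {i}")). The optional id field lets browsers resume with Last-Event-ID after a reconnect.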

Graceful Shutdown

from aiohttp import web

async def shutdown(app):
    print("Shutting down")
    # Close pools, flush data
    for ws in set(app.get("websockets", [])):
        await ws.close(code=1001, message="Server shutdown")

app = web.Application()
app.on_shutdown.append(shutdown)

on_shutdown runs before on_cleanup. Use on_shutdown to close active connections, on_cleanup for deeper resource release (DB pools, caches).

Integration with asyncio

aiohttp builds on asyncio — the same event loop, the same tasks. For asyncio-specific issues around event loops, gather, and run, see Python asyncio not running.

Diagnosing Connector Pool Exhaustion in Production

Pool exhaustion shows up as “everything got slower” rather than as an error. The connector queues requests silently when the per-host limit is hit. To verify it’s the cause:

import asyncio
import aiohttp

async def diagnose():
    # Create the connector inside the running event loop
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Periodic snapshot
        async def log_pool():
            while True:
                await asyncio.sleep(5)
                print(f"acquired: {len(connector._acquired)}, "
                      f"limit: {connector.limit}, "
                      f"per_host: {connector.limit_per_host}")
        logger_task = asyncio.create_task(log_pool())   # Keep a reference to the task
        try:
            ...  # your real workload
        finally:
            logger_task.cancel()

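If len(connector._acquired) stays pinned at limit_per_host while new requests to one host pile up, raise the per-host limit or split traffic across multiple hosts. Note that _acquired counts connections to all hosts, so the comparison is only meaningful when the workload targets a single host. The attribute is private and may change between aiohttp releases, so use it for diagnostics only.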

Migrating From requests in an Async Refactor

The most common cause of aiohttp errors in newly-async code is half-migrated handlers — one requests.get left over inside an async def. To audit a codebase:

# Find sync HTTP calls inside async def
grep -rn "requests\." src/ --include="*.py" -B 5 | grep -A 5 "async def"

Replace each one with an aiohttp session.get using the shared application session. Don’t create a new ClientSession per call — that defeats the connection pool (Fix 1).

Handling Server-Side Backpressure

When your aiohttp server is overwhelmed, returning 503 Service Unavailable with a Retry-After header is more correct than letting connections pile up:

import asyncio
from aiohttp import web

SEMAPHORE = asyncio.Semaphore(100)   # Max 100 concurrent handlers

@web.middleware
async def backpressure(request, handler):
    if SEMAPHORE.locked():
        return web.json_response(
            {"error": "overloaded"},
            status=503,
            headers={"Retry-After": "10"},
        )
    async with SEMAPHORE:
        return await handler(request)

app = web.Application(middlewares=[backpressure])

This bounds concurrent work and gives upstream clients a clear signal to back off. Better than silent timeouts that leave clients hanging — and better than a crashed process.
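
The shedding logic can be checked without a server. The sketch below replays the same locked-then-acquire pattern with plain coroutines; simulate_load is a hypothetical name, asyncio.sleep stands in for handler work, and integer status codes stand in for responses.

```python
import asyncio


async def simulate_load(n_requests, limit, work_seconds=0.01):
    """Fire n_requests at once against a handler capped at `limit` concurrent runs."""
    sem = asyncio.Semaphore(limit)

    async def handle(i):
        if sem.locked():          # All slots taken: shed load immediately
            return 503
        async with sem:
            await asyncio.sleep(work_seconds)   # Stand-in for real handler work
            return 200

    return await asyncio.gather(*(handle(i) for i in range(n_requests)))
```

With five simultaneous requests and a limit of two, the first two are served and the rest are rejected right away rather than queued, which is the behavior the middleware gives real clients.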

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
