Fix: aiohttp Not Working — Session Leaks, ClientTimeout, and Connector Errors

FixDevs

Quick Answer

How to fix aiohttp errors — RuntimeError session is closed, ClientConnectorError connection refused, SSL verify failure, Unclosed client session warning, server websocket disconnect, and connector pool exhausted.

The Error

You reuse a ClientSession after it’s closed and Python raises:

RuntimeError: Session is closed

Or your script finishes with a warning that haunts every aiohttp user:

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f...>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f...>, 1.234)]']

Or connections fail behind a proxy:

aiohttp.client_exceptions.ClientConnectorError:
Cannot connect to host api.example.com:443 ssl:default
[Connect call failed ('93.184.216.34', 443)]

Or your HTTP server hangs under load:

async def handle(request):
    # Server freezes at ~100 concurrent requests
    result = requests.get("https://...")   # Blocking call in async context!
    return web.json_response(result.json())

Or WebSocket clients disconnect immediately:

aiohttp.WSServerHandshakeError: Invalid response status

aiohttp predates httpx and has been a widely used async HTTP library for Python for years. It ships both a client (aiohttp.ClientSession) and a server (aiohttp.web) in one package. Session lifecycle, connector pooling, and error handling each have specific pitfalls that newer libraries such as httpx have tried to simplify. This guide covers them.

Why This Happens

ClientSession holds a connection pool. Creating a session per request defeats the pool’s purpose and triggers Unclosed session warnings if you forget to close them. The correct pattern is one session shared across many requests, closed when you’re done — but this is easy to get wrong in scripts that mix sync and async code.

Connection pool exhaustion happens when you open more concurrent requests than the connector allows (default 100). Requests queue until a slot opens, appearing as hangs.

Fix 1: Always Use Context Manager for Sessions

import aiohttp
import asyncio

# WRONG — session never closed
async def fetch_bad(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    return await response.text()
    # Session leaks

# CORRECT — context manager closes it
async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# Reuse one session across many requests
async def fetch_many(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

asyncio.run(fetch_many(["https://a.com", "https://b.com"]))

ClientSession() creates a TCP connection pool. Creating one per request means:

  • No connection reuse (slow)
  • Unclosed session warnings cluttering logs
  • Potential file descriptor exhaustion

Long-lived session pattern (for apps, not scripts):

import aiohttp

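session = None

# At app startup: create the session inside a coroutine so it binds to the running event loop
async def startup():
    global session
    session = aiohttp.ClientSession()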

# During request handling
async def call_api(url):
    async with session.get(url) as response:
        return await response.json()

# At app shutdown
async def cleanup():
    await session.close()

For FastAPI, use the lifespan context:

from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
import aiohttp

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.http = aiohttp.ClientSession()
    try:
        yield
    finally:
        await app.state.http.close()

app = FastAPI(lifespan=lifespan)

@app.get("/proxy")
async def proxy(request: Request):
    # Without the Request annotation, FastAPI treats `request` as a query parameter
    async with request.app.state.http.get("https://upstream.com/data") as response:
        return await response.json()

Common Mistake: Creating a new ClientSession() inside every request handler. Each session opens its own connection pool — the upstream server sees a flood of new TCP connections instead of reusing existing ones. Create the session once at startup and share it across requests.
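If there is no framework startup hook to hang the session on, one option is to create it lazily on first use. The following is a minimal sketch, assuming a hypothetical SharedSession helper (the class and its method names are illustrative and not part of aiohttp). It only shows that repeated calls hand back the same session and that closing is explicit.

```python
import asyncio
import aiohttp


class SharedSession:
    """Hypothetical helper: lazily creates one ClientSession and reuses it."""

    def __init__(self):
        self._session = None

    async def get(self):
        # Create the session inside a coroutine so it binds to the running loop.
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self):
        # Safe to call more than once.
        if self._session is not None and not self._session.closed:
            await self._session.close()


async def demo():
    shared = SharedSession()
    first = await shared.get()
    second = await shared.get()   # Same object, same connection pool
    await shared.close()
    return first is second, first.closed
```

Call `await shared.close()` from whatever shutdown path your application has, otherwise the Unclosed client session warning comes back.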

Fix 2: ClientTimeout Configuration

By default aiohttp allows 5 minutes in total per request (ClientTimeout(total=300)), which is far too long for most API calls. Configure timeouts explicitly:

import aiohttp

# Simple timeout
timeout = aiohttp.ClientTimeout(total=30)   # 30s total

# Fine-grained
timeout = aiohttp.ClientTimeout(
    total=60,               # Whole request: connect, send, and read the body
    connect=5,              # Acquiring a connection, including time spent waiting in the pool queue
    sock_connect=5,         # Connecting a new socket to the peer
    sock_read=30,           # Maximum gap between reads of new data from the peer
)

async with aiohttp.ClientSession(timeout=timeout) as session:
    async with session.get("https://slow.example.com") as response:
        data = await response.json()

Per-request override:

async with session.get(
    "https://fast.example.com",
    timeout=aiohttp.ClientTimeout(total=5),
) as response:
    ...

Disable timeout (rarely appropriate):

timeout = aiohttp.ClientTimeout(total=None)   # No timeout

Catch specific timeout exceptions:

import aiohttp
import asyncio

try:
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
        data = await resp.json()
except asyncio.TimeoutError:
    print("Request timed out")
except aiohttp.ClientError as e:
    print(f"Client error: {e}")

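A total timeout surfaces as asyncio.TimeoutError. Socket-level timeouts raise aiohttp.ServerTimeoutError, which subclasses asyncio.TimeoutError, so the except asyncio.TimeoutError clause above catches both. Keep that clause before except aiohttp.ClientError, because ServerTimeoutError is also a ClientError and would otherwise be reported as a generic client error.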

Fix 3: Connection Pool Tuning

import aiohttp

# Defaults: limit=100 total connections, limit_per_host=0 (no per-host cap)
connector = aiohttp.TCPConnector(
    limit=200,                # Total connections across all hosts
    limit_per_host=50,        # Connections to a single host
    ttl_dns_cache=300,        # DNS cache TTL in seconds (5 minutes; default is 10)
    enable_cleanup_closed=True,  # Abort SSL transports that never finish closing (off by default)
    force_close=False,        # Keep-alive (default)
)

async with aiohttp.ClientSession(connector=connector) as session:
    # All requests share the pool
    ...

Hit the per-host limit — requests queue silently:

# If limit_per_host=30 and you launch 100 requests to the same host,
# 30 go out immediately, 70 queue.
urls = ["https://api.example.com/item/1", "https://api.example.com/item/2", ...]

# Appears as slow requests, not an error.
# fetch_url (from Fix 1) reads each body, which releases the connection back to the pool.
results = await asyncio.gather(*[fetch_url(session, u) for u in urls])

Raise the limit for legitimate high-concurrency workloads:

connector = aiohttp.TCPConnector(limit=1000, limit_per_host=200)

Or throttle explicitly with a semaphore:

import asyncio

sem = asyncio.Semaphore(50)   # Max 50 concurrent

async def fetch_with_limit(session, url):
    async with sem:
        async with session.get(url) as resp:
            return await resp.text()

Pro Tip: Prefer an asyncio.Semaphore to simply raising connector limits. A semaphore states your intent (“at most 50 concurrent requests”) directly instead of relying on a pool limit as a side effect. This makes rate limiting, timing, and debugging much clearer.
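The semaphore approach can be checked without any network access. The sketch below uses only the standard library; gather_bounded and fake_request are hypothetical names, and fake_request sleeps as a stand-in for session.get(...). It records the peak number of jobs in flight so you can confirm the cap holds.

```python
import asyncio


async def gather_bounded(coros, limit):
    """Run coroutines concurrently, with at most `limit` in flight at once."""
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:
            return await coro

    # gather preserves input order in its result list.
    return await asyncio.gather(*(run(c) for c in coros))


async def demo(n_jobs=20, limit=5):
    in_flight = 0
    peak = 0

    async def fake_request(i):
        # Stand-in for an HTTP call: sleeps instead of doing network I/O.
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        return i

    results = await gather_bounded([fake_request(i) for i in range(n_jobs)], limit)
    return results, peak


results, peak = asyncio.run(demo())
```

In real code, replace fake_request with a coroutine such as fetch_url(session, url) and keep the limit at or below the connector's limit_per_host so the semaphore, not the pool, is what throttles.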

Fix 4: SSL and Certificates

aiohttp.client_exceptions.ClientConnectorCertificateError:
[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed

Disable SSL verification (development only):

import aiohttp
import ssl

# Disable verification
connector = aiohttp.TCPConnector(ssl=False)

# Or with custom SSL context
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
connector = aiohttp.TCPConnector(ssl=ssl_ctx)

async with aiohttp.ClientSession(connector=connector) as session:
    async with session.get("https://self-signed.example.com") as resp:
        ...

Use a custom CA bundle (corporate networks with MITM proxy):

import ssl

ssl_ctx = ssl.create_default_context(cafile="/path/to/corporate-ca.pem")
connector = aiohttp.TCPConnector(ssl=ssl_ctx)

async with aiohttp.ClientSession(connector=connector) as session:
    ...

SSL error by host — disable for specific calls only:

async with session.get("https://self-signed.example.com", ssl=False) as resp:
    ...

For general Python SSL certificate issues, see Python SSL certificate verify failed.

Fix 5: Avoid Blocking Calls in Async Context

from aiohttp import web
import requests   # SYNC library

async def handle(request):
    # WRONG — requests.get blocks the entire event loop
    resp = requests.get("https://api.example.com/data")
    return web.json_response(resp.json())

One blocking call in an async handler stops all other requests from being processed. Always use async libraries inside async code.

CORRECT — use aiohttp or httpx:

from aiohttp import web, ClientSession

async def handle(request):
    async with request.app["http"].get("https://api.example.com/data") as resp:
        data = await resp.json()
    return web.json_response(data)

async def http_session_ctx(app):
    # Setup: runs once at startup, inside the running event loop
    app["http"] = ClientSession()
    yield
    # Teardown: runs once at shutdown
    await app["http"].close()

async def init_app():
    app = web.Application()
    app.cleanup_ctx.append(http_session_ctx)   # Pattern for setup/teardown
    app.router.add_get("/", handle)
    return app

web.run_app(init_app())

If you must use a blocking library (DB client, crypto operation), offload to an executor:

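import asyncio
from aiohttp import web

# slow_blocking_call, arg1, and arg2 are placeholders for your own function and arguments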

async def handle(request):
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,   # Default ThreadPoolExecutor
        slow_blocking_call,
        arg1, arg2,
    )
    return web.json_response(result)

Common Mistake: Using requests inside an async function and wondering why the server grinds to a halt under load. Every sync call blocks the event loop for its entire duration. Use aiohttp or httpx for HTTP, asyncpg for Postgres, motor for MongoDB, etc.
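On Python 3.9 and later, asyncio.to_thread is a shorter way to do the same offloading. The sketch below uses only the standard library; slow_blocking_call and heartbeat are illustrative stand-ins. The heartbeat task keeps ticking while the blocking call runs in a worker thread, which is the behavior you want from the event loop.

```python
import asyncio
import time


def slow_blocking_call(seconds):
    # Stand-in for a synchronous library call (DB driver, requests.get, ...).
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks):
    # Records a tick roughly every 10 ms; it only advances while the loop is free.
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def demo():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    # to_thread runs the function in the default thread pool and awaits its result.
    result = await asyncio.to_thread(slow_blocking_call, 0.2)
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)   # Let the cancellation settle
    return result, len(ticks)
```

If you call slow_blocking_call(0.2) directly inside demo instead, the heartbeat cannot run until the call returns, which is the stall described above.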

For async/sync mixing patterns, see Python async sync mix.

Fix 6: Basic aiohttp Server

from aiohttp import web

async def hello(request):
    name = request.match_info.get("name", "world")
    return web.Response(text=f"Hello, {name}!")

async def api_handler(request):
    data = await request.json()   # Parse JSON body
    return web.json_response({"received": data})

app = web.Application()
app.router.add_get("/", hello)
app.router.add_get("/hello/{name}", hello)
app.router.add_post("/api", api_handler)

if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8080)

Middleware:

from aiohttp import web
import time

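@web.middleware
async def logging_middleware(request, handler):
    start = time.perf_counter()
    status = 500   # Assumed status if the handler raises an unexpected exception
    try:
        response = await handler(request)
        status = response.status
        return response
    except web.HTTPException as ex:
        # Redirects and HTTP errors arrive as exceptions; re-raise so
        # aiohttp sends the original status and headers (e.g. Location).
        status = ex.status
        raise
    finally:
        elapsed = time.perf_counter() - start
        print(f"{request.method} {request.path} -> {status} in {elapsed:.3f}s")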

app = web.Application(middlewares=[logging_middleware])

Application lifecycle hooks:

async def on_startup(app):
    app["db"] = await create_db_pool()

async def on_cleanup(app):
    await app["db"].close()

app.on_startup.append(on_startup)
app.on_cleanup.append(on_cleanup)

Request parsing:

async def handler(request):
    # Path params
    user_id = request.match_info["user_id"]

    # Query params
    limit = int(request.query.get("limit", 10))

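    # Body: pick ONE of the options below per request. The body stream is
    # consumed once, so streaming after read()/json()/post() yields nothing.

    # JSON body
    body = await request.json()

    # Form data
    form = await request.post()

    # Raw bytes
    data = await request.read()

    # Streaming body
    async for chunk in request.content.iter_chunked(1024):
        process(chunk)   # process() is a placeholder for your own logic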

Fix 7: WebSocket Client and Server

Client:

import aiohttp
import asyncio

async def ws_client():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect("wss://echo.websocket.org") as ws:
            await ws.send_str("hello")
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print("Received:", msg.data)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break

asyncio.run(ws_client())

Server:

from aiohttp import web, WSMsgType

async def ws_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == WSMsgType.TEXT:
            if msg.data == "close":
                await ws.close()
            else:
                await ws.send_str(f"Echo: {msg.data}")
        elif msg.type == WSMsgType.ERROR:
            print(f"WS error: {ws.exception()}")

    return ws

app = web.Application()
app.router.add_get("/ws", ws_handler)
web.run_app(app)

Broadcasting to multiple clients:

from aiohttp import web, WSMsgType
from weakref import WeakSet

async def broadcast_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    request.app["websockets"].add(ws)
    try:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                # Iterate over a snapshot: clients may connect or leave during the awaits
                for ws_client in set(request.app["websockets"]):
                    if not ws_client.closed:
                        await ws_client.send_str(msg.data)
    finally:
        request.app["websockets"].discard(ws)
    return ws

async def init():
    app = web.Application()
    app["websockets"] = WeakSet()
    app.router.add_get("/ws", broadcast_handler)
    return app

Fix 8: Proxies and Authentication

HTTP proxy:

import aiohttp

async with aiohttp.ClientSession() as session:
    async with session.get(
        "https://example.com",
        proxy="http://proxy.example.com:8080",
        proxy_auth=aiohttp.BasicAuth("user", "pass"),
    ) as resp:
        ...

Authentication:

# Basic auth
auth = aiohttp.BasicAuth("user", "pass")

async with aiohttp.ClientSession(auth=auth) as session:
    async with session.get("https://api.example.com") as resp:
        ...

# Bearer token via headers
headers = {"Authorization": "Bearer YOUR_TOKEN"}
async with aiohttp.ClientSession(headers=headers) as session:
    ...

Cookies persist within a session:

async with aiohttp.ClientSession() as session:
    # Login — server sets cookies
    async with session.post("https://example.com/login", data={"user": "x", "pw": "y"}) as resp:
        resp.raise_for_status()   # Fail early if the login was rejected

    # Subsequent requests automatically include cookies
    async with session.get("https://example.com/profile") as resp:
        print(await resp.text())

Still Not Working?

aiohttp vs httpx

  • aiohttp — Older, async-first, has a full HTTP server built in. Lower-level control.
  • httpx — Newer, supports sync AND async, requests-compatible API. Simpler for client-only use.

For httpx-specific patterns that parallel aiohttp, see httpx not working.

Choose aiohttp when you need the server too (or already use the ecosystem). Choose httpx for new client-only code — the API is cleaner.

Testing aiohttp Servers

from aiohttp.test_utils import AioHTTPTestCase
from aiohttp import web

class MyTest(AioHTTPTestCase):
    async def get_application(self):
        app = web.Application()
        app.router.add_get("/", self.handler)
        return app

    async def handler(self, request):
        return web.Response(text="Hello")

    async def test_get(self):
        async with self.client.get("/") as resp:
            assert resp.status == 200
            assert await resp.text() == "Hello"

Or with pytest:

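import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer

# Async fixtures and tests need an async pytest plugin
# (for example pytest-asyncio in auto mode, or pytest-aiohttp).

@pytest.fixture
def app():
    async def index(request):
        return web.Response(text="Hello")

    app = web.Application()
    app.router.add_get("/", index)
    return app

@pytest.fixture
async def client(app):
    async with TestClient(TestServer(app)) as client:
        yield client

async def test_index(client):
    resp = await client.get("/")
    assert resp.status == 200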

For pytest async fixture patterns, see pytest fixture not found.

Streaming Large Responses

Don’t load large responses into memory. Stream instead:

async with session.get("https://example.com/huge-file.zip") as response:
    with open("out.zip", "wb") as f:
        async for chunk in response.content.iter_chunked(8192):
            f.write(chunk)

For server-side streaming responses:

from aiohttp import web

async def stream_handler(request):
    response = web.StreamResponse()
    response.content_type = "text/plain"
    await response.prepare(request)

    for i in range(1000):
        await response.write(f"chunk {i}\n".encode())

    await response.write_eof()
    return response

Server-Sent Events (SSE)

from aiohttp import web
import asyncio

async def sse_handler(request):
    response = web.StreamResponse()
    response.headers["Content-Type"] = "text/event-stream"
    response.headers["Cache-Control"] = "no-cache"
    await response.prepare(request)

    for i in range(100):
        await response.write(f"data: event {i}\n\n".encode())
        await asyncio.sleep(1)

    return response

Useful for LLM streaming responses, progress updates, and long-poll alternatives — simpler than WebSockets when you only need server-to-client messaging.
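The handler above writes the SSE wire format by hand, which breaks as soon as a payload contains a newline. A small formatter avoids that. This is a sketch under the assumption that you only need the data, event, and id fields; format_sse is a hypothetical helper, not an aiohttp function.

```python
def format_sse(data, event=None, event_id=None):
    """Encode one Server-Sent Events message as bytes."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    # A payload containing newlines must be sent as one "data:" line per row.
    for part in str(data).split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the message.
    return ("\n".join(lines) + "\n\n").encode("utf-8")
```

Inside sse_handler, the write then becomes `await response.write(format_sse(f"event {i}"))`.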

Graceful Shutdown

from aiohttp import web

async def shutdown(app):
    print("Shutting down")
    # Close pools, flush data
    for ws in set(app.get("websockets", [])):
        await ws.close(code=1001, message="Server shutdown")

app = web.Application()
app.on_shutdown.append(shutdown)

on_shutdown runs before on_cleanup. Use on_shutdown to close active connections, on_cleanup for deeper resource release (DB pools, caches).
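To see the ordering for yourself, you can drive the lifecycle with web.AppRunner and record which signal fires when. This is a minimal sketch: record_signal_order and the three callbacks are illustrative names, and no port is opened because no site is started.

```python
import asyncio
from aiohttp import web


async def record_signal_order():
    order = []

    async def on_startup(app):
        order.append("startup")

    async def on_shutdown(app):
        order.append("shutdown")

    async def on_cleanup(app):
        order.append("cleanup")

    app = web.Application()
    app.on_startup.append(on_startup)
    app.on_shutdown.append(on_shutdown)
    app.on_cleanup.append(on_cleanup)

    # AppRunner exposes the application lifecycle without binding a socket.
    runner = web.AppRunner(app)
    await runner.setup()     # Fires on_startup
    await runner.cleanup()   # Fires on_shutdown, then on_cleanup
    return order
```

Run it with `asyncio.run(record_signal_order())` and inspect the returned list.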

Integration with asyncio

aiohttp builds on asyncio — the same event loop, the same tasks. For asyncio-specific issues around event loops, gather, and run, see Python asyncio not running.

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
