Fix: aiohttp Not Working — Session Leaks, ClientTimeout, and Connector Errors
Quick Answer
How to fix aiohttp errors — RuntimeError session is closed, ClientConnectorError connection refused, SSL verify failure, Unclosed client session warning, server websocket disconnect, and connector pool exhausted.
The Error
You reuse a ClientSession after it’s closed and Python raises:
RuntimeError: Session is closed
Or your script finishes with a warning that haunts every aiohttp user:
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f...>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f...>, 1.234)]']
Or connections fail behind a proxy:
aiohttp.client_exceptions.ClientConnectorError:
Cannot connect to host api.example.com:443 ssl:default
[Connect call failed ('93.184.216.34', 443)]
Or your HTTP server hangs under load:
async def handle(request):
    # Server freezes at ~100 concurrent requests
    result = requests.get("https://...")  # Blocking call in async context!
    return web.json_response(result.json())
Or WebSocket clients disconnect immediately:
aiohttp.WSServerHandshakeError: Invalid response status
aiohttp is older than httpx — it has been Python's async HTTP library since long before httpx existed, and it bundles both a client (aiohttp.ClientSession) and a server (aiohttp.web) in one package. The session lifecycle, connector pooling, and proper error handling create specific pitfalls that newer libraries like httpx have tried to simplify. This guide covers them.
Why This Happens
ClientSession holds a connection pool. Creating a session per request defeats the pool’s purpose and triggers Unclosed session warnings if you forget to close them. The correct pattern is one session shared across many requests, closed when you’re done — but this is easy to get wrong in scripts that mix sync and async code.
Connection pool exhaustion happens when you open more concurrent requests than the connector allows (default 100). Requests queue until a slot opens, appearing as hangs.
Fix 1: Always Use Context Manager for Sessions
import aiohttp
import asyncio
# WRONG — session never closed
async def fetch_bad(url):
    session = aiohttp.ClientSession()  # Session leaks — close() is never awaited
    response = await session.get(url)
    return await response.text()

# CORRECT — context manager closes it
async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# Reuse one session across many requests
async def fetch_many(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

asyncio.run(fetch_many(["https://a.com", "https://b.com"]))
ClientSession() creates a TCP connection pool. Creating one per request means:
- No connection reuse (slow)
- "Unclosed session" warnings cluttering logs
- Potential file descriptor exhaustion
Long-lived session pattern (for apps, not scripts):
import aiohttp
# At app startup
session = aiohttp.ClientSession()
# During request handling
async def call_api(url):
    async with session.get(url) as response:
        return await response.json()

# At app shutdown
async def cleanup():
    await session.close()

For FastAPI, use the lifespan context:
from contextlib import asynccontextmanager
from fastapi import FastAPI
import aiohttp
@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.http = aiohttp.ClientSession()
    yield
    await app.state.http.close()

app = FastAPI(lifespan=lifespan)

@app.get("/proxy")
async def proxy():
    async with app.state.http.get("https://upstream.com/data") as response:
        return await response.json()

Common Mistake: Creating a new ClientSession() inside every request handler. Each session opens its own connection pool — the upstream server sees a flood of new TCP connections instead of reusing existing ones. Create the session once at startup and share it across requests.
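For one-shot scripts (as opposed to long-lived apps), close the session and then give the loop a beat before it exits. This is a sketch of the lifecycle only, with the actual request code elided; the brief sleep after close() follows the aiohttp documentation's advice for letting SSL transports finish shutting down:

```python
import asyncio
import aiohttp

async def main():
    session = aiohttp.ClientSession()
    try:
        pass  # ... make requests with `session` here ...
    finally:
        await session.close()
        # A short sleep before the loop closes lets underlying SSL
        # transports finish their shutdown handshake; skipping it is a
        # common source of "Unclosed" noise at interpreter exit.
        await asyncio.sleep(0.25)
    return session.closed

asyncio.run(main())
```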
Fix 2: ClientTimeout Configuration
Default timeouts are conservative but not always appropriate. Configure explicitly:
import aiohttp
# Simple timeout
timeout = aiohttp.ClientTimeout(total=30) # 30s total
# Fine-grained
timeout = aiohttp.ClientTimeout(
    total=60,         # Total timeout for the request
    connect=5,        # Connection establishment
    sock_connect=5,   # Individual socket connection attempt
    sock_read=30,     # Socket read (between bytes)
)

async with aiohttp.ClientSession(timeout=timeout) as session:
    async with session.get("https://slow.example.com") as response:
        data = await response.json()

Per-request override:
async with session.get(
    "https://fast.example.com",
    timeout=aiohttp.ClientTimeout(total=5),
) as response:
    ...

Disable timeout (rarely appropriate):

timeout = aiohttp.ClientTimeout(total=None)  # No timeout

Catch specific timeout exceptions:
import aiohttp
import asyncio
try:
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
        data = await resp.json()
except asyncio.TimeoutError:
    print("Request timed out")
except aiohttp.ClientError as e:
    print(f"Client error: {e}")

aiohttp raises asyncio.TimeoutError on timeout (aiohttp.ServerTimeoutError is a subclass of it, so catching asyncio.TimeoutError covers both). This trips a lot of people up — it's asyncio's timeout propagating up, not an aiohttp-specific exception.
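Timeouts and connection resets are often transient, so it's common to wrap the request in a retry loop. A minimal sketch, where the helper name and backoff schedule are my own, not an aiohttp API:

```python
import asyncio
import aiohttp

async def get_with_retry(session, url, retries=3, backoff=0.5):
    """Retry transient failures with exponential backoff."""
    for attempt in range(retries):
        try:
            async with session.get(url) as resp:
                resp.raise_for_status()
                return await resp.text()
        except (asyncio.TimeoutError, aiohttp.ClientConnectionError):
            if attempt == retries - 1:
                raise  # out of attempts, let the caller see the error
            await asyncio.sleep(backoff * 2 ** attempt)
```

Only retry idempotent requests (GET, HEAD) this way; blindly retrying POSTs can duplicate side effects.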
Fix 3: Connection Pool Tuning
import aiohttp
# Default connector: 100 total, 30 per host
connector = aiohttp.TCPConnector(
    limit=200,                   # Total connections across all hosts
    limit_per_host=50,           # Connections to a single host
    ttl_dns_cache=300,           # DNS cache TTL (5 minutes)
    enable_cleanup_closed=True,  # Clean up closed connections aggressively
    force_close=False,           # Keep-alive (default)
)

async with aiohttp.ClientSession(connector=connector) as session:
    # All requests share the pool
    ...

Hit the per-host limit — requests queue silently:
# If limit_per_host=30 and you launch 100 requests to the same host,
# 30 go out immediately, 70 queue.
urls = ["https://api.example.com/item/1", "https://api.example.com/item/2", ...]
# Appears as a slow request, not an error
results = await asyncio.gather(*[session.get(u) for u in urls])

Raise the limit for legitimate high-concurrency workloads:
connector = aiohttp.TCPConnector(limit=1000, limit_per_host=200)

Or throttle explicitly with a semaphore:
import asyncio
sem = asyncio.Semaphore(50)  # Max 50 concurrent

async def fetch_with_limit(session, url):
    async with sem:
        async with session.get(url) as resp:
            return await resp.text()

Pro Tip: Use an asyncio Semaphore instead of just raising connector limits. A semaphore expresses your intent ("at most 50 concurrent requests") rather than relying on a pool-limit side effect, which makes rate limiting, timing, and debugging much clearer.
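The semaphore pattern generalizes into a small helper. A sketch, where `gather_bounded` is a hypothetical name rather than part of asyncio or aiohttp:

```python
import asyncio

async def gather_bounded(coro_factories, limit=50):
    """Run at most `limit` coroutines at once; results keep input order."""
    sem = asyncio.Semaphore(limit)

    async def run(factory):
        async with sem:  # acquire a slot before starting the work
            return await factory()

    return await asyncio.gather(*(run(f) for f in coro_factories))
```

Pass factories (e.g. `lambda: fetch_url(session, url)`) rather than live coroutine objects, so each unit of work is only created once a slot opens.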
Fix 4: SSL and Certificates
aiohttp.client_exceptions.ClientConnectorCertificateError:
[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed

Disable SSL verification (development only):
import aiohttp
import ssl
# Disable verification
connector = aiohttp.TCPConnector(ssl=False)
# Or with custom SSL context
ssl_ctx = ssl.create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
connector = aiohttp.TCPConnector(ssl=ssl_ctx)
async with aiohttp.ClientSession(connector=connector) as session:
    async with session.get("https://self-signed.example.com") as resp:
        ...

Use a custom CA bundle (corporate networks with a MITM proxy):
import ssl
ssl_ctx = ssl.create_default_context(cafile="/path/to/corporate-ca.pem")
connector = aiohttp.TCPConnector(ssl=ssl_ctx)
async with aiohttp.ClientSession(connector=connector) as session:
    ...

Seeing the SSL error on a single host only? Disable verification for specific calls instead of the whole session:
async with session.get("https://self-signed.example.com", ssl=False) as resp:
    ...

For general Python SSL certificate issues, see Python SSL certificate verify failed.
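Rather than disabling verification, you can often keep it on by supplying a known-good bundle. A sketch using the third-party certifi package (a separate `pip install certifi`), which ships Mozilla's CA certificates and is a common fix on macOS and in slim containers:

```python
import ssl

import aiohttp
import certifi  # third-party package bundling Mozilla's CA certificates

async def make_session() -> aiohttp.ClientSession:
    # Build the context once; every connection in this session verifies
    # against certifi's bundle instead of the system store.
    ssl_ctx = ssl.create_default_context(cafile=certifi.where())
    return aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_ctx))
```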
Fix 5: Avoid Blocking Calls in Async Context
from aiohttp import web
import requests # SYNC library
async def handle(request):
    # WRONG — requests.get blocks the entire event loop
    resp = requests.get("https://api.example.com/data")
    return web.json_response(resp.json())

One blocking call in an async handler stops all other requests from being processed. Always use async libraries inside async code.
CORRECT — use aiohttp or httpx:
from aiohttp import web, ClientSession
async def handle(request):
    async with request.app["http"].get("https://api.example.com/data") as resp:
        data = await resp.json()
    return web.json_response(data)

async def http_client_ctx(app):
    # cleanup_ctx takes an async generator: setup before yield, teardown after
    app["http"] = ClientSession()
    yield
    await app["http"].close()

async def init_app():
    app = web.Application()
    app.cleanup_ctx.append(http_client_ctx)
    app.router.add_get("/", handle)
    return app

If you must use a blocking library (DB client, crypto operation), offload to an executor:
import asyncio
async def handle(request):
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        None,  # Default ThreadPoolExecutor
        slow_blocking_call,
        arg1, arg2,
    )
    return web.json_response(result)

Common Mistake: Using requests inside an async function and wondering why the server grinds to a halt under load. Every sync call blocks the event loop for its entire duration. Use aiohttp or httpx for HTTP, asyncpg for Postgres, motor for MongoDB, and so on.
For async/sync mixing patterns, see Python async sync mix.
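On Python 3.9+, asyncio.to_thread is a tidier spelling of the same executor pattern, and it forwards keyword arguments, which run_in_executor does not. A self-contained sketch, where the checksum function is a stand-in for any blocking call:

```python
import asyncio
import hashlib

def checksum(data: bytes) -> str:
    # Stand-in for any blocking call: sync DB driver, crypto, file IO
    return hashlib.sha256(data).hexdigest()

async def handle_data(data: bytes) -> str:
    # Runs checksum() in the default thread pool without blocking the loop
    return await asyncio.to_thread(checksum, data)

result = asyncio.run(handle_data(b"payload"))
```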
Fix 6: Basic aiohttp Server
from aiohttp import web
async def hello(request):
    name = request.match_info.get("name", "world")
    return web.Response(text=f"Hello, {name}!")

async def api_handler(request):
    data = await request.json()  # Parse JSON body
    return web.json_response({"received": data})

app = web.Application()
app.router.add_get("/", hello)
app.router.add_get("/hello/{name}", hello)
app.router.add_post("/api", api_handler)

if __name__ == "__main__":
    web.run_app(app, host="0.0.0.0", port=8080)

Middleware:
from aiohttp import web
import time
@web.middleware
async def logging_middleware(request, handler):
    start = time.perf_counter()
    try:
        response = await handler(request)
    except web.HTTPException as ex:
        return web.Response(status=ex.status, text=str(ex))
    elapsed = time.perf_counter() - start
    print(f"{request.method} {request.path} → {response.status} in {elapsed:.3f}s")
    return response

app = web.Application(middlewares=[logging_middleware])

Application lifecycle hooks:
async def on_startup(app):
    app["db"] = await create_db_pool()

async def on_cleanup(app):
    await app["db"].close()

app.on_startup.append(on_startup)
app.on_cleanup.append(on_cleanup)

Request parsing:
async def handler(request):
    # Path params
    user_id = request.match_info["user_id"]
    # Query params
    limit = int(request.query.get("limit", 10))
    # JSON body
    body = await request.json()
    # Form data
    form = await request.post()
    # Raw bytes
    data = await request.read()
    # Streaming body
    async for chunk in request.content.iter_chunked(1024):
        process(chunk)

Fix 7: WebSocket Client and Server
Client:
import aiohttp
import asyncio
async def ws_client():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect("wss://echo.websocket.org") as ws:
            await ws.send_str("hello")
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print("Received:", msg.data)
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break

asyncio.run(ws_client())

Server:
from aiohttp import web, WSMsgType
async def ws_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    async for msg in ws:
        if msg.type == WSMsgType.TEXT:
            if msg.data == "close":
                await ws.close()
            else:
                await ws.send_str(f"Echo: {msg.data}")
        elif msg.type == WSMsgType.ERROR:
            print(f"WS error: {ws.exception()}")
    return ws

app = web.Application()
app.router.add_get("/ws", ws_handler)
web.run_app(app)

Broadcasting to multiple clients:
from aiohttp import web, WSMsgType
from weakref import WeakSet
async def broadcast_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    request.app["websockets"].add(ws)
    try:
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                for ws_client in request.app["websockets"]:
                    if not ws_client.closed:
                        await ws_client.send_str(msg.data)
    finally:
        request.app["websockets"].discard(ws)
    return ws

async def init():
    app = web.Application()
    app["websockets"] = WeakSet()
    app.router.add_get("/ws", broadcast_handler)
    return app

Fix 8: Proxies and Authentication
HTTP proxy:
import aiohttp
async with aiohttp.ClientSession() as session:
    async with session.get(
        "https://example.com",
        proxy="http://proxy.example.com:8080",
        proxy_auth=aiohttp.BasicAuth("user", "pass"),
    ) as resp:
        ...

Authentication:
# Basic auth
auth = aiohttp.BasicAuth("user", "pass")
async with aiohttp.ClientSession(auth=auth) as session:
    async with session.get("https://api.example.com") as resp:
        ...

# Bearer token via headers
headers = {"Authorization": "Bearer YOUR_TOKEN"}
async with aiohttp.ClientSession(headers=headers) as session:
    ...

Cookies persist within a session:
async with aiohttp.ClientSession() as session:
    # Login — server sets cookies
    await session.post("https://example.com/login", data={"user": "x", "pw": "y"})
    # Subsequent requests automatically include cookies
    async with session.get("https://example.com/profile") as resp:
        print(await resp.text())

Still Not Working?
aiohttp vs httpx
- aiohttp — Older, async-first, has a full HTTP server built in. Lower-level control.
- httpx — Newer, supports sync AND async, requests-compatible API. Simpler for client-only use.
For httpx-specific patterns that parallel aiohttp, see httpx not working.
Choose aiohttp when you need the server too (or already use the ecosystem). Choose httpx for new client-only code — the API is cleaner.
Testing aiohttp Servers
from aiohttp.test_utils import AioHTTPTestCase
from aiohttp import web
class MyTest(AioHTTPTestCase):
    async def get_application(self):
        app = web.Application()
        app.router.add_get("/", self.handler)
        return app

    async def handler(self, request):
        return web.Response(text="Hello")

    async def test_get(self):
        async with self.client.get("/") as resp:
            assert resp.status == 200
            assert await resp.text() == "Hello"

Or with pytest:
import pytest
from aiohttp.test_utils import TestClient, TestServer
@pytest.fixture
async def client(app):
    async with TestClient(TestServer(app)) as client:
        yield client

async def test_index(client):
    resp = await client.get("/")
    assert resp.status == 200

For pytest async fixture patterns, see pytest fixture not found.
Streaming Large Responses
Don’t load large responses into memory. Stream instead:
async with session.get("https://example.com/huge-file.zip") as response:
    with open("out.zip", "wb") as f:
        async for chunk in response.content.iter_chunked(8192):
            f.write(chunk)

For server-side streaming responses:
from aiohttp import web
async def stream_handler(request):
    response = web.StreamResponse()
    response.content_type = "text/plain"
    await response.prepare(request)
    for i in range(1000):
        await response.write(f"chunk {i}\n".encode())
    await response.write_eof()
    return response

Server-Sent Events (SSE)
from aiohttp import web
import asyncio
async def sse_handler(request):
    response = web.StreamResponse()
    response.headers["Content-Type"] = "text/event-stream"
    response.headers["Cache-Control"] = "no-cache"
    await response.prepare(request)
    for i in range(100):
        await response.write(f"data: event {i}\n\n".encode())
        await asyncio.sleep(1)
    return response

Useful for LLM streaming responses, progress updates, and long-poll alternatives — simpler than WebSockets when you only need server-to-client messaging.
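On the client side, aiohttp has no built-in SSE parser: you read the byte stream and split events yourself. A minimal sketch of decoding one blank-line-delimited event block (the helper name is my own; this covers the common fields, not every corner of the SSE spec):

```python
def parse_sse_event(block: str) -> dict:
    """Parse one SSE event block ('field: value' lines) into a dict."""
    event = {}
    for line in block.splitlines():
        if not line or line.startswith(":"):
            continue  # blank lines and ":" comments carry no fields
        field, _, value = line.partition(":")
        value = value.lstrip(" ")
        if field == "data":
            # Multiple data: lines are joined with newlines per the spec
            event["data"] = event.get("data", "") + value + "\n"
        else:
            event[field] = value
    if "data" in event:
        event["data"] = event["data"].rstrip("\n")
    return event
```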
Graceful Shutdown
from aiohttp import web
async def shutdown(app):
    print("Shutting down")
    # Close pools, flush data
    for ws in set(app.get("websockets", [])):
        await ws.close(code=1001, message=b"Server shutdown")

app = web.Application()
app.on_shutdown.append(shutdown)

on_shutdown runs before on_cleanup. Use on_shutdown to close active connections and on_cleanup for deeper resource release (DB pools, caches).
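web.run_app wires signal handling and shutdown for you. When the server is just one task inside a larger asyncio program, you can drive the same lifecycle manually with AppRunner; a sketch under that assumption:

```python
import asyncio

from aiohttp import web

async def serve(app: web.Application, host: str = "127.0.0.1", port: int = 8080):
    # Manual equivalent of web.run_app, for embedding in a larger program
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, host, port)
    await site.start()
    try:
        await asyncio.Event().wait()  # park until this task is cancelled
    finally:
        # cleanup() fires on_shutdown, then on_cleanup
        await runner.cleanup()
```

Cancelling the serve task (e.g. from your own signal handler) triggers the same graceful shutdown hooks described above.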
Integration with asyncio
aiohttp builds on asyncio — the same event loop, the same tasks. For asyncio-specific issues around event loops, gather, and run, see Python asyncio not running.