Fix: asyncpg Not Working — Connection Pool, Prepared Statements, and Transaction Errors
Quick Answer
How to fix asyncpg errors — connection refused localhost 5432, pool exhausted timeout, prepared statement does not exist, type codec not registered, JSON automatic conversion, and transaction rollback on exception.
The Error
You install asyncpg and connecting fails immediately:
ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 5432)

Or the pool exhausts under load and requests hang:
asyncpg.exceptions.TooManyConnectionsError:
remaining connection slots are reserved for non-replication superuser connections

Or prepared statements break after PgBouncer:
asyncpg.exceptions.InvalidSQLStatementNameError:
prepared statement "__asyncpg_stmt_1__" does not exist

Or JSON columns come back as strings:
row = await conn.fetchrow("SELECT data FROM events WHERE id = $1", 1)
print(row["data"])
# Returns a string '{"key": "value"}' instead of dict

Or transactions don’t roll back the way you expect:

async with conn.transaction():
    await conn.execute("INSERT INTO users (id) VALUES (1)")
    raise ValueError("oops")
# Expectation: rolled back. Reality: only statements run on this same connection inside the block are rolled back

asyncpg is one of the fastest PostgreSQL drivers for Python: async-native, implemented largely in Cython, and commonly used with FastAPI, Starlette, and other async Python web stacks. It’s deliberately lower-level than SQLAlchemy: no ORM, no query builder, and no automatic decoding of JSON or custom types. The performance comes from this minimalism, but it surfaces specific failure modes around pools, type handling, and PgBouncer compatibility. This guide covers each.
Why This Happens
asyncpg uses a connection pool to share PostgreSQL connections across many async tasks. Each task acquires a connection, runs queries, and releases it. Without proper pool sizing or release patterns, the pool exhausts (no available connections) and acquires block forever.
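The acquire/release cycle described above can be illustrated without a database. The sketch below is a toy model built on `asyncio.Queue`, not asyncpg's actual pool implementation; `ToyPool` and `demo` are hypothetical names used only for illustration. It shows why a connection that is never released makes a later acquire wait until it times out.

```python
import asyncio

class ToyPool:
    """Toy stand-in for a connection pool (not asyncpg's implementation)."""

    def __init__(self, size):
        self._free = asyncio.Queue()
        for conn_id in range(size):
            self._free.put_nowait(conn_id)

    async def acquire(self, timeout):
        # Waits until a connection is free, or raises TimeoutError.
        return await asyncio.wait_for(self._free.get(), timeout)

    def release(self, conn_id):
        self._free.put_nowait(conn_id)

async def demo():
    pool = ToyPool(size=2)
    events = []
    first = await pool.acquire(timeout=0.1)
    await pool.acquire(timeout=0.1)  # second connection, never released
    events.append("acquired 2 of 2")
    try:
        await pool.acquire(timeout=0.1)  # nothing free: this one waits
    except asyncio.TimeoutError:
        events.append("third acquire timed out")
    pool.release(first)  # returning a connection unblocks the next acquire
    await pool.acquire(timeout=0.1)
    events.append("acquired after release")
    return events

events = asyncio.run(demo())
print(events)
```

Without the timeout, the third acquire would simply wait, which is the "hang" seen in real services when connections leak.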
PostgreSQL prepared statements are server-side objects with names. asyncpg auto-prepares queries for performance, but PgBouncer in transaction or statement pooling mode doesn’t preserve prepared statements across connection reuses — causing “does not exist” errors.
Fix 1: Connecting and Connection Strings
import asyncio
import asyncpg

async def main():
    # Single connection (rarely used in production)
    conn = await asyncpg.connect(
        host="localhost",
        port=5432,
        user="postgres",
        password="postgres",
        database="mydb",
    )
    # Or via URL (use one form or the other; connecting twice would leak the first connection):
    # conn = await asyncpg.connect("postgresql://user:pass@localhost:5432/mydb")
    try:
        # Query
        rows = await conn.fetch("SELECT id, name FROM users")
        for row in rows:
            print(row["id"], row["name"])
    finally:
        await conn.close()

asyncio.run(main())

Use a connection pool for any real workload:
async def main():
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost:5432/mydb",
        min_size=5,
        max_size=20,
    )
    async with pool.acquire() as conn:
        rows = await conn.fetch("SELECT * FROM users LIMIT 10")
        for row in rows:
            print(row)
    await pool.close()

asyncpg.connect parameters:
| Parameter | Notes |
|---|---|
| host | Hostname or path to Unix socket |
| port | Default 5432 |
| user, password | Credentials |
| database | Database name |
| ssl | True, False, 'require', or SSLContext |
| timeout | Connection timeout (seconds) |
| command_timeout | Default query timeout (seconds) |
| server_settings | Dict of PG settings (e.g., {"jit": "off"}) |
Common Mistake: Forgetting await pool.close() on shutdown. The pool keeps connections alive, holding PostgreSQL slots. In FastAPI, use the lifespan context to manage the pool’s lifecycle. In scripts, wrap in try/finally to guarantee cleanup.
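For scripts, the try/finally advice above could look like the following minimal sketch. `run_with_pool` is a hypothetical helper, not part of asyncpg; it takes a pool factory so the cleanup logic stays independent of how the pool is created.

```python
import asyncio

async def run_with_pool(create_pool, work):
    """Create a pool, run work(pool), and always close the pool afterwards."""
    pool = await create_pool()
    try:
        return await work(pool)
    finally:
        # Runs on success and on error, so PostgreSQL slots are freed either way.
        await pool.close()

# Hypothetical usage with asyncpg (needs a reachable database, so it is not run here):
#   import asyncpg
#   asyncio.run(run_with_pool(
#       lambda: asyncpg.create_pool("postgresql://user:pass@localhost:5432/mydb"),
#       my_work,  # async def my_work(pool): ...
#   ))
```

Because the close happens in `finally`, an exception inside `work` still releases every connection before it propagates.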
For PostgreSQL connection refused errors at the network/auth level, see PostgreSQL connection refused.
Fix 2: Connection Pool Configuration
pool = await asyncpg.create_pool(
    dsn="postgresql://user:pass@localhost/mydb",
    min_size=10,
    max_size=50,
    max_queries=50000,  # Recycle connection after N queries
    max_inactive_connection_lifetime=300,  # Close idle conns after 5 min
    timeout=60,  # Connection establishment timeout; pass timeout= to pool.acquire() to bound acquire waits
    command_timeout=30,  # Per-query timeout
    init=init_connection,  # Run on each new connection
    setup=setup_connection,  # Run before returning conn from acquire
    server_settings={
        "application_name": "myapp",
        "jit": "off",  # JIT can slow OLTP workloads
    },
)

Sizing the pool:
- min_size: connections opened immediately when the pool is created
- max_size: cap on simultaneous connections
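Rule of thumb: start with max_size ≈ 2 × the number of CPU cores on the database server, counted across all application instances combined. Beyond that point, extra connections tend to give diminishing returns and can reduce throughput through lock contention. Treat this as a starting point and benchmark your own workload.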
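As a worked example of the rule of thumb, the sketch below splits the 2 × cores budget evenly across application instances. The even split and the function name `pool_size_per_instance` are assumptions for illustration; real deployments may weight instances differently.

```python
def pool_size_per_instance(db_cpu_cores, app_instances):
    """Split the 2 x cores rule of thumb evenly across application instances."""
    if db_cpu_cores < 1 or app_instances < 1:
        raise ValueError("db_cpu_cores and app_instances must be >= 1")
    total = 2 * db_cpu_cores
    # Never go below 1, otherwise an instance could not connect at all.
    return max(1, total // app_instances)

# 8 database cores -> 16 connections in total; 4 app instances -> 4 each.
print(pool_size_per_instance(8, 4))
```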
init and setup hooks:
import json

async def init_connection(conn):
    """Runs once when a new connection is created."""
    await conn.set_type_codec(
        "jsonb",
        encoder=json.dumps,
        decoder=json.loads,
        schema="pg_catalog",
    )

async def setup_connection(conn):
    """Runs every time a connection is acquired from the pool."""
    await conn.execute("SET TIME ZONE 'UTC'")

Use init for codec registration (once per connection). Use setup for session-level state (timezone, search_path, role).
Pro Tip: Always set command_timeout on the pool. Without it, a query that hangs (waiting on a lock, network blip) blocks the acquiring task forever — and may exhaust the pool as other tasks pile up. A 30-second default catches most pathological queries without affecting normal traffic.
Fix 3: Pool Exhaustion and Acquire Hangs
asyncpg.exceptions.TooManyConnectionsError:
remaining connection slots are reserved

Or worse: the task hangs forever in pool.acquire(). Common causes:
- max_size too high: PostgreSQL’s max_connections (default 100) is hit before your pool’s limit
- Connections not released: a code path that doesn’t return the connection
- Long-running queries holding connections
Always use async with for acquire:
# CORRECT: connection released on context exit
async with pool.acquire() as conn:
    await conn.execute("...")

# WRONG: leaks if an exception occurs
conn = await pool.acquire()
await conn.execute("...")  # If this raises, conn is never released
await pool.release(conn)

Check PostgreSQL’s connection limit:
SHOW max_connections; -- Default 100
-- Active connections per app
SELECT application_name, COUNT(*) FROM pg_stat_activity GROUP BY application_name;

If the combined max_size of all your pools plus the connections used by other apps exceeds max_connections, PostgreSQL refuses new connections.
Common Mistake: Setting max_size=100 on the pool with default PostgreSQL max_connections=100. With multiple application instances, you’ve already exceeded the database’s capacity. Either raise max_connections server-side or shrink each pool’s max_size.
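The arithmetic behind this mistake is easy to check before deploying. The following sketch is a hypothetical helper (`connection_headroom` is not an asyncpg or PostgreSQL API); it assumes every pool can grow to its max_size and uses PostgreSQL's default of 3 for `superuser_reserved_connections`.

```python
def connection_headroom(pool_max_size, app_instances,
                        max_connections=100, reserved=3, other_clients=0):
    """Server slots left if every pool grows to max_size (negative = over-subscribed)."""
    usable = max_connections - reserved
    demand = pool_max_size * app_instances + other_clients
    return usable - demand

# Two instances with max_size=100 against the default server limit:
print(connection_headroom(100, 2))  # negative, so the database will refuse connections
# Four instances with max_size=20:
print(connection_headroom(20, 4))   # positive, so the pools fit
```

A negative result means some instance will eventually see the "remaining connection slots are reserved" error under load.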
For PostgreSQL connection limit exhaustion patterns, see PostgreSQL max connections exceeded.
Fix 4: Type Codecs and JSON
asyncpg returns most PostgreSQL types as their natural Python equivalents — but JSON/JSONB come back as strings by default:
# Default behavior
row = await conn.fetchrow("SELECT data FROM events LIMIT 1")
print(type(row["data"])) # <class 'str'>
print(row["data"])  # '{"key": "value"}' is a string, not a dict

Register a JSON codec:
import json
import asyncpg

async def init_connection(conn):
    await conn.set_type_codec(
        "json",
        encoder=json.dumps,
        decoder=json.loads,
        schema="pg_catalog",
    )
    await conn.set_type_codec(
        "jsonb",
        encoder=json.dumps,
        decoder=json.loads,
        schema="pg_catalog",
    )

pool = await asyncpg.create_pool(dsn, init=init_connection)
# Now JSON columns come back as dict/list automatically
async with pool.acquire() as conn:
    row = await conn.fetchrow("SELECT data FROM events")
    print(type(row["data"]))  # <class 'dict'>

Other types worth registering:
import uuid
from decimal import Decimal

async def init(conn):
    # UUID: asyncpg returns uuid.UUID by default (no codec needed)
    # Decimal: asyncpg returns Decimal for NUMERIC by default
    # Custom enum types (MyEnum is your own enum.Enum subclass with string values)
    await conn.set_type_codec(
        "my_enum_type",
        encoder=lambda member: member.value,  # str(member) would send "MyEnum.X", not the label
        decoder=lambda v: MyEnum(v),
        schema="public",
    )
    # hstore extension type (an extension type, not a composite type)
    await conn.set_builtin_type_codec("hstore", codec_name="pg_contrib.hstore")

Use orjson for faster JSON serialization:
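import orjson

async def init(conn):
    await conn.set_type_codec(
        "jsonb",
        encoder=lambda v: orjson.dumps(v).decode(),  # orjson returns bytes; the text codec expects str
        decoder=orjson.loads,
        schema="pg_catalog",
    )

orjson is typically several times faster than the stdlib json module, which matters for JSON-heavy workloads. Benchmark with your own payloads before relying on a specific speedup.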
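One practical wrinkle with a plain `json.dumps` encoder is that it raises `TypeError` on values such as `datetime` or `Decimal`. The sketch below shows one possible encoder/decoder pair using only the standard library; `encode_json` and `decode_json` are hypothetical names, and converting these values to strings is an assumption about what your application wants.

```python
import json
from datetime import date, datetime
from decimal import Decimal

def _default(value):
    # Called by json.dumps only for values it cannot serialize itself.
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)
    raise TypeError(f"Cannot serialize {type(value).__name__} to JSON")

def encode_json(value):
    return json.dumps(value, default=_default)

def decode_json(text):
    return json.loads(text)

payload = {"price": Decimal("9.99"), "at": datetime(2024, 1, 2, 3, 4, 5)}
encoded = encode_json(payload)
print(encoded)
print(decode_json(encoded))
```

These functions could then be passed as `encoder=encode_json, decoder=decode_json` to `set_type_codec`. Note that the round trip is lossy: decimals and timestamps come back as strings.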
Fix 5: Parameterized Queries
asyncpg uses PostgreSQL-style positional parameters ($1, $2):
# CORRECT — parameterized (safe from SQL injection)
await conn.fetch("SELECT * FROM users WHERE id = $1", user_id)
# WRONG — string interpolation (SQL injection vulnerability)
await conn.fetch(f"SELECT * FROM users WHERE id = {user_id}")
# Multiple params
await conn.fetch(
    "SELECT * FROM users WHERE name = $1 AND active = $2",
    "Alice", True,
)
# IN clause with array
ids = [1, 2, 3, 4, 5]
await conn.fetch("SELECT * FROM users WHERE id = ANY($1::int[])", ids)

Note: asyncpg uses $N, not ? or %s. If you copy queries from SQLAlchemy or psycopg, convert the placeholders.
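Converting placeholders by hand is error-prone for long queries. A minimal sketch of an automated conversion from psycopg-style `%s` to `$N` follows; `percent_s_to_dollar` is a hypothetical helper, and it is deliberately naive: it does not skip `%s` that appears inside string literals or comments.

```python
import re

def percent_s_to_dollar(query):
    """Rewrite %s placeholders as $1, $2, ... in order of appearance.

    Naive sketch: does not skip %s inside string literals or comments.
    """
    counter = 0

    def numbered(match):
        nonlocal counter
        counter += 1
        return f"${counter}"

    return re.sub(r"%s", numbered, query)

print(percent_s_to_dollar("SELECT * FROM users WHERE name = %s AND active = %s"))
```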
Bulk insert (copy_records_to_table):
records = [
    (1, "Alice", 30),
    (2, "Bob", 25),
    (3, "Charlie", 35),
]
await conn.copy_records_to_table(
    "users",
    records=records,
    columns=["id", "name", "age"],
)

copy_records_to_table uses PostgreSQL’s COPY protocol, which is usually much faster than individual INSERTs for bulk loads (often by an order of magnitude or more, depending on row size and network latency).
executemany for many similar queries:
from datetime import datetime

await conn.executemany(
    "INSERT INTO logs (timestamp, message) VALUES ($1, $2)",
    [
        (datetime.now(), "log 1"),
        (datetime.now(), "log 2"),
    ],
)

This is faster than calling execute in a loop because asyncpg sends the argument sets in batches instead of one round trip per row.
Fix 6: Transactions and Rollback
async with conn.transaction():
    await conn.execute("INSERT INTO users (id) VALUES (1)")
    await conn.execute("INSERT INTO users (id) VALUES (1)")  # Duplicate key: raises
# On exception, the transaction is automatically rolled back

With explicit isolation level:

async with conn.transaction(isolation="serializable"):
    # Strongest isolation: catches more concurrency issues
    await conn.execute("...")

Isolation levels:
| Level | Behavior |
|---|---|
| read_committed (default) | Sees committed changes from other txns |
| repeatable_read | Snapshot at txn start; no phantom reads |
| serializable | Strongest: txns appear truly sequential |
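Stricter isolation levels can cause PostgreSQL to abort a transaction that conflicts with a concurrent one, so callers usually retry. The sketch below is one possible retry wrapper; `retry_transaction` and its parameters are hypothetical, and the predicate is passed in so the helper does not depend on any driver. With asyncpg, a reasonable predicate is assumed to be `isinstance(exc, asyncpg.exceptions.SerializationError)`.

```python
import asyncio

async def retry_transaction(run_once, is_retryable, attempts=3, base_delay=0.05):
    """Call run_once() until it succeeds, retrying only errors is_retryable accepts."""
    for attempt in range(1, attempts + 1):
        try:
            return await run_once()
        except Exception as exc:
            if attempt == attempts or not is_retryable(exc):
                raise
            # Simple linear backoff before the next attempt.
            await asyncio.sleep(base_delay * attempt)

# Hypothetical usage (run_once opens its own transaction on every attempt):
#   async def run_once():
#       async with pool.acquire() as conn:
#           async with conn.transaction(isolation="serializable"):
#               await conn.execute("...")
```

The whole transaction is re-run on each attempt, because a rolled-back transaction cannot be resumed.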
Savepoints for partial rollback:
async with conn.transaction():
    await conn.execute("INSERT INTO logs VALUES (1, 'start')")
    try:
        async with conn.transaction():  # Inner = SAVEPOINT
            await conn.execute("INSERT INTO logs VALUES (2, 'risky')")
            raise ValueError("rollback inner only")
    except ValueError:
        pass  # Inner rolled back; outer continues
    await conn.execute("INSERT INTO logs VALUES (3, 'end')")
# Only rows 1 and 3 committed

Common Mistake: Acquiring multiple connections from the pool inside a transaction. Each pool.acquire() returns a different connection, and your transaction only covers the connection you started it on:
# WRONG: second acquire is a different conn, not in the transaction
async with pool.acquire() as conn1:
    async with conn1.transaction():
        await conn1.execute("INSERT INTO orders ...")
        async with pool.acquire() as conn2:  # Different conn!
            await conn2.execute("UPDATE inventory ...")  # Not transactional with the insert

# CORRECT: use one connection for the whole transaction
async with pool.acquire() as conn:
    async with conn.transaction():
        await conn.execute("INSERT INTO orders ...")
        await conn.execute("UPDATE inventory ...")

Fix 7: PgBouncer Compatibility
PgBouncer is a connection pooler that sits between your app and PostgreSQL. asyncpg’s auto-prepared statements break in PgBouncer’s transaction or statement pooling modes:
asyncpg.exceptions.InvalidSQLStatementNameError:
prepared statement "__asyncpg_stmt_1__" does not exist

Disable statement caching for PgBouncer:

pool = await asyncpg.create_pool(
    dsn,
    statement_cache_size=0,  # Disable asyncpg's automatic prepared statement cache
)

Or use PgBouncer’s session pooling mode, which preserves prepared statements but reduces the pooling benefit.

Also avoid explicit conn.prepare() calls when connecting through PgBouncer in transaction or statement mode: a statement prepared on one server connection may not exist on the next one PgBouncer hands you.

Common Mistake: Adding PgBouncer in production without disabling statement caching. Tests pass against direct PostgreSQL connections; production fails after the first reused PgBouncer connection. Always test against your actual deployment topology (with PgBouncer if you’ll use it).
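One way to keep test and production configurations aligned is to build the pool options in a single place. The helper below is a hypothetical sketch (`pool_options` is not an asyncpg function); it only assembles keyword arguments, with `statement_cache_size=0` added when the connection goes through PgBouncer.

```python
def pool_options(behind_pgbouncer, max_size=20):
    """Keyword arguments for asyncpg.create_pool, adjusted for PgBouncer."""
    options = {"min_size": 1, "max_size": max_size}
    if behind_pgbouncer:
        # Turn off asyncpg's automatic prepared statement cache.
        options["statement_cache_size"] = 0
    return options

print(pool_options(behind_pgbouncer=True))
# Hypothetical usage: pool = await asyncpg.create_pool(dsn, **pool_options(True))
```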
Fix 8: FastAPI / Starlette Integration
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
import asyncpg

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    app.state.db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb",
        min_size=5,
        max_size=20,
        command_timeout=30,
    )
    yield
    # Shutdown
    await app.state.db_pool.close()

app = FastAPI(lifespan=lifespan)

async def get_db():
    async with app.state.db_pool.acquire() as conn:
        yield conn

@app.get("/users/{user_id}")
async def get_user(user_id: int, conn = Depends(get_db)):
    row = await conn.fetchrow("SELECT id, name FROM users WHERE id = $1", user_id)
    return dict(row) if row else None

For FastAPI dependency injection patterns that integrate with asyncpg pools, see FastAPI dependency injection error.
Per-request transaction dependency:
async def get_db_with_transaction():
    async with app.state.db_pool.acquire() as conn:
        async with conn.transaction():
            yield conn
            # Transaction commits on success, rolls back on exception

@app.post("/orders")
async def create_order(order: Order, conn = Depends(get_db_with_transaction)):  # Order: your Pydantic model
    await conn.execute("INSERT INTO orders ...")
    await conn.execute("UPDATE inventory ...")
    # Both succeed or both roll back

Still Not Working?
asyncpg vs psycopg vs SQLAlchemy
- asyncpg — Fastest, lowest-level. Best for performance-critical paths.
- psycopg 3 — Sync + async, supports more PostgreSQL features (LISTEN/NOTIFY easier, more codec support).
- SQLAlchemy 2 — Full ORM with async support. Best for complex domain models. See SQLAlchemy not working.
asyncpg is the right choice for hot paths in async services. For complex schemas with relationships, SQLAlchemy + asyncpg (as the async driver) gives you both ergonomics and performance.
Notifications (LISTEN/NOTIFY)
async def listener(conn, pid, channel, payload):
    print(f"Received: channel={channel}, payload={payload}")

async with pool.acquire() as conn:
    await conn.add_listener("my_channel", listener)
    # Keep the connection alive to receive notifications
    while True:
        await asyncio.sleep(60)

For notifications, dedicate a connection and don’t return it to the pool while you are listening. When a connection is released, the pool resets it, which removes its listeners, so notifications stop arriving. A standalone asyncpg.connect() connection avoids tying up a pool slot.
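Printing inside the callback is fine for debugging, but real handlers usually hand the payload to a worker. A minimal sketch of that hand-off, assuming an `asyncio.Queue` as the buffer, is shown below; `make_queue_listener` is a hypothetical helper, and the callback is invoked by hand here to simulate a notification without a database.

```python
import asyncio

def make_queue_listener(queue):
    """Build a callback with the (connection, pid, channel, payload) signature."""
    def listener(connection, pid, channel, payload):
        # Keep the callback cheap: enqueue and return immediately.
        queue.put_nowait((channel, payload))
    return listener

async def demo():
    queue = asyncio.Queue()
    listener = make_queue_listener(queue)
    # Simulate a NOTIFY arriving; with asyncpg the driver would make this call.
    listener(None, 1234, "my_channel", "hello")
    return await queue.get()

received = asyncio.run(demo())
print(received)
```

With a real connection, the callback would be registered via `await conn.add_listener("my_channel", make_queue_listener(queue))`, and a separate task would consume the queue.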
Streaming Large Result Sets
async with conn.transaction():
    async for row in conn.cursor("SELECT * FROM huge_table"):
        process(row)

Cursors stream rows in batches (the cursor iterator prefetches 50 rows by default; pass prefetch= to change it), so you can process arbitrarily large tables without loading everything into memory. The cursor must be used inside a transaction.
Testing with asyncpg
import asyncpg
import pytest
import pytest_asyncio

@pytest_asyncio.fixture
async def db_conn():
    conn = await asyncpg.connect("postgresql://localhost/test_db")
    tr = conn.transaction()
    await tr.start()
    try:
        yield conn
    finally:
        await tr.rollback()  # Roll back any changes the test made
        await conn.close()

@pytest.mark.asyncio
async def test_user_creation(db_conn):
    await db_conn.execute("INSERT INTO users (name) VALUES ($1)", "Alice")
    row = await db_conn.fetchrow("SELECT name FROM users WHERE name = $1", "Alice")
    assert row["name"] == "Alice"

The transaction-rollback fixture pattern keeps the test database clean between tests. asyncpg has no RollbackError exception, so the fixture starts the transaction manually and calls rollback() during teardown.
For pytest async fixture patterns, see pytest fixture not found.
Combining with httpx for External APIs
For applications that call external APIs and write to a database, both clients run on the same event loop:

import json
import httpx

async def fetch_and_store(url: str, conn):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        data = response.json()
    # json.dumps is needed only if no JSON codec is registered on this connection
    await conn.execute(
        "INSERT INTO events (data) VALUES ($1)",
        json.dumps(data),
    )

For httpx-specific connection pool patterns, see httpx not working.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.