Skip to content

Fix: asyncpg Not Working — Connection Pool, Prepared Statements, and Transaction Errors

FixDevs ·

Quick Answer

How to fix asyncpg errors — connection refused localhost 5432, pool exhausted timeout, prepared statement does not exist, type codec not registered, JSON automatic conversion, and transaction rollback on exception.

The Error

You install asyncpg and connecting fails immediately:

asyncpg.exceptions.ConnectionDoesNotExistError:
[Errno 111] Connection refused

Or the pool exhausts under load and requests hang:

asyncpg.exceptions.TooManyConnectionsError:
remaining connection slots are reserved for non-replication superuser connections

Or prepared statements break after PgBouncer:

asyncpg.exceptions.InvalidSQLStatementNameError:
prepared statement "__asyncpg_stmt_1__" does not exist

Or fetched rows look weird:

row = await conn.fetchrow("SELECT data FROM events WHERE id = $1", 1)
print(row["data"])
# Returns a string '{"key": "value"}' instead of dict

Or transactions don’t roll back on exceptions:

async with conn.transaction():
    await conn.execute("INSERT INTO users (id) VALUES (1)")
    raise ValueError("oops")
# Expectation: rolled back. Reality: depends on usage

asyncpg is the fastest PostgreSQL driver for Python — written in C, async-native, used by FastAPI, Starlette, and most modern async Python web stacks. It’s deliberately lower-level than SQLAlchemy: no ORM, no automatic type coercion for custom types, no schema introspection. The performance comes from this minimalism, but it surfaces specific failure modes around pools, type handling, and PgBouncer compatibility. This guide covers each.

Why This Happens

asyncpg uses a connection pool to share PostgreSQL connections across many async tasks. Each task acquires a connection, runs queries, and releases it. Without proper pool sizing or release patterns, the pool exhausts (no available connections) and acquires block forever.

PostgreSQL prepared statements are server-side objects with names. asyncpg auto-prepares queries for performance, but PgBouncer in transaction or statement pooling mode doesn’t preserve prepared statements across connection reuses — causing “does not exist” errors.

Fix 1: Connecting and Connection Strings

import asyncpg
import asyncio

async def main():
    # Single connection (rarely used in production)
    conn = await asyncpg.connect(
        host="localhost",
        port=5432,
        user="postgres",
        password="postgres",
        database="mydb",
    )

    # Or via URL
    conn = await asyncpg.connect("postgresql://user:pass@localhost:5432/mydb")

    # Query
    rows = await conn.fetch("SELECT id, name FROM users")
    for row in rows:
        print(row["id"], row["name"])

    await conn.close()

asyncio.run(main())

Use a connection pool for any real workload:

async def main():
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost:5432/mydb",
        min_size=5,
        max_size=20,
    )

    async with pool.acquire() as conn:
        rows = await conn.fetch("SELECT * FROM users LIMIT 10")
        for row in rows:
            print(row)

    await pool.close()

asyncpg.connect parameters:

ParameterNotes
hostHostname or path to Unix socket
portDefault 5432
user, passwordCredentials
databaseDatabase name
sslTrue, False, 'require', or SSLContext
timeoutConnection timeout (seconds)
command_timeoutDefault query timeout
server_settingsDict of PG settings (e.g., {"jit": "off"})

Common Mistake: Forgetting await pool.close() on shutdown. The pool keeps connections alive, holding PostgreSQL slots. In FastAPI, use the lifespan context to manage the pool’s lifecycle. In scripts, wrap in try/finally to guarantee cleanup.

For PostgreSQL connection refused errors at the network/auth level, see PostgreSQL connection refused.

Fix 2: Connection Pool Configuration

pool = await asyncpg.create_pool(
    dsn="postgresql://user:pass@localhost/mydb",
    min_size=10,
    max_size=50,
    max_queries=50000,           # Recycle connection after N queries
    max_inactive_connection_lifetime=300,   # Close idle conns after 5 min
    timeout=60,                   # Acquire timeout
    command_timeout=30,            # Per-query timeout
    init=init_connection,          # Run on each new connection
    setup=setup_connection,        # Run before returning conn from acquire
    server_settings={
        "application_name": "myapp",
        "jit": "off",   # JIT can slow OLTP workloads
    },
)

Sizing the pool:

  • min_size — Connections opened immediately at pool create
  • max_size — Cap on simultaneous connections

Rule of thumb: max_size = (2 × CPU_cores_on_db_server). PostgreSQL benchmarks show diminishing returns above this; more connections often slow throughput due to lock contention.

init and setup hooks:

async def init_connection(conn):
    """Runs once when a new connection is created."""
    await conn.set_type_codec(
        "jsonb",
        encoder=json.dumps,
        decoder=json.loads,
        schema="pg_catalog",
    )

async def setup_connection(conn):
    """Runs every time a connection is acquired from the pool."""
    await conn.execute("SET TIME ZONE 'UTC'")

Use init for codec registration (once per connection). Use setup for session-level state (timezone, search_path, role).

Pro Tip: Always set command_timeout on the pool. Without it, a query that hangs (waiting on a lock, network blip) blocks the acquiring task forever — and may exhaust the pool as other tasks pile up. A 30-second default catches most pathological queries without affecting normal traffic.

Fix 3: Pool Exhaustion and Acquire Hangs

asyncpg.exceptions.TooManyConnectionsError:
remaining connection slots are reserved

Or worse — the task hangs forever in pool.acquire(). Common causes:

  1. max_size too high — PostgreSQL max_connections (default 100) is hit before your pool’s limit
  2. Connections not released — code path that doesn’t return the connection
  3. Long-running queries holding connections

Always use async with for acquire:

# CORRECT — connection released on context exit
async with pool.acquire() as conn:
    await conn.execute("...")

# WRONG — leaks if exception occurs
conn = await pool.acquire()
await conn.execute("...")   # If this raises, conn never released
await pool.release(conn)

Check PostgreSQL’s connection limit:

SHOW max_connections;   -- Default 100

-- Active connections per app
SELECT application_name, COUNT(*) FROM pg_stat_activity GROUP BY application_name;

If your pool’s max_size plus other apps exceeds max_connections, PostgreSQL refuses new connections.

Common Mistake: Setting max_size=100 on the pool with default PostgreSQL max_connections=100. With multiple application instances, you’ve already exceeded the database’s capacity. Either raise max_connections server-side or shrink each pool’s max_size.

For PostgreSQL connection limit exhaustion patterns, see PostgreSQL max connections exceeded.

Fix 4: Type Codecs and JSON

asyncpg returns most PostgreSQL types as their natural Python equivalents — but JSON/JSONB come back as strings by default:

# Default behavior
row = await conn.fetchrow("SELECT data FROM events LIMIT 1")
print(type(row["data"]))   # <class 'str'>
print(row["data"])          # '{"key": "value"}' — string, not dict

Register a JSON codec:

import json
import asyncpg

async def init_connection(conn):
    await conn.set_type_codec(
        "json",
        encoder=json.dumps,
        decoder=json.loads,
        schema="pg_catalog",
    )
    await conn.set_type_codec(
        "jsonb",
        encoder=json.dumps,
        decoder=json.loads,
        schema="pg_catalog",
    )

pool = await asyncpg.create_pool(dsn, init=init_connection)

# Now JSON columns come back as dict/list automatically
async with pool.acquire() as conn:
    row = await conn.fetchrow("SELECT data FROM events")
    print(type(row["data"]))   # <class 'dict'>

Other types worth registering:

import uuid
from decimal import Decimal

async def init(conn):
    # UUID — asyncpg returns uuid.UUID by default (no codec needed)
    # Decimal — asyncpg returns Decimal for NUMERIC by default

    # Custom enum types
    await conn.set_type_codec(
        "my_enum_type",
        encoder=str,
        decoder=lambda v: MyEnum(v),
        schema="public",
    )

    # Custom composite types
    await conn.set_builtin_type_codec("hstore", codec_name="pg_contrib.hstore")

Use orjson for faster JSON serialization:

import orjson

async def init(conn):
    await conn.set_type_codec(
        "jsonb",
        encoder=lambda v: orjson.dumps(v).decode(),
        decoder=orjson.loads,
        schema="pg_catalog",
    )

orjson is 2–10x faster than stdlib json for typical payloads — meaningful for JSON-heavy workloads.

Fix 5: Parameterized Queries

asyncpg uses PostgreSQL-style positional parameters ($1, $2):

# CORRECT — parameterized (safe from SQL injection)
await conn.fetch("SELECT * FROM users WHERE id = $1", user_id)

# WRONG — string interpolation (SQL injection vulnerability)
await conn.fetch(f"SELECT * FROM users WHERE id = {user_id}")

# Multiple params
await conn.fetch(
    "SELECT * FROM users WHERE name = $1 AND active = $2",
    "Alice", True,
)

# IN clause with array
ids = [1, 2, 3, 4, 5]
await conn.fetch("SELECT * FROM users WHERE id = ANY($1::int[])", ids)

Note: asyncpg uses $N not ? or %s. If you copy queries from SQLAlchemy or psycopg, convert the placeholders.

Bulk insert (copy_records_to_table):

records = [
    (1, "Alice", 30),
    (2, "Bob", 25),
    (3, "Charlie", 35),
]

await conn.copy_records_to_table(
    "users",
    records=records,
    columns=["id", "name", "age"],
)

copy_records_to_table uses PostgreSQL’s COPY protocol — 10-100x faster than individual INSERTs for bulk loads.

executemany for many similar queries:

await conn.executemany(
    "INSERT INTO logs (timestamp, message) VALUES ($1, $2)",
    [
        (datetime.now(), "log 1"),
        (datetime.now(), "log 2"),
    ],
)

Faster than calling execute in a loop because asyncpg batches the statements.

Fix 6: Transactions and Rollback

async with conn.transaction():
    await conn.execute("INSERT INTO users (id) VALUES (1)")
    await conn.execute("INSERT INTO users (id) VALUES (1)")   # Duplicate — raises
# On exception, the transaction is automatically rolled back

With explicit isolation level:

async with conn.transaction(isolation="serializable"):
    # Strongest isolation — catches more concurrency issues
    await conn.execute("...")

Isolation levels:

LevelBehavior
read_committed (default)Sees committed changes from other txns
repeatable_readSnapshot at txn start; no phantom reads
serializableStrongest — txns appear truly sequential

Savepoints for partial rollback:

async with conn.transaction():
    await conn.execute("INSERT INTO logs VALUES (1, 'start')")

    try:
        async with conn.transaction():   # Inner = SAVEPOINT
            await conn.execute("INSERT INTO logs VALUES (2, 'risky')")
            raise ValueError("rollback inner only")
    except ValueError:
        pass   # Inner rolled back; outer continues

    await conn.execute("INSERT INTO logs VALUES (3, 'end')")
# Only rows 1 and 3 committed

Common Mistake: Acquiring multiple connections from the pool inside a transaction. Each pool.acquire() returns a different connection — your transaction only covers the connection you started it on:

# WRONG — second acquire is a different conn, not in the transaction
async with pool.acquire() as conn1:
    async with conn1.transaction():
        await conn1.execute("INSERT INTO orders ...")
        async with pool.acquire() as conn2:   # Different conn!
            await conn2.execute("UPDATE inventory ...")   # Not transactional with the insert

# CORRECT — use one connection for the whole transaction
async with pool.acquire() as conn:
    async with conn.transaction():
        await conn.execute("INSERT INTO orders ...")
        await conn.execute("UPDATE inventory ...")

Fix 7: PgBouncer Compatibility

PgBouncer is a connection pooler that sits between your app and PostgreSQL. asyncpg’s auto-prepared statements break in PgBouncer’s transaction or statement pooling modes:

asyncpg.exceptions.InvalidSQLStatementNameError:
prepared statement "__asyncpg_stmt_1__" does not exist

Disable statement caching for PgBouncer:

pool = await asyncpg.create_pool(
    dsn,
    statement_cache_size=0,   # Disable prepared statement cache
    prepared_statement_cache_size=0,   # asyncpg 0.30+
)

Or use PgBouncer’s session pooling mode — preserves prepared statements but reduces pooling benefit.

Or use asyncpg 0.30+ which has better PgBouncer support:

pool = await asyncpg.create_pool(
    dsn,
    server_settings={"statement_cache_mode": "off"},
)

Common Mistake: Adding PgBouncer in production without disabling statement caching. Tests pass against direct PostgreSQL connections; production fails after the first reused PgBouncer connection. Always test against your actual deployment topology (with PgBouncer if you’ll use it).

Fix 8: FastAPI / Starlette Integration

from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
import asyncpg

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    app.state.db_pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/mydb",
        min_size=5,
        max_size=20,
        command_timeout=30,
    )
    yield
    # Shutdown
    await app.state.db_pool.close()

app = FastAPI(lifespan=lifespan)

async def get_db():
    async with app.state.db_pool.acquire() as conn:
        yield conn

@app.get("/users/{user_id}")
async def get_user(user_id: int, conn = Depends(get_db)):
    row = await conn.fetchrow("SELECT id, name FROM users WHERE id = $1", user_id)
    return dict(row) if row else None

For FastAPI dependency injection patterns that integrate with asyncpg pools, see FastAPI dependency injection error.

Per-request transaction dependency:

async def get_db_with_transaction():
    async with app.state.db_pool.acquire() as conn:
        async with conn.transaction():
            yield conn
    # Transaction commits on success, rolls back on exception

@app.post("/orders")
async def create_order(order: Order, conn = Depends(get_db_with_transaction)):
    await conn.execute("INSERT INTO orders ...")
    await conn.execute("UPDATE inventory ...")
    # Both succeed or both roll back

Still Not Working?

asyncpg vs psycopg vs SQLAlchemy

  • asyncpg — Fastest, lowest-level. Best for performance-critical paths.
  • psycopg 3 — Sync + async, supports more PostgreSQL features (LISTEN/NOTIFY easier, more codec support).
  • SQLAlchemy 2 — Full ORM with async support. Best for complex domain models. See SQLAlchemy not working.

asyncpg is the right choice for hot paths in async services. For complex schemas with relationships, SQLAlchemy + asyncpg (as the async driver) gives you both ergonomics and performance.

Notifications (LISTEN/NOTIFY)

async def listener(conn, pid, channel, payload):
    print(f"Received: channel={channel}, payload={payload}")

async with pool.acquire() as conn:
    await conn.add_listener("my_channel", listener)
    # Keep the connection alive to receive notifications
    while True:
        await asyncio.sleep(60)

For notifications, dedicate a connection — don’t return it to the pool. The pool’s setup/init hooks don’t survive connection recycling, so listeners get lost.

Streaming Large Result Sets

async with conn.transaction():
    async for row in conn.cursor("SELECT * FROM huge_table"):
        process(row)

Cursors stream rows in batches (default 100 rows per fetch) — process arbitrarily large tables without loading all into memory. Must be inside a transaction.

Testing with asyncpg

import pytest
import asyncpg

@pytest.fixture
async def db_conn():
    conn = await asyncpg.connect("postgresql://localhost/test_db")
    async with conn.transaction():
        yield conn
        raise asyncpg.RollbackError   # Roll back any changes the test made

@pytest.mark.asyncio
async def test_user_creation(db_conn):
    await db_conn.execute("INSERT INTO users (name) VALUES ($1)", "Alice")
    row = await db_conn.fetchrow("SELECT name FROM users WHERE name = $1", "Alice")
    assert row["name"] == "Alice"

The transaction-rollback fixture pattern keeps the test database clean between tests.

For pytest async fixture patterns, see pytest fixture not found.

Combining with httpx for External APIs

For applications that hit external APIs and a database, share the event loop carefully:

async def fetch_and_store(url: str, conn):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        data = response.json()

    await conn.execute(
        "INSERT INTO events (data) VALUES ($1)",
        json.dumps(data),
    )

For httpx-specific connection pool patterns, see httpx not working.

F

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.

Was this article helpful?

Related Articles