
Fix: psycopg Not Working — psycopg2 to psycopg3 Migration, Connection Pool, and Async Errors

FixDevs ·

Quick Answer

How to fix psycopg errors — psycopg2 to psycopg 3 import migration, connection pool setup, row factory tuple vs dict, COPY protocol changes, async psycopg pool, server-side cursor confusion, and binary mode performance.

The Error

You upgrade and old psycopg2 imports break:

import psycopg2
from psycopg2.extras import RealDictCursor   # Different in psycopg 3

Or you migrate to psycopg 3 and the API has changed:

import psycopg
conn = psycopg.connect("postgresql://...")
cur = conn.cursor()
cur.execute("SELECT * FROM users")
rows = cur.fetchall()
# Returns list of tuples — used to be RealDictRow in psycopg2 with cursor_factory

Or your async code fails because you used the wrong import:

import psycopg
conn = await psycopg.connect("postgresql://...")
# TypeError: object Connection can't be used in 'await' expression

Or the pool exhausts under load:

psycopg_pool.PoolTimeout: couldn't get a connection after 30.0 sec

Or COPY syntax from psycopg2 no longer works:

cur.copy_from(file, "users")   # AttributeError in psycopg 3

psycopg is the most widely-used PostgreSQL driver for Python. psycopg2 has been the standard since 2007; psycopg 3 (released October 2021) is a complete rewrite with native async support, better performance, and a cleaner API. Many tutorials still use psycopg2, and the migration has specific patterns developers need to learn. This guide covers both psycopg2 quirks and the v2 → v3 migration.

Why This Happens

psycopg2 and psycopg 3 are different packages — import psycopg2 and import psycopg resolve to different libraries. They co-exist, so you can install both. The APIs overlap substantially but differ on row formats, async support, connection pools, and COPY semantics.

psycopg 3 introduced first-class async support with AsyncConnection — distinct from the sync Connection. Mixing them produces confusing errors because the method signatures look identical until you await.

Fix 1: psycopg2 vs psycopg 3 — Which to Use

psycopg2 (mature, sync-only):

pip install psycopg2-binary   # Pre-built wheels (recommended for dev)
# or
pip install psycopg2           # Build from source (requires PostgreSQL dev headers)

import psycopg2
from psycopg2.extras import RealDictCursor

conn = psycopg2.connect("postgresql://user:pass@localhost/mydb")
cur = conn.cursor(cursor_factory=RealDictCursor)
cur.execute("SELECT id, name FROM users")
rows = cur.fetchall()
# [{"id": 1, "name": "Alice"}, ...]

psycopg 3 (modern, sync + async):

pip install "psycopg[binary]"        # Pre-built (recommended)
# or
pip install "psycopg[binary,pool]"   # With connection pool

import psycopg
from psycopg.rows import dict_row

with psycopg.connect("postgresql://user:pass@localhost/mydb") as conn:
    with conn.cursor(row_factory=dict_row) as cur:
        cur.execute("SELECT id, name FROM users")
        rows = cur.fetchall()
        # [{"id": 1, "name": "Alice"}, ...]

Use psycopg 3 for new projects. Faster, async support, better type coverage, actively developed. psycopg2 is in maintenance mode — fine for existing apps, but new development should use psycopg 3.

Common Mistake: Installing both psycopg2 and psycopg and then mixing imports across files. Pick one. Mixing creates connection pool incompatibility, doubles your dependency footprint, and confuses static analysis tools that look at import patterns.

Fix 2: Migration from psycopg2 to psycopg 3

Most code translates with simple search/replace:

# OLD (psycopg2)
import psycopg2
from psycopg2.extras import RealDictCursor
import psycopg2.errors

conn = psycopg2.connect("postgresql://...")
try:
    cur = conn.cursor(cursor_factory=RealDictCursor)
    cur.execute("SELECT * FROM users WHERE id = %s", (1,))
    rows = cur.fetchall()
except psycopg2.errors.UniqueViolation:
    ...
finally:
    cur.close()
    conn.close()

# NEW (psycopg 3)
import psycopg
from psycopg.rows import dict_row
import psycopg.errors

with psycopg.connect("postgresql://...") as conn:
    with conn.cursor(row_factory=dict_row) as cur:
        try:
            cur.execute("SELECT * FROM users WHERE id = %s", (1,))
            rows = cur.fetchall()
        except psycopg.errors.UniqueViolation:
            ...

Key changes:

psycopg2 | psycopg 3
from psycopg2.extras import RealDictCursor | from psycopg.rows import dict_row
cursor(cursor_factory=RealDictCursor) | cursor(row_factory=dict_row)
psycopg2.connect(...) | psycopg.connect(...)
psycopg2.errors.XYZ | psycopg.errors.XYZ (mostly same names)
cur.copy_from(file, "table") | with cur.copy("COPY table FROM STDIN") as copy:
cur.copy_to(file, "table") | with cur.copy("COPY table TO STDOUT") as copy:
cur.mogrify(sql, params) | cur.mogrify(sql, params), but only on psycopg.ClientCursor (psycopg 3.1+); the default cursor has no mogrify
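
Before doing the search/replace by hand, it can help to list where psycopg2 idioms appear. The following is a minimal sketch of such a scan, not an official migration tool: the pattern table and the function name find_psycopg2_usages are hypothetical, and the patterns only cover the idioms listed in the table above.

```python
import re

# Hypothetical pattern table: psycopg2 idiom -> suggested psycopg 3 replacement.
# It mirrors the "Key changes" table and is not exhaustive.
PSYCOPG2_PATTERNS = [
    (re.compile(r"\bimport psycopg2\b"), "import psycopg"),
    (re.compile(r"\bRealDictCursor\b"), "psycopg.rows.dict_row"),
    (re.compile(r"\bcursor_factory\s*="), "row_factory="),
    (re.compile(r"\.copy_(from|to|expert)\("), 'cur.copy("COPY ...")'),
    (re.compile(r"\bpsycopg2\.errors\b"), "psycopg.errors"),
]


def find_psycopg2_usages(source: str) -> list[tuple[int, str, str]]:
    """Return (line_number, stripped_line, suggestion) for each idiom found."""
    hits = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        for pattern, suggestion in PSYCOPG2_PATTERNS:
            if pattern.search(line):
                hits.append((lineno, line.strip(), suggestion))
    return hits


sample = (
    "import psycopg2\n"
    "from psycopg2.extras import RealDictCursor\n"
    "cur = conn.cursor(cursor_factory=RealDictCursor)\n"
    "cur.copy_from(f, 'users')\n"
)
for lineno, line, suggestion in find_psycopg2_usages(sample):
    print(f"line {lineno}: {line!r} -> consider {suggestion}")
```

The scan only flags lines; each hit still needs a manual look, since COPY calls and error handling rarely translate one-to-one.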

Row factories in psycopg 3:

from psycopg.rows import tuple_row, dict_row, namedtuple_row, class_row

# Tuple (default)
with conn.cursor() as cur:
    cur.execute("SELECT id, name FROM users")
    rows = cur.fetchall()   # [(1, "Alice"), (2, "Bob")]

# Dict
with conn.cursor(row_factory=dict_row) as cur:
    cur.execute("SELECT id, name FROM users")
    rows = cur.fetchall()   # [{"id": 1, "name": "Alice"}, ...]

# Namedtuple
with conn.cursor(row_factory=namedtuple_row) as cur:
    cur.execute("SELECT id, name FROM users")
    for row in cur:
        print(row.id, row.name)

# Custom class
from dataclasses import dataclass

@dataclass
class User:
    id: int
    name: str

with conn.cursor(row_factory=class_row(User)) as cur:
    cur.execute("SELECT id, name FROM users")
    users = cur.fetchall()   # List[User]
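
If none of the built-in factories fit, a row factory can be written by hand: it is a callable that receives the cursor and returns a second callable that turns a sequence of values into a row. The sketch below assumes that protocol and uses a stand-in cursor object so it runs without a database; the name lower_dict_row and the sample column names are hypothetical.

```python
from types import SimpleNamespace
from typing import Any, Callable, Sequence


def lower_dict_row(cursor: Any) -> Callable[[Sequence[Any]], dict]:
    """Row factory sketch: like dict_row, but with lower-cased column names."""
    # cursor.description is None when the last statement produced no result set.
    description = cursor.description or []
    names = [column.name.lower() for column in description]

    def make_row(values: Sequence[Any]) -> dict:
        # Called once per fetched row with the column values in order.
        return dict(zip(names, values))

    return make_row


# Stand-in cursor (hypothetical data) so the sketch runs without PostgreSQL.
fake_cursor = SimpleNamespace(
    description=[SimpleNamespace(name="ID"), SimpleNamespace(name="Name")]
)
row = lower_dict_row(fake_cursor)((1, "Alice"))
print(row)  # {'id': 1, 'name': 'Alice'}
```

With a real connection, the factory would be passed the same way as the built-in ones, for example conn.cursor(row_factory=lower_dict_row).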

For SQLAlchemy ORM patterns that use psycopg as the driver, see SQLAlchemy not working.

Fix 3: Connection Pools

pip install "psycopg[pool]"

Sync pool:

from psycopg_pool import ConnectionPool

pool = ConnectionPool(
    "postgresql://user:pass@localhost/mydb",
    min_size=5,
    max_size=20,
    timeout=30,          # Acquire timeout
    max_idle=600,         # Close idle connections after 10 min
    max_lifetime=3600,    # Recycle connections after 1 hour
)

with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM users")
        rows = cur.fetchall()

pool.close()

Async pool:

from psycopg_pool import AsyncConnectionPool
import asyncio

async def main():
    async with AsyncConnectionPool(
        "postgresql://user:pass@localhost/mydb",
        min_size=5,
        max_size=20,
    ) as pool:
        async with pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT * FROM users")
                rows = await cur.fetchall()

asyncio.run(main())

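Pool sizing: a common starting point, the same rule of thumb used for asyncpg, is max_size = 2 × DB CPU cores. Treat it as a baseline to measure against, not a hard rule. PostgreSQL's max_connections (default 100) caps the total across every app and every pool instance, so divide that budget between them.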
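
The sizing arithmetic can be sketched as a small helper. This is an illustration under assumptions, not a psycopg_pool feature: the function name suggest_max_size and the reserved parameter (connections held back for admin tools and other clients) are hypothetical.

```python
def suggest_max_size(
    db_cpu_cores: int,
    app_instances: int,
    max_connections: int = 100,
    reserved: int = 10,
) -> int:
    """Per-instance pool max_size from the 2 x cores heuristic.

    The result is capped so that all app instances together stay under
    max_connections minus a reserved margin (an assumed safety buffer).
    """
    if db_cpu_cores < 1 or app_instances < 1:
        raise ValueError("db_cpu_cores and app_instances must be >= 1")
    heuristic = 2 * db_cpu_cores
    budget_per_instance = (max_connections - reserved) // app_instances
    return max(1, min(heuristic, budget_per_instance))


print(suggest_max_size(db_cpu_cores=8, app_instances=2))   # 16
print(suggest_max_size(db_cpu_cores=16, app_instances=6))  # 15
```

In the second call the heuristic alone would give 32, but six instances sharing 90 usable connections leaves only 15 each.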

Pro Tip: Open the pool once at app startup, not per request. For FastAPI:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from psycopg.rows import dict_row
from psycopg_pool import AsyncConnectionPool

@asynccontextmanager
async def lifespan(app: FastAPI):
    app.state.db_pool = AsyncConnectionPool(
        "postgresql://...",
        min_size=5, max_size=20,
        open=False,
    )
    await app.state.db_pool.open()
    yield
    await app.state.db_pool.close()

app = FastAPI(lifespan=lifespan)

@app.get("/users")
async def list_users():
    async with app.state.db_pool.connection() as conn:
        async with conn.cursor(row_factory=dict_row) as cur:
            await cur.execute("SELECT id, name FROM users")
            return await cur.fetchall()

open=False constructs the pool without opening any connections; await pool.open() then opens it explicitly inside the running event loop, at startup.

Fix 4: Sync vs Async API

# Sync
import psycopg

with psycopg.connect("postgresql://...") as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
        result = cur.fetchone()

# Async
import psycopg
import asyncio

async def main():
    async with await psycopg.AsyncConnection.connect("postgresql://...") as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 1")
            result = await cur.fetchone()

asyncio.run(main())

Critical syntax difference for AsyncConnection:

# WRONG
async with psycopg.AsyncConnection.connect(...) as conn:   # No await
    ...

# CORRECT
async with await psycopg.AsyncConnection.connect(...) as conn:   # await first, then async with
    ...

The connect() method returns a coroutine that resolves to an async context manager — you need both await (to get the connection) and async with (to manage its lifecycle).
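
The same behavior can be reproduced without a database. The sketch below uses a stand-in class, FakeAsyncConnection (hypothetical, not part of psycopg), whose connect() is a coroutine returning an async context manager, which is the shape described above.

```python
import asyncio


class FakeAsyncConnection:
    """Stand-in for an async connection class (hypothetical, no database)."""

    def __init__(self) -> None:
        self.closed = False

    @classmethod
    async def connect(cls) -> "FakeAsyncConnection":
        await asyncio.sleep(0)  # pretend to do network I/O
        return cls()

    async def __aenter__(self) -> "FakeAsyncConnection":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.closed = True


async def without_await() -> str:
    coro = FakeAsyncConnection.connect()  # a coroutine, not a connection
    try:
        async with coro as conn:  # fails: coroutines are not context managers
            return "entered"
    except (TypeError, AttributeError) as exc:  # exception type varies by Python version
        return type(exc).__name__
    finally:
        coro.close()  # avoid the "never awaited" warning


async def with_await() -> bool:
    async with await FakeAsyncConnection.connect() as conn:
        assert not conn.closed
    return conn.closed  # True: __aexit__ ran on leaving the block


print(asyncio.run(without_await()))
print(asyncio.run(with_await()))
```

The first function fails before the block body runs because the object handed to async with is still a coroutine; awaiting it first yields the object that actually implements __aenter__ and __aexit__.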

Common Mistake: Calling the sync psycopg.connect() from async code. It is a blocking call, so it stalls the event loop while the connection is established, and its return value is a plain Connection that cannot be awaited. In async code, always use await psycopg.AsyncConnection.connect().

For asyncpg as a faster alternative to psycopg 3’s async support, see asyncpg not working.

Fix 5: COPY for Bulk Operations

PostgreSQL’s COPY is the fastest way to load bulk data. psycopg 3 redesigned the API:

# psycopg2 (OLD): use one of the two calls, not both on the same file
with open("data.csv") as f:
    cur.copy_from(f, "users", sep=",")
    # or, for CSV with a header row:
    # cur.copy_expert("COPY users FROM STDIN WITH CSV HEADER", f)

# psycopg 3 (NEW)
with cur.copy("COPY users FROM STDIN WITH (FORMAT CSV, HEADER)") as copy:
    with open("data.csv", "rb") as f:
        for line in f:
            copy.write(line)

Or use the write_row method for structured data:

import psycopg

with psycopg.connect("postgresql://...") as conn:
    with conn.cursor() as cur:
        with cur.copy("COPY users (id, name, email) FROM STDIN") as copy:
            for row in records:
                copy.write_row(row)   # Pass a tuple per row

write_row handles escaping automatically — no need to format CSV manually.
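
When records arrive as dicts rather than tuples, they need to be put in the same order as the COPY column list first. A minimal sketch, assuming dict-shaped input: the helper names rows_for_copy and copy_records are hypothetical, and psycopg is imported lazily so the ordering helper can be tried without it installed.

```python
from typing import Any, Iterable, Iterator, Mapping, Sequence


def rows_for_copy(
    records: Iterable[Mapping[str, Any]], columns: Sequence[str]
) -> Iterator[tuple]:
    """Yield one tuple per record, ordered to match the COPY column list.

    Missing keys become None, which write_row sends as NULL.
    """
    for record in records:
        yield tuple(record.get(column) for column in columns)


def copy_records(conn, table: str, columns: Sequence[str], records) -> None:
    """Stream dict records into `table` with COPY ... FROM STDIN."""
    from psycopg import sql  # lazy import: only needed when a connection is used

    # Compose identifiers safely instead of formatting them into the string.
    statement = sql.SQL("COPY {} ({}) FROM STDIN").format(
        sql.Identifier(table),
        sql.SQL(", ").join(sql.Identifier(c) for c in columns),
    )
    with conn.cursor() as cur:
        with cur.copy(statement) as copy:
            for row in rows_for_copy(records, columns):
                copy.write_row(row)


sample = [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob"},  # no email: becomes NULL
]
print(list(rows_for_copy(sample, ["id", "name", "email"])))
```

With an open connection, copy_records(conn, "users", ["id", "name", "email"], sample) would stream both rows in a single COPY.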

Read with COPY:

with cur.copy("COPY users TO STDOUT WITH CSV HEADER") as copy:
    with open("users.csv", "wb") as f:
        for data in copy:
            f.write(data)

Binary mode for max speed:

with cur.copy("COPY users FROM STDIN WITH BINARY") as copy:
    copy.set_types(["int4", "text", "text"])
    for record in records:
        copy.write_row(record)

Binary mode skips text-format conversion overhead — significantly faster for large inserts but harder to debug. Use text mode while developing, binary mode for production hot paths.

Fix 6: Server-Side Cursors

# Default — client-side cursor (loads all results into memory)
with conn.cursor() as cur:
    cur.execute("SELECT * FROM huge_table")
    rows = cur.fetchall()   # Loads everything

# Server-side cursor (named) — streams results
with conn.cursor(name="streaming_cur") as cur:
    cur.execute("SELECT * FROM huge_table")
    for row in cur:
        process(row)   # Fetches in batches as you iterate

Named cursors are server-side — PostgreSQL keeps the result set on the server and streams rows on demand. Essential for tables with millions of rows that won’t fit in client memory.

itersize parameter for batched fetches:

with conn.cursor(name="cur") as cur:
    cur.itersize = 1000   # Fetch 1000 rows per network round trip
    cur.execute("SELECT * FROM huge_table")
    for row in cur:
        process(row)

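Common Mistake: Using a server-side cursor outside a transaction. A named cursor maps to PostgreSQL's DECLARE CURSOR, which by default only exists inside a transaction. In autocommit mode the DECLARE fails with "DECLARE CURSOR can only be used in transaction blocks". Keep autocommit off (the default), wrap the work in with conn.transaction():, or create the cursor with withhold=True if it must outlive the transaction. Note that in psycopg 3, with conn: closes the connection on exit rather than just ending the transaction, so it is not a substitute for a transaction block.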
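
When the processing step works better on chunks than on single rows, the streaming loop can be written with explicit batches. The helper below is a sketch that relies only on the DB-API fetchmany() method, so it is demonstrated on an in-memory SQLite table as a stand-in; the name iter_batches is hypothetical. On a psycopg named cursor, each fetchmany(size) call should correspond to one fetch from the server.

```python
import sqlite3
from typing import Any, Iterator, List


def iter_batches(cursor: Any, size: int = 1000) -> Iterator[List[Any]]:
    """Yield lists of up to `size` rows until the cursor is exhausted."""
    while True:
        batch = cursor.fetchmany(size)
        if not batch:
            return
        yield batch


# Demo on SQLite (stand-in only; no PostgreSQL required to run this).
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE huge_table (n INTEGER)")
demo.executemany("INSERT INTO huge_table VALUES (?)", [(i,) for i in range(2500)])
cur = demo.execute("SELECT n FROM huge_table ORDER BY n")

batch_sizes = [len(batch) for batch in iter_batches(cur, size=1000)]
print(batch_sizes)  # [1000, 1000, 500]
```

Memory use is bounded by the batch size rather than by the size of the table, which is the point of using a server-side cursor in the first place.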

Fix 7: Type Adaptation and Custom Types

psycopg 3 has built-in support for many PostgreSQL types beyond the basics:

import psycopg
from psycopg.types.json import Jsonb
import json

conn = psycopg.connect("postgresql://...")

# JSON / JSONB — passed as dict, returned as dict
conn.execute("INSERT INTO events (data) VALUES (%s)", [Jsonb({"key": "value"})])
row = conn.execute("SELECT data FROM events WHERE id = 1").fetchone()
print(type(row[0]))   # dict

# UUID, date, datetime — returned as their Python equivalents
# (uuid.UUID, datetime.date, datetime.datetime)

# Arrays
conn.execute("INSERT INTO posts (tags) VALUES (%s)", [["python", "psycopg"]])
row = conn.execute("SELECT tags FROM posts WHERE id = 1").fetchone()
print(row[0])   # ['python', 'psycopg'] — Python list

Custom enum:

from enum import Enum

class Status(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"

# Pass as string
conn.execute("INSERT INTO users (status) VALUES (%s)", [Status.ACTIVE.value])

# Or register adapter (advanced)
from psycopg.adapt import Loader, Dumper

class StatusDumper(Dumper):
    def dump(self, value):
        return value.value.encode("utf-8")

conn.adapters.register_dumper(Status, StatusDumper)

Pro Tip: For JSON columns, always wrap the value in Jsonb (or Json) when inserting. psycopg 3 has no default adapter for dict, so passing a bare dict does not store a malformed value; it raises psycopg.ProgrammingError before anything reaches the database.

# WRONG: a bare dict has no default adapter in psycopg 3
conn.execute("INSERT INTO events (data) VALUES (%s)", [{"key": "value"}])
# psycopg.ProgrammingError: cannot adapt type 'dict' using placeholder '%s' (format: AUTO)

# CORRECT — explicit JSONB wrapper
from psycopg.types.json import Jsonb
conn.execute("INSERT INTO events (data) VALUES (%s)", [Jsonb({"key": "value"})])

Fix 8: Connection String and Authentication

# Various formats
conn = psycopg.connect("postgresql://user:pass@localhost:5432/mydb")
conn = psycopg.connect("postgres://user:pass@localhost:5432/mydb")
conn = psycopg.connect("host=localhost dbname=mydb user=postgres password=pass")
conn = psycopg.connect(
    host="localhost",
    dbname="mydb",
    user="postgres",
    password="pass",
    port=5432,
)
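
One pitfall with the URL forms: characters such as @, : and / in the user name or password must be percent-encoded, or the URL is parsed incorrectly. A minimal sketch using only the standard library; the function name build_postgres_url and the sample credentials are hypothetical.

```python
from urllib.parse import quote


def build_postgres_url(
    user: str, password: str, host: str, dbname: str, port: int = 5432
) -> str:
    """Build a postgresql:// URL with percent-encoded credentials."""
    # safe="" forces encoding of "/", which quote() leaves alone by default.
    return (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(dbname, safe='')}"
    )


url = build_postgres_url("app", "p@ss:w/rd", "localhost", "mydb")
print(url)  # postgresql://app:p%40ss%3Aw%2Frd@localhost:5432/mydb
```

The keyword-argument form of connect() avoids the problem entirely, since the password is passed as a separate value and never parsed out of a URL.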

SSL:

conn = psycopg.connect(
    "postgresql://user:pass@host/db",
    sslmode="require",        # require, prefer, allow, disable, verify-ca, verify-full
    sslrootcert="/path/to/ca.pem",
)

Connection options:

conn = psycopg.connect(
    "postgresql://...",
    options="-c statement_timeout=30000",   # PG session config
    application_name="myapp",
    connect_timeout=10,
)

application_name shows in pg_stat_activity — invaluable for debugging which app is holding connections:

SELECT application_name, COUNT(*) FROM pg_stat_activity GROUP BY application_name;

Environment variables auto-read by psycopg (and libpq generally):

export PGHOST=localhost
export PGPORT=5432
export PGUSER=postgres
export PGPASSWORD=secret
export PGDATABASE=mydb

# Now this works without arguments
python -c "import psycopg; psycopg.connect()"

For PostgreSQL connection errors at the network/auth layer, see PostgreSQL connection refused.

Still Not Working?

psycopg2 vs psycopg 3 vs asyncpg

  • psycopg2 — Mature, sync-only. Maintenance mode.
  • psycopg 3 — Sync + async, broader type support, modern API. Choose for new projects.
  • asyncpg — Fastest async option, lower-level API. Choose for performance-critical async paths.

For most projects, psycopg 3 is the best balance. For performance-critical async services hitting PostgreSQL hard, asyncpg edges it out.

Transactions

# Implicit transaction with `with conn`
with psycopg.connect("...") as conn:
    with conn.cursor() as cur:
        cur.execute("INSERT INTO users (name) VALUES (%s)", ["Alice"])
        cur.execute("INSERT INTO orders (user_id) VALUES (currval('users_id_seq'))")
# Commits on exit (or rolls back on exception), then closes the connection

# Explicit transaction control
conn.autocommit = False   # Default is False — txn until commit/rollback

with conn.cursor() as cur:
    cur.execute("...")

conn.commit()
# Or
conn.rollback()

# Autocommit mode (each statement in its own txn)
conn.autocommit = True

For pgvector integration, see pgvector not working — uses psycopg as the standard driver.

Listen/Notify

import psycopg

with psycopg.connect("postgresql://...", autocommit=True) as conn:
    conn.execute("LISTEN my_channel")

    # In psycopg 3, notifies() is a generator that blocks until a
    # notification arrives (conn.poll() and the conn.notifies list
    # are psycopg2 APIs and do not exist here)
    for notify in conn.notifies():
        print(f"Got: {notify.payload}")

Autocommit is required for LISTEN — notifications don’t deliver inside an open transaction.

Testing with Real PostgreSQL

import pytest
import psycopg

@pytest.fixture
def db_conn():
    conn = psycopg.connect("postgresql://localhost/test_db", autocommit=False)
    yield conn
    conn.rollback()
    conn.close()

def test_insert(db_conn):
    with db_conn.cursor() as cur:
        cur.execute("INSERT INTO users (name) VALUES (%s)", ["Test"])
        cur.execute("SELECT name FROM users WHERE name = %s", ["Test"])
        assert cur.fetchone()[0] == "Test"
# Each test rolls back, so DB stays clean

For pytest fixture patterns that integrate with database state, see pytest fixture not found.

Combining with Alembic

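Alembic connects through SQLAlchemy, which supports both psycopg2 and psycopg 3 as drivers. To run migrations with psycopg 3 (this needs SQLAlchemy 2.0 or later), use the postgresql+psycopg dialect: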

# alembic.ini
sqlalchemy.url = postgresql+psycopg://user:pass@localhost/mydb
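
Existing deployments often hold the URL in a different form: a bare postgresql:// URL makes SQLAlchemy pick psycopg2, and the postgres:// scheme is rejected by recent SQLAlchemy versions. The following sketch normalizes those to the psycopg 3 dialect; the function name to_psycopg3_sqlalchemy_url is hypothetical and the list of accepted schemes is an assumption about what a project might have in its config.

```python
# Schemes this sketch is willing to rewrite (an assumed, non-exhaustive list).
_POSTGRES_SCHEMES = {
    "postgres",
    "postgresql",
    "postgresql+psycopg2",
    "postgresql+psycopg",
}


def to_psycopg3_sqlalchemy_url(url: str) -> str:
    """Rewrite a PostgreSQL URL so SQLAlchemy selects the psycopg 3 driver."""
    scheme, separator, rest = url.partition("://")
    if not separator:
        raise ValueError(f"not a URL: {url!r}")
    if scheme not in _POSTGRES_SCHEMES:
        raise ValueError(f"not a PostgreSQL URL scheme: {scheme!r}")
    return "postgresql+psycopg://" + rest


print(to_psycopg3_sqlalchemy_url("postgres://user:pass@localhost/mydb"))
# postgresql+psycopg://user:pass@localhost/mydb
```

This is mainly useful in env.py when the URL comes from an environment variable shared with code that connects through psycopg directly.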

For Alembic-specific migration errors, see Alembic not working.


FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
