
Fix: PGMQ Not Working — Extension Install, Visibility Timeout, Long Polling, and Archive vs Delete

FixDevs ·

Quick Answer

How to fix PGMQ Postgres message queue errors — extension not installed, queue creation, send/read/delete/archive, visibility timeout (vt), long polling, partitioned queues, and Python/Node client setup.

The Error

You try to use PGMQ functions and Postgres complains:

ERROR: schema "pgmq" does not exist

Or pgmq.read hands back the same messages twice:

SELECT * FROM pgmq.read('my_queue', 30, 10);
-- Returns 5 messages.

SELECT * FROM pgmq.read('my_queue', 30, 10);
-- Returns the same 5 messages — should be different (or queue empty).

Or the visibility timeout doesn’t release:

-- Worker A reads with vt=300:
SELECT * FROM pgmq.read('my_queue', 300, 1);

-- Worker A crashes without ack.
-- 5 minutes later, worker B reads — still nothing!

Or long polling returns immediately even though no messages exist:

SELECT * FROM pgmq.read_with_poll('my_queue', 30, 10, 5);
-- Returns nothing after 0ms.

Why This Happens

PGMQ (Postgres Message Queue) is a Postgres extension by Tembo that adds queue semantics to Postgres. It works like SQS, but runs inside the Postgres database you already operate.

Most issues map to:

  • Extension not installed. PGMQ requires CREATE EXTENSION pgmq. Some hosted Postgres providers (Supabase, for example) ship it; others need a manual install or don't support it at all.
  • read vs pop. pgmq.read returns messages and sets a visibility timeout — they’re invisible to others for that period but stay in the queue until acked. pgmq.pop removes them immediately. Mixing these up causes either lost messages or duplicate processing.
  • Visibility timeout (vt) is per-read. After vt seconds, the message becomes visible to other readers — if you haven’t acked, it’s redelivered. If your vt is too long for a crashed worker, redelivery is slow.
  • Long polling needs read_with_poll. Regular read returns immediately; read_with_poll waits up to N seconds for messages.
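
The difference between read, pop, and the visibility timeout is easier to see in a toy model. The sketch below is an in-memory stand-in written for illustration only: ToyQueue, Msg, and visible_at are made-up names, time is passed in explicitly as `now`, and none of this is how PGMQ is implemented internally.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    msg_id: int
    body: dict
    visible_at: float = 0.0  # toy equivalent of the vt column
    read_ct: int = 0


class ToyQueue:
    """In-memory stand-in for a queue; `now` is passed in so time is explicit."""

    def __init__(self):
        self.rows = []
        self.next_id = 1

    def send(self, body):
        msg = Msg(self.next_id, body)
        self.rows.append(msg)
        self.next_id += 1
        return msg.msg_id

    def read(self, vt, qty, now):
        # Hand out visible messages and hide them until now + vt.
        batch = [m for m in self.rows if m.visible_at <= now][:qty]
        for m in batch:
            m.visible_at = now + vt
            m.read_ct += 1
        return batch

    def pop(self, now):
        # Read and remove in one step: nothing is left to redeliver.
        for m in self.rows:
            if m.visible_at <= now:
                self.rows.remove(m)
                return m
        return None

    def delete(self, msg_id):
        before = len(self.rows)
        self.rows = [m for m in self.rows if m.msg_id != msg_id]
        return len(self.rows) < before


q = ToyQueue()
q.send({"task": "a"})
first = q.read(vt=30, qty=10, now=0)    # delivered, hidden until t=30
hidden = q.read(vt=30, qty=10, now=10)  # nothing visible yet
again = q.read(vt=30, qty=10, now=31)   # never deleted, so redelivered
print(len(first), len(hidden), len(again), again[0].read_ct)  # prints: 1 0 1 2
```

The message only leaves the toy queue through delete or pop, which mirrors the split between read-then-ack and pop described in the list.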

Fix 1: Install the Extension

CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;

CASCADE also installs any extensions PGMQ depends on (some releases require pg_partman). PGMQ's own tables and functions live in the pgmq schema.

For Docker:

FROM postgres:16
RUN apt-get update && apt-get install -y postgresql-16-pgmq
# Assumes your apt repository provides this package.
# Rebuild the image, then run CREATE EXTENSION in your init script.

Or use the Tembo-provided image:

docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=postgres quay.io/tembo/pg17-pgmq:latest

For Supabase:

-- In SQL Editor:
CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;

Supabase ships PGMQ — it’s just not enabled by default.

For Neon / RDS / Cloud SQL: check if PGMQ is available in your version. RDS and Aurora Postgres haven’t shipped PGMQ as of writing — you’d need a separate Postgres instance or wait for support.

Pro Tip: Use the latest PGMQ version. The extension is rapidly evolving; older versions had different function signatures (pgmq.send vs older pgmq_send).
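
Before debugging anything else, it can help to confirm what the server actually has. The following is a minimal sketch that queries the standard pg_extension and pg_available_extensions catalogs; pgmq_status and explain are hypothetical helper names, and `cur` is assumed to be any DB-API cursor (psycopg, for instance) connected to the target database.

```python
def pgmq_status(cur):
    """Return (installed_version, available_version); either may be None.

    `cur` is any DB-API cursor connected to the database you want to check.
    """
    cur.execute("SELECT extversion FROM pg_extension WHERE extname = 'pgmq'")
    row = cur.fetchone()
    installed = row[0] if row else None

    cur.execute(
        "SELECT default_version FROM pg_available_extensions WHERE name = 'pgmq'"
    )
    row = cur.fetchone()
    available = row[0] if row else None
    return installed, available


def explain(installed, available):
    """Turn the two versions into a one-line diagnosis."""
    if installed:
        return f"pgmq {installed} is installed"
    if available:
        return f"pgmq {available} is available: run CREATE EXTENSION pgmq"
    return "pgmq is not available on this server"
```

If the second query comes back empty, CREATE EXTENSION cannot succeed on that server no matter how it is invoked, which usually means a provider limitation or a missing package.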

Fix 2: Create a Queue

SELECT pgmq.create('my_queue');

For unlogged queues (faster, but lost on crash):

SELECT pgmq.create_unlogged('my_queue');

For partitioned queues (better for high volume):

SELECT pgmq.create_partitioned('my_queue', '1 day');
-- Partitions by day. Relies on pg_partman, which creates and retires partitions.

List existing queues:

SELECT * FROM pgmq.list_queues();

Drop a queue:

SELECT pgmq.drop_queue('my_queue');

Common Mistake: Treating queues like tables. PGMQ stores messages in tables under the pgmq schema, but you should always use pgmq.* functions — not raw SELECT/INSERT/UPDATE.

Fix 3: Send Messages

-- Single message:
SELECT pgmq.send('my_queue', '{"task": "send_email", "to": "user@example.com"}'::jsonb);
-- Returns the message ID.

-- With delay (in seconds):
SELECT pgmq.send('my_queue', '{"task": "..."}'::jsonb, 60);
-- Visible after 60 seconds.

-- Batch send:
SELECT pgmq.send_batch('my_queue', ARRAY[
  '{"id": 1}'::jsonb,
  '{"id": 2}'::jsonb,
  '{"id": 3}'::jsonb
]);
-- Returns one row per message ID.

For programmatic usage from Python (with psycopg):

import psycopg
import json

with psycopg.connect("postgresql://...") as conn:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT pgmq.send(%s, %s::jsonb)",
            ("my_queue", json.dumps({"task": "send_email", "to": "user@example.com"}))
        )
        msg_id = cur.fetchone()[0]
        conn.commit()
        print("sent:", msg_id)

For Node (pg):

import pg from "pg";

const client = new pg.Client({ connectionString: process.env.DATABASE_URL });
await client.connect();

const result = await client.query(
  "SELECT pgmq.send($1, $2::jsonb)",
  ["my_queue", JSON.stringify({ task: "send_email" })],
);
console.log("sent:", result.rows[0].send); // bigint msg_id arrives as a string

await client.end();

Pro Tip: Keep payloads as structured jsonb. PGMQ stores the message body in a jsonb column, so you get Postgres-native JSON operators and queries on the message contents.
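
For large backlogs, calling send_batch in fixed-size chunks keeps each statement small. This is a rough sketch, not an official client: chunked and send_in_batches are hypothetical helpers, the batch size of 500 is an arbitrary assumption, and `cur` is assumed to be a DB-API cursor whose driver sends a Python list as a Postgres array (psycopg does).

```python
import json


def chunked(items, size):
    """Yield consecutive lists of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def send_in_batches(cur, queue, payloads, batch_size=500):
    """Enqueue payloads via pgmq.send_batch, one statement per chunk."""
    msg_ids = []
    for chunk in chunked(payloads, batch_size):
        # The list of JSON strings goes over as an array and is cast to jsonb[].
        cur.execute(
            "SELECT * FROM pgmq.send_batch(%s, %s::jsonb[])",
            (queue, [json.dumps(p) for p in chunk]),
        )
        msg_ids.extend(row[0] for row in cur.fetchall())
    return msg_ids
```

Remember to commit after the call if the connection is not in autocommit mode; until then the messages are not visible to consumers.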

Fix 4: Read and Acknowledge

The basic consumer loop:

-- Read up to 10 messages with 30-second visibility timeout:
SELECT * FROM pgmq.read('my_queue', 30, 10);
-- Returns: msg_id, read_ct, enqueued_at, vt, message

After processing, delete (or archive):

-- Permanently remove (lighter, faster):
SELECT pgmq.delete('my_queue', 12345);  -- msg_id

-- Or archive (moves the row to pgmq.a_<queue> for audit):
SELECT pgmq.archive('my_queue', 12345);

-- Batch:
SELECT pgmq.delete('my_queue', ARRAY[1, 2, 3]);
SELECT pgmq.archive('my_queue', ARRAY[1, 2, 3]);

In Python:

import psycopg

def consume():
    with psycopg.connect("...") as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT msg_id, message FROM pgmq.read(%s, %s, %s)",
                ("my_queue", 30, 10),
            )
            messages = cur.fetchall()
            conn.commit()  # persist the new vt before processing starts

            for msg_id, payload in messages:
                try:
                    process(payload)  # your own handler
                    cur.execute(
                        "SELECT pgmq.delete(%s, %s)",
                        ("my_queue", msg_id),
                    )
                    conn.commit()
                except Exception as e:
                    conn.rollback()  # clear a failed transaction, if any
                    print(f"Failed {msg_id}: {e}")
                    # Don't ack: the vt expires and the message is retried later.

Common Mistake: Forgetting to delete/archive after processing. The message stays in the queue, its vt expires, and it is delivered again. Ack on success, and leave failures un-acked so vt redelivery handles the retry.
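
Relying on vt redelivery for retries means a message that always fails will come back forever. One possible guard is to look at the read_ct column that pgmq.read returns and give up after a few attempts. The policy below is only an illustration: next_step and the max_attempts default of 5 are assumptions, not something PGMQ provides.

```python
def next_step(read_ct, succeeded, max_attempts=5):
    """Decide what to do with a message after one processing attempt.

    read_ct is the delivery count reported by pgmq.read for this message.
    """
    if succeeded:
        return "delete"   # done: ack it
    if read_ct >= max_attempts:
        return "archive"  # stop retrying, keep the row for inspection
    return "retry"        # leave it alone; the vt expiry redelivers it
```

A consumer would call pgmq.delete or pgmq.archive for the first two outcomes and do nothing for "retry".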

Fix 5: Visibility Timeout — Pick Wisely

The vt parameter to read is how long messages stay invisible to other readers:

  • Too short: message redelivered while you’re still processing → duplicate work.
  • Too long: crashed worker’s messages take a while to redeliver → latency spike.

Rule of thumb: vt = 2-3x your typical processing time. If processing takes 10s on average, set vt = 30s.
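
If you log processing times, the rule of thumb can be turned into a small calculation. This sketch assumes the median is a fair stand-in for "typical" and uses a multiplier of 3 and a 5-second floor; suggest_vt and all of its defaults are illustrative choices, not PGMQ settings.

```python
import math
import statistics


def suggest_vt(samples_seconds, multiplier=3.0, floor=5):
    """Suggest a visibility timeout in whole seconds from observed durations."""
    typical = statistics.median(samples_seconds)
    # Round up so the vt is never shorter than multiplier * typical.
    return max(floor, math.ceil(typical * multiplier))


print(suggest_vt([8, 10, 12]))  # median 10s, times 3
```

Tasks with a long tail may need a higher multiplier, or the periodic vt extension described in this section.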

For long-running tasks where you can extend:

-- Extend the vt for a specific message:
SELECT pgmq.set_vt('my_queue', 12345, 60);
-- Now this message stays hidden until 60 seconds from now.

In your consumer:

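import asyncio
import contextlib

# extend_vt, delete, and actually_process are your own helpers:
# extend_vt wraps pgmq.set_vt, delete wraps pgmq.delete.
async def process_long_task(msg_id, payload):
    # Background task: every 20s, push the vt out by another 30s.
    async def extend():
        while True:
            await asyncio.sleep(20)
            extend_vt(msg_id, 30)

    extend_task = asyncio.create_task(extend())
    try:
        await actually_process(payload)
        delete(msg_id)
    finally:
        # Stop extending as soon as processing ends, success or failure.
        extend_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await extend_task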

This pattern lets you process for arbitrarily long without redelivery.

Pro Tip: For tasks that may take minutes to hours, periodic vt extension is better than setting an initial vt of an hour. If the worker crashes early, the message redelivers sooner.

Fix 6: Long Polling With read_with_poll

read returns immediately even if the queue is empty — your worker loops tight and burns CPU. Use read_with_poll:

-- Wait up to 5 seconds for at least one message:
SELECT * FROM pgmq.read_with_poll('my_queue', 30, 10, 5);
-- vt=30, qty=10, poll_timeout=5

If a message arrives during the 5-second window, returns immediately. Otherwise returns empty after 5 seconds.

This dramatically reduces wasted SELECT queries on quiet queues.

For Python consumer:

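import psycopg

# autocommit so each read's vt update is committed immediately
with psycopg.connect("postgresql://...", autocommit=True) as conn:
    with conn.cursor() as cur:
        while True:
            cur.execute(
                "SELECT msg_id, message FROM pgmq.read_with_poll(%s, %s, %s, %s)",
                ("my_queue", 30, 10, 5),
            )
            messages = cur.fetchall()
            if not messages:
                continue  # Try again
            process(messages)  # your handler; delete or archive each msg_id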

The blocking happens inside Postgres. Your connection stays open during the wait.

Common Mistake: Setting poll_timeout too high (e.g. 60s). The poll runs as one long statement, so statement_timeout or a proxy timeout (PgBouncer, RDS Proxy) may kill it mid-wait. Keep poll_timeout to ≤ 30s.
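
To make the polling behaviour concrete, here is a client-side approximation of the same idea: keep retrying a plain read until something arrives or a deadline passes. It is only a sketch of the semantics, not PGMQ's server-side implementation; read_with_client_poll, read_once, and the injectable clock and sleep parameters are hypothetical names chosen so the loop can be exercised without a database.

```python
import time


def read_with_client_poll(read_once, max_poll_seconds=5, poll_interval_ms=100,
                          clock=time.monotonic, sleep=time.sleep):
    """Call read_once() until it returns messages or the deadline passes."""
    deadline = clock() + max_poll_seconds
    while True:
        messages = read_once()
        if messages:
            return messages  # something arrived: return right away
        if clock() >= deadline:
            return []        # waited long enough: give up empty-handed
        sleep(poll_interval_ms / 1000)
```

Doing this on the client costs one round trip per attempt, which is why letting read_with_poll wait inside Postgres is usually the better choice.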

Fix 7: Archive vs Delete

PGMQ supports two completion paths:

  • delete: permanently removes the message from the queue. Fastest.
  • archive: moves the message to the pgmq.a_<queue> archive table. Slower (extra write), but you keep audit/replay history.

-- See archived messages:
SELECT * FROM pgmq.a_my_queue;  -- or pgmq.<queue>_archive in some versions

For audit-heavy workloads, archive. For high-volume fire-and-forget, delete.

To purge old archived messages:

DELETE FROM pgmq.a_my_queue WHERE archived_at < now() - interval '30 days';

Run periodically — archives grow unbounded otherwise.

Pro Tip: For “exactly-once delivery” semantics, archive plus a unique index on a dedup column in your message:

ALTER TABLE pgmq.a_my_queue ADD COLUMN dedup_key TEXT UNIQUE GENERATED ALWAYS AS (message->>'dedup_key') STORED;

Now archiving a message whose dedup_key already exists in the archive raises a unique violation, which tells you that work was already done.

Fix 8: Partitioned Queues for Volume

For queues that handle millions of messages per day:

SELECT pgmq.create_partitioned('events', '1 day');
-- One partition per day. Old partitions can be dropped en masse.

Partition maintenance happens in the background through pg_partman, which partitioned queues depend on:

-- See partitions:
SELECT * FROM pg_partition_tree('pgmq.q_events');

For very long-retention queues, drop old partitions to keep size manageable:

DROP TABLE pgmq.q_events_p2024_01;  -- illustrative name; take real partition names from pg_partition_tree

This is O(1) — much faster than DELETE WHERE enqueued_at < ....

For low-volume queues, regular create() (non-partitioned) is fine. Partitioning adds metadata overhead — only worth it for ~100K+ messages per partition.

Common Mistake: Partitioning a low-volume queue. Each partition adds catalog rows; many small partitions actively slow queries. Use partitioning for big queues only.
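
The ~100K-per-partition guideline is simple arithmetic, sketched here for convenience. The function names and the assumption of a steady daily volume are illustrative; the threshold default just restates the figure from the text.

```python
def messages_per_partition(messages_per_day, partition_days=1):
    """Estimate how many messages land in one partition."""
    return messages_per_day * partition_days


def worth_partitioning(messages_per_day, partition_days=1, threshold=100_000):
    """Apply the ~100K messages per partition guideline."""
    return messages_per_partition(messages_per_day, partition_days) >= threshold


print(worth_partitioning(2_000_000))  # daily partitions, 2M messages/day
print(worth_partitioning(5_000))      # daily partitions, 5K messages/day
```

A queue that falls short with daily partitions may still qualify with a longer interval, though at that point a regular queue is often simpler.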

Still Not Working?

A few less-obvious failures:

  • function pgmq.send(text, jsonb) does not exist. Wrong PGMQ version. Older versions had pgmq_send; newer use pgmq.send. Check \df pgmq.* in psql.
  • Messages with read_ct > 1 repeatedly. You’re not deleting/archiving after processing. vt expires and redelivers.
  • max_connections hit by many consumers. Each worker holds a connection during long-polling. Pool with PgBouncer or RDS Proxy.
  • Slow reads on big queues. Add indexes if missing — PGMQ’s defaults work, but if you customize the schema, indexes may be needed on (vt, msg_id).
  • Schema changes break clients. A hardcoded SELECT * FROM pgmq.q_<queue> is fragile. Use pgmq.read() exclusively.
  • VACUUM never catches up. High-throughput queue with many delete operations creates dead tuples. Tune autovacuum for the queue table:
ALTER TABLE pgmq.q_my_queue SET (
  autovacuum_vacuum_scale_factor = 0.01,
  autovacuum_vacuum_threshold = 100
);
  • PGMQ functions run inside your transaction, so nothing takes effect until it commits. Commit after each message (or each read), not once for the whole batch.
  • Connection pool routing queue calls to replicas. Every PGMQ call that changes state needs the primary: send, delete, archive, and also read, which updates vt and read_ct. Read-only replicas can't enqueue or consume. Route correctly.
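
For the autovacuum item in the list above, it helps to see what the two settings do. Postgres vacuums a table once its dead tuples exceed autovacuum_vacuum_threshold + autovacuum_vacuum_scale_factor × row count, with stock defaults of 50 and 0.2. The helper below just evaluates that formula; autovacuum_trigger is a made-up name and the 1,000,000-row queue is an assumed example size.

```python
def autovacuum_trigger(live_rows, scale_factor=0.2, threshold=50):
    """Dead tuples needed before autovacuum processes the table."""
    return round(threshold + scale_factor * live_rows)


rows = 1_000_000  # assumed queue size for the example
print(autovacuum_trigger(rows))                                     # stock defaults
print(autovacuum_trigger(rows, scale_factor=0.01, threshold=100))   # tuned values
```

With the defaults, a million-row queue accumulates about 200,000 dead tuples before a vacuum starts; the tuned settings bring that down to roughly 10,000.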

For related messaging and Postgres issues, see Redis streams not working, AWS SQS not working, Cloudflare Queues not working, and Postgres slow query.


FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.
