Fix: PGMQ Not Working — Extension Install, Visibility Timeout, Long Polling, and Archive vs Delete
Quick Answer
How to fix PGMQ Postgres message queue errors — extension not installed, queue creation, send/read/delete/archive, visibility timeout (vt), long polling, partitioned queues, and Python/Node client setup.
The Error
You try to use PGMQ functions and Postgres complains:
ERROR: schema "pgmq" does not exist
Or the same messages come back on the next pgmq.read:
SELECT * FROM pgmq.read('my_queue', 30, 10);
-- Returns 5 messages.
SELECT * FROM pgmq.read('my_queue', 30, 10);
-- Returns the same 5 messages; should be different (or the queue empty).
Or the visibility timeout doesn’t release:
-- Worker A reads with vt=300:
SELECT * FROM pgmq.read('my_queue', 300, 1);
-- Worker A crashes without ack.
-- 5 minutes later, worker B reads: still nothing!
Or long polling returns immediately even though no messages exist:
SELECT * FROM pgmq.read_with_poll('my_queue', 30, 10, 5);
-- Returns nothing after 0ms.
Why This Happens
PGMQ (Postgres Message Queue) is a Postgres extension by Tembo that adds queue semantics to Postgres. It’s like SQS, but backed by tables in the database you already run.
Most issues map to:
- Extension not installed. PGMQ requires CREATE EXTENSION pgmq. Some hosted Postgres (Supabase, Neon) ship it; others need manual install or aren’t compatible.
- read vs pop. pgmq.read returns messages and sets a visibility timeout: they’re invisible to other readers for that period but stay in the queue until acked. pgmq.pop removes them immediately. Mixing these up causes either lost messages or duplicate processing.
- Visibility timeout (vt) is per-read. After vt seconds, the message becomes visible to other readers; if you haven’t acked, it’s redelivered. If your vt is too long, redelivery after a worker crash is slow.
- Long polling needs read_with_poll. Regular read returns immediately; read_with_poll waits up to N seconds for messages.
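The read, pop, and visibility-timeout behaviour described above can be sketched with a toy in-memory model. This is only an illustration of the semantics, not how PGMQ is implemented; the ToyQueue class and its explicit `now` arguments are hypothetical and exist so the timing can be followed without a database.

```python
from dataclasses import dataclass, field


@dataclass
class ToyQueue:
    """In-memory toy model of read/pop/delete semantics (not PGMQ itself)."""
    messages: dict = field(default_factory=dict)    # msg_id -> payload
    visible_at: dict = field(default_factory=dict)  # msg_id -> time it becomes readable
    next_id: int = 1

    def send(self, payload, now=0.0):
        msg_id = self.next_id
        self.next_id += 1
        self.messages[msg_id] = payload
        self.visible_at[msg_id] = now
        return msg_id

    def read(self, vt, qty, now):
        """Return up to qty visible message IDs and hide them for vt seconds."""
        ready = [m for m in sorted(self.messages) if self.visible_at[m] <= now][:qty]
        for m in ready:
            self.visible_at[m] = now + vt
        return ready

    def delete(self, msg_id):
        """Acknowledge a message by removing it."""
        self.messages.pop(msg_id, None)
        self.visible_at.pop(msg_id, None)

    def pop(self, now):
        """Return one visible message ID and remove it immediately."""
        ready = self.read(vt=0, qty=1, now=now)
        for m in ready:
            self.delete(m)
        return ready


q = ToyQueue()
a = q.send("job-a")
first = q.read(vt=30, qty=10, now=0)     # delivered and hidden until t=30
hidden = q.read(vt=30, qty=10, now=10)   # nothing visible yet
again = q.read(vt=30, qty=10, now=31)    # not acked, so it is redelivered
```

In this model a message that is read but never deleted comes back once its vt passes, while a popped message is gone for good even if the worker then fails.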
Fix 1: Install the Extension
CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;
CASCADE also installs any extensions that PGMQ depends on.
For Docker:
FROM postgres:16
RUN apt-get update && apt-get install -y postgresql-16-pgmq
# Restart and CREATE EXTENSION in your init script.
Or use the Tembo-provided image:
docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=postgres quay.io/tembo/pg17-pgmq:latest
For Supabase:
-- In SQL Editor:
CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;
Supabase ships PGMQ; it’s just not enabled by default.
For Neon / RDS / Cloud SQL: check if PGMQ is available in your version. RDS and Aurora Postgres haven’t shipped PGMQ as of writing — you’d need a separate Postgres instance or wait for support.
Pro Tip: Use the latest PGMQ version. The extension is rapidly evolving; older versions had different function signatures (pgmq.send vs older pgmq_send).
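Before debugging anything else, it can help to confirm from code that the extension is actually installed and which version you have. A minimal sketch, assuming a DB-API style cursor such as the one psycopg provides; the helper name pgmq_version is hypothetical, while pg_extension, extname, and extversion are standard Postgres catalog names.

```python
def pgmq_version(cur):
    """Return the installed pgmq extension version, or None if it is missing."""
    cur.execute(
        "SELECT extversion FROM pg_extension WHERE extname = %s",
        ("pgmq",),
    )
    row = cur.fetchone()
    return row[0] if row else None
```

A None result corresponds to the "schema pgmq does not exist" error: the extension has not been created in this database.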
Fix 2: Create a Queue
SELECT pgmq.create('my_queue');
For unlogged queues (faster, but contents are lost on crash):
SELECT pgmq.create_unlogged('my_queue');
For partitioned queues (better for high volume):
SELECT pgmq.create_partitioned('my_queue', '1 day');
-- Partitions by day. Partition maintenance relies on the pg_partman extension.
List existing queues:
SELECT * FROM pgmq.list_queues();
Drop a queue:
SELECT pgmq.drop_queue('my_queue');
Common Mistake: Treating queues like tables. PGMQ stores messages in tables under the pgmq schema, but you should always use pgmq.* functions, not raw SELECT/INSERT/UPDATE.
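If workers create their queue at startup, a small guard keeps that step explicit. The sketch below assumes a psycopg-style cursor and that pgmq.list_queues() exposes a queue_name column; ensure_queue is a hypothetical helper name, and the caller is expected to commit.

```python
def ensure_queue(cur, queue_name):
    """Create the queue unless pgmq.list_queues() already reports it.

    Returns True when a queue was created, False when it already existed.
    """
    cur.execute("SELECT queue_name FROM pgmq.list_queues()")
    existing = {row[0] for row in cur.fetchall()}
    if queue_name in existing:
        return False
    cur.execute("SELECT pgmq.create(%s)", (queue_name,))
    return True
```

Everything goes through pgmq.* functions, in line with the advice above about not touching the underlying tables directly.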
Fix 3: Send Messages
-- Single message:
SELECT pgmq.send('my_queue', '{"task": "send_email", "to": "[email protected]"}'::jsonb);
-- Returns the message ID.
-- With delay (in seconds):
SELECT pgmq.send('my_queue', '{"task": "..."}'::jsonb, 60);
-- Visible after 60 seconds.
-- Batch send:
SELECT pgmq.send_batch('my_queue', ARRAY[
'{"id": 1}'::jsonb,
'{"id": 2}'::jsonb,
'{"id": 3}'::jsonb
]);
-- Returns the new message IDs, one per row.
For programmatic usage from Python (with psycopg):
import psycopg
import json

with psycopg.connect("postgresql://...") as conn:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT pgmq.send(%s, %s::jsonb)",
            ("my_queue", json.dumps({"task": "send_email", "to": "[email protected]"})),
        )
        msg_id = cur.fetchone()[0]
    conn.commit()
print("sent:", msg_id)
For Node (pg):
import pg from "pg";
const client = new pg.Client({ connectionString: process.env.DATABASE_URL });
await client.connect();
const result = await client.query(
  "SELECT pgmq.send($1, $2::jsonb)",
  ["my_queue", JSON.stringify({ task: "send_email" })],
);
console.log("sent:", result.rows[0].send);
await client.end(); // Close the connection when done.
Pro Tip: Keep payloads structured. PGMQ stores each message as jsonb, so you can inspect and filter message contents with Postgres-native JSON operators.
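Batch sends can be driven from Python in the same way as single sends. This is a sketch under two assumptions: the cursor adapts a Python list of strings to a Postgres array (psycopg 3 does), and the explicit ::jsonb[] cast converts that array for pgmq.send_batch. The wrapper name send_batch is hypothetical.

```python
import json


def send_batch(cur, queue_name, payloads):
    """Enqueue several dict payloads in one round trip; return the new message IDs."""
    docs = [json.dumps(p) for p in payloads]
    cur.execute(
        "SELECT pgmq.send_batch(%s, %s::jsonb[])",
        (queue_name, docs),
    )
    # One result row per enqueued message.
    return [row[0] for row in cur.fetchall()]
```

One statement per batch keeps the number of round trips constant regardless of batch size; remember to commit afterwards.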
Fix 4: Read and Acknowledge
The basic consumer loop:
-- Read up to 10 messages with 30-second visibility timeout:
SELECT * FROM pgmq.read('my_queue', 30, 10);
-- Returns: msg_id, read_ct, enqueued_at, vt, message
After processing, delete (or archive):
-- Permanently remove (lighter, faster):
SELECT pgmq.delete('my_queue', 12345); -- msg_id
-- Or archive (keeps the message in the queue's archive table, pgmq.a_my_queue, for audit):
SELECT pgmq.archive('my_queue', 12345);
-- Batch:
SELECT pgmq.delete('my_queue', ARRAY[1, 2, 3]);
SELECT pgmq.archive('my_queue', ARRAY[1, 2, 3]);
In Python:
import psycopg

def consume():
    with psycopg.connect("...") as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT msg_id, message FROM pgmq.read(%s, %s, %s)",
                ("my_queue", 30, 10),
            )
            messages = cur.fetchall()
            conn.commit()  # Persist the new vt before processing starts.
            for msg_id, payload in messages:
                try:
                    process(payload)  # Your own handler.
                    cur.execute(
                        "SELECT pgmq.delete(%s, %s)",
                        ("my_queue", msg_id),
                    )
                    conn.commit()
                except Exception as e:
                    conn.rollback()  # Clear any failed transaction state.
                    print(f"Failed {msg_id}: {e}")
                    # Don't ack: the vt will expire and the message is retried later.
Common Mistake: Forgetting to delete/archive after processing. The message stays in the queue, its vt expires, and it is redelivered. Either ack explicitly or rely on vt redelivery for retries, but do it deliberately.
Fix 5: Visibility Timeout — Pick Wisely
The vt parameter to read is how long messages stay invisible to other readers:
- Too short: message redelivered while you’re still processing → duplicate work.
- Too long: crashed worker’s messages take a while to redeliver → latency spike.
Rule of thumb: vt = 2-3x your typical processing time. If processing takes 10s on average, set vt = 30s.
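The rule of thumb can be turned into a small calculation over measured processing times. A minimal sketch: the function name suggest_vt, the use of the median as "typical", and the 5-second floor are all assumptions for illustration, not PGMQ recommendations.

```python
import math
import statistics


def suggest_vt(durations_s, factor=3, floor_s=5):
    """Suggest a visibility timeout (whole seconds) from observed processing times."""
    typical = statistics.median(durations_s)
    # Round up so the vt never undershoots, and never go below the floor.
    return max(floor_s, math.ceil(typical * factor))


vt_for_slow_jobs = suggest_vt([8, 10, 12])        # median 10s, factor 3
vt_for_fast_jobs = suggest_vt([0.2, 0.3, 0.4])    # falls back to the floor
```

If your durations have a long tail, feeding in a high percentile instead of the median gives a more conservative vt.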
For long-running tasks where you can extend:
-- Extend the vt for a specific message:
SELECT pgmq.set_vt('my_queue', 12345, 60);
-- The message is now hidden until 60 seconds from now.
In your consumer:
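import asyncio

async def process_long_task(msg_id, payload):
    # extend_vt, delete, and actually_process are your own helpers;
    # extend_vt wraps pgmq.set_vt and delete wraps pgmq.delete.
    async def extend():
        # Push the vt forward every 20 seconds until cancelled.
        while True:
            await asyncio.sleep(20)
            extend_vt(msg_id, 30)

    extend_task = asyncio.create_task(extend())
    try:
        await actually_process(payload)
        delete(msg_id)
    finally:
        # Stop the heartbeat whether processing succeeded or failed.
        extend_task.cancel()
        try:
            await extend_task
        except asyncio.CancelledError:
            pass
This pattern lets you process for arbitrarily long without redelivery, provided the initial vt is longer than the 20-second extension interval.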
Pro Tip: For tasks that may take minutes to hours, periodic vt extension is better than setting an initial vt of an hour. If the worker crashes early, the message redelivers sooner.
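The consumer above calls extend_vt and delete without defining them. One possible shape for those helpers is sketched below, assuming a psycopg 3 style connection whose execute() returns a cursor; make_ack_helpers is a hypothetical name. The calls are blocking, so in an asyncio worker you may prefer to run them in a thread or use an async connection.

```python
def make_ack_helpers(conn, queue_name):
    """Build extend_vt/delete helpers bound to one connection and one queue."""

    def extend_vt(msg_id, seconds):
        # pgmq.set_vt hides the message until `seconds` from now.
        conn.execute(
            "SELECT pgmq.set_vt(%s, %s, %s)",
            (queue_name, msg_id, seconds),
        )
        conn.commit()  # The extension only takes effect once committed.

    def delete(msg_id):
        row = conn.execute(
            "SELECT pgmq.delete(%s, %s)",
            (queue_name, msg_id),
        ).fetchone()
        conn.commit()
        return row[0]

    return extend_vt, delete
```

Committing inside each helper keeps every extension and acknowledgement in its own short transaction.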
Fix 6: Long Polling With read_with_poll
read returns immediately even if the queue is empty — your worker loops tight and burns CPU. Use read_with_poll:
-- Wait up to 5 seconds for at least one message:
SELECT * FROM pgmq.read_with_poll('my_queue', 30, 10, 5);
-- vt=30, qty=10, poll timeout=5 seconds (max_poll_seconds)
If a message arrives during the 5-second window, the call returns immediately. Otherwise it returns empty after 5 seconds.
This dramatically reduces wasted SELECT queries on quiet queues.
For Python consumer:
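# conn and cur come from your psycopg connection setup.
while True:
    cur.execute(
        "SELECT msg_id, message FROM pgmq.read_with_poll(%s, %s, %s, %s)",
        ("my_queue", 30, 10, 5),
    )
    messages = cur.fetchall()
    conn.commit()  # Persist the vt update and end the transaction.
    if not messages:
        continue  # Nothing arrived within 5 seconds; poll again.
    process(messages)
The blocking happens inside Postgres. Your connection stays open during the wait.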
Common Mistake: Setting the poll timeout too high (e.g. 60s). Timeouts along the path (Postgres statement_timeout, RDS Proxy or load balancer idle timeouts) may kill the query or the connection. Keep the poll timeout to ≤ 30s.
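One way to make the 30-second ceiling hard to violate is to clamp the timeout in a thin wrapper. A minimal sketch, assuming a psycopg-style cursor; poll_once and MAX_POLL_S are hypothetical names, and the clamp values simply mirror the guidance above.

```python
MAX_POLL_S = 30  # Upper bound taken from the guidance above.


def poll_once(cur, queue_name, vt=30, qty=10, poll_s=5):
    """Run one long poll, clamping the wait to between 1 and MAX_POLL_S seconds."""
    poll_s = max(1, min(poll_s, MAX_POLL_S))
    cur.execute(
        "SELECT msg_id, message FROM pgmq.read_with_poll(%s, %s, %s, %s)",
        (queue_name, vt, qty, poll_s),
    )
    return cur.fetchall()
```

A caller that passes poll_s=60 by mistake then waits at most 30 seconds per call instead of tripping a timeout somewhere along the connection path.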
Fix 7: Archive vs Delete
PGMQ supports two completion paths:
- delete: permanently remove the message from the queue. Fastest.
- archive: move the message to the queue’s archive table (pgmq.a_<queue> in current versions). Slower (extra write), but you keep audit/replay history.
-- See archived messages:
SELECT * FROM pgmq.a_my_queue; -- the archive table name differs in some older versions
For audit-heavy workloads, archive. For high-volume fire-and-forget, delete.
To purge old archived messages:
DELETE FROM pgmq.a_my_queue WHERE archived_at < now() - interval '30 days';
Run periodically; archives grow unbounded otherwise.
Pro Tip: To approximate “exactly-once” processing, combine archive with a unique index on a dedup column derived from your message:
ALTER TABLE pgmq.a_my_queue ADD COLUMN dedup_key TEXT UNIQUE GENERATED ALWAYS AS (message->>'dedup_key') STORED;
Archiving a second message with the same dedup_key now fails with a unique violation, which tells you the work was already done. This alters a PGMQ-managed table, so test it against your PGMQ version first.
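The periodic archive purge is easy to wrap for a scheduled job. The sketch below only builds the statement and its parameters; it assumes the pgmq.a_<queue> archive table naming used above, and purge_archive_sql is a hypothetical helper. Because a table name cannot be passed as a bind parameter, the queue name is validated against a conservative pattern before it is interpolated.

```python
import re

# Conservative pattern: lowercase letters, digits, underscores.
_QUEUE_NAME = re.compile(r"^[a-z_][a-z0-9_]*$")


def purge_archive_sql(queue_name, keep_days):
    """Return (sql, params) that delete archived rows older than keep_days."""
    if not _QUEUE_NAME.match(queue_name):
        raise ValueError(f"unsafe queue name: {queue_name!r}")
    if keep_days < 1:
        raise ValueError("keep_days must be at least 1")
    sql = (
        f"DELETE FROM pgmq.a_{queue_name} "
        "WHERE archived_at < now() - make_interval(days => %s)"
    )
    return sql, (int(keep_days),)


sql, params = purge_archive_sql("my_queue", 30)
```

Pass the result to cur.execute(sql, params) from a cron job or scheduler and commit afterwards.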
Fix 8: Partitioned Queues for Volume
For queues that handle millions of messages per day:
SELECT pgmq.create_partitioned('events', '1 day');
-- One partition per day. Old partitions can be dropped en masse.
Partition creation and maintenance run in the background through pg_partman:
-- See partitions:
SELECT * FROM pg_partition_tree('pgmq.q_events');
For very long-retention queues, drop old partitions to keep size manageable:
DROP TABLE pgmq.q_events_p2024_01; -- Illustrative name; take the real one from pg_partition_tree.
Dropping a partition is a metadata operation, so it is much faster than DELETE WHERE enqueued_at < ....
For low-volume queues, regular create() (non-partitioned) is fine. Partitioning adds metadata overhead — only worth it for ~100K+ messages per partition.
Common Mistake: Partitioning a low-volume queue. Each partition adds catalog rows; many small partitions actively slow queries. Use partitioning for big queues only.
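The "~100K+ messages per partition" guideline is simple arithmetic, sketched here so it can be checked against your own traffic. The function names and the default threshold are illustrative assumptions based on that guideline.

```python
def messages_per_partition(messages_per_day, partition_hours=24):
    """Estimate how many messages land in one time-based partition."""
    return messages_per_day * partition_hours / 24


def worth_partitioning(messages_per_day, partition_hours=24, threshold=100_000):
    """Apply the rough rule: partition only when each partition is large enough."""
    return messages_per_partition(messages_per_day, partition_hours) >= threshold


busy_daily = worth_partitioning(2_000_000)                       # 2M per daily partition
quiet_daily = worth_partitioning(50_000)                         # 50K per daily partition
busy_hourly = worth_partitioning(2_000_000, partition_hours=1)   # about 83K per hourly partition
```

Note that the same queue can pass the check with daily partitions and fail it with hourly ones, so the interval matters as much as the volume.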
Still Not Working?
A few less-obvious failures:
- function pgmq.send(text, jsonb) does not exist. Wrong PGMQ version. Older versions had pgmq_send; newer use pgmq.send. Check \df pgmq.* in psql.
- Messages with read_ct > 1 repeatedly. You’re not deleting/archiving after processing, so the vt expires and the message is redelivered.
- max_connections hit by many consumers. Each worker holds a connection during long-polling. Pool with PgBouncer or RDS Proxy.
- Slow reads on big queues. Add indexes if missing. PGMQ’s defaults work, but if you customize the schema, indexes may be needed on (vt, msg_id).
- Schema changes break clients. A hardcoded SELECT * from pgmq.q_<queue> is fragile. Use pgmq.read() exclusively.
- VACUUM never catches up. A high-throughput queue with many delete operations creates dead tuples. Tune autovacuum for the queue table:
ALTER TABLE pgmq.q_my_queue SET (
  autovacuum_vacuum_scale_factor = 0.01,
  autovacuum_vacuum_threshold = 100
);
- PGMQ calls inside a transaction take effect only when the outer transaction commits. Commit each consumer operation in its own transaction rather than wrapping the whole batch in one.
- Connection pool routing reads to replicas. PGMQ writes (send / delete / archive) need the primary, and pgmq.read also writes because it updates vt. Read-only replicas can’t enqueue or consume. Route correctly.
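To see why the autovacuum settings above matter, recall that Postgres triggers autovacuum on a table once its dead tuples exceed autovacuum_vacuum_threshold + autovacuum_vacuum_scale_factor × row count. A short sketch of that arithmetic, using the stock Postgres defaults (0.2 and 50) as the function defaults; the function name is hypothetical.

```python
def autovacuum_trigger(row_count, scale_factor=0.2, threshold=50):
    """Dead tuples needed before autovacuum considers the table."""
    return threshold + scale_factor * row_count


rows = 1_000_000
default_trigger = autovacuum_trigger(rows)                                    # roughly 200,050
tuned_trigger = autovacuum_trigger(rows, scale_factor=0.01, threshold=100)    # roughly 10,100
```

For a queue table holding about a million rows, the tuned settings let autovacuum start after roughly a twentieth of the dead tuples the defaults would require.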
For related messaging and Postgres issues, see Redis streams not working, AWS SQS not working, Cloudflare Queues not working, and Postgres slow query.
Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.