Skip to content

Fix: BullMQ Not Working — Jobs Not Processing, Workers Not Starting, or Redis Connection Failing

FixDevs · (Updated: )

Part of:  JavaScript & TypeScript Errors

Quick Answer

How to fix BullMQ issues — queue and worker setup, Redis connection, job scheduling, retry strategies, concurrency, rate limiting, event listeners, and dashboard monitoring.

The Problem

Jobs are added to the queue but never process:

import { Queue, Worker } from 'bullmq';

const queue = new Queue('email');
await queue.add('send', { to: '[email protected]', subject: 'Hello' });
// Job sits in the queue forever — worker never picks it up

Or the worker starts but immediately crashes:

Error: connect ECONNREFUSED 127.0.0.1:6379

Or jobs process but fail silently:

Job completed with status "failed" but no error message visible

Why This Happens

BullMQ is a job queue library built on Redis. It requires a running Redis instance and separate queue/worker processes:

  • Queue and Worker are separate conceptsQueue adds jobs, Worker processes them. Creating a Queue without a Worker means jobs pile up with nothing to process them. They can run in different processes or even different servers.
  • Redis must be running and accessible — BullMQ stores all job data in Redis. Without a Redis connection, neither queue nor worker can operate. The default connection is localhost:6379.
  • Workers must import the same queue namenew Worker('email', ...) only processes jobs from the queue named 'email'. A typo or different name means the worker listens on an empty queue.
  • Job processors must handle errors — if the processor function throws and you don’t listen for 'failed' events, errors vanish silently. BullMQ retries failed jobs by default (3 attempts), then marks them as permanently failed.

The deeper reason most production BullMQ setups break is version drift. Many tutorials online still target Bull v4 (the legacy bull package), which uses a different connection model and a different API surface. BullMQ is the rewritten successor — same authors, completely separate package on npm. If you copy-paste a bull example into a BullMQ project, the imports compile but Queue constructors silently behave differently around the Redis client lifecycle. Always confirm your package.json lists bullmq, not bull.

The other common trap is the Redis client. BullMQ requires maxRetriesPerRequest: null on the ioredis instance because workers use blocking commands (BRPOPLPUSH, BLMOVE) that conflict with ioredis’s default retry strategy. If you forget this option, the worker connection thrashes on every command and jobs appear to disappear. Managed Redis providers like Upstash, Redis Cloud, and DigitalOcean Managed Redis also require TLS and a specific port — the default localhost:6379 won’t work in production. See Fix: Upstash Not Working for the matching client config.

Version History: From Bull v4 to BullMQ v5

Understanding which release introduced which feature saves hours of debugging:

  • Bull v4 (2014–2022, legacy) — the original package, still on npm as bull. Uses ES5-style classes, single processor file per worker, no Flow Producer, no scheduler primitives beyond repeat. Still maintained for bug fixes but receives no new features.
  • BullMQ v1 (Oct 2020) — first public release of the rewrite. TypeScript-native, separate Queue / Worker / QueueEvents classes, removed the bound processor concept in favor of plain async functions.
  • BullMQ v2 (Mar 2022) — introduced repeat jobs with cron expressions, removed legacy Queue.process() pattern entirely. The concurrency option moved from queue to worker.
  • BullMQ v3 (Jul 2022) — added the Flow Producer for parent-child job graphs. This is the version that made BullMQ a real competitor to Temporal for simple DAG workflows. Also introduced removeOnComplete / removeOnFail with age and count options.
  • BullMQ v4 (Mar 2023) — added Job Schedulers (upsertJobScheduler) replacing the older repeat API, plus dynamic rate limiting per worker. Worker getNextJob performance improved significantly.
  • BullMQ v5 (Jan 2024)breaking changes. Drops Node.js 16 support, requires Redis 5.0+ (recommends Redis 7+), removes deprecated JobsOptions.repeat. The Job.data type signature became stricter. Most production breakages on upgrade come from v4-style repeat jobs that need migration to upsertJobScheduler.
  • BullMQ Pro (commercial, ongoing) — adds observation API, batch processing, and group rate limiting. Not required for most use cases but called out here so you don’t confuse Pro docs with OSS docs.

Adjacent libraries to know: RSMQ (Redis Simple Message Queue) is the minimalist alternative without retry or scheduling. Bee-Queue prioritizes raw throughput over features. Valkey is the Redis fork created after Redis switched to a source-available license in March 2024 — BullMQ v5.4+ supports Valkey transparently because the wire protocol is identical. If your platform standardizes on Valkey instead of Redis, no code change is required as long as your ioredis version is recent.

Fix 1: Basic Queue and Worker Setup

npm install bullmq ioredis
// lib/queue.ts — shared connection and queue definition
import { Queue, Worker, QueueEvents } from 'bullmq';
import IORedis from 'ioredis';

// Shared Redis connection
const connection = new IORedis({
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379'),
  password: process.env.REDIS_PASSWORD,
  maxRetriesPerRequest: null,  // Required for BullMQ
});

// Define queues
export const emailQueue = new Queue('email', { connection });
export const reportQueue = new Queue('report', { connection });

// Add a job
export async function sendEmail(to: string, subject: string, html: string) {
  return emailQueue.add('send-email', { to, subject, html }, {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000,  // 2s, 4s, 8s
    },
    removeOnComplete: { age: 3600 },  // Remove completed jobs after 1 hour
    removeOnFail: { age: 86400 },      // Keep failed jobs for 24 hours
  });
}

// Schedule a delayed job
export async function scheduleReminder(userId: string, message: string, delay: number) {
  return emailQueue.add('reminder', { userId, message }, {
    delay,  // milliseconds
  });
}

// Schedule recurring job (cron)
export async function setupRecurringJobs() {
  await reportQueue.upsertJobScheduler('daily-report', {
    pattern: '0 9 * * *',  // Every day at 9 AM
  }, {
    name: 'generate-daily-report',
    data: { type: 'daily' },
  });

  await reportQueue.upsertJobScheduler('weekly-cleanup', {
    pattern: '0 3 * * 0',  // Every Sunday at 3 AM
  }, {
    name: 'cleanup-old-data',
    data: { type: 'cleanup' },
  });
}
// workers/email.worker.ts — processes email jobs
import { Worker, Job } from 'bullmq';
import IORedis from 'ioredis';

const connection = new IORedis({
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379'),
  password: process.env.REDIS_PASSWORD,
  maxRetriesPerRequest: null,
});

const emailWorker = new Worker(
  'email',
  async (job: Job) => {
    // Job processing logic
    switch (job.name) {
      case 'send-email': {
        const { to, subject, html } = job.data;
        console.log(`Sending email to ${to}: ${subject}`);

        await sendEmailViaProvider(to, subject, html);

        // Return value is stored as job result
        return { sent: true, to, timestamp: new Date().toISOString() };
      }
      case 'reminder': {
        const { userId, message } = job.data;
        await sendPushNotification(userId, message);
        return { notified: true };
      }
      default:
        throw new Error(`Unknown job name: ${job.name}`);
    }
  },
  {
    connection,
    concurrency: 5,  // Process up to 5 jobs simultaneously
    limiter: {
      max: 10,       // Max 10 jobs
      duration: 1000, // Per 1 second (rate limiting)
    },
  },
);

// Event listeners — critical for monitoring
emailWorker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

emailWorker.on('failed', (job, error) => {
  console.error(`Job ${job?.id} failed:`, error.message);
  // Alert on repeated failures
  if (job && job.attemptsMade >= job.opts.attempts!) {
    alertOncall(`Email job permanently failed: ${error.message}`);
  }
});

emailWorker.on('error', (error) => {
  console.error('Worker error:', error);
});

emailWorker.on('stalled', (jobId) => {
  console.warn(`Job ${jobId} stalled — will be reprocessed`);
});

console.log('Email worker started');

Fix 2: Job Progress and Events

// Worker with progress reporting
const videoWorker = new Worker(
  'video',
  async (job: Job) => {
    const { videoId, format } = job.data;
    const totalSteps = 4;

    // Step 1: Download
    await job.updateProgress(25);
    await job.log('Downloading video...');
    const file = await downloadVideo(videoId);

    // Step 2: Transcode
    await job.updateProgress(50);
    await job.log(`Transcoding to ${format}...`);
    const transcoded = await transcodeVideo(file, format);

    // Step 3: Upload
    await job.updateProgress(75);
    await job.log('Uploading result...');
    const url = await uploadToStorage(transcoded);

    // Step 4: Complete
    await job.updateProgress(100);
    await job.log('Complete!');

    return { url, format };
  },
  { connection, concurrency: 2 },
);

// Listen for progress on the client side
import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('video', { connection });

queueEvents.on('progress', ({ jobId, data }) => {
  console.log(`Job ${jobId} progress: ${data}%`);
});

queueEvents.on('completed', ({ jobId, returnvalue }) => {
  console.log(`Job ${jobId} completed:`, returnvalue);
});

// API route — check job status
export async function GET(req: Request) {
  const jobId = new URL(req.url).searchParams.get('id');
  if (!jobId) return Response.json({ error: 'Missing id' }, { status: 400 });

  const job = await videoQueue.getJob(jobId);
  if (!job) return Response.json({ error: 'Not found' }, { status: 404 });

  const state = await job.getState();
  const progress = job.progress;
  const logs = await videoQueue.getJobLogs(jobId);

  return Response.json({ state, progress, logs: logs.logs, result: job.returnvalue });
}

Fix 3: Retry Strategies

// Per-job retry configuration
await queue.add('critical-task', data, {
  attempts: 5,
  backoff: {
    type: 'exponential',
    delay: 1000,  // 1s, 2s, 4s, 8s, 16s
  },
});

// Fixed delay retries
await queue.add('api-call', data, {
  attempts: 3,
  backoff: {
    type: 'fixed',
    delay: 5000,  // Always wait 5 seconds between retries
  },
});

// Custom backoff strategy
await queue.add('smart-retry', data, {
  attempts: 4,
  backoff: {
    type: 'custom',
  },
});

// In worker — custom backoff logic
const worker = new Worker('smart-retry', processor, {
  connection,
  settings: {
    backoffStrategy: (attemptsMade: number, type: string, err: Error) => {
      // Rate limited — wait longer
      if (err.message.includes('429')) {
        return 60000;  // 1 minute
      }
      // Server error — exponential
      if (err.message.includes('500')) {
        return Math.min(2 ** attemptsMade * 1000, 30000);
      }
      // Default
      return 2000;
    },
  },
});

Fix 4: Job Priorities and Flows

// Priority queues — lower number = higher priority
await queue.add('urgent', data, { priority: 1 });   // Processed first
await queue.add('normal', data, { priority: 5 });
await queue.add('low', data, { priority: 10 });      // Processed last

// Job flows — parent-child dependencies (BullMQ v3+)
import { FlowProducer } from 'bullmq';

const flow = new FlowProducer({ connection });

// Parent job waits for all children to complete
await flow.add({
  name: 'generate-report',
  queueName: 'report',
  data: { reportId: 'monthly-2026-03' },
  children: [
    {
      name: 'fetch-sales-data',
      queueName: 'data',
      data: { source: 'sales', month: '2026-03' },
    },
    {
      name: 'fetch-user-data',
      queueName: 'data',
      data: { source: 'users', month: '2026-03' },
    },
    {
      name: 'fetch-analytics',
      queueName: 'data',
      data: { source: 'analytics', month: '2026-03' },
    },
  ],
});

// The 'generate-report' job only runs after all 3 children complete
// Access children results in the parent worker:
const reportWorker = new Worker('report', async (job) => {
  const childrenValues = await job.getChildrenValues();
  // childrenValues = { 'data:fetch-sales-data:123': {...}, ... }

  return generateReport(childrenValues);
}, { connection });

Fix 5: Dashboard Monitoring

npm install @bull-board/api @bull-board/express
# Or for Next.js: npm install @bull-board/api @bull-board/next
// Express dashboard
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import express from 'express';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(reportQueue),
    new BullMQAdapter(videoQueue),
  ],
  serverAdapter,
});

const app = express();
app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3001, () => console.log('Bull Board on http://localhost:3001/admin/queues'));

Fix 6: Graceful Shutdown

// Proper cleanup on process exit
async function shutdown() {
  console.log('Shutting down workers...');

  // Close workers — wait for current jobs to finish
  await emailWorker.close();
  await reportWorker.close();

  // Close queues
  await emailQueue.close();
  await reportQueue.close();

  // Close Redis connection
  await connection.quit();

  console.log('Shutdown complete');
  process.exit(0);
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

Still Not Working?

Jobs added but never process — the worker isn’t running or uses a different queue name. Verify new Worker('email', ...) matches new Queue('email') exactly. Also check that the worker process is actually started — it’s a separate script, not automatically spawned by the queue.

“ECONNREFUSED 127.0.0.1:6379” — Redis isn’t running. Start Redis: redis-server (local) or configure the connection for your Redis provider (Upstash, Redis Cloud, etc.). Set maxRetriesPerRequest: null in the IORedis config — BullMQ requires this.

Jobs fail with no error message — add worker.on('failed', (job, error) => console.error(error)) to see failure reasons. Also check worker.on('error', ...) for connection-level errors. Without event listeners, failures are silent.

Stalled jobs reprocess unexpectedly — a job is “stalled” when the worker stops sending heartbeats (crash, timeout). BullMQ automatically retries stalled jobs. Increase stalledInterval if your jobs take a long time, or ensure long-running jobs call job.updateProgress() periodically to keep the heartbeat alive.

Repeat jobs broke after upgrading to BullMQ v5 — v5 removed the legacy JobsOptions.repeat API. Old code that did queue.add('cron', data, { repeat: { pattern: '...' } }) silently no-ops. Migrate every recurring job to queue.upsertJobScheduler('scheduler-id', { pattern: '...' }, { name, data }). The scheduler ID becomes the deduplication key — call it again to update the schedule, call queue.removeJobScheduler(id) to stop it.

Worker connection drops every few seconds on managed Redis — Upstash, Redis Cloud, and Valkey-based providers terminate idle connections aggressively. Set enableReadyCheck: false and pass an explicit connectTimeout of at least 10000ms on the ioredis instance. For Upstash specifically, use the rediss:// TLS URL, not redis://.

Jobs duplicate on horizontal scale — if you run multiple worker processes pointing at the same queue, BullMQ uses Redis-level locking to ensure each job is picked up once. Duplication usually means workers share a queue name but point at different Redis databases. Verify every worker connects to the same host:port:db triple, and check that you aren’t accidentally creating per-pod prefixes via the prefix option.

For related backend issues, see Fix: Upstash Not Working, Fix: Inngest Not Working, Fix: Temporal Not Working, and Fix: Hono Not Working.

F

FixDevs

Solo developer based in Japan. Every solution is cross-referenced with official documentation and tested before publishing.

Was this article helpful?

Related Articles