Back-end angularforall.com

- Redis + BullMQ : tâches asynchrones Node.js en prod

Redis Bullmq Node-Js Queues Background-Jobs Workers Async Typescript Bull-Board Monitoring Retry-Strategy Scalabilite Backend Production Ioredis
Redis + BullMQ : tâches asynchrones Node.js en prod

Gérez tâches asynchrones avec Redis et BullMQ : producers, workers, retry, priorités, jobs récurrents, monitoring Bull Board et scalabilité Node.js.

1. Pourquoi des queues en production

Une API REST bien conçue doit répondre en moins de 200 ms. Pourtant, beaucoup d'opérations métier prennent plusieurs secondes : envoi d'emails, génération de PDF, appels à des APIs lentes, traitement d'images, calculs IA. Faire attendre l'utilisateur sur ces tâches dégrade l'expérience et fait timeout les load balancers.

La solution : déporter le travail dans une queue asynchrone. L'API répond immédiatement (« job accepté ») et un worker exécute la tâche en arrière-plan. Redis sert de broker fiable, BullMQ fournit l'API TypeScript moderne pour orchestrer producers, workers, retries et monitoring.

Cas d'usage typiques

Cas d'usage Pourquoi en queue Durée typique
Envoi d'email transactionnel Le SMTP peut être lent ou indisponible, retry nécessaire 0,5 - 3 s
Génération PDF / Excel CPU-intensif, bloque la requête HTTP 1 - 30 s
Webhooks sortants Endpoint externe imprévisible, retry obligatoire 0,2 - 10 s
Traitement d'image / vidéo CPU ou GPU lourd, à isoler du process API 5 s - 5 min
Appels LLM / IA Latence externe variable, coût à contrôler 2 - 60 s
Synchronisation tierce (CRM, ERP) Rate limit, batching, retry intelligent 0,5 - 5 s
Export massif / data warehouse Gros volumes, parallélisation 30 s - 1 h
💡 Bull vs BullMQ vs Bee-Queue :
  • Bull (legacy) : encore stable, mais plus d'évolutions. À éviter pour un nouveau projet.
  • BullMQ : réécriture TypeScript moderne, pattern Worker isolé, QueueEvents, flows parent/enfant. Choix par défaut en 2026.
  • Bee-Queue : ultra-rapide mais moins de features (pas de cron, pas de delays). Niche.

2. Prérequis et installation

Environnement requis

  • Node.js 20 LTS ou supérieur
  • Redis 7+ (local ou Docker, hosted Redis Cloud / Upstash / ElastiCache)
  • TypeScript 5+ recommandé
  • BullMQ 5.x

Lancer Redis en local avec Docker


# Redis 7 en arrière-plan, persistance AOF activée
docker run -d \
  --name redis-bullmq \
  -p 6379:6379 \
  -v redis-data:/data \
  redis:7-alpine \
  redis-server --appendonly yes

# Vérifier que Redis répond
docker exec -it redis-bullmq redis-cli PING
# → PONG

Installer BullMQ


npm install bullmq ioredis
npm install --save-dev @types/node typescript ts-node

Structure de projet recommandée


api-worker/
├── src/
│   ├── api/
│   │   └── server.ts             # Serveur Express (producer)
│   ├── queues/
│   │   ├── connection.ts         # Connexion Redis partagée
│   │   ├── email.queue.ts        # Définition de la queue email
│   │   └── pdf.queue.ts          # Queue PDF
│   ├── workers/
│   │   ├── email.worker.ts       # Worker email
│   │   └── pdf.worker.ts         # Worker PDF
│   ├── jobs/
│   │   ├── send-email.job.ts     # Logique métier email
│   │   └── generate-pdf.job.ts
│   └── monitoring/
│       └── bull-board.ts
├── .env
├── package.json
└── tsconfig.json
📌 Séparation API / Workers : en production, le processus API et les processus worker sont des entrypoints différents. L'API publie des jobs, les workers les consomment. Cela permet de scaler indépendamment (3 instances API, 10 workers email, 2 workers PDF).

3. Architecture Producer → Queue → Worker

BullMQ s'appuie sur quatre composants principaux qu'il est essentiel de bien distinguer :

ComposantRôleOù il vit
Queue Producer : ajoute des jobs (queue.add()) Processus API
Worker Consumer : exécute les jobs un par un Processus worker dédié
QueueEvents Listener d'événements (completed, failed, progress) API, dashboard, monitoring
FlowProducer Compose des jobs parents / enfants API (cas avancés)

Diagramme de flux


   ┌──────────┐    queue.add()     ┌──────────────────┐
   │   API    │ ────────────────▶  │  Redis (BullMQ)  │
   │ (Express)│                    │   queue: email   │
   └──────────┘                    └──────────────────┘
                                            │
                                  worker pulls jobs
                                            ▼
                                  ┌────────────────┐
                                  │  Worker email  │ ──▶ SMTP
                                  └────────────────┘
                                            │
                                  events: completed / failed
                                            ▼
                                  ┌────────────────┐
                                  │  Bull Board    │
                                  │   (dashboard)  │
                                  └────────────────┘

Connexion Redis partagée


// src/queues/connection.ts
import IORedis from 'ioredis';

// maxRetriesPerRequest doit être null pour BullMQ workers (sinon erreurs)
export const connection = new IORedis(process.env.REDIS_URL ?? 'redis://localhost:6379', {
  maxRetriesPerRequest: null,
});

// Logger pour suivre l'état de la connexion
connection.on('connect',     () => console.log('[Redis] connecté'));
connection.on('error',       (err) => console.error('[Redis] erreur', err.message));
connection.on('reconnecting',() => console.log('[Redis] reconnexion...'));

4. Créer sa première queue

Définir la queue (côté API)


// src/queues/email.queue.ts
import { Queue } from 'bullmq';
import { connection } from './connection';

// Données passées à chaque job email
export interface EmailJobData {
  to: string;
  subject: string;
  template: 'welcome' | 'reset-password' | 'invoice';
  variables: Record;
}

// Une queue par domaine fonctionnel (email, pdf, sync...)
export const emailQueue = new Queue('email', {
  connection,
  defaultJobOptions: {
    // Conserve les 1000 derniers jobs réussis (pour debug)
    removeOnComplete: { count: 1000, age: 24 * 3600 },
    // Conserve les 5000 derniers jobs échoués (pour analyse)
    removeOnFail:     { count: 5000 },
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
  },
});

Publier un job depuis l'API


// src/api/server.ts
import express from 'express';
import { emailQueue } from '../queues/email.queue';

const app = express();
app.use(express.json());

app.post('/users/signup', async (req, res) => {
  // 1. Créer l'utilisateur en BDD (synchrone)
  const user = await createUser(req.body);

  // 2. Publier le job d'envoi d'email (asynchrone)
  await emailQueue.add('welcome-email', {
    to: user.email,
    subject: 'Bienvenue !',
    template: 'welcome',
    variables: { firstName: user.firstName },
  });

  // 3. Répondre immédiatement à l'utilisateur (l'email part en background)
  res.status(201).json({ id: user.id });
});

app.listen(3000);
🎯 Idempotence : donnez un jobId stable (ex. welcome-{userId}) pour éviter de créer deux fois le même job en cas de retry HTTP côté client :

await emailQueue.add('welcome-email', payload, {
  jobId: `welcome-${user.id}`, // unique : doublon = no-op
});

5. Worker et processing des jobs

Définir le worker


// src/workers/email.worker.ts
import { Worker, Job } from 'bullmq';
import { connection } from '../queues/connection';
import { EmailJobData } from '../queues/email.queue';
import { sendEmail } from '../services/mailer';

const worker = new Worker(
  'email', // doit matcher le nom de la queue
  async (job: Job) => {
    // job.data contient le payload envoyé par queue.add()
    console.log(`[email] traitement job ${job.id} pour ${job.data.to}`);

    // Reporting de progression (visible dans Bull Board)
    await job.updateProgress(20);

    const html = await renderTemplate(job.data.template, job.data.variables);
    await job.updateProgress(60);

    await sendEmail(job.data.to, job.data.subject, html);
    await job.updateProgress(100);

    // La valeur retournée est stockée dans job.returnvalue
    return { sentAt: new Date().toISOString(), to: job.data.to };
  },
  {
    connection,
    // Concurrence : nombre de jobs traités en parallèle par ce worker
    concurrency: 10,
    // Limite : 50 jobs / 10s (rate limit côté SMTP)
    limiter: { max: 50, duration: 10_000 },
  },
);

// Hooks pour observabilité (locale au worker)
worker.on('completed', (job) => {
  console.log(`[email] ✅ job ${job.id} terminé`);
});
worker.on('failed', (job, err) => {
  console.error(`[email] ❌ job ${job?.id} échoué :`, err.message);
});

// Graceful shutdown
process.on('SIGTERM', async () => {
  await worker.close();
  process.exit(0);
});

Lancer le worker dans un process séparé


// package.json
{
  "scripts": {
    "start:api":           "ts-node src/api/server.ts",
    "start:worker:email":  "ts-node src/workers/email.worker.ts",
    "start:worker:pdf":    "ts-node src/workers/pdf.worker.ts"
  }
}

En production, vous lancez l'API et chaque worker comme des process distincts (Docker compose, PM2, Kubernetes Deployment). Cela isole les pannes : un crash du worker PDF n'impacte ni l'API ni les emails.

6. Retry, backoff et gestion d'erreurs

Un job qui échoue n'est pas un drame : BullMQ retente automatiquement selon la stratégie configurée. Les erreurs définitives (payload invalide) doivent en revanche échouer rapidement et sans retry.

Configurer attempts et backoff


// Par défaut sur la queue (déjà vu en section 4)
defaultJobOptions: {
  attempts: 5,
  backoff: {
    type: 'exponential', // 1s, 2s, 4s, 8s, 16s
    delay: 1000,
  },
}

// Ou par job individuel
await emailQueue.add('welcome', payload, {
  attempts: 10,
  backoff: { type: 'fixed', delay: 5000 }, // 5s entre chaque tentative
});

Distinguer erreurs transientes et permanentes


import { UnrecoverableError } from 'bullmq';

new Worker('email', async (job) => {
  // Validation rapide : si payload corrompu, ne pas retenter
  if (!job.data.to.includes('@')) {
    // UnrecoverableError stoppe les retries immédiatement
    throw new UnrecoverableError('Adresse email invalide');
  }

  try {
    await sendEmail(job.data.to, ...);
  } catch (err) {
    if (err.code === 'EAUTH') {
      // Identifiants SMTP cassés : inutile de réessayer
      throw new UnrecoverableError('Auth SMTP : intervention humaine requise');
    }
    // Toute autre erreur (timeout, 5xx) → retry automatique
    throw err;
  }
});

Stratégie de backoff personnalisée


new Worker('webhook', processor, {
  connection,
  settings: {
    backoffStrategy: (attemptsMade, type, err, job) => {
      // Backoff inspiré d'AWS : exponentiel + jitter
      const base = Math.min(2 ** attemptsMade * 1000, 60_000);
      const jitter = Math.random() * 1000;
      return base + jitter;
    },
  },
});
🛡️ Dead Letter Queue (DLQ) : les jobs définitivement échoués restent dans Redis en état failed. Configurez une alerte (Bull Board, Prometheus) sur leur volume et exposez un endpoint admin pour les rejouer (job.retry()).

7. Priorités, delays et jobs récurrents

Priorités


// 1 = priorité maximale, plus grand = moins prioritaire
await emailQueue.add('reset-password', payload, { priority: 1 });   // urgent
await emailQueue.add('newsletter',     payload, { priority: 10 });  // peut attendre

Delays (envoi différé)


// Job planifié dans 1h
await emailQueue.add('reminder', payload, {
  delay: 60 * 60 * 1000, // millisecondes
});

// Job planifié à une date précise
const sendAt = new Date('2026-12-01T10:00:00Z');
await emailQueue.add('campaign', payload, {
  delay: sendAt.getTime() - Date.now(),
});

Jobs récurrents (cron-like)


// Exécuter toutes les heures
await reportQueue.add(
  'hourly-stats',
  { type: 'stats' },
  {
    repeat: { pattern: '0 * * * *' }, // syntaxe cron
    jobId: 'hourly-stats',            // évite la création de doublons
  },
);

// Toutes les 30 minutes
await reportQueue.add('cleanup', {}, {
  repeat: { every: 30 * 60 * 1000 },
  jobId: 'cleanup',
});

Gestion des jobs récurrents


// Lister les schedulers actifs
const repeatable = await reportQueue.getRepeatableJobs();
console.log(repeatable);

// Supprimer un schedule
await reportQueue.removeRepeatableByKey(repeatable[0].key);
🕒 Bonnes pratiques cron :
  • Toujours préciser jobId stable pour les jobs récurrents (sinon doublons après redéploiement)
  • Préférer des intervalles plus longs que la durée maximale d'exécution du job
  • Si le worker est down au moment où le cron déclenche, le job sera créé dès que le worker revient (pas de rattrapage du retard, sauf si vous l'implémentez)

8. QueueEvents et observabilité

QueueEvents est un listener découplé du worker : il consomme uniquement le flux d'événements Redis. Utile pour l'API (notifier le client), pour le dashboard et pour l'export Prometheus.


// src/monitoring/email-events.ts
import { QueueEvents } from 'bullmq';
import { connection } from '../queues/connection';

const events = new QueueEvents('email', { connection });

events.on('waiting',   ({ jobId }) => console.log(`📥 ${jobId} en attente`));
events.on('active',    ({ jobId }) => console.log(`⚙️  ${jobId} en cours`));
events.on('completed', ({ jobId, returnvalue }) => {
  console.log(`✅ ${jobId} terminé :`, returnvalue);
});
events.on('failed',    ({ jobId, failedReason }) => {
  console.error(`❌ ${jobId} échoué :`, failedReason);
  // Ici : alerter Slack, écrire en BDD, incrémenter un compteur Prometheus...
});
events.on('progress',  ({ jobId, data }) => {
  console.log(`📊 ${jobId} progression :`, data);
});

Attendre la complétion d'un job depuis l'API


import { QueueEvents } from 'bullmq';

const events = new QueueEvents('pdf', { connection });

app.post('/exports/pdf', async (req, res) => {
  const job = await pdfQueue.add('export', { reportId: req.body.reportId });

  try {
    // Attend la fin du job (timeout 30s)
    const result = await job.waitUntilFinished(events, 30_000);
    res.json({ url: result.downloadUrl });
  } catch (err) {
    res.status(504).json({ error: 'PDF generation timeout' });
  }
});

Métriques Prometheus


import { Counter, Histogram } from 'prom-client';

const jobsTotal = new Counter({
  name: 'bullmq_jobs_total',
  help: 'Total des jobs traités',
  labelNames: ['queue', 'status'],
});

const jobDuration = new Histogram({
  name: 'bullmq_job_duration_seconds',
  help: 'Durée d\'exécution des jobs',
  labelNames: ['queue'],
  buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
});

events.on('completed', ({ jobId }) => jobsTotal.inc({ queue: 'email', status: 'completed' }));
events.on('failed',    ({ jobId }) => jobsTotal.inc({ queue: 'email', status: 'failed' }));

9. Dashboard Bull Board

Bull Board fournit une UI web pour inspecter les queues : jobs actifs, en attente, terminés, échoués, payloads, logs, retry manuel. Indispensable en production pour le debug rapide.


npm install @bull-board/express @bull-board/api

// src/monitoring/bull-board.ts
import express from 'express';
import basicAuth from 'express-basic-auth';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';

import { emailQueue } from '../queues/email.queue';
import { pdfQueue }   from '../queues/pdf.queue';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(pdfQueue),
  ],
  serverAdapter,
});

const app = express();

// Authentification basique : NE JAMAIS exposer Bull Board publiquement
app.use(
  '/admin/queues',
  basicAuth({
    users: { admin: process.env.BULL_BOARD_PASSWORD ?? 'changeme' },
    challenge: true,
  }),
  serverAdapter.getRouter(),
);

app.listen(4000, () => {
  console.log('Bull Board : http://localhost:4000/admin/queues');
});
🔒 Sécurité : Bull Board expose les payloads en clair. Toujours protéger par auth (basic auth, JWT admin, SSO) et restreindre par IP en production. Idéalement derrière un VPN ou Cloudflare Access.

10. Intégration Express et NestJS

Avec Express : pattern singleton


// src/index.ts
import express from 'express';
import { emailQueue } from './queues/email.queue';

const app = express();
app.use(express.json());

// Inject la queue dans res.locals pour les routes
app.use((req, res, next) => {
  res.locals.queues = { email: emailQueue };
  next();
});

app.post('/notify', async (req, res) => {
  const { email: q } = res.locals.queues;
  await q.add('notify', req.body);
  res.json({ queued: true });
});

// Graceful shutdown
const server = app.listen(3000);
process.on('SIGTERM', async () => {
  await emailQueue.close();
  server.close(() => process.exit(0));
});

Avec NestJS : @nestjs/bullmq


npm install @nestjs/bullmq bullmq

// src/app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { EmailModule } from './email/email.module';

@Module({
  imports: [
    BullModule.forRoot({
      connection: {
        host: process.env.REDIS_HOST,
        port: Number(process.env.REDIS_PORT ?? 6379),
      },
    }),
    EmailModule,
  ],
})
export class AppModule {}

// src/email/email.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { EmailService } from './email.service';
import { EmailProcessor } from './email.processor';

@Module({
  imports: [BullModule.registerQueue({ name: 'email' })],
  providers: [EmailService, EmailProcessor],
})
export class EmailModule {}

// src/email/email.service.ts (producer)
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

@Injectable()
export class EmailService {
  constructor(@InjectQueue('email') private readonly queue: Queue) {}

  send(to: string, subject: string) {
    return this.queue.add('send-email', { to, subject });
  }
}

// src/email/email.processor.ts (worker)
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';

@Processor('email', { concurrency: 10 })
export class EmailProcessor extends WorkerHost {
  async process(job: Job<{ to: string; subject: string }>) {
    await sendEmail(job.data.to, job.data.subject);
    return { sentAt: new Date().toISOString() };
  }
}
🎯 Bénéfice NestJS : les workers deviennent des providers Nest classiques. Vous y injectez vos services (mailer, repository, logger) via le constructeur, exactement comme dans un controller.

11. Scalabilité multi-workers

Scaler horizontalement

Pour augmenter le débit, ajoutez des instances de worker. BullMQ utilise un mécanisme de lock atomique côté Redis : un job ne sera jamais traité deux fois en parallèle, même avec 100 workers actifs.


# docker-compose.yml
version: '3.9'
services:
  api:
    build: .
    command: node dist/api/server.js
    deploy:
      replicas: 3            # 3 instances API
    environment:
      REDIS_URL: redis://redis:6379

  worker-email:
    build: .
    command: node dist/workers/email.worker.js
    deploy:
      replicas: 5            # 5 workers email
    environment:
      REDIS_URL: redis://redis:6379

  worker-pdf:
    build: .
    command: node dist/workers/pdf.worker.js
    deploy:
      replicas: 2            # 2 workers PDF (CPU lourd)
    environment:
      REDIS_URL: redis://redis:6379

  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data

volumes:
  redis-data:

Choisir la concurrency

Type de jobConcurrency recommandéeRaisonnement
CPU-bound (PDF, image, encryption) 1 à 2 par cœur Le CPU est le goulot, plus = context switching
I/O API HTTP externe 20 à 100 Le worker attend, le réseau peut paralléliser
BDD heavy (gros INSERT) 5 à 20 Limité par les connexions BDD du pool
SMTP / providers email 10 à 50 + rate limit Respecter les quotas du provider

Rate limiting au niveau worker


new Worker('sms', processor, {
  connection,
  concurrency: 20,
  // Maximum 100 jobs par minute (quota Twilio par exemple)
  limiter: { max: 100, duration: 60_000 },
});

Redis : single node vs cluster

  • Single node + persistance AOF : suffisant jusqu'à ~10 000 jobs/s. Backup régulier indispensable.
  • Redis Sentinel : haute disponibilité avec failover automatique. BullMQ supporte nativement via ioredis.
  • Redis Cluster : sharding au-delà de la mémoire d'une instance. BullMQ requiert le préfixe {queueName} pour respecter le hash slot.
  • Hosted (Upstash, Redis Cloud, ElastiCache) : sans surprise, gestion de la HA déléguée. Recommandé pour démarrer.

Conclusion et checklist

BullMQ rend les queues async accessibles à toute API Node.js. Sept à dix lignes suffisent pour publier un premier job, mais l'outil tient en production grâce à ses retries, ses priorités, ses jobs récurrents et son écosystème (Bull Board, intégration NestJS, Prometheus).

✅ Checklist mise en production

  • Connexion Redis avec maxRetriesPerRequest: null
  • Workers isolés dans des process distincts de l'API
  • defaultJobOptions définis sur chaque queue (attempts, backoff, removeOnComplete)
  • jobId stable sur les jobs critiques (idempotence)
  • UnrecoverableError utilisé pour les erreurs définitives
  • Concurrency calibrée selon le type de travail (CPU vs I/O)
  • limiter configuré pour respecter les quotas externes
  • Bull Board protégé par auth + IP whitelist
  • Métriques Prometheus exportées (jobs total, durée, taux d'échec)
  • Alerte sur le nombre de jobs en échec (Slack, PagerDuty)
  • Graceful shutdown : worker.close() sur SIGTERM
  • Redis sauvegardé (AOF + snapshots, ou hosted avec HA)
  • Tests automatisés sur les processors (mock du payload)
🎯 Stack récapitulée :
  • Broker : Redis 7 (single, Sentinel ou Cluster)
  • Queue lib : BullMQ 5 + ioredis
  • Framework : Express ou NestJS (@nestjs/bullmq)
  • Monitoring : Bull Board + Prometheus + Grafana
  • Process manager : PM2, Docker compose ou Kubernetes Deployment
  • Sécurité : auth Bull Board, secrets via env, TLS Redis en prod

Partager