Gérez tâches asynchrones avec Redis et BullMQ : producers, workers, retry, priorités, jobs récurrents, monitoring Bull Board et scalabilité Node.js.
1. Pourquoi des queues en production
Une API REST bien conçue doit répondre en moins de 200 ms. Pourtant, beaucoup d'opérations métier prennent plusieurs secondes : envoi d'emails, génération de PDF, appels à des APIs lentes, traitement d'images, calculs IA. Faire attendre l'utilisateur sur ces tâches dégrade l'expérience et fait timeout les load balancers.
La solution : déporter le travail dans une queue asynchrone. L'API répond immédiatement (« job accepté ») et un worker exécute la tâche en arrière-plan. Redis sert de broker fiable, BullMQ fournit l'API TypeScript moderne pour orchestrer producers, workers, retries et monitoring.
Cas d'usage typiques
| Cas d'usage | Pourquoi en queue | Durée typique |
|---|---|---|
| Envoi d'email transactionnel | Le SMTP peut être lent ou indisponible, retry nécessaire | 0,5 - 3 s |
| Génération PDF / Excel | CPU-intensif, bloque la requête HTTP | 1 - 30 s |
| Webhooks sortants | Endpoint externe imprévisible, retry obligatoire | 0,2 - 10 s |
| Traitement d'image / vidéo | CPU ou GPU lourd, à isoler du process API | 5 s - 5 min |
| Appels LLM / IA | Latence externe variable, coût à contrôler | 2 - 60 s |
| Synchronisation tierce (CRM, ERP) | Rate limit, batching, retry intelligent | 0,5 - 5 s |
| Export massif / data warehouse | Gros volumes, parallélisation | 30 s - 1 h |
- Bull (legacy) : encore stable, mais plus d'évolutions. À éviter pour un nouveau projet.
- BullMQ : réécriture TypeScript moderne, pattern Worker isolé, QueueEvents, flows parent/enfant. Choix par défaut en 2026.
- Bee-Queue : ultra-rapide mais moins de features (pas de cron, pas de delays). Niche.
2. Prérequis et installation
Environnement requis
- Node.js 20 LTS ou supérieur
- Redis 7+ (local ou Docker, hosted Redis Cloud / Upstash / ElastiCache)
- TypeScript 5+ recommandé
- BullMQ 5.x
Lancer Redis en local avec Docker
# Redis 7 en arrière-plan, persistance AOF activée
docker run -d \
--name redis-bullmq \
-p 6379:6379 \
-v redis-data:/data \
redis:7-alpine \
redis-server --appendonly yes
# Vérifier que Redis répond
docker exec -it redis-bullmq redis-cli PING
# → PONG
Installer BullMQ
npm install bullmq ioredis
npm install --save-dev @types/node typescript ts-node
Structure de projet recommandée
api-worker/
├── src/
│ ├── api/
│ │ └── server.ts # Serveur Express (producer)
│ ├── queues/
│ │ ├── connection.ts # Connexion Redis partagée
│ │ ├── email.queue.ts # Définition de la queue email
│ │ └── pdf.queue.ts # Queue PDF
│ ├── workers/
│ │ ├── email.worker.ts # Worker email
│ │ └── pdf.worker.ts # Worker PDF
│ ├── jobs/
│ │ ├── send-email.job.ts # Logique métier email
│ │ └── generate-pdf.job.ts
│ └── monitoring/
│ └── bull-board.ts
├── .env
├── package.json
└── tsconfig.json
3. Architecture Producer → Queue → Worker
BullMQ s'appuie sur quatre composants principaux qu'il est essentiel de bien distinguer :
| Composant | Rôle | Où il vit |
|---|---|---|
Queue |
Producer : ajoute des jobs (queue.add()) |
Processus API |
Worker |
Consumer : exécute les jobs un par un | Processus worker dédié |
QueueEvents |
Listener d'événements (completed, failed, progress) | API, dashboard, monitoring |
FlowProducer |
Compose des jobs parents / enfants | API (cas avancés) |
Diagramme de flux
┌──────────┐ queue.add() ┌──────────────────┐
│ API │ ────────────────▶ │ Redis (BullMQ) │
│ (Express)│ │ queue: email │
└──────────┘ └──────────────────┘
│
worker pulls jobs
▼
┌────────────────┐
│ Worker email │ ──▶ SMTP
└────────────────┘
│
events: completed / failed
▼
┌────────────────┐
│ Bull Board │
│ (dashboard) │
└────────────────┘
Connexion Redis partagée
// src/queues/connection.ts
import IORedis from 'ioredis';
// maxRetriesPerRequest doit être null pour BullMQ workers (sinon erreurs)
export const connection = new IORedis(process.env.REDIS_URL ?? 'redis://localhost:6379', {
maxRetriesPerRequest: null,
});
// Logger pour suivre l'état de la connexion
connection.on('connect', () => console.log('[Redis] connecté'));
connection.on('error', (err) => console.error('[Redis] erreur', err.message));
connection.on('reconnecting',() => console.log('[Redis] reconnexion...'));
4. Créer sa première queue
Définir la queue (côté API)
// src/queues/email.queue.ts
import { Queue } from 'bullmq';
import { connection } from './connection';
// Données passées à chaque job email
export interface EmailJobData {
to: string;
subject: string;
template: 'welcome' | 'reset-password' | 'invoice';
variables: Record;
}
// Une queue par domaine fonctionnel (email, pdf, sync...)
export const emailQueue = new Queue('email', {
connection,
defaultJobOptions: {
// Conserve les 1000 derniers jobs réussis (pour debug)
removeOnComplete: { count: 1000, age: 24 * 3600 },
// Conserve les 5000 derniers jobs échoués (pour analyse)
removeOnFail: { count: 5000 },
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
},
});
Publier un job depuis l'API
// src/api/server.ts
import express from 'express';
import { emailQueue } from '../queues/email.queue';
const app = express();
app.use(express.json());
app.post('/users/signup', async (req, res) => {
// 1. Créer l'utilisateur en BDD (synchrone)
const user = await createUser(req.body);
// 2. Publier le job d'envoi d'email (asynchrone)
await emailQueue.add('welcome-email', {
to: user.email,
subject: 'Bienvenue !',
template: 'welcome',
variables: { firstName: user.firstName },
});
// 3. Répondre immédiatement à l'utilisateur (l'email part en background)
res.status(201).json({ id: user.id });
});
app.listen(3000);
jobId stable (ex. welcome-{userId}) pour éviter de créer deux fois le même job en cas de retry HTTP côté client :
await emailQueue.add('welcome-email', payload, {
jobId: `welcome-${user.id}`, // unique : doublon = no-op
});
5. Worker et processing des jobs
Définir le worker
// src/workers/email.worker.ts
import { Worker, Job } from 'bullmq';
import { connection } from '../queues/connection';
import { EmailJobData } from '../queues/email.queue';
import { sendEmail } from '../services/mailer';
const worker = new Worker(
'email', // doit matcher le nom de la queue
async (job: Job) => {
// job.data contient le payload envoyé par queue.add()
console.log(`[email] traitement job ${job.id} pour ${job.data.to}`);
// Reporting de progression (visible dans Bull Board)
await job.updateProgress(20);
const html = await renderTemplate(job.data.template, job.data.variables);
await job.updateProgress(60);
await sendEmail(job.data.to, job.data.subject, html);
await job.updateProgress(100);
// La valeur retournée est stockée dans job.returnvalue
return { sentAt: new Date().toISOString(), to: job.data.to };
},
{
connection,
// Concurrence : nombre de jobs traités en parallèle par ce worker
concurrency: 10,
// Limite : 50 jobs / 10s (rate limit côté SMTP)
limiter: { max: 50, duration: 10_000 },
},
);
// Hooks pour observabilité (locale au worker)
worker.on('completed', (job) => {
console.log(`[email] ✅ job ${job.id} terminé`);
});
worker.on('failed', (job, err) => {
console.error(`[email] ❌ job ${job?.id} échoué :`, err.message);
});
// Graceful shutdown
process.on('SIGTERM', async () => {
await worker.close();
process.exit(0);
});
Lancer le worker dans un process séparé
// package.json
{
"scripts": {
"start:api": "ts-node src/api/server.ts",
"start:worker:email": "ts-node src/workers/email.worker.ts",
"start:worker:pdf": "ts-node src/workers/pdf.worker.ts"
}
}
En production, vous lancez l'API et chaque worker comme des process distincts (Docker compose, PM2, Kubernetes Deployment). Cela isole les pannes : un crash du worker PDF n'impacte ni l'API ni les emails.
6. Retry, backoff et gestion d'erreurs
Un job qui échoue n'est pas un drame : BullMQ retente automatiquement selon la stratégie configurée. Les erreurs définitives (payload invalide) doivent en revanche échouer rapidement et sans retry.
Configurer attempts et backoff
// Par défaut sur la queue (déjà vu en section 4)
defaultJobOptions: {
attempts: 5,
backoff: {
type: 'exponential', // 1s, 2s, 4s, 8s, 16s
delay: 1000,
},
}
// Ou par job individuel
await emailQueue.add('welcome', payload, {
attempts: 10,
backoff: { type: 'fixed', delay: 5000 }, // 5s entre chaque tentative
});
Distinguer erreurs transientes et permanentes
import { UnrecoverableError } from 'bullmq';
new Worker('email', async (job) => {
// Validation rapide : si payload corrompu, ne pas retenter
if (!job.data.to.includes('@')) {
// UnrecoverableError stoppe les retries immédiatement
throw new UnrecoverableError('Adresse email invalide');
}
try {
await sendEmail(job.data.to, ...);
} catch (err) {
if (err.code === 'EAUTH') {
// Identifiants SMTP cassés : inutile de réessayer
throw new UnrecoverableError('Auth SMTP : intervention humaine requise');
}
// Toute autre erreur (timeout, 5xx) → retry automatique
throw err;
}
});
Stratégie de backoff personnalisée
new Worker('webhook', processor, {
connection,
settings: {
backoffStrategy: (attemptsMade, type, err, job) => {
// Backoff inspiré d'AWS : exponentiel + jitter
const base = Math.min(2 ** attemptsMade * 1000, 60_000);
const jitter = Math.random() * 1000;
return base + jitter;
},
},
});
failed. Configurez une alerte (Bull Board, Prometheus) sur leur volume et exposez un endpoint admin pour les rejouer (job.retry()).
7. Priorités, delays et jobs récurrents
Priorités
// 1 = priorité maximale, plus grand = moins prioritaire
await emailQueue.add('reset-password', payload, { priority: 1 }); // urgent
await emailQueue.add('newsletter', payload, { priority: 10 }); // peut attendre
Delays (envoi différé)
// Job planifié dans 1h
await emailQueue.add('reminder', payload, {
delay: 60 * 60 * 1000, // millisecondes
});
// Job planifié à une date précise
const sendAt = new Date('2026-12-01T10:00:00Z');
await emailQueue.add('campaign', payload, {
delay: sendAt.getTime() - Date.now(),
});
Jobs récurrents (cron-like)
// Exécuter toutes les heures
await reportQueue.add(
'hourly-stats',
{ type: 'stats' },
{
repeat: { pattern: '0 * * * *' }, // syntaxe cron
jobId: 'hourly-stats', // évite la création de doublons
},
);
// Toutes les 30 minutes
await reportQueue.add('cleanup', {}, {
repeat: { every: 30 * 60 * 1000 },
jobId: 'cleanup',
});
Gestion des jobs récurrents
// Lister les schedulers actifs
const repeatable = await reportQueue.getRepeatableJobs();
console.log(repeatable);
// Supprimer un schedule
await reportQueue.removeRepeatableByKey(repeatable[0].key);
- Toujours préciser
jobIdstable pour les jobs récurrents (sinon doublons après redéploiement) - Préférer des intervalles plus longs que la durée maximale d'exécution du job
- Si le worker est down au moment où le cron déclenche, le job sera créé dès que le worker revient (pas de rattrapage du retard, sauf si vous l'implémentez)
8. QueueEvents et observabilité
QueueEvents est un listener découplé du worker : il consomme uniquement le flux d'événements Redis. Utile pour l'API (notifier le client), pour le dashboard et pour l'export Prometheus.
// src/monitoring/email-events.ts
import { QueueEvents } from 'bullmq';
import { connection } from '../queues/connection';
const events = new QueueEvents('email', { connection });
events.on('waiting', ({ jobId }) => console.log(`📥 ${jobId} en attente`));
events.on('active', ({ jobId }) => console.log(`⚙️ ${jobId} en cours`));
events.on('completed', ({ jobId, returnvalue }) => {
console.log(`✅ ${jobId} terminé :`, returnvalue);
});
events.on('failed', ({ jobId, failedReason }) => {
console.error(`❌ ${jobId} échoué :`, failedReason);
// Ici : alerter Slack, écrire en BDD, incrémenter un compteur Prometheus...
});
events.on('progress', ({ jobId, data }) => {
console.log(`📊 ${jobId} progression :`, data);
});
Attendre la complétion d'un job depuis l'API
import { QueueEvents } from 'bullmq';
const events = new QueueEvents('pdf', { connection });
app.post('/exports/pdf', async (req, res) => {
const job = await pdfQueue.add('export', { reportId: req.body.reportId });
try {
// Attend la fin du job (timeout 30s)
const result = await job.waitUntilFinished(events, 30_000);
res.json({ url: result.downloadUrl });
} catch (err) {
res.status(504).json({ error: 'PDF generation timeout' });
}
});
Métriques Prometheus
import { Counter, Histogram } from 'prom-client';
const jobsTotal = new Counter({
name: 'bullmq_jobs_total',
help: 'Total des jobs traités',
labelNames: ['queue', 'status'],
});
const jobDuration = new Histogram({
name: 'bullmq_job_duration_seconds',
help: 'Durée d\'exécution des jobs',
labelNames: ['queue'],
buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
});
events.on('completed', ({ jobId }) => jobsTotal.inc({ queue: 'email', status: 'completed' }));
events.on('failed', ({ jobId }) => jobsTotal.inc({ queue: 'email', status: 'failed' }));
9. Dashboard Bull Board
Bull Board fournit une UI web pour inspecter les queues : jobs actifs, en attente, terminés, échoués, payloads, logs, retry manuel. Indispensable en production pour le debug rapide.
npm install @bull-board/express @bull-board/api
// src/monitoring/bull-board.ts
import express from 'express';
import basicAuth from 'express-basic-auth';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { emailQueue } from '../queues/email.queue';
import { pdfQueue } from '../queues/pdf.queue';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [
new BullMQAdapter(emailQueue),
new BullMQAdapter(pdfQueue),
],
serverAdapter,
});
const app = express();
// Authentification basique : NE JAMAIS exposer Bull Board publiquement
app.use(
'/admin/queues',
basicAuth({
users: { admin: process.env.BULL_BOARD_PASSWORD ?? 'changeme' },
challenge: true,
}),
serverAdapter.getRouter(),
);
app.listen(4000, () => {
console.log('Bull Board : http://localhost:4000/admin/queues');
});
10. Intégration Express et NestJS
Avec Express : pattern singleton
// src/index.ts
import express from 'express';
import { emailQueue } from './queues/email.queue';
const app = express();
app.use(express.json());
// Inject la queue dans res.locals pour les routes
app.use((req, res, next) => {
res.locals.queues = { email: emailQueue };
next();
});
app.post('/notify', async (req, res) => {
const { email: q } = res.locals.queues;
await q.add('notify', req.body);
res.json({ queued: true });
});
// Graceful shutdown
const server = app.listen(3000);
process.on('SIGTERM', async () => {
await emailQueue.close();
server.close(() => process.exit(0));
});
Avec NestJS : @nestjs/bullmq
npm install @nestjs/bullmq bullmq
// src/app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { EmailModule } from './email/email.module';
@Module({
imports: [
BullModule.forRoot({
connection: {
host: process.env.REDIS_HOST,
port: Number(process.env.REDIS_PORT ?? 6379),
},
}),
EmailModule,
],
})
export class AppModule {}
// src/email/email.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { EmailService } from './email.service';
import { EmailProcessor } from './email.processor';
@Module({
imports: [BullModule.registerQueue({ name: 'email' })],
providers: [EmailService, EmailProcessor],
})
export class EmailModule {}
// src/email/email.service.ts (producer)
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
@Injectable()
export class EmailService {
constructor(@InjectQueue('email') private readonly queue: Queue) {}
send(to: string, subject: string) {
return this.queue.add('send-email', { to, subject });
}
}
// src/email/email.processor.ts (worker)
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
@Processor('email', { concurrency: 10 })
export class EmailProcessor extends WorkerHost {
async process(job: Job<{ to: string; subject: string }>) {
await sendEmail(job.data.to, job.data.subject);
return { sentAt: new Date().toISOString() };
}
}
11. Scalabilité multi-workers
Scaler horizontalement
Pour augmenter le débit, ajoutez des instances de worker. BullMQ utilise un mécanisme de lock atomique côté Redis : un job ne sera jamais traité deux fois en parallèle, même avec 100 workers actifs.
# docker-compose.yml
version: '3.9'
services:
api:
build: .
command: node dist/api/server.js
deploy:
replicas: 3 # 3 instances API
environment:
REDIS_URL: redis://redis:6379
worker-email:
build: .
command: node dist/workers/email.worker.js
deploy:
replicas: 5 # 5 workers email
environment:
REDIS_URL: redis://redis:6379
worker-pdf:
build: .
command: node dist/workers/pdf.worker.js
deploy:
replicas: 2 # 2 workers PDF (CPU lourd)
environment:
REDIS_URL: redis://redis:6379
redis:
image: redis:7-alpine
command: redis-server --appendonly yes
volumes:
- redis-data:/data
volumes:
redis-data:
Choisir la concurrency
| Type de job | Concurrency recommandée | Raisonnement |
|---|---|---|
| CPU-bound (PDF, image, encryption) | 1 à 2 par cœur | Le CPU est le goulot, plus = context switching |
| I/O API HTTP externe | 20 à 100 | Le worker attend, le réseau peut paralléliser |
| BDD heavy (gros INSERT) | 5 à 20 | Limité par les connexions BDD du pool |
| SMTP / providers email | 10 à 50 + rate limit | Respecter les quotas du provider |
Rate limiting au niveau worker
new Worker('sms', processor, {
connection,
concurrency: 20,
// Maximum 100 jobs par minute (quota Twilio par exemple)
limiter: { max: 100, duration: 60_000 },
});
Redis : single node vs cluster
- Single node + persistance AOF : suffisant jusqu'à ~10 000 jobs/s. Backup régulier indispensable.
- Redis Sentinel : haute disponibilité avec failover automatique. BullMQ supporte nativement via
ioredis. - Redis Cluster : sharding au-delà de la mémoire d'une instance. BullMQ requiert le préfixe
{queueName}pour respecter le hash slot. - Hosted (Upstash, Redis Cloud, ElastiCache) : sans surprise, gestion de la HA déléguée. Recommandé pour démarrer.
Conclusion et checklist
BullMQ rend les queues async accessibles à toute API Node.js. Sept à dix lignes suffisent pour publier un premier job, mais l'outil tient en production grâce à ses retries, ses priorités, ses jobs récurrents et son écosystème (Bull Board, intégration NestJS, Prometheus).
✅ Checklist mise en production
- Connexion Redis avec
maxRetriesPerRequest: null - Workers isolés dans des process distincts de l'API
defaultJobOptionsdéfinis sur chaque queue (attempts, backoff, removeOnComplete)jobIdstable sur les jobs critiques (idempotence)UnrecoverableErrorutilisé pour les erreurs définitives- Concurrency calibrée selon le type de travail (CPU vs I/O)
limiterconfiguré pour respecter les quotas externes- Bull Board protégé par auth + IP whitelist
- Métriques Prometheus exportées (jobs total, durée, taux d'échec)
- Alerte sur le nombre de jobs en échec (Slack, PagerDuty)
- Graceful shutdown :
worker.close()sur SIGTERM - Redis sauvegardé (AOF + snapshots, ou hosted avec HA)
- Tests automatisés sur les processors (mock du payload)
- Broker : Redis 7 (single, Sentinel ou Cluster)
- Queue lib : BullMQ 5 + ioredis
- Framework : Express ou NestJS (@nestjs/bullmq)
- Monitoring : Bull Board + Prometheus + Grafana
- Process manager : PM2, Docker compose ou Kubernetes Deployment
- Sécurité : auth Bull Board, secrets via env, TLS Redis en prod