Maîtriser les Streams en Node.js pour traiter les données volumineuses sans surcharger la mémoire avec des exemples pratiques.
Qu'est-ce que les Streams et pourquoi les utiliser ?
Un Stream en Node.js est une séquence de données traitées morceau par morceau (chunk par chunk) plutôt que chargées entièrement en mémoire. C'est idéal pour les fichiers volumineux, les requêtes HTTP volumineuses ou les gros datasets.
Avantages des Streams :
- Mémoire optimisée : traiter 1GB de données avec quelques MB de RAM
- Latence réduite : commencer à traiter sans attendre la fin du téléchargement
- Contrôle du débit : backpressure automatique pour éviter le débordement
- Composition : combiner plusieurs streams avec
pipe()
Exemple sans streams (problématique) :
// ❌ MAUVAIS - charge tout en mémoire
const fs = require('fs');
const data = fs.readFileSync('huge-file.json');
console.info((data.length); // Tout en mémoire!
Exemple avec streams (optimal) :
// ✅ BON - traite par chunks
const fs = require('fs');
fs.createReadStream('huge-file.json')
.on('data', chunk => {
console.info('Chunk reçu:', chunk.length, 'bytes');
})
.on('end', () => {
console.info('Fichier terminé');
});
Types de streams : Readable, Writable, Transform, Duplex
Il existe 4 types de streams en Node.js, chacun avec un rôle spécifique :
1. Readable Stream - Lis des données
Exemple : fichiers, requêtes HTTP, bases de données.
const fs = require('fs');
const readable = fs.createReadStream('data.txt');
// Émettra des événements 'data', 'end', 'error'
2. Writable Stream - Écris des données
Exemple : fichiers, réponses HTTP, bases de données.
const writable = fs.createWriteStream('output.txt');
writable.write('Hello');
writable.end();
3. Transform Stream - Transforme les données
Combine Readable + Writable et modifie les données au passage.
const { Transform } = require('stream');
const uppercase = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
4. Duplex Stream - Lit ET écrit (bidirectionnel)
Exemple : sockets TCP, connexions WebSocket.
const net = require('net');
const socket = net.connect(3000); // Duplex (read & write)
| Type | Lecture | Écriture | Exemple |
|---|---|---|---|
| Readable | ✓ | ✗ | fs.createReadStream() |
| Writable | ✗ | ✓ | fs.createWriteStream() |
| Transform | ✓ | ✓ | Compression, JSON parsing |
| Duplex | ✓ | ✓ | Sockets TCP, WebSocket |
Créer un Readable Stream
Créer un stream personnalisé qui génère des données :
const { Readable } = require('stream');
const readable = new Readable({
read(size) {
// Appelé quand le consumer demande des données
if (this.count === undefined) {
this.count = 0;
}
if (this.count < 3) {
this.push(`Ligne ${this.count}\n`);
this.count++;
} else {
this.push(null); // Signal EOF (fin)
}
}
});
readable.on('data', chunk => {
console.info('Reçu:', chunk.toString());
});
readable.on('end', () => {
console.info('Stream terminé');
});
Créer un stream depuis un fichier :
const fs = require('fs');
const readable = fs.createReadStream('file.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB par chunk (défaut: 16KB)
});
readable.on('data', chunk => {
console.info('Chunk:', chunk.length, 'bytes');
});
readable.on('end', () => {
console.info('Fichier fini');
});
readable.on('error', err => {
console.error('Erreur:', err.message);
});
highWaterMark contrôle la taille des buffers. Augmentez-le pour les gros fichiers (mais attention à la mémoire).
Créer un Writable Stream
Créer un stream personnalisé qui consomme des données :
const { Writable } = require('stream');
const writable = new Writable({
write(chunk, encoding, callback) {
// Traiter le chunk
console.info('Écriture:', chunk.toString());
// Simuler une opération async
setTimeout(() => {
console.info('Chunk traité');
callback(); // Signal que c'est fini
}, 100);
}
});
writable.write('Hello ');
writable.write('World\n');
writable.end('Fin!\n');
writable.on('finish', () => {
console.info('Tous les chunks ont été écrits');
});
Écrire dans un fichier :
const fs = require('fs');
const writable = fs.createWriteStream('output.txt');
writable.write('Ligne 1\n');
writable.write('Ligne 2\n');
writable.write('Ligne 3\n');
writable.end('Fin du fichier\n');
writable.on('finish', () => {
console.info('Fichier écrit avec succès');
});
writable.on('error', err => {
console.error('Erreur:', err.message);
});
end() pour signaler la fin du stream. Sans cela, le stream reste ouvert.
Pipe et chaînage de streams
La méthode pipe() relie automatiquement un stream source à un stream destination, avec gestion du backpressure :
const fs = require('fs');
// Copier un fichier avec pipe
fs.createReadStream('input.txt')
.pipe(fs.createWriteStream('output.txt'))
.on('finish', () => {
console.info('Fichier copié!');
});
Chaîner plusieurs streams :
const fs = require('fs');
const zlib = require('zlib');
// Lire → compresser → écrire
fs.createReadStream('input.txt')
.pipe(zlib.createGzip())
.pipe(fs.createWriteStream('input.txt.gz'))
.on('finish', () => {
console.info('Fichier compressé!');
});
Décompresser un fichier :
const fs = require('fs');
const zlib = require('zlib');
fs.createReadStream('input.txt.gz')
.pipe(zlib.createGunzip())
.pipe(fs.createWriteStream('input.txt'))
.on('finish', () => {
console.info('Fichier décompressé!');
});
Avantage de pipe() :
- Gère automatiquement le backpressure
- Évite de charger qui en mémoire
- Code plus lisible et déclaratif
Pipe avec gestion d'erreur :
const fs = require('fs');
fs.createReadStream('input.txt')
.pipe(fs.createWriteStream('output.txt'))
.on('error', err => console.error('Erreur write:', err))
.on('finish', () => console.info('Terminé!'));
// Écouter aussi les erreurs du source
fs.createReadStream('input.txt')
.on('error', err => console.error('Erreur read:', err))
.pipe(fs.createWriteStream('output.txt'));
Transform streams pour traiter les données
Un Transform Stream lit les données, les modifie, puis les écrit :
const { Transform } = require('stream');
const uppercase = new Transform({
transform(chunk, encoding, callback) {
// Modifier le chunk
const data = chunk.toString().toUpperCase();
this.push(data);
callback();
}
});
process.stdin.pipe(uppercase).pipe(process.stdout);
Parser JSON ligne par ligne :
const { Transform } = require('stream');
const fs = require('fs');
const readline = require('readline');
const jsonParser = new Transform({
transform(chunk, encoding, callback) {
try {
const obj = JSON.parse(chunk.toString());
this.push(JSON.stringify(obj, null, 2) + '\n');
callback();
} catch (err) {
callback(err);
}
}
});
fs.createReadStream('data.jsonl')
.pipe(jsonParser)
.pipe(fs.createWriteStream('formatted.json'));
Compter les lignes d'un fichier :
const { Transform } = require('stream');
const fs = require('fs');
let lineCount = 0;
const lineCounter = new Transform({
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
lineCount += lines.length - 1;
this.push(chunk); // Passer les données
callback();
}
});
fs.createReadStream('file.txt')
.pipe(lineCounter)
.on('finish', () => {
console.info('Nombre de lignes:', lineCount);
});
Filtrer des données :
const { Transform } = require('stream');
const filter = new Transform({
transform(chunk, encoding, callback) {
const lines = chunk.toString().split('\n');
const filtered = lines
.filter(line => !line.startsWith('#')) // Ignorer les commentaires
.join('\n');
this.push(filtered);
callback();
}
});
process.stdin.pipe(filter).pipe(process.stdout);
Gestion des erreurs et backpressure
1. Backpressure automatique
Quand vous écrivez plus vite que le consumer peut lire, Node.js signale un backpressure :
const fs = require('fs');
const readable = fs.createReadStream('huge-file.txt');
const writable = fs.createWriteStream('output.txt');
readable.on('data', chunk => {
const canContinue = writable.write(chunk);
if (!canContinue) {
// Backpressure! Pause la lecture
readable.pause();
console.info('Pause - buffer plein');
}
});
writable.on('drain', () => {
// Buffer vidé, reprendre
readable.resume();
console.info('Reprendre');
});
Avec pipe() - Automatique :
// pipe() gère le backpressure automatiquement
fs.createReadStream('input.txt')
.pipe(fs.createWriteStream('output.txt'));
2. Gestion des erreurs
Toujours écouter les événements 'error' :
const fs = require('fs');
const readable = fs.createReadStream('missing.txt');
const writable = fs.createWriteStream('output.txt');
readable.on('error', err => {
if (err.code === 'ENOENT') {
console.error('Fichier introuvable');
} else {
console.error('Erreur lecture:', err);
}
});
writable.on('error', err => {
console.error('Erreur écriture:', err);
});
readable.pipe(writable);
Pipeline moderne (Node.js 15+) :
const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');
async function compress() {
try {
await pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('input.txt.gz')
);
console.info('Compressé!');
} catch (err) {
console.error('Erreur pipeline:', err);
// Le cleanup est automatique
}
}
compress();
pipeline() pour un code plus robuste. Il gère les erreurs et le cleanup automatiquement.
Cas d'usage pratiques et patterns avancés
1. Télécharger et sauvegarder un fichier
const http = require('http');
const fs = require('fs');
http.get('https://example.com/large-file.zip', response => {
response.pipe(fs.createWriteStream('downloaded.zip'))
.on('finish', () => {
console.info('Téléchargé!');
});
});
2. Parser un CSV volumineux
const fs = require('fs');
const readline = require('readline');
const rl = readline.createInterface({
input: fs.createReadStream('large.csv'),
crlfDelay: Infinity
});
rl.on('line', line => {
const [name, email, age] = line.split(',');
console.info(`Processus: ${name}`);
});
3. Servir un fichier PDF via HTTP
const http = require('http');
const fs = require('fs');
http.createServer((req, res) => {
if (req.url === '/download') {
res.setHeader('Content-Type', 'application/pdf');
res.setHeader('Content-Disposition', 'attachment; filename=file.pdf');
fs.createReadStream('large-file.pdf').pipe(res);
}
}).listen(3000);
4. Transformer et sauvegarder des données
const { Transform } = require('stream');
const fs = require('fs');
const csvToJSON = new Transform({
transform(chunk, encoding, callback) {
const [name, age, city] = chunk.toString().trim().split(',');
const json = JSON.stringify({ name, age, city }) + '\n';
this.push(json);
callback();
}
});
fs.createReadStream('data.csv')
.pipe(csvToJSON)
.pipe(fs.createWriteStream('data.json'));
5. Streamer des données vers une base de données
const { Transform } = require('stream');
const fs = require('fs');
const db = require('sqlite3');
const dbWriter = new Transform({
async transform(chunk, encoding, callback) {
const data = JSON.parse(chunk);
await db.run('INSERT INTO users VALUES (?, ?)',
[data.name, data.email]);
callback();
}
});
fs.createReadStream('users.jsonl')
.pipe(dbWriter)
.on('finish', () => {
console.info('Données insérées!');
});
Bonnes pratiques :
- Toujours écouter l'événement
'error'sur chaque stream - Utilisez
pipeline()au lieu de chaînerpipe() - Contrôlez le
highWaterMarkselon votre cas d'usage - Testez avec des fichiers volumineux avant production
- Utilisez des outils de profiling pour vérifier la mémoire