Node.js Stream API : traiter les données efficacement

🏷️ Back-end 📅 12/04/2026 03:00:00 👤 Mezgani Said
Nodejs Stream Api Data Performance Data Processing
Node.js Stream API : traiter les données efficacement

Maîtriser les Streams en Node.js pour traiter les données volumineuses sans surcharger la mémoire avec des exemples pratiques.

Qu'est-ce que les Streams et pourquoi les utiliser ?

Un Stream en Node.js est une séquence de données traitées morceau par morceau (chunk par chunk) plutôt que chargées entièrement en mémoire. C'est idéal pour les fichiers volumineux, les requêtes HTTP volumineuses ou les gros datasets.

À retenir : Sans streams, charger un fichier de 1GB en mémoire le gèle. Avec les streams, vous le traitez par petits morceaux sans risque.

Avantages des Streams :

  • Mémoire optimisée : traiter 1GB de données avec quelques MB de RAM
  • Latence réduite : commencer à traiter sans attendre la fin du téléchargement
  • Contrôle du débit : backpressure automatique pour éviter le débordement
  • Composition : combiner plusieurs streams avec pipe()

Exemple sans streams (problématique) :

// ❌ MAUVAIS - charge tout en mémoire
const fs = require('fs');
const data = fs.readFileSync('huge-file.json');
console.info((data.length); // Tout en mémoire!

Exemple avec streams (optimal) :

// ✅ BON - traite par chunks
const fs = require('fs');
fs.createReadStream('huge-file.json')
  .on('data', chunk => {
    console.info('Chunk reçu:', chunk.length, 'bytes');
  })
  .on('end', () => {
    console.info('Fichier terminé');
  });

Types de streams : Readable, Writable, Transform, Duplex

Il existe 4 types de streams en Node.js, chacun avec un rôle spécifique :

1. Readable Stream - Lis des données

Exemple : fichiers, requêtes HTTP, bases de données.

const fs = require('fs');
const readable = fs.createReadStream('data.txt');
// Émettra des événements 'data', 'end', 'error'

2. Writable Stream - Écris des données

Exemple : fichiers, réponses HTTP, bases de données.

const writable = fs.createWriteStream('output.txt');
writable.write('Hello');
writable.end();

3. Transform Stream - Transforme les données

Combine Readable + Writable et modifie les données au passage.

const { Transform } = require('stream');
const uppercase = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

4. Duplex Stream - Lit ET écrit (bidirectionnel)

Exemple : sockets TCP, connexions WebSocket.

const net = require('net');
const socket = net.connect(3000); // Duplex (read & write)
Type Lecture Écriture Exemple
Readable fs.createReadStream()
Writable fs.createWriteStream()
Transform Compression, JSON parsing
Duplex Sockets TCP, WebSocket

Créer un Readable Stream

Créer un stream personnalisé qui génère des données :

const { Readable } = require('stream');

const readable = new Readable({
  read(size) {
    // Appelé quand le consumer demande des données
    if (this.count === undefined) {
      this.count = 0;
    }

    if (this.count < 3) {
      this.push(`Ligne ${this.count}\n`);
      this.count++;
    } else {
      this.push(null); // Signal EOF (fin)
    }
  }
});

readable.on('data', chunk => {
  console.info('Reçu:', chunk.toString());
});

readable.on('end', () => {
  console.info('Stream terminé');
});

Créer un stream depuis un fichier :

const fs = require('fs');

const readable = fs.createReadStream('file.txt', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64KB par chunk (défaut: 16KB)
});

readable.on('data', chunk => {
  console.info('Chunk:', chunk.length, 'bytes');
});

readable.on('end', () => {
  console.info('Fichier fini');
});

readable.on('error', err => {
  console.error('Erreur:', err.message);
});
Note : Le paramètre highWaterMark contrôle la taille des buffers. Augmentez-le pour les gros fichiers (mais attention à la mémoire).

Créer un Writable Stream

Créer un stream personnalisé qui consomme des données :

const { Writable } = require('stream');

const writable = new Writable({
  write(chunk, encoding, callback) {
    // Traiter le chunk
    console.info('Écriture:', chunk.toString());

    // Simuler une opération async
    setTimeout(() => {
      console.info('Chunk traité');
      callback(); // Signal que c'est fini
    }, 100);
  }
});

writable.write('Hello ');
writable.write('World\n');
writable.end('Fin!\n');

writable.on('finish', () => {
  console.info('Tous les chunks ont été écrits');
});

Écrire dans un fichier :

const fs = require('fs');

const writable = fs.createWriteStream('output.txt');

writable.write('Ligne 1\n');
writable.write('Ligne 2\n');
writable.write('Ligne 3\n');

writable.end('Fin du fichier\n');

writable.on('finish', () => {
  console.info('Fichier écrit avec succès');
});

writable.on('error', err => {
  console.error('Erreur:', err.message);
});
À retenir : Toujours appeler end() pour signaler la fin du stream. Sans cela, le stream reste ouvert.

Pipe et chaînage de streams

La méthode pipe() relie automatiquement un stream source à un stream destination, avec gestion du backpressure :

const fs = require('fs');

// Copier un fichier avec pipe
fs.createReadStream('input.txt')
  .pipe(fs.createWriteStream('output.txt'))
  .on('finish', () => {
    console.info('Fichier copié!');
  });

Chaîner plusieurs streams :

const fs = require('fs');
const zlib = require('zlib');

// Lire → compresser → écrire
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'))
  .on('finish', () => {
    console.info('Fichier compressé!');
  });

Décompresser un fichier :

const fs = require('fs');
const zlib = require('zlib');

fs.createReadStream('input.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('input.txt'))
  .on('finish', () => {
    console.info('Fichier décompressé!');
  });

Avantage de pipe() :

  • Gère automatiquement le backpressure
  • Évite de charger qui en mémoire
  • Code plus lisible et déclaratif

Pipe avec gestion d'erreur :

const fs = require('fs');

fs.createReadStream('input.txt')
  .pipe(fs.createWriteStream('output.txt'))
  .on('error', err => console.error('Erreur write:', err))
  .on('finish', () => console.info('Terminé!'));

// Écouter aussi les erreurs du source
fs.createReadStream('input.txt')
  .on('error', err => console.error('Erreur read:', err))
  .pipe(fs.createWriteStream('output.txt'));

Transform streams pour traiter les données

Un Transform Stream lit les données, les modifie, puis les écrit :

const { Transform } = require('stream');

const uppercase = new Transform({
  transform(chunk, encoding, callback) {
    // Modifier le chunk
    const data = chunk.toString().toUpperCase();
    this.push(data);
    callback();
  }
});

process.stdin.pipe(uppercase).pipe(process.stdout);

Parser JSON ligne par ligne :

const { Transform } = require('stream');
const fs = require('fs');
const readline = require('readline');

const jsonParser = new Transform({
  transform(chunk, encoding, callback) {
    try {
      const obj = JSON.parse(chunk.toString());
      this.push(JSON.stringify(obj, null, 2) + '\n');
      callback();
    } catch (err) {
      callback(err);
    }
  }
});

fs.createReadStream('data.jsonl')
  .pipe(jsonParser)
  .pipe(fs.createWriteStream('formatted.json'));

Compter les lignes d'un fichier :

const { Transform } = require('stream');
const fs = require('fs');

let lineCount = 0;

const lineCounter = new Transform({
  transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n');
    lineCount += lines.length - 1;
    this.push(chunk); // Passer les données
    callback();
  }
});

fs.createReadStream('file.txt')
  .pipe(lineCounter)
  .on('finish', () => {
    console.info('Nombre de lignes:', lineCount);
  });

Filtrer des données :

const { Transform } = require('stream');

const filter = new Transform({
  transform(chunk, encoding, callback) {
    const lines = chunk.toString().split('\n');
    const filtered = lines
      .filter(line => !line.startsWith('#')) // Ignorer les commentaires
      .join('\n');
    this.push(filtered);
    callback();
  }
});

process.stdin.pipe(filter).pipe(process.stdout);

Gestion des erreurs et backpressure

1. Backpressure automatique

Quand vous écrivez plus vite que le consumer peut lire, Node.js signale un backpressure :

const fs = require('fs');

const readable = fs.createReadStream('huge-file.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', chunk => {
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    // Backpressure! Pause la lecture
    readable.pause();
    console.info('Pause - buffer plein');
  }
});

writable.on('drain', () => {
  // Buffer vidé, reprendre
  readable.resume();
  console.info('Reprendre');
});

Avec pipe() - Automatique :

// pipe() gère le backpressure automatiquement
fs.createReadStream('input.txt')
  .pipe(fs.createWriteStream('output.txt'));

2. Gestion des erreurs

Toujours écouter les événements 'error' :

const fs = require('fs');

const readable = fs.createReadStream('missing.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('error', err => {
  if (err.code === 'ENOENT') {
    console.error('Fichier introuvable');
  } else {
    console.error('Erreur lecture:', err);
  }
});

writable.on('error', err => {
  console.error('Erreur écriture:', err);
});

readable.pipe(writable);

Pipeline moderne (Node.js 15+) :

const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');

async function compress() {
  try {
    await pipeline(
      fs.createReadStream('input.txt'),
      zlib.createGzip(),
      fs.createWriteStream('input.txt.gz')
    );
    console.info('Compressé!');
  } catch (err) {
    console.error('Erreur pipeline:', err);
    // Le cleanup est automatique
  }
}

compress();
À retenir : Utilisez pipeline() pour un code plus robuste. Il gère les erreurs et le cleanup automatiquement.

Cas d'usage pratiques et patterns avancés

1. Télécharger et sauvegarder un fichier

const http = require('http');
const fs = require('fs');

http.get('https://example.com/large-file.zip', response => {
  response.pipe(fs.createWriteStream('downloaded.zip'))
    .on('finish', () => {
      console.info('Téléchargé!');
    });
});

2. Parser un CSV volumineux

const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
  input: fs.createReadStream('large.csv'),
  crlfDelay: Infinity
});

rl.on('line', line => {
  const [name, email, age] = line.split(',');
  console.info(`Processus: ${name}`);
});

3. Servir un fichier PDF via HTTP

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
  if (req.url === '/download') {
    res.setHeader('Content-Type', 'application/pdf');
    res.setHeader('Content-Disposition', 'attachment; filename=file.pdf');

    fs.createReadStream('large-file.pdf').pipe(res);
  }
}).listen(3000);

4. Transformer et sauvegarder des données

const { Transform } = require('stream');
const fs = require('fs');

const csvToJSON = new Transform({
  transform(chunk, encoding, callback) {
    const [name, age, city] = chunk.toString().trim().split(',');
    const json = JSON.stringify({ name, age, city }) + '\n';
    this.push(json);
    callback();
  }
});

fs.createReadStream('data.csv')
  .pipe(csvToJSON)
  .pipe(fs.createWriteStream('data.json'));

5. Streamer des données vers une base de données

const { Transform } = require('stream');
const fs = require('fs');
const db = require('sqlite3');

const dbWriter = new Transform({
  async transform(chunk, encoding, callback) {
    const data = JSON.parse(chunk);
    await db.run('INSERT INTO users VALUES (?, ?)',
      [data.name, data.email]);
    callback();
  }
});

fs.createReadStream('users.jsonl')
  .pipe(dbWriter)
  .on('finish', () => {
    console.info('Données insérées!');
  });

Bonnes pratiques :

  • Toujours écouter l'événement 'error' sur chaque stream
  • Utilisez pipeline() au lieu de chaîner pipe()
  • Contrôlez le highWaterMark selon votre cas d'usage
  • Testez avec des fichiers volumineux avant production
  • Utilisez des outils de profiling pour vérifier la mémoire