Combinez plusieurs Observables avec combineLatest, forkJoin, merge, concat. Quand les utiliser, différences de comportement et cas d'usage réels.
combineLatest() — Réagir à chaque changement
combineLatest() combine les valeurs les plus récentes de N observables. Il émet chaque fois qu'UN observable émet, en fournissant les dernières valeurs de tous les autres. Condition : tous doivent avoir émis au moins une valeur.
// combinelatest-filters.component.ts — Filtres produits réactifs
import { Component, inject, OnDestroy } from '@angular/core';
import { combineLatest, Subject } from 'rxjs';
import { switchMap, takeUntil, map, startWith, debounceTime } from 'rxjs/operators';
import { AsyncPipe } from '@angular/common';
@Component({
standalone: true,
imports: [AsyncPipe],
template: `
<!-- Les 3 filtres sont réactifs — chaque changement relance la requête -->
@if (products$ | async; as vm) {
<p>{{ vm.total }} produits — page {{ vm.page }}</p>
@for (p of vm.products; track p.id) { <product-card [product]="p" /> }
}
`
})
export class ProductListComponent implements OnDestroy {
private api = inject(ApiService);
private filterService = inject(FilterService);
private destroy$ = new Subject<void>();
// ViewModel réactif — émet chaque fois qu'un filtre change
products$ = combineLatest({
category: this.filterService.category$.pipe(startWith('all')),
search: this.filterService.search$.pipe(startWith(''), debounceTime(300)),
page: this.filterService.page$.pipe(startWith(1)),
}).pipe(
// switchMap annule la requête précédente si les filtres changent avant la réponse
switchMap(({ category, search, page }) =>
this.api.getProducts({ category, search, page }).pipe(
map(data => ({ products: data.items, total: data.total, page }))
)
),
takeUntil(this.destroy$)
);
ngOnDestroy(): void {
this.destroy$.next();
this.destroy$.complete();
}
}
// Note : startWith() est crucial — sans lui, combineLatest n'émet pas tant que
// TOUS les observables n'ont pas émis. startWith force une valeur initiale.
combineLatest ne complète jamais (ex: BehaviorSubject), combineLatest ne se termine jamais non plus. Toujours utiliser takeUntil() ou takeUntilDestroyed() pour éviter les memory leaks.
withLatestFrom() — Snapshot au moment de l'action
withLatestFrom() émet seulement quand l'observable source émet, et ajoute la dernière valeur d'un autre observable en snapshot — sans déclencher sur les changements de cet autre observable.
// Différence combineLatest vs withLatestFrom
import { fromEvent, combineLatest, merge } from 'rxjs';
import { withLatestFrom, switchMap } from 'rxjs/operators';
// ❌ combineLatest — déclenche quand user$ change (pas voulu ici)
combineLatest([this.submitClick$, this.user$]).pipe(
switchMap(([_, user]) => this.saveForm(user))
);
// ✅ withLatestFrom — déclenche SEULEMENT au clic, snapshot de user$ au moment du clic
this.submitClick$.pipe(
withLatestFrom(this.user$), // user$ = snapshot, pas trigger
switchMap(([formData, user]) => {
// On a les données du formulaire ET l'utilisateur courant
return this.api.save({ ...formData, userId: user.id });
})
).subscribe();
// Autre exemple : bouton "Exporter" avec filtres courants
this.exportButton$.pipe(
withLatestFrom(
this.filterService.activeFilters$, // état courant des filtres
this.sortService.currentSort$ // tri courant
),
switchMap(([_, filters, sort]) =>
this.api.exportCsv({ filters, sort })
)
).subscribe(blob => this.downloadFile(blob));
forkJoin() — Chargement parallèle garanti
forkJoin() lance tous les observables en parallèle et n'émet qu'une seule fois quand TOUS se sont complétés. Si l'un échoue, tout échoue — gérer avec catchError sur chaque observable individuel.
// dashboard.component.ts — Chargement parallèle avec gestion d'erreur individuelle
import { Component, inject } from '@angular/core';
import { forkJoin, of } from 'rxjs';
import { catchError, finalize } from 'rxjs/operators';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
interface DashboardData {
users: User[];
stats: Stats | null;
notifications: Notification[];
config: AppConfig;
}
@Component({ standalone: true })
export class DashboardComponent {
private api = inject(ApiService);
isLoading = true;
data?: DashboardData;
errors: string[] = [];
constructor() {
forkJoin({
// Chaque appel est "bulletproof" — les erreurs sont transformées en null
users: this.api.getUsers().pipe(
catchError(err => {
this.errors.push('Impossible de charger les utilisateurs');
return of([]); // valeur de fallback
})
),
stats: this.api.getStats().pipe(
catchError(() => of(null)) // null = pas de stats disponibles
),
notifications: this.api.getNotifications().pipe(
catchError(() => of([]))
),
config: this.api.getConfig().pipe(
// config est critique — si elle échoue, on propage l'erreur
),
}).pipe(
finalize(() => this.isLoading = false),
takeUntilDestroyed() // Angular 16+ — cleanup automatique
).subscribe({
next: (data) => { this.data = data; },
error: (err) => {
console.error('Erreur critique :', err);
this.isLoading = false;
}
});
}
}
// Charger N utilisateurs en parallèle (liste dynamique)
const userIds = [1, 2, 3, 4, 5];
forkJoin(
userIds.map(id => this.api.getUser(id).pipe(catchError(() => of(null))))
).subscribe(users => {
// users = [User1, null, User3, User4, null] — null pour les erreurs
const loaded = users.filter((u): u is User => u !== null);
console.log(`${loaded.length}/${userIds.length} utilisateurs chargés`);
});
merge() — Fusionner les émissions
merge() combine plusieurs observables et retransmet chaque émission dès qu'elle arrive, sans attendre les autres. Parfait pour centraliser plusieurs sources de déclenchement.
// Refresh depuis plusieurs sources indépendantes
import { merge, fromEvent, timer, Subject } from 'rxjs';
import { switchMap, startWith, share } from 'rxjs/operators';
@Component({ standalone: true })
export class LiveDataComponent {
private api = inject(DataService);
private refresh$ = new Subject<void>();
// Toutes ces sources déclenchent un reload
private trigger$ = merge(
of(null), // Immédiatement au démarrage
this.refresh$, // Bouton refresh manuel
timer(0, 30_000), // Auto-refresh toutes les 30s
fromEvent(document, 'visibilitychange').pipe( // Quand l'onglet redevient actif
filter(() => document.visibilityState === 'visible')
)
);
data$ = this.trigger$.pipe(
switchMap(() => this.api.getLiveData()),
share() // Partager entre plusieurs abonnés sans relancer la requête
);
onRefresh(): void {
this.refresh$.next(); // Déclenche le refresh manuellement
}
}
zip() — Synchroniser en paires ordonnées
zip() synchronise les observables index par index : émet quand chaque observable a émis sa N-ième valeur. Rare en pratique, utile pour les animations séquentielles ou les tests.
// zip — synchroniser une animation avec des données
import { zip, interval, from } from 'rxjs';
import { map } from 'rxjs/operators';
const messages = ['Chargement...', 'Analyse...', 'Préparation...', 'Terminé !'];
// zip : affiche un message toutes les 800ms, en ordre
zip(
from(messages), // émet 4 valeurs immédiatement
interval(800) // émet 0, 1, 2, 3 toutes les 800ms
).pipe(
map(([message, _]) => message) // prendre juste le message
).subscribe(msg => {
this.statusMessage = msg; // Chargement... → Analyse... → ...
});
// Cas réel : corréler deux streams par position
// ex: questions et réponses d'un quiz qui arrivent en parallèle
zip(
this.questionsStream$,
this.answersStream$
).subscribe(([question, answer]) => {
this.results.push({ question, answer, correct: answer === question.expected });
});
race() — Premier qui émet gagne
race() souscrit à plusieurs observables et se connecte exclusivement au premier qui émet — les autres sont ignorés. Utile pour implémenter des timeouts ou des fallbacks.
// race — timeout sur une requête HTTP
import { race, timer, throwError } from 'rxjs';
import { switchMap } from 'rxjs/operators';
function withTimeout<T>(observable: Observable<T>, ms = 5000): Observable<T> {
return race(
observable,
timer(ms).pipe(
switchMap(() => throwError(() => new Error(`Timeout après ${ms}ms`)))
)
);
}
// Utilisation
withTimeout(this.api.getSlowData(), 3000).pipe(
catchError(err => {
if (err.message.startsWith('Timeout')) {
return this.api.getFallbackData(); // données de cache
}
return throwError(() => err);
})
).subscribe();
Patterns Angular : ViewModel réactif
Le pattern ViewModel réactif combine combineLatest et le pipe async pour créer une seule souscription qui gère tous les états (loading, data, error).
// search.component.ts — Pattern ViewModel complet avec états
import { Component, inject } from '@angular/core';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { combineLatest, BehaviorSubject } from 'rxjs';
import { switchMap, startWith, debounceTime, distinctUntilChanged, map, catchError, of } from 'rxjs/operators';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { AsyncPipe } from '@angular/common';
interface SearchVM {
query: string;
results: Article[];
total: number;
loading: boolean;
error: string | null;
}
@Component({
standalone: true,
imports: [ReactiveFormsModule, AsyncPipe],
template: `
<input [formControl]="searchControl" placeholder="Rechercher...">
@if (vm$ | async; as vm) {
@if (vm.loading) { <spinner /> }
@if (vm.error) { <p class="text-danger">{{ vm.error }}</p> }
@if (!vm.loading && !vm.error) {
<p>{{ vm.total }} résultats pour "{{ vm.query }}"</p>
@for (article of vm.results; track article.id) {
<article-card [article]="article" />
}
}
}
`
})
export class SearchComponent {
private api = inject(SearchService);
searchControl = new FormControl('');
private loading$ = new BehaviorSubject(false);
vm$ = this.searchControl.valueChanges.pipe(
startWith(''),
debounceTime(300),
distinctUntilChanged(),
switchMap(query => {
this.loading$.next(true);
return combineLatest({
results: this.api.search(query ?? '').pipe(catchError(() => of({ items: [], total: 0 }))),
loading: this.loading$.asObservable(),
}).pipe(
map(({ results, loading }) => ({
query: query ?? '',
results: results.items,
total: results.total,
loading,
error: null,
} satisfies SearchVM))
);
}),
takeUntilDestroyed()
);
}
Tableau comparatif complet
| Operator | Déclencheur | Émet N fois | Cas d'usage principal |
|---|---|---|---|
| combineLatest | N'importe quel observable change | Continu | Filtres, formulaires réactifs, ViewModel |
| withLatestFrom | Seulement l'observable source | Continu | Action + snapshot d'état, submit + user |
| forkJoin | Tous complétés | 1 fois | Chargement initial, requêtes parallèles |
| merge | N'importe quel observable émet | Continu | Multi-source → une action, refresh |
| zip | Tous ont émis leur N-ième valeur | Par paires | Animation, test, corrélation indexée |
| race | Le premier à émettre | Continu (1 gagnant) | Timeout, fallback, premières données |
- Filtres qui changent en temps réel →
combineLatest+switchMap+debounceTime - Action + lire l'état courant →
withLatestFrom - Charger N endpoints en parallèle →
forkJoin+catchErrorsur chaque - Plusieurs sources déclenchent la même chose →
merge - Timeout sur une requête →
race+timer - Jamais de combineLatest sans startWith si les observables n'ont pas encore émis