Front-end angularforall.com

- Combiner Observables : combineLatest, forkJoin

Rxjs Observables Combine Angular
Combiner Observables : combineLatest, forkJoin

Combinez plusieurs Observables avec combineLatest, forkJoin, merge, concat. Quand les utiliser, différences de comportement et cas d'usage réels.

combineLatest() — Réagir à chaque changement

combineLatest() combine les valeurs les plus récentes de N observables. Il émet chaque fois qu'UN observable émet, en fournissant les dernières valeurs de tous les autres. Condition : tous doivent avoir émis au moins une valeur.

// combinelatest-filters.component.ts — Filtres produits réactifs
import { Component, inject, OnDestroy } from '@angular/core';
import { combineLatest, Subject } from 'rxjs';
import { switchMap, takeUntil, map, startWith, debounceTime } from 'rxjs/operators';
import { AsyncPipe } from '@angular/common';

@Component({
    standalone: true,
    imports: [AsyncPipe],
    template: `
        <!-- Les 3 filtres sont réactifs — chaque changement relance la requête -->
        @if (products$ | async; as vm) {
            <p>{{ vm.total }} produits — page {{ vm.page }}</p>
            @for (p of vm.products; track p.id) { <product-card [product]="p" /> }
        }
    `
})
export class ProductListComponent implements OnDestroy {
    private api = inject(ApiService);
    private filterService = inject(FilterService);
    private destroy$ = new Subject<void>();

    // ViewModel réactif — émet chaque fois qu'un filtre change
    products$ = combineLatest({
        category: this.filterService.category$.pipe(startWith('all')),
        search:   this.filterService.search$.pipe(startWith(''), debounceTime(300)),
        page:     this.filterService.page$.pipe(startWith(1)),
    }).pipe(
        // switchMap annule la requête précédente si les filtres changent avant la réponse
        switchMap(({ category, search, page }) =>
            this.api.getProducts({ category, search, page }).pipe(
                map(data => ({ products: data.items, total: data.total, page }))
            )
        ),
        takeUntil(this.destroy$)
    );

    ngOnDestroy(): void {
        this.destroy$.next();
        this.destroy$.complete();
    }
}
// Note : startWith() est crucial — sans lui, combineLatest n'émet pas tant que
// TOUS les observables n'ont pas émis. startWith force une valeur initiale.
Piège classique : si un observable de combineLatest ne complète jamais (ex: BehaviorSubject), combineLatest ne se termine jamais non plus. Toujours utiliser takeUntil() ou takeUntilDestroyed() pour éviter les memory leaks.

withLatestFrom() — Snapshot au moment de l'action

withLatestFrom() émet seulement quand l'observable source émet, et ajoute la dernière valeur d'un autre observable en snapshot — sans déclencher sur les changements de cet autre observable.

// Différence combineLatest vs withLatestFrom
import { fromEvent, combineLatest, merge } from 'rxjs';
import { withLatestFrom, switchMap } from 'rxjs/operators';

// ❌ combineLatest — déclenche quand user$ change (pas voulu ici)
combineLatest([this.submitClick$, this.user$]).pipe(
    switchMap(([_, user]) => this.saveForm(user))
);

// ✅ withLatestFrom — déclenche SEULEMENT au clic, snapshot de user$ au moment du clic
this.submitClick$.pipe(
    withLatestFrom(this.user$),                // user$ = snapshot, pas trigger
    switchMap(([formData, user]) => {
        // On a les données du formulaire ET l'utilisateur courant
        return this.api.save({ ...formData, userId: user.id });
    })
).subscribe();

// Autre exemple : bouton "Exporter" avec filtres courants
this.exportButton$.pipe(
    withLatestFrom(
        this.filterService.activeFilters$,  // état courant des filtres
        this.sortService.currentSort$       // tri courant
    ),
    switchMap(([_, filters, sort]) =>
        this.api.exportCsv({ filters, sort })
    )
).subscribe(blob => this.downloadFile(blob));

forkJoin() — Chargement parallèle garanti

forkJoin() lance tous les observables en parallèle et n'émet qu'une seule fois quand TOUS se sont complétés. Si l'un échoue, tout échoue — gérer avec catchError sur chaque observable individuel.

// dashboard.component.ts — Chargement parallèle avec gestion d'erreur individuelle
import { Component, inject } from '@angular/core';
import { forkJoin, of } from 'rxjs';
import { catchError, finalize } from 'rxjs/operators';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

interface DashboardData {
    users: User[];
    stats: Stats | null;
    notifications: Notification[];
    config: AppConfig;
}

@Component({ standalone: true })
export class DashboardComponent {
    private api = inject(ApiService);
    isLoading = true;
    data?: DashboardData;
    errors: string[] = [];

    constructor() {
        forkJoin({
            // Chaque appel est "bulletproof" — les erreurs sont transformées en null
            users: this.api.getUsers().pipe(
                catchError(err => {
                    this.errors.push('Impossible de charger les utilisateurs');
                    return of([]);  // valeur de fallback
                })
            ),
            stats: this.api.getStats().pipe(
                catchError(() => of(null))  // null = pas de stats disponibles
            ),
            notifications: this.api.getNotifications().pipe(
                catchError(() => of([]))
            ),
            config: this.api.getConfig().pipe(
                // config est critique — si elle échoue, on propage l'erreur
            ),
        }).pipe(
            finalize(() => this.isLoading = false),
            takeUntilDestroyed()  // Angular 16+ — cleanup automatique
        ).subscribe({
            next: (data) => { this.data = data; },
            error: (err) => {
                console.error('Erreur critique :', err);
                this.isLoading = false;
            }
        });
    }
}
forkJoin avec des tableaux dynamiques :
// Charger N utilisateurs en parallèle (liste dynamique)
const userIds = [1, 2, 3, 4, 5];

forkJoin(
    userIds.map(id => this.api.getUser(id).pipe(catchError(() => of(null))))
).subscribe(users => {
    // users = [User1, null, User3, User4, null] — null pour les erreurs
    const loaded = users.filter((u): u is User => u !== null);
    console.log(`${loaded.length}/${userIds.length} utilisateurs chargés`);
});

merge() — Fusionner les émissions

merge() combine plusieurs observables et retransmet chaque émission dès qu'elle arrive, sans attendre les autres. Parfait pour centraliser plusieurs sources de déclenchement.

// Refresh depuis plusieurs sources indépendantes
import { merge, fromEvent, timer, Subject } from 'rxjs';
import { switchMap, startWith, share } from 'rxjs/operators';

@Component({ standalone: true })
export class LiveDataComponent {
    private api = inject(DataService);
    private refresh$ = new Subject<void>();

    // Toutes ces sources déclenchent un reload
    private trigger$ = merge(
        of(null),                             // Immédiatement au démarrage
        this.refresh$,                        // Bouton refresh manuel
        timer(0, 30_000),                     // Auto-refresh toutes les 30s
        fromEvent(document, 'visibilitychange').pipe( // Quand l'onglet redevient actif
            filter(() => document.visibilityState === 'visible')
        )
    );

    data$ = this.trigger$.pipe(
        switchMap(() => this.api.getLiveData()),
        share()  // Partager entre plusieurs abonnés sans relancer la requête
    );

    onRefresh(): void {
        this.refresh$.next(); // Déclenche le refresh manuellement
    }
}

zip() — Synchroniser en paires ordonnées

zip() synchronise les observables index par index : émet quand chaque observable a émis sa N-ième valeur. Rare en pratique, utile pour les animations séquentielles ou les tests.

// zip — synchroniser une animation avec des données
import { zip, interval, from } from 'rxjs';
import { map } from 'rxjs/operators';

const messages = ['Chargement...', 'Analyse...', 'Préparation...', 'Terminé !'];

// zip : affiche un message toutes les 800ms, en ordre
zip(
    from(messages),         // émet 4 valeurs immédiatement
    interval(800)           // émet 0, 1, 2, 3 toutes les 800ms
).pipe(
    map(([message, _]) => message)  // prendre juste le message
).subscribe(msg => {
    this.statusMessage = msg;  // Chargement... → Analyse... → ...
});

// Cas réel : corréler deux streams par position
// ex: questions et réponses d'un quiz qui arrivent en parallèle
zip(
    this.questionsStream$,
    this.answersStream$
).subscribe(([question, answer]) => {
    this.results.push({ question, answer, correct: answer === question.expected });
});

race() — Premier qui émet gagne

race() souscrit à plusieurs observables et se connecte exclusivement au premier qui émet — les autres sont ignorés. Utile pour implémenter des timeouts ou des fallbacks.

// race — timeout sur une requête HTTP
import { race, timer, throwError } from 'rxjs';
import { switchMap } from 'rxjs/operators';

function withTimeout<T>(observable: Observable<T>, ms = 5000): Observable<T> {
    return race(
        observable,
        timer(ms).pipe(
            switchMap(() => throwError(() => new Error(`Timeout après ${ms}ms`)))
        )
    );
}

// Utilisation
withTimeout(this.api.getSlowData(), 3000).pipe(
    catchError(err => {
        if (err.message.startsWith('Timeout')) {
            return this.api.getFallbackData(); // données de cache
        }
        return throwError(() => err);
    })
).subscribe();

Patterns Angular : ViewModel réactif

Le pattern ViewModel réactif combine combineLatest et le pipe async pour créer une seule souscription qui gère tous les états (loading, data, error).

// search.component.ts — Pattern ViewModel complet avec états
import { Component, inject } from '@angular/core';
import { FormControl, ReactiveFormsModule } from '@angular/forms';
import { combineLatest, BehaviorSubject } from 'rxjs';
import { switchMap, startWith, debounceTime, distinctUntilChanged, map, catchError, of } from 'rxjs/operators';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { AsyncPipe } from '@angular/common';

interface SearchVM {
    query: string;
    results: Article[];
    total: number;
    loading: boolean;
    error: string | null;
}

@Component({
    standalone: true,
    imports: [ReactiveFormsModule, AsyncPipe],
    template: `
        <input [formControl]="searchControl" placeholder="Rechercher...">

        @if (vm$ | async; as vm) {
            @if (vm.loading) { <spinner /> }
            @if (vm.error) { <p class="text-danger">{{ vm.error }}</p> }
            @if (!vm.loading && !vm.error) {
                <p>{{ vm.total }} résultats pour "{{ vm.query }}"</p>
                @for (article of vm.results; track article.id) {
                    <article-card [article]="article" />
                }
            }
        }
    `
})
export class SearchComponent {
    private api = inject(SearchService);
    searchControl = new FormControl('');
    private loading$ = new BehaviorSubject(false);

    vm$ = this.searchControl.valueChanges.pipe(
        startWith(''),
        debounceTime(300),
        distinctUntilChanged(),
        switchMap(query => {
            this.loading$.next(true);
            return combineLatest({
                results: this.api.search(query ?? '').pipe(catchError(() => of({ items: [], total: 0 }))),
                loading: this.loading$.asObservable(),
            }).pipe(
                map(({ results, loading }) => ({
                    query: query ?? '',
                    results: results.items,
                    total: results.total,
                    loading,
                    error: null,
                } satisfies SearchVM))
            );
        }),
        takeUntilDestroyed()
    );
}

Tableau comparatif complet

Operator Déclencheur Émet N fois Cas d'usage principal
combineLatestN'importe quel observable changeContinuFiltres, formulaires réactifs, ViewModel
withLatestFromSeulement l'observable sourceContinuAction + snapshot d'état, submit + user
forkJoinTous complétés1 foisChargement initial, requêtes parallèles
mergeN'importe quel observable émetContinuMulti-source → une action, refresh
zipTous ont émis leur N-ième valeurPar pairesAnimation, test, corrélation indexée
raceLe premier à émettreContinu (1 gagnant)Timeout, fallback, premières données
  • Filtres qui changent en temps réelcombineLatest + switchMap + debounceTime
  • Action + lire l'état courantwithLatestFrom
  • Charger N endpoints en parallèleforkJoin + catchError sur chaque
  • Plusieurs sources déclenchent la même chosemerge
  • Timeout sur une requêterace + timer
  • Jamais de combineLatest sans startWith si les observables n'ont pas encore émis

Partager