Front-end angularforall.com

- RxJS Operators : map, filter, switchMap & merge

Rxjs Operators Angular Observable
RxJS Operators : map, filter, switchMap & merge

Guide complet des operators RxJS les plus utilisés : map, filter, switchMap, mergeMap, flatMap. Exemples concrets et différences de comportement.

Mental model RxJS

RxJS operators sont des fonctions qui transforment des Observables. Pensez à un pipeline d'usine : les données brutes entrent d'un côté, les operators les transforment étape par étape, et les données traitées sortent de l'autre côté.

// Toujours utiliser pipe() pour chaîner les operators
import { fromEvent, of } from 'rxjs';
import { map, filter, debounceTime, switchMap, catchError } from 'rxjs/operators';

// Pattern type : pipeline de recherche en temps réel
const searchInput = document.getElementById('search') as HTMLInputElement;

fromEvent(searchInput, 'input').pipe(
    map(event => (event.target as HTMLInputElement).value),  // Extraire la valeur
    map(text => text.trim()),                                 // Nettoyer
    filter(text => text.length >= 2),                        // Filtrer les trop courts
    debounceTime(300),                                        // Attendre la fin de frappe
    distinctUntilChanged(),                                   // Ignorer si même valeur
    switchMap(query => searchAPI(query)),                     // Lancer la recherche (annule la précédente)
    catchError(err => of([]))                                 // Gérer les erreurs
).subscribe(results => renderResults(results));
Mental model : Observable = flux de données qui arrive dans le temps. Operator = transformation à appliquer à chaque valeur. pipe(op1, op2, op3) = pipeline de transformations séquentielles.

map() et tap() — transformer et observer

import { map, tap } from 'rxjs/operators';

// map() — transformer chaque valeur émise
this.userService.getUsers().pipe(
    map(users => users.map(u => ({
        ...u,
        fullName: `${u.firstName} ${u.lastName}`,
        initials: `${u.firstName[0]}${u.lastName[0]}`,
        avatarUrl: `https://api.dicebear.com/7.x/initials/svg?seed=${u.firstName}`,
    }))),
    map(users => users.sort((a, b) => a.lastName.localeCompare(b.lastName)))
).subscribe(users => this.users = users);

// tap() — observer sans modifier (debug, side effects)
// tap() est transparent : laisse passer les valeurs sans les modifier
this.userService.getUsers().pipe(
    tap(users => console.log('Avant filtre:', users.length)),
    map(users => users.filter(u => u.isActive)),
    tap(users => console.log('Après filtre:', users.length)),
    tap(users => this.analytics.track('users_loaded', { count: users.length }))
).subscribe(users => this.activeUsers = users);

// Différence clé : map() retourne une nouvelle valeur, tap() retourne la valeur originale

filter() et distinctUntilChanged()

import { filter, distinctUntilChanged, map } from 'rxjs/operators';

// filter() — ne laisser passer que les valeurs qui satisfont la condition
this.events$.pipe(
    filter(event => event.type === 'USER_ACTION'),         // Par type
    filter(event => event.userId !== null),                 // Non null
    filter((event): event is UserActionEvent => !!event.payload) // Type guard TypeScript
).subscribe(event => this.handleUserAction(event));

// distinctUntilChanged() — ignorer les émissions si la valeur n'a pas changé
// Par défaut, compare par référence (===)
this.searchInput$.pipe(
    map(v => v.trim().toLowerCase()),
    distinctUntilChanged(), // "angular " puis "angular " → n'émet qu'une fois
).subscribe(query => this.search(query));

// distinctUntilChanged() avec comparateur personnalisé
this.user$.pipe(
    distinctUntilChanged((prev, curr) => prev.id === curr.id) // Comparaison par ID
).subscribe(user => this.loadUserProfile(user.id));

// Combo typique : recherche debounced
this.searchControl.valueChanges.pipe(
    debounceTime(300),
    distinctUntilChanged(),
    filter(query => (query?.length ?? 0) >= 2),
    switchMap(query => this.searchService.search(query!))
).subscribe(results => this.results = results);

switchMap, mergeMap, concatMap, exhaustMap

Ces 4 operators "aplatissent" un Observable d'Observables. La différence est leur stratégie de gestion des requêtes concurrentes.

Operator Comportement Cas d'usage
switchMapAnnule la requête précédenteRecherche live, filtres, navigation de route
mergeMapToutes en parallèleUpload multiple, suppressions parallèles
concatMapUne par une, dans l'ordreActions ordonnées, retry séquentiel
exhaustMapIgnore les nouvelles si une est en coursBouton de soumission (prévenir double-clic)
import { switchMap, mergeMap, concatMap, exhaustMap, from, of } from 'rxjs';

// switchMap — recherche live : annule la précédente à chaque keystroke
this.search$.pipe(
    switchMap(query => this.api.search(query)) // Annule requête précédente
).subscribe(results => this.results = results);

// mergeMap — upload parallèle
from(selectedFiles).pipe(
    mergeMap(file => this.uploadService.upload(file), 3) // Max 3 en parallèle
).subscribe(result => this.uploadedFiles.push(result));

// concatMap — actions ordonnées (ex: sauvegarder des items dans l'ordre)
saveQueue$.pipe(
    concatMap(item => this.api.save(item)) // Une save à la fois, dans l'ordre
).subscribe(saved => console.log('Sauvegardé:', saved));

// exhaustMap — bouton submit (ignorer si requête en cours)
fromEvent(submitBtn, 'click').pipe(
    exhaustMap(() => this.api.submitForm(formData))
    // Si l'utilisateur clique plusieurs fois, seule la première est traitée
).subscribe(response => this.handleResponse(response));

debounceTime() et throttleTime()

import { debounceTime, throttleTime, auditTime } from 'rxjs/operators';

// debounceTime — n'émet que si N ms se sont écoulées sans nouvelle émission
// Cas d'usage : frappe dans un champ de recherche
this.searchControl.valueChanges.pipe(
    debounceTime(400) // Attendre 400ms de pause avant d'émettre
).subscribe(query => this.search(query));

// throttleTime — émet la première valeur, ignore pendant N ms
// Cas d'usage : bouton "like" (éviter le spam)
fromEvent(likeBtn, 'click').pipe(
    throttleTime(1000) // Une fois par seconde maximum
).subscribe(() => this.toggleLike());

// auditTime — émet la dernière valeur APRÈS N ms
// Cas d'usage : scroll, resize (prendre la dernière position)
fromEvent(window, 'scroll').pipe(
    auditTime(100) // Émet 100ms après le dernier scroll
).subscribe(() => this.updateScrollPosition());

combineLatest() et forkJoin()

import { combineLatest, forkJoin } from 'rxjs';

// combineLatest — émet à chaque changement de n'importe quel Observable
// Les deux Observables doivent avoir émis au moins une valeur
const vm$ = combineLatest({
    user: this.userService.currentUser$,
    products: this.productService.products$,
    cart: this.cartService.cart$,
}).pipe(
    map(({ user, products, cart }) => ({
        user,
        products: products.filter(p => p.isAvailable),
        cartTotal: cart.reduce((sum, item) => sum + item.price, 0),
    }))
);

// Utilisation dans le template :
// @if (vm$ | async; as vm) { ... }

// forkJoin — attend que TOUS les Observables se complètent (HTTP calls)
// Comme Promise.all() mais pour les Observables
forkJoin({
    user: this.userService.getUser(userId),
    orders: this.orderService.getOrders(userId),
    addresses: this.addressService.getAddresses(userId),
}).subscribe(({ user, orders, addresses }) => {
    this.pageData = { user, orders, addresses };
});

// Différence clé :
// combineLatest → pour des streams continus (BehaviorSubject, Observables infinis)
// forkJoin → pour des requêtes HTTP (se complètent)

catchError() et retry()

import { catchError, retry, retryWhen, delay, take } from 'rxjs/operators';
import { of, throwError, timer } from 'rxjs';

// catchError — intercepter une erreur et émettre une valeur de fallback
this.userService.getUser(id).pipe(
    catchError(error => {
        if (error.status === 404) {
            return of(null); // Retourner null si non trouvé
        }
        if (error.status === 401) {
            this.router.navigate(['/login']);
            return of(null);
        }
        return throwError(() => error); // Re-propager les autres erreurs
    })
).subscribe(user => this.user = user);

// retry() — réessayer N fois en cas d'erreur
this.httpClient.get('/api/data').pipe(
    retry(3) // Réessayer 3 fois avant d'abandonner
).subscribe(data => this.data = data);

// Backoff exponentiel — attendre plus longtemps à chaque retry
this.httpClient.get('/api/unstable').pipe(
    retryWhen(errors =>
        errors.pipe(
            // Attendre 1s, 2s, 4s avant chaque retry
            delay(1000 * Math.pow(2, attempt++)),
            take(4) // Maximum 3 retries + 1 throw
        )
    ),
    catchError(err => {
        console.error('Échec après 3 retries:', err);
        return of(null);
    })
)

takeUntil() et takeUntilDestroyed()

import { takeUntil, takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { Subject } from 'rxjs';

// === ANCIENNE APPROCHE : takeUntil + destroy$ Subject ===
@Component({ ... })
export class OldComponent implements OnDestroy {
    private destroy$ = new Subject<void>();

    ngOnInit() {
        // Tous les subscriptions se désabonnent quand destroy$ émet
        this.dataService.stream$.pipe(
            takeUntil(this.destroy$)
        ).subscribe(data => this.data = data);
    }

    ngOnDestroy() {
        this.destroy$.next(); // Signal de destruction
        this.destroy$.complete();
    }
}

// === NOUVELLE APPROCHE : takeUntilDestroyed (Angular 16+) ===
@Component({ ... })
export class ModernComponent {
    constructor() {
        // Se désabonne automatiquement quand le composant est détruit
        // Doit être appelé dans le contexte d'injection (constructor ou inject())
        this.dataService.stream$.pipe(
            takeUntilDestroyed() // ← Plus besoin de ngOnDestroy
        ).subscribe(data => this.data = data);
    }
}

// Utiliser hors du contexte d'injection
@Injectable({ providedIn: 'root' })
export class DataService {
    private destroyRef = inject(DestroyRef);

    startPolling() {
        interval(5000).pipe(
            takeUntilDestroyed(this.destroyRef) // Passer DestroyRef explicitement
        ).subscribe(() => this.refresh());
    }
}

Tableau récapitulatif

Operator Catégorie Effet Cas d'usage principal
map()TransformationTransforme chaque valeurAPI response → model métier
tap()Side effectsObserve sans modifierLogging, analytics
filter()FiltrageBloque certaines valeursValidation, type guard
distinctUntilChanged()FiltrageIgnore les doublons consécutifsRecherche, formulaires
switchMap()AplatissementAnnule le précédentRecherche live
mergeMap()AplatissementParallèle illimitéUpload multiple
concatMap()AplatissementSéquentielSauvegardes ordonnées
exhaustMap()AplatissementIgnore si en coursBouton submit
debounceTime()TimingÉmet après pauseInput utilisateur
throttleTime()TimingRate limitingScroll, like
combineLatest()CombinaisonÉmet si un changeViewModel multi-sources
forkJoin()CombinaisonAttend tousChargement parallèle
catchError()ErreursIntercepte les erreursFallback HTTP
takeUntilDestroyed()Cycle de vieAuto-unsubscribeComposants Angular

Partager