Rxjs
Operators
Angular
Observable
Guide complet des operators RxJS les plus utilisés : map, filter, switchMap, mergeMap, flatMap. Exemples concrets et différences de comportement.
Mental model RxJS
RxJS operators sont des fonctions qui transforment des Observables. Pensez à un pipeline d'usine : les données brutes entrent d'un côté, les operators les transforment étape par étape, et les données traitées sortent de l'autre côté.
// Toujours utiliser pipe() pour chaîner les operators
import { fromEvent, of } from 'rxjs';
import { map, filter, debounceTime, switchMap, catchError } from 'rxjs/operators';
// Pattern type : pipeline de recherche en temps réel
const searchInput = document.getElementById('search') as HTMLInputElement;
fromEvent(searchInput, 'input').pipe(
map(event => (event.target as HTMLInputElement).value), // Extraire la valeur
map(text => text.trim()), // Nettoyer
filter(text => text.length >= 2), // Filtrer les trop courts
debounceTime(300), // Attendre la fin de frappe
distinctUntilChanged(), // Ignorer si même valeur
switchMap(query => searchAPI(query)), // Lancer la recherche (annule la précédente)
catchError(err => of([])) // Gérer les erreurs
).subscribe(results => renderResults(results));
Mental model : Observable = flux de données qui arrive dans le temps. Operator = transformation à appliquer à chaque valeur.
pipe(op1, op2, op3) = pipeline de transformations séquentielles.
map() et tap() — transformer et observer
import { map, tap } from 'rxjs/operators';
// map() — transformer chaque valeur émise
this.userService.getUsers().pipe(
map(users => users.map(u => ({
...u,
fullName: `${u.firstName} ${u.lastName}`,
initials: `${u.firstName[0]}${u.lastName[0]}`,
avatarUrl: `https://api.dicebear.com/7.x/initials/svg?seed=${u.firstName}`,
}))),
map(users => users.sort((a, b) => a.lastName.localeCompare(b.lastName)))
).subscribe(users => this.users = users);
// tap() — observer sans modifier (debug, side effects)
// tap() est transparent : laisse passer les valeurs sans les modifier
this.userService.getUsers().pipe(
tap(users => console.log('Avant filtre:', users.length)),
map(users => users.filter(u => u.isActive)),
tap(users => console.log('Après filtre:', users.length)),
tap(users => this.analytics.track('users_loaded', { count: users.length }))
).subscribe(users => this.activeUsers = users);
// Différence clé : map() retourne une nouvelle valeur, tap() retourne la valeur originale
filter() et distinctUntilChanged()
import { filter, distinctUntilChanged, map } from 'rxjs/operators';
// filter() — ne laisser passer que les valeurs qui satisfont la condition
this.events$.pipe(
filter(event => event.type === 'USER_ACTION'), // Par type
filter(event => event.userId !== null), // Non null
filter((event): event is UserActionEvent => !!event.payload) // Type guard TypeScript
).subscribe(event => this.handleUserAction(event));
// distinctUntilChanged() — ignorer les émissions si la valeur n'a pas changé
// Par défaut, compare par référence (===)
this.searchInput$.pipe(
map(v => v.trim().toLowerCase()),
distinctUntilChanged(), // "angular " puis "angular " → n'émet qu'une fois
).subscribe(query => this.search(query));
// distinctUntilChanged() avec comparateur personnalisé
this.user$.pipe(
distinctUntilChanged((prev, curr) => prev.id === curr.id) // Comparaison par ID
).subscribe(user => this.loadUserProfile(user.id));
// Combo typique : recherche debounced
this.searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
filter(query => (query?.length ?? 0) >= 2),
switchMap(query => this.searchService.search(query!))
).subscribe(results => this.results = results);
switchMap, mergeMap, concatMap, exhaustMap
Ces 4 operators "aplatissent" un Observable d'Observables. La différence est leur stratégie de gestion des requêtes concurrentes.
| Operator | Comportement | Cas d'usage |
|---|---|---|
| switchMap | Annule la requête précédente | Recherche live, filtres, navigation de route |
| mergeMap | Toutes en parallèle | Upload multiple, suppressions parallèles |
| concatMap | Une par une, dans l'ordre | Actions ordonnées, retry séquentiel |
| exhaustMap | Ignore les nouvelles si une est en cours | Bouton de soumission (prévenir double-clic) |
import { switchMap, mergeMap, concatMap, exhaustMap, from, of } from 'rxjs';
// switchMap — recherche live : annule la précédente à chaque keystroke
this.search$.pipe(
switchMap(query => this.api.search(query)) // Annule requête précédente
).subscribe(results => this.results = results);
// mergeMap — upload parallèle
from(selectedFiles).pipe(
mergeMap(file => this.uploadService.upload(file), 3) // Max 3 en parallèle
).subscribe(result => this.uploadedFiles.push(result));
// concatMap — actions ordonnées (ex: sauvegarder des items dans l'ordre)
saveQueue$.pipe(
concatMap(item => this.api.save(item)) // Une save à la fois, dans l'ordre
).subscribe(saved => console.log('Sauvegardé:', saved));
// exhaustMap — bouton submit (ignorer si requête en cours)
fromEvent(submitBtn, 'click').pipe(
exhaustMap(() => this.api.submitForm(formData))
// Si l'utilisateur clique plusieurs fois, seule la première est traitée
).subscribe(response => this.handleResponse(response));
debounceTime() et throttleTime()
import { debounceTime, throttleTime, auditTime } from 'rxjs/operators';
// debounceTime — n'émet que si N ms se sont écoulées sans nouvelle émission
// Cas d'usage : frappe dans un champ de recherche
this.searchControl.valueChanges.pipe(
debounceTime(400) // Attendre 400ms de pause avant d'émettre
).subscribe(query => this.search(query));
// throttleTime — émet la première valeur, ignore pendant N ms
// Cas d'usage : bouton "like" (éviter le spam)
fromEvent(likeBtn, 'click').pipe(
throttleTime(1000) // Une fois par seconde maximum
).subscribe(() => this.toggleLike());
// auditTime — émet la dernière valeur APRÈS N ms
// Cas d'usage : scroll, resize (prendre la dernière position)
fromEvent(window, 'scroll').pipe(
auditTime(100) // Émet 100ms après le dernier scroll
).subscribe(() => this.updateScrollPosition());
combineLatest() et forkJoin()
import { combineLatest, forkJoin } from 'rxjs';
// combineLatest — émet à chaque changement de n'importe quel Observable
// Les deux Observables doivent avoir émis au moins une valeur
const vm$ = combineLatest({
user: this.userService.currentUser$,
products: this.productService.products$,
cart: this.cartService.cart$,
}).pipe(
map(({ user, products, cart }) => ({
user,
products: products.filter(p => p.isAvailable),
cartTotal: cart.reduce((sum, item) => sum + item.price, 0),
}))
);
// Utilisation dans le template :
// @if (vm$ | async; as vm) { ... }
// forkJoin — attend que TOUS les Observables se complètent (HTTP calls)
// Comme Promise.all() mais pour les Observables
forkJoin({
user: this.userService.getUser(userId),
orders: this.orderService.getOrders(userId),
addresses: this.addressService.getAddresses(userId),
}).subscribe(({ user, orders, addresses }) => {
this.pageData = { user, orders, addresses };
});
// Différence clé :
// combineLatest → pour des streams continus (BehaviorSubject, Observables infinis)
// forkJoin → pour des requêtes HTTP (se complètent)
catchError() et retry()
import { catchError, retry, retryWhen, delay, take } from 'rxjs/operators';
import { of, throwError, timer } from 'rxjs';
// catchError — intercepter une erreur et émettre une valeur de fallback
this.userService.getUser(id).pipe(
catchError(error => {
if (error.status === 404) {
return of(null); // Retourner null si non trouvé
}
if (error.status === 401) {
this.router.navigate(['/login']);
return of(null);
}
return throwError(() => error); // Re-propager les autres erreurs
})
).subscribe(user => this.user = user);
// retry() — réessayer N fois en cas d'erreur
this.httpClient.get('/api/data').pipe(
retry(3) // Réessayer 3 fois avant d'abandonner
).subscribe(data => this.data = data);
// Backoff exponentiel — attendre plus longtemps à chaque retry
this.httpClient.get('/api/unstable').pipe(
retryWhen(errors =>
errors.pipe(
// Attendre 1s, 2s, 4s avant chaque retry
delay(1000 * Math.pow(2, attempt++)),
take(4) // Maximum 3 retries + 1 throw
)
),
catchError(err => {
console.error('Échec après 3 retries:', err);
return of(null);
})
)
takeUntil() et takeUntilDestroyed()
import { takeUntil, takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { Subject } from 'rxjs';
// === ANCIENNE APPROCHE : takeUntil + destroy$ Subject ===
@Component({ ... })
export class OldComponent implements OnDestroy {
private destroy$ = new Subject<void>();
ngOnInit() {
// Tous les subscriptions se désabonnent quand destroy$ émet
this.dataService.stream$.pipe(
takeUntil(this.destroy$)
).subscribe(data => this.data = data);
}
ngOnDestroy() {
this.destroy$.next(); // Signal de destruction
this.destroy$.complete();
}
}
// === NOUVELLE APPROCHE : takeUntilDestroyed (Angular 16+) ===
@Component({ ... })
export class ModernComponent {
constructor() {
// Se désabonne automatiquement quand le composant est détruit
// Doit être appelé dans le contexte d'injection (constructor ou inject())
this.dataService.stream$.pipe(
takeUntilDestroyed() // ← Plus besoin de ngOnDestroy
).subscribe(data => this.data = data);
}
}
// Utiliser hors du contexte d'injection
@Injectable({ providedIn: 'root' })
export class DataService {
private destroyRef = inject(DestroyRef);
startPolling() {
interval(5000).pipe(
takeUntilDestroyed(this.destroyRef) // Passer DestroyRef explicitement
).subscribe(() => this.refresh());
}
}
Tableau récapitulatif
| Operator | Catégorie | Effet | Cas d'usage principal |
|---|---|---|---|
map() | Transformation | Transforme chaque valeur | API response → model métier |
tap() | Side effects | Observe sans modifier | Logging, analytics |
filter() | Filtrage | Bloque certaines valeurs | Validation, type guard |
distinctUntilChanged() | Filtrage | Ignore les doublons consécutifs | Recherche, formulaires |
switchMap() | Aplatissement | Annule le précédent | Recherche live |
mergeMap() | Aplatissement | Parallèle illimité | Upload multiple |
concatMap() | Aplatissement | Séquentiel | Sauvegardes ordonnées |
exhaustMap() | Aplatissement | Ignore si en cours | Bouton submit |
debounceTime() | Timing | Émet après pause | Input utilisateur |
throttleTime() | Timing | Rate limiting | Scroll, like |
combineLatest() | Combinaison | Émet si un change | ViewModel multi-sources |
forkJoin() | Combinaison | Attend tous | Chargement parallèle |
catchError() | Erreurs | Intercepte les erreurs | Fallback HTTP |
takeUntilDestroyed() | Cycle de vie | Auto-unsubscribe | Composants Angular |