Back-end angularforall.com

- JWT : authentifier une API FastAPI/Angular

Fastapi Jwt Angular Python Authentification Securite Api-Rest Oauth2
JWT : authentifier une API FastAPI/Angular

Sécurisez votre API FastAPI avec JWT et intégrez l'authentification dans Angular : guide complet avec refresh tokens et guards.

Prérequis et architecture

Avant de plonger dans le code, il est important de comprendre les briques technologiques et le flux général de l'authentification JWT dans une architecture FastAPI + Angular. Cette approche est dite stateless : le serveur ne stocke pas de session — toute l'information d'identité est portée par le token lui-même.

Versions et dépendances requises

  • Python 3.11+ — runtime backend
  • FastAPI 0.110+ — framework API asynchrone
  • python-jose[cryptography] — encodage/décodage JWT
  • passlib[bcrypt] — hachage des mots de passe
  • SQLAlchemy 2.0+ — ORM pour la base de données
  • Angular 17+ — framework frontend (standalone components)
  • Node.js 18+ / Angular CLI 17+

Architecture et flux JWT

Le flux d'authentification suit ce chemin en quatre étapes. L'utilisateur soumet ses identifiants, le serveur valide et retourne deux tokens, le client stocke ces tokens et les envoie à chaque requête, puis le serveur valide le token avant de répondre.

Endpoint Méthode Description Auth requise
/auth/login POST Login — retourne access + refresh tokens Non
/auth/refresh POST Renouvelle l'access token via refresh token Non (refresh token)
/auth/logout POST Révoque le refresh token Oui (access token)
/users/me GET Retourne le profil de l'utilisateur connecté Oui (access token)
À retenir : Le token JWT est auto-porteur : il contient les informations de l'utilisateur (sub, exp, role) directement dans son payload. Le serveur n'a pas besoin de requêter la base pour valider chaque requête — il suffit de vérifier la signature.

Configurer FastAPI avec JWT

La première étape consiste à installer les dépendances et à créer le module de gestion des tokens. Toute la logique JWT est centralisée dans un fichier auth/jwt.py pour une meilleure maintenabilité.

Installation des dépendances

# Créer un environnement virtuel et l'activer
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate   # Windows

# Installer les packages nécessaires
pip install fastapi uvicorn[standard] python-jose[cryptography] passlib[bcrypt] sqlalchemy python-dotenv

# Générer le fichier requirements.txt
pip freeze > requirements.txt

Structure du projet FastAPI

fastapi-auth/
├── app/
│   ├── __init__.py
│   ├── main.py              # Point d'entrée FastAPI
│   ├── auth/
│   │   ├── __init__.py
│   │   ├── jwt.py           # Logique JWT (encode/decode)
│   │   ├── router.py        # Endpoints /auth/*
│   │   └── dependencies.py  # get_current_user
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py          # Modèle SQLAlchemy User
│   │   └── token.py         # Modèle SQLAlchemy RefreshToken
│   ├── schemas/
│   │   ├── user.py          # Pydantic schemas User
│   │   └── token.py         # Pydantic schemas Token
│   └── database.py          # Connexion SQLAlchemy
├── .env                     # Variables d'environnement
└── requirements.txt

Variables d'environnement (.env)

# .env — NE PAS commiter ce fichier en production
# Générer une clé sécurisée : python -c "import secrets; print(secrets.token_hex(32))"
SECRET_KEY=votre-cle-secrete-de-256-bits-minimum-a-changer
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
DATABASE_URL=sqlite:///./auth.db

Module JWT — auth/jwt.py

# app/auth/jwt.py — Gestion des tokens JWT (encodage, décodage, validation)
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, status
import os

# Charger les paramètres depuis les variables d'environnement
SECRET_KEY = os.getenv("SECRET_KEY", "changez-moi-en-production")
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
REFRESH_TOKEN_EXPIRE_DAYS = int(os.getenv("REFRESH_TOKEN_EXPIRE_DAYS", "7"))

# Contexte de hachage pour les mots de passe (bcrypt recommandé)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    """Hache un mot de passe en clair avec bcrypt."""
    return pwd_context.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Vérifie qu'un mot de passe correspond à son hash."""
    return pwd_context.verify(plain_password, hashed_password)


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    Génère un JWT access token signé avec HS256.
    - data : dict contenant au minimum {"sub": "user_id"}
    - expires_delta : durée de vie personnalisée (défaut: ACCESS_TOKEN_EXPIRE_MINUTES)
    """
    to_encode = data.copy()
    # Calculer la date d'expiration
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    # Ajouter l'expiration et le type dans le payload
    to_encode.update({"exp": expire, "type": "access"})
    # Encoder et signer le token
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def create_refresh_token(data: dict) -> str:
    """
    Génère un JWT refresh token avec durée de vie longue.
    Utilisé uniquement pour renouveler l'access token.
    """
    to_encode = data.copy()
    # Le refresh token dure plus longtemps que l'access token
    expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def verify_token(token: str, token_type: str = "access") -> dict:
    """
    Décode et valide un JWT.
    - Lève HTTPException 401 si le token est invalide ou expiré
    - Vérifie que le type correspond (access vs refresh)
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Token invalide ou expiré",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Décoder le token avec la clé secrète
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # Vérifier que le type correspond à ce qu'on attend
        if payload.get("type") != token_type:
            raise credentials_exception
        # Extraire le sujet (user_id)
        sub: str = payload.get("sub")
        if sub is None:
            raise credentials_exception
        return payload
    except JWTError:
        # Toute erreur de décodage → token invalide
        raise credentials_exception
Note : L'algorithme HS256 (HMAC-SHA256) utilise une clé symétrique partagée. Pour des architectures microservices où plusieurs services doivent valider les tokens, préférez RS256 (asymétrique) avec une clé publique/privée.

Endpoints d'authentification

On définit maintenant les modèles de données (Pydantic et SQLAlchemy) puis les endpoints d'authentification. L'endpoint /auth/login est le coeur du système : il valide les identifiants et retourne les deux tokens.

Schémas Pydantic — schemas/token.py

# app/schemas/token.py — Modèles de validation des données (Pydantic v2)
from pydantic import BaseModel, EmailStr
from typing import Optional


class UserCreate(BaseModel):
    """Données requises pour créer un compte utilisateur."""
    email: EmailStr
    password: str
    full_name: Optional[str] = None


class UserResponse(BaseModel):
    """Données renvoyées au client (sans le mot de passe)."""
    id: int
    email: str
    full_name: Optional[str] = None
    is_active: bool

    # Permet la sérialisation depuis un objet SQLAlchemy
    class Config:
        from_attributes = True


class TokenResponse(BaseModel):
    """Réponse complète après une connexion réussie."""
    access_token: str
    refresh_token: str
    token_type: str = "bearer"


class RefreshRequest(BaseModel):
    """Corps de la requête pour renouveler l'access token."""
    refresh_token: str

Modèle SQLAlchemy — models/user.py

# app/models/user.py — Modèle de base de données pour les utilisateurs
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.sql import func
from app.database import Base


class User(Base):
    """Table 'users' : stocke les utilisateurs avec mot de passe haché."""
    __tablename__ = "users"

    # Clé primaire auto-incrémentée
    id = Column(Integer, primary_key=True, index=True)
    # Email unique et indexé pour des recherches rapides
    email = Column(String, unique=True, index=True, nullable=False)
    # NE JAMAIS stocker le mot de passe en clair
    hashed_password = Column(String, nullable=False)
    full_name = Column(String, nullable=True)
    # Permet de désactiver un compte sans le supprimer
    is_active = Column(Boolean, default=True)
    # Dates automatiques via SQLAlchemy
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

Router d'authentification — auth/router.py

# app/auth/router.py — Endpoints d'authentification FastAPI
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.database import get_db
from app.models.user import User
from app.auth.jwt import (
    hash_password, verify_password,
    create_access_token, create_refresh_token, verify_token
)
from app.schemas.token import UserCreate, UserResponse, TokenResponse, RefreshRequest

# Créer le router avec préfixe /auth
router = APIRouter(prefix="/auth", tags=["authentication"])


@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def register(user_data: UserCreate, db: Session = Depends(get_db)):
    """
    Créer un nouveau compte utilisateur.
    - Vérifie que l'email n'est pas déjà utilisé
    - Hache le mot de passe avant de le stocker
    """
    # Vérifier si l'email existe déjà
    existing_user = db.query(User).filter(User.email == user_data.email).first()
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Un compte avec cet email existe déjà"
        )

    # Créer l'utilisateur avec le mot de passe haché
    new_user = User(
        email=user_data.email,
        hashed_password=hash_password(user_data.password),
        full_name=user_data.full_name,
    )
    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    return new_user


@router.post("/login", response_model=TokenResponse)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    """
    Authentifier un utilisateur et retourner les tokens JWT.
    - Utilise OAuth2PasswordRequestForm (username/password en form-data)
    - Retourne access_token (courte durée) + refresh_token (longue durée)
    """
    # Rechercher l'utilisateur par email (username = email ici)
    user = db.query(User).filter(User.email == form_data.username).first()

    # Vérification : utilisateur existant + mot de passe correct + compte actif
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Email ou mot de passe incorrect",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Compte désactivé"
        )

    # Créer les deux tokens avec le user_id comme sujet
    token_data = {"sub": str(user.id), "email": user.email}
    access_token = create_access_token(data=token_data)
    refresh_token = create_refresh_token(data=token_data)

    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token
    )


@router.get("/me", response_model=UserResponse)
async def get_current_user_profile(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Retourne le profil de l'utilisateur actuellement connecté."""
    return current_user

Dépendance get_current_user — auth/dependencies.py

# app/auth/dependencies.py — Dépendance FastAPI pour extraire l'utilisateur du token
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from app.database import get_db
from app.models.user import User
from app.auth.jwt import verify_token

# OAuth2PasswordBearer extrait automatiquement le token du header "Authorization: Bearer "
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> User:
    """
    Dépendance FastAPI : extraire et valider l'utilisateur depuis le JWT.
    Usage : ajouter `current_user: User = Depends(get_current_user)` à un endpoint.
    """
    # Décoder et valider le token (lève HTTPException si invalide)
    payload = verify_token(token, token_type="access")
    user_id: str = payload.get("sub")

    # Chercher l'utilisateur en base avec l'ID extrait du token
    user = db.query(User).filter(User.id == int(user_id)).first()
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Utilisateur introuvable"
        )
    return user

Test avec curl

# 1. Créer un compte
curl -X POST http://localhost:8000/auth/register \
  -H "Content-Type: application/json" \
  -d '{"email": "user@exemple.com", "password": "motdepasse123"}'

# 2. Se connecter et récupérer les tokens
curl -X POST http://localhost:8000/auth/login \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "username=user@exemple.com&password=motdepasse123"

# 3. Accéder à une route protégée
curl -X GET http://localhost:8000/auth/me \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

Refresh tokens et révocation

Le refresh token permet de renouveler l'access token sans que l'utilisateur ait besoin de se reconnecter. Pour sécuriser ce mécanisme, on applique la rotation des refresh tokens : à chaque renouvellement, l'ancien refresh token est révoqué et un nouveau est émis.

À retenir : Ne stockez jamais les refresh tokens uniquement côté client sans révocation côté serveur. Un refresh token volé permettrait à un attaquant de maintenir un accès indéfini. Stockez-les en base et révoquez-les à la déconnexion.

Modèle RefreshToken en base — models/token.py

# app/models/token.py — Stockage des refresh tokens pour gestion de la révocation
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.sql import func
from app.database import Base


class RefreshToken(Base):
    """
    Table 'refresh_tokens' : stocke les refresh tokens actifs.
    Permet la révocation à la déconnexion et la rotation sécurisée.
    """
    __tablename__ = "refresh_tokens"

    id = Column(Integer, primary_key=True, index=True)
    # Le token JWT refresh (haché pour plus de sécurité)
    token = Column(String, unique=True, index=True, nullable=False)
    # Référence vers l'utilisateur propriétaire
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    # Permet de révoquer sans supprimer (audit possible)
    is_revoked = Column(Boolean, default=False)
    # Date de création pour le nettoyage automatique des tokens expirés
    created_at = Column(DateTime(timezone=True), server_default=func.now())

Endpoint de refresh — auth/router.py (suite)

# Ajouter dans app/auth/router.py

@router.post("/refresh", response_model=TokenResponse)
async def refresh_access_token(
    request: RefreshRequest,
    db: Session = Depends(get_db)
):
    """
    Renouveler l'access token avec un refresh token valide.
    Applique la rotation : l'ancien refresh token est révoqué, un nouveau est créé.
    """
    from app.models.token import RefreshToken

    # Valider le refresh token (signature + expiration + type)
    payload = verify_token(request.refresh_token, token_type="refresh")
    user_id = int(payload.get("sub"))

    # Vérifier que le refresh token existe et n'est pas révoqué en base
    db_token = db.query(RefreshToken).filter(
        RefreshToken.token == request.refresh_token,
        RefreshToken.user_id == user_id,
        RefreshToken.is_revoked == False
    ).first()

    if not db_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Refresh token révoqué ou invalide"
        )

    # Révoquer l'ancien refresh token (rotation)
    db_token.is_revoked = True
    db.commit()

    # Récupérer l'utilisateur pour créer les nouveaux tokens
    user = db.query(User).filter(User.id == user_id).first()
    token_data = {"sub": str(user.id), "email": user.email}

    # Créer un nouveau pair de tokens
    new_access_token = create_access_token(data=token_data)
    new_refresh_token = create_refresh_token(data=token_data)

    # Stocker le nouveau refresh token en base
    new_db_token = RefreshToken(token=new_refresh_token, user_id=user_id)
    db.add(new_db_token)
    db.commit()

    return TokenResponse(
        access_token=new_access_token,
        refresh_token=new_refresh_token
    )


@router.post("/logout", status_code=status.HTTP_204_NO_CONTENT)
async def logout(
    request: RefreshRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    Déconnecter l'utilisateur en révoquant son refresh token.
    L'access token reste valide jusqu'à son expiration (durée courte).
    """
    from app.models.token import RefreshToken

    # Révoquer le refresh token fourni
    db.query(RefreshToken).filter(
        RefreshToken.token == request.refresh_token,
        RefreshToken.user_id == current_user.id
    ).update({"is_revoked": True})
    db.commit()

Intégration Angular : AuthService

Côté Angular, on centralise toute la logique d'authentification dans un AuthService. Ce service gère le stockage des tokens, les appels HTTP de login/logout/refresh, et expose un BehaviorSubject réactif pour que les composants sachent en temps réel si l'utilisateur est connecté.

Interfaces TypeScript

// src/app/auth/auth.models.ts — Typage fort pour les échanges avec l'API
export interface LoginCredentials {
  username: string;  // Email de l'utilisateur
  password: string;
}

export interface AuthTokens {
  access_token: string;
  refresh_token: string;
  token_type: string;
}

export interface UserProfile {
  id: number;
  email: string;
  full_name?: string;
  is_active: boolean;
}

// Payload décodé d'un JWT (sans vérification de signature)
export interface JwtPayload {
  sub: string;    // user_id sous forme de string
  email: string;
  exp: number;    // timestamp Unix d'expiration
  type: 'access' | 'refresh';
}

AuthService complet

// src/app/auth/auth.service.ts — Service central d'authentification Angular
import { Injectable } from '@angular/core';
import { HttpClient, HttpHeaders } from '@angular/common/http';
import { BehaviorSubject, Observable, throwError } from 'rxjs';
import { tap, catchError, map } from 'rxjs/operators';
import { Router } from '@angular/router';
import { AuthTokens, LoginCredentials, UserProfile, JwtPayload } from './auth.models';

@Injectable({ providedIn: 'root' })
export class AuthService {
  // URL de base de l'API FastAPI
  private readonly API_URL = 'http://localhost:8000';

  // BehaviorSubject : émet true si connecté, false sinon
  // Initialisé selon la présence d'un token valide au démarrage
  private isAuthenticatedSubject = new BehaviorSubject(this.hasValidToken());

  // Observable public consommé par les composants et guards
  isAuthenticated$ = this.isAuthenticatedSubject.asObservable();

  constructor(private http: HttpClient, private router: Router) {}

  /**
   * Connexion : envoie les credentials en form-data (requis par OAuth2PasswordRequestForm)
   * et stocke les tokens retournés.
   */
  login(credentials: LoginCredentials): Observable {
    // OAuth2PasswordRequestForm attend application/x-www-form-urlencoded
    const body = new URLSearchParams();
    body.set('username', credentials.username);
    body.set('password', credentials.password);

    const headers = new HttpHeaders({ 'Content-Type': 'application/x-www-form-urlencoded' });

    return this.http.post(`${this.API_URL}/auth/login`, body.toString(), { headers }).pipe(
      tap(tokens => {
        // Stocker les deux tokens après connexion réussie
        this.storeTokens(tokens);
        // Notifier tous les abonnés que l'utilisateur est connecté
        this.isAuthenticatedSubject.next(true);
      }),
      catchError(err => {
        console.error('Erreur de connexion:', err);
        return throwError(() => err);
      })
    );
  }

  /**
   * Renouvellement de l'access token via le refresh token.
   * Appelé automatiquement par l'intercepteur quand le serveur retourne 401.
   */
  refreshToken(): Observable {
    const refreshToken = this.getRefreshToken();
    if (!refreshToken) {
      // Pas de refresh token disponible → déconnecter
      this.logout();
      return throwError(() => new Error('Aucun refresh token disponible'));
    }

    return this.http.post(`${this.API_URL}/auth/refresh`, {
      refresh_token: refreshToken
    }).pipe(
      tap(tokens => {
        // Mettre à jour les tokens après rotation
        this.storeTokens(tokens);
      }),
      catchError(err => {
        // Refresh échoué → déconnecter l'utilisateur
        this.logout();
        return throwError(() => err);
      })
    );
  }

  /**
   * Déconnexion : révoque le refresh token côté serveur,
   * supprime les tokens locaux et redirige vers /login.
   */
  logout(): void {
    const refreshToken = this.getRefreshToken();

    if (refreshToken) {
      // Tenter de révoquer le refresh token côté serveur
      this.http.post(`${this.API_URL}/auth/logout`, { refresh_token: refreshToken })
        .pipe(catchError(() => throwError(() => null)))
        .subscribe();
    }

    // Nettoyer le stockage local dans tous les cas
    localStorage.removeItem('access_token');
    localStorage.removeItem('refresh_token');
    this.isAuthenticatedSubject.next(false);
    this.router.navigate(['/login']);
  }

  /** Retourne le profil de l'utilisateur connecté depuis l'API. */
  getCurrentUser(): Observable {
    return this.http.get(`${this.API_URL}/auth/me`);
  }

  /** Récupère l'access token stocké (pour l'intercepteur). */
  getAccessToken(): string | null {
    return localStorage.getItem('access_token');
  }

  /** Récupère le refresh token stocké. */
  getRefreshToken(): string | null {
    return localStorage.getItem('refresh_token');
  }

  /** Stocke les deux tokens dans localStorage. */
  private storeTokens(tokens: AuthTokens): void {
    localStorage.setItem('access_token', tokens.access_token);
    localStorage.setItem('refresh_token', tokens.refresh_token);
  }

  /**
   * Vérifie si un access token valide est présent localement.
   * Décode le payload JWT sans vérification de signature (côté client uniquement).
   */
  hasValidToken(): boolean {
    const token = localStorage.getItem('access_token');
    if (!token) return false;

    try {
      // Le JWT a 3 parties séparées par des points : header.payload.signature
      const payloadBase64 = token.split('.')[1];
      // Décoder le payload Base64 URL-encoded
      const payload: JwtPayload = JSON.parse(atob(payloadBase64.replace(/-/g, '+').replace(/_/g, '/')));
      // Vérifier l'expiration (exp est en secondes, Date.now() en millisecondes)
      return payload.exp * 1000 > Date.now();
    } catch {
      // Token malformé → non valide
      return false;
    }
  }
}

HTTP Interceptor et Route Guards

L'intercepteur HTTP ajoute automatiquement le token JWT à chaque requête sortante. Si le serveur répond 401 (token expiré), il tente un refresh silencieux avant de relancer la requête originale. Le Route Guard protège les routes privées en vérifiant l'état d'authentification.

HTTP Interceptor fonctionnel (Angular 17)

// src/app/auth/auth.interceptor.ts — Intercepteur HTTP fonctionnel (Angular 17+)
import { HttpInterceptorFn, HttpErrorResponse, HttpRequest, HttpHandlerFn } from '@angular/common/http';
import { inject } from '@angular/core';
import { catchError, switchMap, throwError } from 'rxjs';
import { AuthService } from './auth.service';

export const authInterceptor: HttpInterceptorFn = (req: HttpRequest<unknown>, next: HttpHandlerFn) => {
  // Injecter le service d'auth (injection fonctionnelle Angular 17)
  const authService = inject(AuthService);

  // Récupérer l'access token courant
  const token = authService.getAccessToken();

  // Cloner la requête et ajouter le header Authorization si token présent
  // On ne clone pas si c'est déjà une requête d'auth (éviter les boucles)
  const authReq = token && !req.url.includes('/auth/login') && !req.url.includes('/auth/register')
    ? req.clone({ setHeaders: { Authorization: `Bearer ${token}` } })
    : req;

  return next(authReq).pipe(
    catchError((error: HttpErrorResponse) => {
      // Si 401 sur une route non-auth → tenter un refresh automatique
      if (error.status === 401 && !req.url.includes('/auth/')) {
        return authService.refreshToken().pipe(
          switchMap(() => {
            // Après refresh réussi, relancer la requête originale avec le nouveau token
            const newToken = authService.getAccessToken();
            const retryReq = req.clone({
              setHeaders: { Authorization: `Bearer ${newToken}` }
            });
            return next(retryReq);
          }),
          catchError(refreshError => {
            // Le refresh a échoué → déconnecter l'utilisateur
            authService.logout();
            return throwError(() => refreshError);
          })
        );
      }
      // Autres erreurs HTTP → propager l'erreur normalement
      return throwError(() => error);
    })
  );
};

Route Guard — auth.guard.ts

// src/app/auth/auth.guard.ts — Guard fonctionnel pour protéger les routes privées
import { inject } from '@angular/core';
import { CanActivateFn, Router } from '@angular/router';
import { AuthService } from './auth.service';

/**
 * Guard fonctionnel (Angular 17+) : empêche l'accès aux routes protégées
 * si l'utilisateur n'est pas authentifié et redirige vers /login.
 */
export const authGuard: CanActivateFn = (route, state) => {
  const authService = inject(AuthService);
  const router = inject(Router);

  // Vérifier si un token valide est présent localement
  if (authService.hasValidToken()) {
    return true;  // Accès autorisé
  }

  // Rediriger vers /login avec l'URL cible pour revenir après connexion
  return router.createUrlTree(['/login'], {
    queryParams: { returnUrl: state.url }
  });
};

Configuration app.config.ts (Standalone)

// src/app/app.config.ts — Configuration de l'application Angular standalone
import { ApplicationConfig } from '@angular/core';
import { provideRouter } from '@angular/router';
import { provideHttpClient, withInterceptors } from '@angular/common/http';
import { routes } from './app.routes';
import { authInterceptor } from './auth/auth.interceptor';

export const appConfig: ApplicationConfig = {
  providers: [
    // Configurer le router avec les routes définies
    provideRouter(routes),
    // Configurer HttpClient avec l'intercepteur JWT
    provideHttpClient(
      withInterceptors([authInterceptor])
    ),
  ],
};

Définition des routes — app.routes.ts

// src/app/app.routes.ts — Définition des routes avec guards
import { Routes } from '@angular/router';
import { authGuard } from './auth/auth.guard';

export const routes: Routes = [
  // Route publique : page de connexion
  {
    path: 'login',
    loadComponent: () => import('./pages/login/login.component').then(m => m.LoginComponent)
  },
  // Route privée : tableau de bord (protégée par authGuard)
  {
    path: 'dashboard',
    loadComponent: () => import('./pages/dashboard/dashboard.component').then(m => m.DashboardComponent),
    canActivate: [authGuard]
  },
  // Route privée : profil utilisateur
  {
    path: 'profile',
    loadComponent: () => import('./pages/profile/profile.component').then(m => m.ProfileComponent),
    canActivate: [authGuard]
  },
  // Redirection par défaut
  { path: '', redirectTo: '/dashboard', pathMatch: 'full' },
  { path: '**', redirectTo: '/login' }
];

Composant de login

// src/app/pages/login/login.component.ts — Page de connexion Angular
import { Component } from '@angular/core';
import { CommonModule } from '@angular/common';
import { ReactiveFormsModule, FormBuilder, FormGroup, Validators } from '@angular/forms';
import { Router, ActivatedRoute } from '@angular/router';
import { AuthService } from '../../auth/auth.service';

@Component({
  selector: 'app-login',
  standalone: true,
  imports: [CommonModule, ReactiveFormsModule],
  template: `
    <div class="container mt-5">
      <div class="row justify-content-center">
        <div class="col-md-6">
          <h2 class="mb-4 fw-bold">Connexion</h2>

          <!-- Message d'erreur Bootstrap -->
          <div *ngIf="errorMessage" class="alert alert-danger" role="alert">
            {{ errorMessage }}
          </div>

          <form [formGroup]="loginForm" (ngSubmit)="onSubmit()">
            <div class="mb-3">
              <label for="email" class="form-label">Email</label>
              <input id="email" type="email" class="form-control"
                     formControlName="email" placeholder="vous@exemple.com"
                     [class.is-invalid]="submitted && f['email'].errors">
              <div class="invalid-feedback" *ngIf="submitted && f['email'].errors">
                Email invalide
              </div>
            </div>

            <div class="mb-3">
              <label for="password" class="form-label">Mot de passe</label>
              <input id="password" type="password" class="form-control"
                     formControlName="password"
                     [class.is-invalid]="submitted && f['password'].errors">
              <div class="invalid-feedback" *ngIf="submitted && f['password'].errors">
                Mot de passe requis (min. 6 caractères)
              </div>
            </div>

            <button type="submit" class="btn btn-primary w-100" [disabled]="loading">
              {{ loading ? 'Connexion...' : 'Se connecter' }}
            </button>
          </form>
        </div>
      </div>
    </div>
  `
})
export class LoginComponent {
  loginForm: FormGroup;
  submitted = false;
  loading = false;
  errorMessage = '';
  private returnUrl = '/dashboard';

  constructor(
    private fb: FormBuilder,
    private authService: AuthService,
    private router: Router,
    private route: ActivatedRoute
  ) {
    // Récupérer l'URL de retour depuis les query params si disponible
    this.returnUrl = this.route.snapshot.queryParams['returnUrl'] || '/dashboard';

    // Construire le formulaire avec validations
    this.loginForm = this.fb.group({
      email: ['', [Validators.required, Validators.email]],
      password: ['', [Validators.required, Validators.minLength(6)]]
    });
  }

  /** Raccourci pour accéder aux contrôles du formulaire dans le template. */
  get f() { return this.loginForm.controls; }

  onSubmit(): void {
    this.submitted = true;
    if (this.loginForm.invalid) return;

    this.loading = true;
    this.errorMessage = '';

    this.authService.login({
      username: this.f['email'].value,
      password: this.f['password'].value
    }).subscribe({
      next: () => {
        // Connexion réussie → rediriger vers l'URL cible
        this.router.navigateByUrl(this.returnUrl);
      },
      error: (err) => {
        // Afficher le message d'erreur du serveur
        this.errorMessage = err.error?.detail || 'Erreur de connexion. Vérifiez vos identifiants.';
        this.loading = false;
      }
    });
  }
}

Tests et sécurité en production

Un système d'authentification sans tests est une prise de risque. On couvre ici les tests essentiels pour FastAPI avec pytest, les points de sécurité critiques à vérifier avant mise en production, et un exemple de Docker Compose pour le déploiement.

Tests pytest pour FastAPI

# tests/test_auth.py — Tests d'intégration pour l'authentification FastAPI
import pytest
from fastapi.testclient import TestClient
from app.main import app

# Client de test FastAPI (synchrone, pas besoin d'uvicorn)
client = TestClient(app)


def test_register_new_user():
    """Test : créer un nouveau compte utilisateur."""
    response = client.post("/auth/register", json={
        "email": "test@exemple.com",
        "password": "motdepasse123",
        "full_name": "Test User"
    })
    # Vérifier que la création retourne 201 Created
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "test@exemple.com"
    # Le mot de passe ne doit JAMAIS être retourné dans la réponse
    assert "hashed_password" not in data
    assert "password" not in data


def test_login_success():
    """Test : connexion réussie avec credentials valides."""
    response = client.post("/auth/login", data={
        "username": "test@exemple.com",
        "password": "motdepasse123"
    })
    assert response.status_code == 200
    data = response.json()
    # Vérifier que les deux tokens sont présents
    assert "access_token" in data
    assert "refresh_token" in data
    assert data["token_type"] == "bearer"


def test_login_wrong_password():
    """Test : connexion avec mauvais mot de passe → 401."""
    response = client.post("/auth/login", data={
        "username": "test@exemple.com",
        "password": "mauvaismdp"
    })
    assert response.status_code == 401


def test_protected_route_without_token():
    """Test : accès à une route protégée sans token → 401."""
    response = client.get("/auth/me")
    assert response.status_code == 401


def test_protected_route_with_valid_token():
    """Test : accès à /auth/me avec token valide → profil retourné."""
    # D'abord se connecter pour obtenir un token
    login_response = client.post("/auth/login", data={
        "username": "test@exemple.com",
        "password": "motdepasse123"
    })
    token = login_response.json()["access_token"]

    # Accéder à la route protégée avec le token
    response = client.get("/auth/me", headers={"Authorization": f"Bearer {token}"})
    assert response.status_code == 200
    assert response.json()["email"] == "test@exemple.com"


def test_invalid_token():
    """Test : token malformé ou falsifié → 401."""
    response = client.get("/auth/me", headers={"Authorization": "Bearer token-invalide-123"})
    assert response.status_code == 401

Checklist sécurité production

  • SECRET_KEY d'au moins 256 bits, générée aléatoirement (python -c "import secrets; print(secrets.token_hex(32))")
  • HTTPS obligatoire en production (certificat SSL Let's Encrypt)
  • CORS configuré strictement avec la liste blanche des domaines autorisés
  • Rate limiting sur /auth/login pour prévenir le brute-force (slowapi)
  • Durée d'expiration courte pour l'access token (15-30 minutes maximum)
  • Rotation des refresh tokens à chaque renouvellement
  • Logs des tentatives de connexion échouées
  • Variables d'environnement dans un fichier .env non commité
  • Dépendances maintenues à jour (python-jose, passlib)
  • Headers de sécurité HTTP (X-Content-Type-Options, Strict-Transport-Security)

Rate limiting avec slowapi

# app/main.py — Exemple de rate limiting sur l'endpoint de login
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# Limiter basé sur l'adresse IP du client
limiter = Limiter(key_func=get_remote_address)
app = FastAPI(title="FastAPI Auth JWT")
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# Appliquer le rate limiting dans le router auth
@router.post("/login", response_model=TokenResponse)
@limiter.limit("5/minute")  # Maximum 5 tentatives par minute par IP
async def login(
    request: Request,  # Requis par slowapi pour extraire l'IP
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    # ... logique de login inchangée
    pass

Docker Compose — FastAPI + PostgreSQL

# docker-compose.yml — Stack complète FastAPI + PostgreSQL
version: '3.8'

services:
  # Base de données PostgreSQL
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: fastapi_auth
      POSTGRES_USER: fastapi_user
      POSTGRES_PASSWORD: motdepasse_securise
    ports:
      - "5432:5432"
    volumes:
      # Persistance des données entre redémarrages
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U fastapi_user"]
      interval: 10s
      timeout: 5s
      retries: 5

  # API FastAPI
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      # Variables sensibles à externaliser dans un .env en production
      SECRET_KEY: votre-cle-secrete-256-bits
      DATABASE_URL: postgresql://fastapi_user:motdepasse_securise@postgres:5432/fastapi_auth
      ALGORITHM: HS256
      ACCESS_TOKEN_EXPIRE_MINUTES: 30
    depends_on:
      postgres:
        condition: service_healthy
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000

volumes:
  postgres_data:
Note : En production, ne mettez jamais les secrets directement dans docker-compose.yml. Utilisez docker secret, des variables d'environnement injectées par votre CI/CD, ou un gestionnaire de secrets comme HashiCorp Vault ou AWS Secrets Manager.

Conclusion

Vous disposez maintenant d'un système d'authentification JWT complet et sécurisé entre FastAPI et Angular. Côté backend, la combinaison FastAPI + python-jose + passlib offre une implémentation propre, performante et conforme aux standards OAuth2. Côté frontend, l'AuthService avec BehaviorSubject, l'intercepteur fonctionnel et les Route Guards forment une architecture réactive et maintenable.

La rotation des refresh tokens, le rate limiting et la checklist sécurité vous permettent de passer en production avec confiance. L'étape suivante naturelle est d'ajouter le contrôle d'accès basé sur les rôles (RBAC) en enrichissant le payload JWT avec un champ role et en créant des guards Angular supplémentaires.

À retenir : Ne stockez jamais les refresh tokens uniquement côté client. Implémentez toujours la révocation côté serveur pour pouvoir invalider les sessions compromises, et utilisez des cookies HttpOnly pour encore plus de sécurité en production.

Partager