Type something to search...
API Security in FastAPI: JWT, Rate Limiting e CORS

API Security in FastAPI: JWT, Rate Limiting e CORS

API Security in FastAPI: JWT, Rate Limiting e CORS

JWT ben configurato, bcrypt, CORS per environment, rate limiting e dependency di autenticazione — lo stack minimo per un’API che non fa imbarazzare


⏱️ 16 minuti di lettura | 💻 Codice completo su GitHub → | Livello: Intermedio


TL;DR

Il problema: la maggior parte dei tutorial FastAPI mostra solo JWT e si ferma. I problemi reali arrivano dagli altri layer.

Lo stack corretto:

  • JWT — PyJWT, access token breve (15 min) + refresh token, secret da env
  • Password hashing — passlib con bcrypt, cost factor >= 12
  • CORS — origins espliciti per environment (non * in produzione)
  • Rate limiting — SlowAPI, sia sul login che sulle route sensibili
  • Dependencyget_current_user con Depends, 401 vs 403 usati correttamente
  • Header di sicurezzaX-Content-Type-Options, X-Frame-Options via middleware

JWT | Password hashing | CORS | Rate limiting | Errori comuni


Il Contesto

Tre domande su API security che mi sono sentito fare in colloqui tecnici:

  1. “Come gestite l’autenticazione? JWT, sessioni, qualcos’altro?”
  2. “Cosa succede se qualcuno fa brute force sul login?”
  3. “Come gestite CORS tra frontend e backend?”

Domande ragionevoli. La risposta onesta è che si tratta di layer separati, ognuno con i propri quirk. Questo articolo li copre uno per uno, con il codice effettivo che userai.

Il punto di partenza: security in FastAPI non è una singola feature da abilitare. È uno stack. Puoi avere JWT corretto e CORS aperto a tutto. Puoi avere bcrypt e token che non scadono mai. Ogni layer copre una superficie di attacco diversa.


1. JWT: Access Token + Refresh Token

Setup

pip install PyJWT passlib[bcrypt] python-multipart

JWT ha tre componenti: header, payload, signature. La signature dipende dal secret — chiunque abbia il secret può creare token validi. Due conseguenze pratiche:

  • Il secret deve venire dall’ambiente, mai hardcodato
  • Token compromessi non si possono invalidare lato server (a meno di una blacklist)

Configurazione

# app/config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    jwt_secret: str           # From env: JWT_SECRET
    jwt_algorithm: str = "HS256"
    access_token_expire_minutes: int = 15    # Short-lived
    refresh_token_expire_days: int = 7

    model_config = {"env_file": ".env"}

settings = Settings()
# .env (non committare mai questo file)
JWT_SECRET=genera-qualcosa-con-openssl-rand-hex-32

Creazione e verifica token

# app/auth/jwt.py
from datetime import datetime, timedelta, timezone
import jwt
from app.config import settings


def create_access_token(subject: str) -> str:
    """Create a short-lived access token (15 min by default)."""
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=settings.access_token_expire_minutes
    )
    payload = {
        "sub": subject,
        "exp": expire,
        "type": "access",
    }
    return jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm)


def create_refresh_token(subject: str) -> str:
    """Create a longer-lived refresh token (7 days by default)."""
    expire = datetime.now(timezone.utc) + timedelta(days=settings.refresh_token_expire_days)
    payload = {
        "sub": subject,
        "exp": expire,
        "type": "refresh",
    }
    return jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm)


def decode_token(token: str) -> dict:
    """Decode and validate a JWT. Raises jwt.InvalidTokenError on failure."""
    return jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_algorithm])

Endpoint di login e refresh

# app/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel

from app.auth.jwt import create_access_token, create_refresh_token, decode_token
from app.repositories.user_repository import UserRepository, get_user_repo
from app.services.user_service import UserService
import jwt

router = APIRouter(prefix="/auth", tags=["auth"])


def get_user_service(user_repo: UserRepository = Depends(get_user_repo)) -> UserService:
    return UserService(user_repo=user_repo)


class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"


class RefreshRequest(BaseModel):
    refresh_token: str


@router.post("/login", response_model=TokenResponse)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    user_service: UserService = Depends(get_user_service),
):
    user = await user_service.authenticate(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return TokenResponse(
        access_token=create_access_token(subject=str(user.id)),
        refresh_token=create_refresh_token(subject=str(user.id)),
    )


@router.post("/refresh", response_model=TokenResponse)
async def refresh(
    body: RefreshRequest,
    user_repo: UserRepository = Depends(get_user_repo),
):
    try:
        payload = decode_token(body.refresh_token)
        if payload.get("type") != "refresh":
            raise HTTPException(status_code=401, detail="Invalid token type")
        user_id = payload["sub"]
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid or expired refresh token")

    # Verify the user still exists and is active — a valid token is not enough
    user = await user_repo.get(user_id)
    if user is None or not user.is_active:
        raise HTTPException(status_code=401, detail="User not found or inactive")

    return TokenResponse(
        access_token=create_access_token(subject=user_id),
        refresh_token=create_refresh_token(subject=user_id),
    )

Access token breve (15 minuti): se viene compromesso, scade presto. Refresh token più lungo (7 giorni): usato solo per ottenere nuovi access token, mai per accedere alle route.

Il check sull’utente nel refresh è importante: senza di esso, un utente disabilitato o eliminato continua a ottenere access token validi per tutta la durata del refresh token.

Token Revocation

JWT è stateless per design: il server non tiene traccia dei token emessi. Revocare un token (es. al logout) richiede stato extra.

L’approccio più semplice è una blacklist dei jti (JWT ID): ogni token ha un ID univoco, e al logout quell’ID viene marcato come revocato.

# app/auth/jwt.py
import uuid as uuid_lib

def create_access_token(subject: str) -> str:
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=settings.access_token_expire_minutes
    )
    payload = {
        "sub": subject,
        "exp": expire,
        "jti": str(uuid_lib.uuid4()),  # Unique token ID for revocation
        "type": "access",
    }
    return jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm)
# app/auth/revocation.py

# In-memory blacklist — single process only.
# Replace with Redis for multi-worker/multi-instance deployments.
_revoked: set[str] = set()

def revoke(jti: str) -> None:
    _revoked.add(jti)

def is_revoked(jti: str) -> bool:
    return jti in _revoked
# app/auth/dependencies.py — add revocation check
from app.auth.revocation import is_revoked

async def get_current_user(...) -> User:
    ...
    try:
        payload = decode_token(token)
        jti = payload.get("jti")
        if jti and is_revoked(jti):
            raise credentials_exception
        ...
# app/routers/auth.py — logout endpoint
@router.post("/logout", status_code=204)
async def logout(
    token: str = Depends(oauth2_scheme),
    current_user: User = Depends(get_current_user),
):
    payload = decode_token(token)
    jti = payload.get("jti")
    if jti:
        revoke(jti)

Nota su Redis: la blacklist in memoria funziona su un singolo processo. Con più worker (uvicorn --workers 4) o più pod, ogni processo ha la propria blacklist — un logout su worker A non viene visto da worker B. La soluzione è usare Redis come storage condiviso, con TTL pari alla scadenza del token (così la blacklist non cresce indefinitamente).


2. Password Hashing con bcrypt

Non SHA256. Non MD5. bcrypt.

bcrypt è lento per design: include un cost factor che rende il brute force computazionalmente costoso. Un hash bcrypt con cost 12 richiede ~300ms per essere calcolato — trascurabile per un singolo login, inaccettabile per un attaccante che prova milioni di password.

# app/auth/password.py
from passlib.context import CryptContext

# Cost factor 12: ~300ms per hash on modern hardware
# Increase to 13-14 for high-security applications
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto", bcrypt__rounds=12)


def hash_password(plain: str) -> str:
    return pwd_context.hash(plain)


def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)
# app/services/user_service.py
from app.auth.password import hash_password, verify_password
from app.repositories.user_repository import UserRepository
from app.models import User


class UserService:
    def __init__(self, user_repo: UserRepository):
        self.user_repo = user_repo

    async def create_user(self, username: str, password: str) -> User:
        hashed = hash_password(password)
        return await self.user_repo.create(username=username, hashed_password=hashed)

    async def authenticate(self, username: str, password: str) -> User | None:
        user = await self.user_repo.get_by_username(username)
        if user is None:
            # Timing attack mitigation: always run verify even if user not found
            # Use a well-formed bcrypt hash — passlib raises on invalid format
            verify_password("dummy", "$2b$12$KIXnotarealpasswordhashXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
            return None
        if not verify_password(password, user.hashed_password):
            return None
        return user

Il commento sul timing attack è importante: se restituisci subito None quando l’utente non esiste, un attaccante può misurare il tempo di risposta per dedurre se uno username è valido. Eseguire comunque verify_password (anche su hash fittizio) uniformizza i tempi.


3. Dependency di Autenticazione

La dependency get_current_user è il punto centrale: tutte le route protette la usano tramite Depends.

# app/auth/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
import jwt

from app.auth.jwt import decode_token
from app.repositories.user_repository import UserRepository, get_user_repo
from app.models import User

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    user_repo: UserRepository = Depends(get_user_repo),
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = decode_token(token)
        if payload.get("type") != "access":
            raise credentials_exception
        user_id: str = payload.get("sub")
        if user_id is None:
            raise credentials_exception
    except jwt.InvalidTokenError:
        raise credentials_exception

    user = await user_repo.get(user_id)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    if not current_user.is_active:
        # 403, not 401: the token is valid, but the user is forbidden
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user")
    return current_user

401 vs 403

La distinzione conta:

  • 401 Unauthorized — il client non si è autenticato (token mancante, scaduto, invalido)
  • 403 Forbidden — il client è autenticato ma non ha i permessi per quella risorsa

Restituire 403 su token invalido è sbagliato: dice al client che esiste la risorsa ma non ha i permessi, invece di dire che non è autenticato.

# app/routers/users.py
from fastapi import APIRouter, Depends
from app.auth.dependencies import get_current_active_user
from app.models import User

router = APIRouter(prefix="/users", tags=["users"])


@router.get("/me")
async def get_profile(current_user: User = Depends(get_current_active_user)):
    return {"id": str(current_user.id), "username": current_user.username}

4. CORS: Dev vs Produzione

CORS (Cross-Origin Resource Sharing) controlla quali origini possono fare richieste all’API dal browser. allow_origins=["*"] in produzione permette a qualsiasi sito di fare chiamate autenticate alla tua API.

# app/main.py
import os
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# Read allowed origins from environment
ALLOWED_ORIGINS = os.getenv("ALLOWED_ORIGINS", "http://localhost:3000").split(",")

app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,          # Required for cookies/auth headers
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["Authorization", "Content-Type"],
)
# .env.development
ALLOWED_ORIGINS=http://localhost:3000,http://localhost:5173

# .env.production
ALLOWED_ORIGINS=https://myapp.com,https://www.myapp.com

allow_credentials=True con allow_origins=["*"] non funziona: il browser blocca le credenziali su wildcard per design. Devi specificare gli origins esplicitamente se usi cookies o header Authorization.


5. Rate Limiting con SlowAPI

pip install slowapi

Rate limiting senza un endpoint di login protetto significa che chiunque può tentare password infinite. Rate limiting solo sul login e non sulle API sensibili è parimenti incompleto.

# app/main.py
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# app/routers/auth.py
from fastapi import Request
from app.main import limiter


@router.post("/login", response_model=TokenResponse)
@limiter.limit("5/minute")   # 5 attempts per minute per IP
async def login(
    request: Request,        # Required by SlowAPI
    form_data: OAuth2PasswordRequestForm = Depends(),
    user_service: UserService = Depends(get_user_service),
):
    ...

Attenzione: SlowAPI usa contatori in memoria per default. Con più worker (uvicorn --workers 4) o più pod in Kubernetes, ogni processo ha il proprio contatore — il limite effettivo diventa limite × numero_worker. Per deployment multi-processo, configura un backend Redis:

limiter = Limiter(
    key_func=get_remote_address,
    storage_uri="redis://localhost:6379",  # Shared state across all workers
)

Per route protette con operazioni costose (es. export, ricerche pesanti), aggiungi un limite per utente autenticato:

def get_user_id_for_rate_limit(request: Request) -> str:
    """Rate limit by authenticated user ID instead of IP."""
    # Extract user ID from JWT without going through full Depends chain
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    if token:
        try:
            payload = decode_token(token)
            return payload.get("sub", get_remote_address(request))
        except Exception:
            pass
    return get_remote_address(request)


user_limiter = Limiter(key_func=get_user_id_for_rate_limit)


@router.get("/export")
@user_limiter.limit("10/hour")
async def export_data(request: Request, current_user: User = Depends(get_current_active_user)):
    ...

6. Header di Sicurezza

Headers che il browser usa per difendersi da attacchi comuni. Non sostituiscono la logica di sicurezza, ma aggiungono un layer di difesa gratuito.

HeaderCosa fa
X-Content-Type-Options: nosniffImpedisce al browser di indovinare il MIME type — blocca attacchi di MIME sniffing
X-Frame-Options: DENYImpedisce che la pagina venga embedded in un <iframe> — blocca clickjacking
X-XSS-Protection: 1; mode=blockAttiva il filtro XSS nei browser vecchi (IE, Safari legacy). I browser moderni lo ignorano ma è innocuo
Referrer-Policy: strict-origin-when-cross-originLimita le informazioni passate nell’header Referer — non espone path/query su richieste cross-origin
Permissions-PolicyDisabilita esplicitamente API del browser che l’app non usa (geolocation, microfono, camera)
# app/middleware/security_headers.py
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request


class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        response.headers["Permissions-Policy"] = "geolocation=(), microphone=()"
        return response
# app/main.py
from app.middleware.security_headers import SecurityHeadersMiddleware

app.add_middleware(SecurityHeadersMiddleware)

Errori Comuni

1. JWT secret hardcodato o corto

# ❌ Hardcodato e prevedibile
SECRET_KEY = "mysecretkey"
SECRET_KEY = "secret"

# ✅ Da environment, minimo 32 bytes
import secrets
# Genera con: openssl rand -hex 32
# Oppure: python -c "import secrets; print(secrets.token_hex(32))"
SECRET_KEY = os.getenv("JWT_SECRET")
if not SECRET_KEY or len(SECRET_KEY) < 32:
    raise RuntimeError("JWT_SECRET must be set and at least 32 characters")

Un secret corto può essere bruteforced offline se qualcuno intercetta un token.

2. CORS wildcard in produzione

# ❌ Qualsiasi sito può fare chiamate autenticate alla tua API
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True)

# ✅ Origini esplicite per environment
ALLOWED_ORIGINS = os.getenv("ALLOWED_ORIGINS", "http://localhost:3000").split(",")
app.add_middleware(CORSMiddleware, allow_origins=ALLOWED_ORIGINS, allow_credentials=True)

3. Token senza scadenza

# ❌ Token valido per sempre
payload = {"sub": user_id}

# ✅ Scadenza breve + refresh token
payload = {
    "sub": user_id,
    "exp": datetime.now(timezone.utc) + timedelta(minutes=15),
    "type": "access",
}

Un access token senza scadenza è equivalente a una password permanente che non puoi revocare.

4. 401 e 403 invertiti

# ❌ 403 su token invalido: dice che la risorsa esiste ma sei forbidden
raise HTTPException(status_code=403, detail="Invalid token")

# ✅ 401 su credenziali invalide, 403 su permessi insufficienti
raise HTTPException(
    status_code=401,
    detail="Could not validate credentials",
    headers={"WWW-Authenticate": "Bearer"},
)

5. Rate limiting solo sul login

# ❌ Solo il login è protetto
@router.post("/login")
@limiter.limit("5/minute")
async def login(...): ...

# Routes sensibili esposte a brute force
@router.post("/password-reset")   # No limit: enumerate accounts trivially
@router.post("/verify-email")     # No limit

# ✅ Limitare anche le operazioni sensibili
@router.post("/password-reset")
@limiter.limit("3/hour")
async def password_reset(request: Request, ...): ...

Esempio Completo: main.py

# app/main.py
import os
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

from app.middleware.security_headers import SecurityHeadersMiddleware
from app.routers import auth, users

# Rate limiter
limiter = Limiter(key_func=get_remote_address)

app = FastAPI(title="Secure FastAPI")

# Security headers
app.add_middleware(SecurityHeadersMiddleware)

# CORS
ALLOWED_ORIGINS = os.getenv("ALLOWED_ORIGINS", "http://localhost:3000").split(",")
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["Authorization", "Content-Type"],
)

# Rate limiting
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Routers
app.include_router(auth.router)
app.include_router(users.router)


@app.get("/health")
async def health():
    return {"status": "ok"}

Conclusione

Next Steps

git clone https://github.com/layerbylayer-blog/fastapi-security-guide
cd fastapi-security-guide
pip install -r requirements-dev.txt
pytest -v

Nel tuo progetto esistente:

  1. Verifica che JWT_SECRET venga dall’ambiente e sia lungo almeno 32 caratteri
  2. Controlla la scadenza dei token — se non c’è exp nel payload, aggiungila
  3. Sostituisci allow_origins=["*"] con origins espliciti se usi allow_credentials=True
  4. Aggiungi rate limiting al login e alle route che toccano dati sensibili

Articolo correlato: Testing in FastAPI: PyTest, Fixtures e Async — come testare endpoint autenticati con dependency_overrides.


Risorse

GitHub: fastapi-security-guide

Include:

  • ✅ Setup JWT completo (access + refresh token)
  • ✅ Password hashing con bcrypt
  • ✅ CORS per environment
  • ✅ Rate limiting con SlowAPI
  • ✅ Test per endpoint autenticati
  • ✅ Docker Compose con PostgreSQL

Documentazione:

Related Posts

Logging Strutturato con FastAPI e Loguru

Logging Strutturato con FastAPI e Loguru

Logging strutturato per FastAPI: debugging efficace in produzioneLogging strutturato per FastAPI: dal print() a Loguru con JSON, correlation ID e gestione eccezioni**⏱️ 13 minuti di lettu

read more
PostgreSQL JSONB: Schema Flessibile senza Migrazioni

PostgreSQL JSONB: Schema Flessibile senza Migrazioni

PostgreSQL JSONB: schema flessibile senza migrazioni continuePostgreSQL JSONB: il best of both worlds tra NoSQL flexibility e SQL power⏱️ 15 minuti di lettura | **💻 [Codice completo

read more
Sistema Email Completo per FastAPI con SendGrid

Sistema Email Completo per FastAPI con SendGrid

Come ho costruito un sistema email affidabile per FastAPISistema email completo per app FastAPI, da template HTML a deliverability in produzione⏱️ 12 minuti di lettura | **💻 [Codice

read more
Testing in FastAPI: PyTest, Fixtures e Async

Testing in FastAPI: PyTest, Fixtures e Async

Testing in FastAPI: PyTest, Fixtures e Asyncpytest + FastAPI + async SQLAlchemy: setup corretto, isolamento del database, dependency injection nei test⏱️ 14 minuti di lettura | **💻 [

read more
Repository Pattern in FastAPI: Quando e Perché Usarlo

Repository Pattern in FastAPI: Quando e Perché Usarlo

Repository Pattern in FastAPI: Quando e Perché UsarloSeparare business logic da data access: la guida che avrei voluto leggere prima di iniziare⏱️ 12 minuti di lettura | **💻 [Codice

read more