Structured Logging with FastAPI and Loguru


Structured logging for FastAPI: effective debugging in production

Structured logging for FastAPI: from print() to Loguru with JSON, correlation IDs and exception handling


⏱️ 13 min read | 💻 Full code on GitHub | 🎯 Level: Intermediate


TL;DR

What you'll learn: structured logging for FastAPI with Loguru, JSON output, correlation IDs and exception handling

Stack:

  • ✅ Loguru (simpler than the standard logging module)
  • ✅ Structured logging with JSON
  • ✅ Correlation IDs to trace requests
  • ✅ Automatic context enrichment
  • ✅ Integration with Sentry/monitoring

When you need it:

  • Debugging in production (without SSH-ing into the server)
  • Tracking user actions
  • Performance monitoring
  • Cross-service error tracking

What it replaces:

  • ❌ print("debug") everywhere
  • ❌ Unstructured logging
  • ❌ Logs that nobody ever reads



The Problem: Debugging Without Context

Once an app is in production, logs become the only debugging tool. If the logs carry no useful information, every bug turns into a blind investigation.

The most common pattern looks like this:

# app/services/order_service.py

async def process_order(self, order_id: UUID, user_id: UUID):
    print("Processing order")

    order = await self.order_repo.get(order_id)
    print(f"Order: {order}")

    if not order:
        print("Order not found!")
        raise ValueError("Order not found")

    print(f"User: {user_id}")

    # ... 50 lines later
    print("Done!")

Why this doesn't work in production:

  1. print() never reaches the log files: it writes to stdout, which many environments do not capture
  2. No context: which user? which request? which session?
  3. No timestamp: when did it happen relative to other events?
  4. No level: errors and debug output look identical
  5. Impossible to filter: with 10,000 log lines, finding a specific request is manual work

With structured logging, the same code produces:

{
  "timestamp": "2025-01-29T18:32:15.123Z",
  "level": "ERROR",
  "message": "Order not found",
  "order_id": "123e4567-e89b-12d3-a456-426614174000",
  "user_id": "987fcdeb-51a2-43e1-9876-ba9876543210",
  "correlation_id": "req_abc123",
  "service": "order_service",
  "function": "process_order"
}

Every event can be filtered, correlated and aggregated. What used to require SSH and manual grep becomes a query on the log aggregator.
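To make "a query on the log aggregator" concrete, here is a minimal sketch that uses only the standard library. The sample lines and the events_for_request helper are made up for illustration; a real aggregator would run the equivalent filter for you.

```python
import json

# Hypothetical sample: three JSON log lines as an aggregator might store them.
LOG_LINES = [
    '{"level": "INFO", "message": "Starting order processing", "correlation_id": "req_abc123"}',
    '{"level": "INFO", "message": "Fetching user", "correlation_id": "req_def456"}',
    '{"level": "ERROR", "message": "Order not found", "correlation_id": "req_abc123"}',
]

def events_for_request(lines, correlation_id):
    """Return the parsed events that belong to one request."""
    events = (json.loads(line) for line in lines)
    return [e for e in events if e.get("correlation_id") == correlation_id]

matching = events_for_request(LOG_LINES, "req_abc123")
for event in matching:
    print(event["level"], event["message"])
```

The same filter on print() output would need a regular expression per message shape, which is exactly the manual work structured logs remove.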


The Solution

With structured logging:

# Same code, but with proper logging

async def process_order(self, order_id: UUID, user_id: UUID):
    logger.info(
        "Starting order processing",
        order_id=order_id,
        user_id=user_id
    )

    order = await self.order_repo.get(order_id)

    if not order:
        logger.error(
            "Order not found",
            order_id=order_id,
            user_id=user_id
        )
        raise ValueError("Order not found")

    logger.info(
        "Order processed successfully",
        order_id=order_id,
        user_id=user_id
    )

Resulting log (structured JSON):

{
  "timestamp": "2025-01-29T18:32:15.123Z",
  "level": "ERROR",
  "message": "Order not found",
  "order_id": "123e4567-e89b-12d3-a456-426614174000",
  "user_id": "987fcdeb-51a2-43e1-9876-ba9876543210",
  "correlation_id": "req_abc123",
  "service": "order_service",
  "function": "process_order"
}

Now I can:

  • ✅ Filter by user_id
  • ✅ See every log of a request (correlation_id)
  • ✅ Know when it happened
  • ✅ Understand the full context

Why Python's Standard Logging Is Problematic

The Problem

# Standard library logging
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info("User logged in")

# Output (default basicConfig format, no timestamp):
# INFO:__main__:User logged in

Problems:

  1. Verbose, complicated setup
  2. Unstructured output
  3. Adding context is hard
  4. The handler/formatter/logger split is confusing
  5. File rotation is not on by default (it needs an explicitly configured RotatingFileHandler)
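For comparison, this is roughly what JSON output with one extra field takes with the standard library alone. It is a sketch, not a recommended setup: the JsonFormatter class, the stdlib_demo logger name and the user_id field are illustrative choices, and file rotation would additionally need a logging.handlers.RotatingFileHandler.

```python
import io
import json
import logging

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Fields passed via `extra=` become attributes on the record;
        # only a hypothetical `user_id` field is picked up here.
        if hasattr(record, "user_id"):
            entry["user_id"] = record.user_id
        return json.dumps(entry)

stream = io.StringIO()  # stands in for a file or stdout
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

std_logger = logging.getLogger("stdlib_demo")
std_logger.setLevel(logging.INFO)
std_logger.addHandler(handler)
std_logger.propagate = False  # keep the demo output out of the root logger

std_logger.info("User logged in", extra={"user_id": "user456"})
print(stream.getvalue())
```

It works, but every new field means touching the formatter, which is the friction the list above describes.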

The Solution: Loguru

from loguru import logger

# Works without any configuration
logger.info("User logged in")

# Output (default format):
# 2025-01-29 18:32:15.123 | INFO     | __main__:<module>:10 - User logged in

Advantages:

  • ✅ Zero config, works right away
  • ✅ Colored, readable output
  • ✅ Context is easy to add
  • ✅ Built-in file rotation
  • ✅ Automatic exception catching (logger.catch)
  • ✅ Thread-safe by default; adding a sink with enqueue=True also makes it multiprocess-safe and non-blocking

Quick Start (5 Minutes)

Step 1: Install Loguru (30 seconds)

pip install loguru

Step 2: Basic Setup (2 minutes)

# app/core/logging.py

from loguru import logger
import sys

def setup_logging(level: str = "INFO"):
    """Configure Loguru for FastAPI."""

    # Remove default handler
    logger.remove()

    # Console output (development)
    logger.add(
        sys.stdout,
        colorize=True,
        format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
        level=level
    )

    # File output (production)
    logger.add(
        "logs/app_{time:YYYY-MM-DD}.log",
        rotation="500 MB",  # Rotate when file > 500MB
        retention="10 days",  # Keep logs 10 days
        compression="zip",  # Compress old logs
        level=level,
        format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}"
    )

    logger.info("Logging configured successfully")

Step 3: Use in FastAPI (2 minutes)

# app/main.py

from fastapi import FastAPI
from app.core.logging import setup_logging
from loguru import logger

# Setup logging
setup_logging(level="INFO")

app = FastAPI()

@app.get("/")
async def root():
    logger.info("Root endpoint called")
    return {"message": "Hello World"}

@app.get("/users/{user_id}")
async def get_user(user_id: str):
    logger.info("Fetching user", user_id=user_id)

    # Simulate user fetch
    user = {"id": user_id, "name": "Test User"}

    logger.info("User fetched successfully", user_id=user_id)
    return user

Done! You have working logging.


Correlation IDs: Tracing Requests End-to-End

The Problem

# Logs from 3 different requests mixed together

INFO: Starting quest completion
INFO: Fetching user
INFO: Quest not found
INFO: User fetched successfully
INFO: Starting quest completion

Which log belongs to which request? Impossible to tell.

The Solution: Correlation ID

A unique ID for each request that appears in all of its logs.

# app/middleware/correlation_id.py

import uuid
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware

# Share the ContextVar defined in app/core/logging.py so the middleware
# and the logging setup read and write the same variable.
from app.core.logging import correlation_id_var

class CorrelationIdMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Reuse the caller's ID if present, otherwise generate one
        correlation_id = (
            request.headers.get("X-Correlation-ID")
            or f"req_{uuid.uuid4().hex[:12]}"
        )

        # Set in context and keep the token so the value can be reset
        token = correlation_id_var.set(correlation_id)
        try:
            response = await call_next(request)
        finally:
            correlation_id_var.reset(token)

        # Echo the ID back to the client
        response.headers["X-Correlation-ID"] = correlation_id
        return response
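Why a ContextVar rather than a module-level global? The sketch below uses only asyncio and contextvars, with two simulated requests (handle_request is an illustrative stand-in, not part of the middleware). It shows that concurrent tasks each keep their own value.

```python
import asyncio
from contextvars import ContextVar

correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")

async def handle_request(request_id: str, delay: float) -> str:
    """Simulate one request: set the ID, yield to other tasks, read it back."""
    correlation_id_var.set(request_id)
    await asyncio.sleep(delay)  # other "requests" run while this one waits
    return correlation_id_var.get()

async def main() -> list[str]:
    # asyncio.gather wraps each coroutine in its own task, and each task
    # runs in a copy of the current context.
    return await asyncio.gather(
        handle_request("req_a", 0.02),
        handle_request("req_b", 0.01),
    )

seen = asyncio.run(main())
print(seen)
```

With a plain global, the second request would overwrite the first one's ID while it was still waiting, and its later logs would carry the wrong ID.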

Custom Logger with Context

# app/core/logging.py

import sys
from contextvars import ContextVar
from loguru import logger

correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")
user_id_var: ContextVar[str] = ContextVar("user_id", default="")

def add_context(record):
    # Patcher: copy the context vars into every record's extra dict
    record["extra"]["correlation_id"] = correlation_id_var.get()
    record["extra"]["user_id"] = user_id_var.get()

def setup_logging():
    logger.remove()

    # Run add_context on every record before it reaches the sinks,
    # so {extra[correlation_id]} and {extra[user_id]} always exist
    logger.configure(patcher=add_context)

    logger.add(
        sys.stdout,
        format="<green>{time}</green> | <level>{level}</level> | <cyan>req:{extra[correlation_id]}</cyan> | <yellow>user:{extra[user_id]}</yellow> | {message}",
        level="INFO"
    )

Usage

# app/routers/quests.py

from app.core.logging import user_id_var
from loguru import logger

@router.post("/quests/{quest_id}/complete")
async def complete_quest(
    quest_id: UUID,
    current_user: User = Depends(get_current_user)
):
    # Set user context
    user_id_var.set(str(current_user.id))

    logger.info("Starting quest completion", quest_id=str(quest_id))

    try:
        result = await quest_service.complete_quest(quest_id, current_user.id)
        logger.info("Quest completed", quest_id=str(quest_id), xp_earned=result.xp)
        return result
    except Exception as e:
        logger.error("Quest completion failed", quest_id=str(quest_id), error=str(e))
        raise

Output:

2025-01-29 18:32:15 | INFO | req:abc123 | user:user456 | Starting quest completion
2025-01-29 18:32:15 | INFO | req:abc123 | user:user456 | Quest completed

Now I can run: grep "req:abc123" logs/app.log and see ALL the logs for that request!


Structured Logging with JSON

Why JSON?

# Plain text (hard to parse)
INFO: User user123 completed quest quest456 earning 150 XP

# JSON (perfect for querying)
{
  "level": "INFO",
  "user_id": "user123",
  "quest_id": "quest456",
  "xp_earned": 150,
  "timestamp": "2025-01-29T18:32:15Z"
}

JSON advantages:

  • ✅ Easy to parse (tools, Elasticsearch, etc.)
  • ✅ Structured queries: “Find all errors with user_id=X”
  • ✅ Aggregations: “Average XP earned per quest type”

Setup JSON Logging

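# app/core/logging.py

import json
import traceback
from loguru import logger

# `settings` is the application's configuration object; import it from
# wherever your project defines it.

def serialize(record):
    """Build the JSON string for one log record."""

    # Base fields
    log_entry = {
        "timestamp": record["time"].isoformat(),
        "level": record["level"].name,
        "message": record["message"],
        "service": "quest-api",
        "environment": settings.ENVIRONMENT,
    }

    # Add context vars
    log_entry["correlation_id"] = correlation_id_var.get()
    log_entry["user_id"] = user_id_var.get()

    # Add extra fields (skip the key json_formatter uses internally)
    log_entry.update(
        {k: v for k, v in record["extra"].items() if k != "serialized"}
    )

    # Add exception if present
    if record["exception"]:
        exc = record["exception"]
        log_entry["exception"] = {
            "type": exc.type.__name__,
            "value": str(exc.value),
            "traceback": "".join(traceback.format_tb(exc.traceback)),
        }

    # default=str handles UUIDs, datetimes and other non-JSON types
    return json.dumps(log_entry, default=str)

def json_formatter(record):
    """Loguru format function: returns a format template, not the final text."""
    # Loguru formats the returned string against the record, so raw JSON
    # braces would be parsed as fields. Stash the JSON in extra and
    # return a template that points at it.
    record["extra"]["serialized"] = serialize(record)
    return "{extra[serialized]}\n"

def setup_logging():
    # JSON output for production
    logger.add(
        "logs/app.json",
        format=json_formatter,
        rotation="500 MB",
        retention="10 days"
    )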

Usage with Extra Fields

from loguru import logger

# Simple
logger.info("Quest completed")

# With extra fields (structured!)
logger.info(
    "Quest completed",
    quest_id=quest_id,
    quest_type="combat",
    difficulty="hard",
    xp_earned=150,
    completion_time_seconds=247
)

Output JSON:

{
  "timestamp": "2025-01-29T18:32:15.123456Z",
  "level": "INFO",
  "message": "Quest completed",
  "service": "quest-api",
  "environment": "production",
  "correlation_id": "req_abc123",
  "user_id": "user456",
  "quest_id": "quest789",
  "quest_type": "combat",
  "difficulty": "hard",
  "xp_earned": 150,
  "completion_time_seconds": 247
}

Now I can run queries such as: “average completion_time_seconds for difficulty=hard”!
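As a rough illustration of that query, here is the same aggregation done by hand with the standard library. The sample lines are invented, and in practice the log platform would run this for you.

```python
import json
from collections import defaultdict
from statistics import mean

# Hypothetical JSON log lines in the shape shown above.
LOG_LINES = [
    '{"message": "Quest completed", "difficulty": "hard", "completion_time_seconds": 247}',
    '{"message": "Quest completed", "difficulty": "hard", "completion_time_seconds": 253}',
    '{"message": "Quest completed", "difficulty": "easy", "completion_time_seconds": 60}',
    '{"message": "Quest failed", "difficulty": "hard"}',
]

def mean_completion_time(lines):
    """Average completion_time_seconds per difficulty, for completed quests only."""
    by_difficulty = defaultdict(list)
    for line in lines:
        event = json.loads(line)
        if event.get("message") == "Quest completed":
            by_difficulty[event["difficulty"]].append(event["completion_time_seconds"])
    return {difficulty: mean(times) for difficulty, times in by_difficulty.items()}

averages = mean_completion_time(LOG_LINES)
print(averages)
```

This only works because difficulty and completion_time_seconds are separate fields; the same numbers embedded in a sentence would have to be parsed back out first.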


Best Practices & Patterns

1. The Right Log Levels

# BAD: Everything at INFO level
logger.info("Starting process")
logger.info("Error occurred!")  # Error should use ERROR level!
logger.info("Debug value: X")    # Debug values should use DEBUG level!

# GOOD: appropriate levels
logger.debug("Debug value", value=x)          # Development only
logger.info("Quest completed", quest_id=id)    # Normal operation
logger.warning("Slow query", duration_ms=5000) # Potential issue
logger.error("Quest failed", error=str(e))     # Recoverable error
logger.critical("Database down!")              # System failure

Rule of thumb:

  • DEBUG: Information for developers, normally disabled in production
  • INFO: Normal operations (request start/end, actions)
  • WARNING: Something odd but not blocking
  • ERROR: A handled error (try/except)
  • CRITICAL: System down, immediate intervention required

2. Rich Context

# BAD: Minimal information
logger.error("Quest failed")

# GOOD: full context
logger.error(
    "Quest completion failed",
    quest_id=quest_id,
    user_id=user_id,
    quest_type=quest.type,
    difficulty=quest.difficulty,
    attempts=attempts,
    last_error=str(e),
    duration_ms=duration
)

Ask yourself: “If I read this log a month from now, will I understand what happened?”

3. Structured Fields, Non String Formatting

# BAD: String interpolation
logger.info(f"User {user_id} completed quest {quest_id}")

# GOOD: Structured fields
logger.info(
    "User completed quest",
    user_id=user_id,
    quest_id=quest_id
)

Why: the second form can be queried by field. The first cannot.

4. Exception Logging

# BAD: Loses traceback
try:
    risky_operation()
except Exception as e:
    logger.error(f"Error: {e}")

# GOOD: Include traceback
try:
    risky_operation()
except Exception:
    logger.exception("Operation failed")  # Logs at ERROR level with the traceback
    # Or explicitly, at any level:
    # logger.opt(exception=True).error("Operation failed")

5. Performance-Sensitive Code

# BAD: Expensive operation even when log is disabled
logger.debug(f"Data: {expensive_serialization(data)}")

# GOOD: Lazy evaluation with logger.opt(lazy=True)
logger.opt(lazy=True).debug("Data: {data}", data=lambda: expensive_serialization(data))

# RELATED: use bind() to attach context once instead of repeating it
log = logger.bind(user_id=user_id)
# Every message logged through `log` now includes user_id

Integration with Sentry

Setup

pip install sentry-sdk

# app/core/logging.py

import sentry_sdk
from sentry_sdk.integrations.loguru import LoguruIntegration, LoggingLevels

def setup_sentry():
    """Configure Sentry error tracking."""

    sentry_sdk.init(
        dsn=settings.SENTRY_DSN,
        environment=settings.ENVIRONMENT,
        traces_sample_rate=0.1,  # 10% transactions for performance

        # Capture Loguru logs. The stdlib LoggingIntegration only sees the
        # standard logging module, so Loguru needs its own integration
        # (shipped with recent sentry-sdk releases).
        integrations=[
            LoguruIntegration(
                level=LoggingLevels.INFO.value,        # Breadcrumbs from INFO up
                event_level=LoggingLevels.ERROR.value  # Send errors to Sentry
            )
        ]
    )

# app/main.py
from app.core.logging import setup_logging, setup_sentry

setup_logging()
setup_sentry()

Custom Context

from sentry_sdk import set_context, set_tag

@router.post("/quests/{quest_id}/complete")
async def complete_quest(quest_id: UUID, user: User = Depends(get_current_user)):
    # Add context to Sentry
    set_tag("user_id", str(user.id))
    set_tag("quest_id", str(quest_id))

    # Only the ID is known here; add type and difficulty once the quest is loaded
    set_context("quest", {"id": str(quest_id)})

    try:
        result = await quest_service.complete_quest(quest_id, user.id)
        return result
    except Exception as e:
        logger.exception("Quest completion failed")
        # Automatically sent to Sentry with full context
        raise

In Sentry you now see errors with full context: user, quest, traceback.


Testing & Development

Mock Logs in Tests

# tests/test_quest_service.py

import io
import pytest
from loguru import logger

@pytest.fixture
def log_output():
    """Capture Loguru output (named to avoid shadowing pytest's built-in caplog)."""

    # Create string buffer
    buffer = io.StringIO()

    # Add test handler; {extra} exposes the structured fields as well
    handler_id = logger.add(buffer, format="{message} | {extra}")

    yield buffer

    # Cleanup
    logger.remove(handler_id)

@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_quest_completion_logging(log_output):
    """Test that quest completion logs correctly."""

    # mock_repo, quest_id and user_id are assumed to come from your own fixtures
    quest_service = QuestService(mock_repo)

    await quest_service.complete_quest(quest_id, user_id)

    # Check logs
    logs = log_output.getvalue()
    assert "Quest completed" in logs
    assert str(quest_id) in logs

Development vs Production Config

# app/core/logging.py

def setup_logging():
    logger.remove()

    if settings.ENVIRONMENT == "development":
        # Pretty console output
        logger.add(
            sys.stdout,
            colorize=True,
            format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <level>{message}</level>",
            level="DEBUG"
        )
    else:
        # JSON structured for production
        logger.add(
            sys.stdout,
            format=json_formatter,
            level="INFO"
        )

        logger.add(
            "logs/app.json",
            format=json_formatter,
            rotation="500 MB",
            retention="10 days",
            level="INFO"
        )

Common Mistakes

1. Logging Passwords/Tokens

# NEVER LOG SENSITIVE DATA
logger.info("User logged in", password=password)
logger.info("API call", api_key=api_key)

# Sanitize before logging
logger.info("User logged in", username=username)  # OK
logger.info("API call", api_key="***REDACTED***")
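Redacting by hand at every call site is easy to forget. One possible safeguard is a small helper that masks known sensitive keys before the fields reach the logger. This is a sketch: the SENSITIVE_KEYS set and the redact function are illustrative, and a real list should match your own field names.

```python
# Illustrative list of field names that must never be logged in clear text.
SENSITIVE_KEYS = {"password", "api_key", "token", "authorization"}

def redact(fields: dict) -> dict:
    """Return a copy of `fields` with sensitive values masked."""
    return {
        key: "***REDACTED***" if key.lower() in SENSITIVE_KEYS else value
        for key, value in fields.items()
    }

safe = redact({"username": "alice", "password": "hunter2", "API_KEY": "sk-123"})
print(safe)
```

A call site would then look like logger.info("User logged in", **redact(fields)). The helper matches key names only, so a secret placed inside the message text would still get through.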

2. Too Many Logs

# Log spam (10,000 items!)
for item in items:  # 10,000 items!
    logger.info("Processing item", item_id=item.id)

# Aggregated log
logger.info("Processing items", count=len(items))
# Log only on failure
for item in items:
    try:
        process(item)
    except Exception as e:
        logger.error("Item failed", item_id=item.id, error=str(e))
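When even the per-failure logs get noisy, the batch can be reduced to a single summary event. A sketch with illustrative names (process_batch, and a process function that fails on every thousandth item):

```python
from collections import Counter

def process_batch(items, process):
    """Process every item and return one summary dict instead of one log per item."""
    failures = Counter()
    for item in items:
        try:
            process(item)
        except Exception as exc:
            failures[type(exc).__name__] += 1
    failed = sum(failures.values())
    return {
        "total": len(items),
        "succeeded": len(items) - failed,
        "failed": failed,
        "failures_by_type": dict(failures),
    }

def process(item):
    # Hypothetical work function: every 1000th item is invalid
    if item % 1000 == 0:
        raise ValueError("bad item")

summary = process_batch(range(1, 10_001), process)
print(summary)
```

The summary can then go out as one event, for example logger.info("Batch processed", **summary), which keeps 10,000 items down to a single queryable line.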

3. No Context

# Useless log
logger.error("Error occurred")

# Useful log
logger.error(
    "Quest completion failed",
    quest_id=quest_id,
    user_id=user_id,
    error=str(e),
    attempts=attempts
)

4. String Interpolation

# Not queryable
logger.info(f"User {user_id} level {level}")

# Queryable
logger.info("User leveled up", user_id=user_id, level=level)

Real-World Example: Quest Service

# app/services/quest_service.py

from datetime import datetime
from uuid import UUID
from loguru import logger
from app.core.logging import user_id_var
import time

class QuestService:
    async def complete_quest(
        self,
        quest_id: UUID,
        user_id: UUID
    ) -> QuestCompletionResult:
        """Complete a quest with full logging."""

        start_time = time.time()

        # Set user context
        user_id_var.set(str(user_id))

        logger.info(
            "Starting quest completion",
            quest_id=str(quest_id),
            user_id=str(user_id)
        )

        try:
            # Fetch quest
            quest = await self.quest_repo.get(quest_id)

            if not quest:
                logger.warning(
                    "Quest not found",
                    quest_id=str(quest_id),
                    user_id=str(user_id)
                )
                raise ValueError("Quest not found")

            logger.debug(
                "Quest fetched",
                quest_id=str(quest_id),
                quest_type=quest.type,
                difficulty=quest.difficulty
            )

            # Validate user can complete
            if quest.user_id != user_id:
                logger.error(
                    "User attempting to complete another user's quest",
                    quest_id=str(quest_id),
                    quest_owner=str(quest.user_id),
                    attempting_user=str(user_id)
                )
                raise PermissionError("Not your quest")

            # Calculate rewards
            xp_earned = self._calculate_xp(quest)

            logger.debug(
                "XP calculated",
                quest_id=str(quest_id),
                base_xp=quest.reward_xp,
                multiplier=1.5 if quest.difficulty == "hard" else 1.0,
                final_xp=xp_earned
            )

            # Update quest
            await self.quest_repo.update(
                quest_id,
                {
                    "status": "completed",
                    "completed_at": datetime.now(),
                    "xp_earned": xp_earned
                }
            )

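            # Update user (fetch the current total first; assumes user_repo
            # exposes get() like quest_repo does)
            user = await self.user_repo.get(user_id)
            await self.user_repo.update(
                user_id,
                {"total_xp": user.total_xp + xp_earned}
            )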

            duration_ms = int((time.time() - start_time) * 1000)

            logger.info(
                "Quest completed successfully",
                quest_id=str(quest_id),
                user_id=str(user_id),
                quest_type=quest.type,
                difficulty=quest.difficulty,
                xp_earned=xp_earned,
                duration_ms=duration_ms
            )

            return QuestCompletionResult(
                quest_id=quest_id,
                xp_earned=xp_earned
            )

        except Exception as e:
            duration_ms = int((time.time() - start_time) * 1000)

            logger.exception(
                "Quest completion failed",
                quest_id=str(quest_id),
                user_id=str(user_id),
                error_type=type(e).__name__,
                error_message=str(e),
                duration_ms=duration_ms
            )
            raise

Output (JSON):

{
  "timestamp": "2025-01-29T18:32:15.123Z",
  "level": "INFO",
  "message": "Quest completed successfully",
  "correlation_id": "req_abc123",
  "user_id": "user456",
  "quest_id": "quest789",
  "quest_type": "combat",
  "difficulty": "hard",
  "xp_earned": 150,
  "duration_ms": 247,
  "service": "quest-api",
  "function": "complete_quest"
}

I can run queries such as:

  • All quests completed by user456
  • Average duration_ms per difficulty
  • How many combat quests vs exploration quests
  • Error rate per quest_type
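The service above computes duration_ms by hand in both the success and the failure path. One way to centralize that is a small context manager. This is a sketch, not part of the original service: the timed helper is hypothetical and uses time.perf_counter, which is designed for measuring intervals.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(fields: dict):
    """Record elapsed milliseconds into fields["duration_ms"], even on error."""
    start = time.perf_counter()
    try:
        yield fields
    finally:
        fields["duration_ms"] = int((time.perf_counter() - start) * 1000)

fields = {"quest_id": "quest789"}
with timed(fields):
    time.sleep(0.01)  # stands in for the real work

print(fields)
```

Because the measurement happens in finally, the same fields dict can be passed to logger.info on success or to logger.exception on failure without duplicating the arithmetic.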

Conclusion

Next Steps

Quick Start:

git clone https://github.com/layerbylayer-blog/fastapi-logging-guide
cd fastapi-logging-guide

# Install
pip install -r requirements.txt

# Run with logging
uvicorn app.main:app --reload

In Your Project:

Day 1:

  1. Replace every print() with logger
  2. Set up basic Loguru
  3. Add the correlation_id middleware

Day 2:

  4. JSON logging for production
  5. Context vars (user_id, etc.)
  6. Sentry integration

Day 3:

  7. Review log levels
  8. Add structured fields
  9. Set up log rotation


Resources

GitHub: fastapi-logging-guide

Includes:

  • ✅ Complete Loguru + Sentry setup
  • ✅ Correlation ID middleware
  • ✅ Context vars examples
  • ✅ Testing utilities
  • ✅ Production config

