Skip to content

Multi-Database

Learn how to work with multiple databases in your FastAPI application.

When to Use Multiple Databases

Common scenarios: - Primary + Analytics - Separate reporting from transactional data - Primary + Logging - Dedicated audit/logging database - Microservices - Each service has its own database - Multi-tenancy - One database per tenant - Read Replicas - Separate read and write databases

Quick Example

Configure multiple databases:

# dbwarden.py
from dbwarden import database_config

# Primary database
database_config(
    database_name="primary",
    default=True,
    database_type="postgresql",
    database_url="postgresql://user:password@localhost/myapp",
    model_paths=["app.models.primary"],
)

# Analytics database
database_config(
    database_name="analytics",
    database_type="postgresql",
    database_url="postgresql://user:password@localhost/analytics",
    model_paths=["app.models.analytics"],
)

# Logging database
database_config(
    database_name="logging",
    database_type="postgresql",
    database_url="postgresql://user:password@localhost/logs",
    model_paths=["app.models.logging"],
)

Create session dependencies:

# app/dependencies.py
from typing import Annotated
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from dbwarden.fastapi import get_session

# Primary database
PrimarySessionDep = Annotated[AsyncSession, Depends(get_session())]

# Analytics database
AnalyticsSessionDep = Annotated[AsyncSession, Depends(get_session("analytics"))]

# Logging database
LoggingSessionDep = Annotated[AsyncSession, Depends(get_session("logging"))]

Use in routes:

@app.get("/users")
async def list_users(session: PrimarySessionDep):
    result = await session.execute(select(User))
    return result.scalars().all()

@app.get("/analytics/events")
async def list_events(session: AnalyticsSessionDep):
    result = await session.execute(select(Event))
    return result.scalars().all()

Query Multiple Databases

You can use multiple session dependencies in the same route:

@app.get("/dashboard")
async def get_dashboard(
    primary_session: PrimarySessionDep,
    analytics_session: AnalyticsSessionDep,
    logging_session: LoggingSessionDep,
):
    # Query primary database
    users = await primary_session.execute(select(User))

    # Query analytics database
    events = await analytics_session.execute(select(Event))

    # Query logging database
    logs = await logging_session.execute(select(AuditLog))

    return {
        "users": users.scalars().all(),
        "events": events.scalars().all(),
        "logs": logs.scalars().all(),
    }

Independent Transactions

Each session has its own transaction. If one fails, others are unaffected.

Cross-Database Queries

SQLAlchemy doesn't support joining across different databases. Instead:

Pattern 1: Query Then Combine

@app.get("/report")
async def get_report(
    primary_session: PrimarySessionDep,
    analytics_session: AnalyticsSessionDep,
):
    # Get user IDs from primary
    user_result = await primary_session.execute(select(User.id))
    user_ids = [row[0] for row in user_result.all()]

    # Get events for those users from analytics
    event_result = await analytics_session.execute(
        select(Event).where(Event.user_id.in_(user_ids))
    )

    return {"events": event_result.scalars().all()}

Pattern 2: Denormalize

Store redundant data in each database:

# Primary DB - User
class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str]

# Analytics DB - Event with user email
class Event(Base):
    __tablename__ = "events"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_email: Mapped[str]  # Denormalized from User
    event_type: Mapped[str]

Pattern 3: Application-Level Join

@app.get("/enriched-events")
async def get_enriched_events(
    primary_session: PrimarySessionDep,
    analytics_session: AnalyticsSessionDep,
):
    # Get all users
    users_result = await primary_session.execute(select(User))
    users = {u.id: u for u in users_result.scalars().all()}

    # Get all events
    events_result = await analytics_session.execute(select(Event))
    events = events_result.scalars().all()

    # Join in Python
    enriched = [
        {
            "event": event,
            "user": users.get(event.user_id)
        }
        for event in events
    ]

    return enriched

Startup Checks for All Databases

Check all databases on startup:

from contextlib import asynccontextmanager
from dbwarden.fastapi import migration_context

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with migration_context(
        mode="check",
        all_databases=True,  # ← Check all databases
        fail_fast=True,
    ):
        yield

Health Checks for All Databases

Health endpoints automatically report all databases:

from dbwarden.fastapi import DBWardenHealthRouter

app.include_router(DBWardenHealthRouter(), prefix="/health")

Response shows all databases:

{
  "status": "ok",
  "databases": [
    {
      "database": "primary",
      "status": "ok",
      "connected": true,
      "pending_migrations": 0,
      "lock_active": false,
      "error": null
    },
    {
      "database": "analytics",
      "status": "ok",
      "connected": true,
      "pending_migrations": 0,
      "lock_active": false,
      "error": null
    },
    {
      "database": "logging",
      "status": "ok",
      "connected": true,
      "pending_migrations": 0,
      "lock_active": false,
      "error": null
    }
  ]
}

Check a specific database:

curl http://localhost:8000/health/analytics

Migrations for Multiple Databases

Each database has its own migration history:

# Create migration for primary database
dbwarden make-migrations -d primary -m "add users table"

# Create migration for analytics database
dbwarden make-migrations -d analytics -m "add events table"

# Apply migrations to all databases
dbwarden migrate --all

Common Patterns

Pattern 1: Primary + Read Replica

# app/dependencies.py
WriteSessionDep = Annotated[AsyncSession, Depends(get_session("primary"))]
ReadSessionDep = Annotated[AsyncSession, Depends(get_session("replica"))]

# Routes
@app.post("/users")
async def create_user(session: WriteSessionDep):
    # Write to primary
    ...

@app.get("/users")
async def list_users(session: ReadSessionDep):
    # Read from replica
    ...

Pattern 2: Tenant Per Database

def get_tenant_session(tenant_id: str):
    return Annotated[AsyncSession, Depends(get_session(f"tenant_{tenant_id}"))]

@app.get("/data")
async def get_data(tenant_id: str):
    TenantSessionDep = get_tenant_session(tenant_id)
    # Use tenant-specific database
    ...

Pattern 3: Audit Logging

@app.post("/users")
async def create_user(
    user_data: UserCreate,
    primary_session: PrimarySessionDep,
    logging_session: LoggingSessionDep,
):
    # Create user in primary
    user = User(**user_data.dict())
    primary_session.add(user)
    await primary_session.commit()

    # Log action in logging database
    log = AuditLog(action="create_user", user_id=user.id)
    logging_session.add(log)
    await logging_session.commit()

    return user

Recap

✅ Configure multiple databases with database_config()
✅ Create separate session dependencies for each database
✅ Use multiple sessions in the same route
✅ Each session has independent transactions
✅ Check all databases on startup with all_databases=True
✅ Health endpoints report all databases automatically
✅ Each database has its own migration history

What's Next?