Multi-Database¶
Learn how to work with multiple databases in your FastAPI application.
When to Use Multiple Databases¶
Common scenarios:

- **Primary + Analytics** - Separate reporting from transactional data
- **Primary + Logging** - Dedicated audit/logging database
- **Microservices** - Each service has its own database
- **Multi-tenancy** - One database per tenant
- **Read Replicas** - Separate read and write databases
Quick Example¶
Configure multiple databases:
```python
# dbwarden.py
from dbwarden import database_config

# Primary database
database_config(
    database_name="primary",
    default=True,
    database_type="postgresql",
    database_url="postgresql://user:password@localhost/myapp",
    model_paths=["app.models.primary"],
)

# Analytics database
database_config(
    database_name="analytics",
    database_type="postgresql",
    database_url="postgresql://user:password@localhost/analytics",
    model_paths=["app.models.analytics"],
)

# Logging database
database_config(
    database_name="logging",
    database_type="postgresql",
    database_url="postgresql://user:password@localhost/logs",
    model_paths=["app.models.logging"],
)
```
Create session dependencies:
```python
# app/dependencies.py
from typing import Annotated

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from dbwarden.fastapi import get_session

# Primary database (the default)
PrimarySessionDep = Annotated[AsyncSession, Depends(get_session())]

# Analytics database
AnalyticsSessionDep = Annotated[AsyncSession, Depends(get_session("analytics"))]

# Logging database
LoggingSessionDep = Annotated[AsyncSession, Depends(get_session("logging"))]
Use in routes:
```python
from sqlalchemy import select

@app.get("/users")
async def list_users(session: PrimarySessionDep):
    result = await session.execute(select(User))
    return result.scalars().all()

@app.get("/analytics/events")
async def list_events(session: AnalyticsSessionDep):
    result = await session.execute(select(Event))
    return result.scalars().all()
```
Query Multiple Databases¶
You can use multiple session dependencies in the same route:
```python
@app.get("/dashboard")
async def get_dashboard(
    primary_session: PrimarySessionDep,
    analytics_session: AnalyticsSessionDep,
    logging_session: LoggingSessionDep,
):
    # Query primary database
    users = await primary_session.execute(select(User))

    # Query analytics database
    events = await analytics_session.execute(select(Event))

    # Query logging database
    logs = await logging_session.execute(select(AuditLog))

    return {
        "users": users.scalars().all(),
        "events": events.scalars().all(),
        "logs": logs.scalars().all(),
    }
```
**Independent Transactions**

Each session has its own transaction. If one fails, the others are unaffected.
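Because transactions are independent, a failure in one database need not abort the whole request. A minimal sketch of that idea, using a hypothetical `load_events_or_empty` helper (not part of dbwarden) that falls back to an empty list when the analytics database is down:

```python
async def load_events_or_empty(analytics_session, query):
    # Each session commits and rolls back on its own, so an analytics
    # failure can be contained without touching the primary transaction.
    try:
        result = await analytics_session.execute(query)
        return result.scalars().all()
    except Exception:  # in practice, catch sqlalchemy.exc.SQLAlchemyError
        await analytics_session.rollback()
        return []
```

A route could call `await load_events_or_empty(analytics_session, select(Event))` and still return primary-database data when analytics is unavailable.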
Cross-Database Queries¶
SQLAlchemy doesn't support joining across different databases. Instead:
Pattern 1: Query Then Combine¶
```python
@app.get("/report")
async def get_report(
    primary_session: PrimarySessionDep,
    analytics_session: AnalyticsSessionDep,
):
    # Get user IDs from primary
    user_result = await primary_session.execute(select(User.id))
    user_ids = [row[0] for row in user_result.all()]

    # Get events for those users from analytics
    event_result = await analytics_session.execute(
        select(Event).where(Event.user_id.in_(user_ids))
    )
    return {"events": event_result.scalars().all()}
```
Pattern 2: Denormalize¶
Store redundant data in each database:
```python
# Primary DB - User
class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str]

# Analytics DB - Event with user email
class Event(Base):
    __tablename__ = "events"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_email: Mapped[str]  # Denormalized from User
    event_type: Mapped[str]
```
Pattern 3: Application-Level Join¶
```python
@app.get("/enriched-events")
async def get_enriched_events(
    primary_session: PrimarySessionDep,
    analytics_session: AnalyticsSessionDep,
):
    # Get all users
    users_result = await primary_session.execute(select(User))
    users = {u.id: u for u in users_result.scalars().all()}

    # Get all events
    events_result = await analytics_session.execute(select(Event))
    events = events_result.scalars().all()

    # Join in Python
    enriched = [
        {"event": event, "user": users.get(event.user_id)}
        for event in events
    ]
    return enriched
```
Startup Checks for All Databases¶
Check all databases on startup:
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from dbwarden.fastapi import migration_context

@asynccontextmanager
async def lifespan(app: FastAPI):
    async with migration_context(
        mode="check",
        all_databases=True,  # ← Check all databases
        fail_fast=True,
    ):
        yield
```
Health Checks for All Databases¶
Health endpoints automatically report all databases:
```python
from dbwarden.fastapi import DBWardenHealthRouter

app.include_router(DBWardenHealthRouter(), prefix="/health")
```
Response shows all databases:
```json
{
  "status": "ok",
  "databases": [
    {
      "database": "primary",
      "status": "ok",
      "connected": true,
      "pending_migrations": 0,
      "lock_active": false,
      "error": null
    },
    {
      "database": "analytics",
      "status": "ok",
      "connected": true,
      "pending_migrations": 0,
      "lock_active": false,
      "error": null
    },
    {
      "database": "logging",
      "status": "ok",
      "connected": true,
      "pending_migrations": 0,
      "lock_active": false,
      "error": null
    }
  ]
}
```
Check a specific database:
```bash
curl http://localhost:8000/health/analytics
```
Migrations for Multiple Databases¶
Each database has its own migration history:
```bash
# Create migration for primary database
dbwarden make-migrations -d primary -m "add users table"

# Create migration for analytics database
dbwarden make-migrations -d analytics -m "add events table"

# Apply migrations to all databases
dbwarden migrate --all
```
Common Patterns¶
Pattern 1: Primary + Read Replica¶
```python
# app/dependencies.py
WriteSessionDep = Annotated[AsyncSession, Depends(get_session("primary"))]
ReadSessionDep = Annotated[AsyncSession, Depends(get_session("replica"))]

# Routes
@app.post("/users")
async def create_user(session: WriteSessionDep):
    # Write to primary
    ...

@app.get("/users")
async def list_users(session: ReadSessionDep):
    # Read from replica
    ...
```
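The `replica` database referenced above has to be registered like any other; a sketch assuming the same `database_config()` call as in the quick example (the replica host and URL are illustrative):

```python
# dbwarden.py
from dbwarden import database_config

database_config(
    database_name="replica",
    database_type="postgresql",
    database_url="postgresql://user:password@replica-host/myapp",
    model_paths=["app.models.primary"],  # same models as the primary
)
```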
Pattern 2: Tenant Per Database¶
```python
async def get_tenant_session(tenant_id: str):
    # Resolve the tenant's database per request; an Annotated alias
    # cannot be created inside a route. This assumes get_session()
    # returns an async-generator dependency, as used with Depends above.
    dependency = get_session(f"tenant_{tenant_id}")
    async for session in dependency():
        yield session

@app.get("/data")
async def get_data(session: AsyncSession = Depends(get_tenant_session)):
    # Use tenant-specific database
    ...
```
Pattern 3: Audit Logging¶
```python
@app.post("/users")
async def create_user(
    user_data: UserCreate,
    primary_session: PrimarySessionDep,
    logging_session: LoggingSessionDep,
):
    # Create user in primary
    user = User(**user_data.dict())
    primary_session.add(user)
    await primary_session.commit()

    # Log action in logging database
    log = AuditLog(action="create_user", user_id=user.id)
    logging_session.add(log)
    await logging_session.commit()

    return user
```
Recap¶
✅ Configure multiple databases with database_config()
✅ Create separate session dependencies for each database
✅ Use multiple sessions in the same route
✅ Each session has independent transactions
✅ Check all databases on startup with all_databases=True
✅ Health endpoints report all databases automatically
✅ Each database has its own migration history
What's Next?¶
- Testing - Test multi-database applications
- Transaction Management - Coordinate across databases
- Production Patterns - Deploy multi-database apps