Complete Application¶
A full, production-ready FastAPI application using all of DBWarden's features.
Overview¶
This example shows:
- Database configuration
- Model definition
- Session dependencies
- CRUD operations
- Startup checks
- Health endpoints
- Transaction management
- Error handling
Project Structure¶
my_app/
├── dbwarden.py              # Database configuration
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI app
│   ├── dependencies.py      # Shared dependencies
│   ├── models.py            # SQLAlchemy models
│   ├── schemas.py           # Pydantic request/response schemas
│   └── routes/
│       ├── __init__.py
│       └── users.py         # User endpoints
├── tests/
│   └── test_users.py        # API tests
└── requirements.txt
Step 1: Database Configuration¶
Create dbwarden.py in your project root:
# dbwarden.py
from dbwarden import database_config
# Register the application's primary database with DBWarden.
database_config(
    database_name="primary",
    default=True,
    # Production connection settings.
    database_type="postgresql",
    database_url="postgresql://user:password@localhost:5432/myapp",
    # Development connection settings (local SQLite file).
    dev_database_type="sqlite",
    dev_database_url="sqlite:///./dev.db",
    # Dotted module path(s) where the SQLAlchemy models are defined.
    model_paths=["app.models"],
)
Environment Variables
In production, use environment variables for sensitive data:
```python
import os

from dbwarden import database_config

database_config(
    database_name="primary",
    default=True,
    database_type="postgresql",
    database_url=os.getenv("DATABASE_URL"),
    model_paths=["app.models"],
)
```
Step 2: Define Models¶
Create app/models.py:
# app/models.py
from datetime import datetime
from sqlalchemy import Boolean, DateTime, Integer, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
    """Declarative base class that every ORM model in this module inherits from."""
def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    Replacement for ``datetime.utcnow()``, which is deprecated since
    Python 3.12. The tzinfo is stripped so stored values keep the same
    naive-UTC shape that ``utcnow()`` produced for the ``DateTime`` columns.
    """
    # Local import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None)


class User(Base):
    """ORM model for the ``users`` table."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # Both email and username carry a unique constraint; inserting a
    # duplicate raises IntegrityError on commit.
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
    username: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
    full_name: Mapped[str | None] = mapped_column(String(200))
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    # Timestamps are filled in on the Python side as naive UTC values.
    created_at: Mapped[datetime] = mapped_column(DateTime, default=_utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, default=_utcnow, onupdate=_utcnow
    )
Step 3: Shared Dependencies¶
Create app/dependencies.py:
# app/dependencies.py
from typing import Annotated
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from dbwarden.fastapi import get_session
# Database session dependency: annotate a route parameter with `SessionDep`
# to receive an AsyncSession.
# NOTE(review): get_session is *called* here, so it presumably returns the
# dependency callable that FastAPI invokes per request — confirm against the
# DBWarden documentation.
SessionDep = Annotated[AsyncSession, Depends(get_session())]
This creates a reusable type alias for database sessions.
Step 4: Pydantic Schemas¶
Create app/schemas.py for request/response models:
# app/schemas.py
from datetime import datetime
from pydantic import BaseModel, ConfigDict, EmailStr
class UserBase(BaseModel):
    """Fields shared by the user request and response schemas."""

    email: EmailStr
    username: str
    full_name: str | None = None
class UserCreate(UserBase):
    """Request body for creating a user; adds nothing to ``UserBase``."""
class UserUpdate(BaseModel):
    """Request body for a partial update; every field is optional."""

    email: EmailStr | None = None
    username: str | None = None
    full_name: str | None = None
    is_active: bool | None = None
class UserResponse(UserBase):
    """Response schema for a user, including database-managed fields."""

    # Pydantic v2 configuration (replaces the deprecated nested ``class Config``).
    # ``from_attributes`` lets the schema be populated from object attributes,
    # which is how the ORM ``User`` instances returned by the routes are read.
    model_config = ConfigDict(from_attributes=True)

    id: int
    is_active: bool
    created_at: datetime
    updated_at: datetime
Step 5: User Routes¶
Create app/routes/users.py:
# app/routes/users.py
from fastapi import APIRouter, HTTPException
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from app.dependencies import SessionDep
from app.models import User
from app.schemas import UserCreate, UserResponse, UserUpdate
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/", response_model=list[UserResponse])
async def list_users(
    session: SessionDep,
    skip: int = 0,
    limit: int = 100,
    active_only: bool = False,
):
    """List users one page at a time.

    Args:
        session: Database session supplied by the ``SessionDep`` dependency.
        skip: Number of rows to skip (offset).
        limit: Maximum number of rows to return.
        active_only: When true, return only users whose ``is_active`` is true.

    Returns:
        The ``User`` rows for the requested page, ordered by ``id``.
    """
    stmt = select(User)
    if active_only:
        # ``.is_(True)`` is the SQLAlchemy spelling of the boolean test and
        # avoids the ``== True`` comparison that linters flag.
        stmt = stmt.where(User.is_active.is_(True))
    # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
    # so consecutive pages could overlap or skip rows. Order by primary key.
    stmt = stmt.order_by(User.id).offset(skip).limit(limit)
    result = await session.execute(stmt)
    return result.scalars().all()
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, session: SessionDep):
    """Return the user with the given ID, or respond with 404 if none exists."""
    lookup = select(User).where(User.id == user_id)
    found = (await session.execute(lookup)).scalar_one_or_none()
    if found is None:
        raise HTTPException(status_code=404, detail="User not found")
    return found
@router.post("/", response_model=UserResponse, status_code=201)
async def create_user(user_data: UserCreate, session: SessionDep):
    """Insert a new user and return the stored row.

    Responds with 400 when the commit raises ``IntegrityError``.
    """
    new_user = User(**user_data.model_dump())
    session.add(new_user)
    try:
        await session.commit()
    except IntegrityError:
        # Roll back the failed transaction before reporting the error.
        await session.rollback()
        raise HTTPException(
            status_code=400,
            detail="User with this email or username already exists",
        )
    else:
        # Reload the row so database-populated columns are present.
        await session.refresh(new_user)
        return new_user
@router.patch("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    user_data: UserUpdate,
    session: SessionDep,
):
    """Apply a partial update to a user and return the updated row.

    Responds with 404 when the user does not exist and with 400 when the
    commit raises ``IntegrityError``.
    """
    lookup = select(User).where(User.id == user_id)
    target = (await session.execute(lookup)).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="User not found")

    # exclude_unset keeps only the fields the client actually sent.
    changes = user_data.model_dump(exclude_unset=True)
    for field_name, new_value in changes.items():
        setattr(target, field_name, new_value)

    try:
        await session.commit()
    except IntegrityError:
        await session.rollback()
        raise HTTPException(
            status_code=400,
            detail="Email or username already taken",
        )
    else:
        await session.refresh(target)
        return target
@router.delete("/{user_id}", status_code=204)
async def delete_user(user_id: int, session: SessionDep):
    """Delete the user with the given ID; respond with 404 if none exists."""
    lookup = select(User).where(User.id == user_id)
    doomed = (await session.execute(lookup)).scalar_one_or_none()
    if doomed is None:
        raise HTTPException(status_code=404, detail="User not found")
    await session.delete(doomed)
    await session.commit()
Step 6: Main Application¶
Create app/main.py:
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from dbwarden.fastapi import DBWardenHealthRouter, migration_context
from app.routes import users
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Wrap the application's lifetime in DBWarden's migration check."""
    # Startup: check database migrations
    startup_check = migration_context(
        mode="check",
        all_databases=True,
        fail_fast=True,
        verbose=True,
    )
    async with startup_check:
        # The application serves requests while suspended at this yield.
        yield
        # Shutdown: cleanup happens here
# Create the FastAPI app; `lifespan` runs the startup migration check.
app = FastAPI(
    title="My App",
    description="Example app with DBWarden integration",
    version="1.0.0",
    lifespan=lifespan,
)

# Mount the user endpoints under /api/v1 and the DBWarden health router
# under /health.
app.include_router(users.router, prefix="/api/v1")
health_router = DBWardenHealthRouter()
app.include_router(health_router, prefix="/health")
@app.get("/")
async def root():
    """Return a welcome message with pointers to the docs and health endpoints."""
    landing = {
        "message": "Welcome to My App",
        "docs": "/docs",
        "health": "/health/",
    }
    return landing
Step 7: Create Migrations¶
Initialize DBWarden and create your first migration:
# Initialize DBWarden (if not already done)
dbwarden init
# Create migration for User model
dbwarden make-migrations -m "create users table"
This generates a migration file like 0001_create_users_table.py.
Step 8: Apply Migrations¶
Apply the migration to your database:
# For development (SQLite)
export ENVIRONMENT=development
dbwarden migrate
# For production (PostgreSQL)
export ENVIRONMENT=production
dbwarden migrate
Step 9: Run the Application¶
Start your FastAPI app:
uvicorn app.main:app --reload
You'll see:
INFO: Started server process [12345]
INFO: Waiting for application startup.
INFO: DBWarden: migration_context mode=check outcome=ok duration_ms=45
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000
Step 10: Test the API¶
Interactive Documentation¶
Open http://127.0.0.1:8000/docs to see the Swagger UI.
Create a User¶
curl -X POST http://localhost:8000/api/v1/users/ \
-H "Content-Type: application/json" \
-d '{
"email": "alice@example.com",
"username": "alice",
"full_name": "Alice Smith"
}'
Response:
{
"id": 1,
"email": "alice@example.com",
"username": "alice",
"full_name": "Alice Smith",
"is_active": true,
"created_at": "2024-01-15T10:30:00",
"updated_at": "2024-01-15T10:30:00"
}
List Users¶
curl http://localhost:8000/api/v1/users/
Get a User¶
curl http://localhost:8000/api/v1/users/1
Update a User¶
curl -X PATCH http://localhost:8000/api/v1/users/1 \
-H "Content-Type: application/json" \
-d '{
"full_name": "Alice Johnson"
}'
Delete a User¶
curl -X DELETE http://localhost:8000/api/v1/users/1
Check Health¶
curl http://localhost:8000/health/
Key Features Demonstrated¶
1. Session Management¶
@router.post("/", response_model=UserResponse, status_code=201)
async def create_user(user_data: UserCreate, session: SessionDep):
    """Create a user (condensed from Step 5; error handling omitted)."""
    # Session automatically provided
    user = User(**user_data.model_dump())
    session.add(user)
    await session.commit()
    await session.refresh(user)
    # Session automatically closed after the handler returns
    return user
2. Error Handling¶
try:
await session.commit()
except IntegrityError:
await session.rollback() # Explicit rollback
raise HTTPException(400, "Duplicate entry")
3. Query Patterns¶
# Select one
result = await session.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
# Select many
result = await session.execute(
select(User).offset(skip).limit(limit)
)
users = result.scalars().all()
4. Transaction Management¶
# Add to session (nothing is persisted until commit)
session.add(user)
# Commit changes
await session.commit()
# Refresh to get DB-generated values
await session.refresh(user)
# Delete (also needs a commit to take effect)
await session.delete(user)
await session.commit()
5. Startup Validation¶
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the DBWarden migration check around the application's lifetime."""
    startup_check = migration_context(mode="check")
    async with startup_check:
        # App only starts if database is healthy
        yield
6. Health Endpoints¶
app.include_router(DBWardenHealthRouter(), prefix="/health")
Provides:
- GET /health/ - Overall health
- GET /health/{database_name} - Per-database health
Production Deployment¶
Docker¶
Create Dockerfile:
FROM python:3.11-slim
WORKDIR /app
# Copy and install dependencies first so this layer is cached until
# requirements.txt changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Run migrations before starting the app. `exec` replaces the shell with
# uvicorn so uvicorn receives SIGTERM directly and can shut down gracefully;
# with the plain shell form the signal stops at `sh` and never reaches it.
CMD ["sh", "-c", "dbwarden migrate && exec uvicorn app.main:app --host 0.0.0.0 --port 8000"]
Kubernetes¶
Create deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp
  template:
    metadata:
      labels:
        app: myapp
    spec:
      initContainers:
        # Run migrations in init container
        # NOTE(review): with replicas: 3 every pod runs this init container;
        # confirm `dbwarden migrate` is safe to run concurrently.
        - name: migrate
          image: myapp:latest
          command: ["dbwarden", "migrate"]
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: url
            # Match the app container so migrations target the production
            # database configuration rather than the development one.
            - name: ENVIRONMENT
              value: "production"
      containers:
        - name: app
          image: myapp:latest
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: url
            - name: ENVIRONMENT
              value: "production"
          # Liveness probe
          livenessProbe:
            httpGet:
              path: /health/
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 30
          # Readiness probe
          readinessProbe:
            httpGet:
              path: /health/
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
Environment Variables¶
Create .env for local development:
# .env
ENVIRONMENT=development
DATABASE_URL=sqlite:///./dev.db
For production, set:
ENVIRONMENT=production
DATABASE_URL=postgresql://user:password@db-host:5432/myapp
Requirements¶
Create requirements.txt:
fastapi>=0.104.0
uvicorn[standard]>=0.24.0
sqlalchemy>=2.0.0
asyncpg>=0.29.0 # PostgreSQL async driver
aiosqlite>=0.19.0 # SQLite async driver
pydantic[email]>=2.4.0
dbwarden>=0.1.0
Install:
pip install -r requirements.txt
Testing¶
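Create tests/test_users.py. The tests need pytest, pytest-asyncio and httpx, which are not in requirements.txt (install them with `pip install pytest pytest-asyncio httpx`):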
import pytest
from httpx import ASGITransport, AsyncClient
from app.main import app


def _client() -> AsyncClient:
    """Build an HTTP client that sends requests straight to the ASGI app.

    The ``AsyncClient(app=...)`` shortcut was deprecated in httpx 0.27 and
    removed in 0.28; passing an explicit ``ASGITransport`` is the supported
    spelling.
    """
    return AsyncClient(transport=ASGITransport(app=app), base_url="http://test")


@pytest.mark.asyncio
async def test_create_user():
    """POST /api/v1/users/ returns 201 and echoes the submitted fields."""
    async with _client() as client:
        response = await client.post(
            "/api/v1/users/",
            json={
                "email": "test@example.com",
                "username": "testuser",
                "full_name": "Test User",
            },
        )
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "test@example.com"
    assert data["username"] == "testuser"


@pytest.mark.asyncio
async def test_list_users():
    """GET /api/v1/users/ returns 200 with a JSON list."""
    async with _client() as client:
        response = await client.get("/api/v1/users/")
    assert response.status_code == 200
    assert isinstance(response.json(), list)


@pytest.mark.asyncio
async def test_health():
    """GET /health/ returns 200 with a recognised status value."""
    async with _client() as client:
        response = await client.get("/health/")
    assert response.status_code == 200
    data = response.json()
    assert data["status"] in ["ok", "degraded", "error"]
Run tests:
pytest tests/
Recap¶
This complete application demonstrates:
✅ Database configuration with DBWarden
✅ SQLAlchemy models with Mapped columns
✅ Pydantic schemas for validation
✅ Session dependencies with type aliases
✅ Full CRUD operations
✅ Error handling and transactions
✅ Startup migration checks
✅ Health endpoints
✅ Production deployment (Docker, Kubernetes)
✅ Testing setup
What's Next?¶
Take your app further:
- Multi-Database - Add analytics or logging databases
- Testing - Advanced testing patterns
- Transaction Management - Complex transactions
- Production Patterns - CI/CD and monitoring