Skip to content

Complete Application

A full, production-ready FastAPI application using all of DBWarden's features.

Overview

This example shows:

- Database configuration
- Model definition
- Session dependencies
- CRUD operations
- Startup checks
- Health endpoints
- Transaction management
- Error handling

Project Structure

my_app/
├── dbwarden.py          # Database configuration
├── app/
│   ├── __init__.py
│   ├── main.py          # FastAPI app
│   ├── dependencies.py  # Shared dependencies
│   ├── models.py        # SQLAlchemy models
│   ├── schemas.py       # Pydantic schemas
│   └── routes/
│       ├── __init__.py
│       └── users.py     # User endpoints
├── tests/
│   └── test_users.py    # API tests
└── requirements.txt

Step 1: Database Configuration

Create dbwarden.py in your project root:

# dbwarden.py
from dbwarden import database_config

# Register the application's database with DBWarden.
database_config(
    database_name="primary",
    default=True,
    database_type="postgresql",
    # Example credentials only; the "Environment Variables" section below
    # shows how to read the URL from the environment instead.
    database_url="postgresql://user:password@localhost:5432/myapp",
    # NOTE(review): the dev_* settings are presumably the ones used when
    # ENVIRONMENT=development (see Step 8) — confirm against the DBWarden docs.
    dev_database_type="sqlite",
    dev_database_url="sqlite:///./dev.db",
    # Import path of the module that defines the models (app/models.py).
    model_paths=["app.models"],
)

Environment Variables

In production, use environment variables for sensitive data:

import os

database_config(
    database_name="primary",
    default=True,
    database_type="postgresql",
    database_url=os.getenv("DATABASE_URL"),
    model_paths=["app.models"],
)

Step 2: Define Models

Create app/models.py:

# app/models.py
from datetime import datetime
from sqlalchemy import Boolean, DateTime, Integer, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    """Declarative base class shared by every ORM model in the application."""


def _utcnow() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    ``datetime.utcnow()`` is deprecated since Python 3.12. This helper yields
    the same naive-UTC value, so the ``DateTime`` columns below (declared
    without ``timezone=True``) keep storing exactly what they did before.
    """
    # Imported locally so the module-level import block stays unchanged.
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None)


class User(Base):
    """Application user, stored in the ``users`` table."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # Both identifiers are required and unique; duplicates are rejected by
    # the database when the row is written.
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
    username: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
    full_name: Mapped[str | None] = mapped_column(String(200))
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    # Timestamps are filled in on the Python side as naive UTC datetimes.
    created_at: Mapped[datetime] = mapped_column(DateTime, default=_utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, default=_utcnow, onupdate=_utcnow
    )

Step 3: Shared Dependencies

Create app/dependencies.py:

# app/dependencies.py
from typing import Annotated
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from dbwarden.fastapi import get_session

# Database session dependency.
# Annotating a route parameter as `session: SessionDep` makes FastAPI inject
# an AsyncSession produced by the dependency that `get_session()` returns.
# NOTE(review): opening/closing the session per request is presumably handled
# inside DBWarden's `get_session` — confirm against the DBWarden docs.
SessionDep = Annotated[AsyncSession, Depends(get_session())]

This creates a reusable type alias for database sessions.

Step 4: Pydantic Schemas

Create app/schemas.py for request/response models:

# app/schemas.py
from datetime import datetime
from pydantic import BaseModel, EmailStr


# Fields shared by the create payload (UserCreate) and the response model
# (UserResponse).
class UserBase(BaseModel):
    email: EmailStr  # validated as an e-mail address by Pydantic
    username: str
    full_name: str | None = None  # optional; None when omitted


# Request body for creating a user; adds no fields to UserBase.
class UserCreate(UserBase):
    pass


# Request body for a partial update. Every field defaults to None so a client
# can send only the fields it wants to change.
class UserUpdate(BaseModel):
    email: EmailStr | None = None
    username: str | None = None
    full_name: str | None = None
    is_active: bool | None = None


class UserResponse(UserBase):
    """Response model returned by the user endpoints.

    Instances are built from ``User`` ORM objects, which requires
    ``from_attributes`` to be enabled.
    """

    # Pydantic v2 configuration. The nested ``class Config`` form is
    # deprecated in v2 and emits a deprecation warning; a plain dict is
    # accepted for ``model_config``, so no extra import is needed.
    model_config = {"from_attributes": True}

    id: int
    is_active: bool
    created_at: datetime
    updated_at: datetime

Step 5: User Routes

Create app/routes/users.py:

# app/routes/users.py
from fastapi import APIRouter, HTTPException
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError

from app.dependencies import SessionDep
from app.models import User
from app.schemas import UserCreate, UserResponse, UserUpdate

# Every route below is registered under the "/users" prefix and grouped under
# the "users" tag in the generated OpenAPI docs.
router = APIRouter(prefix="/users", tags=["users"])


@router.get("/", response_model=list[UserResponse])
async def list_users(
    session: SessionDep,
    skip: int = 0,
    limit: int = 100,
    active_only: bool = False,
):
    """List users with offset/limit pagination.

    Args:
        session: Database session injected by FastAPI.
        skip: Number of rows to skip.
        limit: Maximum number of rows to return.
        active_only: When true, return only users whose ``is_active`` is true.

    Returns:
        The matching ``User`` rows, ordered by primary key.
    """
    stmt = select(User)

    if active_only:
        stmt = stmt.where(User.is_active.is_(True))

    # Without an explicit ORDER BY the database may return rows in any order,
    # so consecutive pages could overlap or skip rows.
    stmt = stmt.order_by(User.id).offset(skip).limit(limit)

    result = await session.execute(stmt)
    return result.scalars().all()


@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, session: SessionDep):
    """Return the user whose primary key is ``user_id``; respond 404 if absent."""
    query = select(User).where(User.id == user_id)
    found = (await session.execute(query)).scalar_one_or_none()

    if found:
        return found

    raise HTTPException(status_code=404, detail="User not found")


@router.post("/", response_model=UserResponse, status_code=201)
async def create_user(user_data: UserCreate, session: SessionDep):
    """Insert a user built from the request body and return the stored row.

    An ``IntegrityError`` raised by the commit is rolled back and reported
    to the client as HTTP 400.
    """
    new_user = User(**user_data.model_dump())
    session.add(new_user)

    try:
        await session.commit()
    except IntegrityError:
        await session.rollback()
        raise HTTPException(
            status_code=400,
            detail="User with this email or username already exists",
        )
    else:
        # Reload the instance from the database before returning it.
        await session.refresh(new_user)
        return new_user


@router.patch("/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    user_data: UserUpdate,
    session: SessionDep,
):
    """Partially update a user.

    Only the fields present in the request body are changed.

    Raises:
        HTTPException: 422 if a required field is explicitly set to null;
            404 if the user does not exist; 400 if the commit violates an
            integrity constraint.
    """
    # Update only provided fields
    update_data = user_data.model_dump(exclude_unset=True)

    # ``exclude_unset`` keeps fields that were explicitly sent as null. The
    # matching columns are NOT NULL, so such a write would fail on commit and
    # be misreported as a duplicate. Reject it up front with a precise error.
    required_fields = ("email", "username", "is_active")
    null_fields = [
        name
        for name in required_fields
        if name in update_data and update_data[name] is None
    ]
    if null_fields:
        raise HTTPException(
            status_code=422,
            detail=f"Fields cannot be null: {', '.join(null_fields)}",
        )

    result = await session.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalar_one_or_none()

    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    for key, value in update_data.items():
        setattr(user, key, value)

    try:
        await session.commit()
    except IntegrityError as exc:
        await session.rollback()
        # Chain the original error so it stays visible in server-side logs.
        raise HTTPException(
            status_code=400,
            detail="Email or username already taken",
        ) from exc

    await session.refresh(user)
    return user


@router.delete("/{user_id}", status_code=204)
async def delete_user(user_id: int, session: SessionDep):
    """Remove the user identified by ``user_id``; respond 404 if absent."""
    lookup = select(User).where(User.id == user_id)
    target = (await session.execute(lookup)).scalar_one_or_none()

    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    await session.delete(target)
    await session.commit()

Step 6: Main Application

Create app/main.py:

# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from dbwarden.fastapi import DBWardenHealthRouter, migration_context

from app.routes import users


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup and shutdown logic.

    Passed to ``FastAPI(lifespan=...)``. The application runs while this
    generator is suspended at ``yield``, i.e. inside ``migration_context``.
    """
    # Startup: check database migrations
    # NOTE(review): presumably mode="check" only verifies migration state and
    # fail_fast=True aborts startup on a failed check — confirm in DBWarden docs.
    async with migration_context(
        mode="check",
        all_databases=True,
        fail_fast=True,
        verbose=True,
    ):
        yield
    # Shutdown: cleanup happens here


# Create FastAPI app
# `lifespan` wraps the application's lifetime in the migration check above.
app = FastAPI(
    title="My App",
    description="Example app with DBWarden integration",
    version="1.0.0",
    lifespan=lifespan,
)

# Include routers
# The users router declares its own "/users" prefix, so its routes are served
# under /api/v1/users.
app.include_router(users.router, prefix="/api/v1")
# DBWarden's health routes are mounted under /health.
app.include_router(DBWardenHealthRouter(), prefix="/health")


@app.get("/")
async def root():
    """Return a welcome message with pointers to the docs and health routes."""
    links = {"docs": "/docs", "health": "/health/"}
    return {"message": "Welcome to My App", **links}

Step 7: Create Migrations

Initialize DBWarden and create your first migration:

# Initialize DBWarden (if not already done)
dbwarden init

# Create migration for User model
dbwarden make-migrations -m "create users table"

This generates a migration file like 0001_create_users_table.py.

Step 8: Apply Migrations

Apply the migration to your database:

# For development (SQLite)
export ENVIRONMENT=development
dbwarden migrate

# For production (PostgreSQL)
export ENVIRONMENT=production
dbwarden migrate

Step 9: Run the Application

Start your FastAPI app:

uvicorn app.main:app --reload

You'll see:

INFO:     Started server process [12345]
INFO:     Waiting for application startup.
INFO:     DBWarden: migration_context mode=check outcome=ok duration_ms=45
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000

Step 10: Test the API

Interactive Documentation

Open http://127.0.0.1:8000/docs to see the Swagger UI.

Create a User

curl -X POST http://localhost:8000/api/v1/users/ \
  -H "Content-Type: application/json" \
  -d '{
    "email": "alice@example.com",
    "username": "alice",
    "full_name": "Alice Smith"
  }'

Response:

{
  "id": 1,
  "email": "alice@example.com",
  "username": "alice",
  "full_name": "Alice Smith",
  "is_active": true,
  "created_at": "2024-01-15T10:30:00",
  "updated_at": "2024-01-15T10:30:00"
}

List Users

curl http://localhost:8000/api/v1/users/

Get a User

curl http://localhost:8000/api/v1/users/1

Update a User

curl -X PATCH http://localhost:8000/api/v1/users/1 \
  -H "Content-Type: application/json" \
  -d '{
    "full_name": "Alice Johnson"
  }'

Delete a User

curl -X DELETE http://localhost:8000/api/v1/users/1

Check Health

curl http://localhost:8000/health/

Key Features Demonstrated

1. Session Management

@router.post("/", response_model=UserResponse, status_code=201)
async def create_user(user_data: UserCreate, session: SessionDep):
    """Create a user (error handling omitted here for brevity)."""
    # Session automatically provided
    user = User(**user_data.model_dump())
    session.add(user)
    await session.commit()
    await session.refresh(user)
    return user
    # Session automatically closed

2. Error Handling

try:
    await session.commit()
except IntegrityError:
    await session.rollback()  # Explicit rollback
    raise HTTPException(400, "Duplicate entry")

3. Query Patterns

# Select one
result = await session.execute(
    select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()

# Select many
result = await session.execute(
    select(User).offset(skip).limit(limit)
)
users = result.scalars().all()

4. Transaction Management

# Add to session
session.add(user)

# Commit changes
await session.commit()

# Refresh to get DB-generated values
await session.refresh(user)

# Delete
await session.delete(user)
await session.commit()

5. Startup Validation

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Keep the app's lifetime inside DBWarden's ``migration_context``."""
    async with migration_context(mode="check"):
        # App only starts if database is healthy
        yield

6. Health Endpoints

app.include_router(DBWardenHealthRouter(), prefix="/health")

Provides:

- GET /health/ - Overall health
- GET /health/{database_name} - Per-database health

Production Deployment

Docker

Create Dockerfile:

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Run migrations before starting the app.
# Exec-form CMD with an explicit shell: `&&` still works, and `exec` replaces
# the shell with uvicorn so it receives SIGTERM directly on `docker stop`
# instead of being killed after the grace period.
CMD ["sh", "-c", "dbwarden migrate && exec uvicorn app.main:app --host 0.0.0.0 --port 8000"]

Kubernetes

Create deployment.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp
  template:
    metadata:
      labels:
        app: myapp
    spec:
      initContainers:
      # Run migrations in init container
      - name: migrate
        image: myapp:latest
        command: ["dbwarden", "migrate"]
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-secret
              key: url
        # Same environment as the app container, so the migration targets
        # the production database rather than the development one.
        - name: ENVIRONMENT
          value: "production"

      containers:
      - name: app
        image: myapp:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: db-secret
              key: url
        - name: ENVIRONMENT
          value: "production"

        # Liveness probe: hits the root endpoint, which does not touch the
        # database, so a database outage does not trigger pod restarts.
        livenessProbe:
          httpGet:
            path: /
            port: 8000
          initialDelaySeconds: 10
          periodSeconds: 30

        # Readiness probe: uses the database health route so traffic is only
        # sent to pods that report a healthy database.
        readinessProbe:
          httpGet:
            path: /health/
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 10

Environment Variables

Create .env for local development:

# .env
ENVIRONMENT=development
DATABASE_URL=sqlite:///./dev.db

For production, set:

ENVIRONMENT=production
DATABASE_URL=postgresql://user:password@db-host:5432/myapp

Requirements

Create requirements.txt:

fastapi>=0.104.0
uvicorn[standard]>=0.24.0
sqlalchemy>=2.0.0
asyncpg>=0.29.0  # PostgreSQL async driver
aiosqlite>=0.19.0  # SQLite async driver
pydantic[email]>=2.4.0
dbwarden>=0.1.0

Install:

pip install -r requirements.txt

Testing

Create tests/test_users.py:

import pytest
from httpx import ASGITransport, AsyncClient

from app.main import app


@pytest.mark.asyncio
async def test_create_user():
    """POST /api/v1/users/ creates a user and echoes the submitted fields."""
    # httpx deprecated the `app=` shortcut in 0.27 and removed it in 0.28;
    # an explicit ASGITransport works on both older and newer versions.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.post(
            "/api/v1/users/",
            json={
                "email": "test@example.com",
                "username": "testuser",
                "full_name": "Test User",
            },
        )
    assert response.status_code == 201
    data = response.json()
    assert data["email"] == "test@example.com"
    assert data["username"] == "testuser"


@pytest.mark.asyncio
async def test_list_users():
    """GET /api/v1/users/ responds 200 with a JSON list."""
    # `AsyncClient(app=...)` was removed in httpx 0.28; pass the ASGI app
    # through an explicit transport instead.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.get("/api/v1/users/")
    assert response.status_code == 200
    assert isinstance(response.json(), list)


@pytest.mark.asyncio
async def test_health():
    """GET /health/ responds 200 and reports a known status value."""
    # `AsyncClient(app=...)` was removed in httpx 0.28; pass the ASGI app
    # through an explicit transport instead.
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.get("/health/")
    assert response.status_code == 200
    data = response.json()
    assert data["status"] in ["ok", "degraded", "error"]

Install the test dependencies (pip install pytest pytest-asyncio httpx), then run the tests:

pytest tests/

Recap

This complete application demonstrates:

✅ Database configuration with DBWarden
✅ SQLAlchemy models with Mapped columns
✅ Pydantic schemas for validation
✅ Session dependencies with type aliases
✅ Full CRUD operations
✅ Error handling and transactions
✅ Startup migration checks
✅ Health endpoints
✅ Production deployment (Docker, Kubernetes)
✅ Testing setup

What's Next?

Take your app further: