Transaction Management¶
Learn how to manage database transactions in FastAPI with DBWarden.
Automatic Transactions¶
By default, DBWarden sessions handle transactions automatically:
@app.post("/users")
async def create_user(user_data: UserCreate, session: SessionDep):
user = User(**user_data.dict())
session.add(user)
await session.commit() # Explicit commit
return user
# Session automatically closes here
When to Commit¶
Automatic (Session Autoflush)¶
For simple operations, SQLAlchemy flushes changes automatically:
@app.get("/users/{user_id}")
async def get_user(user_id: int, session: SessionDep):
result = await session.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
# No commit needed for reads
Manual Commit¶
For writes, explicitly commit:
@app.post("/users")
async def create_user(user_data: UserCreate, session: SessionDep):
user = User(**user_data.dict())
session.add(user)
await session.commit() # ← Explicit commit
await session.refresh(user) # Get DB-generated values
return user
Error Handling and Rollback¶
Automatic Rollback¶
If an exception occurs, the session rolls back automatically:
@app.post("/users")
async def create_user(user_data: UserCreate, session: SessionDep):
user = User(**user_data.dict())
session.add(user)
if not validate_email(user.email):
raise HTTPException(400, "Invalid email")
# Session automatically rolls back
await session.commit()
return user
Manual Rollback¶
For explicit control:
@app.post("/users")
async def create_user(user_data: UserCreate, session: SessionDep):
user = User(**user_data.dict())
session.add(user)
try:
await session.commit()
except IntegrityError:
await session.rollback() # Explicit rollback
raise HTTPException(400, "User already exists")
return user
Nested Transactions (Savepoints)¶
Use savepoints for partial rollbacks:
from sqlalchemy.exc import IntegrityError
@app.post("/batch")
async def batch_create(users: list[UserCreate], session: SessionDep):
created = []
failed = []
for user_data in users:
savepoint = await session.begin_nested() # Create savepoint
try:
user = User(**user_data.dict())
session.add(user)
await session.flush()
created.append(user)
await savepoint.commit()
except IntegrityError:
await savepoint.rollback() # Rollback to savepoint
failed.append(user_data)
await session.commit() # Commit all successful inserts
return {"created": created, "failed": failed}
Multiple Operations in One Transaction¶
Group related operations:
@app.post("/orders")
async def create_order(order_data: OrderCreate, session: SessionDep):
# All operations in one transaction
# 1. Create order
order = Order(user_id=order_data.user_id)
session.add(order)
await session.flush() # Get order ID
# 2. Add order items
for item_data in order_data.items:
item = OrderItem(order_id=order.id, **item_data.dict())
session.add(item)
# 3. Update inventory
for item_data in order_data.items:
await session.execute(
update(Product)
.where(Product.id == item_data.product_id)
.values(stock=Product.stock - item_data.quantity)
)
# Commit everything at once
await session.commit()
await session.refresh(order)
return order
If any step fails, everything rolls back.
Isolation Levels¶
Set transaction isolation level:
from sqlalchemy import create_engine
# In engine creation (advanced)
engine = create_async_engine(
database_url,
isolation_level="SERIALIZABLE" # Strictest isolation
)
Isolation levels:
- READ UNCOMMITTED - Dirty reads possible
- READ COMMITTED - Default for PostgreSQL
- REPEATABLE READ - No phantom reads
- SERIALIZABLE - Strictest, slowest
Two-Phase Commit (Distributed Transactions)¶
For multi-database transactions (advanced):
@app.post("/transfer")
async def transfer_funds(
primary_session: PrimarySessionDep,
analytics_session: AnalyticsSessionDep,
):
try:
# Phase 1: Prepare both transactions
user = await primary_session.get(User, 1)
user.balance -= 100
await primary_session.flush()
event = Event(type="transfer", amount=100)
analytics_session.add(event)
await analytics_session.flush()
# Phase 2: Commit both
await primary_session.commit()
await analytics_session.commit()
except Exception:
# Rollback both if either fails
await primary_session.rollback()
await analytics_session.rollback()
raise
Limitations
Two-phase commit is complex and not fully supported by SQLAlchemy. Consider using saga pattern or event sourcing for distributed transactions.
Optimistic Locking¶
Prevent lost updates with version columns:
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str]
version: Mapped[int] = mapped_column(default=0) # Version column
@app.patch("/users/{user_id}")
async def update_user(
user_id: int,
user_data: UserUpdate,
session: SessionDep,
):
user = await session.get(User, user_id)
# Check version matches
if user.version != user_data.expected_version:
raise HTTPException(409, "User was modified by someone else")
user.email = user_data.email
user.version += 1 # Increment version
await session.commit()
return user
Pessimistic Locking¶
Lock rows explicitly:
from sqlalchemy import select
@app.post("/reserve")
async def reserve_item(item_id: int, session: SessionDep):
# Lock the row for update
result = await session.execute(
select(Item)
.where(Item.id == item_id)
.with_for_update() # SELECT ... FOR UPDATE
)
item = result.scalar_one()
if item.reserved:
raise HTTPException(400, "Already reserved")
item.reserved = True
await session.commit()
return item
Idempotency¶
Make operations idempotent:
@app.post("/orders", status_code=201)
async def create_order(
order_data: OrderCreate,
idempotency_key: str,
session: SessionDep,
):
# Check if order already exists
existing = await session.execute(
select(Order).where(Order.idempotency_key == idempotency_key)
)
order = existing.scalar_one_or_none()
if order:
return order # Already created, return existing
# Create new order
order = Order(**order_data.dict(), idempotency_key=idempotency_key)
session.add(order)
await session.commit()
return order
Recap¶
✅ Sessions handle transactions automatically
✅ Commit explicitly for writes
✅ Rollback automatically on errors
✅ Use savepoints for partial rollbacks
✅ Group related operations in one transaction
✅ Use optimistic locking to prevent lost updates
✅ Use pessimistic locking for critical sections
✅ Make operations idempotent with idempotency keys
What's Next?¶
- Engine Lifecycle - Connection pooling and cleanup
- Production Patterns - Deploy with confidence