Engine Lifecycle¶
Learn how DBWarden manages database engines and connections.
How Engines Are Created¶
When you first use get_session():
- First request arrives with the SessionDep dependency
- DBWarden checks the engine cache: is there an engine for this database?
- If not cached:
    - Reads the database config from the DBWarden registry
    - Converts the URL to its async version (postgresql+asyncpg://...)
    - Creates an AsyncEngine with create_async_engine()
    - Caches the engine by database URL
- Creates a session factory (async_sessionmaker)
- Opens a new session for this request
- Yields the session to your route
- Closes the session in a finally block
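The URL-conversion step above can be sketched as a small helper. This is a hypothetical illustration (to_async_url is not part of DBWarden's public API), showing only the PostgreSQL case:

```python
def to_async_url(url: str) -> str:
    """Rewrite a sync PostgreSQL URL to its asyncpg form (sketch only)."""
    if url.startswith("postgresql://"):
        # Swap the driver prefix so SQLAlchemy uses the async asyncpg driver
        return url.replace("postgresql://", "postgresql+asyncpg://", 1)
    return url  # other schemes pass through unchanged in this sketch

print(to_async_url("postgresql://localhost/primary"))
# → postgresql+asyncpg://localhost/primary
```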
Engine Caching¶
Engines are cached per unique database URL:
# Internal cache (simplified)
_engine_cache = {}

def get_engine(database_url: str):
    if database_url not in _engine_cache:
        _engine_cache[database_url] = create_async_engine(database_url)
    return _engine_cache[database_url]
Why cache engines?

- Engines are expensive to create
- Each engine manages a connection pool
- Creating one per request would exhaust connections
- A single engine per database is SQLAlchemy best practice
Connection Pooling¶
Each engine maintains a connection pool:
Default Pool Settings¶
create_async_engine(
    database_url,
    pool_size=5,       # Max connections kept in the pool
    max_overflow=10,   # Extra connections when the pool is full
    pool_timeout=30,   # Seconds to wait for an available connection
    pool_recycle=-1,   # Never recycle (SQLAlchemy's default)
)
Custom Pool Settings¶
To customize, you need to create engines manually (advanced):
from sqlalchemy.ext.asyncio import create_async_engine
custom_engine = create_async_engine(
    "postgresql+asyncpg://...",
    pool_size=20,        # More connections
    max_overflow=5,      # Fewer overflow
    pool_pre_ping=True,  # Test connections before use
)
DBWarden Default
DBWarden uses SQLAlchemy's defaults, which work well for most applications.
Engine Disposal¶
Engines should be disposed when your app shuts down.
Manual Disposal¶
Currently, DBWarden doesn't provide a built-in disposal function. You can add one:
from contextlib import asynccontextmanager

from fastapi import FastAPI

from dbwarden.fastapi import migration_context

# Keep references to the engines you create
_engines = []

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    async with migration_context(mode="check"):
        yield
    # Shutdown - dispose engines
    for engine in _engines:
        await engine.dispose()

app = FastAPI(lifespan=lifespan)
Kubernetes
In Kubernetes, pods are terminated quickly, so manual disposal is less critical. The OS cleans up connections.
Session Lifecycle¶
Each request gets its own session:
Request → get_session() → Session created → Route runs → Session closed
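The request flow above can be simulated with a stand-in session and a generator dependency. StubSession is a hypothetical placeholder for AsyncSession; the sketch is synchronous to keep it self-contained:

```python
class StubSession:
    """Stand-in for AsyncSession, used only to illustrate the lifecycle."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def get_session():
    # Simplified, synchronous sketch of the dependency pattern:
    # create, yield to the route, close in finally.
    session = StubSession()
    try:
        yield session  # route handler runs while the generator is suspended
    finally:
        session.close()  # always runs when the request finishes


# Simulate one request:
dep = get_session()
session = next(dep)  # session created and handed to the route
dep.close()          # request ends; the finally block closes the session
# session.closed is now True
```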
Session Settings¶
DBWarden sessions use:
async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Keep objects accessible after commit
)
Why expire_on_commit=False?¶
Without it:
@app.post("/users")
async def create_user(session: SessionDep):
    user = User(email="test@example.com")
    session.add(user)
    await session.commit()
    # ❌ Error: Instance is not bound to a Session
    return user
With it:
@app.post("/users")
async def create_user(session: SessionDep):
    user = User(email="test@example.com")
    session.add(user)
    await session.commit()
    # ✅ Works: Object still accessible
    return user
FastAPI needs to serialize the object after the route returns, so expire_on_commit=False is essential.
Connection Pool Exhaustion¶
Symptoms¶
TimeoutError: QueuePool limit of size 5 overflow 10 reached
This happens when:

- Too many concurrent requests
- Connections not being released
- Long-running queries
- Connection leaks
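For the first cause, a quick capacity check helps: each worker process has its own engine, so the connections your database sees scale with worker count. A plain-Python sketch (not a DBWarden API) of the arithmetic:

```python
def max_db_connections(workers: int, pool_size: int = 5, max_overflow: int = 10) -> int:
    # Each worker process gets its own engine and pool, so capacity multiplies
    return workers * (pool_size + max_overflow)

# With the defaults (5 + 10) and 4 uvicorn workers:
print(max_db_connections(4))  # → 60 possible connections to the database
```

If that total exceeds your database's max_connections setting, pool exhaustion errors can appear even though each individual pool looks healthy.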
Solutions¶
1. Increase Pool Size¶
# In engine creation
pool_size=20,
max_overflow=10,
2. Profile Connection Usage¶
# Enable pool logging
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.pool').setLevel(logging.DEBUG)
3. Close Connections Properly¶
Make sure sessions close:
# ✅ Correct - session closes automatically
@app.get("/users")
async def list_users(session: SessionDep):
    result = await session.execute(select(User))
    return result.scalars().all()

# Session closes here

# ❌ Wrong - keeping session reference
_sessions = []

@app.get("/users")
async def list_users(session: SessionDep):
    _sessions.append(session)  # Leak!
    ...
4. Set Connection Timeout¶
pool_timeout=30, # Wait 30 seconds for connection
Engine Per Database¶
With multiple databases, each gets its own engine:
# Internal (simplified)
_engine_cache = {
    "postgresql://localhost/primary": <Engine1>,
    "postgresql://localhost/analytics": <Engine2>,
    "postgresql://localhost/logging": <Engine3>,
}
Each engine has its own connection pool.
Monitoring Connections¶
Check Pool Status¶
@app.get("/debug/pool-status")
async def pool_status():
    engine = get_engine_for_database("primary")  # Hypothetical
    pool = engine.pool
    return {
        "size": pool.size(),
        "checked_in": pool.checkedin(),
        "checked_out": pool.checkedout(),
        "overflow": pool.overflow(),
    }
Log Pool Events¶
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.pool').setLevel(logging.INFO)
Connection Recycling¶
Connections are recycled after pool_recycle seconds (default: -1, never):
create_async_engine(
database_url,
pool_recycle=3600, # Recycle after 1 hour
)
Why recycle?

- Databases close idle connections
- Prevents stale connections
- Refreshes connection state
Pre-Ping¶
Test connections before use:
create_async_engine(
database_url,
pool_pre_ping=True, # Test connection with SELECT 1
)
Trade-off:

- Pro: Prevents errors from stale connections
- Con: Adds a small round trip on each connection checkout
Recap¶
✅ Engines are created once and cached per database
✅ Each engine manages a connection pool
✅ Default pool size is 5 + 10 overflow
✅ Sessions are request-scoped and auto-closed
✅ expire_on_commit=False for FastAPI compatibility
✅ Monitor pool usage to diagnose connection issues
✅ Recycle connections to prevent staleness
What's Next?¶
- Production Patterns - Deploy and monitor
- Multi-Database - Multiple connection pools