Implementation Patterns
CRUD operations, authentication, pagination, rate limiting, and production patterns.
Beginner
#Common Patterns for Everyday Use
#CRUD Operations
The most common pattern in API development is CRUD (Create, Read, Update, Delete). Here is a complete example:
python
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
app = FastAPI()
# --- Models ---
class BookCreate(BaseModel):
title: str
author: str
year: int
class BookUpdate(BaseModel):
title: str | None = None
author: str | None = None
year: int | None = None
class BookResponse(BookCreate):
id: int
# --- In-memory storage (use a database in production) ---
books_db: dict[int, dict] = {}
next_id = 1
# --- Routes ---
@app.post("/books/", response_model=BookResponse, status_code=status.HTTP_201_CREATED)
def create_book(book: BookCreate):
global next_id
book_data = {"id": next_id, **book.model_dump()}
books_db[next_id] = book_data
next_id += 1
return book_data
@app.get("/books/", response_model=list[BookResponse])
def list_books(skip: int = 0, limit: int = 10):
all_books = list(books_db.values())
return all_books[skip : skip + limit]
@app.get("/books/{book_id}", response_model=BookResponse)
def get_book(book_id: int):
if book_id not in books_db:
raise HTTPException(status_code=404, detail="Book not found")
return books_db[book_id]
@app.patch("/books/{book_id}", response_model=BookResponse)
def update_book(book_id: int, book_update: BookUpdate):
if book_id not in books_db:
raise HTTPException(status_code=404, detail="Book not found")
stored = books_db[book_id]
update_data = book_update.model_dump(exclude_unset=True)
stored.update(update_data)
return stored
@app.delete("/books/{book_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_book(book_id: int):
if book_id not in books_db:
raise HTTPException(status_code=404, detail="Book not found")
del books_db[book_id]
#File Uploads
python
from fastapi import FastAPI, File, UploadFile
app = FastAPI()
@app.post("/upload/")
async def upload_file(file: UploadFile):
contents = await file.read()
return {
"filename": file.filename,
"content_type": file.content_type,
"size": len(contents),
}
@app.post("/upload-multiple/")
async def upload_multiple(files: list[UploadFile]):
return [{"filename": f.filename, "size": f.size} for f in files]
#Form Data
python
from fastapi import FastAPI, Form
app = FastAPI()
@app.post("/login/")
def login(username: str = Form(...), password: str = Form(...)):
# In production, verify credentials against database
return {"username": username}
#Returning HTML
python
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from fastapi import Request
app = FastAPI()
templates = Jinja2Templates(directory="templates")
@app.get("/page/{page_name}", response_class=HTMLResponse)
def get_page(request: Request, page_name: str):
return templates.TemplateResponse(
"page.html",
{"request": request, "page_name": page_name},
)
Intermediate
#Production Patterns
#Dependency Injection for Database Sessions
The most common DI pattern is database session management:
python
from typing import Annotated, AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with SessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# Create a reusable type alias
DbSession = Annotated[AsyncSession, Depends(get_db)]
# Use in any route
@app.get("/users/")
async def list_users(db: DbSession):
result = await db.execute(select(User))
return result.scalars().all()
#Authentication with OAuth2 + JWT
python
from datetime import datetime, timedelta, timezone
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
app = FastAPI()
# Configuration
SECRET_KEY = "your-secret-key-here"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class Token(BaseModel):
access_token: str
token_type: str
class User(BaseModel):
username: str
email: str
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = get_user_from_db(username)
if user is None:
raise credentials_exception
return user
# Reusable dependency type
CurrentUser = Annotated[User, Depends(get_current_user)]
@app.post("/token", response_model=Token)
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=401, detail="Incorrect username or password")
access_token = create_access_token(
data={"sub": user.username},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: CurrentUser):
return current_user
#CORS Configuration
python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=[
"http://localhost:3000", # React dev server
"https://myapp.example.com", # Production frontend
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["X-Total-Count"], # Custom headers visible to browser
)
#Pagination Pattern
python
from typing import Annotated, Generic, TypeVar
from fastapi import Depends, Query
from pydantic import BaseModel
T = TypeVar("T")
class PaginationParams(BaseModel):
skip: int = 0
limit: int = 20
class PaginatedResponse(BaseModel, Generic[T]):
items: list[T]
total: int
skip: int
limit: int
def get_pagination(
skip: Annotated[int, Query(ge=0)] = 0,
limit: Annotated[int, Query(ge=1, le=100)] = 20,
) -> PaginationParams:
return PaginationParams(skip=skip, limit=limit)
Pagination = Annotated[PaginationParams, Depends(get_pagination)]
@app.get("/items/", response_model=PaginatedResponse[ItemResponse])
async def list_items(pagination: Pagination, db: DbSession):
total = await db.scalar(select(func.count(Item.id)))
items = await db.scalars(
select(Item).offset(pagination.skip).limit(pagination.limit)
)
return PaginatedResponse(
items=items.all(),
total=total,
skip=pagination.skip,
limit=pagination.limit,
)
#Rate Limiting with Dependencies
python
from fastapi import Depends, HTTPException, Request
import time
# Simple in-memory rate limiter (use Redis in production)
rate_limit_store: dict[str, list[float]] = {}
async def rate_limiter(request: Request, limit: int = 60, window: int = 60):
client_ip = request.client.host
now = time.time()
if client_ip not in rate_limit_store:
rate_limit_store[client_ip] = []
# Remove expired timestamps
rate_limit_store[client_ip] = [
t for t in rate_limit_store[client_ip] if t > now - window
]
if len(rate_limit_store[client_ip]) >= limit:
raise HTTPException(
status_code=429,
detail="Too many requests",
headers={"Retry-After": str(window)},
)
rate_limit_store[client_ip].append(now)
@app.get("/api/data", dependencies=[Depends(rate_limiter)])
async def get_data():
return {"data": "Here you go"}
Advanced
#Advanced Implementation Patterns
#Repository Pattern with Generic Types
python
from typing import Generic, TypeVar, Type
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
ModelType = TypeVar("ModelType")
CreateSchemaType = TypeVar("CreateSchemaType", bound=BaseModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=BaseModel)
class BaseRepository(Generic[ModelType, CreateSchemaType, UpdateSchemaType]):
def __init__(self, model: Type[ModelType], db: AsyncSession):
self.model = model
self.db = db
async def get(self, id: int) -> ModelType | None:
return await self.db.get(self.model, id)
async def get_multi(self, skip: int = 0, limit: int = 100) -> list[ModelType]:
result = await self.db.scalars(
select(self.model).offset(skip).limit(limit)
)
return result.all()
async def create(self, obj_in: CreateSchemaType) -> ModelType:
db_obj = self.model(**obj_in.model_dump())
self.db.add(db_obj)
await self.db.flush()
await self.db.refresh(db_obj)
return db_obj
async def update(self, id: int, obj_in: UpdateSchemaType) -> ModelType | None:
db_obj = await self.get(id)
if db_obj is None:
return None
update_data = obj_in.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_obj, field, value)
await self.db.flush()
await self.db.refresh(db_obj)
return db_obj
async def delete(self, id: int) -> bool:
db_obj = await self.get(id)
if db_obj is None:
return False
await self.db.delete(db_obj)
return True
# Usage
class UserRepository(BaseRepository[User, UserCreate, UserUpdate]):
pass
async def get_user_repo(db: DbSession) -> UserRepository:
return UserRepository(User, db)
UserRepo = Annotated[UserRepository, Depends(get_user_repo)]
@app.get("/users/{user_id}")
async def get_user(user_id: int, repo: UserRepo):
user = await repo.get(user_id)
if not user:
raise HTTPException(status_code=404)
return user
#Custom Middleware with State
python
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
import time
import prometheus_client
REQUEST_COUNT = prometheus_client.Counter(
"http_requests_total",
"Total HTTP requests",
["method", "path", "status"],
)
REQUEST_DURATION = prometheus_client.Histogram(
"http_request_duration_seconds",
"HTTP request duration in seconds",
["method", "path"],
)
class PrometheusMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next) -> Response:
start_time = time.perf_counter()
response = await call_next(request)
duration = time.perf_counter() - start_time
REQUEST_COUNT.labels(
method=request.method,
path=request.url.path,
status=response.status_code,
).inc()
REQUEST_DURATION.labels(
method=request.method,
path=request.url.path,
).observe(duration)
return response
app.add_middleware(PrometheusMiddleware)
#Dependency Override for Testing
FastAPI's DI system enables clean testing without monkey-patching:
python
# app/dependencies.py
async def get_db():
async with SessionLocal() as session:
yield session
async def get_current_user(token: str = Depends(oauth2_scheme)):
return await verify_token(token)
# tests/conftest.py
import pytest
from httpx import ASGITransport, AsyncClient
from app.main import app
from app.dependencies import get_db, get_current_user
async def override_get_db():
async with TestSessionLocal() as session:
yield session
async def override_get_current_user():
return User(id=1, username="testuser", email="test@example.com")
@pytest.fixture
async def client():
app.dependency_overrides[get_db] = override_get_db
app.dependency_overrides[get_current_user] = override_get_current_user
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as ac:
yield ac
app.dependency_overrides.clear()
#Multi-Tenancy Pattern
python
from fastapi import Depends, Header, HTTPException
async def get_tenant(x_tenant_id: str = Header(...)):
tenant = await load_tenant(x_tenant_id)
if not tenant:
raise HTTPException(status_code=403, detail="Invalid tenant")
return tenant
async def get_tenant_db(tenant=Depends(get_tenant)):
engine = get_engine_for_tenant(tenant.db_url)
async with async_sessionmaker(engine)() as session:
yield session
TenantDb = Annotated[AsyncSession, Depends(get_tenant_db)]
@app.get("/data")
async def get_data(db: TenantDb):
# This query runs against the tenant-specific database
return await db.scalars(select(Data)).all()
#API Versioning
python
from fastapi import FastAPI, APIRouter
app = FastAPI()
# Version 1
v1_router = APIRouter(prefix="/api/v1")
@v1_router.get("/items/")
def list_items_v1():
return [{"id": 1, "name": "Widget"}]
# Version 2 with different response shape
v2_router = APIRouter(prefix="/api/v2")
@v2_router.get("/items/")
def list_items_v2():
return {"data": [{"id": 1, "name": "Widget"}], "meta": {"total": 1}}
app.include_router(v1_router, tags=["v1"])
app.include_router(v2_router, tags=["v2"])
📚 Quick Check
What does model_dump(exclude_unset=True) do in a PATCH endpoint?