Type hints drive validation, serialisation, dependency injection and OpenAPI generation — one declaration, four behaviours, no decorators stack to learn.
Core toolkit: uvicorn, Depends(), yield for cleanup, TestClient, async tests, fixtures. FastAPI is a Python web framework created by Sebastián Ramírez (tiangolo) in 2018. It builds on Starlette (ASGI toolkit) and Pydantic v2 (validation core in Rust) to turn type hints into HTTP behaviour.
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class User(BaseModel):
    """Request/response schema for a user; validated by Pydantic on the way in."""
    email: str
    name: str
    # optional — a missing field in the request body defaults to None
    age: int | None = None
@app.post('/users', response_model=User, status_code=201)
async def create_user(user: User) -> User:
return user
# → validation, deserialisation, serialisation, OpenAPI,
# Swagger UI, ReDoc — all from the type hints alone.
Depends() with sub-deps and yield-cleanuphttpx-based)| Layer | Component |
|---|---|
| App | FastAPI — routes, DI, OpenAPI |
| HTTP | Starlette — ASGI app, middleware, WebSocket |
| Validation | Pydantic v2 — pydantic-core (Rust) |
| Server | Uvicorn / Hypercorn / Granian (ASGI) |
| Process mgr | Gunicorn (with uvicorn workers) / systemd |
Not a full-stack framework. No ORM (you bring SQLAlchemy / SQLModel / Tortoise). No template engine (you bring Jinja2). No admin / auth UI (you bring your own). It's a web API framework.
Three claims, each backed by something real: type-driven design, performance close to Node / Go, developer ergonomics from one declaration doing four jobs.
@app.get('/items/{item_id}')
def read_item(
item_id: int, # path — coerced & validated
q: str | None = None, # query — optional
skip: int = 0, limit: int = 100, # query — defaults
user: User = Depends(current_user), # injected
):
...
# → routing, type coercion, validation, OpenAPI param entries,
# DI, automatic error responses with location info — one signature.
Depends() is great, but unusual if you've come from Spring / DI containers. FastAPI ships as a regular Python package. The recommended install pulls in uvicorn (ASGI server), httpx (test client), and a curated set of optional extras — one command and you're live.
# the recommended bundle — FastAPI + uvicorn + extras
uv add 'fastapi[standard]'
# minimal — just the framework
uv add fastapi
uv add 'uvicorn[standard]'
# pip equivalent
pip install 'fastapi[standard]'
# version
uv run python -c 'import fastapi; print(fastapi.__version__)'
# app/main.py
from fastapi import FastAPI
app = FastAPI(title='Hello API', version='0.1.0')
@app.get('/healthz')
def health() -> dict:
return {'ok': True}
@app.get('/echo/{msg}')
async def echo(msg: str) -> dict:
return {'msg': msg}
# dev: hot reload, single worker
uv run fastapi dev app/main.py
# → http://127.0.0.1:8000
# → http://127.0.0.1:8000/docs (Swagger UI)
# → http://127.0.0.1:8000/redoc (ReDoc)
# prod-style: uvicorn directly
uv run uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
@app.get/.post/.put/.patch/.delete sets the HTTP method & path; the function name becomes the operationId; the return annotation serves as the response_model if not overridden and drives serialisation. A typical project layout:
app/
├── main.py # creates FastAPI(), includes routers
├── api/
│ ├── deps.py # shared dependencies
│ ├── v1/
│ │ ├── users.py # APIRouter
│ │ └── items.py
│ └── v1/__init__.py
├── core/
│ ├── config.py # pydantic Settings
│ ├── security.py
│ └── logging.py
├── schemas/ # pydantic models (DTOs)
├── models/ # SQLAlchemy ORM models
├── services/ # domain logic
├── db/ # session, migrations
└── tests/
Parameters are classified by their type and where they're declared. FastAPI never guesses — it follows a single rule: Path, Query, Body, Header, Cookie overrides; otherwise scalar → query, model → body.
from fastapi import Query, Path, Header, Cookie, Body
from typing import Annotated
@app.get('/users/{user_id}/posts')
def list_posts(
user_id: Annotated[int, Path(ge=1)],
# query params with constraints
page: Annotated[int, Query(ge=1)] = 1,
size: Annotated[int, Query(ge=1, le=100)] = 20,
q: Annotated[str | None, Query(max_length=64)] = None,
# header
x_request_id: Annotated[str | None, Header()] = None,
# cookie
session: Annotated[str | None, Cookie()] = None,
):
...
from pydantic import BaseModel
class ItemCreate(BaseModel):
name: str
price: float
@app.post('/items')
def create(item: ItemCreate): # whole body
...
@app.post('/items/multi')
def create2(
item: ItemCreate,
user: UserStub,
importance: Annotated[int, Body(ge=1, le=5)] = 1,
):
# body becomes { "item": ..., "user": ..., "importance": ... }
...
from fastapi import status
class ItemRead(BaseModel):
id: int
name: str
price: float
@app.post('/items',
response_model=ItemRead,
status_code=status.HTTP_201_CREATED,
tags=['items'],
summary='Create an item',
response_description='The created item')
def create(item: ItemCreate) -> ItemRead:
return ItemRead(id=42, **item.model_dump())
from fastapi import HTTPException
class Error(BaseModel):
code: str
message: str
@app.get('/items/{id}',
response_model=ItemRead,
responses={
404: {'model': Error, 'description': 'Not found'},
409: {'model': Error, 'description': 'Conflict'},
})
def get(id: int):
if id > 1000:
raise HTTPException(404, detail='not found')
return ItemRead(id=id, name='x', price=1.0)
FastAPI 0.95+ recommends Annotated[T, Query(...)] over = Query(...) default values — lets type checkers see the real type, plays nicely with stricter mypy / pyright settings, and reads better in long signatures.
Pydantic v2 (released 2023) rewrote the validation core in Rust. It is what makes FastAPI fast, strict, and informative — and learning Pydantic is most of learning FastAPI.
from pydantic import BaseModel, Field, EmailStr, HttpUrl
from datetime import datetime
from decimal import Decimal
class User(BaseModel):
id: int
email: EmailStr
name: str = Field(min_length=1, max_length=120)
role: Literal['user', 'admin'] = 'user'
avatar: HttpUrl | None = None
balance: Decimal = Field(decimal_places=2)
created: datetime
metadata: dict[str, str] = {}
# parse / validate
u = User.model_validate(payload) # raises ValidationError on bad input
u = User.model_validate_json(raw_bytes) # parse JSON directly — faster than json.loads + .model_validate
from pydantic import field_validator, model_validator
class Order(BaseModel):
    """Order line with a cross-field consistency check on the total."""
    # quantity of units; must be strictly positive (enforced below)
    qty: int
    unit_price: Decimal
    total: Decimal
    @field_validator('qty')
    @classmethod
    def positive(cls, v):
        """Field-level check: reject non-positive quantities."""
        if v <= 0: raise ValueError('qty must be positive')
        return v
    @model_validator(mode='after')
    def check_total(self):
        """Model-level invariant (runs after field validation): total == qty × unit_price."""
        if self.total != self.qty * self.unit_price:
            raise ValueError('total mismatch')
        return self
# serialisation — model_dump and friends
u.model_dump() # → dict
u.model_dump(mode='json') # JSON-compatible (Decimal/datetime as str)
u.model_dump_json() # → JSON string
u.model_dump(exclude={'metadata'})
u.model_dump(include={'id', 'email'})
u.model_dump(by_alias=True, exclude_none=True)
# computed fields surface in serialisation
class Box(BaseModel):
w: float; h: float; d: float
@computed_field
def volume(self) -> float: return self.w * self.h * self.d
| Type | Use |
|---|---|
EmailStr | Validated email (needs email-validator) |
HttpUrl · AnyHttpUrl | URL with scheme/host parsing |
UUID4 · UUID7 | UUID with version check |
SecretStr | Hidden in repr / dump |
conlist · conint | Constrained list / int |
Annotated[..., AfterValidator(f)] | Plug a function in |
RootModel[list[X]] | Top-level list / scalar |
Field(discriminator='kind') | Tagged union |
Depends()FastAPI's signature feature: anything that can be a parameter can be a dependency. Dependencies compose, can have their own dependencies, and can yield to clean up — replacing what a DI container does in other ecosystems.
from fastapi import Depends
# any callable can be a dependency
def common_paginate(
    page: int = 1,
    size: int = 20,
) -> dict:
    """Reusable pagination dependency: bundles the page/size query params into a dict."""
    params = {'page': page, 'size': size}
    return params
@app.get('/items')
def list_items(
p: Annotated[dict, Depends(common_paginate)],
):
return {'page': p['page'], 'size': p['size']}
# implicit form omitting the function in Depends()
def get_db() -> Session: ...
@app.get('/u/{id}')
def by_id(id: int, db: Annotated[Session, Depends(get_db)]):
...
from sqlalchemy.orm import Session
from app.db import SessionLocal
def get_db():
    """Dependency: yield a DB session to the handler and guarantee it is closed."""
    session = SessionLocal()
    try:
        # the yielded value is what Depends(get_db) injects into the handler
        yield session
    finally:
        # runs after the response is sent — even if the handler raised
        session.close()
# any exception in the handler → the finally still runs.
def current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: Annotated[Session, Depends(get_db)],
) -> User:
user = decode_and_load(token, db)
if not user: raise HTTPException(401)
return user
def require_admin(
user: Annotated[User, Depends(current_user)],
) -> User:
if user.role != 'admin': raise HTTPException(403)
return user
@app.delete('/users/{id}')
def remove(id: int,
_admin: Annotated[User, Depends(require_admin)]):
...
def f(x = Depends(dep))APIRouter(dependencies=[Depends(verify_api_key)])FastAPI(dependencies=[...]) for global guardsHTTPException in the dep; never reach the handlerDependencies are cached per request: if get_db is in three deps in the same request, you get one session. To opt out: Depends(get_db, use_cache=False).
FastAPI runs async path operations on the event loop. Sync ones run in a thread pool (anyio). Mixing is fine; getting the boundary right is the difference between fast and stalled.
Use async def and await normally for non-blocking I/O. Got blocking code? Declare the handler with plain def; FastAPI runs it in the threadpool. Never block inside async def — one stalled handler stalls every other request on the loop.
import httpx
@app.get('/weather/{city}')
async def weather(city: str):
async with httpx.AsyncClient(timeout=5) as c:
r = await c.get(f'https://api.weather/{city}')
r.raise_for_status()
return r.json()
import requests
@app.get('/legacy/{q}')
def legacy(q: str): # def, not async def
# runs in threadpool — main loop unblocked
r = requests.get(f'https://legacy/{q}', timeout=5)
return r.json()
from fastapi.concurrency import run_in_threadpool
from anyio import to_thread
@app.post('/render')
async def render(req: RenderReq):
# CPU-bound or sync-only call from an async handler
img = await run_in_threadpool(make_pdf, req)
# equivalent:
img2 = await to_thread.run_sync(make_pdf, req)
return {'size': len(img)}
# BAD — time.sleep blocks the event loop;
# every request to ANY endpoint stalls for 2s
@app.get('/slow')
async def slow():
time.sleep(2) # blocking!
return {'ok': True}
# GOOD
import asyncio
@app.get('/slow')
async def slow():
await asyncio.sleep(2)
return {'ok': True}
concurrent.futures) for genuinely CPU-bound work — one process per coreawaits the model client; never block the loop on .generate()FastAPI ships security utilities — classes that double as dependencies and OpenAPI Security Schemes. The framework parses the right header / query / cookie; you decide how to verify.
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/auth/token')
def current_user(
token: Annotated[str, Depends(oauth2_scheme)],
) -> User:
try:
payload = jwt.decode(token, settings.JWT_KEY,
algorithms=['HS256'],
audience=settings.AUD,
issuer=settings.ISS)
except JWTError:
raise HTTPException(401, 'invalid token')
return User(**payload['user'])
@app.get('/users/me', response_model=User)
def me(user: Annotated[User, Depends(current_user)]):
return user
from fastapi.security import SecurityScopes
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl='/auth/token',
scopes={'read': 'Read', 'write': 'Modify'})
def require_scopes(
scopes: SecurityScopes,
token: Annotated[str, Depends(oauth2_scheme)],
):
payload = jwt.decode(token, ...)
have = set(payload.get('scope', '').split())
for s in scopes.scopes:
if s not in have: raise HTTPException(403)
return payload
@app.delete('/items/{id}')
def remove(id: int,
_: Annotated[dict, Security(require_scopes,
scopes=['write'])]):
...
from fastapi.security import APIKeyHeader
api_key = APIKeyHeader(name='X-API-Key', auto_error=False)
def require_api_key(
key: Annotated[str | None, Depends(api_key)],
db: Annotated[Session, Depends(get_db)],
) -> APIKey:
if not key: raise HTTPException(401)
row = db.query(APIKey).filter_by(hash=sha256(key)).first()
if not row or row.revoked: raise HTTPException(401)
return row
OAuth2*, APIKey*, HTTPBearer, HTTPBasicpython-jose, authlib, or hit your IdP's JWKSpasslib[argon2] or argon2-cffi directlyFastAPI turns raised exceptions into HTTP responses. Hook in handlers for your own classes; add middleware for cross-cutting concerns — timing, request IDs, CORS.
from fastapi import HTTPException
from fastapi.responses import JSONResponse
class NotFoundError(Exception):
    """Domain error: the named resource does not exist."""
    def __init__(self, what: str):
        self.what = what

@app.exception_handler(NotFoundError)
async def not_found(_req, exc: NotFoundError):
    """Translate NotFoundError into a structured 404 JSON response.

    Bug fix: JSONResponse's first positional parameter is `content` —
    passing 404 positionally alongside content= raised a TypeError.
    The status code must be given by keyword.
    """
    return JSONResponse(status_code=404, content={
        'error': {'code': 'not_found',
                  'message': f'{exc.what} not found'}})
@app.get('/users/{id}')
def get_user(id: int):
user = repo.find(id)
if not user: raise NotFoundError('user')
return user
from fastapi.exceptions import RequestValidationError
@app.exception_handler(RequestValidationError)
async def validation_error(req, exc):
    """Return Pydantic validation failures in the project's error envelope.

    Bug fix: JSONResponse takes `content` as its first positional argument,
    so the status code must be passed by keyword — JSONResponse(422, content=...)
    raised "multiple values for argument 'content'".
    """
    return JSONResponse(status_code=422, content={
        'error': {
            'code': 'validation_failed',
            'details': exc.errors(),
            # set by the request-id middleware earlier in the chain
            'request_id': req.state.request_id,
        }
    })
from fastapi import Request
from uuid import uuid4
import time
@app.middleware('http')
async def request_id_and_timing(request: Request, call_next):
    """Attach a request ID to every request and report handler duration.

    Reuses an incoming x-request-id header when present so IDs correlate
    across services; otherwise mints a fresh UUID.
    """
    rid = request.headers.get('x-request-id') or str(uuid4())
    request.state.request_id = rid  # downstream handlers / error responses read this
    t0 = time.perf_counter()
    try:
        response = await call_next(request)
    except Exception:
        # logged by your structured logger via exception_handler
        raise
    dur_ms = (time.perf_counter() - t0) * 1000
    response.headers['x-request-id'] = rid
    # Server-Timing header lets browser devtools / APM show app-side duration
    response.headers['server-timing'] = f'app;dur={dur_ms:.1f}'
    return response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
app.add_middleware(CORSMiddleware,
allow_origins=['https://app.example.com'],
allow_credentials=True,
allow_methods=['GET','POST','PUT','PATCH','DELETE'],
allow_headers=['authorization','content-type'])
app.add_middleware(TrustedHostMiddleware,
allowed_hosts=['api.example.com', '*.example.com'])
Middleware added later runs outermost. Put request-ID and timing last in code so they wrap everything — including CORS rejections.
Three patterns for non-classical request/response: BackgroundTasks for fire-and-forget after the response, StreamingResponse for chunked output, and WebSockets for full-duplex.
from fastapi import BackgroundTasks
@app.post('/users')
def create_user(
body: UserCreate,
bg: BackgroundTasks,
mailer: Annotated[Mailer, Depends(get_mailer)],
):
user = users.create(body)
bg.add_task(mailer.send_welcome, user.email)
return user
# → response sent immediately; mailer runs after.
# Good for: emails, webhooks, audit logs, <1s work.
# For real retry / scheduling: use Celery / arq / dramatiq.
from fastapi.responses import StreamingResponse
import json
async def gen_events():
    """Emit ten Server-Sent-Events 'tick' frames, one per second."""
    for tick in range(10):
        payload = json.dumps({"i": tick})
        yield f'event: tick\ndata: {payload}\n\n'
        await asyncio.sleep(1)
@app.get('/stream')
async def stream():
return StreamingResponse(gen_events(),
media_type='text/event-stream',
headers={'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no'})
# NDJSON
async def gen_rows():
async for r in db.stream(query):
yield json.dumps(r).encode() + b'\n'
@app.get('/users.ndjson')
async def ndjson():
return StreamingResponse(gen_rows(),
media_type='application/x-ndjson')
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket('/ws')
async def ws(socket: WebSocket):
await socket.accept()
try:
async for msg in socket.iter_text():
await socket.send_text(f'echo: {msg}')
except WebSocketDisconnect:
...
finally:
# cleanup — remove from any pubsub fanout, etc.
...
from fastapi.responses import StreamingResponse
@app.post('/chat')
async def chat(req: ChatReq):
async def tokens():
async for delta in llm.stream(req.messages):
yield f'data: {json.dumps({"delta": delta})}\n\n'
yield 'data: [DONE]\n\n'
return StreamingResponse(tokens(),
media_type='text/event-stream',
headers={'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no'})
BackgroundTasks blocks worker shutdown — long jobs go in a queue, not hereX-Accel-Buffering: no → silent clientStreamingResponse + a response_model doesn't work — you're past PydanticFastAPI doesn't bundle an ORM. Three real choices: SQLAlchemy 2.0 async (the default), SQLModel (Pydantic + SQLAlchemy by tiangolo), Tortoise ORM (Django-ish async).
# db.py
from sqlalchemy.ext.asyncio import (
create_async_engine, async_sessionmaker, AsyncSession)
from sqlalchemy.orm import DeclarativeBase
engine = create_async_engine(settings.DATABASE_URL,
pool_size=10, max_overflow=20)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase): ...
# deps.py
async def get_db() -> AsyncIterator[AsyncSession]:
async with SessionLocal() as s:
yield s
# usage
@app.get('/users/{id}', response_model=UserRead)
async def get_user(id: int,
db: Annotated[AsyncSession, Depends(get_db)]):
user = await db.get(User, id)
if not user: raise HTTPException(404)
return user
uv add alembic
uv run alembic init alembic
uv run alembic revision --autogenerate -m 'init'
uv run alembic upgrade head
from sqlmodel import SQLModel, Field
class User(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
email: str = Field(index=True, unique=True)
name: str
# → this is BOTH the SQLAlchemy table and the Pydantic model
# used in request / response bodies.
# pros: less duplication for simple CRUD
# cons: ORM and DTO concerns become tangled in larger apps
UserCreate / UserRead / UserDB; never expose the table directly past the boundaryusers.by_email(db, email) is testable; chained ORM calls in handlers aren'tasync with db.begin(): at the boundary, not deep in servicespool_size per worker × workers < DB max_connectionsDependsasync def handlersTwelve-factor config, but typed. pydantic-settings reads env vars / .env / secrets dirs, validates types and defaults at boot, and surfaces missing values as startup errors, not runtime ones.
from pydantic import Field, SecretStr, AnyHttpUrl, PostgresDsn
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file='.env', env_prefix='APP_',
env_nested_delimiter='__', case_sensitive=False)
env: Literal['dev','staging','prod'] = 'dev'
debug: bool = False
database_url: PostgresDsn
redis_url: str
jwt_key: SecretStr
cors_origins: list[AnyHttpUrl] = []
log_level: Literal['DEBUG','INFO','WARNING','ERROR'] = 'INFO'
# nested groups via env_nested_delimiter
smtp__host: str | None = None
smtp__port: int = 587
smtp__user: str | None = None
smtp__pass: SecretStr | None = None
settings = Settings()
from functools import lru_cache
@lru_cache
def get_settings() -> Settings:
return Settings()
@app.get('/config')
def cfg(s: Annotated[Settings, Depends(get_settings)]):
return {'env': s.env, 'log_level': s.log_level}
# .env.dev / .env.staging / .env.prod
ENV_FILE=.env.prod uv run uvicorn app.main:app
# in a Settings model:
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=os.getenv('ENV_FILE', '.env'))
SecretStr hides secrets in repr / logs / model_dump.env for local dev, real env vars for k8s — same code pathapp.dependency_overrides[get_settings] = lambda: TestSettings()Settings() at import time at module top — it makes tests harder; cache via get_settings()SecretStr, custom __str__ in error paths can leakSettingsFastAPI ships a TestClient built on httpx. The interesting feature is app.dependency_overrides — swap any Depends dependency in tests for a stub, with no patching.
from fastapi.testclient import TestClient
from app.main import app
client = TestClient(app)
def test_health():
r = client.get('/healthz')
assert r.status_code == 200
assert r.json() == {'ok': True}
def test_create_user():
r = client.post('/users', json={'email': 'a@x.io', 'name': 'A'})
assert r.status_code == 201
assert r.json()['email'] == 'a@x.io'
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app
@pytest.mark.asyncio
async def test_async():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport,
base_url='http://test') as c:
r = await c.get('/healthz')
assert r.status_code == 200
def fake_db():
return InMemoryDB()
def fake_user():
return User(id=1, email='t@t', role='admin')
app.dependency_overrides[get_db] = fake_db
app.dependency_overrides[current_user] = fake_user
# now every endpoint sees the fakes — no monkeypatch, no patch.object
client = TestClient(app)
r = client.get('/users/me')
assert r.status_code == 200
# clean up between tests
app.dependency_overrides.clear()
@pytest.fixture
def client(app_with_overrides):
return TestClient(app_with_overrides)
@pytest.fixture
def app_with_overrides():
app.dependency_overrides[get_db] = fake_db
yield app
app.dependency_overrides.clear()
current_user; don't mint real tokens in unit testsrespx (httpx) or pytest-httpserverOpenAPI generation is the point of FastAPI — not a side feature. The schema you serve at /openapi.json is the same schema Swagger UI / ReDoc render, the same one you ship to clients, the same one CI validates against.
Field(examples=[...])@app.post('/items',
tags=['items'],
summary='Create an item',
description='Accepts a JSON body...',
response_description='The created item',
status_code=201,
deprecated=False)
def create(item: ItemCreate): ...
# group tags
app = FastAPI(openapi_tags=[
{'name': 'items', 'description': 'Catalogue items'},
{'name': 'users', 'description': 'Account ops'},
])
from fastapi.openapi.utils import get_openapi
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
schema = get_openapi(title='My API', version='1.2.3',
routes=app.routes)
schema['info']['x-logo'] = {'url': 'https://example/logo.png'}
schema['servers'] = [{'url': 'https://api.example.com'}]
app.openapi_schema = schema
return schema
app.openapi = custom_openapi
# export the schema at build time
uv run python -c \
'from app.main import app; import json; \
print(json.dumps(app.openapi()))' \
> openapi.json
# typescript types for the frontend
npx openapi-typescript openapi.json -o api.ts
# python client
uv run datamodel-code-generator \
--input openapi.json --output client.py
include_in_schema=False on the operation or routerresponse_model_exclude_none=True for cleaner output schemasFastAPI(docs_url=None)Where to open a DB pool, warm a model, connect to Redis, register Prometheus collectors. The modern pattern is asynccontextmanager + FastAPI(lifespan=...).
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """App lifespan: resources opened before `yield` live for the whole
    process; everything after `yield` runs once at graceful shutdown."""
    # —— startup ——
    app.state.db = create_engine_pool()
    app.state.redis = await aioredis.from_url(settings.REDIS_URL)
    # warm the model once per worker process, not per request
    app.state.model = load_model('llama-3-8b')
    log.info('startup complete')
    yield  # app runs
    # —— shutdown —— (reverse order of acquisition)
    await app.state.redis.close()
    await app.state.db.dispose()
    log.info('shutdown complete')
app = FastAPI(lifespan=lifespan)
def get_redis(req: Request):
return req.app.state.redis
@app.get('/cache/{k}')
async def cache(k: str,
r: Annotated[Redis, Depends(get_redis)]):
return {'value': await r.get(k)}
@app.get('/healthz') # liveness: process alive
def healthz(): return {'ok': True}
@app.get('/readyz')  # readiness: deps OK + not draining
async def readyz(req: Request):
    """503 while draining or when a critical dependency (DB, Redis) fails.

    Bug fixes: read draining state off req.app (not a module-level global),
    and pass the status code to JSONResponse by keyword — its first
    positional parameter is `content`, not the status code.
    """
    if req.app.state.draining:
        return JSONResponse(status_code=503, content={'ok': False})
    try:
        async with req.app.state.db.connect() as c:
            await c.execute(text('select 1'))
        await req.app.state.redis.ping()
    except Exception:
        # any dependency failure → not ready; LB stops routing here
        return JSONResponse(status_code=503, content={'ok': False})
    return {'ok': True}
app.state.draining = True in a SIGTERM handler (or in lifespan exit) so /readyz goes 503 first — LB stops sending trafficterminationGracePeriodSeconds > longest_request; preStop hook of sleep 5 avoids the LB raceThe old @app.on_event('startup') / 'shutdown' decorators still work but are deprecated. Use lifespan; it integrates with Starlette and is what the docs recommend.
FastAPI is rarely the bottleneck. Wins live in workers vs concurrency, avoiding loop blocking, payload size & serialisation, and downstream parallelism.
# single process, multi-coroutine — great for I/O
uvicorn app.main:app --host 0.0.0.0 --port 8000
# multi-process — one event loop per worker
uvicorn app.main:app --workers 4
# production: gunicorn manages uvicorn workers
gunicorn app.main:app \
-k uvicorn.workers.UvicornWorker \
-w 4 -b 0.0.0.0:8000 \
--timeout 60 --graceful-timeout 30 \
--max-requests 10000 --max-requests-jitter 1000
Rule of thumb: workers ≈ 2 × CPU. In K8s prefer one worker per pod — the orchestrator scales pods.
--http httptools is usually plenty. Never use time.sleep, requests, or blocking open() inside async def — prefer asyncio.sleep, httpx.AsyncClient, aiofiles, or offload with await run_in_threadpool(fn, ...) (anyio threadpool; tune via BackgroundExecutor(max_workers=...)). Use orjson via ORJSONResponse for > 2× on big lists, and response_model_exclude_unset=True to trim default fields from the wire.
from fastapi.responses import ORJSONResponse
app = FastAPI(default_response_class=ORJSONResponse)
py-spy record against the prod-like image — intuition is wrong half the timeThe shape that ships: multi-stage Dockerfile, non-root user, distroless or slim base, healthchecks wired to /healthz & /readyz.
ARG PY=3.12
FROM python:${PY}-slim-bookworm AS base
ENV PYTHONDONTWRITEBYTECODE=1 PYTHONUNBUFFERED=1
# —— deps stage ——
FROM base AS deps
RUN pip install --no-cache-dir uv
WORKDIR /app
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev
# —— runtime stage ——
FROM base AS runtime
RUN useradd -r -u 10001 app
WORKDIR /app
COPY --from=deps /app/.venv /app/.venv
COPY app/ ./app/
USER app
ENV PATH=/app/.venv/bin:$PATH
EXPOSE 8000
HEALTHCHECK --interval=30s --timeout=3s \
CMD python -c "import urllib.request,sys; \
sys.exit(0 if urllib.request.urlopen('http://localhost:8000/healthz').status==200 else 1)"
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
spec:
template:
spec:
terminationGracePeriodSeconds: 60
containers:
- name: api
image: ghcr.io/me/api:1.2.3
ports: [{containerPort: 8000}]
livenessProbe:
httpGet: {path: /healthz, port: 8000}
periodSeconds: 10
readinessProbe:
httpGet: {path: /readyz, port: 8000}
periodSeconds: 5
failureThreshold: 3
resources:
requests: {cpu: "200m", memory: "256Mi"}
limits: {cpu: "1", memory: "512Mi"}
lifecycle:
preStop:
exec: {command: ["sleep", "5"]}
| Platform | Notes |
|---|---|
| K8s (EKS / GKE / AKS) | One worker per pod; HPA |
| Cloud Run / Lambda + Mangum | Stateless; cold starts; no WebSockets on Lambda |
| Render / Fly / Railway | Easy buttons; great for staging |
| VPS + nginx + systemd | Cheap, predictable; gunicorn + uvicorn |
root--reload/docs public if the API is non-publicThree pillars, same as everywhere: structured logs, metrics, distributed traces. The minimum competent setup is structlog + prometheus-fastapi-instrumentator + OpenTelemetry.
import structlog, logging
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt='iso'),
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.stdlib.BoundLogger,
)
log = structlog.get_logger()
@app.middleware('http')
async def log_request(req, call_next):
rid = req.headers.get('x-request-id') or str(uuid4())
structlog.contextvars.bind_contextvars(request_id=rid)
t0 = time.perf_counter()
resp = await call_next(req)
log.info('http_request',
method=req.method, path=req.url.path,
status=resp.status_code,
dur_ms=round((time.perf_counter()-t0)*1000, 1))
return resp
from prometheus_fastapi_instrumentator import Instrumentator
Instrumentator().instrument(app).expose(app, endpoint='/metrics')
# → per-route RED metrics + python_gc + process_cpu
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter \
import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi \
import FastAPIInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
provider = TracerProvider(resource=Resource.create(
{'service.name': 'api', 'service.version': '1.2.3'}))
provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)
FastAPIInstrumentor.instrument_app(app)
authorization, cookie, x-api-key in logsrequest_id via contextvars — appears on every lineFastAPI is a thin layer; security is mostly composing the right middleware, validating at the boundary, and not trusting input. Six controls cover the OWASP Top 10 for an API.
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from starlette.middleware.gzip import GZipMiddleware
app.add_middleware(HTTPSRedirectMiddleware)
app.add_middleware(TrustedHostMiddleware,
allowed_hosts=['api.example.com','*.example.com'])
app.add_middleware(CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=True,
allow_methods=['GET','POST','PUT','PATCH','DELETE'],
allow_headers=['authorization','content-type'])
app.add_middleware(GZipMiddleware, minimum_size=1024)
@app.middleware('http')
async def security_headers(req, call_next):
resp = await call_next(req)
resp.headers['Strict-Transport-Security'] = \
'max-age=31536000; includeSubDomains; preload'
resp.headers['X-Content-Type-Options'] = 'nosniff'
resp.headers['Referrer-Policy'] = 'no-referrer'
resp.headers['X-Frame-Options'] = 'DENY'
resp.headers['Content-Security-Policy'] = \
"default-src 'none'; frame-ancestors 'none'"
return resp
from starlette.middleware.base import BaseHTTPMiddleware
class BodySizeLimit(BaseHTTPMiddleware):
    """Reject requests whose declared Content-Length exceeds max_bytes.

    NOTE(review): this only inspects the declared header — chunked uploads
    carry no Content-Length, so also enforce a limit at the reverse proxy.
    """

    def __init__(self, app, max_bytes: int):
        super().__init__(app)
        self.max = max_bytes

    async def dispatch(self, req, call_next):
        # tolerate a missing or malformed Content-Length header
        try:
            cl = int(req.headers.get('content-length', 0))
        except ValueError:
            cl = 0
        if cl > self.max:
            # JSONResponse(content, status_code=...) — code goes by keyword;
            # the original passed 413 positionally as the body
            return JSONResponse(status_code=413, content={'error': 'too large'})
        return await call_next(req)
app.add_middleware(BodySizeLimit, max_bytes=128 * 1024)
# rate limiting: slowapi (limits + redis)
from slowapi import Limiter
limiter = Limiter(key_func=lambda r: r.client.host,
storage_uri=settings.redis_url)
app.state.limiter = limiter
@app.post('/auth/login')    # route decorator must be outermost …
@limiter.limit('5/minute')  # … so FastAPI registers the rate-limited wrapper.
async def login(req: Request, body: Login): ...
# Bug fix: with @limiter.limit above @app.post, decorators apply bottom-up,
# so the route registered the UNWRAPPED function and the limit never fired.
Validate at the boundary with EmailStr, HttpUrl, and length / range constraints. Hash passwords with passlib[argon2]. Verify JWT iss / aud / exp / nbf claims. Compare secrets with secrets.compare_digest. Don't trust X-Forwarded-For without configuring trust at the proxy. Don't ship allow_origins=['*'] — nothing is more permanent than a temporary fix. The Python web stack splits along two axes: async / sync and batteries / minimalism. Pick by which axis matters more.
| Criterion | FastAPI | Litestar | Starlette | Flask | Django REST |
|---|---|---|---|---|---|
| Style | Type hints + DI | Type hints + DI | ASGI toolkit | Decorators + context | Class-based views |
| Validation | Pydantic v2 | msgspec / Pydantic / attrs | None (BYO) | None (BYO) | DRF serializers |
| Speed | Fast | Faster (msgspec) | Fast (raw) | Sync, slower | Sync, slowest |
| Async | Native | Native | Native | Limited | Limited (3.x) |
| OpenAPI | Built-in | Built-in | BYO | BYO (Spectree, etc.) | drf-spectacular |
| Batteries | Few | Few | None | Few | Many (admin/ORM) |
| Maturity | Mature (2018) | Active (2023+) | Mature | Mature | Very mature |
| Best for | JSON APIs, ML servers | Throughput, stricter typing | Building frameworks | Tiny apps, prototypes | Server-rendered + admin |
The recurring patterns: repository functions over the session, DTOs separate from ORM, an outbox for "DB write + queue", dependency overrides for tests and feature flags.
# schemas/users.py — DTOs
class UserCreate(BaseModel):
email: EmailStr; name: str
class UserRead(BaseModel):
id: int; email: EmailStr; name: str
# repositories/users.py
async def create(db, data: UserCreate) -> UserRead:
row = User(**data.model_dump())
db.add(row); await db.commit(); await db.refresh(row)
return UserRead.model_validate(row, from_attributes=True)
# services/users.py
async def signup(db, queue, data: UserCreate) -> UserRead:
if await users_repo.by_email(db, data.email):
raise HTTPException(409)
user = await users_repo.create(db, data)
await queue.enqueue('emails.welcome', user.id)
return user
# api/v1/users.py
@router.post('/', response_model=UserRead, status_code=201)
async def signup_endpoint(
body: UserCreate,
db: Annotated[AsyncSession, Depends(get_db)],
q: Annotated[Queue, Depends(get_queue)],
):
return await users_service.signup(db, q, body)
async def signup(db, data):
async with db.begin():
user = await users_repo.create(db, data)
await db.execute(insert(Outbox).values(
kind='emails.welcome',
payload={'user_id': user.id}))
# a relay process polls outbox and pushes to the queue
return user
from fastapi import APIRouter
v1 = APIRouter(prefix='/v1')
v1.include_router(users_router, prefix='/users', tags=['v1.users'])
v1.include_router(items_router, prefix='/items', tags=['v1.items'])
v2 = APIRouter(prefix='/v2')
v2.include_router(users_router_v2, prefix='/users', tags=['v2.users'])
app.include_router(v1)
app.include_router(v2)
def feature_x_enabled() -> bool:
return get_settings().flags.feature_x
@app.get('/x')
def x(enabled: Annotated[bool, Depends(feature_x_enabled)]):
if not enabled: raise HTTPException(404)
...
# tests can flip the flag with dependency_overrides[feature_x_enabled]
Depends() is small enough to hold in your head, big enough to compose policiesAnnotated[T, Query/Path/Body(...)] for type-checker-friendly signaturesDepends() chain per concern: DB, current user, scope guardon_eventUserCreate / UserRead / UserDBapp.dependency_overrides in tests, not monkeypatch/healthz + /readyz wiredTestClient + dependency_overridesstructlog + Prometheus + OTel; ship dashboardsopenapi.json