FastAPI 是一个 Web 框架,用于在 Python 3.8+ 中构建基于 HTTP 的服务 API。
介绍
- FastAPI 使用 Pydantic 和类型提示来验证、序列化和反序列化数据
- FastAPI 还会自动为使用它构建的 API 生成 OpenAPI 文档
- FastAPI(以及其底层的 Starlette 和 Uvicorn)是基于 Python 的 asyncio 事件循环(Event Loop)构建的。这个模型的核心思想是“单线程协作式多任务”。它非常适合处理 I/O 密集型任务(如等待数据库响应、调用外部 API、读写文件等)
- 缺点:CPU 密集任务(如复杂的数学计算、图像处理、数据压缩、循环加密等)不会产生 I/O 等待。它会持续占用 CPU 进行计算。如果你在一个 async def 路由函数中直接执行这样的任务,它会霸占整个事件循环。导致整个 FastAPI 应用将停止响应。
安装
pip install "fastapi[standard]"
使用
from typing import Union
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
is_offer: Union[bool, None] = None
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.get("/items/{item_id}")
def read_item(item_id: int, q: Union[str, None] = None):
return {"item_id": item_id, "q": q}
@app.put("/items/{item_id}")
def update_item(item_id: int, item: Item):
return {"item_name": item.name, "item_id": item_id}
- 启动
- http://127.0.0.1:8000
- docs: http://127.0.0.1:8000/docs
- openapi json: http://127.0.0.1:8000/openapi.json
- ReDoc: http://127.0.0.1:8000/redoc
# fastapi dev 可以在代码变化时自动加载
fastapi dev demo1.py
路径参数
from fastapi import FastAPI
app = FastAPI()
@app.get("/items/{item_id}")
async def read_item(item_id):
# async def read_item(item_id: int):
return {"item_id": item_id}
from enum import Enum
from fastapi import FastAPI
class ModelName(str, Enum):
alexnet = "alexnet"
resnet = "resnet"
lenet = "lenet"
app = FastAPI()
@app.get("/models/{model_name}")
async def get_model(model_name: ModelName):
if model_name is ModelName.alexnet:
return {"model_name": model_name, "message": "Deep Learning FTW!"}
if model_name.value == "lenet":
return {"model_name": model_name, "message": "LeCNN all the images"}
return {"model_name": model_name, "message": "Have some residuals"}
声明路径参数
from enum import Enum
from fastapi import FastAPI
class ModelName(str, Enum):
alexnet = "alexnet"
resnet = "resnet"
lenet = "lenet"
app = FastAPI()
@app.get("/models/{model_name}")
async def get_model(model_name: ModelName):
if model_name is ModelName.alexnet:
return {"model_name": model_name, "message": "Deep Learning FTW!"}
if model_name.value == "lenet":
return {"model_name": model_name, "message": "LeCNN all the images"}
return {"model_name": model_name, "message": "Have some residuals"}
路径转换器
from fastapi import FastAPI
app = FastAPI()
@app.get("/files/{file_path:path}")
async def read_file(file_path: str):
return {"file_path": file_path}
查询参数
from fastapi import FastAPI
app = FastAPI()
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]
@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
# async def read_item(item_id: str, q: str | None = None): # 可选参数
# @app.get("/items/{item_id}")
# async def read_item(item_id: str, q: str | None = None, short: bool = False):
return fake_items_db[skip : skip + limit]
- http://127.0.0.1:8000/items/?skip=0&limit=10
必选查询参数
from fastapi import FastAPI
app = FastAPI()
@app.get("/items/{item_id}")
async def read_user_item(item_id: str, needy: str):
item = {"item_id": item_id, "needy": needy}
return item
请求体
from fastapi import FastAPI
from pydantic import BaseModel
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
app = FastAPI()
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item, q: str | None = None):
result = {"item_id": item_id, **item.dict()}
if q:
result.update({"q": q})
return result
查询参数和字符串校验
from typing import Union
from fastapi import FastAPI, Query
app = FastAPI()
@app.get("/items/")
async def read_items(q: Union[str, None] = Query(default=None, max_length=50)):
# async def read_items(
# q: Union[str, None] = Query(default=None, min_length=3, max_length=50),
# ):
# 添加正则表达式
# async def read_items(
# q: Union[str, None] = Query(
# default=None, min_length=3, max_length=50, pattern="^fixedquery$"
# ),
# ):
# 默认值
# async def read_items(q: str = Query(default="fixedquery", min_length=3)):
results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
if q:
results.update({"q": q})
return results
路径参数和数值校验
from typing import Annotated
from fastapi import FastAPI, Path, Query
app = FastAPI()
@app.get("/items/{item_id}")
async def read_items(
item_id: Annotated[int, Path(title="The ID of the item to get")],
q: Annotated[str | None, Query(alias="item-query")] = None,
):
# async def read_items(
# *, item_id: int = Path(title="The ID of the item to get", ge=1), q: str
# ):
results = {"item_id": item_id}
if q:
results.update({"q": q})
return results
使用 Pydantic 模型的查询参数
from typing import Annotated, Literal
from fastapi import FastAPI, Query
from pydantic import BaseModel, Field
app = FastAPI()
class FilterParams(BaseModel):
limit: int = Field(100, gt=0, le=100)
offset: int = Field(0, ge=0)
order_by: Literal["created_at", "updated_at"] = "created_at"
tags: list[str] = []
@app.get("/items/")
async def read_items(filter_query: Annotated[FilterParams, Query()]):
return filter_query
混合使用 Path、Query 和请求体参数
from typing import Annotated
from fastapi import FastAPI, Path
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
@app.put("/items/{item_id}")
async def update_item(
item_id: Annotated[int, Path(title="The ID of the item to get", ge=0, le=1000)],
q: str | None = None,
item: Item | None = None,
):
results = {"item_id": item_id}
if q:
results.update({"q": q})
if item:
results.update({"item": item})
return results
请求体 - 字段
from typing import Annotated
from fastapi import Body, FastAPI
from pydantic import BaseModel, Field
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None = Field(
default=None, title="The description of the item", max_length=300
)
price: float = Field(gt=0, description="The price must be greater than zero")
tax: float | None = None
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Annotated[Item, Body(embed=True)]):
results = {"item_id": item_id, "item": item}
return results
请求体 - 嵌套模型
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
tags: list = []
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item):
results = {"item_id": item_id, "item": item}
return results
Cookie 参数
from typing import Annotated
from fastapi import Cookie, FastAPI
app = FastAPI()
@app.get("/items/")
async def read_items(ads_id: Annotated[str | None, Cookie()] = None):
return {"ads_id": ads_id}
Cookie 参数模型
from typing import Annotated
from fastapi import Cookie, FastAPI
from pydantic import BaseModel
app = FastAPI()
class Cookies(BaseModel):
session_id: str
fatebook_tracker: str | None = None
googall_tracker: str | None = None
@app.get("/items/")
async def read_items(cookies: Annotated[Cookies, Cookie()]):
return cookies
from typing import Annotated
from fastapi import FastAPI, Header
app = FastAPI()
@app.get("/items/")
async def read_items(user_agent: Annotated[str | None, Header()] = None):
return {"User-Agent": user_agent}
from typing import Annotated
from fastapi import FastAPI, Header
from pydantic import BaseModel
app = FastAPI()
class CommonHeaders(BaseModel):
host: str
save_data: bool
if_modified_since: str | None = None
traceparent: str | None = None
x_tag: list[str] = []
@app.get("/items/")
async def read_items(headers: Annotated[CommonHeaders, Header()]):
return headers
请求体
使用 request.json()
或 request.body()
方法来获取数据
from fastapi import FastAPI, Request
app = FastAPI()
@app.post("/items_direct/")
async def create_item_direct(request: Request):
# 异步地从请求体中读取 JSON
body = await request.json()
return {"message": "Directly read from request body", "data": body}
响应模型
from typing import Any
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
tags: list[str] = []
@app.post("/items/", response_model=Item)
async def create_item(item: Item) -> Any:
return item
@app.get("/items/", response_model=list[Item])
async def read_items() -> Any:
return [
{"name": "Portal Gun", "price": 42.0},
{"name": "Plumbus", "price": 32.0},
]
响应模型
from typing import Any
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
tags: list[str] = []
@app.post("/items/", response_model=Item)
async def create_item(item: Item) -> Any:
return item
@app.get("/items/", response_model=list[Item])
async def read_items() -> Any:
return [
{"name": "Portal Gun", "price": 42.0},
{"name": "Plumbus", "price": 32.0},
]
响应状态码
from fastapi import FastAPI
app = FastAPI()
@app.post("/items/", status_code=201)
async def create_item(name: str):
return {"name": name}
表单数据
from fastapi import FastAPI, Form
app = FastAPI()
@app.post("/login/")
async def login(username: str = Form(), password: str = Form()):
return {"username": username}
表单模型
from typing import Annotated
from fastapi import FastAPI, Form
from pydantic import BaseModel
app = FastAPI()
class FormData(BaseModel):
username: str
password: str
@app.post("/login/")
async def login(data: Annotated[FormData, Form()]):
return data
Access Log 配置
默认的 Access Log 没有时间,配置可如下(源码参考 1、源码参考 2、issue 参考)
from uvicorn.config import LOGGING_CONFIG
log_config = LOGGING_CONFIG
log_config["formatters"]["access"]["fmt"] = "%(asctime)s - %(levelname)s - %(message)s"
log_config["formatters"]["default"]["fmt"] = "%(asctime)s - %(levelname)s - %(message)s"
# LOGGING_CONFIG["formatters"]["access"]["fmt"] = '%(asctime)s - %(levelprefix)s %(client_addr)s - "%(request_line)s" %(status_code)s'
uvicorn.run(app, log_config=log_config)
过滤日志(参考):
class EndpointFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
return record.getMessage().find("/endpoint") == -1
# Filter out /endpoint
logging.getLogger("uvicorn.access").addFilter(EndpointFilter())
生命周期 lifespan
- lifespan
- 参考
- 代码在应用开始接收请求之前执行,也会在处理可能的若干请求之后执行,它覆盖了整个应用程序的生命周期
- 替代
faseapi 'on_event' is deprecated, use 'lifespan' event handlers instead.
from contextlib import asynccontextmanager
from fastapi import FastAPI
def fake_answer_to_everything_ml_model(x: float):
return x * 42
ml_models = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
# Load the ML model
ml_models["answer_to_everything"] = fake_answer_to_everything_ml_model
yield
# Clean up the ML models and release the resources
ml_models.clear()
app = FastAPI(lifespan=lifespan)
@app.get("/predict")
async def predict(x: float):
result = ml_models["answer_to_everything"](x)
return {"result": result}
超时
from fastapi import FastAPI
from fastapi.middleware.timeout import TimeoutMiddleware
from datetime import timedelta
app = FastAPI()
# 设置应用程序的默认超时时间为5秒
app.add_middleware(TimeoutMiddleware, timeout=timedelta(seconds=5))
@app.get("/")
async def read_root():
return {"message": "Hello, World!"}
from fastapi import FastAPI
from datetime import timedelta
app = FastAPI()
@app.get("/fast_endpoint", timeout=timedelta(seconds=2))
async def fast_endpoint():
return {"message": "This is a fast endpoint."}
定时任务
asyncio 定时任务
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
import httpx
# This is the background task function, unchanged from the previous example.
async def periodic_ping():
while True:
try:
async with httpx.AsyncClient() as client:
response = await client.get("https://google.com")
response.raise_for_status()
print(f"Successfully pinged google.com with status code: {response.status_code}")
except httpx.HTTPStatusError as e:
print(f"HTTP error occurred while pinging google.com: {e}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
await asyncio.sleep(600)
# Define the lifespan context manager.
# It handles startup and shutdown logic.
@asynccontextmanager
async def lifespan(app: FastAPI):
# This block of code runs on application startup.
print("Starting background task for periodic ping...")
task = asyncio.create_task(periodic_ping())
# The 'yield' statement is where the application runs and serves requests.
yield
# This block of code runs on application shutdown.
print("Shutting down background task...")
task.cancel() # Gracefully cancel the task.
try:
await task # Await the task to ensure it's fully cancelled.
except asyncio.CancelledError:
print("Background task successfully shut down.")
# Pass the lifespan context manager to the FastAPI instance.
app = FastAPI(lifespan=lifespan)
# Define a simple endpoint to verify the application is running.
@app.get("/")
def read_root():
return {"message": "FastAPI is running. The background task is active."}
APScheduler 定时任务
pip install APScheduler
import uvicorn
from contextlib import asynccontextmanager
from datetime import datetime
from fastapi import FastAPI
# 1. 导入正确的调度器:AsyncIOScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
# 实例化正确的调度器
scheduler = AsyncIOScheduler()
def tick():
"""一个简单的任务,打印当前时间。"""
print(f'Tick! The time is: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
应用生命周期管理器:在应用启动时启动调度器,在关闭时停止它。
"""
print("应用启动...启动调度器。")
# 添加任务并启动调度器
scheduler.add_job(tick, 'interval', seconds=3, id='tick_job')
scheduler.start()
try:
# 2. 使用 yield 将控制权交给应用
yield
finally:
# 3. 应用关闭时,执行清理操作
print("应用关闭...关闭调度器。")
scheduler.shutdown()
# 将 lifespan 应用到 FastAPI 实例
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def read_root():
"""根路径,用于检查应用是否正在运行。"""
return {"message": "FastAPI is running. The background task is active."}
# 如果你将此文件保存为 main.py,可以使用下面的命令来运行它:
# uvicorn main:app --reload
celery 定时任务
pip install "fastapi[all]" celery "redis[hiredis]"
from celery import Celery
from celery.schedules import crontab
from datetime import datetime
import os
# 使用 Redis 作为消息代理
# 如果 Redis 在另一台主机或使用了密码,请相应修改
# 格式: redis://:password@hostname:port/db_number
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# 1. 创建 Celery 实例
# 第一个参数是当前模块的名称,第二个参数是消息代理的 URL
celery_app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
# 2. 定义一个任务
@celery_app.task
def tick():
"""
一个简单的任务,打印当前时间。
这个函数现在是一个 Celery Task。
"""
print(f'Tick! The time is: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}')
# 3. 配置 Celery Beat 定时任务
# 这是替代 apscheduler.add_job 的方式
celery_app.conf.beat_schedule = {
# 任务名称,可以自定义
'add-every-3-seconds': {
# 'celery_worker.tick' 指向我们上面定义的 tick 任务
'task': 'celery_worker.tick',
# 每 3 秒执行一次
'schedule': 3.0,
# 可以传递参数给任务(本例中不需要)
# 'args': (16, 16)
},
}
# 可选配置
celery_app.conf.timezone = 'UTC'
# main.py
import uvicorn
from fastapi import FastAPI
# FastAPI 应用本身不再需要感知后台任务的实现细节
app = FastAPI()
@app.get("/")
async def read_root():
"""
根路径,用于检查应用是否正在运行。
后台任务由独立的 Celery 进程管理。
"""
return {"message": "FastAPI is running. Celery background tasks are managed separately."}
# 如果你想从 FastAPI 触发一个一次性任务(非定时),可以这样做:
# from celery_worker import tick
#
# @app.post("/trigger-task")
# async def trigger_tick_task():
# """一个手动触发 tick 任务的端点。"""
# # .delay() 是执行任务的快捷方式
# tick.delay()
# return {"message": "Tick task triggered!"}
# 运行命令: uvicorn main:app --reload
docker run -d -p 6379:6379 --name my-redis redis
# -A 指定 Celery 实例的位置
# celery -A <模块名>.<Celery实例名> worker --loglevel=info
celery -A celery_worker.celery_app worker --loglevel=info
# Beat 进程负责发送定时任务
celery -A celery_worker.celery_app beat --loglevel=info
uvicorn main:app --reload
服务运行
Uvicorn 是一个 ASGI (Asynchronous Server Gateway Interface) 服务器,它本身是异步的。这意味着即使只启动一个 worker 进程,它也能通过一个事件循环来并发地处理多个连接和请求。
Uvicorn 的并发能力主要体现在以下几个方面,而不是通过传统的多线程模型来实现:
- 异步 I/O (Asynchronous I/O):Uvicorn 基于
asyncio
或 uvloop
,能够高效地处理大量的 I/O 密集型任务,如网络请求、数据库查询、文件读写等。当一个请求在等待 I/O 操作完成时,事件循环会切换去处理其他请求,而不是阻塞等待。
- 多进程 (Workers):这是 Uvicorn 处理 CPU 密集型任务和充分利用多核 CPU 的主要方式。通过启动多个 worker 进程,每个进程都有自己的事件循环,可以独立地处理请求。这才是 Uvicorn 官方推荐的生产环境部署方式。
uvicorn main:app --workers 4
# 在生产环境中,常见的部署方案还有使用 Gunicorn 这样的进程管理器来管理 Uvicorn worker。例如:
gunicorn main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
实战
类似于 httpbin 的接口
# pip install "fastapi[all]" uvicorn
# uvicorn main:app --reload
# curl http://127.0.0.1:8000/any/path/you/want
# curl -X POST http://127.0.0.1:8000/data/submit -H "Content-Type: application/json" -d '{"id": 1, "name": "test"}'
# curl -X PUT http://127.0.0.1:8000/items/123
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.api_route("/{path_name:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"])
async def catch_all(request: Request, path_name: str):
"""
一个通用的端点,可以接收所有HTTP方法和任意路径的请求,
并返回请求的详细信息。
"""
method = request.method
url = str(request.url)
headers = dict(request.headers)
# 尝试获取请求体
body = None
if method not in ["GET", "HEAD", "OPTIONS"]:
try:
body = await request.json()
except:
body = (await request.body()).decode("utf-8")
response_data = {
"method": method,
"url": url,
"path_name": path_name,
"headers": headers,
"body": body,
}
return JSONResponse(content=response_data)
卸载部分 CPU 任务
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI
# -------------------- 1. 初始化 --------------------
# 创建 FastAPI 应用实例
app = FastAPI()
# 创建一个进程池执行器。
# 把它放在全局范围,这样它就可以在整个应用的生命周期内被复用。
# 如果不指定 max_workers,它会默认为你机器的 CPU 核心数。
process_pool = ProcessPoolExecutor()
# -------------------- 2. CPU 密集型任务 --------------------
def cpu_intensive_task(iterations: int) -> float:
"""
一个模拟 CPU 密集任务的函数,它进行大量计算。
返回执行所花费的时间。
"""
print(f"开始执行 CPU 密集任务...")
start_time = time.time()
total = 0
for i in range(iterations):
total += i
end_time = time.time()
duration = end_time - start_time
print(f"CPU 密集任务完成,耗时: {duration:.4f} 秒")
return duration
# -------------------- 3. API 端点 --------------------
@app.get("/")
async def root():
"""一个简单的端点,用于测试服务器的响应性。"""
return {"message": "服务器正常响应中..."}
@app.get("/block")
async def blocking_endpoint():
"""
【错误示例】
直接在 async 函数中运行 CPU 密集任务。
这将阻塞整个事件循环。
"""
print("进入阻塞端点 /block")
# 直接调用,会阻塞一切
duration = cpu_intensive_task(70_000_000)
print("离开阻塞端点 /block")
return {"message": f"任务已完成 (阻塞模式),耗时: {duration:.4f} 秒"}
@app.get("/non-block")
async def non_blocking_endpoint():
"""
【正确示例】
使用进程池来执行 CPU 密集任务。
这不会阻塞事件循环。
"""
print("进入非阻塞端点 /non-block")
# 获取当前的 asyncio 事件循环
loop = asyncio.get_running_loop()
# 使用 loop.run_in_executor 将阻塞函数提交给执行器
# 第一个参数是执行器实例 (我们这里是 process_pool),
# 第二个参数是你要调用的函数,
# 后面是传递给该函数的参数。
# `await` 会等待任务在另一个进程中完成并返回结果。
# 但在等待期间,事件循环是自由的,可以处理其他请求。
duration = await loop.run_in_executor(
process_pool, cpu_intensive_task, 70_000_000
)
print("离开非阻塞端点 /non-block")
return {"message": f"任务已提交 (非阻塞模式),并在后台完成,耗时: {duration:.4f} 秒"}
# -------------------- 4. 运行应用 (可选) --------------------
# 如果你直接运行 `python main.py`,可以使用这个部分
if __name__ == "__main__":
import uvicorn
# 建议通过命令行 `uvicorn main:app --workers 1` 启动
uvicorn.run(app, host="0.0.0.0", port=8000)
示例创建了三个 API 端点:
/
: 一个健康的根端点,用来测试服务器是否在阻塞期间仍能响应。
/block
: 错误的方式。直接在 async
函数中调用 CPU 密集任务,这将阻塞整个服务器。
/non-block
: 正确的方式。使用 ProcessPoolExecutor
将任务提交到另一个进程执行,不会阻塞服务器。
对比说明:
方法 |
实现方式 |
效果 |
适用场景 |
错误方式 |
在 async def 中直接调用同步的、耗时的 CPU 密集函数 |
阻塞整个服务器,无法处理并发请求,性能极差。 |
绝对要避免 |
正确方式 |
使用 loop.run_in_executor 配合 concurrent.futures.ProcessPoolExecutor |
将 CPU 密集任务卸载到独立的子进程中执行,不阻塞事件循环,服务器保持高响应性。 |
处理 FastAPI 中任何 CPU 密集型任务的标准做法 |
其他