FastAPI 是一个 Web 框架,用于在 Python 3.8+ 中构建基于 HTTP 的服务 API。
介绍
- FastAPI 使用 Pydantic 和类型提示来验证、序列化和反序列化数据
- FastAPI 还会自动为使用它构建的 API 生成 OpenAPI 文档
- FastAPI(以及其底层的 Starlette 和 Uvicorn)是基于 Python 的 asyncio 事件循环(Event Loop)构建的。这个模型的核心思想是“单线程协作式多任务”。它非常适合处理 I/O 密集型任务(如等待数据库响应、调用外部 API、读写文件等)
- 缺点:CPU 密集任务(如复杂的数学计算、图像处理、数据压缩、循环加密等)不会产生 I/O 等待。它会持续占用 CPU 进行计算。如果你在一个 async def 路由函数中直接执行这样的任务,它会霸占整个事件循环。导致整个 FastAPI 应用将停止响应。
安装
pip install "fastapi[standard]"
使用
from typing import Union
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
is_offer: Union[bool, None] = None
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.get("/items/{item_id}")
def read_item(item_id: int, q: Union[str, None] = None):
return {"item_id": item_id, "q": q}
@app.put("/items/{item_id}")
def update_item(item_id: int, item: Item):
return {"item_name": item.name, "item_id": item_id}
- 启动
- http://127.0.0.1:8000
- docs: http://127.0.0.1:8000/docs
- openapi json: http://127.0.0.1:8000/openapi.json
- ReDoc: http://127.0.0.1:8000/redoc
# fastapi dev 可以在代码变化时自动加载
fastapi dev demo1.py
路径参数
from fastapi import FastAPI
app = FastAPI()
@app.get("/items/{item_id}")
async def read_item(item_id):
# async def read_item(item_id: int):
return {"item_id": item_id}
from enum import Enum
from fastapi import FastAPI
class ModelName(str, Enum):
alexnet = "alexnet"
resnet = "resnet"
lenet = "lenet"
app = FastAPI()
@app.get("/models/{model_name}")
async def get_model(model_name: ModelName):
if model_name is ModelName.alexnet:
return {"model_name": model_name, "message": "Deep Learning FTW!"}
if model_name.value == "lenet":
return {"model_name": model_name, "message": "LeCNN all the images"}
return {"model_name": model_name, "message": "Have some residuals"}
声明路径参数
from enum import Enum
from fastapi import FastAPI
class ModelName(str, Enum):
alexnet = "alexnet"
resnet = "resnet"
lenet = "lenet"
app = FastAPI()
@app.get("/models/{model_name}")
async def get_model(model_name: ModelName):
if model_name is ModelName.alexnet:
return {"model_name": model_name, "message": "Deep Learning FTW!"}
if model_name.value == "lenet":
return {"model_name": model_name, "message": "LeCNN all the images"}
return {"model_name": model_name, "message": "Have some residuals"}
路径转换器
from fastapi import FastAPI
app = FastAPI()
@app.get("/files/{file_path:path}")
async def read_file(file_path: str):
return {"file_path": file_path}
查询参数
from fastapi import FastAPI
app = FastAPI()
fake_items_db = [{"item_name": "Foo"}, {"item_name": "Bar"}, {"item_name": "Baz"}]
@app.get("/items/")
async def read_item(skip: int = 0, limit: int = 10):
# async def read_item(item_id: str, q: str | None = None): # 可选参数
# @app.get("/items/{item_id}")
# async def read_item(item_id: str, q: str | None = None, short: bool = False):
return fake_items_db[skip : skip + limit]
- http://127.0.0.1:8000/items/?skip=0&limit=10
必选查询参数
from fastapi import FastAPI
app = FastAPI()
@app.get("/items/{item_id}")
async def read_user_item(item_id: str, needy: str):
item = {"item_id": item_id, "needy": needy}
return item
请求体
from fastapi import FastAPI
from pydantic import BaseModel
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
app = FastAPI()
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item, q: str | None = None):
result = {"item_id": item_id, **item.dict()}
if q:
result.update({"q": q})
return result
查询参数和字符串校验
from typing import Union
from fastapi import FastAPI, Query
app = FastAPI()
@app.get("/items/")
async def read_items(q: Union[str, None] = Query(default=None, max_length=50)):
# async def read_items(
# q: Union[str, None] = Query(default=None, min_length=3, max_length=50),
# ):
# 添加正则表达式
# async def read_items(
# q: Union[str, None] = Query(
# default=None, min_length=3, max_length=50, pattern="^fixedquery$"
# ),
# ):
# 默认值
# async def read_items(q: str = Query(default="fixedquery", min_length=3)):
results = {"items": [{"item_id": "Foo"}, {"item_id": "Bar"}]}
if q:
results.update({"q": q})
return results
路径参数和数值校验
from typing import Annotated
from fastapi import FastAPI, Path, Query
app = FastAPI()
@app.get("/items/{item_id}")
async def read_items(
item_id: Annotated[int, Path(title="The ID of the item to get")],
q: Annotated[str | None, Query(alias="item-query")] = None,
):
# async def read_items(
# *, item_id: int = Path(title="The ID of the item to get", ge=1), q: str
# ):
results = {"item_id": item_id}
if q:
results.update({"q": q})
return results
使用 Pydantic 模型的查询参数
from typing import Annotated, Literal
from fastapi import FastAPI, Query
from pydantic import BaseModel, Field
app = FastAPI()
class FilterParams(BaseModel):
limit: int = Field(100, gt=0, le=100)
offset: int = Field(0, ge=0)
order_by: Literal["created_at", "updated_at"] = "created_at"
tags: list[str] = []
@app.get("/items/")
async def read_items(filter_query: Annotated[FilterParams, Query()]):
return filter_query
混合使用 Path、Query 和请求体参数
from typing import Annotated
from fastapi import FastAPI, Path
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
@app.put("/items/{item_id}")
async def update_item(
item_id: Annotated[int, Path(title="The ID of the item to get", ge=0, le=1000)],
q: str | None = None,
item: Item | None = None,
):
results = {"item_id": item_id}
if q:
results.update({"q": q})
if item:
results.update({"item": item})
return results
请求体 - 字段
from typing import Annotated
from fastapi import Body, FastAPI
from pydantic import BaseModel, Field
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None = Field(
default=None, title="The description of the item", max_length=300
)
price: float = Field(gt=0, description="The price must be greater than zero")
tax: float | None = None
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Annotated[Item, Body(embed=True)]):
results = {"item_id": item_id, "item": item}
return results
请求体 - 嵌套模型
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
tags: list = []
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item):
results = {"item_id": item_id, "item": item}
return results
Cookie 参数
from typing import Annotated
from fastapi import Cookie, FastAPI
app = FastAPI()
@app.get("/items/")
async def read_items(ads_id: Annotated[str | None, Cookie()] = None):
return {"ads_id": ads_id}
Cookie 参数模型
from typing import Annotated
from fastapi import Cookie, FastAPI
from pydantic import BaseModel
app = FastAPI()
class Cookies(BaseModel):
session_id: str
fatebook_tracker: str | None = None
googall_tracker: str | None = None
@app.get("/items/")
async def read_items(cookies: Annotated[Cookies, Cookie()]):
return cookies
from typing import Annotated
from fastapi import FastAPI, Header
app = FastAPI()
@app.get("/items/")
async def read_items(user_agent: Annotated[str | None, Header()] = None):
return {"User-Agent": user_agent}
from typing import Annotated
from fastapi import FastAPI, Header
from pydantic import BaseModel
app = FastAPI()
class CommonHeaders(BaseModel):
host: str
save_data: bool
if_modified_since: str | None = None
traceparent: str | None = None
x_tag: list[str] = []
@app.get("/items/")
async def read_items(headers: Annotated[CommonHeaders, Header()]):
return headers
响应模型
from typing import Any
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
tags: list[str] = []
@app.post("/items/", response_model=Item)
async def create_item(item: Item) -> Any:
return item
@app.get("/items/", response_model=list[Item])
async def read_items() -> Any:
return [
{"name": "Portal Gun", "price": 42.0},
{"name": "Plumbus", "price": 32.0},
]
响应模型
from typing import Any
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
tags: list[str] = []
@app.post("/items/", response_model=Item)
async def create_item(item: Item) -> Any:
return item
@app.get("/items/", response_model=list[Item])
async def read_items() -> Any:
return [
{"name": "Portal Gun", "price": 42.0},
{"name": "Plumbus", "price": 32.0},
]
响应状态码
from fastapi import FastAPI
app = FastAPI()
@app.post("/items/", status_code=201)
async def create_item(name: str):
return {"name": name}
表单数据
from fastapi import FastAPI, Form
app = FastAPI()
@app.post("/login/")
async def login(username: str = Form(), password: str = Form()):
return {"username": username}
表单模型
from typing import Annotated
from fastapi import FastAPI, Form
from pydantic import BaseModel
app = FastAPI()
class FormData(BaseModel):
username: str
password: str
@app.post("/login/")
async def login(data: Annotated[FormData, Form()]):
return data
生命周期 lifespan
- 参考
- 代码在应用开始接收请求之前执行,也会在处理可能的若干请求之后执行,它覆盖了整个应用程序的生命周期
- 替代
faseapi 'on_event' is deprecated, use 'lifespan' event handlers instead.
服务运行
Uvicorn 是一个 ASGI (Asynchronous Server Gateway Interface) 服务器,它本身是异步的。这意味着即使只启动一个 worker 进程,它也能通过一个事件循环来并发地处理多个连接和请求。
Uvicorn 的并发能力主要体现在以下几个方面,而不是通过传统的多线程模型来实现:
- 异步 I/O (Asynchronous I/O):Uvicorn 基于
asyncio
或 uvloop
,能够高效地处理大量的 I/O 密集型任务,如网络请求、数据库查询、文件读写等。当一个请求在等待 I/O 操作完成时,事件循环会切换去处理其他请求,而不是阻塞等待。
- 多进程 (Workers):这是 Uvicorn 处理 CPU 密集型任务和充分利用多核 CPU 的主要方式。通过启动多个 worker 进程,每个进程都有自己的事件循环,可以独立地处理请求。这才是 Uvicorn 官方推荐的生产环境部署方式。
uvicorn main:app --workers 4
# 在生产环境中,常见的部署方案还有使用 Gunicorn 这样的进程管理器来管理 Uvicorn worker。例如:
gunicorn main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
实战
类似于 httpbin 的接口
# pip install "fastapi[all]" uvicorn
# uvicorn main:app --reload
# curl http://127.0.0.1:8000/any/path/you/want
# curl -X POST http://127.0.0.1:8000/data/submit -H "Content-Type: application/json" -d '{"id": 1, "name": "test"}'
# curl -X PUT http://127.0.0.1:8000/items/123
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
app = FastAPI()
@app.api_route("/{path_name:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"])
async def catch_all(request: Request, path_name: str):
"""
一个通用的端点,可以接收所有HTTP方法和任意路径的请求,
并返回请求的详细信息。
"""
method = request.method
url = str(request.url)
headers = dict(request.headers)
# 尝试获取请求体
body = None
if method not in ["GET", "HEAD", "OPTIONS"]:
try:
body = await request.json()
except:
body = (await request.body()).decode("utf-8")
response_data = {
"method": method,
"url": url,
"path_name": path_name,
"headers": headers,
"body": body,
}
return JSONResponse(content=response_data)
卸载部分 CPU 任务
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI
# -------------------- 1. 初始化 --------------------
# 创建 FastAPI 应用实例
app = FastAPI()
# 创建一个进程池执行器。
# 把它放在全局范围,这样它就可以在整个应用的生命周期内被复用。
# 如果不指定 max_workers,它会默认为你机器的 CPU 核心数。
process_pool = ProcessPoolExecutor()
# -------------------- 2. CPU 密集型任务 --------------------
def cpu_intensive_task(iterations: int) -> float:
"""
一个模拟 CPU 密集任务的函数,它进行大量计算。
返回执行所花费的时间。
"""
print(f"开始执行 CPU 密集任务...")
start_time = time.time()
total = 0
for i in range(iterations):
total += i
end_time = time.time()
duration = end_time - start_time
print(f"CPU 密集任务完成,耗时: {duration:.4f} 秒")
return duration
# -------------------- 3. API 端点 --------------------
@app.get("/")
async def root():
"""一个简单的端点,用于测试服务器的响应性。"""
return {"message": "服务器正常响应中..."}
@app.get("/block")
async def blocking_endpoint():
"""
【错误示例】
直接在 async 函数中运行 CPU 密集任务。
这将阻塞整个事件循环。
"""
print("进入阻塞端点 /block")
# 直接调用,会阻塞一切
duration = cpu_intensive_task(70_000_000)
print("离开阻塞端点 /block")
return {"message": f"任务已完成 (阻塞模式),耗时: {duration:.4f} 秒"}
@app.get("/non-block")
async def non_blocking_endpoint():
"""
【正确示例】
使用进程池来执行 CPU 密集任务。
这不会阻塞事件循环。
"""
print("进入非阻塞端点 /non-block")
# 获取当前的 asyncio 事件循环
loop = asyncio.get_running_loop()
# 使用 loop.run_in_executor 将阻塞函数提交给执行器
# 第一个参数是执行器实例 (我们这里是 process_pool),
# 第二个参数是你要调用的函数,
# 后面是传递给该函数的参数。
# `await` 会等待任务在另一个进程中完成并返回结果。
# 但在等待期间,事件循环是自由的,可以处理其他请求。
duration = await loop.run_in_executor(
process_pool, cpu_intensive_task, 70_000_000
)
print("离开非阻塞端点 /non-block")
return {"message": f"任务已提交 (非阻塞模式),并在后台完成,耗时: {duration:.4f} 秒"}
# -------------------- 4. 运行应用 (可选) --------------------
# 如果你直接运行 `python main.py`,可以使用这个部分
if __name__ == "__main__":
import uvicorn
# 建议通过命令行 `uvicorn main:app --workers 1` 启动
uvicorn.run(app, host="0.0.0.0", port=8000)
示例创建了三个 API 端点:
/
: 一个健康的根端点,用来测试服务器是否在阻塞期间仍能响应。
/block
: 错误的方式。直接在 async
函数中调用 CPU 密集任务,这将阻塞整个服务器。
/non-block
: 正确的方式。使用 ProcessPoolExecutor
将任务提交到另一个进程执行,不会阻塞服务器。
对比说明:
方法 |
实现方式 |
效果 |
适用场景 |
错误方式 |
在 async def 中直接调用同步的、耗时的 CPU 密集函数 |
阻塞整个服务器,无法处理并发请求,性能极差。 |
绝对要避免 |
正确方式 |
使用 loop.run_in_executor 配合 concurrent.futures.ProcessPoolExecutor |
将 CPU 密集任务卸载到独立的子进程中执行,不阻塞事件循环,服务器保持高响应性。 |
处理 FastAPI 中任何 CPU 密集型任务的标准做法 |
其他