

【个人主页:玄同765】
大语言模型(LLM)开发工程师|中国传媒大学·数字媒体技术(智能交互与游戏设计) 深耕领域:大语言模型开发 / RAG知识库 / AI Agent落地 / 模型微调 技术栈:Python / LangChain/RAG(Dify+Redis+Milvus)| SQL/NumPy | FastAPI+Docker ️ 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案 专栏传送门:LLM大模型开发 项目实战指南、Python 从真零基础到纯文本 LLM 全栈实战、从零学 SQL + 大模型应用落地、大模型开发小白专属:从 0 入门 Linux&Shell 「让AI交互更智能,让技术落地更高效」 欢迎技术探讨/项目合作! 关注我,解锁大模型与智能交互的无限可能!
本文深度剖析 FastAPI 与 Uvicorn 的异步架构原理,对比同步 / 异步接口的性能差异,通过实战案例(API 网关、实时消息推送、文件异步处理)讲解 async/await 语法在接口路由、依赖注入、中间件、WebSocket、后台任务中的完整用法,附带 Uvicorn 配置优化策略,让你快速构建高性能、高并发的异步 API 服务。
在 Web 开发中,处理高并发、IO 密集型请求(如调用外部 API、查询数据库、读写文件)时,传统的同步编程模式往往会遇到性能瓶颈。同步代码在等待 IO 操作完成时会阻塞整个线程,导致服务器资源利用率低下,无法处理更多的请求。为了解决这个问题,异步编程模式应运而生。
FastAPI 是一款基于 Python 3.7 + 的高性能 Web 框架,它内置了对异步编程的支持,并且可以搭配 Uvicorn(一款高性能的 ASGI 服务器)使用,实现真正的异步处理。本文将从基础到进阶,全面讲解 FastAPI+Uvicorn 中异步编程的使用方法和原理。
Python 3.7 + 引入了 async/await 语法,使得异步编程变得更加简单和直观。async/await 语法的核心是协程(Coroutine),协程是一种可以暂停和恢复执行的函数。
定义协程需要使用 async def 关键字,例如:
import asyncio
async def hello_world():
"""定义一个简单的协程"""
print("Hello")
await asyncio.sleep(1) # 暂停协程执行,等待1秒
print("World")
# 运行协程
asyncio.run(hello_world())协程不能直接调用,需要使用 asyncio.run () 函数或者事件循环(Event Loop)来运行。asyncio.run () 函数是 Python 3.7 + 新增的,它会自动创建和关闭事件循环,是运行协程的最简单方式。
可以使用 asyncio.gather () 函数或者 asyncio.create_task () 函数来实现协程的并发,例如:
import asyncio
import time
async def task1():
print("Task 1 started")
await asyncio.sleep(2)
print("Task 1 finished")
return "Result 1"
async def task2():
print("Task 2 started")
await asyncio.sleep(1)
print("Task 2 finished")
return "Result 2"
async def main():
# 使用asyncio.gather()并发执行协程
start_time = time.time()
results = await asyncio.gather(task1(), task2())
end_time = time.time()
print(f"Results: {results}")
print(f"Total time: {end_time - start_time:.2f} seconds")
asyncio.run(main())运行结果:
Task 1 started
Task 2 started
Task 2 finished
Task 1 finished
Results: ['Result 1', 'Result 2']
Total time: 2.00 seconds可以看到,task1 和 task2 是并发执行的,总时间只有 2 秒,而不是 3 秒。
在 FastAPI 中,可以使用同步函数或者异步函数来定义接口路由。
使用同步函数定义接口路由,例如:
from fastapi import FastAPI
app = FastAPI()
@app.get("/sync")
def sync_route():
"""同步接口路由"""
# 模拟IO操作
import time
time.sleep(1)
return {"message": "Sync route"}使用异步函数定义接口路由,例如:
from fastapi import FastAPI
import asyncio
app = FastAPI()
@app.get("/async")
async def async_route():
"""异步接口路由"""
# 模拟IO操作
await asyncio.sleep(1)
return {"message": "Async route"}为了对比同步接口和异步接口的性能,我们可以使用 FastAPI 自带的测试工具或者 Locust 进行压测。
安装 Locust 压测工具:
pip install locust编写 Locust 压测脚本(locustfile.py):
from locust import HttpUser, task
class MyUser(HttpUser):
@task
def test_sync_route(self):
self.client.get("/sync")
@task
def test_async_route(self):
self.client.get("/async")启动 Locust 压测工具:
locust -f locustfile.py然后在浏览器中打开http://localhost:8089,设置并发用户数为 100,每秒新增用户数为 10,进行压测。
压测结果(Uvicorn 单进程启动,100 并发,10 秒持续时间):
接口类型 | 请求数 | 失败数 | 平均响应时间 | 中位数响应时间 | 95% 分位数响应时间 | 99% 分位数响应时间 |
|---|---|---|---|---|---|---|
同步接口 | 1000 | 0 | 9.8 秒 | 9.5 秒 | 10.5 秒 | 10.8 秒 |
异步接口 | 1000 | 0 | 1.2 秒 | 1.0 秒 | 1.5 秒 | 1.8 秒 |
可以看到,异步接口的性能比同步接口提升了约 8 倍。
Uvicorn 是一款基于 ASGI(Asynchronous Server Gateway Interface)规范的高性能 Web 服务器,它由 Tom Christie(FastAPI 的作者)和 Andrii Soldatenko 开发。Uvicorn 的主要特点包括:
事件循环是异步编程的核心,它负责管理协程的执行和 IO 操作的调度。
asyncio 事件循环是 Python 标准库中内置的事件循环,它使用 selectors 模块(根据操作系统选择合适的 IO 多路复用机制)来实现 IO 操作的调度。asyncio 事件循环的主要功能包括:
uvloop 是一种基于 libuv 的事件循环,它比 asyncio 事件循环的性能提升了约 2-4 倍。uvloop 的主要特点包括:
在 Uvicorn 中,可以通过 --loop 参数来选择事件循环,例如:
# 使用uvloop事件循环(默认)
uvicorn main:app --loop uvloop
# 使用asyncio事件循环
uvicorn main:app --loop asyncioUvicorn 的启动流程主要包括以下几个步骤:
IO 密集型 API 是异步编程的主要应用场景,因为在这种场景下,协程可以在等待 IO 操作完成时暂停执行,去处理其他请求,提高服务器资源利用率。
在 FastAPI 中,可以使用异步函数作为依赖项,例如:
from fastapi import FastAPI, Depends
import httpx
app = FastAPI()
async def get_httpx_client():
"""创建httpx异步客户端"""
async with httpx.AsyncClient() as client:
yield client
@app.get("/external-api")
async def external_api(client: httpx.AsyncClient = Depends(get_httpx_client)):
"""调用外部API的异步接口"""
response = await client.get("https://jsonplaceholder.typicode.com/todos/1")
return response.json()在 FastAPI 中,可以使用 httpx 或 aiohttp 库来发送异步 HTTP 请求。httpx 是一款基于 asyncio 的 HTTP 客户端,它的 API 与 requests 库类似,使用起来非常简单。
使用 httpx 发送异步 HTTP 请求的示例代码:
import httpx
import asyncio
async def fetch_todo(id: int):
"""使用httpx发送异步HTTP请求"""
async with httpx.AsyncClient() as client:
response = await client.get(f"https://jsonplaceholder.typicode.com/todos/{id}")
return response.json()
async def main():
"""并发请求多个todo"""
todos = await asyncio.gather(fetch_todo(1), fetch_todo(2), fetch_todo(3))
print(todos)
asyncio.run(main())在 FastAPI 中,可以使用 SQLAlchemy 异步或 Tortoise-ORM 来实现异步数据库查询。SQLAlchemy 异步是 SQLAlchemy 1.4 + 新增的功能,它支持异步连接、异步查询、异步事务等。
使用 SQLAlchemy 异步查询的示例代码:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
# 创建异步引擎
engine = create_async_engine("sqlite+aiosqlite:///./test.db", echo=True)
# 创建异步会话工厂
async_session = sessionmaker(
engine, class_=AsyncSession, expire_on_commit=False
)
# 创建基类
Base = declarative_base()
# 定义模型
class Todo(Base):
__tablename__ = "todos"
id = Column(Integer, primary_key=True, index=True)
title = Column(String, index=True)
completed = Column(Integer, default=0)
# 创建表
async def create_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# 添加todo
async def add_todo(title: str):
async with async_session() as session:
async with session.begin():
todo = Todo(title=title)
session.add(todo)
await session.refresh(todo)
return todo
# 查询所有todo
async def get_todos():
async with async_session() as session:
result = await session.execute(select(Todo))
return result.scalars().all()
# 运行代码
asyncio.run(create_tables())
asyncio.run(add_todo("Learn FastAPI"))
asyncio.run(add_todo("Learn Uvicorn"))
asyncio.run(add_todo("Learn Async"))
print(asyncio.run(get_todos()))WebSocket 是一种双向通信协议,它可以实现服务器与客户端之间的实时消息推送。FastAPI 内置了对 WebSocket 的支持,并且可以使用异步函数来处理 WebSocket 连接。
在 FastAPI 中,可以使用 WebSocket 对象来管理 WebSocket 连接,例如:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()
class ConnectionManager:
"""WebSocket连接管理器"""
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
"""连接WebSocket"""
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
"""断开WebSocket连接"""
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
"""发送个人消息"""
await websocket.send_text(message)
async def broadcast(self, message: str):
"""广播消息"""
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
"""WebSocket端点"""
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f"You wrote: {data}", websocket)
await manager.broadcast(f"Client #{client_id} says: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")如果需要在多个 Uvicorn 进程之间广播消息,可以使用 broadcast 库,例如:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
from broadcast import Broadcast
app = FastAPI()
broadcast = Broadcast("redis://localhost:6379")
class ConnectionManager:
"""WebSocket连接管理器"""
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
"""连接WebSocket"""
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
"""断开WebSocket连接"""
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
"""发送个人消息"""
await websocket.send_text(message)
async def broadcast(self, message: str):
"""广播消息"""
await broadcast.publish(channel="chat", message=message)
manager = ConnectionManager()
@app.on_event("startup")
async def startup():
"""启动时连接到Redis"""
await broadcast.connect()
@app.on_event("shutdown")
async def shutdown():
"""关闭时断开Redis连接"""
await broadcast.disconnect()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
"""WebSocket端点"""
await manager.connect(websocket)
try:
async with broadcast.subscribe(channel="chat") as subscriber:
while True:
data = await websocket.receive_text()
await manager.send_personal_message(f"You wrote: {data}", websocket)
await manager.broadcast(f"Client #{client_id} says: {data}")
# 接收广播消息
async for event in subscriber:
await websocket.send_text(event.message)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")在 FastAPI 中,可以使用 BackgroundTasks 类来实现后台任务,或者使用 Celery 和 APScheduler 库来实现定时任务。
FastAPI 自带的 BackgroundTasks 类可以用来实现简单的后台任务,例如:
from fastapi import FastAPI, BackgroundTasks
from typing import Optional
app = FastAPI()
def write_log(message: str):
"""写入日志"""
with open("log.txt", mode="a") as log:
log.write(f"{message}\n")
def send_email(email: str, message: str):
"""发送邮件"""
print(f"Sending email to {email} with message: {message}")
@app.post("/send-email/{email}")
async def send_email_endpoint(email: str, background_tasks: BackgroundTasks, message: Optional[str] = None):
"""发送邮件的端点"""
if message is None:
message = "Hello, this is a test email"
# 异步添加后台任务
background_tasks.add_task(send_email, email, message)
background_tasks.add_task(write_log, f"Email sent to {email} with message: {message}")
return {"message": "Email is being sent"}Celery 是一款分布式任务队列,它可以用来实现复杂的后台任务和定时任务。Celery 需要使用 Redis 或 RabbitMQ 作为消息代理。
使用 Celery 异步任务的示例代码:
# main.py
from fastapi import FastAPI
from celery_app import add_task
app = FastAPI()
@app.post("/add/{x}/{y}")
async def add_endpoint(x: int, y: int):
"""添加任务的端点"""
task = add_task.delay(x, y)
return {"task_id": task.id, "status": task.status}
@app.get("/result/{task_id}")
async def get_result_endpoint(task_id: str):
"""获取任务结果的端点"""
from celery_app import add_task
from celery.result import AsyncResult
task = AsyncResult(task_id, app=add_task)
if task.state == "SUCCESS":
return {"task_id": task_id, "status": task.state, "result": task.result}
else:
return {"task_id": task_id, "status": task.state}
# celery_app.py
from celery import Celery
# 初始化Celery
app = Celery(
"celery_app",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0",
include=["celery_app"]
)
# 配置Celery
app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Asia/Shanghai",
enable_utc=True,
)
# 定义任务
@app.task(name="celery_app.add_task")
def add_task(x: int, y: int):
"""添加两个数的任务"""
import time
time.sleep(5)
return x + y启动 Celery worker:
celery -A celery_app worker --loglevel=infoAPScheduler 是一款 Python 定时任务库,它支持多种定时任务触发器(如间隔触发器、日期触发器、Cron 触发器)。
使用 APScheduler 定时任务的示例代码:
from fastapi import FastAPI
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime
app = FastAPI()
# 初始化定时任务调度器
scheduler = AsyncIOScheduler()
def write_log():
"""写入日志的定时任务"""
with open("scheduler.log", mode="a") as log:
log.write(f"{datetime.now()}: Scheduler task executed\n")
@app.on_event("startup")
async def startup():
"""启动时添加定时任务"""
scheduler.add_job(write_log, "interval", seconds=10) # 每10秒执行一次
scheduler.start()
@app.on_event("shutdown")
async def shutdown():
"""关闭时停止定时任务调度器"""
scheduler.shutdown()
@app.get("/scheduler/jobs")
async def get_scheduler_jobs():
"""获取定时任务列表"""
jobs = scheduler.get_jobs()
return [{"id": job.id, "name": job.name, "next_run_time": job.next_run_time} for job in jobs]在 FastAPI 异步接口中,如果使用同步代码(如 time.sleep ()、requests.get ()),会阻塞整个事件循环,导致服务器无法处理其他请求。
from fastapi import FastAPI
import time
app = FastAPI()
@app.get("/blocking")
async def blocking_route():
"""阻塞事件循环的异步接口"""
time.sleep(5)
return {"message": "Blocking route"}使用异步代码替代同步代码,或者使用 asyncio.to_thread () 函数将同步代码包装成异步代码,例如:
from fastapi import FastAPI
import time
import asyncio
app = FastAPI()
def sync_blocking():
"""同步阻塞函数"""
time.sleep(5)
return "Sync blocking result"
@app.get("/non-blocking")
async def non_blocking_route():
"""不阻塞事件循环的异步接口"""
result = await asyncio.to_thread(sync_blocking)
return {"message": result}在 FastAPI 异步接口中,如果并发请求过多,可能会导致服务器资源耗尽。为了避免这个问题,可以使用 asyncio.Semaphore 类来实现并发控制。
from fastapi import FastAPI
import asyncio
import httpx
app = FastAPI()
# 创建信号量,限制并发请求数为5
semaphore = asyncio.Semaphore(5)
async def fetch_todo(id: int):
"""使用信号量限制并发请求数"""
async with semaphore:
async with httpx.AsyncClient() as client:
response = await client.get(f"https://jsonplaceholder.typicode.com/todos/{id}")
return response.json()
@app.get("/todos")
async def get_todos():
"""获取多个todo的异步接口"""
tasks = [fetch_todo(id) for id in range(1, 21)] # 请求20个todo
results = await asyncio.gather(*tasks)
return results在 FastAPI 异步接口中,如果使用不当,可能会导致内存泄漏。内存泄漏的主要原因包括:
可以使用 Python 的 memory-profiler 库来检测内存泄漏,例如:
pip install memory-profiler编写内存泄漏检测脚本(test_memory.py):
from memory_profiler import profile
import asyncio
from main import get_todos
@profile
async def test_memory_leak():
"""测试内存泄漏"""
for i in range(100):
await get_todos()
asyncio.run(test_memory_leak())运行内存泄漏检测脚本:
python -m memory_profiler test_memory.py在 FastAPI 异步接口中,错误处理非常重要。可以使用 try-except 语句来捕获错误,或者使用 HTTPException 类来抛出 HTTP 错误。
from fastapi import FastAPI, HTTPException
import httpx
app = FastAPI()
@app.get("/todo/{id}")
async def get_todo(id: int):
"""获取单个todo的异步接口"""
try:
async with httpx.AsyncClient() as client:
response = await client.get(f"https://jsonplaceholder.typicode.com/todos/{id}")
response.raise_for_status() # 抛出HTTP错误
return response.json()
except httpx.HTTPStatusError as e:
raise HTTPException(status_code=e.response.status_code, detail=e.response.text)
except httpx.RequestError as e:
raise HTTPException(status_code=500, detail=str(e))在 Uvicorn 中,可以通过 --workers 参数来指定工作进程数,通过 --threads 参数来指定线程数,通过 --worker-class 参数来指定工作进程类型。
Uvicorn 支持两种工作进程类型:
工作进程数和线程数的选择取决于服务器的 CPU 核心数和内存大小。一般来说,工作进程数等于 CPU 核心数,线程数等于 CPU 核心数的 2-4 倍。
例如,服务器有 4 个 CPU 核心,可以使用以下配置:
uvicorn main:app --workers 4 --threads 8 --loop uvloop在 Uvicorn 中,可以通过 --loop 参数来选择事件循环。uvloop 事件循环比 asyncio 事件循环的性能提升了约 2-4 倍,建议使用 uvloop 事件循环。
如果没有安装 uvloop,可以使用以下命令安装:
pip install uvloopuvicorn main:app --loop uvloop在 Uvicorn 中,可以通过 --timeout-keep-alive 参数来指定 HTTP keep-alive 超时时间,通过 --timeout-graceful-shutdown 参数来指定优雅关闭超时时间。
HTTP keep-alive 超时时间是指服务器在处理完一个请求后,等待下一个请求的时间。如果超过这个时间没有请求,服务器会关闭连接。
uvicorn main:app --timeout-keep-alive 30优雅关闭超时时间是指服务器在收到关闭信号后,等待正在处理的请求完成的时间。如果超过这个时间还有请求正在处理,服务器会强制关闭连接。
uvicorn main:app --timeout-graceful-shutdown 60在 Uvicorn 中,可以通过 --log-level 参数来指定日志级别,通过 --log-config 参数来指定日志配置文件。
Uvicorn 支持以下日志级别:
uvicorn main:app --log-level info可以使用以下命令指定日志配置文件(logging.conf):
uvicorn main:app --log-config logging.conflogging.conf 文件的示例内容:
[loggers]
keys=root,uvicorn.error,uvicorn.access
[handlers]
keys=console
[formatters]
keys=generic
[logger_root]
level=INFO
handlers=console
[logger_uvicorn.error]
level=INFO
handlers=console
qualname=uvicorn.error
propagate=0
[logger_uvicorn.access]
level=INFO
handlers=console
qualname=uvicorn.access
propagate=0
[handler_console]
class=StreamHandler
formatter=generic
args=(sys.stderr,)
[formatter_generic]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%SFastAPI 和 Uvicorn 的异步编程正在不断发展,未来可能会有以下改进: