

【个人主页:】
大语言模型(LLM)开发工程师|中国传媒大学·数字媒体技术(智能交互与游戏设计) 深耕领域:大语言模型开发 / RAG知识库 / AI Agent落地 / 模型微调 技术栈:Python / LangChain/RAG(Dify+Redis+Milvus)| SQL/NumPy | FastAPI+Docker ️ 工程能力:专注模型工程化部署、知识库构建与优化,擅长全流程解决方案 专栏传送门:LLM大模型开发 项目实战指南、Python 从真零基础到纯文本 LLM 全栈实战、从零学 SQL + 大模型应用落地、大模型开发小白专属:从 0 入门 Linux&Shell 「让AI交互更智能,让技术落地更高效」 欢迎技术探讨/项目合作! 关注我,解锁大模型与智能交互的无限可能!
你是不是之前一直用 mock 数据写 FastAPI 接口?上线后发现数据无法持久化存储?这篇详解 FastAPI 连接 SQLite、MySQL、PostgreSQL 的方法,用 SQLAlchemy 1.4/2.0 的 ORM 建模,实现用户的创建、读取、更新、删除操作,最后配置 Alembic 数据库迁移工具,20 分钟就能搭建出一个能持久化存储数据的 API 接口。
在 API 接口开发的初期,很多开发者会用 mock 数据(如 Python 列表)来模拟数据库的操作,这样可以快速验证接口的逻辑。但 mock 数据有以下几个明显的问题:
因此,在 API 接口开发的后期,我们需要连接真实的数据库,实现数据的持久化存储、复杂查询和事务处理。
如果你已经完成了前四篇的学习,可以直接使用之前创建的fastapi-demo项目。如果没有,请按照以下步骤初始化项目:
创建一个新的文件夹,命名为fastapi-demo;
在文件夹中创建以下文件和文件夹:
main.py:FastAPI 应用的主入口文件;database.py:数据库连接配置文件;models.py:SQLAlchemy ORM 模型文件;schemas.py:Pydantic 数据验证模型文件;在终端中进入fastapi-demo文件夹,输入以下命令安装依赖:
pip install fastapi uvicorn python-multipart python-jose[cryptography] passlib[bcrypt] sqlalchemy alembic pymysql psycopg2-binary其中:
sqlalchemy:Python 的 ORM 框架;alembic:SQLAlchemy 的数据库迁移工具;pymysql:连接 MySQL 数据库的驱动;psycopg2-binary:连接 PostgreSQL 数据库的驱动。FastAPI 可以通过 SQLAlchemy ORM 连接任何支持 SQLAlchemy 的数据库,包括 SQLite、MySQL、PostgreSQL 等。
SQLite 是一个轻量级的关系型数据库,它不需要单独的服务器进程,而是将数据存储在一个文件中。SQLite 数据库适合测试和开发环境,但不适合生产环境。
打开database.py文件,输入以下代码:
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# SQLite数据库的连接URL(数据库文件路径为./fastapi-demo.db)
SQLALCHEMY_DATABASE_URL = "sqlite:///./fastapi-demo.db"
# 初始化SQLAlchemy引擎
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
# 初始化SQLAlchemy会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 初始化SQLAlchemy基类(所有ORM模型都需要继承这个基类)
Base = declarative_base()
# 获取数据库会话的依赖注入函数
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()MySQL 是一个开源的关系型数据库管理系统,它是世界上最流行的数据库之一。MySQL 数据库适合生产环境,但需要单独安装 MySQL Server。
打开database.py文件,修改数据库连接 URL:
# MySQL数据库的连接URL(需要根据实际情况修改用户名、密码、数据库名)
SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:123456@localhost/fastapi_demo"其中:
root:MySQL 的用户名;123456:MySQL 的密码;localhost:MySQL Server 的地址;fastapi_demo:需要连接的数据库名。注意:在使用 MySQL 数据库之前,需要先安装 MySQL Server,并且创建一个名为fastapi_demo的数据库。
PostgreSQL 是一个开源的关系型数据库管理系统,它支持高级的 SQL 特性,如事务处理、并发控制、视图、存储过程等。PostgreSQL 数据库适合生产环境,但需要单独安装 PostgreSQL Server。
打开database.py文件,修改数据库连接 URL:
# PostgreSQL数据库的连接URL(需要根据实际情况修改用户名、密码、数据库名)
SQLALCHEMY_DATABASE_URL = "postgresql://postgres:123456@localhost/fastapi_demo"其中:
postgres:PostgreSQL 的用户名;123456:PostgreSQL 的密码;localhost:PostgreSQL Server 的地址;fastapi_demo:需要连接的数据库名。注意:在使用 PostgreSQL 数据库之前,需要先安装 PostgreSQL Server,并且创建一个名为fastapi_demo的数据库。
为了让你更直观地理解 SQLite、MySQL、PostgreSQL 的区别,我们整理了以下对比表:
数据库名称 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
SQLite | 轻量级,不需要单独的服务器进程,将数据存储在一个文件中,操作简单,适合测试和开发环境 | 不支持并发写入操作,不适合高并发场景,不支持复杂的 SQL 特性 | 测试和开发环境,单用户或低并发场景的应用程序 |
MySQL | 开源,免费,社区活跃,文档完善,支持并发写入操作,适合高并发场景,支持多种存储引擎(如 InnoDB、MyISAM) | 不支持部分高级 SQL 特性,如递归查询、窗口函数(MySQL 8.0 之前) | 生产环境,高并发场景的应用程序,如电商平台、社交媒体 |
PostgreSQL | 开源,免费,社区活跃,文档完善,支持高级 SQL 特性,如递归查询、窗口函数、CTE、JSONB 数据类型,支持并发写入操作,适合高并发场景,安全性高 | 学习曲线较陡,性能在某些高并发场景下不如 MySQL | 生产环境,需要处理复杂 SQL 查询或 JSONB 数据类型的应用程序,如数据分析平台、物联网平台 |
SQLAlchemy ORM 允许开发者用 Python 类来定义数据库中的表结构,这样可以减少手写 SQL 语句的时间,提高代码的可读性和可维护性。
打开models.py文件,输入以下代码:
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.sql import func
from database import Base
# 定义用户表的ORM模型
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(20), unique=True, index=True, nullable=False)
email = Column(String(50), unique=True, index=True, nullable=False)
hashed_password = Column(String(100), nullable=False)
age = Column(Integer, nullable=True)
is_admin = Column(Boolean, default=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())其中:
__tablename__:指定数据库中对应的表名;id:用户的唯一标识符,主键,自增;username:用户名,长度为 20,唯一,可索引,必填;email:邮箱,长度为 50,唯一,可索引,必填;hashed_password:加密后的密码,长度为 100,必填;age:年龄,可选;is_admin:是否是管理员,默认值为 False;is_active:是否激活,默认值为 True;created_at:创建时间,默认值为当前时间;updated_at:更新时间,每次更新数据时自动更新为当前时间。打开schemas.py文件,输入以下代码:
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
from datetime import datetime
# 定义用户创建的数据验证模型
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=20, pattern=r"^[a-zA-Z0-9_]+$")
email: EmailStr
password: str = Field(..., min_length=8, max_length=20,
regex=r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d).*$")
age: Optional[int] = Field(None, ge=1, le=120)
is_admin: Optional[bool] = False
# 定义用户信息的数据验证模型
class UserInfo(BaseModel):
id: int
username: str
email: str
age: Optional[int] = None
is_admin: bool = False
is_active: bool = True
created_at: datetime
updated_at: Optional[datetime] = None
class Config:
orm_mode = True
# 定义用户更新的数据验证模型
class UserUpdate(BaseModel):
username: Optional[str] = Field(None, min_length=3, max_length=20, pattern=r"^[a-zA-Z0-9_]+$")
email: Optional[EmailStr] = None
password: Optional[str] = Field(None, min_length=8, max_length=20,
regex=r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d).*$")
age: Optional[int] = Field(None, ge=1, le=120)
is_admin: Optional[bool] = None
is_active: Optional[bool] = None其中:
orm_mode = True:允许 Pydantic 模型直接从 SQLAlchemy ORM 模型中读取数据;...:表示必填字段;min_length和max_length:表示字符串字段的最小和最大长度;pattern或regex:表示字符串字段的正则表达式;ge和le:表示数值字段的最小和最大值。CRUD 操作是 API 接口开发中最常用的操作,包括创建(Create)、读取(Read)、更新(Update)、删除(Delete)。
打开main.py文件,输入以下代码:
from fastapi import FastAPI, Depends, HTTPException, status, Form
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from database import get_db
from models import User
from schemas import UserCreate, UserInfo, UserUpdate
from typing import List, Optional
app = FastAPI()
# 密码加密和验证的上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT令牌的配置
SECRET_KEY = "your-secret-key-here" # 生产环境中应该使用环境变量存储
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # 访问令牌的过期时间(分钟)
# OAuth2密码模式的配置
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 密码加密函数
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
# 密码验证函数
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
# 根据用户名获取用户信息
def get_user_by_username(db: Session, username: str):
return db.query(User).filter(User.username == username).first()
# 认证用户(验证用户名和密码)
def authenticate_user(db: Session, username: str, password: str):
user = get_user_by_username(db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
# 生成访问令牌
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 获取当前用户的依赖注入函数
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = get_user_by_username(db, username=username)
if user is None:
raise credentials_exception
return user
# 获取当前激活用户的依赖注入函数
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# 获取当前激活管理员用户的依赖注入函数
async def get_current_active_admin_user(current_user: User = Depends(get_current_active_user)):
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not enough permissions")
return current_user
# 获取访问令牌的接口
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
"""
获取访问令牌的接口
:param form_data: OAuth2密码模式的请求数据(用户名和密码)
:param db: 数据库会话(依赖注入)
:return: 访问令牌的JSON响应
"""
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# 创建用户的接口(受保护,只有管理员能访问)
@app.post("/users", response_model=UserInfo)
async def create_user(user: UserCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_active_admin_user)):
"""
创建用户的接口(受保护,只有管理员能访问)
:param user: 用户创建信息(请求体参数)
:param db: 数据库会话(依赖注入)
:param current_user: 当前激活管理员用户的信息(依赖注入)
:return: 新创建用户的信息
"""
# 检查用户名和邮箱是否已经存在
db_user = get_user_by_username(db, user.username)
if db_user:
raise HTTPException(status_code=400, detail="Username already registered")
db_email = db.query(User).filter(User.email == user.email).first()
if db_email:
raise HTTPException(status_code=400, detail="Email already registered")
# 加密密码
hashed_password = get_password_hash(user.password)
# 创建新用户
db_user = User(
username=user.username,
email=user.email,
hashed_password=hashed_password,
age=user.age,
is_admin=user.is_admin
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
# 获取用户列表的接口(受保护,只有管理员能访问)
@app.get("/users", response_model=List[UserInfo])
async def read_users(page: int = 1, page_size: int = 10, is_active: Optional[bool] = None, db: Session = Depends(get_db), current_user: User = Depends(get_current_active_admin_user)):
"""
获取用户列表的接口(受保护,只有管理员能访问)
:param page: 当前页码(查询参数,可选,默认值为1)
:param page_size: 每页显示的用户数量(查询参数,可选,默认值为10)
:param is_active: 用户是否激活(查询参数,可选,默认值为None表示查询所有用户)
:param db: 数据库会话(依赖注入)
:param current_user: 当前激活管理员用户的信息(依赖注入)
:return: 用户列表的JSON响应
"""
# 过滤用户数据
query = db.query(User)
if is_active is not None:
query = query.filter(User.is_active == is_active)
# 分页处理
start_index = (page - 1) * page_size
end_index = start_index + page_size
users = query.offset(start_index).limit(page_size).all()
return users
# 获取单个用户信息的接口(受保护,用户只能查看自己的信息,管理员可以查看所有用户的信息)
@app.get("/users/{user_id}", response_model=UserInfo)
async def read_user(user_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_active_user)):
"""
获取单个用户信息的接口(受保护,用户只能查看自己的信息,管理员可以查看所有用户的信息)
:param user_id: 用户的唯一标识符(路径参数)
:param db: 数据库会话(依赖注入)
:param current_user: 当前激活用户的信息(依赖注入)
:return: 单个用户信息的JSON响应
"""
# 查询用户信息
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# 检查权限
if not current_user.is_admin and current_user.id != user_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
return user
# 更新用户信息的接口(受保护,用户只能更新自己的信息,管理员可以更新所有用户的信息)
@app.put("/users/{user_id}", response_model=UserInfo)
async def update_user(user_id: int, user_update: UserUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_active_user)):
"""
更新用户信息的接口(受保护,用户只能更新自己的信息,管理员可以更新所有用户的信息)
:param user_id: 用户的唯一标识符(路径参数)
:param user_update: 用户更新信息(请求体参数)
:param db: 数据库会话(依赖注入)
:param current_user: 当前激活用户的信息(依赖注入)
:return: 更新后用户信息的JSON响应
"""
# 查询用户信息
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# 检查权限
if not current_user.is_admin and current_user.id != user_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
# 更新用户信息
if user_update.username is not None:
# 检查用户名是否已经存在
db_user = get_user_by_username(db, user_update.username)
if db_user and db_user.id != user_id:
raise HTTPException(status_code=400, detail="Username already registered")
user.username = user_update.username
if user_update.email is not None:
# 检查邮箱是否已经存在
db_email = db.query(User).filter(User.email == user_update.email).first()
if db_email and db_email.id != user_id:
raise HTTPException(status_code=400, detail="Email already registered")
user.email = user_update.email
if user_update.password is not None:
# 加密密码
hashed_password = get_password_hash(user_update.password)
user.hashed_password = hashed_password
if user_update.age is not None:
user.age = user_update.age
if user_update.is_admin is not None:
# 只有管理员能修改用户的管理员权限
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not enough permissions")
user.is_admin = user_update.is_admin
if user_update.is_active is not None:
# 只有管理员能修改用户的激活状态
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Not enough permissions")
user.is_active = user_update.is_active
db.commit()
db.refresh(user)
return user
# 删除用户的接口(受保护,用户只能删除自己的信息,管理员可以删除所有用户的信息)
@app.delete("/users/{user_id}")
async def delete_user(user_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_active_user)):
"""
删除用户的接口(受保护,用户只能删除自己的信息,管理员可以删除所有用户的信息)
:param user_id: 用户的唯一标识符(路径参数)
:param db: 数据库会话(依赖注入)
:param current_user: 当前激活用户的信息(依赖注入)
:return: 删除成功的响应
"""
# 查询用户信息
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# 检查权限
if not current_user.is_admin and current_user.id != user_id:
raise HTTPException(status_code=403, detail="Not enough permissions")
# 删除用户
db.delete(user)
db.commit()
return {"message": "User deleted successfully"}Alembic 是 SQLAlchemy 的数据库迁移工具,它允许开发者版本化管理数据库的表结构,方便团队协作和代码部署。
在终端中进入fastapi-demo文件夹,输入以下命令初始化 Alembic:
alembic init alembic执行完命令后,会在fastapi-demo文件夹中创建一个名为alembic的文件夹和一个名为alembic.ini的配置文件。
打开alembic.ini配置文件,修改数据库连接 URL:
# SQLite数据库的连接URL
sqlalchemy.url = sqlite:///./fastapi-demo.db
# MySQL数据库的连接URL
# sqlalchemy.url = mysql+pymysql://root:123456@localhost/fastapi_demo
# PostgreSQL数据库的连接URL
# sqlalchemy.url = postgresql://postgres:123456@localhost/fastapi_demo打开alembic/env.py文件,修改以下代码:
# 导入SQLAlchemy基类和ORM模型
from database import Base
from models import User
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# 这个是Alembic的配置对象
config = context.config
# 导入目标元数据
target_metadata = Base.metadata
# 其他代码保持不变在终端中进入fastapi-demo文件夹,输入以下命令生成迁移脚本:
alembic revision --autogenerate -m "create users table"执行完命令后,会在alembic/versions文件夹中创建一个名为xxxxxxxx_create_users_table.py的迁移脚本,其中xxxxxxxx是一个随机的字符串。
在终端中进入fastapi-demo文件夹,输入以下命令执行迁移脚本:
alembic upgrade head执行完命令后,会在数据库中创建一个名为users的表。
如果执行迁移脚本时出现错误,或者需要回滚到之前的版本,可以输入以下命令回滚迁移:
# 回滚到上一个版本
alembic downgrade -1
# 回滚到最初的版本
alembic downgrade base在生产环境中使用 FastAPI 连接数据库时,需要注意以下几点:
如果你已经掌握了 FastAPI 连接数据库和处理 CRUD 操作的方法,下一步你可以学习以下内容:
FastAPI+SQLAlchemy 是 API 接口开发的黄金组合,它允许开发者用 Python 类来定义数据库中的表结构,实现数据的持久化存储、复杂查询和事务处理,同时提供了自动生成 OpenAPI 文档和类型注解验证的功能。通过这篇文章的学习,你应该已经掌握了 FastAPI 连接 3 大主流数据库、处理 CRUD 操作、配置 Alembic 数据库迁移工具的方法。