
01
—
同步及异步方案选择及优缺点介绍
Python的同步web框架主要代表有Flask、Django;异步框架主要代表有Fastapi、Tornado等。
关于什么场景用同步,什么场景用异步,deepseek给出的总结如下图:

优点:
a、简单易用
b、生态成熟
c、兼容性强
缺点:
a、阻塞 I/O
b、扩展性受限
优点:
a、高并发性能
b、现代架构适配
c、精细控制并发
缺点:
a、复杂性高
b、生态限制
c、迁移成本
02
—
异步SQLAlchemy的应用
上一篇文章主要介绍了SQLAlchemy同步的应用,本文着重以笔者工作中用到的Tornado框架来介绍SQLAlchemy的异步应用。
用deepseek调研了下,tornado框架实现异步协程数据库IO,建议直接用原生异步数据库驱动(如aiomysql、asyncpg),理由是:
SQLAlchemy ORM层可能带来额外的开销,在需要高性能的场景下,考虑使用Core或者直接使用异步驱动(如aiomysql、asyncpg)进行原始SQL操作。
本文主要是介绍SQLAlchemy ORM的异步应用,所以详细介绍下tornado对SQLAlchemy ORM异步的使用。
直接使用aiomysql、asyncpg等异步驱动的方式本文就不做介绍了,有机会再另外撰文总结。
在 Tornado 中使用 SQLAlchemy 进行异步数据库操作时,需要结合异步驱动和 SQLAlchemy 1.4+ 的异步支持。
# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.pool import NullPool, QueuePool
from contextlib import asynccontextmanager
Base = declarative_base()
class AsyncDatabase:
def init(self, db_url: str, pool_size: int = 10, max_overflow: int = 5):
# 使用 QueuePool 连接池
self.engine = create_async_engine(
db_url,
poolclass=QueuePool,
pool_size=pool_size,
max_overflow=max_overflow,
pool_recycle=3600, # 1小时回收连接
echo=False, # 生产环境设为 False
future=True
)
self.async_session = sessionmaker(
self.engine,
class_=AsyncSession,
expire_on_commit=False
)
@asynccontextmanager
async def get_session(self):
"""异步上下文管理器获取数据库会话"""
session = self.async_session()
try:
yield session
await session.commit()
except Exception as e:
await session.rollback()
raise
finally:
await session.close()
async def create_all(self):
"""创建所有表结构"""
async with self.engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# PostgreSQL 示例
DB_URL = "postgresql+asyncpg://user:password@localhost/dbname"
# MySQL 示例
# DB_URL = "mysql+aiomysql://user:password@localhost/dbname"
# 全局数据库实例
db = AsyncDatabase(
db_url=DB_URL,
pool_size=15,
max_overflow=5
)# models.py
from sqlalchemy import Column, Integer, String
from .database import Base
class User(Base):
tablename = "users"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(50), nullable=False)
email = Column(String(100), unique=True, index=True)# services.py
from .database import db
from .models import User
from sqlalchemy import select, update, delete
from sqlalchemy.exc import SQLAlchemyError
from tornado.web import HTTPError
class UserService:
@staticmethod
async def create_user(name: str, email: str) -> User:
async with db.get_session() as session:
try:
user = User(name=name, email=email)
session.add(user)
await session.flush()
return user
except SQLAlchemyError as e:
raise HTTPError(500, f"Database error: {str(e)}")
@staticmethod
async def get_user(user_id: int) -> User:
async with db.get_session() as session:
stmt = select(User).where(User.id == user_id)
result = await session.execute(stmt)
user = result.scalar_one_or_none()
if not user:
raise HTTPError(404, "User not found")
return user
@staticmethod
async def update_user(user_id: int, **kwargs) -> User:
async with db.get_session() as session:
try:
stmt = (
update(User)
.where(User.id == user_id)
.values(**kwargs)
.returning(User)
)
result = await session.execute(stmt)
user = result.scalar_one()
return user
except SQLAlchemyError as e:
raise HTTPError(500, f"Update failed: {str(e)}")
@staticmethod
async def list_users(page: int = 1, per_page: int = 10) -> list[User]:
async with db.get_session() as session:
offset = (page - 1) * per_page
stmt = select(User).offset(offset).limit(per_page)
result = await session.execute(stmt)
return result.scalars().all()03
—
scoped_session介绍上一篇文章中关于SQLAlchemy Session中介绍了sessionmaker。sessionmaker是一个工厂函数,用于生成Session类。你可以配置它一次(例如,设置绑定引擎和其他选项),然后重复使用它来创建新的会话。
scoped_session 是一个用于创建线程局部(thread-local)会话的工厂。它包装了由 sessionmaker 产生的工厂,并确保在同一线程内多次调用时返回同一个会话对象。而在不同线程中,会返回不同的会话对象。
在Web应用框架中,每次调用都会产生一个新的Session实例,通常请求结束时就会关闭它。
scoped_sessionscoped_session是一个用来提供线程局部(thread-local)作用域的Session的工厂。它通过一个注册表(registry)来确保在同一个线程多次调用返回的是同一个Session实例。适合在传统同步Web框架中使用(如Flask),使用scoped_session来确保每个请求使用独立的Session。
同步Web框架中如果在一个线程中使用多个session,可能会存在数据不一致性、事务管理混乱、连接池耗尽内存泄漏等问题。
同步框架中的SQLAlchemy scoped_session 使用样例如下:
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import create_engine
engine = create_engine("sqlite:///:memory:")
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
# 在同一个线程中,多次调用Session()返回同一个Session对象
session1 = Session()
session2 = Session()
print(session1 is session2) # 输出 True
# 使用完后,需要移除当前线程的Session
Session.remove()但在异步环境中就不推荐使用scoped_session。有异步对应的方法:async_scoped_session 。
async_scoped_session使用样例如下:
from sqlalchemy.ext.asyncio import async_scoped_session, async_sessionmaker
async_session_factory = async_sessionmaker(engine, expire_on_commit=False)
AsyncScopedSession = async_scoped_session(async_session_factory, scopefunc=asyncio.current_task)scopefunc(默认为asyncio.current_task)为每个异步任务创建独立的 sessioncontextvars实现协程间的数据隔离
Tornado框架是使用单线程事件循环模型处理请求,async_scoped_session设计主要解决多线程环境中的会话管理问题,所以不太建议在Torando框架中使用该方法来实现异步数据库IO,如上第二点的实际例子中,也没有使用这个方法。
像 Starlette 或 FastAPI 这样的 ASGI 框架,这些框架通常使用 asyncio 任务来处理请求,并且每个请求可能在不同的任务中运行。通过使用上下文变量,async_scoped_session 可以为每个任务(task)绑定一个独立的会话。
但是FastAPI中,也更推荐使用依赖注入的方式。关于依赖注入方式见下方第3点。
async_scoped_session 介绍依赖注入的使用方式样例如下:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine("postgresql+asyncpg://...")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# 依赖项
async def get_db():
async with AsyncSessionLocal() as session:
yield session
# 在路由中使用
@app.get("/")
async def read_root(session: AsyncSession = Depends(get_db)):
...在每个请求中创建Session,并通过依赖注入传递。
这样,每个请求都会创建一个新的Session,并在请求结束时自动关闭。
Fastapi框架中,选择哪种方式,DeepSeek给出的建议如下:

04
—
总结关于 SQLAlchemy 同步以及异步使用的介绍内容,让自己对SQLAlchemy的底层逻辑以及同步和异步使用场景的区别,以及具体用法。
笔者对Python其他框架使用的比较少,主要是Flask和Tornado,以及简单实用过Fastapi。基本都是同步的方式在使用SQLAlchemy ORM,由于开发的平台并发也不高,倒也没出现过大的瓶颈。
文章的内容主要是通过SQLAlchemy官方文档以及结合DeepSeek的回答做的整理总结,如果有不对的地方,还请大佬多多指正哈。