首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Python ORM及SQLAlchemy学习总结(下)

Python ORM及SQLAlchemy学习总结(下)

作者头像
Wangzy
发布2026-06-22 18:20:13
发布2026-06-22 18:20:13
370
举报

01

同步及异步方案选择及优缺点介绍

1、同步和异步分别适合的场景

Python的同步web框架主要代表有Flask、Django;异步框架主要代表有Fastapi、Tornado等。

关于什么场景用同步,什么场景用异步,deepseek给出的总结如下图:

2、SQLAlchemy同步的优缺点:

优点:

a、简单易用

  • 符合顺序执行的直觉,代码逻辑清晰。
  • 调试方便,堆栈信息完整。

b、生态成熟

  • 功能全面(ORM、Core、迁移工具 Alembic 等)。
  • 文档和社区支持完善,案例丰富。

c、兼容性强

  • 支持所有主流同步框架(Flask、Django 等)。
  • 无需特殊数据库驱动(如 psycopg2 即可工作)。

缺点:

a、阻塞 I/O

  • 每个数据库操作阻塞线程,高并发时需通过线程池(如 ThreadPoolExecutor)缓解,但线程开销大。

b、扩展性受限

  • 单线程处理请求时,数据库操作会成为瓶颈。
  • 为提升并发需增加线程/进程,消耗更多内存和 CPU。

3、SQLAlchemy异步的优缺点

优点:

a、高并发性能

  • 非阻塞 I/O 模型,单线程可处理数千并发连接(如搭配 FastAPI/Quart)。
  • 资源利用率高(内存占用远低于线程池)。

b、现代架构适配

  • 天然契合异步框架(FastAPI、Starlette、aiohttp)。
  • 与异步生态(WebSockets、消息队列)无缝集成。

c、精细控制并发

  • 可使用 asyncio.gather() 并行执行多个查询。

缺点:

a、复杂性高

  • 需掌握 async/await 语法和事件循环机制。
  • 调试困难(如协程堆栈跟踪复杂)。

b、生态限制

  • 部分同步工具不兼容(需异步替代品,如 databases 或手动适配)。
  • 数据库驱动受限(需支持异步驱动,如 asyncpg/aiomysql)。

c、迁移成本

  • 同步代码需重构为异步(全链路 async 化)。
  • 部分 ORM 高级功能可能受限(如复杂惰性加载)。

02

异步SQLAlchemy的应用

上一篇文章主要介绍了SQLAlchemy同步的应用,本文着重以笔者工作中用到的Tornado框架来介绍SQLAlchemy的异步应用。

用deepseek调研了下,tornado框架实现异步协程数据库IO,建议直接用原生异步数据库驱动(如aiomysql、asyncpg),理由是:

SQLAlchemy ORM层可能带来额外的开销,在需要高性能的场景下,考虑使用Core或者直接使用异步驱动(如aiomysql、asyncpg)进行原始SQL操作。

本文主要是介绍SQLAlchemy ORM的异步应用,所以详细介绍下tornado对SQLAlchemy ORM异步的使用。

直接使用aiomysql、asyncpg等异步驱动的方式本文就不做介绍了,有机会再另外撰文总结。

在 Tornado 中使用 SQLAlchemy 进行异步数据库操作时,需要结合异步驱动和 SQLAlchemy 1.4+ 的异步支持。

1、异步数据库引擎初始化

代码语言:javascript
复制
# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.pool import NullPool, QueuePool
from contextlib import asynccontextmanager
Base = declarative_base()
class AsyncDatabase:
    def init(self, db_url: str, pool_size: int = 10, max_overflow: int = 5):
        # 使用 QueuePool 连接池
        self.engine = create_async_engine(
            db_url,
            poolclass=QueuePool,
            pool_size=pool_size,
            max_overflow=max_overflow,
            pool_recycle=3600,  # 1小时回收连接
            echo=False,  # 生产环境设为 False
            future=True
        )
        self.async_session = sessionmaker(
            self.engine,
            class_=AsyncSession,
            expire_on_commit=False
        )

    @asynccontextmanager
    async def get_session(self):
        """异步上下文管理器获取数据库会话"""
        session = self.async_session()
        try:
            yield session
            await session.commit()
        except Exception as e:
            await session.rollback()
            raise
        finally:
            await session.close()

    async def create_all(self):
        """创建所有表结构"""
        async with self.engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
# PostgreSQL 示例
DB_URL = "postgresql+asyncpg://user:password@localhost/dbname"
# MySQL 示例
# DB_URL = "mysql+aiomysql://user:password@localhost/dbname"
# 全局数据库实例
db = AsyncDatabase(
    db_url=DB_URL,
    pool_size=15,
    max_overflow=5
)

2、数据模型定义

代码语言:javascript
复制
# models.py
from sqlalchemy import Column, Integer, String
from .database import Base
class User(Base):
    tablename = "users"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(50), nullable=False)
    email = Column(String(100), unique=True, index=True)

3、异步数据库IO方式对User模型的数据做增删改查操作

代码语言:javascript
复制
# services.py
from .database import db
from .models import User
from sqlalchemy import select, update, delete
from sqlalchemy.exc import SQLAlchemyError
from tornado.web import HTTPError
class UserService:
    @staticmethod
    async def create_user(name: str, email: str) -> User:
        async with db.get_session() as session:
            try:
                user = User(name=name, email=email)
                session.add(user)
                await session.flush()
                return user
            except SQLAlchemyError as e:
                raise HTTPError(500, f"Database error: {str(e)}")
    @staticmethod
    async def get_user(user_id: int) -> User:
        async with db.get_session() as session:
            stmt = select(User).where(User.id == user_id)
            result = await session.execute(stmt)
            user = result.scalar_one_or_none()
            if not user:
                raise HTTPError(404, "User not found")
            return user
    @staticmethod
    async def update_user(user_id: int, **kwargs) -> User:
        async with db.get_session() as session:
            try:
                stmt = (
                    update(User)
                    .where(User.id == user_id)
                    .values(**kwargs)
                    .returning(User)
                )
                result = await session.execute(stmt)
                user = result.scalar_one()
                return user
            except SQLAlchemyError as e:
                raise HTTPError(500, f"Update failed: {str(e)}")
    @staticmethod
    async def list_users(page: int = 1, per_page: int = 10) -> list[User]:
        async with db.get_session() as session:
            offset = (page - 1) * per_page
            stmt = select(User).offset(offset).limit(per_page)
            result = await session.execute(stmt)
            return result.scalars().all()

03

同步及异步的scoped_session介绍

上一篇文章中关于SQLAlchemy Session中介绍了sessionmaker。sessionmaker是一个工厂函数,用于生成Session类。你可以配置它一次(例如,设置绑定引擎和其他选项),然后重复使用它来创建新的会话。

scoped_session 是一个用于创建线程局部(thread-local)会话的工厂。它包装了由 sessionmaker 产生的工厂,并确保在同一线程内多次调用时返回同一个会话对象。而在不同线程中,会返回不同的会话对象。

在Web应用框架中,每次调用都会产生一个新的Session实例,通常请求结束时就会关闭它。

1、同步scoped_session

scoped_session是一个用来提供线程局部(thread-local)作用域的Session的工厂。它通过一个注册表(registry)来确保在同一个线程多次调用返回的是同一个Session实例。适合在传统同步Web框架中使用(如Flask),使用scoped_session来确保每个请求使用独立的Session。

同步Web框架中如果在一个线程中使用多个session,可能会存在数据不一致性、事务管理混乱、连接池耗尽内存泄漏等问题。

同步框架中的SQLAlchemy scoped_session 使用样例如下:

代码语言:javascript
复制
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy import create_engine

engine = create_engine("sqlite:///:memory:")
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)

# 在同一个线程中,多次调用Session()返回同一个Session对象
session1 = Session()
session2 = Session()
print(session1 is session2)  # 输出 True

# 使用完后,需要移除当前线程的Session
Session.remove()

但在异步环境中就不推荐使用scoped_session。有异步对应的方法:async_scoped_session

2、异步 async_scoped_session

使用样例如下:

代码语言:javascript
复制
from sqlalchemy.ext.asyncio import async_scoped_session, async_sessionmaker

async_session_factory = async_sessionmaker(engine, expire_on_commit=False)
AsyncScopedSession = async_scoped_session(async_session_factory, scopefunc=asyncio.current_task)
  • 协程隔离:通过scopefunc(默认为asyncio.current_task)为每个异步任务创建独立的 session
  • 线程/协程安全:内部使用contextvars实现协程间的数据隔离

Tornado框架是使用单线程事件循环模型处理请求,async_scoped_session设计主要解决多线程环境中的会话管理问题,所以不太建议在Torando框架中使用该方法来实现异步数据库IO,如上第二点的实际例子中,也没有使用这个方法。

像 Starlette 或 FastAPI 这样的 ASGI 框架,这些框架通常使用 asyncio 任务来处理请求,并且每个请求可能在不同的任务中运行。通过使用上下文变量,async_scoped_session 可以为每个任务(task)绑定一个独立的会话。

但是FastAPI中,也更推荐使用依赖注入的方式。关于依赖注入方式见下方第3点。

3、依赖注入方式实现类似 async_scoped_session 介绍

依赖注入的使用方式样例如下:

代码语言:javascript
复制
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine("postgresql+asyncpg://...")
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

# 依赖项
async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

# 在路由中使用
@app.get("/")
async def read_root(session: AsyncSession = Depends(get_db)):
    ...

在每个请求中创建Session,并通过依赖注入传递。

这样,每个请求都会创建一个新的Session,并在请求结束时自动关闭。

Fastapi框架中,选择哪种方式,DeepSeek给出的建议如下:

04

总结

关于 SQLAlchemy 同步以及异步使用的介绍内容,让自己对SQLAlchemy的底层逻辑以及同步和异步使用场景的区别,以及具体用法。

笔者对Python其他框架使用的比较少,主要是Flask和Tornado,以及简单实用过Fastapi。基本都是同步的方式在使用SQLAlchemy ORM,由于开发的平台并发也不高,倒也没出现过大的瓶颈。

文章的内容主要是通过SQLAlchemy官方文档以及结合DeepSeek的回答做的整理总结,如果有不对的地方,还请大佬多多指正哈。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-08-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 周银杂谈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、同步和异步分别适合的场景
  • 2、SQLAlchemy同步的优缺点:
  • 3、SQLAlchemy异步的优缺点
  • 1、异步数据库引擎初始化
  • 2、数据模型定义
  • 3、异步数据库IO方式对User模型的数据做增删改查操作
  • 同步及异步的scoped_session介绍
    • 1、同步scoped_session
    • 2、异步 async_scoped_session
    • 3、依赖注入方式实现类似 async_scoped_session 介绍
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档