在 ORM 映射类上定义具有“index”属性的列的Indexable
类型。
“index”表示属性与具有预定义索引以访问它的Indexable
列的元素相关联。Indexable
类型包括ARRAY
、JSON
和HSTORE
等类型。
indexable
扩展为任何Indexable
类型的列的元素提供了类似于Column
的接口。在简单情况下,它可以被视为一个Column
- 映射属性。
假设Person
是一个具有主键和 JSON 数据字段的模型。虽然该字段可以包含任意数量的元素,但我们希望单独引用名为name
的元素作为行为类似独立列的专用属性:
from sqlalchemy import Column, JSON, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.indexable import index_property
Base = declarative_base()
class Person(Base):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
data = Column(JSON)
name = index_property('data', 'name')
上面,name
属性现在的行为类似于映射列。我们可以组合一个新的Person
并设置name
的值:
>>> person = Person(name='Alchemist')
现在值是可访问的:
>>> person.name
'Alchemist'
在幕后,JSON 字段被初始化为一个新的空字典,并设置了字段:
>>> person.data
{"name": "Alchemist'}
该字段是可变的:
>>> person.name = 'Renamed'
>>> person.name
'Renamed'
>>> person.data
{'name': 'Renamed'}
当使用index_property
时,我们对可索引结构所做的更改也会自动跟踪为历史记录;我们不再需要使用MutableDict
来跟踪此更改以进行工作单元。
删除也可以正常工作:
>>> del person.name
>>> person.data
{}
上面,删除person.name
会删除字典中的值,但不会删除字典本身。
缺少键将产生AttributeError
:
>>> person = Person()
>>> person.name
...
AttributeError: 'name'
除非设置默认值:
>>> class Person(Base):
>>> __tablename__ = 'person'
>>>
>>> id = Column(Integer, primary_key=True)
>>> data = Column(JSON)
>>>
>>> name = index_property('data', 'name', default=None) # See default
>>> person = Person()
>>> print(person.name)
None
这些属性也可以在类级别访问。下面,我们演示了Person.name
用于生成带有索引的 SQL 条件:
>>> from sqlalchemy.orm import Session
>>> session = Session()
>>> query = session.query(Person).filter(Person.name == 'Alchemist')
上述查询等效于:
>>> query = session.query(Person).filter(Person.data['name'] == 'Alchemist')
可以链接多个index_property
对象以生成多层索引:
from sqlalchemy import Column, JSON, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.indexable import index_property
Base = declarative_base()
class Person(Base):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
data = Column(JSON)
birthday = index_property('data', 'birthday')
year = index_property('birthday', 'year')
month = index_property('birthday', 'month')
day = index_property('birthday', 'day')
上面,一个查询如下:
q = session.query(Person).filter(Person.year == '1980')
在 PostgreSQL 后端上,上述查询将呈现为:
SELECT person.id, person.data
FROM person
WHERE person.data -> %(data_1)s -> %(param_1)s = %(param_2)s
index_property
在索引的数据结构不存在时包含特殊行为,并且调用了一个设置操作:
index_property
,默认的数据结构将是一个 Python 列表,其中包含至少与索引值一样多的None
值;然后将该值设置到列表中的相应位置。这意味着对于索引值为零的情况,在设置给定值之前,列表将初始化为[None]
,对于索引值为五的情况,在设置第五个元素之前,列表将初始化为[None, None, None, None, None]
。请注意,现有的列表不会直接扩展以接收一个值。
index_property
,将使用 Python 字典作为默认数据结构。
index_property.datatype
参数将默认数据结构设置为任何 Python 可调用对象,覆盖以前的规则。
index_property
可以被子类化,特别是针对常见的提供值或 SQL 表达式强制转换的用例。以下是在使用 PostgreSQL JSON 类型时的常见用法,其中我们还希望包括自动转换加astext()
:
class pg_json_property(index_property):
def __init__(self, attr_name, index, cast_type):
super(pg_json_property, self).__init__(attr_name, index)
self.cast_type = cast_type
def expr(self, model):
expr = super(pg_json_property, self).expr(model)
return expr.astext.cast(self.cast_type)
上述子类可以与 PostgreSQL 特定版本的JSON
一起使用:
from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import JSON
Base = declarative_base()
class Person(Base):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
data = Column(JSON)
age = pg_json_property('data', 'age', Integer)
实例级别的age
属性与以前的工作方式相同;但是在渲染 SQL 时,将使用 PostgreSQL 的->>
运算符进行索引访问,而不是通常的索引运算符->
:
>>> query = session.query(Person).filter(Person.age < 20)
上述查询将呈现为:
SELECT person.id, person.data
FROM person
WHERE CAST(person.data ->> %(data_1)s AS INTEGER) < %(param_1)s
对象名称 | 描述 |
---|---|
index_property | 属性生成器。生成的属性描述了一个与Indexable列相对应的对象属性。 |
class sqlalchemy.ext.indexable.index_property
属性生成器。生成的属性描述了一个与Indexable
列相对应的对象属性。
另请参阅
sqlalchemy.ext.indexable
成员
init()
类签名
类 sqlalchemy.ext.indexable.index_property
(sqlalchemy.ext.hybrid.hybrid_property
)
method __init__(attr_name, index, default=<object object>, datatype=None, mutable=True, onebased=True)
创建一个新的 index_property
。
参数:
attr_name
– Indexable 类型列的属性名,或者返回可索引结构的其他属性。
index
– 用于获取和设置此值的索引。这应该是整数的 Python 端索引值。
default
– 当给定索引处没有值时,将返回的值。
datatype
– 当字段为空时使用的默认数据类型。默认情况下,这是从使用的索引类型派生的;对于整数索引,是 Python 列表,对于任何其他类型的索引,是 Python 字典。对于列表,列表将初始化为长度至少为 index
的 None 值列表。
mutable
– 如果为 False,则不允许对属性进行写入和删除。
onebased
– 假设此值的 SQL 表示是基于一的;也就是说,SQL 中的第一个索引是 1,而不是零。
假设 Person
是一个带有主键和 JSON 数据字段的模型。虽然此字段可以包含任意数量的元素,但我们希望单独引用称为 name
的元素,作为一个独立的属性,其行为类似于独立的列:
from sqlalchemy import Column, JSON, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.indexable import index_property
Base = declarative_base()
class Person(Base):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
data = Column(JSON)
name = index_property('data', 'name')
上面,name
属性现在的行为类似于映射列。我们可以组合一个新的 Person
并设置 name
的值:
>>> person = Person(name='Alchemist')
现在可以访问该值了:
>>> person.name
'Alchemist'
在幕后,JSON 字段被初始化为一个新的空字典,并设置了字段:
>>> person.data
{"name": "Alchemist'}
该字段是可变的:
>>> person.name = 'Renamed'
>>> person.name
'Renamed'
>>> person.data
{'name': 'Renamed'}
当使用 index_property
时,对可索引结构所做的更改也会自动跟踪为历史记录;我们不再需要使用 MutableDict
来跟踪此更改的工作单元。
删除操作也正常工作:
>>> del person.name
>>> person.data
{}
上面,对 person.name
的删除会删除字典中的值,但不会删除字典本身。
缺少的键会产生 AttributeError
:
>>> person = Person()
>>> person.name
...
AttributeError: 'name'
除非设置了默认值:
>>> class Person(Base):
>>> __tablename__ = 'person'
>>>
>>> id = Column(Integer, primary_key=True)
>>> data = Column(JSON)
>>>
>>> name = index_property('data', 'name', default=None) # See default
>>> person = Person()
>>> print(person.name)
None
这些属性也可以在类级别访问。下面,我们说明了 Person.name
用于生成带索引的 SQL 条件:
>>> from sqlalchemy.orm import Session
>>> session = Session()
>>> query = session.query(Person).filter(Person.name == 'Alchemist')
上面的查询等效于:
>>> query = session.query(Person).filter(Person.data['name'] == 'Alchemist')
可以链式连接多个 index_property
对象以产生多级索引:
from sqlalchemy import Column, JSON, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.indexable import index_property
Base = declarative_base()
class Person(Base):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
data = Column(JSON)
birthday = index_property('data', 'birthday')
year = index_property('birthday', 'year')
month = index_property('birthday', 'month')
day = index_property('birthday', 'day')
上面,诸如以下查询:
q = session.query(Person).filter(Person.year == '1980')
在 PostgreSQL 后端上,上述查询将呈现为:
SELECT person.id, person.data
FROM person
WHERE person.data -> %(data_1)s -> %(param_1)s = %(param_2)s
index_property
包含了当索引数据结构不存在时的特殊行为,以及调用了设置操作时:
index_property
,默认数据结构将是包含 None
值的 Python 列表,至少与索引值一样长;然后将值设置在列表中的位置。这意味着对于索引值为零的索引值,列表将在设置给定值之前初始化为 [None]
,而对于索引值为五的索引值,列表将在将第五个元素设置为给定值之前初始化为 [None, None, None, None, None]
。请注意,现有列表 不会 在原地扩展以接收值。
index_property
,将使用 Python 字典作为默认数据结构。
index_property.datatype
参数将默认数据结构设置为任何 Python 可调用对象,从而覆盖之前的规则。
index_property
可以进行子类化,特别是用于提供在访问时进行值或 SQL 表达式强制转换的常见用例。下面是一个常见的配方,用于与 PostgreSQL JSON 类型一起使用,其中我们还希望包括自动转换以及 astext()
:
class pg_json_property(index_property):
def __init__(self, attr_name, index, cast_type):
super(pg_json_property, self).__init__(attr_name, index)
self.cast_type = cast_type
def expr(self, model):
expr = super(pg_json_property, self).expr(model)
return expr.astext.cast(self.cast_type)
上述子类可与 PostgreSQL 特定版本的 JSON
一起使用:
from sqlalchemy import Column, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import JSON
Base = declarative_base()
class Person(Base):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
data = Column(JSON)
age = pg_json_property('data', 'age', Integer)
在实例级别的 age
属性仍然可以正常工作;但是,在渲染 SQL 时,将使用 PostgreSQL 的 ->>
运算符进行索引访问,而不是通常的索引运算符 ->
:
>>> query = session.query(Person).filter(Person.age < 20)
上述查询将呈现为:
SELECT person.id, person.data
FROM person
WHERE CAST(person.data ->> %(data_1)s AS INTEGER) < %(param_1)s
对象名称 | 描述 |
---|---|
index_property | 一个属性生成器。生成的属性描述了一个与 Indexable 列对应的对象属性。 |
class sqlalchemy.ext.indexable.index_property
一个属性生成器。生成的属性描述了一个与 Indexable
列对应的对象属性。
另请参阅
sqlalchemy.ext.indexable
成员
init()
类签名
类 sqlalchemy.ext.indexable.index_property
(sqlalchemy.ext.hybrid.hybrid_property
)
method __init__(attr_name, index, default=<object object>, datatype=None, mutable=True, onebased=True)
创建一个新的 index_property
。
参数:
attr_name
– 一个可索引类型列的属性名称,或者返回可索引结构的其他属性。
index
– 用于获取和设置此值的索引。这应该是整数的 Python 端索引值。
default
– 在给定索引处没有值时返回的值。
datatype
– 当字段为空时使用的默认数据类型。默认情况下,这是从使用的索引类型派生的;对于整数索引,是一个 Python 列表,对于任何其他类型的索引,是一个 Python 字典。对于列表,列表将被初始化为至少index
元素长的 None 值列表。
mutable
– 如果为 False,则将禁止对属性的写入和删除。
onebased
– 假设此值的 SQL 表示是基于一的;也就是说,在 SQL 中,第一个索引是 1,而不是零。
原文:
docs.sqlalchemy.org/en/20/orm/extensions/instrumentation.html
可扩展的类仪器化。
sqlalchemy.ext.instrumentation
包提供了 ORM 内的类仪器化的替代系统。类仪器化是指 ORM 如何将属性放在类上,以维护数据并跟踪对该数据的更改,以及安装在类上的事件钩子。
注意
该扩展包是为了与其他已经执行自己仪器化的对象管理包集成而提供的。它不适用于一般用途。
查看仪器扩展如何使用的示例,请参见示例属性仪器化。
对象名称 | 描述 |
---|---|
ExtendedInstrumentationRegistry | 使用额外的簿记扩展InstrumentationFactory,以适应多种类型的类管理器。 |
instrumentation_finders | 一个可扩展的返回仪器化实现的可调用序列 |
INSTRUMENTATION_MANAGER | 属性,当存在于映射类上时,选择自定义仪器化。 |
InstrumentationFactory | 用于新的 ClassManager 实例的工厂。 |
InstrumentationManager | 用户定义的类仪器化扩展。 |
sqlalchemy.ext.instrumentation.INSTRUMENTATION_MANAGER = '__sa_instrumentation_manager__'
属性,当存在于映射类上时,选择自定义仪器化。
允许类指定稍微或完全不同的技术来跟踪对映射属性和集合所做的更改。
在给定对象继承层次结构中只允许一个仪器化实现。
此属性的值必须是可调用的,并将传递一个类对象。可调用对象必须返回以下之一:
InstrumentationManager
的实例或子类ClassManager
的实例或子类一旦导入了sqlalchemy.ext.instrumentation
模块,此属性将被 SQLAlchemy 仪器解析所使用。 如果在全局仪器查找器列表中安装了自定义查找器,则它们可能会选择是否尊重此属性。
class sqlalchemy.orm.instrumentation.InstrumentationFactory
用于新的 ClassManager 实例的工厂。
类签名
类sqlalchemy.orm.instrumentation.InstrumentationFactory
(sqlalchemy.event.registry.EventTarget
)
class sqlalchemy.ext.instrumentation.InstrumentationManager
用户定义的类仪器扩展。
InstrumentationManager
可以被子类化以改变类仪器化的进行方式。 此类存在的目的是与其他对象管理框架集成,这些框架希望完全修改 ORM 的仪器方法,并且不适用于常规使用。 有关截取类仪器化事件,请参阅InstrumentationEvents
。
成员
dict_getter(), get_instance_dict(), initialize_instance_dict(), install_descriptor(), install_member(), install_state(), instrument_attribute(), instrument_collection_class(), manage(), manager_getter(), post_configure_attribute(), remove_state(), state_getter(), uninstall_descriptor(), uninstall_member(), unregister()
此类的 API 应被视为半稳定的,并且可能会随着新版本略微更改。
method dict_getter(class_)
method get_instance_dict(class_, instance)
method initialize_instance_dict(class_, instance)
method install_descriptor(class_, key, inst)
method install_member(class_, key, implementation)
method install_state(class_, instance, state)
method instrument_attribute(class_, key, inst)
method instrument_collection_class(class_, key, collection_class)
method manage(class_, manager)
method manager_getter(class_)
method post_configure_attribute(class_, key, inst)
method remove_state(class_, instance)
method state_getter(class_)
method uninstall_descriptor(class_, key)
method uninstall_member(class_, key)
method unregister(class_, manager)
sqlalchemy.ext.instrumentation.instrumentation_finders = [<function find_native_user_instrumentation_hook>]
一个可扩展的返回仪器实现的可调用序列
当一个类被注册时,每个可调用对象都将传递一个类对象。如果返回 None,则会查阅序列中的下一个查找器。否则,返回值必须是一个遵循与 sqlalchemy.ext.instrumentation.INSTRUMENTATION_MANAGER 相同指南的仪器工厂。
默认情况下,唯一的查找器是 find_native_user_instrumentation_hook,它搜索 INSTRUMENTATION_MANAGER。如果所有查找器都返回 None,则使用标准的 ClassManager 仪器。
class sqlalchemy.ext.instrumentation.ExtendedInstrumentationRegistry
用额外的记录扩展了 InstrumentationFactory
,以适应多种类型的类管理器。
类签名
类 sqlalchemy.ext.instrumentation.ExtendedInstrumentationRegistry
(sqlalchemy.orm.instrumentation.InstrumentationFactory
)
对象名称 | 描述 |
---|---|
扩展仪器注册表 | 用额外的记录扩展了 InstrumentationFactory,以适应多种类型的类管理器。 |
instrumentation_finders | 一个可扩展的序列,其中包含返回仪器实现的可调用对象。 |
INSTRUMENTATION_MANAGER | 属性,在映射类上出现时选择自定义仪器。 |
仪器工厂 | 用于创建新的 ClassManager 实例的工厂。 |
仪器管理器 | 用户定义的类仪器扩展。 |
sqlalchemy.ext.instrumentation.INSTRUMENTATION_MANAGER = '__sa_instrumentation_manager__'
属性,在映射类上出现时选择自定义仪器。
允许一个类指定一种稍微或完全不同的技术来跟踪对映射属性和集合所做的更改。
在给定对象继承层次结构中只允许有一个仪器实现。
此属性的值必须是一个可调用对象,并将传递一个类对象。可调用对象必须返回以下之一:
InstrumentationManager
或其子类的实例ClassManager
或其子类的实例一旦导入 sqlalchemy.ext.instrumentation
模块,此属性将由 SQLAlchemy 仪器化解析所使用。如果在全局 instrumentation_finders
列表中安装了自定义查找器,则它们可能会选择是否尊重此属性。
class sqlalchemy.orm.instrumentation.InstrumentationFactory
生成新的 ClassManager
实例的工厂。
类签名
类 sqlalchemy.orm.instrumentation.InstrumentationFactory
(sqlalchemy.event.registry.EventTarget
)
class sqlalchemy.ext.instrumentation.InstrumentationManager
用户自定义类仪器化扩展。
可以通过子类化 InstrumentationManager
来更改类仪器化的方式。此类存在的目的是为了与其他希望完全修改 ORM 的仪器化方法的对象管理框架集成,并不适用于常规使用。要拦截类仪器化事件,请参阅 InstrumentationEvents
。
成员
dict_getter(), get_instance_dict(), initialize_instance_dict(), install_descriptor(), install_member(), install_state(), instrument_attribute(), instrument_collection_class(), manage(), manager_getter(), post_configure_attribute(), remove_state(), state_getter(), uninstall_descriptor(), uninstall_member(), unregister()
该类的 API 应被视为半稳定,可能会在新版本中略微更改。
method dict_getter(class_)
method get_instance_dict(class_, instance)
method initialize_instance_dict(class_, instance)
method install_descriptor(class_, key, inst)
method install_member(class_, key, implementation)
method install_state(class_, instance, state)
method instrument_attribute(class_, key, inst)
method instrument_collection_class(class_, key, collection_class)
method manage(class_, manager)
method manager_getter(class_)
method post_configure_attribute(class_, key, inst)
method remove_state(class_, instance)
method state_getter(class_)
method uninstall_descriptor(class_, key)
method uninstall_member(class_, key)
method unregister(class_, manager)
sqlalchemy.ext.instrumentation.instrumentation_finders = [<function find_native_user_instrumentation_hook>]
一个可扩展的可调用序列,返回仪器化实现。
当一个类被注册时,每个可调用对象都将传递一个类对象。如果返回 None,则会查阅序列中的下一个查找器。否则,返回值必须是一个遵循与 sqlalchemy.ext.instrumentation.INSTRUMENTATION_MANAGER 相同指南的检测工厂。
默认情况下,唯一的查找器是 find_native_user_instrumentation_hook,它搜索 INSTRUMENTATION_MANAGER。如果所有查找器都返回 None,则使用标准的 ClassManager 仪器化。
class sqlalchemy.ext.instrumentation.ExtendedInstrumentationRegistry
通过额外的簿记扩展InstrumentationFactory
,以适应多种类型的类管理器。
类签名
类sqlalchemy.ext.instrumentation.ExtendedInstrumentationRegistry
(sqlalchemy.orm.instrumentation.InstrumentationFactory
)
SQLAlchemy 发行版包含各种代码示例,展示了一组选择的模式,有些是典型的,有些则不太典型。所有示例都可以运行,并且可以在发行版的/examples
目录中找到。这里可以找到所有示例的描述和源代码。
其他 SQLAlchemy 示例,一些是用户贡献的,可以在www.sqlalchemy.org/trac/wiki/UsageRecipes
的 wiki 上找到。
一个使用邻接列表模型映射的字典-字典结构的示例。
例如:
node = TreeNode('rootnode')
node.append('node1')
node.append('node3')
session.add(node)
session.commit()
dump_tree(node)
文件列表:
展示了“关联对象”模式的使用示例,其中一个中间类在两个关联在多对多模式中的类之间进行关联。
文件列表:
sqlalchemy.ext.associationproxy
的使用,以使对OrderItem
的显式引用变为可选。
展示 SQLAlchemy 的 asyncio 引擎特性的示例。
文件列表:
sqlalchemy.ext.asyncio.AsyncSession
对象用于异步 ORM 使用。
asyncio.gather()
在许多 asyncio 数据库连接上并发运行多个语句,将 ORM 结果合并到单个AsyncSession
中。
有向图结构的持久性示例。图被存储为一组边,每条边都引用节点表中的“下级”和“上级”节点。演示了基本的持久性和查询下级和上级邻居的方法:
n2 = Node(2)
n5 = Node(5)
n2.add_neighbor(n5)
print(n2.higher_neighbors())
文件清单:
演示如何在“动态”关系之上放置类似字典的外观,以便字典操作(假设简单字符串键)可以在不一次加载完整集合的情况下操作大集合。
文件清单:
演示了将多种类型的父对象与特定子对象关联的各种方法。
所有示例都使用声明性扩展和声明性混合类。每个示例最后都呈现相同的用例 - 两个类,Customer
和 Supplier
,都是 HasAddresses
混合类的子类,该混合类确保父类提供一个包含 Address
对象的 addresses
集合。
discriminator_on_association.py 和 generic_fk.py 脚本是在 2007 年博客文章 使用 SQLAlchemy 实现多态关联 中提出的配方的现代化版本。
文件清单:
使用 SQLAlchemy ORM 展示了用于分层数据的“物化路径”模式。
文件列表:
使用 SQLAlchemy ORM 展示了一种实现用于分层数据的“嵌套集”模式的简单方法。
文件列表:
一个用于各种 SQLAlchemy 使用情况的性能分析套件。
每个套件都专注于特定的用例,具有特定的性能配置文件和相关影响:
所有套件包括一系列使用模式,既展示了核心使用,也展示了 ORM 使用,并且通常按性能从最差到最好的顺序排序,根据 SQLAlchemy 提供的功能量的多少相反排序,从最多到最少(这两件事通常完美对应)。
一个命令行工具在包级别被呈现,允许单独的套件运行:
$ python -m examples.performance --help
usage: python -m examples.performance [-h] [--test TEST] [--dburl DBURL]
[--num NUM] [--profile] [--dump]
[--echo]
{bulk_inserts,large_resultsets,single_inserts}
positional arguments:
{bulk_inserts,large_resultsets,single_inserts}
suite to run
optional arguments:
-h, --help show this help message and exit
--test TEST run specific test name
--dburl DBURL database URL, default sqlite:///profile.db
--num NUM Number of iterations/items/etc for tests;
default is module-specific
--profile run profiling and dump call counts
--dump dump full call profile (implies --profile)
--echo Echo SQL output
一个示例运行如下:
$ python -m examples.performance bulk_inserts
或者使用选项:
$ python -m examples.performance bulk_inserts \
--dburl mysql+mysqldb://scott:tiger@localhost/test \
--profile --num 1000
另请参阅
如何对 SQLAlchemy 驱动的应用程序进行性能分析?
文件列表:
这是默认形式的运行:
$ python -m examples.performance single_inserts
Tests to run: test_orm_commit, test_bulk_save,
test_bulk_insert_dictionaries, test_core,
test_core_query_caching, test_dbapi_raw_w_connect,
test_dbapi_raw_w_pool
test_orm_commit : Individual INSERT/COMMIT pairs via the
ORM (10000 iterations); total time 13.690218 sec
test_bulk_save : Individual INSERT/COMMIT pairs using
the "bulk" API (10000 iterations); total time 11.290371 sec
test_bulk_insert_dictionaries : Individual INSERT/COMMIT pairs using
the "bulk" API with dictionaries (10000 iterations);
total time 10.814626 sec
test_core : Individual INSERT/COMMIT pairs using Core.
(10000 iterations); total time 9.665620 sec
test_core_query_caching : Individual INSERT/COMMIT pairs using Core
with query caching (10000 iterations); total time 9.209010 sec
test_dbapi_raw_w_connect : Individual INSERT/COMMIT pairs w/ DBAPI +
connection each time (10000 iterations); total time 9.551103 sec
test_dbapi_raw_w_pool : Individual INSERT/COMMIT pairs w/ DBAPI +
connection pool (10000 iterations); total time 8.001813 sec
可以为所有测试或更常见的是单个测试生成 Python 配置文件输出:
$ python -m examples.performance single_inserts --test test_core --num 1000 --dump
Tests to run: test_core
test_core : Individual INSERT/COMMIT pairs using Core. (1000 iterations); total fn calls 186109
186109 function calls (186102 primitive calls) in 1.089 seconds
Ordered by: internal time, call count
ncalls tottime percall cumtime percall filename:lineno(function)
1000 0.634 0.001 0.634 0.001 {method 'commit' of 'sqlite3.Connection' objects}
1000 0.154 0.000 0.154 0.000 {method 'execute' of 'sqlite3.Cursor' objects}
1000 0.021 0.000 0.074 0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/sql/compiler.py:1950(_get_colparams)
1000 0.015 0.000 0.034 0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/engine/default.py:503(_init_compiled)
1 0.012 0.012 1.091 1.091 examples/performance/single_inserts.py:79(test_core)
...
分析套件系统是可扩展的,并且可以应用于您自己的一组测试。这是一个有价值的技术,可用于决定一些性能关键的程序集的正确方法。例如,如果我们想要分析几种加载之间的差异,我们可以创建一个名为test_loads.py
的文件,内容如下:
from examples.performance import Profiler
from sqlalchemy import Integer, Column, create_engine, ForeignKey
from sqlalchemy.orm import relationship, joinedload, subqueryload, Session
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
engine = None
session = None
class Parent(Base):
__tablename__ = 'parent'
id = Column(Integer, primary_key=True)
children = relationship("Child")
class Child(Base):
__tablename__ = 'child'
id = Column(Integer, primary_key=True)
parent_id = Column(Integer, ForeignKey('parent.id'))
# Init with name of file, default number of items
Profiler.init("test_loads", 1000)
@Profiler.setup_once
def setup_once(dburl, echo, num):
"setup once. create an engine, insert fixture data"
global engine
engine = create_engine(dburl, echo=echo)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
sess = Session(engine)
sess.add_all([
Parent(children=[Child() for j in range(100)])
for i in range(num)
])
sess.commit()
@Profiler.setup
def setup(dburl, echo, num):
"setup per test. create a new Session."
global session
session = Session(engine)
# pre-connect so this part isn't profiled (if we choose)
session.connection()
@Profiler.profile
def test_lazyload(n):
"load everything, no eager loading."
for parent in session.query(Parent):
parent.children
@Profiler.profile
def test_joinedload(n):
"load everything, joined eager loading."
for parent in session.query(Parent).options(joinedload("children")):
parent.children
@Profiler.profile
def test_subqueryload(n):
"load everything, subquery eager loading."
for parent in session.query(Parent).options(subqueryload("children")):
parent.children
if __name__ == '__main__':
Profiler.main()
我们可以直接运行我们的新脚本:
$ python test_loads.py --dburl postgresql+psycopg2://scott:tiger@localhost/test
Running setup once...
Tests to run: test_lazyload, test_joinedload, test_subqueryload
test_lazyload : load everything, no eager loading. (1000 iterations); total time 11.971159 sec
test_joinedload : load everything, joined eager loading. (1000 iterations); total time 2.754592 sec
test_subqueryload : load everything, subquery eager loading. (1000 iterations); total time 2.977696 sec
```### 太空入侵者
使用 SQLite 作为状态机的太空入侵者游戏。
最初于 2012 年开发。适应在 Python 3 中工作。
在文本控制台中使用 ASCII 艺术运行。
![../_images/space_invaders.jpg](https://gitee.com/OpenDocCN/py-docs-zh/raw/master/docs/sqlalch_20/img/df5d8744e7ec946672bdde78d1db980c.png)
要运行:
```py
python -m examples.space_invaders.space_invaders
在运行时,观察日志中的 SQL 输出:
tail -f space_invaders.log
祝您愉快!
文件列表:
说明了一个扩展,它为实体创建版本表并为每个更改存储记录。给定的扩展生成一个匿名的“history”类,表示目标对象的历史版本。
与使用时间行进行版本控制示例相比,该示例将更新写入为同一表中的新行,而不使用单独的历史表。
通过一个名为test_versioning.py
的单元测试模块演示了用法,该模块可以像其他模块一样运行,内部使用unittest
:
python -m examples.versioned_history.test_versioning
示例用法的片段,使用声明性:
from history_meta import Versioned, versioned_session
class Base(DeclarativeBase):
pass
class SomeClass(Versioned, Base):
__tablename__ = 'sometable'
id = Column(Integer, primary_key=True)
name = Column(String(50))
def __eq__(self, other):
assert type(other) is SomeClass and other.id == self.id
Session = sessionmaker(bind=engine)
versioned_session(Session)
sess = Session()
sc = SomeClass(name='sc1')
sess.add(sc)
sess.commit()
sc.name = 'sc1modified'
sess.commit()
assert sc.version == 2
SomeClassHistory = SomeClass.__history_mapper__.class_
assert sess.query(SomeClassHistory).\
filter(SomeClassHistory.version == 1).\
all() \
== [SomeClassHistory(version=1, name='sc1')]
Versioned
混合类设计用于与声明性结合使用。要将该扩展与经典映射器一起使用,可以应用_history_mapper
函数:
from history_meta import _history_mapper
m = mapper(SomeClass, sometable)
_history_mapper(m)
SomeHistoryClass = SomeClass.__history_mapper__.class_
版本控制示例还与 ORM 乐观并发特性集成,文档化在配置版本计数器中。要启用此功能,请将标志Versioned.use_mapper_versioning
设置为 True:
class SomeClass(Versioned, Base):
__tablename__ = 'sometable'
use_mapper_versioning = True
id = Column(Integer, primary_key=True)
name = Column(String(50))
def __eq__(self, other):
assert type(other) is SomeClass and other.id == self.id
如果两个具有相同版本标识符的SomeClass
实例被同时更新并发送到数据库进行并发 UPDATE,如果数据库隔离级别允许两个 UPDATE 语句继续,其中一个将失败,因为它不再针对最后已知的版本标识符。
文件列表:
history_meta.py
模块函数用法的单元测试。
几个示例说明了拦截首先被解释为对行的 UPDATE 的更改的技术,并将其转换为对新行的 INSERT,将先前的行保留为历史版本。
与带有历史表的版本化示例进行比较,该示例将历史行写入单独的历史表。
文件列表:
versioned_rows.py
中相同的 UPDATE 转换为 INSERT 技术,但还发出了一个 UPDATE 命令来影响旧行的时间戳更改。 还包括一个SessionEvents.do_orm_execute()
钩子,以限制查询仅针对最新版本。 ### 垂直属性映射
展示了“垂直表”映射。
“垂直表”是指将对象的各个属性存储为表中的不同行的技术。 “垂直表”技术用于持久化可以具有各种属性集的对象,但牺牲了简单的查询控制和简洁性。 它通常在内容/文档管理系统中找到,以灵活地表示用户创建的结构。
给出了两种方法。在第二种方法中,每行引用一个“数据类型”,其中包含有关属性中存储的信息类型的信息,例如整数、字符串或日期。
例子:
shrew = Animal(u'shrew')
shrew[u'cuteness'] = 5
shrew[u'weasel-like'] = False
shrew[u'poisonous'] = True
session.add(shrew)
session.flush()
q = (session.query(Animal).
filter(Animal.facts.any(
and_(AnimalFact.key == u'weasel-like',
AnimalFact.value == True))))
print('weasel-like animals', q.all())
文件列表:
单表、联接表和具体表继承的工作示例,如映射类继承层次结构中所述。
文件列表:
示例说明了对 SQLAlchemy 属性管理系统的修改。
文件列表:
AttributeEvents.init_scalar()
事件,配合核心列默认值,以提供 ORM 对象,当访问未设置的属性时自动产生默认值。
sqlalchemy.ext.instrumentation
扩展包自定义类仪器化。### 水平分片
SQLAlchemy 分片 API 的基本示例。分片是指在多个数据库之间水平扩展数据。
“分片”映射的基本组件包括:
Engine
实例,每个都分配了一个“分片 id”。这些Engine
实例可能引用不同的数据库,或者同一数据库中的不同模式/帐户,或者它们甚至可以仅通过会导致它们在使用时访问不同模式或表的选项进行区分。
在这些示例中,使用不同类型的分片对相同的基本示例进行操作,该示例根据每个大陆的天气数据进行处理。我们提供了示例的 shard_chooser、id_chooser 和 query_chooser 函数。query_chooser 说明了检查 SQL 表达式元素以尝试确定所请求的单个分片。
创建通用分片例程是组织多个数据库实例的问题的一种雄心勃勃的方法。对于更简明的替代方案,“不同实体”方法是一种以明确方式将对象分配给不同表(以及可能的数据库节点)的简单方法 - 在EntityName的维基页面中有描述。
文件列表:
演示了如何使用 dogpile.cache 功能嵌入 ORM 查询,允许完全控制缓存以及从长期缓存中提取“延迟加载”的属性。
示例包括演示with_loader_criteria()
选项以及SessionEvents.do_orm_execute()
钩子的用法。
从 SQLAlchemy 1.4 开始,Query
构造与 Select
构造合并在一起,因此这两个对象基本上是相同的。
文件列表:
filter_public.py - 演示了应用于特定类型实体的全局条件。
在这个演示中,以下技术被说明:
SessionEvents.do_orm_execute()
事件挂钩
Session.execute()
的基本技术,从自定义缓存源中获取数据,而不是从数据库中获取。
UserDefinedOption
对象配置语句对象中的选项。
另请参阅
重新执行语句 - 包含此处提出的技术的一般示例。
例如:
# query for Person objects, specifying cache
stmt = select(Person).options(FromCache("default"))
# specify that each Person's "addresses" collection comes from
# cache too
stmt = stmt.options(RelationshipCache(Person.addresses, "default"))
# execute and results
result = session.execute(stmt)
print(result.scalars().all())
要运行,必须安装 SQLAlchemy 和 dogpile.cache,或者将它们安装到当前的 PYTHONPATH。演示将创建一个本地目录用于数据文件,插入初始数据,然后运行。第二次运行演示将利用已经存在的缓存文件,并且仅会发出一条 SQL 语句来查询两个表 - 但是显示的结果将利用数十个懒加载,所有懒加载都从缓存中获取。
演示脚本本身按复杂性顺序作为 Python 模块运行,以便相对导入起作用。
python -m examples.dogpile_caching.helloworld
python -m examples.dogpile_caching.relationship_caching
python -m examples.dogpile_caching.advanced
python -m examples.dogpile_caching.local_session_caching
文件列表:
使用邻接表模型映射的字典-字典结构的示例。
例如:
node = TreeNode('rootnode')
node.append('node1')
node.append('node3')
session.add(node)
session.commit()
dump_tree(node)
文件列表:
描述了“关联对象”模式的使用示例,其中一个中介类在两个以多对多模式关联的类之间进行关系中介。
文件清单:
sqlalchemy.ext.associationproxy
的使用,以使对 OrderItem
的显式引用是可选的。
描述了 SQLAlchemy 的 asyncio 引擎功能的示例。
文件清单:
sqlalchemy.ext.asyncio.AsyncSession
对象进行异步 ORM 使用。
asyncio.gather()
在许多 asyncio 数据库连接上同时运行许多语句,将 ORM 结果合并为单个 AsyncSession
。
有向图结构的持久性示例。 图以一组边的形式存储,每个边都引用节点表中的“下限”和“上限”节点。 演示了基本的持久性和查询“下限”和“上限”邻居的方法:
n2 = Node(2)
n5 = Node(5)
n2.add_neighbor(n5)
print(n2.higher_neighbors())
文件清单:
展示了如何在“动态”关系之上放置类似字典的外观,以便字典操作(假设简单字符串键)可以在一次加载完整集合的情况下操作大型集合。
文件清单:
展示了将多种类型的父对象与特定子对象关联的各种方法。
所有示例都使用了声明性扩展和声明性混合。每个示例最终呈现相同的用例 - 两个类,Customer
和Supplier
,都是HasAddresses
混合类的子类,该混合类确保父类提供一个包含Address
对象的addresses
集合。
discriminator_on_association.py 和 generic_fk.py 脚本是 2007 年博客文章使用 SQLAlchemy 进行多态关联中提出的配方的现代化版本。
文件列表:
展示了使用 SQLAlchemy ORM 实现“材料化路径”模式的方法。
文件列表:
展示了使用 SQLAlchemy ORM 实现“嵌套集”模式的基本方法。
文件列表:
用于各种 SQLAlchemy 用例的性能分析套件。
每个套件专注于具有特定性能配置文件和相关影响的特定用例:
所有套件都包括一系列使用模式,说明了核心和 ORM 使用,并且通常按性能从最差到最佳的顺序排序,基于 SQLAlchemy 提供的功能数量,从最大到最小(这两个方面通常完美对应)。
一个命令行工具在包级别被呈现,它允许运行个别套件:
$ python -m examples.performance --help
usage: python -m examples.performance [-h] [--test TEST] [--dburl DBURL]
[--num NUM] [--profile] [--dump]
[--echo]
{bulk_inserts,large_resultsets,single_inserts}
positional arguments:
{bulk_inserts,large_resultsets,single_inserts}
suite to run
optional arguments:
-h, --help show this help message and exit
--test TEST run specific test name
--dburl DBURL database URL, default sqlite:///profile.db
--num NUM Number of iterations/items/etc for tests;
default is module-specific
--profile run profiling and dump call counts
--dump dump full call profile (implies --profile)
--echo Echo SQL output
示例运行如下:
$ python -m examples.performance bulk_inserts
或使用选项:
$ python -m examples.performance bulk_inserts \
--dburl mysql+mysqldb://scott:tiger@localhost/test \
--profile --num 1000
另请参阅
我如何分析使用 SQLAlchemy 的应用程序?
文件列表:
这是默认的运行形式:
$ python -m examples.performance single_inserts
Tests to run: test_orm_commit, test_bulk_save,
test_bulk_insert_dictionaries, test_core,
test_core_query_caching, test_dbapi_raw_w_connect,
test_dbapi_raw_w_pool
test_orm_commit : Individual INSERT/COMMIT pairs via the
ORM (10000 iterations); total time 13.690218 sec
test_bulk_save : Individual INSERT/COMMIT pairs using
the "bulk" API (10000 iterations); total time 11.290371 sec
test_bulk_insert_dictionaries : Individual INSERT/COMMIT pairs using
the "bulk" API with dictionaries (10000 iterations);
total time 10.814626 sec
test_core : Individual INSERT/COMMIT pairs using Core.
(10000 iterations); total time 9.665620 sec
test_core_query_caching : Individual INSERT/COMMIT pairs using Core
with query caching (10000 iterations); total time 9.209010 sec
test_dbapi_raw_w_connect : Individual INSERT/COMMIT pairs w/ DBAPI +
connection each time (10000 iterations); total time 9.551103 sec
test_dbapi_raw_w_pool : Individual INSERT/COMMIT pairs w/ DBAPI +
connection pool (10000 iterations); total time 8.001813 sec
Python 分析配置文件可以为所有测试或更常见的是为个别测试转储:
$ python -m examples.performance single_inserts --test test_core --num 1000 --dump
Tests to run: test_core
test_core : Individual INSERT/COMMIT pairs using Core. (1000 iterations); total fn calls 186109
186109 function calls (186102 primitive calls) in 1.089 seconds
Ordered by: internal time, call count
ncalls tottime percall cumtime percall filename:lineno(function)
1000 0.634 0.001 0.634 0.001 {method 'commit' of 'sqlite3.Connection' objects}
1000 0.154 0.000 0.154 0.000 {method 'execute' of 'sqlite3.Cursor' objects}
1000 0.021 0.000 0.074 0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/sql/compiler.py:1950(_get_colparams)
1000 0.015 0.000 0.034 0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/engine/default.py:503(_init_compiled)
1 0.012 0.012 1.091 1.091 examples/performance/single_inserts.py:79(test_core)
...
分析套件系统是可扩展的,并且可以应用于您自己的一组测试。这是一个在决定某些性能关键程度的一组例程的正确方法时使用的宝贵技术。例如,如果我们想要分析几种加载之间的差异,我们可以创建一个名为test_loads.py
的文件,内容如下:
from examples.performance import Profiler
from sqlalchemy import Integer, Column, create_engine, ForeignKey
from sqlalchemy.orm import relationship, joinedload, subqueryload, Session
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
engine = None
session = None
class Parent(Base):
__tablename__ = 'parent'
id = Column(Integer, primary_key=True)
children = relationship("Child")
class Child(Base):
__tablename__ = 'child'
id = Column(Integer, primary_key=True)
parent_id = Column(Integer, ForeignKey('parent.id'))
# Init with name of file, default number of items
Profiler.init("test_loads", 1000)
@Profiler.setup_once
def setup_once(dburl, echo, num):
"setup once. create an engine, insert fixture data"
global engine
engine = create_engine(dburl, echo=echo)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
sess = Session(engine)
sess.add_all([
Parent(children=[Child() for j in range(100)])
for i in range(num)
])
sess.commit()
@Profiler.setup
def setup(dburl, echo, num):
"setup per test. create a new Session."
global session
session = Session(engine)
# pre-connect so this part isn't profiled (if we choose)
session.connection()
@Profiler.profile
def test_lazyload(n):
"load everything, no eager loading."
for parent in session.query(Parent):
parent.children
@Profiler.profile
def test_joinedload(n):
"load everything, joined eager loading."
for parent in session.query(Parent).options(joinedload("children")):
parent.children
@Profiler.profile
def test_subqueryload(n):
"load everything, subquery eager loading."
for parent in session.query(Parent).options(subqueryload("children")):
parent.children
if __name__ == '__main__':
Profiler.main()
我们可以直接运行我们的新脚本:
$ python test_loads.py --dburl postgresql+psycopg2://scott:tiger@localhost/test
Running setup once...
Tests to run: test_lazyload, test_joinedload, test_subqueryload
test_lazyload : load everything, no eager loading. (1000 iterations); total time 11.971159 sec
test_joinedload : load everything, joined eager loading. (1000 iterations); total time 2.754592 sec
test_subqueryload : load everything, subquery eager loading. (1000 iterations); total time 2.977696 sec
```### 太空入侵
使用 SQLite 作为状态机的太空入侵游戏。
最初于 2012 年开发。适应 Python 3 中运行。
使用 ASCII 艺术在文本控制台中运行。
![../_images/space_invaders.jpg](https://gitee.com/OpenDocCN/py-docs-zh/raw/master/docs/sqlalch_20/img/df5d8744e7ec946672bdde78d1db980c.png)
要运行:
```py
python -m examples.space_invaders.space_invaders
在运行时,请观察日志中的 SQL 输出:
tail -f space_invaders.log
祝您愉快!
文件列表:
演示了一个扩展,它为实体创建版本表并为每个更改存储记录。给定的扩展生成一个匿名的“历史”类,该类表示目标对象的历史版本。
与在相同表中将更新写为新行的使用时间行进行版本控制示例进行比较,而不使用单独的历史表。
通过一个单元测试模块 test_versioning.py
展示了用法,可以像运行任何其他模块一样运行,内部使用 unittest
:
python -m examples.versioned_history.test_versioning
一个使用声明性的示例用法片段:
from history_meta import Versioned, versioned_session
class Base(DeclarativeBase):
pass
class SomeClass(Versioned, Base):
__tablename__ = 'sometable'
id = Column(Integer, primary_key=True)
name = Column(String(50))
def __eq__(self, other):
assert type(other) is SomeClass and other.id == self.id
Session = sessionmaker(bind=engine)
versioned_session(Session)
sess = Session()
sc = SomeClass(name='sc1')
sess.add(sc)
sess.commit()
sc.name = 'sc1modified'
sess.commit()
assert sc.version == 2
SomeClassHistory = SomeClass.__history_mapper__.class_
assert sess.query(SomeClassHistory).\
filter(SomeClassHistory.version == 1).\
all() \
== [SomeClassHistory(version=1, name='sc1')]
Versioned
混合类设计用于与声明性一起使用。要将扩展与经典映射器一起使用,可以应用 _history_mapper
函数:
from history_meta import _history_mapper
m = mapper(SomeClass, sometable)
_history_mapper(m)
SomeHistoryClass = SomeClass.__history_mapper__.class_
版本控制示例还与文档中记录的 ORM 乐观并发特性集成在一起 配置版本计数器。要启用此功能,请将标志 Versioned.use_mapper_versioning
设置为 True:
class SomeClass(Versioned, Base):
__tablename__ = 'sometable'
use_mapper_versioning = True
id = Column(Integer, primary_key=True)
name = Column(String(50))
def __eq__(self, other):
assert type(other) is SomeClass and other.id == self.id
如果两个具有相同版本标识符的 SomeClass
实例被同时更新并发送到数据库以进行并发 UPDATE,如果数据库隔离级别允许两个 UPDATE 语句继续进行,则其中一个将失败,因为它不再针对最后已知的版本标识符。
文件列表:
history_meta.py
模块函数的用法的单元测试。
有几个示例说明了拦截更改的技术,这些更改首先被解释为对行的 UPDATE,而实际上将其转换为对新行的 INSERT,使以前的行保持不变作为历史版本。
与将历史行写入单独的历史表的使用历史表进行版本控制示例进行比较。
文件列表:
versioned_rows.py
中相同的 UPDATE 到 INSERT 技术,但也发出了对旧行的 UPDATE 以影响时间戳的更改。还包括一个SessionEvents.do_orm_execute()
钩子,以限制查询仅限于最新版本。 ### 竖直属性映射
说明了“竖直表”映射。
“竖直表”是指一种技术,其中对象的各个属性被存储为表中的不同行。使用“竖直表”技术来持久化可以具有不同属性集的对象,但会牺牲简单的查询控制和简洁性。在内容/文档管理系统中通常可以灵活表示用户创建的结构。
给出了两种方法的变体。在第二种方法中,每行引用一个“数据类型”,其中包含关于属性中存储的信息类型的信息,例如整数、字符串或日期。
示例:
shrew = Animal(u'shrew')
shrew[u'cuteness'] = 5
shrew[u'weasel-like'] = False
shrew[u'poisonous'] = True
session.add(shrew)
session.flush()
q = (session.query(Animal).
filter(Animal.facts.any(
and_(AnimalFact.key == u'weasel-like',
AnimalFact.value == True))))
print('weasel-like animals', q.all())
文件清单:
以邻接表模型映射的字典嵌套结构的示例。
例如:
node = TreeNode('rootnode')
node.append('node1')
node.append('node3')
session.add(node)
session.commit()
dump_tree(node)
文件清单:
示例说明了“关联对象”模式的使用,其中一个中间类介于两个以多对多模式关联的类之间。
文件清单:
sqlalchemy.ext.associationproxy
的使用,以使对OrderItem
的显式引用成为可选。
示例说明了 SQLAlchemy 的 asyncio 引擎功能。
文件清单:
sqlalchemy.ext.asyncio.AsyncSession
对象进行异步 ORM 使用。
asyncio.gather()
在许多 asyncio 数据库连接上并发运行多个语句,将 ORM 结果合并到单个AsyncSession
中。
一个有向图结构的持久性示例。图被存储为一组边,每条边都引用节点表中的“较低”和“较高”节点。演示了基本的持久性和查询较低和较高邻居的方法:
n2 = Node(2)
n5 = Node(5)
n2.add_neighbor(n5)
print(n2.higher_neighbors())
文件清单:
演示了如何在“动态”关系之上放置类似于字典的外观,以便字典操作(假设简单的字符串键)可以在大型集合上进行操作,而无需一次加载整个集合。
文件清单:
演示了将多种类型的父类与特定子对象关联的各种方法。
所有示例都使用声明性扩展以及声明性 mixin。每个示例最后都呈现相同的用例 - 两个类,Customer
和Supplier
,都是HasAddresses
mixin 的子类,该 mixin 确保父类提供了一个包含Address
对象的addresses
集合。
discriminator_on_association.py 和 generic_fk.py 脚本是 2007 年博客文章使用 SQLAlchemy 进行多态关联中提出的配方的现代化版本。
文件清单:
演示了使用 SQLAlchemy ORM 实现“材料化路径”模式的方法。
文件清单:
演示了使用 SQLAlchemy ORM 实现“嵌套集”模式的一种基本方法。
文件清单:
用于各种 SQLAlchemy 用例的性能分析套件。
每个套件都专注于特定用例,具有特定的性能概况和相关含义:
所有套件都包括各种使用模式,说明了 Core 和 ORM 的使用,并且通常按性能从最差到最佳的顺序排序,根据 SQLAlchemy 提供的功能量的多少,从最大到最小排列(这两件事通常完美对应)。
一个命令行工具在包级别呈现,允许运行各个套件:
$ python -m examples.performance --help
usage: python -m examples.performance [-h] [--test TEST] [--dburl DBURL]
[--num NUM] [--profile] [--dump]
[--echo]
{bulk_inserts,large_resultsets,single_inserts}
positional arguments:
{bulk_inserts,large_resultsets,single_inserts}
suite to run
optional arguments:
-h, --help show this help message and exit
--test TEST run specific test name
--dburl DBURL database URL, default sqlite:///profile.db
--num NUM Number of iterations/items/etc for tests;
default is module-specific
--profile run profiling and dump call counts
--dump dump full call profile (implies --profile)
--echo Echo SQL output
一个示例运行如下:
$ python -m examples.performance bulk_inserts
或者带有选项:
$ python -m examples.performance bulk_inserts \
--dburl mysql+mysqldb://scott:tiger@localhost/test \
--profile --num 1000
另请参阅
我如何分析一个由 SQLAlchemy 驱动的应用程序?
文件清单:
这是运行的默认形式:
$ python -m examples.performance single_inserts
Tests to run: test_orm_commit, test_bulk_save,
test_bulk_insert_dictionaries, test_core,
test_core_query_caching, test_dbapi_raw_w_connect,
test_dbapi_raw_w_pool
test_orm_commit : Individual INSERT/COMMIT pairs via the
ORM (10000 iterations); total time 13.690218 sec
test_bulk_save : Individual INSERT/COMMIT pairs using
the "bulk" API (10000 iterations); total time 11.290371 sec
test_bulk_insert_dictionaries : Individual INSERT/COMMIT pairs using
the "bulk" API with dictionaries (10000 iterations);
total time 10.814626 sec
test_core : Individual INSERT/COMMIT pairs using Core.
(10000 iterations); total time 9.665620 sec
test_core_query_caching : Individual INSERT/COMMIT pairs using Core
with query caching (10000 iterations); total time 9.209010 sec
test_dbapi_raw_w_connect : Individual INSERT/COMMIT pairs w/ DBAPI +
connection each time (10000 iterations); total time 9.551103 sec
test_dbapi_raw_w_pool : Individual INSERT/COMMIT pairs w/ DBAPI +
connection pool (10000 iterations); total time 8.001813 sec
可以为所有测试或更常见的是个别测试转储 Python 分析输出:
$ python -m examples.performance single_inserts --test test_core --num 1000 --dump
Tests to run: test_core
test_core : Individual INSERT/COMMIT pairs using Core. (1000 iterations); total fn calls 186109
186109 function calls (186102 primitive calls) in 1.089 seconds
Ordered by: internal time, call count
ncalls tottime percall cumtime percall filename:lineno(function)
1000 0.634 0.001 0.634 0.001 {method 'commit' of 'sqlite3.Connection' objects}
1000 0.154 0.000 0.154 0.000 {method 'execute' of 'sqlite3.Cursor' objects}
1000 0.021 0.000 0.074 0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/sql/compiler.py:1950(_get_colparams)
1000 0.015 0.000 0.034 0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/engine/default.py:503(_init_compiled)
1 0.012 0.012 1.091 1.091 examples/performance/single_inserts.py:79(test_core)
...
分析套件系统是可扩展的,并且可以应用于你自己的一组测试。这是一种在决定某些性能关键例程的正确方法时使用的有价值技术。例如,如果我们想要分析几种加载方式之间的差异,我们可以创建一个名为 test_loads.py
的文件,其内容如下:
from examples.performance import Profiler
from sqlalchemy import Integer, Column, create_engine, ForeignKey
from sqlalchemy.orm import relationship, joinedload, subqueryload, Session
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
engine = None
session = None
class Parent(Base):
__tablename__ = 'parent'
id = Column(Integer, primary_key=True)
children = relationship("Child")
class Child(Base):
__tablename__ = 'child'
id = Column(Integer, primary_key=True)
parent_id = Column(Integer, ForeignKey('parent.id'))
# Init with name of file, default number of items
Profiler.init("test_loads", 1000)
@Profiler.setup_once
def setup_once(dburl, echo, num):
"setup once. create an engine, insert fixture data"
global engine
engine = create_engine(dburl, echo=echo)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
sess = Session(engine)
sess.add_all([
Parent(children=[Child() for j in range(100)])
for i in range(num)
])
sess.commit()
@Profiler.setup
def setup(dburl, echo, num):
"setup per test. create a new Session."
global session
session = Session(engine)
# pre-connect so this part isn't profiled (if we choose)
session.connection()
@Profiler.profile
def test_lazyload(n):
"load everything, no eager loading."
for parent in session.query(Parent):
parent.children
@Profiler.profile
def test_joinedload(n):
"load everything, joined eager loading."
for parent in session.query(Parent).options(joinedload("children")):
parent.children
@Profiler.profile
def test_subqueryload(n):
"load everything, subquery eager loading."
for parent in session.query(Parent).options(subqueryload("children")):
parent.children
if __name__ == '__main__':
Profiler.main()
我们可以直接运行我们的新脚本:
$ python test_loads.py --dburl postgresql+psycopg2://scott:tiger@localhost/test
Running setup once...
Tests to run: test_lazyload, test_joinedload, test_subqueryload
test_lazyload : load everything, no eager loading. (1000 iterations); total time 11.971159 sec
test_joinedload : load everything, joined eager loading. (1000 iterations); total time 2.754592 sec
test_subqueryload : load everything, subquery eager loading. (1000 iterations); total time 2.977696 sec
文件列表:
这是运行的默认形式:
$ python -m examples.performance single_inserts
Tests to run: test_orm_commit, test_bulk_save,
test_bulk_insert_dictionaries, test_core,
test_core_query_caching, test_dbapi_raw_w_connect,
test_dbapi_raw_w_pool
test_orm_commit : Individual INSERT/COMMIT pairs via the
ORM (10000 iterations); total time 13.690218 sec
test_bulk_save : Individual INSERT/COMMIT pairs using
the "bulk" API (10000 iterations); total time 11.290371 sec
test_bulk_insert_dictionaries : Individual INSERT/COMMIT pairs using
the "bulk" API with dictionaries (10000 iterations);
total time 10.814626 sec
test_core : Individual INSERT/COMMIT pairs using Core.
(10000 iterations); total time 9.665620 sec
test_core_query_caching : Individual INSERT/COMMIT pairs using Core
with query caching (10000 iterations); total time 9.209010 sec
test_dbapi_raw_w_connect : Individual INSERT/COMMIT pairs w/ DBAPI +
connection each time (10000 iterations); total time 9.551103 sec
test_dbapi_raw_w_pool : Individual INSERT/COMMIT pairs w/ DBAPI +
connection pool (10000 iterations); total time 8.001813 sec
可以为所有测试或更常见的是个别测试转储 Python 分析输出:
$ python -m examples.performance single_inserts --test test_core --num 1000 --dump
Tests to run: test_core
test_core : Individual INSERT/COMMIT pairs using Core. (1000 iterations); total fn calls 186109
186109 function calls (186102 primitive calls) in 1.089 seconds
Ordered by: internal time, call count
ncalls tottime percall cumtime percall filename:lineno(function)
1000 0.634 0.001 0.634 0.001 {method 'commit' of 'sqlite3.Connection' objects}
1000 0.154 0.000 0.154 0.000 {method 'execute' of 'sqlite3.Cursor' objects}
1000 0.021 0.000 0.074 0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/sql/compiler.py:1950(_get_colparams)
1000 0.015 0.000 0.034 0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/engine/default.py:503(_init_compiled)
1 0.012 0.012 1.091 1.091 examples/performance/single_inserts.py:79(test_core)
...
性能分析套件系统是可扩展的,并且可以应用于您自己的一组测试。这是在决定某些性能关键例程的正确方法时使用的宝贵技术。例如,如果我们想要分析几种加载之间的差异,我们可以创建一个名为test_loads.py
的文件,其中包含以下内容:
from examples.performance import Profiler
from sqlalchemy import Integer, Column, create_engine, ForeignKey
from sqlalchemy.orm import relationship, joinedload, subqueryload, Session
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
engine = None
session = None
class Parent(Base):
__tablename__ = 'parent'
id = Column(Integer, primary_key=True)
children = relationship("Child")
class Child(Base):
__tablename__ = 'child'
id = Column(Integer, primary_key=True)
parent_id = Column(Integer, ForeignKey('parent.id'))
# Init with name of file, default number of items
Profiler.init("test_loads", 1000)
@Profiler.setup_once
def setup_once(dburl, echo, num):
"setup once. create an engine, insert fixture data"
global engine
engine = create_engine(dburl, echo=echo)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
sess = Session(engine)
sess.add_all([
Parent(children=[Child() for j in range(100)])
for i in range(num)
])
sess.commit()
@Profiler.setup
def setup(dburl, echo, num):
"setup per test. create a new Session."
global session
session = Session(engine)
# pre-connect so this part isn't profiled (if we choose)
session.connection()
@Profiler.profile
def test_lazyload(n):
"load everything, no eager loading."
for parent in session.query(Parent):
parent.children
@Profiler.profile
def test_joinedload(n):
"load everything, joined eager loading."
for parent in session.query(Parent).options(joinedload("children")):
parent.children
@Profiler.profile
def test_subqueryload(n):
"load everything, subquery eager loading."
for parent in session.query(Parent).options(subqueryload("children")):
parent.children
if __name__ == '__main__':
Profiler.main()
我们可以直接运行我们的新脚本:
$ python test_loads.py --dburl postgresql+psycopg2://scott:tiger@localhost/test
Running setup once...
Tests to run: test_lazyload, test_joinedload, test_subqueryload
test_lazyload : load everything, no eager loading. (1000 iterations); total time 11.971159 sec
test_joinedload : load everything, joined eager loading. (1000 iterations); total time 2.754592 sec
test_subqueryload : load everything, subquery eager loading. (1000 iterations); total time 2.977696 sec
使用 SQLite 作为状态机的太空侵略者游戏。
最初开发于 2012 年。已适配为在 Python 3 中运行。
在文本控制台中使用 ASCII 艺术运行。
运行:
python -m examples.space_invaders.space_invaders
在运行时,观察日志中的 SQL 输出:
tail -f space_invaders.log
尽情享受!
文件清单:
演示了一个扩展,为实体创建版本表并存储每次更改的记录。给定的扩展生成一个匿名的“历史”类,表示目标对象的历史版本。
与使用时间行进行版本控制示例进行比较,该示例将更新写入为同一表中的新行,而不使用单独的历史表。
使用通过一个单元测试模块test_versioning.py
进行演示,可以像其他模块一样运行,内部使用unittest
:
python -m examples.versioned_history.test_versioning
一个示例用法片段,使用声明性:
from history_meta import Versioned, versioned_session
class Base(DeclarativeBase):
pass
class SomeClass(Versioned, Base):
__tablename__ = 'sometable'
id = Column(Integer, primary_key=True)
name = Column(String(50))
def __eq__(self, other):
assert type(other) is SomeClass and other.id == self.id
Session = sessionmaker(bind=engine)
versioned_session(Session)
sess = Session()
sc = SomeClass(name='sc1')
sess.add(sc)
sess.commit()
sc.name = 'sc1modified'
sess.commit()
assert sc.version == 2
SomeClassHistory = SomeClass.__history_mapper__.class_
assert sess.query(SomeClassHistory).\
filter(SomeClassHistory.version == 1).\
all() \
== [SomeClassHistory(version=1, name='sc1')]
Versioned
混合类设计用于与声明性一起使用。要将扩展与经典映射器一起使用,可以应用_history_mapper
函数:
from history_meta import _history_mapper
m = mapper(SomeClass, sometable)
_history_mapper(m)
SomeHistoryClass = SomeClass.__history_mapper__.class_
版本示例还与 ORM 乐观并发功能集成,文档化在配置版本计数器。要启用此功能,请将标志Versioned.use_mapper_versioning
设置为 True:
class SomeClass(Versioned, Base):
__tablename__ = 'sometable'
use_mapper_versioning = True
id = Column(Integer, primary_key=True)
name = Column(String(50))
def __eq__(self, other):
assert type(other) is SomeClass and other.id == self.id
如果两个具有相同版本标识符的SomeClass
实例被同时更新并发送到数据库进行并发更新,如果数据库隔离级别允许两个 UPDATE 语句继续进行,其中一个将失败,因为它不再针对最后已知的版本标识符。
文件清单:
history_meta.py
模块函数的使用的单元测试。
几个示例说明拦截更改的技术,这些更改首先被解释为对行的更新,而实际上将其转换为对新行的插入,保留先前的行作为历史版本。
与带有历史表的版本控制示例进行比较,该示例将历史行写入单独的历史表。
文件清单:
versioned_rows
示例的变体,类似于垂直属性映射示例中所示的内容。
versioned_rows.py
的相同 UPDATE 到 INSERT 技术,但还会对旧行进行 UPDATE 以影响时间戳的更改。还包括一个SessionEvents.do_orm_execute()
挂钩来将查询限制为只有最新版本。 #### 使用历史表进行版本控制
展示了一个创建实体的版本表并为每个更改存储记录的扩展。给定的扩展生成一个匿名的“history”类,表示目标对象的历史版本。
与使用时间行版本化的例子相比,这些例子将更新写入相同表中的新行中,而不使用单独的历史表。
通过一个单元测试模块test_versioning.py
来说明使用方法,可以像其他模块一样运行,内部使用unittest
:
python -m examples.versioned_history.test_versioning
一个示例用法片段,使用声明式:
from history_meta import Versioned, versioned_session
class Base(DeclarativeBase):
pass
class SomeClass(Versioned, Base):
__tablename__ = 'sometable'
id = Column(Integer, primary_key=True)
name = Column(String(50))
def __eq__(self, other):
assert type(other) is SomeClass and other.id == self.id
Session = sessionmaker(bind=engine)
versioned_session(Session)
sess = Session()
sc = SomeClass(name='sc1')
sess.add(sc)
sess.commit()
sc.name = 'sc1modified'
sess.commit()
assert sc.version == 2
SomeClassHistory = SomeClass.__history_mapper__.class_
assert sess.query(SomeClassHistory).\
filter(SomeClassHistory.version == 1).\
all() \
== [SomeClassHistory(version=1, name='sc1')]
Versioned
mixin 设计为与声明性一起使用。要将扩展与经典映射器一起使用,可以应用_history_mapper
函数:
from history_meta import _history_mapper
m = mapper(SomeClass, sometable)
_history_mapper(m)
SomeHistoryClass = SomeClass.__history_mapper__.class_
版本控制示例还与在配置版本计数器中记录的 ORM 乐观并发功能集成。要启用此功能,请将标志Versioned.use_mapper_versioning
设置为 True:
class SomeClass(Versioned, Base):
__tablename__ = 'sometable'
use_mapper_versioning = True
id = Column(Integer, primary_key=True)
name = Column(String(50))
def __eq__(self, other):
assert type(other) is SomeClass and other.id == self.id
上面,如果两个具有相同版本标识符的SomeClass
实例被同时更新并发送到数据库进行并发更新,如果数据库隔离级别允许这两个 UPDATE 语句继续进行,其中一个将失败,因为它不再是针对最后已知版本标识符的。
文件列表:
history_meta.py
模块函数的用法的单元测试。
几个示例说明了拦截更改的技术,这些更改首先被解释为对行的 UPDATE,而实际上将其转换为对新行的 INSERT,将先前的行保留为历史版本。
与带有历史表的版本控制示例进行比较,该示例将历史行写入单独的历史表中。
文件列表:
versioned_rows.py
相同的 UPDATE 转换为 INSERT 技术,但还发出了对旧行的 UPDATE 以影响时间戳的更改。还包括一个SessionEvents.do_orm_execute()
钩子,以限制查询仅针对最新版本。
展示了“垂直表”映射。
“垂直表”是指将对象的各个属性存储为表中的不同行的技术。 “垂直表”技术用于持久化可以具有各种属性集的对象,但牺牲了简单的查询控制和简洁性。它通常在内容/文档管理系统中找到,以灵活地表示用户创建的结构。
给出了两种方法的变体。在第二种方法中,每行引用一个包含有关存储在属性中的信息类型的“数据类型”,例如整数、字符串或日期。
示例:
shrew = Animal(u'shrew')
shrew[u'cuteness'] = 5
shrew[u'weasel-like'] = False
shrew[u'poisonous'] = True
session.add(shrew)
session.flush()
q = (session.query(Animal).
filter(Animal.facts.any(
and_(AnimalFact.key == u'weasel-like',
AnimalFact.value == True))))
print('weasel-like animals', q.all())
文件列表:
单表、联表和具体表继承的工作示例,如映射类继承层次结构中所述。
文件列表:
作为 Mapping Class Inheritance Hierarchies 中描述的单表、连接表和具体表继承的工作示例。
文件清单:
示例说明了对 SQLAlchemy 属性管理系统的修改。
文件清单:
AttributeEvents.init_scalar()
事件,结合核心列默认值来提供 ORM 对象,当访问未设置的属性时自动产生默认值。
sqlalchemy.ext.instrumentation
扩展包进行自定义类仪器化。 ### 水平分片
使用 SQLAlchemy 分片 API 的基本示例。分片是指将数据横向扩展到多个数据库。
“分片”映射的基本组件包括:
Engine
实例,每个实例分配一个“分片 ID”。这些Engine
实例可以引用不同的数据库,或者同一数据库中的不同模式/帐户,或者甚至可以仅通过选项来区分,当使用时会使它们访问不同的模式或表。
在这些示例中,针对相同的基本示例使用不同类型的分片,适应以每个大陆为基础的天气数据。我们提供了示例的 shard_chooser、id_chooser 和 query_chooser 函数。query_chooser 演示了检查 SQL 表达式元素以尝试确定请求的单个分片。
构建通用分片例程是解决将实例组织在多个数据库中的问题的一种雄心勃勃的方法。对于更直接的替代方案,“不同实体”方法是一种简单的将对象分配给不同表(以及潜在的数据库节点)的显式方法 - 在维基上描述为EntityName。
文件列表:
演示对 SQLAlchemy 属性管理系统的修改的示例。
文件列表:
AttributeEvents.init_scalar()
事件,结合核心列默认值,为 ORM 对象提供在访问未设置属性时自动生成默认值的功能。
sqlalchemy.ext.instrumentation
扩展包。
使用 SQLAlchemy 分片 API 的基本示例。分片是指在多个数据库之间水平扩展数据。
“分片”映射的基本组件包括:
Engine
实例,每个分配一个“分片 id”。这些Engine
实例可以引用不同的数据库,或者同一数据库中的不同模式/帐户,或者它们甚至可以仅通过选项进行区分,这些选项将在使用时导致它们访问不同的模式或表。
在这些示例中,针对同一基本示例使用了不同类型的分片,该示例适用于按大陆基础提供天气数据。我们提供了示例 shard_chooser、id_chooser 和 query_chooser 函数。query_chooser 展示了对 SQL 表达式元素的检查,以尝试确定请求的单个分片。
构建通用分片例程是组织实例在多个数据库中的一种雄心勃勃的方法。对于更直接的替代方案,“不同实体”方法是一种将对象分配给不同表(和潜在的数据库节点)的简单方法 - 在维基上描述为EntityName。
文件列表:
展示了增强 ORM SELECT 行为的示例,这些示例由 Session.execute()
与 2.0 风格 的 select()
以及 1.x 风格 的 Query
对象一起使用。
示例包括演示with_loader_criteria()
选项以及SessionEvents.do_orm_execute()
钩子。
截至 SQLAlchemy 1.4 版本,Query
构造与Select
构造合并在一起,因此这两个对象大部分相同。
文件列表:
演示如何嵌入dogpile.cache功能与 ORM 查询,允许完全控制缓存以及从长期缓存中获取“延迟加载”属性的能力。
在这个演示中,演示了以下技术:
SessionEvents.do_orm_execute()
事件钩子
Session.execute()
以从自定义缓存源而不是数据库中提取的基本技术。
UserDefinedOption
对象配置语句对象中的选项。
另请参阅
重新执行语句 - 包括这里介绍的技术的一个通用示例。
例如:
# query for Person objects, specifying cache
stmt = select(Person).options(FromCache("default"))
# specify that each Person's "addresses" collection comes from
# cache too
stmt = stmt.options(RelationshipCache(Person.addresses, "default"))
# execute and results
result = session.execute(stmt)
print(result.scalars().all())
要运行,必须安装 SQLAlchemy 和 dogpile.cache,或者在当前 PYTHONPATH 上。演示将为数据文件创建一个本地目录,插入初始数据,然后运行。第二次运行演示将利用已经存在的缓存文件,并且只会发出一条 SQL 语句针对两个表 - 但显示的结果将利用数十个从缓存中获取的延迟加载。
演示脚本本身,按复杂性顺序作为 Python 模块运行,以便相对导入正常工作:
python -m examples.dogpile_caching.helloworld
python -m examples.dogpile_caching.relationship_caching
python -m examples.dogpile_caching.advanced
python -m examples.dogpile_caching.local_session_caching
文件列表:
说明如何使用Session.execute()
与 2.0 样式的select()
一起增强 ORM SELECT 行为的示例,以及 1.x 样式的Query
对象。
示例包括演示with_loader_criteria()
选项以及SessionEvents.do_orm_execute()
钩子。
从 SQLAlchemy 1.4 开始,Query
构造与Select
构造统一,因此这两个对象大部分相同。
文件列表:
说明如何在 ORM 查询中嵌入dogpile.cache功能,允许完全的缓存控制,以及从长期缓存中拉取“惰性加载”属性的能力。
这个演示展示了以下技术:
SessionEvents.do_orm_execute()
事件挂钩
Session.execute()
以从自定义缓存源而不是数据库中提取的基本技术。
UserDefinedOption
对象配置语句对象中的选项。
另请参阅
重新执行语句 - 包括这里介绍的技术的一般示例。
例如:
# query for Person objects, specifying cache
stmt = select(Person).options(FromCache("default"))
# specify that each Person's "addresses" collection comes from
# cache too
stmt = stmt.options(RelationshipCache(Person.addresses, "default"))
# execute and results
result = session.execute(stmt)
print(result.scalars().all())
要运行,必须安装 SQLAlchemy 和 dogpile.cache,或者在当前 PYTHONPATH 上安装。演示将为数据文件创建一个本地目录,插入初始数据,并运行。再次运行演示将利用已经存在的缓存文件,并且只会发出一条针对两个表的 SQL 语句 - 但显示的结果将利用几十个懒加载,所有这些懒加载都从缓存中拉取。
演示脚本本身,按复杂度顺序运行为 Python 模块,以便相对导入正常工作:
python -m examples.dogpile_caching.helloworld
python -m examples.dogpile_caching.relationship_caching
python -m examples.dogpile_caching.advanced
python -m examples.dogpile_caching.local_session_caching
文件清单:
python -m examples.dogpile_caching.advanced
python -m examples.dogpile_caching.local_session_caching
文件列表:
+ environment.py - 建立数据/缓存文件路径和配置,必要时引导固定数据。
+ caching_query.py - 代表允许在 SQLAlchemy 中使用 Dogpile 缓存的函数和类。引入一个名为 FromCache 的查询选项。
+ model.py - 数据模型,表示具有多个地址对象的人员,每个地址对象都有邮政编码、城市和国家。
+ fixture_data.py - 安装一些示例数据。这里有一些美国/加拿大城市的少数邮政编码。然后,安装了 100 个人员记录,每个记录都有一个随机选择的邮政编码。
+ helloworld.py - 演示了如何加载一些数据,并缓存结果。
+ relationship_caching.py - 展示了如何在关系终点添加缓存选项,以便惰性加载从缓存中加载。
+ advanced.py - 演示了 Query 与 FromCache 选项的使用,包括前端加载、缓存失效和集合缓存。
+ local_session_caching.py - 这个例子创建了一个新的 dogpile.cache 后端,它将数据持久化存储在当前会话的字典中。删除会话,缓存就消失了。### ORM 查询事件
说明如何使用`Session.execute()`与 2.0 样式的`select()`一起增强 ORM SELECT 行为的示例,以及 1.x 样式的`Query`对象。
示例包括演示`with_loader_criteria()`选项以及`SessionEvents.do_orm_execute()`钩子。
从 SQLAlchemy 1.4 开始,`Query`构造与`Select`构造统一,因此这两个对象大部分相同。
文件列表:
+ temporal_range.py - 展示了将应用于选定实体的自定义每个查询条件。
+ filter_public.py - 展示了应用于特定类型实体的全局条件。
### Dogpile 缓存
说明如何在 ORM 查询中嵌入[dogpile.cache](https://dogpilecache.sqlalchemy.org/)功能,允许完全的缓存控制,以及从长期缓存中拉取“惰性加载”属性的能力。
这个演示展示了以下技术:
+ 使用`SessionEvents.do_orm_execute()`事件挂钩
+ 绕过`Session.execute()`以从自定义缓存源而不是数据库中提取的基本技术。
+ 基本的缓存与 dogpile.cache,使用允许全局控制一组固定配置的“区域”。
+ 使用自定义`UserDefinedOption`对象配置语句对象中的选项。
另请参阅
重新执行语句 - 包括这里介绍的技术的一般示例。
例如:
```py
# query for Person objects, specifying cache
stmt = select(Person).options(FromCache("default"))
# specify that each Person's "addresses" collection comes from
# cache too
stmt = stmt.options(RelationshipCache(Person.addresses, "default"))
# execute and results
result = session.execute(stmt)
print(result.scalars().all())
要运行,必须安装 SQLAlchemy 和 dogpile.cache,或者在当前 PYTHONPATH 上安装。演示将为数据文件创建一个本地目录,插入初始数据,并运行。再次运行演示将利用已经存在的缓存文件,并且只会发出一条针对两个表的 SQL 语句 - 但显示的结果将利用几十个懒加载,所有这些懒加载都从缓存中拉取。
演示脚本本身,按复杂度顺序运行为 Python 模块,以便相对导入正常工作:
python -m examples.dogpile_caching.helloworld
python -m examples.dogpile_caching.relationship_caching
python -m examples.dogpile_caching.advanced
python -m examples.dogpile_caching.local_session_caching
文件清单: