boltdb是一个k-v存储引擎,它的核心操作是读写数据。本文从使用者的角度,结合读写数据的实例,分析读&写数据是如何执行的,以及各个组件是如何串联起来工作的。
boltdb中有两种事务模式:只读事务和读写事务。读数据采用只读事务操作,写数据采用读写事务操作。任意时刻,最多只有一个读写事务在操作。即多个只读事务、多个只读事务+1个读写事务都是可以并发进行操作。
读取boltdb数据有两种操作模式,一种是用户手动管理事务,另一种是通过boltdb提供的func (db *DB) View(fn func(*Tx) error) error方法,我们只要传入一个类型为func(*Tx) error的函数。
我们先来看手动管理事务的代码,分为以下流程:
func main() {
db, err := bolt.Open("test.db", 0600, nil)
if err != nil {
fmt.Println(err)
return
}
defer db.Close()
readDemo(db)
}
// 读模式
func readDemo(db *bolt.DB) {
// 传false,开启一个只读事务
tx, err := db.Begin(false)
if err != nil {
fmt.Println(err)
return
}
// 获取Bucket
bucket := tx.Bucket([]byte("bucket"))
if bucket != nil {
// 从Bucket获取数据
value := bucket.Get([]byte("key"))
fmt.Println(value)
}
// 回滚操作,只读事务不能执行tx.Commit操作
err = tx.Rollback()
if err != nil {
fmt.Println(err)
}
}
另一种读操作是通过db.View方法,只需传入我们的业务处理逻辑函数,由db.View来调用我们传入的函数,我们不用关心事务句柄tx的获取和Rollback操作。
err := db.View(func(tx *bolt.Tx) error {
...
return nil
})
结合下面的db.View方法,来看它做了哪些工作。总结起来有以下几点:
// 执行只读事务
func (db *DB) View(fn func(*Tx) error) error {
t, err := db.Begin(false)
if err != nil {
return err
}
defer func() {
if t.db != nil {
t.rollback()
}
}()
// fn内部不能进行rollback,否则引发panic
t.managed = true
err = fn(t)
// 在下面执行rollback之前,要将状态值修改回来
t.managed = false
if err != nil {
_ = t.Rollback()
return err
}
// 因为是只读操作,所以不会执行t.Commit操作
if err := t.Rollback(); err != nil {
return err
}
return nil
}
前面通过两个代码实例介绍了从boltdb中读数据的流程。本小节对这个操作流程做一个概括,从原理层面分析只读事务操作工作流程。

func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
...
// 1.1 通过mmap将db文件加载到内存,绑定到db.data字段上,
// 并初始化 db.meta0和db.meta1
if err := db.mmap(options.InitialMmapSize); err != nil {
_ = db.close()
return nil, err
}
// 1.2 初始化db.freelist
db.freelist = newFreelist()
// 根据page 3 填充freelist
db.freelist.read(db.page(db.meta().freelist))
return db, nil
}
db.Begin(false)开启一个只读事务,此操作会返回一个只读事务句柄tx。这个过程核心是对db文件创建了一个只读快照,将DB.meta信息拷贝了一份存储在tx中,主要是存储了root page id. tx内部会隐含创建一个伪根Bucket节点tx.root.
func (db *DB) Begin(writable bool) (*Tx, error) {
...
// 开启一个只读事务
return db.beginTx()
}
func (db *DB) beginTx() (*Tx, error) {
...
t := &Tx{}
t.init(db)
...
return t, nil
}
func (tx *Tx) init(db *DB) {
tx.db = db
tx.pages = nil
tx.meta = &meta{}
// 深拷贝db.meta,更改tx.meta不影响db
db.meta().copy(tx.meta)
// 创建根Bucket,相当于对B+做一个快照
tx.root = newBucket(tx)
tx.root.bucket = &bucket{}
*tx.root.bucket = tx.meta.root
...
}
func (b *Bucket) Get(key []byte) []byte操作,Cursor工作原理在boltdb源码分析系列-迭代器文章中已有介绍,本文不在做重复分析说明,感兴趣的同学可以看本系列文章。类似读取数据,向boltdb中写入数据也有两种模式,一种是用户自己管理事务,另一种是通过boltdb提供的API接口,我们只需传入一个事务操作函数,像func (db *DB) Update(fn func(*Tx) error) error、func (db *DB) Batch(fn func(*Tx) error) error.
对于手动进行写数据操作示例如下,操作与读数据基本相同,不同点有3处:
// 写模式
func writeDemo(db *bolt.DB) {
// 创建一个读写事务
tx, err := db.Begin(true)
if err != nil {
fmt.Println(err)
return
}
// 失败回滚操作
defer func() {
tx.Rollback()
}()
// 获取bucket
bucket := tx.Bucket([]byte("bucket"))
// 向bucket中添加key-value数据
err = bucket.Put([]byte("key"), []byte("value"))
if err != nil {
fmt.Println(err)
return
}
// 提交事务
err = tx.Commit()
if err != nil {
fmt.Println(err)
return
}
}
通过boltdb提供的写操作API接口,我们只需要提供一个事务逻辑函数即可。对于Update接口,它的内部逻辑与前面的读数据操作中的View是一致的,唯一不同的是多了Commit提交操作,不在重复分析。
因为写事务操作是有代价的,需要将数据写入磁盘,这个过程是比较费时的,boltdb提供了一个批量执行写事务的接口Batch,也就是说调用Batch(fn)传入的fn不会立即执行,需要等待一段时间,收集一批之后一起执行。具体等多久,这里有两种策略:
const (
// 批量事务最多1000个
DefaultMaxBatchSize int = 1000
// 批量事务等待时间10毫秒
DefaultMaxBatchDelay = 10 * time.Millisecond
)
满足上面两种中的任意一种,批量事务操作就会执行。通过策略1和策略2,同时兼顾了时间与效率,非常好。
下面是Batch方法处理过程,核心是收集和执行两个步骤。
func (db *DB) Batch(fn func(*Tx) error) error {
errCh := make(chan error, 1)
db.batchMu.Lock()
if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
db.batch = &batch{
db: db,
}
// 启动一个定时器,延时一段时间批量处理收集到的fn
db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
}
db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
// 已经收集够了tx,立即开启一个协程执行批量run,就不用等到db.MaxBatchDelay再来执行了
// 那有没有重复执行的可能,下面立即执行了,上面的定时器到了又会执行,不会的,batch中通过sync.Once
// 只执行一次
if len(db.batch.calls) >= db.MaxBatchSize {
// 这里为什么要开启一个协程执行run? 开启一个协程是为了尽早释放batchMu锁,不开一个协程
// 会延迟锁的释放,导致其他goroutine执行Batch卡死,fn加入不到db.batch
go db.batch.trigger()
}
db.batchMu.Unlock()
err := <-errCh
if err == trySolo {
// fn执行失败,在单独执行一次
err = db.Update(fn)
}
return err
}
看了Batch实现,也许有同学有下面的疑问点?小编这里做一个分析说明。
答:不会被执行两次,虽然相当于在两个goroutine中都调用了db.batch.trigger,但是batch中start sync.Once保证了多次调用也只会执行一次,「sync.Once用的恰到好处」
go db.batch.trigger()中go可以去掉吗?答:不可以,开启一个goroutine执行是为了尽早释放锁,否则会导致其他调用Batch会阻塞卡死,fn加入不到db.batch
go db.batch.trigger()执行吗?答:不会的,if中新赋值的db.batch和go db.batch.trigger()中的db.batch不是同一个对象,不会影响。
Batch是一个阻塞操作,所以开启多个goroutine调用它才有意义。同时要注意,Batch中部分fn执行失败有重试执行操作,所以要考虑幂等性。
前一小节通过示例介绍了写数据操作流程,本小节从原理层面概括写数据是如何执行的。在示例代码中可以看到,写数据操作和读数据操作是一致的。都有db.Open一个boltdb实例,然后开启事务,获取桶并在桶上进行读写操作。这里只介绍写数据流程中与读不同的地方。
写数据关键的两步是func (b *Bucket) Put(key []byte, value []byte) error和func (tx *Tx) Commit() error. 相比读操作,写操作复杂不少,因为读操作不涉及数据更改,所以直接从mmap映射后的内存page中读取。而写操作会修改数据,要支持数据库的ACID特性,boltdb处理方法是:
下面结合源码对处理过程进行分析,boltdb是如何实现事务的在下篇文章中详细介绍。
2.调用Bucket提供的Put方法写入key-value数据,因所有的数据写入操作都是在叶子节点中进行的,先创建一个迭代器,对Bucket遍历并定位到要写入的叶子节点,Cursor c游标最后的位置即要写入的叶子节点。
2.1 定位叶子节点后,执行c.node().put(key,value,0,0)操作,这个操作分为两步,第一步是找到叶子节点node,「是node而不是page」,所有修改操作都是针对node的,对node不是很理解的可以看boltdb源码分析系列-内存结构文章。
func (b *Bucket) Put(key []byte, value []byte) error {
...
// 创建一个游标对象,对Bucket进行遍历,查找键值对为key的数据
c := b.Cursor()
k, _, flags := c.seek(key)
...
// 运行到这里,key有可能存在,也有可能不存在,不存在会创建一个key-value键值对,
// 存在会更新原来key的value,这些处理都是在put函数中实现的
// NOTE 对key是深拷贝,对value是浅拷贝
key = cloneBytes(key)
c.node().put(key, key, value, 0, 0)
return nil
}
Cursor的node方法返回叶子节点,如果node已存在,直接返回,否则从根节点开始从上往下,将游走路径上的page转成node,并保存到Bucket中的nodes。
func (c *Cursor) node() *node {
...
// 叶子节点已经在内存中,直接返回
if ref := &c.stack[len(c.stack)-1]; ref.node != nil && ref.isLeaf() {
return ref.node
}
// 从根节点开始,从上往下直到根节点,将游走路径上的page转成node,并保存到
// Bucket中的nodes
var n = c.stack[0].node
if n == nil {
n = c.bucket.node(c.stack[0].page.id, nil)
}
for _, ref := range c.stack[:len(c.stack)-1] {
_assert(!n.isLeaf, "expected branch node")
n = n.childAt(int(ref.index))
}
_assert(n.isLeaf, "expected leaf node")
return n
}
// 返回bucket中的第index孩子节点
func (n *node) childAt(index int) *node {
if n.isLeaf {
panic(fmt.Sprintf("invalid childAt(%d) on a leaf node", index))
}
return n.bucket.node(n.inodes[index].pgid, n)
}
将pgid对应的page转换成node,保存到Bucket中的nodes中。调用链为Cursor.node-->node.childAt-->Bucket.node. Cursor和node结构中都有一个记录Bucket字段,它们都来自读写事务tx. 所以经过这个操作之后,从根节点到有修改写入叶子节点路径上的所有节点都添加到了Bucket的nodes中。
func (b *Bucket) node(pgid pgid, parent *node) *node {
...
if n := b.nodes[pgid]; n != nil {
return n
}
// 同时,将这些新建的node保存到了已b.rootNode为根节点的tree中
// 在执行tx.Commit时,根据rootNode将它们转成page,最后刷新到磁盘
n := &node{bucket: b, parent: parent}
if parent == nil {
b.rootNode = n
} else {
parent.children = append(parent.children, n)
}
...
// 根据page页p来对节点n进一步初始化一些内容,主要是填充key-value信息
n.read(p)
// 将节点n缓存起来
b.nodes[pgid] = n
...
return n
}
2.2 调用node的put操作,将修改的内容写入,这里会传入两个key,一个是旧key,一个是新key,如果旧key存在,用新的key-value将其覆盖,如果,旧key不存在,相当于新添加了新的key-value.
func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
// 异常校验
...
// 在节点n的元素中查询旧key所在下标索引,即oldKey是它的第几个元素
index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, oldKey) != -1 })
// 进一步判断索引为index位置的元素的key是否是oldKey,进一步确认oldKey是否存在
exact := (len(n.inodes) > 0 && index < len(n.inodes) && bytes.Equal(n.inodes[index].key, oldKey))
// oldKey不存在,新增加一个节点
if !exact {
n.inodes = append(n.inodes, inode{})
copy(n.inodes[index+1:], n.inodes[index:])
}
// 更新index位置key-value数据
inode := &n.inodes[index]
inode.flags = flags
inode.key = newKey
inode.value = value
inode.pgid = pgid
_assert(len(inode.key) > 0, "put: zero-length inode key")
}
总结起来,Put操作的核心要点有两个:
结合下图,boltdb文件被mmap到内存之后,以page形式表示,右侧是这些page转成node之后的树形表示。当执行Put操作时,在叶子节点node d写入数据,从根节点到node d路径上所有的节点都被保存到了Bucket.nodes中,同时这些nodes节点也保存到了以rootNode为根节点的树中,对应到图中的红色节点。「注意,图中灰色的node并没有在内存中真实存在,这里是为了对比说明,只有Put操作会影响到的page才会转成node,保存在Bucket中」。对于Put操作会影响到的page是指哪些page? Put操作是发生在某个leaf page上,从root page到某leaf page路径上的page都是会影响的page.

下面的Commit代码抽取了转换nodes和写入磁盘操作, 将脏nodes转成page在tx.root.spill函数中,将page写入磁盘是tx.write操作。
func (tx *Tx) Commit() error {
...
// 对B+进行溢出处理, 并将Bucket中nodes转成dirty page
if err := tx.root.spill(); err != nil {
tx.rollback()
return err
}
...
// 刷新脏页(包含新的freelist)到磁盘
if err := tx.write(); err != nil {
tx.rollback()
return err
}
...
}
将nodes转成page调用链很长,依次是tx.root.spill-->b.rootNode.spill-->tx.allocate.
func (b *Bucket) spill() error {
...
// 从根节点开始进行溢出处理
if err := b.rootNode.spill(); err != nil {
return err
}
...
}
注意这里被执行spill操作的node,是在前面的Bucket Put操作中,加入到以rootNode为根节点中的node. 也就是Put操作影响到的node,会转成脏page。对于没有影响到的page,它们不会被转成node.
func (n *node) spill() error {
...
for _, node := range nodes {
...
p, err := tx.allocate((node.size() / tx.db.pageSize) + 1)
if err != nil {
return err
}
if p.id >= tx.meta.pgid {
panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", p.id, tx.meta.pgid))
}
// 分配的page id值赋值给node
node.pgid = p.id
// 将node中的元素内容序列化化到page p中
node.write(p)
...
}
...
}
在tx.allocate中将node转成page. 分配给定数量的page,并保存到脏页缓存pages中.
func (tx *Tx) allocate(count int) (*page, error) {
p, err := tx.db.allocate(count)
if err != nil {
return nil, err
}
tx.pages[p.id] = p
tx.stats.PageCount++
tx.stats.PageAlloc += count * tx.db.pageSize
return p, nil
}
tx.write()将dirty page保存到磁盘中,释放之前旧的page,将它们加入到freelist中,以便后续写入数据操作复用这些page.
