在介绍Block区块之前,先介绍区块使用的一些通用的数据结构。
1)存储 Storage
Nebulas定义了存储(Storage)的接口,并实现了三种存储类型:RocksStorage,MemoryStorage, DiskStorage。相关的源代码在storage目录下。
1.// Storage interface of Storage.
2.type Storage interface {
3.// Get return the value to the key in Storage.
4.Get(key []byte) ([]byte, error) //读
5.
6.// Put put the key-value entry to Storage.
7.Put(key []byte, value []byte) error //写
8.
9.// Del delete the key entry in Storage.
10.Del(key []byte) error //删除
11.
12.// EnableBatch enable batch write.
13.EnableBatch() //开启Batch模式
14.
15.// DisableBatch disable batch write.
16.DisableBatch() //关闭Batch模式
17.
18.// Flush write and flush pending batch write.
19.Flush() error //刷缓存
20.}
RocksStorage是基于gorocksdb项目(RocksDB的go的封装)实现。DiskStorage是基于goleveldb项目实现。MemoryStorage是一个简单的基于内存的KV(key-value)实现。
RocksStorage能充分利用高速存储器的性能(比如flash,内存)。
2) Trie
Nebualas实现了Merkle Patrica Trie,代码在common/trie/trie.go文件中。Merkle Patrica Trie集成了Merkle树和Patrica树的优点,在以太坊中被设计和首次应用的。
1.type Triestruct{
2.rootHash []byte //树根的hash
3.storage storage.Storage //存储对象
4.changelog []*Entry //所有的changelog
5.needChangelogbool//是否需要changelog
6.}
7.
8.// Entry in log, [key, old value, new value]
9.type Entrystruct{
10.action Action //有三种操作:插入,更改和删除
11.key []byte //key
12.old []byte //old的value
13.update []byte //更新的value
14.}
如果保存了changelog,可以通过Replay函数重构Trie。
Trie中的节点由Node类型表示:
1.// Node in trie, three kinds,
2.// Branch Node [hash_0, hash_1, ..., hash_f]
3.// Extension Node [flag, encodedPath, next hash]
4.// Leaf Node [flag, encodedPath, value]
5.type nodestruct{
6.Hash []byte //node的hash值
7.Bytes []byte //node的系列化数据
8.Val [][]byte //node的类型和数值
9.}
Node有三种类型:
1)分支节点(BranchNode)
分支结点是由16个hash组成。
2)扩展节点(ExtentionNode)
扩展节点由三个字节数组组成:扩展标记,路径,以及hash。
3)叶子结点(Leaf Node)
叶子结点由三个字节数组组成:叶子标记,叶子的路径,以及hash。
以646f83,646f67,686f72三个key为例,三种结点的关系如下图所示:
注意:Nebulas的Trie的设计中没有考虑扩展节点,可能本身就是叶子的情况。因为在实际运行的时候,key都是hash值,很少遇到这样的情况。在以太坊最新的代码中,扩展节点本身也可能是也叶子结点(可以存储value值)。
Trie主要提供三类函数:
1)一般操作函数:“Put”,“Get”,“Del”等函数,对树添加,获取,删除结点。注意“Put”/“Del”函数会直接更改存储。
2)同步函数:SyncTrie和SyncPath,同步整棵树或者一个路径。
3)证明函数:Prove和Verify函数提供证明和验证某个节点在一棵树中。
以证明节点(646f83)为例,Prove函数的结果是N1到N5节点的Val的数组,也就是下图中的蓝线经过的节点的Val组成的数组。
3)MVCCDB
MVCCDB是Nebulas设计的多版本并发控制数据库,类型结构及其关系如下图所示。
MVCCDB支持如下的三种操作类型:
1)直接读写(Get/Put/Del)
MVCCDB直接从Storage读写数据。
2)事务处理(begin – Get/Put/Del – commit/rollback)
StagingTable会缓存一个事务中的Get/Put/Del操作,在commit中同步到Storage中。
3)预准备事务处理(begin – prepare – Get/Put/Del – update –commit/rollback)
预准备事务处理会创建预处理MVCCDB。update的时候,将预处理MVCCDB中的KV值更新到父MVCCDB中。每次更新会更新父StagingTable中的版本。
MVCCDB类型定义在core/mvccdb/mvccdb.go文件中:
1.// MVCCDB the data with MVCC supporting.
2.type MVCCDBstruct{
3.tid interface{} //MVCCDB的tid(事务编号)
4.storage storage.Storage //MVCCDB的底层存储(KV)
5.stagingTable *StagingTable //带版本信息的KV对
6.mutex sync.Mutex //同步锁
7.parentDB *MVCCDB //父亲MVCCDB
8.isInTransactionbool//是否在一次事务流程中?
9.isPreparedDBbool//是否是“预准备”DB
10.isPreparedDBClosedbool//“预准备”DB是否关闭
11.preparedDBs map[interface{}]*MVCCDB//所有“预准备”DB的MAP
12.isTrieSameKeyCompatibilitybool//trie中是否允许同样的key值
13.}
MVCCDB是个嵌套的类型,基本的属性包括:tid(事务编号),storage(存储类型)和stagingTable(在存入存储之前,在内存中缓存的带版本的KV对)。MVCCDB可以发起事务,为每个事务会创建一个MVCCDB,作为preparedDB。parentDB是preparedDB的父亲MVCCDB。
仔细分析一下begin/prepare/Get/Put/commit函数。
begin函数很简单,设置isInTransaction标志:
1.// Begin begin a transaction.
2.func (db *MVCCDB) Begin() error {
3.db.mutex.Lock()
4.defer db.mutex.Unlock()
5.
6.ifdb.isInTransaction {
7.returnErrUnsupportedNestedTransaction
8.}
9.
10.ifdb.isPreparedDB {
11.returnErrUnsupportedBeginInPreparedDB
12.}
13.
14.db.isInTransaction =true
15.
16.returnnil
17.}
prepare函数创建一个preparedDB,每个preparedDB都有唯一的标示tid:
1.// Prepare a nested transaction
2.func (db *MVCCDB) Prepare(tid interface{}) (*MVCCDB, error) {
3.db.mutex.Lock()
4.defer db.mutex.Unlock()
5.
6.if!db.isInTransaction { //只有在一个事务中,才能创建preparedDB
7.returnnil, ErrTransactionNotStarted
8.}
9.
10.iftid == nil { //每个preparedDB都必须有个唯一的tid
11.returnnil, ErrTidIsNil
12.}
13.
14.ifdb.preparedDBs[tid] != nil { //检查是否preparedDB已经创建
15.returnnil, ErrTidIsExist
16.}
17.//为preparedDB创建preparedStagingTable
18.preparedStagingTable, err := db.stagingTable.Prepare(tid)
19.iferr != nil {
20.returnnil, err
21.}
22.
23.preparedDB := &MVCCDB{
24.tid: tid,
25.storage: db.storage,
26.stagingTable: preparedStagingTable,
27.parentDB: db,
28.isInTransaction:true,
29.isPreparedDB:true,
30.isPreparedDBClosed:false,
31.preparedDBs: make(map[interface{}]*MVCCDB),
32.isTrieSameKeyCompatibility: db.isTrieSameKeyCompatibility,
33.}
34.
35.db.preparedDBs[tid] = preparedDB
36.returnpreparedDB, nil
37.}
Get/Put/Del函数相当简单,从storage或者stagingTable获取,设置,或者删除key值。
CheckAndUpdate函数,合并preparedDB的KV数据到父亲MVCCDB中。
1.// CheckAndUpdate merge current changes to `FinalVersionizedValues`.
2.func (db *MVCCDB) CheckAndUpdate() ([]interface{}, error) {
3.db.mutex.Lock()
4.defer db.mutex.Unlock()
5.
6.if!db.isInTransaction {
7.returnnil, ErrTransactionNotStarted
8.}
9.
10.if!db.isPreparedDB { //只有preparedDB允许update操作
11.returnnil, ErrDisallowedCallingInNoPreparedDB
12.}
13.
14.ifdb.isPreparedDBClosed {
15.returnnil, ErrPreparedDBIsClosed
16.}
17.
18.ret, err := db.stagingTable.MergeToParent() //合并到parent的MVCCDB的stagingTable中
19.
20.iferr == nil {
21.// cleanup.
22.db.stagingTable.Purge()
23.}
24.
25.returnret, err
26.}
commit函数讲stagingTable的KV值,存储到stoage中。也就是说,commit函数讲内存中的KV值存储到永久存储storage中。
带版本信息的KV值由StagingTable类型管理,定义在common/mvccdb/staging_table.go中:
1.// StagingTable a struct to store all staging changed key/value pairs.
2.// There are two map to store the key/value pairs. One are stored associated with tid,
3.// the other is `finalVersionizedValue`, record the `ready to commit` key/value pairs.
4.type StagingTablestruct{
5.storage storage.Storage //对应的持久化存储
6.globalVersion int64 //版本编号
7.parentStagingTable *StagingTable //父stagingTable
8.versionizedValues stagingValuesMap //KV对
9.tid interface{} //事务编号
10.mutex sync.Mutex //锁
11.prepareingGlobalVersion int64 //预准备时的版本编号
12.preparedStagingTables map[interface{}]*StagingTable //preparedDB对应的preparedStagingTables
13.isTrieSameKeyCompatibilitybool//是否允许相同的key
14.disableStrictGlobalVersionCheckbool// default `true`
15.}
StagingTable也是嵌套类型,和MVCCDB逻辑几乎一致。StagingValuesMap是带版本的KV对的map表。
1.type stagingValuesMap map[string]*VersionizedValueItem
2.
3.// VersionizedValueItem a struct for key/value pair, with version, dirty, deleted flags.
4.type VersionizedValueItemstruct{
5.tid interface{} //事务编号
6.key []byte //K
7.val []byte //V
8.versionint//版本号(每次更新,版本加一)
9.deletedbool//是否删除
10.dirtybool//是否修改
11.globalVersion int64 //全局版本号(每次update,版本加一)
12.}
一个带版本信息的KV对由VersionizedValueItem表示。具体分析一下MergeToParent函数:
1.// MergeToParent merge key/value pair of tid to `finalVersionizedValues` which the version of value are the same.
2.func (tbl *StagingTable) MergeToParent() ([]interface{}, error) {
3.iftbl.parentStagingTable == nil {
4.returnnil, ErrParentStagingTableIsNil
5.}
6.
7.tbl.parentStagingTable.mutex.Lock() //锁 parentStagingTable
8.defer tbl.parentStagingTable.mutex.Unlock()
9.
10.tbl.mutex.Lock() //锁当前的StagingTable
11.defer tbl.mutex.Unlock()
12.
13.dependentTids := make(map[interface{}]bool)
14.conflictKeys := make(map[string]interface{})
15.
16.// 1. check version.
17.targetValues := tbl.parentStagingTable.versionizedValues //获得parentStagingTable的所有KV对
18.//查看当前StagingTable的所有KV对
19.forkeyStr, fromValueItem := range tbl.versionizedValues {
20.targetValueItem := targetValues[keyStr]
21.
22.iftargetValueItem == nil {
23.continue
24.}
25.
26.// 1. record conflict.
27.iffromValueItem.isConflict(targetValueItem, tbl.isTrieSameKeyCompatibility) {
28.conflictKeys[keyStr] = targetValueItem.tid
29.continue
30.}
31.
32.// 2. record dependentTids.
33.
34.// skip default value loaded from storage.
35.iftargetValueItem.isDefault() {
36.continue
37.}
38.
39.// ignore same parent tid for dependentTids.
40.iftargetValueItem.tid == tbl.parentStagingTable.tid {
41.continue
42.}
43.
44.// ignore version check when TrieSameKeyCompatibility is enabled.
45.iftbl.isTrieSameKeyCompatibility {
46.continue
47.}
48.//如果存在targetValueItem,也就是说,parentStagingTable存在同样key的KV对,并且tid不一样,说明当前的tid和改tid有依赖关系
49.dependentTids[targetValueItem.tid] =true
50.}
51.
52.iflen(conflictKeys) > 0 {
53.logging.VLog().WithFields(logrus.Fields{
54."tid": tbl.tid,
55."parentTid": tbl.parentStagingTable.tid,
56."conflictKeys": conflictKeys,
57.}).Debug("Failed to be merged into parent.")
58.returnnil, ErrStagingTableKeyConfliction
59.}
60.
61.// 2. merge to final.
62.
63.// incr parentStagingTable.globalVersion.
64.tbl.parentStagingTable.globalVersion++ //更新parentStagingTable的全局版本
65.
66.forkeyStr, fromValueItem := range tbl.versionizedValues {
67.// ignore default value item.
68.iffromValueItem.isDefault() {
69.continue
70.}
71.
72.// ignore non-dirty.
73.if!fromValueItem.dirty {
74.continue
75.}
76.
77.// merge.
78.value := fromValueItem.CloneForMerge(tbl.parentStagingTable.globalVersion)
79.targetValues[keyStr] = value //Clone当前的KV对,并更新到parentStaingTable中
80.}
81.
82.tids := make([]interface{}, 0, len(dependentTids)) //收集所有的依赖关系
83.forkey := range dependentTids {
84.tids = append(tids, key)
85.}
86.
87.returntids, nil
88.}
特别注意的是,MergeToParent返回和当前tid依赖的其他tid。
4) DAG
DAG(有向无环图)定义在common/dag/dag.go文件中:
1.// Dag struct
2.type Dagstruct{
3.nodes map[interface{}]*Node
4.indexint
5.indexs map[int]interface{}
6.}
DAG由一系列的Node组成:
1.// Node struct
2.type Nodestruct{
3.key interface{}
4.indexint
5.children []*Node
6.parentCounterint
7.}
Dispatcher是在DAG类型基础上,提供的辅助类型。针对DAG的节点的依赖关系,Dispatcher能最大化地对节点进行并行化处理。相关的代码比较容易理解,可以查看具体的代码common/dag/dispatcher.go。
举个例子,说明Dispatcher的逻辑,假设DAG的节点关系如下:
Dispatcher会尽力先并行化处理1/2/3节点,在处理完后,再并行化处理其他节点。
1.// NewDispatcher create Dag Dispatcher instance.
2.func NewDispatcher(dag *Dag, concurrencyint, elapseInMs int64, context interface{}, cb Callback) *Dispatcher {
NewDispatcher函数用来创建一个Dispatcher,其中concurrency参数代表最大的并行个数,elapseInMs参数代表最大执行时间(ms为单位),cb参数是针对DAG中的节点的回调函数。
领取专属 10元无门槛券
私享最新 技术干货