本文基于 TiDB release-5.1进行分析,需要用到 Go 1.16以后的版本 我的博客地址:: https://www.luozhiyun.com/archives/620
由于 TiDB 的事务模型沿用了 Percolator 的事务模型。所以先从 Percolator 开始,关于 Percolator 论文没看过的同学看这里:https://www.luozhiyun.com/archives/609 我已经翻译好了
Percolator实现分布式事务主要基于3个实体:Client、TSO、BigTable。
Percolator存储一列数据的时候,会在BigTable中存储多列数据:
Percolator的分布式写事务是由2阶段提交(后称2PC)实现的。不过它对传统2PC做了一些修改。一个写事务事务开启时,Client 会从 TSO 处获取一个 timestamp 作为事务的开始时间(后称为start_ts)。在提交之前,所有的写操作都只是缓存在内存里。提交时会经过 prewrite 阶段和 commit阶段,一个写事务可以包含多个写操作。
Prewrite
Commit
如果 Prewrite 成功,则进入 Commit 阶段:
每个 client connection 对应着一个 session , 事务相关数据的放在了 session 中, 它包含了对 KVStore 和 Txn 接口的引用。
func (s *session) NewTxn(ctx context.Context) error {
if err := s.checkBeforeNewTxn(ctx); err != nil {
return err
}
// 开启事务
txn, err := s.store.BeginWithOption(tikv.DefaultStartTSOption().SetTxnScope(s.sessionVars.CheckAndGetTxnScope()))
if err != nil {
return err
}
...
return nil
}
KVStore 定义了Begin/BeginWithOption
,用来创建开始一个事务。如上代码,session 的 NewTxn 方法中调用 KVStore 的 BeginWithOption 方法创建开始一个事务。
func (s *KVStore) Begin() (*KVTxn, error) {
return s.BeginWithOption(DefaultStartTSOption())
}
func (s *KVStore) BeginWithOption(options StartTSOption) (*KVTxn, error) {
return newTiKVTxnWithOptions(s, options)
}
Begin/BeginWithOption
调用图如下:
Begin/BeginWithOption
最终都会调用到 newTiKVTxnWithOptions 函数中。如果 startTS 为 nil ,则会去 PD服务获取一个时间戳,作为事务的startTS,同时也是事务的唯一标识。
func newTiKVTxnWithOptions(store *KVStore, options StartTSOption) (*KVTxn, error) {
if options.TxnScope == "" {
options.TxnScope = oracle.GlobalTxnScope
}
// 去PD服务获取一个时间戳
startTS, err := ExtractStartTS(store, options)
if err != nil {
return nil, errors.Trace(err)
}
snapshot := newTiKVSnapshot(store, startTS, store.nextReplicaReadSeed())
newTiKVTxn := &KVTxn{
snapshot: snapshot,
us: unionstore.NewUnionStore(snapshot),
store: store,
startTS: startTS,
startTime: time.Now(),
valid: true,
vars: tikv.DefaultVars,
scope: options.TxnScope,
}
return newTiKVTxn, nil
}
TiDB 在执行 insert/update/delete
等 DML 时,会调用memBuffer.Set(key, value)
将数据放入到 kv.Transaction
的 memBuffer 里面,如果执行失败,就调 StmtRollback
将 TxnState
里面的buf 清空 。具体实现可以看 tableCommon.AddRecord
函数:
func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
...
var setPresume bool
skipCheck := sctx.GetSessionVars().StmtCtx.BatchCheck
if (t.meta.IsCommonHandle || t.meta.PKIsHandle) && !skipCheck && !opt.SkipHandleCheck {
// 如果是 LazyCheck ,那么只读取本地缓存判断是否存在
if sctx.GetSessionVars().LazyCheckKeyNotExists() {
var v []byte
//只读取本地缓存判断是否存在
v, err = txn.GetMemBuffer().Get(ctx, key)
if err != nil {
setPresume = true
}
if err == nil && len(v) == 0 {
err = kv.ErrNotExist
}
} else {
//否则会通过rpc请求tikv从集群中校验数据是否存在
_, err = txn.Get(ctx, key)
}
if err == nil {
handleStr := getDuplicateErrorHandleString(t, recordID, r)
return recordID, kv.ErrKeyExists.FastGenByArgs(handleStr, "PRIMARY")
} else if !kv.ErrNotExist.Equal(err) {
return recordID, err
}
}
if setPresume {
// 表示假定数据不存在
err = memBuffer.SetWithFlags(key, value, kv.SetPresumeKeyNotExists)
} else {
//将 Key-Value 写到当前事务的缓存中
err = memBuffer.Set(key, value)
}
if err != nil {
return nil, err
}
...
}
AddRecord 将数据写入的过程我在<5.深入TiDB:Insert 语句>分析过了,就不过多讲解。
TiDB 提交事务是通过调用 KVTxn 的 Commit 方法进行的。像 pecolator 论文中描述的协议一样,这是一个两阶段提交的过程,Prewrite 阶段与 Commit 阶段。
Prewrite:
当 Prewrite 阶段完成以后,进入 Commit 阶段,当前时间戳为 commitTs,TSO 会保证 commitTs > startTs。
Commit:
我们先看看整体的 twoPhaseCommitter 二阶段提交的调用时序图:
在代码实现上面首先会构建一个 twoPhaseCommitter,这个对象会用到在 begin 里面创建的 KVTxn 对象的字段:
func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) {
return &twoPhaseCommitter{
store: txn.store,
txn: txn,
startTS: txn.StartTS(),
sessionID: sessionID,
regionTxnSize: map[uint64]int{},
ttlManager: ttlManager{
ch: make(chan struct{}),
},
isPessimistic: txn.IsPessimistic(),
binlog: txn.binlog,
}, nil
}
mutations
由于事务数据都是存放在缓存中的,所以 twoPhaseCommitter 会通过 initKeysAndMutations 方法将当前事务的缓存中的数据转成 mutations:
func (c *twoPhaseCommitter) initKeysAndMutations() error {
var size, putCnt, delCnt, lockCnt, checkCnt int
txn := c.txn
// 当前事务的数据都存放在 memBuf 中
// memBuffer里的 key 是有序排列
memBuf := txn.GetMemBuffer()
sizeHint := txn.us.GetMemBuffer().Len()
c.mutations = newMemBufferMutations(sizeHint, memBuf)
c.isPessimistic = txn.IsPessimistic()
filter := txn.kvFilter
var err error
// 遍历 memBuffer 可以顺序的收集到事务里需要修改的 key
for it := memBuf.IterWithFlags(nil, nil); it.Valid(); err = it.Next() {
_ = err
key := it.Key()
flags := it.Flags()
var value []byte
var op pb.Op
if !it.HasValue() {
...
} else {
value = it.Value()
...
}
var isPessimistic bool
if flags.HasLocked() {
isPessimistic = c.isPessimistic
}
c.mutations.Push(op, isPessimistic, it.Handle())
size += len(key) + len(value)
if len(c.primaryKey) == 0 && op != pb.Op_CheckNotExists {
c.primaryKey = key
}
}
...
commitDetail := &util.CommitDetails{WriteSize: size, WriteKeys: c.mutations.Len()}
metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys))
metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))
c.hasNoNeedCommitKeys = checkCnt > 0
// 计算事务的 TTL 时间
c.lockTTL = txnLockTTL(txn.startTime, size)
c.priority = txn.priority.ToPB()
c.syncLog = txn.syncLog
c.resourceGroupTag = txn.resourceGroupTag
c.setDetail(commitDetail)
return nil
}
当前事务的数据都存放在 memBuf 中,所以我们需要遍历 memBuf 可以顺序的收集到事务里需要修改的 key。
在这里还会调用 txnLockTTL 根据事务的大小计算事务的 TTL 时间。如果一个事务的 key 通过 prewrite
加锁后,事务没有执行完,tidb-server 就挂掉了,这时候集群内其他 tidb-server 是无法读取这个 key 的,如果没有 TTL,就会死锁。设置了 TTL 之后,读请求就可以在 TTL 超时之后执行清锁,然后读取到数据。
func txnLockTTL(startTime time.Time, txnSize int) uint64 {
lockTTL := defaultLockTTL
// 当事务大小大于16KB
if txnSize >= txnCommitBatchSize {
sizeMiB := float64(txnSize) / bytesPerMiB
// 6000 * 事务大小平方根
lockTTL = uint64(float64(ttlFactor) * math.Sqrt(sizeMiB))
//最小为3s
if lockTTL < defaultLockTTL {
lockTTL = defaultLockTTL
}
//最大为20s
if lockTTL > ManagedLockTTL {
lockTTL = ManagedLockTTL
}
}
elapsed := time.Since(startTime) / time.Millisecond
return lockTTL + uint64(elapsed)
}
TTL 和事务的大小的平方根成正比,并控制在一个最小值和一个最大值之间,最大20s,最小3s。
在执行 之前,先会调用 twoPhaseCommitter 的 prewriteMutations 方法进行一些预处理工作。
func (c *twoPhaseCommitter) prewriteMutations(bo *Backoffer, mutations CommitterMutations) error {
...
return c.doActionOnMutations(bo, actionPrewrite{}, mutations)
}
func (c *twoPhaseCommitter) doActionOnMutations(bo *Backoffer, action twoPhaseCommitAction, mutations CommitterMutations) error {
if mutations.Len() == 0 {
return nil
}
//按照region分组对mutations进行分组
groups, err := c.groupMutations(bo, mutations)
if err != nil {
return errors.Trace(err)
}
...
//进一步的分批处理
return c.doActionOnGroupMutations(bo, action, groups)
}
groupMutations
首先会调用 groupMutations 对 mutations 按照 region 分组。整个分组流程如下:
先对mutations按照region分组,如果某个region的mutations 太多, 则会先发送CmdSplitRegion命令给TiKV, TiKV对那个region先做个split, 然后再开始提交。
doActionOnGroupMutations
分组完之后会调用 doActionOnGroupMutations 会对每个group的 mutations 做进一步的分批处理。然后调用 doActionOnBatches 进行 prewrite 处理,整个调用图如下:
代码的主要执行逻辑如下:
func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *Backoffer, action twoPhaseCommitAction, groups []groupedMutations) error {
action.tiKVTxnRegionsNumHistogram().Observe(float64(len(groups)))
...
batchBuilder := newBatched(c.primary())
//每个分组内按16KB大小再分批
for _, group := range groups {
batchBuilder.appendBatchMutationsBySize(group.region, group.mutations, sizeFunc, txnCommitBatchSize)
}
firstIsPrimary := batchBuilder.setPrimary()
actionCommit, actionIsCommit := action.(actionCommit)
...
//commit先同步的提交primary key所在的batch
if firstIsPrimary &&
((actionIsCommit && !c.isAsyncCommit()) || actionIsCleanup || actionIsPessimiticLock) {
// primary should be committed(not async commit)/cleanup/pessimistically locked first
err = c.doActionOnBatches(bo, action, batchBuilder.primaryBatch())
...
//提交完之后将primary key所在的batch移除
batchBuilder.forgetPrimary()
}
// Already spawned a goroutine for async commit transaction.
// 其它的key由go routine后台异步的提交
if actionIsCommit && !actionCommit.retry && !c.isAsyncCommit() {
secondaryBo := retry.NewBackofferWithVars(context.Background(), CommitSecondaryMaxBackoff, c.txn.vars)
go func() {
...
e := c.doActionOnBatches(secondaryBo, action, batchBuilder.allBatches())
...
}()
} else {
//执行 prewrite
err = c.doActionOnBatches(bo, action, batchBuilder.allBatches())
}
return errors.Trace(err)
}
doActionOnGroupMutations 里面还参杂了 commit 代码,可以先忽略。
下面跟着上面的流程图 doActionOnGroupMutations 会进入到 actionPrewrite 的 handleSingleBatch 方法中详细说说这个方法。在讲这个方法之前先看看主要的执行逻辑:
下面来看看代码的具体实现:
func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) (err error) {
...
// 获取事务的大小
txnSize := uint64(c.regionTxnSize[batch.region.id])
// 因为region的缺失导致的重试,所以不知道事务大小,这里重置事务大小为最大值
if action.retry {
txnSize = math.MaxUint64
}
tBegin := time.Now()
attempts := 0
// 构建 Request
req := c.buildPrewriteRequest(batch, txnSize)
// 构建 RegionRequestSender
sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient())
for {
//尝试次数
attempts++
// 如果请求超过了1分钟,那么打印一条日志
if time.Since(tBegin) > slowRequestThreshold {
logutil.BgLogger().Warn("slow prewrite request", zap.Uint64("startTS", c.startTS), zap.Stringer("region", &batch.region), zap.Int("attempts", attempts))
tBegin = time.Now()
}
//发送请求
resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
// Unexpected error occurs, return it
if err != nil {
return errors.Trace(err)
}
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
// 如果遇到了regionError, 则需要重新调用doActionOnMutations重新分组,重新尝试
if regionErr != nil {
...
err = c.doActionOnMutations(bo, actionPrewrite{true}, batch.mutations)
return errors.Trace(err)
}
if resp.Resp == nil {
return errors.Trace(tikverr.ErrBodyMissing)
}
prewriteResp := resp.Resp.(*pb.PrewriteResponse)
keyErrs := prewriteResp.GetErrors()
if len(keyErrs) == 0 {
//如果没有keyError,并且Batch是primary,则启动一个tllManager
if batch.isPrimary {
// 如果事务大于32M,那么开启ttlManager定时发送TxnHeartBeat心跳
if int64(c.txnSize) > config.GetGlobalConfig().TiKVClient.TTLRefreshedTxnSize &&
prewriteResp.OnePcCommitTs == 0 {
c.run(c, nil)
}
}
...
return nil
}
var locks []*Lock
for _, keyErr := range keyErrs {
// 该key已存在
if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
e := &tikverr.ErrKeyExist{AlreadyExist: alreadyExist}
err = c.extractKeyExistsErr(e)
if err != nil {
atomic.StoreUint32(&c.prewriteFailed, 1)
}
return err
}
// 从 keyErr 中抽取出冲突的lock
lock, err1 := extractLockFromKeyErr(keyErr)
if err1 != nil {
atomic.StoreUint32(&c.prewriteFailed, 1)
return errors.Trace(err1)
}
locks = append(locks, lock)
}
start := time.Now()
//尝试解决这些locks,获取锁的过期时间
msBeforeExpired, err := c.store.lockResolver.resolveLocksForWrite(bo, c.startTS, locks)
if err != nil {
return errors.Trace(err)
}
atomic.AddInt64(&c.getDetail().ResolveLockTime, int64(time.Since(start)))
if msBeforeExpired > 0 {
// 过期时间大于0,那么sleep等待
err = bo.BackoffWithCfgAndMaxSleep(retry.BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
return errors.Trace(err)
}
}
}
}
handleSingleBatch 里面有个循环会发请求到 TiKV,如果失败,那么会根据返回的错误来判断是否需要重试。需要注意的是,如果事务大于32M,那么开启ttlManager定时发送TxnHeartBeat心跳,因为大事务处理时间比较长。
commit 和 上面的 prewrite 执行流程类似,在 twoPhaseCommitter 的 execute 方法中执行完 prewriteMutations 之后会调用到 commitTxn 方法中,最后在 doActionOnBatches 方法中进行分批处理。
doActionOnBatches 方法会调用到 actionCommit 的 handleSingleBatch 方法进行事务的提交。
actionCommit 的 handleSingleBatch 执行流程其实和上面的 prewrite 也是类似的逻辑:
handleSingleBatch 首先也会调用 NewRequest 初始化一个 Request 结构体作为请求体,然后进入到循环结构中,调用 RegionRequestSender 的 SendReq 向 TiKV 发起请求;
如果返回 regionErr 错误,那么会重新调用 doActionOnMutations 重新分组之后再请求;如果返回的错误里面 GetCommitTsExpired 不为空,那么会调用 getTimestampWithRetry 方法重新获取 commitTS 之后再重试提交。
func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *Backoffer, batch batchMutations) error {
req := tikvrpc.NewRequest(...)
sender := NewRegionRequestSender(c.store.regionCache, c.store.GetTiKVClient())
for {
// 重试次数
attempts++
...
//向 tikv 发起提交请求
resp, err := sender.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
...
// 如果遇到了regionError, 则需要重新调用doActionOnMutations重新分组,重新尝试
regionErr, err := resp.GetRegionError()
if err != nil {
return errors.Trace(err)
}
if regionErr != nil {
...
// 重新分组
err = c.doActionOnMutations(bo, actionCommit{true}, batch.mutations)
return errors.Trace(err)
}
commitResp := resp.Resp.(*pb.CommitResponse)
if keyErr := commitResp.GetError(); keyErr != nil {
if rejected := keyErr.GetCommitTsExpired(); rejected != nil {
// 重新获取 commitTS
commitTS, err := c.store.getTimestampWithRetry(bo, c.txn.GetScope())
...
continue
}
...
}
break
}
return nil
}
由于TiDB的二阶段提交是通过 Percolator 分布式事务模型实现的,所以本篇文章首先从 Percolator 分布式事务模型给大家讲解一下它里面主要的写操作和读操作的实现步骤;
然后再带入到 TiDB 的二阶段提交中,从代码中和大家剖析实现原理。整个提交的过程大致如图所示:
https://pingcap.com/zh/blog/tidb-source-code-reading-19
https://asktug.com/t/topic/1495
https://pingcap.com/zh/blog/tidb-source-code-reading-19
https://pingcap.com/zh/blog/percolator-and-txn
https://www.luozhiyun.com/archives/609
https://pingcap.com/zh/blog/tikv-source-code-reading-12
https://github.com/tikv/sig-transaction/tree/master/design/async-commit
https://pingcap.com/zh/blog/async-commit-principle
https://zhuanlan.zhihu.com/p/59115828
http://mysql.taobao.org/monthly/2018/11/02/
https://pingcap.com/zh/blog/best-practice-optimistic-transaction