前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >6.824 raft Lab 4 multi-raft-group KV-Server

6.824 raft Lab 4 multi-raft-group KV-Server

原创
作者头像
冰寒火
修改于 2022-10-23 13:04:01
修改于 2022-10-23 13:04:01
1.1K0
举报
文章被收录于专栏:软件设计软件设计

一、背景

上文6.824 raft Lab 3 kvRaft是实现了一个single raft group的键值数据库,本文实现一个multi-raft-group键值数据库,通过分片和副本来提高容量、性能和可用性。

这个分布式数据库是multi-raft-group,包含总控结点shardctrler、存储结点shardkv。shardctrler用于配置更新,每次增加/减少raft group、移动分片都会生成一个new configuration,并且提供Query接口供client、shardkv获取路由信息。shardkv是一个简易的k-v存储引擎,通过raft实现主从同步机制。

二、功能分析和详细设计

Lab 4是在Lab 3的基础上开发一个支持shard的数据库,可以拷贝一下Lab 3的代码。

1 shardctrler

功能分析:

  1. 存放shard id-->raft group的映射关系,用于client和server路由。
  2. 支持后台管理人员的操作:Join、Leave、Move、Query,每次更新生成一个new configuration。

详细设计:

  1. 用于一个数组configs来保存所有configuration,configs0是dummy config,起到哨兵作用。
  2. config的number在apply到config时才会分配,确保是连续、递增的。

2 shardkv

功能分析:

  1. 定时更新配置。
  2. 从其他raft group迁移分片。
  3. 对被迁移的分片进行删除。

详细设计:

  1. 定时任务:配置更新事件循环,但是要在前一个配置完全更新结束后才行。
  2. 定时任务:迁移分片事件循环,一旦获取到需要迁移的分片信息,立刻从对应raft group进行迁移。
  3. 定时任务:gc事件循环,用于检查需要迁移成功的分片,将它从原结点上删除。
  4. 配置更新、迁移分片、删除分片都由leader完成,并同步到其他结点上。
  5. 一旦配置开始更新,还没有迁移成功的分片、需要被迁移走的分片不能够支持读写了。

3 shardclerk

功能分析:

  1. 计算key-->shard_id。
  2. 负载均衡、重试。
  3. 定时更新配置。

详细设计:

  1. 每次请求shardkv返回ErrWrongGroup时进行一次配置更新。

三、shardctrler代码分析

1 数据结构

代码语言:go
AI代码解释
复制
type ShardCtrler struct {
	mu      sync.Mutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg

	nextNumber int      //版本编号
	configs    []Config // indexed by config num配置

	opContextMap     map[uint64]*OpContext //用于每个请求的上下文
	lastRequestIdMap map[int]uint64        //clientId-->lastRequestId,维持幂等性,需要客户端能够保证串行
}
const NShards = 10

// A configuration -- an assignment of shards to groups.
// Please don't change this.
type Config struct {
	Num    int              // config number
	Shards [NShards]int     // shard -> gid
	Groups map[int][]string // gid -> servers[]
}

func DefaultConfig() Config {
	return Config{
		Num:    0,
		Shards: [10]int{},
		Groups: map[int][]string{},
	}
}

func (sc *ShardCtrler) Config(number int) Config {
	if number == -1 || number >= len(sc.configs)-1 {
		return sc.configs[len(sc.configs)-1]
	}
	return sc.configs[number]
}

这个配置默认是10个分片,而raft group数量不确定。

2 配置更新

无论是Join、Leave、Move,都是在提交成功后、apply到状态机时才分配number,并更新,在此之前是通过增量配置来实现同步的。

代码语言:go
AI代码解释
复制
type ConfigEdit struct {
	NewGroups map[int][]string //Join
	LeaveGids []int            //Leave

	//Move
	ShardId int
	DestGid int
}

apply到状态机,每次更新后都要rebalance。

代码语言:go
AI代码解释
复制
func (sc *ShardCtrler) applyStateMachineLoop() {

	for {

		select {
		case applyMsg := <-sc.applyCh:
			if applyMsg.CommandValid {
				func() {
					sc.mu.Lock()
					defer sc.mu.Unlock()
					op := applyMsg.Command.(Op)
					//保证幂等性
					if op.RequestId <= sc.lastRequestIdMap[op.ClientId] {
						return
					}
					val := Config{}

					switch op.OpType {
					case OpTypeJoin:
						number := sc.nextNumber
						sc.nextNumber++
						//old := sc.Config(-1)
						conf := sc.rebalanceJoin(number, op.Value.NewGroups)
						sc.configs = append(sc.configs, *conf)
						sc.lastRequestIdMap[op.ClientId] = op.RequestId
						//DPrintf("controller applyLoop join, old config: %v new config: %v", mr.Any2String(old), mr.Any2String(conf))
					case OpTypeLeave:
						number := sc.nextNumber
						sc.nextNumber++
						//old := sc.Config(-1)
						conf := sc.rebalanceLeave(number, op.Value.LeaveGids)
						sc.configs = append(sc.configs, *conf)
						sc.lastRequestIdMap[op.ClientId] = op.RequestId
						//DPrintf("controller applyLoop leave, old config: %v new config: %v", mr.Any2String(old), mr.Any2String(conf))

					case OpTypeMove:
						number := sc.nextNumber
						sc.nextNumber++
						//old := sc.Config(-1)
						conf := sc.rebalanceMove(number, op.Value.ShardId, op.Value.DestGid)
						sc.configs = append(sc.configs, *conf)
						sc.lastRequestIdMap[op.ClientId] = op.RequestId
						//DPrintf("controller applyLoop move, old config: %v new config: %v", mr.Any2String(old), mr.Any2String(conf))
					case OpTypeQuery:
						//Get请求不需要更新lastRequestId
						val = sc.Config(op.Number)
					}
					//DPrintf("op: %v, config: %v, node: %v cost: %v,requestId: %v, stateMachine: %v", mr.Any2String(op), mr.Any2String(val), sc.me, time.Now().UnixMilli()-op.StartTimestamp, op.RequestId, mr.Any2String(sc.configs))
					//使得写入的client能够响应
					if c, ok := sc.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
						c.WaitCh <- val
					}
				}()
			}
		}
	}
}
//

3 rebalance

shardctrler管理的是分片在group上的映射,而key-->shard是根据固定的哈希算法计算得来的,如下图。本文的rebalance非常简单,就是根据group数量重新哈希。实际上,rebalance应该着重考虑迁移成本,可以采用:Join时从分片最多的group上迁移一半分片,Leave时将分片全给予分片最少的group,这种方式影响最小,但是可能不是很均匀。

映射关系
映射关系
代码语言:go
AI代码解释
复制
//gids是leader当前config的gids,因为go map遍历是随机的,需要确保主从一致
func (sc *ShardCtrler) rebalanceJoin(number int, newGroups map[int][]string) *Config {

	conf := sc.configs[len(sc.configs)-1]
	//合并当前版本的raft group和新加入的group
	groups := map[int][]string{}
	gids := make([]int, 0)
	for gid, servers := range conf.Groups {
		dst := make([]string, len(servers))
		copy(dst, servers)
		groups[gid] = dst
		gids = append(gids, gid)
	}
	for gid, servers := range newGroups {
		dst := make([]string, len(servers))
		copy(dst, servers)
		groups[gid] = dst
		gids = append(gids, gid)
	}
	sort.Slice(gids, func(i, j int) bool {
		return gids[i] < gids[j]
	})
	var shards [NShards]int
	copy(shards[:], conf.Shards[:])

	for i := 0; i < len(shards); i++ {
		shards[i] = gids[i%len(gids)]
	}
	return &Config{
		Num:    number,
		Shards: shards,
		Groups: groups,
	}
}

func (sc *ShardCtrler) rebalanceLeave(number int, leaveGids []int) *Config {
	//获取这些分片,以及剩余每个group的分片数量
	conf := sc.configs[len(sc.configs)-1]
	var shards [NShards]int
	copy(shards[:], conf.Shards[:])

	groups := map[int][]string{}
	for gid, servers := range conf.Groups {
		dst := make([]string, len(servers))
		copy(dst, servers)
		groups[gid] = dst
	}
	//移除掉删除的group
	for _, gid := range leaveGids {
		delete(groups, gid)
	}
	gids := make([]int, 0)
	for gid, _ := range groups {
		gids = append(gids, gid)
	}
	sort.Slice(gids, func(i, j int) bool {
		return gids[i] < gids[j]
	})
	if len(gids) == 0 {
		shards = [NShards]int{}
	} else {
		for i := 0; i < len(shards); i++ {
			shards[i] = gids[i%len(gids)]
		}
	}
	return &Config{
		Num:    number,
		Shards: shards,
		Groups: groups,
	}

}

func (sc *ShardCtrler) rebalanceMove(number, shardId, togid int) *Config {
	//获取这些分片,以及剩余每个group的分片数量
	conf := sc.configs[len(sc.configs)-1]
	//copy分片
	var newShards [NShards]int
	copy(newShards[:], conf.Shards[:])
	newShards[shardId] = togid

	//copy group
	newGroups := map[int][]string{}
	for gid, servers := range conf.Groups {
		dst := make([]string, len(servers))
		copy(dst, servers)
		newGroups[gid] = dst
	}

	return &Config{
		Num:    number,
		Shards: newShards,
		Groups: newGroups,
	}
}

四、shardkv代码分析

以下讲解会以groupA从groupB迁移shard1为例。

1 数据结构

首先定义给分片的结构。

代码语言:go
AI代码解释
复制
type Shard struct {
	ShardStatus      Status
	KvStore          map[string]string
	LastRequestIdMap map[int]uint64 //去重表
}

然后定义shardkv结构。

代码语言:go
AI代码解释
复制
type ShardKV struct {
	mu           sync.RWMutex
	me           int
	rf           *raft.Raft
	make_end     func(string) *labrpc.ClientEnd //创建其他group连接
	gid          int
	ctrlers      []*labrpc.ClientEnd
	mck          *shardctrler.Clerk //controller的客户端代理
	maxraftstate int   // snapshot if log grows this big
	dead         int32 // set by Kill()

	nextRequestId uint64
	applyCh       chan raft.ApplyMsg
	lastApplied   int

	persister         *raft.Persister
	lastIncludedIndex int

	lastConfig    shardctrler.Config //保留上一次配置,因为要根据这个配置才能够找到对应group进行迁移。
	currentConfig shardctrler.Config
	shardMap      map[int]*Shard
	opContextMap  map[uint64]*OpContext //用于每个请求的上下文
}

相对于之前Lab 3,Lab 4将去重表下放到Shard中。并且添加lastConfig、currentConfig两个字段,lastConfig作用是在迁移分片时提供分片所处位置。make_end是在访问其他group时创建连接发送请求。

代码语言:go
AI代码解释
复制
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, ctrlers []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *ShardKV {
	// call labgob.Register on structures you want
	// Go's RPC library to marshall/unmarshall.
    //注册传输时涉及的类型,要不然会Panic
	labgob.Register(Op{})
	labgob.Register(shardctrler.Config{})
	labgob.Register(PullShardArgs{})
	labgob.Register(PullShardReply{})
	labgob.Register(DeleteShardArgs{})
	labgob.Register(DeleteShardReply{})
	labgob.Register(Shard{})

	kv := new(ShardKV)
	kv.me = me
	kv.maxraftstate = maxraftstate
	kv.make_end = make_end
	kv.gid = gid
	//总控节点,更新配置
	kv.ctrlers = ctrlers
	kv.mck = shardctrler.MakeClerk(kv.ctrlers)

	kv.persister = persister
	kv.opContextMap = make(map[uint64]*OpContext)
	kv.applyCh = make(chan raft.ApplyMsg)
	kv.rf = raft.Make(servers, me, persister, kv.applyCh)
	kv.lastConfig = shardctrler.DefaultConfig()
	kv.currentConfig = shardctrler.DefaultConfig()
	kv.shardMap = map[int]*Shard{}
	for shardId, _ := range kv.currentConfig.Shards {
		kv.shardMap[shardId] = &Shard{
			ShardStatus:      NoServing,
			KvStore:          map[string]string{},
			LastRequestIdMap: map[int]uint64{},
		}
	}

	kv.decodeSnapshot(persister.ReadSnapshot())
	DPrintf("init kvserver, group: %v, node[%d]", gid, me)

	go kv.applyStateMachineLoop()

	go kv.StartEventLoop(kv.updateConfigurationEventLoop, time.Millisecond*100)
	go kv.StartEventLoop(kv.pullShardEventLoop, time.Millisecond*100)
	go kv.StartEventLoop(kv.gcShardEventLoop, time.Millisecond*100)

	return kv
}
//启动定时任务,包括配置更新、分片迁移等,只有leader才能够执行
func (kv *ShardKV) StartEventLoop(eventLoop func(), timeOut time.Duration) {
	for !kv.killed() {
		if _, isLeader := kv.rf.GetState(); isLeader {
			eventLoop()
		}
		time.Sleep(timeOut)
	}
}

2 apply事件循环

代码语言:go
AI代码解释
复制
func (kv *ShardKV) applyStateMachineLoop() {

	for !kv.killed() {

		select {
		case applyMsg := <-kv.applyCh:
			if applyMsg.CommandValid {
				func() {
					kv.mu.Lock()
					defer kv.mu.Unlock()
					if applyMsg.CommandIndex <= kv.lastApplied {
						return
					}
					kv.lastApplied = applyMsg.CommandIndex
					op := applyMsg.Command.(Op)

					var resp *ExecuteResponse
					switch op.OpType {
					case OpTypePut:
						resp = kv.applyPutOperation(&op)
					case OpTypeAppend:
						resp = kv.applyAppendOperation(&op)
					case OpTypeGet:
						resp = kv.applyGetOperation(&op)
					case OpTypeUpdateConfig:
						resp = kv.applyConfiguration(&op)
					case OpTypeAddShard:
						resp = kv.applyAddShard(&op)
					case OpTypeDeleteShard:
						resp = kv.applyDeleteShard(&op)
					}
					DPrintf("shardKV applyStateMachineLoop, gid[%d] node[%d] op: %v, resp: %v", kv.gid, kv.me, mr.Any2String(op), mr.Any2String(resp))

					//使得写入的client能够响应
					if c, ok := kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
						c.WaitCh <- resp
					}
					kv.maybeSnapshot(applyMsg.CommandIndex)
				}()
			} else if applyMsg.SnapshotValid {
				func() {
					kv.mu.Lock()
					defer kv.mu.Unlock()
					if kv.decodeSnapshot(applyMsg.Snapshot) {
						kv.rf.CondInstallSnapshot(applyMsg.SnapshotTerm, applyMsg.SnapshotIndex, applyMsg.Snapshot)
					}
				}()
			}
		}
	}
}

3 状态转移

分片的状态有如下几种:

代码语言:go
AI代码解释
复制
type Status int
//每个group拥有的分片可能是以下情况
const (
	Serving   Status = 1 //拥有属于自己的分片,可正常读写
	Pulling   Status = 2 //正在迁移隶属自己的分片
	BePulling Status = 3 //拥有但不属于自己的分片,等待对方迁移
	GCing     Status = 4 //已经迁移到分片,但需要通知原group gc掉这个分片,此时可读写
	NoServing Status = 5 //不负责该分片
)
分片状态转移
分片状态转移

shardkv启动时,全是NoServing状态。第一次配置更新不涉及到分片迁移,所以只有Serving、NoServing状态。再次更新时,就可能存在Pulling、BePulling状态的分片,此时pullShardEventLoop会收集Pulling状态的shard,从对应group迁移,迁移成功就通过raft同步到group其他结点上,apply时将分片状态改为GCing。然后gcShardEventLoop收集GCing的分片,通知原group删除掉这个分片,并且本group修改状态到Serving。

4 配置更新

代码语言:go
AI代码解释
复制
func (kv *ShardKV) updateConfigurationEventLoop() {
	canUpdateConfig := true
	defer func() {
		DPrintf("updateConfigurationEventLoop, gid[%v] node[%v] leaderId[%v], canUpdateConfig[%v] config: %v,  shardMap: %v", kv.gid, kv.me, kv.rf.LeaderId(), canUpdateConfig, mr.Any2String(kv.currentConfig), mr.Any2String(kv.shardMap))
	}()
	kv.mu.RLock()
	for _, shard := range kv.shardMap {
		if shard.ShardStatus != Serving && shard.ShardStatus != NoServing {
			canUpdateConfig = false
		}
	}
	num := kv.currentConfig.Num
	kv.mu.RUnlock()
	//前一个配置没有迁移完,不能再次更新配置
	if !canUpdateConfig {
		return
	}
	//依序号顺序更新
	conf := kv.mck.Query(num + 1)
	if conf.Num == num+1 {
		op := Op{
			ClientId:       kv.gid*1000 + kv.me,
			RequestId:      atomic.AddUint64(&kv.nextRequestId, 1),
			OpType:         OpTypeUpdateConfig,
			Value:          conf,
			StartTimestamp: time.Now().UnixMilli(),
		}
        //写入raft
		kv.execute(op)
	}
}

只有上一个配置更新完毕后才能进行下一个配置更新,而且不能跨越版本更新,防止迁移分片失败、迁移到旧数据以及分片无法gc等各种复杂情况。配置更新的日志被提交后,它的apply逻辑如下:

代码语言:go
AI代码解释
复制
func (kv *ShardKV) applyConfiguration(op *Op) *ExecuteResponse {

	conf := op.Value.(shardctrler.Config)
	defer func() {
		DPrintf("applyConfiguration, gid[%v] node[%v] leaderId[%v], currentConfig: %v, lastConfig: %v", kv.gid, kv.me, kv.rf.LeaderId(), mr.Any2String(kv.currentConfig), mr.Any2String(kv.lastConfig))
	}()
	//配置必须逐次递增的更新,不能跳跃,避免导致旧数据等奇怪问题
	if conf.Num != kv.currentConfig.Num+1 {
		return &ExecuteResponse{Err: ErrOutDated}
	}
	//此时所有分片的状态只有Serving和NoServing两种
	for shardId, gid := range conf.Shards {
		if gid == kv.gid {
			if kv.currentConfig.Shards[shardId] == 0 {
				//该分片上一次没有被分配,则本次不需要迁移
				kv.shardMap[shardId].ShardStatus = Serving
			} else if kv.shardMap[shardId].ShardStatus == NoServing {
				//该分片上次和本次都由本gid负责,不需要迁移,否则迁移
				kv.shardMap[shardId].ShardStatus = Pulling
			}
		} else {
			if kv.shardMap[shardId].ShardStatus == Serving {
				kv.shardMap[shardId].ShardStatus = BePulling
			}
		}
	}

	kv.lastConfig = kv.currentConfig
	kv.currentConfig = conf
	return &ExecuteResponse{Err: OK}
}

apply配置时,重点就是ShardStatus的更新。更新状态前

  1. groupA本次负责 shard1,但是上次不负责,那么状态就是Pulling。
  2. groupA本次负责shard1且上次也负责,那么状态不变。
  3. groupA本次不负责shard1,但是上次负责,那么状态变为BePulling。
  4. 首个配置不需要迁移。

5 迁移分片

分片迁移协程定期检测处于Pulling状态的分片,利用lastConfig获取对应的gid,然后并发的拉取分片数据。此次groupA的pull协程检测到shard1处于Pulling状态,然后向groupB拉取shard1,拉取成功后写入raft。groupA成功apply这条日志,将shard1的status更新为GCing。

代码语言:go
AI代码解释
复制
func (kv *ShardKV) pullShardEventLoop() {
	kv.mu.RLock()
	gid2ShardId := kv.getShardIdsByStatus(Pulling)
	//DPrintf("pullShardEventLoop, gid: %v, node: %v, leaderId: %v,  gid2ShardId: %v, shardMap: %v", kv.gid, kv.me, kv.rf.LeaderId(), mr.Any2String(gid2ShardId), mr.Any2String(kv.shardMap))
	wg := &sync.WaitGroup{}
	for gid, shardIds := range gid2ShardId {
		wg.Add(1)
		go func(servers []string, configNum int, shardIds []int) {
			defer wg.Done()
			args := &PullShardArgs{
				Num:      configNum,
				ShardIds: shardIds,
			}
			for _, server := range servers {
				reply := &PullShardReply{}
				srv := kv.make_end(server)
				//拉取分片成功后就写入raft
				if srv.Call("ShardKV.PullShard", args, reply) && reply.Err == OK {
					op := Op{
						OpType:         OpTypeAddShard,
						Value:          *reply,
						StartTimestamp: time.Now().UnixMilli(),
					}
					kv.execute(op)
					break
				}
			}
		}(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIds)
	}
	kv.mu.RUnlock()
	wg.Wait()
}


func (kv *ShardKV) applyAddShard(op *Op) *ExecuteResponse {
	reply := op.Value.(PullShardReply)
	//DPrintf("applyAddShard, gid: %v, node: %v, shard: %v", kv.gid, kv.me, mr.Any2String(reply))
	if reply.Num == kv.currentConfig.Num {
		for shardId, shard := range reply.ShardMap {
			if kv.shardMap[shardId].ShardStatus == Pulling {
				kv.shardMap[shardId] = shard.deepCopy()
                //gcEventLoop会检测到这个分片,并通知远端group删除分片
				kv.shardMap[shardId].ShardStatus = GCing //对方可以删除这个分片了
			} else {
				//DPrintf("gid[%d] node[%d] duplicated insert shard: %v", kv.gid, kv.me, mr.Any2String(shard))
				break
			}
		}
		return &ExecuteResponse{Err: OK}
	}
	return &ExecuteResponse{Err: ErrOutDated}
}

分片迁移要保证幂等性,不能重复更新分片,造成丢失数据的现象,因此只有config.Num==args.Num且ShardStatus==Pulling时才能更新分片,更新之后ShardStatus==GCing。

代码语言:go
AI代码解释
复制
//如果存在该分片,则传输分片
func (kv *ShardKV) PullShard(args *PullShardArgs, reply *PullShardReply) {

	defer func() {
		//DPrintf("shardKV PullShard, gid: %v, node: %v,  args: %v, reply: %v, currentConfig.Num: %v, args.Num: %v, shardMap: %v", kv.gid, kv.me, mr.Any2String(args), mr.Any2String(reply), kv.currentConfig.Num, args.Num, mr.Any2String(kv.shardMap))
	}()
	if _, isLeader := kv.rf.GetState(); !isLeader {
		reply.Err = ErrWrongLeader
		return
	}
	kv.mu.RLock()
	defer kv.mu.RUnlock()
	//configNum逐次递增,每个配置更新完成表示该group只有Serving和NoServing状态的shard
	//如果本group的configNum>args.Num,表示本group一定没有对方的shard
	//如果本group的configNum<args.Num,那么需要等待本group的config跟上来才行,避免迁移到旧shard
	if kv.currentConfig.Num != args.Num {
		reply.Err = ErrNotReady
		return
	}
	shardMap := map[int]*Shard{}
	for _, shardId := range args.ShardIds {
		shardMap[shardId] = kv.shardMap[shardId].deepCopy()
	}
	reply.ShardMap = shardMap
	reply.Num = kv.currentConfig.Num
	reply.Err = OK
}

6 清理分片

groupA的gc协程检测GCing状态的分片,并通知groupB删除。groupB收到后就会写入分片删除日志,提交后更新分片状态。

代码语言:go
AI代码解释
复制
func (kv *ShardKV) gcShardEventLoop() {
	kv.mu.RLock()
	gid2ShardIds := kv.getShardIdsByStatus(GCing)
	defer func() {
		DPrintf("getShardIdsByStatus, gid[%d] node[%d] num[%d] status: %v, gid2ShardIds: %v, shardMap: %v", kv.gid, kv.me, kv.currentConfig.Num, GCing, mr.Any2String(gid2ShardIds), mr.Any2String(kv.shardMap))
	}()
	wg := &sync.WaitGroup{}
	for gid, shardIds := range gid2ShardIds {
		wg.Add(1)
		go func(servers []string, num int, shardIds []int) {
			defer wg.Done()
			args := DeleteShardArgs{
				Num:      num,
				ShardIds: shardIds,
			}
			for _, server := range servers {
				reply := &DeleteShardReply{}
				srv := kv.make_end(server)
				if srv.Call("ShardKV.DeleteShard", &args, reply) && reply.Err == OK {
					op := Op{
						OpType:         OpTypeDeleteShard,
						Value:          args,
						StartTimestamp: time.Now().UnixMilli(),
					}
					kv.execute(op)
					break
				}
			}
		}(kv.lastConfig.Groups[gid], kv.currentConfig.Num, shardIds)
	}
	kv.mu.RUnlock()
	wg.Wait()
}


func (kv *ShardKV) DeleteShard(args *DeleteShardArgs, reply *DeleteShardReply) {

	term, isLeader := kv.rf.GetState()
	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}
	defer func() {
		//DPrintf("DeleteShard, gid[%d] node[%d], num: %v, args: %v, reply: %v, config: %v", kv.gid, kv.me, kv.currentConfig.Num, mr.Any2String(args), mr.Any2String(reply), mr.Any2String(kv.currentConfig))
	}()
	kv.mu.Lock()
	//过期请求也算删除成功
	if kv.currentConfig.Num > args.Num {
		reply.Err = OK
		defer kv.mu.Unlock()
		return
	}

	op := Op{
		ClientId:       kv.gid*1000 + kv.me,
		RequestId:      atomic.AddUint64(&kv.nextRequestId, 1),
		OpType:         OpTypeDeleteShard,
		Value:          *args,
		StartTimestamp: time.Now().UnixMilli(),
	}
	opContext := NewOpContext(&op, term)
	kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)] = opContext
	kv.mu.Unlock()

	defer func() {
		kv.mu.Lock()
		delete(kv.opContextMap, UniqueRequestId(op.ClientId, op.RequestId))
		kv.mu.Unlock()
	}()
	err := kv.execute(op)
	if err != OK {
		reply.Err = err
		return
	}
	select {
	case resp := <-opContext.WaitCh:
		reply.Err = resp.Err
	case <-time.After(time.Second):
		reply.Err = ErrTimeout
	}
}

groupA和groupB都会写入分片删除日志,groupB是为了清理分片,groupA是为了将分片状态GCing-->Serving。

代码语言:go
AI代码解释
复制
func (kv *ShardKV) applyDeleteShard(op *Op) *ExecuteResponse {
	defer func() {
		//DPrintf("applyDeleteShard, gid[%d] node[%d], op: %v, config: %v", kv.gid, kv.me, mr.Any2String(op), mr.Any2String(kv.currentConfig))
	}()
	args := op.Value.(DeleteShardArgs)
	num, shardIds := args.Num, args.ShardIds
	if num == kv.currentConfig.Num {
		for _, shardId := range shardIds {
			shard := kv.shardMap[shardId]
			if shard.ShardStatus == GCing {
				shard.ShardStatus = Serving
			} else if shard.ShardStatus == BePulling {
				kv.shardMap[shardId] = &Shard{
					ShardStatus:      NoServing,
					KvStore:          map[string]string{},
					LastRequestIdMap: map[int]uint64{},
				}
			} else {
				break
			}
		}
	}
	return &ExecuteResponse{Err: OK}
}

7 snapshot

snapshot这一部分和Lab 3相似,只是持久化的字段有些许变化,增加了currentConfig、lastConfig。可能大家觉得config可以直接从shardctrler恢复,但是这并不可行,因为我们只能够更新完成一个config才能够进入下一config的更新,如果前一config更新一部分然后group重启直接拉取最新config进行更新,就会跳过部分config,造成迁移失败甚至死锁。snapshot是根据日志数据量进行压缩的,并不会考虑当前config更新进度。

代码语言:go
AI代码解释
复制
func (kv *ShardKV) maybeSnapshot(index int) {
	if kv.maxraftstate == -1 {
		return
	}
	if kv.persister.RaftStateSize() > kv.maxraftstate {
		//DPrintf("maybeSnapshot starting, index: %v", index)
		kv.rf.Snapshot(index, kv.encodeSnapshot(index))
	}
}

func (kv *ShardKV) encodeSnapshot(lastIncludedIndex int) []byte {

	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(kv.currentConfig)
	e.Encode(kv.lastConfig)
	e.Encode(lastIncludedIndex)
	e.Encode(kv.shardMap)
	return w.Bytes()
}
func (kv *ShardKV) decodeSnapshot(snapshot []byte) bool {

	if len(snapshot) == 0 {
		return true
	}
	r := bytes.NewBuffer(snapshot)
	d := labgob.NewDecoder(r)

	if err := d.Decode(&kv.currentConfig); err != nil {
		return false
	}
	if err := d.Decode(&kv.lastConfig); err != nil {
		return false
	}
	if err := d.Decode(&kv.lastIncludedIndex); err != nil {
		return false
	}
	if err := d.Decode(&kv.shardMap); err != nil {
		return false
	}
	return true
}

8 读写流程

8.1 过滤逻辑

读写流程跟Lab 3相似,只是需要添加过滤条件,包括leader过滤、group过滤、Shard Status过滤、去重。

代码语言:go
AI代码解释
复制
func (kv *ShardKV) check(opType OpType, key string, clientId int, requestId uint64) (int, Err) {

	term, isLeader := kv.rf.GetState()
	if !isLeader {
		return term, ErrWrongLeader
	}
	shardId := key2shard(key)
	//DPrintf("ShardKV server, gid: %v, node: %v, leaderId: %v, shardId: %v, shardMap: %v", kv.gid, kv.me, kv.rf.LeaderId(), shardId, mr.Any2String(kv.shardMap))
	shard := kv.shardMap[shardId]
	if shard.ShardStatus == Pulling {
		return 0, ErrNotReady
	}
	if shard.ShardStatus == NoServing || shard.ShardStatus == BePulling {
		return 0, ErrWrongGroup
	}

	if opType == OpTypeGet {
		return term, OK
	}
	if lastRequestId, ok := kv.shardMap[shardId].LastRequestIdMap[clientId]; ok && lastRequestId >= requestId {
		return term, ErrDuplicated
	}
	return term, OK
}
//apply时过滤
func (kv *ShardKV) isAvailable(shardId int) Err {
	if kv.currentConfig.Shards[shardId] != kv.gid {
		return ErrWrongGroup
	}
	if kv.shardMap[shardId].ShardStatus == Serving || kv.shardMap[shardId].ShardStatus == GCing {
		return OK
	}
	return ErrNotReady
}
func (kv *ShardKV) isDuplicatedRequest(shardId, clientId int, requestId uint64) bool {
	return kv.shardMap[shardId].LastRequestIdMap[clientId] >= requestId
}

8.2 Get逻辑

代码语言:go
AI代码解释
复制
func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {

	kv.mu.Lock()
	term, err := kv.check(OpTypeGet, args.Key, args.ClientId, args.RequestId)

	reply.Err = err
	if err != OK {
		kv.mu.Unlock()
		return
	}
	op := &Op{
		ClientId:       args.ClientId,
		RequestId:      args.RequestId,
		OpType:         OpTypeGet,
		Key:            args.Key,
		StartTimestamp: time.Now().UnixMilli(),
	}
	opContext := NewOpContext(op, term)
	kv.opContextMap[opContext.UniqueRequestId] = opContext
	kv.mu.Unlock()
	defer func() {
		//DPrintf("server Get, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
		kv.mu.Lock()
		delete(kv.opContextMap, opContext.UniqueRequestId)
		kv.mu.Unlock()
	}()
	_, _, ok := kv.rf.Start(*op)
	if !ok {
		reply.Err = ErrWrongLeader
		return
	}
	//阻塞等待
	select {
	case res := <-opContext.WaitCh:
		reply.Err = res.Err
		if reply.Err == OK && res.Value != nil {
			reply.Value = res.Value.(string)
		}
	case <-time.After(time.Millisecond * 1000):
		reply.Err = ErrTimeout
	}
}

func (kv *ShardKV) applyGetOperation(op *Op) *ExecuteResponse {

	shardId := key2shard(op.Key)
	resp := &ExecuteResponse{Err: OK}
	defer func() {
		//DPrintf("applyGetOperation, gid[%d] node[%d] leaderId[%d] op: %v, resp: %v, shardMap: %v", kv.gid, kv.me, kv.rf.LeaderId(), mr.Any2String(op), mr.Any2String(resp), mr.Any2String(kv.shardMap))
	}()
	if err := kv.isAvailable(shardId); err != OK {
		resp.Err = err
		return resp
	}
	if val, ok := kv.shardMap[shardId].KvStore[op.Key]; !ok {
		resp.Err = ErrNoKey
	} else {
		resp.Value = val
	}
	return resp
}

8.3 PutAppend逻辑

代码语言:go
AI代码解释
复制
func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {

	kv.mu.Lock()
	term, err := kv.check(OpType(args.Op), args.Key, args.ClientId, args.RequestId)
	if err != OK {
		reply.Err = err
		kv.mu.Unlock()
		return
	}
	op := &Op{
		ClientId:       args.ClientId,
		RequestId:      args.RequestId,
		OpType:         OpType(args.Op),
		Key:            args.Key,
		Value:          args.Value,
		StartTimestamp: time.Now().UnixMilli(),
	}
	opContext := NewOpContext(op, term)
	kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)] = opContext
	defer func() {
		//DPrintf("server PutAppend, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
		kv.mu.Lock()
		delete(kv.opContextMap, UniqueRequestId(op.ClientId, op.RequestId))
		kv.mu.Unlock()
	}()
	kv.mu.Unlock()
	_, _, ok := kv.rf.Start(*op)
	if !ok {
		reply.Err = ErrWrongLeader
		return
	}
	//阻塞等待
	select {
	case res := <-opContext.WaitCh:
		reply.Err = res.Err
	case <-time.After(time.Millisecond * 1000):
		reply.Err = ErrTimeout
	}
}

func (kv *ShardKV) applyPutOperation(op *Op) *ExecuteResponse {
	shardId := key2shard(op.Key)
	if err := kv.isAvailable(shardId); err != OK {
		return &ExecuteResponse{Err: err}
	}
	if kv.isDuplicatedRequest(shardId, op.ClientId, op.RequestId) {
		return &ExecuteResponse{Err: ErrDuplicated}
	}
	kv.shardMap[shardId].KvStore[op.Key] = op.Value.(string)
	kv.shardMap[shardId].LastRequestIdMap[op.ClientId] = op.RequestId
	return &ExecuteResponse{Err: OK}
}
func (kv *ShardKV) applyAppendOperation(op *Op) *ExecuteResponse {
	shardId := key2shard(op.Key)
	if err := kv.isAvailable(shardId); err != OK {
		return &ExecuteResponse{Err: err}
	}
	if kv.isDuplicatedRequest(shardId, op.ClientId, op.RequestId) {
		return &ExecuteResponse{Err: ErrDuplicated}
	}
    //
	kv.shardMap[shardId].KvStore[op.Key] += op.Value.(string)
	kv.shardMap[shardId].LastRequestIdMap[op.ClientId] = op.RequestId
	return &ExecuteResponse{Err: OK}
}

五、总结

1 测试结果

shardctrler测试结果
shardctrler测试结果
shardkv测试结果
shardkv测试结果

2 小结

本实验是6.824最后一个实验,实现了一个简易的multi-raft-group数据库,通过多副本、多分片来扩大容量、提高性能、提高容错性。

这个数据库中包含一个总控节点shardctrler来支持配置变更,shardkv来负责分片读写、分片迁移、分片清理等,并能够支持客户端幂等性、线性读。

分布式系统排查问题最为麻烦,只能通过测试用例和日志来定位问题,而且一些薄弱点未必能够测出来。目前本系统比较粗糙,只是实现了功能性需求,而对于性能、延迟、可用性等非功能性需求做的并不好,还有待提高。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
让智能音箱胡言乱语、乱下指令,只需要一部手机+一个喇叭
智能音箱近两年走入了很多家庭的生活,成为了娱乐、购物、日程管理、儿童陪伴甚至教育方面的帮手。但是,智能音箱的安全问题也日益受到关注。继今年 11 月份,有研究使用激光黑掉智能音箱后,又有新的破解方法来了。这回直接用定向声波。
机器之心
2019/12/24
9220
声源定位系统设计(一)——MVDR波束形成算法「建议收藏」
下一篇:声源定位系统设计(二)——MUSIC算法以及Python代码实现将讲述本篇博客中算法的代码实现以及另一种波束形成算法。
全栈程序员站长
2022/11/09
4.7K0
声源定位系统设计(一)——MVDR波束形成算法「建议收藏」
超低功耗解决方案如何赋能Always-on语音交互系统
消费者越来越需要可以随时通过语音控制的产品,可以与数字世界更加安全的和自然的交互。
用户6026865
2020/07/06
1.6K0
超低功耗解决方案如何赋能Always-on语音交互系统
谁说偷窥一定要趴墙头?这个系统可用声波「看见」墙后物体
我站在墙前,想看到拐角处我视线范围之外的事物,除了伸长脖子或者走过去,还有别的方法吗?
机器之心
2019/06/23
1.3K0
科大讯飞李伟:人机交互如何选择合适的「耳朵」
AI 研习社按:人工智能当前正处于爆发阶段,语音交互作为人工智能的重要组成部分正在各行业全面的落地,在人机进行语音交互的过程中,机器需要通过耳朵实现听觉的作用。
AI研习社
2018/07/26
1.3K0
科大讯飞李伟:人机交互如何选择合适的「耳朵」
智能语音交互中的麦克风阵列技术
近年来,随着语音识别技术的发展成熟,语音交互越来越多的走进我们的生活。从苹果手机Siri助手的横空出世开始,各大公司纷纷效仿开发自己的语音助手和语音识别平台,手机端的近场语音交互日趋成熟。后来Amazon发布Echo智能音箱,开启了智能硬件远场语音交互时代。相比于Siri手机端近场的语音交互,Echo音箱的语音交互支持距离更远,交互更加自然便捷,它使用了麦克风阵列来保证远距离复杂背景噪声和干扰环境下的良好拾音效果,随后麦克风阵列逐渐成为了后续语音交互智能硬件的标配。
DancingWind
2019/08/01
10.9K0
智能语音交互中的麦克风阵列技术
全球顶级语音技术比赛中获双料冠军,这家中国公司靠什么?
腾讯、西工大、CMU等国内外机构是这场对决的主办方,两项比赛内容是语音行业的前沿研究,针对真实视频会议场景。
量子位
2021/07/19
5880
日本富士通:我们研制出了世界首款小型免提式语音翻译器 | 黑科技
据悉,近日富士通实验室对外表示,他们研究出了世界上首款胸卡大小的免提式语音翻译器。 在与患者交流病情的同时,医院的医务人员手中总是有很多事情要做,伴随着国际化,患者常常不是本国人,在这样的情况下,交谈
镁客网
2018/05/30
4780
智能音箱大战全面开火,那么问题来了:如何成为一名全栈语音识别工程师?
文 / 陈孝良 11月16号,百度发布了渡鸦智能音箱和DuerOS开发板SoundPi,至此,国内再一名巨头加入智能音箱大战。迄今为止,国内战场上的巨头有阿里、京东、腾讯、百度、小米、科大讯飞等,国外则有苹果、微软、亚马逊、谷歌、脸书、三星等,这些巨头占据了全球市值的排名榜,同时发力争夺未来人工智能时代的语音入口,甚至亚马逊和阿里率先不惜代价开启了补贴大战。这些全球巨头的激烈竞争,将对未来十年产生极其重要的影响,同时,这更是新一波的职业快速发展机会。 语音智能当前的核心关键是声学问题和语义理解,随着市
AI科技大本营
2018/04/27
1.1K0
智能音箱大战全面开火,那么问题来了:如何成为一名全栈语音识别工程师?
Yobe推出AI系统,分离人群中的语音,错误率降低85%
现在智能助手和支持语音的扬声器比以往更受欢迎。据Voicebot称,约有4730万美国成年人使用智能音箱,超过一半的智能手机用户(52%)表示他们在移动设备上使用语音助手。但普及并不一定转化为准确性。但从人群中隔离语音时,它们往往效果很差。
AiTechYun
2018/12/05
5160
浅析硬件“好声音”:麦克风技术指标及选型指南
这类设备中都集成了麦克风和喇叭等电声器件,其中麦克风用于识别用户的声音,喇叭用于播放设备对用户指令的反应。麦克风的性能是影响语音唤醒率高低的重要因数,而喇叭的性能会影响打断唤醒率和用户的主观体验。接下来将分两篇文章对麦克风和喇叭的一些主要性能参数进行解析,给大家在产品设计时选择声学器件提供一些帮助。
硬件大熊
2022/06/23
3K0
浅析硬件“好声音”:麦克风技术指标及选型指南
语音识别现状与工程师必备技能
作者 | 陈孝良 责编 | 胡永波 目前来看,语音识别的精度和速度比较取决于实际应用环境,在安静环境、标准口音、常见词汇上的语音识别率已经超过95%,完全达到了可用状态,这也是当前语音识别比较火热的原因。 随着技术的发展,现在口音、方言、噪声等场景下的语音识别也达到了可用状态,但是对于强噪声、超远场、强干扰、多语种、大词汇等场景下的语音识别还需要很大的提升。当然,多人语音识别和离线语音识别也是当前需要重点解决的问题。 学术界探讨了很多语音识别的技术趋势,有两个思路是非常值得关注的,一个是就是端到端的语音识别
用户1737318
2018/06/05
2K0
声学相机:用“眼睛”听见声音的奥秘
说到终极武器,它的核心秘密是“机器听觉”。我们知道人类的听觉系统犹如一台精密的生物仪器:双耳通过时间差与强度差的微妙感知,能瞬间分辨出雨滴敲窗与孩童笑声的方向;而机器听觉依赖麦克风阵列与算法模型,虽能精准识别语音指令,却难以像人类般在交响乐中锁定单簧管音轨,或在装修工地中区分电钻与锤击声。这种差异源于感知维度的局限——人类听觉融合了经验记忆与情感共鸣,而机器仅能处理数字化的声波参数。正因如此,​声音可视化技术应运而生,成为跨越物种感知鸿沟的桥梁。它通过波束成形聚焦声源、扫描成像还原三维声场,将声波的振幅、频率等参数转化为动态热力图或频谱动画。例如,声学相机能通过麦克风阵列同步接收多个通道的声音信号,依据相控阵波束形成原理计算得到设备基准发射面上的声场分布云图。测量中同步记录设备的可见光图像,以其为背景,通过几何配准将声场分布彩色等高线云图与可见光图像叠加显示,获得声学成像结果。声学成像结果中直观显示了声源空间位置、强度和频谱等特征。
黄成甲
2025/04/01
3940
音频增益响度分析 ReplayGain 附完整C代码示例
人们所熟知的图像方面的3A算法有: AF自动对焦(Automatic Focus) 自动对焦即调节摄像头焦距自动得到清晰的图像的过程 AE自动曝光(Automatic Exposure) 自动曝光的是为了使感光器件获得合适的曝光量 AW自动白平衡(Automatic White Balance) 白平衡的本质是使白色物体在任何光源下都显示白色 与之相对应的音频方面的3A算法是: AGC自动增益补偿(Automatic Gain Control) 自动调麦克风的收音量,使与会者收到一定的音量水平,不会因发言者
cpuimage
2018/04/16
2K0
腾讯AI Lab语音技术中心应用与研究介绍
“CCF语音对话与听觉专业组走进企业系列活动”第十期之“走进腾讯”研讨会于上周六圆满闭幕,本次研讨会由上海交通大学钱彦旻副教授主持,并邀请到四位专家介绍腾讯语音及对话领域的最新成果,分别是: 腾讯AI Lab语音技术中心副总监苏丹博士,腾讯AI Lab资深算法专家卢恒博士,腾讯语言算法专家黄申博士,腾讯多媒体实验室高级总监商世东。 其中,腾讯 AI Lab语音技术中心副总监苏丹博士作了题为《腾讯AI Lab语音技术中心应用与研究介绍》的学术报告,主要介绍了腾讯AI Lab语音技术中心的主要应用落地,分
腾讯技术工程官方号
2020/09/14
3K1
ZLG深度解析——语音识别技术
语言作为人类的一种基本交流方式,在数千年历史中得到持续传承。近年来,语音识别技术的不断成熟,已广泛应用于我们的生活当中。语音识别技术是如何让机器“听懂”人类语言?本文将为大家从语音前端处理、基于统计学语音识别和基于深度学习语音识别等方面阐述语音识别的原理。
刘盼
2019/05/17
2.5K0
ZLG深度解析——语音识别技术
Alango - Speech Recognition Enhancement
穿戴和听力设备(wearable and hearable)设备需要永远在线(Always-on),这对于用户来说非常重要。我们不难想象出其重要性,比如外科医生(surgeon)在外科手术时佩戴智能眼镜,或者是建筑师在勘察施工现场的时候与电气工程师交流等等,所有这些用户场景都需要经过Alango 语音识别增强的(Speech Recognition Enhancement)自动语音识别技术。
用户6026865
2020/04/27
6460
深度学习在AEC中的应用探索
我们可以想象为两个人通电话,从左框看到的远端信号(Far-End)是指对方传过来的信号x(n),而右框的近端信号(Near-End)指着本地麦克风收到的信号y(n)。
LiveVideoStack
2019/12/17
3K0
深度学习在AEC中的应用探索
方案:汽车NVH与噪声定位系统
NVH(Noise、Vibration、Harshness噪声、振动与声振粗糙度)是衡量汽车制造质量的重要参数,可分为发动机NVH、车身NVH和底盘NVH三大部分。NVH直接决定着驾乘汽车的舒适度,有统计资料显示,整车约有1/3的故障问题是和车辆的NVH问题有关系,而各大公司有近20%的研发费用消耗在解决车辆的NVH问题上。
SHOUT
2022/05/31
2.3K0
方案:汽车NVH与噪声定位系统
基于麦克风阵列的现有声源定位技术有_高斯滤波 椒盐噪声
目前基于麦克风阵列的声源定位方法大致可以分为三类:基于最大输出功率的可控波束形成技术、基于高分辨率谱图估计技术和基于声音时间差(time-delay estimation,TDE)的声源定位技术。
全栈程序员站长
2022/09/20
1.7K0
推荐阅读
相关推荐
让智能音箱胡言乱语、乱下指令,只需要一部手机+一个喇叭
更多 >
LV.1
诺谛智能首席架构师
目录
  • 一、背景
  • 二、功能分析和详细设计
    • 1 shardctrler
    • 2 shardkv
    • 3 shardclerk
  • 三、shardctrler代码分析
    • 1 数据结构
    • 2 配置更新
    • 3 rebalance
  • 四、shardkv代码分析
    • 1 数据结构
    • 2 apply事件循环
    • 3 状态转移
    • 4 配置更新
    • 5 迁移分片
    • 6 清理分片
    • 7 snapshot
    • 8 读写流程
      • 8.1 过滤逻辑
      • 8.2 Get逻辑
      • 8.3 PutAppend逻辑
  • 五、总结
    • 1 测试结果
    • 2 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档