The previous posts implemented the Raft protocol. This post builds a single-machine key-value database on top of it, using Raft for leader-follower replication so that the service can tolerate node failures; sharding is not covered here.
The key-value database needs to provide the following:
Server side: replicate each operation through Raft, apply committed operations to the state machine in order, and filter out duplicate requests.
Client side: assign each request a unique, increasing id, find the current leader, and retry until the request succeeds.
Client side:
var clientGerarator int32

//UniqueRequestId = clientId<<32 + nextRequestId
type Clerk struct {
    mu              sync.Mutex
    servers         []*labrpc.ClientEnd
    lastRpcServerId int    // the server that handled the previous request
    clientId        int    // unique per client
    nextRequestId   uint64 // monotonically increasing
}

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
    ck := new(Clerk)
    ck.servers = servers
    ck.mu = sync.Mutex{}
    ck.lastRpcServerId = 0
    ck.clientId = int(atomic.AddInt32(&clientGerarator, 1))
    ck.nextRequestId = 0
    return ck
}

func (ck *Clerk) currentRpcServerId() int {
    //ck.mu.Lock()
    //defer ck.mu.Unlock()
    return ck.lastRpcServerId
}

func (ck *Clerk) setRpcServerId(rpcServerId int) {
    //ck.mu.Lock()
    //defer ck.mu.Unlock()
    ck.lastRpcServerId = rpcServerId
    ck.lastRpcServerId %= len(ck.servers)
}
func (ck *Clerk) Get(key string) string {
    start := time.Now()
    defer func() {
        DPrintf("client Get cost: %v", time.Now().Sub(start).Milliseconds())
    }()
    ck.mu.Lock()
    defer ck.mu.Unlock()
    args := &GetArgs{
        ClientId:  ck.clientId,
        RequestId: atomic.AddUint64(&ck.nextRequestId, 1),
        Key:       key,
    }
    rpcServerId := ck.currentRpcServerId()
    for {
        reply := &GetReply{}
        ok := ck.servers[rpcServerId].Call("KVServer.Get", args, reply)
        //DPrintf("client Get, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
        if !ok {
            rpcServerId++
            rpcServerId %= len(ck.servers)
        } else if reply.Err == OK {
            ck.setRpcServerId(rpcServerId) // remember the server that succeeded so the next call tries it first
            return reply.Value
        } else {
            rpcServerId++
            rpcServerId %= len(ck.servers)
        }
        time.Sleep(time.Millisecond * 1)
    }
}
func (ck *Clerk) PutAppend(key string, value string, op string) {
    start := time.Now()
    ck.mu.Lock()
    defer ck.mu.Unlock()
    args := &PutAppendArgs{
        ClientId:  ck.clientId,
        RequestId: atomic.AddUint64(&ck.nextRequestId, 1),
        Key:       key,
        Value:     value,
        Op:        op,
    }
    defer func() {
        DPrintf("client Put cost: %v,op: %v, key: %v, value: %v, args: %v", time.Now().Sub(start).Milliseconds(), op, key, value, mr.Any2String(args))
    }()
    rpcServerId := ck.currentRpcServerId()
    for {
        reply := &PutAppendReply{}
        ok := ck.servers[rpcServerId].Call("KVServer.PutAppend", args, reply)
        //DPrintf("client Put, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
        if !ok {
            DPrintf("client Put, ok: %v, rpcServerId: %d, args: %v, reply: %v", ok, rpcServerId, mr.Any2String(args), mr.Any2String(reply))
            rpcServerId++
            rpcServerId %= len(ck.servers)
        } else if reply.Err == OK {
            ck.setRpcServerId(rpcServerId)
            DPrintf("client Put, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
            return
        } else {
            DPrintf("client Put, rpcServerId: %d, args: %v, reply: %v", rpcServerId, mr.Any2String(args), mr.Any2String(reply))
            rpcServerId++
            rpcServerId %= len(ck.servers)
        }
        time.Sleep(time.Millisecond * 1)
    }
}

func (ck *Clerk) Put(key string, value string) {
    ck.PutAppend(key, value, "Put")
}

func (ck *Clerk) Append(key string, value string) {
    ck.PutAppend(key, value, "Append")
}
The client side is straightforward: both Get and PutAppend keep retrying, rotating through the servers, until the request succeeds.
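The RPC message types, the Err constants, and the UniqueRequestId helper used above and below are not included in the snippets. A minimal sketch consistent with how they are used (and with the comment UniqueRequestId = clientId<<32 + nextRequestId) might look like this; any field or constant not already referenced in the code above is an assumption:

type Err string

const (
    OK             Err = "OK"
    ErrWrongLeader Err = "ErrWrongLeader"
    ErrTimeout     Err = "ErrTimeout"
)

type GetArgs struct {
    ClientId  int
    RequestId uint64
    Key       string
}

type GetReply struct {
    Err   Err
    Value string
}

type PutAppendArgs struct {
    ClientId  int
    RequestId uint64
    Key       string
    Value     string
    Op        string // "Put" or "Append"
}

type PutAppendReply struct {
    Err Err
}

// UniqueRequestId packs clientId and requestId into a single key,
// following the comment in the Clerk definition: clientId<<32 + nextRequestId.
func UniqueRequestId(clientId int, requestId uint64) uint64 {
    return uint64(clientId)<<32 + requestId
}

With these types a caller would simply do ck := MakeClerk(servers); ck.Put("k", "v"); v := ck.Get("k").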
type OpType string

const (
    OpTypeGet    = "Get"
    OpTypePut    = "Put"
    OpTypeAppend = "Append"
)

type Op struct {
    ClientId       int
    RequestId      uint64
    OpType         OpType
    Key            string
    Value          string
    StartTimestamp int64
}

type OpContext struct {
    ClientId        int
    RequestId       uint64
    UniqueRequestId uint64 // only ClientId and RequestId combined form a unique id
    Op              *Op
    Term            int
    WaitCh          chan string // lets the apply loop hand the result back to the RPC handler goroutine
}

func NewOpContext(op *Op, term int) *OpContext {
    return &OpContext{
        ClientId:        op.ClientId,
        RequestId:       op.RequestId,
        UniqueRequestId: UniqueRequestId(op.ClientId, op.RequestId),
        Op:              op,
        Term:            term,
        WaitCh:          make(chan string, 1), // buffer of 1 so the apply loop is never blocked
    }
}

type KVServer struct {
    mu      sync.Mutex
    me      int
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg
    dead    int32 // set by Kill()

    maxraftstate int // snapshot if log grows this big

    // Your definitions here.
    persister         *raft.Persister       // used by maybeSnapshot and the snapshot-size logging below
    lastIncludedIndex int                   // last log index covered by the current snapshot
    kvStore           map[string]string     // key-value pairs: the state machine
    opContextMap      map[uint64]*OpContext // context of every in-flight request
    lastRequestIdMap  map[int]uint64        // clientId --> lastRequestId; provides idempotence, assuming each client issues requests serially
}
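The constructor is not shown in the snippets. A minimal sketch of StartKVServer, following the usual 6.824 skeleton (the exact signature and the restore-from-snapshot step are assumptions; applyStateMachineLoop and decodeSnapshot appear later in this post), could be:

func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
    // Register Op so labgob can encode/decode it inside Raft log entries.
    labgob.Register(Op{})

    kv := new(KVServer)
    kv.me = me
    kv.maxraftstate = maxraftstate
    kv.persister = persister
    kv.applyCh = make(chan raft.ApplyMsg)
    kv.rf = raft.Make(servers, me, persister, kv.applyCh)

    kv.kvStore = make(map[string]string)
    kv.opContextMap = make(map[uint64]*OpContext)
    kv.lastRequestIdMap = make(map[int]uint64)

    // Restore the state machine and the dedup table from an existing snapshot, if any.
    kv.decodeSnapshot(persister.ReadSnapshot())

    // Apply committed log entries to the state machine in a dedicated goroutine.
    go kv.applyStateMachineLoop()
    return kv
}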
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
    op := &Op{
        ClientId:       args.ClientId,
        RequestId:      args.RequestId,
        OpType:         OpType(args.Op),
        Key:            args.Key,
        Value:          args.Value,
        StartTimestamp: time.Now().UnixMilli(),
    }
    term := 0
    isLeader := false
    reply.Err = ErrWrongLeader
    if term, isLeader = kv.rf.GetState(); !isLeader {
        return
    }
    start := time.Now()
    defer func() {
        DPrintf("server PutAppend cost: %v, requestId: %d, node: %d, leaderId: %d", time.Now().Sub(start).Milliseconds(), op.RequestId, kv.me, kv.rf.LeaderId())
    }()
    kv.mu.Lock()
    // A previous attempt may have timed out on the client even though it actually
    // succeeded; in that case just return OK.
    if lastRequestId, ok := kv.lastRequestIdMap[op.ClientId]; ok && lastRequestId >= op.RequestId {
        reply.Err = OK
        kv.mu.Unlock()
        return
    }
    opContext := NewOpContext(op, term)
    kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)] = opContext
    kv.mu.Unlock()
    _, _, ok := kv.rf.Start(*op)
    defer func() {
        //DPrintf("server PutAppend, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
        kv.mu.Lock()
        delete(kv.opContextMap, UniqueRequestId(op.ClientId, op.RequestId))
        kv.mu.Unlock()
    }()
    if !ok {
        return
    }
    // Block until the entry is applied or the wait times out.
    select {
    case <-opContext.WaitCh:
        reply.Err = OK
    case <-time.After(time.Millisecond * 1000):
        reply.Err = ErrTimeout
    }
}
Before executing the request, the handler checks whether it has already been executed; if so, it returns OK immediately. Otherwise it hands the operation to Raft for replication to the other nodes and blocks on WaitCh until that log entry is committed and applied; a timeout is required so the handler never blocks forever. Finally, the context must be deleted when the handler returns, otherwise opContextMap would leak memory.
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
    op := &Op{
        ClientId:       args.ClientId,
        RequestId:      args.RequestId,
        OpType:         OpTypeGet,
        Key:            args.Key,
        StartTimestamp: time.Now().UnixMilli(),
    }
    // For Append, the value must not be applied locally first and then handed to raft.
    term := 0
    isLeader := false
    if term, isLeader = kv.rf.GetState(); !isLeader {
        reply.Err = ErrWrongLeader
        return
    }
    start := time.Now()
    defer func() {
        DPrintf("server Get cost: %v, node: %v, leaderId: %d", time.Now().Sub(start).Milliseconds(), kv.me, kv.rf.LeaderId())
    }()
    kv.mu.Lock()
    opContext := NewOpContext(op, term)
    kv.opContextMap[opContext.UniqueRequestId] = opContext
    kv.mu.Unlock()
    _, _, ok := kv.rf.Start(*op)
    defer func() {
        //DPrintf("server Get, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
        kv.mu.Lock()
        delete(kv.opContextMap, opContext.UniqueRequestId)
        kv.mu.Unlock()
    }()
    if !ok {
        reply.Err = ErrWrongLeader
        return
    }
    // Block until the entry is applied or the wait times out.
    select {
    case c := <-opContext.WaitCh:
        reply.Err = OK
        reply.Value = c
    case <-time.After(time.Millisecond * 1000):
        reply.Err = ErrTimeout
    }
}
To make reads linearizable, a Get is handled like a write: it goes through essentially the same path as a write request, except that it does not need the idempotence logic.
// Applies committed operations to the state machine serially.
func (kv *KVServer) applyStateMachineLoop() {
    for !kv.killed() {
        select {
        case applyMsg := <-kv.applyCh:
            if applyMsg.CommandValid {
                func() {
                    kv.mu.Lock()
                    defer kv.mu.Unlock()
                    op := applyMsg.Command.(Op)
                    // Idempotence: skip requests that were already executed.
                    if op.RequestId <= kv.lastRequestIdMap[op.ClientId] {
                        return
                    }
                    switch op.OpType {
                    case OpTypePut:
                        kv.kvStore[op.Key] = op.Value
                        kv.lastRequestIdMap[op.ClientId] = op.RequestId
                    case OpTypeAppend:
                        kv.kvStore[op.Key] += op.Value
                        kv.lastRequestIdMap[op.ClientId] = op.RequestId
                    case OpTypeGet:
                        // Get requests do not update lastRequestId.
                    }
                    DPrintf("op: %v, value: %v, node: %v cost: %v,requestId: %v, stateMachine: %v", mr.Any2String(op), kv.kvStore[op.Key], kv.me, time.Now().UnixMilli()-op.StartTimestamp, op.RequestId, mr.Any2String(kv.kvStore))
                    val := kv.kvStore[op.Key]
                    // Wake up the waiting RPC handler so the client gets its response.
                    if c, ok := kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
                        c.WaitCh <- val
                    }
                }()
            }
        }
    }
}
A separate goroutine keeps reading committed log entries from applyCh. First comes the idempotence check: if op.RequestId <= kv.lastRequestIdMap[op.ClientId], the request has already been executed and must not be applied again. The loop then updates the state machine and finally wakes up the waiting handler goroutine.
Lab 2D already covered how Raft compacts its log and installs snapshots; in this lab the main question is what the service layer has to put into the snapshot. As noted above, it must save the compaction point (lastIncludedIndex), the state machine, and the deduplication table.
func (kv *KVServer) maybeSnapshot(index int) {
    if kv.maxraftstate == -1 {
        return
    }
    if kv.persister.RaftStateSize() > kv.maxraftstate {
        DPrintf("maybeSnapshot starting, index: %v", index)
        kv.rf.Snapshot(index, kv.encodeSnapshot(index))
    }
}

// Caller must hold kv.mu.
func (kv *KVServer) encodeSnapshot(lastIncludedIndex int) []byte {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(kv.kvStore)
    e.Encode(lastIncludedIndex)
    e.Encode(kv.lastRequestIdMap) // persist the largest executed write request id of every client
    return w.Bytes()
}

// Caller must hold kv.mu.
func (kv *KVServer) decodeSnapshot(snapshot []byte) bool {
    if len(snapshot) == 0 {
        return true
    }
    r := bytes.NewBuffer(snapshot)
    d := labgob.NewDecoder(r)
    if err := d.Decode(&kv.kvStore); err != nil {
        return false
    }
    if err := d.Decode(&kv.lastIncludedIndex); err != nil {
        return false
    }
    // Restore the largest executed write request id of every client.
    if err := d.Decode(&kv.lastRequestIdMap); err != nil {
        return false
    }
    return true
}
// Applies committed operations to the state machine serially.
func (kv *KVServer) applyStateMachineLoop() {
    for !kv.killed() {
        select {
        case applyMsg := <-kv.applyCh:
            if applyMsg.CommandValid {
                //... omitted
            } else if applyMsg.SnapshotValid {
                func() {
                    kv.mu.Lock()
                    defer kv.mu.Unlock()
                    // Apply the snapshot to the state machine.
                    if kv.decodeSnapshot(applyMsg.Snapshot) {
                        // Let raft truncate its log.
                        kv.rf.CondInstallSnapshot(applyMsg.SnapshotTerm, applyMsg.SnapshotIndex, applyMsg.Snapshot)
                    }
                }()
            }
        }
        DPrintf("snapshot size: %v, stateMachine: %v", kv.persister.SnapshotSize(), mr.Any2String(kv.kvStore))
    }
}
Each time a log entry is applied to the state machine, the server checks the size of the Raft state and decides whether to compact the log.
func (kv *KVServer) applyStateMachineLoop() {
    for !kv.killed() {
        select {
        case applyMsg := <-kv.applyCh:
            if applyMsg.CommandValid {
                func() {
                    kv.mu.Lock()
                    defer kv.mu.Unlock()
                    op := applyMsg.Command.(Op)
                    // Idempotence: skip requests that were already executed.
                    if op.RequestId <= kv.lastRequestIdMap[op.ClientId] {
                        return
                    }
                    // Filter out log entries already covered by the snapshot.
                    if applyMsg.CommandIndex <= kv.lastIncludedIndex && op.OpType != OpTypeGet {
                        if c, ok := kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
                            c.WaitCh <- "0"
                        }
                        return
                    }
                    switch op.OpType {
                    case OpTypePut:
                        kv.kvStore[op.Key] = op.Value
                        kv.lastRequestIdMap[op.ClientId] = op.RequestId
                        // Try to trigger log compaction.
                        kv.maybeSnapshot(applyMsg.CommandIndex)
                    case OpTypeAppend:
                        kv.kvStore[op.Key] += op.Value
                        kv.lastRequestIdMap[op.ClientId] = op.RequestId
                        // Try to trigger log compaction.
                        kv.maybeSnapshot(applyMsg.CommandIndex)
                    case OpTypeGet:
                        // Get requests do not update lastRequestId.
                    }
                    DPrintf("op: %v, value: %v, node: %v cost: %v,requestId: %v, stateMachine: %v", mr.Any2String(op), kv.kvStore[op.Key], kv.me, time.Now().UnixMilli()-op.StartTimestamp, op.RequestId, mr.Any2String(kv.kvStore))
                    val := kv.kvStore[op.Key]
                    // Wake up the waiting RPC handler so the client gets its response.
                    if c, ok := kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
                        c.WaitCh <- val
                    }
                }()
            } else if applyMsg.SnapshotValid {
                func() {
                    kv.mu.Lock()
                    defer kv.mu.Unlock()
                    if kv.decodeSnapshot(applyMsg.Snapshot) {
                        kv.rf.CondInstallSnapshot(applyMsg.SnapshotTerm, applyMsg.SnapshotIndex, applyMsg.Snapshot)
                    }
                }()
            }
        }
        DPrintf("snapshot size: %v, stateMachine: %v", kv.persister.SnapshotSize(), mr.Any2String(kv.kvStore))
    }
}
There are many ways to implement linearizable reads. This post handles reads as writes, which is simple but not especially fast. A better alternative is to serve reads from followers: the follower first waits until it has replicated the entries the leader had committed at the time of the read, then answers from its local state machine. This performs better, but adds one extra round trip to the leader; that optimization is left for later.
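As a rough illustration only: the sketch below assumes a hypothetical rf.ReadIndex() that asks the leader for its current commit index, and a hypothetical kv.lastAppliedIndex field maintained by the apply loop; neither exists in the code above.

func (kv *KVServer) FollowerGet(args *GetArgs, reply *GetReply) {
    // One round trip to the leader to learn its current commit index.
    readIndex, ok := kv.rf.ReadIndex() // hypothetical API
    if !ok {
        reply.Err = ErrWrongLeader
        return
    }
    // Wait until the local apply loop has caught up to readIndex,
    // then answer from the local state machine.
    for !kv.killed() {
        kv.mu.Lock()
        if kv.lastAppliedIndex >= readIndex { // hypothetical field
            reply.Value = kv.kvStore[args.Key]
            reply.Err = OK
            kv.mu.Unlock()
            return
        }
        kv.mu.Unlock()
        time.Sleep(time.Millisecond)
    }
}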
Per-request latency is a bit higher than expected; a later improvement could be a group-commit mechanism that batches write operations.
This lab builds a single-group key-value database on top of the 6.824 Lab 2D Raft implementation. It supports Get, Put, and Append, and guarantees client idempotence and linearizable reads: idempotence comes from recording per-request contexts and each client's latest requestId, and linearizable reads come from running reads through the write path.