Skip to content

Latest commit

 

History

History
370 lines (317 loc) · 15 KB

Lab3_KVRaft_3B.md

File metadata and controls

370 lines (317 loc) · 15 KB
title date category tags
MIT6.5840(6.824) Lab3: 分布式KV数据库 3B
2024-02-03 10:19:25 -0800
CS课程笔记
MIT6.5840(6.824) 2023
Lab笔记
分布式系统
Go

本文将介绍lab3B部分的实现, lab3B要求基于在lab3A的基础上实现快照, 仅从lab本省来讲其实很简单, 可以说是目前我做的所有lab里面难度最小的一个模块了, 但实际上遇到了很多困难, 调试的时间比lab3A还久, 核心原因就是自己的lab2实现的raft底层有一些问题(真真真真太折磨了, 即时过了所有单元测试, 还是时不时地在后续的lab暴露出问题来, 并且这些问题还挺难定位, 得仔细分析海量的log输出才可以)

Lab文档见: http:https://nil.csail.mit.edu/6.5840/2023/labs/lab-kvraft.html

我的代码: https://github.com/GFX9/MIT6.5840/tree/lab3B

1 快照的生成和传递逻辑

简单说, lab3B就是要在底层raftlog过大时生成快照并截断日志, 从而节省内存空间, 并且快照会持久化存储到本地。因此, 原来的代码结构只需要在以下几个方面做出调整:

  1. 需要再某个地方定期地判断底层raft的日志大小, 决定是否要生成快照, 生成快照直接调用我们在lab2中实现的接口Snapshot即可
  2. 由于follower的底层raft会出现无法从Leader获取log的情况, 这时Leader会发送给followerraft层一个快照, raft层会将其上交给server, server通过快照改变自己的状态机
  3. server启动时需要判断是否有持久化的快照需要加载, 如果有就加载

2 代码实现

2.1 快照应该包含什么?

快照首先应该包含的肯定是内存中的KV数据库, 也就是自己维护的map, 但是还应该包含对每个clerk序列号的记录信息, 因为从快照恢复后的server应该具备判断重复的客户端请求的能力, 同时也应该记录最近一次应用到状态机的日志索引, 凡是低于这个索引的日志都是包含在快照中

因此, server结构体需要添加如下成员:

type KVServer struct {
    ...
	persister    *raft.Persister
	lastApplied  int
}

2.2 加载和生成快照

通过上述分析, 快照的加载和生成就很简单了,代码如下:

func (kv *KVServer) GenSnapShot() []byte {
	// 调用时必须持有锁mu
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)

	e.Encode(kv.db)
	e.Encode(kv.historyMap)

	serverState := w.Bytes()
	return serverState
}

func (kv *KVServer) LoadSnapShot(snapShot []byte) {
	// 调用时必须持有锁mu
	if len(snapShot) == 0 || snapShot == nil {
		ServerLog("server %v LoadSnapShot: 快照为空", kv.me)
		return
	}

	r := bytes.NewBuffer(snapShot)
	d := labgob.NewDecoder(r)

	tmpDB := make(map[string]string)
	tmpHistoryMap := make(map[int64]*Result)
	if d.Decode(&tmpDB) != nil ||
		d.Decode(&tmpHistoryMap) != nil {
		ServerLog("server %v LoadSnapShot 加载快照失败\n", kv.me)
	} else {
		kv.db = tmpDB
		kv.historyMap = tmpHistoryMap
		ServerLog("server %v LoadSnapShot 加载快照成功\n", kv.me)
	}
}

GenSnapShotLoadSnapShot分别生成和加载快照, 唯一需要注意的就是这两个函数应当在持有锁时才能调用

2.3 生成快照的时机判断

由于ApplyHandler协程会不断地读取raft commit的通道, 所以每收到一个log后进行判断即可:

func (kv *KVServer) ApplyHandler() {
	for !kv.killed() {
		log := <-kv.applyCh
		if log.CommandValid {
			...
			// 如果在follower一侧, 可能这个log包含在快照中, 直接跳过
			if log.CommandIndex <= kv.lastApplied {
				kv.mu.Unlock()
				continue
			}
			...
			// 每收到一个log就检测是否需要生成快照
			if kv.maxraftstate != -1 && kv.persister.RaftStateSize() >= kv.maxraftstate/100*95 {
				// 当达到95%容量时需要生成快照
				snapShot := kv.GenSnapShot()
				kv.rf.Snapshot(log.CommandIndex, snapShot)
			}
			kv.mu.Unlock()
		}
        ...
	}
}

这里还需要进行之前提到的判断: 低于lastApplied索引的日志都是包含在快照中, 在尽显lab3A的操作之后, 再判断是否需要生成快照, 在我的实现中, 如果仅仅比较maxraftstatepersister.RaftStateSize()相等才生成快照的话, 无法通过测例, 因为可能快照RPC存在一定延时, 所以我采用的手段是只要达到阈值的95%, 就生成快照

2.4 加载快照的时机判断

首先启动时需要判断是否需要加载快照, 然后就是ApplyHandler从通道收到快照时需要判断加载, 都很简单:

func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
	...
	kv.persister = persister
    ...

	// 先在启动时检查是否有快照
	kv.mu.Lock()
	kv.LoadSnapShot(persister.ReadSnapshot())
	kv.mu.Unlock()

	go kv.ApplyHandler()

	return kv
}
func (kv *KVServer) ApplyHandler() {
	for !kv.killed() {
		log := <-kv.applyCh
		if log.CommandValid {
            ...
		} else if log.SnapshotValid {
			// 日志项是一个快照
			kv.mu.Lock()
			if log.SnapshotIndex >= kv.lastApplied {
				kv.LoadSnapShot(log.Snapshot)
				kv.lastApplied = log.SnapshotIndex
			}
			kv.mu.Unlock()
		}
	}
}

3 调试Bug和修复

这里才是这个lab的重头戏, 我在完成上述所有修改后, 会在TestSnapshotUnreliable3B这个单元测试中大概率卡死, 一直会卡到go默认的十分钟单元测试时间截止后才报错退出, 在反复检查了死锁持锁接发通道消息等常见问题并确认无误后, 我再次观察超时报错的堆栈信息和日志输出, 得到结论就是: raft层因为无法承受测试的高并发程度而导致大量的RPC请求失败, 从而导致clerk无限重复发送请求RPC(我的实现是RPC请求失败)就重试

......陷入了沉思, 自己还是菜啊, 但代码还得慢慢修, 总不能把raft推倒重来吧......

我最后分别从raft层和server层进行了优化

3.1 raft层优化

3.1.1 修复过多的AppendEntries RPC

通过对日志的调试发现, AppendEntries RPC数量太多了, 这是因为我在lab3A中做了如下修改:

func (rf *Raft) Start(command interface{}) (int, int, bool) {
	...
	defer func() {
		rf.ResetHeartTimer(1)
	}()

	return rf.VirtualLogIdx(len(rf.log) - 1), rf.currentTerm, true
}

也就是在接受一个请求并追加一个log后立即发送AppendEntries RPC, 但是如果在高并发的场景下, 新的请求绵绵不断地到来, 每到达一个请求都发一个RPC, 并且每个RPC可能只包含了长度为1的日志切片, 这是不太合理的设计, 过多的RPC使得raft无法及时处理而出现RPC卡死的情况, 因此, 我手动修改了重置定时器的时间为15ms, 这个值比心跳间隔小很多, 但又不是很小, 足以在满足响应速度的前提下摊销多个命令, 使一次AppendEntries RPC包含多个新的日志项:

func (rf *Raft) Start(command interface{}) (int, int, bool) {
	...
	defer func() {
		rf.ResetHeartTimer(15)
	}()

	return rf.VirtualLogIdx(len(rf.log) - 1), rf.currentTerm, true
}

至于为什么是15ms..., 我自己也说不出理由, 随便设的, 比心跳小很多, 但又不太小就是了, 本质目的就是积攒多个AppendEntries RPC后一次性发送, 避免AppendEntries RPC数量过大

3.1.2 修复过多的InstallSnapshot RPC

在我原来的设计中, InstallSnapshot RPC的发送有2中情形:

  1. handleAppendEntries在处理AppendEntries RPC回复时发现follower需要的日志项背快照截断, 立即调用go rf.handleInstallSnapshot(serverTo)协程发送快照
  2. 心跳函数发送时发现PrevLogIndex < rf.lastIncludedIndex, 则发送快照

这和之前的情形类似, 在高并发的场景下,followerLeader之间的日志复制也很频繁, 如果某一个日志触发了InstallSnapshot RPC的发送, 接下来连续很多个日志也会触发InstallSnapshot RPC的发送, 因为InstallSnapshot RPC的发送时间消耗更大, 这样以来, 又加大了raft的压力, 所以, 我对InstallSnapshot RPC的发送做出修改:

  1. handleAppendEntries在处理AppendEntries RPC回复时发现follower需要的日志项背快照截断, 仅仅设置rf.nextIndex[serverTo] = rf.lastIncludedIndex, 这将导致下一次心跳时调用go rf.handleInstallSnapshot(serverTo)协程发送快照
  2. 心跳函数发送时发现PrevLogIndex < rf.lastIncludedIndex, 则发送快照

代码如下:

func (rf *Raft) handleAppendEntries(serverTo int, args *AppendEntriesArgs) {
    ...
	if reply.Term == rf.currentTerm && rf.role == Leader {
		// term仍然相同, 且自己还是leader, 表名对应的follower在prevLogIndex位置没有与prevLogTerm匹配的项
		// 快速回退的处理
		if reply.XTerm == -1 {
			// PrevLogIndex这个位置在Follower中不存在
			DPrintf("leader %v 收到 server %v 的回退请求, 原因是log过短, 回退前的nextIndex[%v]=%v, 回退后的nextIndex[%v]=%v\n", rf.me, serverTo, serverTo, rf.nextIndex[serverTo], serverTo, reply.XLen)
			if rf.lastIncludedIndex >= reply.XLen {
				// 由于snapshot被截断
				// 下一次心跳添加InstallSnapshot的处理
				rf.nextIndex[serverTo] = rf.lastIncludedIndex
			} else {
				rf.nextIndex[serverTo] = reply.XLen
			}
			return
		}
		...
		if i == rf.lastIncludedIndex && rf.log[rf.RealLogIdx(i)].Term > reply.XTerm {
			// 要找的位置已经由于snapshot被截断
			// 下一次心跳添加InstallSnapshot的处理
			rf.nextIndex[serverTo] = rf.lastIncludedIndex
		} else if rf.log[rf.RealLogIdx(i)].Term == reply.XTerm {
			...
		} else {
			// 之前PrevLogIndex发生冲突位置时, Follower的Term自己没有
			if reply.XIndex <= rf.lastIncludedIndex {
				// XIndex位置也被截断了
				// 添加InstallSnapshot的处理
				rf.nextIndex[serverTo] = rf.lastIncludedIndex
			} else {
				rf.nextIndex[serverTo] = reply.XIndex
			}
		}
		return
	}
}

3.2 server层优化

server层应该尽量减小对raft层的接口的调用, 因为大量的接口调用将获取raft层的一把大锁, 从而阻碍RPC的响应

3.2.1 调用Start前过滤

之前的实现中, 无论是Put/Append还是Get, 都是封装成OP结构体, 在HandleOp中一股脑调用Start扔给raft层处理, 然后在ApplyHandler处进行去重判断, 现在可以在调用raft层的Start之前就从historyMap中判断是否有历史记录可以直接返回:

func (kv *KVServer) HandleOp(opArgs *Op) (res Result) {
	// 先判断是否有历史记录
	kv.mu.Lock()
	if hisMap, exist := kv.historyMap[opArgs.Identifier]; exist && hisMap.LastSeq == opArgs.Seq {
		kv.mu.Unlock()
		ServerLog("leader %v HandleOp: identifier %v Seq %v 的请求: %s(%v, %v) 从历史记录返回\n", kv.me, opArgs.Identifier, opArgs.OpType, opArgs.Key, opArgs.Val)
		return *hisMap
	}
	kv.mu.Unlock()
    ...
}

3.2.2 减少GetState的调用

ratfGetState也会获取锁, 从而阻碍RPC的响应速度, 我原来的实现中, GetState会在2个地方调用:

  1. GetPutAppend调用GetState判断是否是leader, 不是则返回错误
  2. ApplyHandler在通过通道唤醒HandleOp时, 需要判断当前节点是不是leader, 不是leader则不需要唤醒

以上2不操作看似合理, 但实际上是冗余的:

  1. 首先, GetPutAppend在后续的HandleOp会调用Start, Start也会因为当前节点不是leader而返回, 所以GetState是冗余的, 反而阻碍RPC响应速度
  2. 其次, ApplyHandler在通过通道唤醒HandleOp时, 日志项本身有term的记录, HandleOp会调用Start时也会获取那时的term, HandleOp只需要在被唤醒后比较前后的term是否相同, 就可以判断出当前的节点是不是一个过时的leader

以上2处修改很简单, 由于是删代码而不是新增和修改, 就不贴代码了, 感兴趣可以看仓库

3.2.3 clerksleep再重试

这个修改也很简单, 如果server返回了需要重试类型的错误, clerksleep一会, 再重试, 代码如下:

func (ck *Clerk) Get(key string) string {
	args := &GetArgs{Key: key, Seq: ck.GetSeq(), Identifier: ck.identifier}

	for {
		reply := &GetReply{}
		ok := ck.servers[ck.leaderId].Call("KVServer.Get", args, reply)
		if !ok || reply.Err == ErrNotLeader || reply.Err == ErrLeaderOutDated {
			if !ok {
				reply.Err = ERRRPCFailed
			}
			if reply.Err != ErrNotLeader {
				DPrintf("clerk %v Seq %v 重试Get(%v), Err=%s", args.Identifier, args.Key, args.Key, reply.Err)
			}

			ck.leaderId += 1
			ck.leaderId %= len(ck.servers)
			time.Sleep(RpcRetryInterval)
			continue
		}

		switch reply.Err {
		case ErrChanClose:
			DPrintf("clerk %v Seq %v 重试Get(%v), Err=%s", args.Identifier, args.Key, args.Key, reply.Err)
			time.Sleep(time.Microsecond * 5)
			continue
		case ErrHandleOpTimeOut:
			DPrintf("clerk %v Seq %v 重试Get(%v), Err=%s", args.Identifier, args.Key, args.Key, reply.Err)
			time.Sleep(RpcRetryInterval)
			continue
		case ErrKeyNotExist:
			DPrintf("clerk %v Seq %v 成功: Get(%v)=%v, Err=%s", args.Identifier, args.Key, args.Key, reply.Value, reply.Err)
			return reply.Value
		}
		DPrintf("clerk %v Seq %v 成功: Get(%v)=%v, Err=%s", args.Identifier, args.Key, args.Key, reply.Value, reply.Err)

		return reply.Value
	}
}

func (ck *Clerk) PutAppend(key string, value string, op string) {
	// You will have to modify this function.
	args := &PutAppendArgs{Key: key, Value: value, Op: op, Seq: ck.GetSeq(), Identifier: ck.identifier}

	for {
		reply := &PutAppendReply{}
		ok := ck.servers[ck.leaderId].Call("KVServer.PutAppend", args, reply)
		if !ok || reply.Err == ErrNotLeader || reply.Err == ErrLeaderOutDated {
			if !ok {
				reply.Err = ERRRPCFailed
			}
			if reply.Err != ErrNotLeader {
				DPrintf("clerk %v Seq %v 重试%s(%v, %v), Err=%s", args.Identifier, args.Key, args.Op, args.Key, args.Value, reply.Err)
			}

			ck.leaderId += 1
			ck.leaderId %= len(ck.servers)
			time.Sleep(RpcRetryInterval)
			continue
		}

		switch reply.Err {
		case ErrChanClose:
			DPrintf("clerk %v Seq %v 重试%s(%v, %v), Err=%s", args.Identifier, args.Key, args.Op, args.Key, args.Value, reply.Err)
			time.Sleep(RpcRetryInterval)
			continue
		case ErrHandleOpTimeOut:
			DPrintf("clerk %v Seq %v 重试%s(%v, %v), Err=%s", args.Identifier, args.Key, args.Op, args.Key, args.Value, reply.Err)
			time.Sleep(RpcRetryInterval)
			continue
		}
		DPrintf("clerk %v Seq %v 成功: %s(%v, %v), Err=%s", args.Identifier, args.Key, args.Op, args.Key, args.Value, reply.Err)

		return
	}
}

4 测试

  1. 执行测试命令测试lab3B
go test -run 3B

结果如下: Alt text 可以看出, 取消立即发送心跳广播导致前3个测例满了不少, 但也就凑活吧(不想优化了, 麻木了)

该代码经过150次测试没有报错

  1. 执行测试命令测试整个lab3
go test -run 3

结果如下: Alt text 该代码经过150次测试没有报错

  1. 修改后再次测试lab2
cd ../raft/
go test -run 2

结果如下: Alt text

该代码经过150次测试没有报错