雄关漫道真如铁,
而今迈步从头越。
——《 忆秦娥⋅ 娄山关 》
完整代码见: https://github.com/SnowLegend-star/6.824
Part A: The Controller and Static Sharding (easy)
Part A有段话需要重点理解
Join RPC 由管理员用于添加新的副本组。它的参数是从唯一的、非零的副本组标识符(GID)到服务器名称列表的映射(a set of mappings)。分片控制器应通过创建一个包含新副本组的新配置来响应。新配置应尽可能均匀地将分片分配到整个副本组集中,并且应移动尽可能少的分片以达到这个目标。如果 GID 不是当前配置的一部分,则 shardctrler 应允许重复使用该 GID(即应允许 GID 加入,然后离开,然后再次加入)。
同时,我这里重点梳理下shardCtrler在server端的几个重要概念:
- Config:每当server端处理一次operation(Join、Leave、Move)时,它的Config都会发生改变,Num线性递增。同时对于新的Config,要用深拷贝的方式从旧Config内复制Shards与Groups这两个元素。原因是直接用“=”对slice与map复制时,属于引用复制,复制后的新变量与被复制变量本质指向同一个容器。
- Replica group:common.go中提到“A Config (configuration) describes a set of replica groups, and the replica group responsible for each shard”,这里shardCtrler的结构其实和Spanner是类似的。多个replica group共同组成了shardCtrler,每个replica group内部又包含几个服务器。replica group内部的server通过底层raft协议进行通信,而replica group之间又通过rpc调用进行shard的分配迁移。
- Shard:每个shard由一个replica group进行处理,一个replica group可以处理多个不同的shard。Part A我们要做的就是将shard分配给所有的replica group,而不需要考虑shard包含的数据应当如何传输迁移(Part B的内容)。
下面这张图就很很形象地说明shardCtrler的具体架构。
了解shardCtrler的构造之后,其实part A就通透了,剩下的问题就是如何处理负载均衡这个问题——即如何通过移动最少的shard来实现Config的变更。
想要解决这个问题,可以从最简单的情况着手,往shardCtrler中添加一个group后,shard该如何分配?再添加第二个、第三个呢?一个关键的思路是每个group获得的shards数量属于[average, average+1),多的也不再赘述。Talk is cheap, show you my code.
func (sc *ShardCtrler) rebalanceShards(newConfig *Config) {groupNum := len(newConfig.Groups) //这个集群里面有多少组 0 2 3直接算2组hasAssignedExtra := make(map[int]bool) //记录每个gid是否已经被分配了extraShard//组数重新归0的特殊情况if groupNum == 0 {return}//新的配置中,每个Server可以有拥有的shards的数量是:[average, average+1)averageShards := NShards / groupNumextraShards := NShards % groupNumshardPerGroup := make(map[int]int)//统计原来的Shards对应的gid的情况for _, gid := range newConfig.Shards {if gid != 0 {shardPerGroup[gid]++}}//获得升序的gid数组groupList := make([]int, 0, groupNum)for gid := range newConfig.Groups {groupList = append(groupList, gid)}sort.Ints(groupList)//遍历Shards设置可以被分配的shard 这段代码十分精妙availableShards := []int{}for shardId, gid := range newConfig.Shards {if !hasAssignedExtra[gid] {if gid == 0 || shardPerGroup[gid] > averageShards+boolToInt(extraShards > 0) {availableShards = append(availableShards, shardId)shardPerGroup[gid]--newConfig.Shards[shardId] = 0} else if shardPerGroup[gid] == averageShards+boolToInt(extraShards > 0) {extraShards--hasAssignedExtra[gid] = true}}}//将未分配的shard进行分配 gid0不参与分配for _, gid := range groupList {if gid == 0 {continue}// 计算该 group 应分配的 shard 数量expectedShards := averageShardsif extraShards > 0 {expectedShards++extraShards--}for shardPerGroup[gid] < expectedShards && len(availableShards) > 0 {shard := availableShards[len(availableShards)-1]availableShards = availableShards[:len(availableShards)-1]newConfig.Shards[shard] = gidshardPerGroup[gid]++}}}
Part B: Shard Movement (hard)
整个Part B可以分为两个大块。第一个大块是对来自Clerk的command进行处理;第二个大块则是实现分片迁移。先分析处理command的工作流程,就拿Put(key, value)举例:
1、Clerk计算key所在的shardId,根据Config得到这篇shardId所在的gid。
2、遍历该group的所有servers,与其中的Leader进行通信
3、Leader进行去重判断和Config失效性判断后,将Put传入raft进行一致性同步
4、接收同步后的Put并进行处理,最后判断一次Config的时效性即可向Clerk返回。
值得一提的是如何实现每个server的kvstorage呢?
Q:同一个group内部的servers是如何同步kvstorage的呢?明确kvstorage和shardData的关系。
A:在同一个group内,Leader通过将来自Clerk的command发送至raft进行一致性同步,接下来所有的Server都可以从raft peer中收到这个command,再将这个command应用至自己的状态机。Nshards个shardData组成完整的kvstorage。
其他实现过程与part A类似,重点是处理分片迁移。
先看实验说明里面的几句重点内容:
1、在配置更改期间实现分片迁移。确保副本组中的所有服务器在相同的操作序列点上执行迁移,以便它们要么同时接受客户端请求,要么同时拒绝客户端请求。
可以在group进行分片迁移的时候,进行阻塞。
2、服务器需要在配置更改期间相互发送RPC以传输分片。
看完实验说明,我们首先就要明确一点:无论是command还是分片迁移,都应该由group内的Leader进行处理,再与Follower进行一致性同步。这次给出的hints涉及内容跨度过大,就不率先分析hints了。
注意shardKV和shardCtrler的通信问题。
- Leader 从 shardCtrler 拉取配置:副本组的 Leader 定期从 shardCtrler 获取最新配置(约每100ms),检查当前配置是否与上次记录的配置不同。
- 分片迁移协调:一旦 Leader 检测到新的配置要求迁移某些分片,它会负责协调迁移工作。迁移数据的 RPC 请求由 Leader 发起,与其他组的 Leader 或组内的服务器通信。
- 同步迁移信息:为了确保一致性,Leader 将迁移指令或数据变更作为日志条目提交到其 Raft 共识模块中,然后由其他服务器应用此更改,确保整个组对分片的当前状态一致。
在实现分片迁移的时候,先搭好一个宏观的通信框架是很重要的。由于三个group不是同一时间查询Config,所以如果在shardCtrler在短时间内进行连续的配置变化时,三个group在同一个时间段查询得到的ConfigNum也不尽相同。例如在处理完Config2后,group 100查询到了Config3,group101查询到了Config3,而group102查询的是Config4。这时摆在我们面前的有两条路。
①一旦三组group的Config不同,则Config更旧的group重新查询最新的Config。但这样会忽略对旧Config的处理。在刚才的例子中,group100和group101发现自己的Config过时后,会跳过对Config3的处理,重新查询并处理Config4。
②三组group对每个Config都进行处理,而不是跳跃式处理Config。就像下图一般。
在三组group处理完Config2后,G100在第三次查询是Config8,而G101和G102都只查询到Config3。按照方案1,G101和G102会跳跃至Config8与G100同步,忽略中间几个Config。但是这种做法有两大弊端,首先是需要保存的中间状态明显更多。需要保存上一次成功同步处理的Config,还需要对已接收的shardData进行记录以便回退。其次,如果shardCtrler的状态处于持续的变化中,那三组group可能需要花费很长一段时间才能完全同步Config。
此时G100继续进入Config3进行处理,而不是令G101和G102迅速更新到Config8。三者处理完Config3后,继续处理Config4,以此类推。尽管这种方式可能要稍慢于前者,但是胜在需要保存的中间状态更少,处理过程更加的稳定。
我一开始就是选择的方案1,后面发现需要考虑的情况实在是太多了,不胜其烦,稍不留神就会出bug。
由于这次遇到的bug实在是太多,就挑连个我认为最具有代表性的来分享吧。
HandleShardData获取锁和applier获取锁直接冲突了,且是嵌套依赖。直接导致了死锁问题。
在group发送自己的shard时,我在这里对整个函数进行了加锁操作。但是,将operation提交至raft后,applier()接受来自raft的msg也需要获得锁。就会导致Handler()中的
index, _, _ := kv.rf.Start(operation)result := kv.waitForResult(index)if result.Err != OK {kv.DebugLeader(dConfig, "Server %v同步validNum %v失败: %v", kv.me, kv.validConfigNum, result.Err)}
result永远的都无法得到结果,从而产生死锁。
总的来说这次的Lab确实是难度极大,容易遇到各种各样的问题。可恶的是这个问题就自己遇到了,不具有一般性,那就只能对着log慢慢看了,让我也是几经道心受损。最后,能用方案②就用方案②,方案①实现起来苦难重重,我用方案①写到了unreliable那个测试发现难以进行下去,最后也是重构代码采用方案②。