一. 概述

lab4分为两部分,第一部分是master,负责管理所有的分片shards和raft groups元数据,在src/shardmaster内部实现;第二部分是raft group的实现,在src/shardkv中。

这个lab是实现一个简易版的分布式KV数据库,他具备一个和TiKV,Spanner,HBase等系统类似的系统:master + 多副本的分片,而master也是一个能够容错的group。它的多副本是用raft协议实现的,可以复用lab3 kvraft部分的代码,因为他们其实是在做同样的事,即使逻辑上有所区别。

这个lab还要实现shards的”负载均衡“,即将所有的shards尽量均分到每个raft group,获得shards最多的group的数量最多比获得shards最少的group的数量多一个。因此当出现了新的raft group或者移除某group的时候,需要进行shards的调整。

二. Part A: The Shard Master

Part A就是实现Master,这个Master提供了一些RPC接口供Client调用,从而让管理者能够修改集群的状态。

当Client调用RPC修改Master的状态之后(即任何元数据变更时),命令会发送到group的其他副本,当这个日志被apply时,就会得到一个新的Config,同时,master会保留每个版本的Config。

在master中,表示当前状态的数据结构是Config,这个结构的定义为

1
2
3
4
5
type Config struct {
	Num    int              // config number
	Shards [NShards]int     // shard -> gid 
	Groups map[int][]string // gid -> servers[]
}

包括了每个shard对应的group id(简称gid),以及每个raft group的每个server的名字列表(string表示)。

内容介绍

master提供四个RPC调用给Client,分别是 Join, Leave, Move, and Query RPCs。

  1. Join

    添加新的复制组. 它的参数由一系列从gids到server列表的集合组成, 代表几个gids背后的groups。新配置应尽可能均匀地将碎片划分为完整组,并且应尽可能少的碎片来实现这一目标。

  2. Leave

    Leave的参数是一个gids的列表, 意思是master需要撤销这些gids, 然后将他们的shards分配给剩下的groups。新配置应尽可能均匀地在groups中划分shards,并且应尽可能少地移动以实现目标。

  3. Move

    参数是一个gid和一个shard, 将这个shard分配给group。

  4. Query

    Query的参数是一个配置编号. 如果Query(-1), 则它是一个线性化的查询, 需要等它前面所有动作都执行完后, 返回结果。

    Query和其他三个不同点在于它是只读的,不会产生新的配置,因为它不会影响元数据。当然,Query提交到leader之后,也需要获取大多数节点的响应才可以返回结果。

初始的时候shards对应的gid都需要被设置为0,而正式的raft group的gid是从1开始的,0代表没有raft group。

lab文档中提出了几个hints:

  1. 可以从copy lab3中实现的kvraft server的代码,因为master也是这样的kvraft server
  2. 需要实现避免RPC重复逻辑。在lab3中有这样的test,但是lab4 shardmaster中没有此test
  3. 不要简单地使用一个map对另一个map进行赋值,因为它是引用类型,需要遍历它所有的key-value,一个一个地赋值
  4. 可以使用go test -race检测代码(很管用)

实现思路

这个master的实现思路并不复杂,因为如果做了lab3 kvraft的话,可以看出shardmaster的结构基本是一致的,只是换了一种形式,写不是lab3中简单的Put、Append,而是以Join、Leave、Move三种RPC来实现,而他们所做的工作,就是根据语义变更元数据的状态,包括基本实现和负载均衡。

在lab3中,每一个Put/Append在leader server接收到操作之后,就发送给followers,然后获取大多数节点同意的reply之后,就进行commit以及apply,apply的动作就是将修改kv数据内容。而在shardmaster中,它并不是简单的修改kv的内容,而是根据RPC的语义来修改config的内容。虽然不一样,但逻辑大致相同。

例如,对于Join,在apply之后,执行的动作就是将其参数中带有的 gid -> servers 列表添加到新的config的Groups中。Join带来新的group,大概率会触发负载均衡,于是需要调整shards。

如何以最少shard移动的成本进行负载均衡呢

其思路就是将shards最多的group的shard分配给最少shards的group,直到满足均衡的条件。可以使用一个最大堆,不断的获取拥有shards最多的group,然后取出其一个shard放到拥有shard最少的group,不断地这样操作,直到所有的group要么有x个shard,要么有x+1个shard,即达到了平衡的条件。

需要注意的是,shardmaster应当提供Client并发访问的能力,因此需要使用Sync.Mutex,对临界区进行保护。

主要保护的地方是shardmaster的configs,因为多Clients操作时,可能对configs同时进行读取和写入,从而导致竞争的发生。

Test解读

这个Part看起来test很少,只有两个:TestBasic, TestMulti,但是其实一个Test里包含了多个Test,只不过没有像之前一样每个test都单独写出来。

例如TestBasic中,先测试了 Leave和Join,以及Query,这是一个简单的先Join,再Join,再Leave,再Leave,中间穿插着一些Query的查询来进行check。

然后TestBasic中开启了一个循环,依次shutdown某个server,然后再进行Query查询config是否符合预期。这个是测试当leader被kill之后,剩下的followers是否能选出leader并继续提供一致的服务。

之后TestBasic中对Move的逻辑进行了测试。

最后,测试了并发的Leave和Join,这里要注意使用mutex对临界区的保护。

在第二个测试TestMulti中,则是一个更复杂的多协程并发测试,除了测试并发环境下Join等的功能外,还对multijoins和multileaves情况下最少的shards移动进行了测试。

三. Part B: Sharded Key/Value Server

和shardmaster比起来,Shardkv更像是一个key-value server,所以它对外提供的接口也是和kvraft类似的 Put/Append。不过,这个Part的hints比Part A要多很多。

内容介绍

shardkv是一个分片且容错的键/值存储系统,需要修改shardkv/client.go, shardkv/common.go, and shardkv/server.go.

每一个shardkv server都是整个replica group的一部分,它会负责一些shards,提供Get, Put, Append操作接口。可以使用在client.go中的key2shard()方法来找到一个key属于哪个shard。

存储系统必须为应用程序提供一个线性化的接口。因此,需要实现shardkv/client.go中的Clerk.Get(), Clerk.Put(), Clerk.Append(),根据raft论文,线性化语义是指强一致性,也就是无论是Put还是Get,都需要从leader执行并得到大多数节点的认可。为了方便实现,将Get的请求也作为日志entry写入,并在其apply的时候获取线性化的结果。

一个操作需要成功,不仅需要shardkv server中的大多数servers alive,还需要能够和shardmaster中的大多数server进行通信。

需要注意的点:

  • shardkv如何和shardmaster通信
  • shardmaster发生config变更时,如何通知shardkv,或者shardkv是否会每次都获取config

两个Task:

Task1

实现静态的, 最基本的shardkv server服务, 需要获取config变更, 但是这个变更不会有shard的迁移的变更。需要让server检测出什么时候一个config发生了(新的config), 这时自己可能要为新的shards提供服务。

Task2

这部分需要实现shard迁移,而不仅是静态的sharding。需要watch config的变化,能够启动shard迁移过程,立刻停止对某个不属于它的shard的请求提供服务。迁移shards数据需要定义RPC,并且迁移期间对应的shards不能提供服务(要么超时要么时间之内可以提供服务)。

有一个要求是:确保副本组中的所有服务器在执行的操作序列中执行迁移。==有了多分片之后,server的log和状态机是如何组织的?每个shard都有单独的日志和状态机(kv map)吗?进行迁移是将日志迁移还是将状态机迁移呢?==

我认为每个shard都有一个专属的kv map,一旦一个log apply之后,他就持久化了,这时他这部分已apply的日志可以舍弃了,因为apply的日志一定是commit的。迁移数据可以是日志也可以是状态机,但是状态机更合理一些。

Note:server需要定期的从shardmaster那里获取最新的configurations,tests预期你的代码会每100ms获取一次,如果少了很可能出现问题。

Note:shardkv servers之间需要通过RPC来传输shards, 这个RPC要自定义吧. 不要忘了进行注册!可以通过make_end()来获得ClientEnd,从而进行RPC。

Hints
  1. 定期获取最新配置,拒绝不为某shard负责的请求

  2. 对于错误的shard请求返回ErrWrongGroup error;面对并发的"re-configuration"时也需要工作.

  3. 按顺序, 一次处理一个 re-configurations

  4. 如果test失败了,检查god错误。

  5. 和Part C一样,需要避免一个client request重复执行, at-most-once语义。

  6. 思考shard client和server应该怎样处理ErrWrongGroup问题,当client收到了该错误需要改变sequence number吗?server需要更新client的状态吗?(它有啥状态要保存在server端? MsgId!表示是否执行过)

    sequence number应该是指 MsgId!表示其唯一的编号。

  7. 当一个server(注意不是shard)被move到一个新的配置后,它可以继续存储它不再owns的shards。这可能简化server的实现。

  8. xxx 难理解

  9. 可以发送整个map在RPC中,让shard迁移更简单

  10. 为了避免竞争RPC的map需要是一份copy。server在RPC读取它的时候可能reading 它, 因此传递给RPC的应该是map的一个拷贝以保证线程安全。

  11. 从applyCh那里接受到map后也需要copy;否则可能发生race(靠谱),而race是在server修改map/slice和Raft做持久化时读取它的时候发生的。

  12. 两个groups之间可能需要互相给对方移动shards,如果发生死锁了这个可能是原因。

实现思路

总结了一下 todo

  • 完成StartServer,注意 labgod的注册
  • 完成像lab3一样的raft group框架,apply时分shard进行写
  • client标识命令唯一性,以及server端对重复命令的处理
  • 增加WrongGroup逻辑,看hint 6,结合client request唯一性
  • 临界区加锁,使用go test -race 检测竞争条件,先copy map再操作
  • 我认为,re-configurations逻辑也需要apply,只能由leader同步这个状态
  • 状态机部分,每个shard都有一个map来保存结构,数据结构中增加shard信息
  • 实现shardkv server定时从shardmaster获取config,注意超时重试机制
  • 实现shard迁移机制(RPC,记得注册!),Task1用不到,Task2需要;这个比较麻烦

调试日志

TestStaticShards

在调试的时候,shardmaster和shardkv各有自己的raft,因此日志混合在一起了。。有点乱

写好一个基本实现后,发现在PutAppend里一直是WrongGroup,而没有执行日志复制。

在最开始的时候有时候会一直循环,有时候又不会。打断点之后发现,死循环是执行Put的时候产生的,并且下面给Err赋值为OK的bug修复之后,依然是死循环,因为根本没有执行opt。

忘记了正常情况下给PutAppend和Get返回的Err赋值为OK,导致Err一直为空,于是client的PutAppend和Get一直死循环。

虽然还有些导致死循环的bug,但是这个static测试终于Pass一次了!还有两个bug:

  1. 第一个是执行Put的时候的死循环,一直进不了opt

    打印日志后发现,这个死循环是由一直ErrWrongGroup导致的

    1
    
    2021/10/11 21:56:04 [WARNING] client.go:132	 client PutAppend shard: 8, gid: 100, ok: true, reply: {WrongLeader:false Err:ErrWrongGroup}
    

    奇怪的是,他本不应该ErrWrongGroup的;在wrongGroup函数中,kv.config 变成了一个没有赋值的config,Num==0,但在updateConfig中打印的日志来看它确实是赋值了的。

    分析了很久,突然明白了。。。我的ErrWrongGroup的判断是在isLeader之前的,而我设置的更新config是只能leader更新,所以follower的config是没有更新的,所以导致了client一直访问到follower然后ErrWrongGroup,然后一直循环。只有当server为0的时候是leader,这个测试才会通过!所以就会出现不确定的情况!

    果然如此,将只有leader才能update config的逻辑删除掉了,测试就通过了,而且没有出现下面的第二种错的情况了。

  2. 第二个是执行完Put、check之后,又出现Get的死循环

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    
    2021/10/11 21:28:34 [WARNING] server.go:76	 [0] Get args: &{Key:3}
    2021/10/11 21:28:34 [WARNING] server.go:76	 [0] Get args: &{Key:2}
    2021/10/11 21:28:34 [WARNING] server.go:76	 [0] Get args: &{Key:6}
    2021/10/11 21:28:34 [WARNING] server.go:76	 [0] Get args: &{Key:5}
    2021/10/11 21:28:34 [WARNING] server.go:76	 [0] Get args: &{Key:2}
    2021/10/11 21:28:34 [WARNING] server.go:76	 [0] Get args: &{Key:3}
    2021/10/11 21:28:34 [WARNING] server.go:76	 [0] Get args: &{Key:4}
    2021/10/11 21:28:34 [WARNING] server.go:76	 [0] Get args: &{Key:2}
    2021/10/11 21:28:35 [WARNING] server.go:76	 [0] Get args: &{Key:6}
    2021/10/11 21:28:35 [WARNING] server.go:76	 [0] Get args: &{Key:2}
    2021/10/11 21:28:35 [WARNING] server.go:76	 [0] Get args: &{Key:5}
    2021/10/11 21:28:35 [WARNING] server.go:76	 [0] Get args: &{Key:3}
    2021/10/11 21:28:35 [WARNING] server.go:76	 [0] Get args: &{Key:4}
    2021/10/11 21:28:35 [WARNING] server.go:76	 [0] Get args: &{Key:2}
    

    解决了第一种bug之后,第二个bug也没有出现了。

TestJoinLeave

通过第一个测试之后直接来到第二个,不出所料没通过,原因就是执行Leave后,gid100没有shards了,所以返回空数据。**接下来要实现的逻辑就是在leave的时候将gid100的数据传递给gid101. **

但是有个小问题就是,client本应该收到server那边ErrWrongGroup错误的,但是没有,是因为我把ErrWrongGroup这个判断放到了RPC最开始,不需要进入raft的apply环节就可以判断,所以最新的config还没有来得及更新。在实现数据迁移环节前,应该先调整ErrWrongGroup结构,把他放到apply里去处理好一点

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
2021/10/12 08:59:26 [WARNING] server.go:76	 gid: 101, [0] Get args: &{Key:0}
2021/10/12 08:59:26 [WARNING] server.go:50	 gid: 101, [0] prepare to opt: {Ch:0xc000351500 Req:{Key:0}}
2021/10/12 08:59:26 [WARNING] server.go:153	 gid: 101, [0] apply: {Ch:0xc000351500 Req:{Key:0}}
2021/10/12 08:59:26 [WARNING] server.go:128	 gid: 101, [0] get key: 0, value: 
2021/10/12 08:59:26 [WARNING] server.go:153	 gid: 101, [1] apply: {Ch:<nil> Req:{Key:0}}
2021/10/12 08:59:26 [WARNING] server.go:128	 gid: 101, [1] get key: 0, value: 
2021/10/12 08:59:26 [WARNING] server.go:153	 gid: 101, [2] apply: {Ch:<nil> Req:{Key:0}}
2021/10/12 08:59:26 [WARNING] server.go:128	 gid: 101, [2] get key: 0, value: 
    TestJoinLeave: test_test.go:21: Get(0): expected:
        Dg6DN7YYzr
        received:
--- FAIL: TestJoinLeave (5.75s)
FAIL

后面这种出现value为空,而ErrWrongGroup为false的情况,是因为没有将shards数据进行迁移,导致在Join,Leave之后,gid101的raft servers里没有数据,所以即使gid匹配了,value都是空,所以一直会出错。

急匆匆实现了一个RPC逻辑,以及在updateConfig的时候进行shard数据迁移,没想到进行TestJoinLeave的时候一遍过了。还以为又要调试到爆炸。不过还是不完善的,因为这个测试还比较基本,没有什么错误的逻辑,所以我这个基本的逻辑也可以过了。

TestSnapshot

在上一个测试中,Join、Leave等都不是连续的(有的操作间还sleep了1s),所以shardkv可以比较从容地传输数据。但是在这个测试里,很多操作都是连续的,所以这里需要处理他们的顺序,怎么让他们依次执行并且不会出错。

上一节留下的坑:是否只有leader才可以向其他server的leader发送shard数据。上一节实现的逻辑是让follower也发送了,不太应该。

让我感到非常莫名其妙的是,run和debug(不打断点)两种模式下,出现的Fail原因是不一样的,而且每次都是不一样的,这太奇怪了。

执行测试,一堆很长很长的运行错误。。。而执行debug,则没有运行错误,而是在checklogs的时候出现这个maxraftstate错误;这可太诡异了

1
TestSnapshot: config.go:94: persister.RaftStateSize() 11094, but maxraftstate 1000

看了一下运行错误的报错,发现它是提示map的并发写(可是debug模式执行为什么没有这个问题,debug模式难道不会并发吗?)

1
fatal error: concurrent map iteration and map write

这就是lab文档中多次提到的问题,我在前面实现的代码中为了先追求简单而忽略了这个问题!于是回去检查,发现几个要修改的点:

  1. shard migration数据apply时候,应该对args的kv数据进行一份copy,再赋值给kv;否则会发生竞争。

  2. 在进行shard migration的时候,需要通过RPC传递map数据,这时不能直接用kv的map赋值,而是应该先copy一份,用copy的map传递给RPC,避免出错。

    PS. 但我感觉对于map很大的情况,这样copy map是一个很消耗内存和CPU的操作。是否可以实现一个(或使用一个)支持并发的map,让它本身支持多协程并发,从而避免拷贝。

修改第一个点之后,果然没有了那个运行错误。

目前看到的还剩两个问题:

  1. persister.RaftStateSize() 11094, but maxraftstate 1000 问题,在checklog那里产生的
  2. 中间变更状态导致状态、数据不一致的问题(通过在Join、Leave之后添加sleep可以缓解这一点,但治标不治本)

造成第一个问题的原因是因为先前没有实现snapshot逻辑,所以它checklog检测时出现了问题,日志长度过大。把snapshot的逻辑添加进去,但是它出现了奇怪的循环现象

1
2
3
4
5
6
7
8
2021/10/12 18:56:10 [WARNING] server.go:186	 gid: 100, [0] start snapshot, raftStateSize: 3111
2021/10/12 18:56:10 [WARNING] server.go:193	 gid: 100, [0] end snapshot, raftStateSize: 3111
2021/10/12 18:56:10 [WARNING] server.go:186	 gid: 100, [0] start snapshot, raftStateSize: 3111
2021/10/12 18:56:10 [WARNING] server.go:193	 gid: 100, [0] end snapshot, raftStateSize: 3111
2021/10/12 18:56:11 [WARNING] server.go:186	 gid: 100, [0] start snapshot, raftStateSize: 3111
2021/10/12 18:56:11 [WARNING] server.go:193	 gid: 100, [0] end snapshot, raftStateSize: 3111
2021/10/12 18:56:12 [WARNING] server.go:186	 gid: 100, [0] start snapshot, raftStateSize: 3111
2021/10/12 18:56:12 [WARNING] server.go:193	 gid: 100, [0] end snapshot, raftStateSize: 3111

一直在执行snapshot,而这个RaftStateSize一直没有减少。很奇怪。

定位问题到saveSnapshot逻辑,调试发现有很多很多协程在raft实现的persist这里阻塞了,而这里的逻辑是在persist之后,通知apply触发快照。尝试给applyCh加了很大size的缓冲,于是不阻塞了。

1
rf.applyCh <- msg

解决了第一个问题,这个测试有时候能够通过,但是有时候会出现check失败的情况,所以第二个问题还是存在的。

尽管Pass的情况比Fail要多,但是这个问题还是尽量要解决掉。出现这个问题的根源应该还是shardkv更新config以及迁移shard数据的细节造成的。

四. 坑点&注意点

注册自定义RPC struct

记得将RPC中的自定义struct使用labgod注册!尤其是这个Config(如果实现中使用了它)

1
2
3
4
5
6
labgob.Register(Op{})
labgob.Register(JoinArgs{})
labgob.Register(LeaveArgs{})
labgob.Register(MoveArgs{})
labgob.Register(QueryArgs{})
labgob.Register(Config{}) // fuck!

一开始我没有注册这个Config,导致实验过程中出现了奇怪的现象,具体表现为当leader被kill之后,本应从followers中选出一个新的leader然后新leader快速恢复状态,但是状态却全部丢失了!

打印了许多日志进行调试后,发现除了leader apply的msg的config全是nil, 例如下面这条日志

1
2021/10/05 17:24:14   [INFO]  raft.go:441   [2] apply msg: {CommandValid:true Command:<nil> CommandIndex:3}

之后对RequestAppendEntries进行了调试, 发现在这里获得的log entries中, Get的都是OK的, 而Config的都是nil的, 想到之前踩过的坑,就是没有Register某struct,导致数据丢失。注册之后果然是这样!

初始化map,channel

注意,项目中会用到很多的map和channel,需要手动初始化map和channel,map还好会报错,channel的问题有时候比较隐蔽!

以及,map是引用类型,直接copy不会拷贝实际的值