project2

目录

project2 题解

概述

project1和project2是分水岭,2直接开始实现raft库,涉及的代码更多并且更复杂。这个project一共有三个部分:

  1. raft库的实现(2A)
  2. 基于raft的K/V存储服务(2B)
  3. 增加对snapshot的支持(2C)

raft库是一个一致性算法库,它本身并不具有KV存储的功能,因此我们需要在raft之上建立一个KV存储,并且实现快照。

2A

2A也有多个模块,被分为三个部分:2AA, 2AB, 2AC

  • 2AA是实现领导选举
  • 2AB是实现日志复制
  • 2AC是实现raw node接口

MessageType

raft算法本质上是一个大的状态机,任何的操作例如选举、提交数据等,最后的操作一定是封装成一个消息结构体,输入到raft算法库的状态机中。raft算法其实由好几个协议组成,但是在这里,统一定义在了Message这个结构体之中,以下总结了该结构体的成员用途。

成员 类型 作用
type MessageType 消息类型
to uint64 消息接收者的节点ID
from uint64 消息发送者的节点ID
term uint64 任期ID
logTerm uint64 日志所处的任期ID
index uint64 日志索引ID,用于节点向leader汇报自己已经commit的日志数据ID
entries Entry 日志条目数组
commit uint64 提交日志索引
snapshot Snapshot 快照数据
reject bool 是否拒绝
rejectHint uint64 拒绝同步日志请求时返回的当前节点日志ID,用于被拒绝方快速定位到下一次合适的同步日志位置

由于这个Message结构体,全部将raft协议相关的数据都定义在了一起,有些协议不是用到其中的全部数据,所以这里的字段都是optinal的。下面将介绍各种不同的消息

MsgHup

成员 类型 作用
type MsgHup 不用于节点间通信,仅用于发送给本节点让本节点进行选举
to uint64 消息接收者的节点ID
from uint64 本节点ID

MsgBeat

成员 类型 作用
type MsgBeat 不用于节点间通信,仅用于leader节点在heartbeat定时器到期时向集群中其他节点发送心跳消息
to uint64 消息接收者的节点ID
from uint64 本节点ID

MsgPropose

成员 类型 作用
type MsgProp raft库使用者提议(propose)数据
to uint64 消息接收者的节点ID
from uint64 本节点ID
entries Entry 日志条目数组

raft库的使用者向raft库propose数据时,最后会封装成这个类型的消息来进行提交,不同类型的节点处理还不尽相同。

MsgAppend

成员 类型 作用
type MsgApp 用于leader向集群中其他节点同步数据的消息
to uint64 消息接收者的节点ID
from uint64 本节点ID
entries Entry 日志条目数组
logTerm uint64 日志所处的任期ID
index uint64 索引ID

MsgSnapshot

成员 类型 作用
type MsgSnap 用于leader向follower同步数据用的快照消息
to uint64 消息接收者的节点ID
from uint64 本节点ID
snapshot Snapshot 快照数据

MsgAppendResponse

成员 类型 作用
type MsgAppResp 集群中其他节点针对leader的MsgApp/MsgSnap消息的应答消息
to uint64 消息接收者的节点ID
from uint64 本节点ID
index uint64 日志索引ID,用于节点向leader汇报自己已经commit的日志数据ID
reject bool 是否拒绝同步日志的请求
rejectHint uint64 拒绝同步日志请求时返回的当前节点日志ID,用于被拒绝方快速定位到下一次合适的同步日志位置

MsgRequestVote

成员 类型 作用
type MsgVote/MsgPreVote 节点投票给自己以进行新一轮的选举
to uint64 消息接收者的节点ID
from uint64 本节点ID
term uint64 任期ID
index uint64 日志索引ID,用于节点向leader汇报自己已经commit的日志数据ID
logTerm uint64 日志所处的任期ID
context bytes 上下文数据

MsgRequestVoteResponse

成员 类型 作用
type MsgHeartbeatResp 用于follower向leader应答心跳消息
to uint64 消息接收者的节点ID
from uint64 本节点ID
context bytes 上下文数据,在这里保存一致性读相关的数据

2AA

选举流程

  • 节点启动时都以follower状态启动,同时随机选择自己的选举超时时间。之所以每个节点随机选择自己的超时时间,是为了避免同时有两个节点同时进行选举,这种情况下会出现没有任何一个节点赢得半数以上的投票从而这一轮选举失败,继续再进行下一轮选举
  • 在follower的tick函数tickElection函数中,当选举超时到时,节点向自己发送HUP消息。
  • 否则进入campaign函数中进行选举:首先将任期号+1,然后广播给其他节点选举消息,带上的其它字段包括:节点当前的最后一条日志索引(Index字段),最后一条日志对应的任期号(LogTerm字段),选举任期号(Term字段,即前面已经进行+1之后的任期号),Context字段(目的是为了告知这一次是否是leader转让类需要强制进行选举的消息)。
  • 如果在一个选举超时之内,该发起新的选举流程的节点,得到了超过半数的节点投票,那么状态就切换到leader状态,成为leader的同时,leader将发送一条dummy的append消息,目的是为了提交该节点上在此任期之前的值(见疑问部分如何提交之前任期的值)

收到选举消息的节点

一个节点在一个任期内只能进行一次投票,当收到其他节点的投票请求时,需要满足一下条件:

  1. 只有在没有给其他节点进行过投票,或者消息的term任期号大于当前节点的任期号,或者之前的投票给的就是这个发出消息的节点
  2. 进行选举的节点,它的日志是更新的,条件为:logterm比本节点最新日志的任期号大,在两者相同的情况下,消息的index大于等于当前节点最新日志的index,即总要保证该选举节点的日志比自己的大。

只有在同时满足以上两个条件的情况下,才能同意该节点的选举,否则都会被拒绝。这么做的原因是:保证最后能胜出来当新的leader的节点,它上面的日志都是最新的。

几个关键函数

Step

step函数接收一个Msg并对他进行处理。是外部传递消息的接口。根据当前节点的不同state,进行特定地处理:stepLeader,stepCandidate,stepFollower。

Tick

raft库中并不会进行实际的计时,其时钟是逻辑时钟,是外部通过调用tick方法作为一次逻辑时钟的推进。具体多长时间调用一次tick由外部控制。不同类型的节点对于tick的反应是不一样的。对于follower和candidate,会检测当前时间和上次心跳之间是否超时,如果是那么需要通过Step发送 MessageType_MsgHup。对于leader,则需要检测距离上次发送心跳是否超时,如果是则需要发送 MessageType_MsgBeat。

2AB

2AB是实现raft的日志复制。

日志复制的流程为:

  1. Leader 为客户端提供服务,客户端的每个请求都包含一条即将被状态复制机执行的指令。
  2. Leader 把该指令作为一条新的日志附加到自身的日志集合,然后向其它节点发起附加条目请求(AppendEntries RPC),来要求它们将这条日志附加到各自本地的日志集合。
  3. Leader会为每个其他节点维护Match和Next数据,Match表示已经复制过去的进度,Next表示下一个待复制的日志索引。只有Match和Follower存在的日志相匹配时才能复制成功。
  4. 当这条日志已经确保被安全的复制,即大多数(N/2+1) 节点都已经复制后,leader 会将该日志 apply 到它本地的状态机中,然后把操作成功的结果返回给客户端。

2AC

2AC是实现raw node接口,即raft库对外提供的调用,在构建基于raft的K/V存储时,每个peer将会有一个RawNode接口。在这个部分,主要需要实现的方法是:

  1. HasReady
  2. Ready
  3. Advance

RawNode是raft库的接口,把raft库作为一个黑箱来看,其输入是Propose,传递一个消息,其输出是Ready,将会返回一个Ready结构体,从而可以持久化日志和进行apply。

当客户端需要向raft中发送消息时,他会调用Propose,其调用Step处理Msg。客户端将会不断地调用HasReady,判断出是否有需要处理的数据被更新,如果有那么调用Ready将这些数据通过Ready结构传递出来。在对Ready进行处理之后,会调用Advance告知raft某个Ready已经被处理,在raft中可以更新对应结构了。

Ready结构为

成员名称 类型 作用
SoftState SoftState 软状态,软状态易变且不需要保存在WAL日志中的状态数据,包括:集群leader、节点的当前状态
HardState HardState 硬状态,与软状态相反,需要写入持久化存储中,包括:节点当前Term、Vote、Commit
ReadStates []ReadStates 用于读一致性的数据,后续会详细介绍
Entries []pb.Entry 在向其他集群发送消息之前需要先写入持久化存储的日志数据
Snapshot pb.Snapshot 需要写入持久化存储中的快照数据
CommittedEntries []pb.Entry 需要输入到状态机中的数据,这些数据之前已经被保存到持久化存储中了
Messages []pb.Message 需要发送出去的数据

2B

2B是实现一个基于raft的K/V存储引擎。这部分涉及的代码较多,较为复杂,主要实现 peer_storage.go 中 SaveReadyState() 方法和 Append() 方法,以及peer_storage_handler.go 中的 proposeRaftCommand() 和 HandleRaftReady() 方法。

struct及含义

需要了解raftstore中的各种struct及含义:

  1. raftstore.peer:raft group中的单个节点,每个peer都有一个rawnode结构,表示他所在的raft group。每个peer还有一个PeerStorage结构,用来存储raft的state
  2. raftstore.PeerStorage:peer的成员,用来存储raft的state。在2B中,需要实现它的Append和SaveReadyState方法。
  3. engine_util.Engines: PeerStorage的成员,他有两个badger实例,分别是raftdb和kvdb,kvdb可以 看成是 Raft 论文中提到的状态机。
  4. raftstore.raftWorker:它有一个run方法会一直运行,接收raftCh传过来的消息(chan message.Msg类型),然后根据msg中的RegionID找到对应的PeerState,PeerState中包含了peer,之后再通过调用newPeerMsgHandler来进行两种操作:HandleRaftReady,HandleRaftReady。前者用于处理各种消息,例如MsgTypeRaftMessage,MsgTypeRaftCmd等,来自同其他peer的消息就属于MsgTypeRaftMessage,而来自客户端的请求则是MsgTypeRaftCmd。后者则是用于在将消息传递给raft之后,接收它返回的Ready结构,然后处理Ready的内容,处理完后通过Advance告知raft。
  5. raftstore.peerMsgHandler:2B中需要实现的第二种类型,在raftWorker中被调用,它封装了处理raft消息和其返回的Ready结构的方法,2B中需要实现两种方法:HandleRaftReady,proposeRaftCommand。
  6. raftstore.RaftStorage: raftstore的入口,它会启动一些workers来异步处理特定的任务。例如前面的raftWorker。它有一个GlobalContext结构,包含整个raftstore的上下文,其中有engine_util.Engines,他是整个raftstore server共有的,在每个PeerStorage中都有该结构,GlobalContext还包含storeMeta结构,存储了raftstore的元数据,例如regionRanges btree.BTree用来查询某个range所属的的region(region end key -> region ID),以及regions map[uint64]metapb.Region(region_id -> region)。

代码实现

需要实现两个文件(peer_storage.go, peer_msg_handle.go)中的四个函数:

proposeRaftCommand

这个函数在HandleMsg()中被调用,用于处理类型为MsgTypeRaftCmd的消息,这个消息是来自客户端的消息(命令),有哪些命令可以看raft_cmdpb.pb, 有:get, put, delete, snap。它的第二个参数是Callback类型,包含RaftCmdResponse,客户端发出命令,raftStorage处理完之后,将结果(或者错误)通过 Callback 返回。例如客户端进行get了某个key,那么需要将value结果返回。并且如果处理过程中发生了错误,也需要将错误返回,错误通过ErrResp方法封装到RaftCmdResponse结构中。

proposals是一个记录Proposal的callback的结构,proposals在proposeRaftCommand中被append,每个RaftCommand对应一个proposal,每次处理RaftCommand之后会取出一个proposal,然后将返回的错误提交给它,之后会返回给客户端。

在2B中需要考虑的错误有:ErrNotLeader,ErrStaleCommand。

HandleRaftReady

接收来自peer的 Ready 结构,进行处理 (文档中给出了伪代码框架)。 步骤:

  1. 从Raft模块中获取 Ready(2ac中实现的 HasReady() 和 Ready()
  2. 将Ready数据保存(持久化),应该是调用 peer_storage中的SaveReadyState(我自己实现),里面要实现Entries的持久化(Append)
  3. 按照文档中的伪代码,接下来是直接发送Messages (状态和Entries被持久化后就可以发送消息了)
  4. 处理 Ready中的其他结构,Snapshot需要持久化(?),CommittedEntries需要apply到 状态机(kvdb)。
  5. 全部执行完后,调用 rawnode(raftgroup)的Advance() 告知当前 Ready结构已处理完成

伪代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
for {
  select {
  case <-s.Ticker:
    Node.Tick()
  default:
    if Node.HasReady() {
      rd := Node.Ready()
      saveToStorage(rd.State, rd.Entries, rd.Snapshot)
      send(rd.Messages)
      for _, entry := range rd.CommittedEntries {
        process(entry)
      }
      s.Node.Advance(rd)
    }
}

对于单个的entry如何进行处理呢?

处理单个 entry,根据 eraftpb.pb 中对 Entry 的注释,可以知道entry有normal entries和configuration changes两种。后者应该是 在project3中使用。 根据 EntryType 来区分这两者:EntryType_EntryNormal,EntryType_EntryConfChange。

我们咋进行 raft command 处理的时候,会将RaftCmdRequest结果通过Marshal方法编码成raft message中的Data,当处理entry的时候,也需要通过 Unmarshal 方法将entry.Data解码成RaftCmdRequest结构。从而进行处理。

而一个 RaftCmdRequest 结构中,又有两种类型,RaftCmdRequest注释说 normal requests and administrator request 不能同时装入,同时只能存在一种,2B中需要实现的就是 normal requests。

一个RaftCmdRequest结构中有一个requests的slice,需要循环遍历这个requests,根据cmd的类型(put,delete,get)进行分别处理。

之后,需要根据将RaftCmdRequest的处理结果通过proposal的CallBack返回。出现错误则返回错误。

写入需要通过engine_util.WriteBatch结果,以批量,原子地形式一次性将变化写入到状态机中(kvdb)。

SaveReadyState

这个方法在HandleRaftReady中被调用,用于将Ready变化的状态持久化。这些状态(结构)在重启后恢复一致的状态机非常重要。

  • Entry: raft 日志,需要被持久化。
  • RaftLocalState:用于存储当前 Raft 的 HardState 和 last Log IndexLastTerm
  • RaftApplyState:用于存储 Raft applies的last Log index(applied index)和一些截断的(truncated ) Log 信息。
  • RegionLocalState:用于在这个Store上存储Region信息和对应的Peer状态。Normal 表示该 Peer 正常,Tombstone 表示该 Peer 已从 Region 中移除,无法加入 Raft Group。

这些结构分别存储在raftdb 和 kvdb中,且通过writebatch.SetMeta()写入到badger中。这四类结构的key和value分别如下:

Key KeyFormat Value DB
raft_log_key 0x01 0x02 region_id 0x01 log_idx Entry raft
raft_state_key 0x01 0x02 region_id 0x02 RaftLocalState raft
apply_state_key 0x01 0x02 region_id 0x03 RaftApplyState kv
region_state_key 0x01 0x03 region_id 0x01 RegionLocalState kv

Append

该方法在SaveReadyState中被调用,用于持久化raft logs。

同时,这部分log只代表被持久化的log,并不是所有持久化了的log都commit了,有可能部分持久化了的log还需要被 truncated(和内存上的log类似)。

debug

写好了基本实现之后,往往会出现很多的bug,这时需要根据测试去排查bug。

  1. 处理proposal时,如果ntry.Term < prop.term或者entry.Index < prop.index时直接return了,但是应该break,因为可能有kvWB需要在后面进行WriteToDB。
  2. 在process单个entry的时候,应该Set applyState,但是写成 RaftLocalState了,导致apply state一直没有持久化。
  3. 总是发生 runtime error: invalid memory address or nil pointer dereference 这种错误,debug发现是txn为nil导致。发现自己代码里txn的设置是在 cb.Done(resp)之后,导致有时候txn会是nil。而他本应该在Done之前设置。

但是在CallBack的注释中,对txn的注释是 used for GetSnap,那么2B中不应该涉及啊,怎么老是会因为nil而panic?

而且我在上一个设置了txn(不管啥情况),但是txn还是nil。经过debug发现,我当时设置的txn是在Done了resp之后设置的。。。但应该在Done之前,因为一旦Done了那边就会处理,然后如果在我设置txn之前处理,那么txn为nil就会导致panic。而且由于协程调度的顺序不是确定的,因此我的这个panic的出现也是有点随机的。

而之所以txn只会在有Snap的时候才会用到,而2B按理还没有涉及snap却用到了txn是因为在进行Cluster.Scan的时候,那个req就是一个SnapCmd:req := NewSnapCmd()。这就非常坑了,之前的实现都是跳过了snap这个功能,以为在2C中才会出现,但是2B的测试结果出现涉及snap的测试,让我猝不及防。

有时候 TestOnePartition2B会失败,是下面这种情况

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
=== RUN   TestOnePartition2B
--- FAIL: TestOnePartition2B (12.64s)
panic: can't get value 7632 for key 6b32 [recovered]
   panic: can't get value 7632 for key 6b32

goroutine 8 [running]:
testing.tRunner.func1.1(0xbeb4c0, 0xc17cb0b730)
   /usr/local/go/src/testing/testing.go:1076 +0x30d
testing.tRunner.func1(0xc000001c80)
   /usr/local/go/src/testing/testing.go:1079 +0x41a
panic(0xbeb4c0, 0xc17cb0b730)
   /usr/local/go/src/runtime/panic.go:969 +0x175
github.com/pingcap-incubator/tinykv/kv/test_raftstore.MustGetCf(0xc0000f77a0, 0xcf6e44, 0x7, 0xc00014dea7, 0x2, 0x2, 0xc00014dea5, 0x2, 0x2)
   /mnt/e/zen/pingcap-talnet/tinykv2022/kv/test_raftstore/utils.go:103 +0x385
github.com/pingcap-incubator/tinykv/kv/test_raftstore.MustGetEqual(...)
   /mnt/e/zen/pingcap-talnet/tinykv2022/kv/test_raftstore/utils.go:111
github.com/pingcap-incubator/tinykv/kv/test_raftstore.TestOnePartition2B(0xc000001c80)
   /mnt/e/zen/pingcap-talnet/tinykv2022/kv/test_raftstore/test_test.go:397 +0xb17
testing.tRunner(0xc000001c80, 0xd2d220)
   /usr/local/go/src/testing/testing.go:1127 +0xef
created by testing.(*T).Run
   /usr/local/go/src/testing/testing.go:1178 +0x386
FAIL   github.com/pingcap-incubator/tinykv/kv/test_raftstore  12.695s
FAIL

为什么会是随机的呢?我发现我开启日志之后,几乎每次测试都是PASS,FAIL出现的比例大概是1/17,开启了log之后,失败的概率大幅增加,我觉得这是因为开启了log之后,在操作之间的间隔时间增加了,所以被隔离的节点能够更快地进行日志的append了,所以PASS的概率会大很多。

这显然是不对的,对于被少数分区隔离的leader不管延迟如何他都应该返回正确的且是最新的结果。例如进行get的时候,应该先确认他是不是leader,这时如果不是,就应该将查询转移到leader那边去查询才对。

看了下测试中的源码,MustGetCf的实现是尝试300次,每次间隔20ms,直接从engine.kv中获取key/value。这个时间老实说已经够久的了,所以还是打印日志看看到底是发生了什么。有两千九百多行的日志。

定位到是raft模块出了问题,在这种存在分区的情况产生了错误的结果。

定位到Append成功之后,滞后的旧leader返回的lastIndex一直都是比实际应该的lastIndex小1,导致新的leader一直进行append。

然后找到了两个bug:

  1. 在maybeAppend的实现里,return应该是 newLastIndex 而不应该是 l.LastIndex()!l.LastIndex()可能更大。因为是Append会覆盖,没有把后面的都删除掉,所以新的合法的lastIndex是newLastIndex。
  2. 而是分区恢复之后,旧leader(term 6)向其他节点(非leader,term 9)发送append,然后这个节点同意了它的append,导致append成功然后它append了log并且commit了,但是本不应该同意,因为这个节点的term 9更大!这是因为在Step的时候,m.Term < r.Term时没有return,导致term小的旧leader的msg生效了。

2C

这部分是给raft增加快照的功能。主要是实现ApplySnapshot方法。

snapshot发送流程

(1)raft.go 中的 sendAppend 函数中,调用 raftLog.storage.Snapshot() 函数,新建一个 RegionTaskGen 的任务,异步产生快照,并且将 snapState 的状态改为 Generating 表示正在产生快照。另外,这次调用应该返回一个快照暂时不能获得的Error。

(2) region_task.go 中的 handleGen 函数负责产生快照,调用 doSnapshot 产生快照。

(3)当下次调用 Snapshot() 的时候,如果快照已经产生了,则接收并验证快照,返回结果。

(4)那么 raft.gosnapshotMsg 被存储在了 msgs 中。等待通过 Ready 处理。

(5) HandleRaftReady 中通过调用 send 通过 transport 将消息封装成 RaftMessage 发送到对应的store。

(6)通过 transport.goServerTransportsend 函数发送。在 SendStore 中调用 Resolve ,产生一个 resolveAddrTask 任务,通过 HandlegetAddr 获得StoreId的地址,并通过 callback 更新 raftClientaddrs

(7)在 WriteData 函数中,由于是发送快照的消息,所以通过 SendSnapshotSock 函数建立一个 sendSnapTask 任务。

(8)在 snap_runner.go 中,通过 sendSnap 发送快照

ApplySnapshot方法的实现

  1. 首先判断peerStorage是否是刚创建的,如果不是,那么通过 ps.clearMetaps.clearExtraData() 清除原来的数据,因为新的 snapshot 会包含新的 meta 信息,需要先清除老的。
  2. 然后根据snapshot设置raftState和applyState
  3. 发送 RegionTaskApplyregionSched 安装 snapshot。

总结

Project2非常具有挑战性,它实现了raft库,然后基于raft实现了一个K/V存储,并且还实现了snapshot功能。其中raft库中的代码较多,需要详细阅读raft论文。代码可以参考ETCD的raft库。