raft 简要介绍
raft
是一种分布式一致性算法,可以先看看这里:Raft Consensus Algorithm 的互动小程序来对raft
有个感性的认知,然后再读 paper。
raft 的角色
论文:/In Search of an Understandable Consensus Algorithm 里描述的 raft 算法有这几种角色:
- leader :负责处理来自客户端的请求,已经向集群中剩下的所有节点发起日志复制请求
- candidate :当集群中没有 leader 时,follower 会成为 candidate 发起投票
- follower :最好的情况下,集群中只有一个 ledaer,剩下的全都是接收 leader 命令的 follower
leader 的选举
节点最初加入集群的角色都是follower
,follower
如果在ElectionTimeout
内没收到来自leader
的心跳后会将自己的任期数加一,并开启新一轮选举,只有当集群中过半的节点投了赞成票后,candidate
才会成为leader
,并在集群中扮演leader
的角色,向其他节点发心跳以维护自己的权威。
raft 的日志
raft
最重要的东西就是日志,日志必须包含这两样东西:
- term :日志的任期
- index :日志的序号
- command :日志的具体内容
leader
向集群中其他节点发AE
来完成日志复制,只有leader
认为集群中过半的节点完成了日志复制,他才会认为这条日志可以提交。
实验要求
lab2
包含了一下三类测试:
lab 2A
的任务是 使 raft 实现启动后的 leader
选举, 和leader
挂了之后的重新选举要通过 TestInitialElection2A
和 TestRelection2A
这俩测试场景
lab 2B
要求完成日志复制,要处理集群出现网络分区,和分区后的恢复
lab 2C
要求raft
能在崩溃后恢复,原有的代码为我们提供了persistor
来做日志的持久化。
raft 实现
代码结构
Raft
结构体是这样,下面逐一解释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
type Raft struct {
...
title title // raft 的角色:leader,candidate,或 follower
currentTerm Term // raft 的当前任期
logs map[int]Command
commitedIndex, lastApplied int // 已经持久化的日志序号,最后一次上报给 state machine 的日志序号
nextIndex, matchIndex []int // 这是 leader 角色才有用到,分别表示 follower 的下一次心跳同步的日志起始位置和已经确认保持一致的日志位置
Requests chan Request // 一个 channel, 别的 raft 想和本 raft 通信的话,请求都放进这个 channel
RequestsDone chan struct{}
Reply chan Reply // 这是 leader/candidate 才会用到的,发出心跳请求和投票请求的回复信息会进这个管道
_log *zap.SugaredLogger
applyCh chan ApplyMsg
leaderTicker, electionTicker *time.Ticker // 分别是 leader 的嘀嗒器,还有超时未收到心跳的嘀嗒器
}
|
leader 的 ticker
论文中要求:broadcastTime ≪ electionTimeout ≪ MTBF
即:正常情况下的 节点通信时间远远小于 raft 的选举超时时间,选举超时时间远远小于单机的平均故障时间间隔
值得注意的是:raft
的选举超时时间是随机值,之所以设计为随机值是为了防止同一时刻有多个raft
节点都成为candidate
心跳超时时间为: 50 毫秒
选举超时时间为: 250~500 毫秒
1
2
3
4
5
6
7
8
9
|
const (
HeartBeat = 50 * time.Millisecond
ElectionTimeout = 500 * time.Millisecond
)
func randomElectionTimeout() time.Duration {
r := math.Abs(rand.Float64()/2) + 0.5
return time.Duration(float64(ElectionTimeout) * r)
}
|
raft 的角色:title
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
type title string
const (
Follower title = "Follower "
Candidate title = "Candidate"
Leader title = "Leader "
)
type title string
func (rf *Raft) change(nt title) {
if rf.electionTicker == nil {
rf.electionTicker = time.NewTicker(randomElectionTimeout())
}
if rf.leaderTicker == nil {
rf.leaderTicker = time.NewTicker(HeartBeat)
}
if rf.title != nt {
rf.Infof("change to [%s]", nt)
} else {
//rf.Infof("continue as [%s]", nt)
}
rf.title = nt
if nt == Leader {
rf.leaderTicker.Reset(HeartBeat)
rf.electionTicker.Stop()
for i := 0; i < len(rf.peers); i++ {
rf.nextIndex[i] = len(rf.logs) + 1
rf.matchIndex[i] = 0
}
} else {
rf.leaderTicker.Stop()
rf.electionTicker.Reset(randomElectionTimeout())
}
}
|
我之所以将raft
的title
和ticker
的变更绑定在一块,是因为raft
的角色变更后,ticker
必须得及时变更:
- 当
candidate
变为 leader 时,要设置心跳 ticker,并且要重新初始化 matchindex
和nextindex
- 当别的角色变为
follower
时,要设置选举超时 ticker
我是这样组织raft
的结构的:
type Raft struct {
...
Requests chan Request
RequestsDone chan struct{}
Reply chan Reply
applyCh chan ApplyMsg
leaderTicker, electionTicker *time.Ticker
}
任期: term
1
2
3
4
|
type Term struct {
VoteFor *int
Term int
}
|
因为raft
的选举机制要求一个任期内只能为一个candidate
投票,所以我将raft
的当前任期(currentTerm)的任期数和VoteFor
(当前任期为谁投了票,nil 表示未投票)放在同一个结构体内,Term
有以下方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
func (rf *Raft) IncTerm() {
rf.currentTerm.VoteFor = nil
rf.currentTerm.Term++
}
func (rf *Raft) SetVoteFor(voteFor *int) {
rf.currentTerm.VoteFor = voteFor
}
func (rf *Raft) SetTerm(tm int) {
rf.currentTerm.VoteFor = nil
if tm <= rf.currentTerm.Term {
panic("set equal or lower Term")
}
rf.currentTerm.Term = tm
}
|
每当raft
更换任期后,VoteFor
重置为nil
核心代码:
来看看这串raft
的核心代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
func (rf *Raft) run() {
round := 0 // 为了给 leader 发心跳和 candidate 发投票请求用的,然 raft 接受回复时,判断消息是否过期
for {
if rf.killed() {
time.Sleep(HeartBeat)
continue
}
rf.Infof("\n\n\n\n\n new round")
round++
// 每次循环做的第一件事情先判断下身份,如果是 leader 则立马发心跳,如果是 candidate 则立马发投票请求
switch rf.title {
case Leader:
rf.doLeader(round)
if rf.title != Leader {
continue
}
case Candidate:
rf.doCandidate(round)
if rf.title != Candidate {
continue
}
grantVote++
default:
}
// 如果是 leader 或者 candidate 的话,上面已经发了心跳或投票请求了
// 下来进入漫长的接收请求起
rf.procEvent(round)
}
}
|
这里专门用来处理外面来的请求或者回复
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
func (rf *Raft) procEvent(round int) {
var grantVote = 1
old := rf.title
for {
if rf.killed() {
return
}
select {
case <-rf.leaderTicker.C: //如果 leader 的心跳超时时间到了,则终止事件处理
return
case <-rf.electionTicker.C: // 等待选举超时
rf.change(Candidate)
return
case event := <-rf.Requests: // 等待心跳请求,投票请求,还有客户端发来的请求
switch event.request.(type) {
case *RequestVoteArgs:
args := event.request.(*RequestVoteArgs)
reply := event.response.(*RequestVoteReply)
rf.handleRequestVote(args, reply)
case *AppendEntriesArgs:
args := event.request.(*AppendEntriesArgs)
reply := event.response.(*AppendEntriesReply)
rf.handleAppendEntries(args, reply)
case *StartRequest:
args := event.request.(*StartRequest)
reply := event.response.(*StartReply)
rf.handleStartCommand(args, reply)
case *GetState:
reply := event.response.(*GetStateReply)
rf.handleGetState(reply)
default:
panic("unknown request")
}
rf.RequestsDone <- struct{}{}
case reply := <-rf.Reply: // 这里处理 raft 自己受到的响应
if reply.Round != round {
break
}
switch reply.Content.(type) {
case AppendEntriesReply:
reply := reply.Content.(AppendEntriesReply)
rf.receiveAppendEntries(reply)
case RequestVoteReply:
reply := reply.Content.(RequestVoteReply)
rf.receiveVoteRequest(reply, &grantVote)
default:
panic("unknown reply type")
}
}
if rf.title != old {
return
}
}
}
|
Leader Election
发起投票请求
raft
要是在Electiontimeout
内没收到来自leader
的请求时,会先让自己的Term
加一,然后开启一轮选举请求:
请求是并发进行的。
发起选举请求需要向别的节点告诉自己的任期和识别号(CandidateId)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
func (rf *Raft) doCandidate(round int) {
rf.Info("do candidate")
defer rf.Info("do candidate done")
rf.IncTerm()
var tm = rf.me
rf.currentTerm.VoteFor = &tm
for peerId := range rf.peers {
if peerId == rf.me {
continue
}
args := RequestVoteArgs{
Term: rf.currentTerm.Term,
CandidateId: rf.me,
LastLogIndex: len(rf.logs),
LastLogTerm: rf.logs[len(rf.logs)].Term,
}
reply := RequestVoteReply{}
go func(peerId int) {
if !rf.sendRequestVote(peerId, &args, &reply) {
return
}
rf.Infof("send RV: %s to %d, get %s", String(args), peerId, String(reply))
rf.Reply <- Reply{
Content: reply,
Round: round,
}
}(peerId)
}
}
|
处理投票请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
func (rf *Raft) handleRequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.Infof("RV: args: %s", String(args))
defer rf.Infof("RV: reply%s", String(reply))
reply.Term = rf.currentTerm.Term
if args.Term < rf.currentTerm.Term {
reply.Debug = "Term less than me"
return
} else if args.Term > rf.currentTerm.Term {
rf.SetTerm(args.Term)
if rf.title != Follower {
rf.change(Follower)
}
}
if err := rf.asUpdateAsMine(args.LastLogIndex, args.LastLogTerm); err != nil {
reply.Debug = err.Error()
return
}
if rf.currentTerm.VoteFor == nil || *rf.currentTerm.VoteFor == args.CandidateId {
if len(rf.logs) == 0 {
reply.VoteGranted = true
rf.currentTerm.VoteFor = &args.CandidateId
rf.change(Follower)
} else if rf.logs[len(rf.logs)].Term < args.LastLogTerm ||
(rf.logs[len(rf.logs)].Term == args.LastLogTerm &&
len(rf.logs) <= args.LastLogIndex) {
reply.VoteGranted = true
rf.currentTerm.VoteFor = &args.CandidateId
rf.change(Follower)
} else {
reply.Debug = fmt.Sprintf("prev log check failed %d:%#v, %#v | %t (%d==%d?%t) %t", len(rf.logs), String(rf.logs[len(rf.logs)]), String(args),
rf.logs[len(rf.logs)].Term < args.LastLogTerm,
rf.logs[len(rf.logs)].Term, args.LastLogTerm,
rf.logs[len(rf.logs)].Term == args.LastLogTerm,
len(rf.logs) <= args.LastLogIndex,
)
}
} else {
reply.Debug = fmt.Sprintf("already voted %d", *rf.currentTerm.VoteFor)
}
}
|
处理投票响应
发完请求后,进入procEvent
等待别的节点响应请求。
要是别人的任期高于自己,则立即转成Follower
角色
只有当集群中超过一半的人同意时,立即成为Leader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func (rf *Raft) receiveVoteRequest(reply RequestVoteReply, grantVote *int) {
if reply.Term > rf.currentTerm.Term {
rf.SetTerm(reply.Term)
rf.change(Follower)
return
}
if reply.VoteGranted {
*grantVote++
if *grantVote*2 > len(rf.peers) {
rf.change(Leader)
return
}
}
}
|
HeartBeat
Leader
要定时发起心跳, 目的是在集群中树立自己的权威。
如果别的节点没有在Electiontimeout
内收到来自Leader
的心跳请求时,则发起选举。
如果Entries
是空的,则相当于心跳请求,否则是日志复制请求
发起心跳/日志复制请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
func (rf *Raft) doLeader(round int) {
rf.Infof("do leader:%#v", rf.logs)
{
var matchindex string
var nextIndex string
for _, ni := range rf.nextIndex {
nextIndex += fmt.Sprintf("%d,", ni)
}
for _, mi := range rf.matchIndex {
matchindex += fmt.Sprintf("%d,", mi)
}
rf.Infof("matchIndex %s, nextIndex %s", matchindex, nextIndex)
}
for peerId := range rf.peers {
if peerId == rf.me {
continue
}
args := AppendEntriesArgs{
Term: rf.currentTerm.Term,
LeaderId: rf.me,
PrevLogIndex: rf.nextIndex[peerId] - 1,
PrevLogTerm: rf.logs[rf.nextIndex[peerId]-1].Term,
LeaderCommit: rf.commitedIndex,
Entries: make([]Command, 0),
}
for ni := rf.nextIndex[peerId]; ni <= len(rf.logs); ni++ {
args.Entries = append(args.Entries, rf.logs[ni])
}
reply := AppendEntriesReply{Id: peerId, NEntries: len(args.Entries)}
go func(peerId int) { // 异步请求
if !rf.sendAppendEntries(peerId, &args, &reply) {
return
}
rf.Infof("sent, AE: %s to %d, get %s", String(args), peerId, String(reply))
rf.Reply <- Reply{
Content: reply,
Round: round,
}
}(peerId)
}
}
|
处理心跳/日志复制请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
func (rf *Raft) handleAppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.Infof("handleAE: got : %s", String(args))
defer func() {
rf.Infof("handleAE: reply: %s", String(reply))
}()
reply.Term = rf.currentTerm.Term
if args.Term < rf.currentTerm.Term { // 要是 leader 的任期小于自己的任期,拒绝 AE
reply.Debug = "Term less than me"
//rf.Info("handleAE: Term less than me")
return
} else if args.Term > rf.currentTerm.Term { // 要是 leader 的任期大于自己的任期,更新自己的任期为最新
rf.SetTerm(args.Term)
}
rf.change(Follower) // 这时候已经说明当前 leader 是合法的,reset 选举超时时间
if args.PrevLogIndex > len(rf.logs) {
reply.Debug = "Pev log index greater than mine"
//rf.Infof("prev log index > mine")
return
}
if args.PrevLogIndex != 0 {
minePrev := rf.logs[args.PrevLogIndex]
if minePrev.Term != args.PrevLogTerm {
reply.Debug = fmt.Sprintf("prev Term not equal %d != %d", minePrev.Term, args.PrevLogTerm)
return
}
}
for i, entry := range args.Entries { // 开始处理 leader 发来的日志,要是 leader 的日志中有一条序号相同但任期不同, 那么用 leader 的日志覆盖自己的日志
tlog, ok := rf.logs[args.PrevLogIndex+i+1]
if ok && tlog.Term != entry.Term {
//if rf.lastApplied >= args.PrevLogIndex+i+1 {
//}
logn := len(rf.logs)
for di := args.PrevLogIndex + i + 1; di <= logn; di++ {
delete(rf.logs, di)
rf.Infof("appending: DELETED %d", di)
}
}
rf.logs[args.PrevLogIndex+i+1] = entry
}
rf.commitedIndex = min(len(rf.logs), args.LeaderCommit) // 更新自己的 commitIndex
reply.Success = true // 向 leader 报告 AE 成功
rf.persist()
rf.apply()
}
|
处理心跳/日志复制响应
和处理投票响应一样,当对方的任期大于自己的任期时,立即转成Follower
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
func (rf *Raft) receiveAppendEntries(reply AppendEntriesReply) {
if reply.Term > rf.currentTerm.Term { // 要是对方的任期大于自己的任期, 立即转为 follower
rf.SetTerm(reply.Term)
rf.change(Follower)
return
}
if !reply.Success { // 要是 AE 没成功,那就减小 nextIndex 再试试
if rf.nextIndex[reply.Id] > 1 {
oldidx := rf.nextIndex[reply.Id]
nidx := rf.nextIndex[reply.Id]
rf.nextIndex[reply.Id] = 1
var targetTerm = rf.logs[nidx].Term
if rf.logs[nidx].Term != rf.logs[nidx-1].Term {
targetTerm = rf.logs[nidx-1].Term
}
for i := 1; i <= nidx-1; i++ {
if rf.logs[i].Term == targetTerm {
rf.nextIndex[reply.Id] = i
break
}
}
rf.Infof("DESC %d next idx from %d -> %d", reply.Id, oldidx, rf.nextIndex[reply.Id])
}
} else { // AE 成功了,更新 matchIndex 和 nextIndex
rf.nextIndex[reply.Id] = rf.nextIndex[reply.Id] + reply.NEntries
if reply.NEntries == 0 {
rf.matchIndex[reply.Id] = rf.nextIndex[reply.Id] - 1
}
var midxs []int // leader 要计算出哪个日志序号是集群中过半节点都复制完成的,以增加 commitIndex
for pid, mi := range rf.matchIndex {
if pid == rf.me {
midxs = append(midxs, len(rf.logs))
} else {
midxs = append(midxs, mi)
}
}
sort.Ints(midxs)
// 0 1 2 3 4
mid := len(rf.matchIndex) / 2
if midxs[mid] > rf.commitedIndex {
old := rf.commitedIndex
rf.commitedIndex = midxs[mid]
rf.Infof("upgrade committedIndex %d->%d", old, rf.commitedIndex)
rf.persist()
rf.apply()
}
}
}
|
持久化
论文中指出只有 currentTerm,VoteFor 和 logs 是需要持久化的。
follower
要在响应请求之前完成持久化
leader
在commitIndex
增加之后完成持久化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
func (rf *Raft) persist() {
data := rf.raftState()
rf.persister.SaveRaftState(data)
}
func (rf *Raft) raftState() []byte {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
if err := e.Encode(rf.currentTerm); err != nil {
rf.Errorf("encode current Term %s", e)
}
if err := e.Encode(rf.logs); err != nil {
rf.Errorf("encode logs %s", err)
}
data := w.Bytes()
return data
}
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
if err := d.Decode(&rf.currentTerm); err != nil {
rf.Errorf("read persist %s", err)
}
if err := d.Decode(&rf.logs); err != nil {
rf.Errorf("read persist %s", err)
}
}
|
总结
lab2A
还挺简单,到了lab2B
就有点难度了。lab2B
完成后,lab2c
的持久化和网络分区又让我栽了跟头
完成lab2B
真的挺折腾人的,比如我出过这些问题:
- 没有让节点在收到合法的
AppendEntries
之后重置选举超时嘀嗒器,使得集群中频繁发生选举。
- 没有完整的校验
Leader
发来的日志信息是否比自己更新就给AE
报success
使得集群中的日志出现不一致。
- 不管是
AE
还是RV
,收到请求后没有及时更新自己的Term
,或没有判断对方的Term
是否有效就进入逻辑判断
分布式系统出问题真不好排查,为了查问题方便,我将响应leader
的错误信息放在了Debug
字段里,这样我只关注ledaer
打印的处理日志就能排查问题了。
但是这个demo
是假定集群成员数量固定不变的,没有包含集群成员变更,真实场景下,集群成员的配置是有可能改变的
参考资料