This page looks best with JavaScript enabled

MIT-6.824 分布式系统 课后实验 Lab2 A/B/C (Raft Election, Log Replication, Persistence)

 ·  ☕ 9 min read

raft 简要介绍

raft是一种分布式一致性算法,可以先看看这里:Raft Consensus Algorithm 的互动小程序来对raft有个感性的认知,然后再读 paper。

raft 的角色

论文:/In Search of an Understandable Consensus Algorithm 里描述的 raft 算法有这几种角色:

  • leader :负责处理来自客户端的请求,已经向集群中剩下的所有节点发起日志复制请求
  • candidate :当集群中没有 leader 时,follower 会成为 candidate 发起投票
  • follower :最好的情况下,集群中只有一个 ledaer,剩下的全都是接收 leader 命令的 follower

leader 的选举

节点最初加入集群的角色都是followerfollower如果在ElectionTimeout内没收到来自leader的心跳后会将自己的任期数加一,并开启新一轮选举,只有当集群中过半的节点投了赞成票后,candidate才会成为leader,并在集群中扮演leader的角色,向其他节点发心跳以维护自己的权威。

raft 的日志

raft最重要的东西就是日志,日志必须包含这两样东西:

  • term :日志的任期
  • index :日志的序号
  • command :日志的具体内容
    leader向集群中其他节点发AE来完成日志复制,只有leader认为集群中过半的节点完成了日志复制,他才会认为这条日志可以提交。

实验要求

lab2包含了一下三类测试:

  • lab 2A 的任务是 使 raft 实现启动后的 leader 选举, 和leader挂了之后的重新选举要通过 TestInitialElection2ATestRelection2A 这俩测试场景
  • lab 2B 要求完成日志复制,要处理集群出现网络分区,和分区后的恢复
  • lab 2C 要求raft能在崩溃后恢复,原有的代码为我们提供了persistor来做日志的持久化。

raft 实现

代码结构

Raft 结构体是这样,下面逐一解释

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
type Raft struct {
    ...

	title title  // raft 的角色:leader,candidate,或 follower
	currentTerm Term // raft 的当前任期
	logs        map[int]Command 

	commitedIndex, lastApplied int // 已经持久化的日志序号,最后一次上报给 state machine 的日志序号

	nextIndex, matchIndex []int // 这是 leader 角色才有用到,分别表示 follower 的下一次心跳同步的日志起始位置和已经确认保持一致的日志位置

	Requests     chan Request // 一个 channel, 别的 raft 想和本 raft 通信的话,请求都放进这个 channel
	RequestsDone chan struct{} 
	Reply        chan Reply // 这是 leader/candidate 才会用到的,发出心跳请求和投票请求的回复信息会进这个管道

	_log *zap.SugaredLogger

	applyCh chan ApplyMsg 

	leaderTicker, electionTicker *time.Ticker // 分别是 leader 的嘀嗒器,还有超时未收到心跳的嘀嗒器
}

leader 的 ticker

论文中要求:broadcastTime ≪ electionTimeout ≪ MTBF
即:正常情况下的 节点通信时间远远小于 raft 的选举超时时间,选举超时时间远远小于单机的平均故障时间间隔

值得注意的是:raft的选举超时时间是随机值,之所以设计为随机值是为了防止同一时刻有多个raft节点都成为candidate
心跳超时时间为: 50 毫秒
选举超时时间为: 250~500 毫秒

1
2
3
4
5
6
7
8
9
const (
	HeartBeat       = 50 * time.Millisecond
	ElectionTimeout = 500 * time.Millisecond
)

func randomElectionTimeout() time.Duration {
	r := math.Abs(rand.Float64()/2) + 0.5
	return time.Duration(float64(ElectionTimeout) * r)
}

raft 的角色:title

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type title string
const (
	Follower  title = "Follower "
	Candidate title = "Candidate"
	Leader    title = "Leader   "
)
type title string

func (rf *Raft) change(nt title) {
	if rf.electionTicker == nil {
		rf.electionTicker = time.NewTicker(randomElectionTimeout())
	}
	if rf.leaderTicker == nil {
		rf.leaderTicker = time.NewTicker(HeartBeat)
	}

	if rf.title != nt {
		rf.Infof("change to [%s]", nt)
	} else {
		//rf.Infof("continue as [%s]", nt)
	}
	rf.title = nt
	if nt == Leader {
		rf.leaderTicker.Reset(HeartBeat)
		rf.electionTicker.Stop()
		for i := 0; i < len(rf.peers); i++ {
			rf.nextIndex[i] = len(rf.logs) + 1
			rf.matchIndex[i] = 0
		}
	} else {
		rf.leaderTicker.Stop()
		rf.electionTicker.Reset(randomElectionTimeout())
	}
}

我之所以将rafttitleticker的变更绑定在一块,是因为raft的角色变更后,ticker必须得及时变更:

  • candidate变为 leader 时,要设置心跳 ticker,并且要重新初始化 matchindexnextindex
  • 当别的角色变为follower时,要设置选举超时 ticker

我是这样组织raft的结构的:

type Raft struct {
    ...

	Requests     chan Request
	RequestsDone chan struct{}
	Reply        chan Reply

	applyCh chan ApplyMsg

	leaderTicker, electionTicker *time.Ticker
}

任期: term

1
2
3
4
type Term struct {
	VoteFor *int
	Term    int
}

因为raft的选举机制要求一个任期内只能为一个candidate投票,所以我将raft的当前任期(currentTerm)的任期数和VoteFor(当前任期为谁投了票,nil 表示未投票)放在同一个结构体内,Term有以下方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
func (rf *Raft) IncTerm() {
	rf.currentTerm.VoteFor = nil
	rf.currentTerm.Term++
}

func (rf *Raft) SetVoteFor(voteFor *int) {
	rf.currentTerm.VoteFor = voteFor
}

func (rf *Raft) SetTerm(tm int) {
	rf.currentTerm.VoteFor = nil
	if tm <= rf.currentTerm.Term {
		panic("set equal or lower Term")
	}
	rf.currentTerm.Term = tm
}

每当raft更换任期后,VoteFor 重置为nil

核心代码:

来看看这串raft的核心代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (rf *Raft) run() {
	round := 0  // 为了给 leader 发心跳和 candidate 发投票请求用的,然 raft 接受回复时,判断消息是否过期
	for {
		if rf.killed() {
			time.Sleep(HeartBeat)
			continue
		}
		rf.Infof("\n\n\n\n\n new round")
		round++

// 每次循环做的第一件事情先判断下身份,如果是 leader 则立马发心跳,如果是 candidate 则立马发投票请求
		switch rf.title {
		case Leader:
			rf.doLeader(round)
			if rf.title != Leader {
				continue
			}
		case Candidate:
			rf.doCandidate(round)
			if rf.title != Candidate {
				continue
			}
			grantVote++
		default:
		}

// 如果是 leader 或者 candidate 的话,上面已经发了心跳或投票请求了
// 下来进入漫长的接收请求起
        rf.procEvent(round)
    }
}

这里专门用来处理外面来的请求或者回复

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func (rf *Raft) procEvent(round int) {
	var grantVote = 1
	old := rf.title
	for {
		if rf.killed() {
			return
		}
		select {
		case <-rf.leaderTicker.C: //如果 leader 的心跳超时时间到了,则终止事件处理
			return
		case <-rf.electionTicker.C: // 等待选举超时
			rf.change(Candidate)
			return
		case event := <-rf.Requests: // 等待心跳请求,投票请求,还有客户端发来的请求
			switch event.request.(type) {
			case *RequestVoteArgs:
				args := event.request.(*RequestVoteArgs)
				reply := event.response.(*RequestVoteReply)
				rf.handleRequestVote(args, reply)
			case *AppendEntriesArgs:
				args := event.request.(*AppendEntriesArgs)
				reply := event.response.(*AppendEntriesReply)
				rf.handleAppendEntries(args, reply)
			case *StartRequest:
				args := event.request.(*StartRequest)
				reply := event.response.(*StartReply)
				rf.handleStartCommand(args, reply)
			case *GetState:
				reply := event.response.(*GetStateReply)
				rf.handleGetState(reply)
			default:
				panic("unknown request")
			}
			rf.RequestsDone <- struct{}{}

		case reply := <-rf.Reply: // 这里处理 raft 自己受到的响应
			if reply.Round != round {
				break
			}
			switch reply.Content.(type) {
			case AppendEntriesReply:
				reply := reply.Content.(AppendEntriesReply)
				rf.receiveAppendEntries(reply)

			case RequestVoteReply:
				reply := reply.Content.(RequestVoteReply)
				rf.receiveVoteRequest(reply, &grantVote)

			default:
				panic("unknown reply type")
			}
		}
		if rf.title != old { 
			return
		}
	}
}

Leader Election

发起投票请求

raft要是在Electiontimeout内没收到来自leader的请求时,会先让自己的Term加一,然后开启一轮选举请求:
请求是并发进行的。
发起选举请求需要向别的节点告诉自己的任期和识别号(CandidateId)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (rf *Raft) doCandidate(round int) {
	rf.Info("do candidate")
	defer rf.Info("do candidate done")
	rf.IncTerm()
	var tm = rf.me
	rf.currentTerm.VoteFor = &tm

	for peerId := range rf.peers {
		if peerId == rf.me {
			continue
		}

		args := RequestVoteArgs{
			Term:         rf.currentTerm.Term,
			CandidateId:  rf.me,
			LastLogIndex: len(rf.logs),
			LastLogTerm:  rf.logs[len(rf.logs)].Term,
		}
		reply := RequestVoteReply{}

		go func(peerId int) {

			if !rf.sendRequestVote(peerId, &args, &reply) {
				return
			}
			rf.Infof("send RV: %s to %d, get %s", String(args), peerId, String(reply))
			rf.Reply <- Reply{
				Content: reply,
				Round:   round,
			}
		}(peerId)

	}
}

处理投票请求

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func (rf *Raft) handleRequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	rf.Infof("RV: args: %s", String(args))
	defer rf.Infof("RV: reply%s", String(reply))
	reply.Term = rf.currentTerm.Term
	if args.Term < rf.currentTerm.Term {
		reply.Debug = "Term less than me"
		return
	} else if args.Term > rf.currentTerm.Term {
		rf.SetTerm(args.Term)
		if rf.title != Follower {
			rf.change(Follower)
		}
	}
	if err := rf.asUpdateAsMine(args.LastLogIndex, args.LastLogTerm); err != nil {
		reply.Debug = err.Error()
		return
	}
	if rf.currentTerm.VoteFor == nil || *rf.currentTerm.VoteFor == args.CandidateId {
		if len(rf.logs) == 0 {
			reply.VoteGranted = true
			rf.currentTerm.VoteFor = &args.CandidateId
			rf.change(Follower)
		} else if rf.logs[len(rf.logs)].Term < args.LastLogTerm ||
			(rf.logs[len(rf.logs)].Term == args.LastLogTerm &&
				len(rf.logs) <= args.LastLogIndex) {
			reply.VoteGranted = true
			rf.currentTerm.VoteFor = &args.CandidateId
			rf.change(Follower)
		} else {
			reply.Debug = fmt.Sprintf("prev log check failed %d:%#v, %#v | %t (%d==%d?%t) %t", len(rf.logs), String(rf.logs[len(rf.logs)]), String(args),
				rf.logs[len(rf.logs)].Term < args.LastLogTerm,
				rf.logs[len(rf.logs)].Term, args.LastLogTerm,
				rf.logs[len(rf.logs)].Term == args.LastLogTerm,

				len(rf.logs) <= args.LastLogIndex,
			)
		}
	} else {
		reply.Debug = fmt.Sprintf("already voted %d", *rf.currentTerm.VoteFor)
	}
}

处理投票响应

发完请求后,进入procEvent等待别的节点响应请求。
要是别人的任期高于自己,则立即转成Follower角色
只有当集群中超过一半的人同意时,立即成为Leader

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
func (rf *Raft) receiveVoteRequest(reply RequestVoteReply, grantVote *int) {
	if reply.Term > rf.currentTerm.Term {
		rf.SetTerm(reply.Term)
		rf.change(Follower)
		return
	}

	if reply.VoteGranted {
		*grantVote++
		if *grantVote*2 > len(rf.peers) {
			rf.change(Leader)
			return
		}
	}
}

HeartBeat

Leader要定时发起心跳, 目的是在集群中树立自己的权威。
如果别的节点没有在Electiontimeout内收到来自Leader的心跳请求时,则发起选举。
如果Entries是空的,则相当于心跳请求,否则是日志复制请求

发起心跳/日志复制请求

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func (rf *Raft) doLeader(round int) {
	rf.Infof("do leader:%#v", rf.logs)
	{
		var matchindex string
		var nextIndex string
		for _, ni := range rf.nextIndex {
			nextIndex += fmt.Sprintf("%d,", ni)
		}
		for _, mi := range rf.matchIndex {
			matchindex += fmt.Sprintf("%d,", mi)
		}
		rf.Infof("matchIndex %s, nextIndex %s", matchindex, nextIndex)
	}
	for peerId := range rf.peers {
		if peerId == rf.me { 
			continue
		}

		args := AppendEntriesArgs{
			Term:         rf.currentTerm.Term,
			LeaderId:     rf.me,
			PrevLogIndex: rf.nextIndex[peerId] - 1,
			PrevLogTerm:  rf.logs[rf.nextIndex[peerId]-1].Term,
			LeaderCommit: rf.commitedIndex,
			Entries:      make([]Command, 0),
		}
		for ni := rf.nextIndex[peerId]; ni <= len(rf.logs); ni++ {
			args.Entries = append(args.Entries, rf.logs[ni])
		}
		reply := AppendEntriesReply{Id: peerId, NEntries: len(args.Entries)}
		go func(peerId int) { // 异步请求
			if !rf.sendAppendEntries(peerId, &args, &reply) {
				return
			}
			rf.Infof("sent, AE: %s to %d, get %s", String(args), peerId, String(reply))
			rf.Reply <- Reply{
				Content: reply,
				Round:   round,
			}
		}(peerId)

	}
}

处理心跳/日志复制请求

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (rf *Raft) handleAppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
	rf.Infof("handleAE: got : %s", String(args))
	defer func() {
		rf.Infof("handleAE: reply: %s", String(reply))
	}()

	reply.Term = rf.currentTerm.Term
	if args.Term < rf.currentTerm.Term { // 要是 leader 的任期小于自己的任期,拒绝 AE
		reply.Debug = "Term less than me"
		//rf.Info("handleAE: Term less than me")
		return
	} else if args.Term > rf.currentTerm.Term { // 要是 leader 的任期大于自己的任期,更新自己的任期为最新
		rf.SetTerm(args.Term)
	}
	rf.change(Follower)  // 这时候已经说明当前 leader 是合法的,reset 选举超时时间
	if args.PrevLogIndex > len(rf.logs) { 
		reply.Debug = "Pev log index greater than mine"
		//rf.Infof("prev log index > mine")
		return
	}
	if args.PrevLogIndex != 0 {
		minePrev := rf.logs[args.PrevLogIndex]
		if minePrev.Term != args.PrevLogTerm {
			reply.Debug = fmt.Sprintf("prev Term not equal %d != %d", minePrev.Term, args.PrevLogTerm)
			return
		}
	}

	for i, entry := range args.Entries { // 开始处理 leader 发来的日志,要是 leader 的日志中有一条序号相同但任期不同, 那么用 leader 的日志覆盖自己的日志
		tlog, ok := rf.logs[args.PrevLogIndex+i+1]
		if ok && tlog.Term != entry.Term {
			//if rf.lastApplied >= args.PrevLogIndex+i+1 {
			//}
			logn := len(rf.logs)
			for di := args.PrevLogIndex + i + 1; di <= logn; di++ {
				delete(rf.logs, di)
				rf.Infof("appending: DELETED %d", di)
			}
		}
		rf.logs[args.PrevLogIndex+i+1] = entry
	}
	rf.commitedIndex = min(len(rf.logs), args.LeaderCommit) // 更新自己的 commitIndex
	reply.Success = true // 向 leader 报告 AE 成功
	rf.persist()
	rf.apply()
}

处理心跳/日志复制响应

和处理投票响应一样,当对方的任期大于自己的任期时,立即转成Follower

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func (rf *Raft) receiveAppendEntries(reply AppendEntriesReply) {

	if reply.Term > rf.currentTerm.Term { // 要是对方的任期大于自己的任期, 立即转为 follower
		rf.SetTerm(reply.Term)
		rf.change(Follower)
		return
	}
	if !reply.Success { // 要是 AE 没成功,那就减小 nextIndex 再试试
		if rf.nextIndex[reply.Id] > 1 {
			oldidx := rf.nextIndex[reply.Id]
			nidx := rf.nextIndex[reply.Id]
			rf.nextIndex[reply.Id] = 1
			var targetTerm = rf.logs[nidx].Term
			if rf.logs[nidx].Term != rf.logs[nidx-1].Term {
				targetTerm = rf.logs[nidx-1].Term
			}

			for i := 1; i <= nidx-1; i++ {
				if rf.logs[i].Term == targetTerm {
					rf.nextIndex[reply.Id] = i
					break
				}
			}

			rf.Infof("DESC %d next idx from %d -> %d", reply.Id, oldidx, rf.nextIndex[reply.Id])
		}
	} else { // AE 成功了,更新 matchIndex 和 nextIndex
		rf.nextIndex[reply.Id] = rf.nextIndex[reply.Id] + reply.NEntries
		if reply.NEntries == 0 {
			rf.matchIndex[reply.Id] = rf.nextIndex[reply.Id] - 1
		}

	 	var midxs []int // leader 要计算出哪个日志序号是集群中过半节点都复制完成的,以增加 commitIndex
		for pid, mi := range rf.matchIndex {
			if pid == rf.me {
				midxs = append(midxs, len(rf.logs))
			} else {
				midxs = append(midxs, mi)
			}
		}
		sort.Ints(midxs)
		// 0 1 2 3 4
		mid := len(rf.matchIndex) / 2
		if midxs[mid] > rf.commitedIndex {
			old := rf.commitedIndex
			rf.commitedIndex = midxs[mid]
			rf.Infof("upgrade committedIndex %d->%d", old, rf.commitedIndex)
			rf.persist()
			rf.apply()
		}
	}
}

持久化

论文中指出只有 currentTerm,VoteFor 和 logs 是需要持久化的。
follower要在响应请求之前完成持久化
leadercommitIndex增加之后完成持久化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func (rf *Raft) persist() {
	data := rf.raftState()
	rf.persister.SaveRaftState(data)
}

func (rf *Raft) raftState() []byte {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	if err := e.Encode(rf.currentTerm); err != nil {
		rf.Errorf("encode current Term %s", e)
	}
	if err := e.Encode(rf.logs); err != nil {
		rf.Errorf("encode logs %s", err)
	}
	data := w.Bytes()
	return data
}

func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	if err := d.Decode(&rf.currentTerm); err != nil {
		rf.Errorf("read persist %s", err)
	}
	if err := d.Decode(&rf.logs); err != nil {
		rf.Errorf("read persist %s", err)
	}
}

总结

lab2A 还挺简单,到了lab2B就有点难度了。lab2B完成后,lab2c的持久化和网络分区又让我栽了跟头
完成lab2B 真的挺折腾人的,比如我出过这些问题:

  • 没有让节点在收到合法的AppendEntries之后重置选举超时嘀嗒器,使得集群中频繁发生选举。
  • 没有完整的校验Leader发来的日志信息是否比自己更新就给AEsuccess使得集群中的日志出现不一致。
  • 不管是AE还是RV,收到请求后没有及时更新自己的Term,或没有判断对方的Term是否有效就进入逻辑判断

分布式系统出问题真不好排查,为了查问题方便,我将响应leader的错误信息放在了Debug字段里,这样我只关注ledaer打印的处理日志就能排查问题了。

但是这个demo是假定集群成员数量固定不变的,没有包含集群成员变更,真实场景下,集群成员的配置是有可能改变的

参考资料

Share on

EXEC
WRITTEN BY
EXEC
Eval EXEC