This page looks best with JavaScript enabled

MIT-6.824 分布式系统 课后实验 Lab3 A (Fault-tolerant Key/Value Service)

 ·  ☕ 3 min read

lab2已经完成了简单的raft, 在lab3里要完成一个key-value数据库。

数据库服务端应该有这三个接口:

  • Get(key string)
  • Put(key, value string)
  • Append(key, value string) // 如果 key 不存在,那么有等同于 Put

客户端和 Clerk交互, Clerkraft server 交互。

1
2
3
4
5
6
7
type Operation string

const (
	PutOp       Operation = "Put"
	AppendOp    Operation = "Append"
	GetOp       Operation = "Get"
)

Server 提供给 Clerk 的调用方法

先来看看 Clerk 发给 server 的结构体:

  • uuid: 我直接简单的用时间戳(time.Now().UnixNano()) 来标识 Clerk 发来的请求,因为 Clerk 可能会重复请求,这里要求 Server 做到幂等。
  • Operation,key,value,: 操作类型,还有键,值
  • setReply :是setReplyFunc 类型,当 Server 完成请求处理后执行 setReply 之后,请求才算完成。
1
2
3
4
5
6
7
8
type setReplyFunc func(err Err, value string)
type request struct {
	uuid             int64
	argOp            Operation
	argKey, argValue string

	setReply setReplyFunc
}

Get

服务端将Get request 都放进 requestCh 这个管道内

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	done := make(chan struct{})
	r := &request{
		uuid:     args.Uuid,
		argOp:    GetOp,
		argKey:   args.Key,
		argValue: "",

		setReply: func(err Err, value string) {
			reply.Err = err
			reply.Value = value
			done <- struct{}{}
			kv.Infof("!!!!!!!!!!!!get %#v", reply)
		},
	}
	kv.Infof("get request %#v", args)
	kv.requestCh <- r
	<-done
	kv.Infof("sent request %#v", args)
}

PutAppend

服务端处理PutAppend和处理Get一样,都放进requestch 管道内

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	done := make(chan struct{})
	r := &request{
		uuid:     args.Uuid,
		argOp:    args.Op,
		argKey:   args.Key,
		argValue: args.Value,
		setReply: func(err Err, value string) {
			reply.Err = err
			done <- struct{}{}
			kv.Infof("!!!!!!!!!!!!get %#v", reply)
		},
	}

	kv.Infof("get request %#v, len chan %d", args, len(kv.requestCh))
	kv.requestCh <- r
	<-done
	kv.Infof("sent request %#v", args)
}

服务端运行

startAgreement()

Clerk请求Server后,Server要向集群中的所有节点复制日志(如果该serverleader的话)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func (kv *KVServer) startAgreement(uuid int64, op Operation, key, value string) error {
	if kv.killed() {
		kv.Info("KILLED")
		return errors.New(ErrWrongLeader)
	}
	command := Op{
		Uuid:      uuid,
		Operation: op,
		Key:       key,
		Value:     value,
	}
	if expi, expt, ok := kv.rf.Start( // 先看看当前 server 是不是 leader?
		command); !ok {
		return errors.New(ErrWrongLeader)
	} else { // 当前 server 是 leader,那等两个心跳,在理想情况下,这条日志就被整个集群复制了
		kv.Infof("START AGREE: to %d; %d:%s,[\"%s\",\"%s\"]", expi, uuid, op, key, value)
		time.Sleep(raft.HeartBeat * 2)
		now := time.Now()
		for time.Since(now) < time.Second*10 {
			if cmi, ct, ok := kv.rf.Start(raft.CmdGetIdx{}); ok {
				if expt != ct { // 任期数变了? 说明这个 leader 落后了,并且接收到了别的 candidate/leader 的请求了,
					kv.Infof("term changed")
					return errors.New(ErrWrongLeader)
				} else if cmi == expi { // commitIndex 增加了到了期望的值,说明日志复制成功
					kv.Infof("AGREED")
					return nil
				}else{ // 都等了俩个心跳了,集群中过半的节点还没成功地复制日志?那在等等吧,如果等了 10 秒还没成功,大概率是 raft 集群不可用了
					kv.Infof("wait reach cmi")
				}
			} else {
				kv.Infof("not continue as leader")
				return errors.New(ErrWrongLeader)
			}
			time.Sleep(raft.HeartBeat)
		}
		kv.Infof("wait timeout")
		return errors.New(ErrWrongLeader)
	}
}

处理 Clerk 发来的请求

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
	kv.store = make(map[string]string)
	kv.history = make(map[int64]bool)
	kv.requestCh = make(chan *request)

	var ret response
	go func() {
		for applyMsg := range kv.applyCh {
			ret = kv.handleApplyMsg(applyMsg)
		}
	}()

	go func() {
		for request := range kv.requestCh { //在这里监听来自 Clerk 的请求,并尝试让集群复制这条请求。
			if err := kv.startAgreement(request.uuid, request.argOp, request.argKey, request.argValue); err != nil {
				kv.Infof("send err wrong leader") // 共识失败
				request.setReply(ErrWrongLeader, "")
			} else { // 如果集群一致通过了这个请求,会从 ApplyCh 里得到刚刚的 request,再将 request 的处理结果通过 setReply()返回给 Clerk
				request.setReply(ret.err, ret.value)
			}
		}
	}()

	return kv
}

监听 raft 的 applyMsg

如果从 applyMsg 里得到了命令,就说明这个命令被集群中超过一半的节点成功复制了
根据请求里的 uuid 判断命令是否重复,执行命令。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (kv *KVServer) handleApplyMsg(applyMsg raft.ApplyMsg) (ret response) {
	if op, ok := applyMsg.Command.(Op); ok {
		ret.err = OK
		switch op.Operation {
		case GetOp:
			var ok bool
			ret.value, ok = kv.store[op.Key]
			if !ok {
				ret.err = ErrNoKey
			}
		case PutOp:
			if _, ok := kv.history[op.Uuid]; !ok {
				kv.store[op.Key] = op.Value
				kv.history[op.Uuid] = true
			}
		case AppendOp:
			if _, ok := kv.history[op.Uuid]; !ok {
				kv.store[op.Key] += op.Value
				kv.history[op.Uuid] = true
			}
		default:
			panic("unknown operation")
		}
	}
	return ret
}
Share on

EXEC
WRITTEN BY
EXEC
Eval EXEC