在lab2
已经完成了简单的raft
, 在lab3
里要完成一个key-value
数据库。
数据库服务端应该有这三个接口:
Get(key string)
Put(key, value string)
Append(key, value string)
// 如果 key 不存在,那么有等同于 Put
客户端和 Clerk
交互, Clerk
和raft server
交互。
1
2
3
4
5
6
7
|
type Operation string
const (
PutOp Operation = "Put"
AppendOp Operation = "Append"
GetOp Operation = "Get"
)
|
Server 提供给 Clerk 的调用方法
先来看看 Clerk 发给 server 的结构体:
uuid
: 我直接简单的用时间戳(time.Now().UnixNano()
) 来标识 Clerk 发来的请求,因为 Clerk 可能会重复请求,这里要求 Server 做到幂等。
Operation
,key
,value
,: 操作类型,还有键,值
setReply
:是setReplyFunc
类型,当 Server 完成请求处理后执行 setReply 之后,请求才算完成。
1
2
3
4
5
6
7
8
|
type setReplyFunc func(err Err, value string)
type request struct {
uuid int64
argOp Operation
argKey, argValue string
setReply setReplyFunc
}
|
Get
服务端将Get request
都放进 requestCh
这个管道内
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
done := make(chan struct{})
r := &request{
uuid: args.Uuid,
argOp: GetOp,
argKey: args.Key,
argValue: "",
setReply: func(err Err, value string) {
reply.Err = err
reply.Value = value
done <- struct{}{}
kv.Infof("!!!!!!!!!!!!get %#v", reply)
},
}
kv.Infof("get request %#v", args)
kv.requestCh <- r
<-done
kv.Infof("sent request %#v", args)
}
|
PutAppend
服务端处理PutAppend
和处理Get
一样,都放进requestch
管道内
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
done := make(chan struct{})
r := &request{
uuid: args.Uuid,
argOp: args.Op,
argKey: args.Key,
argValue: args.Value,
setReply: func(err Err, value string) {
reply.Err = err
done <- struct{}{}
kv.Infof("!!!!!!!!!!!!get %#v", reply)
},
}
kv.Infof("get request %#v, len chan %d", args, len(kv.requestCh))
kv.requestCh <- r
<-done
kv.Infof("sent request %#v", args)
}
|
服务端运行
startAgreement()
Clerk
请求Server
后,Server
要向集群中的所有节点复制日志(如果该server
是leader
的话)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
func (kv *KVServer) startAgreement(uuid int64, op Operation, key, value string) error {
if kv.killed() {
kv.Info("KILLED")
return errors.New(ErrWrongLeader)
}
command := Op{
Uuid: uuid,
Operation: op,
Key: key,
Value: value,
}
if expi, expt, ok := kv.rf.Start( // 先看看当前 server 是不是 leader?
command); !ok {
return errors.New(ErrWrongLeader)
} else { // 当前 server 是 leader,那等两个心跳,在理想情况下,这条日志就被整个集群复制了
kv.Infof("START AGREE: to %d; %d:%s,[\"%s\",\"%s\"]", expi, uuid, op, key, value)
time.Sleep(raft.HeartBeat * 2)
now := time.Now()
for time.Since(now) < time.Second*10 {
if cmi, ct, ok := kv.rf.Start(raft.CmdGetIdx{}); ok {
if expt != ct { // 任期数变了? 说明这个 leader 落后了,并且接收到了别的 candidate/leader 的请求了,
kv.Infof("term changed")
return errors.New(ErrWrongLeader)
} else if cmi == expi { // commitIndex 增加了到了期望的值,说明日志复制成功
kv.Infof("AGREED")
return nil
}else{ // 都等了俩个心跳了,集群中过半的节点还没成功地复制日志?那在等等吧,如果等了 10 秒还没成功,大概率是 raft 集群不可用了
kv.Infof("wait reach cmi")
}
} else {
kv.Infof("not continue as leader")
return errors.New(ErrWrongLeader)
}
time.Sleep(raft.HeartBeat)
}
kv.Infof("wait timeout")
return errors.New(ErrWrongLeader)
}
}
|
处理 Clerk 发来的请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {
kv.store = make(map[string]string)
kv.history = make(map[int64]bool)
kv.requestCh = make(chan *request)
var ret response
go func() {
for applyMsg := range kv.applyCh {
ret = kv.handleApplyMsg(applyMsg)
}
}()
go func() {
for request := range kv.requestCh { //在这里监听来自 Clerk 的请求,并尝试让集群复制这条请求。
if err := kv.startAgreement(request.uuid, request.argOp, request.argKey, request.argValue); err != nil {
kv.Infof("send err wrong leader") // 共识失败
request.setReply(ErrWrongLeader, "")
} else { // 如果集群一致通过了这个请求,会从 ApplyCh 里得到刚刚的 request,再将 request 的处理结果通过 setReply()返回给 Clerk
request.setReply(ret.err, ret.value)
}
}
}()
return kv
}
|
监听 raft 的 applyMsg
如果从 applyMsg 里得到了命令,就说明这个命令被集群中超过一半的节点成功复制了
根据请求里的 uuid 判断命令是否重复,执行命令。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
func (kv *KVServer) handleApplyMsg(applyMsg raft.ApplyMsg) (ret response) {
if op, ok := applyMsg.Command.(Op); ok {
ret.err = OK
switch op.Operation {
case GetOp:
var ok bool
ret.value, ok = kv.store[op.Key]
if !ok {
ret.err = ErrNoKey
}
case PutOp:
if _, ok := kv.history[op.Uuid]; !ok {
kv.store[op.Key] = op.Value
kv.history[op.Uuid] = true
}
case AppendOp:
if _, ok := kv.history[op.Uuid]; !ok {
kv.store[op.Key] += op.Value
kv.history[op.Uuid] = true
}
default:
panic("unknown operation")
}
}
return ret
}
|